// Imports categories, products, product/category links, orders and purchase
// orders from the production MySQL database (reached through an SSH tunnel)
// into the local database. Progress is emitted as one JSON object per line
// on stdout.
const fs = require('fs');
const mysql = require('mysql2/promise');
const { Client } = require('ssh2');
const dotenv = require('dotenv');
const path = require('path');

dotenv.config({ path: path.join(__dirname, '../.env') });

// SSH configuration
const sshConfig = {
  host: process.env.PROD_SSH_HOST,
  // Environment variables are strings; coerce so the port is always numeric.
  port: Number(process.env.PROD_SSH_PORT) || 22,
  username: process.env.PROD_SSH_USER,
  privateKey: process.env.PROD_SSH_KEY_PATH
    ? fs.readFileSync(process.env.PROD_SSH_KEY_PATH)
    : undefined
};

// Production database configuration
const prodDbConfig = {
  host: process.env.PROD_DB_HOST || 'localhost',
  user: process.env.PROD_DB_USER,
  password: process.env.PROD_DB_PASSWORD,
  database: process.env.PROD_DB_NAME,
  // Also used as the forwardOut destination port, so keep it numeric.
  port: Number(process.env.PROD_DB_PORT) || 3306
};

// Local database configuration
const localDbConfig = {
  host: process.env.DB_HOST,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,
  multipleStatements: true,
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0,
  namedPlaceholders: true
};

/**
 * Write one progress record to stdout as a single JSON line.
 *
 * @param {object} data - Any JSON-serialisable progress payload.
 */
function outputProgress(data) {
  process.stdout.write(JSON.stringify(data) + '\n');
}

/**
 * Format a duration in seconds as e.g. "1h 2m 3s".
 *
 * Zero-valued units are omitted, except that a duration under one second
 * yields "0s". Negative or non-finite input is treated as 0.
 *
 * @param {number} seconds - Duration in seconds (fractions are floored).
 * @returns {string} Space-separated duration parts.
 */
function formatDuration(seconds) {
  const totalSeconds = Number.isFinite(seconds) && seconds > 0 ? seconds : 0;
  const hours = Math.floor(totalSeconds / 3600);
  const minutes = Math.floor((totalSeconds % 3600) / 60);
  const secs = Math.floor(totalSeconds % 60);

  const parts = [];
  if (hours > 0) parts.push(`${hours}h`);
  if (minutes > 0) parts.push(`${minutes}m`);
  if (secs > 0 || parts.length === 0) parts.push(`${secs}s`);
  return parts.join(' ');
}

/**
 * Emit a "running" progress record with throughput and a time estimate.
 *
 * @param {number} current - Rows processed so far.
 * @param {number} total - Total rows to process.
 * @param {string} operation - Label for the running operation.
 * @param {number} startTime - Operation start, as returned by Date.now().
 */
function updateProgress(current, total, operation, startTime) {
  const elapsed = (Date.now() - startTime) / 1000;
  // Guard the divisions: a zero elapsed time or zero rate would otherwise
  // yield Infinity/NaN, which JSON serialises as null and which produce
  // nonsense duration strings.
  const rate = elapsed > 0 ? current / elapsed : 0;
  const remaining = rate > 0 ? (total - current) / rate : 0;
  const percentage = total > 0 ? ((current / total) * 100).toFixed(1) : '0.0';

  outputProgress({
    status: 'running',
    operation,
    current,
    total,
    rate,
    elapsed: formatDuration(elapsed),
    remaining: formatDuration(remaining),
    percentage
  });
}

/**
 * Open an SSH connection using sshConfig and forward a channel to the
 * production database host/port from prodDbConfig.
 *
 * @returns {Promise<{ssh: object, stream: object}>} The connected SSH client
 *   and the forwarded stream. Rejects if connecting or forwarding fails.
 */
async function setupSshTunnel() {
  return new Promise((resolve, reject) => {
    const ssh = new Client();

    // Without an 'error' listener a failed connect is thrown as an unhandled
    // 'error' event instead of rejecting this promise.
    const onSetupError = (err) => reject(err);
    ssh.once('error', onSetupError);

    ssh.on('ready', () => {
      ssh.forwardOut(
        '127.0.0.1',
        0,
        prodDbConfig.host,
        prodDbConfig.port,
        (err, stream) => {
          // The listener only covers tunnel setup; remove it so later errors
          // are not silently dropped on an already-settled promise.
          ssh.removeListener('error', onSetupError);
          if (err) {
            ssh.end();
            reject(err);
            return;
          }
          resolve({ ssh, stream });
        }
      );
    });

    ssh.connect(sshConfig);
  });
}

/**
 * Build the "(?, ?, ...)" tuple for one row of a multi-row INSERT.
 *
 * @param {number} valueCount - Number of bound values in the row.
 * @param {string} [trailingSql] - Literal SQL appended after the markers.
 * @returns {string} Parenthesised placeholder tuple.
 */
function buildRowPlaceholder(valueCount, trailingSql) {
  const markers = new Array(valueCount).fill('?');
  if (trailingSql) markers.push(trailingSql);
  return `(${markers.join(', ')})`;
}

/**
 * Insert rows into the local database in fixed-size batches, reporting
 * progress after each batch.
 *
 * The placeholder tuple for each row is derived from the length of the value
 * array returned by `toValues`, so the marker count always matches the
 * number of bound values.
 *
 * @param {object} localConnection - Object exposing `query(sql, values)`.
 * @param {object[]} rows - Source rows to insert.
 * @param {object} options
 * @param {number} options.batchSize - Rows per INSERT statement.
 * @param {string} options.operation - Label used in progress records.
 * @param {number} options.startTime - Operation start (Date.now()).
 * @param {function(object): Array} options.toValues - Maps a row to its
 *   ordered bound values.
 * @param {function(string): string} options.buildSql - Receives the joined
 *   placeholder tuples and returns the full INSERT statement.
 * @param {string} [options.trailingSql] - Literal SQL appended to every tuple.
 */
async function insertInBatches(localConnection, rows, options) {
  const { batchSize, operation, startTime, toValues, buildSql, trailingSql } = options;
  const total = rows.length;
  let current = 0;

  for (let offset = 0; offset < total; offset += batchSize) {
    const tuples = rows.slice(offset, offset + batchSize).map(toValues);
    const placeholders = tuples
      .map((tuple) => buildRowPlaceholder(tuple.length, trailingSql))
      .join(',');

    await localConnection.query(buildSql(placeholders), tuples.flat());

    current += tuples.length;
    updateProgress(current, total, operation, startTime);
  }
}

/**
 * Import non-hidden categories linked to products created in the last two
 * years, upserting them into the local `categories` table.
 *
 * @param {object} prodConnection - Production connection (`query`).
 * @param {object} localConnection - Local connection or pool (`query`).
 * @throws Rethrows any query error after logging it.
 */
async function importCategories(prodConnection, localConnection) {
  outputProgress({
    operation: 'Starting categories import',
    status: 'running'
  });

  const startTime = Date.now();

  try {
    // Get only categories that are associated with products we're importing
    const [rows] = await prodConnection.query(`
      SELECT DISTINCT
        pc.cat_id as id,
        pc.name,
        pc.type,
        pc.master_cat_id as parent_id,
        pc.combined_name as description,
        'active' as status
      FROM product_categories pc
      INNER JOIN product_category_index pci ON pc.cat_id = pci.cat_id
      INNER JOIN products p ON pci.pid = p.pid
      WHERE pc.hidden = 0
        AND p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
      ORDER BY pc.type, pc.cat_id
    `);

    await insertInBatches(localConnection, rows, {
      batchSize: 100,
      operation: 'Categories import',
      startTime,
      // created_at / updated_at are filled in by the database.
      trailingSql: 'CURRENT_TIMESTAMP, CURRENT_TIMESTAMP',
      toValues: (row) => [
        row.id,
        row.name,
        row.type,
        row.parent_id,
        row.description,
        row.status
      ],
      buildSql: (placeholders) => `
        INSERT INTO categories (id, name, type, parent_id, description, status, created_at, updated_at)
        VALUES ${placeholders}
        ON DUPLICATE KEY UPDATE
          name = VALUES(name),
          type = VALUES(type),
          parent_id = VALUES(parent_id),
          description = VALUES(description),
          status = VALUES(status),
          updated_at = CURRENT_TIMESTAMP
      `
    });

    outputProgress({
      status: 'complete',
      operation: 'Categories import completed',
      current: rows.length,
      total: rows.length,
      duration: formatDuration((Date.now() - startTime) / 1000)
    });
  } catch (error) {
    console.error('Error importing categories:', error);
    throw error;
  }
}

/**
 * Import all products from production and upsert them into the local
 * `products` table.
 *
 * The INSERT names no columns, so the value order below must match the
 * column order of the local `products` table.
 *
 * @param {object} prodConnection - Production connection (`query`).
 * @param {object} localConnection - Local connection or pool (`query`).
 * @throws Rethrows any query error after logging it.
 */
async function importProducts(prodConnection, localConnection) {
  outputProgress({
    operation: 'Starting products import',
    status: 'running'
  });

  const startTime = Date.now();

  try {
    // Get products from production with all required fields
    const [rows] = await prodConnection.query(`
      SELECT
        p.pid AS product_id,
        p.description AS title,
        p.notes AS description,
        p.itemnumber AS SKU,
        p.date_created AS created_at,
        p.datein AS first_received,
        COALESCE((
          SELECT i.available_local - COALESCE(
            (
              SELECT SUM(oi.qty_ordered - oi.qty_placed)
              FROM order_items oi
              JOIN _order o ON oi.order_id = o.order_id
              WHERE oi.prod_pid = i.pid
                AND o.date_placed != '0000-00-00 00:00:00'
                AND o.date_shipped = '0000-00-00 00:00:00'
                AND oi.pick_finished = 0
                AND oi.qty_back = 0
                AND o.order_status != 15
                AND o.order_status < 90
                AND oi.qty_ordered >= oi.qty_placed
                AND oi.qty_ordered > 0
            ), 0
          )
          FROM shop_inventory i
          WHERE i.pid = p.pid
            AND i.store = 0
            AND i.show + i.buyable > 0
          LIMIT 1
        ), 0) AS stock_quantity,
        COALESCE((
          SELECT price_each
          FROM product_current_prices
          WHERE pid = p.pid
            AND active = 1
          ORDER BY qty_buy ASC
          LIMIT 1
        ), 0) AS price,
        COALESCE(p.sellingprice, 0) AS regular_price,
        COALESCE((
          SELECT ROUND(AVG(costeach), 5)
          FROM product_inventory
          WHERE pid = p.pid
            AND COUNT > 0
        ), NULL) AS cost_price,
        NULL AS landing_cost_price,
        p.upc AS barcode,
        p.harmonized_tariff_code,
        p.stamp AS updated_at,
        CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
        1 AS managing_stock,
        CASE WHEN p.reorder = 127 THEN 1 WHEN p.reorder = 0 THEN 1 ELSE 0 END AS replenishable,
        s.companyname AS vendor,
        sid.supplier_itemnumber AS vendor_reference,
        sid.notions_itemnumber AS notions_reference,
        CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink,
        (
          SELECT CONCAT('https://sbing.com/i/products/0000/', SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', p.pid, '-t-', PI.iid, '.jpg')
          FROM product_images PI
          WHERE PI.pid = p.pid
            AND PI.hidden = 0
          ORDER BY PI.order DESC, PI.iid
          LIMIT 1
        ) AS image,
        (
          SELECT CONCAT('https://sbing.com/i/products/0000/', SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', p.pid, '-175x175-', PI.iid, '.jpg')
          FROM product_images PI
          WHERE PI.pid = p.pid
            AND PI.hidden = 0
            AND PI.width = 175
          ORDER BY PI.order DESC, PI.iid
          LIMIT 1
        ) AS image_175,
        (
          SELECT CONCAT('https://sbing.com/i/products/0000/', SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', p.pid, '-o-', PI.iid, '.jpg')
          FROM product_images PI
          WHERE PI.pid = p.pid
            AND PI.hidden = 0
          ORDER BY PI.width DESC, PI.height DESC, PI.iid
          LIMIT 1
        ) AS image_full,
        (
          SELECT name
          FROM product_categories
          WHERE cat_id = p.company
        ) AS brand,
        (
          SELECT name
          FROM product_categories
          WHERE cat_id = p.line
        ) AS line,
        (
          SELECT name
          FROM product_categories
          WHERE cat_id = p.subline
        ) AS subline,
        (
          SELECT name
          FROM product_categories
          WHERE cat_id = p.artist
        ) AS artist,
        NULL AS options,
        NULL AS tags,
        COALESCE(
          CASE WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit ELSE sid.supplier_qty_per_unit END,
          sid.notions_qty_per_unit,
          1
        ) AS moq,
        1 AS uom,
        p.rating,
        p.rating_votes AS reviews,
        p.weight,
        p.length,
        p.width,
        p.height,
        p.country_of_origin,
        CONCAT_WS('-', NULLIF(p.aisle, ''), NULLIF(p.rack, ''), NULLIF(p.hook, '')) AS location,
        p.totalsold AS total_sold,
        (
          SELECT COUNT(*)
          FROM mybasket mb
          WHERE mb.item = p.pid
            AND mb.qty > 0
        ) AS baskets,
        (
          SELECT COUNT(*)
          FROM product_notify pn
          WHERE pn.pid = p.pid
        ) AS notifies,
        pls.date_sold as date_last_sold
      FROM products p
      LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
      LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
      LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
      LEFT JOIN product_last_sold pls ON p.pid = pls.pid
      GROUP BY p.pid
    `);

    await insertInBatches(localConnection, rows, {
      batchSize: 100,
      operation: 'Products import',
      startTime,
      // The placeholder tuple is generated from this array's length, so the
      // marker count cannot drift from the number of values listed here.
      toValues: (row) => [
        row.product_id,
        row.title,
        row.description,
        row.SKU,
        row.created_at,
        row.first_received,
        row.stock_quantity,
        row.price,
        row.regular_price,
        row.cost_price,
        row.landing_cost_price,
        row.barcode,
        row.harmonized_tariff_code,
        row.updated_at,
        row.visible,
        row.managing_stock,
        row.replenishable,
        row.vendor,
        row.vendor_reference,
        row.notions_reference,
        row.permalink,
        null, // categories - handled separately
        row.image,
        row.image_175,
        row.image_full,
        row.brand,
        row.line,
        row.subline,
        row.artist,
        row.options,
        row.tags,
        row.moq,
        row.uom,
        row.rating,
        row.reviews,
        row.weight,
        row.length,
        row.width,
        row.height,
        row.country_of_origin,
        row.location,
        row.total_sold,
        row.baskets,
        row.notifies,
        row.date_last_sold
      ],
      buildSql: (placeholders) => `
        INSERT INTO products VALUES ${placeholders}
        ON DUPLICATE KEY UPDATE
          title = VALUES(title),
          description = VALUES(description),
          stock_quantity = VALUES(stock_quantity),
          price = VALUES(price),
          regular_price = VALUES(regular_price),
          cost_price = VALUES(cost_price),
          landing_cost_price = VALUES(landing_cost_price),
          barcode = VALUES(barcode),
          harmonized_tariff_code = VALUES(harmonized_tariff_code),
          updated_at = VALUES(updated_at),
          visible = VALUES(visible),
          managing_stock = VALUES(managing_stock),
          replenishable = VALUES(replenishable),
          vendor = VALUES(vendor),
          vendor_reference = VALUES(vendor_reference),
          notions_reference = VALUES(notions_reference),
          permalink = VALUES(permalink),
          image = VALUES(image),
          image_175 = VALUES(image_175),
          image_full = VALUES(image_full),
          brand = VALUES(brand),
          line = VALUES(line),
          subline = VALUES(subline),
          artist = VALUES(artist),
          options = VALUES(options),
          tags = VALUES(tags),
          moq = VALUES(moq),
          uom = VALUES(uom),
          rating = VALUES(rating),
          reviews = VALUES(reviews),
          weight = VALUES(weight),
          length = VALUES(length),
          width = VALUES(width),
          height = VALUES(height),
          country_of_origin = VALUES(country_of_origin),
          location = VALUES(location),
          total_sold = VALUES(total_sold),
          baskets = VALUES(baskets),
          notifies = VALUES(notifies),
          date_last_sold = VALUES(date_last_sold)
      `
    });

    outputProgress({
      status: 'complete',
      operation: 'Products import completed',
      current: rows.length,
      total: rows.length,
      duration: formatDuration((Date.now() - startTime) / 1000)
    });
  } catch (error) {
    console.error('Error importing products:', error);
    throw error;
  }
}

/**
 * Import product-to-category links for non-hidden categories and upsert them
 * into the local `product_categories` table.
 *
 * @param {object} prodConnection - Production connection (`query`).
 * @param {object} localConnection - Local connection or pool (`query`).
 * @throws Rethrows any query error after logging it.
 */
async function importProductCategories(prodConnection, localConnection) {
  outputProgress({
    operation: 'Starting product categories import',
    status: 'running'
  });

  const startTime = Date.now();

  try {
    // Get product category relationships from production
    const [rows] = await prodConnection.query(`
      SELECT DISTINCT
        pci.pid as product_id,
        pci.cat_id as category_id
      FROM product_category_index pci
      JOIN product_categories pc ON pci.cat_id = pc.cat_id
      WHERE pc.hidden = 0
    `);

    await insertInBatches(localConnection, rows, {
      batchSize: 500,
      operation: 'Product categories import',
      startTime,
      toValues: (row) => [row.product_id, row.category_id],
      buildSql: (placeholders) => `
        INSERT INTO product_categories (product_id, category_id)
        VALUES ${placeholders}
        ON DUPLICATE KEY UPDATE
          category_id = VALUES(category_id)
      `
    });

    outputProgress({
      status: 'complete',
      operation: 'Product categories import completed',
      current: rows.length,
      total: rows.length,
      duration: formatDuration((Date.now() - startTime) / 1000)
    });
  } catch (error) {
    console.error('Error importing product categories:', error);
    throw error;
  }
}

/**
 * Import order line items placed in the last two years (order_status >= 15)
 * and upsert them into the local `orders` table.
 *
 * @param {object} prodConnection - Production connection (`query`).
 * @param {object} localConnection - Local connection or pool (`query`).
 * @throws Rethrows any query error after logging it.
 */
async function importOrders(prodConnection, localConnection) {
  outputProgress({
    operation: 'Starting orders import',
    status: 'running'
  });

  const startTime = Date.now();

  try {
    // Get orders from production
    const [rows] = await prodConnection.query(`
      SELECT
        oi.order_id AS order_number,
        oi.prod_pid AS product_id,
        oi.prod_itemnumber AS SKU,
        o.date_placed_onlydate AS date,
        oi.prod_price_reg AS price,
        oi.qty_ordered AS quantity,
        (oi.prod_price_reg - oi.prod_price) AS discount,
        COALESCE((
          SELECT otp.item_taxes_to_collect
          FROM order_tax_info oti
          JOIN order_tax_info_products otp ON oti.taxinfo_id = otp.taxinfo_id
          WHERE oti.order_id = o.order_id
            AND otp.pid = oi.prod_pid
          ORDER BY oti.stamp DESC
          LIMIT 1
        ), 0) AS tax,
        0 AS tax_included,
        COALESCE(ROUND(
          ((o.summary_shipping - COALESCE(o.summary_discount_shipping, 0)) *
          (oi.prod_price * oi.qty_ordered) / NULLIF(o.summary_subtotal, 0)), 2
        ), 0) as shipping,
        o.order_cid AS customer,
        CONCAT(COALESCE(u.firstname, ''), ' ', COALESCE(u.lastname, '')) AS customer_name,
        CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END AS canceled
      FROM _order o
      JOIN order_items oi ON o.order_id = oi.order_id
      LEFT JOIN users u ON o.order_cid = u.cid
      WHERE o.order_status >= 15
        AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
      ORDER BY o.date_placed_onlydate DESC
    `);

    await insertInBatches(localConnection, rows, {
      batchSize: 500,
      operation: 'Orders import',
      startTime,
      toValues: (row) => [
        row.order_number,
        row.product_id,
        row.SKU,
        row.date,
        row.price,
        row.quantity,
        row.discount,
        row.tax,
        row.tax_included,
        row.shipping,
        row.customer,
        row.customer_name,
        row.canceled
      ],
      buildSql: (placeholders) => `
        INSERT INTO orders (
          order_number, product_id, SKU, date, price, quantity, discount,
          tax, tax_included, shipping, customer, customer_name, canceled
        )
        VALUES ${placeholders}
        ON DUPLICATE KEY UPDATE
          price = VALUES(price),
          quantity = VALUES(quantity),
          discount = VALUES(discount),
          tax = VALUES(tax),
          tax_included = VALUES(tax_included),
          shipping = VALUES(shipping),
          customer_name = VALUES(customer_name),
          canceled = VALUES(canceled)
      `
    });

    outputProgress({
      status: 'complete',
      operation: 'Orders import completed',
      current: rows.length,
      total: rows.length,
      duration: formatDuration((Date.now() - startTime) / 1000)
    });
  } catch (error) {
    console.error('Error importing orders:', error);
    throw error;
  }
}

/**
 * Import purchase order lines ordered in the last two years, with any
 * matching receiving data, and upsert them into the local `purchase_orders`
 * table.
 *
 * @param {object} prodConnection - Production connection (`query`).
 * @param {object} localConnection - Local connection or pool (`query`).
 * @throws Rethrows any query error after logging it.
 */
async function importPurchaseOrders(prodConnection, localConnection) {
  outputProgress({
    operation: 'Starting purchase orders import',
    status: 'running'
  });

  const startTime = Date.now();

  try {
    // Get purchase orders from production
    const [rows] = await prodConnection.query(`
      SELECT
        po.po_id,
        s.companyname as vendor,
        po.date_ordered as date,
        po.date_estin as expected_date,
        pop.pid as product_id,
        p.itemnumber as sku,
        COALESCE(rp.cost_each, pop.cost_each, 0) as cost_price,
        CASE
          WHEN po.status = 0 THEN 'canceled'
          WHEN po.status = 1 THEN 'created'
          WHEN po.status = 10 THEN 'electronically_ready_send'
          WHEN po.status = 11 THEN 'ordered'
          WHEN po.status = 12 THEN 'preordered'
          WHEN po.status = 13 THEN 'electronically_sent'
          WHEN po.status = 15 THEN 'receiving_started'
          WHEN po.status = 50 THEN 'closed'
          ELSE 'unknown'
        END as status,
        NULLIF(CONCAT_WS(' ', NULLIF(po.short_note, ''), NULLIF(po.notes, '')), '') as notes,
        pop.qty_each as ordered,
        COALESCE(rp.received_qty, 0) as received,
        rp.received_date,
        rp.received_by
      FROM po
      JOIN po_products pop ON po.po_id = pop.po_id
      JOIN products p ON pop.pid = p.pid
      JOIN suppliers s ON po.supplier_id = s.supplierid
      LEFT JOIN receivings r ON po.po_id = r.po_id
      LEFT JOIN receivings_products rp ON r.receiving_id = rp.receiving_id
        AND pop.pid = rp.pid
      WHERE po.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
      ORDER BY po.date_ordered DESC
    `);

    await insertInBatches(localConnection, rows, {
      batchSize: 500,
      operation: 'Purchase orders import',
      startTime,
      toValues: (row) => [
        row.po_id,
        row.vendor,
        row.date,
        row.expected_date,
        row.product_id,
        row.sku,
        row.cost_price,
        row.status,
        row.notes,
        row.ordered,
        row.received,
        row.received_date,
        row.received_by
      ],
      buildSql: (placeholders) => `
        INSERT INTO purchase_orders (
          po_id, vendor, date, expected_date, product_id, sku,
          cost_price, status, notes, ordered, received, received_date, received_by
        )
        VALUES ${placeholders}
        ON DUPLICATE KEY UPDATE
          vendor = VALUES(vendor),
          expected_date = VALUES(expected_date),
          cost_price = VALUES(cost_price),
          status = VALUES(status),
          notes = VALUES(notes),
          ordered = VALUES(ordered),
          received = VALUES(received),
          received_date = VALUES(received_date),
          received_by = VALUES(received_by)
      `
    });

    outputProgress({
      status: 'complete',
      operation: 'Purchase orders import completed',
      current: rows.length,
      total: rows.length,
      duration: formatDuration((Date.now() - startTime) / 1000)
    });
  } catch (error) {
    console.error('Error importing purchase orders:', error);
    throw error;
  }
}

/**
 * Run a close callback, logging (not propagating) any failure so that the
 * remaining resources still get closed.
 *
 * @param {string} label - Resource name used in the error log.
 * @param {function(): (Promise|*)} close - Callback that closes the resource.
 */
async function closeResource(label, close) {
  try {
    await close();
  } catch (error) {
    console.error(`Error closing ${label}:`, error);
  }
}

/**
 * Entry point: open the SSH tunnel and both database handles, run every
 * import in order, then close everything.
 *
 * On failure an error record is written to stdout and the process exit code
 * is set to 1.
 */
async function main() {
  // Needed for the final duration report below.
  const startTime = Date.now();
  let ssh;
  let prodConnection;
  let localConnection;

  try {
    outputProgress({
      operation: 'Starting import process',
      message: 'Setting up connections...'
    });

    // Set up SSH tunnel and production database connection
    const tunnel = await setupSshTunnel();
    ssh = tunnel.ssh;

    prodConnection = await mysql.createConnection({
      ...prodDbConfig,
      stream: tunnel.stream
    });

    // Set up local database connection
    localConnection = mysql.createPool(localDbConfig);

    // Import data
    await importCategories(prodConnection, localConnection);
    await importProducts(prodConnection, localConnection);
    await importProductCategories(prodConnection, localConnection);
    await importOrders(prodConnection, localConnection);
    await importPurchaseOrders(prodConnection, localConnection);

    outputProgress({
      status: 'complete',
      operation: 'Import process completed',
      duration: formatDuration((Date.now() - startTime) / 1000)
    });
  } catch (error) {
    console.error('Fatal error during import process:', error);
    outputProgress({
      status: 'error',
      operation: 'Import process',
      error: error.message
    });
    // Set the exit code instead of calling process.exit(): exiting here
    // would skip the finally block and leave the connections open.
    process.exitCode = 1;
  } finally {
    if (prodConnection) {
      await closeResource('production connection', () => prodConnection.end());
    }
    if (localConnection) {
      await closeResource('local connection pool', () => localConnection.end());
    }
    if (ssh) {
      await closeResource('SSH connection', () => ssh.end());
    }
  }
}

// Run the import
main().catch(error => {
  console.error('Unhandled error in main process:', error);
  process.exit(1);
});