/**
 * Production -> local database import script.
 *
 * Opens an SSH tunnel to the production host, connects to the production
 * MySQL database through it, and copies categories, products,
 * product-category relationships, orders, and purchase orders into the
 * local database in batches, emitting JSON progress lines on stdout.
 *
 * Exported interface: { main, outputProgress, cancelImport }.
 */
const mysql = require('mysql2/promise');
const { Client } = require('ssh2');
const dotenv = require('dotenv');
const path = require('path');
const fs = require('fs');

dotenv.config({ path: path.join(__dirname, '../.env') });

// SSH configuration (ports coerced to numbers — env vars are strings)
const sshConfig = {
    host: process.env.PROD_SSH_HOST,
    port: Number(process.env.PROD_SSH_PORT) || 22,
    username: process.env.PROD_SSH_USER,
    privateKey: process.env.PROD_SSH_KEY_PATH
        ? fs.readFileSync(process.env.PROD_SSH_KEY_PATH)
        : undefined
};

// Production database configuration (reached through the SSH tunnel)
const prodDbConfig = {
    host: process.env.PROD_DB_HOST || 'localhost',
    user: process.env.PROD_DB_USER,
    password: process.env.PROD_DB_PASSWORD,
    database: process.env.PROD_DB_NAME,
    port: Number(process.env.PROD_DB_PORT) || 3306
};

// Local database configuration (connection pool)
const localDbConfig = {
    host: process.env.DB_HOST,
    user: process.env.DB_USER,
    password: process.env.DB_PASSWORD,
    database: process.env.DB_NAME,
    multipleStatements: true,
    waitForConnections: true,
    connectionLimit: 10,
    queueLimit: 0,
    namedPlaceholders: true
};

/**
 * Emit one newline-delimited JSON progress record on stdout.
 * @param {object} data - Arbitrary progress payload (status, operation, ...).
 */
function outputProgress(data) {
    process.stdout.write(JSON.stringify(data) + '\n');
}

/**
 * Format a duration in seconds as a compact string, e.g. "1h 2m 3s".
 * Always emits at least "0s".
 * @param {number} seconds
 * @returns {string}
 */
function formatDuration(seconds) {
    const hours = Math.floor(seconds / 3600);
    const minutes = Math.floor((seconds % 3600) / 60);
    seconds = Math.floor(seconds % 60);
    const parts = [];
    if (hours > 0) parts.push(`${hours}h`);
    if (minutes > 0) parts.push(`${minutes}m`);
    if (seconds > 0 || parts.length === 0) parts.push(`${seconds}s`);
    return parts.join(' ');
}

/**
 * Emit a running-progress record with throughput and a time estimate.
 * Guards against division by zero so the JSON never contains Infinity/NaN
 * (e.g. on the very first tick when current or elapsed is 0).
 * @param {number} current   - Rows processed so far.
 * @param {number} total     - Total rows to process.
 * @param {string} operation - Human-readable operation name.
 * @param {number} startTime - Date.now() timestamp when the operation began.
 */
function updateProgress(current, total, operation, startTime) {
    const elapsed = (Date.now() - startTime) / 1000;
    const rate = elapsed > 0 ? current / elapsed : 0;
    const remaining = rate > 0 ? (total - current) / rate : 0;
    outputProgress({
        status: 'running',
        operation,
        current,
        total,
        rate,
        elapsed: formatDuration(elapsed),
        remaining: formatDuration(remaining),
        percentage: ((current / total) * 100).toFixed(1)
    });
}

// Cooperative cancellation flag, checked between import phases in main().
let isImportCancelled = false;

/** Request cancellation of a running import (takes effect between phases). */
function cancelImport() {
    isImportCancelled = true;
    outputProgress({ status: 'cancelled', operation: 'Import cancelled' });
}

/**
 * Open an SSH connection and forward a local stream to the production
 * database host/port.
 *
 * Fixes over the previous version: the forwardOut error path now returns
 * (previously resolve() still ran after reject()), and an 'error' handler
 * rejects the promise on connection failure (previously the promise would
 * hang forever).
 *
 * @returns {Promise<{ssh: Client, stream: import('stream').Duplex}>}
 */
async function setupSshTunnel() {
    return new Promise((resolve, reject) => {
        const ssh = new Client();
        ssh
            .on('ready', () => {
                ssh.forwardOut(
                    '127.0.0.1',
                    0,
                    prodDbConfig.host,
                    prodDbConfig.port,
                    (err, stream) => {
                        if (err) {
                            ssh.end();
                            return reject(err);
                        }
                        resolve({ ssh, stream });
                    }
                );
            })
            .on('error', reject)
            .connect(sshConfig);
    });
}

/**
 * Import product categories from production, type by type, in an order that
 * guarantees parents (types 10/20) are inserted before children (11/21/12/13).
 * Children whose parent is missing locally are skipped and reported; if any
 * were skipped, an error carrying `skippedCategories` is thrown at the end.
 *
 * @param {mysql.Connection} prodConnection  - Production DB connection.
 * @param {mysql.Pool|mysql.Connection} localConnection - Local DB handle.
 * @throws {Error} When any category was skipped due to a missing parent.
 */
async function importCategories(prodConnection, localConnection) {
    outputProgress({ operation: 'Starting categories import', status: 'running' });
    const startTime = Date.now();
    // Parents (10, 20) first, then each child level.
    const typeOrder = [10, 20, 11, 21, 12, 13];
    let totalInserted = 0;
    let skippedCategories = [];

    try {
        // Process each type in order with its own query
        for (const type of typeOrder) {
            const [categories] = await prodConnection.query(`
                SELECT
                    pc.cat_id,
                    pc.name,
                    pc.type,
                    CASE
                        WHEN pc.type IN (10, 20) THEN NULL -- Top level categories should have no parent
                        WHEN pc.master_cat_id IS NULL THEN NULL
                        ELSE pc.master_cat_id
                    END as parent_id,
                    pc.combined_name as description
                FROM product_categories pc
                WHERE pc.type = ?
                ORDER BY pc.cat_id
            `, [type]);

            if (categories.length === 0) continue;

            console.log(`\nProcessing ${categories.length} type ${type} categories`);

            // For types that can have parents (11, 21, 12, 13), verify parent existence
            let categoriesToInsert = categories;
            if (![10, 20].includes(type)) {
                // Get all parent IDs
                const parentIds = [...new Set(
                    categories.map(c => c.parent_id).filter(id => id !== null)
                )];

                // Check which parents exist. Guard the empty case: mysql2
                // expands IN (?) with [] to the invalid SQL "IN ()".
                let existingParentIds = new Set();
                if (parentIds.length > 0) {
                    const [existingParents] = await localConnection.query(
                        'SELECT cat_id FROM categories WHERE cat_id IN (?)',
                        [parentIds]
                    );
                    existingParentIds = new Set(existingParents.map(p => p.cat_id));
                }

                // Filter categories and track skipped ones
                categoriesToInsert = categories.filter(cat =>
                    cat.parent_id === null || existingParentIds.has(cat.parent_id)
                );
                const invalidCategories = categories.filter(cat =>
                    cat.parent_id !== null && !existingParentIds.has(cat.parent_id)
                );

                if (invalidCategories.length > 0) {
                    // NOTE: rows carry cat_id (not id) — previous code read c.id
                    // here and logged undefined for every skipped category.
                    const skippedInfo = invalidCategories.map(c => ({
                        id: c.cat_id,
                        name: c.name,
                        type: c.type,
                        missing_parent: c.parent_id
                    }));
                    skippedCategories.push(...skippedInfo);
                    console.log('\nSkipping categories with missing parents:',
                        invalidCategories.map(c =>
                            `${c.cat_id} - ${c.name} (missing parent: ${c.parent_id})`
                        ).join('\n')
                    );
                }

                if (categoriesToInsert.length === 0) {
                    console.log(`No valid categories of type ${type} to insert - all had missing parents`);
                    continue;
                }
            }

            console.log(`Inserting ${categoriesToInsert.length} type ${type} categories`);

            const placeholders = categoriesToInsert.map(() =>
                '(?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)'
            ).join(',');
            const values = categoriesToInsert.flatMap(cat => [
                cat.cat_id, cat.name, cat.type, cat.parent_id, cat.description, 'active'
            ]);

            // Insert categories and create relationships in one query to avoid race conditions
            await localConnection.query(`
                INSERT INTO categories
                    (cat_id, name, type, parent_id, description, status, created_at, updated_at)
                VALUES ${placeholders}
                ON DUPLICATE KEY UPDATE
                    name = VALUES(name),
                    type = VALUES(type),
                    parent_id = VALUES(parent_id),
                    description = VALUES(description),
                    status = VALUES(status),
                    updated_at = CURRENT_TIMESTAMP
            `, values);

            totalInserted += categoriesToInsert.length;
            updateProgress(totalInserted, totalInserted, 'Categories import', startTime);
        }

        // After all imports, if we skipped any categories, throw an error
        if (skippedCategories.length > 0) {
            const error = new Error('Categories import completed with errors - some categories were skipped due to missing parents');
            error.skippedCategories = skippedCategories;
            throw error;
        }

        outputProgress({
            status: 'complete',
            operation: 'Categories import completed',
            current: totalInserted,
            total: totalInserted,
            duration: formatDuration((Date.now() - startTime) / 1000)
        });
    } catch (error) {
        console.error('Error importing categories:', error);
        if (error.skippedCategories) {
            console.error('Skipped categories:', JSON.stringify(error.skippedCategories, null, 2));
        }
        throw error;
    }
}

/**
 * Import products created within the last two years from production,
 * upserting in batches of 100, then upsert each batch's visible
 * product-category relationships.
 *
 * @param {mysql.Connection} prodConnection  - Production DB connection.
 * @param {mysql.Pool|mysql.Connection} localConnection - Local DB handle.
 */
async function importProducts(prodConnection, localConnection) {
    outputProgress({ operation: 'Starting products import', status: 'running' });
    const startTime = Date.now();

    try {
        // Get products from production
        const [rows] = await prodConnection.query(`
            SELECT
                p.pid AS product_id,
                p.description AS title,
                p.notes AS description,
                p.itemnumber AS SKU,
                p.date_created AS created_at,
                p.datein AS first_received,
                p.location AS location,
                (
                    SELECT i.available_local - COALESCE(
                        (
                            SELECT SUM(oi.qty_ordered - oi.qty_placed)
                            FROM order_items oi
                            JOIN _order o ON oi.order_id = o.order_id
                            WHERE oi.prod_pid = i.pid
                              AND o.date_placed != '0000-00-00 00:00:00'
                              AND o.date_shipped = '0000-00-00 00:00:00'
                              AND oi.pick_finished = 0
                              AND oi.qty_back = 0
                              AND o.order_status != 15
                              AND o.order_status < 90
                              AND oi.qty_ordered >= oi.qty_placed
                              AND oi.qty_ordered > 0
                        ), 0
                    ) AS stock_quantity
                    FROM shop_inventory i
                    WHERE i.pid = p.pid AND i.store = 0 AND i.show + i.buyable > 0
                    LIMIT 1
                ) AS stock_quantity,
                ci.onpreorder AS preorder_count,
                pnb.inventory AS notions_inv_count,
                ( SELECT price_each FROM product_current_prices WHERE pid = p.pid AND active = 1 ORDER BY qty_buy ASC LIMIT 1 ) AS price,
                p.sellingprice AS regular_price,
                ( SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND COUNT > 0 ) AS cost_price,
                NULL AS landing_cost_price,
                p.upc AS barcode,
                p.harmonized_tariff_code AS harmonized_tariff_code,
                p.stamp AS updated_at,
                CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
                CASE WHEN p.reorder >= 0 THEN 1 ELSE 0 END AS replenishable,
                s.companyname AS vendor,
                sid.supplier_itemnumber AS vendor_reference,
                sid.notions_itemnumber AS notions_reference,
                CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink,
                ( SELECT GROUP_CONCAT(pc.name SEPARATOR ', ') FROM product_category_index pci JOIN product_categories pc ON pci.cat_id = pc.cat_id WHERE pci.pid = p.pid AND pc.hidden = 0 ) AS categories,
                ( SELECT CONCAT('https://sbing.com/i/products/0000/', SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', p.pid, '-t-', PI.iid, '.jpg') FROM product_images PI WHERE PI.pid = p.pid AND PI.hidden = 0 ORDER BY PI.order DESC, PI.iid LIMIT 1 ) AS image,
                ( SELECT CONCAT('https://sbing.com/i/products/0000/', SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', p.pid, '-175x175-', PI.iid, '.jpg') FROM product_images PI WHERE PI.pid = p.pid AND PI.hidden = 0 AND PI.width = 175 ORDER BY PI.order DESC, PI.iid LIMIT 1 ) AS image_175,
                ( SELECT CONCAT('https://sbing.com/i/products/0000/', SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', p.pid, '-o-', PI.iid, '.jpg') FROM product_images PI WHERE PI.pid = p.pid AND PI.hidden = 0 ORDER BY PI.width DESC, PI.height DESC, PI.iid LIMIT 1 ) AS image_full,
                ( SELECT name FROM product_categories WHERE cat_id = p.company ) AS brand,
                ( SELECT name FROM product_categories WHERE cat_id = p.line ) AS line,
                ( SELECT name FROM product_categories WHERE cat_id = p.subline ) AS subline,
                ( SELECT name FROM product_categories WHERE cat_id = p.artist ) AS artist,
                NULL AS options,
                NULL AS tags,
                COALESCE(
                    CASE WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit ELSE sid.supplier_qty_per_unit END,
                    sid.notions_qty_per_unit
                ) AS moq,
                NULL AS uom,
                p.rating,
                p.rating_votes AS reviews,
                p.weight,
                p.length,
                p.width,
                p.height,
                ( SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0 ) AS baskets,
                ( SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid ) AS notifies,
                p.totalsold AS total_sold,
                p.country_of_origin as country_of_origin,
                pls.date_sold as date_last_sold
            FROM products p
            LEFT JOIN current_inventory ci ON p.pid = ci.pid
            LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
            LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
            LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
            LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
            LEFT JOIN product_category_index pci ON p.pid = pci.pid
            LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id
            LEFT JOIN product_last_sold pls ON p.pid = pls.pid
            WHERE p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
            GROUP BY p.pid
        `);

        let current = 0;
        const total = rows.length;

        // Process products in batches
        const BATCH_SIZE = 100;
        for (let i = 0; i < rows.length; i += BATCH_SIZE) {
            const batch = rows.slice(i, i + BATCH_SIZE);
            console.log(`Inserting ${batch.length} products`);

            // First insert the products. Order here must match the column
            // list in the INSERT below; the literal 1 is managing_stock.
            const values = batch.flatMap(r => [
                r.product_id, r.title, r.description, r.SKU, r.created_at,
                r.first_received, r.stock_quantity, r.preorder_count,
                r.notions_inv_count, r.price, r.regular_price, r.cost_price,
                r.landing_cost_price, r.barcode, r.harmonized_tariff_code,
                r.updated_at, r.visible, 1, r.replenishable, r.vendor,
                r.vendor_reference, r.notions_reference, r.permalink, r.image,
                r.image_175, r.image_full, r.brand, r.line, r.subline,
                r.artist, r.options, r.tags, r.moq, r.uom, r.rating,
                r.reviews, r.weight, r.length, r.width, r.height,
                r.country_of_origin, r.location, r.total_sold, r.baskets,
                r.notifies, r.date_last_sold
            ]);

            // Get the number of columns from the first row's values
            const columnsPerRow = values.length / batch.length;
            const placeholders = batch.map(() =>
                `(${Array(columnsPerRow).fill('?').join(',')})`
            ).join(',');

            const sql = `INSERT INTO products (
                pid, title, description, SKU, created_at, first_received,
                stock_quantity, preorder_count, notions_inv_count, price,
                regular_price, cost_price, landing_cost_price, barcode,
                harmonized_tariff_code, updated_at, visible, managing_stock,
                replenishable, vendor, vendor_reference, notions_reference,
                permalink, image, image_175, image_full, brand, line, subline,
                artist, options, tags, moq, uom, rating, reviews, weight,
                length, width, height, country_of_origin, location,
                total_sold, baskets, notifies, date_last_sold
            ) VALUES ${placeholders}
            ON DUPLICATE KEY UPDATE
                title = VALUES(title),
                description = VALUES(description),
                stock_quantity = VALUES(stock_quantity),
                preorder_count = VALUES(preorder_count),
                notions_inv_count = VALUES(notions_inv_count),
                price = VALUES(price),
                regular_price = VALUES(regular_price),
                cost_price = VALUES(cost_price),
                landing_cost_price = VALUES(landing_cost_price),
                barcode = VALUES(barcode),
                harmonized_tariff_code = VALUES(harmonized_tariff_code),
                updated_at = VALUES(updated_at),
                visible = VALUES(visible),
                replenishable = VALUES(replenishable),
                vendor = VALUES(vendor),
                vendor_reference = VALUES(vendor_reference),
                notions_reference = VALUES(notions_reference),
                permalink = VALUES(permalink),
                image = VALUES(image),
                image_175 = VALUES(image_175),
                image_full = VALUES(image_full),
                brand = VALUES(brand),
                line = VALUES(line),
                subline = VALUES(subline),
                artist = VALUES(artist),
                options = VALUES(options),
                tags = VALUES(tags),
                moq = VALUES(moq),
                uom = VALUES(uom),
                rating = VALUES(rating),
                reviews = VALUES(reviews),
                weight = VALUES(weight),
                length = VALUES(length),
                width = VALUES(width),
                height = VALUES(height),
                country_of_origin = VALUES(country_of_origin),
                location = VALUES(location),
                total_sold = VALUES(total_sold),
                baskets = VALUES(baskets),
                notifies = VALUES(notifies),
                date_last_sold = VALUES(date_last_sold)`;

            await localConnection.query(sql, values);

            // Then insert the category relationships for this batch
            const categoryPromises = batch.map(async product => {
                // Get the category IDs from the production query
                const [categoryRows] = await prodConnection.query(`
                    SELECT DISTINCT cat_id
                    FROM product_category_index
                    WHERE pid = ?
                      AND cat_id IN (
                          SELECT cat_id FROM product_categories WHERE hidden = 0
                      )
                `, [product.product_id]);
                return categoryRows.map(row => [row.cat_id, product.product_id]);
            });

            const categoryResults = await Promise.all(categoryPromises);
            const categoryRelationships = categoryResults.flat();

            if (categoryRelationships.length > 0) {
                const catPlaceholders = categoryRelationships.map(() => '(?, ?)').join(',');
                await localConnection.query(`
                    INSERT INTO product_categories (cat_id, pid)
                    VALUES ${catPlaceholders}
                    ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id)
                `, categoryRelationships.flat());
            }

            current += batch.length;
            updateProgress(current, total, 'Products import', startTime);
        }

        outputProgress({
            status: 'complete',
            operation: 'Products import completed',
            current: total,
            total,
            duration: formatDuration((Date.now() - startTime) / 1000)
        });
    } catch (error) {
        console.error('Error importing products:', error);
        throw error;
    }
}

/**
 * Import all visible product-category relationships from production in
 * batches of 500 (upsert; the no-op ON DUPLICATE clause makes re-runs safe).
 *
 * @param {mysql.Connection} prodConnection  - Production DB connection.
 * @param {mysql.Pool|mysql.Connection} localConnection - Local DB handle.
 */
async function importProductCategories(prodConnection, localConnection) {
    outputProgress({ operation: 'Starting product categories import', status: 'running' });
    const startTime = Date.now();

    try {
        // Get product category relationships from production
        const [rows] = await prodConnection.query(`
            SELECT DISTINCT pci.pid, pci.cat_id
            FROM product_category_index pci
            JOIN product_categories pc ON pci.cat_id = pc.cat_id
            WHERE pc.hidden = 0
        `);

        let current = 0;
        const total = rows.length;

        // Process in batches
        const BATCH_SIZE = 500;
        for (let i = 0; i < rows.length; i += BATCH_SIZE) {
            const batch = rows.slice(i, i + BATCH_SIZE);

            // Create placeholders for batch insert
            const placeholders = batch.map(() => '(?, ?)').join(',');
            // Flatten values for batch insert
            const values = batch.flatMap(row => [row.cat_id, row.pid]);

            await localConnection.query(`
                INSERT INTO product_categories (cat_id, pid)
                VALUES ${placeholders}
                ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id)
            `, values);

            current += batch.length;
            updateProgress(current, total, 'Product categories import', startTime);
        }

        outputProgress({
            status: 'complete',
            operation: 'Product categories import completed',
            current: total,
            total,
            duration: formatDuration((Date.now() - startTime) / 1000)
        });
    } catch (error) {
        console.error('Error importing product categories:', error);
        throw error;
    }
}

/**
 * Import order line items from the last two years, including prorated
 * shipping and the latest tax record per item, in batches of 500.
 *
 * @param {mysql.Connection} prodConnection  - Production DB connection.
 * @param {mysql.Pool|mysql.Connection} localConnection - Local DB handle.
 */
async function importOrders(prodConnection, localConnection) {
    outputProgress({ operation: 'Starting orders import', status: 'running' });
    const startTime = Date.now();

    try {
        // Get orders from production
        const [rows] = await prodConnection.query(`
            SELECT
                oi.order_id AS order_number,
                oi.prod_pid AS product_id,
                oi.prod_itemnumber AS SKU,
                o.date_placed_onlydate AS date,
                oi.prod_price_reg AS price,
                oi.qty_ordered AS quantity,
                (oi.prod_price_reg - oi.prod_price) AS discount,
                COALESCE((
                    SELECT otp.item_taxes_to_collect
                    FROM order_tax_info oti
                    JOIN order_tax_info_products otp ON oti.taxinfo_id = otp.taxinfo_id
                    WHERE oti.order_id = o.order_id AND otp.pid = oi.prod_pid
                    ORDER BY oti.stamp DESC
                    LIMIT 1
                ), 0) AS tax,
                0 AS tax_included,
                COALESCE(ROUND(
                    ((o.summary_shipping - COALESCE(o.summary_discount_shipping, 0)) * (oi.prod_price * oi.qty_ordered) / NULLIF(o.summary_subtotal, 0)),
                    2
                ), 0) as shipping,
                o.order_cid AS customer,
                CONCAT(COALESCE(u.firstname, ''), ' ', COALESCE(u.lastname, '')) AS customer_name,
                CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END AS canceled
            FROM _order o
            JOIN order_items oi ON o.order_id = oi.order_id
            LEFT JOIN users u ON o.order_cid = u.cid
            WHERE o.order_status >= 15
              AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
            ORDER BY o.date_placed_onlydate DESC
        `);

        let current = 0;
        const total = rows.length;

        // Process in batches
        const BATCH_SIZE = 500;
        for (let i = 0; i < rows.length; i += BATCH_SIZE) {
            const batch = rows.slice(i, i + BATCH_SIZE);

            // Create placeholders for batch insert
            const placeholders = batch.map(() =>
                '(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
            ).join(',');
            // Flatten values for batch insert
            const values = batch.flatMap(row => [
                row.order_number, row.product_id, row.SKU, row.date,
                row.price, row.quantity, row.discount, row.tax,
                row.tax_included, row.shipping, row.customer,
                row.customer_name, row.canceled
            ]);

            await localConnection.query(`
                INSERT INTO orders (
                    order_number, product_id, SKU, date, price, quantity,
                    discount, tax, tax_included, shipping, customer,
                    customer_name, canceled
                ) VALUES ${placeholders}
                ON DUPLICATE KEY UPDATE
                    price = VALUES(price),
                    quantity = VALUES(quantity),
                    discount = VALUES(discount),
                    tax = VALUES(tax),
                    tax_included = VALUES(tax_included),
                    shipping = VALUES(shipping),
                    customer_name = VALUES(customer_name),
                    canceled = VALUES(canceled)
            `, values);

            current += batch.length;
            updateProgress(current, total, 'Orders import', startTime);
        }

        outputProgress({
            status: 'complete',
            operation: 'Orders import completed',
            current: total,
            total,
            duration: formatDuration((Date.now() - startTime) / 1000)
        });
    } catch (error) {
        console.error('Error importing orders:', error);
        throw error;
    }
}

/**
 * Import purchase order lines from the last two years, mapping numeric PO
 * status codes to readable strings and joining receiving data, in batches
 * of 500.
 *
 * @param {mysql.Connection} prodConnection  - Production DB connection.
 * @param {mysql.Pool|mysql.Connection} localConnection - Local DB handle.
 */
async function importPurchaseOrders(prodConnection, localConnection) {
    outputProgress({ operation: 'Starting purchase orders import', status: 'running' });
    const startTime = Date.now();

    try {
        // Get purchase orders from production
        const [rows] = await prodConnection.query(`
            SELECT
                po.po_id,
                s.companyname as vendor,
                po.date_ordered as date,
                po.date_estin as expected_date,
                pop.pid as product_id,
                p.itemnumber as sku,
                COALESCE(rp.cost_each, pop.cost_each, 0) as cost_price,
                CASE
                    WHEN po.status = 0 THEN 'canceled'
                    WHEN po.status = 1 THEN 'created'
                    WHEN po.status = 10 THEN 'electronically_ready_send'
                    WHEN po.status = 11 THEN 'ordered'
                    WHEN po.status = 12 THEN 'preordered'
                    WHEN po.status = 13 THEN 'electronically_sent'
                    WHEN po.status = 15 THEN 'receiving_started'
                    WHEN po.status = 50 THEN 'closed'
                    ELSE 'unknown'
                END as status,
                NULLIF(CONCAT_WS(' ', NULLIF(po.short_note, ''), NULLIF(po.notes, '')), '') as notes,
                pop.qty_each as ordered,
                COALESCE(rp.received_qty, 0) as received,
                rp.received_date,
                rp.received_by
            FROM po
            JOIN po_products pop ON po.po_id = pop.po_id
            JOIN products p ON pop.pid = p.pid
            JOIN suppliers s ON po.supplier_id = s.supplierid
            LEFT JOIN receivings r ON po.po_id = r.po_id
            LEFT JOIN receivings_products rp ON r.receiving_id = rp.receiving_id AND pop.pid = rp.pid
            WHERE po.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
            ORDER BY po.date_ordered DESC
        `);

        let current = 0;
        const total = rows.length;

        // Process in batches
        const BATCH_SIZE = 500;
        for (let i = 0; i < rows.length; i += BATCH_SIZE) {
            const batch = rows.slice(i, i + BATCH_SIZE);

            // Create placeholders for batch insert
            const placeholders = batch.map(() =>
                '(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
            ).join(',');
            // Flatten values for batch insert
            const values = batch.flatMap(row => [
                row.po_id, row.vendor, row.date, row.expected_date,
                row.product_id, row.sku, row.cost_price, row.status,
                row.notes, row.ordered, row.received, row.received_date,
                row.received_by
            ]);

            await localConnection.query(`
                INSERT INTO purchase_orders (
                    po_id, vendor, date, expected_date, product_id, sku,
                    cost_price, status, notes, ordered, received,
                    received_date, received_by
                ) VALUES ${placeholders}
                ON DUPLICATE KEY UPDATE
                    vendor = VALUES(vendor),
                    expected_date = VALUES(expected_date),
                    cost_price = VALUES(cost_price),
                    status = VALUES(status),
                    notes = VALUES(notes),
                    ordered = VALUES(ordered),
                    received = VALUES(received),
                    received_date = VALUES(received_date),
                    received_by = VALUES(received_by)
            `, values);

            current += batch.length;
            updateProgress(current, total, 'Purchase orders import', startTime);
        }

        outputProgress({
            status: 'complete',
            operation: 'Purchase orders import completed',
            current: total,
            total,
            duration: formatDuration((Date.now() - startTime) / 1000)
        });
    } catch (error) {
        console.error('Error importing purchase orders:', error);
        throw error;
    }
}

/**
 * Run the full import pipeline: tunnel + connections, then categories,
 * products, product-category links, orders, and purchase orders, checking
 * the cancellation flag between phases. Cleans up every connection in
 * finally (each independently, so one failed close cannot leak the others).
 *
 * @throws {Error} Re-throws any phase error (or 'Import cancelled').
 */
async function main() {
    let ssh;
    let prodConnection;
    let localConnection;

    try {
        outputProgress({
            status: 'running',
            operation: 'Starting import process',
            message: 'Setting up connections...'
        });

        // Set up connections
        const tunnel = await setupSshTunnel();
        ssh = tunnel.ssh;
        prodConnection = await mysql.createConnection({
            ...prodDbConfig,
            stream: tunnel.stream
        });
        // createPool is synchronous; the pool lazily opens connections.
        localConnection = mysql.createPool(localDbConfig);

        if (isImportCancelled) throw new Error('Import cancelled');
        // First import all categories
        await importCategories(prodConnection, localConnection);

        if (isImportCancelled) throw new Error('Import cancelled');
        // Then import products
        await importProducts(prodConnection, localConnection);

        if (isImportCancelled) throw new Error('Import cancelled');
        // Then import product-category relationships
        await importProductCategories(prodConnection, localConnection);

        if (isImportCancelled) throw new Error('Import cancelled');
        await importOrders(prodConnection, localConnection);

        if (isImportCancelled) throw new Error('Import cancelled');
        await importPurchaseOrders(prodConnection, localConnection);

        if (isImportCancelled) throw new Error('Import cancelled');
        outputProgress({ status: 'complete', operation: 'Import process completed' });
    } catch (error) {
        console.error('Error during import process:', error);
        outputProgress({
            status: error.message === 'Import cancelled' ? 'cancelled' : 'error',
            operation: 'Import process',
            error: error.message
        });
        throw error;
    } finally {
        // Close each resource independently: a failed close must not
        // prevent the remaining resources from being released.
        if (prodConnection) {
            try { await prodConnection.end(); } catch (e) { console.error('Error closing production connection:', e); }
        }
        if (localConnection) {
            try { await localConnection.end(); } catch (e) { console.error('Error closing local pool:', e); }
        }
        if (ssh) ssh.end();
    }
}

// Run the import only if this is the main module
if (require.main === module) {
    main().catch(error => {
        console.error('Unhandled error in main process:', error);
        process.exit(1);
    });
}

// Export the functions needed by the route
module.exports = { main, outputProgress, cancelImport };