const mysql = require('mysql2/promise'); const { Client } = require('ssh2'); const dotenv = require('dotenv'); const path = require('path'); dotenv.config({ path: path.join(__dirname, '../.env') }); // SSH configuration const sshConfig = { host: process.env.PROD_SSH_HOST, port: process.env.PROD_SSH_PORT || 22, username: process.env.PROD_SSH_USER, privateKey: process.env.PROD_SSH_KEY_PATH ? require('fs').readFileSync(process.env.PROD_SSH_KEY_PATH) : undefined }; // Production database configuration const prodDbConfig = { host: process.env.PROD_DB_HOST || 'localhost', user: process.env.PROD_DB_USER, password: process.env.PROD_DB_PASSWORD, database: process.env.PROD_DB_NAME, port: process.env.PROD_DB_PORT || 3306 }; // Local database configuration const localDbConfig = { host: process.env.DB_HOST, user: process.env.DB_USER, password: process.env.DB_PASSWORD, database: process.env.DB_NAME, multipleStatements: true, waitForConnections: true, connectionLimit: 10, queueLimit: 0, namedPlaceholders: true }; // Helper function to output progress function outputProgress(data) { process.stdout.write(JSON.stringify(data) + '\n'); } // Helper function to format duration function formatDuration(seconds) { const hours = Math.floor(seconds / 3600); const minutes = Math.floor((seconds % 3600) / 60); seconds = Math.floor(seconds % 60); const parts = []; if (hours > 0) parts.push(`${hours}h`); if (minutes > 0) parts.push(`${minutes}m`); if (seconds > 0 || parts.length === 0) parts.push(`${seconds}s`); return parts.join(' '); } // Helper function to update progress with time estimate function updateProgress(current, total, operation, startTime) { const elapsed = (Date.now() - startTime) / 1000; const rate = current / elapsed; const remaining = (total - current) / rate; outputProgress({ status: 'running', operation, current, total, rate, elapsed: formatDuration(elapsed), remaining: formatDuration(remaining), percentage: ((current / total) * 100).toFixed(1) }); } let isImportCancelled = false; // Add cancel function function cancelImport() { isImportCancelled = true; outputProgress({ status: 'cancelled', operation: 'Import cancelled' }); } async function setupSshTunnel() { return new Promise((resolve, reject) => { const ssh = new Client(); ssh.on('ready', () => { ssh.forwardOut( '127.0.0.1', 0, prodDbConfig.host, prodDbConfig.port, async (err, stream) => { if (err) reject(err); resolve({ ssh, stream }); } ); }).connect(sshConfig); }); } async function importCategories(prodConnection, localConnection) { outputProgress({ operation: 'Starting categories import', status: 'running' }); const startTime = Date.now(); const typeOrder = [10, 20, 11, 21, 12, 13]; let totalInserted = 0; let skippedCategories = []; try { // Process each type in order with its own query for (const type of typeOrder) { const [categories] = await prodConnection.query(` SELECT pc.cat_id, pc.name, pc.type, CASE WHEN pc.type IN (10, 20) THEN NULL -- Top level categories should have no parent WHEN pc.master_cat_id IS NULL THEN NULL ELSE pc.master_cat_id END as parent_id, pc.combined_name as description FROM product_categories pc WHERE pc.type = ? ORDER BY pc.cat_id `, [type]); if (categories.length === 0) continue; console.log(`\nProcessing ${categories.length} type ${type} categories`); if (type === 10) { console.log('Type 10 categories:', JSON.stringify(categories, null, 2)); } // For types that can have parents (11, 21, 12, 13), verify parent existence let categoriesToInsert = categories; if (![10, 20].includes(type)) { // Get all parent IDs const parentIds = [...new Set(categories.map(c => c.parent_id).filter(id => id !== null))]; // Check which parents exist const [existingParents] = await localConnection.query( 'SELECT cat_id FROM categories WHERE cat_id IN (?)', [parentIds] ); const existingParentIds = new Set(existingParents.map(p => p.cat_id)); // Filter categories and track skipped ones categoriesToInsert = categories.filter(cat => cat.parent_id === null || existingParentIds.has(cat.parent_id) ); const invalidCategories = categories.filter(cat => cat.parent_id !== null && !existingParentIds.has(cat.parent_id) ); if (invalidCategories.length > 0) { const skippedInfo = invalidCategories.map(c => ({ id: c.cat_id, name: c.name, type: c.type, missing_parent: c.parent_id })); skippedCategories.push(...skippedInfo); console.log('\nSkipping categories with missing parents:', invalidCategories.map(c => `${c.cat_id} - ${c.name} (missing parent: ${c.parent_id})`).join('\n') ); } if (categoriesToInsert.length === 0) { console.log(`No valid categories of type ${type} to insert - all had missing parents`); continue; } } console.log(`Inserting ${categoriesToInsert.length} type ${type} categories`); const placeholders = categoriesToInsert.map(() => '(?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)' ).join(','); const values = categoriesToInsert.flatMap(cat => [ cat.cat_id, cat.name, cat.type, cat.parent_id, cat.description, 'active' ]); // Insert categories and create relationships in one query to avoid race conditions await localConnection.query(` INSERT INTO categories (cat_id, name, type, parent_id, description, status, created_at, updated_at) VALUES ${placeholders} ON DUPLICATE KEY UPDATE name = VALUES(name), type = VALUES(type), parent_id = VALUES(parent_id), description = VALUES(description), status = VALUES(status), updated_at = CURRENT_TIMESTAMP `, values); totalInserted += categoriesToInsert.length; updateProgress(totalInserted, totalInserted, 'Categories import', startTime); } // After all imports, if we skipped any categories, throw an error if (skippedCategories.length > 0) { const error = new Error('Categories import completed with errors - some categories were skipped due to missing parents'); error.skippedCategories = skippedCategories; throw error; } outputProgress({ status: 'complete', operation: 'Categories import completed', current: totalInserted, total: totalInserted, duration: formatDuration((Date.now() - startTime) / 1000) }); } catch (error) { console.error('Error importing categories:', error); if (error.skippedCategories) { console.error('Skipped categories:', JSON.stringify(error.skippedCategories, null, 2)); } throw error; } } async function importProducts(prodConnection, localConnection) { outputProgress({ operation: 'Starting products import', status: 'running' }); const startTime = Date.now(); try { // First get the column names from the table structure const [columns] = await localConnection.query(` SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'products' ORDER BY ORDINAL_POSITION `); const columnNames = columns.map(col => col.COLUMN_NAME); // Get products from production with optimized query const [rows] = await prodConnection.query(` SELECT p.pid, p.description AS title, p.notes AS description, p.itemnumber AS SKU, p.date_created, p.datein AS first_received, p.location, COALESCE(si.available_local, 0) - COALESCE( (SELECT SUM(oi.qty_ordered - oi.qty_placed) FROM order_items oi JOIN _order o ON oi.order_id = o.order_id WHERE oi.prod_pid = p.pid AND o.date_placed != '0000-00-00 00:00:00' AND o.date_shipped = '0000-00-00 00:00:00' AND oi.pick_finished = 0 AND oi.qty_back = 0 AND o.order_status != 15 AND o.order_status < 90 AND oi.qty_ordered >= oi.qty_placed AND oi.qty_ordered > 0), 0) AS stock_quantity, ci.onpreorder AS preorder_count, pnb.inventory AS notions_inv_count, COALESCE(pcp.price_each, 0) as price, COALESCE(p.sellingprice, 0) AS regular_price, COALESCE((SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND COUNT > 0), 0) AS cost_price, NULL AS landing_cost_price, p.upc AS barcode, p.harmonized_tariff_code, p.stamp AS updated_at, CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible, CASE WHEN p.reorder >= 0 THEN 1 ELSE 0 END AS replenishable, s.companyname AS vendor, sid.supplier_itemnumber AS vendor_reference, sid.notions_itemnumber AS notions_reference, CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink, (SELECT CONCAT('https://sbing.com/i/products/0000/', SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', p.pid, '-t-', MIN(PI.iid), '.jpg') FROM product_images PI WHERE PI.pid = p.pid AND PI.hidden = 0) AS image, (SELECT CONCAT('https://sbing.com/i/products/0000/', SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', p.pid, '-175x175-', MIN(PI.iid), '.jpg') FROM product_images PI WHERE PI.pid = p.pid AND PI.hidden = 0 AND PI.width = 175) AS image_175, (SELECT CONCAT('https://sbing.com/i/products/0000/', SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', p.pid, '-o-', MIN(PI.iid), '.jpg') FROM product_images PI WHERE PI.pid = p.pid AND PI.hidden = 0) AS image_full, pc1.name AS brand, pc2.name AS line, pc3.name AS subline, pc4.name AS artist, NULL AS options, NULL AS tags, COALESCE(CASE WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit ELSE sid.supplier_qty_per_unit END, sid.notions_qty_per_unit) AS moq, NULL AS uom, p.rating, p.rating_votes AS reviews, p.weight, p.length, p.width, p.height, (SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets, (SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies, p.totalsold AS total_sold, p.country_of_origin, pls.date_sold as date_last_sold, GROUP_CONCAT(DISTINCT CASE WHEN pc.cat_id IS NOT NULL THEN pci.cat_id END) as category_ids FROM products p LEFT JOIN current_inventory ci ON p.pid = ci.pid LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 LEFT JOIN supplier_item_data sid ON p.pid = sid.pid LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid LEFT JOIN product_category_index pci ON p.pid = pci.pid LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id AND pc.type IN (10, 20, 11, 21, 12, 13) LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id LEFT JOIN product_last_sold pls ON p.pid = pls.pid LEFT JOIN ( SELECT pid, MIN(price_each) as price_each FROM product_current_prices WHERE active = 1 GROUP BY pid ) pcp ON p.pid = pcp.pid WHERE p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) GROUP BY p.pid `); let current = 0; const total = rows.length; // Process products in larger batches const BATCH_SIZE = 100; for (let i = 0; i < rows.length; i += BATCH_SIZE) { const batch = rows.slice(i, i + BATCH_SIZE); // Prepare product values and category relationships in parallel const productValues = []; const categoryRelationships = []; batch.forEach(row => { // Map values in the same order as columns const rowValues = columnNames.map(col => { const val = row[col] ?? null; if (col === 'managing_stock') return 1; if (typeof val === 'number') return val || 0; return val; }); productValues.push(...rowValues); // Add category relationships if (row.category_ids) { const catIds = row.category_ids.split(',') .map(id => id.trim()) .filter(id => id) .map(Number); catIds.forEach(catId => { if (catId) categoryRelationships.push([catId, row.pid]); }); } }); // Generate placeholders based on column count const placeholderGroup = `(${Array(columnNames.length).fill('?').join(',')})`; const productPlaceholders = Array(batch.length).fill(placeholderGroup).join(','); // Build the query dynamically const insertQuery = ` INSERT INTO products (${columnNames.join(',')}) VALUES ${productPlaceholders} ON DUPLICATE KEY UPDATE ${ columnNames .filter(col => col !== 'pid') .map(col => `${col} = VALUES(${col})`) .join(',') } `; // First insert the products and wait for it to complete await localConnection.query(insertQuery, productValues); // Now that products are inserted, handle category relationships if (categoryRelationships.length > 0) { // Get unique category IDs to verify they exist const uniqueCatIds = [...new Set(categoryRelationships.map(([catId]) => catId))]; console.log('Checking categories:', uniqueCatIds); // Check which categories exist const [existingCats] = await localConnection.query( 'SELECT cat_id FROM categories WHERE cat_id IN (?)', [uniqueCatIds] ); const existingCatIds = new Set(existingCats.map(c => c.cat_id)); // Log missing categories const missingCatIds = uniqueCatIds.filter(id => !existingCatIds.has(id)); if (missingCatIds.length > 0) { console.error('Missing categories:', missingCatIds); // Query production to see what these categories are const [missingCats] = await prodConnection.query(` SELECT cat_id, name, type, master_cat_id, hidden FROM product_categories WHERE cat_id IN (?) `, [missingCatIds]); console.error('Missing category details:', missingCats); throw new Error('Missing categories found - import needs to be fixed'); } // Verify products exist before inserting relationships const productIds = [...new Set(categoryRelationships.map(([_, pid]) => pid))]; const [existingProducts] = await localConnection.query( 'SELECT pid FROM products WHERE pid IN (?)', [productIds] ); const existingProductIds = new Set(existingProducts.map(p => p.pid)); // Filter relationships to only include existing products const validRelationships = categoryRelationships.filter(([_, pid]) => existingProductIds.has(pid)); if (validRelationships.length > 0) { const catPlaceholders = validRelationships.map(() => '(?, ?)').join(','); await localConnection.query(` INSERT INTO product_categories (cat_id, pid) VALUES ${catPlaceholders} ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id) `, validRelationships.flat()); } } current += batch.length; updateProgress(current, total, 'Products import', startTime); } outputProgress({ status: 'complete', operation: 'Products import completed', current: total, total, duration: formatDuration((Date.now() - startTime) / 1000) }); } catch (error) { console.error('Error importing products:', error); throw error; } } async function importOrders(prodConnection, localConnection) { outputProgress({ operation: 'Starting orders import', status: 'running' }); const startTime = Date.now(); try { // Get orders from production const [rows] = await prodConnection.query(` SELECT oi.order_id AS order_number, oi.prod_pid AS product_id, oi.prod_itemnumber AS SKU, o.date_placed_onlydate AS date, oi.prod_price_reg AS price, oi.qty_ordered AS quantity, (oi.prod_price_reg - oi.prod_price) AS discount, COALESCE(( SELECT otp.item_taxes_to_collect FROM order_tax_info oti JOIN order_tax_info_products otp ON oti.taxinfo_id = otp.taxinfo_id WHERE oti.order_id = o.order_id AND otp.pid = oi.prod_pid ORDER BY oti.stamp DESC LIMIT 1 ), 0) AS tax, 0 AS tax_included, COALESCE(ROUND( ((o.summary_shipping - COALESCE(o.summary_discount_shipping, 0)) * (oi.prod_price * oi.qty_ordered) / NULLIF(o.summary_subtotal, 0)), 2 ), 0) as shipping, o.order_cid AS customer, CONCAT(COALESCE(u.firstname, ''), ' ', COALESCE(u.lastname, '')) AS customer_name, CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END AS canceled FROM _order o JOIN order_items oi ON o.order_id = oi.order_id LEFT JOIN users u ON o.order_cid = u.cid WHERE o.order_status >= 15 AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) ORDER BY o.date_placed_onlydate DESC `); let current = 0; const total = rows.length; // Process in batches const BATCH_SIZE = 500; for (let i = 0; i < rows.length; i += BATCH_SIZE) { const batch = rows.slice(i, i + BATCH_SIZE); // Create placeholders for batch insert const placeholders = batch.map(() => '(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)' ).join(','); // Flatten values for batch insert const values = batch.flatMap(row => [ row.order_number, row.product_id, row.SKU, row.date, row.price, row.quantity, row.discount, row.tax, row.tax_included, row.shipping, row.customer, row.customer_name, row.canceled ]); await localConnection.query(` INSERT INTO orders ( order_number, product_id, SKU, date, price, quantity, discount, tax, tax_included, shipping, customer, customer_name, canceled ) VALUES ${placeholders} ON DUPLICATE KEY UPDATE price = VALUES(price), quantity = VALUES(quantity), discount = VALUES(discount), tax = VALUES(tax), tax_included = VALUES(tax_included), shipping = VALUES(shipping), customer_name = VALUES(customer_name), canceled = VALUES(canceled) `, values); current += batch.length; updateProgress(current, total, 'Orders import', startTime); } outputProgress({ status: 'complete', operation: 'Orders import completed', current: total, total, duration: formatDuration((Date.now() - startTime) / 1000) }); } catch (error) { console.error('Error importing orders:', error); throw error; } } async function importPurchaseOrders(prodConnection, localConnection) { outputProgress({ operation: 'Starting purchase orders import', status: 'running' }); const startTime = Date.now(); try { // Get purchase orders from production const [rows] = await prodConnection.query(` SELECT po.po_id, s.companyname as vendor, po.date_ordered as date, po.date_estin as expected_date, pop.pid as product_id, p.itemnumber as sku, COALESCE(rp.cost_each, pop.cost_each, 0) as cost_price, CASE WHEN po.status = 0 THEN 'canceled' WHEN po.status = 1 THEN 'created' WHEN po.status = 10 THEN 'electronically_ready_send' WHEN po.status = 11 THEN 'ordered' WHEN po.status = 12 THEN 'preordered' WHEN po.status = 13 THEN 'electronically_sent' WHEN po.status = 15 THEN 'receiving_started' WHEN po.status = 50 THEN 'closed' ELSE 'unknown' END as status, NULLIF(CONCAT_WS(' ', NULLIF(po.short_note, ''), NULLIF(po.notes, '')), '') as notes, pop.qty_each as ordered, COALESCE(rp.received_qty, 0) as received, rp.received_date, rp.received_by FROM po JOIN po_products pop ON po.po_id = pop.po_id JOIN products p ON pop.pid = p.pid JOIN suppliers s ON po.supplier_id = s.supplierid LEFT JOIN receivings r ON po.po_id = r.po_id LEFT JOIN receivings_products rp ON r.receiving_id = rp.receiving_id AND pop.pid = rp.pid WHERE po.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) ORDER BY po.date_ordered DESC `); let current = 0; const total = rows.length; // Process in batches const BATCH_SIZE = 500; for (let i = 0; i < rows.length; i += BATCH_SIZE) { const batch = rows.slice(i, i + BATCH_SIZE); // Create placeholders for batch insert const placeholders = batch.map(() => '(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)' ).join(','); // Flatten values for batch insert const values = batch.flatMap(row => [ row.po_id, row.vendor, row.date, row.expected_date, row.product_id, row.sku, row.cost_price, row.status, row.notes, row.ordered, row.received, row.received_date, row.received_by ]); await localConnection.query(` INSERT INTO purchase_orders ( po_id, vendor, date, expected_date, product_id, sku, cost_price, status, notes, ordered, received, received_date, received_by ) VALUES ${placeholders} ON DUPLICATE KEY UPDATE vendor = VALUES(vendor), expected_date = VALUES(expected_date), cost_price = VALUES(cost_price), status = VALUES(status), notes = VALUES(notes), ordered = VALUES(ordered), received = VALUES(received), received_date = VALUES(received_date), received_by = VALUES(received_by) `, values); current += batch.length; updateProgress(current, total, 'Purchase orders import', startTime); } outputProgress({ status: 'complete', operation: 'Purchase orders import completed', current: total, total, duration: formatDuration((Date.now() - startTime) / 1000) }); } catch (error) { console.error('Error importing purchase orders:', error); throw error; } } // Modify main function to handle cancellation and avoid process.exit async function main() { let ssh; let prodConnection; let localConnection; try { outputProgress({ status: 'running', operation: 'Starting import process', message: 'Setting up connections...' }); // Set up connections const tunnel = await setupSshTunnel(); ssh = tunnel.ssh; prodConnection = await mysql.createConnection({ ...prodDbConfig, stream: tunnel.stream }); localConnection = await mysql.createPool(localDbConfig); if (isImportCancelled) throw new Error('Import cancelled'); // First import all categories await importCategories(prodConnection, localConnection); if (isImportCancelled) throw new Error('Import cancelled'); // Then import products await importProducts(prodConnection, localConnection); if (isImportCancelled) throw new Error('Import cancelled'); await importOrders(prodConnection, localConnection); if (isImportCancelled) throw new Error('Import cancelled'); await importPurchaseOrders(prodConnection, localConnection); if (isImportCancelled) throw new Error('Import cancelled'); outputProgress({ status: 'complete', operation: 'Import process completed' }); } catch (error) { console.error('Error during import process:', error); outputProgress({ status: error.message === 'Import cancelled' ? 'cancelled' : 'error', operation: 'Import process', error: error.message }); throw error; } finally { if (prodConnection) await prodConnection.end(); if (localConnection) await localConnection.end(); if (ssh) ssh.end(); } } // Run the import only if this is the main module if (require.main === module) { main().catch(error => { console.error('Unhandled error in main process:', error); process.exit(1); }); } // Export the functions needed by the route module.exports = { main, outputProgress, cancelImport };