const mysql = require('mysql2/promise');
const { Client } = require('ssh2');
const fs = require('fs');
const dotenv = require('dotenv');
const path = require('path');

dotenv.config({ path: path.join(__dirname, '../.env') });

// SSH configuration
const sshConfig = {
  host: process.env.PROD_SSH_HOST,
  port: process.env.PROD_SSH_PORT || 22,
  username: process.env.PROD_SSH_USER,
  privateKey: process.env.PROD_SSH_KEY_PATH
    ? fs.readFileSync(process.env.PROD_SSH_KEY_PATH)
    : undefined
};

// Production database configuration
const prodDbConfig = {
  host: process.env.PROD_DB_HOST || 'localhost',
  user: process.env.PROD_DB_USER,
  password: process.env.PROD_DB_PASSWORD,
  database: process.env.PROD_DB_NAME,
  port: process.env.PROD_DB_PORT || 3306
};

// Local database configuration
const localDbConfig = {
  host: process.env.DB_HOST,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,
  multipleStatements: true,
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0,
  namedPlaceholders: true
};

// Helper function to output progress as newline-delimited JSON on stdout
function outputProgress(data) {
  process.stdout.write(JSON.stringify(data) + '\n');
}

// Helper function to format a duration in seconds as e.g. "1h 2m 3s"
function formatDuration(seconds) {
  const hours = Math.floor(seconds / 3600);
  const minutes = Math.floor((seconds % 3600) / 60);
  seconds = Math.floor(seconds % 60);
  const parts = [];
  if (hours > 0) parts.push(`${hours}h`);
  if (minutes > 0) parts.push(`${minutes}m`);
  if (seconds > 0 || parts.length === 0) parts.push(`${seconds}s`);
  return parts.join(' ');
}

// Helper function to update progress with a time estimate
function updateProgress(current, total, operation, startTime) {
  const elapsed = (Date.now() - startTime) / 1000;
  // Guard against division by zero before any time has elapsed or rows processed
  const rate = elapsed > 0 ? current / elapsed : 0;
  const remaining = rate > 0 ? (total - current) / rate : 0;
  outputProgress({
    status: 'running',
    operation,
    current,
    total,
    rate,
    elapsed: formatDuration(elapsed),
    remaining: formatDuration(remaining),
    percentage: ((current / total) * 100).toFixed(1)
  });
}

let isImportCancelled = false;

// Cancel function: flips a flag that main() checks between import steps
function cancelImport() {
  isImportCancelled = true;
  outputProgress({ status: 'cancelled', operation: 'Import cancelled' });
}

async function setupSshTunnel() {
  return new Promise((resolve, reject) => {
    const ssh = new Client();
    // Reject on connection-level errors so the caller is not left hanging
    ssh.on('error', reject);
    ssh.on('ready', () => {
      ssh.forwardOut(
        '127.0.0.1',
        0,
        prodDbConfig.host,
        prodDbConfig.port,
        (err, stream) => {
          if (err) {
            ssh.end();
            return reject(err);
          }
          resolve({ ssh, stream });
        }
      );
    }).connect(sshConfig);
  });
}
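// For reference, a minimal .env sketch covering the variables read above.
// All values here are invented placeholders, not real hosts or credentials:
//
//   PROD_SSH_HOST=prod.example.com
//   PROD_SSH_PORT=22
//   PROD_SSH_USER=deploy
//   PROD_SSH_KEY_PATH=/home/deploy/.ssh/id_rsa
//   PROD_DB_HOST=localhost
//   PROD_DB_PORT=3306
//   PROD_DB_USER=readonly
//   PROD_DB_PASSWORD=...
//   PROD_DB_NAME=shop
//   DB_HOST=127.0.0.1
//   DB_USER=root
//   DB_PASSWORD=...
//   DB_NAME=shop_local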
async function importCategories(prodConnection, localConnection) {
  outputProgress({ operation: 'Starting categories import', status: 'running' });
  const startTime = Date.now();

  try {
    // First get all categories that we need
    const [allRows] = await prodConnection.query(`
      SELECT DISTINCT
        pc.cat_id as id,
        pc.name,
        pc.type,
        pc.master_cat_id as parent_id,
        pc.combined_name as description,
        'active' as status
      FROM product_categories pc
      INNER JOIN product_category_index pci ON pc.cat_id = pci.cat_id
      INNER JOIN products p ON pci.pid = p.pid
      WHERE pc.hidden = 0
        AND p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
    `);

    // Separate into root and child categories
    const rootCategories = allRows.filter(row => !row.parent_id || row.parent_id === 0);
    const childCategories = allRows.filter(row => row.parent_id && row.parent_id > 0);

    const total = allRows.length;
    let current = 0;

    // First insert root categories
    if (rootCategories.length > 0) {
      const placeholders = rootCategories.map(() =>
        '(?, ?, ?, NULL, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)'
      ).join(',');
      const values = rootCategories.flatMap(row => [
        row.id, row.name, row.type, row.description, row.status
      ]);

      await localConnection.query(`
        INSERT INTO categories
          (id, name, type, parent_id, description, status, created_at, updated_at)
        VALUES ${placeholders}
        ON DUPLICATE KEY UPDATE
          name = VALUES(name),
          type = VALUES(type),
          parent_id = NULL,
          description = VALUES(description),
          status = VALUES(status),
          updated_at = CURRENT_TIMESTAMP
      `, values);

      current += rootCategories.length;
      updateProgress(current, total, 'Categories import (root categories)', startTime);
    }

    // Then insert child categories in batches
    const BATCH_SIZE = 100;
    for (let i = 0; i < childCategories.length; i += BATCH_SIZE) {
      const batch = childCategories.slice(i, i + BATCH_SIZE);
      const placeholders = batch.map(() =>
        '(?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)'
      ).join(',');
      const values = batch.flatMap(row => [
        row.id, row.name, row.type, row.parent_id, row.description, row.status
      ]);

      await localConnection.query(`
        INSERT INTO categories
          (id, name, type, parent_id, description, status, created_at, updated_at)
        VALUES ${placeholders}
        ON DUPLICATE KEY UPDATE
          name = VALUES(name),
          type = VALUES(type),
          parent_id = VALUES(parent_id),
          description = VALUES(description),
          status = VALUES(status),
          updated_at = CURRENT_TIMESTAMP
      `, values);

      current += batch.length;
      updateProgress(current, total, 'Categories import (child categories)', startTime);
    }

    outputProgress({
      status: 'complete',
      operation: 'Categories import completed',
      current: total,
      total,
      duration: formatDuration((Date.now() - startTime) / 1000)
    });
  } catch (error) {
    console.error('Error importing categories:', error);
    throw error;
  }
}
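// For illustration, the batched upsert above expands to SQL of this shape
// for a two-row batch (values shown inline here; the real query binds them
// through mysql2 `?` placeholders — the ids and names are invented):
//
//   INSERT INTO categories
//     (id, name, type, parent_id, description, status, created_at, updated_at)
//   VALUES
//     (1, 'Fabric', 'product', NULL, 'Fabric', 'active', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP),
//     (2, 'Notions', 'product', NULL, 'Notions', 'active', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
//   ON DUPLICATE KEY UPDATE name = VALUES(name), type = VALUES(type), ...
//
// One multi-row statement per batch keeps round-trips low while the
// ON DUPLICATE KEY UPDATE clause makes re-runs idempotent.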
async function importProducts(prodConnection, localConnection) {
  outputProgress({ operation: 'Starting products and categories import', status: 'running' });
  const startTime = Date.now();

  try {
    // First get all products with their categories
    const [rows] = await prodConnection.query(`
      WITH RECURSIVE category_hierarchy AS (
        -- Get all categories and their full hierarchy
        SELECT
          c.cat_id, c.name, c.type, c.master_cat_id, c.combined_name,
          1 as level
        FROM product_categories c
        WHERE c.master_cat_id = 0 OR c.master_cat_id IS NULL

        UNION ALL

        SELECT
          c.cat_id, c.name, c.type, c.master_cat_id, c.combined_name,
          h.level + 1
        FROM product_categories c
        INNER JOIN category_hierarchy h ON c.master_cat_id = h.cat_id
      )
      SELECT
        p.*,
        GROUP_CONCAT(DISTINCT
          CONCAT_WS(':',
            ch.cat_id, ch.name, ch.type, ch.master_cat_id, ch.combined_name, ch.level
          )
          ORDER BY ch.level
        ) as categories
      FROM products p
      LEFT JOIN product_category_index pci ON p.pid = pci.pid
      LEFT JOIN category_hierarchy ch ON pci.cat_id = ch.cat_id
      WHERE p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
      GROUP BY p.pid
    `);

    let current = 0;
    const total = rows.length;

    // Track categories we need to insert
    const categories = new Map();

    // First pass: collect all categories.
    // Note: this parsing assumes category names and combined names contain
    // neither ':' (the CONCAT_WS separator) nor ',' (the GROUP_CONCAT separator).
    rows.forEach(row => {
      if (row.categories) {
        row.categories.split(',').forEach(catStr => {
          const [id, name, type, parentId, description, level] = catStr.split(':');
          categories.set(id, {
            id: parseInt(id),
            name,
            type,
            parent_id: parentId === '0' ? null : parseInt(parentId),
            description,
            level: parseInt(level),
            status: 'active'
          });
        });
      }
    });

    // Sort categories by level to ensure parents are inserted first
    const sortedCategories = Array.from(categories.values())
      .sort((a, b) => a.level - b.level);

    // Insert categories level by level
    const levels = [...new Set(sortedCategories.map(c => c.level))];
    outputProgress({
      status: 'running',
      operation: 'Importing categories by level',
      current: 0,
      total: sortedCategories.length
    });

    let insertedCategories = 0;
    for (const level of levels) {
      const levelCategories = sortedCategories.filter(c => c.level === level);
      if (levelCategories.length > 0) {
        const placeholders = levelCategories.map(() =>
          '(?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)'
        ).join(',');
        const values = levelCategories.flatMap(cat => [
          cat.id, cat.name, cat.type, cat.parent_id, cat.description, cat.status
        ]);

        await localConnection.query(`
          INSERT INTO categories
            (id, name, type, parent_id, description, status, created_at, updated_at)
          VALUES ${placeholders}
          ON DUPLICATE KEY UPDATE
            name = VALUES(name),
            type = VALUES(type),
            parent_id = VALUES(parent_id),
            description = VALUES(description),
            status = VALUES(status),
            updated_at = CURRENT_TIMESTAMP
        `, values);

        insertedCategories += levelCategories.length;
        updateProgress(insertedCategories, sortedCategories.length, 'Categories import', startTime);
      }
    }

    // Now import products in batches
    const BATCH_SIZE = 100;
    for (let i = 0; i < rows.length; i += BATCH_SIZE) {
      const batch = rows.slice(i, i + BATCH_SIZE);
      // 42 bound values per row, plus the two timestamp columns. (The previous
      // hand-written placeholder string had only 28 markers, which mismatched
      // the 42 values below and would make the insert fail.)
      const placeholders = batch.map(() =>
        `(${Array(42).fill('?').join(', ')}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)`
      ).join(',');
      const values = batch.flatMap(row => [
        row.pid,
        row.title,
        row.description || null,
        row.itemnumber,
        row.date_created,
        row.stock_quantity || 0,
        row.price || 0,
        row.price_reg || 0,
        row.cost_each || null,
        row.cost_landed || null,
        row.barcode || null,
        row.harmonized_tariff_code || null,
        row.visible === 1,
        row.managing_stock === 1,
        row.replenishable === 1,
        row.supplier_name || null,
        row.supplier_reference || null,
        row.notions_reference || null,
        row.permalink || null,
        row.image || null,
        row.image_175 || null,
        row.image_full || null,
        row.brand || null,
        row.line || null,
        row.subline || null,
        row.artist || null,
        row.options || null,
        row.tags || null,
        row.moq || 1,
        row.uom || 1,
        row.rating || null,
        row.reviews || null,
        row.weight || null,
        row.length || null,
        row.width || null,
        row.height || null,
        row.country_of_origin || null,
        row.location || null,
        row.total_sold || 0,
        row.baskets || 0,
        row.notifies || 0,
        row.date_last_sold || null
      ]);

      // Positional insert: relies on the local `products` column order
      // matching the value order above.
      await localConnection.query(`
        INSERT INTO products VALUES ${placeholders}
        ON DUPLICATE KEY UPDATE
          title = VALUES(title),
          description = VALUES(description),
          stock_quantity = VALUES(stock_quantity),
          price = VALUES(price),
          regular_price = VALUES(regular_price),
          cost_price = VALUES(cost_price),
          landing_cost_price = VALUES(landing_cost_price),
          barcode = VALUES(barcode),
          harmonized_tariff_code = VALUES(harmonized_tariff_code),
          updated_at = VALUES(updated_at),
          visible = VALUES(visible),
          managing_stock = VALUES(managing_stock),
          replenishable = VALUES(replenishable),
          vendor = VALUES(vendor),
          vendor_reference = VALUES(vendor_reference),
          notions_reference = VALUES(notions_reference),
          permalink = VALUES(permalink),
          image = VALUES(image),
          image_175 = VALUES(image_175),
          image_full = VALUES(image_full),
          brand = VALUES(brand),
          line = VALUES(line),
          subline = VALUES(subline),
          artist = VALUES(artist),
          options = VALUES(options),
          tags = VALUES(tags),
          moq = VALUES(moq),
          uom = VALUES(uom),
          rating = VALUES(rating),
          reviews = VALUES(reviews),
          weight = VALUES(weight),
          length = VALUES(length),
          width = VALUES(width),
          height = VALUES(height),
          country_of_origin = VALUES(country_of_origin),
          location = VALUES(location),
          total_sold = VALUES(total_sold),
          baskets = VALUES(baskets),
          notifies = VALUES(notifies),
          date_last_sold = VALUES(date_last_sold)
      `, values);

      current += batch.length;
      updateProgress(current, total, 'Products import', startTime);
    }

    outputProgress({
      status: 'complete',
      operation: 'Products and categories import completed',
      current: total,
      total,
      duration: formatDuration((Date.now() - startTime) / 1000)
    });
  } catch (error) {
    console.error('Error importing products and categories:', error);
    throw error;
  }
}
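// For illustration, a single product row's `categories` column might look
// like this (ids and names invented; the format is
// cat_id:name:type:master_cat_id:combined_name:level, entries joined by commas):
//
//   "12:Fabric:product:0:Fabric:1,57:Quilting:product:12:Fabric > Quilting:2"
//
// which the first pass above turns into Map entries such as:
//
//   { id: 57, name: 'Quilting', type: 'product', parent_id: 12,
//     description: 'Fabric > Quilting', level: 2, status: 'active' }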
async function importProductCategories(prodConnection, localConnection) {
  outputProgress({ operation: 'Starting product categories import', status: 'running' });
  const startTime = Date.now();

  try {
    // Get product category relationships from production
    const [rows] = await prodConnection.query(`
      SELECT DISTINCT
        pci.pid as product_id,
        pci.cat_id as category_id
      FROM product_category_index pci
      JOIN product_categories pc ON pci.cat_id = pc.cat_id
      WHERE pc.hidden = 0
    `);

    let current = 0;
    const total = rows.length;

    // Process in batches
    const BATCH_SIZE = 500;
    for (let i = 0; i < rows.length; i += BATCH_SIZE) {
      const batch = rows.slice(i, i + BATCH_SIZE);

      // Create placeholders for batch insert
      const placeholders = batch.map(() => '(?, ?)').join(',');

      // Flatten values for batch insert
      const values = batch.flatMap(row => [row.product_id, row.category_id]);

      await localConnection.query(`
        INSERT INTO product_categories (product_id, category_id)
        VALUES ${placeholders}
        ON DUPLICATE KEY UPDATE category_id = VALUES(category_id)
      `, values);

      current += batch.length;
      updateProgress(current, total, 'Product categories import', startTime);
    }

    outputProgress({
      status: 'complete',
      operation: 'Product categories import completed',
      current: total,
      total,
      duration: formatDuration((Date.now() - startTime) / 1000)
    });
  } catch (error) {
    console.error('Error importing product categories:', error);
    throw error;
  }
}
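// The no-op `category_id = VALUES(category_id)` update above only skips
// duplicates if the local table enforces uniqueness on the pair. A plausible
// DDL sketch for that table (an assumption — the actual local schema is not
// shown in this file):
//
//   CREATE TABLE product_categories (
//     product_id  INT NOT NULL,
//     category_id INT NOT NULL,
//     PRIMARY KEY (product_id, category_id)
//   );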
async function importOrders(prodConnection, localConnection) {
  outputProgress({ operation: 'Starting orders import', status: 'running' });
  const startTime = Date.now();

  try {
    // Get orders from production
    const [rows] = await prodConnection.query(`
      SELECT
        oi.order_id AS order_number,
        oi.prod_pid AS product_id,
        oi.prod_itemnumber AS SKU,
        o.date_placed_onlydate AS date,
        oi.prod_price_reg AS price,
        oi.qty_ordered AS quantity,
        (oi.prod_price_reg - oi.prod_price) AS discount,
        COALESCE((
          SELECT otp.item_taxes_to_collect
          FROM order_tax_info oti
          JOIN order_tax_info_products otp ON oti.taxinfo_id = otp.taxinfo_id
          WHERE oti.order_id = o.order_id AND otp.pid = oi.prod_pid
          ORDER BY oti.stamp DESC
          LIMIT 1
        ), 0) AS tax,
        0 AS tax_included,
        COALESCE(ROUND(
          ((o.summary_shipping - COALESCE(o.summary_discount_shipping, 0)) *
            (oi.prod_price * oi.qty_ordered) / NULLIF(o.summary_subtotal, 0)),
          2
        ), 0) as shipping,
        o.order_cid AS customer,
        CONCAT(COALESCE(u.firstname, ''), ' ', COALESCE(u.lastname, '')) AS customer_name,
        CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END AS canceled
      FROM _order o
      JOIN order_items oi ON o.order_id = oi.order_id
      LEFT JOIN users u ON o.order_cid = u.cid
      WHERE o.order_status >= 15
        AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
      ORDER BY o.date_placed_onlydate DESC
    `);

    let current = 0;
    const total = rows.length;

    // Process in batches
    const BATCH_SIZE = 500;
    for (let i = 0; i < rows.length; i += BATCH_SIZE) {
      const batch = rows.slice(i, i + BATCH_SIZE);

      // Create placeholders for batch insert
      const placeholders = batch.map(() =>
        '(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
      ).join(',');

      // Flatten values for batch insert
      const values = batch.flatMap(row => [
        row.order_number, row.product_id, row.SKU, row.date, row.price,
        row.quantity, row.discount, row.tax, row.tax_included, row.shipping,
        row.customer, row.customer_name, row.canceled
      ]);

      await localConnection.query(`
        INSERT INTO orders (
          order_number, product_id, SKU, date, price, quantity, discount,
          tax, tax_included, shipping, customer, customer_name, canceled
        ) VALUES ${placeholders}
        ON DUPLICATE KEY UPDATE
          price = VALUES(price),
          quantity = VALUES(quantity),
          discount = VALUES(discount),
          tax = VALUES(tax),
          tax_included = VALUES(tax_included),
          shipping = VALUES(shipping),
          customer_name = VALUES(customer_name),
          canceled = VALUES(canceled)
      `, values);

      current += batch.length;
      updateProgress(current, total, 'Orders import', startTime);
    }

    outputProgress({
      status: 'complete',
      operation: 'Orders import completed',
      current: total,
      total,
      duration: formatDuration((Date.now() - startTime) / 1000)
    });
  } catch (error) {
    console.error('Error importing orders:', error);
    throw error;
  }
}

async function importPurchaseOrders(prodConnection, localConnection) {
  outputProgress({ operation: 'Starting purchase orders import', status: 'running' });
  const startTime = Date.now();

  try {
    // Get purchase orders from production
    const [rows] = await prodConnection.query(`
      SELECT
        po.po_id,
        s.companyname as vendor,
        po.date_ordered as date,
        po.date_estin as expected_date,
        pop.pid as product_id,
        p.itemnumber as sku,
        COALESCE(rp.cost_each, pop.cost_each, 0) as cost_price,
        CASE
          WHEN po.status = 0 THEN 'canceled'
          WHEN po.status = 1 THEN 'created'
          WHEN po.status = 10 THEN 'electronically_ready_send'
          WHEN po.status = 11 THEN 'ordered'
          WHEN po.status = 12 THEN 'preordered'
          WHEN po.status = 13 THEN 'electronically_sent'
          WHEN po.status = 15 THEN 'receiving_started'
          WHEN po.status = 50 THEN 'closed'
          ELSE 'unknown'
        END as status,
        NULLIF(CONCAT_WS(' ', NULLIF(po.short_note, ''), NULLIF(po.notes, '')), '') as notes,
        pop.qty_each as ordered,
        COALESCE(rp.received_qty, 0) as received,
        rp.received_date,
        rp.received_by
      FROM po
      JOIN po_products pop ON po.po_id = pop.po_id
      JOIN products p ON pop.pid = p.pid
      JOIN suppliers s ON po.supplier_id = s.supplierid
      LEFT JOIN receivings r ON po.po_id = r.po_id
      LEFT JOIN receivings_products rp ON r.receiving_id = rp.receiving_id AND pop.pid = rp.pid
      WHERE po.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
      ORDER BY po.date_ordered DESC
    `);

    let current = 0;
    const total = rows.length;

    // Process in batches
    const BATCH_SIZE = 500;
    for (let i = 0; i < rows.length; i += BATCH_SIZE) {
      const batch = rows.slice(i, i + BATCH_SIZE);

      // Create placeholders for batch insert
      const placeholders = batch.map(() =>
        '(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
      ).join(',');

      // Flatten values for batch insert
      const values = batch.flatMap(row => [
        row.po_id, row.vendor, row.date, row.expected_date, row.product_id,
        row.sku, row.cost_price, row.status, row.notes, row.ordered,
        row.received, row.received_date, row.received_by
      ]);

      await localConnection.query(`
        INSERT INTO purchase_orders (
          po_id, vendor, date, expected_date, product_id, sku, cost_price,
          status, notes, ordered, received, received_date, received_by
        ) VALUES ${placeholders}
        ON DUPLICATE KEY UPDATE
          vendor = VALUES(vendor),
          expected_date = VALUES(expected_date),
          cost_price = VALUES(cost_price),
          status = VALUES(status),
          notes = VALUES(notes),
          ordered = VALUES(ordered),
          received = VALUES(received),
          received_date = VALUES(received_date),
          received_by = VALUES(received_by)
      `, values);

      current += batch.length;
      updateProgress(current, total, 'Purchase orders import', startTime);
    }

    outputProgress({
      status: 'complete',
      operation: 'Purchase orders import completed',
      current: total,
      total,
      duration: formatDuration((Date.now() - startTime) / 1000)
    });
  } catch (error) {
    console.error('Error importing purchase orders:', error);
    throw error;
  }
}
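// Every progress update is one JSON object per line on stdout, so a consumer
// (e.g. the route that runs this script) can parse it line by line. Sample
// output, with invented values, of the shape emitted by updateProgress and
// the completion messages:
//
//   {"status":"running","operation":"Orders import","current":500,"total":12000,"rate":250,"elapsed":"2s","remaining":"46s","percentage":"4.2"}
//   {"status":"complete","operation":"Orders import completed","current":12000,"total":12000,"duration":"48s"}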
// Main function: handles cancellation and avoids process.exit so errors can
// be handled by the caller
async function main() {
  let ssh;
  let prodConnection;
  let localConnection;

  try {
    outputProgress({
      status: 'running',
      operation: 'Starting import process',
      message: 'Setting up connections...'
    });

    // Set up SSH tunnel and production database connection
    const tunnel = await setupSshTunnel();
    ssh = tunnel.ssh;
    prodConnection = await mysql.createConnection({
      ...prodDbConfig,
      stream: tunnel.stream
    });

    // Set up local database connection
    localConnection = await mysql.createPool(localDbConfig);

    // Check for cancellation after connections
    if (isImportCancelled) {
      throw new Error('Import cancelled');
    }

    // Import products (and categories)
    await importProducts(prodConnection, localConnection);
    if (isImportCancelled) throw new Error('Import cancelled');

    await importProductCategories(prodConnection, localConnection);
    if (isImportCancelled) throw new Error('Import cancelled');

    await importOrders(prodConnection, localConnection);
    if (isImportCancelled) throw new Error('Import cancelled');

    await importPurchaseOrders(prodConnection, localConnection);
    if (isImportCancelled) throw new Error('Import cancelled');

    outputProgress({ status: 'complete', operation: 'Import process completed' });
  } catch (error) {
    console.error('Error during import process:', error);
    outputProgress({
      status: error.message === 'Import cancelled' ? 'cancelled' : 'error',
      operation: 'Import process',
      error: error.message
    });
    throw error; // Re-throw to be handled by caller
  } finally {
    if (prodConnection) await prodConnection.end();
    if (localConnection) await localConnection.end();
    if (ssh) ssh.end();
  }
}

// Run the import only if this is the main module
if (require.main === module) {
  main().catch(error => {
    console.error('Unhandled error in main process:', error);
    process.exit(1);
  });
}

// Export the functions needed by the route
module.exports = { main, outputProgress, cancelImport };
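// Usage sketch for the exports above (hypothetical route wiring; the
// '/api/import' paths and './import' filename are assumptions, not taken from
// this file). Because cancelImport() flips an in-process flag, the importer
// must run in the same process as the route that cancels it:
//
//   const { main, cancelImport } = require('./import');
//
//   app.post('/api/import', (req, res) => {
//     main().catch(() => {});   // progress is reported on stdout as NDJSON
//     res.status(202).json({ started: true });
//   });
//
//   app.post('/api/import/cancel', (req, res) => {
//     cancelImport();           // checked between import steps in main()
//     res.json({ cancelled: true });
//   });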