const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress'); const { importMissingProducts, setupTemporaryTables, cleanupTemporaryTables, materializeCalculations } = require('./products'); /** * Imports orders from a production MySQL database to a local PostgreSQL database. * It can run in two modes: * 1. Incremental update mode (default): Only fetch orders that have changed since the last sync time. * 2. Full update mode: Fetch all eligible orders within the last 5 years regardless of timestamp. * * @param {object} prodConnection - A MySQL connection to production DB (MySQL 5.7). * @param {object} localConnection - A MySQL connection to local DB (MySQL 8.0). * @param {boolean} incrementalUpdate - Set to false for a full sync; true for incremental. * * @returns {object} Information about the sync operation. */ async function importOrders(prodConnection, localConnection, incrementalUpdate = true) { const startTime = Date.now(); const skippedOrders = new Set(); const missingProducts = new Set(); let recordsAdded = 0; let recordsUpdated = 0; let processedCount = 0; let importedCount = 0; let totalOrderItems = 0; let totalUniqueOrders = 0; let cumulativeProcessedOrders = 0; try { // Get last sync info const [syncInfo] = await localConnection.query( "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'" ); const lastSyncTime = syncInfo?.rows?.[0]?.last_sync_timestamp || '1970-01-01'; console.log('Orders: Using last sync time:', lastSyncTime); // First get count of order items - Keep MySQL compatible for production const [[{ total }]] = await prodConnection.query(` SELECT COUNT(*) as total FROM order_items oi USE INDEX (PRIMARY) JOIN _order o ON oi.order_id = o.order_id WHERE o.order_status >= 15 AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) AND o.date_placed_onlydate IS NOT NULL ${incrementalUpdate ? ` AND ( o.stamp > ? OR oi.stamp > ? OR EXISTS ( SELECT 1 FROM order_discount_items odi WHERE odi.order_id = o.order_id AND odi.pid = oi.prod_pid ) OR EXISTS ( SELECT 1 FROM order_tax_info oti JOIN order_tax_info_products otip ON oti.taxinfo_id = otip.taxinfo_id WHERE oti.order_id = o.order_id AND otip.pid = oi.prod_pid AND oti.stamp > ? ) ) ` : ''} `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []); totalOrderItems = total; console.log('Orders: Found changes:', totalOrderItems); // Get order items - Keep MySQL compatible for production const [orderItems] = await prodConnection.query(` SELECT oi.order_id, oi.prod_pid as pid, oi.prod_itemnumber as SKU, oi.prod_price as price, oi.qty_ordered as quantity, COALESCE(oi.prod_price_reg - oi.prod_price, 0) as base_discount, oi.stamp as last_modified FROM order_items oi USE INDEX (PRIMARY) JOIN _order o ON oi.order_id = o.order_id WHERE o.order_status >= 15 AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) AND o.date_placed_onlydate IS NOT NULL ${incrementalUpdate ? ` AND ( o.stamp > ? OR oi.stamp > ? OR EXISTS ( SELECT 1 FROM order_discount_items odi WHERE odi.order_id = o.order_id AND odi.pid = oi.prod_pid ) OR EXISTS ( SELECT 1 FROM order_tax_info oti JOIN order_tax_info_products otip ON oti.taxinfo_id = otip.taxinfo_id WHERE oti.order_id = o.order_id AND otip.pid = oi.prod_pid AND oti.stamp > ? ) ) ` : ''} `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []); console.log('Orders: Processing', orderItems.length, 'order items'); // Create temporary tables in PostgreSQL await localConnection.query(` DROP TABLE IF EXISTS temp_order_items; DROP TABLE IF EXISTS temp_order_meta; DROP TABLE IF EXISTS temp_order_discounts; DROP TABLE IF EXISTS temp_order_taxes; DROP TABLE IF EXISTS temp_order_costs; CREATE TEMP TABLE temp_order_items ( order_id INTEGER NOT NULL, pid INTEGER NOT NULL, SKU VARCHAR(50) NOT NULL, price DECIMAL(10,2) NOT NULL, quantity INTEGER NOT NULL, base_discount DECIMAL(10,2) DEFAULT 0, PRIMARY KEY (order_id, pid) ); CREATE TEMP TABLE temp_order_meta ( order_id INTEGER NOT NULL, date DATE NOT NULL, customer VARCHAR(100) NOT NULL, customer_name VARCHAR(150) NOT NULL, status INTEGER, canceled BOOLEAN, summary_discount DECIMAL(10,2) DEFAULT 0.00, summary_subtotal DECIMAL(10,2) DEFAULT 0.00, PRIMARY KEY (order_id) ); CREATE TEMP TABLE temp_order_discounts ( order_id INTEGER NOT NULL, pid INTEGER NOT NULL, discount DECIMAL(10,2) NOT NULL, PRIMARY KEY (order_id, pid) ); CREATE TEMP TABLE temp_order_taxes ( order_id INTEGER NOT NULL, pid INTEGER NOT NULL, tax DECIMAL(10,2) NOT NULL, PRIMARY KEY (order_id, pid) ); CREATE TEMP TABLE temp_order_costs ( order_id INTEGER NOT NULL, pid INTEGER NOT NULL, costeach DECIMAL(10,3) DEFAULT 0.000, PRIMARY KEY (order_id, pid) ); `); // Insert order items in batches for (let i = 0; i < orderItems.length; i += 5000) { const batch = orderItems.slice(i, Math.min(i + 5000, orderItems.length)); const placeholders = batch.map((_, idx) => `($${idx * 6 + 1}, $${idx * 6 + 2}, $${idx * 6 + 3}, $${idx * 6 + 4}, $${idx * 6 + 5}, $${idx * 6 + 6})` ).join(","); const values = batch.flatMap(item => [ item.order_id, item.pid, item.SKU, item.price, item.quantity, item.base_discount ]); await localConnection.query(` INSERT INTO temp_order_items (order_id, pid, SKU, price, quantity, base_discount) VALUES ${placeholders} ON CONFLICT (order_id, pid) DO UPDATE SET SKU = EXCLUDED.SKU, price = EXCLUDED.price, quantity = EXCLUDED.quantity, base_discount = EXCLUDED.base_discount `, values); processedCount = i + batch.length; outputProgress({ status: "running", operation: "Orders import", message: `Loading order items: ${processedCount} of ${totalOrderItems}`, current: processedCount, total: totalOrderItems }); } // Get unique order IDs const orderIds = [...new Set(orderItems.map(item => item.order_id))]; totalUniqueOrders = orderIds.length; console.log('Total unique order IDs:', totalUniqueOrders); // Reset processed count for order processing phase processedCount = 0; // Get order metadata in batches - Keep MySQL compatible for production for (let i = 0; i < orderIds.length; i += 5000) { const batchIds = orderIds.slice(i, i + 5000); console.log(`Processing batch ${i/5000 + 1}, size: ${batchIds.length}`); const [orders] = await prodConnection.query(` SELECT o.order_id, o.date_placed_onlydate as date, o.order_cid as customer, CONCAT(COALESCE(u.firstname, ''), ' ', COALESCE(u.lastname, '')) as customer_name, o.order_status as status, CASE WHEN o.date_cancelled != '0000-00-00 00:00:00' THEN 1 ELSE 0 END as canceled, o.summary_discount, o.summary_subtotal FROM _order o LEFT JOIN users u ON o.order_cid = u.cid WHERE o.order_id IN (?) `, [batchIds]); // Insert into PostgreSQL temp table if (orders.length > 0) { const placeholders = orders.map((_, idx) => `($${idx * 8 + 1}, $${idx * 8 + 2}, $${idx * 8 + 3}, $${idx * 8 + 4}, $${idx * 8 + 5}, $${idx * 8 + 6}, $${idx * 8 + 7}, $${idx * 8 + 8})` ).join(","); const values = orders.flatMap(order => [ order.order_id, order.date, order.customer, order.customer_name, order.status, order.canceled, order.summary_discount, order.summary_subtotal ]); await localConnection.query(` INSERT INTO temp_order_meta ( order_id, date, customer, customer_name, status, canceled, summary_discount, summary_subtotal ) VALUES ${placeholders} ON CONFLICT (order_id) DO UPDATE SET date = EXCLUDED.date, customer = EXCLUDED.customer, customer_name = EXCLUDED.customer_name, status = EXCLUDED.status, canceled = EXCLUDED.canceled, summary_discount = EXCLUDED.summary_discount, summary_subtotal = EXCLUDED.summary_subtotal `, values); } processedCount = i + orders.length; outputProgress({ status: "running", operation: "Orders import", message: `Loading order metadata: ${processedCount} of ${totalUniqueOrders}`, current: processedCount, total: totalUniqueOrders }); } // Process promotional discounts in batches - Keep MySQL compatible for production for (let i = 0; i < orderIds.length; i += 5000) { const batchIds = orderIds.slice(i, i + 5000); const [discounts] = await prodConnection.query(` SELECT order_id, pid, SUM(amount) as discount FROM order_discount_items WHERE order_id IN (?) GROUP BY order_id, pid `, [batchIds]); if (discounts.length > 0) { const uniqueDiscounts = new Map(); discounts.forEach(d => { const key = `${d.order_id}-${d.pid}`; uniqueDiscounts.set(key, d); }); const values = Array.from(uniqueDiscounts.values()).flatMap(d => [d.order_id, d.pid, d.discount || 0]); if (values.length > 0) { const placeholders = Array.from({length: uniqueDiscounts.size}, (_, idx) => { const base = idx * 3; return `($${base + 1}, $${base + 2}, $${base + 3})`; }).join(","); await localConnection.query(` INSERT INTO temp_order_discounts (order_id, pid, discount) VALUES ${placeholders} ON CONFLICT (order_id, pid) DO UPDATE SET discount = EXCLUDED.discount `, values); } } } // Get tax information in batches - Keep MySQL compatible for production for (let i = 0; i < orderIds.length; i += 5000) { const batchIds = orderIds.slice(i, i + 5000); const [taxes] = await prodConnection.query(` SELECT DISTINCT oti.order_id, otip.pid, otip.item_taxes_to_collect as tax FROM order_tax_info oti JOIN ( SELECT order_id, MAX(stamp) as max_stamp FROM order_tax_info WHERE order_id IN (?) GROUP BY order_id ) latest ON oti.order_id = latest.order_id AND oti.stamp = latest.max_stamp JOIN order_tax_info_products otip ON oti.taxinfo_id = otip.taxinfo_id `, [batchIds]); if (taxes.length > 0) { const uniqueTaxes = new Map(); taxes.forEach(t => { const key = `${t.order_id}-${t.pid}`; uniqueTaxes.set(key, t); }); const values = Array.from(uniqueTaxes.values()).flatMap(t => [t.order_id, t.pid, t.tax]); if (values.length > 0) { const placeholders = Array.from({length: uniqueTaxes.size}, (_, idx) => { const base = idx * 3; return `($${base + 1}, $${base + 2}, $${base + 3})`; }).join(","); await localConnection.query(` INSERT INTO temp_order_taxes (order_id, pid, tax) VALUES ${placeholders} ON CONFLICT (order_id, pid) DO UPDATE SET tax = EXCLUDED.tax `, values); } } } // Get costeach values in batches - Keep MySQL compatible for production for (let i = 0; i < orderIds.length; i += 5000) { const batchIds = orderIds.slice(i, i + 5000); const [costs] = await prodConnection.query(` SELECT oc.orderid as order_id, oc.pid, COALESCE( oc.costeach, (SELECT pi.costeach FROM product_inventory pi WHERE pi.pid = oc.pid AND pi.daterec <= o.date_placed ORDER BY pi.daterec DESC LIMIT 1) ) as costeach FROM order_costs oc JOIN _order o ON oc.orderid = o.order_id WHERE oc.orderid IN (?) `, [batchIds]); if (costs.length > 0) { const placeholders = costs.map((_, idx) => `($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})` ).join(","); const values = costs.flatMap(c => [c.order_id, c.pid, c.costeach || 0]); await localConnection.query(` INSERT INTO temp_order_costs (order_id, pid, costeach) VALUES ${placeholders} ON CONFLICT (order_id, pid) DO UPDATE SET costeach = EXCLUDED.costeach `, values); } } // Pre-check all products at once const allOrderPids = [...new Set(orderItems.map(item => item.pid))]; const [existingProducts] = allOrderPids.length > 0 ? await localConnection.query( "SELECT pid FROM products WHERE pid = ANY($1)", [allOrderPids] ) : [[]]; const existingPids = new Set(existingProducts.rows.map(p => p.pid)); // Process in larger batches for (let i = 0; i < orderIds.length; i += 5000) { const batchIds = orderIds.slice(i, i + 5000); // Get combined data for this batch const [orders] = await localConnection.query(` SELECT oi.order_id as order_number, oi.pid, oi.SKU, om.date, oi.price, oi.quantity, oi.base_discount + COALESCE(od.discount, 0) + CASE WHEN om.summary_discount > 0 THEN ROUND((om.summary_discount * (oi.price * oi.quantity)) / NULLIF(om.summary_subtotal, 0), 2) ELSE 0 END as discount, COALESCE(ot.tax, 0) as tax, false as tax_included, 0 as shipping, om.customer, om.customer_name, om.status, om.canceled, COALESCE(tc.costeach, 0) as costeach FROM temp_order_items oi JOIN temp_order_meta om ON oi.order_id = om.order_id LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid LEFT JOIN temp_order_costs tc ON oi.order_id = tc.order_id AND oi.pid = tc.pid WHERE oi.order_id = ANY($1) `, [batchIds]); // Filter orders and track missing products const validOrders = []; const processedOrderItems = new Set(); const processedOrders = new Set(); for (const order of orders.rows) { if (!existingPids.has(order.pid)) { missingProducts.add(order.pid); skippedOrders.add(order.order_number); continue; } validOrders.push(order); processedOrderItems.add(`${order.order_number}-${order.pid}`); processedOrders.add(order.order_number); } if (validOrders.length > 0) { const placeholders = validOrders.map((_, idx) => { const base = idx * 15; return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`; }).join(','); const batchValues = validOrders.flatMap(o => [ o.order_number, o.pid, o.SKU, o.date, o.price, o.quantity, o.discount, o.tax, o.tax_included, o.shipping, o.customer, o.customer_name, o.status, o.canceled, o.costeach ]); const [result] = await localConnection.query(` WITH inserted_orders AS ( INSERT INTO orders ( order_number, pid, SKU, date, price, quantity, discount, tax, tax_included, shipping, customer, customer_name, status, canceled, costeach ) VALUES ${placeholders} ON CONFLICT (order_number, pid) DO UPDATE SET SKU = EXCLUDED.SKU, date = EXCLUDED.date, price = EXCLUDED.price, quantity = EXCLUDED.quantity, discount = EXCLUDED.discount, tax = EXCLUDED.tax, tax_included = EXCLUDED.tax_included, shipping = EXCLUDED.shipping, customer = EXCLUDED.customer, customer_name = EXCLUDED.customer_name, status = EXCLUDED.status, canceled = EXCLUDED.canceled, costeach = EXCLUDED.costeach RETURNING xmax, xmin ) SELECT COUNT(*) FILTER (WHERE xmax = 0) as inserted, COUNT(*) FILTER (WHERE xmax <> 0) as updated FROM inserted_orders `, batchValues); const { inserted, updated } = result.rows[0]; recordsAdded += inserted; recordsUpdated += updated; importedCount += processedOrderItems.size; } cumulativeProcessedOrders += processedOrders.size; outputProgress({ status: "running", operation: "Orders import", message: `Imported ${importedCount} order items (${cumulativeProcessedOrders} of ${totalUniqueOrders} orders processed)`, current: cumulativeProcessedOrders, total: totalUniqueOrders, elapsed: formatElapsedTime((Date.now() - startTime) / 1000), remaining: estimateRemaining(startTime, cumulativeProcessedOrders, totalUniqueOrders), rate: calculateRate(startTime, cumulativeProcessedOrders) }); } // Clean up temporary tables await localConnection.query(` DROP TABLE IF EXISTS temp_order_items; DROP TABLE IF EXISTS temp_order_meta; DROP TABLE IF EXISTS temp_order_discounts; DROP TABLE IF EXISTS temp_order_taxes; DROP TABLE IF EXISTS temp_order_costs; `); // Update sync status await localConnection.query(` INSERT INTO sync_status (table_name, last_sync_timestamp) VALUES ('orders', NOW()) ON CONFLICT (table_name) DO UPDATE SET last_sync_timestamp = NOW() `); return { status: "complete", totalImported: Math.floor(importedCount), recordsAdded: recordsAdded || 0, recordsUpdated: Math.floor(recordsUpdated), totalSkipped: skippedOrders.size, missingProducts: missingProducts.size, incrementalUpdate, lastSyncTime }; } catch (error) { console.error("Error during orders import:", error); throw error; } } module.exports = importOrders;