const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics-new/utils/progress');
// NOTE(review): none of these four imports are referenced in this file — importMissingProducts
// in particular looks intended for the `missingProducts` set collected below, but is never
// called. Confirm whether this module is supposed to invoke it, or drop the import.
const { importMissingProducts, setupTemporaryTables, cleanupTemporaryTables, materializeCalculations } = require('./products');

/**
 * Imports orders from a production MySQL database to a local PostgreSQL database.
 * It can run in two modes:
 * 1. Incremental update mode (default): Only fetch orders that have changed since the last sync time.
 * 2. Full update mode: Fetch all eligible orders within the last 5 years regardless of timestamp.
 *
 * Pipeline (all local work happens inside one transaction):
 *   1. Read last sync timestamp from local `sync_status`.
 *   2. Pull changed order items from production (MySQL-dialect SQL, `?` placeholders).
 *   3. Stage items/metadata/discounts/taxes/costs into local TEMP tables (PostgreSQL-dialect
 *      SQL, `$n` placeholders), loading the four auxiliary datasets in parallel per batch.
 *   4. Join the staged data and upsert into the local `orders` table, counting inserts vs
 *      updates via the `xmax = 0` trick. Items whose product is missing locally are skipped
 *      and recorded, not imported.
 *   5. Update `sync_status`, drop the temp tables, and commit.
 *
 * @param {object} prodConnection - A MySQL connection to production DB (MySQL 5.7).
 * @param {object} localConnection - A PostgreSQL connection to the local DB.
 *   NOTE(review): the original doc said "MySQL 8.0", but every query on this connection uses
 *   PostgreSQL syntax ($n placeholders, ON CONFLICT, TEMP TABLE, ::bigint, xmax, FILTER) and
 *   results are read via `.rows` — presumably a pg client behind a mysql2-like wrapper that
 *   returns `[result]`; verify against the connection factory.
 * @param {boolean} incrementalUpdate - Set to false for a full sync; true for incremental.
 *
 * @returns {object} Information about the sync operation.
 */
async function importOrders(prodConnection, localConnection, incrementalUpdate = true) {
  const startTime = Date.now();
  const skippedOrders = new Set();      // order_numbers skipped because a product was missing locally
  const missingProducts = new Set();    // pids present in production orders but absent from local `products`
  let recordsAdded = 0;
  let recordsUpdated = 0;
  let processedCount = 0;               // reused across the two progress phases below
  let importedCount = 0;
  let totalOrderItems = 0;
  let totalUniqueOrders = 0;
  let cumulativeProcessedOrders = 0;

  try {
    // Begin transaction — everything on localConnection up to the final commit is atomic.
    await localConnection.beginTransaction();

    // Get last sync info. Falls back to the epoch so a first run behaves like a full scan
    // within the date window.
    const [syncInfo] = await localConnection.query(
      "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'"
    );
    const lastSyncTime = syncInfo?.rows?.[0]?.last_sync_timestamp || '1970-01-01';
    console.log('Orders: Using last sync time:', lastSyncTime);

    // First get count of order items - Keep MySQL compatible for production.
    // The WHERE clause must stay textually in sync with the data query below.
    // In incremental mode an item counts as "changed" if the order or item stamp moved,
    // or if any discount/tax row exists for it (discounts are not stamp-filtered —
    // presumably intentional over-fetch; verify).
    const [[{ total }]] = await prodConnection.query(`
      SELECT COUNT(*) as total
      FROM order_items oi
      JOIN _order o ON oi.order_id = o.order_id
      WHERE o.order_status >= 15
      AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
      AND o.date_placed_onlydate IS NOT NULL
      ${incrementalUpdate ? `
        AND (
          o.stamp > ?
          OR oi.stamp > ?
          OR EXISTS (
            SELECT 1 FROM order_discount_items odi
            WHERE odi.order_id = o.order_id
            AND odi.pid = oi.prod_pid
          )
          OR EXISTS (
            SELECT 1 FROM order_tax_info oti
            JOIN order_tax_info_products otip ON oti.taxinfo_id = otip.taxinfo_id
            WHERE oti.order_id = o.order_id
            AND otip.pid = oi.prod_pid
            AND oti.stamp > ?
          )
        )
      ` : ''}
    `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);
    totalOrderItems = total;
    console.log('Orders: Found changes:', totalOrderItems);

    // Get order items - Keep MySQL compatible for production.
    // Same predicate as the count query; pulls the raw item rows to stage locally.
    console.log('Orders: Starting MySQL query...');
    const [orderItems] = await prodConnection.query(`
      SELECT
        oi.order_id,
        oi.prod_pid,
        COALESCE(NULLIF(TRIM(oi.prod_itemnumber), ''), 'NO-SKU') as SKU,
        oi.prod_price as price,
        oi.qty_ordered as quantity,
        COALESCE(oi.prod_price_reg - oi.prod_price, 0) as base_discount,
        oi.stamp as last_modified
      FROM order_items oi
      JOIN _order o ON oi.order_id = o.order_id
      WHERE o.order_status >= 15
      AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
      AND o.date_placed_onlydate IS NOT NULL
      ${incrementalUpdate ? `
        AND (
          o.stamp > ?
          OR oi.stamp > ?
          OR EXISTS (
            SELECT 1 FROM order_discount_items odi
            WHERE odi.order_id = o.order_id
            AND odi.pid = oi.prod_pid
          )
          OR EXISTS (
            SELECT 1 FROM order_tax_info oti
            JOIN order_tax_info_products otip ON oti.taxinfo_id = otip.taxinfo_id
            WHERE oti.order_id = o.order_id
            AND otip.pid = oi.prod_pid
            AND oti.stamp > ?
          )
        )
      ` : ''}
    `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);
    console.log('Orders: Found', orderItems.length, 'order items to process');

    // Create tables in PostgreSQL for data processing.
    // DROPs guard against leftovers from a prior session; CREATE TEMP TABLE scopes the
    // staging tables to this session (they are also dropped explicitly before commit).
    await localConnection.query(`
      DROP TABLE IF EXISTS temp_order_items;
      DROP TABLE IF EXISTS temp_order_meta;
      DROP TABLE IF EXISTS temp_order_discounts;
      DROP TABLE IF EXISTS temp_order_taxes;
      DROP TABLE IF EXISTS temp_order_costs;
      CREATE TEMP TABLE temp_order_items (
        order_id INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        sku TEXT NOT NULL,
        price NUMERIC(14, 4) NOT NULL,
        quantity INTEGER NOT NULL,
        base_discount NUMERIC(14, 4) DEFAULT 0,
        PRIMARY KEY (order_id, pid)
      );
      CREATE TEMP TABLE temp_order_meta (
        order_id INTEGER NOT NULL,
        date TIMESTAMP WITH TIME ZONE NOT NULL,
        customer TEXT NOT NULL,
        customer_name TEXT NOT NULL,
        status TEXT,
        canceled BOOLEAN,
        summary_discount NUMERIC(14, 4) DEFAULT 0.0000,
        summary_subtotal NUMERIC(14, 4) DEFAULT 0.0000,
        PRIMARY KEY (order_id)
      );
      CREATE TEMP TABLE temp_order_discounts (
        order_id INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        discount NUMERIC(14, 4) NOT NULL,
        PRIMARY KEY (order_id, pid)
      );
      CREATE TEMP TABLE temp_order_taxes (
        order_id INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        tax NUMERIC(14, 4) NOT NULL,
        PRIMARY KEY (order_id, pid)
      );
      CREATE TEMP TABLE temp_order_costs (
        order_id INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        costeach NUMERIC(14, 4) DEFAULT 0.0000,
        PRIMARY KEY (order_id, pid)
      );
      CREATE INDEX idx_temp_order_items_pid ON temp_order_items(pid);
      CREATE INDEX idx_temp_order_meta_order_id ON temp_order_meta(order_id);
    `);

    // Insert order items in batches of 5000; $n placeholders are generated per row
    // (6 columns per row). Upsert keyed on (order_id, pid).
    for (let i = 0; i < orderItems.length; i += 5000) {
      const batch = orderItems.slice(i, Math.min(i + 5000, orderItems.length));
      const placeholders = batch.map((_, idx) =>
        `($${idx * 6 + 1}, $${idx * 6 + 2}, $${idx * 6 + 3}, $${idx * 6 + 4}, $${idx * 6 + 5}, $${idx * 6 + 6})`
      ).join(",");
      const values = batch.flatMap(item => [
        item.order_id,
        item.prod_pid,
        item.SKU,
        item.price,
        item.quantity,
        item.base_discount
      ]);
      await localConnection.query(`
        INSERT INTO temp_order_items (order_id, pid, sku, price, quantity, base_discount)
        VALUES ${placeholders}
        ON CONFLICT (order_id, pid) DO UPDATE SET
          sku = EXCLUDED.sku,
          price = EXCLUDED.price,
          quantity = EXCLUDED.quantity,
          base_discount = EXCLUDED.base_discount
      `, values);
      processedCount = i + batch.length;
      outputProgress({
        status: "running",
        operation: "Orders import",
        message: `Loading order items: ${processedCount} of ${totalOrderItems}`,
        current: processedCount,
        total: totalOrderItems,
        elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
        remaining: estimateRemaining(startTime, processedCount, totalOrderItems),
        rate: calculateRate(startTime, processedCount)
      });
    }

    // Get unique order IDs
    const orderIds = [...new Set(orderItems.map(item => item.order_id))];
    totalUniqueOrders = orderIds.length;
    console.log('Orders: Processing', totalUniqueOrders, 'unique orders');

    // Reset processed count for order processing phase
    processedCount = 0;

    // Process metadata, discounts, taxes, and costs in parallel.
    // METADATA_BATCH_SIZE bounds the `IN (?)` list sent to MySQL; PG_BATCH_SIZE bounds
    // the number of rows per generated multi-row INSERT on the PostgreSQL side.
    const METADATA_BATCH_SIZE = 2000;
    const PG_BATCH_SIZE = 200;

    // Add a helper function for title case conversion (used for customer names).
    function toTitleCase(str) {
      if (!str) return '';
      return str.toLowerCase().split(' ').map(word => {
        return word.charAt(0).toUpperCase() + word.slice(1);
      }).join(' ');
    }

    // Stage per-order metadata into temp_order_meta for one batch of order ids.
    const processMetadataBatch = async (batchIds) => {
      const [orders] = await prodConnection.query(`
        SELECT
          o.order_id,
          o.date_placed_onlydate as date,
          o.order_cid as customer,
          CONCAT(COALESCE(u.firstname, ''), ' ', COALESCE(u.lastname, '')) as customer_name,
          o.order_status as status,
          CASE WHEN o.date_cancelled != '0000-00-00 00:00:00' THEN 1 ELSE 0 END as canceled,
          o.summary_discount,
          o.summary_subtotal
        FROM _order o
        LEFT JOIN users u ON o.order_cid = u.cid
        WHERE o.order_id IN (?)
      `, [batchIds]);
      // Process in sub-batches for PostgreSQL (8 columns per row).
      for (let j = 0; j < orders.length; j += PG_BATCH_SIZE) {
        const subBatch = orders.slice(j, j + PG_BATCH_SIZE);
        if (subBatch.length === 0) continue;
        const placeholders = subBatch.map((_, idx) =>
          `($${idx * 8 + 1}, $${idx * 8 + 2}, $${idx * 8 + 3}, $${idx * 8 + 4}, $${idx * 8 + 5}, $${idx * 8 + 6}, $${idx * 8 + 7}, $${idx * 8 + 8})`
        ).join(",");
        const values = subBatch.flatMap(order => [
          order.order_id,
          new Date(order.date), // Convert to TIMESTAMP WITH TIME ZONE
          order.customer,
          toTitleCase(order.customer_name) || '',
          order.status.toString(), // Convert status to TEXT
          order.canceled,
          order.summary_discount || 0,
          order.summary_subtotal || 0
        ]);
        await localConnection.query(`
          INSERT INTO temp_order_meta (
            order_id, date, customer, customer_name, status, canceled, summary_discount, summary_subtotal
          ) VALUES ${placeholders}
          ON CONFLICT (order_id) DO UPDATE SET
            date = EXCLUDED.date,
            customer = EXCLUDED.customer,
            customer_name = EXCLUDED.customer_name,
            status = EXCLUDED.status,
            canceled = EXCLUDED.canceled,
            summary_discount = EXCLUDED.summary_discount,
            summary_subtotal = EXCLUDED.summary_subtotal
        `, values);
      }
    };

    // Stage per-item promotional discounts (summed per order/pid) into temp_order_discounts.
    const processDiscountsBatch = async (batchIds) => {
      const [discounts] = await prodConnection.query(`
        SELECT order_id, pid, SUM(amount) as discount
        FROM order_discount_items
        WHERE order_id IN (?)
        GROUP BY order_id, pid
      `, [batchIds]);
      if (discounts.length === 0) return;
      for (let j = 0; j < discounts.length; j += PG_BATCH_SIZE) {
        const subBatch = discounts.slice(j, j + PG_BATCH_SIZE);
        if (subBatch.length === 0) continue;
        const placeholders = subBatch.map((_, idx) =>
          `($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})`
        ).join(",");
        const values = subBatch.flatMap(d => [
          d.order_id,
          d.pid,
          d.discount || 0
        ]);
        await localConnection.query(`
          INSERT INTO temp_order_discounts (order_id, pid, discount)
          VALUES ${placeholders}
          ON CONFLICT (order_id, pid) DO UPDATE SET discount = EXCLUDED.discount
        `, values);
      }
    };

    // Stage per-item taxes into temp_order_taxes, using only the newest tax-info record
    // (MAX(taxinfo_id)) per order.
    const processTaxesBatch = async (batchIds) => {
      // Optimized tax query to avoid subquery
      const [taxes] = await prodConnection.query(`
        SELECT oti.order_id, otip.pid, otip.item_taxes_to_collect as tax
        FROM (
          SELECT order_id, MAX(taxinfo_id) as latest_taxinfo_id
          FROM order_tax_info
          WHERE order_id IN (?)
          GROUP BY order_id
        ) latest_info
        JOIN order_tax_info oti ON oti.order_id = latest_info.order_id AND oti.taxinfo_id = latest_info.latest_taxinfo_id
        JOIN order_tax_info_products otip ON oti.taxinfo_id = otip.taxinfo_id
      `, [batchIds]);
      if (taxes.length === 0) return;
      for (let j = 0; j < taxes.length; j += PG_BATCH_SIZE) {
        const subBatch = taxes.slice(j, j + PG_BATCH_SIZE);
        if (subBatch.length === 0) continue;
        const placeholders = subBatch.map((_, idx) =>
          `($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})`
        ).join(",");
        const values = subBatch.flatMap(t => [
          t.order_id,
          t.pid,
          t.tax || 0
        ]);
        await localConnection.query(`
          INSERT INTO temp_order_taxes (order_id, pid, tax)
          VALUES ${placeholders}
          ON CONFLICT (order_id, pid) DO UPDATE SET tax = EXCLUDED.tax
        `, values);
      }
    };

    // Stage per-item unit costs into temp_order_costs, keeping only the newest
    // non-pending cost row (MAX(id)) per order/pid.
    const processCostsBatch = async (batchIds) => {
      // Modified query to ensure one row per order_id/pid by using a subquery
      const [costs] = await prodConnection.query(`
        SELECT oc.orderid as order_id, oc.pid, oc.costeach
        FROM order_costs oc
        INNER JOIN (
          SELECT orderid, pid, MAX(id) as max_id
          FROM order_costs
          WHERE orderid IN (?)
          AND pending = 0
          GROUP BY orderid, pid
        ) latest ON oc.orderid = latest.orderid AND oc.pid = latest.pid AND oc.id = latest.max_id
      `, [batchIds]);
      if (costs.length === 0) return;
      for (let j = 0; j < costs.length; j += PG_BATCH_SIZE) {
        const subBatch = costs.slice(j, j + PG_BATCH_SIZE);
        if (subBatch.length === 0) continue;
        const placeholders = subBatch.map((_, idx) =>
          `($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})`
        ).join(",");
        const values = subBatch.flatMap(c => [
          c.order_id,
          c.pid,
          c.costeach || 0
        ]);
        await localConnection.query(`
          INSERT INTO temp_order_costs (order_id, pid, costeach)
          VALUES ${placeholders}
          ON CONFLICT (order_id, pid) DO UPDATE SET costeach = EXCLUDED.costeach
        `, values);
      }
    };

    // Process all data types in parallel for each batch of order ids.
    for (let i = 0; i < orderIds.length; i += METADATA_BATCH_SIZE) {
      const batchIds = orderIds.slice(i, i + METADATA_BATCH_SIZE);
      await Promise.all([
        processMetadataBatch(batchIds),
        processDiscountsBatch(batchIds),
        processTaxesBatch(batchIds),
        processCostsBatch(batchIds)
      ]);
      processedCount = i + batchIds.length;
      outputProgress({
        status: "running",
        operation: "Orders import",
        message: `Loading order data: ${processedCount} of ${totalUniqueOrders}`,
        current: processedCount,
        total: totalUniqueOrders,
        elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
        remaining: estimateRemaining(startTime, processedCount, totalUniqueOrders),
        rate: calculateRate(startTime, processedCount)
      });
    }

    // Pre-check all products at once so missing ones can be skipped during the final upsert.
    const allOrderPids = [...new Set(orderItems.map(item => item.prod_pid))];
    console.log('Orders: Checking', allOrderPids.length, 'unique products');
    const [existingProducts] = allOrderPids.length > 0 ?
      await localConnection.query(
        "SELECT pid FROM products WHERE pid = ANY($1::bigint[])",
        [allOrderPids]
      ) : [[]];
    // NOTE(review): pg commonly returns bigint columns as strings, while prod_pid from
    // mysql2 is a number — if so, `existingPids.has(order.pid)` below would never match.
    // Presumably the wrapper or pg type parsers normalize this; verify.
    const existingPids = new Set(existingProducts.rows.map(p => p.pid));

    // Process in smaller batches
    for (let i = 0; i < orderIds.length; i += 1000) {
      const batchIds = orderIds.slice(i, i + 1000);

      // Get combined data for this batch in sub-batches.
      const PG_BATCH_SIZE = 100; // Process 100 records at a time (shadows the outer 200)
      for (let j = 0; j < batchIds.length; j += PG_BATCH_SIZE) {
        const subBatchIds = batchIds.slice(j, j + PG_BATCH_SIZE);
        // Join staged items with metadata/discounts/taxes/costs. Order-level
        // summary_discount is prorated by the item's share of summary_subtotal;
        // missing cost falls back to 50% of price.
        const [orders] = await localConnection.query(`
          WITH order_totals AS (
            SELECT
              oi.order_id,
              oi.pid,
              SUM(COALESCE(od.discount, 0)) as promo_discount,
              COALESCE(ot.tax, 0) as total_tax,
              COALESCE(oc.costeach, oi.price * 0.5) as costeach
            FROM temp_order_items oi
            LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
            LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
            LEFT JOIN temp_order_costs oc ON oi.order_id = oc.order_id AND oi.pid = oc.pid
            GROUP BY oi.order_id, oi.pid, ot.tax, oc.costeach
          )
          SELECT
            oi.order_id as order_number,
            oi.pid::bigint as pid,
            oi.sku,
            om.date,
            oi.price,
            oi.quantity,
            (oi.base_discount + COALESCE(ot.promo_discount, 0) +
              CASE
                WHEN om.summary_discount > 0 AND om.summary_subtotal > 0
                THEN ROUND((om.summary_discount * (oi.price * oi.quantity)) / NULLIF(om.summary_subtotal, 0), 2)
                ELSE 0
              END)::NUMERIC(14, 4) as discount,
            COALESCE(ot.total_tax, 0)::NUMERIC(14, 4) as tax,
            false as tax_included,
            0 as shipping,
            om.customer,
            om.customer_name,
            om.status,
            om.canceled,
            COALESCE(ot.costeach, oi.price * 0.5)::NUMERIC(14, 4) as costeach
          FROM (
            SELECT DISTINCT ON (order_id, pid) order_id, pid, sku, price, quantity, base_discount
            FROM temp_order_items
            WHERE order_id = ANY($1)
            ORDER BY order_id, pid
          ) oi
          JOIN temp_order_meta om ON oi.order_id = om.order_id
          LEFT JOIN order_totals ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
          ORDER BY oi.order_id, oi.pid
        `, [subBatchIds]);

        // Filter orders and track missing products: any item whose pid is not in the
        // local products table is skipped (the whole order_number is marked skipped,
        // though its other items may still import).
        const validOrders = [];
        const processedOrderItems = new Set();
        const processedOrders = new Set();
        for (const order of orders.rows) {
          if (!existingPids.has(order.pid)) {
            missingProducts.add(order.pid);
            skippedOrders.add(order.order_number);
            continue;
          }
          validOrders.push(order);
          processedOrderItems.add(`${order.order_number}-${order.pid}`);
          processedOrders.add(order.order_number);
        }

        // Process valid orders in smaller sub-batches
        const FINAL_BATCH_SIZE = 50;
        for (let k = 0; k < validOrders.length; k += FINAL_BATCH_SIZE) {
          const subBatch = validOrders.slice(k, k + FINAL_BATCH_SIZE);
          const placeholders = subBatch.map((_, idx) => {
            const base = idx * 15; // 15 columns including costeach
            return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`;
          }).join(',');
          const batchValues = subBatch.flatMap(o => [
            o.order_number,
            o.pid,
            o.sku || 'NO-SKU',
            o.date, // This is now a TIMESTAMP WITH TIME ZONE
            o.price,
            o.quantity,
            o.discount,
            o.tax,
            o.tax_included,
            o.shipping,
            o.customer,
            o.customer_name,
            o.status.toString(), // Convert status to TEXT
            o.canceled,
            o.costeach
          ]);
          // Upsert into orders; `xmax = 0` distinguishes fresh inserts from
          // conflict-updates so insert/update counts can be reported separately.
          const [result] = await localConnection.query(`
            WITH inserted_orders AS (
              INSERT INTO orders (
                order_number, pid, sku, date, price, quantity, discount, tax,
                tax_included, shipping, customer, customer_name, status, canceled, costeach
              ) VALUES ${placeholders}
              ON CONFLICT (order_number, pid) DO UPDATE SET
                sku = EXCLUDED.sku,
                date = EXCLUDED.date,
                price = EXCLUDED.price,
                quantity = EXCLUDED.quantity,
                discount = EXCLUDED.discount,
                tax = EXCLUDED.tax,
                tax_included = EXCLUDED.tax_included,
                shipping = EXCLUDED.shipping,
                customer = EXCLUDED.customer,
                customer_name = EXCLUDED.customer_name,
                status = EXCLUDED.status,
                canceled = EXCLUDED.canceled,
                costeach = EXCLUDED.costeach
              RETURNING xmax = 0 as inserted
            )
            SELECT COUNT(*) FILTER (WHERE inserted) as inserted, COUNT(*) FILTER (WHERE NOT inserted) as updated FROM inserted_orders
          `, batchValues);
          const { inserted, updated } = result.rows[0];
          recordsAdded += parseInt(inserted) || 0;
          recordsUpdated += parseInt(updated) || 0;
          importedCount += subBatch.length;
        }

        cumulativeProcessedOrders += processedOrders.size;
        outputProgress({
          status: "running",
          operation: "Orders import",
          message: `Importing orders: ${cumulativeProcessedOrders} of ${totalUniqueOrders}`,
          current: cumulativeProcessedOrders,
          total: totalUniqueOrders,
          elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
          remaining: estimateRemaining(startTime, cumulativeProcessedOrders, totalUniqueOrders),
          rate: calculateRate(startTime, cumulativeProcessedOrders)
        });
      }
    }

    // Update sync status so the next incremental run only picks up newer changes.
    await localConnection.query(`
      INSERT INTO sync_status (table_name, last_sync_timestamp)
      VALUES ('orders', NOW())
      ON CONFLICT (table_name) DO UPDATE SET last_sync_timestamp = NOW()
    `);

    // Cleanup temporary tables
    await localConnection.query(`
      DROP TABLE IF EXISTS temp_order_items;
      DROP TABLE IF EXISTS temp_order_meta;
      DROP TABLE IF EXISTS temp_order_discounts;
      DROP TABLE IF EXISTS temp_order_taxes;
      DROP TABLE IF EXISTS temp_order_costs;
    `);

    // Commit transaction
    await localConnection.commit();

    return {
      status: "complete",
      totalImported: Math.floor(importedCount) || 0,
      recordsAdded: parseInt(recordsAdded) || 0,
      recordsUpdated: parseInt(recordsUpdated) || 0,
      totalSkipped: skippedOrders.size || 0,
      missingProducts: missingProducts.size || 0,
      incrementalUpdate,
      lastSyncTime
    };
  } catch (error) {
    console.error("Error during orders import:", error);
    // Rollback transaction — rollback failures are logged but the original error is rethrown.
    try {
      await localConnection.rollback();
    } catch (rollbackError) {
      console.error("Error during rollback:", rollbackError);
    }
    throw error;
  }
}

module.exports = importOrders;