const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');
const { importMissingProducts } = require('./products');

// Rows fetched from production per batch. Increased from 1000 since order records are small.
const ORDER_BATCH_SIZE = 10000;

// SELECT list shared by the batched import query and the retry query, so both
// produce rows with identical aliases. `tax` starts at 0 and is filled in
// afterwards by attachLatestTaxInfo().
const ORDER_ITEM_SELECT = `
  SELECT
    oi.order_id as order_number,
    oi.prod_pid as pid,
    oi.prod_itemnumber as SKU,
    o.date_placed_onlydate as date,
    oi.prod_price_reg as price,
    oi.qty_ordered as quantity,
    (oi.prod_price_reg - oi.prod_price) as discount,
    0 as tax,
    0 as tax_included,
    ROUND(
      ((o.summary_shipping - COALESCE(o.summary_discount_shipping, 0)) *
      (oi.prod_price * oi.qty_ordered) / NULLIF(o.summary_subtotal, 0)), 2
    ) as shipping,
    o.order_cid as customer,
    CONCAT(o.bill_firstname, ' ', o.bill_lastname) as customer_name,
    'pending' as status,
    CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END as canceled
`;

/**
 * Looks up the most recent tax record for each order in `orders` and writes the
 * per-line tax amount onto `order.tax` (0 when no tax row matches).
 * Mutates the rows in place; does nothing for an empty list.
 *
 * @param {object} prodConnection - Production connection; `query(sql, params)` must resolve to `[rows, ...]`.
 * @param {Array<object>} orders - Rows carrying `order_number` and `pid`.
 * @returns {Promise<void>}
 */
async function attachLatestTaxInfo(prodConnection, orders) {
  if (orders.length === 0) {
    return;
  }

  const orderIds = [...new Set(orders.map((order) => order.order_number))];
  const [taxRows] = await prodConnection.query(`
    SELECT oti.order_id, otp.pid, otp.item_taxes_to_collect
    FROM (
      SELECT order_id, MAX(stamp) as latest_stamp
      FROM order_tax_info USE INDEX (order_id, stamp)
      WHERE order_id IN (?)
      GROUP BY order_id
    ) latest
    JOIN order_tax_info oti USE INDEX (order_id, stamp)
      ON oti.order_id = latest.order_id AND oti.stamp = latest.latest_stamp
    JOIN order_tax_info_products otp FORCE INDEX (PRIMARY)
      ON oti.taxinfo_id = otp.taxinfo_id
  `, [orderIds]);

  // Key by "<order>-<pid>" for constant-time lookup per order line.
  const taxByLine = new Map();
  for (const row of taxRows) {
    taxByLine.set(`${row.order_id}-${row.pid}`, row.item_taxes_to_collect);
  }

  for (const order of orders) {
    order.tax = taxByLine.get(`${order.order_number}-${order.pid}`) || 0;
  }
}

/**
 * Inserts `orders` into the local `orders` table, updating every column except
 * `order_number` when a duplicate key is hit. Does nothing for an empty list,
 * because an INSERT with no value tuples is not valid SQL.
 *
 * @param {object} localConnection - Local connection; `query(sql, params)` is awaited.
 * @param {string[]} columnNames - Columns to write; each is read from the row by name.
 * @param {Array<object>} orders - Rows to upsert.
 * @returns {Promise<void>}
 */
async function upsertOrders(localConnection, columnNames, orders) {
  if (orders.length === 0) {
    return;
  }

  const rowPlaceholder = `(${columnNames.map(() => "?").join(",")})`;
  const placeholders = orders.map(() => rowPlaceholder).join(",");
  const updateClauses = columnNames
    .filter((col) => col !== "order_number") // Don't update primary key
    .map((col) => `${col} = VALUES(${col})`)
    .join(",");

  const query = `
    INSERT INTO orders (${columnNames.join(",")})
    VALUES ${placeholders}
    ON DUPLICATE KEY UPDATE ${updateClauses}
  `;

  await localConnection.query(
    query,
    orders.flatMap((order) => columnNames.map((col) => order[col]))
  );
}

/**
 * Incrementally copies order lines from production into the local `orders` table.
 *
 * Steps:
 *  1. Read the last sync timestamp for 'orders' from `sync_status`
 *     (falls back to '1970-01-01' when absent).
 *  2. Page through production order lines changed since then, attach tax
 *     amounts, and upsert the lines whose product already exists locally.
 *  3. Import products that were missing, then re-fetch and upsert the orders
 *     that were skipped because of them.
 *  4. Stamp `sync_status` with the local database's NOW().
 *
 * Progress is reported through `outputProgress` throughout.
 *
 * @param {object} prodConnection - Production DB connection; `query(sql, params)` must resolve to `[rows, ...]`.
 * @param {object} localConnection - Local DB connection with the same `query` contract.
 * @returns {Promise<{status: string, totalImported: number, missingProducts: number, retriedOrders: number, incrementalUpdate: boolean, lastSyncTime: *}>}
 *   Summary of the run. `totalImported` is the row count taken before the import started.
 * @throws Rethrows any error after reporting it via `outputProgress` with status "error".
 */
async function importOrders(prodConnection, localConnection) {
  outputProgress({
    operation: "Starting orders import - Getting total count",
    status: "running",
  });

  const startTime = Date.now();
  const skippedOrders = new Set(); // Store orders that need to be retried
  const missingProducts = new Set(); // Store products that need to be imported

  try {
    // Get last sync info
    const [syncInfo] = await localConnection.query(
      "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'"
    );
    const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01';

    // Read the column names from the table structure. Restrict the lookup to the
    // connection's current schema: without it, an `orders` table in any other
    // schema on the same server would contribute its columns as well.
    const [columns] = await localConnection.query(`
      SELECT COLUMN_NAME
      FROM INFORMATION_SCHEMA.COLUMNS
      WHERE TABLE_SCHEMA = DATABASE()
      AND TABLE_NAME = 'orders'
      ORDER BY ORDINAL_POSITION
    `);
    const columnNames = columns
      .map((col) => col.COLUMN_NAME)
      .filter((name) => name !== "id"); // Skip auto-increment ID

    if (columnNames.length === 0) {
      throw new Error("No columns found for local table 'orders'");
    }

    // Get total count first for progress indication - modified for incremental
    const [countResult] = await prodConnection.query(`
      SELECT COUNT(*) as total
      FROM order_items oi FORCE INDEX (PRIMARY)
      JOIN _order o FORCE INDEX (PRIMARY) ON oi.order_id = o.order_id
      WHERE o.order_status >= 15
      AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
      AND (o.date_placed_onlydate > ? OR o.stamp > ?)
    `, [lastSyncTime, lastSyncTime]);
    const total = countResult[0].total;

    outputProgress({
      operation: `Starting orders import - Fetching ${total} orders from production`,
      status: "running",
    });

    let processed = 0;

    // Process in batches
    for (let offset = 0; offset < total; offset += ORDER_BATCH_SIZE) {
      // First get orders without tax info. The ORDER BY gives LIMIT/OFFSET a
      // defined row order so that consecutive pages do not overlap or skip rows.
      // NOTE(review): assumes (order_id, prod_pid) identifies an order line - confirm.
      const [orders] = await prodConnection.query(`
        ${ORDER_ITEM_SELECT}
        FROM order_items oi FORCE INDEX (PRIMARY)
        JOIN _order o USE INDEX (date_placed_onlydate, idx_status) ON oi.order_id = o.order_id
        WHERE o.order_status >= 15
        AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
        AND (o.date_placed_onlydate > ? OR o.stamp > ?)
        ORDER BY oi.order_id, oi.prod_pid
        LIMIT ? OFFSET ?
      `, [lastSyncTime, lastSyncTime, ORDER_BATCH_SIZE, offset]);

      // The matching row set can shrink after the count was taken. An empty page
      // means there is nothing further to read, and continuing would send an
      // empty list to the `pid IN (?)` lookup below.
      if (orders.length === 0) {
        break;
      }

      // Then get tax info for these orders
      await attachLatestTaxInfo(prodConnection, orders);

      // Check if all products exist before inserting orders
      const orderProductPids = [...new Set(orders.map((order) => order.pid))];
      const [existingProducts] = await localConnection.query(
        "SELECT pid FROM products WHERE pid IN (?)",
        [orderProductPids]
      );
      const existingPids = new Set(existingProducts.map((product) => product.pid));

      // Filter out orders with missing products and track them
      const validOrders = orders.filter((order) => {
        if (existingPids.has(order.pid)) {
          return true;
        }
        missingProducts.add(order.pid);
        skippedOrders.add(order.order_number);
        return false;
      });

      await upsertOrders(localConnection, columnNames, validOrders);

      processed += orders.length;

      outputProgress({
        status: "running",
        operation: "Orders import",
        current: processed,
        total,
        elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
        remaining: estimateRemaining(startTime, processed, total),
        rate: calculateRate(startTime, processed)
      });
    }

    // Now handle missing products and retry skipped orders
    if (missingProducts.size > 0) {
      outputProgress({
        operation: `Found ${missingProducts.size} missing products, importing them now`,
        status: "running",
      });

      await importMissingProducts(prodConnection, localConnection, [
        ...missingProducts,
      ]);

      // Retry skipped orders
      if (skippedOrders.size > 0) {
        outputProgress({
          operation: `Retrying ${skippedOrders.size} skipped orders`,
          status: "running",
        });

        // Re-fetch every line of each skipped order; lines that were already
        // inserted are simply updated by the upsert.
        const [retryOrders] = await prodConnection.query(`
          ${ORDER_ITEM_SELECT}
          FROM order_items oi
          JOIN _order o ON oi.order_id = o.order_id
          WHERE oi.order_id IN (?)
        `, [[...skippedOrders]]);

        await attachLatestTaxInfo(prodConnection, retryOrders);
        await upsertOrders(localConnection, columnNames, retryOrders);
      }
    }

    // After successful import, update the sync status
    await localConnection.query(`
      INSERT INTO sync_status (table_name, last_sync_timestamp)
      VALUES ('orders', NOW())
      ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW()
    `);

    outputProgress({
      status: "complete",
      operation: "Orders import completed",
      current: total,
      total,
      duration: formatElapsedTime((Date.now() - startTime) / 1000),
    });

    return {
      status: "complete",
      totalImported: total,
      missingProducts: missingProducts.size,
      retriedOrders: skippedOrders.size,
      incrementalUpdate: true,
      lastSyncTime
    };
  } catch (error) {
    outputProgress({
      operation: "Orders import failed",
      status: "error",
      error: error.message,
    });
    throw error;
  }
}

module.exports = importOrders;