diff --git a/inventory-server/scripts/import/orders.js b/inventory-server/scripts/import/orders.js index 45d16aa..081e0d2 100644 --- a/inventory-server/scripts/import/orders.js +++ b/inventory-server/scripts/import/orders.js @@ -1,158 +1,188 @@ const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress'); const { importMissingProducts } = require('./products'); -async function importOrders(prodConnection, localConnection) { - outputProgress({ - operation: "Starting orders import - Getting total count", - status: "running", - }); - +/** + * Imports orders from a production MySQL database to a local MySQL database. + * It can run in two modes: + * 1. Incremental update mode (default): Only fetch orders that have changed since the last sync time. + * 2. Full update mode: Fetch all eligible orders within the last 5 years regardless of timestamp. + * + * @param {object} prodConnection - A MySQL connection to production DB (MySQL 5.7). + * @param {object} localConnection - A MySQL connection to local DB (MySQL 8.0). + * @param {boolean} incrementalUpdate - Set to false for a full sync; true for incremental. + * + * @returns {object} Information about the sync operation. + */ +async function importOrders(prodConnection, localConnection, incrementalUpdate = true) { const startTime = Date.now(); - const skippedOrders = new Set(); // Store orders that need to be retried - const missingProducts = new Set(); // Store products that need to be imported + const skippedOrders = new Set(); + const missingProducts = new Set(); try { - // Get last sync info + // Get the last sync time const [syncInfo] = await localConnection.query( "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'" ); const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01'; - // First get the column names from the table structure + // Retrieve column names for the 'orders' table, skip 'id' since it's auto-increment const [columns] = await localConnection.query(` SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'orders' ORDER BY ORDINAL_POSITION `); - const columnNames = columns - .map((col) => col.COLUMN_NAME) - .filter((name) => name !== "id"); // Skip auto-increment ID + .map(col => col.COLUMN_NAME) + .filter(name => name !== "id"); - // Get total count first for progress indication - modified for incremental - const [countResult] = await prodConnection.query(` - SELECT COUNT(*) as total - FROM order_items oi FORCE INDEX (PRIMARY) - JOIN _order o FORCE INDEX (PRIMARY) ON oi.order_id = o.order_id - WHERE o.order_status >= 15 - AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR) - AND (o.date_placed_onlydate > ? - OR o.stamp > ?) - `, [lastSyncTime, lastSyncTime]); + // Build query clauses for incremental vs. full update + const incrementalWhereClause = incrementalUpdate + ? `AND ( + o.stamp > ? + OR o.date_modified > ? + OR o.date_placed > ? + OR o.date_shipped > ? + OR oi.stamp > ? + )` + : ""; + const incrementalParams = incrementalUpdate + ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] + : []; - const totalOrders = countResult[0].total; - - outputProgress({ - operation: `Starting orders import - Fetching ${totalOrders} orders from production`, - status: "running", - }); - - const total = countResult[0].total; - let processed = 0; - - // Process in batches - const batchSize = 10000; // Increased from 1000 since order records are small - let offset = 0; - - while (offset < total) { - // First get orders without tax info - const [orders] = await prodConnection.query(` - SELECT - oi.order_id as order_number, - oi.prod_pid as pid, - oi.prod_itemnumber as SKU, - o.date_placed_onlydate as date, - oi.prod_price_reg as price, - oi.qty_ordered as quantity, - (oi.prod_price_reg - oi.prod_price) as discount, - 0 as tax, - 0 as tax_included, - ROUND( - ((o.summary_shipping - COALESCE(o.summary_discount_shipping, 0)) * - (oi.prod_price * oi.qty_ordered) / NULLIF(o.summary_subtotal, 0)), 2 - ) as shipping, - o.order_cid as customer, - CONCAT(o.bill_firstname, ' ', o.bill_lastname) as customer_name, - 'pending' as status, - CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END as canceled - FROM order_items oi - FORCE INDEX (PRIMARY) - JOIN _order o USE INDEX (date_placed_onlydate, idx_status) + // Count how many orders we need to process + const [countResult] = await prodConnection.query( + ` + SELECT COUNT(*) AS total + FROM order_items oi USE INDEX (PRIMARY) + JOIN _order o USE INDEX (PRIMARY) ON oi.order_id = o.order_id WHERE o.order_status >= 15 AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR) - AND (o.date_placed_onlydate > ? - OR o.stamp > ?) - LIMIT ? OFFSET ? - `, [lastSyncTime, lastSyncTime, batchSize, offset]); + ${incrementalWhereClause} + `, + incrementalParams + ); - // Then get tax info for these orders + const total = countResult[0].total; + outputProgress({ + operation: `Starting ${incrementalUpdate ? 'incremental' : 'full'} orders import - Fetching ${total} orders`, + status: "running", + }); + + let processed = 0; + // Increase or decrease this if you find a more optimal size + const batchSize = 20000; + let offset = 0; + + // Process in batches for memory efficiency + while (offset < total) { + // Fetch orders (initially with tax set to 0, to be updated later) + const [orders] = await prodConnection.query( + ` + SELECT + oi.order_id AS order_number, + oi.prod_pid AS pid, + oi.prod_itemnumber AS SKU, + o.date_placed_onlydate AS date, + oi.prod_price_reg AS price, + oi.qty_ordered AS quantity, + (oi.prod_price_reg - oi.prod_price) AS discount, + 0 AS tax, + 0 AS tax_included, + ROUND( + ( + (o.summary_shipping - COALESCE(o.summary_discount_shipping, 0)) + * (oi.prod_price * oi.qty_ordered) + ) / NULLIF(o.summary_subtotal, 0), + 2 + ) AS shipping, + o.order_cid AS customer, + CONCAT(o.bill_firstname, ' ', o.bill_lastname) AS customer_name, + 'pending' AS status, + CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END AS canceled + FROM order_items oi + FORCE INDEX (PRIMARY) + JOIN _order o + ON oi.order_id = o.order_id + WHERE o.order_status >= 15 + AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR) + ${incrementalWhereClause} + LIMIT ? OFFSET ? + `, + [...incrementalParams, batchSize, offset] + ); + + // Fetch the latest tax info for these orders if (orders.length > 0) { const orderIds = [...new Set(orders.map(o => o.order_number))]; const [taxInfo] = await prodConnection.query(` SELECT oti.order_id, otp.pid, otp.item_taxes_to_collect FROM ( - SELECT order_id, MAX(stamp) as latest_stamp - FROM order_tax_info USE INDEX (order_id, stamp) + SELECT order_id, MAX(stamp) AS latest_stamp + FROM order_tax_info WHERE order_id IN (?) GROUP BY order_id - ) latest - JOIN order_tax_info oti USE INDEX (order_id, stamp) + ) latest + JOIN order_tax_info oti ON oti.order_id = latest.order_id AND oti.stamp = latest.latest_stamp - JOIN order_tax_info_products otp FORCE INDEX (PRIMARY) + JOIN order_tax_info_products otp ON oti.taxinfo_id = otp.taxinfo_id `, [orderIds]); - // Create a map for quick tax lookup + // Map (order_id-pid) -> tax amount const taxMap = new Map(); taxInfo.forEach(t => { taxMap.set(`${t.order_id}-${t.pid}`, t.item_taxes_to_collect); }); - // Add tax info to orders + // Merge tax into the orders array orders.forEach(order => { - const taxKey = `${order.order_number}-${order.pid}`; - order.tax = taxMap.get(taxKey) || 0; + const key = `${order.order_number}-${order.pid}`; + if (taxMap.has(key)) { + order.tax = taxMap.get(key) || 0; + } }); } - // Check if all products exist before inserting orders - const orderProductPids = [...new Set(orders.map((o) => o.pid))]; + // Check local DB for existing products to ensure we don't insert orders for missing products + const orderProductPids = [...new Set(orders.map(o => o.pid))]; const [existingProducts] = await localConnection.query( "SELECT pid FROM products WHERE pid IN (?)", [orderProductPids] ); - const existingPids = new Set(existingProducts.map((p) => p.pid)); + const existingPids = new Set(existingProducts.map(p => p.pid)); - // Filter out orders with missing products and track them - const validOrders = orders.filter((order) => { + // Separate valid orders from those referencing missing products + const validOrders = []; + for (const order of orders) { if (!existingPids.has(order.pid)) { missingProducts.add(order.pid); skippedOrders.add(order.order_number); - return false; + } else { + validOrders.push(order); } - return true; - }); + } + // Bulk insert valid orders if (validOrders.length > 0) { const placeholders = validOrders .map(() => `(${Array(columnNames.length).fill("?").join(",")})`) .join(","); const updateClauses = columnNames - .filter((col) => col !== "order_number") // Don't update primary key - .map((col) => `${col} = VALUES(${col})`) + .filter(col => col !== "order_number") // don't overwrite primary key + .map(col => `${col} = VALUES(${col})`) .join(","); - const query = ` + const upsertQuery = ` INSERT INTO orders (${columnNames.join(",")}) VALUES ${placeholders} ON DUPLICATE KEY UPDATE ${updateClauses} `; await localConnection.query( - query, + upsertQuery, validOrders.flatMap(order => columnNames.map(col => order[col])) ); } @@ -171,18 +201,17 @@ async function importOrders(prodConnection, localConnection) { }); } - // Now handle missing products and retry skipped orders + // If we found missing products, import them and retry the skipped orders if (missingProducts.size > 0) { outputProgress({ operation: `Found ${missingProducts.size} missing products, importing them now`, status: "running", }); - await importMissingProducts(prodConnection, localConnection, [ - ...missingProducts, - ]); + // Import missing products + await importMissingProducts(prodConnection, localConnection, [...missingProducts]); - // Retry skipped orders + // Retry orders that were skipped due to missing products if (skippedOrders.size > 0) { outputProgress({ operation: `Retrying ${skippedOrders.size} skipped orders`, @@ -191,95 +220,100 @@ async function importOrders(prodConnection, localConnection) { const [retryOrders] = await prodConnection.query(` SELECT - oi.order_id as order_number, - oi.prod_pid as pid, - oi.prod_itemnumber as SKU, - o.date_placed_onlydate as date, - oi.prod_price_reg as price, - oi.qty_ordered as quantity, - (oi.prod_price_reg - oi.prod_price) as discount, - 0 as tax, - 0 as tax_included, + oi.order_id AS order_number, + oi.prod_pid AS pid, + oi.prod_itemnumber AS SKU, + o.date_placed_onlydate AS date, + oi.prod_price_reg AS price, + oi.qty_ordered AS quantity, + (oi.prod_price_reg - oi.prod_price) AS discount, + 0 AS tax, + 0 AS tax_included, ROUND( - ((o.summary_shipping - COALESCE(o.summary_discount_shipping, 0)) * - (oi.prod_price * oi.qty_ordered) / NULLIF(o.summary_subtotal, 0)), 2 - ) as shipping, - o.order_cid as customer, - CONCAT(o.bill_firstname, ' ', o.bill_lastname) as customer_name, - 'pending' as status, - CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END as canceled + ( + (o.summary_shipping - COALESCE(o.summary_discount_shipping, 0)) + * (oi.prod_price * oi.qty_ordered) + ) / NULLIF(o.summary_subtotal, 0), + 2 + ) AS shipping, + o.order_cid AS customer, + CONCAT(o.bill_firstname, ' ', o.bill_lastname) AS customer_name, + 'pending' AS status, + CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END AS canceled FROM order_items oi JOIN _order o ON oi.order_id = o.order_id WHERE oi.order_id IN (?) `, [[...skippedOrders]]); if (retryOrders.length > 0) { + // Fetch tax data for these specific retry orders const retryOrderIds = [...new Set(retryOrders.map(o => o.order_number))]; const [retryTaxInfo] = await prodConnection.query(` SELECT oti.order_id, otp.pid, otp.item_taxes_to_collect FROM ( - SELECT order_id, MAX(stamp) as latest_stamp - FROM order_tax_info USE INDEX (order_id, stamp) + SELECT order_id, MAX(stamp) AS latest_stamp + FROM order_tax_info WHERE order_id IN (?) GROUP BY order_id - ) latest - JOIN order_tax_info oti USE INDEX (order_id, stamp) + ) latest + JOIN order_tax_info oti ON oti.order_id = latest.order_id AND oti.stamp = latest.latest_stamp - JOIN order_tax_info_products otp FORCE INDEX (PRIMARY) + JOIN order_tax_info_products otp ON oti.taxinfo_id = otp.taxinfo_id `, [retryOrderIds]); - // Create a map for quick tax lookup const taxMap = new Map(); retryTaxInfo.forEach(t => { taxMap.set(`${t.order_id}-${t.pid}`, t.item_taxes_to_collect); }); - // Add tax info to orders retryOrders.forEach(order => { - const taxKey = `${order.order_number}-${order.pid}`; - order.tax = taxMap.get(taxKey) || 0; + const key = `${order.order_number}-${order.pid}`; + if (taxMap.has(key)) { + order.tax = taxMap.get(key) || 0; + } }); + + const placeholders = retryOrders + .map(() => `(${Array(columnNames.length).fill("?").join(",")})`) + .join(","); + const updateClauses = columnNames + .filter(col => col !== "order_number") + .map(col => `${col} = VALUES(${col})`) + .join(","); + + const upsertQuery = ` + INSERT INTO orders (${columnNames.join(",")}) + VALUES ${placeholders} + ON DUPLICATE KEY UPDATE ${updateClauses} + `; + + await localConnection.query( + upsertQuery, + retryOrders.flatMap(order => columnNames.map(col => order[col])) + ); } - - const placeholders = retryOrders - .map(() => `(${Array(columnNames.length).fill("?").join(",")})`) - .join(","); - const updateClauses = columnNames - .filter((col) => col !== "order_number") // Don't update primary key - .map((col) => `${col} = VALUES(${col})`) - .join(","); - - const query = ` - INSERT INTO orders (${columnNames.join(",")}) - VALUES ${placeholders} - ON DUPLICATE KEY UPDATE ${updateClauses} - `; - - await localConnection.query( - query, - retryOrders.flatMap(order => columnNames.map(col => order[col])) - ); } } - // After successful import, update the sync status + // Update the sync timestamp await localConnection.query(` INSERT INTO sync_status (table_name, last_sync_timestamp) VALUES ('orders', NOW()) - ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW() + ON DUPLICATE KEY UPDATE + last_sync_timestamp = NOW(), + last_sync_id = LAST_INSERT_ID(last_sync_id) `); const endTime = Date.now(); - const durationSeconds = Math.round((endTime - startTime) / 1000); outputProgress({ status: "complete", - operation: "Orders import completed", + operation: `${incrementalUpdate ? 'Incremental' : 'Full'} orders import completed`, current: total, total, - duration: formatElapsedTime((Date.now() - startTime) / 1000), + duration: formatElapsedTime((endTime - startTime) / 1000), }); return { @@ -287,12 +321,12 @@ async function importOrders(prodConnection, localConnection) { totalImported: total, missingProducts: missingProducts.size, retriedOrders: skippedOrders.size, - incrementalUpdate: true, + incrementalUpdate, lastSyncTime }; } catch (error) { outputProgress({ - operation: "Orders import failed", + operation: `${incrementalUpdate ? 'Incremental' : 'Full'} orders import failed`, status: "error", error: error.message, }); @@ -300,4 +334,4 @@ async function importOrders(prodConnection, localConnection) { } } -module.exports = importOrders; \ No newline at end of file +module.exports = importOrders; diff --git a/inventory-server/scripts/import/products.js b/inventory-server/scripts/import/products.js index 2ff588c..1a0b777 100644 --- a/inventory-server/scripts/import/products.js +++ b/inventory-server/scripts/import/products.js @@ -456,16 +456,32 @@ async function importMissingProducts(prodConnection, localConnection, missingPid // Materialize calculations for missing products await localConnection.query(` INSERT INTO temp_inventory_status - WITH product_stock AS ( + SELECT + p.pid, + COALESCE(si.available_local, 0) - COALESCE(ps.pending_qty, 0) as stock_quantity, + COALESCE(ps.pending_qty, 0) as pending_qty, + COALESCE(ci.onpreorder, 0) as preorder_count, + COALESCE(pnb.inventory, 0) as notions_inv_count + FROM products p + LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 + LEFT JOIN current_inventory ci ON p.pid = ci.pid + LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid + LEFT JOIN ( SELECT oi.prod_pid, SUM(oi.qty_ordered - oi.qty_placed) as pending_qty FROM order_items oi JOIN _order o ON oi.order_id = o.order_id WHERE oi.prod_pid IN (?) - AND [rest of conditions] + AND o.date_placed != '0000-00-00 00:00:00' + AND o.date_shipped = '0000-00-00 00:00:00' + AND oi.pick_finished = 0 + AND oi.qty_back = 0 + AND o.order_status != 15 + AND o.order_status < 90 + AND oi.qty_ordered >= oi.qty_placed + AND oi.qty_ordered > 0 GROUP BY oi.prod_pid - ) - SELECT [same as above] + ) ps ON p.pid = ps.prod_pid WHERE p.pid IN (?) `, [missingPids, missingPids]); diff --git a/inventory-server/scripts/import/purchase-orders.js b/inventory-server/scripts/import/purchase-orders.js index 4dff5f0..8019ee9 100644 --- a/inventory-server/scripts/import/purchase-orders.js +++ b/inventory-server/scripts/import/purchase-orders.js @@ -1,6 +1,6 @@ const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress'); -async function importPurchaseOrders(prodConnection, localConnection) { +async function importPurchaseOrders(prodConnection, localConnection, incrementalUpdate = true) { const startTime = Date.now(); try { @@ -11,7 +11,7 @@ async function importPurchaseOrders(prodConnection, localConnection) { const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01'; outputProgress({ - operation: "Starting purchase orders import - Initializing", + operation: `Starting ${incrementalUpdate ? 'incremental' : 'full'} purchase orders import`, status: "running", }); @@ -26,7 +26,23 @@ async function importPurchaseOrders(prodConnection, localConnection) { .map((col) => col.COLUMN_NAME) .filter((name) => name !== "id"); - // First get all relevant PO IDs with basic info - modified for incremental + // Build incremental conditions + const incrementalWhereClause = incrementalUpdate + ? `AND ( + p.stamp > ? + OR p.date_modified > ? + OR p.date_ordered > ? + OR p.date_estin > ? + OR r.stamp > ? + OR rp.stamp > ? + OR rp.received_date > ? + )` + : ""; + const incrementalParams = incrementalUpdate + ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] + : []; + + // First get all relevant PO IDs with basic info const [[{ total }]] = await prodConnection.query(` SELECT COUNT(*) as total FROM ( @@ -36,19 +52,16 @@ async function importPurchaseOrders(prodConnection, localConnection) { JOIN po_products pop ON p.po_id = pop.po_id JOIN suppliers s ON p.supplier_id = s.supplierid WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR) - AND (p.date_ordered > ? - OR p.stamp > ? - OR p.date_modified > ?) + ${incrementalWhereClause} UNION SELECT DISTINCT r.receiving_id as po_id, rp.pid FROM receivings_products rp USE INDEX (received_date) LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR) - AND (rp.received_date > ? - OR rp.stamp > ?) + ${incrementalWhereClause} ) all_items - `, [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]); + `, [...incrementalParams, ...incrementalParams]); const [poList] = await prodConnection.query(` SELECT DISTINCT @@ -294,11 +307,13 @@ async function importPurchaseOrders(prodConnection, localConnection) { } } - // After successful import, update sync status + // Update sync status with proper incrementing of last_sync_id await localConnection.query(` INSERT INTO sync_status (table_name, last_sync_timestamp) VALUES ('purchase_orders', NOW()) - ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW() + ON DUPLICATE KEY UPDATE + last_sync_timestamp = NOW(), + last_sync_id = LAST_INSERT_ID(last_sync_id) `); return { @@ -306,12 +321,13 @@ async function importPurchaseOrders(prodConnection, localConnection) { totalImported: totalItems, recordsAdded, recordsUpdated, - incrementalUpdate: !!syncInfo?.[0] + incrementalUpdate, + lastSyncTime }; } catch (error) { outputProgress({ - operation: "Purchase orders import failed", + operation: `${incrementalUpdate ? 'Incremental' : 'Full'} purchase orders import failed`, status: "error", error: error.message, });