From c433f1aae89ec90db20aa695e3431d6922bfe43c Mon Sep 17 00:00:00 2001 From: Matt Date: Thu, 30 Jan 2025 15:49:47 -0500 Subject: [PATCH] Enhance import scripts with incremental update support and improved error handling - Update import-from-prod.js to support granular incremental updates for different import types - Modify orders.js to handle complex order data retrieval with better performance and error tracking - Add support for incremental updates in products.js import function - Improve logging and progress tracking for import processes --- inventory-server/scripts/import-from-prod.js | 12 +- inventory-server/scripts/import/orders.js | 529 ++++++++++--------- inventory-server/scripts/import/products.js | 14 +- 3 files changed, 287 insertions(+), 268 deletions(-) diff --git a/inventory-server/scripts/import-from-prod.js b/inventory-server/scripts/import-from-prod.js index ede5aa2..84d8613 100644 --- a/inventory-server/scripts/import-from-prod.js +++ b/inventory-server/scripts/import-from-prod.js @@ -10,10 +10,10 @@ const importPurchaseOrders = require('./import/purchase-orders'); dotenv.config({ path: path.join(__dirname, "../.env") }); // Constants to control which imports run -const IMPORT_CATEGORIES = true; -const IMPORT_PRODUCTS = true; +const IMPORT_CATEGORIES = false; +const IMPORT_PRODUCTS = false; const IMPORT_ORDERS = true; -const IMPORT_PURCHASE_ORDERS = true; +const IMPORT_PURCHASE_ORDERS = false; // Add flag for incremental updates const INCREMENTAL_UPDATE = process.env.INCREMENTAL_UPDATE === 'true'; @@ -156,7 +156,7 @@ async function main() { } if (IMPORT_PRODUCTS) { - results.products = await importProducts(prodConnection, localConnection); + results.products = await importProducts(prodConnection, localConnection, INCREMENTAL_UPDATE); if (isImportCancelled) throw new Error("Import cancelled"); completedSteps++; if (results.products.recordsAdded) totalRecordsAdded += results.products.recordsAdded; @@ -164,7 +164,7 @@ async function main() { } if (IMPORT_ORDERS) { - results.orders = await importOrders(prodConnection, localConnection); + results.orders = await importOrders(prodConnection, localConnection, INCREMENTAL_UPDATE); if (isImportCancelled) throw new Error("Import cancelled"); completedSteps++; if (results.orders.recordsAdded) totalRecordsAdded += results.orders.recordsAdded; @@ -172,7 +172,7 @@ async function main() { } if (IMPORT_PURCHASE_ORDERS) { - results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection); + results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection, INCREMENTAL_UPDATE); if (isImportCancelled) throw new Error("Import cancelled"); completedSteps++; if (results.purchaseOrders.recordsAdded) totalRecordsAdded += results.purchaseOrders.recordsAdded; diff --git a/inventory-server/scripts/import/orders.js b/inventory-server/scripts/import/orders.js index 081e0d2..6c6478d 100644 --- a/inventory-server/scripts/import/orders.js +++ b/inventory-server/scripts/import/orders.js @@ -19,317 +19,334 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = const missingProducts = new Set(); try { - // Get the last sync time - const [syncInfo] = await localConnection.query( - "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'" - ); - const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01'; - - // Retrieve column names for the 'orders' table, skip 'id' since it's auto-increment + // Get column names from the local table const [columns] = await localConnection.query(` SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'orders' ORDER BY ORDINAL_POSITION `); - const columnNames = columns - .map(col => col.COLUMN_NAME) - .filter(name => name !== "id"); + const columnNames = columns.map(col => col.COLUMN_NAME); - // Build query clauses for incremental vs. full update - const incrementalWhereClause = incrementalUpdate - ? `AND ( - o.stamp > ? - OR o.date_modified > ? - OR o.date_placed > ? - OR o.date_shipped > ? - OR oi.stamp > ? - )` - : ""; - const incrementalParams = incrementalUpdate - ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] - : []; - - // Count how many orders we need to process - const [countResult] = await prodConnection.query( - ` - SELECT COUNT(*) AS total - FROM order_items oi USE INDEX (PRIMARY) - JOIN _order o USE INDEX (PRIMARY) - ON oi.order_id = o.order_id - WHERE o.order_status >= 15 - AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR) - ${incrementalWhereClause} - `, - incrementalParams + // Get last sync info + const [syncInfo] = await localConnection.query( + "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'" ); + const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01'; - const total = countResult[0].total; - outputProgress({ - operation: `Starting ${incrementalUpdate ? 'incremental' : 'full'} orders import - Fetching ${total} orders`, - status: "running", + // Count the total number of orders to be imported + const [countResults] = await prodConnection.query(` + SELECT + COUNT(DISTINCT oi.order_id, oi.prod_pid) as total_all, + SUM(CASE + WHEN o.stamp > ? OR o.date_placed > ? OR o.date_shipped > ? OR oi.stamp > ? + THEN 1 ELSE 0 + END) as total_incremental + FROM order_items oi + JOIN _order o ON oi.order_id = o.order_id + WHERE o.order_status >= 15 + AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR) + AND o.date_placed_onlydate IS NOT NULL + `, [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]); + + console.log('Count details:', { + total_all: countResults[0].total_all, + total_incremental: countResults[0].total_incremental, + lastSyncTime, + incrementalUpdate }); - let processed = 0; - // Increase or decrease this if you find a more optimal size - const batchSize = 20000; - let offset = 0; + const totalOrders = incrementalUpdate ? countResults[0].total_incremental : countResults[0].total_all; - // Process in batches for memory efficiency - while (offset < total) { - // Fetch orders (initially with tax set to 0, to be updated later) - const [orders] = await prodConnection.query( - ` - SELECT - oi.order_id AS order_number, - oi.prod_pid AS pid, - oi.prod_itemnumber AS SKU, - o.date_placed_onlydate AS date, - oi.prod_price_reg AS price, - oi.qty_ordered AS quantity, - (oi.prod_price_reg - oi.prod_price) AS discount, - 0 AS tax, - 0 AS tax_included, - ROUND( - ( - (o.summary_shipping - COALESCE(o.summary_discount_shipping, 0)) - * (oi.prod_price * oi.qty_ordered) - ) / NULLIF(o.summary_subtotal, 0), - 2 - ) AS shipping, - o.order_cid AS customer, - CONCAT(o.bill_firstname, ' ', o.bill_lastname) AS customer_name, - 'pending' AS status, - CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END AS canceled - FROM order_items oi - FORCE INDEX (PRIMARY) - JOIN _order o - ON oi.order_id = o.order_id - WHERE o.order_status >= 15 - AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR) - ${incrementalWhereClause} - LIMIT ? OFFSET ? - `, - [...incrementalParams, batchSize, offset] + outputProgress({ + status: "running", + operation: "Orders import", + message: `Starting ${incrementalUpdate ? 'incremental' : 'full'} import of ${totalOrders} orders`, + current: 0, + total: totalOrders + }); + + // Fetch orders in batches + const batchSize = 5000; + let offset = 0; + let importedCount = 0; + let lastProgressUpdate = Date.now(); + + while (offset < totalOrders) { + // First get the base order data + const [prodOrders] = await prodConnection.query(` + SELECT + oi.order_id as order_number, + oi.prod_pid as pid, + oi.prod_itemnumber as SKU, + o.date_placed_onlydate as date, + oi.prod_price as price, + oi.qty_ordered as quantity, + COALESCE(oi.prod_price_reg - oi.prod_price, 0) * oi.qty_ordered as base_discount, + o.order_cid as customer, + CONCAT(COALESCE(u.firstname, ''), ' ', COALESCE(u.lastname, '')) as customer_name, + o.order_status as status, + CASE WHEN o.date_cancelled != '0000-00-00 00:00:00' THEN 1 ELSE 0 END as canceled + FROM order_items oi + JOIN _order o ON oi.order_id = o.order_id + LEFT JOIN users u ON o.order_cid = u.cid + WHERE o.order_status >= 15 + AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR) + AND o.date_placed_onlydate IS NOT NULL + ${incrementalUpdate ? ` + AND ( + o.stamp > ? + OR o.date_placed > ? + OR o.date_shipped > ? + OR oi.stamp > ? + ) + ` : ''} + ORDER BY oi.order_id, oi.prod_pid + LIMIT ? OFFSET ? + `, incrementalUpdate ? + [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, batchSize, offset] : + [batchSize, offset] ); - // Fetch the latest tax info for these orders - if (orders.length > 0) { - const orderIds = [...new Set(orders.map(o => o.order_number))]; - const [taxInfo] = await prodConnection.query(` - SELECT oti.order_id, otp.pid, otp.item_taxes_to_collect - FROM ( - SELECT order_id, MAX(stamp) AS latest_stamp + if (prodOrders.length === 0) break; + + // Get order numbers for this batch + const orderNumbers = [...new Set(prodOrders.map(o => o.order_number))]; + const orderPids = prodOrders.map(o => o.pid); + + // Get promotional discounts in a separate query + const [promoDiscounts] = await prodConnection.query(` + SELECT order_id, pid, amount + FROM order_discount_items + WHERE order_id IN (?) + `, [orderNumbers]); + + // Create a map for quick discount lookups + const discountMap = new Map(); + promoDiscounts.forEach(d => { + const key = `${d.order_id}-${d.pid}`; + discountMap.set(key, d.amount || 0); + }); + + // Get tax information in a separate query + const [taxInfo] = await prodConnection.query(` + SELECT oti.order_id, otip.pid, otip.item_taxes_to_collect + FROM order_tax_info oti + JOIN order_tax_info_products otip ON oti.taxinfo_id = otip.taxinfo_id + WHERE oti.order_id IN (?) + AND (oti.order_id, oti.stamp) IN ( + SELECT order_id, MAX(stamp) FROM order_tax_info WHERE order_id IN (?) GROUP BY order_id - ) latest - JOIN order_tax_info oti - ON oti.order_id = latest.order_id - AND oti.stamp = latest.latest_stamp - JOIN order_tax_info_products otp - ON oti.taxinfo_id = otp.taxinfo_id - `, [orderIds]); + ) + `, [orderNumbers, orderNumbers]); - // Map (order_id-pid) -> tax amount - const taxMap = new Map(); - taxInfo.forEach(t => { - taxMap.set(`${t.order_id}-${t.pid}`, t.item_taxes_to_collect); - }); + // Create a map for quick tax lookups + const taxMap = new Map(); + taxInfo.forEach(t => { + const key = `${t.order_id}-${t.pid}`; + taxMap.set(key, t.item_taxes_to_collect || 0); + }); - // Merge tax into the orders array - orders.forEach(order => { - const key = `${order.order_number}-${order.pid}`; - if (taxMap.has(key)) { - order.tax = taxMap.get(key) || 0; - } - }); - } - - // Check local DB for existing products to ensure we don't insert orders for missing products - const orderProductPids = [...new Set(orders.map(o => o.pid))]; + // Check for missing products const [existingProducts] = await localConnection.query( "SELECT pid FROM products WHERE pid IN (?)", - [orderProductPids] + [orderPids] ); const existingPids = new Set(existingProducts.map(p => p.pid)); - // Separate valid orders from those referencing missing products - const validOrders = []; - for (const order of orders) { + // Track missing products and filter orders + const validOrders = prodOrders.filter(order => { + if (!order.date) return false; if (!existingPids.has(order.pid)) { missingProducts.add(order.pid); skippedOrders.add(order.order_number); - } else { - validOrders.push(order); + return false; } - } + return true; + }); - // Bulk insert valid orders - if (validOrders.length > 0) { - const placeholders = validOrders - .map(() => `(${Array(columnNames.length).fill("?").join(",")})`) - .join(","); - const updateClauses = columnNames - .filter(col => col !== "order_number") // don't overwrite primary key - .map(col => `${col} = VALUES(${col})`) - .join(","); + // Prepare values for insertion + const orderValues = validOrders.map(order => { + const orderKey = `${order.order_number}-${order.pid}`; + const orderData = { + id: order.order_number, + order_number: order.order_number, + pid: order.pid, + SKU: order.SKU, + date: order.date, + price: order.price, + quantity: order.quantity, + discount: Number(order.base_discount || 0) + Number(discountMap.get(orderKey) || 0), + tax: Number(taxMap.get(orderKey) || 0), + tax_included: 0, + shipping: 0, + customer: order.customer, + customer_name: order.customer_name || '', + status: order.status, + canceled: order.canceled, + }; - const upsertQuery = ` - INSERT INTO orders (${columnNames.join(",")}) + return columnNames.map(colName => orderData[colName] !== undefined ? orderData[colName] : null); + }); + + // Execute the insert + if (orderValues.length > 0) { + const placeholders = validOrders.map(() => `(${columnNames.map(() => "?").join(", ")})`).join(","); + const insertQuery = ` + INSERT INTO orders (${columnNames.join(", ")}) VALUES ${placeholders} - ON DUPLICATE KEY UPDATE ${updateClauses} + ON DUPLICATE KEY UPDATE + ${columnNames.map(col => `${col} = VALUES(${col})`).join(", ")} `; - await localConnection.query( - upsertQuery, - validOrders.flatMap(order => columnNames.map(col => order[col])) - ); + await localConnection.query(insertQuery, orderValues.flat()); } - processed += orders.length; + importedCount += validOrders.length; offset += batchSize; - outputProgress({ - status: "running", - operation: "Orders import", - current: processed, - total, - elapsed: formatElapsedTime((Date.now() - startTime) / 1000), - remaining: estimateRemaining(startTime, processed, total), - rate: calculateRate(startTime, processed) - }); - } - - // If we found missing products, import them and retry the skipped orders - if (missingProducts.size > 0) { - outputProgress({ - operation: `Found ${missingProducts.size} missing products, importing them now`, - status: "running", - }); - - // Import missing products - await importMissingProducts(prodConnection, localConnection, [...missingProducts]); - - // Retry orders that were skipped due to missing products - if (skippedOrders.size > 0) { + // Update progress every second + const now = Date.now(); + if (now - lastProgressUpdate >= 1000) { outputProgress({ - operation: `Retrying ${skippedOrders.size} skipped orders`, status: "running", + operation: "Orders import", + message: `Imported ${importedCount} of ${totalOrders} orders`, + current: importedCount, + total: totalOrders, + elapsed: formatElapsedTime((now - startTime) / 1000), + remaining: estimateRemaining(startTime, importedCount, totalOrders), + rate: calculateRate(startTime, importedCount) }); - - const [retryOrders] = await prodConnection.query(` - SELECT - oi.order_id AS order_number, - oi.prod_pid AS pid, - oi.prod_itemnumber AS SKU, - o.date_placed_onlydate AS date, - oi.prod_price_reg AS price, - oi.qty_ordered AS quantity, - (oi.prod_price_reg - oi.prod_price) AS discount, - 0 AS tax, - 0 AS tax_included, - ROUND( - ( - (o.summary_shipping - COALESCE(o.summary_discount_shipping, 0)) - * (oi.prod_price * oi.qty_ordered) - ) / NULLIF(o.summary_subtotal, 0), - 2 - ) AS shipping, - o.order_cid AS customer, - CONCAT(o.bill_firstname, ' ', o.bill_lastname) AS customer_name, - 'pending' AS status, - CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END AS canceled - FROM order_items oi - JOIN _order o ON oi.order_id = o.order_id - WHERE oi.order_id IN (?) - `, [[...skippedOrders]]); - - if (retryOrders.length > 0) { - // Fetch tax data for these specific retry orders - const retryOrderIds = [...new Set(retryOrders.map(o => o.order_number))]; - const [retryTaxInfo] = await prodConnection.query(` - SELECT oti.order_id, otp.pid, otp.item_taxes_to_collect - FROM ( - SELECT order_id, MAX(stamp) AS latest_stamp - FROM order_tax_info - WHERE order_id IN (?) - GROUP BY order_id - ) latest - JOIN order_tax_info oti - ON oti.order_id = latest.order_id - AND oti.stamp = latest.latest_stamp - JOIN order_tax_info_products otp - ON oti.taxinfo_id = otp.taxinfo_id - `, [retryOrderIds]); - - const taxMap = new Map(); - retryTaxInfo.forEach(t => { - taxMap.set(`${t.order_id}-${t.pid}`, t.item_taxes_to_collect); - }); - - retryOrders.forEach(order => { - const key = `${order.order_number}-${order.pid}`; - if (taxMap.has(key)) { - order.tax = taxMap.get(key) || 0; - } - }); - - const placeholders = retryOrders - .map(() => `(${Array(columnNames.length).fill("?").join(",")})`) - .join(","); - const updateClauses = columnNames - .filter(col => col !== "order_number") - .map(col => `${col} = VALUES(${col})`) - .join(","); - - const upsertQuery = ` - INSERT INTO orders (${columnNames.join(",")}) - VALUES ${placeholders} - ON DUPLICATE KEY UPDATE ${updateClauses} - `; - - await localConnection.query( - upsertQuery, - retryOrders.flatMap(order => columnNames.map(col => order[col])) - ); - } + lastProgressUpdate = now; } } - // Update the sync timestamp + // Import missing products if any + if (missingProducts.size > 0) { + await importMissingProducts(prodConnection, localConnection, Array.from(missingProducts)); + + // Retry skipped orders after importing products + if (skippedOrders.size > 0) { + outputProgress({ + status: "running", + operation: "Orders import", + message: `Retrying import of ${skippedOrders.size} orders with previously missing products` + }); + + const [skippedProdOrders] = await prodConnection.query(` + SELECT + o.order_id, + CASE + WHEN o.date_placed = '0000-00-00 00:00:00' OR o.date_placed IS NULL THEN o.stamp + ELSE o.date_placed + END as date, + o.order_cid, + o.bill_firstname, + o.bill_lastname, + o.order_email, + o.order_status, + o.date_shipped, + o.date_cancelled, + oi.prod_pid, + oi.prod_itemnumber, + oi.prod_price, + oi.qty_ordered, + oi.qty_back, + oi.qty_placed, + oi.qty_placed_2, + oi.discounted, + oi.summary_cogs, + oi.summary_profit, + oi.summary_orderdate, + oi.summary_paiddate, + oi.date_added, + oi.stamp + FROM order_items oi + JOIN _order o ON oi.order_id = o.order_id + WHERE o.order_id IN (?) + `, [Array.from(skippedOrders)]); + + // Prepare values for insertion + const skippedOrderValues = skippedProdOrders.flatMap(order => { + if (!order.date) { + console.log(`Warning: Skipped order ${order.order_id} has null date:`, JSON.stringify(order, null, 2)); + return []; + } + + const canceled = order.date_cancelled !== '0000-00-00 00:00:00' ? 1 : 0; + const customerName = `${order.bill_firstname} ${order.bill_lastname}`; + + // Create an object with keys based on column names + const orderData = { + id: order.order_id, + order_number: order.order_id, + pid: order.prod_pid, + SKU: order.prod_itemnumber, + date: order.date ? ( + order.date instanceof Date ? + order.date.toJSON()?.slice(0,10) || null : + (typeof order.date === 'string' ? order.date.split(' ')[0] : null) + ) : null, + price: order.prod_price, + quantity: order.qty_ordered, + discount: order.discounted, + tax: 0, // Placeholder, will be calculated later + tax_included: 0, // Placeholder, will be calculated later + shipping: 0, // Placeholder, will be calculated later + customer: order.order_email, + customer_name: customerName, + status: order.order_status, + canceled: canceled, + }; + + // Map column names to values, handling missing columns + return [columnNames.map(colName => orderData[colName] !== undefined ? orderData[colName] : null)]; + }); + + // Construct the insert query dynamically + const skippedPlaceholders = skippedProdOrders.map(() => `(${columnNames.map(() => "?").join(", ")})`).join(","); + const skippedInsertQuery = ` + INSERT INTO orders (${columnNames.join(", ")}) + VALUES ${skippedPlaceholders} + ON DUPLICATE KEY UPDATE + ${columnNames.map(col => `${col} = VALUES(${col})`).join(", ")} + `; + + // Execute the insert query + if (skippedOrderValues.length > 0) { + await localConnection.query(skippedInsertQuery, skippedOrderValues.flat()); + } + + importedCount += skippedProdOrders.length; + + outputProgress({ + status: "running", + operation: "Orders import", + message: `Successfully imported ${skippedProdOrders.length} previously skipped orders`, + }); + } + } + + // Update sync status await localConnection.query(` INSERT INTO sync_status (table_name, last_sync_timestamp) VALUES ('orders', NOW()) - ON DUPLICATE KEY UPDATE - last_sync_timestamp = NOW(), - last_sync_id = LAST_INSERT_ID(last_sync_id) + ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW() `); - const endTime = Date.now(); - - outputProgress({ - status: "complete", - operation: `${incrementalUpdate ? 'Incremental' : 'Full'} orders import completed`, - current: total, - total, - duration: formatElapsedTime((endTime - startTime) / 1000), - }); - return { status: "complete", - totalImported: total, + totalImported: importedCount, + totalSkipped: skippedOrders.size, missingProducts: missingProducts.size, - retriedOrders: skippedOrders.size, incrementalUpdate, lastSyncTime }; } catch (error) { - outputProgress({ - operation: `${incrementalUpdate ? 'Incremental' : 'Full'} orders import failed`, - status: "error", - error: error.message, - }); + console.error("Error during orders import:", error); throw error; } } diff --git a/inventory-server/scripts/import/products.js b/inventory-server/scripts/import/products.js index 3046da5..d6c838f 100644 --- a/inventory-server/scripts/import/products.js +++ b/inventory-server/scripts/import/products.js @@ -198,7 +198,7 @@ async function materializeCalculations(prodConnection, localConnection) { }); } -async function importProducts(prodConnection, localConnection) { +async function importProducts(prodConnection, localConnection, incrementalUpdate = true) { const startTime = Date.now(); try { @@ -332,12 +332,14 @@ async function importProducts(prodConnection, localConnection) { LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id LEFT JOIN product_last_sold pls ON p.pid = pls.pid - WHERE p.stamp > ? - OR pls.date_sold > ? - OR p.date_created > ? - OR p.datein > ? + ${incrementalUpdate ? ` + WHERE p.stamp > ? + OR pls.date_sold > ? + OR p.date_created > ? + OR p.datein > ? + ` : ''} GROUP BY p.pid - `, [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]); + `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []); // Insert production data in batches for (let i = 0; i < prodData.length; i += 1000) {