From 5e4d1c3bd8f62020b0545a71a3899d42d9a4be3c Mon Sep 17 00:00:00 2001 From: Matt Date: Fri, 31 Jan 2025 01:39:48 -0500 Subject: [PATCH] Improve import scripts with enhanced incremental update tracking and performance - Add record tracking for added and updated records in import scripts - Modify products import to use a dynamic 'needs_update' flag for selective updates - Enhance order import with more comprehensive timestamp checks - Update import-from-prod.js to handle and clean up previously running imports - Improve error handling and connection management in import processes --- inventory-server/scripts/import-from-prod.js | 17 +++++-- inventory-server/scripts/import/orders.js | 18 +++++-- inventory-server/scripts/import/products.js | 49 ++++++++++++++----- .../scripts/import/purchase-orders.js | 13 +++-- 4 files changed, 72 insertions(+), 25 deletions(-) diff --git a/inventory-server/scripts/import-from-prod.js b/inventory-server/scripts/import-from-prod.js index 3148261..7395293 100644 --- a/inventory-server/scripts/import-from-prod.js +++ b/inventory-server/scripts/import-from-prod.js @@ -16,7 +16,7 @@ const IMPORT_ORDERS = true; const IMPORT_PURCHASE_ORDERS = true; // Add flag for incremental updates -const INCREMENTAL_UPDATE = process.env.INCREMENTAL_UPDATE === 'true'; +const INCREMENTAL_UPDATE = process.env.INCREMENTAL_UPDATE !== 'false'; // Default to true unless explicitly set to false // SSH configuration // In import-from-prod.js @@ -103,6 +103,17 @@ async function main() { if (isImportCancelled) throw new Error("Import cancelled"); + // Clean up any previously running imports that weren't completed + await localConnection.query(` + UPDATE import_history + SET + status = 'cancelled', + end_time = NOW(), + duration_seconds = TIMESTAMPDIFF(SECOND, start_time, NOW()), + error_message = 'Previous import was not completed properly' + WHERE status = 'running' + `); + // Initialize sync_status table if it doesn't exist await localConnection.query(` CREATE TABLE IF NOT EXISTS sync_status ( @@ -240,8 +251,8 @@ async function main() { const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); // Update import history with error - if (importHistoryId) { - await connections?.localConnection?.query(` + if (importHistoryId && connections?.localConnection) { + await connections.localConnection.query(` UPDATE import_history SET end_time = NOW(), diff --git a/inventory-server/scripts/import/orders.js b/inventory-server/scripts/import/orders.js index ce33da4..38b0eae 100644 --- a/inventory-server/scripts/import/orders.js +++ b/inventory-server/scripts/import/orders.js @@ -17,6 +17,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = const startTime = Date.now(); const skippedOrders = new Set(); const missingProducts = new Set(); + let recordsAdded = 0; + let recordsUpdated = 0; try { // Get column names from the local table @@ -88,12 +90,14 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = ${incrementalUpdate ? ` AND ( o.stamp > ? + OR oi.stamp > ? OR o.date_placed > ? OR o.date_shipped > ? - OR oi.stamp > ? + OR o.date_cancelled > ? + OR o.date_updated > ? ) ` : ''} - `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []); + `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []); const totalOrders = orderItems.length; let processed = 0; @@ -271,12 +275,16 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = const singlePlaceholder = `(${columnNames.map(() => "?").join(",")})`; const placeholders = Array(validOrders.length).fill(singlePlaceholder).join(","); - await localConnection.query(` + const query = ` INSERT INTO orders (${columnNames.join(",")}) VALUES ${placeholders} ON DUPLICATE KEY UPDATE ${columnNames.map(col => `${col} = VALUES(${col})`).join(",")} - `, values); + `; + + const result = await localConnection.query(query, values.flat()); + recordsAdded += result.affectedRows - result.changedRows; + recordsUpdated += result.changedRows; importedCount += validOrders.length; } @@ -422,6 +430,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = return { status: "complete", totalImported: importedCount, + recordsAdded, + recordsUpdated, totalSkipped: skippedOrders.size, missingProducts: missingProducts.size, incrementalUpdate, diff --git a/inventory-server/scripts/import/products.js b/inventory-server/scripts/import/products.js index 86dcffd..41bbbaa 100644 --- a/inventory-server/scripts/import/products.js +++ b/inventory-server/scripts/import/products.js @@ -279,6 +279,7 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate country_of_origin VARCHAR(5), date_last_sold DATE, category_ids TEXT, + needs_update BOOLEAN DEFAULT FALSE, PRIMARY KEY (pid) ) ENGINE=InnoDB `); @@ -321,7 +322,19 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate p.totalsold AS total_sold, p.country_of_origin, pls.date_sold as date_last_sold, - GROUP_CONCAT(DISTINCT pci.cat_id) as category_ids + GROUP_CONCAT(DISTINCT pci.cat_id) as category_ids, + CASE WHEN + ${incrementalUpdate ? ` + p.stamp > ? OR + ci.stamp > ? OR + pcp.date_deactive > ? OR + pcp.date_active > ? OR + sid.stamp > ? OR + pnb.date_updated > ? OR + pls.date_sold > ? OR + si.stamp > ? + ` : 'TRUE'} + THEN 1 ELSE 0 END as needs_update FROM products p LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 LEFT JOIN supplier_item_data sid ON p.pid = sid.pid @@ -332,16 +345,13 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id LEFT JOIN product_last_sold pls ON p.pid = pls.pid - ${incrementalUpdate ? ` - WHERE p.stamp > ? - OR pls.date_sold > ? - OR p.date_created > ? - OR p.datein > ? - ` : ''} + LEFT JOIN current_inventory ci ON p.pid = ci.pid + LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1 + LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid GROUP BY p.pid - `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []); + `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []); - // Insert production data in batches + // Insert production data in batches, but only for products that need updates for (let i = 0; i < prodData.length; i += 1000) { const batch = prodData.slice(i, i + 1000); const placeholders = batch.map(() => "(?)").join(","); @@ -359,9 +369,11 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate }); } - // Now join with local temp tables and process in batches + // Now join with local temp tables and process in batches, but only for products that need updates const BATCH_SIZE = 2500; let processed = 0; + let recordsAdded = 0; + let recordsUpdated = 0; while (processed < totalProducts) { const [batch] = await localConnection.query(` @@ -376,6 +388,7 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate FROM temp_prod_data p LEFT JOIN temp_inventory_status tis ON p.pid = tis.pid LEFT JOIN temp_product_prices tpp ON p.pid = tpp.pid + WHERE p.needs_update = 1 LIMIT ? OFFSET ? `, [BATCH_SIZE, processed]); @@ -412,7 +425,9 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate .join(",")}; `; - await localConnection.query(insertQuery, productValues); + const result = await localConnection.query(insertQuery, productValues); + recordsAdded += result.affectedRows - result.changedRows; + recordsUpdated += result.changedRows; // Insert category relationships const categoryRelationships = []; @@ -495,6 +510,8 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate return { status: "complete", totalImported: totalProducts, + recordsAdded, + recordsUpdated, incrementalUpdate: true, lastSyncTime }; @@ -682,7 +699,9 @@ async function importMissingProducts(prodConnection, localConnection, missingPid .join(",")} `; - await localConnection.query(query, productValues); + const result = await localConnection.query(query, productValues); + recordsAdded += result.affectedRows - result.changedRows; + recordsUpdated += result.changedRows; // Verify products were inserted before proceeding with categories const [insertedProducts] = await localConnection.query( @@ -738,7 +757,11 @@ async function importMissingProducts(prodConnection, localConnection, missingPid return { status: "complete", - totalImported: products.length + totalImported: products.length, + recordsAdded, + recordsUpdated, + incrementalUpdate: true, + lastSyncTime }; } catch (error) { throw error; diff --git a/inventory-server/scripts/import/purchase-orders.js b/inventory-server/scripts/import/purchase-orders.js index 54ce02a..d05c231 100644 --- a/inventory-server/scripts/import/purchase-orders.js +++ b/inventory-server/scripts/import/purchase-orders.js @@ -2,6 +2,8 @@ const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = async function importPurchaseOrders(prodConnection, localConnection, incrementalUpdate = true) { const startTime = Date.now(); + let recordsAdded = 0; + let recordsUpdated = 0; try { // Get last sync info @@ -29,16 +31,19 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental // Build incremental conditions const incrementalWhereClause = incrementalUpdate ? `AND ( - p.date_updated > ? + p.stamp > ? + OR p.date_updated > ? OR p.date_ordered > ? OR p.date_estin > ? - OR r.stamp > ? + OR r.date_updated > ? + OR r.date_created > ? + OR r.date_checked > ? OR rp.stamp > ? OR rp.received_date > ? )` : ""; const incrementalParams = incrementalUpdate - ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] + ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []; // First get all relevant PO IDs with basic info @@ -98,8 +103,6 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental const totalItems = total; let processed = 0; - let recordsAdded = 0; - let recordsUpdated = 0; const BATCH_SIZE = 5000; const PROGRESS_INTERVAL = 500;