From 996d3d36af8b1d10aa8b06a2e4b32e5b39f50864 Mon Sep 17 00:00:00 2001 From: Matt Date: Fri, 31 Jan 2025 10:01:50 -0500 Subject: [PATCH] Streamline incremental imports --- inventory-server/scripts/import-from-prod.js | 21 +-- inventory-server/scripts/import/orders.js | 10 +- inventory-server/scripts/import/products.js | 187 +++++++------------ 3 files changed, 77 insertions(+), 141 deletions(-) diff --git a/inventory-server/scripts/import-from-prod.js b/inventory-server/scripts/import-from-prod.js index 2f054da..1fb63f1 100644 --- a/inventory-server/scripts/import-from-prod.js +++ b/inventory-server/scripts/import-from-prod.js @@ -10,8 +10,8 @@ const importPurchaseOrders = require('./import/purchase-orders'); dotenv.config({ path: path.join(__dirname, "../.env") }); // Constants to control which imports run -const IMPORT_CATEGORIES = false; -const IMPORT_PRODUCTS = false; +const IMPORT_CATEGORIES = true; +const IMPORT_PRODUCTS = true; const IMPORT_ORDERS = true; const IMPORT_PURCHASE_ORDERS = true; @@ -48,7 +48,6 @@ const sshConfig = { connectionLimit: 10, queueLimit: 0, namedPlaceholders: true, - maxAllowedPacket: 64 * 1024 * 1024, // 64MB connectTimeout: 60000, enableKeepAlive: true, keepAliveInitialDelay: 10000, @@ -162,32 +161,32 @@ async function main() { results.categories = await importCategories(prodConnection, localConnection); if (isImportCancelled) throw new Error("Import cancelled"); completedSteps++; - if (results.categories.recordsAdded) totalRecordsAdded += results.categories.recordsAdded; - if (results.categories.recordsUpdated) totalRecordsUpdated += results.categories.recordsUpdated; + if (results.categories?.recordsAdded) totalRecordsAdded += results.categories.recordsAdded; + if (results.categories?.recordsUpdated) totalRecordsUpdated += results.categories.recordsUpdated; } if (IMPORT_PRODUCTS) { results.products = await importProducts(prodConnection, localConnection, INCREMENTAL_UPDATE); if (isImportCancelled) throw new Error("Import cancelled"); completedSteps++; - if (results.products.recordsAdded) totalRecordsAdded += results.products.recordsAdded; - if (results.products.recordsUpdated) totalRecordsUpdated += results.products.recordsUpdated; + if (results.products?.recordsAdded) totalRecordsAdded += results.products.recordsAdded; + if (results.products?.recordsUpdated) totalRecordsUpdated += results.products.recordsUpdated; } if (IMPORT_ORDERS) { results.orders = await importOrders(prodConnection, localConnection, INCREMENTAL_UPDATE); if (isImportCancelled) throw new Error("Import cancelled"); completedSteps++; - if (results.orders.recordsAdded) totalRecordsAdded += results.orders.recordsAdded; - if (results.orders.recordsUpdated) totalRecordsUpdated += results.orders.recordsUpdated; + if (results.orders?.recordsAdded) totalRecordsAdded += results.orders.recordsAdded; + if (results.orders?.recordsUpdated) totalRecordsUpdated += results.orders.recordsUpdated; } if (IMPORT_PURCHASE_ORDERS) { results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection, INCREMENTAL_UPDATE); if (isImportCancelled) throw new Error("Import cancelled"); completedSteps++; - if (results.purchaseOrders.recordsAdded) totalRecordsAdded += results.purchaseOrders.recordsAdded; - if (results.purchaseOrders.recordsUpdated) totalRecordsUpdated += results.purchaseOrders.recordsUpdated; + if (results.purchaseOrders?.recordsAdded) totalRecordsAdded += results.purchaseOrders.recordsAdded; + if (results.purchaseOrders?.recordsUpdated) totalRecordsUpdated += results.purchaseOrders.recordsUpdated; } const endTime = Date.now(); diff --git a/inventory-server/scripts/import/orders.js b/inventory-server/scripts/import/orders.js index c000555..1cfdea8 100644 --- a/inventory-server/scripts/import/orders.js +++ b/inventory-server/scripts/import/orders.js @@ -1,5 +1,5 @@ const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress'); -const { importMissingProducts } = require('./products'); +const { importMissingProducts, setupTemporaryTables, cleanupTemporaryTables, materializeCalculations } = require('./products'); /** * Imports orders from a production MySQL database to a local MySQL database. @@ -312,15 +312,9 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = // Import missing products if any if (missingProducts.size > 0) { try { - // Setup temporary tables again since they were dropped - await setupTemporaryTables(localConnection); - await materializeCalculations(prodConnection, localConnection); - + // Import missing products directly without materialization await importMissingProducts(prodConnection, localConnection, Array.from(missingProducts)); - // Clean up temporary tables after missing products import - await cleanupTemporaryTables(localConnection); - // Retry skipped orders after importing products if (skippedOrders.size > 0) { outputProgress({ diff --git a/inventory-server/scripts/import/products.js b/inventory-server/scripts/import/products.js index cbd67a1..fa1f2ce 100644 --- a/inventory-server/scripts/import/products.js +++ b/inventory-server/scripts/import/products.js @@ -227,15 +227,21 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate const [countResult] = await prodConnection.query(` SELECT COUNT(*) as total FROM products p - WHERE p.stamp > ? - OR EXISTS ( - SELECT 1 FROM product_last_sold pls - WHERE p.pid = pls.pid - AND pls.date_sold > ? - ) - OR p.date_created > ? - OR p.datein > ? - `, [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]); + LEFT JOIN current_inventory ci ON p.pid = ci.pid + LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1 + LEFT JOIN supplier_item_data sid ON p.pid = sid.pid + LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid + LEFT JOIN product_last_sold pls ON p.pid = pls.pid + WHERE ${incrementalUpdate ? ` + p.stamp > ? OR + ci.stamp > ? OR + pcp.date_deactive > ? OR + pcp.date_active > ? OR + sid.stamp > ? OR + pnb.date_updated > ? OR + pls.date_sold > ? + ` : 'TRUE'} + `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []); const totalProducts = countResult[0].total; @@ -243,7 +249,7 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate outputProgress({ status: "running", operation: "Products import", - message: "Fetching product data from production" + message: `Fetching ${incrementalUpdate ? 'updated' : 'all'} product data from production` }); // Create temporary table for production data @@ -279,7 +285,7 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate country_of_origin VARCHAR(5), date_last_sold DATE, category_ids TEXT, - needs_update BOOLEAN DEFAULT FALSE, + needs_update BOOLEAN DEFAULT TRUE, PRIMARY KEY (pid) ) ENGINE=InnoDB `); @@ -322,18 +328,7 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate p.totalsold AS total_sold, p.country_of_origin, pls.date_sold as date_last_sold, - GROUP_CONCAT(DISTINCT pci.cat_id) as category_ids, - CASE WHEN - ${incrementalUpdate ? ` - p.stamp > ? OR - ci.stamp > ? OR - pcp.date_deactive > ? OR - pcp.date_active > ? OR - sid.stamp > ? OR - pnb.date_updated > ? OR - pls.date_sold > ? - ` : 'TRUE'} - THEN 1 ELSE 0 END as needs_update + GROUP_CONCAT(DISTINCT pci.cat_id) as category_ids FROM products p LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 LEFT JOIN supplier_item_data sid ON p.pid = sid.pid @@ -347,6 +342,15 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate LEFT JOIN current_inventory ci ON p.pid = ci.pid LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1 LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid + WHERE ${incrementalUpdate ? ` + p.stamp > ? OR + ci.stamp > ? OR + pcp.date_deactive > ? OR + pcp.date_active > ? OR + sid.stamp > ? OR + pnb.date_updated > ? OR + pls.date_sold > ? + ` : 'TRUE'} GROUP BY p.pid `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []); @@ -521,67 +525,16 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate async function importMissingProducts(prodConnection, localConnection, missingPids) { try { - // Setup temporary tables - await setupTemporaryTables(localConnection); - - // Get inventory data from production first - const [prodInventory] = await prodConnection.query(` - SELECT - p.pid, - COALESCE(si.available_local, 0) - COALESCE(ps.pending_qty, 0) as stock_quantity, - COALESCE(ps.pending_qty, 0) as pending_qty, - COALESCE(ci.onpreorder, 0) as preorder_count, - COALESCE(pnb.inventory, 0) as notions_inv_count - FROM products p - LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 - LEFT JOIN current_inventory ci ON p.pid = ci.pid - LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid - LEFT JOIN ( - SELECT oi.prod_pid, - SUM(oi.qty_ordered - oi.qty_placed) as pending_qty - FROM order_items oi - JOIN _order o ON oi.order_id = o.order_id - WHERE oi.prod_pid IN (?) - AND o.date_placed != '0000-00-00 00:00:00' - AND o.date_shipped = '0000-00-00 00:00:00' - AND oi.pick_finished = 0 - AND oi.qty_back = 0 - AND o.order_status != 15 - AND o.order_status < 90 - AND oi.qty_ordered >= oi.qty_placed - AND oi.qty_ordered > 0 - GROUP BY oi.prod_pid - ) ps ON p.pid = ps.prod_pid - WHERE p.pid IN (?) - `, [missingPids, missingPids]); - - // Insert inventory data into temp table - if (prodInventory.length > 0) { - const placeholders = prodInventory.map(() => "(?, ?, ?, ?, ?)").join(","); - const values = prodInventory.flatMap(p => [ - p.pid, - p.stock_quantity, - p.pending_qty, - p.preorder_count, - p.notions_inv_count - ]); - - await localConnection.query(` - INSERT INTO temp_inventory_status VALUES ${placeholders} - `, values); - } - - // First get the column names from the table structure + // Get column names first const [columns] = await localConnection.query(` SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'products' ORDER BY ORDINAL_POSITION `); - const columnNames = columns.map((col) => col.COLUMN_NAME); - // Get the missing products from production + // Get the missing products with all their data in one optimized query const [products] = await prodConnection.query(` SELECT p.pid, @@ -591,9 +544,22 @@ async function importMissingProducts(prodConnection, localConnection, missingPid p.date_created, p.datein AS first_received, p.location, - tis.stock_quantity, - tis.preorder_count, - tis.notions_inv_count, + COALESCE(si.available_local, 0) - COALESCE( + (SELECT SUM(oi.qty_ordered - oi.qty_placed) + FROM order_items oi + JOIN _order o ON oi.order_id = o.order_id + WHERE oi.prod_pid = p.pid + AND o.date_placed != '0000-00-00 00:00:00' + AND o.date_shipped = '0000-00-00 00:00:00' + AND oi.pick_finished = 0 + AND oi.qty_back = 0 + AND o.order_status != 15 + AND o.order_status < 90 + AND oi.qty_ordered >= oi.qty_placed + AND oi.qty_ordered > 0), 0 + ) as stock_quantity, + COALESCE(ci.onpreorder, 0) as preorder_count, + COALESCE(pnb.inventory, 0) as notions_inv_count, COALESCE(pcp.price_each, 0) as price, COALESCE(p.sellingprice, 0) AS regular_price, COALESCE((SELECT ROUND(AVG(costeach), 5) @@ -610,21 +576,6 @@ async function importMissingProducts(prodConnection, localConnection, missingPid CASE WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber ELSE sid.supplier_itemnumber END AS vendor_reference, sid.notions_itemnumber AS notions_reference, CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink, - (SELECT CONCAT('https://sbing.com/i/products/0000/', - SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', - p.pid, '-t-', MIN(PI.iid), '.jpg') - FROM product_images PI - WHERE PI.pid = p.pid AND PI.hidden = 0) AS image, - (SELECT CONCAT('https://sbing.com/i/products/0000/', - SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', - p.pid, '-175x175-', MIN(PI.iid), '.jpg') - FROM product_images PI - WHERE PI.pid = p.pid AND PI.hidden = 0 AND PI.width = 175) AS image_175, - (SELECT CONCAT('https://sbing.com/i/products/0000/', - SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', - p.pid, '-o-', MIN(PI.iid), '.jpg') - FROM product_images PI - WHERE PI.pid = p.pid AND PI.hidden = 0) AS image_full, pc1.name AS brand, pc2.name AS line, pc3.name AS subline, @@ -649,7 +600,6 @@ async function importMissingProducts(prodConnection, localConnection, missingPid pls.date_sold as date_last_sold, GROUP_CONCAT(DISTINCT CASE WHEN pc.cat_id IS NOT NULL THEN pci.cat_id END) as category_ids FROM products p - LEFT JOIN temp_inventory_status tis ON p.pid = tis.pid LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 LEFT JOIN supplier_item_data sid ON p.pid = sid.pid LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid @@ -662,16 +612,24 @@ async function importMissingProducts(prodConnection, localConnection, missingPid LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id LEFT JOIN product_last_sold pls ON p.pid = pls.pid - LEFT JOIN ( - SELECT pid, MIN(price_each) as price_each - FROM product_current_prices - WHERE active = 1 - GROUP BY pid - ) pcp ON p.pid = pcp.pid + LEFT JOIN current_inventory ci ON p.pid = ci.pid + LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1 + LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid WHERE p.pid IN (?) GROUP BY p.pid `, [missingPids]); + // Add image URLs + products.forEach(product => { + const urls = getImageUrls(product.pid); + product.image = urls.image; + product.image_175 = urls.image_175; + product.image_full = urls.image_full; + }); + + let recordsAdded = 0; + let recordsUpdated = 0; + if (products.length > 0) { // Map values in the same order as columns const productValues = products.flatMap(product => @@ -699,21 +657,13 @@ async function importMissingProducts(prodConnection, localConnection, missingPid `; const result = await localConnection.query(query, productValues); - recordsAdded += result.affectedRows - result.changedRows; - recordsUpdated += result.changedRows; - - // Verify products were inserted before proceeding with categories - const [insertedProducts] = await localConnection.query( - "SELECT pid FROM products WHERE pid IN (?)", - [products.map(p => p.pid)] - ); - const insertedPids = new Set(insertedProducts.map(p => p.pid)); + recordsAdded = result.affectedRows - result.changedRows; + recordsUpdated = result.changedRows; // Handle category relationships if any const categoryRelationships = []; products.forEach(product => { - // Only add category relationships for products that were successfully inserted - if (insertedPids.has(product.pid) && product.category_ids) { + if (product.category_ids) { const catIds = product.category_ids .split(",") .map(id => id.trim()) @@ -744,10 +694,8 @@ async function importMissingProducts(prodConnection, localConnection, missingPid .map(() => "(?, ?)") .join(","); await localConnection.query( - ` - INSERT IGNORE INTO product_categories (cat_id, pid) - VALUES ${catPlaceholders} - `, + `INSERT IGNORE INTO product_categories (cat_id, pid) + VALUES ${catPlaceholders}`, validRelationships.flat() ); } @@ -758,15 +706,10 @@ async function importMissingProducts(prodConnection, localConnection, missingPid status: "complete", totalImported: products.length, recordsAdded, - recordsUpdated, - incrementalUpdate: true, - lastSyncTime + recordsUpdated }; } catch (error) { throw error; - } finally { - // Cleanup temporary tables - await cleanupTemporaryTables(localConnection); } }