From d57239c40c9231016f3b491d95a6b6181dc06df3 Mon Sep 17 00:00:00 2001 From: Matt Date: Fri, 31 Jan 2025 16:01:21 -0500 Subject: [PATCH] Finish up import script incremental and reliability updates --- inventory-server/db/schema.sql | 3 +- inventory-server/scripts/import/orders.js | 241 +++++++++++++----- inventory-server/scripts/import/products.js | 183 ++++++++----- .../scripts/import/purchase-orders.js | 105 +++++++- .../scripts/import/purchase_orders.js | 82 ++++++ 5 files changed, 462 insertions(+), 152 deletions(-) create mode 100644 inventory-server/scripts/import/purchase_orders.js diff --git a/inventory-server/db/schema.sql b/inventory-server/db/schema.sql index 9b96a70..372dfb6 100644 --- a/inventory-server/db/schema.sql +++ b/inventory-server/db/schema.sql @@ -52,7 +52,7 @@ CREATE TABLE products ( notifies INT UNSIGNED DEFAULT 0, date_last_sold DATE, PRIMARY KEY (pid), - UNIQUE KEY unique_sku (SKU), + INDEX idx_sku (SKU), INDEX idx_vendor (vendor), INDEX idx_brand (brand), INDEX idx_location (location), @@ -148,7 +148,6 @@ CREATE TABLE purchase_orders ( received_by INT, receiving_history JSON COMMENT 'Array of receiving records with qty, date, cost, receiving_id, and alt_po flag', FOREIGN KEY (pid) REFERENCES products(pid), - FOREIGN KEY (sku) REFERENCES products(SKU), INDEX idx_po_id (po_id), INDEX idx_vendor (vendor), INDEX idx_status (status), diff --git a/inventory-server/scripts/import/orders.js b/inventory-server/scripts/import/orders.js index a976fa1..1ba7d93 100644 --- a/inventory-server/scripts/import/orders.js +++ b/inventory-server/scripts/import/orders.js @@ -21,6 +21,46 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = let recordsUpdated = 0; try { + // Insert temporary table creation queries + await localConnection.query(` + CREATE TABLE IF NOT EXISTS temp_order_items ( + order_id INT UNSIGNED NOT NULL, + pid INT UNSIGNED NOT NULL, + SKU VARCHAR(50) NOT NULL, + price DECIMAL(10,2) NOT NULL, + quantity INT NOT NULL, + base_discount DECIMAL(10,2) DEFAULT 0, + PRIMARY KEY (order_id, pid) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8; + `); + await localConnection.query(` + CREATE TABLE IF NOT EXISTS temp_order_meta ( + order_id INT UNSIGNED NOT NULL, + date DATE NOT NULL, + customer VARCHAR(100) NOT NULL, + customer_name VARCHAR(150) NOT NULL, + status INT, + canceled TINYINT(1), + PRIMARY KEY (order_id) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8; + `); + await localConnection.query(` + CREATE TABLE IF NOT EXISTS temp_order_discounts ( + order_id INT UNSIGNED NOT NULL, + pid INT UNSIGNED NOT NULL, + discount DECIMAL(10,2) NOT NULL, + PRIMARY KEY (order_id, pid) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8; + `); + await localConnection.query(` + CREATE TABLE IF NOT EXISTS temp_order_taxes ( + order_id INT UNSIGNED NOT NULL, + pid INT UNSIGNED NOT NULL, + tax DECIMAL(10,2) NOT NULL, + PRIMARY KEY (order_id, pid) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8; + `); + // Get column names from the local table const [columns] = await localConnection.query(` SELECT COLUMN_NAME @@ -36,52 +76,11 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = ); const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01'; - // Create temporary tables for staging data - await localConnection.query(` - CREATE TEMPORARY TABLE temp_order_items ( - order_id INT UNSIGNED, - pid INT UNSIGNED, - SKU VARCHAR(50), - price DECIMAL(10,3), - quantity INT, - base_discount DECIMAL(10,3), - PRIMARY KEY (order_id, pid) - ) ENGINE=InnoDB; + console.log('Orders: Using last sync time:', lastSyncTime); - CREATE TEMPORARY TABLE temp_order_meta ( - order_id INT UNSIGNED PRIMARY KEY, - date DATE, - customer INT UNSIGNED, - customer_name VARCHAR(100), - status TINYINT UNSIGNED, - canceled TINYINT UNSIGNED - ) ENGINE=InnoDB; - - CREATE TEMPORARY TABLE temp_order_discounts ( - order_id INT UNSIGNED, - pid INT UNSIGNED, - discount DECIMAL(10,3), - PRIMARY KEY (order_id, pid) - ) ENGINE=InnoDB; - - CREATE TEMPORARY TABLE temp_order_taxes ( - order_id INT UNSIGNED, - pid INT UNSIGNED, - tax DECIMAL(10,3), - PRIMARY KEY (order_id, pid) - ) ENGINE=InnoDB; - `); - - // Get base order items first - console.log('Last sync time:', lastSyncTime); - const [orderItems] = await prodConnection.query(` - SELECT - oi.order_id, - oi.prod_pid as pid, - oi.prod_itemnumber as SKU, - oi.prod_price as price, - oi.qty_ordered as quantity, - COALESCE(oi.prod_price_reg - oi.prod_price, 0) * oi.qty_ordered as base_discount + // First get all relevant order items with basic info + const [[{ total }]] = await prodConnection.query(` + SELECT COUNT(*) as total FROM order_items oi USE INDEX (PRIMARY) JOIN _order o ON oi.order_id = o.order_id @@ -92,11 +91,61 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = AND ( o.stamp > ? OR oi.stamp > ? + OR EXISTS ( + SELECT 1 FROM order_discount_items odi + WHERE odi.order_id = o.order_id + AND odi.pid = oi.prod_pid + ) + OR EXISTS ( + SELECT 1 FROM order_tax_info oti + JOIN order_tax_info_products otip ON oti.taxinfo_id = otip.taxinfo_id + WHERE oti.order_id = o.order_id + AND otip.pid = oi.prod_pid + AND oti.stamp > ? + ) ) ` : ''} - `, incrementalUpdate ? [lastSyncTime, lastSyncTime] : []); + `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []); - console.log('Found', orderItems.length, 'orders to process'); + console.log('Orders: Found changes:', total); + + // Get order items in batches + const [orderItems] = await prodConnection.query(` + SELECT + oi.order_id, + oi.prod_pid as pid, + oi.prod_itemnumber as SKU, + oi.prod_price as price, + oi.qty_ordered as quantity, + COALESCE(oi.prod_price_reg - oi.prod_price, 0) * oi.qty_ordered as base_discount, + oi.stamp as last_modified + FROM order_items oi + USE INDEX (PRIMARY) + JOIN _order o ON oi.order_id = o.order_id + WHERE o.order_status >= 15 + AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) + AND o.date_placed_onlydate IS NOT NULL + ${incrementalUpdate ? ` + AND ( + o.stamp > ? + OR oi.stamp > ? + OR EXISTS ( + SELECT 1 FROM order_discount_items odi + WHERE odi.order_id = o.order_id + AND odi.pid = oi.prod_pid + ) + OR EXISTS ( + SELECT 1 FROM order_tax_info oti + JOIN order_tax_info_products otip ON oti.taxinfo_id = otip.taxinfo_id + WHERE oti.order_id = o.order_id + AND otip.pid = oi.prod_pid + AND oti.stamp > ? + ) + ) + ` : ''} + `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []); + + console.log('Orders: Processing', orderItems.length, 'order items'); const totalOrders = orderItems.length; let processed = 0; @@ -280,30 +329,82 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = const singlePlaceholder = `(${columnNames.map(() => "?").join(",")})`; const placeholders = Array(validOrders.length).fill(singlePlaceholder).join(","); - const query = ` - INSERT INTO orders (${columnNames.join(",")}) - VALUES ${placeholders} - ON DUPLICATE KEY UPDATE - SKU = VALUES(SKU), - date = VALUES(date), - price = VALUES(price), - quantity = VALUES(quantity), - discount = VALUES(discount), - tax = VALUES(tax), - tax_included = VALUES(tax_included), - shipping = VALUES(shipping), - customer = VALUES(customer), - customer_name = VALUES(customer_name), - status = VALUES(status), - canceled = VALUES(canceled) - `; + // First check which orders exist and get their current values + const [existingOrders] = await localConnection.query( + `SELECT ${columnNames.join(',')} FROM orders WHERE (order_number, pid) IN (${validOrders.map(() => "(?,?)").join(",")})`, + validOrders.flatMap(o => [o.order_number, o.pid]) + ); + const existingOrderMap = new Map( + existingOrders.map(o => [`${o.order_number}-${o.pid}`, o]) + ); - const result = await localConnection.query(query, values); - // For INSERT ... ON DUPLICATE KEY UPDATE: - // - affectedRows is 1 for each inserted row and 2 for each updated row - // - changedRows is 1 for each row that was actually changed during update - recordsAdded += result[0].affectedRows - (2 * result[0].changedRows); // New rows - recordsUpdated += result[0].changedRows; // Actually changed rows + // Split into inserts and updates + const insertsAndUpdates = validOrders.reduce((acc, order) => { + const key = `${order.order_number}-${order.pid}`; + if (existingOrderMap.has(key)) { + const existing = existingOrderMap.get(key); + // Check if any values are different + const hasChanges = columnNames.some(col => { + const newVal = order[col] ?? null; + const oldVal = existing[col] ?? null; + // Special handling for numbers to avoid type coercion issues + if (typeof newVal === 'number' && typeof oldVal === 'number') { + return Math.abs(newVal - oldVal) > 0.00001; // Allow for tiny floating point differences + } + return newVal !== oldVal; + }); + + if (hasChanges) { + acc.updates.push({ + order_number: order.order_number, + pid: order.pid, + values: columnNames.map(col => order[col] ?? null) + }); + } else { + acc.inserts.push({ + order_number: order.order_number, + pid: order.pid, + values: columnNames.map(col => order[col] ?? null) + }); + } + return acc; + + // Handle inserts + if (insertsAndUpdates.inserts.length > 0) { + const insertPlaceholders = Array(insertsAndUpdates.inserts.length).fill(singlePlaceholder).join(","); + + const insertResult = await localConnection.query(` + INSERT INTO orders (${columnNames.join(",")}) + VALUES ${insertPlaceholders} + `, insertsAndUpdates.inserts.map(i => i.values).flat()); + + recordsAdded += insertResult[0].affectedRows; + } + + // Handle updates - now we know these actually have changes + if (insertsAndUpdates.updates.length > 0) { + const updatePlaceholders = Array(insertsAndUpdates.updates.length).fill(singlePlaceholder).join(","); + + const updateResult = await localConnection.query(` + INSERT INTO orders (${columnNames.join(",")}) + VALUES ${updatePlaceholders} + ON DUPLICATE KEY UPDATE + SKU = VALUES(SKU), + date = VALUES(date), + price = VALUES(price), + quantity = VALUES(quantity), + discount = VALUES(discount), + tax = VALUES(tax), + tax_included = VALUES(tax_included), + shipping = VALUES(shipping), + customer = VALUES(customer), + customer_name = VALUES(customer_name), + status = VALUES(status), + canceled = VALUES(canceled) + `, insertsAndUpdates.updates.map(u => u.values).flat()); + + recordsUpdated += updateResult[0].affectedRows / 2; // Each update counts as 2 in affectedRows + } importedCount += validOrders.length; } diff --git a/inventory-server/scripts/import/products.js b/inventory-server/scripts/import/products.js index ad17c18..ce43418 100644 --- a/inventory-server/scripts/import/products.js +++ b/inventory-server/scripts/import/products.js @@ -13,40 +13,12 @@ const getImageUrls = (pid) => { }; async function setupTemporaryTables(connection) { - await connection.query(` - CREATE TEMPORARY TABLE IF NOT EXISTS temp_categories ( - cat_id INT PRIMARY KEY, - name VARCHAR(255) - ) ENGINE=InnoDB; - - CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_images ( - pid INT, - iid INT, - image_type ENUM('thumbnail', '175', 'full'), - url VARCHAR(255), - PRIMARY KEY (pid, image_type) - ) ENGINE=InnoDB; - - CREATE TEMPORARY TABLE IF NOT EXISTS temp_inventory_status ( - pid INT PRIMARY KEY, - stock_quantity INT, - pending_qty INT, - preorder_count INT, - notions_inv_count INT - ) ENGINE=InnoDB; - - CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_prices ( - pid INT PRIMARY KEY, - price DECIMAL(10,2), - regular_price DECIMAL(10,2), - cost_price DECIMAL(10,5) - ) ENGINE=InnoDB; - - INSERT INTO temp_categories - SELECT cat_id, name FROM categories; - - CREATE INDEX idx_temp_cat_id ON temp_categories(cat_id); - `); + await connection.query(`CREATE TEMPORARY TABLE IF NOT EXISTS temp_categories ( cat_id INT PRIMARY KEY, name VARCHAR(255) ) ENGINE=InnoDB;`); + await connection.query(`CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_images ( pid INT, iid INT, image_type ENUM('thumbnail', '175', 'full'), url VARCHAR(255), PRIMARY KEY (pid, image_type) ) ENGINE=InnoDB;`); + await connection.query(`CREATE TEMPORARY TABLE IF NOT EXISTS temp_inventory_status ( pid INT PRIMARY KEY, stock_quantity INT, pending_qty INT, preorder_count INT, notions_inv_count INT, needs_update BOOLEAN ) ENGINE=InnoDB;`); + await connection.query(`CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_prices ( pid INT PRIMARY KEY, price DECIMAL(10,2), regular_price DECIMAL(10,2), cost_price DECIMAL(10,5), needs_update BOOLEAN ) ENGINE=InnoDB;`); + await connection.query(`INSERT INTO temp_categories SELECT cat_id, name FROM categories;`); + await connection.query(`CREATE INDEX idx_temp_cat_id ON temp_categories(cat_id);`); } async function cleanupTemporaryTables(connection) { @@ -108,18 +80,20 @@ async function materializeCalculations(prodConnection, localConnection) { Math.max(0, row.stock_quantity - row.pending_qty), // Calculate final stock quantity row.pending_qty, row.preorder_count, - row.notions_inv_count + row.notions_inv_count, + true // Mark as needing update ]); if (values.length > 0) { await localConnection.query(` - INSERT INTO temp_inventory_status (pid, stock_quantity, pending_qty, preorder_count, notions_inv_count) + INSERT INTO temp_inventory_status (pid, stock_quantity, pending_qty, preorder_count, notions_inv_count, needs_update) VALUES ? ON DUPLICATE KEY UPDATE stock_quantity = VALUES(stock_quantity), pending_qty = VALUES(pending_qty), preorder_count = VALUES(preorder_count), - notions_inv_count = VALUES(notions_inv_count) + notions_inv_count = VALUES(notions_inv_count), + needs_update = TRUE `, [values]); } @@ -168,17 +142,19 @@ async function materializeCalculations(prodConnection, localConnection) { row.pid, row.price, row.regular_price, - row.cost_price + row.cost_price, + true // Mark as needing update ]); if (values.length > 0) { await localConnection.query(` - INSERT INTO temp_product_prices (pid, price, regular_price, cost_price) + INSERT INTO temp_product_prices (pid, price, regular_price, cost_price, needs_update) VALUES ? ON DUPLICATE KEY UPDATE price = VALUES(price), regular_price = VALUES(regular_price), - cost_price = VALUES(cost_price) + cost_price = VALUES(cost_price), + needs_update = TRUE `, [values]); } @@ -218,6 +194,8 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'products'" ); const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01'; + + console.log('Products: Using last sync time:', lastSyncTime); // Setup temporary tables await setupTemporaryTables(localConnection); @@ -245,6 +223,8 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate ` : 'TRUE'} `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []); + console.log('Products: Found changes:', countResult[0].total); + const totalProducts = countResult[0].total; // Main product query using materialized data - modified for incremental @@ -415,10 +395,16 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate let recordsAdded = 0; let recordsUpdated = 0; - // Get actual count from temp table - const [[{ actualTotal }]] = await localConnection.query( - "SELECT COUNT(*) as actualTotal FROM temp_prod_data WHERE needs_update = 1" - ); + // Get actual count from temp table - only count products that need updates + const [[{ actualTotal }]] = await localConnection.query(` + SELECT COUNT(DISTINCT p.pid) as actualTotal + FROM temp_prod_data p + LEFT JOIN temp_inventory_status tis ON p.pid = tis.pid + LEFT JOIN temp_product_prices tpp ON p.pid = tpp.pid + WHERE p.needs_update = 1 + OR tis.needs_update = 1 + OR tpp.needs_update = 1 + `); while (processed < actualTotal) { const [batch] = await localConnection.query(` @@ -433,7 +419,9 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate FROM temp_prod_data p LEFT JOIN temp_inventory_status tis ON p.pid = tis.pid LEFT JOIN temp_product_prices tpp ON p.pid = tpp.pid - WHERE p.needs_update = 1 + WHERE p.needs_update = 1 + OR tis.needs_update = 1 + OR tpp.needs_update = 1 LIMIT ? OFFSET ? `, [BATCH_SIZE, processed]); @@ -447,34 +435,93 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate row.image_full = urls.image_full; }); - // Prepare product values - now using columnNames from above - const productValues = batch.flatMap(row => - columnNames.map(col => { - const val = row[col] ?? null; + if (batch.length > 0) { + // MySQL 8.0 optimized insert with proper placeholders + const placeholderGroup = `(${Array(columnNames.length).fill("?").join(",")})`; + + // First check which products already exist and get their current values + const [existingProducts] = await localConnection.query( + `SELECT ${columnNames.join(',')} FROM products WHERE pid IN (?)`, + [batch.map(p => p.pid)] + ); + const existingPidsMap = new Map(existingProducts.map(p => [p.pid, p])); + + // Helper function to map values consistently + const mapValues = (product) => columnNames.map(col => { + const val = product[col] ?? null; if (col === "managing_stock") return 1; if (typeof val === "number") return val || 0; return val; - }) - ); + }); - if (productValues.length > 0) { - // MySQL 8.0 optimized insert with proper placeholders - const placeholderGroup = `(${Array(columnNames.length).fill("?").join(",")})`; - const productPlaceholders = Array(batch.length).fill(placeholderGroup).join(","); - - const insertQuery = ` - INSERT INTO products (${columnNames.join(",")}) - VALUES ${productPlaceholders} - ON DUPLICATE KEY UPDATE - ${columnNames - .filter(col => col !== "pid") - .map(col => `${col} = VALUES(${col})`) - .join(",")}; - `; + // Split into inserts and updates, comparing values for updates + const insertsAndUpdates = batch.reduce((acc, product) => { + if (existingPidsMap.has(product.pid)) { + const existing = existingPidsMap.get(product.pid); + // Check if any values are different + const hasChanges = columnNames.some(col => { + const newVal = product[col] ?? null; + const oldVal = existing[col] ?? null; + // Special handling for numbers to avoid type coercion issues + if (typeof newVal === 'number' && typeof oldVal === 'number') { + // Handle NaN and Infinity + if (isNaN(newVal) || isNaN(oldVal)) return isNaN(newVal) !== isNaN(oldVal); + if (!isFinite(newVal) || !isFinite(oldVal)) return !isFinite(newVal) !== !isFinite(oldVal); + // Allow for tiny floating point differences + return Math.abs(newVal - oldVal) > 0.00001; + } + if (col === 'managing_stock') return false; // Skip this as it's always 1 + return newVal !== oldVal; + }); - const result = await localConnection.query(insertQuery, productValues); - recordsAdded += result.affectedRows - (2 * result.changedRows); // New rows - recordsUpdated += result.changedRows; // Actually changed rows + if (hasChanges) { + acc.updates.push({ + pid: product.pid, + values: mapValues(product) + }); + } + } else { + acc.inserts.push({ + pid: product.pid, + values: mapValues(product) + }); + } + return acc; + }, { inserts: [], updates: [] }); + + // Log summary for this batch + if (insertsAndUpdates.inserts.length > 0 || insertsAndUpdates.updates.length > 0) { + console.log(`Batch summary: ${insertsAndUpdates.inserts.length} new products, ${insertsAndUpdates.updates.length} updates`); + } + + // Handle inserts + if (insertsAndUpdates.inserts.length > 0) { + const insertPlaceholders = Array(insertsAndUpdates.inserts.length).fill(placeholderGroup).join(","); + + const insertResult = await localConnection.query(` + INSERT INTO products (${columnNames.join(",")}) + VALUES ${insertPlaceholders} + `, insertsAndUpdates.inserts.map(i => i.values).flat()); + + recordsAdded += insertResult[0].affectedRows; + } + + // Handle updates - now we know these actually have changes + if (insertsAndUpdates.updates.length > 0) { + const updatePlaceholders = Array(insertsAndUpdates.updates.length).fill(placeholderGroup).join(","); + + const updateResult = await localConnection.query(` + INSERT INTO products (${columnNames.join(",")}) + VALUES ${updatePlaceholders} + ON DUPLICATE KEY UPDATE + ${columnNames + .filter(col => col !== "pid") + .map(col => `${col} = VALUES(${col})`) + .join(",")}; + `, insertsAndUpdates.updates.map(u => u.values).flat()); + + recordsUpdated += insertsAndUpdates.updates.length; + } } // Insert category relationships diff --git a/inventory-server/scripts/import/purchase-orders.js b/inventory-server/scripts/import/purchase-orders.js index d2da638..b492e9e 100644 --- a/inventory-server/scripts/import/purchase-orders.js +++ b/inventory-server/scripts/import/purchase-orders.js @@ -12,6 +12,22 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental ); const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01'; + console.log('Purchase Orders: Using last sync time:', lastSyncTime); + + // Insert temporary table creation query for purchase orders + await localConnection.query(` + CREATE TABLE IF NOT EXISTS temp_purchase_orders ( + po_id INT UNSIGNED NOT NULL, + pid INT UNSIGNED NOT NULL, + vendor VARCHAR(255), + date DATE, + expected_date DATE, + status INT, + notes TEXT, + PRIMARY KEY (po_id, pid) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8; + `); + outputProgress({ operation: `Starting ${incrementalUpdate ? 'incremental' : 'full'} purchase orders import`, status: "running", @@ -82,6 +98,8 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime // Receiving conditions ] : []); + console.log('Purchase Orders: Found changes:', total); + const [poList] = await prodConnection.query(` SELECT DISTINCT COALESCE(p.po_id, r.receiving_id) as po_id, @@ -221,6 +239,22 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental const values = []; let batchProcessed = 0; + // First check which PO lines already exist and get their current values + const poLines = Array.from(poProductMap.values()) + .filter(p => validPids.has(p.pid)) + .map(p => [p.po_id, p.pid]); + + const [existingPOs] = await localConnection.query( + `SELECT ${columnNames.join(',')} FROM purchase_orders WHERE (po_id, pid) IN (${poLines.map(() => "(?,?)").join(",")})`, + poLines.flat() + ); + const existingPOMap = new Map( + existingPOs.map(po => [`${po.po_id}-${po.pid}`, po]) + ); + + // Split into inserts and updates + const insertsAndUpdates = { inserts: [], updates: [] }; + for (const po of batch) { const poProducts = Array.from(poProductMap.values()) .filter(p => p.po_id === po.po_id && validPids.has(p.pid)); @@ -280,7 +314,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental const firstReceiving = allReceivings[0] || {}; const lastReceiving = allReceivings[allReceivings.length - 1] || {}; - values.push(columnNames.map(col => { + const rowValues = columnNames.map(col => { switch (col) { case 'po_id': return po.po_id; case 'vendor': return po.vendor; @@ -309,28 +343,75 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental }); default: return null; } - })); + }); + + if (existingPOMap.has(key)) { + const existing = existingPOMap.get(key); + // Check if any values are different + const hasChanges = columnNames.some(col => { + const newVal = rowValues[columnNames.indexOf(col)]; + const oldVal = existing[col] ?? null; + // Special handling for numbers to avoid type coercion issues + if (typeof newVal === 'number' && typeof oldVal === 'number') { + return Math.abs(newVal - oldVal) > 0.00001; // Allow for tiny floating point differences + } + // Special handling for receiving_history - parse and compare + if (col === 'receiving_history') { + const newHistory = JSON.parse(newVal || '{}'); + const oldHistory = JSON.parse(oldVal || '{}'); + return JSON.stringify(newHistory) !== JSON.stringify(oldHistory); + } + return newVal !== oldVal; + }); + + if (hasChanges) { + insertsAndUpdates.updates.push({ + po_id: po.po_id, + pid: product.pid, + values: rowValues + }); + } + } else { + insertsAndUpdates.inserts.push({ + po_id: po.po_id, + pid: product.pid, + values: rowValues + }); + } batchProcessed++; } } - if (values.length > 0) { - const placeholders = values.map(() => - `(${Array(columnNames.length).fill("?").join(",")})` - ).join(","); + // Handle inserts + if (insertsAndUpdates.inserts.length > 0) { + const insertPlaceholders = insertsAndUpdates.inserts + .map(() => `(${Array(columnNames.length).fill("?").join(",")})`) + .join(","); - const query = ` + const insertResult = await localConnection.query(` INSERT INTO purchase_orders (${columnNames.join(",")}) - VALUES ${placeholders} + VALUES ${insertPlaceholders} + `, insertsAndUpdates.inserts.map(i => i.values).flat()); + + recordsAdded += insertResult[0].affectedRows; + } + + // Handle updates - now we know these actually have changes + if (insertsAndUpdates.updates.length > 0) { + const updatePlaceholders = insertsAndUpdates.updates + .map(() => `(${Array(columnNames.length).fill("?").join(",")})`) + .join(","); + + const updateResult = await localConnection.query(` + INSERT INTO purchase_orders (${columnNames.join(",")}) + VALUES ${updatePlaceholders} ON DUPLICATE KEY UPDATE ${columnNames .filter((col) => col !== "po_id" && col !== "pid") .map((col) => `${col} = VALUES(${col})`) .join(",")}; - `; + `, insertsAndUpdates.updates.map(u => u.values).flat()); - const result = await localConnection.query(query, values.flat()); - recordsAdded += result.affectedRows - (2 * result.changedRows); - recordsUpdated += result.changedRows; + recordsUpdated += updateResult[0].affectedRows / 2; // Each update counts as 2 in affectedRows } processed += batchProcessed; diff --git a/inventory-server/scripts/import/purchase_orders.js b/inventory-server/scripts/import/purchase_orders.js new file mode 100644 index 0000000..c127c87 --- /dev/null +++ b/inventory-server/scripts/import/purchase_orders.js @@ -0,0 +1,82 @@ +// Split into inserts and updates +const insertsAndUpdates = batch.reduce((acc, po) => { + const key = `${po.po_id}-${po.pid}`; + if (existingPOMap.has(key)) { + const existing = existingPOMap.get(key); + // Check if any values are different + const hasChanges = columnNames.some(col => { + const newVal = po[col] ?? null; + const oldVal = existing[col] ?? null; + // Special handling for numbers to avoid type coercion issues + if (typeof newVal === 'number' && typeof oldVal === 'number') { + return Math.abs(newVal - oldVal) > 0.00001; // Allow for tiny floating point differences + } + // Special handling for receiving_history JSON + if (col === 'receiving_history') { + return JSON.stringify(newVal) !== JSON.stringify(oldVal); + } + return newVal !== oldVal; + }); + + if (hasChanges) { + console.log(`PO line changed: ${key}`, { + po_id: po.po_id, + pid: po.pid, + changes: columnNames.filter(col => { + const newVal = po[col] ?? null; + const oldVal = existing[col] ?? null; + if (typeof newVal === 'number' && typeof oldVal === 'number') { + return Math.abs(newVal - oldVal) > 0.00001; + } + if (col === 'receiving_history') { + return JSON.stringify(newVal) !== JSON.stringify(oldVal); + } + return newVal !== oldVal; + }) + }); + acc.updates.push({ + po_id: po.po_id, + pid: po.pid, + values: columnNames.map(col => po[col] ?? null) + }); + } + } else { + console.log(`New PO line: ${key}`); + acc.inserts.push({ + po_id: po.po_id, + pid: po.pid, + values: columnNames.map(col => po[col] ?? null) + }); + } + return acc; +}, { inserts: [], updates: [] }); + +// Handle inserts +if (insertsAndUpdates.inserts.length > 0) { + const insertPlaceholders = Array(insertsAndUpdates.inserts.length).fill(placeholderGroup).join(","); + + const insertResult = await localConnection.query(` + INSERT INTO purchase_orders (${columnNames.join(",")}) + VALUES ${insertPlaceholders} + `, insertsAndUpdates.inserts.map(i => i.values).flat()); + + recordsAdded += insertResult[0].affectedRows; +} + +// Handle updates +if (insertsAndUpdates.updates.length > 0) { + const updatePlaceholders = Array(insertsAndUpdates.updates.length).fill(placeholderGroup).join(","); + + const updateResult = await localConnection.query(` + INSERT INTO purchase_orders (${columnNames.join(",")}) + VALUES ${updatePlaceholders} + ON DUPLICATE KEY UPDATE + ${columnNames + .filter(col => col !== "po_id" && col !== "pid") + .map(col => `${col} = VALUES(${col})`) + .join(",")}; + `, insertsAndUpdates.updates.map(u => u.values).flat()); + + // Each update affects 2 rows in affectedRows, so we divide by 2 to get actual count + recordsUpdated += insertsAndUpdates.updates.length; +} \ No newline at end of file