From a8d3fd8033c35c4561f21293603b086ba711c8fb Mon Sep 17 00:00:00 2001 From: Matt Date: Mon, 17 Feb 2025 00:53:07 -0500 Subject: [PATCH] Update import scripts through orders --- inventory-server/scripts/import-from-prod.js | 22 +- inventory-server/scripts/import/orders.js | 545 ++++++++++-------- inventory-server/scripts/import/products.js | 2 +- .../scripts/import/purchase-orders.js | 4 +- 4 files changed, 307 insertions(+), 266 deletions(-) diff --git a/inventory-server/scripts/import-from-prod.js b/inventory-server/scripts/import-from-prod.js index 1f08c05..8ecfb0d 100644 --- a/inventory-server/scripts/import-from-prod.js +++ b/inventory-server/scripts/import-from-prod.js @@ -10,10 +10,10 @@ const importPurchaseOrders = require('./import/purchase-orders'); dotenv.config({ path: path.join(__dirname, "../.env") }); // Constants to control which imports run -const IMPORT_CATEGORIES = false; -const IMPORT_PRODUCTS = false; +const IMPORT_CATEGORIES = true; +const IMPORT_PRODUCTS = true; const IMPORT_ORDERS = true; -const IMPORT_PURCHASE_ORDERS = false; +const IMPORT_PURCHASE_ORDERS = true; // Add flag for incremental updates const INCREMENTAL_UPDATE = process.env.INCREMENTAL_UPDATE !== 'false'; // Default to true unless explicitly set to false @@ -158,8 +158,8 @@ async function main() { if (isImportCancelled) throw new Error("Import cancelled"); completedSteps++; console.log('Categories import result:', results.categories); - totalRecordsAdded += results.categories?.recordsAdded || 0; - totalRecordsUpdated += results.categories?.recordsUpdated || 0; + totalRecordsAdded += parseInt(results.categories?.recordsAdded || 0); + totalRecordsUpdated += parseInt(results.categories?.recordsUpdated || 0); } if (IMPORT_PRODUCTS) { @@ -167,8 +167,8 @@ async function main() { if (isImportCancelled) throw new Error("Import cancelled"); completedSteps++; console.log('Products import result:', results.products); - totalRecordsAdded += results.products?.recordsAdded || 0; - totalRecordsUpdated += results.products?.recordsUpdated || 0; + totalRecordsAdded += parseInt(results.products?.recordsAdded || 0); + totalRecordsUpdated += parseInt(results.products?.recordsUpdated || 0); } if (IMPORT_ORDERS) { @@ -176,8 +176,8 @@ async function main() { if (isImportCancelled) throw new Error("Import cancelled"); completedSteps++; console.log('Orders import result:', results.orders); - totalRecordsAdded += results.orders?.recordsAdded || 0; - totalRecordsUpdated += results.orders?.recordsUpdated || 0; + totalRecordsAdded += parseInt(results.orders?.recordsAdded || 0); + totalRecordsUpdated += parseInt(results.orders?.recordsUpdated || 0); } if (IMPORT_PURCHASE_ORDERS) { @@ -185,8 +185,8 @@ async function main() { if (isImportCancelled) throw new Error("Import cancelled"); completedSteps++; console.log('Purchase orders import result:', results.purchaseOrders); - totalRecordsAdded += results.purchaseOrders?.recordsAdded || 0; - totalRecordsUpdated += results.purchaseOrders?.recordsUpdated || 0; + totalRecordsAdded += parseInt(results.purchaseOrders?.recordsAdded || 0); + totalRecordsUpdated += parseInt(results.purchaseOrders?.recordsUpdated || 0); } const endTime = Date.now(); diff --git a/inventory-server/scripts/import/orders.js b/inventory-server/scripts/import/orders.js index 2d069e5..40c34a9 100644 --- a/inventory-server/scripts/import/orders.js +++ b/inventory-server/scripts/import/orders.js @@ -67,11 +67,12 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = console.log('Orders: Found changes:', totalOrderItems); // Get order items - Keep MySQL compatible for production + console.log('Orders: Starting MySQL query...'); const [orderItems] = await prodConnection.query(` SELECT oi.order_id, - oi.prod_pid as pid, - oi.prod_itemnumber as SKU, + oi.prod_pid, + COALESCE(NULLIF(TRIM(oi.prod_itemnumber), ''), 'NO-SKU') as SKU, oi.prod_price as price, oi.qty_ordered as quantity, COALESCE(oi.prod_price_reg - oi.prod_price, 0) as base_discount, @@ -102,17 +103,17 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = ` : ''} `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []); - console.log('Orders: Processing', orderItems.length, 'order items'); + console.log('Orders: Found', orderItems.length, 'order items to process'); - // Create temporary tables in PostgreSQL + // Create tables in PostgreSQL for debugging await localConnection.query(` - DROP TABLE IF EXISTS temp_order_items; - DROP TABLE IF EXISTS temp_order_meta; - DROP TABLE IF EXISTS temp_order_discounts; - DROP TABLE IF EXISTS temp_order_taxes; - DROP TABLE IF EXISTS temp_order_costs; + DROP TABLE IF EXISTS debug_order_items; + DROP TABLE IF EXISTS debug_order_meta; + DROP TABLE IF EXISTS debug_order_discounts; + DROP TABLE IF EXISTS debug_order_taxes; + DROP TABLE IF EXISTS debug_order_costs; - CREATE TEMP TABLE temp_order_items ( + CREATE TABLE debug_order_items ( order_id INTEGER NOT NULL, pid INTEGER NOT NULL, SKU VARCHAR(50) NOT NULL, @@ -122,7 +123,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = PRIMARY KEY (order_id, pid) ); - CREATE TEMP TABLE temp_order_meta ( + CREATE TABLE debug_order_meta ( order_id INTEGER NOT NULL, date DATE NOT NULL, customer VARCHAR(100) NOT NULL, @@ -134,21 +135,21 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = PRIMARY KEY (order_id) ); - CREATE TEMP TABLE temp_order_discounts ( + CREATE TABLE debug_order_discounts ( order_id INTEGER NOT NULL, pid INTEGER NOT NULL, discount DECIMAL(10,2) NOT NULL, PRIMARY KEY (order_id, pid) ); - CREATE TEMP TABLE temp_order_taxes ( + CREATE TABLE debug_order_taxes ( order_id INTEGER NOT NULL, pid INTEGER NOT NULL, tax DECIMAL(10,2) NOT NULL, PRIMARY KEY (order_id, pid) ); - CREATE TEMP TABLE temp_order_costs ( + CREATE TABLE debug_order_costs ( order_id INTEGER NOT NULL, pid INTEGER NOT NULL, costeach DECIMAL(10,3) DEFAULT 0.000, @@ -163,11 +164,11 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = `($${idx * 6 + 1}, $${idx * 6 + 2}, $${idx * 6 + 3}, $${idx * 6 + 4}, $${idx * 6 + 5}, $${idx * 6 + 6})` ).join(","); const values = batch.flatMap(item => [ - item.order_id, item.pid, item.SKU, item.price, item.quantity, item.base_discount + item.order_id, item.prod_pid, item.SKU, item.price, item.quantity, item.base_discount ]); await localConnection.query(` - INSERT INTO temp_order_items (order_id, pid, SKU, price, quantity, base_discount) + INSERT INTO debug_order_items (order_id, pid, SKU, price, quantity, base_discount) VALUES ${placeholders} ON CONFLICT (order_id, pid) DO UPDATE SET SKU = EXCLUDED.SKU, @@ -182,23 +183,26 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = operation: "Orders import", message: `Loading order items: ${processedCount} of ${totalOrderItems}`, current: processedCount, - total: totalOrderItems + total: totalOrderItems, + elapsed: formatElapsedTime((Date.now() - startTime) / 1000), + remaining: estimateRemaining(startTime, processedCount, totalOrderItems), + rate: calculateRate(startTime, processedCount) }); } // Get unique order IDs const orderIds = [...new Set(orderItems.map(item => item.order_id))]; totalUniqueOrders = orderIds.length; - console.log('Total unique order IDs:', totalUniqueOrders); + console.log('Orders: Processing', totalUniqueOrders, 'unique orders'); // Reset processed count for order processing phase processedCount = 0; - // Get order metadata in batches - Keep MySQL compatible for production - for (let i = 0; i < orderIds.length; i += 5000) { - const batchIds = orderIds.slice(i, i + 5000); - console.log(`Processing batch ${i/5000 + 1}, size: ${batchIds.length}`); - + // Process metadata, discounts, taxes, and costs in parallel + const METADATA_BATCH_SIZE = 2000; + const PG_BATCH_SIZE = 200; + + const processMetadataBatch = async (batchIds) => { const [orders] = await prodConnection.query(` SELECT o.order_id, @@ -214,24 +218,28 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = WHERE o.order_id IN (?) `, [batchIds]); - // Insert into PostgreSQL temp table - if (orders.length > 0) { - const placeholders = orders.map((_, idx) => + // Process in sub-batches for PostgreSQL + for (let j = 0; j < orders.length; j += PG_BATCH_SIZE) { + const subBatch = orders.slice(j, j + PG_BATCH_SIZE); + if (subBatch.length === 0) continue; + + const placeholders = subBatch.map((_, idx) => `($${idx * 8 + 1}, $${idx * 8 + 2}, $${idx * 8 + 3}, $${idx * 8 + 4}, $${idx * 8 + 5}, $${idx * 8 + 6}, $${idx * 8 + 7}, $${idx * 8 + 8})` ).join(","); - const values = orders.flatMap(order => [ + + const values = subBatch.flatMap(order => [ order.order_id, order.date, order.customer, - order.customer_name, + order.customer_name || '', order.status, order.canceled, - order.summary_discount, - order.summary_subtotal + order.summary_discount || 0, + order.summary_subtotal || 0 ]); await localConnection.query(` - INSERT INTO temp_order_meta ( + INSERT INTO debug_order_meta ( order_id, date, customer, customer_name, status, canceled, summary_discount, summary_subtotal ) @@ -246,20 +254,9 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = summary_subtotal = EXCLUDED.summary_subtotal `, values); } + }; - processedCount = i + orders.length; - outputProgress({ - status: "running", - operation: "Orders import", - message: `Loading order metadata: ${processedCount} of ${totalUniqueOrders}`, - current: processedCount, - total: totalUniqueOrders - }); - } - - // Process promotional discounts in batches - Keep MySQL compatible for production - for (let i = 0; i < orderIds.length; i += 5000) { - const batchIds = orderIds.slice(i, i + 5000); + const processDiscountsBatch = async (batchIds) => { const [discounts] = await prodConnection.query(` SELECT order_id, pid, SUM(amount) as discount FROM order_discount_items @@ -267,246 +264,290 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = GROUP BY order_id, pid `, [batchIds]); - if (discounts.length > 0) { - const uniqueDiscounts = new Map(); - discounts.forEach(d => { - const key = `${d.order_id}-${d.pid}`; - uniqueDiscounts.set(key, d); - }); + if (discounts.length === 0) return; - const values = Array.from(uniqueDiscounts.values()).flatMap(d => [d.order_id, d.pid, d.discount || 0]); - if (values.length > 0) { - const placeholders = Array.from({length: uniqueDiscounts.size}, (_, idx) => { - const base = idx * 3; - return `($${base + 1}, $${base + 2}, $${base + 3})`; - }).join(","); - await localConnection.query(` - INSERT INTO temp_order_discounts (order_id, pid, discount) - VALUES ${placeholders} - ON CONFLICT (order_id, pid) DO UPDATE SET - discount = EXCLUDED.discount - `, values); - } + for (let j = 0; j < discounts.length; j += PG_BATCH_SIZE) { + const subBatch = discounts.slice(j, j + PG_BATCH_SIZE); + if (subBatch.length === 0) continue; + + const placeholders = subBatch.map((_, idx) => + `($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})` + ).join(","); + + const values = subBatch.flatMap(d => [ + d.order_id, + d.pid, + d.discount || 0 + ]); + + await localConnection.query(` + INSERT INTO debug_order_discounts (order_id, pid, discount) + VALUES ${placeholders} + ON CONFLICT (order_id, pid) DO UPDATE SET + discount = EXCLUDED.discount + `, values); } - } + }; - // Get tax information in batches - Keep MySQL compatible for production - for (let i = 0; i < orderIds.length; i += 5000) { - const batchIds = orderIds.slice(i, i + 5000); + const processTaxesBatch = async (batchIds) => { + // Optimized tax query to avoid subquery const [taxes] = await prodConnection.query(` - SELECT DISTINCT - oti.order_id, - otip.pid, - otip.item_taxes_to_collect as tax - FROM order_tax_info oti - JOIN ( - SELECT order_id, MAX(stamp) as max_stamp + SELECT oti.order_id, otip.pid, otip.item_taxes_to_collect as tax + FROM ( + SELECT order_id, MAX(taxinfo_id) as latest_taxinfo_id FROM order_tax_info WHERE order_id IN (?) GROUP BY order_id - ) latest ON oti.order_id = latest.order_id AND oti.stamp = latest.max_stamp + ) latest_info + JOIN order_tax_info oti ON oti.order_id = latest_info.order_id + AND oti.taxinfo_id = latest_info.latest_taxinfo_id JOIN order_tax_info_products otip ON oti.taxinfo_id = otip.taxinfo_id `, [batchIds]); - if (taxes.length > 0) { - const uniqueTaxes = new Map(); - taxes.forEach(t => { - const key = `${t.order_id}-${t.pid}`; - uniqueTaxes.set(key, t); - }); + if (taxes.length === 0) return; - const values = Array.from(uniqueTaxes.values()).flatMap(t => [t.order_id, t.pid, t.tax]); - if (values.length > 0) { - const placeholders = Array.from({length: uniqueTaxes.size}, (_, idx) => { - const base = idx * 3; - return `($${base + 1}, $${base + 2}, $${base + 3})`; - }).join(","); - await localConnection.query(` - INSERT INTO temp_order_taxes (order_id, pid, tax) - VALUES ${placeholders} - ON CONFLICT (order_id, pid) DO UPDATE SET - tax = EXCLUDED.tax - `, values); - } - } - } + for (let j = 0; j < taxes.length; j += PG_BATCH_SIZE) { + const subBatch = taxes.slice(j, j + PG_BATCH_SIZE); + if (subBatch.length === 0) continue; - // Get costeach values in batches - Keep MySQL compatible for production - for (let i = 0; i < orderIds.length; i += 5000) { - const batchIds = orderIds.slice(i, i + 5000); - const [costs] = await prodConnection.query(` - SELECT - oc.orderid as order_id, - oc.pid, - COALESCE( - oc.costeach, - (SELECT pi.costeach - FROM product_inventory pi - WHERE pi.pid = oc.pid - AND pi.daterec <= o.date_placed - ORDER BY pi.daterec DESC LIMIT 1) - ) as costeach - FROM order_costs oc - JOIN _order o ON oc.orderid = o.order_id - WHERE oc.orderid IN (?) - `, [batchIds]); - - if (costs.length > 0) { - const placeholders = costs.map((_, idx) => + const placeholders = subBatch.map((_, idx) => `($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})` ).join(","); - const values = costs.flatMap(c => [c.order_id, c.pid, c.costeach || 0]); + + const values = subBatch.flatMap(t => [ + t.order_id, + t.pid, + t.tax || 0 + ]); await localConnection.query(` - INSERT INTO temp_order_costs (order_id, pid, costeach) + INSERT INTO debug_order_taxes (order_id, pid, tax) + VALUES ${placeholders} + ON CONFLICT (order_id, pid) DO UPDATE SET + tax = EXCLUDED.tax + `, values); + } + }; + + const processCostsBatch = async (batchIds) => { + const [costs] = await prodConnection.query(` + SELECT + oi.order_id, + oi.prod_pid as pid, + oi.prod_price as costeach + FROM order_items oi + WHERE oi.order_id IN (?) + `, [batchIds]); + + if (costs.length === 0) return; + + for (let j = 0; j < costs.length; j += PG_BATCH_SIZE) { + const subBatch = costs.slice(j, j + PG_BATCH_SIZE); + if (subBatch.length === 0) continue; + + const placeholders = subBatch.map((_, idx) => + `($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})` + ).join(","); + + const values = subBatch.flatMap(c => [ + c.order_id, + c.pid, + c.costeach || 0 + ]); + + await localConnection.query(` + INSERT INTO debug_order_costs (order_id, pid, costeach) VALUES ${placeholders} ON CONFLICT (order_id, pid) DO UPDATE SET costeach = EXCLUDED.costeach `, values); } - } + }; - // Pre-check all products at once - const allOrderPids = [...new Set(orderItems.map(item => item.pid))]; - const [existingProducts] = allOrderPids.length > 0 ? await localConnection.query( - "SELECT pid FROM products WHERE pid = ANY($1)", - [allOrderPids] - ) : [[]]; - const existingPids = new Set(existingProducts.rows.map(p => p.pid)); - - // Process in larger batches - for (let i = 0; i < orderIds.length; i += 5000) { - const batchIds = orderIds.slice(i, i + 5000); - - // Get combined data for this batch - const [orders] = await localConnection.query(` - SELECT - oi.order_id as order_number, - oi.pid, - oi.SKU, - om.date, - oi.price, - oi.quantity, - oi.base_discount + COALESCE(od.discount, 0) + - CASE - WHEN om.summary_discount > 0 THEN - ROUND((om.summary_discount * (oi.price * oi.quantity)) / - NULLIF(om.summary_subtotal, 0), 2) - ELSE 0 - END as discount, - COALESCE(ot.tax, 0) as tax, - false as tax_included, - 0 as shipping, - om.customer, - om.customer_name, - om.status, - om.canceled, - COALESCE(tc.costeach, 0) as costeach - FROM temp_order_items oi - JOIN temp_order_meta om ON oi.order_id = om.order_id - LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid - LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid - LEFT JOIN temp_order_costs tc ON oi.order_id = tc.order_id AND oi.pid = tc.pid - WHERE oi.order_id = ANY($1) - `, [batchIds]); - - // Filter orders and track missing products - const validOrders = []; - const processedOrderItems = new Set(); - const processedOrders = new Set(); + // Process all data types in parallel for each batch + for (let i = 0; i < orderIds.length; i += METADATA_BATCH_SIZE) { + const batchIds = orderIds.slice(i, i + METADATA_BATCH_SIZE); - for (const order of orders.rows) { - if (!existingPids.has(order.pid)) { - missingProducts.add(order.pid); - skippedOrders.add(order.order_number); - continue; - } - validOrders.push(order); - processedOrderItems.add(`${order.order_number}-${order.pid}`); - processedOrders.add(order.order_number); - } + await Promise.all([ + processMetadataBatch(batchIds), + processDiscountsBatch(batchIds), + processTaxesBatch(batchIds), + processCostsBatch(batchIds) + ]); - if (validOrders.length > 0) { - const placeholders = validOrders.map((_, idx) => { - const base = idx * 15; - return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`; - }).join(','); - - const batchValues = validOrders.flatMap(o => [ - o.order_number, - o.pid, - o.SKU, - o.date, - o.price, - o.quantity, - o.discount, - o.tax, - o.tax_included, - o.shipping, - o.customer, - o.customer_name, - o.status, - o.canceled, - o.costeach - ]); - - const [result] = await localConnection.query(` - WITH inserted_orders AS ( - INSERT INTO orders ( - order_number, pid, SKU, date, price, quantity, discount, - tax, tax_included, shipping, customer, customer_name, - status, canceled, costeach - ) - VALUES ${placeholders} - ON CONFLICT (order_number, pid) DO UPDATE SET - SKU = EXCLUDED.SKU, - date = EXCLUDED.date, - price = EXCLUDED.price, - quantity = EXCLUDED.quantity, - discount = EXCLUDED.discount, - tax = EXCLUDED.tax, - tax_included = EXCLUDED.tax_included, - shipping = EXCLUDED.shipping, - customer = EXCLUDED.customer, - customer_name = EXCLUDED.customer_name, - status = EXCLUDED.status, - canceled = EXCLUDED.canceled, - costeach = EXCLUDED.costeach - RETURNING xmax, xmin - ) - SELECT - COUNT(*) FILTER (WHERE xmax = 0) as inserted, - COUNT(*) FILTER (WHERE xmax <> 0) as updated - FROM inserted_orders - `, batchValues); - - const { inserted, updated } = result.rows[0]; - recordsAdded += inserted; - recordsUpdated += updated; - importedCount += processedOrderItems.size; - } - - cumulativeProcessedOrders += processedOrders.size; + processedCount = i + batchIds.length; outputProgress({ status: "running", operation: "Orders import", - message: `Imported ${importedCount} order items (${cumulativeProcessedOrders} of ${totalUniqueOrders} orders processed)`, - current: cumulativeProcessedOrders, + message: `Loading order data: ${processedCount} of ${totalUniqueOrders}`, + current: processedCount, total: totalUniqueOrders, elapsed: formatElapsedTime((Date.now() - startTime) / 1000), - remaining: estimateRemaining(startTime, cumulativeProcessedOrders, totalUniqueOrders), - rate: calculateRate(startTime, cumulativeProcessedOrders) + remaining: estimateRemaining(startTime, processedCount, totalUniqueOrders), + rate: calculateRate(startTime, processedCount) }); } - // Clean up temporary tables - await localConnection.query(` - DROP TABLE IF EXISTS temp_order_items; - DROP TABLE IF EXISTS temp_order_meta; - DROP TABLE IF EXISTS temp_order_discounts; - DROP TABLE IF EXISTS temp_order_taxes; - DROP TABLE IF EXISTS temp_order_costs; - `); + // Pre-check all products at once + const allOrderPids = [...new Set(orderItems.map(item => item.prod_pid))]; + console.log('Orders: Checking', allOrderPids.length, 'unique products'); + + const [existingProducts] = allOrderPids.length > 0 ? await localConnection.query( + "SELECT pid FROM products WHERE pid = ANY($1::bigint[])", + [allOrderPids] + ) : [[]]; + + const existingPids = new Set(existingProducts.rows.map(p => p.pid)); + + // Process in smaller batches + for (let i = 0; i < orderIds.length; i += 1000) { + const batchIds = orderIds.slice(i, i + 1000); + + // Get combined data for this batch in sub-batches + const PG_BATCH_SIZE = 100; // Process 100 records at a time + for (let j = 0; j < batchIds.length; j += PG_BATCH_SIZE) { + const subBatchIds = batchIds.slice(j, j + PG_BATCH_SIZE); + + const [orders] = await localConnection.query(` + WITH order_totals AS ( + SELECT + oi.order_id, + oi.pid, + SUM(COALESCE(od.discount, 0)) as promo_discount, + COALESCE(ot.tax, 0) as total_tax + FROM debug_order_items oi + LEFT JOIN debug_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid + LEFT JOIN debug_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid + GROUP BY oi.order_id, oi.pid, ot.tax + ) + SELECT + oi.order_id as order_number, + oi.pid::bigint as pid, + oi.SKU as sku, + om.date, + oi.price, + oi.quantity, + (oi.base_discount + + COALESCE(ot.promo_discount, 0) + + CASE + WHEN om.summary_discount > 0 AND om.summary_subtotal > 0 THEN + ROUND((om.summary_discount * (oi.price * oi.quantity)) / NULLIF(om.summary_subtotal, 0), 2) + ELSE 0 + END)::DECIMAL(10,2) as discount, + COALESCE(ot.total_tax, 0)::DECIMAL(10,2) as tax, + false as tax_included, + 0 as shipping, + om.customer, + om.customer_name, + om.status, + om.canceled, + COALESCE(oc.costeach, oi.price)::DECIMAL(10,3) as costeach + FROM ( + SELECT DISTINCT ON (order_id, pid) + order_id, pid, SKU, price, quantity, base_discount + FROM debug_order_items + WHERE order_id = ANY($1) + ORDER BY order_id, pid + ) oi + JOIN debug_order_meta om ON oi.order_id = om.order_id + LEFT JOIN order_totals ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid + LEFT JOIN debug_order_costs oc ON oi.order_id = oc.order_id AND oi.pid = oc.pid + ORDER BY oi.order_id, oi.pid + `, [subBatchIds]); + + // Filter orders and track missing products + const validOrders = []; + const processedOrderItems = new Set(); + const processedOrders = new Set(); + + for (const order of orders.rows) { + if (!existingPids.has(order.pid)) { + missingProducts.add(order.pid); + skippedOrders.add(order.order_number); + continue; + } + validOrders.push(order); + processedOrderItems.add(`${order.order_number}-${order.pid}`); + processedOrders.add(order.order_number); + } + + // Process valid orders in smaller sub-batches + const FINAL_BATCH_SIZE = 50; + for (let k = 0; k < validOrders.length; k += FINAL_BATCH_SIZE) { + const subBatch = validOrders.slice(k, k + FINAL_BATCH_SIZE); + + const placeholders = subBatch.map((_, idx) => { + const base = idx * 15; + return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`; + }).join(','); + + const batchValues = subBatch.flatMap(o => [ + o.order_number, + o.pid, + o.sku || 'NO-SKU', + o.date, + o.price, + o.quantity, + o.discount, + o.tax, + o.tax_included, + o.shipping, + o.customer, + o.customer_name, + o.status, + o.canceled, + o.costeach + ]); + + const [result] = await localConnection.query(` + WITH inserted_orders AS ( + INSERT INTO orders ( + order_number, pid, SKU, date, price, quantity, discount, + tax, tax_included, shipping, customer, customer_name, + status, canceled, costeach + ) + VALUES ${placeholders} + ON CONFLICT (order_number, pid) DO UPDATE SET + SKU = EXCLUDED.SKU, + date = EXCLUDED.date, + price = EXCLUDED.price, + quantity = EXCLUDED.quantity, + discount = EXCLUDED.discount, + tax = EXCLUDED.tax, + tax_included = EXCLUDED.tax_included, + shipping = EXCLUDED.shipping, + customer = EXCLUDED.customer, + customer_name = EXCLUDED.customer_name, + status = EXCLUDED.status, + canceled = EXCLUDED.canceled, + costeach = EXCLUDED.costeach + RETURNING xmax, xmin + ) + SELECT + COUNT(*) FILTER (WHERE xmax = 0) as inserted, + COUNT(*) FILTER (WHERE xmax <> 0) as updated + FROM inserted_orders + `, batchValues); + + const { inserted, updated } = result.rows[0]; + recordsAdded += inserted; + recordsUpdated += updated; + importedCount += subBatch.length; + } + + cumulativeProcessedOrders += processedOrders.size; + outputProgress({ + status: "running", + operation: "Orders import", + message: `Importing orders: ${cumulativeProcessedOrders} of ${totalUniqueOrders}`, + current: cumulativeProcessedOrders, + total: totalUniqueOrders, + elapsed: formatElapsedTime((Date.now() - startTime) / 1000), + remaining: estimateRemaining(startTime, cumulativeProcessedOrders, totalUniqueOrders), + rate: calculateRate(startTime, cumulativeProcessedOrders) + }); + } + } // Update sync status await localConnection.query(` diff --git a/inventory-server/scripts/import/products.js b/inventory-server/scripts/import/products.js index 5c475c7..c1006c2 100644 --- a/inventory-server/scripts/import/products.js +++ b/inventory-server/scripts/import/products.js @@ -298,7 +298,7 @@ async function importMissingProducts(prodConnection, localConnection, missingPid SELECT COUNT(*) as inserted FROM inserted_products `, values); - recordsAdded += result.rows[0].inserted; + recordsAdded += parseInt(result.rows[0].inserted, 10) || 0; } return { diff --git a/inventory-server/scripts/import/purchase-orders.js b/inventory-server/scripts/import/purchase-orders.js index f0dfc79..fd95c34 100644 --- a/inventory-server/scripts/import/purchase-orders.js +++ b/inventory-server/scripts/import/purchase-orders.js @@ -385,8 +385,8 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental FROM inserted_pos `); - recordsAdded = result.rows[0].inserted; - recordsUpdated = result.rows[0].updated; + recordsAdded = parseInt(result.rows[0].inserted, 10) || 0; + recordsUpdated = parseInt(result.rows[0].updated, 10) || 0; // Update sync status await localConnection.query(`