From 108181c63d8418bb8b8f365668d4c5ab2504afbb Mon Sep 17 00:00:00 2001
From: Matt
Date: Tue, 25 Mar 2025 22:23:06 -0400
Subject: [PATCH] Fix more import script bugs/missing data

---
 inventory-server/scripts/import-from-prod.js |   6 +-
 inventory-server/scripts/import/orders.js    |  27 +-
 inventory-server/scripts/import/products.js  |   2 +-
 .../scripts/import/purchase-orders.js        | 465 ++++++++++--------
 .../scripts/update-order-costs.js            | 337 +++++++++++++
 5 files changed, 622 insertions(+), 215 deletions(-)
 create mode 100644 inventory-server/scripts/update-order-costs.js

diff --git a/inventory-server/scripts/import-from-prod.js b/inventory-server/scripts/import-from-prod.js
index f3392cb..b0fdbc1 100644
--- a/inventory-server/scripts/import-from-prod.js
+++ b/inventory-server/scripts/import-from-prod.js
@@ -10,9 +10,9 @@ const importPurchaseOrders = require('./import/purchase-orders');
 dotenv.config({ path: path.join(__dirname, "../.env") });
 
 // Constants to control which imports run
-const IMPORT_CATEGORIES = true;
-const IMPORT_PRODUCTS = true;
-const IMPORT_ORDERS = true;
+const IMPORT_CATEGORIES = false;
+const IMPORT_PRODUCTS = false;
+const IMPORT_ORDERS = false;
 const IMPORT_PURCHASE_ORDERS = true;
 
 // Add flag for incremental updates
diff --git a/inventory-server/scripts/import/orders.js b/inventory-server/scripts/import/orders.js
index bbe9931..fd0cc4d 100644
--- a/inventory-server/scripts/import/orders.js
+++ b/inventory-server/scripts/import/orders.js
@@ -206,6 +206,14 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
   const METADATA_BATCH_SIZE = 2000;
   const PG_BATCH_SIZE = 200;
 
+  // Add a helper function for title case conversion
+  function toTitleCase(str) {
+    if (!str) return '';
+    return str.toLowerCase().split(' ').map(word => {
+      return word.charAt(0).toUpperCase() + word.slice(1);
+    }).join(' ');
+  }
+
   const processMetadataBatch = async (batchIds) => {
     const [orders] = await prodConnection.query(`
       SELECT
@@ -235,7 +243,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
         order.order_id,
         order.date,
         order.customer,
-        order.customer_name || '',
+        toTitleCase(order.customer_name) || '',
         order.status,
         order.canceled,
         order.summary_discount || 0,
@@ -429,11 +437,12 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
           oi.pid,
           SUM(COALESCE(od.discount, 0)) as promo_discount,
           COALESCE(ot.tax, 0) as total_tax,
-          COALESCE(oi.price * 0.5, 0) as costeach
+          COALESCE(oc.costeach, oi.price * 0.5) as costeach
         FROM temp_order_items oi
         LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
         LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
-        GROUP BY oi.order_id, oi.pid, ot.tax
+        LEFT JOIN temp_order_costs oc ON oi.order_id = oc.order_id AND oi.pid = oc.pid
+        GROUP BY oi.order_id, oi.pid, ot.tax, oc.costeach
       )
       SELECT
         oi.order_id as order_number,
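Note on the costeach change above: `COALESCE` returns its first non-NULL argument, so the new join against `temp_order_costs` supplies the real unit cost when one exists, and the old 50%-of-price estimate only fires when no cost row is found. A minimal sketch of the same fallback in plain JS, using a hypothetical `costMap` keyed by `orderId-pid` (the map and function names are illustrative, not part of this patch):

```js
// Hypothetical illustration of COALESCE(oc.costeach, oi.price * 0.5).
function resolveCostEach(costMap, orderId, pid, price) {
  const cost = costMap[`${orderId}-${pid}`];
  // Prefer the real cost when present (a 0 cost still counts); estimate otherwise.
  return cost != null ? cost : price * 0.5;
}

// resolveCostEach({ '1001-42': 3.25 }, 1001, 42, 10); // => 3.25
// resolveCostEach({}, 1001, 42, 10);                  // => 5 (estimated)
```
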
@@ -491,8 +500,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       const subBatch = validOrders.slice(k, k + FINAL_BATCH_SIZE);
 
       const placeholders = subBatch.map((_, idx) => {
-        const base = idx * 14; // 14 columns (removed updated)
-        return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14})`;
+        const base = idx * 15; // 15 columns including costeach
+        return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`;
       }).join(',');
 
       const batchValues = subBatch.flatMap(o => [
@@ -509,7 +518,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
         o.customer,
         o.customer_name,
         o.status,
-        o.canceled
+        o.canceled,
+        o.costeach
       ]);
 
       const [result] = await localConnection.query(`
@@ -517,7 +527,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
         INSERT INTO orders (
           order_number, pid, sku, date, price, quantity,
           discount, tax, tax_included, shipping, customer, customer_name,
-          status, canceled
+          status, canceled, costeach
         ) VALUES ${placeholders}
         ON CONFLICT (order_number, pid) DO UPDATE SET
@@ -532,7 +542,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
           customer = EXCLUDED.customer,
           customer_name = EXCLUDED.customer_name,
           status = EXCLUDED.status,
-          canceled = EXCLUDED.canceled
+          canceled = EXCLUDED.canceled,
+          costeach = EXCLUDED.costeach
         RETURNING xmax = 0 as inserted
       )
       SELECT
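The upsert hunk above widens each row from 14 to 15 numbered placeholders; the `base = idx * 15` arithmetic, the column list, and the flattened values array must stay in lockstep, or every row after the first shifts by one parameter. A generic sketch of that pattern, under hypothetical names (`buildBulkInsert` is not in the repo):

```js
// Build a multi-row INSERT where each row occupies a consecutive block of $n slots.
function buildBulkInsert(table, columns, rows) {
  const width = columns.length;
  const placeholders = rows
    .map((_, i) => {
      const base = i * width; // row i starts at $<base + 1>
      const slots = columns.map((_, j) => `$${base + j + 1}`);
      return `(${slots.join(', ')})`;
    })
    .join(', ');
  return {
    text: `INSERT INTO ${table} (${columns.join(', ')}) VALUES ${placeholders}`,
    values: rows.flat(), // one flat array matching $1..$N in order
  };
}

// buildBulkInsert('orders', ['order_number', 'pid', 'costeach'],
//                 [[1001, 42, 3.25], [1001, 43, 1.1]])
// => "INSERT INTO orders (order_number, pid, costeach)
//     VALUES ($1, $2, $3), ($4, $5, $6)" with values [1001, 42, 3.25, 1001, 43, 1.1]
```
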
diff --git a/inventory-server/scripts/import/products.js b/inventory-server/scripts/import/products.js
index 9267764..e4c278c 100644
--- a/inventory-server/scripts/import/products.js
+++ b/inventory-server/scripts/import/products.js
@@ -1,5 +1,5 @@
 const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');
-const BATCH_SIZE = 100; // Smaller batch size for better progress tracking
+const BATCH_SIZE = 1000; // Larger batch size for faster bulk imports
 const MAX_RETRIES = 3;
 const RETRY_DELAY = 5000; // 5 seconds
 const dotenv = require("dotenv");
diff --git a/inventory-server/scripts/import/purchase-orders.js b/inventory-server/scripts/import/purchase-orders.js
index a4943c7..6908df3 100644
--- a/inventory-server/scripts/import/purchase-orders.js
+++ b/inventory-server/scripts/import/purchase-orders.js
@@ -519,217 +519,276 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
     outputProgress({
       status: "running",
       operation: "Purchase orders import",
-      message: "Allocating receivings to purchase orders using FIFO"
+      message: "Validating product IDs before allocation"
     });
 
-    // Step 1: Handle receivings with matching PO IDs (direct allocation)
+    // Add this section to filter out invalid PIDs before allocation
+    // This will check all PIDs in our temp tables against the products table
     await localConnection.query(`
-      INSERT INTO temp_receiving_allocations (
-        po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by
-      )
-      SELECT
-        r.po_id,
-        r.pid,
-        r.receiving_id,
-        LEAST(r.qty_each, po.ordered) as allocated_qty,
-        r.cost_each,
-        COALESCE(r.received_date, NOW()) as received_date,
-        r.received_by
-      FROM temp_receivings r
-      JOIN temp_purchase_orders po ON r.po_id = po.po_id AND r.pid = po.pid
-      WHERE r.po_id IS NOT NULL
+      -- Create temp table to store invalid PIDs
+      DROP TABLE IF EXISTS temp_invalid_pids;
+      CREATE TEMP TABLE temp_invalid_pids AS (
+        -- Get all unique PIDs from our temp tables
+        WITH all_pids AS (
+          SELECT DISTINCT pid FROM temp_purchase_orders
+          UNION
+          SELECT DISTINCT pid FROM temp_receivings
+        )
+        -- Filter to only those that don't exist in products table
+        SELECT p.pid
+        FROM all_pids p
+        WHERE NOT EXISTS (
+          SELECT 1 FROM products WHERE pid = p.pid
+        )
+      );
+
+      -- Remove purchase orders with invalid PIDs
+      DELETE FROM temp_purchase_orders
+      WHERE pid IN (SELECT pid FROM temp_invalid_pids);
+
+      -- Remove receivings with invalid PIDs
+      DELETE FROM temp_receivings
+      WHERE pid IN (SELECT pid FROM temp_invalid_pids);
     `);
 
-    // Step 2: Handle receivings without a matching PO (standalone receivings)
-    // Create a PO entry for each standalone receiving
-    await localConnection.query(`
-      INSERT INTO temp_purchase_orders (
-        po_id, pid, sku, name, vendor, date, status, status_text,
-        ordered, po_cost_price, supplier_id, date_created, date_ordered
-      )
-      SELECT
-        'R' || r.receiving_id as po_id,
-        r.pid,
-        COALESCE(p.sku, 'NO-SKU') as sku,
-        COALESCE(p.name, 'Unknown Product') as name,
-        COALESCE(
-          (SELECT vendor FROM temp_purchase_orders
-           WHERE supplier_id = r.supplier_id LIMIT 1),
-          'Unknown Vendor'
-        ) as vendor,
-        COALESCE(r.received_date, r.receiving_created_date) as date,
-        NULL as status,
-        NULL as status_text,
-        NULL as ordered,
-        r.cost_each as po_cost_price,
-        r.supplier_id,
-        COALESCE(r.receiving_created_date, r.received_date) as date_created,
-        NULL as date_ordered
-      FROM temp_receivings r
-      LEFT JOIN (
-        SELECT DISTINCT pid, sku, name FROM temp_purchase_orders
-      ) p ON r.pid = p.pid
-      WHERE r.po_id IS NULL
-        OR NOT EXISTS (
-          SELECT 1 FROM temp_purchase_orders po
-          WHERE po.po_id = r.po_id AND po.pid = r.pid
-        )
-      ON CONFLICT (po_id, pid) DO NOTHING
+    // Get count of filtered items for reporting
+    const [filteredResult] = await localConnection.query(`
+      SELECT COUNT(*) as count FROM temp_invalid_pids
     `);
+    const filteredCount = filteredResult.rows[0].count;
 
-    // Now allocate these standalone receivings to their "virtual" POs
-    await localConnection.query(`
-      INSERT INTO temp_receiving_allocations (
-        po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by
-      )
-      SELECT
-        'R' || r.receiving_id as po_id,
-        r.pid,
-        r.receiving_id,
-        r.qty_each as allocated_qty,
-        r.cost_each,
-        COALESCE(r.received_date, NOW()) as received_date,
-        r.received_by
-      FROM temp_receivings r
-      WHERE r.po_id IS NULL
-        OR NOT EXISTS (
-          SELECT 1 FROM temp_purchase_orders po
-          WHERE po.po_id = r.po_id AND po.pid = r.pid
-        )
-    `);
+    if (filteredCount > 0) {
+      console.log(`Filtered out ${filteredCount} items with invalid product IDs`);
+    }
 
-    // Step 3: Handle unallocated receivings vs. 
unfulfilled orders - // This is the complex FIFO allocation logic - await localConnection.query(` - WITH - -- Calculate remaining quantities after direct allocations - remaining_po_quantities AS ( - SELECT - po.po_id, - po.pid, - po.ordered, - COALESCE(SUM(ra.allocated_qty), 0) as already_allocated, - po.ordered - COALESCE(SUM(ra.allocated_qty), 0) as remaining_qty, - po.date_ordered, - po.date_created - FROM temp_purchase_orders po - LEFT JOIN temp_receiving_allocations ra ON po.po_id = ra.po_id AND po.pid = ra.pid - WHERE po.ordered IS NOT NULL - GROUP BY po.po_id, po.pid, po.ordered, po.date_ordered, po.date_created - HAVING po.ordered > COALESCE(SUM(ra.allocated_qty), 0) - ), - remaining_receiving_quantities AS ( - SELECT - r.receiving_id, - r.pid, - r.qty_each, - COALESCE(SUM(ra.allocated_qty), 0) as already_allocated, - r.qty_each - COALESCE(SUM(ra.allocated_qty), 0) as remaining_qty, - r.received_date, - r.cost_each, - r.received_by - FROM temp_receivings r - LEFT JOIN temp_receiving_allocations ra ON r.receiving_id = ra.receiving_id AND r.pid = ra.pid - GROUP BY r.receiving_id, r.pid, r.qty_each, r.received_date, r.cost_each, r.received_by - HAVING r.qty_each > COALESCE(SUM(ra.allocated_qty), 0) - ), - -- Rank POs by age, with a cutoff for very old POs (1 year) - ranked_pos AS ( - SELECT - po.po_id, - po.pid, - po.remaining_qty, - CASE - WHEN po.date_ordered IS NULL OR po.date_ordered < NOW() - INTERVAL '1 year' THEN 2 - ELSE 1 - END as age_group, - ROW_NUMBER() OVER ( - PARTITION BY po.pid, (CASE WHEN po.date_ordered IS NULL OR po.date_ordered < NOW() - INTERVAL '1 year' THEN 2 ELSE 1 END) - ORDER BY COALESCE(po.date_ordered, po.date_created, NOW()) - ) as rank_in_group - FROM remaining_po_quantities po - ), - -- Rank receivings by date - ranked_receivings AS ( - SELECT - r.receiving_id, - r.pid, - r.remaining_qty, - r.received_date, - r.cost_each, - r.received_by, - ROW_NUMBER() OVER (PARTITION BY r.pid ORDER BY COALESCE(r.received_date, NOW())) as rank - FROM remaining_receiving_quantities r - ), - -- First allocate to recent POs - allocations_recent AS ( - SELECT - po.po_id, - po.pid, - r.receiving_id, - LEAST(po.remaining_qty, r.remaining_qty) as allocated_qty, - r.cost_each, - COALESCE(r.received_date, NOW()) as received_date, - r.received_by, - po.age_group, - po.rank_in_group, - r.rank, - 'recent' as allocation_type - FROM ranked_pos po - JOIN ranked_receivings r ON po.pid = r.pid - WHERE po.age_group = 1 - ORDER BY po.pid, po.rank_in_group, r.rank - ), - -- Then allocate to older POs - remaining_after_recent AS ( - SELECT - r.receiving_id, - r.pid, - r.remaining_qty - COALESCE(SUM(a.allocated_qty), 0) as remaining_qty, - r.received_date, - r.cost_each, - r.received_by, - r.rank - FROM ranked_receivings r - LEFT JOIN allocations_recent a ON r.receiving_id = a.receiving_id AND r.pid = a.pid - GROUP BY r.receiving_id, r.pid, r.remaining_qty, r.received_date, r.cost_each, r.received_by, r.rank - HAVING r.remaining_qty > COALESCE(SUM(a.allocated_qty), 0) - ), - allocations_old AS ( - SELECT - po.po_id, - po.pid, - r.receiving_id, - LEAST(po.remaining_qty, r.remaining_qty) as allocated_qty, - r.cost_each, - COALESCE(r.received_date, NOW()) as received_date, - r.received_by, - po.age_group, - po.rank_in_group, - r.rank, - 'old' as allocation_type - FROM ranked_pos po - JOIN remaining_after_recent r ON po.pid = r.pid - WHERE po.age_group = 2 - ORDER BY po.pid, po.rank_in_group, r.rank - ), - -- Combine allocations - combined_allocations AS ( - SELECT * FROM allocations_recent - 
UNION ALL - SELECT * FROM allocations_old - ) - -- Insert into allocations table - INSERT INTO temp_receiving_allocations ( - po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by - ) - SELECT - po_id, pid, receiving_id, allocated_qty, cost_each, - COALESCE(received_date, NOW()) as received_date, - received_by - FROM combined_allocations - WHERE allocated_qty > 0 - `); + // Break FIFO allocation into steps with progress tracking + const fifoSteps = [ + { + name: "Direct allocations", + query: ` + INSERT INTO temp_receiving_allocations ( + po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by + ) + SELECT + r.po_id, + r.pid, + r.receiving_id, + LEAST(r.qty_each, po.ordered) as allocated_qty, + r.cost_each, + COALESCE(r.received_date, NOW()) as received_date, + r.received_by + FROM temp_receivings r + JOIN temp_purchase_orders po ON r.po_id = po.po_id AND r.pid = po.pid + WHERE r.po_id IS NOT NULL + ` + }, + { + name: "Handling standalone receivings", + query: ` + INSERT INTO temp_purchase_orders ( + po_id, pid, sku, name, vendor, date, status, status_text, + ordered, po_cost_price, supplier_id, date_created, date_ordered + ) + SELECT + 'R' || r.receiving_id as po_id, + r.pid, + COALESCE(p.sku, 'NO-SKU') as sku, + COALESCE(p.name, 'Unknown Product') as name, + COALESCE( + (SELECT vendor FROM temp_purchase_orders + WHERE supplier_id = r.supplier_id LIMIT 1), + 'Unknown Vendor' + ) as vendor, + COALESCE(r.received_date, r.receiving_created_date) as date, + NULL as status, + NULL as status_text, + NULL as ordered, + r.cost_each as po_cost_price, + r.supplier_id, + COALESCE(r.receiving_created_date, r.received_date) as date_created, + NULL as date_ordered + FROM temp_receivings r + LEFT JOIN ( + SELECT DISTINCT pid, sku, name FROM temp_purchase_orders + ) p ON r.pid = p.pid + WHERE r.po_id IS NULL + OR NOT EXISTS ( + SELECT 1 FROM temp_purchase_orders po + WHERE po.po_id = r.po_id AND po.pid = r.pid + ) + ON CONFLICT (po_id, pid) DO NOTHING + ` + }, + { + name: "Allocating standalone receivings", + query: ` + INSERT INTO temp_receiving_allocations ( + po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by + ) + SELECT + 'R' || r.receiving_id as po_id, + r.pid, + r.receiving_id, + r.qty_each as allocated_qty, + r.cost_each, + COALESCE(r.received_date, NOW()) as received_date, + r.received_by + FROM temp_receivings r + WHERE r.po_id IS NULL + OR NOT EXISTS ( + SELECT 1 FROM temp_purchase_orders po + WHERE po.po_id = r.po_id AND po.pid = r.pid + ) + ` + }, + { + name: "FIFO allocation logic", + query: ` + WITH + -- Calculate remaining quantities after direct allocations + remaining_po_quantities AS ( + SELECT + po.po_id, + po.pid, + po.ordered, + COALESCE(SUM(ra.allocated_qty), 0) as already_allocated, + po.ordered - COALESCE(SUM(ra.allocated_qty), 0) as remaining_qty, + po.date_ordered, + po.date_created + FROM temp_purchase_orders po + LEFT JOIN temp_receiving_allocations ra ON po.po_id = ra.po_id AND po.pid = ra.pid + WHERE po.ordered IS NOT NULL + GROUP BY po.po_id, po.pid, po.ordered, po.date_ordered, po.date_created + HAVING po.ordered > COALESCE(SUM(ra.allocated_qty), 0) + ), + remaining_receiving_quantities AS ( + SELECT + r.receiving_id, + r.pid, + r.qty_each, + COALESCE(SUM(ra.allocated_qty), 0) as already_allocated, + r.qty_each - COALESCE(SUM(ra.allocated_qty), 0) as remaining_qty, + r.received_date, + r.cost_each, + r.received_by + FROM temp_receivings r + LEFT JOIN temp_receiving_allocations ra ON r.receiving_id 
= ra.receiving_id AND r.pid = ra.pid
+              GROUP BY r.receiving_id, r.pid, r.qty_each, r.received_date, r.cost_each, r.received_by
+              HAVING r.qty_each > COALESCE(SUM(ra.allocated_qty), 0)
+            ),
+            -- Rank POs by age, with a cutoff for very old POs (1 year)
+            ranked_pos AS (
+              SELECT
+                po.po_id,
+                po.pid,
+                po.remaining_qty,
+                CASE
+                  WHEN po.date_ordered IS NULL OR po.date_ordered < NOW() - INTERVAL '1 year' THEN 2
+                  ELSE 1
+                END as age_group,
+                ROW_NUMBER() OVER (
+                  PARTITION BY po.pid, (CASE WHEN po.date_ordered IS NULL OR po.date_ordered < NOW() - INTERVAL '1 year' THEN 2 ELSE 1 END)
+                  ORDER BY COALESCE(po.date_ordered, po.date_created, NOW())
+                ) as rank_in_group
+              FROM remaining_po_quantities po
+            ),
+            -- Rank receivings by date
+            ranked_receivings AS (
+              SELECT
+                r.receiving_id,
+                r.pid,
+                r.remaining_qty,
+                r.received_date,
+                r.cost_each,
+                r.received_by,
+                ROW_NUMBER() OVER (PARTITION BY r.pid ORDER BY COALESCE(r.received_date, NOW())) as rank
+              FROM remaining_receiving_quantities r
+            ),
+            -- First allocate to recent POs
+            allocations_recent AS (
+              SELECT
+                po.po_id,
+                po.pid,
+                r.receiving_id,
+                LEAST(po.remaining_qty, r.remaining_qty) as allocated_qty,
+                r.cost_each,
+                COALESCE(r.received_date, NOW()) as received_date,
+                r.received_by,
+                po.age_group,
+                po.rank_in_group,
+                r.rank,
+                'recent' as allocation_type
+              FROM ranked_pos po
+              JOIN ranked_receivings r ON po.pid = r.pid
+              WHERE po.age_group = 1
+              ORDER BY po.pid, po.rank_in_group, r.rank
+            ),
+            -- Then allocate to older POs
+            remaining_after_recent AS (
+              SELECT
+                r.receiving_id,
+                r.pid,
+                r.remaining_qty - COALESCE(SUM(a.allocated_qty), 0) as remaining_qty,
+                r.received_date,
+                r.cost_each,
+                r.received_by,
+                r.rank
+              FROM ranked_receivings r
+              LEFT JOIN allocations_recent a ON r.receiving_id = a.receiving_id AND r.pid = a.pid
+              GROUP BY r.receiving_id, r.pid, r.remaining_qty, r.received_date, r.cost_each, r.received_by, r.rank
+              HAVING r.remaining_qty > COALESCE(SUM(a.allocated_qty), 0)
+            ),
+            allocations_old AS (
+              SELECT
+                po.po_id,
+                po.pid,
+                r.receiving_id,
+                LEAST(po.remaining_qty, r.remaining_qty) as allocated_qty,
+                r.cost_each,
+                COALESCE(r.received_date, NOW()) as received_date,
+                r.received_by,
+                po.age_group,
+                po.rank_in_group,
+                r.rank,
+                'old' as allocation_type
+              FROM ranked_pos po
+              JOIN remaining_after_recent r ON po.pid = r.pid
+              WHERE po.age_group = 2
+              ORDER BY po.pid, po.rank_in_group, r.rank
+            ),
+            -- Combine allocations
+            combined_allocations AS (
+              SELECT * FROM allocations_recent
+              UNION ALL
+              SELECT * FROM allocations_old
+            )
+            -- Insert into allocations table
+            INSERT INTO temp_receiving_allocations (
+              po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by
+            )
+            SELECT
+              po_id, pid, receiving_id, allocated_qty, cost_each,
+              COALESCE(received_date, NOW()) as received_date,
+              received_by
+            FROM combined_allocations
+            WHERE allocated_qty > 0
+        `
+      }
+    ];
+
+    // Execute FIFO steps with progress tracking
+    for (let i = 0; i < fifoSteps.length; i++) {
+      const step = fifoSteps[i];
+      outputProgress({
+        status: "running",
+        operation: "Purchase orders import",
+        message: `FIFO allocation step ${i+1}/${fifoSteps.length}: ${step.name}`,
+        current: i,
+        total: fifoSteps.length
+      });
+
+      await localConnection.query(step.query);
+    }
 
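The "FIFO allocation logic" step above is easier to follow as a small in-memory model: for a single pid, receivings are consumed in received-date order and poured into POs oldest-first (the SQL does this in two passes, POs ordered within the last year before year-old or undated ones). A hedged sketch under hypothetical shapes; `allocateFifo` and its record fields are illustrative, not code from the patch:

```js
// Greedy FIFO matching for one pid. Both arrays are assumed pre-sorted:
// pos by date_ordered ascending, receivings by received_date ascending.
function allocateFifo(pos, receivings) {
  // pos:        [{ poId, remaining }]
  // receivings: [{ receivingId, remaining, costEach }]
  const allocations = [];
  let p = 0;
  for (const r of receivings) {
    while (r.remaining > 0 && p < pos.length) {
      const po = pos[p];
      const qty = Math.min(po.remaining, r.remaining);
      if (qty > 0) {
        allocations.push({ poId: po.poId, receivingId: r.receivingId, qty, costEach: r.costEach });
        po.remaining -= qty;
        r.remaining -= qty;
      }
      if (po.remaining === 0) p++; // this PO is filled; move to the next oldest
    }
  }
  return allocations;
}
```

Running the two SQL passes corresponds to calling this once with the recent POs and then again with the old POs and whatever receiving quantity is left over.
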
    // 4. Generate final purchase order records with receiving data
     outputProgress({
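The new update-order-costs.js script below backfills costeach for existing rows without re-running the full import. Its core pattern is staging (order_number, pid, costeach) triples in a temp table and then issuing one set-based `UPDATE ... FROM` rather than thousands of single-row UPDATEs. A minimal sketch of that pattern, assuming a node-postgres style client (function and table names here are illustrative):

```js
// Stage rows in a temp table, then apply one set-based UPDATE.
async function bulkUpdateCosts(client, entries) {
  await client.query(`
    CREATE TEMP TABLE IF NOT EXISTS tmp_costs (
      order_number VARCHAR(50) NOT NULL,
      pid BIGINT NOT NULL,
      costeach DECIMAL(10,3) NOT NULL,
      PRIMARY KEY (order_number, pid)
    )
  `);
  // Stage the new values (a real implementation would batch these inserts,
  // as the script does with its 50-row sub-batches).
  for (const e of entries) {
    await client.query(
      'INSERT INTO tmp_costs (order_number, pid, costeach) VALUES ($1, $2, $3)',
      [e.orderNumber, e.pid, e.costeach]
    );
  }
  // One UPDATE joins the staged rows against orders instead of looping.
  const res = await client.query(`
    UPDATE orders o
    SET costeach = t.costeach
    FROM tmp_costs t
    WHERE o.order_number = t.order_number AND o.pid = t.pid
  `);
  return res.rowCount; // number of order rows updated
}
```
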
diff --git a/inventory-server/scripts/update-order-costs.js b/inventory-server/scripts/update-order-costs.js
new file mode 100644
index 0000000..135af46
--- /dev/null
+++ b/inventory-server/scripts/update-order-costs.js
@@ -0,0 +1,337 @@
+/**
+ * This script updates the costeach values for existing orders from the original MySQL database
+ * without needing to run the full import process.
+ */
+const dotenv = require("dotenv");
+const path = require("path");
+const fs = require("fs");
+const { setupConnections, closeConnections } = require('./import/utils');
+const { outputProgress, formatElapsedTime } = require('./metrics/utils/progress');
+
+dotenv.config({ path: path.join(__dirname, "../.env") });
+
+// SSH configuration
+const sshConfig = {
+  ssh: {
+    host: process.env.PROD_SSH_HOST,
+    port: process.env.PROD_SSH_PORT || 22,
+    username: process.env.PROD_SSH_USER,
+    privateKey: process.env.PROD_SSH_KEY_PATH
+      ? fs.readFileSync(process.env.PROD_SSH_KEY_PATH)
+      : undefined,
+    compress: true, // Enable SSH compression
+  },
+  prodDbConfig: {
+    // MySQL config for production
+    host: process.env.PROD_DB_HOST || "localhost",
+    user: process.env.PROD_DB_USER,
+    password: process.env.PROD_DB_PASSWORD,
+    database: process.env.PROD_DB_NAME,
+    port: process.env.PROD_DB_PORT || 3306,
+    timezone: 'Z',
+  },
+  localDbConfig: {
+    // PostgreSQL config for local
+    host: process.env.DB_HOST,
+    user: process.env.DB_USER,
+    password: process.env.DB_PASSWORD,
+    database: process.env.DB_NAME,
+    port: process.env.DB_PORT || 5432,
+    ssl: process.env.DB_SSL === 'true',
+    connectionTimeoutMillis: 60000,
+    idleTimeoutMillis: 30000,
+    max: 10 // connection pool max size
+  }
+};
+
+async function updateOrderCosts() {
+  const startTime = Date.now();
+  let connections;
+  let updatedCount = 0;
+  let errorCount = 0;
+
+  try {
+    outputProgress({
+      status: "running",
+      operation: "Order costs update",
+      message: "Initializing SSH tunnel..."
+    });
+
+    connections = await setupConnections(sshConfig);
+    const { prodConnection, localConnection } = connections;
+
+    // 1. Get all orders from local database that need cost updates
+    outputProgress({
+      status: "running",
+      operation: "Order costs update",
+      message: "Getting orders from local database..."
+    });
+
+    const [orders] = await localConnection.query(`
+      SELECT DISTINCT order_number, pid
+      FROM orders
+      WHERE costeach = 0 OR costeach IS NULL
+      ORDER BY order_number
+    `);
+
+    if (!orders || !orders.rows || orders.rows.length === 0) {
+      console.log("No orders found that need cost updates");
+      return { updatedCount: 0, errorCount: 0 };
+    }
+
+    const totalOrders = orders.rows.length;
+    console.log(`Found ${totalOrders} orders that need cost updates`);
+
+    // Process in batches of 500 orders
+    const BATCH_SIZE = 500;
+    for (let i = 0; i < orders.rows.length; i += BATCH_SIZE) {
+      try {
+        // Start transaction for this batch
+        await localConnection.beginTransaction();
+
+        const batch = orders.rows.slice(i, i + BATCH_SIZE);
+
+        const orderNumbers = [...new Set(batch.map(o => o.order_number))];
+
+        // 2. Fetch costs from production database for these orders
+        outputProgress({
+          status: "running",
+          operation: "Order costs update",
+          message: `Fetching costs for orders ${i + 1} to ${Math.min(i + BATCH_SIZE, totalOrders)} of ${totalOrders}`,
+          current: i,
+          total: totalOrders,
+          elapsed: formatElapsedTime((Date.now() - startTime) / 1000)
+        });
+
+        const [costs] = await prodConnection.query(`
+          SELECT
+            oc.orderid as order_number,
+            oc.pid,
+            oc.costeach
+          FROM order_costs oc
+          INNER JOIN (
+            SELECT
+              orderid,
+              pid,
+              MAX(id) as max_id
+            FROM order_costs
+            WHERE orderid IN (?)
+              AND pending = 0
+            GROUP BY orderid, pid
+          ) latest ON oc.orderid = latest.orderid AND oc.pid = latest.pid AND oc.id = latest.max_id
+        `, [orderNumbers]);
+
+        // Create a map of costs for easy lookup
+        const costMap = {};
+        if (costs && costs.length) {
+          costs.forEach(c => {
+            costMap[`${c.order_number}-${c.pid}`] = c.costeach || 0;
+          });
+        }
+
+        // 3. Update costs in local database in batches
+        // Using a more efficient update approach with a temporary table
+
+        // Create a temporary table for each batch
+        await localConnection.query(`
+          DROP TABLE IF EXISTS temp_order_costs;
+          CREATE TEMP TABLE temp_order_costs (
+            order_number VARCHAR(50) NOT NULL,
+            pid BIGINT NOT NULL,
+            costeach DECIMAL(10,3) NOT NULL,
+            PRIMARY KEY (order_number, pid)
+          );
+        `);
+
+        // Insert cost data into the temporary table
+        const costEntries = [];
+        for (const order of batch) {
+          const key = `${order.order_number}-${order.pid}`;
+          if (key in costMap) {
+            costEntries.push({
+              order_number: order.order_number,
+              pid: order.pid,
+              costeach: costMap[key]
+            });
+          }
+        }
+
+        // Insert in sub-batches of 50
+        const DB_BATCH_SIZE = 50;
+        for (let j = 0; j < costEntries.length; j += DB_BATCH_SIZE) {
+          const subBatch = costEntries.slice(j, j + DB_BATCH_SIZE);
+          if (subBatch.length === 0) continue;
+
+          const placeholders = subBatch.map((_, idx) =>
+            `($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})`
+          ).join(',');
+
+          const values = subBatch.flatMap(item => [
+            item.order_number,
+            item.pid,
+            item.costeach
+          ]);
+
+          await localConnection.query(`
+            INSERT INTO temp_order_costs (order_number, pid, costeach)
+            VALUES ${placeholders}
+          `, values);
+        }
+
+        // Perform bulk update from the temporary table
+        const [updateResult] = await localConnection.query(`
+          UPDATE orders o
+          SET costeach = t.costeach
+          FROM temp_order_costs t
+          WHERE o.order_number = t.order_number AND o.pid = t.pid
+          RETURNING o.id
+        `);
+
+        const batchUpdated = updateResult.rowCount || 0;
+        updatedCount += batchUpdated;
+
+        // Commit transaction for this batch
+        await localConnection.commit();
+
+        outputProgress({
+          status: "running",
+          operation: "Order costs update",
+          message: `Updated ${updatedCount} orders with costs from production (batch: ${batchUpdated})`,
+          current: i + batch.length,
+          total: totalOrders,
+          elapsed: formatElapsedTime((Date.now() - startTime) / 1000)
+        });
+      } catch (error) {
+        // If a batch fails, roll back that batch's transaction and continue
+        try {
+          await localConnection.rollback();
+        } catch (rollbackError) {
+          console.error("Error during batch rollback:", rollbackError);
+        }
+
+        console.error(`Error processing batch ${i}-${i + BATCH_SIZE}:`, error);
+        errorCount++;
+      }
+    }
+
+    // 4. For orders with no matching costs, set a default based on price
+    outputProgress({
+      status: "running",
+      operation: "Order costs update",
+      message: "Setting default costs for remaining orders..."
+ }); + + // Process remaining updates in smaller batches + const DEFAULT_BATCH_SIZE = 10000; + let totalDefaultUpdated = 0; + + try { + // Start with a count query to determine how many records need the default update + const [countResult] = await localConnection.query(` + SELECT COUNT(*) as count FROM orders + WHERE (costeach = 0 OR costeach IS NULL) + `); + + const totalToUpdate = parseInt(countResult.rows[0]?.count || 0); + + if (totalToUpdate > 0) { + console.log(`Applying default cost to ${totalToUpdate} orders`); + + // Apply the default in batches with separate transactions + for (let i = 0; i < totalToUpdate; i += DEFAULT_BATCH_SIZE) { + try { + await localConnection.beginTransaction(); + + const [defaultUpdates] = await localConnection.query(` + WITH orders_to_update AS ( + SELECT id FROM orders + WHERE (costeach = 0 OR costeach IS NULL) + LIMIT ${DEFAULT_BATCH_SIZE} + ) + UPDATE orders o + SET costeach = price * 0.5 + FROM orders_to_update otu + WHERE o.id = otu.id + RETURNING o.id + `); + + const batchDefaultUpdated = defaultUpdates.rowCount || 0; + totalDefaultUpdated += batchDefaultUpdated; + + await localConnection.commit(); + + outputProgress({ + status: "running", + operation: "Order costs update", + message: `Applied default costs to ${totalDefaultUpdated} of ${totalToUpdate} orders`, + current: totalDefaultUpdated, + total: totalToUpdate, + elapsed: formatElapsedTime((Date.now() - startTime) / 1000) + }); + } catch (error) { + try { + await localConnection.rollback(); + } catch (rollbackError) { + console.error("Error during default update rollback:", rollbackError); + } + + console.error(`Error applying default costs batch ${i}-${i + DEFAULT_BATCH_SIZE}:`, error); + errorCount++; + } + } + } + } catch (error) { + console.error("Error counting or updating remaining orders:", error); + errorCount++; + } + + updatedCount += totalDefaultUpdated; + + const endTime = Date.now(); + const totalSeconds = (endTime - startTime) / 1000; + + outputProgress({ + status: "complete", + operation: "Order costs update", + message: `Updated ${updatedCount} orders (${totalDefaultUpdated} with default values) in ${formatElapsedTime(totalSeconds)}`, + elapsed: formatElapsedTime(totalSeconds) + }); + + return { + status: "complete", + updatedCount, + errorCount + }; + } catch (error) { + console.error("Error during order costs update:", error); + + return { + status: "error", + error: error.message, + updatedCount, + errorCount + }; + } finally { + if (connections) { + await closeConnections(connections).catch(err => { + console.error("Error closing connections:", err); + }); + } + } +} + +// Run the script only if this is the main module +if (require.main === module) { + updateOrderCosts().then((results) => { + console.log('Cost update completed:', results); + // Force exit after a small delay to ensure all logs are written + setTimeout(() => process.exit(0), 500); + }).catch((error) => { + console.error("Unhandled error:", error); + // Force exit with error code after a small delay + setTimeout(() => process.exit(1), 500); + }); +} + +// Export the function for use in other scripts +module.exports = updateOrderCosts; \ No newline at end of file
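
The new script guards on `require.main === module`, so it can be run standalone or composed with other import steps. A hedged usage sketch (the relative require path assumes the caller sits in the inventory-server directory; adjust to taste):

```js
// Run the cost backfill programmatically and report the outcome.
const updateOrderCosts = require('./scripts/update-order-costs');

updateOrderCosts().then(({ status, updatedCount, errorCount }) => {
  console.log(`status=${status} updated=${updatedCount} errors=${errorCount}`);
});
```

From a shell, `node scripts/update-order-costs.js` should behave the same way, exiting shortly after the summary is logged.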