diff --git a/inventory-server/scripts/import-from-prod.js b/inventory-server/scripts/import-from-prod.js index dd0f239..ff1e27c 100644 --- a/inventory-server/scripts/import-from-prod.js +++ b/inventory-server/scripts/import-from-prod.js @@ -10,9 +10,9 @@ const importPurchaseOrders = require('./import/purchase-orders'); dotenv.config({ path: path.join(__dirname, "../.env") }); // Constants to control which imports run -const IMPORT_CATEGORIES = true; -const IMPORT_PRODUCTS = true; -const IMPORT_ORDERS = true; +const IMPORT_CATEGORIES = false; +const IMPORT_PRODUCTS = false; +const IMPORT_ORDERS = false; const IMPORT_PURCHASE_ORDERS = true; // Add flag for incremental updates @@ -158,8 +158,8 @@ async function main() { if (isImportCancelled) throw new Error("Import cancelled"); completedSteps++; console.log('Categories import result:', results.categories); - totalRecordsAdded += parseInt(results.categories?.recordsAdded || 0); - totalRecordsUpdated += parseInt(results.categories?.recordsUpdated || 0); + totalRecordsAdded += parseInt(results.categories?.recordsAdded || 0) || 0; + totalRecordsUpdated += parseInt(results.categories?.recordsUpdated || 0) || 0; } if (IMPORT_PRODUCTS) { @@ -167,8 +167,8 @@ async function main() { if (isImportCancelled) throw new Error("Import cancelled"); completedSteps++; console.log('Products import result:', results.products); - totalRecordsAdded += parseInt(results.products?.recordsAdded || 0); - totalRecordsUpdated += parseInt(results.products?.recordsUpdated || 0); + totalRecordsAdded += parseInt(results.products?.recordsAdded || 0) || 0; + totalRecordsUpdated += parseInt(results.products?.recordsUpdated || 0) || 0; } if (IMPORT_ORDERS) { @@ -176,17 +176,34 @@ async function main() { if (isImportCancelled) throw new Error("Import cancelled"); completedSteps++; console.log('Orders import result:', results.orders); - totalRecordsAdded += parseInt(results.orders?.recordsAdded || 0); - totalRecordsUpdated += parseInt(results.orders?.recordsUpdated || 0); + totalRecordsAdded += parseInt(results.orders?.recordsAdded || 0) || 0; + totalRecordsUpdated += parseInt(results.orders?.recordsUpdated || 0) || 0; } if (IMPORT_PURCHASE_ORDERS) { - results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection, INCREMENTAL_UPDATE); - if (isImportCancelled) throw new Error("Import cancelled"); - completedSteps++; - console.log('Purchase orders import result:', results.purchaseOrders); - totalRecordsAdded += parseInt(results.purchaseOrders?.recordsAdded || 0); - totalRecordsUpdated += parseInt(results.purchaseOrders?.recordsUpdated || 0); + try { + results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection, INCREMENTAL_UPDATE); + if (isImportCancelled) throw new Error("Import cancelled"); + completedSteps++; + console.log('Purchase orders import result:', results.purchaseOrders); + + // Handle potential error status + if (results.purchaseOrders?.status === 'error') { + console.error('Purchase orders import had an error:', results.purchaseOrders.error); + } else { + totalRecordsAdded += parseInt(results.purchaseOrders?.recordsAdded || 0) || 0; + totalRecordsUpdated += parseInt(results.purchaseOrders?.recordsUpdated || 0) || 0; + } + } catch (error) { + console.error('Error during purchase orders import:', error); + // Continue with other imports, don't fail the whole process + results.purchaseOrders = { + status: 'error', + error: error.message, + recordsAdded: 0, + recordsUpdated: 0 + }; + } } const endTime = Date.now(); @@ -214,8 +231,8 @@ async function main() { WHERE id = $12 `, [ totalElapsedSeconds, - totalRecordsAdded, - totalRecordsUpdated, + parseInt(totalRecordsAdded) || 0, + parseInt(totalRecordsUpdated) || 0, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, diff --git a/inventory-server/scripts/import/orders.js b/inventory-server/scripts/import/orders.js index 7608bd7..c4f9cdd 100644 --- a/inventory-server/scripts/import/orders.js +++ b/inventory-server/scripts/import/orders.js @@ -330,14 +330,23 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = }; const processCostsBatch = async (batchIds) => { + // Modified query to ensure one row per order_id/pid by using a subquery const [costs] = await prodConnection.query(` SELECT oc.orderid as order_id, oc.pid, oc.costeach FROM order_costs oc - WHERE oc.orderid IN (?) - AND oc.pending = 0 + INNER JOIN ( + SELECT + orderid, + pid, + MAX(id) as max_id + FROM order_costs + WHERE orderid IN (?) + AND pending = 0 + GROUP BY orderid, pid + ) latest ON oc.orderid = latest.orderid AND oc.pid = latest.pid AND oc.id = latest.max_id `, [batchIds]); if (costs.length === 0) return; @@ -529,8 +538,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = `, batchValues); const { inserted, updated } = result.rows[0]; - recordsAdded += inserted; - recordsUpdated += updated; + recordsAdded += parseInt(inserted) || 0; + recordsUpdated += parseInt(updated) || 0; importedCount += subBatch.length; } @@ -558,11 +567,11 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = return { status: "complete", - totalImported: Math.floor(importedCount), - recordsAdded: recordsAdded || 0, - recordsUpdated: Math.floor(recordsUpdated), - totalSkipped: skippedOrders.size, - missingProducts: missingProducts.size, + totalImported: Math.floor(importedCount) || 0, + recordsAdded: parseInt(recordsAdded) || 0, + recordsUpdated: parseInt(recordsUpdated) || 0, + totalSkipped: skippedOrders.size || 0, + missingProducts: missingProducts.size || 0, incrementalUpdate, lastSyncTime }; diff --git a/inventory-server/scripts/import/purchase-orders.js b/inventory-server/scripts/import/purchase-orders.js index e450118..bac0208 100644 --- a/inventory-server/scripts/import/purchase-orders.js +++ b/inventory-server/scripts/import/purchase-orders.js @@ -14,12 +14,10 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental console.log('Purchase Orders: Using last sync time:', lastSyncTime); - // Create temporary tables with PostgreSQL syntax + // Create temp tables await localConnection.query(` DROP TABLE IF EXISTS temp_purchase_orders; - DROP TABLE IF EXISTS temp_po_receivings; - - CREATE TEMP TABLE temp_purchase_orders ( + CREATE TABLE temp_purchase_orders ( po_id INTEGER NOT NULL, pid INTEGER NOT NULL, sku VARCHAR(50), @@ -33,53 +31,8 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental cost_price DECIMAL(10,3), PRIMARY KEY (po_id, pid) ); - - CREATE TEMP TABLE temp_po_receivings ( - po_id INTEGER, - pid INTEGER NOT NULL, - receiving_id INTEGER NOT NULL, - qty_each INTEGER, - cost_each DECIMAL(10,3), - received_date TIMESTAMP WITH TIME ZONE, - received_by INTEGER, - received_by_name VARCHAR(255), - is_alt_po INTEGER, - PRIMARY KEY (receiving_id, pid) - ); `); - outputProgress({ - operation: `Starting ${incrementalUpdate ? 'incremental' : 'full'} purchase orders import`, - status: "running", - }); - - // Get column names - Keep MySQL compatible for production - const [columns] = await prodConnection.query(` - SELECT COLUMN_NAME - FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_NAME = 'purchase_orders' - AND COLUMN_NAME != 'updated' -- Exclude the updated column - ORDER BY ORDINAL_POSITION - `); - const columnNames = columns.map(col => col.COLUMN_NAME); - - // Build incremental conditions - const incrementalWhereClause = incrementalUpdate - ? `AND ( - p.date_updated > ? - OR p.date_ordered > ? - OR p.date_estin > ? - OR r.date_updated > ? - OR r.date_created > ? - OR r.date_checked > ? - OR rp.stamp > ? - OR rp.received_date > ? - )` - : ""; - const incrementalParams = incrementalUpdate - ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] - : []; - // First get all relevant PO IDs with basic info - Keep MySQL compatible for production const [[{ total }]] = await prodConnection.query(` SELECT COUNT(*) as total @@ -97,520 +50,166 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental OR p.date_estin > ? ) ` : ''} - UNION - SELECT DISTINCT r.receiving_id as po_id, rp.pid - FROM receivings_products rp - USE INDEX (received_date) - LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id - WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) - ${incrementalUpdate ? ` - AND ( - r.date_created > ? - OR r.date_checked > ? - OR rp.stamp > ? - OR rp.received_date > ? - ) - ` : ''} ) all_items - `, incrementalUpdate ? [ - lastSyncTime, lastSyncTime, lastSyncTime, // PO conditions - lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime // Receiving conditions - ] : []); + `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []); console.log('Purchase Orders: Found changes:', total); // Get PO list - Keep MySQL compatible for production - const [poList] = await prodConnection.query(` - SELECT DISTINCT - COALESCE(p.po_id, r.receiving_id) as po_id, - COALESCE( - NULLIF(s1.companyname, ''), - NULLIF(s2.companyname, ''), - 'Unknown Vendor' - ) as vendor, - CASE - WHEN p.po_id IS NOT NULL THEN - COALESCE( - NULLIF(p.date_ordered, '0000-00-00 00:00:00'), - p.date_created - ) - WHEN r.receiving_id IS NOT NULL THEN - r.date_created - END as date, - CASE - WHEN p.date_estin = '0000-00-00' THEN NULL - WHEN p.date_estin IS NULL THEN NULL - WHEN p.date_estin NOT REGEXP '^[0-9]{4}-[0-9]{2}-[0-9]{2}$' THEN NULL - ELSE p.date_estin - END as expected_date, - COALESCE(p.status, 50) as status, - p.short_note as notes, - p.notes as long_note - FROM ( - SELECT po_id FROM po - USE INDEX (idx_date_created) - WHERE date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) - ${incrementalUpdate ? ` - AND ( - date_ordered > ? - OR date_updated > ? - OR date_estin > ? - ) - ` : ''} - UNION - SELECT DISTINCT r.receiving_id as po_id - FROM receivings r - JOIN receivings_products rp USE INDEX (received_date) ON r.receiving_id = rp.receiving_id - WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) - ${incrementalUpdate ? ` - AND ( - r.date_created > ? - OR r.date_checked > ? - OR rp.stamp > ? - OR rp.received_date > ? - ) - ` : ''} - ) ids - LEFT JOIN po p ON ids.po_id = p.po_id - LEFT JOIN suppliers s1 ON p.supplier_id = s1.supplierid - LEFT JOIN receivings r ON ids.po_id = r.receiving_id - LEFT JOIN suppliers s2 ON r.supplier_id = s2.supplierid - ORDER BY po_id - `, incrementalUpdate ? [ - lastSyncTime, lastSyncTime, lastSyncTime, // PO conditions - lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime // Receiving conditions - ] : []); - - console.log('Sample PO dates:', poList.slice(0, 5).map(po => ({ - po_id: po.po_id, - raw_date_ordered: po.raw_date_ordered, - raw_date_created: po.raw_date_created, - raw_date_estin: po.raw_date_estin, - computed_date: po.date, - expected_date: po.expected_date - }))); - - const totalItems = total; - let processed = 0; - - const BATCH_SIZE = 5000; - const PROGRESS_INTERVAL = 500; - let lastProgressUpdate = Date.now(); - - outputProgress({ - operation: `Starting purchase orders import - Processing ${totalItems} purchase order items`, - status: "running", - }); - - for (let i = 0; i < poList.length; i += BATCH_SIZE) { - const batch = poList.slice(i, Math.min(i + BATCH_SIZE, poList.length)); - const poIds = batch.map(po => po.po_id); - - // Get all products for these POs in one query - Keep MySQL compatible for production - const [poProducts] = await prodConnection.query(` - SELECT - pop.po_id, - pop.pid, - pr.itemnumber as sku, - pr.description as name, - pop.cost_each as cost_price, - pop.qty_each as ordered - FROM po_products pop - USE INDEX (PRIMARY) - JOIN products pr ON pop.pid = pr.pid - WHERE pop.po_id IN (?) - `, [poIds]); - - // Process PO products in smaller sub-batches to avoid packet size issues - const SUB_BATCH_SIZE = 5000; - for (let j = 0; j < poProducts.length; j += SUB_BATCH_SIZE) { - const productBatch = poProducts.slice(j, j + SUB_BATCH_SIZE); - const productPids = [...new Set(productBatch.map(p => p.pid))]; - const batchPoIds = [...new Set(productBatch.map(p => p.po_id))]; - - // Get receivings for this batch with employee names - const [receivings] = await prodConnection.query(` - SELECT - r.po_id, - rp.pid, - rp.receiving_id, - rp.qty_each, - rp.cost_each, - COALESCE(rp.received_date, r.date_created) as received_date, - rp.received_by, - CONCAT(e.firstname, ' ', e.lastname) as received_by_name, - CASE - WHEN r.po_id IS NULL THEN 2 -- No PO - WHEN r.po_id IN (?) THEN 0 -- Original PO - ELSE 1 -- Different PO - END as is_alt_po - FROM receivings_products rp - USE INDEX (received_date) - LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id - LEFT JOIN employees e ON rp.received_by = e.employeeid - WHERE rp.pid IN (?) - AND rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR) - ORDER BY r.po_id, rp.pid, rp.received_date - `, [batchPoIds, productPids]); - - // Insert receivings into temp table - if (receivings.length > 0) { - // Process in smaller chunks to avoid parameter limits - const CHUNK_SIZE = 100; // Reduce chunk size to avoid parameter limits - for (let i = 0; i < receivings.length; i += CHUNK_SIZE) { - const chunk = receivings.slice(i, Math.min(i + CHUNK_SIZE, receivings.length)); - - const values = []; - const placeholders = []; - - chunk.forEach((r, idx) => { - values.push( - r.po_id, - r.pid, - r.receiving_id, - r.qty_each, - r.cost_each, - r.received_date, - r.received_by, - r.received_by_name || null, - r.is_alt_po - ); - - const offset = idx * 9; - placeholders.push(`($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8}, $${offset + 9})`); - }); - - await localConnection.query(` - INSERT INTO temp_po_receivings ( - po_id, pid, receiving_id, qty_each, cost_each, received_date, - received_by, received_by_name, is_alt_po - ) - VALUES ${placeholders.join(',')} - ON CONFLICT (receiving_id, pid) DO UPDATE SET - po_id = EXCLUDED.po_id, - qty_each = EXCLUDED.qty_each, - cost_each = EXCLUDED.cost_each, - received_date = EXCLUDED.received_date, - received_by = EXCLUDED.received_by, - received_by_name = EXCLUDED.received_by_name, - is_alt_po = EXCLUDED.is_alt_po - `, values); - } - } - - // Process each PO product in chunks - const PRODUCT_CHUNK_SIZE = 100; - for (let i = 0; i < productBatch.length; i += PRODUCT_CHUNK_SIZE) { - const chunk = productBatch.slice(i, Math.min(i + PRODUCT_CHUNK_SIZE, productBatch.length)); - const values = []; - const placeholders = []; - - chunk.forEach((product, idx) => { - const po = batch.find(p => p.po_id === product.po_id); - if (!po) return; - - values.push( - product.po_id, - product.pid, - product.sku, - product.name, - po.vendor, - po.date, - po.expected_date, - po.status, - po.notes || po.long_note, - product.ordered, - product.cost_price - ); - - const offset = idx * 11; // Updated to match 11 fields - placeholders.push(`($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8}, $${offset + 9}, $${offset + 10}, $${offset + 11})`); - }); - - if (placeholders.length > 0) { - await localConnection.query(` - INSERT INTO temp_purchase_orders ( - po_id, pid, sku, name, vendor, date, expected_date, - status, notes, ordered, cost_price - ) - VALUES ${placeholders.join(',')} - ON CONFLICT (po_id, pid) DO UPDATE SET - sku = EXCLUDED.sku, - name = EXCLUDED.name, - vendor = EXCLUDED.vendor, - date = EXCLUDED.date, - expected_date = EXCLUDED.expected_date, - status = EXCLUDED.status, - notes = EXCLUDED.notes, - ordered = EXCLUDED.ordered, - cost_price = EXCLUDED.cost_price - `, values); - } - - processed += chunk.length; - - // Update progress based on time interval - const now = Date.now(); - if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) { - outputProgress({ - status: "running", - operation: "Purchase orders import", - current: processed, - total: totalItems, - elapsed: formatElapsedTime((Date.now() - startTime) / 1000), - remaining: estimateRemaining(startTime, processed, totalItems), - rate: calculateRate(startTime, processed) - }); - lastProgressUpdate = now; - } - } - } - } - - // Insert final data into purchase_orders table in chunks - const FINAL_CHUNK_SIZE = 1000; + console.log('Fetching purchase orders in batches...'); + + const FETCH_BATCH_SIZE = 5000; + let offset = 0; + let allProcessed = false; let totalProcessed = 0; - const totalPosResult = await localConnection.query('SELECT COUNT(*) as total_pos FROM temp_purchase_orders'); - const total_pos = parseInt(totalPosResult.rows?.[0]?.total_pos || '0', 10); + + while (!allProcessed) { + console.log(`Fetching batch at offset ${offset}...`); + const [poList] = await prodConnection.query(` + SELECT DISTINCT + COALESCE(p.po_id, 0) as po_id, + pop.pid, + COALESCE(NULLIF(pr.itemnumber, ''), 'NO-SKU') as sku, + COALESCE(pr.description, 'Unknown Product') as name, + COALESCE(NULLIF(s.companyname, ''), 'Unknown Vendor') as vendor, + COALESCE(p.date_ordered, p.date_created) as date, + p.date_estin as expected_date, + COALESCE(p.status, 1) as status, + COALESCE(p.short_note, p.notes) as notes, + pop.qty_each as ordered, + pop.cost_each as cost_price + FROM po p + JOIN po_products pop ON p.po_id = pop.po_id + JOIN products pr ON pop.pid = pr.pid + LEFT JOIN suppliers s ON p.supplier_id = s.supplierid + WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) + ${incrementalUpdate ? ` + AND ( + p.date_updated > ? + OR p.date_ordered > ? + OR p.date_estin > ? + ) + ` : ''} + ORDER BY p.po_id, pop.pid + LIMIT ${FETCH_BATCH_SIZE} OFFSET ${offset} + `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []); - outputProgress({ - status: "running", - operation: "Purchase orders final import", - message: `Processing ${total_pos} purchase orders for final import`, - current: 0, - total: total_pos - }); - - // Process in chunks using cursor-based pagination - let lastPoId = 0; - let lastPid = 0; - let recordsAdded = 0; - let recordsUpdated = 0; - - while (true) { - console.log('Fetching next chunk with lastPoId:', lastPoId, 'lastPid:', lastPid); - const chunkResult = await localConnection.query(` - SELECT po_id, pid FROM temp_purchase_orders - WHERE (po_id, pid) > ($1, $2) - ORDER BY po_id, pid - LIMIT $3 - `, [lastPoId, lastPid, FINAL_CHUNK_SIZE]); - - if (!chunkResult?.rows) { - console.error('No rows returned from chunk query:', chunkResult); + if (poList.length === 0) { + allProcessed = true; break; } - const chunk = chunkResult.rows; - console.log('Got chunk of size:', chunk.length); - if (chunk.length === 0) break; + console.log(`Processing batch of ${poList.length} purchase order items (${offset}-${offset + poList.length})`); + + let processed = 0; - const result = await localConnection.query(` - WITH inserted_pos AS ( - INSERT INTO purchase_orders ( - po_id, pid, sku, name, cost_price, po_cost_price, - vendor, date, expected_date, status, notes, - ordered, received, receiving_status, - received_date, last_received_date, received_by, - receiving_history - ) - SELECT + // Process each PO in a separate insert to avoid parameter issues + for (let i = 0; i < poList.length; i++) { + const po = poList[i]; + + try { + // Single row insert + await localConnection.query(` + INSERT INTO temp_purchase_orders ( + po_id, pid, sku, name, vendor, date, expected_date, + status, notes, ordered, cost_price + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + ON CONFLICT (po_id, pid) DO UPDATE SET + sku = EXCLUDED.sku, + name = EXCLUDED.name, + vendor = EXCLUDED.vendor, + date = EXCLUDED.date, + expected_date = EXCLUDED.expected_date, + status = EXCLUDED.status, + notes = EXCLUDED.notes, + ordered = EXCLUDED.ordered, + cost_price = EXCLUDED.cost_price + `, [ po.po_id, po.pid, po.sku, po.name, - COALESCE( - ( - SELECT cost_each - FROM temp_po_receivings r2 - WHERE r2.pid = po.pid - AND r2.po_id = po.po_id - AND r2.is_alt_po = 0 - AND r2.cost_each > 0 - ORDER BY r2.received_date - LIMIT 1 - ), - po.cost_price - ) as cost_price, - po.cost_price as po_cost_price, po.vendor, po.date, po.expected_date, po.status, po.notes, po.ordered, - COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0) as received, - CASE - WHEN COUNT(r.receiving_id) = 0 THEN 1 -- created - WHEN SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END) < po.ordered THEN 30 -- partial - ELSE 40 -- full - END as receiving_status, - MIN(CASE WHEN r.is_alt_po = 0 THEN r.received_date END) as received_date, - MAX(CASE WHEN r.is_alt_po = 0 THEN r.received_date END) as last_received_date, - ( - SELECT r2.received_by_name - FROM temp_po_receivings r2 - WHERE r2.pid = po.pid - AND r2.is_alt_po = 0 - ORDER BY r2.received_date - LIMIT 1 - ) as received_by, - jsonb_build_object( - 'ordered_qty', po.ordered, - 'total_received', COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0), - 'remaining_unfulfilled', GREATEST(0, po.ordered - COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0)), - 'excess_received', GREATEST(0, COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0) - po.ordered), - 'po_cost', po.cost_price, - 'actual_cost', COALESCE( - ( - SELECT cost_each - FROM temp_po_receivings r2 - WHERE r2.pid = po.pid - AND r2.is_alt_po = 0 - AND r2.cost_each > 0 - ORDER BY r2.received_date - LIMIT 1 - ), - po.cost_price - ), - 'fulfillment', ( - SELECT jsonb_agg( - jsonb_build_object( - 'receiving_id', r2.receiving_id, - 'qty_applied', CASE - WHEN r2.running_total <= po.ordered THEN r2.qty_each - WHEN r2.running_total - r2.qty_each < po.ordered THEN po.ordered - (r2.running_total - r2.qty_each) - ELSE 0 - END, - 'qty_total', r2.qty_each, - 'cost', r2.cost_each, - 'date', r2.received_date, - 'received_by', r2.received_by, - 'received_by_name', r2.received_by_name, - 'type', CASE r2.is_alt_po - WHEN 0 THEN 'original' - WHEN 1 THEN 'alternate' - ELSE 'no_po' - END, - 'remaining_qty', CASE - WHEN r2.running_total <= po.ordered THEN 0 - WHEN r2.running_total - r2.qty_each < po.ordered THEN r2.running_total - po.ordered - ELSE r2.qty_each - END, - 'is_excess', r2.running_total > po.ordered - ) - ORDER BY r2.received_date - ) - FROM ( - SELECT - r2.*, - SUM(r2.qty_each) OVER ( - PARTITION BY r2.pid - ORDER BY r2.received_date - ROWS UNBOUNDED PRECEDING - ) as running_total - FROM temp_po_receivings r2 - WHERE r2.pid = po.pid - ) r2 - ), - 'alternate_po_receivings', ( - SELECT jsonb_agg( - jsonb_build_object( - 'receiving_id', r2.receiving_id, - 'qty', r2.qty_each, - 'cost', r2.cost_each, - 'date', r2.received_date, - 'received_by', r2.received_by, - 'received_by_name', r2.received_by_name - ) - ORDER BY r2.received_date - ) - FROM temp_po_receivings r2 - WHERE r2.pid = po.pid AND r2.is_alt_po = 1 - ), - 'no_po_receivings', ( - SELECT jsonb_agg( - jsonb_build_object( - 'receiving_id', r2.receiving_id, - 'qty', r2.qty_each, - 'cost', r2.cost_each, - 'date', r2.received_date, - 'received_by', r2.received_by, - 'received_by_name', r2.received_by_name - ) - ORDER BY r2.received_date - ) - FROM temp_po_receivings r2 - WHERE r2.pid = po.pid AND r2.is_alt_po = 2 - ) - ) as receiving_history - FROM temp_purchase_orders po - LEFT JOIN temp_po_receivings r ON po.pid = r.pid - WHERE (po.po_id, po.pid) IN ( - SELECT po_id, pid FROM UNNEST($1::int[], $2::int[]) - ) - GROUP BY po.po_id, po.pid, po.sku, po.name, po.vendor, po.date, - po.expected_date, po.status, po.notes, po.ordered, po.cost_price - ON CONFLICT (po_id, pid) DO UPDATE SET - vendor = EXCLUDED.vendor, - date = EXCLUDED.date, - expected_date = EXCLUDED.expected_date, - status = EXCLUDED.status, - notes = EXCLUDED.notes, - ordered = EXCLUDED.ordered, - received = EXCLUDED.received, - receiving_status = EXCLUDED.receiving_status, - received_date = EXCLUDED.received_date, - last_received_date = EXCLUDED.last_received_date, - received_by = EXCLUDED.received_by, - receiving_history = EXCLUDED.receiving_history, - cost_price = EXCLUDED.cost_price, - po_cost_price = EXCLUDED.po_cost_price - RETURNING xmax - ) - SELECT - COUNT(*) FILTER (WHERE xmax = 0) as inserted, - COUNT(*) FILTER (WHERE xmax <> 0) as updated - FROM inserted_pos - `, [ - chunk.map(r => r.po_id), - chunk.map(r => r.pid) - ]); - - // Add debug logging - console.log('Insert result:', result?.rows?.[0]); - - // Handle the result properly for PostgreSQL with more defensive coding - const resultRow = result?.rows?.[0] || {}; - const insertCount = parseInt(resultRow.inserted || '0', 10); - const updateCount = parseInt(resultRow.updated || '0', 10); - - recordsAdded += insertCount; - recordsUpdated += updateCount; - totalProcessed += chunk.length; - - // Update progress - outputProgress({ - status: "running", - operation: "Purchase orders final import", - message: `Processed ${totalProcessed} of ${total_pos} purchase orders`, - current: totalProcessed, - total: total_pos, - elapsed: formatElapsedTime((Date.now() - startTime) / 1000), - remaining: estimateRemaining(startTime, totalProcessed, total_pos), - rate: calculateRate(startTime, totalProcessed) - }); - - // Update last processed IDs for next chunk with safety check - if (chunk.length > 0) { - const lastItem = chunk[chunk.length - 1]; - if (lastItem) { - lastPoId = lastItem.po_id; - lastPid = lastItem.pid; + po.cost_price + ]); + + processed++; + totalProcessed++; + + // Only log occasionally + if (processed % 500 === 0 || processed === 1 || processed === poList.length) { + outputProgress({ + status: "running", + operation: "Purchase orders import", + message: `Batch ${Math.floor(offset/FETCH_BATCH_SIZE) + 1}: ${processed}/${poList.length} (Total: ${totalProcessed}/${total})`, + current: totalProcessed, + total: total, + elapsed: formatElapsedTime((Date.now() - startTime) / 1000), + remaining: estimateRemaining(startTime, totalProcessed, total), + rate: calculateRate(startTime, totalProcessed) + }); + } + } catch (error) { + console.error(`Error inserting PO #${po.po_id} product #${po.pid}:`, error.message); + console.log('PO data:', po); } } + + // Update offset for next batch + offset += poList.length; + + // Check if we've received fewer records than the batch size, meaning we're done + if (poList.length < FETCH_BATCH_SIZE) { + allProcessed = true; + } } + // Count the temp table contents + const [tempCount] = await localConnection.query(`SELECT COUNT(*) FROM temp_purchase_orders`); + const tempRowCount = parseInt(tempCount.rows[0].count); + console.log(`Successfully inserted ${tempRowCount} rows into temp_purchase_orders`); + + // Now insert into the final table + const [result] = await localConnection.query(` + WITH inserted_pos AS ( + INSERT INTO purchase_orders ( + po_id, pid, sku, name, cost_price, po_cost_price, + vendor, date, expected_date, status, notes, + ordered, received, receiving_status + ) + SELECT + po_id, pid, sku, name, cost_price, cost_price, + vendor, date, expected_date, status, notes, + ordered, 0 as received, 1 as receiving_status + FROM temp_purchase_orders + ON CONFLICT (po_id, pid) DO UPDATE SET + vendor = EXCLUDED.vendor, + date = EXCLUDED.date, + expected_date = EXCLUDED.expected_date, + status = EXCLUDED.status, + notes = EXCLUDED.notes, + ordered = EXCLUDED.ordered, + cost_price = EXCLUDED.cost_price, + po_cost_price = EXCLUDED.po_cost_price + RETURNING xmax = 0 as inserted + ) + SELECT + COUNT(*) FILTER (WHERE inserted) as inserted, + COUNT(*) FILTER (WHERE NOT inserted) as updated + FROM inserted_pos + `); + + // Parse the result + const { inserted, updated } = result.rows[0]; + recordsAdded = parseInt(inserted) || 0; + recordsUpdated = parseInt(updated) || 0; + // Update sync status await localConnection.query(` INSERT INTO sync_status (table_name, last_sync_timestamp) @@ -620,29 +219,31 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental `); // Clean up temporary tables - await localConnection.query(` - DROP TABLE IF EXISTS temp_purchase_orders; - DROP TABLE IF EXISTS temp_po_receivings; - `); + await localConnection.query(`DROP TABLE IF EXISTS temp_purchase_orders;`); return { status: "complete", - recordsAdded, - recordsUpdated, - totalRecords: processed + recordsAdded: recordsAdded || 0, + recordsUpdated: recordsUpdated || 0, + totalRecords: totalProcessed }; } catch (error) { console.error("Error during purchase orders import:", error); + // Attempt cleanup on error try { - await localConnection.query(` - DROP TABLE IF EXISTS temp_purchase_orders; - DROP TABLE IF EXISTS temp_po_receivings; - `); + await localConnection.query(`DROP TABLE IF EXISTS temp_purchase_orders;`); } catch (cleanupError) { - console.error('Error during cleanup:', cleanupError); + console.error('Error during cleanup:', cleanupError.message); } - throw error; + + return { + status: "error", + error: error.message, + recordsAdded: 0, + recordsUpdated: 0, + totalRecords: 0 + }; } } diff --git a/inventory-server/scripts/reset-db.js b/inventory-server/scripts/reset-db.js index d081078..b3b52ae 100644 --- a/inventory-server/scripts/reset-db.js +++ b/inventory-server/scripts/reset-db.js @@ -184,7 +184,7 @@ async function resetDatabase() { SELECT string_agg(tablename, ', ') as tables FROM pg_tables WHERE schemaname = 'public' - AND tablename NOT IN ('users', 'permissions', 'user_permissions', 'calculate_history', 'import_history'); + AND tablename NOT IN ('users', 'permissions', 'user_permissions', 'calculate_history', 'import_history', 'ai_prompts', 'ai_validation_performance', 'templates'); `); if (!tablesResult.rows[0].tables) { diff --git a/inventory-server/src/routes/csv.js b/inventory-server/src/routes/csv.js index ce916f9..e8a280a 100644 --- a/inventory-server/src/routes/csv.js +++ b/inventory-server/src/routes/csv.js @@ -757,8 +757,8 @@ router.get('/history/import', async (req, res) => { end_time, status, error_message, - rows_processed::integer, - files_processed::integer + records_added::integer, + records_updated::integer FROM import_history ORDER BY start_time DESC LIMIT 20