const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');

async function importPurchaseOrders(prodConnection, localConnection, incrementalUpdate = true) {
  const startTime = Date.now();
  let recordsAdded = 0;
  let recordsUpdated = 0;

  try {
    // Get last sync info. localConnection is a pg-style client whose query()
    // resolves to a result object with a .rows array, so no tuple destructuring.
    const syncInfoResult = await localConnection.query(
      "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'"
    );
    const lastSyncTime = syncInfoResult?.rows?.[0]?.last_sync_timestamp || '1970-01-01';
    console.log('Purchase Orders: Using last sync time:', lastSyncTime);

    // Create temporary tables with PostgreSQL syntax
    await localConnection.query(`
      DROP TABLE IF EXISTS temp_purchase_orders;
      DROP TABLE IF EXISTS temp_po_receivings;

      CREATE TEMP TABLE temp_purchase_orders (
        po_id INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        sku VARCHAR(50),
        name VARCHAR(255),
        vendor VARCHAR(255),
        date TIMESTAMP WITH TIME ZONE,
        expected_date TIMESTAMP WITH TIME ZONE,
        status INTEGER,
        notes TEXT,
        ordered INTEGER,
        cost_price DECIMAL(10,3),
        PRIMARY KEY (po_id, pid)
      );

      CREATE TEMP TABLE temp_po_receivings (
        po_id INTEGER,
        pid INTEGER NOT NULL,
        receiving_id INTEGER NOT NULL,
        qty_each INTEGER,
        cost_each DECIMAL(10,3),
        received_date TIMESTAMP WITH TIME ZONE,
        received_by INTEGER,
        received_by_name VARCHAR(255),
        is_alt_po INTEGER,
        PRIMARY KEY (receiving_id, pid)
      );
    `);

    outputProgress({
      operation: `Starting ${incrementalUpdate ? 'incremental' : 'full'} purchase orders import`,
      status: "running",
    });

    // Count all relevant PO/product pairs first. prodConnection is a
    // mysql2-style connection (query() resolves to [rows, fields]), so keep
    // this MySQL compatible. Changed rows come from two sources: POs touched
    // since the last sync, and receivings touched since the last sync.
    const [[{ total }]] = await prodConnection.query(`
      SELECT COUNT(*) as total FROM (
        SELECT DISTINCT pop.po_id, pop.pid
        FROM po p USE INDEX (idx_date_created)
        JOIN po_products pop ON p.po_id = pop.po_id
        JOIN suppliers s ON p.supplier_id = s.supplierid
        WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
        ${incrementalUpdate ? `
          AND (
            p.date_updated > ?
            OR p.date_ordered > ?
            OR p.date_estin > ?
          )
        ` : ''}
        UNION
        SELECT DISTINCT r.receiving_id as po_id, rp.pid
        FROM receivings_products rp USE INDEX (received_date)
        LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id
        WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
        ${incrementalUpdate ? `
          AND (
            r.date_created > ?
            OR r.date_checked > ?
            OR rp.stamp > ?
            OR rp.received_date > ?
          )
        ` : ''}
      ) all_items
    `, incrementalUpdate ?
      [
        lastSyncTime, lastSyncTime, lastSyncTime,               // PO conditions
        lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime  // Receiving conditions
      ] : []);
    console.log('Purchase Orders: Found changes:', total);

    // Get PO list - Keep MySQL compatible for production
    const [poList] = await prodConnection.query(`
      SELECT DISTINCT
        COALESCE(p.po_id, r.receiving_id) as po_id,
        COALESCE(
          NULLIF(s1.companyname, ''),
          NULLIF(s2.companyname, ''),
          'Unknown Vendor'
        ) as vendor,
        CASE
          WHEN p.po_id IS NOT NULL THEN COALESCE(
            NULLIF(p.date_ordered, '0000-00-00 00:00:00'),
            p.date_created
          )
          WHEN r.receiving_id IS NOT NULL THEN r.date_created
        END as date,
        CASE
          WHEN p.date_estin = '0000-00-00' THEN NULL
          WHEN p.date_estin IS NULL THEN NULL
          WHEN p.date_estin NOT REGEXP '^[0-9]{4}-[0-9]{2}-[0-9]{2}$' THEN NULL
          ELSE p.date_estin
        END as expected_date,
        COALESCE(p.status, 50) as status,
        p.short_note as notes,
        p.notes as long_note
      FROM (
        SELECT po_id
        FROM po USE INDEX (idx_date_created)
        WHERE date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
        ${incrementalUpdate ? `
          AND (
            date_ordered > ?
            OR date_updated > ?
            OR date_estin > ?
          )
        ` : ''}
        UNION
        SELECT DISTINCT r.receiving_id as po_id
        FROM receivings r
        JOIN receivings_products rp USE INDEX (received_date) ON r.receiving_id = rp.receiving_id
        WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
        ${incrementalUpdate ? `
          AND (
            r.date_created > ?
            OR r.date_checked > ?
            OR rp.stamp > ?
            OR rp.received_date > ?
          )
        ` : ''}
      ) ids
      LEFT JOIN po p ON ids.po_id = p.po_id
      LEFT JOIN suppliers s1 ON p.supplier_id = s1.supplierid
      LEFT JOIN receivings r ON ids.po_id = r.receiving_id
      LEFT JOIN suppliers s2 ON r.supplier_id = s2.supplierid
      ORDER BY po_id
    `, incrementalUpdate ?
      [
        lastSyncTime, lastSyncTime, lastSyncTime,               // PO conditions
        lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime  // Receiving conditions
      ] : []);

    // Log only fields the query actually selects
    console.log('Sample PO dates:', poList.slice(0, 5).map(po => ({
      po_id: po.po_id,
      computed_date: po.date,
      expected_date: po.expected_date
    })));

    const totalItems = total;
    let processed = 0;
    const BATCH_SIZE = 5000;
    const PROGRESS_INTERVAL = 500; // ms between progress updates
    let lastProgressUpdate = Date.now();

    outputProgress({
      operation: `Starting purchase orders import - Processing ${totalItems} purchase order items`,
      status: "running",
    });

    for (let i = 0; i < poList.length; i += BATCH_SIZE) {
      const batch = poList.slice(i, Math.min(i + BATCH_SIZE, poList.length));
      const poIds = batch.map(po => po.po_id);

      // Get all products for these POs in one query - Keep MySQL compatible for production
      const [poProducts] = await prodConnection.query(`
        SELECT
          pop.po_id,
          pop.pid,
          pr.itemnumber as sku,
          pr.description as name,
          pop.cost_each as cost_price,
          pop.qty_each as ordered
        FROM po_products pop USE INDEX (PRIMARY)
        JOIN products pr ON pop.pid = pr.pid
        WHERE pop.po_id IN (?)
      `, [poIds]);
      // Process PO products in smaller sub-batches to avoid packet size issues
      const SUB_BATCH_SIZE = 5000;
      for (let j = 0; j < poProducts.length; j += SUB_BATCH_SIZE) {
        const productBatch = poProducts.slice(j, j + SUB_BATCH_SIZE);
        const productPids = [...new Set(productBatch.map(p => p.pid))];
        const batchPoIds = [...new Set(productBatch.map(p => p.po_id))];

        // Get receivings for this batch with employee names
        const [receivings] = await prodConnection.query(`
          SELECT
            r.po_id,
            rp.pid,
            rp.receiving_id,
            rp.qty_each,
            rp.cost_each,
            COALESCE(rp.received_date, r.date_created) as received_date,
            rp.received_by,
            CONCAT(e.firstname, ' ', e.lastname) as received_by_name,
            CASE
              WHEN r.po_id IS NULL THEN 2   -- No PO
              WHEN r.po_id IN (?) THEN 0    -- Original PO
              ELSE 1                        -- Different PO
            END as is_alt_po
          FROM receivings_products rp USE INDEX (received_date)
          LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id
          LEFT JOIN employees e ON rp.received_by = e.employeeid
          WHERE rp.pid IN (?)
            AND rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
          ORDER BY r.po_id, rp.pid, rp.received_date
        `, [batchPoIds, productPids]);

        // Insert receivings into temp table
        if (receivings.length > 0) {
          // Process in small chunks to stay under PostgreSQL's bind parameter limit
          const CHUNK_SIZE = 100;
          for (let k = 0; k < receivings.length; k += CHUNK_SIZE) {
            const chunk = receivings.slice(k, Math.min(k + CHUNK_SIZE, receivings.length));
            const values = [];
            const placeholders = [];

            chunk.forEach((r, idx) => {
              values.push(
                r.po_id, r.pid, r.receiving_id, r.qty_each, r.cost_each,
                r.received_date, r.received_by, r.received_by_name || null, r.is_alt_po
              );
              const offset = idx * 9; // 9 columns per row
              placeholders.push(`($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8}, $${offset + 9})`);
            });

            await localConnection.query(`
              INSERT INTO temp_po_receivings (
                po_id, pid, receiving_id, qty_each, cost_each,
                received_date, received_by, received_by_name, is_alt_po
              )
              VALUES ${placeholders.join(',')}
              ON CONFLICT (receiving_id, pid) DO UPDATE SET
                po_id = EXCLUDED.po_id,
                qty_each = EXCLUDED.qty_each,
                cost_each = EXCLUDED.cost_each,
                received_date = EXCLUDED.received_date,
                received_by = EXCLUDED.received_by,
                received_by_name = EXCLUDED.received_by_name,
                is_alt_po = EXCLUDED.is_alt_po
            `, values);
          }
        }

        // Stage each PO product row in chunks
        const PRODUCT_CHUNK_SIZE = 100;
        for (let k = 0; k < productBatch.length; k += PRODUCT_CHUNK_SIZE) {
          const chunk = productBatch.slice(k, Math.min(k + PRODUCT_CHUNK_SIZE, productBatch.length));
          const values = [];
          const placeholders = [];

          chunk.forEach((product) => {
            const po = batch.find(p => p.po_id === product.po_id);
            if (!po) return;
            values.push(
              product.po_id, product.pid, product.sku, product.name,
              po.vendor, po.date, po.expected_date, po.status,
              po.notes || po.long_note, product.ordered, product.cost_price
            );
            // Derive the offset from placeholders.length, not the forEach index,
            // so rows skipped above don't leave gaps in the $n numbering
            const offset = placeholders.length * 11; // 11 columns per row
            placeholders.push(`($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8}, $${offset + 9}, $${offset + 10}, $${offset + 11})`);
          });

          if (placeholders.length > 0) {
            await localConnection.query(`
              INSERT INTO temp_purchase_orders (
                po_id, pid, sku, name, vendor, date, expected_date,
                status, notes, ordered, cost_price
              )
              VALUES ${placeholders.join(',')}
              ON CONFLICT (po_id, pid) DO UPDATE SET
                sku = EXCLUDED.sku,
                name = EXCLUDED.name,
                vendor = EXCLUDED.vendor,
                date = EXCLUDED.date,
                expected_date = EXCLUDED.expected_date,
                status = EXCLUDED.status,
                notes = EXCLUDED.notes,
                ordered = EXCLUDED.ordered,
                cost_price = EXCLUDED.cost_price
            `, values);
          }

          processed += chunk.length;

          // Throttle progress output to one update per PROGRESS_INTERVAL
          const now = Date.now();
          if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) {
            outputProgress({
              status: "running",
              operation: "Purchase orders import",
              current: processed,
              total: totalItems,
              elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
              remaining: estimateRemaining(startTime, processed, totalItems),
              rate: calculateRate(startTime, processed)
            });
            lastProgressUpdate = now;
          }
        }
      }
    }
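    // All changed rows are now staged in the two TEMP tables. From here the
    // import is set-based: each chunk below folds receivings into their PO
    // lines and upserts the combined rows into purchase_orders with a single
    // INSERT ... SELECT, avoiding a round trip per row.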
    // Insert final data into purchase_orders table in chunks
    const FINAL_CHUNK_SIZE = 1000;
    let totalProcessed = 0;

    const totalPosResult = await localConnection.query('SELECT COUNT(*) as total_pos FROM temp_purchase_orders');
    const total_pos = parseInt(totalPosResult.rows?.[0]?.total_pos || '0', 10);

    outputProgress({
      status: "running",
      operation: "Purchase orders final import",
      message: `Processing ${total_pos} purchase orders for final import`,
      current: 0,
      total: total_pos
    });

    // Process in chunks using keyset (cursor-style) pagination over the
    // composite primary key; recordsAdded/recordsUpdated accumulate in the
    // counters declared at the top of the function.
    let lastPoId = 0;
    let lastPid = 0;

    while (true) {
      console.log('Fetching next chunk with lastPoId:', lastPoId, 'lastPid:', lastPid);
      const chunkResult = await localConnection.query(`
        SELECT po_id, pid
        FROM temp_purchase_orders
        WHERE (po_id, pid) > ($1, $2)
        ORDER BY po_id, pid
        LIMIT $3
      `, [lastPoId, lastPid, FINAL_CHUNK_SIZE]);

      if (!chunkResult?.rows) {
        console.error('No rows returned from chunk query:', chunkResult);
        break;
      }

      const chunk = chunkResult.rows;
      console.log('Got chunk of size:', chunk.length);
      if (chunk.length === 0) break;

      const result = await localConnection.query(`
        WITH inserted_pos AS (
          INSERT INTO purchase_orders (
            po_id, pid, sku, name, cost_price, po_cost_price, vendor, date,
            expected_date, status, notes, ordered, received, receiving_status,
            received_date, last_received_date, received_by, receiving_history
          )
          SELECT
            po.po_id,
            po.pid,
            po.sku,
            po.name,
            COALESCE(
              (
                SELECT cost_each
                FROM temp_po_receivings r2
                WHERE r2.pid = po.pid
                  AND r2.po_id = po.po_id
                  AND r2.is_alt_po = 0
                  AND r2.cost_each > 0
                ORDER BY r2.received_date
                LIMIT 1
              ),
              po.cost_price
            ) as cost_price,
            po.cost_price as po_cost_price,
            po.vendor,
            po.date,
            po.expected_date,
            po.status,
            po.notes,
            po.ordered,
            COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0) as received,
            CASE
              WHEN COUNT(r.receiving_id) = 0 THEN 1                                        -- created
              WHEN SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END) < po.ordered THEN 30 -- partial
              ELSE 40                                                                      -- full
            END as receiving_status,
            MIN(CASE WHEN r.is_alt_po = 0 THEN r.received_date END) as received_date,
            MAX(CASE WHEN r.is_alt_po = 0 THEN r.received_date END) as last_received_date,
            (
              SELECT r2.received_by_name
              FROM temp_po_receivings r2
              WHERE r2.pid = po.pid AND r2.is_alt_po = 0
              ORDER BY r2.received_date
              LIMIT 1
            ) as received_by,
            jsonb_build_object(
              'ordered_qty', po.ordered,
              'total_received', COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0),
              'remaining_unfulfilled', GREATEST(0, po.ordered - COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0)),
              'excess_received', GREATEST(0, COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0) - po.ordered),
              'po_cost', po.cost_price,
              'actual_cost', COALESCE(
                (
                  SELECT cost_each
                  FROM temp_po_receivings r2
                  WHERE r2.pid = po.pid
                    AND r2.is_alt_po = 0
                    AND r2.cost_each > 0
                  ORDER BY r2.received_date
                  LIMIT 1
                ),
                po.cost_price
              ),
              'fulfillment', (
                SELECT jsonb_agg(
                  jsonb_build_object(
                    'receiving_id', r2.receiving_id,
                    'qty_applied', CASE
                      WHEN r2.running_total <= po.ordered THEN r2.qty_each
                      WHEN r2.running_total - r2.qty_each < po.ordered THEN po.ordered - (r2.running_total - r2.qty_each)
                      ELSE 0
                    END,
                    'qty_total', r2.qty_each,
                    'cost', r2.cost_each,
                    'date', r2.received_date,
                    'received_by', r2.received_by,
                    'received_by_name', r2.received_by_name,
                    'type', CASE r2.is_alt_po
                      WHEN 0 THEN 'original'
                      WHEN 1 THEN 'alternate'
                      ELSE 'no_po'
                    END,
                    'remaining_qty', CASE
                      WHEN r2.running_total <= po.ordered THEN 0
                      WHEN r2.running_total - r2.qty_each < po.ordered THEN r2.running_total - po.ordered
                      ELSE r2.qty_each
                    END,
                    'is_excess', r2.running_total > po.ordered
                  )
                  ORDER BY r2.received_date
                )
                FROM (
                  SELECT
                    r2.*,
                    SUM(r2.qty_each) OVER (
                      PARTITION BY r2.pid
                      ORDER BY r2.received_date
                      ROWS UNBOUNDED PRECEDING
                    ) as running_total
                  FROM temp_po_receivings r2
                  WHERE r2.pid = po.pid
                ) r2
              ),
              'alternate_po_receivings', (
                SELECT jsonb_agg(
                  jsonb_build_object(
                    'receiving_id', r2.receiving_id,
                    'qty', r2.qty_each,
                    'cost', r2.cost_each,
                    'date', r2.received_date,
                    'received_by', r2.received_by,
                    'received_by_name', r2.received_by_name
                  )
                  ORDER BY r2.received_date
                )
                FROM temp_po_receivings r2
                WHERE r2.pid = po.pid AND r2.is_alt_po = 1
              ),
              'no_po_receivings', (
                SELECT jsonb_agg(
                  jsonb_build_object(
                    'receiving_id', r2.receiving_id,
                    'qty', r2.qty_each,
                    'cost', r2.cost_each,
                    'date', r2.received_date,
                    'received_by', r2.received_by,
                    'received_by_name', r2.received_by_name
                  )
                  ORDER BY r2.received_date
                )
                FROM temp_po_receivings r2
                WHERE r2.pid = po.pid AND r2.is_alt_po = 2
              )
            ) as receiving_history
          FROM temp_purchase_orders po
          LEFT JOIN temp_po_receivings r ON po.pid = r.pid
          WHERE (po.po_id, po.pid) IN (
            -- UNNEST needs a column alias list so po_id/pid are valid names
            SELECT po_id, pid FROM UNNEST($1::int[], $2::int[]) AS u(po_id, pid)
          )
          GROUP BY
            po.po_id, po.pid, po.sku, po.name, po.vendor, po.date,
            po.expected_date, po.status, po.notes, po.ordered, po.cost_price
          ON CONFLICT (po_id, pid) DO UPDATE SET
            vendor = EXCLUDED.vendor,
            date = EXCLUDED.date,
            expected_date = EXCLUDED.expected_date,
            status = EXCLUDED.status,
            notes = EXCLUDED.notes,
            ordered = EXCLUDED.ordered,
            received = EXCLUDED.received,
            receiving_status = EXCLUDED.receiving_status,
            received_date = EXCLUDED.received_date,
            last_received_date = EXCLUDED.last_received_date,
            received_by = EXCLUDED.received_by,
            receiving_history = EXCLUDED.receiving_history,
            cost_price = EXCLUDED.cost_price,
            po_cost_price = EXCLUDED.po_cost_price
          RETURNING xmax
        )
        -- xmax = 0 means the row was freshly inserted; a nonzero xmax means
        -- ON CONFLICT updated an existing row
        SELECT
          COUNT(*) FILTER (WHERE xmax = 0) as inserted,
          COUNT(*) FILTER (WHERE xmax <> 0) as updated
        FROM inserted_pos
      `, [
        chunk.map(r => r.po_id),
        chunk.map(r => r.pid)
      ]);

      console.log('Insert result:', result?.rows?.[0]);

      // Handle the result defensively in case the driver returns no rows
      const resultRow = result?.rows?.[0] || {};
      const insertCount = parseInt(resultRow.inserted || '0', 10);
      const updateCount = parseInt(resultRow.updated || '0', 10);
      recordsAdded += insertCount;
      recordsUpdated += updateCount;
      totalProcessed += chunk.length;

      // Update progress
      outputProgress({
        status: "running",
        operation: "Purchase orders final import",
        message: `Processed ${totalProcessed} of ${total_pos} purchase orders`,
        current: totalProcessed,
        total: total_pos,
        elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
        remaining: estimateRemaining(startTime, totalProcessed, total_pos),
        rate: calculateRate(startTime, totalProcessed)
      });

      // Advance the keyset cursor to the last row of this chunk
      const lastItem = chunk[chunk.length - 1];
      if (lastItem) {
        lastPoId = lastItem.po_id;
        lastPid = lastItem.pid;
      }
    }
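    // The sync_status upsert below assumes a table with a unique constraint on
    // table_name (required by ON CONFLICT). A minimal sketch of the assumed
    // DDL, for reference only:
    //
    //   CREATE TABLE IF NOT EXISTS sync_status (
    //     table_name          VARCHAR(64) PRIMARY KEY,
    //     last_sync_timestamp TIMESTAMP WITH TIME ZONE
    //   );
    //
    // NOW() is the local PostgreSQL clock, while the incremental filters above
    // compare it against production MySQL timestamps, so the two servers are
    // assumed to be reasonably in sync.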
    // Update sync status
    await localConnection.query(`
      INSERT INTO sync_status (table_name, last_sync_timestamp)
      VALUES ('purchase_orders', NOW())
      ON CONFLICT (table_name) DO UPDATE SET last_sync_timestamp = NOW()
    `);

    // Clean up temporary tables
    await localConnection.query(`
      DROP TABLE IF EXISTS temp_purchase_orders;
      DROP TABLE IF EXISTS temp_po_receivings;
    `);

    return {
      status: "complete",
      recordsAdded,
      recordsUpdated,
      totalRecords: processed
    };
  } catch (error) {
    console.error("Error during purchase orders import:", error);

    // Attempt cleanup on error
    try {
      await localConnection.query(`
        DROP TABLE IF EXISTS temp_purchase_orders;
        DROP TABLE IF EXISTS temp_po_receivings;
      `);
    } catch (cleanupError) {
      console.error('Error during cleanup:', cleanupError);
    }

    throw error;
  }
}

module.exports = importPurchaseOrders;
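// A minimal usage sketch, assuming the production side is a mysql2/promise
// connection (query() resolves to [rows, fields]) and the local side is a
// single pg Client (query() resolves to { rows }). A Client rather than a
// Pool keeps every query on one session, which the TEMP tables above require.
// The env var names are illustrative, not part of this module.
if (require.main === module) {
  const mysql = require('mysql2/promise'); // assumed driver for prodConnection
  const { Client } = require('pg');        // assumed driver for localConnection

  (async () => {
    const prodConnection = await mysql.createConnection({
      host: process.env.PROD_DB_HOST,
      user: process.env.PROD_DB_USER,
      password: process.env.PROD_DB_PASS,
      database: process.env.PROD_DB_NAME,
    });
    const localConnection = new Client({ connectionString: process.env.LOCAL_DATABASE_URL });
    await localConnection.connect();

    try {
      // Pass false as the third argument for a full 5-year rebuild instead of
      // an incremental sync
      const summary = await importPurchaseOrders(prodConnection, localConnection, true);
      console.log('Import finished:', summary);
    } finally {
      await prodConnection.end();
      await localConnection.end();
    }
  })().catch(err => {
    console.error(err);
    process.exit(1);
  });
}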