const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics-new/utils/progress'); /** * Validates a date from MySQL before inserting it into PostgreSQL * @param {string|Date|null} mysqlDate - Date string or object from MySQL * @returns {string|null} Valid date string or null if invalid */ function validateDate(mysqlDate) { // Handle null, undefined, or empty values if (!mysqlDate) { return null; } // Convert to string if it's not already const dateStr = String(mysqlDate); // Handle MySQL zero dates and empty values if (dateStr === '0000-00-00' || dateStr === '0000-00-00 00:00:00' || dateStr.indexOf('0000-00-00') !== -1 || dateStr === '') { return null; } // Check if the date is valid const date = new Date(mysqlDate); // If the date is invalid or suspiciously old (pre-1970), return null if (isNaN(date.getTime()) || date.getFullYear() < 1970) { return null; } return mysqlDate; } /** * Imports purchase orders and receivings from a production MySQL database to a local PostgreSQL database. * Implements FIFO allocation of receivings to purchase orders. 
 * @param {object} prodConnection - A MySQL connection to production DB
 * @param {object} localConnection - A PostgreSQL connection to local DB
 * @param {boolean} incrementalUpdate - Set to false for a full sync; true for incremental
 * @returns {object} Information about the sync operation
 */
async function importPurchaseOrders(prodConnection, localConnection, incrementalUpdate = true) {
  const startTime = Date.now();
  let recordsAdded = 0;
  let recordsUpdated = 0;
  let totalProcessed = 0;

  // Batch size constants
  const PO_BATCH_SIZE = 500;     // rows fetched from MySQL per page
  const INSERT_BATCH_SIZE = 100; // rows per multi-value INSERT into Postgres

  try {
    // Begin transaction for the entire import process
    await localConnection.beginTransaction();

    // Get last sync info.
    // NOTE(review): the result is array-destructured AND read via `.rows`
    // (`syncInfo?.rows?.[0]`), while later queries read `result.rows` without
    // optional chaining — presumably localConnection is a wrapper returning a
    // [result, ...] tuple around a pg-style result; verify the wrapper shape.
    const [syncInfo] = await localConnection.query(
      "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'"
    );
    const lastSyncTime = syncInfo?.rows?.[0]?.last_sync_timestamp || '1970-01-01';
    console.log('Purchase Orders: Using last sync time:', lastSyncTime);

    // Create temp tables for processing.
    // NOTE(review): `DROP TABLE IF EXISTS employee_names` has no TEMP
    // qualifier — if a permanent table named employee_names exists in the
    // search path it could be dropped; confirm this name is session-only.
    await localConnection.query(`
      DROP TABLE IF EXISTS temp_purchase_orders;
      DROP TABLE IF EXISTS temp_receivings;
      DROP TABLE IF EXISTS temp_receiving_allocations;
      DROP TABLE IF EXISTS employee_names;

      -- Temporary table for purchase orders
      CREATE TEMP TABLE temp_purchase_orders (
        po_id TEXT NOT NULL,
        pid BIGINT NOT NULL,
        sku TEXT,
        name TEXT,
        vendor TEXT,
        date TIMESTAMP WITH TIME ZONE,
        expected_date DATE,
        status TEXT,
        notes TEXT,
        long_note TEXT,
        ordered INTEGER,
        po_cost_price NUMERIC(14, 4),
        supplier_id INTEGER,
        date_created TIMESTAMP WITH TIME ZONE,
        date_ordered TIMESTAMP WITH TIME ZONE,
        PRIMARY KEY (po_id, pid)
      );

      -- Temporary table for receivings
      CREATE TEMP TABLE temp_receivings (
        receiving_id TEXT NOT NULL,
        po_id TEXT,
        pid BIGINT NOT NULL,
        qty_each INTEGER,
        cost_each NUMERIC(14, 4),
        received_by INTEGER,
        received_date TIMESTAMP WITH TIME ZONE,
        receiving_created_date TIMESTAMP WITH TIME ZONE,
        supplier_id INTEGER,
        status TEXT,
        PRIMARY KEY (receiving_id, pid)
      );

      -- Temporary table for tracking FIFO allocations
      CREATE TEMP TABLE temp_receiving_allocations (
        po_id TEXT NOT NULL,
        pid BIGINT NOT NULL,
        receiving_id TEXT NOT NULL,
        allocated_qty INTEGER NOT NULL,
        cost_each NUMERIC(14, 4) NOT NULL,
        received_date TIMESTAMP WITH TIME ZONE NOT NULL,
        received_by INTEGER,
        PRIMARY KEY (po_id, pid, receiving_id)
      );

      -- Temporary table for employee names
      CREATE TEMP TABLE employee_names (
        employeeid INTEGER PRIMARY KEY,
        firstname TEXT,
        lastname TEXT
      );

      -- Create indexes for efficient joins
      CREATE INDEX idx_temp_po_pid ON temp_purchase_orders(pid);
      CREATE INDEX idx_temp_receiving_pid ON temp_receivings(pid);
      CREATE INDEX idx_temp_receiving_po_id ON temp_receivings(po_id);
    `);

    // Map MySQL numeric status codes to text values used locally.
    const poStatusMap = {
      0: 'canceled',
      1: 'created',
      10: 'electronically_ready_send',
      11: 'ordered',
      12: 'preordered',
      13: 'electronically_sent',
      15: 'receiving_started',
      50: 'done'
    };

    const receivingStatusMap = {
      0: 'canceled',
      1: 'created',
      30: 'partial_received',
      40: 'full_received',
      50: 'paid'
    };

    // Get time window for data retrieval: 1 year of history for incremental
    // runs, 5 years for a full sync.
    const yearInterval = incrementalUpdate ? 1 : 5;

    // Fetch employee data from production (used to resolve received_by names).
    outputProgress({
      status: "running",
      operation: "Purchase orders import",
      message: "Fetching employee data"
    });

    const [employees] = await prodConnection.query(`
      SELECT employeeid, firstname, lastname
      FROM employees
    `);

    // Insert employee data into temp table
    if (employees.length > 0) {
      const employeeValues = employees.map(emp => [
        emp.employeeid,
        emp.firstname || '',
        emp.lastname || ''
      ]).flat();

      // One ($n, $n+1, $n+2) group per employee for a single multi-row INSERT.
      const placeholders = employees.map((_, idx) => {
        const base = idx * 3;
        return `($${base + 1}, $${base + 2}, $${base + 3})`;
      }).join(',');

      await localConnection.query(`
        INSERT INTO employee_names (employeeid, firstname, lastname)
        VALUES ${placeholders}
        ON CONFLICT (employeeid) DO UPDATE SET
          firstname = EXCLUDED.firstname,
          lastname = EXCLUDED.lastname
      `, employeeValues);
    }

    // 1. First, fetch all relevant POs
    outputProgress({
      status: "running",
      operation: "Purchase orders import",
      message: "Fetching purchase orders"
    });

    // Count first so progress reporting has a total; the incremental filter
    // matches POs updated/ordered/estimated after the last sync time.
    const [poCount] = await prodConnection.query(`
      SELECT COUNT(*) as total
      FROM po p
      WHERE p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR)
      ${incrementalUpdate ? `
        AND (
          p.date_updated > ?
          OR p.date_ordered > ?
          OR p.date_estin > ?
        )
      ` : ''}
    `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);

    const totalPOs = poCount[0].total;
    console.log(`Found ${totalPOs} relevant purchase orders`);

    // Fetch and process POs in batches.
    // NOTE(review): LIMIT/OFFSET pagination over a live table can skip or
    // repeat rows if matching rows change mid-run; ORDER BY p.po_id keeps it
    // stable only for unchanged data.
    let offset = 0;
    let allPOsProcessed = false;

    while (!allPOsProcessed) {
      // Column aliasing is deliberate: MySQL `notes` -> local long_note,
      // MySQL `short_note` -> local notes.
      const [poList] = await prodConnection.query(`
        SELECT
          p.po_id,
          p.supplier_id,
          s.companyname AS vendor,
          p.status,
          p.notes AS long_note,
          p.short_note AS notes,
          p.date_created,
          p.date_ordered,
          p.date_estin
        FROM po p
        LEFT JOIN suppliers s ON p.supplier_id = s.supplierid
        WHERE p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR)
        ${incrementalUpdate ? `
          AND (
            p.date_updated > ?
            OR p.date_ordered > ?
            OR p.date_estin > ?
          )
        ` : ''}
        ORDER BY p.po_id
        LIMIT ${PO_BATCH_SIZE} OFFSET ${offset}
      `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);

      if (poList.length === 0) {
        allPOsProcessed = true;
        break;
      }

      // Get products for these POs
      const poIds = poList.map(po => po.po_id);
      const [poProducts] = await prodConnection.query(`
        SELECT
          pp.po_id,
          pp.pid,
          pp.qty_each,
          pp.cost_each,
          COALESCE(p.itemnumber, 'NO-SKU') AS sku,
          COALESCE(p.description, 'Unknown Product') AS name
        FROM po_products pp
        LEFT JOIN products p ON pp.pid = p.pid
        WHERE pp.po_id IN (?)
      `, [poIds]);

      // Build complete PO records: one row per (po_id, pid) line item.
      const completePOs = [];
      for (const product of poProducts) {
        // Loose equality (==) is deliberate here: po_id may come back as a
        // number from one query and a string from the other.
        const po = poList.find(p => p.po_id == product.po_id);
        if (!po) continue;

        completePOs.push({
          po_id: po.po_id.toString(),
          pid: product.pid,
          sku: product.sku,
          name: product.name,
          vendor: po.vendor || 'Unknown Vendor',
          // Prefer the ordered date; fall back to creation date.
          date: validateDate(po.date_ordered) || validateDate(po.date_created),
          expected_date: validateDate(po.date_estin),
          status: poStatusMap[po.status] || 'created',
          notes: po.notes || '',
          long_note: po.long_note || '',
          ordered: product.qty_each,
          po_cost_price: product.cost_each,
          supplier_id: po.supplier_id,
          date_created: validateDate(po.date_created),
          date_ordered: validateDate(po.date_ordered)
        });
      }

      // Insert PO data in batches (15 columns per row).
      for (let i = 0; i < completePOs.length; i += INSERT_BATCH_SIZE) {
        const batch = completePOs.slice(i, i + INSERT_BATCH_SIZE);

        const placeholders = batch.map((_, idx) => {
          const base = idx * 15;
          return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`;
        }).join(',');

        // Value order must match the column list in the INSERT below.
        const values = batch.flatMap(po => [
          po.po_id,
          po.pid,
          po.sku,
          po.name,
          po.vendor,
          po.date,
          po.expected_date,
          po.status,
          po.notes,
          po.long_note,
          po.ordered,
          po.po_cost_price,
          po.supplier_id,
          po.date_created,
          po.date_ordered
        ]);

        await localConnection.query(`
          INSERT INTO temp_purchase_orders (
            po_id, pid, sku, name, vendor, date, expected_date, status,
            notes, long_note, ordered, po_cost_price, supplier_id,
            date_created, date_ordered
          )
          VALUES ${placeholders}
          ON CONFLICT (po_id, pid) DO UPDATE SET
            sku = EXCLUDED.sku,
            name = EXCLUDED.name,
            vendor = EXCLUDED.vendor,
            date = EXCLUDED.date,
            expected_date = EXCLUDED.expected_date,
            status = EXCLUDED.status,
            notes = EXCLUDED.notes,
            long_note = EXCLUDED.long_note,
            ordered = EXCLUDED.ordered,
            po_cost_price = EXCLUDED.po_cost_price,
            supplier_id = EXCLUDED.supplier_id,
            date_created = EXCLUDED.date_created,
            date_ordered = EXCLUDED.date_ordered
        `, values);
      }

      offset += poList.length;
      totalProcessed += completePOs.length;

      outputProgress({
        status: "running",
        operation: "Purchase orders import",
        message: `Processed ${offset} of ${totalPOs} purchase orders (${totalProcessed} line items)`,
        current: offset,
        total: totalPOs,
        elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
        remaining: estimateRemaining(startTime, offset, totalPOs),
        rate: calculateRate(startTime, offset)
      });

      // A short page means we have drained the result set.
      if (poList.length < PO_BATCH_SIZE) {
        allPOsProcessed = true;
      }
    }

    // 2. Next, fetch all relevant receivings
    outputProgress({
      status: "running",
      operation: "Purchase orders import",
      message: "Fetching receivings data"
    });

    const [receivingCount] = await prodConnection.query(`
      SELECT COUNT(*) as total
      FROM receivings r
      WHERE r.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR)
      ${incrementalUpdate ? `
        AND (
          r.date_updated > ?
          OR r.date_created > ?
        )
      ` : ''}
    `, incrementalUpdate ? [lastSyncTime, lastSyncTime] : []);

    const totalReceivings = receivingCount[0].total;
    console.log(`Found ${totalReceivings} relevant receivings`);

    // Fetch and process receivings in batches
    offset = 0; // Reset offset for receivings
    let allReceivingsProcessed = false;

    while (!allReceivingsProcessed) {
      const [receivingList] = await prodConnection.query(`
        SELECT
          r.receiving_id,
          r.po_id,
          r.supplier_id,
          r.status,
          r.date_created
        FROM receivings r
        WHERE r.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR)
        ${incrementalUpdate ? `
          AND (
            r.date_updated > ?
            OR r.date_created > ?
          )
        ` : ''}
        ORDER BY r.receiving_id
        LIMIT ${PO_BATCH_SIZE} OFFSET ${offset}
      `, incrementalUpdate ? [lastSyncTime, lastSyncTime] : []);

      if (receivingList.length === 0) {
        allReceivingsProcessed = true;
        break;
      }

      // Get products for these receivings
      const receivingIds = receivingList.map(r => r.receiving_id);
      const [receivingProducts] = await prodConnection.query(`
        SELECT
          rp.receiving_id,
          rp.pid,
          rp.qty_each,
          rp.cost_each,
          rp.received_by,
          rp.received_date,
          r.date_created as receiving_created_date
        FROM receivings_products rp
        JOIN receivings r ON rp.receiving_id = r.receiving_id
        WHERE rp.receiving_id IN (?)
      `, [receivingIds]);

      // Build complete receiving records: one row per (receiving_id, pid).
      const completeReceivings = [];
      for (const product of receivingProducts) {
        // Loose equality (==) tolerates number/string id mismatches.
        const receiving = receivingList.find(r => r.receiving_id == product.receiving_id);
        if (!receiving) continue;

        completeReceivings.push({
          receiving_id: receiving.receiving_id.toString(),
          po_id: receiving.po_id ? receiving.po_id.toString() : null,
          pid: product.pid,
          qty_each: product.qty_each,
          cost_each: product.cost_each,
          received_by: product.received_by,
          // Prefer the line-item received date; fall back to the header date.
          received_date: validateDate(product.received_date) || validateDate(product.receiving_created_date),
          receiving_created_date: validateDate(product.receiving_created_date),
          supplier_id: receiving.supplier_id,
          status: receivingStatusMap[receiving.status] || 'created'
        });
      }

      // Insert receiving data in batches (10 columns per row).
      for (let i = 0; i < completeReceivings.length; i += INSERT_BATCH_SIZE) {
        const batch = completeReceivings.slice(i, i + INSERT_BATCH_SIZE);

        const placeholders = batch.map((_, idx) => {
          const base = idx * 10;
          return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10})`;
        }).join(',');

        const values = batch.flatMap(r => [
          r.receiving_id,
          r.po_id,
          r.pid,
          r.qty_each,
          r.cost_each,
          r.received_by,
          r.received_date,
          r.receiving_created_date,
          r.supplier_id,
          r.status
        ]);

        await localConnection.query(`
          INSERT INTO temp_receivings (
            receiving_id, po_id, pid, qty_each, cost_each, received_by,
            received_date, receiving_created_date, supplier_id, status
          )
          VALUES ${placeholders}
          ON CONFLICT (receiving_id, pid) DO UPDATE SET
            po_id = EXCLUDED.po_id,
            qty_each = EXCLUDED.qty_each,
            cost_each = EXCLUDED.cost_each,
            received_by = EXCLUDED.received_by,
            received_date = EXCLUDED.received_date,
            receiving_created_date = EXCLUDED.receiving_created_date,
            supplier_id = EXCLUDED.supplier_id,
            status = EXCLUDED.status
        `, values);
      }

      offset += receivingList.length;
      totalProcessed += completeReceivings.length;

      outputProgress({
        status: "running",
        operation: "Purchase orders import",
        message: `Processed ${offset} of ${totalReceivings} receivings (${totalProcessed} line items total)`,
        current: offset,
        total: totalReceivings,
        elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
        remaining: estimateRemaining(startTime, offset, totalReceivings),
        rate: calculateRate(startTime, offset)
      });

      if (receivingList.length < PO_BATCH_SIZE) {
        allReceivingsProcessed = true;
      }
    }

    // 3. Implement FIFO allocation of receivings to purchase orders
    outputProgress({
      status: "running",
      operation: "Purchase orders import",
      message: "Validating product IDs before allocation"
    });

    // Add this section to filter out invalid PIDs before allocation
    // This will check all PIDs in our temp tables against the products table
    await localConnection.query(`
      -- Create temp table to store invalid PIDs
      DROP TABLE IF EXISTS temp_invalid_pids;
      CREATE TEMP TABLE temp_invalid_pids AS (
        -- Get all unique PIDs from our temp tables
        WITH all_pids AS (
          SELECT DISTINCT pid FROM temp_purchase_orders
          UNION
          SELECT DISTINCT pid FROM temp_receivings
        )
        -- Filter to only those that don't exist in products table
        SELECT p.pid
        FROM all_pids p
        WHERE NOT EXISTS (
          SELECT 1 FROM products WHERE pid = p.pid
        )
      );

      -- Remove purchase orders with invalid PIDs
      DELETE FROM temp_purchase_orders
      WHERE pid IN (SELECT pid FROM temp_invalid_pids);

      -- Remove receivings with invalid PIDs
      DELETE FROM temp_receivings
      WHERE pid IN (SELECT pid FROM temp_invalid_pids);
    `);

    // Get count of filtered items for reporting
    const [filteredResult] = await localConnection.query(`
      SELECT COUNT(*) as count FROM temp_invalid_pids
    `);
    const filteredCount = filteredResult.rows[0].count;
    if (filteredCount > 0) {
      console.log(`Filtered out ${filteredCount} items with invalid product IDs`);
    }

    // Break FIFO allocation into steps with progress tracking.
    // Steps run in order; later steps only allocate quantities left over by
    // earlier ones (via the NOT EXISTS / remaining_* CTE filters).
    const fifoSteps = [
      {
        // Step 1: receivings that explicitly reference a PO line get matched
        // directly, capped at the ordered quantity.
        name: "Direct allocations",
        query: `
          INSERT INTO temp_receiving_allocations (
            po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by
          )
          SELECT
            r.po_id,
            r.pid,
            r.receiving_id,
            LEAST(r.qty_each, po.ordered) as allocated_qty,
            r.cost_each,
            COALESCE(r.received_date, NOW()) as received_date,
            r.received_by
          FROM temp_receivings r
          JOIN temp_purchase_orders po ON r.po_id = po.po_id AND r.pid = po.pid
          WHERE r.po_id IS NOT NULL
        `
      },
      {
        // Step 2: receivings with no matching PO line become synthetic PO
        // rows keyed by the receiving_id, so they still appear downstream.
        name: "Handling standalone receivings",
        query: `
          INSERT INTO temp_purchase_orders (
            po_id, pid, sku, name, vendor, date, status, ordered,
            po_cost_price, supplier_id, date_created, date_ordered
          )
          SELECT
            r.receiving_id::text as po_id,
            r.pid,
            COALESCE(p.sku, 'NO-SKU') as sku,
            COALESCE(p.name, 'Unknown Product') as name,
            COALESCE(
              (SELECT vendor FROM temp_purchase_orders WHERE supplier_id = r.supplier_id LIMIT 1),
              'Unknown Vendor'
            ) as vendor,
            COALESCE(r.received_date, r.receiving_created_date) as date,
            'created' as status,
            NULL as ordered,
            r.cost_each as po_cost_price,
            r.supplier_id,
            COALESCE(r.receiving_created_date, r.received_date) as date_created,
            NULL as date_ordered
          FROM temp_receivings r
          LEFT JOIN (
            SELECT DISTINCT pid, sku, name FROM temp_purchase_orders
          ) p ON r.pid = p.pid
          WHERE r.po_id IS NULL
            OR NOT EXISTS (
              SELECT 1 FROM temp_purchase_orders po
              WHERE po.po_id = r.po_id AND po.pid = r.pid
            )
          ON CONFLICT (po_id, pid) DO NOTHING
        `
      },
      {
        // Step 3: allocate those standalone receivings fully against their
        // synthetic PO rows from step 2.
        name: "Allocating standalone receivings",
        query: `
          INSERT INTO temp_receiving_allocations (
            po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by
          )
          SELECT
            r.receiving_id::text as po_id,
            r.pid,
            r.receiving_id,
            r.qty_each as allocated_qty,
            r.cost_each,
            COALESCE(r.received_date, NOW()) as received_date,
            r.received_by
          FROM temp_receivings r
          WHERE r.po_id IS NULL
            OR NOT EXISTS (
              SELECT 1 FROM temp_purchase_orders po
              WHERE po.po_id = r.po_id AND po.pid = r.pid
            )
        `
      },
      {
        // Step 4: distribute any remaining received quantity across POs of
        // the same product, oldest-first, preferring POs ordered within the
        // last year over older/undated ones.
        // NOTE(review): allocations_recent/allocations_old compute
        // LEAST(po.remaining_qty, r.remaining_qty) independently for every
        // PO x receiving pair of a pid, with no running total across pairs —
        // when multiple POs or receivings share a pid this looks like it can
        // allocate the same quantity more than once; verify against expected
        // FIFO totals.
        name: "FIFO allocation logic",
        query: `
          WITH
          -- Calculate remaining quantities after direct allocations
          remaining_po_quantities AS (
            SELECT
              po.po_id,
              po.pid,
              po.ordered,
              COALESCE(SUM(ra.allocated_qty), 0) as already_allocated,
              po.ordered - COALESCE(SUM(ra.allocated_qty), 0) as remaining_qty,
              po.date_ordered,
              po.date_created
            FROM temp_purchase_orders po
            LEFT JOIN temp_receiving_allocations ra
              ON po.po_id = ra.po_id AND po.pid = ra.pid
            WHERE po.ordered IS NOT NULL
            GROUP BY po.po_id, po.pid, po.ordered, po.date_ordered, po.date_created
            HAVING po.ordered > COALESCE(SUM(ra.allocated_qty), 0)
          ),
          remaining_receiving_quantities AS (
            SELECT
              r.receiving_id,
              r.pid,
              r.qty_each,
              COALESCE(SUM(ra.allocated_qty), 0) as already_allocated,
              r.qty_each - COALESCE(SUM(ra.allocated_qty), 0) as remaining_qty,
              r.received_date,
              r.cost_each,
              r.received_by
            FROM temp_receivings r
            LEFT JOIN temp_receiving_allocations ra
              ON r.receiving_id = ra.receiving_id AND r.pid = ra.pid
            GROUP BY r.receiving_id, r.pid, r.qty_each, r.received_date, r.cost_each, r.received_by
            HAVING r.qty_each > COALESCE(SUM(ra.allocated_qty), 0)
          ),
          -- Rank POs by age, with a cutoff for very old POs (1 year)
          ranked_pos AS (
            SELECT
              po.po_id,
              po.pid,
              po.remaining_qty,
              CASE
                WHEN po.date_ordered IS NULL OR po.date_ordered < NOW() - INTERVAL '1 year'
                THEN 2
                ELSE 1
              END as age_group,
              ROW_NUMBER() OVER (
                PARTITION BY po.pid, (CASE
                  WHEN po.date_ordered IS NULL OR po.date_ordered < NOW() - INTERVAL '1 year'
                  THEN 2
                  ELSE 1
                END)
                ORDER BY COALESCE(po.date_ordered, po.date_created, NOW())
              ) as rank_in_group
            FROM remaining_po_quantities po
          ),
          -- Rank receivings by date
          ranked_receivings AS (
            SELECT
              r.receiving_id,
              r.pid,
              r.remaining_qty,
              r.received_date,
              r.cost_each,
              r.received_by,
              ROW_NUMBER() OVER (PARTITION BY r.pid ORDER BY COALESCE(r.received_date, NOW())) as rank
            FROM remaining_receiving_quantities r
          ),
          -- First allocate to recent POs
          allocations_recent AS (
            SELECT
              po.po_id,
              po.pid,
              r.receiving_id,
              LEAST(po.remaining_qty, r.remaining_qty) as allocated_qty,
              r.cost_each,
              COALESCE(r.received_date, NOW()) as received_date,
              r.received_by,
              po.age_group,
              po.rank_in_group,
              r.rank,
              'recent' as allocation_type
            FROM ranked_pos po
            JOIN ranked_receivings r ON po.pid = r.pid
            WHERE po.age_group = 1
            ORDER BY po.pid, po.rank_in_group, r.rank
          ),
          -- Then allocate to older POs
          remaining_after_recent AS (
            SELECT
              r.receiving_id,
              r.pid,
              r.remaining_qty - COALESCE(SUM(a.allocated_qty), 0) as remaining_qty,
              r.received_date,
              r.cost_each,
              r.received_by,
              r.rank
            FROM ranked_receivings r
            LEFT JOIN allocations_recent a
              ON r.receiving_id = a.receiving_id AND r.pid = a.pid
            GROUP BY r.receiving_id, r.pid, r.remaining_qty, r.received_date, r.cost_each, r.received_by, r.rank
            HAVING r.remaining_qty > COALESCE(SUM(a.allocated_qty), 0)
          ),
          allocations_old AS (
            SELECT
              po.po_id,
              po.pid,
              r.receiving_id,
              LEAST(po.remaining_qty, r.remaining_qty) as allocated_qty,
              r.cost_each,
              COALESCE(r.received_date, NOW()) as received_date,
              r.received_by,
              po.age_group,
              po.rank_in_group,
              r.rank,
              'old' as allocation_type
            FROM ranked_pos po
            JOIN remaining_after_recent r ON po.pid = r.pid
            WHERE po.age_group = 2
            ORDER BY po.pid, po.rank_in_group, r.rank
          ),
          -- Combine allocations
          combined_allocations AS (
            SELECT * FROM allocations_recent
            UNION ALL
            SELECT * FROM allocations_old
          )
          -- Insert into allocations table
          INSERT INTO temp_receiving_allocations (
            po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by
          )
          SELECT
            po_id,
            pid,
            receiving_id,
            allocated_qty,
            cost_each,
            COALESCE(received_date, NOW()) as received_date,
            received_by
          FROM combined_allocations
          WHERE allocated_qty > 0
        `
      }
    ];

    // Execute FIFO steps with progress tracking
    for (let i = 0; i < fifoSteps.length; i++) {
      const step = fifoSteps[i];
      outputProgress({
        status: "running",
        operation: "Purchase orders import",
        message: `FIFO allocation step ${i+1}/${fifoSteps.length}: ${step.name}`,
        current: i,
        total: fifoSteps.length
      });
      await localConnection.query(step.query);
    }

    // 4. Generate final purchase order records with receiving data.
    // Builds per-line receiving summaries (JSONB history, date range, names)
    // and a quantity-weighted average cost, then upserts into the permanent
    // purchase_orders table.
    outputProgress({
      status: "running",
      operation: "Purchase orders import",
      message: "Generating final purchase order records"
    });

    const [finalResult] = await localConnection.query(`
      WITH receiving_summaries AS (
        SELECT
          po_id,
          pid,
          SUM(allocated_qty) as total_received,
          JSONB_AGG(
            JSONB_BUILD_OBJECT(
              'receiving_id', receiving_id,
              'qty', allocated_qty,
              'date', COALESCE(received_date, NOW()),
              'cost', cost_each,
              'received_by', received_by,
              'received_by_name', CASE
                WHEN received_by IS NOT NULL AND received_by > 0 THEN
                  (SELECT CONCAT(firstname, ' ', lastname) FROM employee_names WHERE employeeid = received_by)
                ELSE NULL
              END
            ) ORDER BY COALESCE(received_date, NOW())
          ) as receiving_history,
          MIN(COALESCE(received_date, NOW())) as first_received_date,
          MAX(COALESCE(received_date, NOW())) as last_received_date,
          STRING_AGG(
            DISTINCT CASE
              WHEN received_by IS NOT NULL AND received_by > 0 THEN CAST(received_by AS TEXT)
              ELSE NULL
            END,
            ','
          ) as received_by_list,
          STRING_AGG(
            DISTINCT CASE
              WHEN ra.received_by IS NOT NULL AND ra.received_by > 0 THEN
                (SELECT CONCAT(firstname, ' ', lastname) FROM employee_names WHERE employeeid = ra.received_by)
              ELSE NULL
            END,
            ', '
          ) as received_by_names
        FROM temp_receiving_allocations ra
        GROUP BY po_id, pid
      ),
      cost_averaging AS (
        SELECT
          ra.po_id,
          ra.pid,
          SUM(ra.allocated_qty * ra.cost_each) / NULLIF(SUM(ra.allocated_qty), 0) as avg_cost
        FROM temp_receiving_allocations ra
        GROUP BY ra.po_id, ra.pid
      )
      INSERT INTO purchase_orders (
        po_id, vendor, date, expected_date, pid, sku, name, cost_price,
        po_cost_price, status, receiving_status, notes, long_note, ordered,
        received, received_date, last_received_date, received_by, receiving_history
      )
      SELECT
        po.po_id,
        po.vendor,
        CASE
          WHEN po.date IS NOT NULL THEN po.date
          -- For standalone receivings, try to use the receiving date from history
          WHEN po.po_id LIKE 'R%' AND rs.first_received_date IS NOT NULL THEN rs.first_received_date
          -- As a last resort for data integrity, use Unix epoch (Jan 1, 1970)
          ELSE to_timestamp(0)
        END as date,
        NULLIF(po.expected_date::text, '0000-00-00')::date as expected_date,
        po.pid,
        po.sku,
        po.name,
        COALESCE(ca.avg_cost, po.po_cost_price) as cost_price,
        po.po_cost_price,
        COALESCE(po.status, 'created'),
        CASE
          WHEN rs.total_received IS NULL THEN 'created'
          WHEN rs.total_received = 0 THEN 'created'
          WHEN rs.total_received < po.ordered THEN 'partial_received'
          WHEN rs.total_received >= po.ordered THEN 'full_received'
          ELSE 'created'
        END as receiving_status,
        po.notes,
        po.long_note,
        COALESCE(po.ordered, 0),
        COALESCE(rs.total_received, 0),
        NULLIF(rs.first_received_date::text, '0000-00-00 00:00:00')::timestamp with time zone as received_date,
        NULLIF(rs.last_received_date::text, '0000-00-00 00:00:00')::timestamp with time zone as last_received_date,
        CASE
          WHEN rs.received_by_list IS NULL THEN NULL
          ELSE rs.received_by_names
        END as received_by,
        rs.receiving_history
      FROM temp_purchase_orders po
      LEFT JOIN receiving_summaries rs ON po.po_id = rs.po_id AND po.pid = rs.pid
      LEFT JOIN cost_averaging ca ON po.po_id = ca.po_id AND po.pid = ca.pid
      ON CONFLICT (po_id, pid) DO UPDATE SET
        vendor = EXCLUDED.vendor,
        date = EXCLUDED.date,
        expected_date = EXCLUDED.expected_date,
        sku = EXCLUDED.sku,
        name = EXCLUDED.name,
        cost_price = EXCLUDED.cost_price,
        po_cost_price = EXCLUDED.po_cost_price,
        status = EXCLUDED.status,
        receiving_status = EXCLUDED.receiving_status,
        notes = EXCLUDED.notes,
        long_note = EXCLUDED.long_note,
        ordered = EXCLUDED.ordered,
        received = EXCLUDED.received,
        received_date = EXCLUDED.received_date,
        last_received_date = EXCLUDED.last_received_date,
        received_by = EXCLUDED.received_by,
        receiving_history = EXCLUDED.receiving_history,
        updated = CURRENT_TIMESTAMP
      RETURNING (xmax = 0) as inserted
    `);

    // Postgres idiom: xmax = 0 on a returned row means it was a fresh insert;
    // nonzero means the ON CONFLICT update path ran.
    recordsAdded = finalResult.rows.filter(r => r.inserted).length;
    recordsUpdated = finalResult.rows.filter(r => !r.inserted).length;

    // Update sync status
    await localConnection.query(`
      INSERT INTO sync_status (table_name, last_sync_timestamp)
      VALUES ('purchase_orders', NOW())
      ON CONFLICT (table_name) DO UPDATE SET last_sync_timestamp = NOW()
    `);

    // Clean up temporary tables
    await localConnection.query(`
      DROP TABLE IF EXISTS temp_purchase_orders;
      DROP TABLE IF EXISTS temp_receivings;
      DROP TABLE IF EXISTS temp_receiving_allocations;
      DROP TABLE IF EXISTS employee_names;
    `);

    // Commit transaction
    await localConnection.commit();

    return {
      status: "complete",
      recordsAdded: recordsAdded || 0,
      recordsUpdated: recordsUpdated || 0,
      totalRecords: totalProcessed
    };
  } catch (error) {
    console.error("Error during purchase orders import:", error);

    // Rollback transaction; a rollback failure is logged but not rethrown so
    // the original error is still reported to the caller.
    try {
      await localConnection.rollback();
    } catch (rollbackError) {
      console.error('Error during rollback:', rollbackError.message);
    }

    return {
      status: "error",
      error: error.message,
      recordsAdded: 0,
      recordsUpdated: 0,
      totalRecords: 0
    };
  }
}

module.exports = importPurchaseOrders;