const {
  outputProgress,
  formatElapsedTime,
  estimateRemaining,
  calculateRate,
} = require('../metrics/utils/progress');

async function importPurchaseOrders(prodConnection, localConnection, incrementalUpdate = true) {
  const startTime = Date.now();
  let recordsAdded = 0;
  let recordsUpdated = 0;

  try {
    // Get last sync info
    const [syncInfo] = await localConnection.query(
      "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'"
    );
    const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01';
    console.log('Purchase Orders: Using last sync time:', lastSyncTime);

    // Create the temporary purchase orders table if it does not already exist
    await localConnection.query(`
      CREATE TABLE IF NOT EXISTS temp_purchase_orders (
        po_id INT UNSIGNED NOT NULL,
        pid INT UNSIGNED NOT NULL,
        vendor VARCHAR(255),
        date DATE,
        expected_date DATE,
        status INT,
        notes TEXT,
        PRIMARY KEY (po_id, pid)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    `);

    outputProgress({
      operation: `Starting ${incrementalUpdate ? 'incremental' : 'full'} purchase orders import`,
      status: "running",
    });

    // Get column names for the insert
    const [columns] = await localConnection.query(`
      SELECT COLUMN_NAME
      FROM INFORMATION_SCHEMA.COLUMNS
      WHERE TABLE_NAME = 'purchase_orders'
      ORDER BY ORDINAL_POSITION
    `);
    const columnNames = columns
      .map((col) => col.COLUMN_NAME)
      .filter((name) => name !== "id");

    // Build incremental conditions
    // (currently unused; the queries below inline their own incremental filters)
    const incrementalWhereClause = incrementalUpdate
      ? `AND (
           p.date_updated > ? OR
           p.date_ordered > ? OR
           p.date_estin > ? OR
           r.date_updated > ? OR
           r.date_created > ? OR
           r.date_checked > ? OR
           rp.stamp > ? OR
           rp.received_date > ?
         )`
      : "";
    const incrementalParams = incrementalUpdate
      ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime,
         lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]
      : [];

    // First get all relevant PO IDs with basic info
    const [[{ total }]] = await prodConnection.query(`
      SELECT COUNT(*) as total FROM (
        SELECT DISTINCT pop.po_id, pop.pid
        FROM po p USE INDEX (idx_date_created)
        JOIN po_products pop ON p.po_id = pop.po_id
        JOIN suppliers s ON p.supplier_id = s.supplierid
        WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
        ${incrementalUpdate ? `
          AND (
            p.date_updated > ? OR
            p.date_ordered > ? OR
            p.date_estin > ?
          )
        ` : ''}

        UNION

        SELECT DISTINCT r.receiving_id as po_id, rp.pid
        FROM receivings_products rp USE INDEX (received_date)
        LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id
        WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
        ${incrementalUpdate ? `
          AND (
            r.date_created > ? OR
            r.date_checked > ? OR
            rp.stamp > ? OR
            rp.received_date > ?
          )
        ` : ''}
      ) all_items
    `, incrementalUpdate ? [
      lastSyncTime, lastSyncTime, lastSyncTime,               // PO conditions
      lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime  // Receiving conditions
    ] : []);

    console.log('Purchase Orders: Found changes:', total);

    // Apply the same incremental filters as the count query above so that a
    // full import is not restricted to rows changed since the last sync
    const [poList] = await prodConnection.query(`
      SELECT DISTINCT
        COALESCE(p.po_id, r.receiving_id) as po_id,
        COALESCE(
          NULLIF(s1.companyname, ''),
          NULLIF(s2.companyname, ''),
          'Unknown Vendor'
        ) as vendor,
        CASE WHEN p.po_id IS NOT NULL THEN DATE(p.date_ordered) END as date,
        CASE WHEN p.po_id IS NOT NULL THEN DATE(p.date_estin) END as expected_date,
        COALESCE(p.status, 50) as status,
        COALESCE(p.short_note, '') as notes,
        COALESCE(p.notes, '') as long_note
      FROM (
        SELECT po_id
        FROM po USE INDEX (idx_date_created)
        WHERE date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
        ${incrementalUpdate ? `AND (date_ordered > ?
          OR date_updated > ?)` : ''}

        UNION

        SELECT DISTINCT r.receiving_id as po_id
        FROM receivings r
        JOIN receivings_products rp USE INDEX (received_date) ON r.receiving_id = rp.receiving_id
        WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
        ${incrementalUpdate ? `AND (rp.received_date > ? OR rp.stamp > ?)` : ''}
      ) ids
      LEFT JOIN po p ON ids.po_id = p.po_id
      LEFT JOIN suppliers s1 ON p.supplier_id = s1.supplierid
      LEFT JOIN receivings r ON ids.po_id = r.receiving_id
      LEFT JOIN suppliers s2 ON r.supplier_id = s2.supplierid
      ORDER BY po_id
    `, incrementalUpdate
      ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]
      : []);

    const totalItems = total;
    let processed = 0;
    const BATCH_SIZE = 5000;
    const PROGRESS_INTERVAL = 500;
    let lastProgressUpdate = Date.now();

    outputProgress({
      operation: `Starting purchase orders import - Processing ${totalItems} purchase order items`,
      status: "running",
    });

    for (let i = 0; i < poList.length; i += BATCH_SIZE) {
      const batch = poList.slice(i, Math.min(i + BATCH_SIZE, poList.length));
      const poIds = batch.map(po => po.po_id);

      // Get all products for these POs in one query
      const [poProducts] = await prodConnection.query(`
        SELECT
          pop.po_id,
          pop.pid,
          pr.itemnumber as sku,
          pop.cost_each as cost_price,
          pop.qty_each as ordered
        FROM po_products pop USE INDEX (PRIMARY)
        JOIN products pr ON pop.pid = pr.pid
        WHERE pop.po_id IN (?)
      `, [poIds]);

      // Process PO products in smaller sub-batches to avoid packet size issues
      const SUB_BATCH_SIZE = 5000;
      for (let j = 0; j < poProducts.length; j += SUB_BATCH_SIZE) {
        const productBatch = poProducts.slice(j, j + SUB_BATCH_SIZE);
        const productPids = [...new Set(productBatch.map(p => p.pid))];
        const batchPoIds = [...new Set(productBatch.map(p => p.po_id))];

        // Get receivings for this batch
        const [receivings] = await prodConnection.query(`
          SELECT
            r.po_id,
            rp.pid,
            rp.receiving_id,
            rp.qty_each,
            rp.cost_each,
            DATE(NULLIF(rp.received_date, '0000-00-00 00:00:00')) as received_date,
            rp.received_by,
            CASE
              WHEN r.po_id IS NULL THEN 2  -- No PO
              WHEN r.po_id IN (?) THEN 0   -- Original PO
              ELSE 1                       -- Different PO
            END as is_alt_po
          FROM receivings_products rp USE INDEX (received_date)
          LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id
          WHERE rp.pid IN (?)
            AND rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
          ORDER BY r.po_id, rp.pid, rp.received_date
        `, [batchPoIds, productPids]);

        // Create maps for this sub-batch
        const poProductMap = new Map();
        productBatch.forEach(product => {
          const key = `${product.po_id}-${product.pid}`;
          poProductMap.set(key, product);
        });

        const receivingMap = new Map();
        const altReceivingMap = new Map();
        const noPOReceivingMap = new Map();
        receivings.forEach(receiving => {
          const key = `${receiving.po_id}-${receiving.pid}`;
          if (receiving.is_alt_po === 2) {
            // No PO
            if (!noPOReceivingMap.has(receiving.pid)) {
              noPOReceivingMap.set(receiving.pid, []);
            }
            noPOReceivingMap.get(receiving.pid).push(receiving);
          } else if (receiving.is_alt_po === 1) {
            // Different PO
            if (!altReceivingMap.has(receiving.pid)) {
              altReceivingMap.set(receiving.pid, []);
            }
            altReceivingMap.get(receiving.pid).push(receiving);
          } else {
            // Original PO
            if (!receivingMap.has(key)) {
              receivingMap.set(key, []);
            }
            receivingMap.get(key).push(receiving);
          }
        });

        // Verify PIDs exist
        const [existingPids] = await localConnection.query(
          'SELECT pid FROM products WHERE pid IN (?)',
          [productPids]
        );
        const validPids = new Set(existingPids.map(p => p.pid));

        // Prepare values for this sub-batch
        const values = [];
        let batchProcessed = 0;

        // First check which PO lines already exist and get their current values
        const poLines = Array.from(poProductMap.values())
          .filter(p => validPids.has(p.pid))
          .map(p => [p.po_id, p.pid]);
        // Skip the lookup when no PO lines reference a known product;
        // an empty list would otherwise produce an invalid "IN ()" clause
        const [existingPOs] = poLines.length > 0
          ? await localConnection.query(
              `SELECT ${columnNames.join(',')} FROM purchase_orders
               WHERE (po_id, pid) IN (${poLines.map(() => "(?,?)").join(",")})`,
              poLines.flat()
            )
          : [[]];
        const existingPOMap = new Map(
          existingPOs.map(po => [`${po.po_id}-${po.pid}`, po])
        );

        // Split into inserts and updates
        const insertsAndUpdates = {
          inserts: [],
          updates: []
        };

        for (const po of batch) {
          const poProducts = Array.from(poProductMap.values())
            .filter(p => p.po_id === po.po_id && validPids.has(p.pid));

          for (const product of poProducts) {
            const key = `${po.po_id}-${product.pid}`;
            const receivingHistory = receivingMap.get(key) || [];
            const altReceivingHistory = altReceivingMap.get(product.pid) || [];
            const noPOReceivingHistory = noPOReceivingMap.get(product.pid) || [];

            // Combine all receivings and sort by date
            const allReceivings = [
              ...receivingHistory.map(r => ({ ...r, type: 'original' })),
              ...altReceivingHistory.map(r => ({ ...r, type: 'alternate' })),
              ...noPOReceivingHistory.map(r => ({ ...r, type: 'no_po' }))
            ].sort((a, b) => new Date(a.received_date) - new Date(b.received_date));

            // Track FIFO fulfillment
            let remainingToFulfill = product.ordered;
            const fulfillmentTracking = [];
            let totalReceived = 0;

            for (const receiving of allReceivings) {
              const qtyToApply = Math.min(remainingToFulfill, receiving.qty_each);
              if (qtyToApply > 0) {
                fulfillmentTracking.push({
                  receiving_id: receiving.receiving_id,
                  qty_applied: qtyToApply,
                  qty_total: receiving.qty_each,
                  cost: receiving.cost_each,
                  date: receiving.received_date,
                  received_by: receiving.received_by,
                  type: receiving.type,
                  remaining_qty: receiving.qty_each - qtyToApply
                });
                remainingToFulfill -= qtyToApply;
              } else {
                // Track excess receivings
                fulfillmentTracking.push({
                  receiving_id: receiving.receiving_id,
                  qty_applied: 0,
                  qty_total: receiving.qty_each,
                  cost: receiving.cost_each,
                  date: receiving.received_date,
                  received_by: receiving.received_by,
                  type: receiving.type,
                  is_excess: true
                });
              }
              totalReceived += receiving.qty_each;
            }

            const receiving_status =
              !totalReceived ? 1 :        // created
              remainingToFulfill > 0 ?
              30 :                        // partial
              40;                         // full

            const firstReceiving = allReceivings[0] || {};
            const lastReceiving = allReceivings[allReceivings.length - 1] || {};

            const rowValues = columnNames.map(col => {
              switch (col) {
                case 'po_id': return po.po_id;
                case 'vendor': return po.vendor;
                case 'date': return po.date;
                case 'expected_date': return po.expected_date;
                case 'pid': return product.pid;
                case 'sku': return product.sku;
                case 'cost_price': return product.cost_price;
                case 'status': return po.status;
                case 'notes': return po.notes;
                case 'long_note': return po.long_note;
                case 'ordered': return product.ordered;
                case 'received': return totalReceived;
                case 'unfulfilled': return remainingToFulfill;
                case 'excess_received': return Math.max(0, totalReceived - product.ordered);
                case 'received_date': return firstReceiving.received_date || null;
                case 'last_received_date': return lastReceiving.received_date || null;
                case 'received_by': return firstReceiving.received_by || null;
                case 'receiving_status': return receiving_status;
                case 'receiving_history': return JSON.stringify({
                  fulfillment: fulfillmentTracking,
                  ordered_qty: product.ordered,
                  total_received: totalReceived,
                  remaining_unfulfilled: remainingToFulfill,
                  excess_received: Math.max(0, totalReceived - product.ordered)
                });
                default: return null;
              }
            });

            if (existingPOMap.has(key)) {
              const existing = existingPOMap.get(key);
              // Check if any values are different
              const hasChanges = columnNames.some(col => {
                const newVal = rowValues[columnNames.indexOf(col)];
                const oldVal = existing[col] ?? null;
                // Special handling for numbers to avoid type coercion issues
                if (typeof newVal === 'number' && typeof oldVal === 'number') {
                  return Math.abs(newVal - oldVal) > 0.00001; // Allow for tiny floating point differences
                }
                // Date columns come back as Date objects; compare by value, not identity,
                // so unchanged rows are not flagged as updates on every run
                if (newVal instanceof Date || oldVal instanceof Date) {
                  return new Date(newVal).getTime() !== new Date(oldVal).getTime();
                }
                // Special handling for receiving_history - parse and compare
                if (col === 'receiving_history') {
                  const newHistory = JSON.parse(newVal || '{}');
                  const oldHistory = JSON.parse(oldVal || '{}');
                  return JSON.stringify(newHistory) !== JSON.stringify(oldHistory);
                }
                return newVal !== oldVal;
              });

              if (hasChanges) {
                insertsAndUpdates.updates.push({
                  po_id: po.po_id,
                  pid: product.pid,
                  values: rowValues
                });
              }
            } else {
              insertsAndUpdates.inserts.push({
                po_id: po.po_id,
                pid: product.pid,
                values: rowValues
              });
            }

            batchProcessed++;
          }
        }

        // Handle inserts
        if (insertsAndUpdates.inserts.length > 0) {
          const insertPlaceholders = insertsAndUpdates.inserts
            .map(() => `(${Array(columnNames.length).fill("?").join(",")})`)
            .join(",");
          const insertResult = await localConnection.query(`
            INSERT INTO purchase_orders (${columnNames.join(",")})
            VALUES ${insertPlaceholders}
          `, insertsAndUpdates.inserts.map(i => i.values).flat());
          recordsAdded += insertResult[0].affectedRows;
        }

        // Handle updates - now we know these actually have changes
        if (insertsAndUpdates.updates.length > 0) {
          const updatePlaceholders = insertsAndUpdates.updates
            .map(() => `(${Array(columnNames.length).fill("?").join(",")})`)
            .join(",");
          const updateResult = await localConnection.query(`
            INSERT INTO purchase_orders (${columnNames.join(",")})
            VALUES ${updatePlaceholders}
            ON DUPLICATE KEY UPDATE ${columnNames
              .filter((col) => col !== "po_id" && col !== "pid")
              .map((col) => `${col} = VALUES(${col})`)
              .join(",")};
          `, insertsAndUpdates.updates.map(u => u.values).flat());
          recordsUpdated += updateResult[0].affectedRows / 2; // Each update counts as 2 in affectedRows
        }

        processed += batchProcessed;

        // Update progress based on time interval
        const now = Date.now();
        if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) {
          outputProgress({
            status: "running",
            operation: "Purchase orders import",
            current: processed,
            total: totalItems,
            elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
            remaining: estimateRemaining(startTime, processed, totalItems),
            rate: calculateRate(startTime, processed)
          });
          lastProgressUpdate = now;
        }
      }
    }

    // Only update sync status if we get here (no errors thrown)
    await localConnection.query(`
      INSERT INTO sync_status (table_name, last_sync_timestamp)
      VALUES ('purchase_orders', NOW())
      ON DUPLICATE KEY UPDATE
        last_sync_timestamp = NOW(),
        last_sync_id = LAST_INSERT_ID(last_sync_id)
    `);

    return {
      status: "complete",
      totalImported: totalItems,
      recordsAdded: recordsAdded || 0,
      recordsUpdated: recordsUpdated || 0,
      incrementalUpdate,
      lastSyncTime
    };
  } catch (error) {
    outputProgress({
      operation: `${incrementalUpdate ? 'Incremental' : 'Full'} purchase orders import failed`,
      status: "error",
      error: error.message,
    });
    throw error;
  }
}

module.exports = importPurchaseOrders;
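/*
 * Example usage (an illustrative sketch only, not part of this module).
 * It assumes mysql2/promise connections and a module path of
 * './importPurchaseOrders'; the connection settings and the `--full` CLI
 * flag are placeholders, not established conventions of this repo.
 *
 *   const mysql = require('mysql2/promise');
 *   const importPurchaseOrders = require('./importPurchaseOrders');
 *
 *   (async () => {
 *     // Source (production) and destination (local) connections
 *     const prodConnection = await mysql.createConnection({ host: 'prod-host', user: 'user', password: 'pass', database: 'prod_db' });
 *     const localConnection = await mysql.createConnection({ host: 'localhost', user: 'user', password: 'pass', database: 'local_db' });
 *     try {
 *       // Pass false to force a full (non-incremental) import
 *       const incremental = !process.argv.includes('--full');
 *       const result = await importPurchaseOrders(prodConnection, localConnection, incremental);
 *       console.log('Purchase orders import finished:', result);
 *     } finally {
 *       await prodConnection.end();
 *       await localConnection.end();
 *     }
 *   })();
 */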