const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress'); async function importPurchaseOrders(prodConnection, localConnection, incrementalUpdate = true) { const startTime = Date.now(); let recordsAdded = 0; let recordsUpdated = 0; try { // Get last sync info const [syncInfo] = await localConnection.query( "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'" ); const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01'; console.log('Purchase Orders: Using last sync time:', lastSyncTime); // Insert temporary table creation query for purchase orders await localConnection.query(` CREATE TABLE IF NOT EXISTS temp_purchase_orders ( po_id INT UNSIGNED NOT NULL, pid INT UNSIGNED NOT NULL, vendor VARCHAR(255), date DATE, expected_date DATE, status INT, notes TEXT, PRIMARY KEY (po_id, pid) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; `); outputProgress({ operation: `Starting ${incrementalUpdate ? 'incremental' : 'full'} purchase orders import`, status: "running", }); // Get column names for the insert const [columns] = await localConnection.query(` SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'purchase_orders' ORDER BY ORDINAL_POSITION `); const columnNames = columns .map((col) => col.COLUMN_NAME) .filter((name) => name !== "id"); // Build incremental conditions const incrementalWhereClause = incrementalUpdate ? `AND ( p.date_updated > ? OR p.date_ordered > ? OR p.date_estin > ? OR r.date_updated > ? OR r.date_created > ? OR r.date_checked > ? OR rp.stamp > ? OR rp.received_date > ? )` : ""; const incrementalParams = incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []; // First get all relevant PO IDs with basic info const [[{ total }]] = await prodConnection.query(` SELECT COUNT(*) as total FROM ( SELECT DISTINCT pop.po_id, pop.pid FROM po p USE INDEX (idx_date_created) JOIN po_products pop ON p.po_id = pop.po_id JOIN suppliers s ON p.supplier_id = s.supplierid WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) ${incrementalUpdate ? ` AND ( p.date_updated > ? OR p.date_ordered > ? OR p.date_estin > ? ) ` : ''} UNION SELECT DISTINCT r.receiving_id as po_id, rp.pid FROM receivings_products rp USE INDEX (received_date) LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) ${incrementalUpdate ? ` AND ( r.date_created > ? OR r.date_checked > ? OR rp.stamp > ? OR rp.received_date > ? ) ` : ''} ) all_items `, incrementalUpdate ? [ lastSyncTime, lastSyncTime, lastSyncTime, // PO conditions lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime // Receiving conditions ] : []); console.log('Purchase Orders: Found changes:', total); const [poList] = await prodConnection.query(` SELECT DISTINCT COALESCE(p.po_id, r.receiving_id) as po_id, COALESCE( NULLIF(s1.companyname, ''), NULLIF(s2.companyname, ''), 'Unknown Vendor' ) as vendor, CASE WHEN p.po_id IS NOT NULL THEN DATE(COALESCE( NULLIF(p.date_ordered, '0000-00-00 00:00:00'), p.date_created )) WHEN r.receiving_id IS NOT NULL THEN DATE(r.date_created) END as date, CASE WHEN p.date_estin = '0000-00-00' THEN NULL WHEN p.date_estin IS NULL THEN NULL WHEN p.date_estin NOT REGEXP '^[0-9]{4}-[0-9]{2}-[0-9]{2}$' THEN NULL ELSE p.date_estin END as expected_date, COALESCE(p.status, 50) as status, p.short_note as notes, p.notes as long_note FROM ( SELECT po_id FROM po USE INDEX (idx_date_created) WHERE date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) ${incrementalUpdate ? ` AND ( date_ordered > ? OR date_updated > ? OR date_estin > ? ) ` : ''} UNION SELECT DISTINCT r.receiving_id as po_id FROM receivings r JOIN receivings_products rp USE INDEX (received_date) ON r.receiving_id = rp.receiving_id WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) ${incrementalUpdate ? ` AND ( r.date_created > ? OR r.date_checked > ? OR rp.stamp > ? OR rp.received_date > ? ) ` : ''} ) ids LEFT JOIN po p ON ids.po_id = p.po_id LEFT JOIN suppliers s1 ON p.supplier_id = s1.supplierid LEFT JOIN receivings r ON ids.po_id = r.receiving_id LEFT JOIN suppliers s2 ON r.supplier_id = s2.supplierid ORDER BY po_id `, incrementalUpdate ? [ lastSyncTime, lastSyncTime, lastSyncTime, // PO conditions lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime // Receiving conditions ] : []); console.log('Sample PO dates:', poList.slice(0, 5).map(po => ({ po_id: po.po_id, raw_date_ordered: po.raw_date_ordered, raw_date_created: po.raw_date_created, raw_date_estin: po.raw_date_estin, computed_date: po.date, expected_date: po.expected_date }))); const totalItems = total; let processed = 0; const BATCH_SIZE = 5000; const PROGRESS_INTERVAL = 500; let lastProgressUpdate = Date.now(); outputProgress({ operation: `Starting purchase orders import - Processing ${totalItems} purchase order items`, status: "running", }); for (let i = 0; i < poList.length; i += BATCH_SIZE) { const batch = poList.slice(i, Math.min(i + BATCH_SIZE, poList.length)); const poIds = batch.map(po => po.po_id); // Get all products for these POs in one query const [poProducts] = await prodConnection.query(` SELECT pop.po_id, pop.pid, pr.itemnumber as sku, pr.description as name, pop.cost_each, pop.qty_each as ordered FROM po_products pop USE INDEX (PRIMARY) JOIN products pr ON pop.pid = pr.pid WHERE pop.po_id IN (?) `, [poIds]); // Process PO products in smaller sub-batches to avoid packet size issues const SUB_BATCH_SIZE = 5000; for (let j = 0; j < poProducts.length; j += SUB_BATCH_SIZE) { const productBatch = poProducts.slice(j, j + SUB_BATCH_SIZE); const productPids = [...new Set(productBatch.map(p => p.pid))]; const batchPoIds = [...new Set(productBatch.map(p => p.po_id))]; // Get receivings for this batch with employee names const [receivings] = await prodConnection.query(` SELECT r.po_id, rp.pid, rp.receiving_id, rp.qty_each, rp.cost_each, COALESCE(rp.received_date, r.date_created) as received_date, rp.received_by, CONCAT(e.firstname, ' ', e.lastname) as received_by_name, CASE WHEN r.po_id IS NULL THEN 2 -- No PO WHEN r.po_id IN (?) THEN 0 -- Original PO ELSE 1 -- Different PO END as is_alt_po FROM receivings_products rp USE INDEX (received_date) LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id LEFT JOIN employees e ON rp.received_by = e.employeeid WHERE rp.pid IN (?) AND rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR) ORDER BY r.po_id, rp.pid, rp.received_date `, [batchPoIds, productPids]); // Create maps for this sub-batch const poProductMap = new Map(); productBatch.forEach(product => { const key = `${product.po_id}-${product.pid}`; poProductMap.set(key, product); }); const receivingMap = new Map(); const altReceivingMap = new Map(); const noPOReceivingMap = new Map(); receivings.forEach(receiving => { const key = `${receiving.po_id}-${receiving.pid}`; if (receiving.is_alt_po === 2) { // No PO if (!noPOReceivingMap.has(receiving.pid)) { noPOReceivingMap.set(receiving.pid, []); } noPOReceivingMap.get(receiving.pid).push(receiving); } else if (receiving.is_alt_po === 1) { // Different PO if (!altReceivingMap.has(receiving.pid)) { altReceivingMap.set(receiving.pid, []); } altReceivingMap.get(receiving.pid).push(receiving); } else { // Original PO if (!receivingMap.has(key)) { receivingMap.set(key, []); } receivingMap.get(key).push(receiving); } }); // Verify PIDs exist const [existingPids] = await localConnection.query( 'SELECT pid FROM products WHERE pid IN (?)', [productPids] ); const validPids = new Set(existingPids.map(p => p.pid)); // First check which PO lines already exist and get their current values const poLines = Array.from(poProductMap.values()) .filter(p => validPids.has(p.pid)) .map(p => [p.po_id, p.pid]); const [existingPOs] = await localConnection.query( `SELECT ${columnNames.join(',')} FROM purchase_orders WHERE (po_id, pid) IN (${poLines.map(() => "(?,?)").join(",")})`, poLines.flat() ); const existingPOMap = new Map( existingPOs.map(po => [`${po.po_id}-${po.pid}`, po]) ); // Split into inserts and updates const insertsAndUpdates = { inserts: [], updates: [] }; let batchProcessed = 0; for (const po of batch) { const poProducts = Array.from(poProductMap.values()) .filter(p => p.po_id === po.po_id && validPids.has(p.pid)); for (const product of poProducts) { const key = `${po.po_id}-${product.pid}`; const receivingHistory = receivingMap.get(key) || []; const altReceivingHistory = altReceivingMap.get(product.pid) || []; const noPOReceivingHistory = noPOReceivingMap.get(product.pid) || []; // Combine all receivings and sort by date const allReceivings = [ ...receivingHistory.map(r => ({ ...r, type: 'original' })), ...altReceivingHistory.map(r => ({ ...r, type: 'alternate' })), ...noPOReceivingHistory.map(r => ({ ...r, type: 'no_po' })) ].sort((a, b) => new Date(a.received_date || '9999-12-31') - new Date(b.received_date || '9999-12-31')); // Split receivings into original PO and others const originalPOReceivings = allReceivings.filter(r => r.type === 'original'); const otherReceivings = allReceivings.filter(r => r.type !== 'original'); // Track FIFO fulfillment let remainingToFulfill = product.ordered; const fulfillmentTracking = []; let totalReceived = 0; let actualCost = null; // Will store the cost of the first receiving that fulfills this PO let firstFulfillmentReceiving = null; let lastFulfillmentReceiving = null; for (const receiving of allReceivings) { const qtyToApply = Math.min(remainingToFulfill, receiving.qty_each); if (qtyToApply > 0) { // If this is the first receiving being applied, use its cost if (actualCost === null) { actualCost = receiving.cost_each; firstFulfillmentReceiving = receiving; } lastFulfillmentReceiving = receiving; fulfillmentTracking.push({ receiving_id: receiving.receiving_id, qty_applied: qtyToApply, qty_total: receiving.qty_each, cost: receiving.cost_each, date: receiving.received_date, received_by: receiving.received_by, received_by_name: receiving.received_by_name || 'Unknown', type: receiving.type, remaining_qty: receiving.qty_each - qtyToApply }); remainingToFulfill -= qtyToApply; } else { // Track excess receivings fulfillmentTracking.push({ receiving_id: receiving.receiving_id, qty_applied: 0, qty_total: receiving.qty_each, cost: receiving.cost_each, date: receiving.received_date, received_by: receiving.received_by, received_by_name: receiving.received_by_name || 'Unknown', type: receiving.type, is_excess: true }); } totalReceived += receiving.qty_each; } const receiving_status = !totalReceived ? 1 : // created remainingToFulfill > 0 ? 30 : // partial 40; // full function formatDate(dateStr) { if (!dateStr) return null; if (dateStr === '0000-00-00' || dateStr === '0000-00-00 00:00:00') return null; if (typeof dateStr === 'string' && !dateStr.match(/^\d{4}-\d{2}-\d{2}/)) return null; try { const date = new Date(dateStr); if (isNaN(date.getTime())) return null; if (date.getFullYear() < 1900 || date.getFullYear() > 2100) return null; return date.toISOString().split('T')[0]; } catch (e) { return null; } } const rowValues = columnNames.map(col => { switch (col) { case 'po_id': return po.po_id; case 'vendor': return po.vendor; case 'date': return formatDate(po.date); case 'expected_date': return formatDate(po.expected_date); case 'pid': return product.pid; case 'sku': return product.sku; case 'name': return product.name; case 'cost_price': return actualCost || product.cost_each; case 'po_cost_price': return product.cost_each; case 'status': return po.status; case 'notes': return po.notes; case 'long_note': return po.long_note; case 'ordered': return product.ordered; case 'received': return totalReceived; case 'unfulfilled': return remainingToFulfill; case 'excess_received': return Math.max(0, totalReceived - product.ordered); case 'received_date': return formatDate(firstFulfillmentReceiving?.received_date); case 'last_received_date': return formatDate(lastFulfillmentReceiving?.received_date); case 'received_by': return firstFulfillmentReceiving?.received_by_name || null; case 'receiving_status': return receiving_status; case 'receiving_history': return JSON.stringify({ fulfillment: fulfillmentTracking, ordered_qty: product.ordered, total_received: totalReceived, remaining_unfulfilled: remainingToFulfill, excess_received: Math.max(0, totalReceived - product.ordered), po_cost: product.cost_each, actual_cost: actualCost || product.cost_each }); default: return null; } }); if (existingPOMap.has(key)) { const existing = existingPOMap.get(key); // Check if any values are different const hasChanges = columnNames.some(col => { const newVal = rowValues[columnNames.indexOf(col)]; const oldVal = existing[col] ?? null; // Special handling for numbers to avoid type coercion issues if (typeof newVal === 'number' && typeof oldVal === 'number') { return Math.abs(newVal - oldVal) > 0.00001; // Allow for tiny floating point differences } // Special handling for receiving_history - parse and compare if (col === 'receiving_history') { const newHistory = JSON.parse(newVal || '{}'); const oldHistory = JSON.parse(oldVal || '{}'); return JSON.stringify(newHistory) !== JSON.stringify(oldHistory); } return newVal !== oldVal; }); if (hasChanges) { insertsAndUpdates.updates.push({ po_id: po.po_id, pid: product.pid, values: rowValues }); } } else { insertsAndUpdates.inserts.push({ po_id: po.po_id, pid: product.pid, values: rowValues }); } batchProcessed++; } } // Handle inserts if (insertsAndUpdates.inserts.length > 0) { const insertPlaceholders = insertsAndUpdates.inserts .map(() => `(${Array(columnNames.length).fill("?").join(",")})`) .join(","); const insertResult = await localConnection.query(` INSERT INTO purchase_orders (${columnNames.join(",")}) VALUES ${insertPlaceholders} `, insertsAndUpdates.inserts.map(i => i.values).flat()); recordsAdded += insertResult[0].affectedRows; } // Handle updates - now we know these actually have changes if (insertsAndUpdates.updates.length > 0) { const updatePlaceholders = insertsAndUpdates.updates .map(() => `(${Array(columnNames.length).fill("?").join(",")})`) .join(","); const updateResult = await localConnection.query(` INSERT INTO purchase_orders (${columnNames.join(",")}) VALUES ${updatePlaceholders} ON DUPLICATE KEY UPDATE ${columnNames .filter((col) => col !== "po_id" && col !== "pid") .map((col) => `${col} = VALUES(${col})`) .join(",")}; `, insertsAndUpdates.updates.map(u => u.values).flat()); recordsUpdated += updateResult[0].affectedRows / 2; // Each update counts as 2 in affectedRows } processed += batchProcessed; // Update progress based on time interval const now = Date.now(); if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) { outputProgress({ status: "running", operation: "Purchase orders import", current: processed, total: totalItems, elapsed: formatElapsedTime((Date.now() - startTime) / 1000), remaining: estimateRemaining(startTime, processed, totalItems), rate: calculateRate(startTime, processed) }); lastProgressUpdate = now; } } } // Only update sync status if we get here (no errors thrown) await localConnection.query(` INSERT INTO sync_status (table_name, last_sync_timestamp) VALUES ('purchase_orders', NOW()) ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW(), last_sync_id = LAST_INSERT_ID(last_sync_id) `); return { status: "complete", totalImported: totalItems, recordsAdded: recordsAdded || 0, recordsUpdated: recordsUpdated || 0, incrementalUpdate, lastSyncTime }; } catch (error) { outputProgress({ operation: `${incrementalUpdate ? 'Incremental' : 'Full'} purchase orders import failed`, status: "error", error: error.message, }); throw error; } } module.exports = importPurchaseOrders;