const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress'); async function importPurchaseOrders(prodConnection, localConnection) { const startTime = Date.now(); let importHistoryId; try { // Get last sync info const [syncInfo] = await localConnection.query( "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'" ); const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01'; // Create import history record const [historyResult] = await localConnection.query(` INSERT INTO import_history ( table_name, start_time, is_incremental, status ) VALUES ( 'purchase_orders', NOW(), ?, 'running' ) `, [!!syncInfo?.[0]]); importHistoryId = historyResult.insertId; outputProgress({ operation: "Starting purchase orders import - Initializing", status: "running", }); // Get column names for the insert const [columns] = await localConnection.query(` SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'purchase_orders' ORDER BY ORDINAL_POSITION `); const columnNames = columns .map((col) => col.COLUMN_NAME) .filter((name) => name !== "id"); // First get all relevant PO IDs with basic info - modified for incremental const [[{ total }]] = await prodConnection.query(` SELECT COUNT(*) as total FROM ( SELECT DISTINCT pop.po_id, pop.pid FROM po p FORCE INDEX (idx_date_created) JOIN po_products pop ON p.po_id = pop.po_id JOIN suppliers s ON p.supplier_id = s.supplierid WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR) AND (p.date_ordered > ? OR p.stamp > ? OR p.date_modified > ?) UNION SELECT DISTINCT r.receiving_id as po_id, rp.pid FROM receivings_products rp LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR) AND (rp.received_date > ? OR rp.stamp > ?) ) all_items `, [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]); const [poList] = await prodConnection.query(` SELECT DISTINCT COALESCE(p.po_id, r.receiving_id) as po_id, CASE WHEN p.po_id IS NOT NULL THEN s1.companyname WHEN r.supplier_id IS NOT NULL THEN s2.companyname ELSE 'No Supplier' END as vendor, CASE WHEN p.po_id IS NOT NULL THEN DATE(p.date_ordered) END as date, CASE WHEN p.po_id IS NOT NULL THEN DATE(p.date_estin) END as expected_date, COALESCE(p.status, 50) as status, COALESCE(p.short_note, '') as notes, COALESCE(p.notes, '') as long_note FROM ( SELECT po_id FROM po WHERE date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR) AND (date_ordered > ? OR stamp > ? OR date_modified > ?) UNION SELECT DISTINCT r.receiving_id as po_id FROM receivings r JOIN receivings_products rp ON r.receiving_id = rp.receiving_id WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR) AND (rp.received_date > ? OR rp.stamp > ?) ) ids LEFT JOIN po p ON ids.po_id = p.po_id LEFT JOIN suppliers s1 ON p.supplier_id = s1.supplierid LEFT JOIN receivings r ON ids.po_id = r.receiving_id LEFT JOIN suppliers s2 ON r.supplier_id = s2.supplierid ORDER BY po_id `, [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]); const totalItems = total; let processed = 0; let recordsAdded = 0; let recordsUpdated = 0; const BATCH_SIZE = 5000; const PROGRESS_INTERVAL = 500; let lastProgressUpdate = Date.now(); outputProgress({ operation: `Starting purchase orders import - Processing ${totalItems} purchase order items`, status: "running", }); for (let i = 0; i < poList.length; i += BATCH_SIZE) { const batch = poList.slice(i, Math.min(i + BATCH_SIZE, poList.length)); const poIds = batch.map(po => po.po_id); // Get all products for these POs in one query const [poProducts] = await prodConnection.query(` SELECT pop.po_id, pop.pid, pr.itemnumber as sku, pop.cost_each as cost_price, pop.qty_each as ordered FROM po_products pop FORCE INDEX (PRIMARY) JOIN products pr ON pop.pid = pr.pid WHERE pop.po_id IN (?) `, [poIds]); // Process PO products in smaller sub-batches to avoid packet size issues const SUB_BATCH_SIZE = 5000; for (let j = 0; j < poProducts.length; j += SUB_BATCH_SIZE) { const productBatch = poProducts.slice(j, j + SUB_BATCH_SIZE); const productPids = [...new Set(productBatch.map(p => p.pid))]; const batchPoIds = [...new Set(productBatch.map(p => p.po_id))]; // Get receivings for this batch const [receivings] = await prodConnection.query(` SELECT r.po_id, rp.pid, rp.receiving_id, rp.qty_each, rp.cost_each, DATE(NULLIF(rp.received_date, '0000-00-00 00:00:00')) as received_date, rp.received_by, CASE WHEN r.po_id IS NULL THEN 2 -- No PO WHEN r.po_id IN (?) THEN 0 -- Original PO ELSE 1 -- Different PO END as is_alt_po FROM receivings_products rp LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id WHERE rp.pid IN (?) AND rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) ORDER BY r.po_id, rp.pid, rp.received_date `, [batchPoIds, productPids]); // Create maps for this sub-batch const poProductMap = new Map(); productBatch.forEach(product => { const key = `${product.po_id}-${product.pid}`; poProductMap.set(key, product); }); const receivingMap = new Map(); const altReceivingMap = new Map(); const noPOReceivingMap = new Map(); receivings.forEach(receiving => { const key = `${receiving.po_id}-${receiving.pid}`; if (receiving.is_alt_po === 2) { // No PO if (!noPOReceivingMap.has(receiving.pid)) { noPOReceivingMap.set(receiving.pid, []); } noPOReceivingMap.get(receiving.pid).push(receiving); } else if (receiving.is_alt_po === 1) { // Different PO if (!altReceivingMap.has(receiving.pid)) { altReceivingMap.set(receiving.pid, []); } altReceivingMap.get(receiving.pid).push(receiving); } else { // Original PO if (!receivingMap.has(key)) { receivingMap.set(key, []); } receivingMap.get(key).push(receiving); } }); // Verify PIDs exist const [existingPids] = await localConnection.query( 'SELECT pid FROM products WHERE pid IN (?)', [productPids] ); const validPids = new Set(existingPids.map(p => p.pid)); // Prepare values for this sub-batch const values = []; let batchProcessed = 0; for (const po of batch) { const poProducts = Array.from(poProductMap.values()) .filter(p => p.po_id === po.po_id && validPids.has(p.pid)); for (const product of poProducts) { const key = `${po.po_id}-${product.pid}`; const receivingHistory = receivingMap.get(key) || []; const altReceivingHistory = altReceivingMap.get(product.pid) || []; const noPOReceivingHistory = noPOReceivingMap.get(product.pid) || []; const received = receivingHistory.reduce((sum, r) => sum + r.qty_each, 0); const altReceived = altReceivingHistory.reduce((sum, r) => sum + r.qty_each, 0); const noPOReceived = noPOReceivingHistory.reduce((sum, r) => sum + r.qty_each, 0); const totalReceived = received + altReceived + noPOReceived; const receiving_status = !totalReceived ? 1 : // created totalReceived < product.ordered ? 30 : // partial 40; // full const allReceivings = [...receivingHistory]; if (altReceivingHistory.length > 0) { allReceivings.push(...altReceivingHistory); } if (noPOReceivingHistory.length > 0) { allReceivings.push(...noPOReceivingHistory); } allReceivings.sort((a, b) => new Date(a.received_date) - new Date(b.received_date)); const firstReceiving = allReceivings[0] || {}; const lastReceiving = allReceivings[allReceivings.length - 1] || {}; values.push(columnNames.map(col => { switch (col) { case 'po_id': return po.po_id; case 'vendor': return po.vendor; case 'date': return po.date; case 'expected_date': return po.expected_date; case 'pid': return product.pid; case 'sku': return product.sku; case 'cost_price': return product.cost_price; case 'status': return po.status; case 'notes': return po.notes; case 'long_note': return po.long_note; case 'ordered': return product.ordered; case 'received': return totalReceived; case 'received_date': return firstReceiving.received_date || null; case 'last_received_date': return lastReceiving.received_date || null; case 'received_by': return firstReceiving.received_by || null; case 'receiving_status': return receiving_status; case 'receiving_history': return JSON.stringify(allReceivings.map(r => ({ receiving_id: r.receiving_id, qty: r.qty_each, cost: r.cost_each, date: r.received_date, received_by: r.received_by, alt_po: r.is_alt_po }))); default: return null; } })); batchProcessed++; } } if (values.length > 0) { const placeholders = values.map(() => `(${Array(columnNames.length).fill("?").join(",")})` ).join(","); const query = ` INSERT INTO purchase_orders (${columnNames.join(",")}) VALUES ${placeholders} ON DUPLICATE KEY UPDATE ${columnNames .filter((col) => col !== "po_id" && col !== "pid") .map((col) => `${col} = VALUES(${col})`) .join(",")}; `; const result = await localConnection.query(query, values.flat()); recordsAdded += result.affectedRows - result.changedRows; recordsUpdated += result.changedRows; } processed += batchProcessed; // Update progress based on time interval const now = Date.now(); if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) { outputProgress({ status: "running", operation: "Purchase orders import", current: processed, total: totalItems, elapsed: formatElapsedTime((Date.now() - startTime) / 1000), remaining: estimateRemaining(startTime, processed, totalItems), rate: calculateRate(startTime, processed) }); lastProgressUpdate = now; } } } // After successful import, update sync status await localConnection.query(` INSERT INTO sync_status (table_name, last_sync_timestamp) VALUES ('purchase_orders', NOW()) ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW() `); // Update import history with final stats const endTime = Date.now(); const durationSeconds = Math.round((endTime - startTime) / 1000); await localConnection.query(` UPDATE import_history SET end_time = NOW(), duration_seconds = ?, records_added = ?, records_updated = ?, status = 'completed', additional_info = JSON_OBJECT( 'total_processed', ?, 'last_sync_time', ?, 'next_sync_time', NOW() ) WHERE id = ? `, [durationSeconds, recordsAdded, recordsUpdated, totalItems, lastSyncTime, importHistoryId]); return { status: "complete", totalImported: totalItems, recordsAdded, recordsUpdated, durationSeconds, incrementalUpdate: !!syncInfo?.[0] }; } catch (error) { // Update import history with error if (importHistoryId) { await localConnection.query(` UPDATE import_history SET end_time = NOW(), duration_seconds = ?, status = 'failed', error_message = ? WHERE id = ? `, [Math.round((Date.now() - startTime) / 1000), error.message, importHistoryId]); } outputProgress({ operation: "Purchase orders import failed", status: "error", error: error.message, }); throw error; } } module.exports = importPurchaseOrders;