const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');

// Columns of temp_purchase_orders, in the order values are bound for each row.
const PO_TEMP_COLUMNS = [
  'po_id',
  'pid',
  'sku',
  'name',
  'vendor',
  'date',
  'expected_date',
  'status',
  'notes',
  'ordered',
  'cost_price',
];

/**
 * Builds the FROM/WHERE fragment shared by the count query and the paged fetch
 * query, so the reported total always describes the same row set that is fetched.
 * Kept MySQL compatible because it runs against the production connection.
 *
 * @param {boolean} incrementalUpdate - When truthy, look back 1 year and add the
 *   "changed since last sync" filter (three `?` placeholders); otherwise look back 5 years.
 * @returns {string} SQL fragment starting at `FROM`.
 */
function buildPurchaseOrderSourceSql(incrementalUpdate) {
  const lookbackYears = incrementalUpdate ? '1' : '5';
  const changedSinceFilter = incrementalUpdate
    ? `
        AND (
          p.date_updated > ?
          OR p.date_ordered > ?
          OR p.date_estin > ?
        )`
    : '';

  return `
      FROM po p
      JOIN po_products pop ON p.po_id = pop.po_id
      JOIN products pr ON pop.pid = pr.pid
      LEFT JOIN suppliers s ON p.supplier_id = s.supplierid
      WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${lookbackYears} YEAR)
      ${changedSinceFilter}
  `;
}

/**
 * Builds the `($1, $2, ...), ($n+1, ...)` placeholder list for a multi-row insert.
 *
 * @param {number} rowCount - Number of rows in the insert.
 * @param {number} columnCount - Number of bound values per row.
 * @returns {string} Comma-separated placeholder tuples.
 */
function buildInsertPlaceholders(rowCount, columnCount) {
  return Array.from({ length: rowCount }, (_, rowIdx) => {
    const base = rowIdx * columnCount;
    const params = Array.from({ length: columnCount }, (__, colIdx) => `$${base + colIdx + 1}`);
    return `(${params.join(', ')})`;
  }).join(',');
}

/**
 * Collapses rows sharing the same (po_id, pid) into one, keeping the last one seen.
 *
 * A single INSERT ... ON CONFLICT DO UPDATE statement is rejected by the database
 * when two of its rows target the same key, so duplicates must be removed before
 * the statement is built. Keeping the last row mirrors what the upsert already
 * does when duplicates land in different batches.
 *
 * @param {Array<object>} rows - Fetched purchase order items.
 * @returns {Array<object>} Rows with unique (po_id, pid).
 */
function dedupePurchaseOrderItems(rows) {
  const byKey = new Map();
  for (const row of rows) {
    byKey.set(`${row.po_id}:${row.pid}`, row);
  }
  return [...byKey.values()];
}

/**
 * Imports purchase order line items from the production database into the local
 * `purchase_orders` table.
 *
 * Steps, all inside one local transaction:
 *   1. Read the last sync timestamp for 'purchase_orders' from `sync_status`.
 *   2. Recreate the staging table `temp_purchase_orders`.
 *   3. Page through production rows and upsert them into the staging table.
 *   4. Upsert the staging rows into `purchase_orders`, counting inserts vs updates.
 *   5. Record the new sync timestamp, drop the staging table and commit.
 *
 * Errors are logged, the transaction is rolled back, and an error result is
 * returned instead of throwing.
 *
 * @param {object} prodConnection - Production connection; `query(sql, params)` must
 *   resolve to an array whose first element is the row array (`?` placeholders).
 * @param {object} localConnection - Local connection exposing `beginTransaction`,
 *   `commit`, `rollback` and `query(sql, params)`; `query` must resolve to an array
 *   whose first element has a `rows` array (`$n` placeholders).
 * @param {boolean} [incrementalUpdate=true] - When truthy, only rows changed since
 *   the last sync within the past year are imported; otherwise the past 5 years.
 * @returns {Promise<{status: string, recordsAdded: number, recordsUpdated: number,
 *   totalRecords: number, error?: string}>} Import summary.
 */
async function importPurchaseOrders(prodConnection, localConnection, incrementalUpdate = true) {
  const startTime = Date.now();
  const FETCH_BATCH_SIZE = 5000;
  const INSERT_BATCH_SIZE = 200; // Process 200 records at a time for inserts

  try {
    // Begin transaction for the entire import process
    await localConnection.beginTransaction();

    // Get last sync info
    const [syncInfo] = await localConnection.query(
      "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'"
    );
    const lastSyncTime = syncInfo?.rows?.[0]?.last_sync_timestamp || '1970-01-01';

    console.log('Purchase Orders: Using last sync time:', lastSyncTime);

    // Create temp tables
    await localConnection.query(`
      DROP TABLE IF EXISTS temp_purchase_orders;

      CREATE TABLE temp_purchase_orders (
        po_id INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        sku VARCHAR(50),
        name VARCHAR(255),
        vendor VARCHAR(255),
        date TIMESTAMP WITH TIME ZONE,
        expected_date TIMESTAMP WITH TIME ZONE,
        status INTEGER,
        notes TEXT,
        ordered INTEGER,
        cost_price DECIMAL(10,3),
        PRIMARY KEY (po_id, pid)
      );
    `);

    // The same source clause and bound parameters are used for counting and fetching.
    const sourceSql = buildPurchaseOrderSourceSql(incrementalUpdate);
    const sourceParams = incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : [];

    // First count the relevant PO items - Keep MySQL compatible for production
    const [[{ total }]] = await prodConnection.query(`
      SELECT COUNT(*) as total
      FROM (
        SELECT DISTINCT pop.po_id, pop.pid
        ${sourceSql}
      ) all_items
    `, sourceParams);

    console.log('Purchase Orders: Found changes:', total);

    // Get PO list - Keep MySQL compatible for production
    console.log('Fetching purchase orders in batches...');

    const columnList = PO_TEMP_COLUMNS.join(', ');
    // Every non-key column is overwritten when a staging row already exists.
    const stagingUpdateSet = PO_TEMP_COLUMNS
      .filter((column) => column !== 'po_id' && column !== 'pid')
      .map((column) => `${column} = EXCLUDED.${column}`)
      .join(',\n            ');

    let offset = 0;
    let totalProcessed = 0;

    for (;;) {
      console.log(`Fetching batch at offset ${offset}...`);

      const [poList] = await prodConnection.query(`
        SELECT DISTINCT
          COALESCE(p.po_id, 0) as po_id,
          pop.pid,
          COALESCE(NULLIF(pr.itemnumber, ''), 'NO-SKU') as sku,
          COALESCE(pr.description, 'Unknown Product') as name,
          COALESCE(NULLIF(s.companyname, ''), 'Unknown Vendor') as vendor,
          COALESCE(p.date_ordered, p.date_created) as date,
          p.date_estin as expected_date,
          COALESCE(p.status, 1) as status,
          COALESCE(p.short_note, p.notes) as notes,
          pop.qty_each as ordered,
          pop.cost_each as cost_price
        ${sourceSql}
        ORDER BY p.po_id, pop.pid
        LIMIT ${FETCH_BATCH_SIZE} OFFSET ${offset}
      `, sourceParams);

      if (poList.length === 0) {
        break;
      }

      console.log(`Processing batch of ${poList.length} purchase order items (${offset}-${offset + poList.length})`);

      // Process in smaller batches for inserts
      for (let i = 0; i < poList.length; i += INSERT_BATCH_SIZE) {
        const batch = poList.slice(i, i + INSERT_BATCH_SIZE);

        // One statement may not upsert the same (po_id, pid) twice.
        const uniqueRows = dedupePurchaseOrderItems(batch);

        const placeholders = buildInsertPlaceholders(uniqueRows.length, PO_TEMP_COLUMNS.length);
        const values = uniqueRows.flatMap((po) => PO_TEMP_COLUMNS.map((column) => po[column]));

        // Execute batch insert
        await localConnection.query(`
          INSERT INTO temp_purchase_orders (
            ${columnList}
          )
          VALUES ${placeholders}
          ON CONFLICT (po_id, pid) DO UPDATE SET
            ${stagingUpdateSet}
        `, values);

        // Progress counts fetched rows, as before, regardless of de-duplication.
        totalProcessed += batch.length;

        outputProgress({
          status: "running",
          operation: "Purchase orders import",
          message: `Processed ${totalProcessed}/${total} purchase order items`,
          current: totalProcessed,
          total: total,
          elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
          remaining: estimateRemaining(startTime, totalProcessed, total),
          rate: calculateRate(startTime, totalProcessed)
        });
      }

      // Update offset for next batch
      offset += poList.length;

      // Fewer records than the batch size means this was the last page
      if (poList.length < FETCH_BATCH_SIZE) {
        break;
      }
    }

    // Count the temp table contents
    const [tempCount] = await localConnection.query(`SELECT COUNT(*) FROM temp_purchase_orders`);
    const tempRowCount = Number.parseInt(tempCount.rows[0].count, 10);
    console.log(`Successfully inserted ${tempRowCount} rows into temp_purchase_orders`);

    // Now insert into the final table. `xmax = 0` is used to tell freshly inserted
    // rows apart from rows that were updated by the ON CONFLICT branch.
    const [result] = await localConnection.query(`
      WITH inserted_pos AS (
        INSERT INTO purchase_orders (
          po_id, pid, sku, name, cost_price, po_cost_price,
          vendor, date, expected_date, status, notes,
          ordered, received, receiving_status
        )
        SELECT
          po_id, pid, sku, name, cost_price, cost_price,
          vendor, date, expected_date, status, notes,
          ordered, 0 as received, 1 as receiving_status
        FROM temp_purchase_orders
        ON CONFLICT (po_id, pid) DO UPDATE SET
          vendor = EXCLUDED.vendor,
          date = EXCLUDED.date,
          expected_date = EXCLUDED.expected_date,
          status = EXCLUDED.status,
          notes = EXCLUDED.notes,
          ordered = EXCLUDED.ordered,
          cost_price = EXCLUDED.cost_price,
          po_cost_price = EXCLUDED.po_cost_price
        RETURNING xmax = 0 as inserted
      )
      SELECT
        COUNT(*) FILTER (WHERE inserted) as inserted,
        COUNT(*) FILTER (WHERE NOT inserted) as updated
      FROM inserted_pos
    `);

    // Parse the result; counts that fail to parse are reported as 0
    const { inserted, updated } = result.rows[0];
    const recordsAdded = Number.parseInt(inserted, 10) || 0;
    const recordsUpdated = Number.parseInt(updated, 10) || 0;

    // Update sync status
    await localConnection.query(`
      INSERT INTO sync_status (table_name, last_sync_timestamp)
      VALUES ('purchase_orders', NOW())
      ON CONFLICT (table_name) DO UPDATE SET
        last_sync_timestamp = NOW()
    `);

    // Clean up temporary tables
    await localConnection.query(`DROP TABLE IF EXISTS temp_purchase_orders;`);

    // Commit transaction
    await localConnection.commit();

    return {
      status: "complete",
      recordsAdded,
      recordsUpdated,
      totalRecords: totalProcessed
    };
  } catch (error) {
    console.error("Error during purchase orders import:", error);

    // Rollback transaction; a failed rollback is logged but must not mask the original error
    try {
      await localConnection.rollback();
    } catch (rollbackError) {
      console.error('Error during rollback:', rollbackError.message);
    }

    return {
      status: "error",
      error: error.message,
      recordsAdded: 0,
      recordsUpdated: 0,
      totalRecords: 0
    };
  }
}

module.exports = importPurchaseOrders;