const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress'); async function importPurchaseOrders(prodConnection, localConnection, incrementalUpdate = true) { const startTime = Date.now(); let recordsAdded = 0; let recordsUpdated = 0; try { // Get last sync info const [syncInfo] = await localConnection.query( "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'" ); const lastSyncTime = syncInfo?.rows?.[0]?.last_sync_timestamp || '1970-01-01'; console.log('Purchase Orders: Using last sync time:', lastSyncTime); // Create temp tables await localConnection.query(` DROP TABLE IF EXISTS temp_purchase_orders; CREATE TABLE temp_purchase_orders ( po_id INTEGER NOT NULL, pid INTEGER NOT NULL, sku VARCHAR(50), name VARCHAR(255), vendor VARCHAR(255), date TIMESTAMP WITH TIME ZONE, expected_date TIMESTAMP WITH TIME ZONE, status INTEGER, notes TEXT, ordered INTEGER, cost_price DECIMAL(10,3), PRIMARY KEY (po_id, pid) ); `); // First get all relevant PO IDs with basic info - Keep MySQL compatible for production const [[{ total }]] = await prodConnection.query(` SELECT COUNT(*) as total FROM ( SELECT DISTINCT pop.po_id, pop.pid FROM po p USE INDEX (idx_date_created) JOIN po_products pop ON p.po_id = pop.po_id JOIN suppliers s ON p.supplier_id = s.supplierid WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) ${incrementalUpdate ? ` AND ( p.date_updated > ? OR p.date_ordered > ? OR p.date_estin > ? ) ` : ''} ) all_items `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []); console.log('Purchase Orders: Found changes:', total); // Get PO list - Keep MySQL compatible for production console.log('Fetching purchase orders in batches...'); const FETCH_BATCH_SIZE = 5000; let offset = 0; let allProcessed = false; let totalProcessed = 0; while (!allProcessed) { console.log(`Fetching batch at offset ${offset}...`); const [poList] = await prodConnection.query(` SELECT DISTINCT COALESCE(p.po_id, 0) as po_id, pop.pid, COALESCE(NULLIF(pr.itemnumber, ''), 'NO-SKU') as sku, COALESCE(pr.description, 'Unknown Product') as name, COALESCE(NULLIF(s.companyname, ''), 'Unknown Vendor') as vendor, COALESCE(p.date_ordered, p.date_created) as date, p.date_estin as expected_date, COALESCE(p.status, 1) as status, COALESCE(p.short_note, p.notes) as notes, pop.qty_each as ordered, pop.cost_each as cost_price FROM po p JOIN po_products pop ON p.po_id = pop.po_id JOIN products pr ON pop.pid = pr.pid LEFT JOIN suppliers s ON p.supplier_id = s.supplierid WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) ${incrementalUpdate ? ` AND ( p.date_updated > ? OR p.date_ordered > ? OR p.date_estin > ? ) ` : ''} ORDER BY p.po_id, pop.pid LIMIT ${FETCH_BATCH_SIZE} OFFSET ${offset} `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []); if (poList.length === 0) { allProcessed = true; break; } console.log(`Processing batch of ${poList.length} purchase order items (${offset}-${offset + poList.length})`); let processed = 0; // Process each PO in a separate insert to avoid parameter issues for (let i = 0; i < poList.length; i++) { const po = poList[i]; try { // Single row insert await localConnection.query(` INSERT INTO temp_purchase_orders ( po_id, pid, sku, name, vendor, date, expected_date, status, notes, ordered, cost_price ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (po_id, pid) DO UPDATE SET sku = EXCLUDED.sku, name = EXCLUDED.name, vendor = EXCLUDED.vendor, date = EXCLUDED.date, expected_date = EXCLUDED.expected_date, status = EXCLUDED.status, notes = EXCLUDED.notes, ordered = EXCLUDED.ordered, cost_price = EXCLUDED.cost_price `, [ po.po_id, po.pid, po.sku, po.name, po.vendor, po.date, po.expected_date, po.status, po.notes, po.ordered, po.cost_price ]); processed++; totalProcessed++; // Only log occasionally if (processed % 500 === 0 || processed === 1 || processed === poList.length) { outputProgress({ status: "running", operation: "Purchase orders import", message: `Batch ${Math.floor(offset/FETCH_BATCH_SIZE) + 1}: ${processed}/${poList.length} (Total: ${totalProcessed}/${total})`, current: totalProcessed, total: total, elapsed: formatElapsedTime((Date.now() - startTime) / 1000), remaining: estimateRemaining(startTime, totalProcessed, total), rate: calculateRate(startTime, totalProcessed) }); } } catch (error) { console.error(`Error inserting PO #${po.po_id} product #${po.pid}:`, error.message); console.log('PO data:', po); } } // Update offset for next batch offset += poList.length; // Check if we've received fewer records than the batch size, meaning we're done if (poList.length < FETCH_BATCH_SIZE) { allProcessed = true; } } // Count the temp table contents const [tempCount] = await localConnection.query(`SELECT COUNT(*) FROM temp_purchase_orders`); const tempRowCount = parseInt(tempCount.rows[0].count); console.log(`Successfully inserted ${tempRowCount} rows into temp_purchase_orders`); // Now insert into the final table const [result] = await localConnection.query(` WITH inserted_pos AS ( INSERT INTO purchase_orders ( po_id, pid, sku, name, cost_price, po_cost_price, vendor, date, expected_date, status, notes, ordered, received, receiving_status ) SELECT po_id, pid, sku, name, cost_price, cost_price, vendor, date, expected_date, status, notes, ordered, 0 as received, 1 as receiving_status FROM temp_purchase_orders ON CONFLICT (po_id, pid) DO UPDATE SET vendor = EXCLUDED.vendor, date = EXCLUDED.date, expected_date = EXCLUDED.expected_date, status = EXCLUDED.status, notes = EXCLUDED.notes, ordered = EXCLUDED.ordered, cost_price = EXCLUDED.cost_price, po_cost_price = EXCLUDED.po_cost_price RETURNING xmax = 0 as inserted ) SELECT COUNT(*) FILTER (WHERE inserted) as inserted, COUNT(*) FILTER (WHERE NOT inserted) as updated FROM inserted_pos `); // Parse the result const { inserted, updated } = result.rows[0]; recordsAdded = parseInt(inserted) || 0; recordsUpdated = parseInt(updated) || 0; // Update sync status await localConnection.query(` INSERT INTO sync_status (table_name, last_sync_timestamp) VALUES ('purchase_orders', NOW()) ON CONFLICT (table_name) DO UPDATE SET last_sync_timestamp = NOW() `); // Clean up temporary tables await localConnection.query(`DROP TABLE IF EXISTS temp_purchase_orders;`); return { status: "complete", recordsAdded: recordsAdded || 0, recordsUpdated: recordsUpdated || 0, totalRecords: totalProcessed }; } catch (error) { console.error("Error during purchase orders import:", error); // Attempt cleanup on error try { await localConnection.query(`DROP TABLE IF EXISTS temp_purchase_orders;`); } catch (cleanupError) { console.error('Error during cleanup:', cleanupError.message); } return { status: "error", error: error.message, recordsAdded: 0, recordsUpdated: 0, totalRecords: 0 }; } } module.exports = importPurchaseOrders;