const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics-new/utils/progress'); /** * Validates a date from MySQL before inserting it into PostgreSQL * @param {string|Date|null} mysqlDate - Date string or object from MySQL * @returns {string|null} Valid date string or null if invalid */ function validateDate(mysqlDate) { // Handle null, undefined, or empty values if (!mysqlDate) { return null; } // Convert to string if it's not already const dateStr = String(mysqlDate); // Handle MySQL zero dates and empty values if (dateStr === '0000-00-00' || dateStr === '0000-00-00 00:00:00' || dateStr.indexOf('0000-00-00') !== -1 || dateStr === '') { return null; } // Check if the date is valid const date = new Date(mysqlDate); // If the date is invalid or suspiciously old (pre-1970), return null if (isNaN(date.getTime()) || date.getFullYear() < 1970) { return null; } return mysqlDate; } /** * Imports purchase orders and receivings from a production MySQL database to a local PostgreSQL database. * Handles these as separate data streams without complex FIFO allocation. * * @param {object} prodConnection - A MySQL connection to production DB * @param {object} localConnection - A PostgreSQL connection to local DB * @param {boolean} incrementalUpdate - Set to false for a full sync; true for incremental * @returns {object} Information about the sync operation */ async function importPurchaseOrders(prodConnection, localConnection, incrementalUpdate = true) { const startTime = Date.now(); let poRecordsAdded = 0; let poRecordsUpdated = 0; let poRecordsDeleted = 0; let receivingRecordsAdded = 0; let receivingRecordsUpdated = 0; let receivingRecordsDeleted = 0; let totalProcessed = 0; // Batch size constants const PO_BATCH_SIZE = 500; const INSERT_BATCH_SIZE = 100; try { // Begin transaction for the entire import process await localConnection.beginTransaction(); // Get last sync info const [syncInfo] = await localConnection.query( "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'" ); const lastSyncTime = syncInfo?.rows?.[0]?.last_sync_timestamp || '1970-01-01'; console.log('Purchase Orders: Using last sync time:', lastSyncTime); // Create temp tables for processing await localConnection.query(` DROP TABLE IF EXISTS temp_purchase_orders; DROP TABLE IF EXISTS temp_receivings; DROP TABLE IF EXISTS employee_names; DROP TABLE IF EXISTS temp_supplier_names; -- Temporary table for purchase orders CREATE TEMP TABLE temp_purchase_orders ( po_id TEXT NOT NULL, pid BIGINT NOT NULL, sku TEXT, name TEXT, vendor TEXT, date TIMESTAMP WITH TIME ZONE, expected_date DATE, status TEXT, notes TEXT, long_note TEXT, ordered INTEGER, po_cost_price NUMERIC(14, 4), supplier_id INTEGER, date_created TIMESTAMP WITH TIME ZONE, date_ordered TIMESTAMP WITH TIME ZONE, PRIMARY KEY (po_id, pid) ); -- Temporary table for receivings CREATE TEMP TABLE temp_receivings ( receiving_id TEXT NOT NULL, pid BIGINT NOT NULL, sku TEXT, name TEXT, vendor TEXT, qty_each INTEGER, qty_each_orig INTEGER, cost_each NUMERIC(14, 5), cost_each_orig NUMERIC(14, 5), received_by INTEGER, received_by_name TEXT, received_date TIMESTAMP WITH TIME ZONE, receiving_created_date TIMESTAMP WITH TIME ZONE, supplier_id INTEGER, status TEXT, PRIMARY KEY (receiving_id, pid) ); -- Temporary table for employee names CREATE TEMP TABLE employee_names ( employeeid INTEGER PRIMARY KEY, firstname TEXT, lastname TEXT ); -- Create indexes for efficient joins CREATE INDEX idx_temp_po_pid ON temp_purchase_orders(pid); CREATE INDEX idx_temp_receiving_pid ON temp_receivings(pid); `); // Map status codes to text values const poStatusMap = { 0: 'canceled', 1: 'created', 10: 'electronically_ready_send', 11: 'ordered', 12: 'preordered', 13: 'electronically_sent', 15: 'receiving_started', 50: 'done' }; const receivingStatusMap = { 0: 'canceled', 1: 'created', 30: 'partial_received', 40: 'full_received', 50: 'paid' }; // Get time window for data retrieval const yearInterval = incrementalUpdate ? 1 : 5; // Fetch employee data from production outputProgress({ status: "running", operation: "Purchase orders import", message: "Fetching employee data" }); const [employees] = await prodConnection.query(` SELECT employeeid, firstname, lastname FROM employees `); // Insert employee data into temp table if (employees.length > 0) { const employeeValues = employees.map(emp => [ emp.employeeid, emp.firstname || '', emp.lastname || '' ]).flat(); const placeholders = employees.map((_, idx) => { const base = idx * 3; return `($${base + 1}, $${base + 2}, $${base + 3})`; }).join(','); await localConnection.query(` INSERT INTO employee_names (employeeid, firstname, lastname) VALUES ${placeholders} ON CONFLICT (employeeid) DO UPDATE SET firstname = EXCLUDED.firstname, lastname = EXCLUDED.lastname `, employeeValues); } // Add this section before the PO import to create a supplier names mapping outputProgress({ status: "running", operation: "Purchase orders import", message: "Fetching supplier data for vendor mapping" }); // Fetch supplier data from production and store in a temp table const [suppliers] = await prodConnection.query(` SELECT supplierid, companyname FROM suppliers WHERE companyname IS NOT NULL AND companyname != '' `); if (suppliers.length > 0) { // Create temp table for supplier names await localConnection.query(` DROP TABLE IF EXISTS temp_supplier_names; CREATE TEMP TABLE temp_supplier_names ( supplier_id INTEGER PRIMARY KEY, company_name TEXT NOT NULL ); `); // Insert supplier data in batches for (let i = 0; i < suppliers.length; i += INSERT_BATCH_SIZE) { const batch = suppliers.slice(i, i + INSERT_BATCH_SIZE); const placeholders = batch.map((_, idx) => { const base = idx * 2; return `($${base + 1}, $${base + 2})`; }).join(','); const values = batch.flatMap(s => [ s.supplierid, s.companyname || 'Unnamed Supplier' ]); await localConnection.query(` INSERT INTO temp_supplier_names (supplier_id, company_name) VALUES ${placeholders} ON CONFLICT (supplier_id) DO UPDATE SET company_name = EXCLUDED.company_name `, values); } } // 1. Fetch and process purchase orders outputProgress({ status: "running", operation: "Purchase orders import", message: "Fetching purchase orders" }); const [poCount] = await prodConnection.query(` SELECT COUNT(*) as total FROM po p WHERE p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR) ${incrementalUpdate ? ` AND ( p.date_updated > ? OR p.date_ordered > ? OR p.date_estin > ? ) ` : ''} `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []); const totalPOs = poCount[0].total; console.log(`Found ${totalPOs} relevant purchase orders`); // Skip processing if no POs to process if (totalPOs === 0) { console.log('No purchase orders to process, skipping PO import step'); } else { // Fetch and process POs in batches let offset = 0; let allPOsProcessed = false; while (!allPOsProcessed) { const [poList] = await prodConnection.query(` SELECT p.po_id, p.supplier_id, s.companyname AS vendor, p.status, p.notes AS long_note, p.short_note AS notes, p.date_created, p.date_ordered, p.date_estin FROM po p LEFT JOIN suppliers s ON p.supplier_id = s.supplierid WHERE p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR) ${incrementalUpdate ? ` AND ( p.date_updated > ? OR p.date_ordered > ? OR p.date_estin > ? ) ` : ''} ORDER BY p.po_id LIMIT ${PO_BATCH_SIZE} OFFSET ${offset} `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []); if (poList.length === 0) { allPOsProcessed = true; break; } // Get products for these POs const poIds = poList.map(po => po.po_id); const [poProducts] = await prodConnection.query(` SELECT pp.po_id, pp.pid, pp.qty_each, pp.cost_each, COALESCE(p.itemnumber, 'NO-SKU') AS sku, COALESCE(p.description, 'Unknown Product') AS name FROM po_products pp LEFT JOIN products p ON pp.pid = p.pid WHERE pp.po_id IN (?) `, [poIds]); // Build complete PO records const completePOs = []; for (const product of poProducts) { const po = poList.find(p => p.po_id == product.po_id); if (!po) continue; completePOs.push({ po_id: po.po_id.toString(), pid: product.pid, sku: product.sku, name: product.name, vendor: po.vendor || 'Unknown Vendor', date: validateDate(po.date_ordered) || validateDate(po.date_created), expected_date: validateDate(po.date_estin), status: poStatusMap[po.status] || 'created', notes: po.notes || '', long_note: po.long_note || '', ordered: product.qty_each, po_cost_price: product.cost_each, supplier_id: po.supplier_id, date_created: validateDate(po.date_created), date_ordered: validateDate(po.date_ordered) }); } // Insert PO data in batches for (let i = 0; i < completePOs.length; i += INSERT_BATCH_SIZE) { const batch = completePOs.slice(i, i + INSERT_BATCH_SIZE); const placeholders = batch.map((_, idx) => { const base = idx * 15; return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`; }).join(','); const values = batch.flatMap(po => [ po.po_id, po.pid, po.sku, po.name, po.vendor, po.date, po.expected_date, po.status, po.notes, po.long_note, po.ordered, po.po_cost_price, po.supplier_id, po.date_created, po.date_ordered ]); await localConnection.query(` INSERT INTO temp_purchase_orders ( po_id, pid, sku, name, vendor, date, expected_date, status, notes, long_note, ordered, po_cost_price, supplier_id, date_created, date_ordered ) VALUES ${placeholders} ON CONFLICT (po_id, pid) DO UPDATE SET sku = EXCLUDED.sku, name = EXCLUDED.name, vendor = EXCLUDED.vendor, date = EXCLUDED.date, expected_date = EXCLUDED.expected_date, status = EXCLUDED.status, notes = EXCLUDED.notes, long_note = EXCLUDED.long_note, ordered = EXCLUDED.ordered, po_cost_price = EXCLUDED.po_cost_price, supplier_id = EXCLUDED.supplier_id, date_created = EXCLUDED.date_created, date_ordered = EXCLUDED.date_ordered `, values); } offset += poList.length; totalProcessed += completePOs.length; outputProgress({ status: "running", operation: "Purchase orders import", message: `Processed ${offset} of ${totalPOs} purchase orders (${totalProcessed} line items)`, current: offset, total: totalPOs, elapsed: formatElapsedTime((Date.now() - startTime) / 1000), remaining: estimateRemaining(startTime, offset, totalPOs), rate: calculateRate(startTime, offset) }); if (poList.length < PO_BATCH_SIZE) { allPOsProcessed = true; } } } // 2. Next, fetch all relevant receivings outputProgress({ status: "running", operation: "Purchase orders import", message: "Fetching receivings data" }); const [receivingCount] = await prodConnection.query(` SELECT COUNT(*) as total FROM receivings r WHERE r.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR) ${incrementalUpdate ? ` AND ( r.date_updated > ? OR r.date_created > ? ) ` : ''} `, incrementalUpdate ? [lastSyncTime, lastSyncTime] : []); const totalReceivings = receivingCount[0].total; console.log(`Found ${totalReceivings} relevant receivings`); // Skip processing if no receivings to process if (totalReceivings === 0) { console.log('No receivings to process, skipping receivings import step'); } else { // Fetch and process receivings in batches offset = 0; // Reset offset for receivings let allReceivingsProcessed = false; while (!allReceivingsProcessed) { const [receivingList] = await prodConnection.query(` SELECT r.receiving_id, r.supplier_id, r.status, r.notes, r.shipping, r.total_amount, r.hold, r.for_storefront, r.date_created, r.date_paid, r.date_checked FROM receivings r WHERE r.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR) ${incrementalUpdate ? ` AND ( r.date_updated > ? OR r.date_created > ? ) ` : ''} ORDER BY r.receiving_id LIMIT ${PO_BATCH_SIZE} OFFSET ${offset} `, incrementalUpdate ? [lastSyncTime, lastSyncTime] : []); if (receivingList.length === 0) { allReceivingsProcessed = true; break; } // Get products for these receivings const receivingIds = receivingList.map(r => r.receiving_id); const [receivingProducts] = await prodConnection.query(` SELECT rp.receiving_id, rp.pid, rp.qty_each, rp.qty_each_orig, rp.cost_each, rp.cost_each_orig, rp.received_by, rp.received_date, r.date_created as receiving_created_date, COALESCE(p.itemnumber, 'NO-SKU') AS sku, COALESCE(p.description, 'Unknown Product') AS name FROM receivings_products rp JOIN receivings r ON rp.receiving_id = r.receiving_id LEFT JOIN products p ON rp.pid = p.pid WHERE rp.receiving_id IN (?) `, [receivingIds]); // Build complete receiving records const completeReceivings = []; for (const product of receivingProducts) { const receiving = receivingList.find(r => r.receiving_id == product.receiving_id); if (!receiving) continue; // Get employee name if available let receivedByName = null; if (product.received_by) { const [employeeResult] = await localConnection.query(` SELECT CONCAT(firstname, ' ', lastname) as full_name FROM employee_names WHERE employeeid = $1 `, [product.received_by]); if (employeeResult.rows.length > 0) { receivedByName = employeeResult.rows[0].full_name; } } // Get vendor name if available let vendorName = 'Unknown Vendor'; if (receiving.supplier_id) { const [vendorResult] = await localConnection.query(` SELECT company_name FROM temp_supplier_names WHERE supplier_id = $1 `, [receiving.supplier_id]); if (vendorResult.rows.length > 0) { vendorName = vendorResult.rows[0].company_name; } } completeReceivings.push({ receiving_id: receiving.receiving_id.toString(), pid: product.pid, sku: product.sku, name: product.name, vendor: vendorName, qty_each: product.qty_each, qty_each_orig: product.qty_each_orig, cost_each: product.cost_each, cost_each_orig: product.cost_each_orig, received_by: product.received_by, received_by_name: receivedByName, received_date: validateDate(product.received_date) || validateDate(product.receiving_created_date), receiving_created_date: validateDate(product.receiving_created_date), supplier_id: receiving.supplier_id, status: receivingStatusMap[receiving.status] || 'created' }); } // Insert receiving data in batches for (let i = 0; i < completeReceivings.length; i += INSERT_BATCH_SIZE) { const batch = completeReceivings.slice(i, i + INSERT_BATCH_SIZE); const placeholders = batch.map((_, idx) => { const base = idx * 15; return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`; }).join(','); const values = batch.flatMap(r => [ r.receiving_id, r.pid, r.sku, r.name, r.vendor, r.qty_each, r.qty_each_orig, r.cost_each, r.cost_each_orig, r.received_by, r.received_by_name, r.received_date, r.receiving_created_date, r.supplier_id, r.status ]); await localConnection.query(` INSERT INTO temp_receivings ( receiving_id, pid, sku, name, vendor, qty_each, qty_each_orig, cost_each, cost_each_orig, received_by, received_by_name, received_date, receiving_created_date, supplier_id, status ) VALUES ${placeholders} ON CONFLICT (receiving_id, pid) DO UPDATE SET sku = EXCLUDED.sku, name = EXCLUDED.name, vendor = EXCLUDED.vendor, qty_each = EXCLUDED.qty_each, qty_each_orig = EXCLUDED.qty_each_orig, cost_each = EXCLUDED.cost_each, cost_each_orig = EXCLUDED.cost_each_orig, received_by = EXCLUDED.received_by, received_by_name = EXCLUDED.received_by_name, received_date = EXCLUDED.received_date, receiving_created_date = EXCLUDED.receiving_created_date, supplier_id = EXCLUDED.supplier_id, status = EXCLUDED.status `, values); } offset += receivingList.length; totalProcessed += completeReceivings.length; outputProgress({ status: "running", operation: "Purchase orders import", message: `Processed ${offset} of ${totalReceivings} receivings (${totalProcessed} line items total)`, current: offset, total: totalReceivings, elapsed: formatElapsedTime((Date.now() - startTime) / 1000), remaining: estimateRemaining(startTime, offset, totalReceivings), rate: calculateRate(startTime, offset) }); if (receivingList.length < PO_BATCH_SIZE) { allReceivingsProcessed = true; } } } // Add this section to filter out invalid PIDs before final import outputProgress({ status: "running", operation: "Purchase orders import", message: "Validating product IDs before final import" }); await localConnection.query(` -- Create temp table to store invalid PIDs DROP TABLE IF EXISTS temp_invalid_pids; CREATE TEMP TABLE temp_invalid_pids AS ( -- Get all unique PIDs from our temp tables WITH all_pids AS ( SELECT DISTINCT pid FROM temp_purchase_orders UNION SELECT DISTINCT pid FROM temp_receivings ) -- Filter to only those that don't exist in products table SELECT p.pid FROM all_pids p WHERE NOT EXISTS ( SELECT 1 FROM products WHERE pid = p.pid ) ); -- Remove purchase orders with invalid PIDs DELETE FROM temp_purchase_orders WHERE pid IN (SELECT pid FROM temp_invalid_pids); -- Remove receivings with invalid PIDs DELETE FROM temp_receivings WHERE pid IN (SELECT pid FROM temp_invalid_pids); `); // Get count of filtered items for reporting const [filteredResult] = await localConnection.query(` SELECT COUNT(*) as count FROM temp_invalid_pids `); const filteredCount = filteredResult.rows[0].count; if (filteredCount > 0) { console.log(`Filtered out ${filteredCount} items with invalid product IDs`); } // 3. Insert final purchase order records to the actual table outputProgress({ status: "running", operation: "Purchase orders import", message: "Inserting final purchase order records" }); // Create a temp table to track PO IDs being processed await localConnection.query(` DROP TABLE IF EXISTS processed_po_ids; CREATE TEMP TABLE processed_po_ids AS ( SELECT DISTINCT po_id FROM temp_purchase_orders ); `); // Delete products that were removed from POs and count them const [poDeletedResult] = await localConnection.query(` WITH deleted AS ( DELETE FROM purchase_orders WHERE po_id IN (SELECT po_id FROM processed_po_ids) AND NOT EXISTS ( SELECT 1 FROM temp_purchase_orders tp WHERE purchase_orders.po_id = tp.po_id AND purchase_orders.pid = tp.pid ) RETURNING po_id, pid ) SELECT COUNT(*) as count FROM deleted `); poRecordsDeleted = poDeletedResult.rows[0]?.count || 0; console.log(`Deleted ${poRecordsDeleted} products that were removed from purchase orders`); const [poResult] = await localConnection.query(` INSERT INTO purchase_orders ( po_id, vendor, date, expected_date, pid, sku, name, po_cost_price, status, notes, long_note, ordered, supplier_id, date_created, date_ordered ) SELECT po_id, vendor, COALESCE(date, date_created, now()) as date, expected_date, pid, sku, name, po_cost_price, status, notes, long_note, ordered, supplier_id, date_created, date_ordered FROM temp_purchase_orders ON CONFLICT (po_id, pid) DO UPDATE SET vendor = EXCLUDED.vendor, date = EXCLUDED.date, expected_date = EXCLUDED.expected_date, sku = EXCLUDED.sku, name = EXCLUDED.name, po_cost_price = EXCLUDED.po_cost_price, status = EXCLUDED.status, notes = EXCLUDED.notes, long_note = EXCLUDED.long_note, ordered = EXCLUDED.ordered, supplier_id = EXCLUDED.supplier_id, date_created = EXCLUDED.date_created, date_ordered = EXCLUDED.date_ordered, updated = CURRENT_TIMESTAMP RETURNING (xmax = 0) as inserted `); poRecordsAdded = poResult.rows.filter(r => r.inserted).length; poRecordsUpdated = poResult.rows.filter(r => !r.inserted).length; // 4. Insert final receiving records to the actual table outputProgress({ status: "running", operation: "Purchase orders import", message: "Inserting final receiving records" }); // Create a temp table to track receiving IDs being processed await localConnection.query(` DROP TABLE IF EXISTS processed_receiving_ids; CREATE TEMP TABLE processed_receiving_ids AS ( SELECT DISTINCT receiving_id FROM temp_receivings ); `); // Delete products that were removed from receivings and count them const [receivingDeletedResult] = await localConnection.query(` WITH deleted AS ( DELETE FROM receivings WHERE receiving_id IN (SELECT receiving_id FROM processed_receiving_ids) AND NOT EXISTS ( SELECT 1 FROM temp_receivings tr WHERE receivings.receiving_id = tr.receiving_id AND receivings.pid = tr.pid ) RETURNING receiving_id, pid ) SELECT COUNT(*) as count FROM deleted `); receivingRecordsDeleted = receivingDeletedResult.rows[0]?.count || 0; console.log(`Deleted ${receivingRecordsDeleted} products that were removed from receivings`); const [receivingsResult] = await localConnection.query(` INSERT INTO receivings ( receiving_id, pid, sku, name, vendor, qty_each, qty_each_orig, cost_each, cost_each_orig, received_by, received_by_name, received_date, receiving_created_date, supplier_id, status ) SELECT receiving_id, pid, sku, name, vendor, qty_each, qty_each_orig, cost_each, cost_each_orig, received_by, received_by_name, COALESCE(received_date, receiving_created_date, now()) as received_date, receiving_created_date, supplier_id, status FROM temp_receivings ON CONFLICT (receiving_id, pid) DO UPDATE SET sku = EXCLUDED.sku, name = EXCLUDED.name, vendor = EXCLUDED.vendor, qty_each = EXCLUDED.qty_each, qty_each_orig = EXCLUDED.qty_each_orig, cost_each = EXCLUDED.cost_each, cost_each_orig = EXCLUDED.cost_each_orig, received_by = EXCLUDED.received_by, received_by_name = EXCLUDED.received_by_name, received_date = EXCLUDED.received_date, receiving_created_date = EXCLUDED.receiving_created_date, supplier_id = EXCLUDED.supplier_id, status = EXCLUDED.status, updated = CURRENT_TIMESTAMP RETURNING (xmax = 0) as inserted `); receivingRecordsAdded = receivingsResult.rows.filter(r => r.inserted).length; receivingRecordsUpdated = receivingsResult.rows.filter(r => !r.inserted).length; // Update sync status await localConnection.query(` INSERT INTO sync_status (table_name, last_sync_timestamp) VALUES ('purchase_orders', NOW()) ON CONFLICT (table_name) DO UPDATE SET last_sync_timestamp = NOW() `); // Clean up temporary tables await localConnection.query(` DROP TABLE IF EXISTS temp_purchase_orders; DROP TABLE IF EXISTS temp_receivings; DROP TABLE IF EXISTS employee_names; DROP TABLE IF EXISTS temp_supplier_names; DROP TABLE IF EXISTS temp_invalid_pids; DROP TABLE IF EXISTS processed_po_ids; DROP TABLE IF EXISTS processed_receiving_ids; `); // Commit transaction await localConnection.commit(); return { status: "complete", recordsAdded: poRecordsAdded + receivingRecordsAdded, recordsUpdated: poRecordsUpdated + receivingRecordsUpdated, recordsDeleted: poRecordsDeleted + receivingRecordsDeleted, poRecordsAdded, poRecordsUpdated, poRecordsDeleted, receivingRecordsAdded, receivingRecordsUpdated, receivingRecordsDeleted, totalRecords: totalProcessed }; } catch (error) { console.error("Error during purchase orders import:", error); // Rollback transaction try { await localConnection.rollback(); } catch (rollbackError) { console.error('Error during rollback:', rollbackError.message); } return { status: "error", error: error.message, recordsAdded: 0, recordsUpdated: 0, recordsDeleted: 0, totalRecords: 0 }; } } module.exports = importPurchaseOrders;