diff --git a/inventory-server/scripts/import-from-prod.js b/inventory-server/scripts/import-from-prod.js
index 8ecfb0d..cd00f39 100644
--- a/inventory-server/scripts/import-from-prod.js
+++ b/inventory-server/scripts/import-from-prod.js
@@ -10,9 +10,9 @@ const importPurchaseOrders = require('./import/purchase-orders');
 dotenv.config({ path: path.join(__dirname, "../.env") });
 
 // Constants to control which imports run
-const IMPORT_CATEGORIES = true;
-const IMPORT_PRODUCTS = true;
-const IMPORT_ORDERS = true;
+const IMPORT_CATEGORIES = false;
+const IMPORT_PRODUCTS = false;
+const IMPORT_ORDERS = false;
 const IMPORT_PURCHASE_ORDERS = true;
 
 // Add flag for incremental updates
diff --git a/inventory-server/scripts/import/purchase-orders.js b/inventory-server/scripts/import/purchase-orders.js
index fd95c34..555f9b8 100644
--- a/inventory-server/scripts/import/purchase-orders.js
+++ b/inventory-server/scripts/import/purchase-orders.js
@@ -22,11 +22,15 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
     CREATE TEMP TABLE temp_purchase_orders (
       po_id INTEGER NOT NULL,
       pid INTEGER NOT NULL,
+      sku VARCHAR(50),
+      name VARCHAR(255),
       vendor VARCHAR(255),
       date DATE,
       expected_date DATE,
       status INTEGER,
       notes TEXT,
+      ordered INTEGER,
+      cost_price DECIMAL(10,3),
       PRIMARY KEY (po_id, pid)
     );
 
@@ -224,7 +228,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
       const productPids = [...new Set(productBatch.map(p => p.pid))];
       const batchPoIds = [...new Set(productBatch.map(p => p.po_id))];
 
-      // Get receivings for this batch with employee names - Keep MySQL compatible for production
+      // Get receivings for this batch with employee names
       const [receivings] = await prodConnection.query(`
         SELECT
           r.po_id,
@@ -251,142 +255,360 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
 
       // Insert receivings into temp table
       if (receivings.length > 0) {
-        const placeholders = receivings.map((_, idx) => {
-          const base = idx * 9;
-          return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9})`;
-        }).join(',');
+        // Insert in chunks of 100 rows (9 params each) to stay well under
+        // PostgreSQL's 65,535 bind-parameter limit per statement
+        const CHUNK_SIZE = 100;
+        for (let i = 0; i < receivings.length; i += CHUNK_SIZE) {
+          const chunk = receivings.slice(i, i + CHUNK_SIZE);
+
+          const values = [];
+          const placeholders = [];
+
+          chunk.forEach((r, idx) => {
+            values.push(
+              r.po_id,
+              r.pid,
+              r.receiving_id,
+              r.qty_each,
+              r.cost_each,
+              r.received_date,
+              r.received_by,
+              r.received_by_name || null,
+              r.is_alt_po
+            );
+
+            const offset = idx * 9;
+            placeholders.push(`($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8}, $${offset + 9})`);
+          });
 
-        const values = receivings.flatMap(r => [
-          r.po_id,
-          r.pid,
-          r.receiving_id,
-          r.qty_each,
-          r.cost_each,
-          r.received_date,
-          r.received_by,
-          r.received_by_name,
-          r.is_alt_po
-        ]);
-
-        await localConnection.query(`
-          INSERT INTO temp_po_receivings (
-            po_id, pid, receiving_id, qty_each, cost_each, received_date,
-            received_by, received_by_name, is_alt_po
-          )
-          VALUES ${placeholders}
-          ON CONFLICT (receiving_id, pid) DO UPDATE SET
-            po_id = EXCLUDED.po_id,
-            qty_each = EXCLUDED.qty_each,
-            cost_each = EXCLUDED.cost_each,
-            received_date = EXCLUDED.received_date,
-            received_by = EXCLUDED.received_by,
-            received_by_name = EXCLUDED.received_by_name,
-            is_alt_po = EXCLUDED.is_alt_po
-        `, values);
+          await localConnection.query(`
+            INSERT INTO temp_po_receivings (
+              po_id, pid, receiving_id, qty_each, cost_each, received_date,
+              received_by, received_by_name, is_alt_po
+            )
+            VALUES ${placeholders.join(',')}
+            ON CONFLICT (receiving_id, pid) DO UPDATE SET
+              po_id = EXCLUDED.po_id,
+              qty_each = EXCLUDED.qty_each,
+              cost_each = EXCLUDED.cost_each,
+              received_date = EXCLUDED.received_date,
+              received_by = EXCLUDED.received_by,
+              received_by_name = EXCLUDED.received_by_name,
+              is_alt_po = EXCLUDED.is_alt_po
+          `, values);
+        }
       }
 
-      // Process each PO product
-      for (const product of productBatch) {
-        const po = batch.find(p => p.po_id === product.po_id);
-        if (!po) continue;
-
-        // Insert into temp_purchase_orders
-        const placeholders = `($1, $2, $3, $4, $5, $6, $7)`;
-        const values = [
-          product.po_id,
-          product.pid,
-          po.vendor,
-          po.date,
-          po.expected_date,
-          po.status,
-          po.notes || po.long_note
-        ];
-
-        await localConnection.query(`
-          INSERT INTO temp_purchase_orders (
-            po_id, pid, vendor, date, expected_date, status, notes
-          )
-          VALUES ${placeholders}
-          ON CONFLICT (po_id, pid) DO UPDATE SET
-            vendor = EXCLUDED.vendor,
-            date = EXCLUDED.date,
-            expected_date = EXCLUDED.expected_date,
-            status = EXCLUDED.status,
-            notes = EXCLUDED.notes
-        `, values);
-
-        processed++;
+      // Process each PO product in chunks
+      const PRODUCT_CHUNK_SIZE = 100;
+      for (let i = 0; i < productBatch.length; i += PRODUCT_CHUNK_SIZE) {
+        const chunk = productBatch.slice(i, i + PRODUCT_CHUNK_SIZE);
+        const values = [];
+        const placeholders = [];
 
-        // Update progress periodically
-        if (Date.now() - lastProgressUpdate > PROGRESS_INTERVAL) {
+        chunk.forEach((product) => {
+          const po = batch.find(p => p.po_id === product.po_id);
+          if (!po) return;
+
+          // Base the offset on rows actually queued (placeholders.length), not
+          // the forEach index: a skipped product would otherwise leave a gap
+          // that shifts every later placeholder out of alignment with values
+          const offset = placeholders.length * 11; // 11 params per row
+
+          values.push(
+            product.po_id,
+            product.pid,
+            product.sku,
+            product.name,
+            po.vendor,
+            po.date,
+            po.expected_date,
+            po.status,
+            po.notes || po.long_note,
+            product.ordered,
+            product.cost_each
+          );
+
+          placeholders.push(`($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8}, $${offset + 9}, $${offset + 10}, $${offset + 11})`);
+        });
+
+        if (placeholders.length > 0) {
+          await localConnection.query(`
+            INSERT INTO temp_purchase_orders (
+              po_id, pid, sku, name, vendor, date, expected_date,
+              status, notes, ordered, cost_price
+            )
+            VALUES ${placeholders.join(',')}
+            ON CONFLICT (po_id, pid) DO UPDATE SET
+              sku = EXCLUDED.sku,
+              name = EXCLUDED.name,
+              vendor = EXCLUDED.vendor,
+              date = EXCLUDED.date,
+              expected_date = EXCLUDED.expected_date,
+              status = EXCLUDED.status,
+              notes = EXCLUDED.notes,
+              ordered = EXCLUDED.ordered,
+              cost_price = EXCLUDED.cost_price
+          `, values);
+        }
+
+        processed += chunk.length;
+
+        // Update progress based on time interval
+        const now = Date.now();
+        if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) {
           outputProgress({
             status: "running",
             operation: "Purchase orders import",
-            message: `Processing purchase orders: ${processed} of ${totalItems}`,
             current: processed,
             total: totalItems,
             elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
             remaining: estimateRemaining(startTime, processed, totalItems),
             rate: calculateRate(startTime, processed)
          });
-          lastProgressUpdate = Date.now();
+          lastProgressUpdate = now;
         }
       }
     }
   }
 
-  // Insert final data into purchase_orders table
-  const [result] = await localConnection.query(`
-    WITH inserted_pos AS (
-      INSERT INTO purchase_orders (
-        po_id, pid, vendor, date, expected_date, status, notes,
-        received_qty, received_cost, last_received_date, last_received_by,
-        alt_po_received_qty, alt_po_last_received_date,
-        no_po_received_qty, no_po_last_received_date
+  // Insert final data into purchase_orders table in chunks
+  const FINAL_CHUNK_SIZE = 1000;
+  let totalProcessed = 0;
+  const totalPosResult = await localConnection.query('SELECT COUNT(*) as total_pos FROM temp_purchase_orders');
+  const total_pos = parseInt(totalPosResult.rows?.[0]?.total_pos || '0', 10);
+
+  outputProgress({
+    status: "running",
+    operation: "Purchase orders final import",
+    message: `Processing ${total_pos} purchase orders for final import`,
+    current: 0,
+    total: total_pos
+  });
+
+  // Process in chunks using cursor-based (keyset) pagination on the composite key
+  let lastPoId = 0;
+  let lastPid = 0;
+  recordsAdded = 0;
+  recordsUpdated = 0;
+
+  while (true) {
+    console.log('Fetching next chunk with lastPoId:', lastPoId, 'lastPid:', lastPid);
+    const chunkResult = await localConnection.query(`
+      SELECT po_id, pid FROM temp_purchase_orders
+      WHERE (po_id, pid) > ($1, $2)
+      ORDER BY po_id, pid
+      LIMIT $3
+    `, [lastPoId, lastPid, FINAL_CHUNK_SIZE]);
+
+    if (!chunkResult?.rows) {
+      console.error('No rows returned from chunk query:', chunkResult);
+      break;
+    }
+
+    const chunk = chunkResult.rows;
+    console.log('Got chunk of size:', chunk.length);
+    if (chunk.length === 0) break;
+
+    const result = await localConnection.query(`
+      WITH inserted_pos AS (
+        INSERT INTO purchase_orders (
+          po_id, pid, sku, name, cost_price, po_cost_price,
+          vendor, date, expected_date, status, notes,
+          ordered, received, receiving_status,
+          received_date, last_received_date, received_by,
+          receiving_history
+        )
+        SELECT
+          po.po_id,
+          po.pid,
+          po.sku,
+          po.name,
+          COALESCE(
+            (
+              SELECT cost_each
+              FROM temp_po_receivings r2
+              WHERE r2.pid = po.pid
+                AND r2.is_alt_po = 0
+                AND r2.cost_each > 0
+              ORDER BY r2.received_date
+              LIMIT 1
+            ),
+            po.cost_price
+          ) as cost_price,
+          po.cost_price as po_cost_price,
+          po.vendor,
+          po.date,
+          po.expected_date,
+          po.status,
+          po.notes,
+          po.ordered,
+          COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0) as received,
+          CASE
+            WHEN COUNT(r.receiving_id) = 0 THEN 1 -- created
+            WHEN COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0) < po.ordered THEN 30 -- partial
+            ELSE 40 -- full
+          END as receiving_status,
+          MIN(CASE WHEN r.is_alt_po = 0 THEN r.received_date END) as received_date,
+          MAX(CASE WHEN r.is_alt_po = 0 THEN r.received_date END) as last_received_date,
+          (
+            SELECT r2.received_by_name
+            FROM temp_po_receivings r2
+            WHERE r2.pid = po.pid
+              AND r2.is_alt_po = 0
+            ORDER BY r2.received_date
+            LIMIT 1
+          ) as received_by,
+          jsonb_build_object(
+            'ordered_qty', po.ordered,
+            'total_received', COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0),
+            'remaining_unfulfilled', GREATEST(0, po.ordered - COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0)),
+            'excess_received', GREATEST(0, COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0) - po.ordered),
+            'po_cost', po.cost_price,
+            'actual_cost', COALESCE(
+              (
+                SELECT cost_each
+                FROM temp_po_receivings r2
+                WHERE r2.pid = po.pid
+                  AND r2.is_alt_po = 0
+                  AND r2.cost_each > 0
+                ORDER BY r2.received_date
+                LIMIT 1
+              ),
+              po.cost_price
+            ),
+            'fulfillment', (
+              SELECT jsonb_agg(
+                jsonb_build_object(
+                  'receiving_id', r2.receiving_id,
+                  'qty_applied', CASE
+                    WHEN r2.running_total <= po.ordered THEN r2.qty_each
+                    WHEN r2.running_total - r2.qty_each < po.ordered THEN po.ordered - (r2.running_total - r2.qty_each)
+                    ELSE 0
+                  END,
+                  'qty_total', r2.qty_each,
+                  'cost', r2.cost_each,
+                  'date', r2.received_date,
+                  'received_by', r2.received_by,
+                  'received_by_name', r2.received_by_name,
+                  'type', CASE r2.is_alt_po
+                    WHEN 0 THEN 'original'
+                    WHEN 1 THEN 'alternate'
+                    ELSE 'no_po'
+                  END,
+                  'remaining_qty', CASE
+                    WHEN r2.running_total <= po.ordered THEN 0
+                    WHEN r2.running_total - r2.qty_each < po.ordered THEN r2.running_total - po.ordered
+                    ELSE r2.qty_each
+                  END,
+                  'is_excess', r2.running_total > po.ordered
+                )
+                ORDER BY r2.received_date
+              )
+              FROM (
+                SELECT
+                  r2.*,
+                  SUM(r2.qty_each) OVER (
+                    PARTITION BY r2.pid
+                    ORDER BY r2.received_date
+                    ROWS UNBOUNDED PRECEDING
+                  ) as running_total
+                FROM temp_po_receivings r2
+                WHERE r2.pid = po.pid
+              ) r2
+            ),
+            'alternate_po_receivings', (
+              SELECT jsonb_agg(
+                jsonb_build_object(
+                  'receiving_id', r2.receiving_id,
+                  'qty', r2.qty_each,
+                  'cost', r2.cost_each,
+                  'date', r2.received_date,
+                  'received_by', r2.received_by,
+                  'received_by_name', r2.received_by_name
+                )
+                ORDER BY r2.received_date
+              )
+              FROM temp_po_receivings r2
+              WHERE r2.pid = po.pid AND r2.is_alt_po = 1
+            ),
+            'no_po_receivings', (
+              SELECT jsonb_agg(
+                jsonb_build_object(
+                  'receiving_id', r2.receiving_id,
+                  'qty', r2.qty_each,
+                  'cost', r2.cost_each,
+                  'date', r2.received_date,
+                  'received_by', r2.received_by,
+                  'received_by_name', r2.received_by_name
+                )
+                ORDER BY r2.received_date
+              )
+              FROM temp_po_receivings r2
+              WHERE r2.pid = po.pid AND r2.is_alt_po = 2
+            )
+          ) as receiving_history
+        FROM temp_purchase_orders po
+        LEFT JOIN temp_po_receivings r ON po.pid = r.pid
+        WHERE (po.po_id, po.pid) IN (
+          -- alias the UNNEST columns; without it they are both named "unnest"
+          SELECT t.po_id, t.pid FROM UNNEST($1::int[], $2::int[]) AS t(po_id, pid)
+        )
+        GROUP BY po.po_id, po.pid, po.sku, po.name, po.vendor, po.date,
+                 po.expected_date, po.status, po.notes, po.ordered, po.cost_price
+        ON CONFLICT (po_id, pid) DO UPDATE SET
+          vendor = EXCLUDED.vendor,
+          date = EXCLUDED.date,
+          expected_date = EXCLUDED.expected_date,
+          status = EXCLUDED.status,
+          notes = EXCLUDED.notes,
+          ordered = EXCLUDED.ordered,
+          received = EXCLUDED.received,
+          receiving_status = EXCLUDED.receiving_status,
+          received_date = EXCLUDED.received_date,
+          last_received_date = EXCLUDED.last_received_date,
+          received_by = EXCLUDED.received_by,
+          receiving_history = EXCLUDED.receiving_history,
+          cost_price = EXCLUDED.cost_price,
+          po_cost_price = EXCLUDED.po_cost_price
+        RETURNING xmax
       )
       SELECT
-        po.po_id,
-        po.pid,
-        po.vendor,
-        po.date,
-        po.expected_date,
-        po.status,
-        po.notes,
-        COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0) as received_qty,
-        COALESCE(AVG(CASE WHEN r.is_alt_po = 0 THEN r.cost_each END), 0) as received_cost,
-        MAX(CASE WHEN r.is_alt_po = 0 THEN r.received_date END) as last_received_date,
-        MAX(CASE WHEN r.is_alt_po = 0 THEN r.received_by_name END) as last_received_by,
-        COALESCE(SUM(CASE WHEN r.is_alt_po = 1 THEN r.qty_each END), 0) as alt_po_received_qty,
-        MAX(CASE WHEN r.is_alt_po = 1 THEN r.received_date END) as alt_po_last_received_date,
-        COALESCE(SUM(CASE WHEN r.is_alt_po = 2 THEN r.qty_each END), 0) as no_po_received_qty,
-        MAX(CASE WHEN r.is_alt_po = 2 THEN r.received_date END) as no_po_last_received_date
-      FROM temp_purchase_orders po
-      LEFT JOIN temp_po_receivings r ON po.pid = r.pid
-      GROUP BY po.po_id, po.pid, po.vendor, po.date, po.expected_date, po.status, po.notes
-      ON CONFLICT (po_id, pid) DO UPDATE SET
-        vendor = EXCLUDED.vendor,
-        date = EXCLUDED.date,
-        expected_date = EXCLUDED.expected_date,
-        status = EXCLUDED.status,
-        notes = EXCLUDED.notes,
-        received_qty = EXCLUDED.received_qty,
-        received_cost = EXCLUDED.received_cost,
-        last_received_date = EXCLUDED.last_received_date,
-        last_received_by = EXCLUDED.last_received_by,
-        alt_po_received_qty = EXCLUDED.alt_po_received_qty,
-        alt_po_last_received_date = EXCLUDED.alt_po_last_received_date,
-        no_po_received_qty = EXCLUDED.no_po_received_qty,
-        no_po_last_received_date = EXCLUDED.no_po_last_received_date
-      RETURNING xmax
-    )
-    SELECT
-      COUNT(*) FILTER (WHERE xmax = 0) as inserted,
-      COUNT(*) FILTER (WHERE xmax <> 0) as updated
-    FROM inserted_pos
-  `);
+        COUNT(*) FILTER (WHERE xmax = 0) as inserted,
+        COUNT(*) FILTER (WHERE xmax <> 0) as updated
+      FROM inserted_pos
+    `, [
+      chunk.map(r => r.po_id),
+      chunk.map(r => r.pid)
+    ]);
 
-  recordsAdded = parseInt(result.rows[0].inserted, 10) || 0;
-  recordsUpdated = parseInt(result.rows[0].updated, 10) || 0;
+    // Log each chunk's insert/update counts while this importer stabilizes
+    console.log('Insert result:', result?.rows?.[0]);
+
+    // Defensively parse the PostgreSQL result row
+    const resultRow = result?.rows?.[0] || {};
+    const insertCount = parseInt(resultRow.inserted || '0', 10);
+    const updateCount = parseInt(resultRow.updated || '0', 10);
+
+    recordsAdded += insertCount;
+    recordsUpdated += updateCount;
+    totalProcessed += chunk.length;
+
+    // Update progress
+    outputProgress({
+      status: "running",
+      operation: "Purchase orders final import",
+      message: `Processed ${totalProcessed} of ${total_pos} purchase orders`,
+      current: totalProcessed,
+      total: total_pos,
+      elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
+      remaining: estimateRemaining(startTime, totalProcessed, total_pos),
+      rate: calculateRate(startTime, totalProcessed)
+    });
+
+    // Advance the cursor; chunk is guaranteed non-empty here (the loop
+    // breaks above when a fetch returns no rows)
+    const lastItem = chunk[chunk.length - 1];
+    lastPoId = lastItem.po_id;
+    lastPid = lastItem.pid;
+  }
 
   // Update sync status
   await localConnection.query(`
@@ -423,4 +645,4 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
   }
 }
 
-module.exports = importPurchaseOrders;
\ No newline at end of file
+module.exports = importPurchaseOrders;
\ No newline at end of file
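
Notes for reviewers on the patterns this patch leans on.

Chunked multi-row upserts: PostgreSQL allows at most 65,535 bind parameters per statement, so both temp-table loads insert at most 100 rows (9 or 11 parameters each) per statement. The same idea, distilled into a standalone helper — a minimal sketch assuming a node-postgres-style `query(text, values)` client; the `insertInChunks` name and its signature are illustrative, not part of the patch:

    // Chunked multi-row INSERT that stays under PostgreSQL's bind-parameter cap.
    // `table`, `columns` and `conflictClause` are trusted identifiers/SQL here,
    // never user input, since they are interpolated into the statement text.
    async function insertInChunks(client, table, columns, rows, conflictClause = '', chunkSize = 100) {
      for (let i = 0; i < rows.length; i += chunkSize) {
        const chunk = rows.slice(i, i + chunkSize);
        const values = [];
        const placeholders = chunk.map((row, rowIdx) => {
          values.push(...columns.map(c => row[c]));
          const offset = rowIdx * columns.length;
          return `(${columns.map((_, colIdx) => `$${offset + colIdx + 1}`).join(', ')})`;
        });
        await client.query(
          `INSERT INTO ${table} (${columns.join(', ')}) VALUES ${placeholders.join(',')} ${conflictClause}`,
          values
        );
      }
    }

One detail worth flagging: the PO-product loop can skip a row (when its parent PO is missing from the batch), which is why the patch derives each placeholder offset from the number of rows already queued rather than from the forEach index — an index-based offset would shift every later placeholder out of alignment with its values.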
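Keyset pagination: the final import walks temp_purchase_orders with a cursor on the composite primary key instead of LIMIT/OFFSET. PostgreSQL compares row constructors lexicographically, so (po_id, pid) > ($1, $2) resumes exactly after the last processed row and keeps each fetch cheap no matter how deep the scan gets, where OFFSET would rescan everything it skips. The loop's shape, condensed into an async generator — a sketch that assumes po_id and pid are positive, so the (0, 0) starting cursor sorts before every real key:

    // Yields chunks of (po_id, pid) keys in primary-key order.
    async function* iterateChunks(client, chunkSize) {
      let lastPoId = 0;
      let lastPid = 0;
      for (;;) {
        const { rows } = await client.query(`
          SELECT po_id, pid
          FROM temp_purchase_orders
          WHERE (po_id, pid) > ($1, $2)
          ORDER BY po_id, pid
          LIMIT $3
        `, [lastPoId, lastPid, chunkSize]);
        if (rows.length === 0) return; // no keys left past the cursor
        yield rows;
        // advance the cursor to the last key of this chunk
        ({ po_id: lastPoId, pid: lastPid } = rows[rows.length - 1]);
      }
    }

    // usage: for await (const chunk of iterateChunks(localConnection, 1000)) { ... }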
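Counting inserts vs. updates: the `RETURNING xmax` idiom distinguishes rows the upsert created from rows it updated. A freshly inserted row has xmax = 0, while a row rewritten by ON CONFLICT ... DO UPDATE carries the updating transaction's id; this reads an MVCC implementation detail rather than documented behavior, so treat it as a pragmatic trick, not a guarantee. A self-contained example against a toy table (`client` is again an assumed node-postgres-style connection):

    await client.query(`
      CREATE TEMP TABLE demo_pos (
        po_id INTEGER,
        pid INTEGER,
        vendor TEXT,
        PRIMARY KEY (po_id, pid)
      )
    `);

    const { rows: [counts] } = await client.query(`
      WITH upserted AS (
        INSERT INTO demo_pos (po_id, pid, vendor)
        VALUES (1, 1, 'ACME'), (2, 1, 'Globex')
        ON CONFLICT (po_id, pid) DO UPDATE SET vendor = EXCLUDED.vendor
        RETURNING xmax
      )
      SELECT
        COUNT(*) FILTER (WHERE xmax = 0) AS inserted,
        COUNT(*) FILTER (WHERE xmax <> 0) AS updated
      FROM upserted
    `);
    console.log(counts); // first run: { inserted: '2', updated: '0' }; rerun: { inserted: '0', updated: '2' }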
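Fulfillment allocation: receiving_history applies receivings against the ordered quantity with a running total. For ordered = 10 and receivings of 4, 4 and 5 units, running_total is 4, 8 and 13, so qty_applied resolves to 4, 4 and 2, and the final 3 units are flagged as excess. The window computation at the heart of it, reduced to a single product — a sketch where orderedQty and pid stand in for the query's po.ordered and po.pid:

    const { rows } = await client.query(`
      SELECT
        receiving_id,
        qty_each,
        running_total,
        CASE
          WHEN running_total <= $1 THEN qty_each                                  -- fully applied
          WHEN running_total - qty_each < $1 THEN $1 - (running_total - qty_each) -- straddles the order boundary
          ELSE 0                                                                  -- pure excess
        END AS qty_applied
      FROM (
        SELECT r.*,
               SUM(qty_each) OVER (
                 ORDER BY received_date
                 ROWS UNBOUNDED PRECEDING
               ) AS running_total
        FROM temp_po_receivings r
        WHERE pid = $2
      ) r
      ORDER BY received_date
    `, [orderedQty, pid]);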