diff --git a/inventory-server/db/schema.sql b/inventory-server/db/schema.sql index 9835edf..b65820d 100644 --- a/inventory-server/db/schema.sql +++ b/inventory-server/db/schema.sql @@ -135,18 +135,22 @@ CREATE TABLE purchase_orders ( pid BIGINT NOT NULL, sku VARCHAR(50) NOT NULL, cost_price DECIMAL(10, 3) NOT NULL, - status VARCHAR(20) DEFAULT 'pending' COMMENT 'canceled,created,electronically_ready_send,ordered,preordered,electronically_sent,receiving_started,closed', + status TINYINT UNSIGNED DEFAULT 1 COMMENT '0=canceled,1=created,10=electronically_ready_send,11=ordered,12=preordered,13=electronically_sent,15=receiving_started,50=done', + receiving_status TINYINT UNSIGNED DEFAULT 1 COMMENT '0=canceled,1=created,30=partial_received,40=full_received,50=paid', notes TEXT, long_note TEXT, ordered INT NOT NULL, received INT DEFAULT 0, - received_date DATE, + received_date DATE COMMENT 'Date of first receiving', + last_received_date DATE COMMENT 'Date of most recent receiving', received_by INT, + receiving_history JSON COMMENT 'Array of receiving records with qty, date, cost, receiving_id, and alt_po flag', FOREIGN KEY (pid) REFERENCES products(pid), FOREIGN KEY (sku) REFERENCES products(SKU), INDEX idx_po_id (po_id), INDEX idx_vendor (vendor), INDEX idx_status (status), + INDEX idx_receiving_status (receiving_status), INDEX idx_purchase_orders_metrics (pid, date, status, ordered, received), INDEX idx_po_product_date (pid, date), INDEX idx_po_product_status (pid, status), diff --git a/inventory-server/scripts/import-from-prod.js b/inventory-server/scripts/import-from-prod.js index 06aea98..42964a5 100644 --- a/inventory-server/scripts/import-from-prod.js +++ b/inventory-server/scripts/import-from-prod.js @@ -95,6 +95,19 @@ async function setupSshTunnel() { return new Promise((resolve, reject) => { const ssh = new Client(); + ssh.on('error', (err) => { + console.error('SSH connection error:', err); + // Don't reject here, just log the error + }); + + ssh.on('end', () => { + console.log('SSH connection ended normally'); + }); + + ssh.on('close', () => { + console.log('SSH connection closed'); + }); + ssh .on("ready", () => { ssh.forwardOut( @@ -304,10 +317,12 @@ async function importProducts(prodConnection, localConnection) { const [countResult] = await prodConnection.query(` SELECT COUNT(*) as total - FROM order_items oi FORCE INDEX (PRIMARY) - JOIN _order o FORCE INDEX (PRIMARY) ON oi.order_id = o.order_id - WHERE o.order_status >= 15 - AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + FROM products p + LEFT JOIN product_last_sold pls ON p.pid = pls.pid + WHERE pls.date_sold >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + OR p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + OR p.datein >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + OR pls.date_sold IS NULL `); const totalProducts = countResult[0].total; @@ -1106,106 +1121,274 @@ async function importPurchaseOrders(prodConnection, localConnection) { const startTime = Date.now(); try { - // First get the column names from the table structure + // Get column names for the insert const [columns] = await localConnection.query(` SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'purchase_orders' ORDER BY ORDINAL_POSITION `); - const columnNames = columns .map((col) => col.COLUMN_NAME) - .filter((name) => name !== "id"); // Skip auto-increment ID + .filter((name) => name !== "id"); - // Get total count first for progress indication - const [countResult] = await prodConnection.query(` + // First get all relevant PO IDs with basic info - this is much faster than the full join + const [[{ total }]] = await prodConnection.query(` SELECT COUNT(*) as total - FROM po_products pop - JOIN po ON pop.po_id = po.po_id - WHERE po.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + FROM ( + SELECT DISTINCT pop.po_id, pop.pid + FROM po p + FORCE INDEX (idx_date_created) + JOIN po_products pop ON p.po_id = pop.po_id + JOIN suppliers s ON p.supplier_id = s.supplierid + WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + UNION + SELECT DISTINCT r.receiving_id as po_id, rp.pid + FROM receivings_products rp + LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id + WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + ) all_items `); - const total = countResult[0].total; + + const [poList] = await prodConnection.query(` + SELECT DISTINCT + COALESCE(p.po_id, r.receiving_id) as po_id, + CASE + WHEN p.po_id IS NOT NULL THEN s1.companyname + WHEN r.supplier_id IS NOT NULL THEN s2.companyname + ELSE 'No Supplier' + END as vendor, + CASE WHEN p.po_id IS NOT NULL THEN DATE(p.date_ordered) END as date, + CASE WHEN p.po_id IS NOT NULL THEN DATE(p.date_estin) END as expected_date, + COALESCE(p.status, 50) as status, + COALESCE(p.short_note, '') as notes, + COALESCE(p.notes, '') as long_note + FROM ( + SELECT po_id FROM po + WHERE date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + UNION + SELECT DISTINCT r.receiving_id as po_id + FROM receivings r + JOIN receivings_products rp ON r.receiving_id = rp.receiving_id + WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + ) ids + LEFT JOIN po p ON ids.po_id = p.po_id + LEFT JOIN suppliers s1 ON p.supplier_id = s1.supplierid + LEFT JOIN receivings r ON ids.po_id = r.receiving_id + LEFT JOIN suppliers s2 ON r.supplier_id = s2.supplierid + ORDER BY po_id + `); + + const totalItems = total; + let processed = 0; + + const BATCH_SIZE = 5000; + const PROGRESS_INTERVAL = 500; + let lastProgressUpdate = Date.now(); outputProgress({ - operation: `Starting purchase orders import - Fetching ${total} PO items from production`, + operation: `Starting purchase orders import - Processing ${totalItems} purchase order items`, status: "running", }); - // Process in batches - const batchSize = 1000; - let processed = 0; - let offset = 0; + for (let i = 0; i < poList.length; i += BATCH_SIZE) { + const batch = poList.slice(i, Math.min(i + BATCH_SIZE, poList.length)); + const poIds = batch.map(po => po.po_id); - while (offset < total) { - const [purchaseOrders] = await prodConnection.query(` + // Get all products for these POs in one query + const [poProducts] = await prodConnection.query(` SELECT pop.po_id, - s.companyname as vendor, - DATE(po.date_ordered) as date, - DATE(po.date_estin) as expected_date, pop.pid, - p.itemnumber as sku, - COALESCE(rp.cost_each, pop.cost_each) as cost_price, - po.status, - COALESCE(po.short_note, '') as notes, - COALESCE(po.notes, '') as long_note, - pop.qty_each as ordered, - COALESCE(rp.qty_each, 0) as received, - DATE(rp.received_date) as received_date, - CAST(NULLIF(rp.received_by, '') AS SIGNED) as received_by + pr.itemnumber as sku, + pop.cost_each as cost_price, + pop.qty_each as ordered FROM po_products pop - JOIN po ON pop.po_id = po.po_id - JOIN products p ON pop.pid = p.pid - JOIN suppliers s ON po.supplier_id = s.supplierid - LEFT JOIN receivings r ON po.po_id = r.po_id - LEFT JOIN receivings_products rp ON r.receiving_id = rp.receiving_id - AND pop.pid = rp.pid - WHERE po.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) - LIMIT ? OFFSET ? - `, [batchSize, offset]); + FORCE INDEX (PRIMARY) + JOIN products pr ON pop.pid = pr.pid + WHERE pop.po_id IN (?) + `, [poIds]); - if (purchaseOrders.length > 0) { - const placeholders = purchaseOrders - .map(() => `(${Array(columnNames.length).fill("?").join(",")})`) - .join(","); - const updateClauses = columnNames - .filter((col) => col !== "po_id") // Don't update primary key - .map((col) => `${col} = VALUES(${col})`) - .join(","); + // Process PO products in smaller sub-batches to avoid packet size issues + const SUB_BATCH_SIZE = 5000; + for (let j = 0; j < poProducts.length; j += SUB_BATCH_SIZE) { + const productBatch = poProducts.slice(j, j + SUB_BATCH_SIZE); + const productPids = [...new Set(productBatch.map(p => p.pid))]; + const batchPoIds = [...new Set(productBatch.map(p => p.po_id))]; - const query = ` - INSERT INTO purchase_orders (${columnNames.join(",")}) - VALUES ${placeholders} - ON DUPLICATE KEY UPDATE ${updateClauses} - `; + // Get receivings for this batch + const [receivings] = await prodConnection.query(` + SELECT + r.po_id, + rp.pid, + rp.receiving_id, + rp.qty_each, + rp.cost_each, + DATE(NULLIF(rp.received_date, '0000-00-00 00:00:00')) as received_date, + rp.received_by, + CASE + WHEN r.po_id IS NULL THEN 2 -- No PO + WHEN r.po_id IN (?) THEN 0 -- Original PO + ELSE 1 -- Different PO + END as is_alt_po + FROM receivings_products rp + LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id + WHERE rp.pid IN (?) + AND rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + ORDER BY r.po_id, rp.pid, rp.received_date + `, [batchPoIds, productPids]); - await localConnection.query( - query, - purchaseOrders.flatMap(po => columnNames.map(col => po[col])) + // Create maps for this sub-batch + const poProductMap = new Map(); + productBatch.forEach(product => { + const key = `${product.po_id}-${product.pid}`; + poProductMap.set(key, product); + }); + + const receivingMap = new Map(); + const altReceivingMap = new Map(); + const noPOReceivingMap = new Map(); + + receivings.forEach(receiving => { + const key = `${receiving.po_id}-${receiving.pid}`; + if (receiving.is_alt_po === 2) { + // No PO + if (!noPOReceivingMap.has(receiving.pid)) { + noPOReceivingMap.set(receiving.pid, []); + } + noPOReceivingMap.get(receiving.pid).push(receiving); + } else if (receiving.is_alt_po === 1) { + // Different PO + if (!altReceivingMap.has(receiving.pid)) { + altReceivingMap.set(receiving.pid, []); + } + altReceivingMap.get(receiving.pid).push(receiving); + } else { + // Original PO + if (!receivingMap.has(key)) { + receivingMap.set(key, []); + } + receivingMap.get(key).push(receiving); + } + }); + + // Verify PIDs exist + const [existingPids] = await localConnection.query( + 'SELECT pid FROM products WHERE pid IN (?)', + [productPids] ); + const validPids = new Set(existingPids.map(p => p.pid)); - processed += purchaseOrders.length; - offset += batchSize; + // Prepare values for this sub-batch + const values = []; + let batchProcessed = 0; - updateProgress( - processed, - total, - "Purchase orders import", - startTime - ); - } else { - break; + for (const po of batch) { + const poProducts = Array.from(poProductMap.values()) + .filter(p => p.po_id === po.po_id && validPids.has(p.pid)); + + for (const product of poProducts) { + const key = `${po.po_id}-${product.pid}`; + const receivingHistory = receivingMap.get(key) || []; + const altReceivingHistory = altReceivingMap.get(product.pid) || []; + const noPOReceivingHistory = noPOReceivingMap.get(product.pid) || []; + + const received = receivingHistory.reduce((sum, r) => sum + r.qty_each, 0); + const altReceived = altReceivingHistory.reduce((sum, r) => sum + r.qty_each, 0); + const noPOReceived = noPOReceivingHistory.reduce((sum, r) => sum + r.qty_each, 0); + const totalReceived = received + altReceived + noPOReceived; + + const receiving_status = !totalReceived ? 1 : // created + totalReceived < product.ordered ? 30 : // partial + 40; // full + + const allReceivings = [...receivingHistory]; + if (altReceivingHistory.length > 0) { + allReceivings.push(...altReceivingHistory); + } + if (noPOReceivingHistory.length > 0) { + allReceivings.push(...noPOReceivingHistory); + } + allReceivings.sort((a, b) => new Date(a.received_date) - new Date(b.received_date)); + + const firstReceiving = allReceivings[0] || {}; + const lastReceiving = allReceivings[allReceivings.length - 1] || {}; + + values.push(columnNames.map(col => { + switch (col) { + case 'po_id': return po.po_id; + case 'vendor': return po.vendor; + case 'date': return po.date; + case 'expected_date': return po.expected_date; + case 'pid': return product.pid; + case 'sku': return product.sku; + case 'cost_price': return product.cost_price; + case 'status': return po.status; + case 'notes': return po.notes; + case 'long_note': return po.long_note; + case 'ordered': return product.ordered; + case 'received': return totalReceived; + case 'received_date': return firstReceiving.received_date || null; + case 'last_received_date': return lastReceiving.received_date || null; + case 'received_by': return firstReceiving.received_by || null; + case 'receiving_status': return receiving_status; + case 'receiving_history': return JSON.stringify(allReceivings.map(r => ({ + receiving_id: r.receiving_id, + qty: r.qty_each, + cost: r.cost_each, + date: r.received_date, + received_by: r.received_by, + alt_po: r.is_alt_po + }))); + default: return null; + } + })); + batchProcessed++; + } + } + + if (values.length > 0) { + const placeholders = values.map(() => + `(${Array(columnNames.length).fill("?").join(",")})` + ).join(","); + + const query = ` + INSERT INTO purchase_orders (${columnNames.join(",")}) + VALUES ${placeholders} + ON DUPLICATE KEY UPDATE ${columnNames + .filter((col) => col !== "po_id" && col !== "pid") + .map((col) => `${col} = VALUES(${col})`) + .join(",")}; + `; + + await localConnection.query(query, values.flat()); + } + + processed += batchProcessed; + + // Update progress based on time interval + const now = Date.now(); + if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) { + updateProgress(processed, totalItems, "Purchase orders import", startTime); + lastProgressUpdate = now; + } } } const endTime = Date.now(); outputProgress({ - operation: `Purchase orders import complete in ${Math.round( - (endTime - startTime) / 1000 - )}s - Imported ${processed} PO items`, + operation: `Purchase orders import complete`, status: "complete", + processed_records: processed, + total_records: totalItems, + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date(endTime).toISOString(), + elapsed_time: formatDuration((endTime - startTime) / 1000), + elapsed_seconds: Math.round((endTime - startTime) / 1000) + } }); + } catch (error) { outputProgress({ operation: "Purchase orders import failed", @@ -1230,7 +1413,6 @@ async function main() { message: "Setting up connections...", }); - // Set up connections const tunnel = await setupSshTunnel(); ssh = tunnel.ssh; @@ -1239,7 +1421,12 @@ async function main() { stream: tunnel.stream, }); - localConnection = await mysql.createPool(localDbConfig); + localConnection = await mysql.createPool({ + ...localDbConfig, + waitForConnections: true, + connectionLimit: 10, + queueLimit: 0 + }); if (isImportCancelled) throw new Error("Import cancelled"); @@ -1277,9 +1464,24 @@ async function main() { }); throw error; } finally { - if (prodConnection) await prodConnection.end(); - if (localConnection) await localConnection.end(); - if (ssh) ssh.end(); + try { + // Close connections in order + if (prodConnection) await prodConnection.end(); + if (localConnection) await localConnection.end(); + + // Wait a bit for any pending data to be written before closing SSH + await new Promise(resolve => setTimeout(resolve, 100)); + + if (ssh) { + // Properly close the SSH connection + ssh.on('close', () => { + console.log('SSH connection closed cleanly'); + }); + ssh.end(); + } + } catch (err) { + console.error('Error during cleanup:', err); + } } }