Sync up POs with receivings, fix and optimize PO import, show total time
@@ -135,18 +135,22 @@ CREATE TABLE purchase_orders (
   pid BIGINT NOT NULL,
   sku VARCHAR(50) NOT NULL,
   cost_price DECIMAL(10, 3) NOT NULL,
-  status VARCHAR(20) DEFAULT 'pending' COMMENT 'canceled,created,electronically_ready_send,ordered,preordered,electronically_sent,receiving_started,closed',
+  status TINYINT UNSIGNED DEFAULT 1 COMMENT '0=canceled,1=created,10=electronically_ready_send,11=ordered,12=preordered,13=electronically_sent,15=receiving_started,50=done',
+  receiving_status TINYINT UNSIGNED DEFAULT 1 COMMENT '0=canceled,1=created,30=partial_received,40=full_received,50=paid',
   notes TEXT,
   long_note TEXT,
   ordered INT NOT NULL,
   received INT DEFAULT 0,
-  received_date DATE,
+  received_date DATE COMMENT 'Date of first receiving',
+  last_received_date DATE COMMENT 'Date of most recent receiving',
   received_by INT,
+  receiving_history JSON COMMENT 'Array of receiving records with qty, date, cost, receiving_id, and alt_po flag',
   FOREIGN KEY (pid) REFERENCES products(pid),
   FOREIGN KEY (sku) REFERENCES products(SKU),
   INDEX idx_po_id (po_id),
   INDEX idx_vendor (vendor),
   INDEX idx_status (status),
+  INDEX idx_receiving_status (receiving_status),
   INDEX idx_purchase_orders_metrics (pid, date, status, ordered, received),
   INDEX idx_po_product_date (pid, date),
   INDEX idx_po_product_status (pid, status),
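For reference, the new `receiving_history` column stores an array whose entries are built by the import code later in this diff; per the `JSON.stringify` mapping there, one entry looks roughly like this (sample values are invented for illustration):

```js
// Hypothetical receiving_history entry; field names match the import code below.
const receivingRecord = {
  receiving_id: 48213,   // receivings.receiving_id
  qty: 12,               // rp.qty_each
  cost: 4.95,            // rp.cost_each
  date: "2024-03-18",    // rp.received_date
  received_by: 7,        // rp.received_by
  alt_po: 0              // 0 = original PO, 1 = different PO, 2 = no PO
};
```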
@@ -95,6 +95,19 @@ async function setupSshTunnel() {
   return new Promise((resolve, reject) => {
     const ssh = new Client();
 
+    ssh.on('error', (err) => {
+      console.error('SSH connection error:', err);
+      // Don't reject here, just log the error
+    });
+
+    ssh.on('end', () => {
+      console.log('SSH connection ended normally');
+    });
+
+    ssh.on('close', () => {
+      console.log('SSH connection closed');
+    });
+
     ssh
       .on("ready", () => {
         ssh.forwardOut(
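The hunk above is truncated at `ssh.forwardOut(`. For orientation, a minimal sketch of how an ssh2 `forwardOut` call is typically completed is shown below; the host/port arguments are placeholders (the real ones live in the elided part of the file), and the resolved `{ ssh, stream }` shape is inferred from `main()` reading `tunnel.ssh` and `tunnel.stream` later in this diff:

```js
// Sketch only - placeholder addresses, assuming sshConfig is defined elsewhere.
ssh
  .on("ready", () => {
    ssh.forwardOut(
      "127.0.0.1", 0,      // source address/port for the forwarded channel
      "db.internal", 3306, // destination MySQL host/port (placeholder)
      (err, stream) => {
        if (err) return reject(err);
        resolve({ ssh, stream }); // consumed as tunnel.ssh / tunnel.stream
      }
    );
  })
  .connect(sshConfig);
```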
@@ -304,10 +317,12 @@ async function importProducts(prodConnection, localConnection) {
 
   const [countResult] = await prodConnection.query(`
     SELECT COUNT(*) as total
-    FROM order_items oi FORCE INDEX (PRIMARY)
-    JOIN _order o FORCE INDEX (PRIMARY) ON oi.order_id = o.order_id
-    WHERE o.order_status >= 15
-      AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
+    FROM products p
+    LEFT JOIN product_last_sold pls ON p.pid = pls.pid
+    WHERE pls.date_sold >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
+      OR p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
+      OR p.datein >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
+      OR pls.date_sold IS NULL
   `);
   const totalProducts = countResult[0].total;
 
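As an aside, mysql2/promise resolves `query()` to a `[rows, fields]` pair; that is why this hunk destructures once (`const [countResult]`) while the purchase-order hunk below destructures twice (`const [[{ total }]]`). A minimal illustration, with `pool` standing in for either connection:

```js
// One level: rows is the array of result objects.
const [rows] = await pool.query("SELECT COUNT(*) as total FROM products");
const total = rows[0].total;

// Two levels: pull the first row's `total` field directly.
const [[{ total: sameTotal }]] = await pool.query(
  "SELECT COUNT(*) as total FROM products"
);
```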
@@ -1106,106 +1121,274 @@ async function importPurchaseOrders(prodConnection, localConnection) {
   const startTime = Date.now();
 
   try {
-    // First get the column names from the table structure
+    // Get column names for the insert
     const [columns] = await localConnection.query(`
       SELECT COLUMN_NAME
       FROM INFORMATION_SCHEMA.COLUMNS
       WHERE TABLE_NAME = 'purchase_orders'
       ORDER BY ORDINAL_POSITION
     `);
 
     const columnNames = columns
       .map((col) => col.COLUMN_NAME)
-      .filter((name) => name !== "id"); // Skip auto-increment ID
+      .filter((name) => name !== "id");
 
-    // Get total count first for progress indication
-    const [countResult] = await prodConnection.query(`
+    // First get all relevant PO IDs with basic info - this is much faster than the full join
+    const [[{ total }]] = await prodConnection.query(`
       SELECT COUNT(*) as total
-      FROM po_products pop
-      JOIN po ON pop.po_id = po.po_id
-      WHERE po.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
+      FROM (
+        SELECT DISTINCT pop.po_id, pop.pid
+        FROM po p
+        FORCE INDEX (idx_date_created)
+        JOIN po_products pop ON p.po_id = pop.po_id
+        JOIN suppliers s ON p.supplier_id = s.supplierid
+        WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
+        UNION
+        SELECT DISTINCT r.receiving_id as po_id, rp.pid
+        FROM receivings_products rp
+        LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id
+        WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
+      ) all_items
     `);
-    const total = countResult[0].total;
+
+    const [poList] = await prodConnection.query(`
+      SELECT DISTINCT
+        COALESCE(p.po_id, r.receiving_id) as po_id,
+        CASE
+          WHEN p.po_id IS NOT NULL THEN s1.companyname
+          WHEN r.supplier_id IS NOT NULL THEN s2.companyname
+          ELSE 'No Supplier'
+        END as vendor,
+        CASE WHEN p.po_id IS NOT NULL THEN DATE(p.date_ordered) END as date,
+        CASE WHEN p.po_id IS NOT NULL THEN DATE(p.date_estin) END as expected_date,
+        COALESCE(p.status, 50) as status,
+        COALESCE(p.short_note, '') as notes,
+        COALESCE(p.notes, '') as long_note
+      FROM (
+        SELECT po_id FROM po
+        WHERE date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
+        UNION
+        SELECT DISTINCT r.receiving_id as po_id
+        FROM receivings r
+        JOIN receivings_products rp ON r.receiving_id = rp.receiving_id
+        WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
+      ) ids
+      LEFT JOIN po p ON ids.po_id = p.po_id
+      LEFT JOIN suppliers s1 ON p.supplier_id = s1.supplierid
+      LEFT JOIN receivings r ON ids.po_id = r.receiving_id
+      LEFT JOIN suppliers s2 ON r.supplier_id = s2.supplierid
+      ORDER BY po_id
+    `);
+
+    const totalItems = total;
+    let processed = 0;
+
+    const BATCH_SIZE = 5000;
+    const PROGRESS_INTERVAL = 500;
+    let lastProgressUpdate = Date.now();
 
     outputProgress({
-      operation: `Starting purchase orders import - Fetching ${total} PO items from production`,
+      operation: `Starting purchase orders import - Processing ${totalItems} purchase order items`,
       status: "running",
     });
 
-    // Process in batches
-    const batchSize = 1000;
-    let processed = 0;
-    let offset = 0;
+    for (let i = 0; i < poList.length; i += BATCH_SIZE) {
+      const batch = poList.slice(i, Math.min(i + BATCH_SIZE, poList.length));
+      const poIds = batch.map(po => po.po_id);
 
-    while (offset < total) {
-      const [purchaseOrders] = await prodConnection.query(`
+      // Get all products for these POs in one query
+      const [poProducts] = await prodConnection.query(`
        SELECT
          pop.po_id,
-          s.companyname as vendor,
-          DATE(po.date_ordered) as date,
-          DATE(po.date_estin) as expected_date,
          pop.pid,
-          p.itemnumber as sku,
-          COALESCE(rp.cost_each, pop.cost_each) as cost_price,
-          po.status,
-          COALESCE(po.short_note, '') as notes,
-          COALESCE(po.notes, '') as long_note,
-          pop.qty_each as ordered,
-          COALESCE(rp.qty_each, 0) as received,
-          DATE(rp.received_date) as received_date,
-          CAST(NULLIF(rp.received_by, '') AS SIGNED) as received_by
+          pr.itemnumber as sku,
+          pop.cost_each as cost_price,
+          pop.qty_each as ordered
        FROM po_products pop
-        JOIN po ON pop.po_id = po.po_id
-        JOIN products p ON pop.pid = p.pid
-        JOIN suppliers s ON po.supplier_id = s.supplierid
-        LEFT JOIN receivings r ON po.po_id = r.po_id
-        LEFT JOIN receivings_products rp ON r.receiving_id = rp.receiving_id
-          AND pop.pid = rp.pid
-        WHERE po.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
-        LIMIT ? OFFSET ?
-      `, [batchSize, offset]);
+        FORCE INDEX (PRIMARY)
+        JOIN products pr ON pop.pid = pr.pid
+        WHERE pop.po_id IN (?)
+      `, [poIds]);
 
-      if (purchaseOrders.length > 0) {
-        const placeholders = purchaseOrders
-          .map(() => `(${Array(columnNames.length).fill("?").join(",")})`)
-          .join(",");
-        const updateClauses = columnNames
-          .filter((col) => col !== "po_id") // Don't update primary key
-          .map((col) => `${col} = VALUES(${col})`)
-          .join(",");
-
-        const query = `
-          INSERT INTO purchase_orders (${columnNames.join(",")})
-          VALUES ${placeholders}
-          ON DUPLICATE KEY UPDATE ${updateClauses}
-        `;
-
-        await localConnection.query(
-          query,
-          purchaseOrders.flatMap(po => columnNames.map(col => po[col]))
-        );
-
-        processed += purchaseOrders.length;
-        offset += batchSize;
-
-        updateProgress(
-          processed,
-          total,
-          "Purchase orders import",
-          startTime
-        );
-      } else {
-        break;
+      // Process PO products in smaller sub-batches to avoid packet size issues
+      const SUB_BATCH_SIZE = 5000;
+      for (let j = 0; j < poProducts.length; j += SUB_BATCH_SIZE) {
+        const productBatch = poProducts.slice(j, j + SUB_BATCH_SIZE);
+        const productPids = [...new Set(productBatch.map(p => p.pid))];
+        const batchPoIds = [...new Set(productBatch.map(p => p.po_id))];
+
+        // Get receivings for this batch
+        const [receivings] = await prodConnection.query(`
+          SELECT
+            r.po_id,
+            rp.pid,
+            rp.receiving_id,
+            rp.qty_each,
+            rp.cost_each,
+            DATE(NULLIF(rp.received_date, '0000-00-00 00:00:00')) as received_date,
+            rp.received_by,
+            CASE
+              WHEN r.po_id IS NULL THEN 2 -- No PO
+              WHEN r.po_id IN (?) THEN 0 -- Original PO
+              ELSE 1 -- Different PO
+            END as is_alt_po
+          FROM receivings_products rp
+          LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id
+          WHERE rp.pid IN (?)
+            AND rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
+          ORDER BY r.po_id, rp.pid, rp.received_date
+        `, [batchPoIds, productPids]);
+
+        // Create maps for this sub-batch
+        const poProductMap = new Map();
+        productBatch.forEach(product => {
+          const key = `${product.po_id}-${product.pid}`;
+          poProductMap.set(key, product);
+        });
+
+        const receivingMap = new Map();
+        const altReceivingMap = new Map();
+        const noPOReceivingMap = new Map();
+
+        receivings.forEach(receiving => {
+          const key = `${receiving.po_id}-${receiving.pid}`;
+          if (receiving.is_alt_po === 2) {
+            // No PO
+            if (!noPOReceivingMap.has(receiving.pid)) {
+              noPOReceivingMap.set(receiving.pid, []);
+            }
+            noPOReceivingMap.get(receiving.pid).push(receiving);
+          } else if (receiving.is_alt_po === 1) {
+            // Different PO
+            if (!altReceivingMap.has(receiving.pid)) {
+              altReceivingMap.set(receiving.pid, []);
+            }
+            altReceivingMap.get(receiving.pid).push(receiving);
+          } else {
+            // Original PO
+            if (!receivingMap.has(key)) {
+              receivingMap.set(key, []);
+            }
+            receivingMap.get(key).push(receiving);
+          }
+        });
+
+        // Verify PIDs exist
+        const [existingPids] = await localConnection.query(
+          'SELECT pid FROM products WHERE pid IN (?)',
+          [productPids]
+        );
+        const validPids = new Set(existingPids.map(p => p.pid));
+
+        // Prepare values for this sub-batch
+        const values = [];
+        let batchProcessed = 0;
+
+        for (const po of batch) {
+          const poProducts = Array.from(poProductMap.values())
+            .filter(p => p.po_id === po.po_id && validPids.has(p.pid));
+
+          for (const product of poProducts) {
+            const key = `${po.po_id}-${product.pid}`;
+            const receivingHistory = receivingMap.get(key) || [];
+            const altReceivingHistory = altReceivingMap.get(product.pid) || [];
+            const noPOReceivingHistory = noPOReceivingMap.get(product.pid) || [];
+
+            const received = receivingHistory.reduce((sum, r) => sum + r.qty_each, 0);
+            const altReceived = altReceivingHistory.reduce((sum, r) => sum + r.qty_each, 0);
+            const noPOReceived = noPOReceivingHistory.reduce((sum, r) => sum + r.qty_each, 0);
+            const totalReceived = received + altReceived + noPOReceived;
+
+            const receiving_status = !totalReceived ? 1 : // created
+              totalReceived < product.ordered ? 30 :      // partial
+              40;                                         // full
+
+            const allReceivings = [...receivingHistory];
+            if (altReceivingHistory.length > 0) {
+              allReceivings.push(...altReceivingHistory);
+            }
+            if (noPOReceivingHistory.length > 0) {
+              allReceivings.push(...noPOReceivingHistory);
+            }
+            allReceivings.sort((a, b) => new Date(a.received_date) - new Date(b.received_date));
+
+            const firstReceiving = allReceivings[0] || {};
+            const lastReceiving = allReceivings[allReceivings.length - 1] || {};
+
+            values.push(columnNames.map(col => {
+              switch (col) {
+                case 'po_id': return po.po_id;
+                case 'vendor': return po.vendor;
+                case 'date': return po.date;
+                case 'expected_date': return po.expected_date;
+                case 'pid': return product.pid;
+                case 'sku': return product.sku;
+                case 'cost_price': return product.cost_price;
+                case 'status': return po.status;
+                case 'notes': return po.notes;
+                case 'long_note': return po.long_note;
+                case 'ordered': return product.ordered;
+                case 'received': return totalReceived;
+                case 'received_date': return firstReceiving.received_date || null;
+                case 'last_received_date': return lastReceiving.received_date || null;
+                case 'received_by': return firstReceiving.received_by || null;
+                case 'receiving_status': return receiving_status;
+                case 'receiving_history': return JSON.stringify(allReceivings.map(r => ({
+                  receiving_id: r.receiving_id,
+                  qty: r.qty_each,
+                  cost: r.cost_each,
+                  date: r.received_date,
+                  received_by: r.received_by,
+                  alt_po: r.is_alt_po
+                })));
+                default: return null;
+              }
+            }));
+            batchProcessed++;
+          }
+        }
+
+        if (values.length > 0) {
+          const placeholders = values.map(() =>
+            `(${Array(columnNames.length).fill("?").join(",")})`
+          ).join(",");
+
+          const query = `
+            INSERT INTO purchase_orders (${columnNames.join(",")})
+            VALUES ${placeholders}
+            ON DUPLICATE KEY UPDATE ${columnNames
+              .filter((col) => col !== "po_id" && col !== "pid")
+              .map((col) => `${col} = VALUES(${col})`)
+              .join(",")};
+          `;
+
+          await localConnection.query(query, values.flat());
+        }
+
+        processed += batchProcessed;
+
+        // Update progress based on time interval
+        const now = Date.now();
+        if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) {
+          updateProgress(processed, totalItems, "Purchase orders import", startTime);
+          lastProgressUpdate = now;
+        }
       }
     }
 
     const endTime = Date.now();
     outputProgress({
-      operation: `Purchase orders import complete in ${Math.round(
-        (endTime - startTime) / 1000
-      )}s - Imported ${processed} PO items`,
+      operation: `Purchase orders import complete`,
       status: "complete",
+      processed_records: processed,
+      total_records: totalItems,
+      timing: {
+        start_time: new Date(startTime).toISOString(),
+        end_time: new Date(endTime).toISOString(),
+        elapsed_time: formatDuration((endTime - startTime) / 1000),
+        elapsed_seconds: Math.round((endTime - startTime) / 1000)
+      }
     });
 
   } catch (error) {
     outputProgress({
       operation: "Purchase orders import failed",
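The nested ternary that computes `receiving_status` implements the codes documented in the schema comment above (1 = created, 30 = partial_received, 40 = full_received; 0 and 50 are set elsewhere). Written as a standalone helper, the logic is equivalent to this sketch (the function name is invented for illustration):

```js
// Equivalent to the ternary in the import loop.
function deriveReceivingStatus(totalReceived, ordered) {
  if (!totalReceived) return 1;           // created: nothing received yet
  if (totalReceived < ordered) return 30; // partial_received
  return 40;                              // full_received
}
```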
@@ -1230,7 +1413,6 @@ async function main() {
       message: "Setting up connections...",
     });
 
-    // Set up connections
     const tunnel = await setupSshTunnel();
     ssh = tunnel.ssh;
 
@@ -1239,7 +1421,12 @@
       stream: tunnel.stream,
     });
 
-    localConnection = await mysql.createPool(localDbConfig);
+    localConnection = await mysql.createPool({
+      ...localDbConfig,
+      waitForConnections: true,
+      connectionLimit: 10,
+      queueLimit: 0
+    });
 
     if (isImportCancelled) throw new Error("Import cancelled");
 
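Note that mysql2/promise returns the pool object from `createPool()` synchronously, so the `await` here is harmless but not required; `waitForConnections`, `connectionLimit`, and `queueLimit` are standard mysql2 pool options. A minimal sketch of the same setup, assuming `localDbConfig` is defined as in the rest of the script:

```js
const mysql = require("mysql2/promise");

const pool = mysql.createPool({
  ...localDbConfig,
  waitForConnections: true, // queue queries instead of erroring when all connections are busy
  connectionLimit: 10,      // at most 10 concurrent connections
  queueLimit: 0,            // 0 = unbounded request queue
});
```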
@@ -1277,9 +1464,24 @@
     });
     throw error;
   } finally {
-    if (prodConnection) await prodConnection.end();
-    if (localConnection) await localConnection.end();
-    if (ssh) ssh.end();
+    try {
+      // Close connections in order
+      if (prodConnection) await prodConnection.end();
+      if (localConnection) await localConnection.end();
+
+      // Wait a bit for any pending data to be written before closing SSH
+      await new Promise(resolve => setTimeout(resolve, 100));
+
+      if (ssh) {
+        // Properly close the SSH connection
+        ssh.on('close', () => {
+          console.log('SSH connection closed cleanly');
+        });
+        ssh.end();
+      }
+    } catch (err) {
+      console.error('Error during cleanup:', err);
+    }
   }
 }
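`ssh.end()` only requests a shutdown, which is why the cleanup block registers its `'close'` listener before calling it. If the caller ever needed to block until the socket has actually closed, the same idea can be wrapped in a promise, e.g. this sketch:

```js
// Sketch: resolve once the ssh2 client has fully closed.
function endSsh(ssh) {
  return new Promise((resolve) => {
    ssh.once("close", resolve);
    ssh.end();
  });
}
```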