Update import scripts through POs

2025-02-17 22:17:01 -05:00
parent a8d3fd8033
commit ca2653ea1a
2 changed files with 342 additions and 120 deletions

View File

@@ -10,9 +10,9 @@ const importPurchaseOrders = require('./import/purchase-orders');
 dotenv.config({ path: path.join(__dirname, "../.env") });
 // Constants to control which imports run
-const IMPORT_CATEGORIES = true;
-const IMPORT_PRODUCTS = true;
-const IMPORT_ORDERS = true;
+const IMPORT_CATEGORIES = false;
+const IMPORT_PRODUCTS = false;
+const IMPORT_ORDERS = false;
 const IMPORT_PURCHASE_ORDERS = true;
 // Add flag for incremental updates

View File

@@ -22,11 +22,15 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
       CREATE TEMP TABLE temp_purchase_orders (
         po_id INTEGER NOT NULL,
         pid INTEGER NOT NULL,
+        sku VARCHAR(50),
+        name VARCHAR(255),
         vendor VARCHAR(255),
         date DATE,
         expected_date DATE,
         status INTEGER,
         notes TEXT,
+        ordered INTEGER,
+        cost_price DECIMAL(10,3),
         PRIMARY KEY (po_id, pid)
       );
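Two details of the staging table are easy to miss. TEMP tables in PostgreSQL are session-scoped, so the import has to create, fill, and drain the table over one and the same connection; and DECIMAL(10,3) keeps three decimal places so sub-cent unit costs survive the copy. A minimal sketch of the session discipline, assuming the pg driver (the withStagingTable helper is illustrative, not from the repo):

    const { Client } = require('pg');

    async function withStagingTable(fn) {
      const client = new Client(); // connection settings come from PG* env vars
      await client.connect();
      try {
        // Session-scoped: visible only on this connection, dropped when it ends.
        await client.query(`
          CREATE TEMP TABLE temp_purchase_orders (
            po_id INTEGER NOT NULL,
            pid INTEGER NOT NULL,
            sku VARCHAR(50),
            name VARCHAR(255),
            vendor VARCHAR(255),
            date DATE,
            expected_date DATE,
            status INTEGER,
            notes TEXT,
            ordered INTEGER,
            cost_price DECIMAL(10,3), -- three decimals: sub-cent unit costs survive
            PRIMARY KEY (po_id, pid)
          )
        `);
        return await fn(client); // every later query must reuse this client
      } finally {
        await client.end();
      }
    }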
@@ -224,7 +228,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
     const productPids = [...new Set(productBatch.map(p => p.pid))];
     const batchPoIds = [...new Set(productBatch.map(p => p.po_id))];
-    // Get receivings for this batch with employee names - Keep MySQL compatible for production
+    // Get receivings for this batch with employee names
     const [receivings] = await prodConnection.query(`
       SELECT
         r.po_id,
@@ -251,142 +255,360 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
     // Insert receivings into temp table
     if (receivings.length > 0) {
-      const placeholders = receivings.map((_, idx) => {
-        const base = idx * 9;
-        return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9})`;
-      }).join(',');
-      const values = receivings.flatMap(r => [
-        r.po_id,
-        r.pid,
-        r.receiving_id,
-        r.qty_each,
-        r.cost_each,
-        r.received_date,
-        r.received_by,
-        r.received_by_name,
-        r.is_alt_po
-      ]);
-      await localConnection.query(`
-        INSERT INTO temp_po_receivings (
-          po_id, pid, receiving_id, qty_each, cost_each, received_date,
-          received_by, received_by_name, is_alt_po
-        )
-        VALUES ${placeholders}
-        ON CONFLICT (receiving_id, pid) DO UPDATE SET
-          po_id = EXCLUDED.po_id,
-          qty_each = EXCLUDED.qty_each,
-          cost_each = EXCLUDED.cost_each,
-          received_date = EXCLUDED.received_date,
-          received_by = EXCLUDED.received_by,
-          received_by_name = EXCLUDED.received_by_name,
-          is_alt_po = EXCLUDED.is_alt_po
-      `, values);
+      // Process in smaller chunks to avoid parameter limits
+      const CHUNK_SIZE = 100; // Reduce chunk size to avoid parameter limits
+      for (let i = 0; i < receivings.length; i += CHUNK_SIZE) {
+        const chunk = receivings.slice(i, Math.min(i + CHUNK_SIZE, receivings.length));
+        const values = [];
+        const placeholders = [];
+
+        chunk.forEach((r, idx) => {
+          values.push(
+            r.po_id,
+            r.pid,
+            r.receiving_id,
+            r.qty_each,
+            r.cost_each,
+            r.received_date,
+            r.received_by,
+            r.received_by_name || null,
+            r.is_alt_po
+          );
+
+          const offset = idx * 9;
+          placeholders.push(`($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8}, $${offset + 9})`);
+        });
+
+        await localConnection.query(`
+          INSERT INTO temp_po_receivings (
+            po_id, pid, receiving_id, qty_each, cost_each, received_date,
+            received_by, received_by_name, is_alt_po
+          )
+          VALUES ${placeholders.join(',')}
+          ON CONFLICT (receiving_id, pid) DO UPDATE SET
+            po_id = EXCLUDED.po_id,
+            qty_each = EXCLUDED.qty_each,
+            cost_each = EXCLUDED.cost_each,
+            received_date = EXCLUDED.received_date,
+            received_by = EXCLUDED.received_by,
+            received_by_name = EXCLUDED.received_by_name,
+            is_alt_po = EXCLUDED.is_alt_po
+        `, values);
+      }
     }
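Both temp-table loads now follow the same pattern: accumulate a chunk's values in insertion order, build matching $n placeholder groups, and issue one multi-row INSERT ... ON CONFLICT per chunk. The chunking exists because PostgreSQL's wire protocol sends the bind-parameter count as a 16-bit integer, capping one statement at 65,535 parameters; at 9 columns per row, 100 rows is only 900 parameters, far below the limit. A generic sketch of the idiom (upsertInChunks and its signature are illustrative names, not repo code):

    async function upsertInChunks(client, table, columns, rows, conflictSql, rowsPerChunk = 100) {
      for (let i = 0; i < rows.length; i += rowsPerChunk) {
        const chunk = rows.slice(i, i + rowsPerChunk); // rows: array of equal-length value arrays
        const values = chunk.flat();                   // $1..$n in row-major order
        const placeholders = chunk.map((_, rowIdx) => {
          const base = rowIdx * columns.length;
          return `(${columns.map((_, c) => `$${base + c + 1}`).join(', ')})`;
        }).join(', ');
        await client.query(
          `INSERT INTO ${table} (${columns.join(', ')}) VALUES ${placeholders} ${conflictSql}`,
          values
        );
      }
    }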
-    // Process each PO product
-    for (const product of productBatch) {
-      const po = batch.find(p => p.po_id === product.po_id);
-      if (!po) continue;
-
-      // Insert into temp_purchase_orders
-      const placeholders = `($1, $2, $3, $4, $5, $6, $7)`;
-      const values = [
-        product.po_id,
-        product.pid,
-        po.vendor,
-        po.date,
-        po.expected_date,
-        po.status,
-        po.notes || po.long_note
-      ];
-
-      await localConnection.query(`
-        INSERT INTO temp_purchase_orders (
-          po_id, pid, vendor, date, expected_date, status, notes
-        )
-        VALUES ${placeholders}
-        ON CONFLICT (po_id, pid) DO UPDATE SET
-          vendor = EXCLUDED.vendor,
-          date = EXCLUDED.date,
-          expected_date = EXCLUDED.expected_date,
-          status = EXCLUDED.status,
-          notes = EXCLUDED.notes
-      `, values);
-
-      processed++;
-
-      // Update progress periodically
-      if (Date.now() - lastProgressUpdate > PROGRESS_INTERVAL) {
+    // Process each PO product in chunks
+    const PRODUCT_CHUNK_SIZE = 100;
+    for (let i = 0; i < productBatch.length; i += PRODUCT_CHUNK_SIZE) {
+      const chunk = productBatch.slice(i, Math.min(i + PRODUCT_CHUNK_SIZE, productBatch.length));
+      const values = [];
+      const placeholders = [];
+
+      chunk.forEach((product) => {
+        const po = batch.find(p => p.po_id === product.po_id);
+        if (!po) return;
+
+        values.push(
+          product.po_id,
+          product.pid,
+          product.sku,
+          product.name,
+          po.vendor,
+          po.date,
+          po.expected_date,
+          po.status,
+          po.notes || po.long_note,
+          product.ordered,
+          product.cost_each
+        );
+
+        // 11 fields per row; using placeholders.length (rows actually kept) rather than the
+        // forEach index keeps values and placeholders aligned when a product is skipped above.
+        const offset = placeholders.length * 11;
+        placeholders.push(`($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8}, $${offset + 9}, $${offset + 10}, $${offset + 11})`);
+      });
+
+      if (placeholders.length > 0) {
+        await localConnection.query(`
+          INSERT INTO temp_purchase_orders (
+            po_id, pid, sku, name, vendor, date, expected_date,
+            status, notes, ordered, cost_price
+          )
+          VALUES ${placeholders.join(',')}
+          ON CONFLICT (po_id, pid) DO UPDATE SET
+            sku = EXCLUDED.sku,
+            name = EXCLUDED.name,
+            vendor = EXCLUDED.vendor,
+            date = EXCLUDED.date,
+            expected_date = EXCLUDED.expected_date,
+            status = EXCLUDED.status,
+            notes = EXCLUDED.notes,
+            ordered = EXCLUDED.ordered,
+            cost_price = EXCLUDED.cost_price
+        `, values);
+      }
+
+      processed += chunk.length;
+
+      // Update progress based on time interval
+      const now = Date.now();
+      if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) {
         outputProgress({
           status: "running",
           operation: "Purchase orders import",
+          message: `Processing purchase orders: ${processed} of ${totalItems}`,
           current: processed,
           total: totalItems,
           elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
           remaining: estimateRemaining(startTime, processed, totalItems),
           rate: calculateRate(startTime, processed)
         });
-        lastProgressUpdate = Date.now();
+        lastProgressUpdate = now;
       }
     }
   }
 }
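Progress output is throttled by wall-clock time rather than row count: one line per row would flood stdout, so a message is emitted only when PROGRESS_INTERVAL has elapsed, and the extra processed === totalItems condition guarantees the final 100% message always goes out. The helper functions live elsewhere in the repo; the shapes below are assumptions, sketched only to make the calls above concrete:

    function outputProgress(payload) {
      console.log(JSON.stringify(payload)); // one JSON object per line, machine-readable
    }

    function formatElapsedTime(totalSeconds) {
      const m = Math.floor(totalSeconds / 60);
      const s = Math.round(totalSeconds % 60);
      return `${m}m ${s}s`;
    }

    function calculateRate(startTime, processed) {
      const seconds = (Date.now() - startTime) / 1000;
      return seconds > 0 ? +(processed / seconds).toFixed(1) : 0; // items per second
    }

    function estimateRemaining(startTime, processed, total) {
      if (processed === 0) return null;
      const seconds = (Date.now() - startTime) / 1000;
      return formatElapsedTime(((total - processed) * seconds) / processed);
    }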
-// Insert final data into purchase_orders table
-const [result] = await localConnection.query(`
-  WITH inserted_pos AS (
-    INSERT INTO purchase_orders (
-      po_id, pid, vendor, date, expected_date, status, notes,
-      received_qty, received_cost, last_received_date, last_received_by,
-      alt_po_received_qty, alt_po_last_received_date,
-      no_po_received_qty, no_po_last_received_date
-    )
-    SELECT
-      po.po_id,
-      po.pid,
-      po.vendor,
-      po.date,
-      po.expected_date,
-      po.status,
-      po.notes,
-      COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0) as received_qty,
-      COALESCE(AVG(CASE WHEN r.is_alt_po = 0 THEN r.cost_each END), 0) as received_cost,
-      MAX(CASE WHEN r.is_alt_po = 0 THEN r.received_date END) as last_received_date,
-      MAX(CASE WHEN r.is_alt_po = 0 THEN r.received_by_name END) as last_received_by,
-      COALESCE(SUM(CASE WHEN r.is_alt_po = 1 THEN r.qty_each END), 0) as alt_po_received_qty,
-      MAX(CASE WHEN r.is_alt_po = 1 THEN r.received_date END) as alt_po_last_received_date,
-      COALESCE(SUM(CASE WHEN r.is_alt_po = 2 THEN r.qty_each END), 0) as no_po_received_qty,
-      MAX(CASE WHEN r.is_alt_po = 2 THEN r.received_date END) as no_po_last_received_date
-    FROM temp_purchase_orders po
-    LEFT JOIN temp_po_receivings r ON po.pid = r.pid
-    GROUP BY po.po_id, po.pid, po.vendor, po.date, po.expected_date, po.status, po.notes
-    ON CONFLICT (po_id, pid) DO UPDATE SET
-      vendor = EXCLUDED.vendor,
-      date = EXCLUDED.date,
-      expected_date = EXCLUDED.expected_date,
-      status = EXCLUDED.status,
-      notes = EXCLUDED.notes,
-      received_qty = EXCLUDED.received_qty,
-      received_cost = EXCLUDED.received_cost,
-      last_received_date = EXCLUDED.last_received_date,
-      last_received_by = EXCLUDED.last_received_by,
-      alt_po_received_qty = EXCLUDED.alt_po_received_qty,
-      alt_po_last_received_date = EXCLUDED.alt_po_last_received_date,
-      no_po_received_qty = EXCLUDED.no_po_received_qty,
-      no_po_last_received_date = EXCLUDED.no_po_last_received_date
-    RETURNING xmax
-  )
-  SELECT
-    COUNT(*) FILTER (WHERE xmax = 0) as inserted,
-    COUNT(*) FILTER (WHERE xmax <> 0) as updated
-  FROM inserted_pos
-`);
-
-recordsAdded = parseInt(result.rows[0].inserted, 10) || 0;
-recordsUpdated = parseInt(result.rows[0].updated, 10) || 0;
+// Insert final data into purchase_orders table in chunks
+const FINAL_CHUNK_SIZE = 1000;
+let totalProcessed = 0;
+const totalPosResult = await localConnection.query('SELECT COUNT(*) as total_pos FROM temp_purchase_orders');
+const total_pos = parseInt(totalPosResult.rows?.[0]?.total_pos || '0', 10);
+
+outputProgress({
+  status: "running",
+  operation: "Purchase orders final import",
+  message: `Processing ${total_pos} purchase orders for final import`,
+  current: 0,
+  total: total_pos
+});
+
+// Process in chunks using cursor-based pagination
+let lastPoId = 0;
+let lastPid = 0;
+let recordsAdded = 0;
+let recordsUpdated = 0;
+
+while (true) {
+  console.log('Fetching next chunk with lastPoId:', lastPoId, 'lastPid:', lastPid);
+  const chunkResult = await localConnection.query(`
+    SELECT po_id, pid FROM temp_purchase_orders
+    WHERE (po_id, pid) > ($1, $2)
+    ORDER BY po_id, pid
+    LIMIT $3
+  `, [lastPoId, lastPid, FINAL_CHUNK_SIZE]);
+
+  if (!chunkResult?.rows) {
+    console.error('No rows returned from chunk query:', chunkResult);
+    break;
+  }
+
+  const chunk = chunkResult.rows;
+  console.log('Got chunk of size:', chunk.length);
+  if (chunk.length === 0) break;
+
+  const result = await localConnection.query(`
+    WITH inserted_pos AS (
+      INSERT INTO purchase_orders (
+        po_id, pid, sku, name, cost_price, po_cost_price,
+        vendor, date, expected_date, status, notes,
+        ordered, received, receiving_status,
+        received_date, last_received_date, received_by,
+        receiving_history
+      )
+      SELECT
+        po.po_id,
+        po.pid,
+        po.sku,
+        po.name,
+        COALESCE(
+          (
+            SELECT cost_each
+            FROM temp_po_receivings r2
+            WHERE r2.pid = po.pid
+              AND r2.is_alt_po = 0
+              AND r2.cost_each > 0
+            ORDER BY r2.received_date
+            LIMIT 1
+          ),
+          po.cost_price
+        ) as cost_price,
+        po.cost_price as po_cost_price,
+        po.vendor,
+        po.date,
+        po.expected_date,
+        po.status,
+        po.notes,
+        po.ordered,
+        COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0) as received,
+        CASE
+          WHEN COUNT(r.receiving_id) = 0 THEN 1 -- created
+          WHEN SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END) < po.ordered THEN 30 -- partial
+          ELSE 40 -- full
+        END as receiving_status,
+        MIN(CASE WHEN r.is_alt_po = 0 THEN r.received_date END) as received_date,
+        MAX(CASE WHEN r.is_alt_po = 0 THEN r.received_date END) as last_received_date,
+        (
+          SELECT r2.received_by_name
+          FROM temp_po_receivings r2
+          WHERE r2.pid = po.pid
+            AND r2.is_alt_po = 0
+          ORDER BY r2.received_date
+          LIMIT 1
+        ) as received_by,
+        jsonb_build_object(
+          'ordered_qty', po.ordered,
+          'total_received', COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0),
+          'remaining_unfulfilled', GREATEST(0, po.ordered - COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0)),
+          'excess_received', GREATEST(0, COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0) - po.ordered),
+          'po_cost', po.cost_price,
+          'actual_cost', COALESCE(
+            (
+              SELECT cost_each
+              FROM temp_po_receivings r2
+              WHERE r2.pid = po.pid
+                AND r2.is_alt_po = 0
+                AND r2.cost_each > 0
+              ORDER BY r2.received_date
+              LIMIT 1
+            ),
+            po.cost_price
+          ),
+          'fulfillment', (
+            SELECT jsonb_agg(
+              jsonb_build_object(
+                'receiving_id', r2.receiving_id,
+                'qty_applied', CASE
+                  WHEN r2.running_total <= po.ordered THEN r2.qty_each
+                  WHEN r2.running_total - r2.qty_each < po.ordered THEN po.ordered - (r2.running_total - r2.qty_each)
+                  ELSE 0
+                END,
+                'qty_total', r2.qty_each,
+                'cost', r2.cost_each,
+                'date', r2.received_date,
+                'received_by', r2.received_by,
+                'received_by_name', r2.received_by_name,
+                'type', CASE r2.is_alt_po
+                  WHEN 0 THEN 'original'
+                  WHEN 1 THEN 'alternate'
+                  ELSE 'no_po'
+                END,
+                'remaining_qty', CASE
+                  WHEN r2.running_total <= po.ordered THEN 0
+                  WHEN r2.running_total - r2.qty_each < po.ordered THEN r2.running_total - po.ordered
+                  ELSE r2.qty_each
+                END,
+                'is_excess', r2.running_total > po.ordered
+              )
+              ORDER BY r2.received_date
+            )
+            FROM (
+              SELECT
+                r2.*,
+                SUM(r2.qty_each) OVER (
+                  PARTITION BY r2.pid
+                  ORDER BY r2.received_date
+                  ROWS UNBOUNDED PRECEDING
+                ) as running_total
+              FROM temp_po_receivings r2
+              WHERE r2.pid = po.pid
+            ) r2
+          ),
+          'alternate_po_receivings', (
+            SELECT jsonb_agg(
+              jsonb_build_object(
+                'receiving_id', r2.receiving_id,
+                'qty', r2.qty_each,
+                'cost', r2.cost_each,
+                'date', r2.received_date,
+                'received_by', r2.received_by,
+                'received_by_name', r2.received_by_name
+              )
+              ORDER BY r2.received_date
+            )
+            FROM temp_po_receivings r2
+            WHERE r2.pid = po.pid AND r2.is_alt_po = 1
+          ),
+          'no_po_receivings', (
+            SELECT jsonb_agg(
+              jsonb_build_object(
+                'receiving_id', r2.receiving_id,
+                'qty', r2.qty_each,
+                'cost', r2.cost_each,
+                'date', r2.received_date,
+                'received_by', r2.received_by,
+                'received_by_name', r2.received_by_name
+              )
+              ORDER BY r2.received_date
+            )
+            FROM temp_po_receivings r2
+            WHERE r2.pid = po.pid AND r2.is_alt_po = 2
+          )
+        ) as receiving_history
+      FROM temp_purchase_orders po
+      LEFT JOIN temp_po_receivings r ON po.pid = r.pid
+      WHERE (po.po_id, po.pid) IN (
+        -- Multi-array UNNEST needs a column alias list; without AS t(po_id, pid)
+        -- both output columns are named "unnest" and this SELECT would fail.
+        SELECT po_id, pid FROM UNNEST($1::int[], $2::int[]) AS t(po_id, pid)
+      )
+      GROUP BY po.po_id, po.pid, po.sku, po.name, po.vendor, po.date,
+               po.expected_date, po.status, po.notes, po.ordered, po.cost_price
+      ON CONFLICT (po_id, pid) DO UPDATE SET
+        vendor = EXCLUDED.vendor,
+        date = EXCLUDED.date,
+        expected_date = EXCLUDED.expected_date,
+        status = EXCLUDED.status,
+        notes = EXCLUDED.notes,
+        ordered = EXCLUDED.ordered,
+        received = EXCLUDED.received,
+        receiving_status = EXCLUDED.receiving_status,
+        received_date = EXCLUDED.received_date,
+        last_received_date = EXCLUDED.last_received_date,
+        received_by = EXCLUDED.received_by,
+        receiving_history = EXCLUDED.receiving_history,
+        cost_price = EXCLUDED.cost_price,
+        po_cost_price = EXCLUDED.po_cost_price
+      RETURNING xmax
+    )
+    SELECT
+      COUNT(*) FILTER (WHERE xmax = 0) as inserted,
+      COUNT(*) FILTER (WHERE xmax <> 0) as updated
+    FROM inserted_pos
+  `, [
+    chunk.map(r => r.po_id),
+    chunk.map(r => r.pid)
+  ]);
+
+  // Add debug logging
+  console.log('Insert result:', result?.rows?.[0]);
+
+  // Handle the result properly for PostgreSQL with more defensive coding
+  const resultRow = result?.rows?.[0] || {};
+  const insertCount = parseInt(resultRow.inserted || '0', 10);
+  const updateCount = parseInt(resultRow.updated || '0', 10);
+
+  recordsAdded += insertCount;
+  recordsUpdated += updateCount;
+  totalProcessed += chunk.length;
+
+  // Update progress
+  outputProgress({
+    status: "running",
+    operation: "Purchase orders final import",
+    message: `Processed ${totalProcessed} of ${total_pos} purchase orders`,
+    current: totalProcessed,
+    total: total_pos,
+    elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
+    remaining: estimateRemaining(startTime, totalProcessed, total_pos),
+    rate: calculateRate(startTime, totalProcessed)
+  });
+
+  // Update last processed IDs for next chunk with safety check
+  if (chunk.length > 0) {
+    const lastItem = chunk[chunk.length - 1];
+    if (lastItem) {
+      lastPoId = lastItem.po_id;
+      lastPid = lastItem.pid;
+    }
+  }
+}
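Two PostgreSQL idioms carry the new final-import loop. WHERE (po_id, pid) > ($1, $2) is a row-value comparison, so each iteration resumes exactly after the last composite key it saw: keyset pagination, which unlike OFFSET does not rescan already-processed rows. And COUNT(*) FILTER (WHERE xmax = 0) splits inserts from updates because a freshly inserted row has a zero xmax system column, while a row rewritten by ON CONFLICT DO UPDATE carries the updating transaction's id; this leans on an implementation detail, but a long-stable and widely used one. A standalone sketch of the pagination idiom, assuming the pg driver and a hypothetical table demo(a, b) with primary key (a, b):

    async function* keysetPages(client, pageSize = 1000) {
      let lastA = 0;
      let lastB = 0;
      while (true) {
        const { rows } = await client.query(
          `SELECT a, b FROM demo
           WHERE (a, b) > ($1, $2)  -- strictly after the last key, in (a, b) order
           ORDER BY a, b
           LIMIT $3`,
          [lastA, lastB, pageSize]
        );
        if (rows.length === 0) return; // table drained
        yield rows;
        ({ a: lastA, b: lastB } = rows[rows.length - 1]); // resume point for the next page
      }
    }

Starting from (0, 0) assumes positive keys, mirroring the loop above.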
 // Update sync status
 await localConnection.query(`