Fix more import script bugs/missing data

2025-03-25 22:23:06 -04:00
parent 5dd779cb4a
commit 108181c63d
5 changed files with 622 additions and 215 deletions

View File

@@ -10,9 +10,9 @@ const importPurchaseOrders = require('./import/purchase-orders');
 dotenv.config({ path: path.join(__dirname, "../.env") });
 
 // Constants to control which imports run
-const IMPORT_CATEGORIES = true;
-const IMPORT_PRODUCTS = true;
-const IMPORT_ORDERS = true;
+const IMPORT_CATEGORIES = false;
+const IMPORT_PRODUCTS = false;
+const IMPORT_ORDERS = false;
 const IMPORT_PURCHASE_ORDERS = true;
 
 // Add flag for incremental updates
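Note: these flags gate which importers run on a given pass; this commit turns everything off except the purchase-order import. A minimal sketch of how the gating presumably looks further down in this script (only importPurchaseOrders appears in the hunk header; the other importer names are assumed for illustration):

    // Hypothetical wiring, assuming each importer takes the two connections.
    async function runImports(prodConnection, localConnection) {
      if (IMPORT_CATEGORIES) await importCategories(prodConnection, localConnection);
      if (IMPORT_PRODUCTS) await importProducts(prodConnection, localConnection);
      if (IMPORT_ORDERS) await importOrders(prodConnection, localConnection);
      if (IMPORT_PURCHASE_ORDERS) await importPurchaseOrders(prodConnection, localConnection);
    }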

View File

@@ -206,6 +206,14 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
 const METADATA_BATCH_SIZE = 2000;
 const PG_BATCH_SIZE = 200;
+
+// Add a helper function for title case conversion
+function toTitleCase(str) {
+  if (!str) return '';
+  return str.toLowerCase().split(' ').map(word => {
+    return word.charAt(0).toUpperCase() + word.slice(1);
+  }).join(' ');
+}
 
 const processMetadataBatch = async (batchIds) => {
   const [orders] = await prodConnection.query(`
     SELECT
@@ -235,7 +243,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
         order.order_id,
         order.date,
         order.customer,
-        order.customer_name || '',
+        toTitleCase(order.customer_name) || '',
         order.status,
         order.canceled,
         order.summary_discount || 0,
@@ -429,11 +437,12 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
           oi.pid,
           SUM(COALESCE(od.discount, 0)) as promo_discount,
           COALESCE(ot.tax, 0) as total_tax,
-          COALESCE(oi.price * 0.5, 0) as costeach
+          COALESCE(oc.costeach, oi.price * 0.5) as costeach
         FROM temp_order_items oi
         LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
         LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
-        GROUP BY oi.order_id, oi.pid, ot.tax
+        LEFT JOIN temp_order_costs oc ON oi.order_id = oc.order_id AND oi.pid = oc.pid
+        GROUP BY oi.order_id, oi.pid, ot.tax, oc.costeach
       )
       SELECT
         oi.order_id as order_number,
@@ -491,8 +500,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       const subBatch = validOrders.slice(k, k + FINAL_BATCH_SIZE);
       const placeholders = subBatch.map((_, idx) => {
-        const base = idx * 14; // 14 columns (removed updated)
-        return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14})`;
+        const base = idx * 15; // 15 columns including costeach
+        return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`;
       }).join(',');
       const batchValues = subBatch.flatMap(o => [
@@ -509,7 +518,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
         o.customer,
         o.customer_name,
         o.status,
-        o.canceled
+        o.canceled,
+        o.costeach
       ]);
 
       const [result] = await localConnection.query(`
@@ -517,7 +527,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
        INSERT INTO orders (
          order_number, pid, sku, date, price, quantity, discount,
          tax, tax_included, shipping, customer, customer_name,
-         status, canceled
+         status, canceled, costeach
        )
        VALUES ${placeholders}
        ON CONFLICT (order_number, pid) DO UPDATE SET
@@ -532,7 +542,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
          customer = EXCLUDED.customer,
          customer_name = EXCLUDED.customer_name,
          status = EXCLUDED.status,
-         canceled = EXCLUDED.canceled
+         canceled = EXCLUDED.canceled,
+         costeach = EXCLUDED.costeach
          RETURNING xmax = 0 as inserted
        )
        SELECT
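Note: two details of the upsert above are worth spelling out. Each order row now contributes 15 consecutive $n placeholders, and in PostgreSQL the system column xmax is 0 for a row freshly inserted by the current statement, which is why RETURNING xmax = 0 as inserted can tell inserts apart from conflict-updates. A standalone sketch of the placeholder expansion (values are illustrative):

    // Two rows x 15 columns, mirroring the base = idx * 15 logic above.
    const rows = 2, cols = 15;
    const placeholders = Array.from({ length: rows }, (_, idx) => {
      const base = idx * cols;
      return `(${Array.from({ length: cols }, (_, c) => `$${base + c + 1}`).join(', ')})`;
    }).join(',');
    console.log(placeholders); // ($1, $2, ... $15),($16, $17, ... $30)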

View File

@@ -1,5 +1,5 @@
 const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');
-const BATCH_SIZE = 100; // Smaller batch size for better progress tracking
+const BATCH_SIZE = 1000; // Smaller batch size for better progress tracking
 const MAX_RETRIES = 3;
 const RETRY_DELAY = 5000; // 5 seconds
 const dotenv = require("dotenv");

View File

@@ -519,217 +519,276 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
     outputProgress({
       status: "running",
       operation: "Purchase orders import",
-      message: "Allocating receivings to purchase orders using FIFO"
+      message: "Validating product IDs before allocation"
     });
 
-    // Step 1: Handle receivings with matching PO IDs (direct allocation)
-    await localConnection.query(`
-      INSERT INTO temp_receiving_allocations (
-        po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by
-      )
-      SELECT
-        r.po_id,
-        r.pid,
-        r.receiving_id,
-        LEAST(r.qty_each, po.ordered) as allocated_qty,
-        r.cost_each,
-        COALESCE(r.received_date, NOW()) as received_date,
-        r.received_by
-      FROM temp_receivings r
-      JOIN temp_purchase_orders po ON r.po_id = po.po_id AND r.pid = po.pid
-      WHERE r.po_id IS NOT NULL
-    `);
-
-    // Step 2: Handle receivings without a matching PO (standalone receivings)
-    // Create a PO entry for each standalone receiving
-    await localConnection.query(`
-      INSERT INTO temp_purchase_orders (
-        po_id, pid, sku, name, vendor, date, status, status_text,
-        ordered, po_cost_price, supplier_id, date_created, date_ordered
-      )
-      SELECT
-        'R' || r.receiving_id as po_id,
-        r.pid,
-        COALESCE(p.sku, 'NO-SKU') as sku,
-        COALESCE(p.name, 'Unknown Product') as name,
-        COALESCE(
-          (SELECT vendor FROM temp_purchase_orders
-           WHERE supplier_id = r.supplier_id LIMIT 1),
-          'Unknown Vendor'
-        ) as vendor,
-        COALESCE(r.received_date, r.receiving_created_date) as date,
-        NULL as status,
-        NULL as status_text,
-        NULL as ordered,
-        r.cost_each as po_cost_price,
-        r.supplier_id,
-        COALESCE(r.receiving_created_date, r.received_date) as date_created,
-        NULL as date_ordered
-      FROM temp_receivings r
-      LEFT JOIN (
-        SELECT DISTINCT pid, sku, name FROM temp_purchase_orders
-      ) p ON r.pid = p.pid
-      WHERE r.po_id IS NULL
-        OR NOT EXISTS (
-          SELECT 1 FROM temp_purchase_orders po
-          WHERE po.po_id = r.po_id AND po.pid = r.pid
-        )
-      ON CONFLICT (po_id, pid) DO NOTHING
-    `);
-
-    // Now allocate these standalone receivings to their "virtual" POs
-    await localConnection.query(`
-      INSERT INTO temp_receiving_allocations (
-        po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by
-      )
-      SELECT
-        'R' || r.receiving_id as po_id,
-        r.pid,
-        r.receiving_id,
-        r.qty_each as allocated_qty,
-        r.cost_each,
-        COALESCE(r.received_date, NOW()) as received_date,
-        r.received_by
-      FROM temp_receivings r
-      WHERE r.po_id IS NULL
-        OR NOT EXISTS (
-          SELECT 1 FROM temp_purchase_orders po
-          WHERE po.po_id = r.po_id AND po.pid = r.pid
-        )
-    `);
-
-    // Step 3: Handle unallocated receivings vs. unfulfilled orders
-    // This is the complex FIFO allocation logic
-    await localConnection.query(`
-      WITH
-      -- Calculate remaining quantities after direct allocations
-      remaining_po_quantities AS (
-        SELECT
-          po.po_id,
-          po.pid,
-          po.ordered,
-          COALESCE(SUM(ra.allocated_qty), 0) as already_allocated,
-          po.ordered - COALESCE(SUM(ra.allocated_qty), 0) as remaining_qty,
-          po.date_ordered,
-          po.date_created
-        FROM temp_purchase_orders po
-        LEFT JOIN temp_receiving_allocations ra ON po.po_id = ra.po_id AND po.pid = ra.pid
-        WHERE po.ordered IS NOT NULL
-        GROUP BY po.po_id, po.pid, po.ordered, po.date_ordered, po.date_created
-        HAVING po.ordered > COALESCE(SUM(ra.allocated_qty), 0)
-      ),
-      remaining_receiving_quantities AS (
-        SELECT
-          r.receiving_id,
-          r.pid,
-          r.qty_each,
-          COALESCE(SUM(ra.allocated_qty), 0) as already_allocated,
-          r.qty_each - COALESCE(SUM(ra.allocated_qty), 0) as remaining_qty,
-          r.received_date,
-          r.cost_each,
-          r.received_by
-        FROM temp_receivings r
-        LEFT JOIN temp_receiving_allocations ra ON r.receiving_id = ra.receiving_id AND r.pid = ra.pid
-        GROUP BY r.receiving_id, r.pid, r.qty_each, r.received_date, r.cost_each, r.received_by
-        HAVING r.qty_each > COALESCE(SUM(ra.allocated_qty), 0)
-      ),
-      -- Rank POs by age, with a cutoff for very old POs (1 year)
-      ranked_pos AS (
-        SELECT
-          po.po_id,
-          po.pid,
-          po.remaining_qty,
-          CASE
-            WHEN po.date_ordered IS NULL OR po.date_ordered < NOW() - INTERVAL '1 year' THEN 2
-            ELSE 1
-          END as age_group,
-          ROW_NUMBER() OVER (
-            PARTITION BY po.pid, (CASE WHEN po.date_ordered IS NULL OR po.date_ordered < NOW() - INTERVAL '1 year' THEN 2 ELSE 1 END)
-            ORDER BY COALESCE(po.date_ordered, po.date_created, NOW())
-          ) as rank_in_group
-        FROM remaining_po_quantities po
-      ),
-      -- Rank receivings by date
-      ranked_receivings AS (
-        SELECT
-          r.receiving_id,
-          r.pid,
-          r.remaining_qty,
-          r.received_date,
-          r.cost_each,
-          r.received_by,
-          ROW_NUMBER() OVER (PARTITION BY r.pid ORDER BY COALESCE(r.received_date, NOW())) as rank
-        FROM remaining_receiving_quantities r
-      ),
-      -- First allocate to recent POs
-      allocations_recent AS (
-        SELECT
-          po.po_id,
-          po.pid,
-          r.receiving_id,
-          LEAST(po.remaining_qty, r.remaining_qty) as allocated_qty,
-          r.cost_each,
-          COALESCE(r.received_date, NOW()) as received_date,
-          r.received_by,
-          po.age_group,
-          po.rank_in_group,
-          r.rank,
-          'recent' as allocation_type
-        FROM ranked_pos po
-        JOIN ranked_receivings r ON po.pid = r.pid
-        WHERE po.age_group = 1
-        ORDER BY po.pid, po.rank_in_group, r.rank
-      ),
-      -- Then allocate to older POs
-      remaining_after_recent AS (
-        SELECT
-          r.receiving_id,
-          r.pid,
-          r.remaining_qty - COALESCE(SUM(a.allocated_qty), 0) as remaining_qty,
-          r.received_date,
-          r.cost_each,
-          r.received_by,
-          r.rank
-        FROM ranked_receivings r
-        LEFT JOIN allocations_recent a ON r.receiving_id = a.receiving_id AND r.pid = a.pid
-        GROUP BY r.receiving_id, r.pid, r.remaining_qty, r.received_date, r.cost_each, r.received_by, r.rank
-        HAVING r.remaining_qty > COALESCE(SUM(a.allocated_qty), 0)
-      ),
-      allocations_old AS (
-        SELECT
-          po.po_id,
-          po.pid,
-          r.receiving_id,
-          LEAST(po.remaining_qty, r.remaining_qty) as allocated_qty,
-          r.cost_each,
-          COALESCE(r.received_date, NOW()) as received_date,
-          r.received_by,
-          po.age_group,
-          po.rank_in_group,
-          r.rank,
-          'old' as allocation_type
-        FROM ranked_pos po
-        JOIN remaining_after_recent r ON po.pid = r.pid
-        WHERE po.age_group = 2
-        ORDER BY po.pid, po.rank_in_group, r.rank
-      ),
-      -- Combine allocations
-      combined_allocations AS (
-        SELECT * FROM allocations_recent
-        UNION ALL
-        SELECT * FROM allocations_old
-      )
-      -- Insert into allocations table
-      INSERT INTO temp_receiving_allocations (
-        po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by
-      )
-      SELECT
-        po_id, pid, receiving_id, allocated_qty, cost_each,
-        COALESCE(received_date, NOW()) as received_date,
-        received_by
-      FROM combined_allocations
-      WHERE allocated_qty > 0
-    `);
+    // Add this section to filter out invalid PIDs before allocation
+    // This will check all PIDs in our temp tables against the products table
+    await localConnection.query(`
+      -- Create temp table to store invalid PIDs
+      DROP TABLE IF EXISTS temp_invalid_pids;
+      CREATE TEMP TABLE temp_invalid_pids AS (
+        -- Get all unique PIDs from our temp tables
+        WITH all_pids AS (
+          SELECT DISTINCT pid FROM temp_purchase_orders
+          UNION
+          SELECT DISTINCT pid FROM temp_receivings
+        )
+        -- Filter to only those that don't exist in products table
+        SELECT p.pid
+        FROM all_pids p
+        WHERE NOT EXISTS (
+          SELECT 1 FROM products WHERE pid = p.pid
+        )
+      );
+
+      -- Remove purchase orders with invalid PIDs
+      DELETE FROM temp_purchase_orders
+      WHERE pid IN (SELECT pid FROM temp_invalid_pids);
+
+      -- Remove receivings with invalid PIDs
+      DELETE FROM temp_receivings
+      WHERE pid IN (SELECT pid FROM temp_invalid_pids);
+    `);
+
+    // Get count of filtered items for reporting
+    const [filteredResult] = await localConnection.query(`
+      SELECT COUNT(*) as count FROM temp_invalid_pids
+    `);
+    const filteredCount = filteredResult.rows[0].count;
+    if (filteredCount > 0) {
+      console.log(`Filtered out ${filteredCount} items with invalid product IDs`);
+    }
+
+    // Break FIFO allocation into steps with progress tracking
+    const fifoSteps = [
+      {
+        name: "Direct allocations",
+        query: `
+          INSERT INTO temp_receiving_allocations (
+            po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by
+          )
+          SELECT
+            r.po_id,
+            r.pid,
+            r.receiving_id,
+            LEAST(r.qty_each, po.ordered) as allocated_qty,
+            r.cost_each,
+            COALESCE(r.received_date, NOW()) as received_date,
+            r.received_by
+          FROM temp_receivings r
+          JOIN temp_purchase_orders po ON r.po_id = po.po_id AND r.pid = po.pid
+          WHERE r.po_id IS NOT NULL
+        `
+      },
+      {
+        name: "Handling standalone receivings",
+        query: `
+          INSERT INTO temp_purchase_orders (
+            po_id, pid, sku, name, vendor, date, status, status_text,
+            ordered, po_cost_price, supplier_id, date_created, date_ordered
+          )
+          SELECT
+            'R' || r.receiving_id as po_id,
+            r.pid,
+            COALESCE(p.sku, 'NO-SKU') as sku,
+            COALESCE(p.name, 'Unknown Product') as name,
+            COALESCE(
+              (SELECT vendor FROM temp_purchase_orders
+               WHERE supplier_id = r.supplier_id LIMIT 1),
+              'Unknown Vendor'
+            ) as vendor,
+            COALESCE(r.received_date, r.receiving_created_date) as date,
+            NULL as status,
+            NULL as status_text,
+            NULL as ordered,
+            r.cost_each as po_cost_price,
+            r.supplier_id,
+            COALESCE(r.receiving_created_date, r.received_date) as date_created,
+            NULL as date_ordered
+          FROM temp_receivings r
+          LEFT JOIN (
+            SELECT DISTINCT pid, sku, name FROM temp_purchase_orders
+          ) p ON r.pid = p.pid
+          WHERE r.po_id IS NULL
+            OR NOT EXISTS (
+              SELECT 1 FROM temp_purchase_orders po
+              WHERE po.po_id = r.po_id AND po.pid = r.pid
+            )
+          ON CONFLICT (po_id, pid) DO NOTHING
+        `
+      },
+      {
+        name: "Allocating standalone receivings",
+        query: `
+          INSERT INTO temp_receiving_allocations (
+            po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by
+          )
+          SELECT
+            'R' || r.receiving_id as po_id,
+            r.pid,
+            r.receiving_id,
+            r.qty_each as allocated_qty,
+            r.cost_each,
+            COALESCE(r.received_date, NOW()) as received_date,
+            r.received_by
+          FROM temp_receivings r
+          WHERE r.po_id IS NULL
+            OR NOT EXISTS (
+              SELECT 1 FROM temp_purchase_orders po
+              WHERE po.po_id = r.po_id AND po.pid = r.pid
+            )
+        `
+      },
+      {
+        name: "FIFO allocation logic",
+        query: `
+          WITH
+          -- Calculate remaining quantities after direct allocations
+          remaining_po_quantities AS (
+            SELECT
+              po.po_id,
+              po.pid,
+              po.ordered,
+              COALESCE(SUM(ra.allocated_qty), 0) as already_allocated,
+              po.ordered - COALESCE(SUM(ra.allocated_qty), 0) as remaining_qty,
+              po.date_ordered,
+              po.date_created
+            FROM temp_purchase_orders po
+            LEFT JOIN temp_receiving_allocations ra ON po.po_id = ra.po_id AND po.pid = ra.pid
+            WHERE po.ordered IS NOT NULL
+            GROUP BY po.po_id, po.pid, po.ordered, po.date_ordered, po.date_created
+            HAVING po.ordered > COALESCE(SUM(ra.allocated_qty), 0)
+          ),
+          remaining_receiving_quantities AS (
+            SELECT
+              r.receiving_id,
+              r.pid,
+              r.qty_each,
+              COALESCE(SUM(ra.allocated_qty), 0) as already_allocated,
+              r.qty_each - COALESCE(SUM(ra.allocated_qty), 0) as remaining_qty,
+              r.received_date,
+              r.cost_each,
+              r.received_by
+            FROM temp_receivings r
+            LEFT JOIN temp_receiving_allocations ra ON r.receiving_id = ra.receiving_id AND r.pid = ra.pid
+            GROUP BY r.receiving_id, r.pid, r.qty_each, r.received_date, r.cost_each, r.received_by
+            HAVING r.qty_each > COALESCE(SUM(ra.allocated_qty), 0)
+          ),
+          -- Rank POs by age, with a cutoff for very old POs (1 year)
+          ranked_pos AS (
+            SELECT
+              po.po_id,
+              po.pid,
+              po.remaining_qty,
+              CASE
+                WHEN po.date_ordered IS NULL OR po.date_ordered < NOW() - INTERVAL '1 year' THEN 2
+                ELSE 1
+              END as age_group,
+              ROW_NUMBER() OVER (
+                PARTITION BY po.pid, (CASE WHEN po.date_ordered IS NULL OR po.date_ordered < NOW() - INTERVAL '1 year' THEN 2 ELSE 1 END)
+                ORDER BY COALESCE(po.date_ordered, po.date_created, NOW())
+              ) as rank_in_group
+            FROM remaining_po_quantities po
+          ),
+          -- Rank receivings by date
+          ranked_receivings AS (
+            SELECT
+              r.receiving_id,
+              r.pid,
+              r.remaining_qty,
+              r.received_date,
+              r.cost_each,
+              r.received_by,
+              ROW_NUMBER() OVER (PARTITION BY r.pid ORDER BY COALESCE(r.received_date, NOW())) as rank
+            FROM remaining_receiving_quantities r
+          ),
+          -- First allocate to recent POs
+          allocations_recent AS (
+            SELECT
+              po.po_id,
+              po.pid,
+              r.receiving_id,
+              LEAST(po.remaining_qty, r.remaining_qty) as allocated_qty,
+              r.cost_each,
+              COALESCE(r.received_date, NOW()) as received_date,
+              r.received_by,
+              po.age_group,
+              po.rank_in_group,
+              r.rank,
+              'recent' as allocation_type
+            FROM ranked_pos po
+            JOIN ranked_receivings r ON po.pid = r.pid
+            WHERE po.age_group = 1
+            ORDER BY po.pid, po.rank_in_group, r.rank
+          ),
+          -- Then allocate to older POs
+          remaining_after_recent AS (
+            SELECT
+              r.receiving_id,
+              r.pid,
+              r.remaining_qty - COALESCE(SUM(a.allocated_qty), 0) as remaining_qty,
+              r.received_date,
+              r.cost_each,
+              r.received_by,
+              r.rank
+            FROM ranked_receivings r
+            LEFT JOIN allocations_recent a ON r.receiving_id = a.receiving_id AND r.pid = a.pid
+            GROUP BY r.receiving_id, r.pid, r.remaining_qty, r.received_date, r.cost_each, r.received_by, r.rank
+            HAVING r.remaining_qty > COALESCE(SUM(a.allocated_qty), 0)
+          ),
+          allocations_old AS (
+            SELECT
+              po.po_id,
+              po.pid,
+              r.receiving_id,
+              LEAST(po.remaining_qty, r.remaining_qty) as allocated_qty,
+              r.cost_each,
+              COALESCE(r.received_date, NOW()) as received_date,
+              r.received_by,
+              po.age_group,
+              po.rank_in_group,
+              r.rank,
+              'old' as allocation_type
+            FROM ranked_pos po
+            JOIN remaining_after_recent r ON po.pid = r.pid
+            WHERE po.age_group = 2
+            ORDER BY po.pid, po.rank_in_group, r.rank
+          ),
+          -- Combine allocations
+          combined_allocations AS (
+            SELECT * FROM allocations_recent
+            UNION ALL
+            SELECT * FROM allocations_old
+          )
+          -- Insert into allocations table
+          INSERT INTO temp_receiving_allocations (
+            po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by
+          )
+          SELECT
+            po_id, pid, receiving_id, allocated_qty, cost_each,
+            COALESCE(received_date, NOW()) as received_date,
+            received_by
+          FROM combined_allocations
+          WHERE allocated_qty > 0
+        `
+      }
+    ];
+
+    // Execute FIFO steps with progress tracking
+    for (let i = 0; i < fifoSteps.length; i++) {
+      const step = fifoSteps[i];
+      outputProgress({
+        status: "running",
+        operation: "Purchase orders import",
+        message: `FIFO allocation step ${i+1}/${fifoSteps.length}: ${step.name}`,
+        current: i,
+        total: fifoSteps.length
+      });
+      await localConnection.query(step.query);
+    }
 
     // 4. Generate final purchase order records with receiving data
     outputProgress({
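Note: the FIFO allocation CTE is easier to audit against a plain-language model: for each product, fill POs ordered within the last year first, oldest PO first, drawing from the earliest-dated receivings; POs more than a year old only get what remains. A minimal in-memory sketch of that idea for a single pid (illustrative, not the import's actual code path; dates assumed to be timestamps):

    // Greedy FIFO: oldest PO first, earliest receiving first.
    // pos: [{ poId, remainingQty, dateOrdered }]
    // receivings: [{ receivingId, remainingQty, receivedDate }]
    function allocateFifo(pos, receivings) {
      const allocations = [];
      const poQueue = [...pos].sort((a, b) => a.dateOrdered - b.dateOrdered);
      const rcvQueue = [...receivings].sort((a, b) => a.receivedDate - b.receivedDate);
      for (const po of poQueue) {
        for (const rcv of rcvQueue) {
          if (po.remainingQty <= 0) break;
          if (rcv.remainingQty <= 0) continue;
          const qty = Math.min(po.remainingQty, rcv.remainingQty);
          allocations.push({ poId: po.poId, receivingId: rcv.receivingId, allocatedQty: qty });
          po.remainingQty -= qty;
          rcv.remainingQty -= qty;
        }
      }
      return allocations;
    }

Unlike this loop, the set-based SQL takes LEAST(remaining_qty) per joined PO/receiving pair without carrying a running balance between rows, so within an age group it appears able to count the same receiving quantity against more than one PO.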

View File

@@ -0,0 +1,337 @@
/**
 * This script updates the costeach values for existing orders from the original MySQL database
 * without needing to run the full import process.
 */
const dotenv = require("dotenv");
const path = require("path");
const fs = require("fs");
const { setupConnections, closeConnections } = require('./import/utils');
const { outputProgress, formatElapsedTime } = require('./metrics/utils/progress');
dotenv.config({ path: path.join(__dirname, "../.env") });
// SSH configuration
const sshConfig = {
  ssh: {
    host: process.env.PROD_SSH_HOST,
    port: process.env.PROD_SSH_PORT || 22,
    username: process.env.PROD_SSH_USER,
    privateKey: process.env.PROD_SSH_KEY_PATH
      ? fs.readFileSync(process.env.PROD_SSH_KEY_PATH)
      : undefined,
    compress: true, // Enable SSH compression
  },
  prodDbConfig: {
    // MySQL config for production
    host: process.env.PROD_DB_HOST || "localhost",
    user: process.env.PROD_DB_USER,
    password: process.env.PROD_DB_PASSWORD,
    database: process.env.PROD_DB_NAME,
    port: process.env.PROD_DB_PORT || 3306,
    timezone: 'Z',
  },
  localDbConfig: {
    // PostgreSQL config for local
    host: process.env.DB_HOST,
    user: process.env.DB_USER,
    password: process.env.DB_PASSWORD,
    database: process.env.DB_NAME,
    port: process.env.DB_PORT || 5432,
    ssl: process.env.DB_SSL === 'true',
    connectionTimeoutMillis: 60000,
    idleTimeoutMillis: 30000,
    max: 10 // connection pool max size
  }
};
async function updateOrderCosts() {
  const startTime = Date.now();
  let connections;
  let updatedCount = 0;
  let errorCount = 0;

  try {
    outputProgress({
      status: "running",
      operation: "Order costs update",
      message: "Initializing SSH tunnel..."
    });
    connections = await setupConnections(sshConfig);
    const { prodConnection, localConnection } = connections;

    // 1. Get all orders from local database that need cost updates
    outputProgress({
      status: "running",
      operation: "Order costs update",
      message: "Getting orders from local database..."
    });
    const [orders] = await localConnection.query(`
      SELECT DISTINCT order_number, pid
      FROM orders
      WHERE costeach = 0 OR costeach IS NULL
      ORDER BY order_number
    `);

    if (!orders || !orders.rows || orders.rows.length === 0) {
      console.log("No orders found that need cost updates");
      return { updatedCount: 0, errorCount: 0 };
    }

    const totalOrders = orders.rows.length;
    console.log(`Found ${totalOrders} orders that need cost updates`);

    // Process in batches of 500 orders
    const BATCH_SIZE = 500;
    for (let i = 0; i < orders.rows.length; i += BATCH_SIZE) {
      try {
        // Start transaction for this batch
        await localConnection.beginTransaction();
        const batch = orders.rows.slice(i, i + BATCH_SIZE);
        const orderNumbers = [...new Set(batch.map(o => o.order_number))];

        // 2. Fetch costs from production database for these orders
        outputProgress({
          status: "running",
          operation: "Order costs update",
          message: `Fetching costs for orders ${i + 1} to ${Math.min(i + BATCH_SIZE, totalOrders)} of ${totalOrders}`,
          current: i,
          total: totalOrders,
          elapsed: formatElapsedTime((Date.now() - startTime) / 1000)
        });
        const [costs] = await prodConnection.query(`
          SELECT
            oc.orderid as order_number,
            oc.pid,
            oc.costeach
          FROM order_costs oc
          INNER JOIN (
            SELECT
              orderid,
              pid,
              MAX(id) as max_id
            FROM order_costs
            WHERE orderid IN (?)
              AND pending = 0
            GROUP BY orderid, pid
          ) latest ON oc.orderid = latest.orderid AND oc.pid = latest.pid AND oc.id = latest.max_id
        `, [orderNumbers]);

        // Create a map of costs for easy lookup
        const costMap = {};
        if (costs && costs.length) {
          costs.forEach(c => {
            costMap[`${c.order_number}-${c.pid}`] = c.costeach || 0;
          });
        }

        // 3. Update costs in local database by batches
        // Using a more efficient update approach with a temporary table
        // Create a temporary table for each batch
        await localConnection.query(`
          DROP TABLE IF EXISTS temp_order_costs;
          CREATE TEMP TABLE temp_order_costs (
            order_number VARCHAR(50) NOT NULL,
            pid BIGINT NOT NULL,
            costeach DECIMAL(10,3) NOT NULL,
            PRIMARY KEY (order_number, pid)
          );
        `);

        // Insert cost data into the temporary table
        const costEntries = [];
        for (const order of batch) {
          const key = `${order.order_number}-${order.pid}`;
          if (key in costMap) {
            costEntries.push({
              order_number: order.order_number,
              pid: order.pid,
              costeach: costMap[key]
            });
          }
        }

        // Insert in sub-batches of 50
        const DB_BATCH_SIZE = 50;
        for (let j = 0; j < costEntries.length; j += DB_BATCH_SIZE) {
          const subBatch = costEntries.slice(j, j + DB_BATCH_SIZE);
          if (subBatch.length === 0) continue;
          const placeholders = subBatch.map((_, idx) =>
            `($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})`
          ).join(',');
          const values = subBatch.flatMap(item => [
            item.order_number,
            item.pid,
            item.costeach
          ]);
          await localConnection.query(`
            INSERT INTO temp_order_costs (order_number, pid, costeach)
            VALUES ${placeholders}
          `, values);
        }

        // Perform bulk update from the temporary table
        const [updateResult] = await localConnection.query(`
          UPDATE orders o
          SET costeach = t.costeach
          FROM temp_order_costs t
          WHERE o.order_number = t.order_number AND o.pid = t.pid
          RETURNING o.id
        `);
        const batchUpdated = updateResult.rowCount || 0;
        updatedCount += batchUpdated;

        // Commit transaction for this batch
        await localConnection.commit();
        outputProgress({
          status: "running",
          operation: "Order costs update",
          message: `Updated ${updatedCount} orders with costs from production (batch: ${batchUpdated})`,
          current: i + batch.length,
          total: totalOrders,
          elapsed: formatElapsedTime((Date.now() - startTime) / 1000)
        });
      } catch (error) {
        // If a batch fails, roll back that batch's transaction and continue
        try {
          await localConnection.rollback();
        } catch (rollbackError) {
          console.error("Error during batch rollback:", rollbackError);
        }
        console.error(`Error processing batch ${i}-${i + BATCH_SIZE}:`, error);
        errorCount++;
      }
    }

    // 4. For orders with no matching costs, set a default based on price
    outputProgress({
      status: "running",
      operation: "Order costs update",
      message: "Setting default costs for remaining orders..."
    });

    // Process remaining updates in smaller batches
    const DEFAULT_BATCH_SIZE = 10000;
    let totalDefaultUpdated = 0;
    try {
      // Start with a count query to determine how many records need the default update
      const [countResult] = await localConnection.query(`
        SELECT COUNT(*) as count FROM orders
        WHERE (costeach = 0 OR costeach IS NULL)
      `);
      const totalToUpdate = parseInt(countResult.rows[0]?.count || 0);
      if (totalToUpdate > 0) {
        console.log(`Applying default cost to ${totalToUpdate} orders`);
        // Apply the default in batches with separate transactions
        for (let i = 0; i < totalToUpdate; i += DEFAULT_BATCH_SIZE) {
          try {
            await localConnection.beginTransaction();
            const [defaultUpdates] = await localConnection.query(`
              WITH orders_to_update AS (
                SELECT id FROM orders
                WHERE (costeach = 0 OR costeach IS NULL)
                LIMIT ${DEFAULT_BATCH_SIZE}
              )
              UPDATE orders o
              SET costeach = price * 0.5
              FROM orders_to_update otu
              WHERE o.id = otu.id
              RETURNING o.id
            `);
            const batchDefaultUpdated = defaultUpdates.rowCount || 0;
            totalDefaultUpdated += batchDefaultUpdated;
            await localConnection.commit();
            outputProgress({
              status: "running",
              operation: "Order costs update",
              message: `Applied default costs to ${totalDefaultUpdated} of ${totalToUpdate} orders`,
              current: totalDefaultUpdated,
              total: totalToUpdate,
              elapsed: formatElapsedTime((Date.now() - startTime) / 1000)
            });
          } catch (error) {
            try {
              await localConnection.rollback();
            } catch (rollbackError) {
              console.error("Error during default update rollback:", rollbackError);
            }
            console.error(`Error applying default costs batch ${i}-${i + DEFAULT_BATCH_SIZE}:`, error);
            errorCount++;
          }
        }
      }
    } catch (error) {
      console.error("Error counting or updating remaining orders:", error);
      errorCount++;
    }
    updatedCount += totalDefaultUpdated;

    const endTime = Date.now();
    const totalSeconds = (endTime - startTime) / 1000;
    outputProgress({
      status: "complete",
      operation: "Order costs update",
      message: `Updated ${updatedCount} orders (${totalDefaultUpdated} with default values) in ${formatElapsedTime(totalSeconds)}`,
      elapsed: formatElapsedTime(totalSeconds)
    });
    return {
      status: "complete",
      updatedCount,
      errorCount
    };
  } catch (error) {
    console.error("Error during order costs update:", error);
    return {
      status: "error",
      error: error.message,
      updatedCount,
      errorCount
    };
  } finally {
    if (connections) {
      await closeConnections(connections).catch(err => {
        console.error("Error closing connections:", err);
      });
    }
  }
}

// Run the script only if this is the main module
if (require.main === module) {
  updateOrderCosts().then((results) => {
    console.log('Cost update completed:', results);
    // Force exit after a small delay to ensure all logs are written
    setTimeout(() => process.exit(0), 500);
  }).catch((error) => {
    console.error("Unhandled error:", error);
    // Force exit with error code after a small delay
    setTimeout(() => process.exit(1), 500);
  });
}

// Export the function for use in other scripts
module.exports = updateOrderCosts;
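Note: a usage sketch for the new script (the committed filename is not shown on this page, so the require path here is illustrative):

    // Standalone:  node scripts/update-order-costs.js
    // Or reuse the export from another script:
    const updateOrderCosts = require('./update-order-costs');
    updateOrderCosts().then(({ status, updatedCount, errorCount }) => {
      console.log(status, updatedCount, errorCount);
    });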