Import and calculate tweaks and fixes

2025-04-06 17:12:36 -04:00
parent a4c1a19d2e
commit 92ff80fba2
8 changed files with 661 additions and 596 deletions

View File

@@ -26,10 +26,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
let cumulativeProcessedOrders = 0;
try {
// Begin transaction
await localConnection.beginTransaction();
// Get last sync info
// Get last sync info - NOT in a transaction anymore
const [syncInfo] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'"
);
@@ -43,8 +40,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE o.order_status >= 15
AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
AND o.date_placed_onlydate IS NOT NULL
AND o.date_placed >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
AND o.date_placed IS NOT NULL
${incrementalUpdate ? `
AND (
o.stamp > ?
@@ -82,8 +79,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE o.order_status >= 15
AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
AND o.date_placed_onlydate IS NOT NULL
AND o.date_placed >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
AND o.date_placed IS NOT NULL
${incrementalUpdate ? `
AND (
o.stamp > ?
@@ -107,91 +104,111 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
console.log('Orders: Found', orderItems.length, 'order items to process');
// Create tables in PostgreSQL for data processing
await localConnection.query(`
DROP TABLE IF EXISTS temp_order_items;
DROP TABLE IF EXISTS temp_order_meta;
DROP TABLE IF EXISTS temp_order_discounts;
DROP TABLE IF EXISTS temp_order_taxes;
DROP TABLE IF EXISTS temp_order_costs;
CREATE TEMP TABLE temp_order_items (
order_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
sku TEXT NOT NULL,
price NUMERIC(14, 4) NOT NULL,
quantity INTEGER NOT NULL,
base_discount NUMERIC(14, 4) DEFAULT 0,
PRIMARY KEY (order_id, pid)
);
CREATE TEMP TABLE temp_order_meta (
order_id INTEGER NOT NULL,
date TIMESTAMP WITH TIME ZONE NOT NULL,
customer TEXT NOT NULL,
customer_name TEXT NOT NULL,
status TEXT,
canceled BOOLEAN,
summary_discount NUMERIC(14, 4) DEFAULT 0.0000,
summary_subtotal NUMERIC(14, 4) DEFAULT 0.0000,
PRIMARY KEY (order_id)
);
CREATE TEMP TABLE temp_order_discounts (
order_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
discount NUMERIC(14, 4) NOT NULL,
PRIMARY KEY (order_id, pid)
);
CREATE TEMP TABLE temp_order_taxes (
order_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
tax NUMERIC(14, 4) NOT NULL,
PRIMARY KEY (order_id, pid)
);
CREATE TEMP TABLE temp_order_costs (
order_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
costeach NUMERIC(14, 4) DEFAULT 0.0000,
PRIMARY KEY (order_id, pid)
);
CREATE INDEX idx_temp_order_items_pid ON temp_order_items(pid);
CREATE INDEX idx_temp_order_meta_order_id ON temp_order_meta(order_id);
`);
// Insert order items in batches
for (let i = 0; i < orderItems.length; i += 5000) {
const batch = orderItems.slice(i, Math.min(i + 5000, orderItems.length));
const placeholders = batch.map((_, idx) =>
`($${idx * 6 + 1}, $${idx * 6 + 2}, $${idx * 6 + 3}, $${idx * 6 + 4}, $${idx * 6 + 5}, $${idx * 6 + 6})`
).join(",");
const values = batch.flatMap(item => [
item.order_id, item.prod_pid, item.SKU, item.price, item.quantity, item.base_discount
]);
// Start a transaction just for creating the temp tables
await localConnection.beginTransaction();
try {
await localConnection.query(`
INSERT INTO temp_order_items (order_id, pid, sku, price, quantity, base_discount)
VALUES ${placeholders}
ON CONFLICT (order_id, pid) DO UPDATE SET
sku = EXCLUDED.sku,
price = EXCLUDED.price,
quantity = EXCLUDED.quantity,
base_discount = EXCLUDED.base_discount
`, values);
DROP TABLE IF EXISTS temp_order_items;
DROP TABLE IF EXISTS temp_order_meta;
DROP TABLE IF EXISTS temp_order_discounts;
DROP TABLE IF EXISTS temp_order_taxes;
DROP TABLE IF EXISTS temp_order_costs;
processedCount = i + batch.length;
outputProgress({
status: "running",
operation: "Orders import",
message: `Loading order items: ${processedCount} of ${totalOrderItems}`,
current: processedCount,
total: totalOrderItems,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, processedCount, totalOrderItems),
rate: calculateRate(startTime, processedCount)
});
CREATE TEMP TABLE temp_order_items (
order_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
sku TEXT NOT NULL,
price NUMERIC(14, 4) NOT NULL,
quantity INTEGER NOT NULL,
base_discount NUMERIC(14, 4) DEFAULT 0,
PRIMARY KEY (order_id, pid)
);
CREATE TEMP TABLE temp_order_meta (
order_id INTEGER NOT NULL,
date TIMESTAMP WITH TIME ZONE NOT NULL,
customer TEXT NOT NULL,
customer_name TEXT NOT NULL,
status TEXT,
canceled BOOLEAN,
summary_discount NUMERIC(14, 4) DEFAULT 0.0000,
summary_subtotal NUMERIC(14, 4) DEFAULT 0.0000,
summary_discount_subtotal NUMERIC(14, 4) DEFAULT 0.0000,
PRIMARY KEY (order_id)
);
CREATE TEMP TABLE temp_order_discounts (
order_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
discount NUMERIC(14, 4) NOT NULL,
PRIMARY KEY (order_id, pid)
);
CREATE TEMP TABLE temp_order_taxes (
order_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
tax NUMERIC(14, 4) NOT NULL,
PRIMARY KEY (order_id, pid)
);
CREATE TEMP TABLE temp_order_costs (
order_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
costeach NUMERIC(14, 4) DEFAULT 0.0000,
PRIMARY KEY (order_id, pid)
);
CREATE INDEX idx_temp_order_items_pid ON temp_order_items(pid);
CREATE INDEX idx_temp_order_meta_order_id ON temp_order_meta(order_id);
CREATE INDEX idx_temp_order_discounts_order_pid ON temp_order_discounts(order_id, pid);
CREATE INDEX idx_temp_order_taxes_order_pid ON temp_order_taxes(order_id, pid);
CREATE INDEX idx_temp_order_costs_order_pid ON temp_order_costs(order_id, pid);
`);
await localConnection.commit();
} catch (error) {
await localConnection.rollback();
throw error;
}
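// Note: PostgreSQL TEMP tables are session-scoped rather than transaction-scoped
// (absent ON COMMIT DROP), so committing here still leaves them visible to the
// later per-batch transactions on this same connection.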
// Insert order items in batches - each batch gets its own transaction
for (let i = 0; i < orderItems.length; i += 5000) {
await localConnection.beginTransaction();
try {
const batch = orderItems.slice(i, Math.min(i + 5000, orderItems.length));
const placeholders = batch.map((_, idx) =>
`($${idx * 6 + 1}, $${idx * 6 + 2}, $${idx * 6 + 3}, $${idx * 6 + 4}, $${idx * 6 + 5}, $${idx * 6 + 6})`
).join(",");
const values = batch.flatMap(item => [
item.order_id, item.prod_pid, item.SKU, item.price, item.quantity, item.base_discount
]);
await localConnection.query(`
INSERT INTO temp_order_items (order_id, pid, sku, price, quantity, base_discount)
VALUES ${placeholders}
ON CONFLICT (order_id, pid) DO UPDATE SET
sku = EXCLUDED.sku,
price = EXCLUDED.price,
quantity = EXCLUDED.quantity,
base_discount = EXCLUDED.base_discount
`, values);
await localConnection.commit();
processedCount = i + batch.length;
outputProgress({
status: "running",
operation: "Orders import",
message: `Loading order items: ${processedCount} of ${totalOrderItems}`,
current: processedCount,
total: totalOrderItems,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, processedCount, totalOrderItems),
rate: calculateRate(startTime, processedCount)
});
} catch (error) {
await localConnection.rollback();
throw error;
}
}
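// A minimal sketch of the begin/try/commit/rollback pattern now repeated around
// every batch above and below (helper name and shape are illustrative, not part
// of this commit):
async function withTransaction(connection, work) {
  await connection.beginTransaction();
  try {
    const result = await work();
    await connection.commit();
    return result;
  } catch (error) {
    await connection.rollback();
    throw error;
  }
}
// A failed batch then rolls back only the work in flight; batches already
// committed stay committed, which is the point of the restructuring.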
// Get unique order IDs
@@ -218,53 +235,63 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
const [orders] = await prodConnection.query(`
SELECT
o.order_id,
o.date_placed_onlydate as date,
o.date_placed as date,
o.order_cid as customer,
CONCAT(COALESCE(u.firstname, ''), ' ', COALESCE(u.lastname, '')) as customer_name,
o.order_status as status,
CASE WHEN o.date_cancelled != '0000-00-00 00:00:00' THEN 1 ELSE 0 END as canceled,
o.summary_discount,
o.summary_subtotal
o.summary_subtotal,
o.summary_discount_subtotal
FROM _order o
LEFT JOIN users u ON o.order_cid = u.cid
WHERE o.order_id IN (?)
`, [batchIds]);
// Process in sub-batches for PostgreSQL
for (let j = 0; j < orders.length; j += PG_BATCH_SIZE) {
const subBatch = orders.slice(j, j + PG_BATCH_SIZE);
if (subBatch.length === 0) continue;
await localConnection.beginTransaction();
try {
for (let j = 0; j < orders.length; j += PG_BATCH_SIZE) {
const subBatch = orders.slice(j, j + PG_BATCH_SIZE);
if (subBatch.length === 0) continue;
const placeholders = subBatch.map((_, idx) =>
`($${idx * 8 + 1}, $${idx * 8 + 2}, $${idx * 8 + 3}, $${idx * 8 + 4}, $${idx * 8 + 5}, $${idx * 8 + 6}, $${idx * 8 + 7}, $${idx * 8 + 8})`
).join(",");
const values = subBatch.flatMap(order => [
order.order_id,
new Date(order.date), // Convert to TIMESTAMP WITH TIME ZONE
order.customer,
toTitleCase(order.customer_name) || '',
order.status.toString(), // Convert status to TEXT
order.canceled,
order.summary_discount || 0,
order.summary_subtotal || 0
]);
const placeholders = subBatch.map((_, idx) =>
`($${idx * 9 + 1}, $${idx * 9 + 2}, $${idx * 9 + 3}, $${idx * 9 + 4}, $${idx * 9 + 5}, $${idx * 9 + 6}, $${idx * 9 + 7}, $${idx * 9 + 8}, $${idx * 9 + 9})`
).join(",");
const values = subBatch.flatMap(order => [
order.order_id,
new Date(order.date), // Convert to TIMESTAMP WITH TIME ZONE
order.customer,
toTitleCase(order.customer_name) || '',
order.status.toString(), // Convert status to TEXT
order.canceled,
order.summary_discount || 0,
order.summary_subtotal || 0,
order.summary_discount_subtotal || 0
]);
await localConnection.query(`
INSERT INTO temp_order_meta (
order_id, date, customer, customer_name, status, canceled,
summary_discount, summary_subtotal
)
VALUES ${placeholders}
ON CONFLICT (order_id) DO UPDATE SET
date = EXCLUDED.date,
customer = EXCLUDED.customer,
customer_name = EXCLUDED.customer_name,
status = EXCLUDED.status,
canceled = EXCLUDED.canceled,
summary_discount = EXCLUDED.summary_discount,
summary_subtotal = EXCLUDED.summary_subtotal
`, values);
await localConnection.query(`
INSERT INTO temp_order_meta (
order_id, date, customer, customer_name, status, canceled,
summary_discount, summary_subtotal, summary_discount_subtotal
)
VALUES ${placeholders}
ON CONFLICT (order_id) DO UPDATE SET
date = EXCLUDED.date,
customer = EXCLUDED.customer,
customer_name = EXCLUDED.customer_name,
status = EXCLUDED.status,
canceled = EXCLUDED.canceled,
summary_discount = EXCLUDED.summary_discount,
summary_subtotal = EXCLUDED.summary_subtotal,
summary_discount_subtotal = EXCLUDED.summary_discount_subtotal
`, values);
}
await localConnection.commit();
} catch (error) {
await localConnection.rollback();
throw error;
}
};
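// The "$n" placeholder strings are assembled by hand for each column count
// (3, 6, 8, 9, 15...). A generic builder sketch (illustrative, not part of this commit):
function makePlaceholders(rowCount, colCount) {
  // makePlaceholders(2, 3) -> "($1, $2, $3),($4, $5, $6)"
  return Array.from({ length: rowCount }, (_, row) => {
    const base = row * colCount;
    return `(${Array.from({ length: colCount }, (_, col) => `$${base + col + 1}`).join(', ')})`;
  }).join(',');
}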
@@ -278,26 +305,33 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
if (discounts.length === 0) return;
for (let j = 0; j < discounts.length; j += PG_BATCH_SIZE) {
const subBatch = discounts.slice(j, j + PG_BATCH_SIZE);
if (subBatch.length === 0) continue;
await localConnection.beginTransaction();
try {
for (let j = 0; j < discounts.length; j += PG_BATCH_SIZE) {
const subBatch = discounts.slice(j, j + PG_BATCH_SIZE);
if (subBatch.length === 0) continue;
const placeholders = subBatch.map((_, idx) =>
`($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})`
).join(",");
const values = subBatch.flatMap(d => [
d.order_id,
d.pid,
d.discount || 0
]);
const placeholders = subBatch.map((_, idx) =>
`($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})`
).join(",");
const values = subBatch.flatMap(d => [
d.order_id,
d.pid,
d.discount || 0
]);
await localConnection.query(`
INSERT INTO temp_order_discounts (order_id, pid, discount)
VALUES ${placeholders}
ON CONFLICT (order_id, pid) DO UPDATE SET
discount = EXCLUDED.discount
`, values);
await localConnection.query(`
INSERT INTO temp_order_discounts (order_id, pid, discount)
VALUES ${placeholders}
ON CONFLICT (order_id, pid) DO UPDATE SET
discount = EXCLUDED.discount
`, values);
}
await localConnection.commit();
} catch (error) {
await localConnection.rollback();
throw error;
}
};
@@ -318,26 +352,33 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
if (taxes.length === 0) return;
for (let j = 0; j < taxes.length; j += PG_BATCH_SIZE) {
const subBatch = taxes.slice(j, j + PG_BATCH_SIZE);
if (subBatch.length === 0) continue;
await localConnection.beginTransaction();
try {
for (let j = 0; j < taxes.length; j += PG_BATCH_SIZE) {
const subBatch = taxes.slice(j, j + PG_BATCH_SIZE);
if (subBatch.length === 0) continue;
const placeholders = subBatch.map((_, idx) =>
`($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})`
).join(",");
const values = subBatch.flatMap(t => [
t.order_id,
t.pid,
t.tax || 0
]);
const placeholders = subBatch.map((_, idx) =>
`($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})`
).join(",");
const values = subBatch.flatMap(t => [
t.order_id,
t.pid,
t.tax || 0
]);
await localConnection.query(`
INSERT INTO temp_order_taxes (order_id, pid, tax)
VALUES ${placeholders}
ON CONFLICT (order_id, pid) DO UPDATE SET
tax = EXCLUDED.tax
`, values);
await localConnection.query(`
INSERT INTO temp_order_taxes (order_id, pid, tax)
VALUES ${placeholders}
ON CONFLICT (order_id, pid) DO UPDATE SET
tax = EXCLUDED.tax
`, values);
}
await localConnection.commit();
} catch (error) {
await localConnection.rollback();
throw error;
}
};
@@ -363,39 +404,45 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
if (costs.length === 0) return;
for (let j = 0; j < costs.length; j += PG_BATCH_SIZE) {
const subBatch = costs.slice(j, j + PG_BATCH_SIZE);
if (subBatch.length === 0) continue;
await localConnection.beginTransaction();
try {
for (let j = 0; j < costs.length; j += PG_BATCH_SIZE) {
const subBatch = costs.slice(j, j + PG_BATCH_SIZE);
if (subBatch.length === 0) continue;
const placeholders = subBatch.map((_, idx) =>
`($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})`
).join(",");
const values = subBatch.flatMap(c => [
c.order_id,
c.pid,
c.costeach || 0
]);
const placeholders = subBatch.map((_, idx) =>
`($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})`
).join(",");
const values = subBatch.flatMap(c => [
c.order_id,
c.pid,
c.costeach || 0
]);
await localConnection.query(`
INSERT INTO temp_order_costs (order_id, pid, costeach)
VALUES ${placeholders}
ON CONFLICT (order_id, pid) DO UPDATE SET
costeach = EXCLUDED.costeach
`, values);
await localConnection.query(`
INSERT INTO temp_order_costs (order_id, pid, costeach)
VALUES ${placeholders}
ON CONFLICT (order_id, pid) DO UPDATE SET
costeach = EXCLUDED.costeach
`, values);
}
await localConnection.commit();
} catch (error) {
await localConnection.rollback();
throw error;
}
};
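// The loop below awaits the four batch processors one at a time. They share a
// single localConnection, and one connection can hold only one open transaction,
// so overlapping beginTransaction/commit calls from Promise.all would interleave
// and commit or roll back each other's work. Sketch of the two shapes (names as
// above; true parallelism would need one pooled connection per task):
//
//   // unsafe on a shared connection:
//   await Promise.all([processMetadataBatch(ids), processDiscountsBatch(ids)]);
//
//   // safe: one transaction at a time:
//   await processMetadataBatch(ids);
//   await processDiscountsBatch(ids);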
// Process all data types in parallel for each batch
// Process all data types SEQUENTIALLY for each batch - not in parallel
for (let i = 0; i < orderIds.length; i += METADATA_BATCH_SIZE) {
const batchIds = orderIds.slice(i, i + METADATA_BATCH_SIZE);
await Promise.all([
processMetadataBatch(batchIds),
processDiscountsBatch(batchIds),
processTaxesBatch(batchIds),
processCostsBatch(batchIds)
]);
// Run these sequentially instead of in parallel to avoid transaction conflicts
await processMetadataBatch(batchIds);
await processDiscountsBatch(batchIds);
await processTaxesBatch(batchIds);
await processCostsBatch(batchIds);
processedCount = i + batchIds.length;
outputProgress({
@@ -422,175 +469,194 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
const existingPids = new Set(existingProducts.rows.map(p => p.pid));
// Process in smaller batches
for (let i = 0; i < orderIds.length; i += 1000) {
const batchIds = orderIds.slice(i, i + 1000);
for (let i = 0; i < orderIds.length; i += 2000) { // Increased from 1000 to 2000
const batchIds = orderIds.slice(i, i + 2000);
// Get combined data for this batch in sub-batches
const PG_BATCH_SIZE = 100; // Process 100 records at a time
const PG_BATCH_SIZE = 200; // Increased from 100 to 200
for (let j = 0; j < batchIds.length; j += PG_BATCH_SIZE) {
const subBatchIds = batchIds.slice(j, j + PG_BATCH_SIZE);
const [orders] = await localConnection.query(`
WITH order_totals AS (
SELECT
oi.order_id,
oi.pid,
SUM(COALESCE(od.discount, 0)) as promo_discount,
COALESCE(ot.tax, 0) as total_tax,
COALESCE(oc.costeach, oi.price * 0.5) as costeach
FROM temp_order_items oi
LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
LEFT JOIN temp_order_costs oc ON oi.order_id = oc.order_id AND oi.pid = oc.pid
GROUP BY oi.order_id, oi.pid, ot.tax, oc.costeach
)
SELECT
oi.order_id as order_number,
oi.pid::bigint as pid,
oi.sku,
om.date,
oi.price,
oi.quantity,
(oi.base_discount +
COALESCE(ot.promo_discount, 0) +
CASE
WHEN om.summary_discount > 0 AND om.summary_subtotal > 0 THEN
ROUND((om.summary_discount * (oi.price * oi.quantity)) / NULLIF(om.summary_subtotal, 0), 2)
ELSE 0
END)::NUMERIC(14, 4) as discount,
COALESCE(ot.total_tax, 0)::NUMERIC(14, 4) as tax,
false as tax_included,
0 as shipping,
om.customer,
om.customer_name,
om.status,
om.canceled,
COALESCE(ot.costeach, oi.price * 0.5)::NUMERIC(14, 4) as costeach
FROM (
SELECT DISTINCT ON (order_id, pid)
order_id, pid, sku, price, quantity, base_discount
FROM temp_order_items
WHERE order_id = ANY($1)
ORDER BY order_id, pid
) oi
JOIN temp_order_meta om ON oi.order_id = om.order_id
LEFT JOIN order_totals ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
ORDER BY oi.order_id, oi.pid
`, [subBatchIds]);
// Filter orders and track missing products
const validOrders = [];
const processedOrderItems = new Set();
const processedOrders = new Set();
for (const order of orders.rows) {
if (!existingPids.has(order.pid)) {
missingProducts.add(order.pid);
skippedOrders.add(order.order_number);
continue;
}
validOrders.push(order);
processedOrderItems.add(`${order.order_number}-${order.pid}`);
processedOrders.add(order.order_number);
}
// Process valid orders in smaller sub-batches
const FINAL_BATCH_SIZE = 50;
for (let k = 0; k < validOrders.length; k += FINAL_BATCH_SIZE) {
const subBatch = validOrders.slice(k, k + FINAL_BATCH_SIZE);
const placeholders = subBatch.map((_, idx) => {
const base = idx * 15; // 15 columns including costeach
return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`;
}).join(',');
const batchValues = subBatch.flatMap(o => [
o.order_number,
o.pid,
o.sku || 'NO-SKU',
o.date, // This is now a TIMESTAMP WITH TIME ZONE
o.price,
o.quantity,
o.discount,
o.tax,
o.tax_included,
o.shipping,
o.customer,
o.customer_name,
o.status.toString(), // Convert status to TEXT
o.canceled,
o.costeach
]);
const [result] = await localConnection.query(`
WITH inserted_orders AS (
INSERT INTO orders (
order_number, pid, sku, date, price, quantity, discount,
tax, tax_included, shipping, customer, customer_name,
status, canceled, costeach
)
VALUES ${placeholders}
ON CONFLICT (order_number, pid) DO UPDATE SET
sku = EXCLUDED.sku,
date = EXCLUDED.date,
price = EXCLUDED.price,
quantity = EXCLUDED.quantity,
discount = EXCLUDED.discount,
tax = EXCLUDED.tax,
tax_included = EXCLUDED.tax_included,
shipping = EXCLUDED.shipping,
customer = EXCLUDED.customer,
customer_name = EXCLUDED.customer_name,
status = EXCLUDED.status,
canceled = EXCLUDED.canceled,
costeach = EXCLUDED.costeach
RETURNING xmax = 0 as inserted
// Start a transaction for this sub-batch
await localConnection.beginTransaction();
try {
const [orders] = await localConnection.query(`
WITH order_totals AS (
SELECT
oi.order_id,
oi.pid,
SUM(COALESCE(od.discount, 0)) as promo_discount,
COALESCE(ot.tax, 0) as total_tax,
COALESCE(oc.costeach, oi.price * 0.5) as costeach
FROM temp_order_items oi
LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
LEFT JOIN temp_order_costs oc ON oi.order_id = oc.order_id AND oi.pid = oc.pid
WHERE oi.order_id = ANY($1)
GROUP BY oi.order_id, oi.pid, ot.tax, oc.costeach
)
SELECT
COUNT(*) FILTER (WHERE inserted) as inserted,
COUNT(*) FILTER (WHERE NOT inserted) as updated
FROM inserted_orders
`, batchValues);
const { inserted, updated } = result.rows[0];
recordsAdded += parseInt(inserted) || 0;
recordsUpdated += parseInt(updated) || 0;
importedCount += subBatch.length;
}
oi.order_id as order_number,
oi.pid::bigint as pid,
oi.sku,
om.date,
oi.price,
oi.quantity,
(
-- Part 1: Sale Savings for the Line
(oi.base_discount * oi.quantity)
+
-- Part 2: Prorated Points Discount (if applicable)
CASE
WHEN om.summary_discount_subtotal > 0 AND om.summary_subtotal > 0 THEN
COALESCE(ROUND((om.summary_discount_subtotal * (oi.price * oi.quantity)) / NULLIF(om.summary_subtotal, 0), 4), 0)
ELSE 0
END
+
-- Part 3: Specific Promo Code Discount (if applicable)
COALESCE(ot.promo_discount, 0)
)::NUMERIC(14, 4) as discount,
COALESCE(ot.total_tax, 0)::NUMERIC(14, 4) as tax,
false as tax_included,
0 as shipping,
om.customer,
om.customer_name,
om.status,
om.canceled,
COALESCE(ot.costeach, oi.price * 0.5)::NUMERIC(14, 4) as costeach
FROM temp_order_items oi
JOIN temp_order_meta om ON oi.order_id = om.order_id
LEFT JOIN order_totals ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
WHERE oi.order_id = ANY($1)
ORDER BY oi.order_id, oi.pid
`, [subBatchIds]);
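// Worked example of the three-part discount above (illustrative numbers only):
// price = 10.00, quantity = 2, base_discount = 1.00 per unit,
// summary_discount_subtotal = 5.00, summary_subtotal = 100.00, promo_discount = 0.50
//   Part 1 (sale savings):    1.00 * 2                                  = 2.0000
//   Part 2 (prorated points): round(5.00 * (10.00 * 2) / 100.00, 4)     = 1.0000
//   Part 3 (promo code):                                                  0.5000
//   discount                                                            = 3.5000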
cumulativeProcessedOrders += processedOrders.size;
outputProgress({
status: "running",
operation: "Orders import",
message: `Importing orders: ${cumulativeProcessedOrders} of ${totalUniqueOrders}`,
current: cumulativeProcessedOrders,
total: totalUniqueOrders,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, cumulativeProcessedOrders, totalUniqueOrders),
rate: calculateRate(startTime, cumulativeProcessedOrders)
});
// Filter orders and track missing products
const validOrders = [];
const processedOrderItems = new Set();
const processedOrders = new Set();
for (const order of orders.rows) {
if (!existingPids.has(order.pid)) {
missingProducts.add(order.pid);
skippedOrders.add(order.order_number);
continue;
}
validOrders.push(order);
processedOrderItems.add(`${order.order_number}-${order.pid}`);
processedOrders.add(order.order_number);
}
// Process valid orders in smaller sub-batches
const FINAL_BATCH_SIZE = 100; // Increased from 50 to 100
for (let k = 0; k < validOrders.length; k += FINAL_BATCH_SIZE) {
const subBatch = validOrders.slice(k, k + FINAL_BATCH_SIZE);
const placeholders = subBatch.map((_, idx) => {
const base = idx * 15; // 15 columns including costeach
return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`;
}).join(',');
const batchValues = subBatch.flatMap(o => [
o.order_number,
o.pid,
o.sku || 'NO-SKU',
o.date, // This is now a TIMESTAMP WITH TIME ZONE
o.price,
o.quantity,
o.discount,
o.tax,
o.tax_included,
o.shipping,
o.customer,
o.customer_name,
o.status.toString(), // Convert status to TEXT
o.canceled,
o.costeach
]);
const [result] = await localConnection.query(`
WITH inserted_orders AS (
INSERT INTO orders (
order_number, pid, sku, date, price, quantity, discount,
tax, tax_included, shipping, customer, customer_name,
status, canceled, costeach
)
VALUES ${placeholders}
ON CONFLICT (order_number, pid) DO UPDATE SET
sku = EXCLUDED.sku,
date = EXCLUDED.date,
price = EXCLUDED.price,
quantity = EXCLUDED.quantity,
discount = EXCLUDED.discount,
tax = EXCLUDED.tax,
tax_included = EXCLUDED.tax_included,
shipping = EXCLUDED.shipping,
customer = EXCLUDED.customer,
customer_name = EXCLUDED.customer_name,
status = EXCLUDED.status,
canceled = EXCLUDED.canceled,
costeach = EXCLUDED.costeach
RETURNING xmax = 0 as inserted
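-- xmax is a system column: it is 0 on a freshly inserted row version and
-- non-zero when the row was taken over by ON CONFLICT DO UPDATE. This is a
-- widely used heuristic for counting inserts vs. updates, not documented behavior.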
)
SELECT
COUNT(*) FILTER (WHERE inserted) as inserted,
COUNT(*) FILTER (WHERE NOT inserted) as updated
FROM inserted_orders
`, batchValues);
const { inserted, updated } = result.rows[0];
recordsAdded += parseInt(inserted) || 0;
recordsUpdated += parseInt(updated) || 0;
importedCount += subBatch.length;
}
await localConnection.commit();
cumulativeProcessedOrders += processedOrders.size;
outputProgress({
status: "running",
operation: "Orders import",
message: `Importing orders: ${cumulativeProcessedOrders} of ${totalUniqueOrders}`,
current: cumulativeProcessedOrders,
total: totalUniqueOrders,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, cumulativeProcessedOrders, totalUniqueOrders),
rate: calculateRate(startTime, cumulativeProcessedOrders)
});
} catch (error) {
await localConnection.rollback();
throw error;
}
}
}
// Update sync status
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('orders', NOW())
ON CONFLICT (table_name) DO UPDATE SET
last_sync_timestamp = NOW()
`);
// Cleanup temporary tables
await localConnection.query(`
DROP TABLE IF EXISTS temp_order_items;
DROP TABLE IF EXISTS temp_order_meta;
DROP TABLE IF EXISTS temp_order_discounts;
DROP TABLE IF EXISTS temp_order_taxes;
DROP TABLE IF EXISTS temp_order_costs;
`);
// Commit transaction
await localConnection.commit();
// Start a transaction for updating sync status and dropping temp tables
await localConnection.beginTransaction();
try {
// Update sync status
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('orders', NOW())
ON CONFLICT (table_name) DO UPDATE SET
last_sync_timestamp = NOW()
`);
// Cleanup temporary tables
await localConnection.query(`
DROP TABLE IF EXISTS temp_order_items;
DROP TABLE IF EXISTS temp_order_meta;
DROP TABLE IF EXISTS temp_order_discounts;
DROP TABLE IF EXISTS temp_order_taxes;
DROP TABLE IF EXISTS temp_order_costs;
`);
// Commit final transaction
await localConnection.commit();
} catch (error) {
await localConnection.rollback();
throw error;
}
return {
status: "complete",
@@ -604,16 +670,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
};
} catch (error) {
console.error("Error during orders import:", error);
// Rollback transaction
try {
await localConnection.rollback();
} catch (rollbackError) {
console.error("Error during rollback:", rollbackError);
}
throw error;
}
}
module.exports = importOrders;
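The sync_status row written near the end of this file is what drives the incremental path at the top: the stored last_sync_timestamp is bound into the `o.stamp > ?` predicate so only rows changed since the previous run are pulled, and a successful run upserts NOW() for the next pass. A condensed sketch of that round trip (result shape assumed from the driver used above):

const [syncInfo] = await localConnection.query(
  "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'"
);
const lastSyncTime = syncInfo.rows?.[0]?.last_sync_timestamp ?? null; // shape depends on the db wrapper
const params = incrementalUpdate && lastSyncTime ? [lastSyncTime] : [];
// ...the orders query appends "AND (o.stamp > ?)" only when incrementalUpdate is set.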

View File

@@ -8,29 +8,7 @@ dotenv.config({ path: path.join(__dirname, "../../.env") });
// Utility functions
const imageUrlBase = process.env.PRODUCT_IMAGE_URL_BASE || 'https://sbing.com/i/products/0000/';
// Modified to accept a db connection for querying product_images
const getImageUrls = async (pid, prodConnection, iid = null) => {
// If iid isn't provided, try to get it from product_images
if (iid === null && prodConnection) {
try {
// Query for images with order=255 (default/primary images)
const [primaryImages] = await prodConnection.query(
'SELECT iid FROM product_images WHERE pid = ? AND `order` = 255 LIMIT 1',
[pid]
);
// Use the found iid or default to 1
iid = primaryImages.length > 0 ? primaryImages[0].iid : 1;
} catch (error) {
console.error(`Error fetching primary image for pid ${pid}:`, error);
iid = 1; // Fallback to default
}
} else {
// Use default if connection not provided
iid = iid || 1;
}
const getImageUrls = (pid, iid = 1) => {
const paddedPid = pid.toString().padStart(6, '0');
// Use padded PID only for the first 3 digits
const prefix = paddedPid.slice(0, 3);
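// The old implementation above issued one product_images query per pid (an
// N+1 pattern); the rewrite fetches the primary image id in bulk via a
// correlated subquery in the product SELECT and passes it straight in:
//
//   (SELECT iid FROM product_images WHERE pid = p.pid AND `order` = 255 LIMIT 1) AS primary_iid
//   ...
//   const imageUrls = getImageUrls(row.pid, row.primary_iid || 1);
//
// which also lets the batch value-building loops below become synchronous flatMap calls.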
@@ -120,6 +98,7 @@ async function setupTemporaryTables(connection) {
baskets INTEGER,
notifies INTEGER,
date_last_sold TIMESTAMP WITH TIME ZONE,
primary_iid INTEGER,
image TEXT,
image_175 TEXT,
image_full TEXT,
@@ -217,6 +196,7 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
(SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies,
(SELECT COALESCE(SUM(oi.qty_ordered), 0) FROM order_items oi WHERE oi.prod_pid = p.pid) AS total_sold,
pls.date_sold as date_last_sold,
(SELECT iid FROM product_images WHERE pid = p.pid AND \`order\` = 255 LIMIT 1) AS primary_iid,
GROUP_CONCAT(DISTINCT CASE
WHEN pc.cat_id IS NOT NULL
AND pc.type IN (10, 20, 11, 21, 12, 13)
@@ -255,15 +235,13 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
const batch = prodData.slice(i, i + BATCH_SIZE);
const placeholders = batch.map((_, idx) => {
const base = idx * 47; // 47 columns
return `(${Array.from({ length: 47 }, (_, i) => `$${base + i + 1}`).join(', ')})`;
const base = idx * 48; // 48 columns
return `(${Array.from({ length: 48 }, (_, i) => `$${base + i + 1}`).join(', ')})`;
}).join(',');
// Process image URLs for the batch
const processedValues = [];
for (const row of batch) {
const imageUrls = await getImageUrls(row.pid, prodConnection);
processedValues.push([
const values = batch.flatMap(row => {
const imageUrls = getImageUrls(row.pid, row.primary_iid || 1);
return [
row.pid,
row.title,
row.description,
@@ -306,15 +284,14 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
row.baskets,
row.notifies,
validateDate(row.date_last_sold),
row.primary_iid,
imageUrls.image,
imageUrls.image_175,
imageUrls.image_full,
null,
null
]);
}
const values = processedValues.flat();
];
});
const [result] = await localConnection.query(`
WITH inserted_products AS (
@@ -325,7 +302,7 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
landing_cost_price, barcode, harmonized_tariff_code, updated_at, visible,
managing_stock, replenishable, permalink, moq, uom, rating, reviews,
weight, length, width, height, country_of_origin, location, total_sold,
baskets, notifies, date_last_sold, image, image_175, image_full, options, tags
baskets, notifies, date_last_sold, primary_iid, image, image_175, image_full, options, tags
)
VALUES ${placeholders}
ON CONFLICT (pid) DO NOTHING
@@ -422,6 +399,7 @@ async function materializeCalculations(prodConnection, localConnection, incremen
(SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies,
(SELECT COALESCE(SUM(oi.qty_ordered), 0) FROM order_items oi WHERE oi.prod_pid = p.pid) AS total_sold,
pls.date_sold as date_last_sold,
(SELECT iid FROM product_images WHERE pid = p.pid AND \`order\` = 255 LIMIT 1) AS primary_iid,
GROUP_CONCAT(DISTINCT CASE
WHEN pc.cat_id IS NOT NULL
AND pc.type IN (10, 20, 11, 21, 12, 13)
@@ -448,9 +426,11 @@ async function materializeCalculations(prodConnection, localConnection, incremen
pcp.date_deactive > ? OR
pcp.date_active > ? OR
pnb.date_updated > ?
-- Add condition for product_images changes if needed for incremental updates
-- OR EXISTS (SELECT 1 FROM product_images pi WHERE pi.pid = p.pid AND pi.stamp > ?)
` : 'TRUE'}
GROUP BY p.pid
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []);
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime /*, lastSyncTime */] : []);
outputProgress({
status: "running",
@@ -464,15 +444,13 @@ async function materializeCalculations(prodConnection, localConnection, incremen
await withRetry(async () => {
const placeholders = batch.map((_, idx) => {
const base = idx * 47; // 47 columns
return `(${Array.from({ length: 47 }, (_, i) => `$${base + i + 1}`).join(', ')})`;
const base = idx * 48; // 48 columns
return `(${Array.from({ length: 48 }, (_, i) => `$${base + i + 1}`).join(', ')})`;
}).join(',');
// Process image URLs for the batch
const processedValues = [];
for (const row of batch) {
const imageUrls = await getImageUrls(row.pid, prodConnection);
processedValues.push([
const values = batch.flatMap(row => {
const imageUrls = getImageUrls(row.pid, row.primary_iid || 1);
return [
row.pid,
row.title,
row.description,
@@ -515,15 +493,14 @@ async function materializeCalculations(prodConnection, localConnection, incremen
row.baskets,
row.notifies,
validateDate(row.date_last_sold),
row.primary_iid,
imageUrls.image,
imageUrls.image_175,
imageUrls.image_full,
null,
null
]);
}
const values = processedValues.flat();
];
});
await localConnection.query(`
INSERT INTO temp_products (
@@ -533,7 +510,7 @@ async function materializeCalculations(prodConnection, localConnection, incremen
landing_cost_price, barcode, harmonized_tariff_code, updated_at, visible,
managing_stock, replenishable, permalink, moq, uom, rating, reviews,
weight, length, width, height, country_of_origin, location, total_sold,
baskets, notifies, date_last_sold, image, image_175, image_full, options, tags
baskets, notifies, date_last_sold, primary_iid, image, image_175, image_full, options, tags
) VALUES ${placeholders}
ON CONFLICT (pid) DO UPDATE SET
title = EXCLUDED.title,
@@ -576,6 +553,7 @@ async function materializeCalculations(prodConnection, localConnection, incremen
baskets = EXCLUDED.baskets,
notifies = EXCLUDED.notifies,
date_last_sold = EXCLUDED.date_last_sold,
primary_iid = EXCLUDED.primary_iid,
image = EXCLUDED.image,
image_175 = EXCLUDED.image_175,
image_full = EXCLUDED.image_full,
@@ -674,6 +652,7 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
t.baskets,
t.notifies,
t.date_last_sold,
t.primary_iid,
t.image,
t.image_175,
t.image_full,
@@ -695,11 +674,9 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
return `(${Array.from({ length: 47 }, (_, i) => `$${base + i + 1}`).join(', ')})`;
}).join(',');
// Process image URLs for the batch
const processedValues = [];
for (const row of batch) {
const imageUrls = await getImageUrls(row.pid, prodConnection);
processedValues.push([
const values = batch.flatMap(row => {
const imageUrls = getImageUrls(row.pid, row.primary_iid || 1);
return [
row.pid,
row.title,
row.description,
@@ -747,10 +724,8 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
imageUrls.image_full,
row.options,
row.tags
]);
}
const values = processedValues.flat();
];
});
const [result] = await localConnection.query(`
WITH upserted AS (

View File

@@ -1,426 +0,0 @@
const path = require('path');
const fs = require('fs');
const progress = require('../utils/progress'); // Assuming progress utils are here
const { getConnection, closePool } = require('../utils/db'); // Assuming db utils are here
const os = require('os'); // For detecting number of CPU cores
// --- Configuration ---
const BATCH_SIZE_DAYS = 1; // Process 1 day per database function call
const SQL_FUNCTION_FILE = path.resolve(__dirname, 'backfill_historical_snapshots.sql'); // Correct path
const LOG_PROGRESS_INTERVAL_MS = 5000; // Update console progress roughly every 5 seconds
const HISTORY_TYPE = 'backfill_snapshots'; // Identifier for history table
const MAX_WORKERS = Math.max(1, Math.floor(os.cpus().length / 2)); // Use half of available CPU cores
const USE_PARALLEL = false; // Set to true to enable parallel processing
const PG_STATEMENT_TIMEOUT_MS = 1800000; // 30 minutes max per query
// --- Cancellation Handling ---
let isCancelled = false;
let runningQueryPromise = null; // To potentially track the active query
function requestCancellation() {
if (!isCancelled) {
isCancelled = true;
console.warn('\nCancellation requested. Finishing current batch then stopping...');
// Note: We are NOT forcefully cancelling the backend query anymore.
}
}
process.on('SIGINT', requestCancellation); // Handle Ctrl+C
process.on('SIGTERM', requestCancellation); // Handle termination signals
// --- Main Backfill Function ---
async function backfillSnapshots(cmdStartDate, cmdEndDate, cmdStartBatch = 1) {
let connection;
const overallStartTime = Date.now();
let calculateHistoryId = null;
let processedDaysTotal = 0; // Track total days processed across all batches executed in this run
let currentBatchNum = cmdStartBatch > 0 ? cmdStartBatch : 1;
let totalBatches = 0; // Initialize totalBatches
let totalDays = 0; // Initialize totalDays
console.log(`Starting snapshot backfill process...`);
console.log(`SQL Function definition file: ${SQL_FUNCTION_FILE}`);
if (!fs.existsSync(SQL_FUNCTION_FILE)) {
console.error(`FATAL: SQL file not found at ${SQL_FUNCTION_FILE}`);
process.exit(1); // Exit early if file doesn't exist
}
try {
// Set up a connection with higher memory limits
connection = await getConnection({
// Add performance-related settings
application_name: 'backfill_snapshots',
statement_timeout: PG_STATEMENT_TIMEOUT_MS, // 30 min timeout per statement
// These parameters may need to be configured in your database:
// work_mem: '1GB',
// maintenance_work_mem: '2GB',
// temp_buffers: '1GB',
});
console.log('Database connection acquired.');
// --- Ensure Function Exists ---
console.log('Ensuring database function is up-to-date...');
try {
const sqlFunctionDef = fs.readFileSync(SQL_FUNCTION_FILE, 'utf8');
if (!sqlFunctionDef.includes('CREATE OR REPLACE FUNCTION backfill_daily_snapshots_range_final')) {
throw new Error(`SQL file ${SQL_FUNCTION_FILE} does not seem to contain the function definition.`);
}
await connection.query(sqlFunctionDef); // Execute the whole file
console.log('Database function `backfill_daily_snapshots_range_final` created/updated.');
// Add performance query hints to the database
await connection.query(`
-- Analyze tables for better query planning
ANALYZE public.products;
ANALYZE public.imported_daily_inventory;
ANALYZE public.imported_product_stat_history;
ANALYZE public.daily_product_snapshots;
ANALYZE public.imported_product_current_prices;
`).catch(err => {
// Non-fatal if analyze fails
console.warn('Failed to analyze tables (non-fatal):', err.message);
});
} catch (err) {
console.error(`Error processing SQL function file ${SQL_FUNCTION_FILE}:`, err);
throw new Error(`Failed to create or replace DB function: ${err.message}`);
}
// --- Prepare History Record ---
console.log('Preparing calculation history record...');
// Ensure history table exists (optional, could be done elsewhere)
await connection.query(`
CREATE TABLE IF NOT EXISTS public.calculate_history (
id SERIAL PRIMARY KEY,
start_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
end_time TIMESTAMPTZ,
duration_seconds INTEGER,
status VARCHAR(20) NOT NULL, -- e.g., 'running', 'completed', 'failed', 'cancelled'
error_message TEXT,
additional_info JSONB -- Store type, file, batch info etc.
);
`);
// Mark previous runs of this type as potentially failed if they were left 'running'
await connection.query(`
UPDATE public.calculate_history
SET status = 'failed', error_message = 'Interrupted by new run.'
WHERE status = 'running' AND additional_info->>'type' = $1;
`, [HISTORY_TYPE]);
// Create new history record
const historyResult = await connection.query(`
INSERT INTO public.calculate_history (start_time, status, additional_info)
VALUES (NOW(), 'running', jsonb_build_object('type', $1::text, 'sql_file', $2::text, 'start_batch', $3::integer))
RETURNING id;
`, [HISTORY_TYPE, path.basename(SQL_FUNCTION_FILE), cmdStartBatch]);
calculateHistoryId = historyResult.rows[0].id;
console.log(`Calculation history record created with ID: ${calculateHistoryId}`);
// --- Determine Date Range ---
console.log('Determining date range...');
let effectiveStartDate, effectiveEndDate;
// Use command-line dates if provided, otherwise query DB
if (cmdStartDate) {
effectiveStartDate = cmdStartDate;
} else {
const minDateResult = await connection.query(`
SELECT LEAST(
COALESCE((SELECT MIN(date) FROM public.imported_daily_inventory WHERE date > '1970-01-01'), CURRENT_DATE),
COALESCE((SELECT MIN(date) FROM public.imported_product_stat_history WHERE date > '1970-01-01'), CURRENT_DATE)
)::date as min_date;
`);
effectiveStartDate = minDateResult.rows[0]?.min_date || new Date().toISOString().split('T')[0]; // Fallback
console.log(`Auto-detected start date: ${effectiveStartDate}`);
}
if (cmdEndDate) {
effectiveEndDate = cmdEndDate;
} else {
const maxDateResult = await connection.query(`
SELECT GREATEST(
COALESCE((SELECT MAX(date) FROM public.imported_daily_inventory WHERE date < CURRENT_DATE), '1970-01-01'::date),
COALESCE((SELECT MAX(date) FROM public.imported_product_stat_history WHERE date < CURRENT_DATE), '1970-01-01'::date)
)::date as max_date;
`);
// Ensure end date is not today or in the future
effectiveEndDate = maxDateResult.rows[0]?.max_date || new Date(Date.now() - 86400000).toISOString().split('T')[0]; // Default yesterday
if (new Date(effectiveEndDate) >= new Date(new Date().toISOString().split('T')[0])) {
effectiveEndDate = new Date(Date.now() - 86400000).toISOString().split('T')[0]; // Set to yesterday if >= today
}
console.log(`Auto-detected end date: ${effectiveEndDate}`);
}
// Validate dates
const dStart = new Date(effectiveStartDate);
const dEnd = new Date(effectiveEndDate);
if (isNaN(dStart.getTime()) || isNaN(dEnd.getTime()) || dStart > dEnd) {
throw new Error(`Invalid date range: Start "${effectiveStartDate}", End "${effectiveEndDate}"`);
}
// --- Batch Processing ---
totalDays = Math.ceil((dEnd - dStart) / (1000 * 60 * 60 * 24)) + 1; // Inclusive
totalBatches = Math.ceil(totalDays / BATCH_SIZE_DAYS);
console.log(`Target Date Range: ${effectiveStartDate} to ${effectiveEndDate} (${totalDays} days)`);
console.log(`Total Batches: ${totalBatches} (Batch Size: ${BATCH_SIZE_DAYS} days)`);
console.log(`Starting from Batch: ${currentBatchNum}`);
// Initial progress update
progress.outputProgress({
status: 'running',
operation: 'Starting Batch Processing',
currentBatch: currentBatchNum,
totalBatches: totalBatches,
totalDays: totalDays,
elapsed: '0s',
remaining: 'Calculating...',
rate: 0,
historyId: calculateHistoryId // Include history ID in the object
});
while (currentBatchNum <= totalBatches && !isCancelled) {
const batchOffset = (currentBatchNum - 1) * BATCH_SIZE_DAYS;
const batchStartDate = new Date(dStart);
batchStartDate.setDate(dStart.getDate() + batchOffset);
const batchEndDate = new Date(batchStartDate);
batchEndDate.setDate(batchStartDate.getDate() + BATCH_SIZE_DAYS - 1);
// Clamp batch end date to the overall effective end date
if (batchEndDate > dEnd) {
batchEndDate.setTime(dEnd.getTime());
}
const batchStartDateStr = batchStartDate.toISOString().split('T')[0];
const batchEndDateStr = batchEndDate.toISOString().split('T')[0];
const batchStartTime = Date.now();
console.log(`\n--- Processing Batch ${currentBatchNum} / ${totalBatches} ---`);
console.log(` Dates: ${batchStartDateStr} to ${batchEndDateStr}`);
// Execute the function for the batch
try {
progress.outputProgress({
status: 'running',
operation: `Executing DB function for batch ${currentBatchNum}...`,
currentBatch: currentBatchNum,
totalBatches: totalBatches,
totalDays: totalDays,
elapsed: progress.formatElapsedTime(overallStartTime),
remaining: 'Executing...',
rate: 0,
historyId: calculateHistoryId
});
// Performance improvement: Add batch processing hint
await connection.query('SET LOCAL enable_parallel_append = on; SET LOCAL enable_parallel_hash = on; SET LOCAL max_parallel_workers_per_gather = 4;');
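// Note: SET LOCAL only takes effect inside an explicit transaction block; on an
// autocommit connection PostgreSQL emits a warning and ignores it, so these hints
// are effectively no-ops unless this query runs within BEGIN ... COMMIT.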
// Store promise in case we need to try and cancel (though not implemented forcefully)
runningQueryPromise = connection.query(
`SELECT backfill_daily_snapshots_range_final($1::date, $2::date);`,
[batchStartDateStr, batchEndDateStr]
);
await runningQueryPromise; // Wait for the function call to complete
runningQueryPromise = null; // Clear the promise
const batchDurationMs = Date.now() - batchStartTime;
const daysInThisBatch = Math.ceil((batchEndDate - batchStartDate) / (1000 * 60 * 60 * 24)) + 1;
processedDaysTotal += daysInThisBatch;
console.log(` Batch ${currentBatchNum} completed in ${progress.formatElapsedTime(batchStartTime)}.`);
// --- Update Progress & History ---
const overallElapsedSec = Math.round((Date.now() - overallStartTime) / 1000);
progress.outputProgress({
status: 'running',
operation: `Completed batch ${currentBatchNum}`,
currentBatch: currentBatchNum,
totalBatches: totalBatches,
totalDays: totalDays,
processedDays: processedDaysTotal,
elapsed: progress.formatElapsedTime(overallStartTime),
remaining: progress.estimateRemaining(overallStartTime, processedDaysTotal, totalDays),
rate: progress.calculateRate(overallStartTime, processedDaysTotal),
batchDuration: progress.formatElapsedTime(batchStartTime),
historyId: calculateHistoryId
});
// Save checkpoint in history
await connection.query(`
UPDATE public.calculate_history
SET additional_info = jsonb_set(additional_info, '{last_completed_batch}', $1::jsonb)
|| jsonb_build_object('last_processed_date', $2::text)
WHERE id = $3::integer;
`, [JSON.stringify(currentBatchNum), batchEndDateStr, calculateHistoryId]);
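// Illustrative shape of additional_info after a checkpoint (values are examples only):
// {"type": "backfill_snapshots", "sql_file": "backfill_historical_snapshots.sql",
//  "start_batch": 1, "last_completed_batch": 7, "last_processed_date": "2023-03-08"}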
} catch (batchError) {
console.error(`\n--- ERROR in Batch ${currentBatchNum} (${batchStartDateStr} to ${batchEndDateStr}) ---`);
console.error(' Database Error:', batchError.message);
console.error(' DB Error Code:', batchError.code);
// Log detailed error to history and re-throw to stop the process
await connection.query(`
UPDATE public.calculate_history
SET status = 'failed',
end_time = NOW(),
duration_seconds = $1::integer,
error_message = $2::text,
additional_info = additional_info || jsonb_build_object('failed_batch', $3::integer, 'failed_date_range', $4::text)
WHERE id = $5::integer;
`, [
Math.round((Date.now() - overallStartTime) / 1000),
`Batch ${currentBatchNum} failed: ${batchError.message} (Code: ${batchError.code || 'N/A'})`,
currentBatchNum,
`${batchStartDateStr} to ${batchEndDateStr}`,
calculateHistoryId
]);
throw batchError; // Stop execution
}
currentBatchNum++;
// Optional delay between batches
// await new Promise(resolve => setTimeout(resolve, 500));
} // End while loop
// --- Final Outcome ---
const finalStatus = isCancelled ? 'cancelled' : 'completed';
const finalMessage = isCancelled ? `Calculation stopped after completing batch ${currentBatchNum - 1}.` : 'Historical snapshots backfill completed successfully.';
const finalDurationSec = Math.round((Date.now() - overallStartTime) / 1000);
console.log(`\n--- Backfill ${finalStatus.toUpperCase()} ---`);
console.log(finalMessage);
console.log(`Total duration: ${progress.formatElapsedTime(overallStartTime)}`);
// Update history record
await connection.query(`
UPDATE public.calculate_history SET status = $1::calculation_status, end_time = NOW(), duration_seconds = $2::integer, error_message = $3
WHERE id = $4::integer;
`, [finalStatus, finalDurationSec, (isCancelled ? 'User cancelled' : null), calculateHistoryId]);
if (!isCancelled) {
progress.clearProgress(); // Clear progress state only on successful completion
} else {
progress.outputProgress({ // Final cancelled status update
status: 'cancelled',
operation: finalMessage,
currentBatch: currentBatchNum - 1,
totalBatches: totalBatches,
totalDays: totalDays,
processedDays: processedDaysTotal,
elapsed: progress.formatElapsedTime(overallStartTime),
remaining: 'Cancelled',
rate: 0,
historyId: calculateHistoryId
});
}
return { success: true, status: finalStatus, message: finalMessage, duration: finalDurationSec };
} catch (error) {
console.error('\n--- Backfill encountered an unrecoverable error ---');
console.error(error.message);
const finalDurationSec = Math.round((Date.now() - overallStartTime) / 1000);
// Update history if possible
if (connection && calculateHistoryId) {
try {
await connection.query(`
UPDATE public.calculate_history
SET status = $1::calculation_status, end_time = NOW(), duration_seconds = $2::integer, error_message = $3::text
WHERE id = $4::integer;
`, [
isCancelled ? 'cancelled' : 'failed',
finalDurationSec,
error.message,
calculateHistoryId
]);
} catch (histError) {
console.error("Failed to update history record with error state:", histError);
}
} else {
console.error("Could not update history record (no ID or connection).");
}
// FIX: Use initialized value or a default if loop never started
const batchNumForError = currentBatchNum > cmdStartBatch ? currentBatchNum - 1 : cmdStartBatch - 1;
// Update progress.outputProgress call to match actual function signature
try {
// Create progress data object
const progressData = {
status: 'failed',
operation: 'Backfill failed',
message: error.message,
currentBatch: batchNumForError,
totalBatches: totalBatches,
totalDays: totalDays,
processedDays: processedDaysTotal,
elapsed: progress.formatElapsedTime(overallStartTime),
remaining: 'Failed',
rate: 0,
// Include history ID in progress data if needed
historyId: calculateHistoryId
};
// Call with single object parameter (not separate historyId)
progress.outputProgress(progressData);
} catch (progressError) {
console.error('Failed to report progress:', progressError);
}
return { success: false, status: 'failed', error: error.message, duration: finalDurationSec };
} finally {
if (connection) {
console.log('Releasing database connection.');
connection.release();
}
// Close pool only if this script is meant to be standalone
// If part of a larger app, the app should manage pool closure
// console.log('Closing database pool.');
// await closePool();
}
}
// --- Script Execution ---
// Parse command-line arguments
const args = process.argv.slice(2);
let cmdStartDateArg, cmdEndDateArg, cmdStartBatchArg = 1; // Default start batch is 1
for (let i = 0; i < args.length; i++) {
if (args[i] === '--start-date' && args[i+1]) cmdStartDateArg = args[++i];
else if (args[i] === '--end-date' && args[i+1]) cmdEndDateArg = args[++i];
else if (args[i] === '--start-batch' && args[i+1]) cmdStartBatchArg = parseInt(args[++i], 10);
}
if (isNaN(cmdStartBatchArg) || cmdStartBatchArg < 1) {
console.warn(`Invalid --start-batch value. Defaulting to 1.`);
cmdStartBatchArg = 1;
}
// Run the backfill process
backfillSnapshots(cmdStartDateArg, cmdEndDateArg, cmdStartBatchArg)
.then(result => {
if (result.success) {
console.log(`\n${result.message} (Duration: ${result.duration}s)`);
process.exitCode = 0; // Success
} else {
console.error(`\n❌ Backfill failed: ${result.error || 'Unknown error'} (Duration: ${result.duration}s)`);
process.exitCode = 1; // Failure
}
})
.catch(err => {
console.error('\n❌ Unexpected error during backfill execution:', err);
process.exitCode = 1; // Failure
})
.finally(async () => {
// Ensure pool is closed if run standalone
console.log('Backfill script finished. Closing pool.');
await closePool(); // Make sure closePool exists and works in your db utils
process.exit(process.exitCode); // Exit with appropriate code
});
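The batch windowing in the loop above is inclusive date arithmetic; a worked example with the default BATCH_SIZE_DAYS = 1 (dates illustrative):

// 2023-01-01 .. 2023-01-10: totalDays = ceil(9 days / 1 day) + 1 = 10, totalBatches = 10.
// Batch N covers:
//   batchStart = dStart + (N - 1) * BATCH_SIZE_DAYS
//   batchEnd   = min(batchStart + BATCH_SIZE_DAYS - 1, dEnd)
// So --start-batch 5 resumes at 2023-01-05 without redoing earlier days.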

View File

@@ -1,161 +0,0 @@
-- Description: Backfills the daily_product_snapshots table using imported historical unit data
-- (daily inventory/stats) and historical price data (current prices table).
-- - Uses imported daily sales/receipt UNIT counts for accuracy.
-- - ESTIMATES historical stock levels using a forward calculation.
-- - APPROXIMATES historical REVENUE using looked-up historical base prices.
-- - APPROXIMATES historical COGS, PROFIT, and STOCK VALUE using CURRENT product costs/prices.
-- Run ONCE after importing historical data and before initial product_metrics population.
-- Dependencies: Core import tables (products), imported history tables (imported_daily_inventory,
-- imported_product_stat_history, imported_product_current_prices),
-- daily_product_snapshots table must exist.
-- Frequency: Run ONCE.
CREATE OR REPLACE FUNCTION backfill_daily_snapshots_range_final(
_start_date DATE,
_end_date DATE
)
RETURNS VOID AS $$
DECLARE
_current_processing_date DATE := _start_date;
_batch_start_time TIMESTAMPTZ;
_row_count INTEGER;
BEGIN
RAISE NOTICE 'Starting FINAL historical snapshot backfill from % to %.', _start_date, _end_date;
RAISE NOTICE 'Using historical units and historical prices (for revenue approximation).';
RAISE NOTICE 'WARNING: Historical COGS, Profit, and Stock Value use CURRENT product costs/prices.';
-- Ensure end date is not in the future
IF _end_date >= CURRENT_DATE THEN
_end_date := CURRENT_DATE - INTERVAL '1 day';
RAISE NOTICE 'Adjusted end date to % to avoid conflict with hourly script.', _end_date;
END IF;
-- Performance: Create temporary table with product info to avoid repeated lookups
CREATE TEMP TABLE IF NOT EXISTS temp_product_info AS
SELECT
pid,
sku,
COALESCE(landing_cost_price, cost_price, 0.00) as effective_cost_price,
COALESCE(price, 0.00) as current_price,
COALESCE(regular_price, 0.00) as current_regular_price
FROM public.products;
-- Performance: Create index on temporary table
CREATE INDEX IF NOT EXISTS temp_product_info_pid_idx ON temp_product_info(pid);
ANALYZE temp_product_info;
RAISE NOTICE 'Created temporary product info table with % products', (SELECT COUNT(*) FROM temp_product_info);
WHILE _current_processing_date <= _end_date LOOP
_batch_start_time := clock_timestamp();
RAISE NOTICE 'Processing date: %', _current_processing_date;
-- Get Daily Transaction Unit Info from imported history
WITH DailyHistoryUnits AS (
SELECT
pids.pid,
-- Prioritize daily_inventory, fallback to product_stat_history for sold qty
COALESCE(di.amountsold, ps.qty_sold, 0)::integer as units_sold_today,
COALESCE(di.qtyreceived, 0)::integer as units_received_today
FROM
(SELECT DISTINCT pid FROM temp_product_info) pids -- Ensure all products are considered
LEFT JOIN public.imported_daily_inventory di
ON pids.pid = di.pid AND di.date = _current_processing_date
LEFT JOIN public.imported_product_stat_history ps
ON pids.pid = ps.pid AND ps.date = _current_processing_date
-- Removed WHERE clause so snapshots are created even for days with 0 activity,
-- allowing stock carry-over; the outer INSERT's WHERE below filters out products
-- with no sales, no receipts, and no prior stock instead.
),
HistoricalPrice AS (
-- Find the base price (qty_buy=1) active on the processing date
SELECT DISTINCT ON (pid)
pid,
price_each
FROM public.imported_product_current_prices
WHERE
qty_buy = 1
-- Use TIMESTAMPTZ comparison logic:
AND date_active <= (_current_processing_date + interval '1 day' - interval '1 second') -- Active sometime on or before end of processing day
AND (date_deactive IS NULL OR date_deactive > _current_processing_date) -- Not deactivated before start of processing day
-- Assuming 'active' flag isn't needed if dates are correct; add 'AND active != 0' if necessary
ORDER BY
pid, date_active DESC -- Get the most recently activated price
),
PreviousStock AS (
-- Get the estimated stock from the PREVIOUS day snapshot
SELECT pid, eod_stock_quantity
FROM public.daily_product_snapshots
WHERE snapshot_date = _current_processing_date - INTERVAL '1 day'
)
-- Insert into the daily snapshots table
INSERT INTO public.daily_product_snapshots (
snapshot_date, pid, sku,
eod_stock_quantity, eod_stock_cost, eod_stock_retail, eod_stock_gross, stockout_flag,
units_sold, units_returned,
gross_revenue, discounts, returns_revenue,
net_revenue, cogs, gross_regular_revenue, profit,
units_received, cost_received,
calculation_timestamp
)
SELECT
_current_processing_date AS snapshot_date,
p.pid,
p.sku,
-- Estimated EOD Stock (using historical daily units)
-- Handle potential NULL from joins with COALESCE 0
COALESCE(ps.eod_stock_quantity, 0) + COALESCE(dh.units_received_today, 0) - COALESCE(dh.units_sold_today, 0) AS estimated_eod_stock,
-- Valued Stock (using estimated stock and CURRENT prices/costs - APPROXIMATION)
GREATEST(0, COALESCE(ps.eod_stock_quantity, 0) + COALESCE(dh.units_received_today, 0) - COALESCE(dh.units_sold_today, 0)) * p.effective_cost_price AS eod_stock_cost,
GREATEST(0, COALESCE(ps.eod_stock_quantity, 0) + COALESCE(dh.units_received_today, 0) - COALESCE(dh.units_sold_today, 0)) * p.current_price AS eod_stock_retail, -- Stock retail uses current price
GREATEST(0, COALESCE(ps.eod_stock_quantity, 0) + COALESCE(dh.units_received_today, 0) - COALESCE(dh.units_sold_today, 0)) * p.current_regular_price AS eod_stock_gross, -- Stock gross uses current regular price
-- Stockout Flag (based on estimated stock)
(COALESCE(ps.eod_stock_quantity, 0) + COALESCE(dh.units_received_today, 0) - COALESCE(dh.units_sold_today, 0)) <= 0 AS stockout_flag,
-- Today's Unit Aggregates from History
COALESCE(dh.units_sold_today, 0) as units_sold,
0 AS units_returned, -- Placeholder: Cannot determine returns from daily summary
-- Monetary Values using looked-up Historical Price and CURRENT Cost/RegPrice
COALESCE(dh.units_sold_today, 0) * COALESCE(hp.price_each, p.current_price) AS gross_revenue, -- Approx Revenue
0 AS discounts, -- Placeholder
0 AS returns_revenue, -- Placeholder
COALESCE(dh.units_sold_today, 0) * COALESCE(hp.price_each, p.current_price) AS net_revenue, -- Approx Net Revenue
COALESCE(dh.units_sold_today, 0) * p.effective_cost_price AS cogs, -- Approx COGS (uses CURRENT cost)
COALESCE(dh.units_sold_today, 0) * p.current_regular_price AS gross_regular_revenue, -- Approx Gross Regular Revenue
-- Approx Profit
(COALESCE(dh.units_sold_today, 0) * COALESCE(hp.price_each, p.current_price)) - (COALESCE(dh.units_sold_today, 0) * p.effective_cost_price) AS profit,
COALESCE(dh.units_received_today, 0) as units_received,
-- Estimate received cost using CURRENT product cost
COALESCE(dh.units_received_today, 0) * p.effective_cost_price AS cost_received, -- Approx
clock_timestamp() -- Timestamp of this specific calculation
FROM temp_product_info p -- Use the temp table for better performance
LEFT JOIN PreviousStock ps ON p.pid = ps.pid
LEFT JOIN DailyHistoryUnits dh ON p.pid = dh.pid -- Join today's historical activity
LEFT JOIN HistoricalPrice hp ON p.pid = hp.pid -- Join the looked-up historical price
-- Optimization: Only process products with activity or previous stock
WHERE (dh.units_sold_today > 0 OR dh.units_received_today > 0 OR COALESCE(ps.eod_stock_quantity, 0) > 0)
ON CONFLICT (snapshot_date, pid) DO NOTHING; -- Avoid errors if rerunning parts, but prefer clean runs
GET DIAGNOSTICS _row_count = ROW_COUNT;
RAISE NOTICE 'Processed %: Inserted/Skipped % rows. Duration: %',
_current_processing_date,
_row_count,
clock_timestamp() - _batch_start_time;
_current_processing_date := _current_processing_date + INTERVAL '1 day';
END LOOP;
-- Clean up temporary tables
DROP TABLE IF EXISTS temp_product_info;
RAISE NOTICE 'Finished FINAL historical snapshot backfill.';
END;
$$ LANGUAGE plpgsql;
-- Example usage:
-- SELECT backfill_daily_snapshots_range_final('2023-01-01'::date, '2023-12-31'::date);
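-- A minimal verification sketch after a backfill (assumes only the
-- daily_product_snapshots columns populated above):
-- SELECT snapshot_date, COUNT(*) AS products, SUM(units_sold) AS units_sold
-- FROM public.daily_product_snapshots
-- WHERE snapshot_date BETWEEN '2023-01-01' AND '2023-01-07'
-- GROUP BY snapshot_date
-- ORDER BY snapshot_date;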

View File

@@ -1,4 +1,4 @@
-- Description: Calculates and updates daily aggregated product data for recent days.
-- Uses UPSERT (INSERT ON CONFLICT UPDATE) for idempotency.
-- Dependencies: Core import tables (products, orders, purchase_orders), calculate_status table.
-- Frequency: Hourly (Run ~5-10 minutes after hourly data import completes).
@@ -8,211 +8,243 @@ DECLARE
_module_name TEXT := 'daily_snapshots';
_start_time TIMESTAMPTZ := clock_timestamp(); -- Time execution started
_last_calc_time TIMESTAMPTZ;
_target_date DATE; -- Will be set in the loop
_total_records INT := 0;
_has_orders BOOLEAN := FALSE;
_process_days INT := 5; -- Number of days to check/process (today plus previous 4 days)
_day_counter INT;
_missing_days INT[] := ARRAY[]::INT[]; -- Array to store days with missing or incomplete data
BEGIN
-- Get the timestamp before the last successful run of this module
SELECT last_calculation_timestamp INTO _last_calc_time
FROM public.calculate_status
WHERE module_name = _module_name;
RAISE NOTICE 'Running % script. Start Time: %', _module_name, _start_time;
-- First, check which days need processing by comparing orders data with snapshot data
FOR _day_counter IN 0..(_process_days-1) LOOP
_target_date := CURRENT_DATE - (_day_counter * INTERVAL '1 day');
-- Check if this date needs updating by comparing orders to snapshot data
-- If the date has orders but not enough snapshots, or if snapshots show zero sales but orders exist, it's incomplete
SELECT
CASE WHEN (
-- We have orders for this date but not enough snapshots, or snapshots with wrong total
(EXISTS (SELECT 1 FROM public.orders WHERE date::date = _target_date) AND
(
-- No snapshots exist for this date
NOT EXISTS (SELECT 1 FROM public.daily_product_snapshots WHERE snapshot_date = _target_date) OR
-- Or snapshots show zero sales but orders exist
(SELECT COALESCE(SUM(units_sold), 0) FROM public.daily_product_snapshots WHERE snapshot_date = _target_date) = 0 OR
-- Or the count of snapshot records is significantly less than distinct products in orders
(SELECT COUNT(*) FROM public.daily_product_snapshots WHERE snapshot_date = _target_date) <
(SELECT COUNT(DISTINCT pid) FROM public.orders WHERE date::date = _target_date) * 0.8
)
)
) THEN TRUE ELSE FALSE END
INTO _has_orders;
IF _has_orders THEN
-- This day needs processing - add to our array
_missing_days := _missing_days || _day_counter;
RAISE NOTICE 'Day % needs updating (incomplete or missing data)', _target_date;
END IF;
END LOOP;
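-- Worked example of the 80% heuristic above: if orders for a day touch 100
-- distinct pids but only 70 snapshot rows exist for it, 70 < 100 * 0.8 marks
-- the day incomplete and queues it for rebuild.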
-- If no days need updating, exit early
IF array_length(_missing_days, 1) IS NULL THEN
RAISE NOTICE 'No days need updating - all snapshot data appears complete';
-- Still update the calculate_status to record this run
UPDATE public.calculate_status
SET last_calculation_timestamp = _start_time
WHERE module_name = _module_name;
RETURN;
END IF;
RAISE NOTICE 'Need to update % days with missing or incomplete data', array_length(_missing_days, 1);
-- Process only the days that need updating
FOREACH _day_counter IN ARRAY _missing_days LOOP
_target_date := CURRENT_DATE - (_day_counter * INTERVAL '1 day');
RAISE NOTICE 'Processing date: %', _target_date;
-- IMPORTANT: First delete any existing data for this date to prevent duplication
DELETE FROM public.daily_product_snapshots
WHERE snapshot_date = _target_date;
-- Proceed with calculating daily metrics only for products with actual activity
WITH SalesData AS (
SELECT
p.pid,
p.sku,
-- Track number of orders to ensure we have real data
COUNT(o.id) as order_count,
-- Aggregate Sales (Quantity > 0, Status not Canceled/Returned)
COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN o.quantity ELSE 0 END), 0) AS units_sold,
COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN o.price * o.quantity ELSE 0 END), 0.00) AS gross_revenue_unadjusted, -- Before discount
COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN o.discount ELSE 0 END), 0.00) AS discounts,
COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN COALESCE(o.costeach, p.landing_cost_price, p.cost_price) * o.quantity ELSE 0 END), 0.00) AS cogs,
COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN p.regular_price * o.quantity ELSE 0 END), 0.00) AS gross_regular_revenue, -- Use current regular price for simplicity here
-- Aggregate Returns (Quantity < 0 or Status = Returned)
COALESCE(SUM(CASE WHEN o.quantity < 0 OR COALESCE(o.status, 'pending') = 'returned' THEN ABS(o.quantity) ELSE 0 END), 0) AS units_returned,
COALESCE(SUM(CASE WHEN o.quantity < 0 OR COALESCE(o.status, 'pending') = 'returned' THEN o.price * ABS(o.quantity) ELSE 0 END), 0.00) AS returns_revenue
FROM public.products p -- Start from products to include those with no orders today
JOIN public.orders o -- Changed to INNER JOIN to only process products with orders
ON p.pid = o.pid
AND o.date::date = _target_date -- Cast to date to ensure compatibility regardless of original type
GROUP BY p.pid, p.sku
-- No HAVING clause here - we always want to include all orders
),
ReceivingData AS (
SELECT
po.pid,
-- Track number of POs to ensure we have real data
COUNT(po.po_id) as po_count,
-- Prioritize the actual table fields over the JSON data
COALESCE(
-- First try the received field from purchase_orders table
SUM(CASE WHEN po.date::date = _target_date THEN po.received ELSE 0 END),
-- Otherwise fall back to the receiving_history JSON as secondary source
SUM(
CASE
WHEN (rh.item->>'date')::date = _target_date THEN (rh.item->>'qty')::numeric
WHEN (rh.item->>'received_at')::date = _target_date THEN (rh.item->>'qty')::numeric
WHEN (rh.item->>'receipt_date')::date = _target_date THEN (rh.item->>'qty')::numeric
ELSE 0
END
),
0
) AS units_received,
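-- Caveat: with ELSE 0 the first SUM above evaluates to 0 (never NULL) for any
-- matched group, so this COALESCE effectively never reaches the
-- receiving_history fallback; the cost_received COALESCE below follows the
-- same pattern.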
COALESCE(
-- First try the actual cost_price from purchase_orders
SUM(CASE WHEN po.date::date = _target_date THEN po.received * po.cost_price ELSE 0 END),
-- Otherwise fall back to receiving_history JSON
SUM(
CASE
WHEN (rh.item->>'date')::date = _target_date THEN (rh.item->>'qty')::numeric
WHEN (rh.item->>'received_at')::date = _target_date THEN (rh.item->>'qty')::numeric
WHEN (rh.item->>'receipt_date')::date = _target_date THEN (rh.item->>'qty')::numeric
ELSE 0
END
* COALESCE((rh.item->>'cost')::numeric, po.cost_price)
),
0.00
) AS cost_received
FROM public.purchase_orders po
LEFT JOIN LATERAL jsonb_array_elements(po.receiving_history) AS rh(item) ON
jsonb_typeof(po.receiving_history) = 'array' AND
jsonb_array_length(po.receiving_history) > 0 AND
(
(rh.item->>'date')::date = _target_date OR
(rh.item->>'received_at')::date = _target_date OR
(rh.item->>'receipt_date')::date = _target_date
)
-- Include POs with the current date or relevant receiving_history
WHERE
po.date::date = _target_date OR
(jsonb_typeof(po.receiving_history) = 'array' AND
jsonb_array_length(po.receiving_history) > 0)
GROUP BY po.pid
-- CRITICAL: Only include products with actual receiving activity
HAVING COUNT(po.po_id) > 0 OR SUM(
CASE
WHEN (rh.item->>'date')::date = _target_date THEN (rh.item->>'qty')::numeric
WHEN (rh.item->>'received_at')::date = _target_date THEN (rh.item->>'qty')::numeric
WHEN (rh.item->>'receipt_date')::date = _target_date THEN (rh.item->>'qty')::numeric
ELSE 0
END
) > 0
),
CurrentStock AS (
-- Select current stock values directly from products table
SELECT
pid,
stock_quantity,
COALESCE(landing_cost_price, cost_price, 0.00) as effective_cost_price,
COALESCE(price, 0.00) as current_price,
COALESCE(regular_price, 0.00) as current_regular_price
FROM public.products
),
ProductsWithActivity AS (
-- Quick pre-filter to only process products with activity
SELECT DISTINCT pid
FROM (
SELECT pid FROM SalesData
UNION
SELECT pid FROM ReceivingData
) a
)
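-- UNION (rather than UNION ALL) already de-duplicates pids present in both CTEs.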
-- Now insert records, but ONLY for products with actual activity
INSERT INTO public.daily_product_snapshots (
snapshot_date,
pid,
sku,
eod_stock_quantity,
eod_stock_cost,
eod_stock_retail,
eod_stock_gross,
stockout_flag,
units_sold,
units_returned,
gross_revenue,
discounts,
returns_revenue,
net_revenue,
cogs,
gross_regular_revenue,
profit,
units_received,
cost_received,
calculation_timestamp
)
SELECT
_target_date AS snapshot_date,
COALESCE(sd.pid, rd.pid) AS pid, -- Use sales or receiving PID
COALESCE(sd.sku, p.sku) AS sku, -- Get SKU from sales data or products table
-- Inventory Metrics (Using CurrentStock)
cs.stock_quantity AS eod_stock_quantity,
cs.stock_quantity * cs.effective_cost_price AS eod_stock_cost,
cs.stock_quantity * cs.current_price AS eod_stock_retail,
cs.stock_quantity * cs.current_regular_price AS eod_stock_gross,
(cs.stock_quantity <= 0) AS stockout_flag,
-- Sales Metrics (From SalesData)
COALESCE(sd.units_sold, 0),
COALESCE(sd.units_returned, 0),
COALESCE(sd.gross_revenue_unadjusted, 0.00),
COALESCE(sd.discounts, 0.00),
COALESCE(sd.returns_revenue, 0.00),
COALESCE(sd.gross_revenue_unadjusted, 0.00) - COALESCE(sd.discounts, 0.00) AS net_revenue,
COALESCE(sd.cogs, 0.00),
COALESCE(sd.gross_regular_revenue, 0.00),
(COALESCE(sd.gross_revenue_unadjusted, 0.00) - COALESCE(sd.discounts, 0.00)) - COALESCE(sd.cogs, 0.00) AS profit, -- Basic profit: Net Revenue - COGS
-- Receiving Metrics (From ReceivingData)
COALESCE(rd.units_received, 0),
COALESCE(rd.cost_received, 0.00),
_start_time -- Timestamp of this calculation run
FROM SalesData sd
FULL OUTER JOIN ReceivingData rd ON sd.pid = rd.pid
JOIN ProductsWithActivity pwa ON COALESCE(sd.pid, rd.pid) = pwa.pid
LEFT JOIN public.products p ON COALESCE(sd.pid, rd.pid) = p.pid
LEFT JOIN CurrentStock cs ON COALESCE(sd.pid, rd.pid) = cs.pid
WHERE p.pid IS NOT NULL; -- Ensure we only insert for existing products
-- Get the total number of records inserted for this date
GET DIAGNOSTICS _total_records = ROW_COUNT;
RAISE NOTICE 'Created % daily snapshot records for % with sales/receiving activity', _total_records, _target_date;
END LOOP;
-- Update the status table with the timestamp from the START of this run
UPDATE public.calculate_status
SET last_calculation_timestamp = _start_time
WHERE module_name = _module_name;
RAISE NOTICE 'Finished % processing for multiple dates. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
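-- Usage sketch (hypothetical path and invocation; the header only states the
-- hourly frequency): run this script ~10 minutes past each hour, after the
-- hourly import completes, e.g. via cron:
--   10 * * * * psql "$DATABASE_URL" -f /path/to/daily_snapshots.sql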