2 Commits

Author SHA1 Message Date
92ff80fba2 Import and calculate tweaks and fixes 2025-04-06 17:12:36 -04:00
a4c1a19d2e Try to synchronize time zones across import 2025-04-05 16:20:43 -04:00
12 changed files with 913 additions and 754 deletions

View File

@@ -150,7 +150,7 @@ CREATE TABLE IF NOT EXISTS calculate_history (
 );
 CREATE TABLE IF NOT EXISTS calculate_status (
-    module_name module_name PRIMARY KEY,
+    module_name text PRIMARY KEY,
     last_calculation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
 );

View File

@@ -280,7 +280,7 @@ CREATE TABLE public.vendor_metrics (
     lifetime_sales INT NOT NULL DEFAULT 0, lifetime_revenue NUMERIC(18, 4) NOT NULL DEFAULT 0.00,
     -- Calculated KPIs (Based on 30d aggregates)
-    avg_margin_30d NUMERIC(7, 3) -- (profit / revenue) * 100
+    avg_margin_30d NUMERIC(14, 4) -- (profit / revenue) * 100
     -- Add more KPIs if needed (e.g., avg product value, sell-through rate for vendor)
 );
 CREATE INDEX idx_vendor_metrics_active_count ON public.vendor_metrics(active_product_count);
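Why the widening matters: NUMERIC(7, 3) tops out at 9999.999, and (profit / revenue) * 100 can exceed that whenever revenue is near zero. A minimal illustration, assuming node-postgres; the figures are invented:

const { Client } = require('pg');

// Hypothetical: profit 500.00 on revenue 0.04 is a 1,250,000% margin.
async function demoMarginOverflow(client) {
  // Fits in the widened column type:
  const ok = await client.query('SELECT ((500.00 / 0.04) * 100)::NUMERIC(14, 4) AS m');
  console.log(ok.rows[0].m); // "1250000.0000"
  // The old type raises SQLSTATE 22003 (numeric_field_overflow):
  await client.query('SELECT ((500.00 / 0.04) * 100)::NUMERIC(7, 3) AS m')
    .catch(err => console.log(err.code)); // "22003"
}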

View File

@@ -213,55 +213,55 @@ SET session_replication_role = 'origin'; -- Re-enable foreign key checks
 -- Create views for common calculations
 -- product_sales_trends view moved to metrics-schema.sql
--- Historical data tables imported from production
-CREATE TABLE imported_product_current_prices (
-    price_id BIGSERIAL PRIMARY KEY,
-    pid BIGINT NOT NULL,
-    qty_buy SMALLINT NOT NULL,
-    is_min_qty_buy BOOLEAN NOT NULL,
-    price_each NUMERIC(10,3) NOT NULL,
-    qty_limit SMALLINT NOT NULL,
-    no_promo BOOLEAN NOT NULL,
-    checkout_offer BOOLEAN NOT NULL,
-    active BOOLEAN NOT NULL,
-    date_active TIMESTAMP WITH TIME ZONE,
-    date_deactive TIMESTAMP WITH TIME ZONE,
-    updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
-);
-CREATE INDEX idx_imported_product_current_prices_pid ON imported_product_current_prices(pid, active, qty_buy);
-CREATE INDEX idx_imported_product_current_prices_checkout ON imported_product_current_prices(checkout_offer, active);
-CREATE INDEX idx_imported_product_current_prices_deactive ON imported_product_current_prices(date_deactive, active);
-CREATE INDEX idx_imported_product_current_prices_active ON imported_product_current_prices(date_active, active);
-CREATE TABLE imported_daily_inventory (
-    date DATE NOT NULL,
-    pid BIGINT NOT NULL,
-    amountsold SMALLINT NOT NULL DEFAULT 0,
-    times_sold SMALLINT NOT NULL DEFAULT 0,
-    qtyreceived SMALLINT NOT NULL DEFAULT 0,
-    price NUMERIC(7,2) NOT NULL DEFAULT 0,
-    costeach NUMERIC(7,2) NOT NULL DEFAULT 0,
-    stamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
-    updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
-    PRIMARY KEY (date, pid)
-);
-CREATE INDEX idx_imported_daily_inventory_pid ON imported_daily_inventory(pid);
-CREATE TABLE imported_product_stat_history (
-    pid BIGINT NOT NULL,
-    date DATE NOT NULL,
-    score NUMERIC(10,2) NOT NULL,
-    score2 NUMERIC(10,2) NOT NULL,
-    qty_in_baskets SMALLINT NOT NULL,
-    qty_sold SMALLINT NOT NULL,
-    notifies_set SMALLINT NOT NULL,
-    visibility_score NUMERIC(10,2) NOT NULL,
-    health_score VARCHAR(5) NOT NULL,
-    sold_view_score NUMERIC(6,3) NOT NULL,
-    updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
-    PRIMARY KEY (pid, date)
-);
-CREATE INDEX idx_imported_product_stat_history_date ON imported_product_stat_history(date);
+-- -- Historical data tables imported from production
+-- CREATE TABLE imported_product_current_prices (
+--     price_id BIGSERIAL PRIMARY KEY,
+--     pid BIGINT NOT NULL,
+--     qty_buy SMALLINT NOT NULL,
+--     is_min_qty_buy BOOLEAN NOT NULL,
+--     price_each NUMERIC(10,3) NOT NULL,
+--     qty_limit SMALLINT NOT NULL,
+--     no_promo BOOLEAN NOT NULL,
+--     checkout_offer BOOLEAN NOT NULL,
+--     active BOOLEAN NOT NULL,
+--     date_active TIMESTAMP WITH TIME ZONE,
+--     date_deactive TIMESTAMP WITH TIME ZONE,
+--     updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
+-- );
+-- CREATE INDEX idx_imported_product_current_prices_pid ON imported_product_current_prices(pid, active, qty_buy);
+-- CREATE INDEX idx_imported_product_current_prices_checkout ON imported_product_current_prices(checkout_offer, active);
+-- CREATE INDEX idx_imported_product_current_prices_deactive ON imported_product_current_prices(date_deactive, active);
+-- CREATE INDEX idx_imported_product_current_prices_active ON imported_product_current_prices(date_active, active);
+-- CREATE TABLE imported_daily_inventory (
+--     date DATE NOT NULL,
+--     pid BIGINT NOT NULL,
+--     amountsold SMALLINT NOT NULL DEFAULT 0,
+--     times_sold SMALLINT NOT NULL DEFAULT 0,
+--     qtyreceived SMALLINT NOT NULL DEFAULT 0,
+--     price NUMERIC(7,2) NOT NULL DEFAULT 0,
+--     costeach NUMERIC(7,2) NOT NULL DEFAULT 0,
+--     stamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
+--     updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
+--     PRIMARY KEY (date, pid)
+-- );
+-- CREATE INDEX idx_imported_daily_inventory_pid ON imported_daily_inventory(pid);
+-- CREATE TABLE imported_product_stat_history (
+--     pid BIGINT NOT NULL,
+--     date DATE NOT NULL,
+--     score NUMERIC(10,2) NOT NULL,
+--     score2 NUMERIC(10,2) NOT NULL,
+--     qty_in_baskets SMALLINT NOT NULL,
+--     qty_sold SMALLINT NOT NULL,
+--     notifies_set SMALLINT NOT NULL,
+--     visibility_score NUMERIC(10,2) NOT NULL,
+--     health_score VARCHAR(5) NOT NULL,
+--     sold_view_score NUMERIC(6,3) NOT NULL,
+--     updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
+--     PRIMARY KEY (pid, date)
+-- );
+-- CREATE INDEX idx_imported_product_stat_history_date ON imported_product_stat_history(date);
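These DDL statements are commented out because the three imported_* tables are loaded once from production history and are meant to survive schema resets; the same names join the PROTECTED_TABLES list later in this diff. A minimal guard, assuming node-postgres (the helper name is hypothetical):

const IMPORTED_HISTORY_TABLES = [
  'imported_daily_inventory',
  'imported_product_stat_history',
  'imported_product_current_prices',
];

// Fail fast if a reset would run without the one-time history imports present.
async function assertHistoryTablesIntact(client) {
  const { rows } = await client.query(
    `SELECT tablename FROM pg_tables
     WHERE schemaname = 'public' AND tablename = ANY($1::text[])`,
    [IMPORTED_HISTORY_TABLES]
  );
  if (rows.length !== IMPORTED_HISTORY_TABLES.length) {
    throw new Error('One-time imported history tables are missing; aborting.');
  }
}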

View File

@@ -1,7 +1,7 @@
 const path = require('path');
 const fs = require('fs');
-const progress = require('../utils/progress'); // Assuming progress utils are here
-const { getConnection, closePool } = require('../utils/db'); // Assuming db utils are here
+const progress = require('../scripts/metrics-new/utils/progress'); // Assuming progress utils are here
+const { getConnection, closePool } = require('../scripts/metrics-new/utils/db'); // Assuming db utils are here
 const os = require('os'); // For detecting number of CPU cores
 // --- Configuration ---

View File

@@ -38,7 +38,7 @@ const sshConfig = {
     password: process.env.PROD_DB_PASSWORD,
     database: process.env.PROD_DB_NAME,
     port: process.env.PROD_DB_PORT || 3306,
-    timezone: 'Z',
+    timezone: '-05:00', // Production DB always stores times in EST (UTC-5) regardless of DST
   },
   localDbConfig: {
     // PostgreSQL config for local
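The timezone option tells the MySQL driver how to interpret DATETIME values it reads. 'Z' treated the production values as UTC; a fixed '-05:00' matches a server that stores EST wall-clock time year-round, with no DST jumps. A sketch of the resulting pool config, assuming the mysql2 driver and the env names above (host and user variables are assumed):

const mysql = require('mysql2/promise');

const prodPool = mysql.createPool({
  host: process.env.PROD_DB_HOST,      // assumed env name
  user: process.env.PROD_DB_USER,      // assumed env name
  password: process.env.PROD_DB_PASSWORD,
  database: process.env.PROD_DB_NAME,
  port: process.env.PROD_DB_PORT || 3306,
  timezone: '-05:00', // fixed offset: EST with no DST adjustment
});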

View File

@@ -26,10 +26,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
   let cumulativeProcessedOrders = 0;
   try {
-    // Begin transaction
-    await localConnection.beginTransaction();
-    // Get last sync info
+    // Get last sync info - NOT in a transaction anymore
     const [syncInfo] = await localConnection.query(
       "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'"
     );
@@ -43,8 +40,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       FROM order_items oi
       JOIN _order o ON oi.order_id = o.order_id
       WHERE o.order_status >= 15
-      AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
-      AND o.date_placed_onlydate IS NOT NULL
+      AND o.date_placed >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
+      AND o.date_placed IS NOT NULL
       ${incrementalUpdate ? `
       AND (
         o.stamp > ?
@@ -82,8 +79,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       FROM order_items oi
       JOIN _order o ON oi.order_id = o.order_id
       WHERE o.order_status >= 15
-      AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
-      AND o.date_placed_onlydate IS NOT NULL
+      AND o.date_placed >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
+      AND o.date_placed IS NOT NULL
       ${incrementalUpdate ? `
       AND (
         o.stamp > ?
@@ -107,6 +104,9 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
     console.log('Orders: Found', orderItems.length, 'order items to process');
     // Create tables in PostgreSQL for data processing
+    // Start a transaction just for creating the temp tables
+    await localConnection.beginTransaction();
+    try {
       await localConnection.query(`
         DROP TABLE IF EXISTS temp_order_items;
         DROP TABLE IF EXISTS temp_order_meta;
@@ -133,6 +133,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
         canceled BOOLEAN,
         summary_discount NUMERIC(14, 4) DEFAULT 0.0000,
         summary_subtotal NUMERIC(14, 4) DEFAULT 0.0000,
+        summary_discount_subtotal NUMERIC(14, 4) DEFAULT 0.0000,
         PRIMARY KEY (order_id)
       );
@@ -159,10 +160,20 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
         CREATE INDEX idx_temp_order_items_pid ON temp_order_items(pid);
         CREATE INDEX idx_temp_order_meta_order_id ON temp_order_meta(order_id);
+        CREATE INDEX idx_temp_order_discounts_order_pid ON temp_order_discounts(order_id, pid);
+        CREATE INDEX idx_temp_order_taxes_order_pid ON temp_order_taxes(order_id, pid);
+        CREATE INDEX idx_temp_order_costs_order_pid ON temp_order_costs(order_id, pid);
       `);
+      await localConnection.commit();
+    } catch (error) {
+      await localConnection.rollback();
+      throw error;
+    }
-    // Insert order items in batches
+    // Insert order items in batches - each batch gets its own transaction
     for (let i = 0; i < orderItems.length; i += 5000) {
+      await localConnection.beginTransaction();
+      try {
       const batch = orderItems.slice(i, Math.min(i + 5000, orderItems.length));
       const placeholders = batch.map((_, idx) =>
         `($${idx * 6 + 1}, $${idx * 6 + 2}, $${idx * 6 + 3}, $${idx * 6 + 4}, $${idx * 6 + 5}, $${idx * 6 + 6})`
@@ -181,6 +192,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
           base_discount = EXCLUDED.base_discount
         `, values);
+        await localConnection.commit();
         processedCount = i + batch.length;
         outputProgress({
           status: "running",
@@ -192,6 +205,10 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
           remaining: estimateRemaining(startTime, processedCount, totalOrderItems),
           rate: calculateRate(startTime, processedCount)
         });
+      } catch (error) {
+        await localConnection.rollback();
+        throw error;
+      }
     }
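The begin/try/commit/catch/rollback scaffolding now repeats around every batch. A small helper could factor it out; a sketch, assuming the connection wrapper used here exposes beginTransaction(), commit(), and rollback():

// Run one batch of work in its own transaction; a failure rolls back
// only that batch instead of the whole import.
async function inTransaction(connection, work) {
  await connection.beginTransaction();
  try {
    const result = await work();
    await connection.commit();
    return result;
  } catch (error) {
    await connection.rollback();
    throw error;
  }
}

// Usage, mirroring the loop above (insertBatch is hypothetical):
// for (let i = 0; i < orderItems.length; i += 5000) {
//   await inTransaction(localConnection, () => insertBatch(orderItems, i));
// }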
     // Get unique order IDs
@@ -218,25 +235,28 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       const [orders] = await prodConnection.query(`
         SELECT
           o.order_id,
-          o.date_placed_onlydate as date,
+          o.date_placed as date,
           o.order_cid as customer,
           CONCAT(COALESCE(u.firstname, ''), ' ', COALESCE(u.lastname, '')) as customer_name,
           o.order_status as status,
           CASE WHEN o.date_cancelled != '0000-00-00 00:00:00' THEN 1 ELSE 0 END as canceled,
           o.summary_discount,
-          o.summary_subtotal
+          o.summary_subtotal,
+          o.summary_discount_subtotal
         FROM _order o
         LEFT JOIN users u ON o.order_cid = u.cid
         WHERE o.order_id IN (?)
       `, [batchIds]);
       // Process in sub-batches for PostgreSQL
+      await localConnection.beginTransaction();
+      try {
       for (let j = 0; j < orders.length; j += PG_BATCH_SIZE) {
         const subBatch = orders.slice(j, j + PG_BATCH_SIZE);
         if (subBatch.length === 0) continue;
         const placeholders = subBatch.map((_, idx) =>
-          `($${idx * 8 + 1}, $${idx * 8 + 2}, $${idx * 8 + 3}, $${idx * 8 + 4}, $${idx * 8 + 5}, $${idx * 8 + 6}, $${idx * 8 + 7}, $${idx * 8 + 8})`
+          `($${idx * 9 + 1}, $${idx * 9 + 2}, $${idx * 9 + 3}, $${idx * 9 + 4}, $${idx * 9 + 5}, $${idx * 9 + 6}, $${idx * 9 + 7}, $${idx * 9 + 8}, $${idx * 9 + 9})`
         ).join(",");
         const values = subBatch.flatMap(order => [
@@ -247,13 +267,14 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
           order.status.toString(), // Convert status to TEXT
           order.canceled,
           order.summary_discount || 0,
-          order.summary_subtotal || 0
+          order.summary_subtotal || 0,
+          order.summary_discount_subtotal || 0
         ]);
         await localConnection.query(`
           INSERT INTO temp_order_meta (
             order_id, date, customer, customer_name, status, canceled,
-            summary_discount, summary_subtotal
+            summary_discount, summary_subtotal, summary_discount_subtotal
           )
           VALUES ${placeholders}
           ON CONFLICT (order_id) DO UPDATE SET
@@ -263,9 +284,15 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
             status = EXCLUDED.status,
             canceled = EXCLUDED.canceled,
             summary_discount = EXCLUDED.summary_discount,
-            summary_subtotal = EXCLUDED.summary_subtotal
+            summary_subtotal = EXCLUDED.summary_subtotal,
+            summary_discount_subtotal = EXCLUDED.summary_discount_subtotal
         `, values);
       }
+      await localConnection.commit();
+      } catch (error) {
+        await localConnection.rollback();
+        throw error;
+      }
     };
     const processDiscountsBatch = async (batchIds) => {
@@ -278,6 +305,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       if (discounts.length === 0) return;
+      await localConnection.beginTransaction();
+      try {
       for (let j = 0; j < discounts.length; j += PG_BATCH_SIZE) {
         const subBatch = discounts.slice(j, j + PG_BATCH_SIZE);
         if (subBatch.length === 0) continue;
@@ -299,6 +328,11 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
             discount = EXCLUDED.discount
         `, values);
       }
+      await localConnection.commit();
+      } catch (error) {
+        await localConnection.rollback();
+        throw error;
+      }
     };
     const processTaxesBatch = async (batchIds) => {
@@ -318,6 +352,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       if (taxes.length === 0) return;
+      await localConnection.beginTransaction();
+      try {
       for (let j = 0; j < taxes.length; j += PG_BATCH_SIZE) {
         const subBatch = taxes.slice(j, j + PG_BATCH_SIZE);
         if (subBatch.length === 0) continue;
@@ -339,6 +375,11 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
             tax = EXCLUDED.tax
         `, values);
       }
+      await localConnection.commit();
+      } catch (error) {
+        await localConnection.rollback();
+        throw error;
+      }
     };
     const processCostsBatch = async (batchIds) => {
@@ -363,6 +404,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       if (costs.length === 0) return;
+      await localConnection.beginTransaction();
+      try {
       for (let j = 0; j < costs.length; j += PG_BATCH_SIZE) {
         const subBatch = costs.slice(j, j + PG_BATCH_SIZE);
         if (subBatch.length === 0) continue;
@@ -384,18 +427,22 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
             costeach = EXCLUDED.costeach
         `, values);
       }
+      await localConnection.commit();
+      } catch (error) {
+        await localConnection.rollback();
+        throw error;
+      }
     };
-    // Process all data types in parallel for each batch
+    // Process all data types SEQUENTIALLY for each batch - not in parallel
     for (let i = 0; i < orderIds.length; i += METADATA_BATCH_SIZE) {
       const batchIds = orderIds.slice(i, i + METADATA_BATCH_SIZE);
-      await Promise.all([
-        processMetadataBatch(batchIds),
-        processDiscountsBatch(batchIds),
-        processTaxesBatch(batchIds),
-        processCostsBatch(batchIds)
-      ]);
+      // Run these sequentially instead of in parallel to avoid transaction conflicts
+      await processMetadataBatch(batchIds);
+      await processDiscountsBatch(batchIds);
+      await processTaxesBatch(batchIds);
+      await processCostsBatch(batchIds);
       processedCount = i + batchIds.length;
       outputProgress({
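The switch away from Promise.all is the point of this hunk: all four processors share one localConnection, and a PostgreSQL session can hold only one open transaction, so interleaved BEGIN/COMMIT pairs from concurrent processors can commit or roll back each other's work. Illustration only:

// Unsafe on a shared connection: a()'s COMMIT can end b()'s transaction.
await Promise.all([a(localConnection), b(localConnection)]);

// Safe: one transaction at a time on the shared connection.
await a(localConnection);
await b(localConnection);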
@@ -422,14 +469,17 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
     const existingPids = new Set(existingProducts.rows.map(p => p.pid));
     // Process in smaller batches
-    for (let i = 0; i < orderIds.length; i += 1000) {
-      const batchIds = orderIds.slice(i, i + 1000);
+    for (let i = 0; i < orderIds.length; i += 2000) { // Increased from 1000 to 2000
+      const batchIds = orderIds.slice(i, i + 2000);
       // Get combined data for this batch in sub-batches
-      const PG_BATCH_SIZE = 100; // Process 100 records at a time
+      const PG_BATCH_SIZE = 200; // Increased from 100 to 200
       for (let j = 0; j < batchIds.length; j += PG_BATCH_SIZE) {
         const subBatchIds = batchIds.slice(j, j + PG_BATCH_SIZE);
+        // Start a transaction for this sub-batch
+        await localConnection.beginTransaction();
+        try {
         const [orders] = await localConnection.query(`
           WITH order_totals AS (
             SELECT
@@ -442,6 +492,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
             LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
             LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
             LEFT JOIN temp_order_costs oc ON oi.order_id = oc.order_id AND oi.pid = oc.pid
+            WHERE oi.order_id = ANY($1)
             GROUP BY oi.order_id, oi.pid, ot.tax, oc.costeach
           )
           SELECT
@@ -451,13 +502,20 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
             om.date,
             oi.price,
             oi.quantity,
-            (oi.base_discount +
-            COALESCE(ot.promo_discount, 0) +
+            (
+              -- Part 1: Sale Savings for the Line
+              (oi.base_discount * oi.quantity)
+              +
+              -- Part 2: Prorated Points Discount (if applicable)
             CASE
-              WHEN om.summary_discount > 0 AND om.summary_subtotal > 0 THEN
-                ROUND((om.summary_discount * (oi.price * oi.quantity)) / NULLIF(om.summary_subtotal, 0), 2)
+              WHEN om.summary_discount_subtotal > 0 AND om.summary_subtotal > 0 THEN
+                COALESCE(ROUND((om.summary_discount_subtotal * (oi.price * oi.quantity)) / NULLIF(om.summary_subtotal, 0), 4), 0)
               ELSE 0
-            END)::NUMERIC(14, 4) as discount,
+            END
+              +
+              -- Part 3: Specific Promo Code Discount (if applicable)
+              COALESCE(ot.promo_discount, 0)
+            )::NUMERIC(14, 4) as discount,
             COALESCE(ot.total_tax, 0)::NUMERIC(14, 4) as tax,
             false as tax_included,
             0 as shipping,
@@ -466,15 +524,10 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
             om.status,
             om.canceled,
             COALESCE(ot.costeach, oi.price * 0.5)::NUMERIC(14, 4) as costeach
-          FROM (
-            SELECT DISTINCT ON (order_id, pid)
-              order_id, pid, sku, price, quantity, base_discount
-            FROM temp_order_items
-            WHERE order_id = ANY($1)
-            ORDER BY order_id, pid
-          ) oi
+          FROM temp_order_items oi
           JOIN temp_order_meta om ON oi.order_id = om.order_id
           LEFT JOIN order_totals ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
+          WHERE oi.order_id = ANY($1)
           ORDER BY oi.order_id, oi.pid
         `, [subBatchIds]);
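The rewritten discount expression is the sum of three parts: per-unit sale savings times quantity, the order-level points discount prorated by the line's share of the subtotal (now driven by summary_discount_subtotal), and any promo-code discount. A worked example with invented numbers, mirroring the SQL:

// price 10.00, qty 3, base_discount 1.00/unit, order points discount 5.00
// on a 100.00 subtotal, promo discount 2.00:
function lineDiscount({ price, quantity, baseDiscount, promoDiscount },
                      { summaryDiscountSubtotal, summarySubtotal }) {
  const saleSavings = baseDiscount * quantity;                        // Part 1: 3.00
  const prorated = summaryDiscountSubtotal > 0 && summarySubtotal > 0
    ? (summaryDiscountSubtotal * price * quantity) / summarySubtotal  // Part 2: 1.50
    : 0;
  return saleSavings + prorated + (promoDiscount || 0);               // Part 3: 2.00 -> total 6.50
}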
@@ -495,7 +548,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
         }
         // Process valid orders in smaller sub-batches
-        const FINAL_BATCH_SIZE = 50;
+        const FINAL_BATCH_SIZE = 100; // Increased from 50 to 100
         for (let k = 0; k < validOrders.length; k += FINAL_BATCH_SIZE) {
           const subBatch = validOrders.slice(k, k + FINAL_BATCH_SIZE);
@@ -558,6 +611,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
           importedCount += subBatch.length;
         }
+        await localConnection.commit();
         cumulativeProcessedOrders += processedOrders.size;
         outputProgress({
           status: "running",
@@ -569,9 +624,16 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
           remaining: estimateRemaining(startTime, cumulativeProcessedOrders, totalUniqueOrders),
           rate: calculateRate(startTime, cumulativeProcessedOrders)
         });
+        } catch (error) {
+          await localConnection.rollback();
+          throw error;
+        }
       }
     }
+    // Start a transaction for updating sync status and dropping temp tables
+    await localConnection.beginTransaction();
+    try {
     // Update sync status
     await localConnection.query(`
       INSERT INTO sync_status (table_name, last_sync_timestamp)
@@ -589,8 +651,12 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       DROP TABLE IF EXISTS temp_order_costs;
     `);
-    // Commit transaction
+    // Commit final transaction
     await localConnection.commit();
+    } catch (error) {
+      await localConnection.rollback();
+      throw error;
+    }
     return {
       status: "complete",
@@ -604,14 +670,6 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
     };
   } catch (error) {
     console.error("Error during orders import:", error);
-    // Rollback transaction
-    try {
-      await localConnection.rollback();
-    } catch (rollbackError) {
-      console.error("Error during rollback:", rollbackError);
-    }
     throw error;
   }
 }

View File

@@ -8,29 +8,7 @@ dotenv.config({ path: path.join(__dirname, "../../.env") });
 // Utility functions
 const imageUrlBase = process.env.PRODUCT_IMAGE_URL_BASE || 'https://sbing.com/i/products/0000/';
-// Modified to accept a db connection for querying product_images
-const getImageUrls = async (pid, prodConnection, iid = null) => {
-  // If iid isn't provided, try to get it from product_images
-  if (iid === null && prodConnection) {
-    try {
-      // Query for images with order=255 (default/primary images)
-      const [primaryImages] = await prodConnection.query(
-        'SELECT iid FROM product_images WHERE pid = ? AND `order` = 255 LIMIT 1',
-        [pid]
-      );
-      // Use the found iid or default to 1
-      iid = primaryImages.length > 0 ? primaryImages[0].iid : 1;
-    } catch (error) {
-      console.error(`Error fetching primary image for pid ${pid}:`, error);
-      iid = 1; // Fallback to default
-    }
-  } else {
-    // Use default if connection not provided
-    iid = iid || 1;
-  }
+const getImageUrls = (pid, iid = 1) => {
   const paddedPid = pid.toString().padStart(6, '0');
   // Use padded PID only for the first 3 digits
   const prefix = paddedPid.slice(0, 3);
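With primary_iid now fetched alongside each product row, the helper no longer needs a DB connection and reverts to a pure function. A sketch of the restored shape; the padding and prefix lines come from the diff, while the base handling and filename suffixes are assumptions:

const getImageUrls = (pid, iid = 1) => {
  const paddedPid = pid.toString().padStart(6, '0');
  // Use padded PID only for the first 3 digits
  const prefix = paddedPid.slice(0, 3);
  const base = `${imageUrlBase.replace('0000', prefix)}${pid}`; // assumed base handling
  return {
    image: `${base}-${iid}.jpg`,           // assumed naming
    image_175: `${base}-${iid}-175.jpg`,   // assumed naming
    image_full: `${base}-${iid}-full.jpg`, // assumed naming
  };
};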
@@ -120,6 +98,7 @@ async function setupTemporaryTables(connection) {
       baskets INTEGER,
       notifies INTEGER,
       date_last_sold TIMESTAMP WITH TIME ZONE,
+      primary_iid INTEGER,
       image TEXT,
       image_175 TEXT,
       image_full TEXT,
@@ -217,6 +196,7 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
         (SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies,
         (SELECT COALESCE(SUM(oi.qty_ordered), 0) FROM order_items oi WHERE oi.prod_pid = p.pid) AS total_sold,
         pls.date_sold as date_last_sold,
+        (SELECT iid FROM product_images WHERE pid = p.pid AND \`order\` = 255 LIMIT 1) AS primary_iid,
         GROUP_CONCAT(DISTINCT CASE
           WHEN pc.cat_id IS NOT NULL
           AND pc.type IN (10, 20, 11, 21, 12, 13)
@@ -255,15 +235,13 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
       const batch = prodData.slice(i, i + BATCH_SIZE);
       const placeholders = batch.map((_, idx) => {
-        const base = idx * 47; // 47 columns
-        return `(${Array.from({ length: 47 }, (_, i) => `$${base + i + 1}`).join(', ')})`;
+        const base = idx * 48; // 48 columns
+        return `(${Array.from({ length: 48 }, (_, i) => `$${base + i + 1}`).join(', ')})`;
       }).join(',');
-      // Process image URLs for the batch
-      const processedValues = [];
-      for (const row of batch) {
-        const imageUrls = await getImageUrls(row.pid, prodConnection);
-        processedValues.push([
+      const values = batch.flatMap(row => {
+        const imageUrls = getImageUrls(row.pid, row.primary_iid || 1);
+        return [
           row.pid,
           row.title,
           row.description,
@@ -306,15 +284,14 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
           row.baskets,
           row.notifies,
           validateDate(row.date_last_sold),
+          row.primary_iid,
           imageUrls.image,
           imageUrls.image_175,
           imageUrls.image_full,
           null,
           null
-        ]);
-      }
-      const values = processedValues.flat();
+        ];
+      });
       const [result] = await localConnection.query(`
         WITH inserted_products AS (
@@ -325,7 +302,7 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
           landing_cost_price, barcode, harmonized_tariff_code, updated_at, visible,
           managing_stock, replenishable, permalink, moq, uom, rating, reviews,
           weight, length, width, height, country_of_origin, location, total_sold,
-          baskets, notifies, date_last_sold, image, image_175, image_full, options, tags
+          baskets, notifies, date_last_sold, primary_iid, image, image_175, image_full, options, tags
         )
         VALUES ${placeholders}
         ON CONFLICT (pid) DO NOTHING
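The placeholder math has to track the column count exactly: with 48 columns, row 0 binds $1..$48, row 1 binds $49..$96, and the flattened values array must follow the same order. A self-contained sketch of the pattern:

// Generate "($1, $2, ...),($N+1, ...)" placeholder groups for a batch insert.
const makePlaceholders = (rowCount, cols) =>
  Array.from({ length: rowCount }, (_, idx) => {
    const base = idx * cols;
    return `(${Array.from({ length: cols }, (_, i) => `$${base + i + 1}`).join(', ')})`;
  }).join(',');

console.log(makePlaceholders(2, 3)); // ($1, $2, $3),($4, $5, $6)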
@@ -422,6 +399,7 @@ async function materializeCalculations(prodConnection, localConnection, incremen
         (SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies,
         (SELECT COALESCE(SUM(oi.qty_ordered), 0) FROM order_items oi WHERE oi.prod_pid = p.pid) AS total_sold,
         pls.date_sold as date_last_sold,
+        (SELECT iid FROM product_images WHERE pid = p.pid AND \`order\` = 255 LIMIT 1) AS primary_iid,
         GROUP_CONCAT(DISTINCT CASE
           WHEN pc.cat_id IS NOT NULL
           AND pc.type IN (10, 20, 11, 21, 12, 13)
@@ -448,9 +426,11 @@ async function materializeCalculations(prodConnection, localConnection, incremen
             pcp.date_deactive > ? OR
             pcp.date_active > ? OR
             pnb.date_updated > ?
+            -- Add condition for product_images changes if needed for incremental updates
+            -- OR EXISTS (SELECT 1 FROM product_images pi WHERE pi.pid = p.pid AND pi.stamp > ?)
       ` : 'TRUE'}
       GROUP BY p.pid
-    `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []);
+    `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime /*, lastSyncTime */] : []);
     outputProgress({
       status: "running",
@@ -464,15 +444,13 @@ async function materializeCalculations(prodConnection, localConnection, incremen
     await withRetry(async () => {
       const placeholders = batch.map((_, idx) => {
-        const base = idx * 47; // 47 columns
-        return `(${Array.from({ length: 47 }, (_, i) => `$${base + i + 1}`).join(', ')})`;
+        const base = idx * 48; // 48 columns
+        return `(${Array.from({ length: 48 }, (_, i) => `$${base + i + 1}`).join(', ')})`;
       }).join(',');
-      // Process image URLs for the batch
-      const processedValues = [];
-      for (const row of batch) {
-        const imageUrls = await getImageUrls(row.pid, prodConnection);
-        processedValues.push([
+      const values = batch.flatMap(row => {
+        const imageUrls = getImageUrls(row.pid, row.primary_iid || 1);
+        return [
           row.pid,
           row.title,
           row.description,
@@ -515,15 +493,14 @@ async function materializeCalculations(prodConnection, localConnection, incremen
           row.baskets,
           row.notifies,
           validateDate(row.date_last_sold),
+          row.primary_iid,
           imageUrls.image,
           imageUrls.image_175,
           imageUrls.image_full,
           null,
           null
-        ]);
-      }
-      const values = processedValues.flat();
+        ];
+      });
       await localConnection.query(`
         INSERT INTO temp_products (
@@ -533,7 +510,7 @@ async function materializeCalculations(prodConnection, localConnection, incremen
           landing_cost_price, barcode, harmonized_tariff_code, updated_at, visible,
           managing_stock, replenishable, permalink, moq, uom, rating, reviews,
           weight, length, width, height, country_of_origin, location, total_sold,
-          baskets, notifies, date_last_sold, image, image_175, image_full, options, tags
+          baskets, notifies, date_last_sold, primary_iid, image, image_175, image_full, options, tags
         ) VALUES ${placeholders}
         ON CONFLICT (pid) DO UPDATE SET
           title = EXCLUDED.title,
@@ -576,6 +553,7 @@ async function materializeCalculations(prodConnection, localConnection, incremen
           baskets = EXCLUDED.baskets,
           notifies = EXCLUDED.notifies,
           date_last_sold = EXCLUDED.date_last_sold,
+          primary_iid = EXCLUDED.primary_iid,
           image = EXCLUDED.image,
           image_175 = EXCLUDED.image_175,
           image_full = EXCLUDED.image_full,
@@ -674,6 +652,7 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
         t.baskets,
         t.notifies,
         t.date_last_sold,
+        t.primary_iid,
         t.image,
         t.image_175,
         t.image_full,
@@ -695,11 +674,9 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
         return `(${Array.from({ length: 47 }, (_, i) => `$${base + i + 1}`).join(', ')})`;
       }).join(',');
-      // Process image URLs for the batch
-      const processedValues = [];
-      for (const row of batch) {
-        const imageUrls = await getImageUrls(row.pid, prodConnection);
-        processedValues.push([
+      const values = batch.flatMap(row => {
+        const imageUrls = getImageUrls(row.pid, row.primary_iid || 1);
+        return [
           row.pid,
           row.title,
           row.description,
@@ -747,10 +724,8 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
           imageUrls.image_full,
           row.options,
           row.tags
-        ]);
-      }
-      const values = processedValues.flat();
+        ];
+      });
       const [result] = await localConnection.query(`
         WITH upserted AS (

View File

@@ -1,4 +1,4 @@
--- Description: Calculates and updates daily aggregated product data for the current day.
+-- Description: Calculates and updates daily aggregated product data for recent days.
 -- Uses UPSERT (INSERT ON CONFLICT UPDATE) for idempotency.
 -- Dependencies: Core import tables (products, orders, purchase_orders), calculate_status table.
 -- Frequency: Hourly (Run ~5-10 minutes after hourly data import completes).
@@ -8,48 +8,69 @@ DECLARE
 _module_name TEXT := 'daily_snapshots';
 _start_time TIMESTAMPTZ := clock_timestamp(); -- Time execution started
 _last_calc_time TIMESTAMPTZ;
-_target_date DATE := CURRENT_DATE; -- Always recalculate today for simplicity with hourly runs
+_target_date DATE; -- Will be set in the loop
 _total_records INT := 0;
 _has_orders BOOLEAN := FALSE;
+_process_days INT := 5; -- Number of days to check/process (today plus previous 4 days)
+_day_counter INT;
+_missing_days INT[] := ARRAY[]::INT[]; -- Array to store days with missing or incomplete data
 BEGIN
 -- Get the timestamp before the last successful run of this module
 SELECT last_calculation_timestamp INTO _last_calc_time
 FROM public.calculate_status
 WHERE module_name = _module_name;
-RAISE NOTICE 'Running % for date %. Start Time: %', _module_name, _target_date, _start_time;
+RAISE NOTICE 'Running % script. Start Time: %', _module_name, _start_time;
--- CRITICAL FIX: Check if we have any orders or receiving activity for today
--- to prevent creating artificial records when no real activity exists
-SELECT EXISTS (
-    SELECT 1 FROM public.orders WHERE date::date = _target_date
-    UNION
-    SELECT 1 FROM public.purchase_orders
-    WHERE date::date = _target_date
-    OR EXISTS (
-        SELECT 1 FROM jsonb_array_elements(receiving_history) AS rh
-        WHERE jsonb_typeof(receiving_history) = 'array'
-        AND (
-            (rh->>'date')::date = _target_date OR
-            (rh->>'received_at')::date = _target_date OR
-            (rh->>'receipt_date')::date = _target_date
-        )
-    )
-    LIMIT 1
-) INTO _has_orders;
--- If no orders or receiving activity found for today, log and exit
-IF NOT _has_orders THEN
-    RAISE NOTICE 'No orders or receiving activity found for % - skipping daily snapshot creation', _target_date;
-    -- Still update the calculate_status to prevent repeated attempts
+-- First, check which days need processing by comparing orders data with snapshot data
+FOR _day_counter IN 0..(_process_days-1) LOOP
+    _target_date := CURRENT_DATE - (_day_counter * INTERVAL '1 day');
+    -- Check if this date needs updating by comparing orders to snapshot data
+    -- If the date has orders but not enough snapshots, or if snapshots show zero sales but orders exist, it's incomplete
+    SELECT
+        CASE WHEN (
+            -- We have orders for this date but not enough snapshots, or snapshots with wrong total
+            (EXISTS (SELECT 1 FROM public.orders WHERE date::date = _target_date) AND
+            (
+                -- No snapshots exist for this date
+                NOT EXISTS (SELECT 1 FROM public.daily_product_snapshots WHERE snapshot_date = _target_date) OR
+                -- Or snapshots show zero sales but orders exist
+                (SELECT COALESCE(SUM(units_sold), 0) FROM public.daily_product_snapshots WHERE snapshot_date = _target_date) = 0 OR
+                -- Or the count of snapshot records is significantly less than distinct products in orders
+                (SELECT COUNT(*) FROM public.daily_product_snapshots WHERE snapshot_date = _target_date) <
+                (SELECT COUNT(DISTINCT pid) FROM public.orders WHERE date::date = _target_date) * 0.8
+            )
+        )
+        ) THEN TRUE ELSE FALSE END
+    INTO _has_orders;
+    IF _has_orders THEN
+        -- This day needs processing - add to our array
+        _missing_days := _missing_days || _day_counter;
+        RAISE NOTICE 'Day % needs updating (incomplete or missing data)', _target_date;
+    END IF;
+END LOOP;
+-- If no days need updating, exit early
+IF array_length(_missing_days, 1) IS NULL THEN
+    RAISE NOTICE 'No days need updating - all snapshot data appears complete';
+    -- Still update the calculate_status to record this run
     UPDATE public.calculate_status
     SET last_calculation_timestamp = _start_time
     WHERE module_name = _module_name;
-    RETURN; -- Exit without creating snapshots
+    RETURN;
 END IF;
+RAISE NOTICE 'Need to update % days with missing or incomplete data', array_length(_missing_days, 1);
+-- Process only the days that need updating
+FOREACH _day_counter IN ARRAY _missing_days LOOP
+    _target_date := CURRENT_DATE - (_day_counter * INTERVAL '1 day');
+    RAISE NOTICE 'Processing date: %', _target_date;
 -- IMPORTANT: First delete any existing data for this date to prevent duplication
 DELETE FROM public.daily_product_snapshots
 WHERE snapshot_date = _target_date;
@@ -72,11 +93,11 @@ BEGIN
     COALESCE(SUM(CASE WHEN o.quantity < 0 OR COALESCE(o.status, 'pending') = 'returned' THEN ABS(o.quantity) ELSE 0 END), 0) AS units_returned,
     COALESCE(SUM(CASE WHEN o.quantity < 0 OR COALESCE(o.status, 'pending') = 'returned' THEN o.price * ABS(o.quantity) ELSE 0 END), 0.00) AS returns_revenue
 FROM public.products p -- Start from products to include those with no orders today
-LEFT JOIN public.orders o
+JOIN public.orders o -- Changed to INNER JOIN to only process products with orders
     ON p.pid = o.pid
     AND o.date::date = _target_date -- Cast to date to ensure compatibility regardless of original type
 GROUP BY p.pid, p.sku
-HAVING COUNT(o.id) > 0 -- CRITICAL: Only include products with actual orders
+-- No HAVING clause here - we always want to include all orders
 ),
 ReceivingData AS (
     SELECT
@@ -150,6 +171,15 @@ BEGIN
     COALESCE(price, 0.00) as current_price,
     COALESCE(regular_price, 0.00) as current_regular_price
     FROM public.products
+),
+ProductsWithActivity AS (
+    -- Quick pre-filter to only process products with activity
+    SELECT DISTINCT pid
+    FROM (
+        SELECT pid FROM SalesData
+        UNION
+        SELECT pid FROM ReceivingData
+    ) a
 )
 -- Now insert records, but ONLY for products with actual activity
 INSERT INTO public.daily_product_snapshots (
@@ -200,19 +230,21 @@ BEGIN
     _start_time -- Timestamp of this calculation run
 FROM SalesData sd
 FULL OUTER JOIN ReceivingData rd ON sd.pid = rd.pid
+JOIN ProductsWithActivity pwa ON COALESCE(sd.pid, rd.pid) = pwa.pid
 LEFT JOIN public.products p ON COALESCE(sd.pid, rd.pid) = p.pid
 LEFT JOIN CurrentStock cs ON COALESCE(sd.pid, rd.pid) = cs.pid
 WHERE p.pid IS NOT NULL; -- Ensure we only insert for existing products
--- Get the total number of records inserted
+-- Get the total number of records inserted for this date
 GET DIAGNOSTICS _total_records = ROW_COUNT;
 RAISE NOTICE 'Created % daily snapshot records for % with sales/receiving activity', _total_records, _target_date;
+END LOOP;
 -- Update the status table with the timestamp from the START of this run
 UPDATE public.calculate_status
 SET last_calculation_timestamp = _start_time
 WHERE module_name = _module_name;
-RAISE NOTICE 'Finished % for date %. Duration: %', _module_name, _target_date, clock_timestamp() - _start_time;
+RAISE NOTICE 'Finished % processing for multiple dates. Duration: %', _module_name, clock_timestamp() - _start_time;
 END $$;
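A day is flagged incomplete when it has orders but no snapshots, snapshots totaling zero units sold, or fewer snapshot rows than 80% of the distinct products ordered that day. The same test can be run ad hoc for a single date; a sketch assuming node-postgres:

// Does this date need its snapshots rebuilt? Mirrors the three-part check
// in the DO block above, including the 0.8 coverage factor.
async function dayNeedsRebuild(client, targetDate) {
  const { rows } = await client.query(`
    SELECT (
      EXISTS (SELECT 1 FROM public.orders WHERE date::date = $1::date) AND (
        NOT EXISTS (SELECT 1 FROM public.daily_product_snapshots WHERE snapshot_date = $1::date)
        OR (SELECT COALESCE(SUM(units_sold), 0) FROM public.daily_product_snapshots WHERE snapshot_date = $1::date) = 0
        OR (SELECT COUNT(*) FROM public.daily_product_snapshots WHERE snapshot_date = $1::date) <
           (SELECT COUNT(DISTINCT pid) FROM public.orders WHERE date::date = $1::date) * 0.8
      )
    ) AS needs_rebuild
  `, [targetDate]);
  return rows[0].needs_rebuild;
}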

View File

@@ -13,6 +13,22 @@ const dbConfig = {
   port: process.env.DB_PORT || 5432
 };
+// Tables to always protect from being dropped
+const PROTECTED_TABLES = [
+  'users',
+  'permissions',
+  'user_permissions',
+  'calculate_history',
+  'import_history',
+  'ai_prompts',
+  'ai_validation_performance',
+  'templates',
+  'reusable_images',
+  'imported_daily_inventory',
+  'imported_product_stat_history',
+  'imported_product_current_prices'
+];
 // Helper function to output progress in JSON format
 function outputProgress(data) {
   if (!data.status) {
@@ -33,17 +49,6 @@ const CORE_TABLES = [
   'product_categories'
 ];
-// Config tables that must be created
-const CONFIG_TABLES = [
-  'stock_thresholds',
-  'lead_time_thresholds',
-  'sales_velocity_config',
-  'abc_classification_config',
-  'safety_stock_config',
-  'sales_seasonality',
-  'turnover_config'
-];
 // Split SQL into individual statements
 function splitSQLStatements(sql) {
   // First, normalize line endings
@@ -184,8 +189,8 @@ async function resetDatabase() {
       SELECT string_agg(tablename, ', ') as tables
       FROM pg_tables
       WHERE schemaname = 'public'
-      AND tablename NOT IN ('users', 'permissions', 'user_permissions', 'calculate_history', 'import_history', 'ai_prompts', 'ai_validation_performance', 'templates', 'reusable_images');
-    `);
+      AND tablename NOT IN (SELECT unnest($1::text[]));
+    `, [PROTECTED_TABLES]);
     if (!tablesResult.rows[0].tables) {
       outputProgress({
@@ -204,7 +209,7 @@ async function resetDatabase() {
     // Drop all tables except users
     const tables = tablesResult.rows[0].tables.split(', ');
     for (const table of tables) {
-      if (!['users', 'reusable_images'].includes(table)) {
+      if (!PROTECTED_TABLES.includes(table)) {
         await client.query(`DROP TABLE IF EXISTS "${table}" CASCADE`);
       }
     }
@@ -259,7 +264,9 @@ async function resetDatabase() {
         'category_metrics',
         'brand_metrics',
         'sales_forecasts',
-        'abc_classification'
+        'abc_classification',
+        'daily_snapshots',
+        'periodic_metrics'
       )
     `);
   }
@@ -301,6 +308,9 @@ async function resetDatabase() {
       }
     });
+    // Start a transaction for better error handling
+    await client.query('BEGIN');
+    try {
     for (let i = 0; i < statements.length; i++) {
       const stmt = statements[i];
       try {
@@ -336,7 +346,14 @@ async function resetDatabase() {
             rowCount: result.rowCount
           }
         });
+        // Commit in chunks of 10 statements to avoid long-running transactions
+        if (i > 0 && i % 10 === 0) {
+          await client.query('COMMIT');
+          await client.query('BEGIN');
+        }
       } catch (sqlError) {
+        await client.query('ROLLBACK');
         outputProgress({
           status: 'error',
           operation: 'SQL Error',
@@ -347,6 +364,12 @@ async function resetDatabase() {
         throw sqlError;
       }
     }
+    // Commit the final transaction
+    await client.query('COMMIT');
+    } catch (error) {
+      await client.query('ROLLBACK');
+      throw error;
+    }
     // Verify core tables were created
     const existingTables = (await client.query(`
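Committing every tenth statement bounds how much work a single transaction holds while still rolling back the failing chunk. The pattern repeats below for the config and metrics schemas; a sketch that factors it out:

// Execute statements with a COMMIT every `chunk` statements, so a failure
// discards at most the current chunk rather than the entire schema run.
async function runChunked(client, statements, chunk = 10) {
  await client.query('BEGIN');
  try {
    for (let i = 0; i < statements.length; i++) {
      await client.query(statements[i]);
      if (i > 0 && i % chunk === 0) {
        await client.query('COMMIT');
        await client.query('BEGIN');
      }
    }
    await client.query('COMMIT');
  } catch (error) {
    await client.query('ROLLBACK');
    throw error;
  }
}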
@@ -383,10 +406,24 @@ async function resetDatabase() {
       operation: 'Running config setup',
       message: 'Creating configuration tables...'
     });
-    const configSchemaSQL = fs.readFileSync(
-      path.join(__dirname, '../db/config-schema-new.sql'),
-      'utf8'
-    );
+    const configSchemaPath = path.join(__dirname, '../db/config-schema-new.sql');
+    // Verify file exists
+    if (!fs.existsSync(configSchemaPath)) {
+      throw new Error(`Config schema file not found at: ${configSchemaPath}`);
+    }
+    const configSchemaSQL = fs.readFileSync(configSchemaPath, 'utf8');
+    outputProgress({
+      operation: 'Config Schema file',
+      message: {
+        path: configSchemaPath,
+        exists: fs.existsSync(configSchemaPath),
+        size: fs.statSync(configSchemaPath).size,
+        firstFewLines: configSchemaSQL.split('\n').slice(0, 5).join('\n')
+      }
+    });
     // Execute config schema statements one at a time
     const configStatements = splitSQLStatements(configSchemaSQL);
@@ -401,6 +438,9 @@ async function resetDatabase() {
       }
     });
+    // Start a transaction for better error handling
+    await client.query('BEGIN');
+    try {
     for (let i = 0; i < configStatements.length; i++) {
       const stmt = configStatements[i];
       try {
@@ -415,7 +455,14 @@ async function resetDatabase() {
             rowCount: result.rowCount
           }
         });
+        // Commit in chunks of 10 statements to avoid long-running transactions
+        if (i > 0 && i % 10 === 0) {
+          await client.query('COMMIT');
+          await client.query('BEGIN');
+        }
       } catch (sqlError) {
+        await client.query('ROLLBACK');
         outputProgress({
           status: 'error',
           operation: 'Config SQL Error',
@@ -426,16 +473,36 @@ async function resetDatabase() {
         throw sqlError;
       }
     }
+    // Commit the final transaction
+    await client.query('COMMIT');
+    } catch (error) {
+      await client.query('ROLLBACK');
+      throw error;
+    }
     // Read and execute metrics schema (metrics tables)
     outputProgress({
       operation: 'Running metrics setup',
       message: 'Creating metrics tables...'
     });
-    const metricsSchemaSQL = fs.readFileSync(
-      path.join(__dirname, '../db/metrics-schema-new.sql'),
-      'utf8'
-    );
+    const metricsSchemaPath = path.join(__dirname, '../db/metrics-schema-new.sql');
+    // Verify file exists
+    if (!fs.existsSync(metricsSchemaPath)) {
+      throw new Error(`Metrics schema file not found at: ${metricsSchemaPath}`);
+    }
+    const metricsSchemaSQL = fs.readFileSync(metricsSchemaPath, 'utf8');
+    outputProgress({
+      operation: 'Metrics Schema file',
+      message: {
+        path: metricsSchemaPath,
+        exists: fs.existsSync(metricsSchemaPath),
+        size: fs.statSync(metricsSchemaPath).size,
+        firstFewLines: metricsSchemaSQL.split('\n').slice(0, 5).join('\n')
+      }
+    });
     // Execute metrics schema statements one at a time
     const metricsStatements = splitSQLStatements(metricsSchemaSQL);
@@ -450,6 +517,9 @@ async function resetDatabase() {
       }
     });
+    // Start a transaction for better error handling
+    await client.query('BEGIN');
+    try {
     for (let i = 0; i < metricsStatements.length; i++) {
       const stmt = metricsStatements[i];
       try {
@@ -464,7 +534,14 @@ async function resetDatabase() {
             rowCount: result.rowCount
           }
         });
+        // Commit in chunks of 10 statements to avoid long-running transactions
+        if (i > 0 && i % 10 === 0) {
+          await client.query('COMMIT');
+          await client.query('BEGIN');
+        }
       } catch (sqlError) {
+        await client.query('ROLLBACK');
         outputProgress({
           status: 'error',
           operation: 'Metrics SQL Error',
@@ -475,6 +552,12 @@ async function resetDatabase() {
         throw sqlError;
       }
     }
+    // Commit the final transaction
+    await client.query('COMMIT');
+    } catch (error) {
+      await client.query('ROLLBACK');
+      throw error;
+    }
     outputProgress({
       status: 'complete',
@@ -490,6 +573,14 @@ async function resetDatabase() {
     });
     process.exit(1);
   } finally {
+    // Make sure to re-enable foreign key checks if they were disabled
+    try {
+      await client.query('SET session_replication_role = \'origin\'');
+    } catch (e) {
+      console.error('Error re-enabling foreign key checks:', e.message);
+    }
+    // Close the database connection
     await client.end();
   }
 }

View File

@@ -31,7 +31,10 @@ const PROTECTED_TABLES = [
   'ai_prompts',
   'ai_validation_performance',
   'templates',
-  'reusable_images'
+  'reusable_images',
+  'imported_daily_inventory',
+  'imported_product_stat_history',
+  'imported_product_current_prices'
 ];
 // Split SQL into individual statements

View File

@@ -844,7 +844,7 @@ router.get('/status/table-counts', async (req, res) => {
       // Core tables
       'products', 'categories', 'product_categories', 'orders', 'purchase_orders',
       // New metrics tables
-      'product_metrics', 'daily_product_snapshots',
+      'product_metrics', 'daily_product_snapshots','brand_metrics','category_metrics','vendor_metrics',
       // Config tables
       'settings_global', 'settings_vendor', 'settings_product'
     ];
@@ -867,7 +867,7 @@ router.get('/status/table-counts', async (req, res) => {
     // Group tables by type
     const groupedCounts = {
       core: counts.filter(c => ['products', 'categories', 'product_categories', 'orders', 'purchase_orders'].includes(c.table_name)),
-      metrics: counts.filter(c => ['product_metrics', 'daily_product_snapshots'].includes(c.table_name)),
+      metrics: counts.filter(c => ['product_metrics', 'daily_product_snapshots','brand_metrics','category_metrics','vendor_metrics'].includes(c.table_name)),
       config: counts.filter(c => ['settings_global', 'settings_vendor', 'settings_product'].includes(c.table_name))
     };