diff --git a/inventory-server/db/config-schema-new.sql b/inventory-server/db/config-schema-new.sql index 2236716..32f5762 100644 --- a/inventory-server/db/config-schema-new.sql +++ b/inventory-server/db/config-schema-new.sql @@ -150,7 +150,7 @@ CREATE TABLE IF NOT EXISTS calculate_history ( ); CREATE TABLE IF NOT EXISTS calculate_status ( - module_name module_name PRIMARY KEY, + module_name text PRIMARY KEY, last_calculation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP ); diff --git a/inventory-server/db/metrics-schema-new.sql b/inventory-server/db/metrics-schema-new.sql index 239a77d..0321e6a 100644 --- a/inventory-server/db/metrics-schema-new.sql +++ b/inventory-server/db/metrics-schema-new.sql @@ -280,7 +280,7 @@ CREATE TABLE public.vendor_metrics ( lifetime_sales INT NOT NULL DEFAULT 0, lifetime_revenue NUMERIC(18, 4) NOT NULL DEFAULT 0.00, -- Calculated KPIs (Based on 30d aggregates) - avg_margin_30d NUMERIC(7, 3) -- (profit / revenue) * 100 + avg_margin_30d NUMERIC(14, 4) -- (profit / revenue) * 100 -- Add more KPIs if needed (e.g., avg product value, sell-through rate for vendor) ); CREATE INDEX idx_vendor_metrics_active_count ON public.vendor_metrics(active_product_count); diff --git a/inventory-server/scripts/metrics-new/backfill/backfill-snapshots.js b/inventory-server/old/backfill-snapshots.js similarity index 98% rename from inventory-server/scripts/metrics-new/backfill/backfill-snapshots.js rename to inventory-server/old/backfill-snapshots.js index 09df5c1..455f155 100644 --- a/inventory-server/scripts/metrics-new/backfill/backfill-snapshots.js +++ b/inventory-server/old/backfill-snapshots.js @@ -1,7 +1,7 @@ const path = require('path'); const fs = require('fs'); -const progress = require('../utils/progress'); // Assuming progress utils are here -const { getConnection, closePool } = require('../utils/db'); // Assuming db utils are here +const progress = require('../scripts/metrics-new/utils/progress'); // Assuming progress utils are here +const { getConnection, closePool } = require('../scripts/metrics-new/utils/db'); // Assuming db utils are here const os = require('os'); // For detecting number of CPU cores // --- Configuration --- diff --git a/inventory-server/scripts/metrics-new/backfill/backfill_historical_snapshots.sql b/inventory-server/old/backfill_historical_snapshots.sql similarity index 100% rename from inventory-server/scripts/metrics-new/backfill/backfill_historical_snapshots.sql rename to inventory-server/old/backfill_historical_snapshots.sql diff --git a/inventory-server/scripts/import/orders.js b/inventory-server/scripts/import/orders.js index ad4439e..90f1f54 100644 --- a/inventory-server/scripts/import/orders.js +++ b/inventory-server/scripts/import/orders.js @@ -26,10 +26,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = let cumulativeProcessedOrders = 0; try { - // Begin transaction - await localConnection.beginTransaction(); - - // Get last sync info + // Get last sync info - NOT in a transaction anymore const [syncInfo] = await localConnection.query( "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'" ); @@ -43,8 +40,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = FROM order_items oi JOIN _order o ON oi.order_id = o.order_id WHERE o.order_status >= 15 - AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? 
'1' : '5'} YEAR) - AND o.date_placed_onlydate IS NOT NULL + AND o.date_placed >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) + AND o.date_placed IS NOT NULL ${incrementalUpdate ? ` AND ( o.stamp > ? @@ -82,8 +79,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = FROM order_items oi JOIN _order o ON oi.order_id = o.order_id WHERE o.order_status >= 15 - AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) - AND o.date_placed_onlydate IS NOT NULL + AND o.date_placed >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) + AND o.date_placed IS NOT NULL ${incrementalUpdate ? ` AND ( o.stamp > ? @@ -107,91 +104,111 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = console.log('Orders: Found', orderItems.length, 'order items to process'); // Create tables in PostgreSQL for data processing - await localConnection.query(` - DROP TABLE IF EXISTS temp_order_items; - DROP TABLE IF EXISTS temp_order_meta; - DROP TABLE IF EXISTS temp_order_discounts; - DROP TABLE IF EXISTS temp_order_taxes; - DROP TABLE IF EXISTS temp_order_costs; - - CREATE TEMP TABLE temp_order_items ( - order_id INTEGER NOT NULL, - pid INTEGER NOT NULL, - sku TEXT NOT NULL, - price NUMERIC(14, 4) NOT NULL, - quantity INTEGER NOT NULL, - base_discount NUMERIC(14, 4) DEFAULT 0, - PRIMARY KEY (order_id, pid) - ); - - CREATE TEMP TABLE temp_order_meta ( - order_id INTEGER NOT NULL, - date TIMESTAMP WITH TIME ZONE NOT NULL, - customer TEXT NOT NULL, - customer_name TEXT NOT NULL, - status TEXT, - canceled BOOLEAN, - summary_discount NUMERIC(14, 4) DEFAULT 0.0000, - summary_subtotal NUMERIC(14, 4) DEFAULT 0.0000, - PRIMARY KEY (order_id) - ); - - CREATE TEMP TABLE temp_order_discounts ( - order_id INTEGER NOT NULL, - pid INTEGER NOT NULL, - discount NUMERIC(14, 4) NOT NULL, - PRIMARY KEY (order_id, pid) - ); - - CREATE TEMP TABLE temp_order_taxes ( - order_id INTEGER NOT NULL, - pid INTEGER NOT NULL, - tax NUMERIC(14, 4) NOT NULL, - PRIMARY KEY (order_id, pid) - ); - - CREATE TEMP TABLE temp_order_costs ( - order_id INTEGER NOT NULL, - pid INTEGER NOT NULL, - costeach NUMERIC(14, 4) DEFAULT 0.0000, - PRIMARY KEY (order_id, pid) - ); - - CREATE INDEX idx_temp_order_items_pid ON temp_order_items(pid); - CREATE INDEX idx_temp_order_meta_order_id ON temp_order_meta(order_id); - `); - - // Insert order items in batches - for (let i = 0; i < orderItems.length; i += 5000) { - const batch = orderItems.slice(i, Math.min(i + 5000, orderItems.length)); - const placeholders = batch.map((_, idx) => - `($${idx * 6 + 1}, $${idx * 6 + 2}, $${idx * 6 + 3}, $${idx * 6 + 4}, $${idx * 6 + 5}, $${idx * 6 + 6})` - ).join(","); - const values = batch.flatMap(item => [ - item.order_id, item.prod_pid, item.SKU, item.price, item.quantity, item.base_discount - ]); - + // Start a transaction just for creating the temp tables + await localConnection.beginTransaction(); + try { await localConnection.query(` - INSERT INTO temp_order_items (order_id, pid, sku, price, quantity, base_discount) - VALUES ${placeholders} - ON CONFLICT (order_id, pid) DO UPDATE SET - sku = EXCLUDED.sku, - price = EXCLUDED.price, - quantity = EXCLUDED.quantity, - base_discount = EXCLUDED.base_discount - `, values); + DROP TABLE IF EXISTS temp_order_items; + DROP TABLE IF EXISTS temp_order_meta; + DROP TABLE IF EXISTS temp_order_discounts; + DROP TABLE IF EXISTS temp_order_taxes; + DROP TABLE IF EXISTS temp_order_costs; - 
processedCount = i + batch.length; - outputProgress({ - status: "running", - operation: "Orders import", - message: `Loading order items: ${processedCount} of ${totalOrderItems}`, - current: processedCount, - total: totalOrderItems, - elapsed: formatElapsedTime((Date.now() - startTime) / 1000), - remaining: estimateRemaining(startTime, processedCount, totalOrderItems), - rate: calculateRate(startTime, processedCount) - }); + CREATE TEMP TABLE temp_order_items ( + order_id INTEGER NOT NULL, + pid INTEGER NOT NULL, + sku TEXT NOT NULL, + price NUMERIC(14, 4) NOT NULL, + quantity INTEGER NOT NULL, + base_discount NUMERIC(14, 4) DEFAULT 0, + PRIMARY KEY (order_id, pid) + ); + + CREATE TEMP TABLE temp_order_meta ( + order_id INTEGER NOT NULL, + date TIMESTAMP WITH TIME ZONE NOT NULL, + customer TEXT NOT NULL, + customer_name TEXT NOT NULL, + status TEXT, + canceled BOOLEAN, + summary_discount NUMERIC(14, 4) DEFAULT 0.0000, + summary_subtotal NUMERIC(14, 4) DEFAULT 0.0000, + summary_discount_subtotal NUMERIC(14, 4) DEFAULT 0.0000, + PRIMARY KEY (order_id) + ); + + CREATE TEMP TABLE temp_order_discounts ( + order_id INTEGER NOT NULL, + pid INTEGER NOT NULL, + discount NUMERIC(14, 4) NOT NULL, + PRIMARY KEY (order_id, pid) + ); + + CREATE TEMP TABLE temp_order_taxes ( + order_id INTEGER NOT NULL, + pid INTEGER NOT NULL, + tax NUMERIC(14, 4) NOT NULL, + PRIMARY KEY (order_id, pid) + ); + + CREATE TEMP TABLE temp_order_costs ( + order_id INTEGER NOT NULL, + pid INTEGER NOT NULL, + costeach NUMERIC(14, 4) DEFAULT 0.0000, + PRIMARY KEY (order_id, pid) + ); + + CREATE INDEX idx_temp_order_items_pid ON temp_order_items(pid); + CREATE INDEX idx_temp_order_meta_order_id ON temp_order_meta(order_id); + CREATE INDEX idx_temp_order_discounts_order_pid ON temp_order_discounts(order_id, pid); + CREATE INDEX idx_temp_order_taxes_order_pid ON temp_order_taxes(order_id, pid); + CREATE INDEX idx_temp_order_costs_order_pid ON temp_order_costs(order_id, pid); + `); + await localConnection.commit(); + } catch (error) { + await localConnection.rollback(); + throw error; + } + + // Insert order items in batches - each batch gets its own transaction + for (let i = 0; i < orderItems.length; i += 5000) { + await localConnection.beginTransaction(); + try { + const batch = orderItems.slice(i, Math.min(i + 5000, orderItems.length)); + const placeholders = batch.map((_, idx) => + `($${idx * 6 + 1}, $${idx * 6 + 2}, $${idx * 6 + 3}, $${idx * 6 + 4}, $${idx * 6 + 5}, $${idx * 6 + 6})` + ).join(","); + const values = batch.flatMap(item => [ + item.order_id, item.prod_pid, item.SKU, item.price, item.quantity, item.base_discount + ]); + + await localConnection.query(` + INSERT INTO temp_order_items (order_id, pid, sku, price, quantity, base_discount) + VALUES ${placeholders} + ON CONFLICT (order_id, pid) DO UPDATE SET + sku = EXCLUDED.sku, + price = EXCLUDED.price, + quantity = EXCLUDED.quantity, + base_discount = EXCLUDED.base_discount + `, values); + + await localConnection.commit(); + + processedCount = i + batch.length; + outputProgress({ + status: "running", + operation: "Orders import", + message: `Loading order items: ${processedCount} of ${totalOrderItems}`, + current: processedCount, + total: totalOrderItems, + elapsed: formatElapsedTime((Date.now() - startTime) / 1000), + remaining: estimateRemaining(startTime, processedCount, totalOrderItems), + rate: calculateRate(startTime, processedCount) + }); + } catch (error) { + await localConnection.rollback(); + throw error; + } } // Get unique order IDs @@ -218,53 +235,63 @@ 
async function importOrders(prodConnection, localConnection, incrementalUpdate = const [orders] = await prodConnection.query(` SELECT o.order_id, - o.date_placed_onlydate as date, + o.date_placed as date, o.order_cid as customer, CONCAT(COALESCE(u.firstname, ''), ' ', COALESCE(u.lastname, '')) as customer_name, o.order_status as status, CASE WHEN o.date_cancelled != '0000-00-00 00:00:00' THEN 1 ELSE 0 END as canceled, o.summary_discount, - o.summary_subtotal + o.summary_subtotal, + o.summary_discount_subtotal FROM _order o LEFT JOIN users u ON o.order_cid = u.cid WHERE o.order_id IN (?) `, [batchIds]); // Process in sub-batches for PostgreSQL - for (let j = 0; j < orders.length; j += PG_BATCH_SIZE) { - const subBatch = orders.slice(j, j + PG_BATCH_SIZE); - if (subBatch.length === 0) continue; + await localConnection.beginTransaction(); + try { + for (let j = 0; j < orders.length; j += PG_BATCH_SIZE) { + const subBatch = orders.slice(j, j + PG_BATCH_SIZE); + if (subBatch.length === 0) continue; - const placeholders = subBatch.map((_, idx) => - `($${idx * 8 + 1}, $${idx * 8 + 2}, $${idx * 8 + 3}, $${idx * 8 + 4}, $${idx * 8 + 5}, $${idx * 8 + 6}, $${idx * 8 + 7}, $${idx * 8 + 8})` - ).join(","); - - const values = subBatch.flatMap(order => [ - order.order_id, - new Date(order.date), // Convert to TIMESTAMP WITH TIME ZONE - order.customer, - toTitleCase(order.customer_name) || '', - order.status.toString(), // Convert status to TEXT - order.canceled, - order.summary_discount || 0, - order.summary_subtotal || 0 - ]); + const placeholders = subBatch.map((_, idx) => + `($${idx * 9 + 1}, $${idx * 9 + 2}, $${idx * 9 + 3}, $${idx * 9 + 4}, $${idx * 9 + 5}, $${idx * 9 + 6}, $${idx * 9 + 7}, $${idx * 9 + 8}, $${idx * 9 + 9})` + ).join(","); + + const values = subBatch.flatMap(order => [ + order.order_id, + new Date(order.date), // Convert to TIMESTAMP WITH TIME ZONE + order.customer, + toTitleCase(order.customer_name) || '', + order.status.toString(), // Convert status to TEXT + order.canceled, + order.summary_discount || 0, + order.summary_subtotal || 0, + order.summary_discount_subtotal || 0 + ]); - await localConnection.query(` - INSERT INTO temp_order_meta ( - order_id, date, customer, customer_name, status, canceled, - summary_discount, summary_subtotal - ) - VALUES ${placeholders} - ON CONFLICT (order_id) DO UPDATE SET - date = EXCLUDED.date, - customer = EXCLUDED.customer, - customer_name = EXCLUDED.customer_name, - status = EXCLUDED.status, - canceled = EXCLUDED.canceled, - summary_discount = EXCLUDED.summary_discount, - summary_subtotal = EXCLUDED.summary_subtotal - `, values); + await localConnection.query(` + INSERT INTO temp_order_meta ( + order_id, date, customer, customer_name, status, canceled, + summary_discount, summary_subtotal, summary_discount_subtotal + ) + VALUES ${placeholders} + ON CONFLICT (order_id) DO UPDATE SET + date = EXCLUDED.date, + customer = EXCLUDED.customer, + customer_name = EXCLUDED.customer_name, + status = EXCLUDED.status, + canceled = EXCLUDED.canceled, + summary_discount = EXCLUDED.summary_discount, + summary_subtotal = EXCLUDED.summary_subtotal, + summary_discount_subtotal = EXCLUDED.summary_discount_subtotal + `, values); + } + await localConnection.commit(); + } catch (error) { + await localConnection.rollback(); + throw error; } }; @@ -278,26 +305,33 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = if (discounts.length === 0) return; - for (let j = 0; j < discounts.length; j += PG_BATCH_SIZE) { - const subBatch = 
discounts.slice(j, j + PG_BATCH_SIZE); - if (subBatch.length === 0) continue; + await localConnection.beginTransaction(); + try { + for (let j = 0; j < discounts.length; j += PG_BATCH_SIZE) { + const subBatch = discounts.slice(j, j + PG_BATCH_SIZE); + if (subBatch.length === 0) continue; - const placeholders = subBatch.map((_, idx) => - `($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})` - ).join(","); - - const values = subBatch.flatMap(d => [ - d.order_id, - d.pid, - d.discount || 0 - ]); + const placeholders = subBatch.map((_, idx) => + `($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})` + ).join(","); + + const values = subBatch.flatMap(d => [ + d.order_id, + d.pid, + d.discount || 0 + ]); - await localConnection.query(` - INSERT INTO temp_order_discounts (order_id, pid, discount) - VALUES ${placeholders} - ON CONFLICT (order_id, pid) DO UPDATE SET - discount = EXCLUDED.discount - `, values); + await localConnection.query(` + INSERT INTO temp_order_discounts (order_id, pid, discount) + VALUES ${placeholders} + ON CONFLICT (order_id, pid) DO UPDATE SET + discount = EXCLUDED.discount + `, values); + } + await localConnection.commit(); + } catch (error) { + await localConnection.rollback(); + throw error; } }; @@ -318,26 +352,33 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = if (taxes.length === 0) return; - for (let j = 0; j < taxes.length; j += PG_BATCH_SIZE) { - const subBatch = taxes.slice(j, j + PG_BATCH_SIZE); - if (subBatch.length === 0) continue; + await localConnection.beginTransaction(); + try { + for (let j = 0; j < taxes.length; j += PG_BATCH_SIZE) { + const subBatch = taxes.slice(j, j + PG_BATCH_SIZE); + if (subBatch.length === 0) continue; - const placeholders = subBatch.map((_, idx) => - `($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})` - ).join(","); - - const values = subBatch.flatMap(t => [ - t.order_id, - t.pid, - t.tax || 0 - ]); + const placeholders = subBatch.map((_, idx) => + `($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})` + ).join(","); + + const values = subBatch.flatMap(t => [ + t.order_id, + t.pid, + t.tax || 0 + ]); - await localConnection.query(` - INSERT INTO temp_order_taxes (order_id, pid, tax) - VALUES ${placeholders} - ON CONFLICT (order_id, pid) DO UPDATE SET - tax = EXCLUDED.tax - `, values); + await localConnection.query(` + INSERT INTO temp_order_taxes (order_id, pid, tax) + VALUES ${placeholders} + ON CONFLICT (order_id, pid) DO UPDATE SET + tax = EXCLUDED.tax + `, values); + } + await localConnection.commit(); + } catch (error) { + await localConnection.rollback(); + throw error; } }; @@ -363,39 +404,45 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = if (costs.length === 0) return; - for (let j = 0; j < costs.length; j += PG_BATCH_SIZE) { - const subBatch = costs.slice(j, j + PG_BATCH_SIZE); - if (subBatch.length === 0) continue; + await localConnection.beginTransaction(); + try { + for (let j = 0; j < costs.length; j += PG_BATCH_SIZE) { + const subBatch = costs.slice(j, j + PG_BATCH_SIZE); + if (subBatch.length === 0) continue; - const placeholders = subBatch.map((_, idx) => - `($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})` - ).join(","); - - const values = subBatch.flatMap(c => [ - c.order_id, - c.pid, - c.costeach || 0 - ]); + const placeholders = subBatch.map((_, idx) => + `($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})` + ).join(","); + + const values = subBatch.flatMap(c => [ + c.order_id, + c.pid, + c.costeach || 0 + ]); - await 
localConnection.query(` - INSERT INTO temp_order_costs (order_id, pid, costeach) - VALUES ${placeholders} - ON CONFLICT (order_id, pid) DO UPDATE SET - costeach = EXCLUDED.costeach - `, values); + await localConnection.query(` + INSERT INTO temp_order_costs (order_id, pid, costeach) + VALUES ${placeholders} + ON CONFLICT (order_id, pid) DO UPDATE SET + costeach = EXCLUDED.costeach + `, values); + } + await localConnection.commit(); + } catch (error) { + await localConnection.rollback(); + throw error; } }; - // Process all data types in parallel for each batch + // Process all data types SEQUENTIALLY for each batch - not in parallel for (let i = 0; i < orderIds.length; i += METADATA_BATCH_SIZE) { const batchIds = orderIds.slice(i, i + METADATA_BATCH_SIZE); - await Promise.all([ - processMetadataBatch(batchIds), - processDiscountsBatch(batchIds), - processTaxesBatch(batchIds), - processCostsBatch(batchIds) - ]); + // Run these sequentially instead of in parallel to avoid transaction conflicts + await processMetadataBatch(batchIds); + await processDiscountsBatch(batchIds); + await processTaxesBatch(batchIds); + await processCostsBatch(batchIds); processedCount = i + batchIds.length; outputProgress({ @@ -422,175 +469,194 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = const existingPids = new Set(existingProducts.rows.map(p => p.pid)); // Process in smaller batches - for (let i = 0; i < orderIds.length; i += 1000) { - const batchIds = orderIds.slice(i, i + 1000); + for (let i = 0; i < orderIds.length; i += 2000) { // Increased from 1000 to 2000 + const batchIds = orderIds.slice(i, i + 2000); // Get combined data for this batch in sub-batches - const PG_BATCH_SIZE = 100; // Process 100 records at a time + const PG_BATCH_SIZE = 200; // Increased from 100 to 200 for (let j = 0; j < batchIds.length; j += PG_BATCH_SIZE) { const subBatchIds = batchIds.slice(j, j + PG_BATCH_SIZE); - const [orders] = await localConnection.query(` - WITH order_totals AS ( - SELECT - oi.order_id, - oi.pid, - SUM(COALESCE(od.discount, 0)) as promo_discount, - COALESCE(ot.tax, 0) as total_tax, - COALESCE(oc.costeach, oi.price * 0.5) as costeach - FROM temp_order_items oi - LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid - LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid - LEFT JOIN temp_order_costs oc ON oi.order_id = oc.order_id AND oi.pid = oc.pid - GROUP BY oi.order_id, oi.pid, ot.tax, oc.costeach - ) - SELECT - oi.order_id as order_number, - oi.pid::bigint as pid, - oi.sku, - om.date, - oi.price, - oi.quantity, - (oi.base_discount + - COALESCE(ot.promo_discount, 0) + - CASE - WHEN om.summary_discount > 0 AND om.summary_subtotal > 0 THEN - ROUND((om.summary_discount * (oi.price * oi.quantity)) / NULLIF(om.summary_subtotal, 0), 2) - ELSE 0 - END)::NUMERIC(14, 4) as discount, - COALESCE(ot.total_tax, 0)::NUMERIC(14, 4) as tax, - false as tax_included, - 0 as shipping, - om.customer, - om.customer_name, - om.status, - om.canceled, - COALESCE(ot.costeach, oi.price * 0.5)::NUMERIC(14, 4) as costeach - FROM ( - SELECT DISTINCT ON (order_id, pid) - order_id, pid, sku, price, quantity, base_discount - FROM temp_order_items - WHERE order_id = ANY($1) - ORDER BY order_id, pid - ) oi - JOIN temp_order_meta om ON oi.order_id = om.order_id - LEFT JOIN order_totals ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid - ORDER BY oi.order_id, oi.pid - `, [subBatchIds]); - - // Filter orders and track missing products - const 
validOrders = []; - const processedOrderItems = new Set(); - const processedOrders = new Set(); - - for (const order of orders.rows) { - if (!existingPids.has(order.pid)) { - missingProducts.add(order.pid); - skippedOrders.add(order.order_number); - continue; - } - validOrders.push(order); - processedOrderItems.add(`${order.order_number}-${order.pid}`); - processedOrders.add(order.order_number); - } - - // Process valid orders in smaller sub-batches - const FINAL_BATCH_SIZE = 50; - for (let k = 0; k < validOrders.length; k += FINAL_BATCH_SIZE) { - const subBatch = validOrders.slice(k, k + FINAL_BATCH_SIZE); - - const placeholders = subBatch.map((_, idx) => { - const base = idx * 15; // 15 columns including costeach - return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`; - }).join(','); - - const batchValues = subBatch.flatMap(o => [ - o.order_number, - o.pid, - o.sku || 'NO-SKU', - o.date, // This is now a TIMESTAMP WITH TIME ZONE - o.price, - o.quantity, - o.discount, - o.tax, - o.tax_included, - o.shipping, - o.customer, - o.customer_name, - o.status.toString(), // Convert status to TEXT - o.canceled, - o.costeach - ]); - - const [result] = await localConnection.query(` - WITH inserted_orders AS ( - INSERT INTO orders ( - order_number, pid, sku, date, price, quantity, discount, - tax, tax_included, shipping, customer, customer_name, - status, canceled, costeach - ) - VALUES ${placeholders} - ON CONFLICT (order_number, pid) DO UPDATE SET - sku = EXCLUDED.sku, - date = EXCLUDED.date, - price = EXCLUDED.price, - quantity = EXCLUDED.quantity, - discount = EXCLUDED.discount, - tax = EXCLUDED.tax, - tax_included = EXCLUDED.tax_included, - shipping = EXCLUDED.shipping, - customer = EXCLUDED.customer, - customer_name = EXCLUDED.customer_name, - status = EXCLUDED.status, - canceled = EXCLUDED.canceled, - costeach = EXCLUDED.costeach - RETURNING xmax = 0 as inserted + // Start a transaction for this sub-batch + await localConnection.beginTransaction(); + try { + const [orders] = await localConnection.query(` + WITH order_totals AS ( + SELECT + oi.order_id, + oi.pid, + SUM(COALESCE(od.discount, 0)) as promo_discount, + COALESCE(ot.tax, 0) as total_tax, + COALESCE(oc.costeach, oi.price * 0.5) as costeach + FROM temp_order_items oi + LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid + LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid + LEFT JOIN temp_order_costs oc ON oi.order_id = oc.order_id AND oi.pid = oc.pid + WHERE oi.order_id = ANY($1) + GROUP BY oi.order_id, oi.pid, ot.tax, oc.costeach ) SELECT - COUNT(*) FILTER (WHERE inserted) as inserted, - COUNT(*) FILTER (WHERE NOT inserted) as updated - FROM inserted_orders - `, batchValues); - - const { inserted, updated } = result.rows[0]; - recordsAdded += parseInt(inserted) || 0; - recordsUpdated += parseInt(updated) || 0; - importedCount += subBatch.length; - } + oi.order_id as order_number, + oi.pid::bigint as pid, + oi.sku, + om.date, + oi.price, + oi.quantity, + ( + -- Part 1: Sale Savings for the Line + (oi.base_discount * oi.quantity) + + + -- Part 2: Prorated Points Discount (if applicable) + CASE + WHEN om.summary_discount_subtotal > 0 AND om.summary_subtotal > 0 THEN + COALESCE(ROUND((om.summary_discount_subtotal * (oi.price * oi.quantity)) / NULLIF(om.summary_subtotal, 0), 4), 0) + ELSE 0 + END + + + -- Part 
3: Specific Promo Code Discount (if applicable) + COALESCE(ot.promo_discount, 0) + )::NUMERIC(14, 4) as discount, + COALESCE(ot.total_tax, 0)::NUMERIC(14, 4) as tax, + false as tax_included, + 0 as shipping, + om.customer, + om.customer_name, + om.status, + om.canceled, + COALESCE(ot.costeach, oi.price * 0.5)::NUMERIC(14, 4) as costeach + FROM temp_order_items oi + JOIN temp_order_meta om ON oi.order_id = om.order_id + LEFT JOIN order_totals ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid + WHERE oi.order_id = ANY($1) + ORDER BY oi.order_id, oi.pid + `, [subBatchIds]); - cumulativeProcessedOrders += processedOrders.size; - outputProgress({ - status: "running", - operation: "Orders import", - message: `Importing orders: ${cumulativeProcessedOrders} of ${totalUniqueOrders}`, - current: cumulativeProcessedOrders, - total: totalUniqueOrders, - elapsed: formatElapsedTime((Date.now() - startTime) / 1000), - remaining: estimateRemaining(startTime, cumulativeProcessedOrders, totalUniqueOrders), - rate: calculateRate(startTime, cumulativeProcessedOrders) - }); + // Filter orders and track missing products + const validOrders = []; + const processedOrderItems = new Set(); + const processedOrders = new Set(); + + for (const order of orders.rows) { + if (!existingPids.has(order.pid)) { + missingProducts.add(order.pid); + skippedOrders.add(order.order_number); + continue; + } + validOrders.push(order); + processedOrderItems.add(`${order.order_number}-${order.pid}`); + processedOrders.add(order.order_number); + } + + // Process valid orders in smaller sub-batches + const FINAL_BATCH_SIZE = 100; // Increased from 50 to 100 + for (let k = 0; k < validOrders.length; k += FINAL_BATCH_SIZE) { + const subBatch = validOrders.slice(k, k + FINAL_BATCH_SIZE); + + const placeholders = subBatch.map((_, idx) => { + const base = idx * 15; // 15 columns including costeach + return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`; + }).join(','); + + const batchValues = subBatch.flatMap(o => [ + o.order_number, + o.pid, + o.sku || 'NO-SKU', + o.date, // This is now a TIMESTAMP WITH TIME ZONE + o.price, + o.quantity, + o.discount, + o.tax, + o.tax_included, + o.shipping, + o.customer, + o.customer_name, + o.status.toString(), // Convert status to TEXT + o.canceled, + o.costeach + ]); + + const [result] = await localConnection.query(` + WITH inserted_orders AS ( + INSERT INTO orders ( + order_number, pid, sku, date, price, quantity, discount, + tax, tax_included, shipping, customer, customer_name, + status, canceled, costeach + ) + VALUES ${placeholders} + ON CONFLICT (order_number, pid) DO UPDATE SET + sku = EXCLUDED.sku, + date = EXCLUDED.date, + price = EXCLUDED.price, + quantity = EXCLUDED.quantity, + discount = EXCLUDED.discount, + tax = EXCLUDED.tax, + tax_included = EXCLUDED.tax_included, + shipping = EXCLUDED.shipping, + customer = EXCLUDED.customer, + customer_name = EXCLUDED.customer_name, + status = EXCLUDED.status, + canceled = EXCLUDED.canceled, + costeach = EXCLUDED.costeach + RETURNING xmax = 0 as inserted + ) + SELECT + COUNT(*) FILTER (WHERE inserted) as inserted, + COUNT(*) FILTER (WHERE NOT inserted) as updated + FROM inserted_orders + `, batchValues); + + const { inserted, updated } = result.rows[0]; + recordsAdded += parseInt(inserted) || 0; + recordsUpdated += parseInt(updated) || 0; + importedCount += subBatch.length; + } + + 
await localConnection.commit(); + + cumulativeProcessedOrders += processedOrders.size; + outputProgress({ + status: "running", + operation: "Orders import", + message: `Importing orders: ${cumulativeProcessedOrders} of ${totalUniqueOrders}`, + current: cumulativeProcessedOrders, + total: totalUniqueOrders, + elapsed: formatElapsedTime((Date.now() - startTime) / 1000), + remaining: estimateRemaining(startTime, cumulativeProcessedOrders, totalUniqueOrders), + rate: calculateRate(startTime, cumulativeProcessedOrders) + }); + } catch (error) { + await localConnection.rollback(); + throw error; + } } } - // Update sync status - await localConnection.query(` - INSERT INTO sync_status (table_name, last_sync_timestamp) - VALUES ('orders', NOW()) - ON CONFLICT (table_name) DO UPDATE SET - last_sync_timestamp = NOW() - `); - - // Cleanup temporary tables - await localConnection.query(` - DROP TABLE IF EXISTS temp_order_items; - DROP TABLE IF EXISTS temp_order_meta; - DROP TABLE IF EXISTS temp_order_discounts; - DROP TABLE IF EXISTS temp_order_taxes; - DROP TABLE IF EXISTS temp_order_costs; - `); - - // Commit transaction - await localConnection.commit(); + // Start a transaction for updating sync status and dropping temp tables + await localConnection.beginTransaction(); + try { + // Update sync status + await localConnection.query(` + INSERT INTO sync_status (table_name, last_sync_timestamp) + VALUES ('orders', NOW()) + ON CONFLICT (table_name) DO UPDATE SET + last_sync_timestamp = NOW() + `); + + // Cleanup temporary tables + await localConnection.query(` + DROP TABLE IF EXISTS temp_order_items; + DROP TABLE IF EXISTS temp_order_meta; + DROP TABLE IF EXISTS temp_order_discounts; + DROP TABLE IF EXISTS temp_order_taxes; + DROP TABLE IF EXISTS temp_order_costs; + `); + + // Commit final transaction + await localConnection.commit(); + } catch (error) { + await localConnection.rollback(); + throw error; + } return { status: "complete", @@ -604,16 +670,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = }; } catch (error) { console.error("Error during orders import:", error); - - // Rollback transaction - try { - await localConnection.rollback(); - } catch (rollbackError) { - console.error("Error during rollback:", rollbackError); - } - throw error; } } -module.exports = importOrders; +module.exports = importOrders; \ No newline at end of file diff --git a/inventory-server/scripts/import/products.js b/inventory-server/scripts/import/products.js index ec156d1..9c76ecc 100644 --- a/inventory-server/scripts/import/products.js +++ b/inventory-server/scripts/import/products.js @@ -8,29 +8,7 @@ dotenv.config({ path: path.join(__dirname, "../../.env") }); // Utility functions const imageUrlBase = process.env.PRODUCT_IMAGE_URL_BASE || 'https://sbing.com/i/products/0000/'; - -// Modified to accept a db connection for querying product_images -const getImageUrls = async (pid, prodConnection, iid = null) => { - // If iid isn't provided, try to get it from product_images - if (iid === null && prodConnection) { - try { - // Query for images with order=255 (default/primary images) - const [primaryImages] = await prodConnection.query( - 'SELECT iid FROM product_images WHERE pid = ? AND `order` = 255 LIMIT 1', - [pid] - ); - - // Use the found iid or default to 1 - iid = primaryImages.length > 0 ? 
primaryImages[0].iid : 1; - } catch (error) { - console.error(`Error fetching primary image for pid ${pid}:`, error); - iid = 1; // Fallback to default - } - } else { - // Use default if connection not provided - iid = iid || 1; - } - +const getImageUrls = (pid, iid = 1) => { const paddedPid = pid.toString().padStart(6, '0'); // Use padded PID only for the first 3 digits const prefix = paddedPid.slice(0, 3); @@ -120,6 +98,7 @@ async function setupTemporaryTables(connection) { baskets INTEGER, notifies INTEGER, date_last_sold TIMESTAMP WITH TIME ZONE, + primary_iid INTEGER, image TEXT, image_175 TEXT, image_full TEXT, @@ -217,6 +196,7 @@ async function importMissingProducts(prodConnection, localConnection, missingPid (SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies, (SELECT COALESCE(SUM(oi.qty_ordered), 0) FROM order_items oi WHERE oi.prod_pid = p.pid) AS total_sold, pls.date_sold as date_last_sold, + (SELECT iid FROM product_images WHERE pid = p.pid AND \`order\` = 255 LIMIT 1) AS primary_iid, GROUP_CONCAT(DISTINCT CASE WHEN pc.cat_id IS NOT NULL AND pc.type IN (10, 20, 11, 21, 12, 13) @@ -255,15 +235,13 @@ async function importMissingProducts(prodConnection, localConnection, missingPid const batch = prodData.slice(i, i + BATCH_SIZE); const placeholders = batch.map((_, idx) => { - const base = idx * 47; // 47 columns - return `(${Array.from({ length: 47 }, (_, i) => `$${base + i + 1}`).join(', ')})`; + const base = idx * 48; // 48 columns + return `(${Array.from({ length: 48 }, (_, i) => `$${base + i + 1}`).join(', ')})`; }).join(','); - // Process image URLs for the batch - const processedValues = []; - for (const row of batch) { - const imageUrls = await getImageUrls(row.pid, prodConnection); - processedValues.push([ + const values = batch.flatMap(row => { + const imageUrls = getImageUrls(row.pid, row.primary_iid || 1); + return [ row.pid, row.title, row.description, @@ -306,15 +284,14 @@ async function importMissingProducts(prodConnection, localConnection, missingPid row.baskets, row.notifies, validateDate(row.date_last_sold), + row.primary_iid, imageUrls.image, imageUrls.image_175, imageUrls.image_full, null, null - ]); - } - - const values = processedValues.flat(); + ]; + }); const [result] = await localConnection.query(` WITH inserted_products AS ( @@ -325,7 +302,7 @@ async function importMissingProducts(prodConnection, localConnection, missingPid landing_cost_price, barcode, harmonized_tariff_code, updated_at, visible, managing_stock, replenishable, permalink, moq, uom, rating, reviews, weight, length, width, height, country_of_origin, location, total_sold, - baskets, notifies, date_last_sold, image, image_175, image_full, options, tags + baskets, notifies, date_last_sold, primary_iid, image, image_175, image_full, options, tags ) VALUES ${placeholders} ON CONFLICT (pid) DO NOTHING @@ -422,6 +399,7 @@ async function materializeCalculations(prodConnection, localConnection, incremen (SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies, (SELECT COALESCE(SUM(oi.qty_ordered), 0) FROM order_items oi WHERE oi.prod_pid = p.pid) AS total_sold, pls.date_sold as date_last_sold, + (SELECT iid FROM product_images WHERE pid = p.pid AND \`order\` = 255 LIMIT 1) AS primary_iid, GROUP_CONCAT(DISTINCT CASE WHEN pc.cat_id IS NOT NULL AND pc.type IN (10, 20, 11, 21, 12, 13) @@ -448,9 +426,11 @@ async function materializeCalculations(prodConnection, localConnection, incremen pcp.date_deactive > ? OR pcp.date_active > ? OR pnb.date_updated > ? 
+ -- Add condition for product_images changes if needed for incremental updates + -- OR EXISTS (SELECT 1 FROM product_images pi WHERE pi.pid = p.pid AND pi.stamp > ?) ` : 'TRUE'} GROUP BY p.pid - `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []); + `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime /*, lastSyncTime */] : []); outputProgress({ status: "running", @@ -464,15 +444,13 @@ async function materializeCalculations(prodConnection, localConnection, incremen await withRetry(async () => { const placeholders = batch.map((_, idx) => { - const base = idx * 47; // 47 columns - return `(${Array.from({ length: 47 }, (_, i) => `$${base + i + 1}`).join(', ')})`; + const base = idx * 48; // 48 columns + return `(${Array.from({ length: 48 }, (_, i) => `$${base + i + 1}`).join(', ')})`; }).join(','); - // Process image URLs for the batch - const processedValues = []; - for (const row of batch) { - const imageUrls = await getImageUrls(row.pid, prodConnection); - processedValues.push([ + const values = batch.flatMap(row => { + const imageUrls = getImageUrls(row.pid, row.primary_iid || 1); + return [ row.pid, row.title, row.description, @@ -515,15 +493,14 @@ async function materializeCalculations(prodConnection, localConnection, incremen row.baskets, row.notifies, validateDate(row.date_last_sold), + row.primary_iid, imageUrls.image, imageUrls.image_175, imageUrls.image_full, null, null - ]); - } - - const values = processedValues.flat(); + ]; + }); await localConnection.query(` INSERT INTO temp_products ( @@ -533,7 +510,7 @@ async function materializeCalculations(prodConnection, localConnection, incremen landing_cost_price, barcode, harmonized_tariff_code, updated_at, visible, managing_stock, replenishable, permalink, moq, uom, rating, reviews, weight, length, width, height, country_of_origin, location, total_sold, - baskets, notifies, date_last_sold, image, image_175, image_full, options, tags + baskets, notifies, date_last_sold, primary_iid, image, image_175, image_full, options, tags ) VALUES ${placeholders} ON CONFLICT (pid) DO UPDATE SET title = EXCLUDED.title, @@ -576,6 +553,7 @@ async function materializeCalculations(prodConnection, localConnection, incremen baskets = EXCLUDED.baskets, notifies = EXCLUDED.notifies, date_last_sold = EXCLUDED.date_last_sold, + primary_iid = EXCLUDED.primary_iid, image = EXCLUDED.image, image_175 = EXCLUDED.image_175, image_full = EXCLUDED.image_full, @@ -674,6 +652,7 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate t.baskets, t.notifies, t.date_last_sold, + t.primary_iid, t.image, t.image_175, t.image_full, @@ -695,11 +674,9 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate return `(${Array.from({ length: 47 }, (_, i) => `$${base + i + 1}`).join(', ')})`; }).join(','); - // Process image URLs for the batch - const processedValues = []; - for (const row of batch) { - const imageUrls = await getImageUrls(row.pid, prodConnection); - processedValues.push([ + const values = batch.flatMap(row => { + const imageUrls = getImageUrls(row.pid, row.primary_iid || 1); + return [ row.pid, row.title, row.description, @@ -747,10 +724,8 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate imageUrls.image_full, row.options, row.tags - ]); - } - - const values = processedValues.flat(); + ]; + }); const [result] = await localConnection.query(` WITH upserted AS ( diff --git 
a/inventory-server/scripts/metrics-new/update_daily_snapshots.sql b/inventory-server/scripts/metrics-new/update_daily_snapshots.sql index 82c9697..7165451 100644 --- a/inventory-server/scripts/metrics-new/update_daily_snapshots.sql +++ b/inventory-server/scripts/metrics-new/update_daily_snapshots.sql @@ -1,4 +1,4 @@ --- Description: Calculates and updates daily aggregated product data for the current day. +-- Description: Calculates and updates daily aggregated product data for recent days. -- Uses UPSERT (INSERT ON CONFLICT UPDATE) for idempotency. -- Dependencies: Core import tables (products, orders, purchase_orders), calculate_status table. -- Frequency: Hourly (Run ~5-10 minutes after hourly data import completes). @@ -8,211 +8,243 @@ DECLARE _module_name TEXT := 'daily_snapshots'; _start_time TIMESTAMPTZ := clock_timestamp(); -- Time execution started _last_calc_time TIMESTAMPTZ; - _target_date DATE := CURRENT_DATE; -- Always recalculate today for simplicity with hourly runs + _target_date DATE; -- Will be set in the loop _total_records INT := 0; _has_orders BOOLEAN := FALSE; + _process_days INT := 5; -- Number of days to check/process (today plus previous 4 days) + _day_counter INT; + _missing_days INT[] := ARRAY[]::INT[]; -- Array to store days with missing or incomplete data BEGIN -- Get the timestamp before the last successful run of this module SELECT last_calculation_timestamp INTO _last_calc_time FROM public.calculate_status WHERE module_name = _module_name; - RAISE NOTICE 'Running % for date %. Start Time: %', _module_name, _target_date, _start_time; - - -- CRITICAL FIX: Check if we have any orders or receiving activity for today - -- to prevent creating artificial records when no real activity exists - SELECT EXISTS ( - SELECT 1 FROM public.orders WHERE date::date = _target_date - UNION - SELECT 1 FROM public.purchase_orders - WHERE date::date = _target_date - OR EXISTS ( - SELECT 1 FROM jsonb_array_elements(receiving_history) AS rh - WHERE jsonb_typeof(receiving_history) = 'array' - AND ( - (rh->>'date')::date = _target_date OR - (rh->>'received_at')::date = _target_date OR - (rh->>'receipt_date')::date = _target_date - ) - ) - LIMIT 1 - ) INTO _has_orders; + RAISE NOTICE 'Running % script. 
Start Time: %', _module_name, _start_time; - -- If no orders or receiving activity found for today, log and exit - IF NOT _has_orders THEN - RAISE NOTICE 'No orders or receiving activity found for % - skipping daily snapshot creation', _target_date; + -- First, check which days need processing by comparing orders data with snapshot data + FOR _day_counter IN 0..(_process_days-1) LOOP + _target_date := CURRENT_DATE - (_day_counter * INTERVAL '1 day'); - -- Still update the calculate_status to prevent repeated attempts + -- Check if this date needs updating by comparing orders to snapshot data + -- If the date has orders but not enough snapshots, or if snapshots show zero sales but orders exist, it's incomplete + SELECT + CASE WHEN ( + -- We have orders for this date but not enough snapshots, or snapshots with wrong total + (EXISTS (SELECT 1 FROM public.orders WHERE date::date = _target_date) AND + ( + -- No snapshots exist for this date + NOT EXISTS (SELECT 1 FROM public.daily_product_snapshots WHERE snapshot_date = _target_date) OR + -- Or snapshots show zero sales but orders exist + (SELECT COALESCE(SUM(units_sold), 0) FROM public.daily_product_snapshots WHERE snapshot_date = _target_date) = 0 OR + -- Or the count of snapshot records is significantly less than distinct products in orders + (SELECT COUNT(*) FROM public.daily_product_snapshots WHERE snapshot_date = _target_date) < + (SELECT COUNT(DISTINCT pid) FROM public.orders WHERE date::date = _target_date) * 0.8 + ) + ) + ) THEN TRUE ELSE FALSE END + INTO _has_orders; + + IF _has_orders THEN + -- This day needs processing - add to our array + _missing_days := _missing_days || _day_counter; + RAISE NOTICE 'Day % needs updating (incomplete or missing data)', _target_date; + END IF; + END LOOP; + + -- If no days need updating, exit early + IF array_length(_missing_days, 1) IS NULL THEN + RAISE NOTICE 'No days need updating - all snapshot data appears complete'; + + -- Still update the calculate_status to record this run UPDATE public.calculate_status SET last_calculation_timestamp = _start_time WHERE module_name = _module_name; - RETURN; -- Exit without creating snapshots + RETURN; END IF; + + RAISE NOTICE 'Need to update % days with missing or incomplete data', array_length(_missing_days, 1); - -- IMPORTANT: First delete any existing data for this date to prevent duplication - DELETE FROM public.daily_product_snapshots - WHERE snapshot_date = _target_date; + -- Process only the days that need updating + FOREACH _day_counter IN ARRAY _missing_days LOOP + _target_date := CURRENT_DATE - (_day_counter * INTERVAL '1 day'); + RAISE NOTICE 'Processing date: %', _target_date; + + -- IMPORTANT: First delete any existing data for this date to prevent duplication + DELETE FROM public.daily_product_snapshots + WHERE snapshot_date = _target_date; - -- Proceed with calculating daily metrics only for products with actual activity - WITH SalesData AS ( - SELECT - p.pid, - p.sku, - -- Track number of orders to ensure we have real data - COUNT(o.id) as order_count, - -- Aggregate Sales (Quantity > 0, Status not Canceled/Returned) - COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN o.quantity ELSE 0 END), 0) AS units_sold, - COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN o.price * o.quantity ELSE 0 END), 0.00) AS gross_revenue_unadjusted, -- Before discount - COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN 
('canceled', 'returned') THEN o.discount ELSE 0 END), 0.00) AS discounts,
-            COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN COALESCE(o.costeach, p.landing_cost_price, p.cost_price) * o.quantity ELSE 0 END), 0.00) AS cogs,
-            COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN p.regular_price * o.quantity ELSE 0 END), 0.00) AS gross_regular_revenue, -- Use current regular price for simplicity here
+        -- Proceed with calculating daily metrics only for products with actual activity
+        WITH SalesData AS (
+            SELECT
+                p.pid,
+                p.sku,
+                -- Track number of orders to ensure we have real data
+                COUNT(o.id) as order_count,
+                -- Aggregate Sales (Quantity > 0, Status not Canceled/Returned)
+                COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN o.quantity ELSE 0 END), 0) AS units_sold,
+                COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN o.price * o.quantity ELSE 0 END), 0.00) AS gross_revenue_unadjusted, -- Before discount
+                COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN o.discount ELSE 0 END), 0.00) AS discounts,
+                COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN COALESCE(o.costeach, p.landing_cost_price, p.cost_price) * o.quantity ELSE 0 END), 0.00) AS cogs,
+                COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN p.regular_price * o.quantity ELSE 0 END), 0.00) AS gross_regular_revenue, -- Use current regular price for simplicity here
-            -- Aggregate Returns (Quantity < 0 or Status = Returned)
-            COALESCE(SUM(CASE WHEN o.quantity < 0 OR COALESCE(o.status, 'pending') = 'returned' THEN ABS(o.quantity) ELSE 0 END), 0) AS units_returned,
-            COALESCE(SUM(CASE WHEN o.quantity < 0 OR COALESCE(o.status, 'pending') = 'returned' THEN o.price * ABS(o.quantity) ELSE 0 END), 0.00) AS returns_revenue
-        FROM public.products p -- Start from products to include those with no orders today
-        LEFT JOIN public.orders o
-            ON p.pid = o.pid
-            AND o.date::date = _target_date -- Cast to date to ensure compatibility regardless of original type
-        GROUP BY p.pid, p.sku
-        HAVING COUNT(o.id) > 0 -- CRITICAL: Only include products with actual orders
-    ),
-    ReceivingData AS (
-        SELECT
-            po.pid,
-            -- Track number of POs to ensure we have real data
-            COUNT(po.po_id) as po_count,
-            -- Prioritize the actual table fields over the JSON data
-            COALESCE(
-                -- First try the received field from purchase_orders table
-                SUM(CASE WHEN po.date::date = _target_date THEN po.received ELSE 0 END),
+                -- Aggregate Returns (Quantity < 0 or Status = Returned)
+                COALESCE(SUM(CASE WHEN o.quantity < 0 OR COALESCE(o.status, 'pending') = 'returned' THEN ABS(o.quantity) ELSE 0 END), 0) AS units_returned,
+                COALESCE(SUM(CASE WHEN o.quantity < 0 OR COALESCE(o.status, 'pending') = 'returned' THEN o.price * ABS(o.quantity) ELSE 0 END), 0.00) AS returns_revenue
+            FROM public.products p
+            JOIN public.orders o -- INNER JOIN: only process products that actually have orders on this date
+                ON p.pid = o.pid
+                AND o.date::date = _target_date -- Cast to date to ensure compatibility regardless of original type
+            GROUP BY p.pid, p.sku
+            -- No HAVING clause here - we always want to include all orders
+        ),
+        ReceivingData AS (
+            SELECT
+ po.pid, + -- Track number of POs to ensure we have real data + COUNT(po.po_id) as po_count, + -- Prioritize the actual table fields over the JSON data + COALESCE( + -- First try the received field from purchase_orders table + SUM(CASE WHEN po.date::date = _target_date THEN po.received ELSE 0 END), + + -- Otherwise fall back to the receiving_history JSON as secondary source + SUM( + CASE + WHEN (rh.item->>'date')::date = _target_date THEN (rh.item->>'qty')::numeric + WHEN (rh.item->>'received_at')::date = _target_date THEN (rh.item->>'qty')::numeric + WHEN (rh.item->>'receipt_date')::date = _target_date THEN (rh.item->>'qty')::numeric + ELSE 0 + END + ), + 0 + ) AS units_received, - -- Otherwise fall back to the receiving_history JSON as secondary source - SUM( - CASE - WHEN (rh.item->>'date')::date = _target_date THEN (rh.item->>'qty')::numeric - WHEN (rh.item->>'received_at')::date = _target_date THEN (rh.item->>'qty')::numeric - WHEN (rh.item->>'receipt_date')::date = _target_date THEN (rh.item->>'qty')::numeric - ELSE 0 - END - ), - 0 - ) AS units_received, - - COALESCE( - -- First try the actual cost_price from purchase_orders - SUM(CASE WHEN po.date::date = _target_date THEN po.received * po.cost_price ELSE 0 END), - - -- Otherwise fall back to receiving_history JSON - SUM( - CASE - WHEN (rh.item->>'date')::date = _target_date THEN (rh.item->>'qty')::numeric - WHEN (rh.item->>'received_at')::date = _target_date THEN (rh.item->>'qty')::numeric - WHEN (rh.item->>'receipt_date')::date = _target_date THEN (rh.item->>'qty')::numeric - ELSE 0 - END - * COALESCE((rh.item->>'cost')::numeric, po.cost_price) - ), - 0.00 - ) AS cost_received - FROM public.purchase_orders po - LEFT JOIN LATERAL jsonb_array_elements(po.receiving_history) AS rh(item) ON - jsonb_typeof(po.receiving_history) = 'array' AND - jsonb_array_length(po.receiving_history) > 0 AND - ( - (rh.item->>'date')::date = _target_date OR - (rh.item->>'received_at')::date = _target_date OR - (rh.item->>'receipt_date')::date = _target_date - ) - -- Include POs with the current date or relevant receiving_history - WHERE - po.date::date = _target_date OR - jsonb_typeof(po.receiving_history) = 'array' AND - jsonb_array_length(po.receiving_history) > 0 - GROUP BY po.pid - -- CRITICAL: Only include products with actual receiving activity - HAVING COUNT(po.po_id) > 0 OR SUM( - CASE - WHEN (rh.item->>'date')::date = _target_date THEN (rh.item->>'qty')::numeric - WHEN (rh.item->>'received_at')::date = _target_date THEN (rh.item->>'qty')::numeric - WHEN (rh.item->>'receipt_date')::date = _target_date THEN (rh.item->>'qty')::numeric - ELSE 0 - END - ) > 0 - ), - CurrentStock AS ( - -- Select current stock values directly from products table - SELECT + COALESCE( + -- First try the actual cost_price from purchase_orders + SUM(CASE WHEN po.date::date = _target_date THEN po.received * po.cost_price ELSE 0 END), + + -- Otherwise fall back to receiving_history JSON + SUM( + CASE + WHEN (rh.item->>'date')::date = _target_date THEN (rh.item->>'qty')::numeric + WHEN (rh.item->>'received_at')::date = _target_date THEN (rh.item->>'qty')::numeric + WHEN (rh.item->>'receipt_date')::date = _target_date THEN (rh.item->>'qty')::numeric + ELSE 0 + END + * COALESCE((rh.item->>'cost')::numeric, po.cost_price) + ), + 0.00 + ) AS cost_received + FROM public.purchase_orders po + LEFT JOIN LATERAL jsonb_array_elements(po.receiving_history) AS rh(item) ON + jsonb_typeof(po.receiving_history) = 'array' AND + jsonb_array_length(po.receiving_history) > 0 AND + ( + 
(rh.item->>'date')::date = _target_date OR + (rh.item->>'received_at')::date = _target_date OR + (rh.item->>'receipt_date')::date = _target_date + ) + -- Include POs with the current date or relevant receiving_history + WHERE + po.date::date = _target_date OR + jsonb_typeof(po.receiving_history) = 'array' AND + jsonb_array_length(po.receiving_history) > 0 + GROUP BY po.pid + -- CRITICAL: Only include products with actual receiving activity + HAVING COUNT(po.po_id) > 0 OR SUM( + CASE + WHEN (rh.item->>'date')::date = _target_date THEN (rh.item->>'qty')::numeric + WHEN (rh.item->>'received_at')::date = _target_date THEN (rh.item->>'qty')::numeric + WHEN (rh.item->>'receipt_date')::date = _target_date THEN (rh.item->>'qty')::numeric + ELSE 0 + END + ) > 0 + ), + CurrentStock AS ( + -- Select current stock values directly from products table + SELECT + pid, + stock_quantity, + COALESCE(landing_cost_price, cost_price, 0.00) as effective_cost_price, + COALESCE(price, 0.00) as current_price, + COALESCE(regular_price, 0.00) as current_regular_price + FROM public.products + ), + ProductsWithActivity AS ( + -- Quick pre-filter to only process products with activity + SELECT DISTINCT pid + FROM ( + SELECT pid FROM SalesData + UNION + SELECT pid FROM ReceivingData + ) a + ) + -- Now insert records, but ONLY for products with actual activity + INSERT INTO public.daily_product_snapshots ( + snapshot_date, pid, - stock_quantity, - COALESCE(landing_cost_price, cost_price, 0.00) as effective_cost_price, - COALESCE(price, 0.00) as current_price, - COALESCE(regular_price, 0.00) as current_regular_price - FROM public.products - ) - -- Now insert records, but ONLY for products with actual activity - INSERT INTO public.daily_product_snapshots ( - snapshot_date, - pid, - sku, - eod_stock_quantity, - eod_stock_cost, - eod_stock_retail, - eod_stock_gross, - stockout_flag, - units_sold, - units_returned, - gross_revenue, - discounts, - returns_revenue, - net_revenue, - cogs, - gross_regular_revenue, - profit, - units_received, - cost_received, - calculation_timestamp - ) - SELECT - _target_date AS snapshot_date, - COALESCE(sd.pid, rd.pid) AS pid, -- Use sales or receiving PID - COALESCE(sd.sku, p.sku) AS sku, -- Get SKU from sales data or products table - -- Inventory Metrics (Using CurrentStock) - cs.stock_quantity AS eod_stock_quantity, - cs.stock_quantity * cs.effective_cost_price AS eod_stock_cost, - cs.stock_quantity * cs.current_price AS eod_stock_retail, - cs.stock_quantity * cs.current_regular_price AS eod_stock_gross, - (cs.stock_quantity <= 0) AS stockout_flag, - -- Sales Metrics (From SalesData) - COALESCE(sd.units_sold, 0), - COALESCE(sd.units_returned, 0), - COALESCE(sd.gross_revenue_unadjusted, 0.00), - COALESCE(sd.discounts, 0.00), - COALESCE(sd.returns_revenue, 0.00), - COALESCE(sd.gross_revenue_unadjusted, 0.00) - COALESCE(sd.discounts, 0.00) AS net_revenue, - COALESCE(sd.cogs, 0.00), - COALESCE(sd.gross_regular_revenue, 0.00), - (COALESCE(sd.gross_revenue_unadjusted, 0.00) - COALESCE(sd.discounts, 0.00)) - COALESCE(sd.cogs, 0.00) AS profit, -- Basic profit: Net Revenue - COGS - -- Receiving Metrics (From ReceivingData) - COALESCE(rd.units_received, 0), - COALESCE(rd.cost_received, 0.00), - _start_time -- Timestamp of this calculation run - FROM SalesData sd - FULL OUTER JOIN ReceivingData rd ON sd.pid = rd.pid - LEFT JOIN public.products p ON COALESCE(sd.pid, rd.pid) = p.pid - LEFT JOIN CurrentStock cs ON COALESCE(sd.pid, rd.pid) = cs.pid - WHERE p.pid IS NOT NULL; -- Ensure we only insert for 
existing products + sku, + eod_stock_quantity, + eod_stock_cost, + eod_stock_retail, + eod_stock_gross, + stockout_flag, + units_sold, + units_returned, + gross_revenue, + discounts, + returns_revenue, + net_revenue, + cogs, + gross_regular_revenue, + profit, + units_received, + cost_received, + calculation_timestamp + ) + SELECT + _target_date AS snapshot_date, + COALESCE(sd.pid, rd.pid) AS pid, -- Use sales or receiving PID + COALESCE(sd.sku, p.sku) AS sku, -- Get SKU from sales data or products table + -- Inventory Metrics (Using CurrentStock) + cs.stock_quantity AS eod_stock_quantity, + cs.stock_quantity * cs.effective_cost_price AS eod_stock_cost, + cs.stock_quantity * cs.current_price AS eod_stock_retail, + cs.stock_quantity * cs.current_regular_price AS eod_stock_gross, + (cs.stock_quantity <= 0) AS stockout_flag, + -- Sales Metrics (From SalesData) + COALESCE(sd.units_sold, 0), + COALESCE(sd.units_returned, 0), + COALESCE(sd.gross_revenue_unadjusted, 0.00), + COALESCE(sd.discounts, 0.00), + COALESCE(sd.returns_revenue, 0.00), + COALESCE(sd.gross_revenue_unadjusted, 0.00) - COALESCE(sd.discounts, 0.00) AS net_revenue, + COALESCE(sd.cogs, 0.00), + COALESCE(sd.gross_regular_revenue, 0.00), + (COALESCE(sd.gross_revenue_unadjusted, 0.00) - COALESCE(sd.discounts, 0.00)) - COALESCE(sd.cogs, 0.00) AS profit, -- Basic profit: Net Revenue - COGS + -- Receiving Metrics (From ReceivingData) + COALESCE(rd.units_received, 0), + COALESCE(rd.cost_received, 0.00), + _start_time -- Timestamp of this calculation run + FROM SalesData sd + FULL OUTER JOIN ReceivingData rd ON sd.pid = rd.pid + JOIN ProductsWithActivity pwa ON COALESCE(sd.pid, rd.pid) = pwa.pid + LEFT JOIN public.products p ON COALESCE(sd.pid, rd.pid) = p.pid + LEFT JOIN CurrentStock cs ON COALESCE(sd.pid, rd.pid) = cs.pid + WHERE p.pid IS NOT NULL; -- Ensure we only insert for existing products - -- Get the total number of records inserted - GET DIAGNOSTICS _total_records = ROW_COUNT; - RAISE NOTICE 'Created % daily snapshot records for % with sales/receiving activity', _total_records, _target_date; + -- Get the total number of records inserted for this date + GET DIAGNOSTICS _total_records = ROW_COUNT; + RAISE NOTICE 'Created % daily snapshot records for % with sales/receiving activity', _total_records, _target_date; + END LOOP; -- Update the status table with the timestamp from the START of this run UPDATE public.calculate_status SET last_calculation_timestamp = _start_time WHERE module_name = _module_name; - RAISE NOTICE 'Finished % for date %. Duration: %', _module_name, _target_date, clock_timestamp() - _start_time; + RAISE NOTICE 'Finished % processing for multiple dates. 
Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
\ No newline at end of file
diff --git a/inventory-server/src/routes/csv.js b/inventory-server/src/routes/csv.js
index 7c4cf0d..e5c182d 100644
--- a/inventory-server/src/routes/csv.js
+++ b/inventory-server/src/routes/csv.js
@@ -844,7 +844,7 @@ router.get('/status/table-counts', async (req, res) => {
       // Core tables
       'products', 'categories', 'product_categories', 'orders', 'purchase_orders',
       // New metrics tables
-      'product_metrics', 'daily_product_snapshots',
+      'product_metrics', 'daily_product_snapshots', 'brand_metrics', 'category_metrics', 'vendor_metrics',
       // Config tables
       'settings_global', 'settings_vendor', 'settings_product'
     ];
@@ -867,7 +867,7 @@ router.get('/status/table-counts', async (req, res) => {
     // Group tables by type
     const groupedCounts = {
       core: counts.filter(c => ['products', 'categories', 'product_categories', 'orders', 'purchase_orders'].includes(c.table_name)),
-      metrics: counts.filter(c => ['product_metrics', 'daily_product_snapshots'].includes(c.table_name)),
+      metrics: counts.filter(c => ['product_metrics', 'daily_product_snapshots', 'brand_metrics', 'category_metrics', 'vendor_metrics'].includes(c.table_name)),
      config: counts.filter(c => ['settings_global', 'settings_vendor', 'settings_product'].includes(c.table_name))
    };
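
The per-batch transaction handling added to orders.js repeats the same begin/try/commit/catch/rollback scaffolding around every batch. A minimal sketch of a helper that captures the pattern once, assuming the `localConnection` wrapper exposes `beginTransaction()`, `commit()`, `rollback()` and `query()` exactly as the diff uses them (the `withTransaction` name is hypothetical):

```js
// Hypothetical helper: run one unit of work inside its own transaction,
// mirroring the begin/commit/rollback blocks repeated throughout orders.js.
async function withTransaction(conn, work) {
  await conn.beginTransaction();
  try {
    const result = await work(conn);
    await conn.commit();
    return result;
  } catch (error) {
    await conn.rollback();
    throw error;
  }
}

// Usage: one transaction per 5000-row batch, as in the order-items loop.
// await withTransaction(localConnection, (conn) =>
//   conn.query(`INSERT INTO temp_order_items (...) VALUES ${placeholders} ...`, values)
// );
```

This also makes the motivation for dropping `Promise.all` explicit: a single connection can only host one transaction at a time, so once each batch owns a transaction, the metadata, discounts, taxes and costs batches have to run sequentially.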
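
The `$1…$n` placeholder strings are hand-rolled in several places (6, 9 and 15 columns in orders.js, 47 and 48 in products.js), and each column count has to be kept in sync with its column list by hand. A sketch of a generic builder, under the hypothetical name `buildBulkInsert`, that derives both the placeholder list and the flattened values array from a single column list:

```js
// Hypothetical helper: build "(...), (...)" placeholder groups and the
// flattened parameter array for a multi-row INSERT from one column list.
function buildBulkInsert(rows, columns) {
  const placeholders = rows
    .map((_, rowIdx) => {
      const base = rowIdx * columns.length;
      const slots = columns.map((_, colIdx) => `$${base + colIdx + 1}`);
      return `(${slots.join(', ')})`;
    })
    .join(',');
  const values = rows.flatMap((row) => columns.map((col) => row[col]));
  return { placeholders, values };
}

// Usage with the temp_order_items batch (keys as they arrive from MySQL):
// const { placeholders, values } = buildBulkInsert(batch,
//   ['order_id', 'prod_pid', 'SKU', 'price', 'quantity', 'base_discount']);
```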
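
The new discount expression in orders.js is the sum of three parts: per-line sale savings, the order-level discount prorated by the line's share of `summary_subtotal`, and the promo-code discount joined in from `temp_order_discounts`. The same arithmetic restated in plain JavaScript for verification; the SQL casts the result to NUMERIC(14, 4), approximated here with four-decimal rounding:

```js
// Line discount = sale savings + prorated order-level discount + promo discount,
// matching the three-part expression in the updated orders.js query.
function lineDiscount({ baseDiscount, quantity, price,
                        summaryDiscountSubtotal, summarySubtotal,
                        promoDiscount }) {
  const saleSavings = baseDiscount * quantity;                  // Part 1
  const lineRevenue = price * quantity;
  const proratedOrderDiscount =                                 // Part 2
    summaryDiscountSubtotal > 0 && summarySubtotal > 0
      ? Math.round((summaryDiscountSubtotal * lineRevenue / summarySubtotal) * 1e4) / 1e4
      : 0;
  return saleSavings + proratedOrderDiscount + (promoDiscount || 0); // Part 3
}

// Example: $2.50 off each of 2 units priced at $20, on a $100 order carrying
// a $10 order-level discount and a $1 promo code:
// 5.00 + 10 * (40 / 100) + 1.00 = 10.00
// lineDiscount({ baseDiscount: 2.5, quantity: 2, price: 20,
//                summaryDiscountSubtotal: 10, summarySubtotal: 100,
//                promoDiscount: 1 }) === 10
```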
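
The reworked update_daily_snapshots.sql decides which of the last five days to rebuild by comparing order activity against existing snapshots: a day is incomplete when orders exist but snapshots are missing, show zero units sold, or cover fewer than 80% of the distinct ordered products. A sketch of the same heuristic as a Node routine (handy for testing it outside PL/pgSQL), assuming a node-postgres-style `db.query` and the table and column names from the diff:

```js
// Return the day offsets (0 = today, 1 = yesterday, ...) whose snapshot data
// looks missing or incomplete, mirroring the checks in update_daily_snapshots.sql.
async function findIncompleteDays(db, processDays = 5) {
  const incomplete = [];
  for (let offset = 0; offset < processDays; offset++) {
    const { rows: [check] } = await db.query(`
      SELECT
        EXISTS (SELECT 1 FROM orders
                 WHERE date::date = CURRENT_DATE - $1::int) AS has_orders,
        (SELECT COUNT(*) FROM daily_product_snapshots
          WHERE snapshot_date = CURRENT_DATE - $1::int) AS snapshot_count,
        (SELECT COALESCE(SUM(units_sold), 0) FROM daily_product_snapshots
          WHERE snapshot_date = CURRENT_DATE - $1::int) AS units_sold,
        (SELECT COUNT(DISTINCT pid) FROM orders
          WHERE date::date = CURRENT_DATE - $1::int) AS ordered_products
    `, [offset]);
    const needsUpdate = check.has_orders && (
      Number(check.snapshot_count) === 0 ||
      Number(check.units_sold) === 0 ||
      Number(check.snapshot_count) < Number(check.ordered_products) * 0.8
    );
    if (needsUpdate) incomplete.push(offset);
  }
  return incomplete;
}
```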
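
On the `avg_margin_30d` type change in vendor_metrics: the KPI is (profit / revenue) * 100, which ordinarily fits NUMERIC(7, 3), but near-zero revenue can push the ratio far past that type's ±9999.999 ceiling, and NUMERIC(14, 4) leaves headroom while matching the precision of the other money columns. A sketch of the calculation with the usual divide-by-zero guard:

```js
// avg_margin_30d = (profit / revenue) * 100, rounded to 4 decimals to match
// NUMERIC(14, 4); returns null when revenue is zero (the NULLIF(x, 0) idiom).
function avgMargin30d(profit30d, revenue30d) {
  if (!revenue30d) return null;
  return Math.round((profit30d / revenue30d) * 100 * 1e4) / 1e4;
}

// avgMargin30d(1250, 5000)   // -> 25
// avgMargin30d(500, 0.0001)  // -> ~500000000, which overflows NUMERIC(7, 3)
```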