const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db');

// Helper function to handle NaN and undefined values
function sanitizeValue(value) {
  if (value === undefined || value === null || Number.isNaN(value)) {
    return null;
  }
  return value;
}

async function calculateProductMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
  const connection = await getConnection();
  let success = false;
  let processedOrders = 0;
  const BATCH_SIZE = 5000;

  try {
    // Skip flags for the two calculation stages (0 = run the stage)
    const SKIP_PRODUCT_BASE_METRICS = 0;
    const SKIP_PRODUCT_TIME_AGGREGATES = 0;

    // Get total product count if not provided
    if (!totalProducts) {
      const productCount = await connection.query('SELECT COUNT(*) as count FROM products');
      totalProducts = parseInt(productCount.rows[0].count);
    }

    if (isCancelled) {
      outputProgress({
        status: 'cancelled',
        operation: 'Product metrics calculation cancelled',
        current: processedCount,
        total: totalProducts,
        elapsed: formatElapsedTime(startTime),
        remaining: null,
        rate: calculateRate(startTime, processedCount),
        percentage: ((processedCount / totalProducts) * 100).toFixed(1),
        timing: {
          start_time: new Date(startTime).toISOString(),
          end_time: new Date().toISOString(),
          elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
        }
      });
      return { processedProducts: processedCount, processedOrders, processedPurchaseOrders: 0, success };
    }

    // First ensure all products have a metrics record
    await connection.query(`
      INSERT INTO product_metrics (pid, last_calculated_at)
      SELECT pid, NOW() FROM products
      ON CONFLICT (pid) DO NOTHING
    `);

    // Get threshold settings once
    const thresholds = await connection.query(`
      SELECT critical_days, reorder_days, overstock_days, low_stock_threshold
      FROM stock_thresholds
      WHERE category_id IS NULL AND vendor IS NULL
      LIMIT 1
    `);
    const defaultThresholds = thresholds.rows[0];

    // Calculate base product metrics
    if (!SKIP_PRODUCT_BASE_METRICS) {
      outputProgress({
        status: 'running',
        operation: 'Starting base product metrics calculation',
        current: processedCount,
        total: totalProducts,
        elapsed: formatElapsedTime(startTime),
        remaining: estimateRemaining(startTime, processedCount, totalProducts),
        rate: calculateRate(startTime, processedCount),
        percentage: ((processedCount / totalProducts) * 100).toFixed(1),
        timing: {
          start_time: new Date(startTime).toISOString(),
          end_time: new Date().toISOString(),
          elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
        }
      });

      // Get order count that will be processed
      const orderCount = await connection.query(`
        SELECT COUNT(*) as count
        FROM orders o
        WHERE o.canceled = false
      `);
      processedOrders = parseInt(orderCount.rows[0].count);

      // Clear temporary tables
      await connection.query('DROP TABLE IF EXISTS temp_sales_metrics');
      await connection.query('DROP TABLE IF EXISTS temp_purchase_metrics');

      // Create temp_sales_metrics
      await connection.query(`
        CREATE TEMPORARY TABLE temp_sales_metrics (
          pid BIGINT NOT NULL,
          daily_sales_avg DECIMAL(10,3),
          weekly_sales_avg DECIMAL(10,3),
          monthly_sales_avg DECIMAL(10,3),
          total_revenue DECIMAL(10,3),
          avg_margin_percent DECIMAL(10,3),
          first_sale_date DATE,
          last_sale_date DATE,
          PRIMARY KEY (pid)
        )
      `);

      // Create temp_purchase_metrics
      await connection.query(`
        CREATE TEMPORARY TABLE temp_purchase_metrics (
          pid BIGINT NOT NULL,
          avg_lead_time_days DOUBLE PRECISION,
          last_purchase_date DATE,
          first_received_date DATE,
          last_received_date DATE,
          PRIMARY KEY (pid)
        )
      `);
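      // Sales velocity in temp_sales_metrics is computed over a rolling 90-day
      // window and is based on *active* selling days rather than calendar days:
      //   daily_sales_avg   = units sold / distinct days with at least one sale
      //   weekly_sales_avg  = units sold / 7-day buckets of active days (rounded up)
      //   monthly_sales_avg = units sold / 30-day buckets of active days (rounded up)
      // avg_margin_percent uses the product's current cost_price as a proxy for
      // historical cost: (revenue - cost) / revenue * 100.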
      // Populate temp_sales_metrics with base stats and sales averages
      await connection.query(`
        INSERT INTO temp_sales_metrics
        SELECT
          p.pid,
          COALESCE(SUM(o.quantity) / NULLIF(COUNT(DISTINCT DATE(o.date)), 0), 0) as daily_sales_avg,
          COALESCE(SUM(o.quantity) / NULLIF(CEIL(COUNT(DISTINCT DATE(o.date)) / 7.0), 0), 0) as weekly_sales_avg,
          COALESCE(SUM(o.quantity) / NULLIF(CEIL(COUNT(DISTINCT DATE(o.date)) / 30.0), 0), 0) as monthly_sales_avg,
          COALESCE(SUM(o.quantity * o.price), 0) as total_revenue,
          CASE
            WHEN SUM(o.quantity * o.price) > 0
            THEN ((SUM(o.quantity * o.price) - SUM(o.quantity * p.cost_price)) / SUM(o.quantity * o.price)) * 100
            ELSE 0
          END as avg_margin_percent,
          MIN(o.date) as first_sale_date,
          MAX(o.date) as last_sale_date
        FROM products p
        LEFT JOIN orders o ON p.pid = o.pid
          AND o.canceled = false
          AND o.date >= CURRENT_DATE - INTERVAL '90 days'
        GROUP BY p.pid
      `);

      // Populate temp_purchase_metrics
      await connection.query(`
        INSERT INTO temp_purchase_metrics
        SELECT
          p.pid,
          AVG(
            CASE
              WHEN po.received_date IS NOT NULL AND po.date IS NOT NULL
              THEN EXTRACT(EPOCH FROM (po.received_date::timestamp with time zone - po.date::timestamp with time zone)) / 86400.0
              ELSE NULL
            END
          ) as avg_lead_time_days,
          MAX(po.date) as last_purchase_date,
          MIN(po.received_date) as first_received_date,
          MAX(po.received_date) as last_received_date
        FROM products p
        LEFT JOIN purchase_orders po ON p.pid = po.pid
          AND po.received_date IS NOT NULL
          AND po.date >= CURRENT_DATE - INTERVAL '365 days'
        GROUP BY p.pid
      `);
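      // The batched UPDATE below derives the per-product planning metrics from
      // the temp tables:
      //   days/weeks_of_inventory = stock on hand / average sales velocity
      //   stock_status            = threshold bands from stock_thresholds
      //                             (critical/reorder/overstock days, low-stock units)
      //   safety_stock            = daily_sales_avg * sqrt(lead time) * 1.96 (z = 1.96, ~95% service level)
      //   reorder_point           = lead-time demand + safety stock
      //   reorder_qty             = EOQ: sqrt((2 * annual demand * $25 order cost) / (25% of unit cost)),
      //                             floored at the low-stock threshold
      //   overstocked_amt         = stock above overstock_days worth of expected demand
      // Products with no recent sales fall back to the configured low-stock threshold.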
      // Process updates in batches
      let lastPid = 0;
      while (true) {
        if (isCancelled) break;

        const batch = await connection.query(
          'SELECT pid FROM products WHERE pid > $1 ORDER BY pid LIMIT $2',
          [lastPid, BATCH_SIZE]
        );
        if (batch.rows.length === 0) break;

        await connection.query(`
          UPDATE product_metrics pm
          SET
            inventory_value = p.stock_quantity * NULLIF(p.cost_price, 0),
            daily_sales_avg = COALESCE(sm.daily_sales_avg, 0),
            weekly_sales_avg = COALESCE(sm.weekly_sales_avg, 0),
            monthly_sales_avg = COALESCE(sm.monthly_sales_avg, 0),
            total_revenue = COALESCE(sm.total_revenue, 0),
            avg_margin_percent = COALESCE(sm.avg_margin_percent, 0),
            first_sale_date = sm.first_sale_date,
            last_sale_date = sm.last_sale_date,
            avg_lead_time_days = COALESCE(lm.avg_lead_time_days, 30),
            days_of_inventory = CASE
              WHEN COALESCE(sm.daily_sales_avg, 0) > 0
              THEN FLOOR(p.stock_quantity / NULLIF(sm.daily_sales_avg, 0))
              ELSE NULL
            END,
            weeks_of_inventory = CASE
              WHEN COALESCE(sm.weekly_sales_avg, 0) > 0
              THEN FLOOR(p.stock_quantity / NULLIF(sm.weekly_sales_avg, 0))
              ELSE NULL
            END,
            stock_status = CASE
              WHEN p.stock_quantity <= 0 THEN 'Out of Stock'
              WHEN COALESCE(sm.daily_sales_avg, 0) = 0 AND p.stock_quantity <= $1 THEN 'Low Stock'
              WHEN COALESCE(sm.daily_sales_avg, 0) = 0 THEN 'In Stock'
              WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) <= $2 THEN 'Critical'
              WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) <= $3 THEN 'Reorder'
              WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) > $4 THEN 'Overstocked'
              ELSE 'Healthy'
            END,
            safety_stock = CASE
              WHEN COALESCE(sm.daily_sales_avg, 0) > 0
              THEN CEIL(sm.daily_sales_avg * SQRT(ABS(COALESCE(lm.avg_lead_time_days, 30))) * 1.96)
              ELSE $5
            END,
            reorder_point = CASE
              WHEN COALESCE(sm.daily_sales_avg, 0) > 0
              THEN CEIL(sm.daily_sales_avg * COALESCE(lm.avg_lead_time_days, 30))
                   + CEIL(sm.daily_sales_avg * SQRT(ABS(COALESCE(lm.avg_lead_time_days, 30))) * 1.96)
              ELSE $6
            END,
            reorder_qty = CASE
              WHEN COALESCE(sm.daily_sales_avg, 0) > 0
                AND NULLIF(p.cost_price, 0) IS NOT NULL
                AND NULLIF(p.cost_price, 0) > 0
              THEN GREATEST(
                CEIL(SQRT(ABS((2 * (sm.daily_sales_avg * 365) * 25) / (NULLIF(p.cost_price, 0) * 0.25)))),
                $7
              )
              ELSE $8
            END,
            overstocked_amt = CASE
              WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) > $9
              THEN GREATEST(0, p.stock_quantity - CEIL(sm.daily_sales_avg * $10))
              ELSE 0
            END,
            last_calculated_at = NOW()
          FROM products p
          LEFT JOIN temp_sales_metrics sm ON p.pid = sm.pid
          LEFT JOIN temp_purchase_metrics lm ON p.pid = lm.pid
          WHERE pm.pid = p.pid
            AND p.pid = ANY($11::bigint[])
        `, [
          defaultThresholds.low_stock_threshold,
          defaultThresholds.critical_days,
          defaultThresholds.reorder_days,
          defaultThresholds.overstock_days,
          defaultThresholds.low_stock_threshold,
          defaultThresholds.low_stock_threshold,
          defaultThresholds.low_stock_threshold,
          defaultThresholds.low_stock_threshold,
          defaultThresholds.overstock_days,
          defaultThresholds.overstock_days,
          batch.rows.map(row => row.pid)
        ]);

        lastPid = batch.rows[batch.rows.length - 1].pid;
        processedCount += batch.rows.length;

        outputProgress({
          status: 'running',
          operation: 'Processing base metrics batch',
          current: processedCount,
          total: totalProducts,
          elapsed: formatElapsedTime(startTime),
          remaining: estimateRemaining(startTime, processedCount, totalProducts),
          rate: calculateRate(startTime, processedCount),
          percentage: ((processedCount / totalProducts) * 100).toFixed(1),
          timing: {
            start_time: new Date(startTime).toISOString(),
            end_time: new Date().toISOString(),
            elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
          }
        });
      }

      // Calculate forecast accuracy and bias in batches
      lastPid = 0;
      while (true) {
        if (isCancelled) break;

        const batch = await connection.query(
          'SELECT pid FROM products WHERE pid > $1 ORDER BY pid LIMIT $2',
          [lastPid, BATCH_SIZE]
        );
        if (batch.rows.length === 0) break;

        await connection.query(`
          UPDATE product_metrics pm
          SET
            forecast_accuracy = GREATEST(0, 100 - LEAST(fa.avg_forecast_error, 100)),
            forecast_bias = GREATEST(-100, LEAST(fa.avg_forecast_bias, 100)),
            last_forecast_date = fa.last_forecast_date,
            last_calculated_at = NOW()
          FROM (
            SELECT
              sf.pid,
              AVG(CASE
                WHEN o.quantity > 0 THEN ABS(sf.forecast_quantity - o.quantity) / o.quantity * 100
                ELSE 100
              END) as avg_forecast_error,
              AVG(CASE
                WHEN o.quantity > 0 THEN (sf.forecast_quantity - o.quantity) / o.quantity * 100
                ELSE 0
              END) as avg_forecast_bias,
              MAX(sf.forecast_date) as last_forecast_date
            FROM sales_forecasts sf
            JOIN orders o ON sf.pid = o.pid AND DATE(o.date) = sf.forecast_date
            WHERE o.canceled = false
              AND sf.forecast_date >= CURRENT_DATE - INTERVAL '90 days'
              AND sf.pid = ANY($1::bigint[])
            GROUP BY sf.pid
          ) fa
          WHERE pm.pid = fa.pid
        `, [batch.rows.map(row => row.pid)]);

        lastPid = batch.rows[batch.rows.length - 1].pid;
      }
    }
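    // The monthly roll-up below covers the trailing 12 months of non-canceled
    // orders; only months that actually had sales produce a row.
    //   profit_margin = (revenue - cost) / revenue * 100
    //   gmroi         = gross margin dollars / current inventory investment
    //                   (cost_price * stock_quantity), i.e. margin earned per
    //                   dollar currently tied up in stock.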
    // Calculate product time aggregates
    if (!SKIP_PRODUCT_TIME_AGGREGATES) {
      outputProgress({
        status: 'running',
        operation: 'Starting product time aggregates calculation',
        current: processedCount || 0,
        total: totalProducts || 0,
        elapsed: formatElapsedTime(startTime),
        remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0),
        rate: calculateRate(startTime, processedCount || 0),
        percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1),
        timing: {
          start_time: new Date(startTime).toISOString(),
          end_time: new Date().toISOString(),
          elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
        }
      });

      // Calculate time-based aggregates
      await connection.query(`
        INSERT INTO product_time_aggregates (
          pid, year, month, total_quantity_sold, total_revenue, total_cost,
          order_count, avg_price, profit_margin, inventory_value, gmroi
        )
        SELECT
          p.pid,
          EXTRACT(YEAR FROM o.date) as year,
          EXTRACT(MONTH FROM o.date) as month,
          SUM(o.quantity) as total_quantity_sold,
          SUM(o.quantity * o.price) as total_revenue,
          SUM(o.quantity * p.cost_price) as total_cost,
          COUNT(DISTINCT o.order_number) as order_count,
          AVG(o.price) as avg_price,
          CASE
            WHEN SUM(o.quantity * o.price) > 0
            THEN ((SUM(o.quantity * o.price) - SUM(o.quantity * p.cost_price)) / SUM(o.quantity * o.price)) * 100
            ELSE 0
          END as profit_margin,
          p.cost_price * p.stock_quantity as inventory_value,
          CASE
            WHEN p.cost_price * p.stock_quantity > 0
            THEN (SUM(o.quantity * (o.price - p.cost_price))) / (p.cost_price * p.stock_quantity)
            ELSE 0
          END as gmroi
        FROM products p
        LEFT JOIN orders o ON p.pid = o.pid AND o.canceled = false
        WHERE o.date >= CURRENT_DATE - INTERVAL '12 months'
        GROUP BY p.pid, EXTRACT(YEAR FROM o.date), EXTRACT(MONTH FROM o.date)
        ON CONFLICT (pid, year, month) DO UPDATE SET
          total_quantity_sold = EXCLUDED.total_quantity_sold,
          total_revenue = EXCLUDED.total_revenue,
          total_cost = EXCLUDED.total_cost,
          order_count = EXCLUDED.order_count,
          avg_price = EXCLUDED.avg_price,
          profit_margin = EXCLUDED.profit_margin,
          inventory_value = EXCLUDED.inventory_value,
          gmroi = EXCLUDED.gmroi
      `);

      processedCount = Math.floor(totalProducts * 0.6);

      outputProgress({
        status: 'running',
        operation: 'Product time aggregates calculated',
        current: processedCount || 0,
        total: totalProducts || 0,
        elapsed: formatElapsedTime(startTime),
        remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0),
        rate: calculateRate(startTime, processedCount || 0),
        percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1),
        timing: {
          start_time: new Date(startTime).toISOString(),
          end_time: new Date().toISOString(),
          elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
        }
      });
    } else {
      processedCount = Math.floor(totalProducts * 0.6);

      outputProgress({
        status: 'running',
        operation: 'Skipping product time aggregates calculation',
        current: processedCount || 0,
        total: totalProducts || 0,
        elapsed: formatElapsedTime(startTime),
        remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0),
        rate: calculateRate(startTime, processedCount || 0),
        percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1),
        timing: {
          start_time: new Date(startTime).toISOString(),
          end_time: new Date().toISOString(),
          elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
        }
      });
    }

    // Calculate ABC classification
    outputProgress({
      status: 'running',
      operation: 'Starting ABC classification',
      current: processedCount,
      total: totalProducts,
      elapsed: formatElapsedTime(startTime),
      remaining: estimateRemaining(startTime, processedCount, totalProducts),
      rate: calculateRate(startTime, processedCount),
      percentage: ((processedCount / totalProducts) * 100).toFixed(1),
      timing: {
        start_time: new Date(startTime).toISOString(),
        end_time: new Date().toISOString(),
        elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
      }
    });

    if (isCancelled) {
      return {
        processedProducts: processedCount,
        processedOrders,
        processedPurchaseOrders: 0, // This module doesn't process POs
        success
      };
    }

    const abcConfig = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1');
    const abcThresholds = abcConfig.rows[0] || { a_threshold: 20, b_threshold: 50 };
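    // ABC classification ranks products by trailing revenue percentile:
    //   A = top a_threshold percent (default 20)
    //   B = up to b_threshold percent (default 50)
    //   C = everything else, including products with no revenue ranking at all.
    // PERCENT_RANK is used so ties in revenue land in the same class.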
    // First, create and populate the rankings table with an index
    await connection.query('DROP TABLE IF EXISTS temp_revenue_ranks');
    await connection.query(`
      CREATE TEMPORARY TABLE temp_revenue_ranks (
        pid BIGINT NOT NULL,
        total_revenue DECIMAL(10,3),
        rank_num INT,
        dense_rank_num INT,
        percentile DECIMAL(5,2),
        total_count INT,
        PRIMARY KEY (pid)
      )
    `);
    await connection.query('CREATE INDEX ON temp_revenue_ranks (rank_num)');
    await connection.query('CREATE INDEX ON temp_revenue_ranks (dense_rank_num)');
    await connection.query('CREATE INDEX ON temp_revenue_ranks (percentile)');

    // Calculate rankings with proper tie handling
    await connection.query(`
      INSERT INTO temp_revenue_ranks
      WITH revenue_data AS (
        SELECT
          pid,
          total_revenue,
          COUNT(*) OVER () as total_count,
          PERCENT_RANK() OVER (ORDER BY total_revenue DESC) * 100 as percentile,
          RANK() OVER (ORDER BY total_revenue DESC) as rank_num,
          DENSE_RANK() OVER (ORDER BY total_revenue DESC) as dense_rank_num
        FROM product_metrics
        WHERE total_revenue > 0
      )
      SELECT pid, total_revenue, rank_num, dense_rank_num, percentile, total_count
      FROM revenue_data
    `);

    // Get total count for percentage calculation
    const rankingCount = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks');
    const totalCount = parseInt(rankingCount.rows[0].total_count) || 1;
    const max_rank = totalCount;

    // Process updates in batches
    let abcProcessedCount = 0;
    const batchSize = 5000;

    while (true) {
      if (isCancelled) {
        return {
          processedProducts: processedCount,
          processedOrders,
          processedPurchaseOrders: 0, // This module doesn't process POs
          success
        };
      }

      // Get a batch of PIDs that need updating
      const pids = await connection.query(`
        SELECT pm.pid
        FROM product_metrics pm
        LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid
        WHERE pm.abc_class IS NULL
          OR pm.abc_class != CASE
            WHEN tr.pid IS NULL THEN 'C'
            WHEN tr.percentile <= $1 THEN 'A'
            WHEN tr.percentile <= $2 THEN 'B'
            ELSE 'C'
          END
        LIMIT $3
      `, [abcThresholds.a_threshold, abcThresholds.b_threshold, batchSize]);

      if (pids.rows.length === 0) break;

      const pidValues = pids.rows.map(row => row.pid);

      // LEFT JOIN through products so items with no revenue ranking still get classified as 'C'
      await connection.query(`
        UPDATE product_metrics pm
        SET
          abc_class = CASE
            WHEN tr.pid IS NULL THEN 'C'
            WHEN tr.percentile <= $1 THEN 'A'
            WHEN tr.percentile <= $2 THEN 'B'
            ELSE 'C'
          END,
          last_calculated_at = NOW()
        FROM products p
        LEFT JOIN temp_revenue_ranks tr ON p.pid = tr.pid
        WHERE pm.pid = p.pid
          AND pm.pid = ANY($3::bigint[])
      `, [abcThresholds.a_threshold, abcThresholds.b_threshold, pidValues]);
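      // Turnover is annualized from the same 90-day window: units sold divided
      // by the product's current (non-zero) stock level, scaled by
      // 365 / active selling days, and capped at 999.99 to fit the column.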
      // Now update turnover rate with proper handling of zero inventory periods
      await connection.query(`
        UPDATE product_metrics pm
        SET
          turnover_rate = CASE
            WHEN sales.avg_nonzero_stock > 0 AND sales.active_days > 0
            THEN LEAST(
              (sales.total_sold / sales.avg_nonzero_stock) * (365.0 / sales.active_days),
              999.99
            )
            ELSE 0
          END,
          last_calculated_at = NOW()
        FROM (
          SELECT
            o.pid,
            SUM(o.quantity) as total_sold,
            COUNT(DISTINCT DATE(o.date)) as active_days,
            AVG(CASE WHEN p.stock_quantity > 0 THEN p.stock_quantity ELSE NULL END) as avg_nonzero_stock
          FROM orders o
          JOIN products p ON o.pid = p.pid
          WHERE o.canceled = false
            AND o.date >= CURRENT_DATE - INTERVAL '90 days'
            AND o.pid = ANY($1::bigint[])
          GROUP BY o.pid
        ) sales
        WHERE pm.pid = sales.pid
      `, [pidValues]);

      abcProcessedCount += pids.rows.length;

      // Calculate progress proportionally to batch size
      processedCount = Math.floor(totalProducts * (0.60 + (abcProcessedCount / totalProducts) * 0.2));

      outputProgress({
        status: 'running',
        operation: 'ABC classification progress',
        current: processedCount,
        total: totalProducts,
        elapsed: formatElapsedTime(startTime),
        remaining: estimateRemaining(startTime, processedCount, totalProducts),
        rate: calculateRate(startTime, processedCount),
        percentage: ((processedCount / totalProducts) * 100).toFixed(1),
        timing: {
          start_time: new Date(startTime).toISOString(),
          end_time: new Date().toISOString(),
          elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
        }
      });
    }

    // If we get here, everything completed successfully
    success = true;

    // Update calculate_status
    await connection.query(`
      INSERT INTO calculate_status (module_name, last_calculation_timestamp)
      VALUES ('product_metrics', NOW())
      ON CONFLICT (module_name) DO UPDATE SET last_calculation_timestamp = NOW()
    `);

    return {
      processedProducts: processedCount || 0,
      processedOrders: processedOrders || 0,
      processedPurchaseOrders: 0, // This module doesn't process POs
      success
    };
  } catch (error) {
    success = false;
    logError(error, 'Error calculating product metrics');
    throw error;
  } finally {
    if (connection) {
      connection.release();
    }
  }
}

function calculateStockStatus(stock, config, daily_sales_avg, weekly_sales_avg, monthly_sales_avg) {
  if (stock <= 0) {
    return 'Out of Stock';
  }

  // Use the most appropriate sales average based on data quality
  let sales_avg = daily_sales_avg;
  if (sales_avg === 0) {
    sales_avg = weekly_sales_avg / 7;
  }
  if (sales_avg === 0) {
    sales_avg = monthly_sales_avg / 30;
  }

  if (sales_avg === 0) {
    return stock <= config.low_stock_threshold ? 'Low Stock' : 'In Stock';
  }

  const days_of_stock = stock / sales_avg;
  if (days_of_stock <= config.critical_days) {
    return 'Critical';
  } else if (days_of_stock <= config.reorder_days) {
    return 'Reorder';
  } else if (days_of_stock > config.overstock_days) {
    return 'Overstocked';
  }
  return 'Healthy';
}

function calculateReorderQuantities(stock, stock_status, daily_sales_avg, avg_lead_time, config) {
  // Calculate safety stock based on service level and lead time
  const z_score = 1.96; // 95% service level
  const lead_time = avg_lead_time || config.target_days;
  const safety_stock = Math.ceil(daily_sales_avg * Math.sqrt(lead_time) * z_score);

  // Calculate reorder point
  const lead_time_demand = daily_sales_avg * lead_time;
  const reorder_point = Math.ceil(lead_time_demand + safety_stock);

  // Calculate reorder quantity using EOQ formula if we have the necessary data
  let reorder_qty = 0;
  if (daily_sales_avg > 0) {
    const annual_demand = daily_sales_avg * 365;
    const order_cost = 25; // Fixed cost per order
    const holding_cost = config.cost_price * 0.25; // 25% of unit cost as annual holding cost
    reorder_qty = Math.ceil(Math.sqrt((2 * annual_demand * order_cost) / holding_cost));
  } else {
    // If no sales data, use a basic calculation
    reorder_qty = Math.max(safety_stock, config.low_stock_threshold);
  }

  // Calculate overstocked amount
  const overstocked_amt = stock_status === 'Overstocked'
    ? stock - Math.ceil(daily_sales_avg * config.overstock_days)
    : 0;

  return { safety_stock, reorder_point, reorder_qty, overstocked_amt };
}

module.exports = calculateProductMetrics;
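// Example usage (illustrative sketch -- the require path below is hypothetical;
// only the exported signature comes from this module):
//
//   const calculateProductMetrics = require('./calculateProductMetrics');
//
//   (async () => {
//     const startTime = Date.now();
//     // Passing totalProducts = null lets the module count products itself.
//     const result = await calculateProductMetrics(startTime, null, 0, false);
//     // result: { processedProducts, processedOrders, processedPurchaseOrders, success }
//   })();
//
// Worked example of the reorder math (same formulas as calculateReorderQuantities),
// assuming daily_sales_avg = 4, lead time = 10 days, cost_price = 20:
//   safety_stock  = ceil(4 * sqrt(10) * 1.96)                  = 25
//   reorder_point = ceil(4 * 10 + 25)                          = 65
//   reorder_qty   = ceil(sqrt((2 * 1460 * 25) / (20 * 0.25)))  = 121  (EOQ)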