const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress'); const { getConnection } = require('./utils/db'); // Helper function to handle NaN and undefined values function sanitizeValue(value) { if (value === undefined || value === null || Number.isNaN(value)) { return null; } return value; } async function calculateProductMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) { const connection = await getConnection(); let success = false; let processedOrders = 0; const BATCH_SIZE = 5000; try { // Get last calculation timestamp const [lastCalc] = await connection.query(` SELECT last_calculation_timestamp FROM calculate_status WHERE module_name = 'product_metrics' `); const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01'; // Get total product count if not provided if (!totalProducts) { const [productCount] = await connection.query(` SELECT COUNT(DISTINCT p.pid) as count FROM products p LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ? LEFT JOIN purchase_orders po ON p.pid = po.pid AND po.updated > ? WHERE p.updated > ? OR o.pid IS NOT NULL OR po.pid IS NOT NULL `, [lastCalculationTime, lastCalculationTime, lastCalculationTime]); totalProducts = productCount[0].count; } if (totalProducts === 0) { console.log('No products need updating'); return { processedProducts: 0, processedOrders: 0, processedPurchaseOrders: 0, success: true }; } // Skip flags are inherited from the parent scope const SKIP_PRODUCT_BASE_METRICS = 0; const SKIP_PRODUCT_TIME_AGGREGATES = 0; if (isCancelled) { outputProgress({ status: 'cancelled', operation: 'Product metrics calculation cancelled', current: processedCount, total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: null, rate: calculateRate(startTime, processedCount), percentage: ((processedCount / totalProducts) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); return { processedProducts: processedCount, processedOrders, processedPurchaseOrders: 0, success }; } // First ensure all products have a metrics record await connection.query(` INSERT IGNORE INTO product_metrics (pid, last_calculated_at) SELECT pid, NOW() FROM products `); // Get threshold settings once const [thresholds] = await connection.query(` SELECT critical_days, reorder_days, overstock_days, low_stock_threshold FROM stock_thresholds WHERE category_id IS NULL AND vendor IS NULL LIMIT 1 `); const defaultThresholds = thresholds[0]; // Calculate base product metrics if (!SKIP_PRODUCT_BASE_METRICS) { outputProgress({ status: 'running', operation: 'Starting base product metrics calculation', current: processedCount, total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, processedCount, totalProducts), rate: calculateRate(startTime, processedCount), percentage: ((processedCount / totalProducts) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); // Get order count that will be processed const [orderCount] = await connection.query(` SELECT COUNT(*) as count FROM orders o WHERE o.canceled = false `); processedOrders = orderCount[0].count; // Clear temporary tables await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_metrics'); await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_purchase_metrics'); // Create optimized temporary tables with indexes await connection.query(` CREATE TEMPORARY TABLE temp_sales_metrics ( pid BIGINT NOT NULL, daily_sales_avg DECIMAL(10,3), weekly_sales_avg DECIMAL(10,3), monthly_sales_avg DECIMAL(10,3), total_revenue DECIMAL(10,2), avg_margin_percent DECIMAL(5,2), first_sale_date DATE, last_sale_date DATE, PRIMARY KEY (pid), INDEX (daily_sales_avg), INDEX (total_revenue) ) ENGINE=MEMORY `); await connection.query(` CREATE TEMPORARY TABLE temp_purchase_metrics ( pid BIGINT NOT NULL, avg_lead_time_days DECIMAL(5,1), last_purchase_date DATE, first_received_date DATE, last_received_date DATE, PRIMARY KEY (pid), INDEX (avg_lead_time_days) ) ENGINE=MEMORY `); // Populate temp_sales_metrics with base stats and sales averages using FORCE INDEX await connection.query(` INSERT INTO temp_sales_metrics SELECT p.pid, COALESCE(SUM(o.quantity) / NULLIF(COUNT(DISTINCT DATE(o.date)), 0), 0) as daily_sales_avg, COALESCE(SUM(o.quantity) / NULLIF(CEIL(COUNT(DISTINCT DATE(o.date)) / 7), 0), 0) as weekly_sales_avg, COALESCE(SUM(o.quantity) / NULLIF(CEIL(COUNT(DISTINCT DATE(o.date)) / 30), 0), 0) as monthly_sales_avg, COALESCE(SUM(o.quantity * o.price), 0) as total_revenue, CASE WHEN SUM(o.quantity * o.price) > 0 THEN ((SUM(o.quantity * o.price) - SUM(o.quantity * p.cost_price)) / SUM(o.quantity * o.price)) * 100 ELSE 0 END as avg_margin_percent, MIN(o.date) as first_sale_date, MAX(o.date) as last_sale_date FROM products p FORCE INDEX (PRIMARY) LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid AND o.canceled = false AND o.date >= DATE_SUB(CURDATE(), INTERVAL 90 DAY) WHERE p.updated > ? OR EXISTS ( SELECT 1 FROM orders o2 FORCE INDEX (idx_orders_metrics) WHERE o2.pid = p.pid AND o2.canceled = false AND o2.updated > ? ) GROUP BY p.pid `, [lastCalculationTime, lastCalculationTime]); // Populate temp_purchase_metrics with optimized index usage await connection.query(` INSERT INTO temp_purchase_metrics SELECT p.pid, AVG(DATEDIFF(po.received_date, po.date)) as avg_lead_time_days, MAX(po.date) as last_purchase_date, MIN(po.received_date) as first_received_date, MAX(po.received_date) as last_received_date FROM products p FORCE INDEX (PRIMARY) LEFT JOIN purchase_orders po FORCE INDEX (idx_po_metrics) ON p.pid = po.pid AND po.received_date IS NOT NULL AND po.date >= DATE_SUB(CURDATE(), INTERVAL 365 DAY) WHERE p.updated > ? OR EXISTS ( SELECT 1 FROM purchase_orders po2 FORCE INDEX (idx_po_metrics) WHERE po2.pid = p.pid AND po2.updated > ? ) GROUP BY p.pid `, [lastCalculationTime, lastCalculationTime]); // Process updates in batches, but only for affected products let lastPid = 0; while (true) { if (isCancelled) break; const [batch] = await connection.query(` SELECT DISTINCT p.pid FROM products p LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ? LEFT JOIN purchase_orders po ON p.pid = po.pid AND po.updated > ? WHERE p.pid > ? AND ( p.updated > ? OR o.pid IS NOT NULL OR po.pid IS NOT NULL ) ORDER BY p.pid LIMIT ? `, [lastCalculationTime, lastCalculationTime, lastPid, lastCalculationTime, BATCH_SIZE]); if (batch.length === 0) break; await connection.query(` UPDATE product_metrics pm JOIN products p ON pm.pid = p.pid LEFT JOIN temp_sales_metrics sm ON pm.pid = sm.pid LEFT JOIN temp_purchase_metrics lm ON pm.pid = lm.pid SET pm.inventory_value = p.stock_quantity * p.cost_price, pm.daily_sales_avg = COALESCE(sm.daily_sales_avg, 0), pm.weekly_sales_avg = COALESCE(sm.weekly_sales_avg, 0), pm.monthly_sales_avg = COALESCE(sm.monthly_sales_avg, 0), pm.total_revenue = COALESCE(sm.total_revenue, 0), pm.avg_margin_percent = COALESCE(sm.avg_margin_percent, 0), pm.first_sale_date = sm.first_sale_date, pm.last_sale_date = sm.last_sale_date, pm.avg_lead_time_days = COALESCE(lm.avg_lead_time_days, 30), pm.days_of_inventory = CASE WHEN COALESCE(sm.daily_sales_avg, 0) > 0 THEN FLOOR(p.stock_quantity / sm.daily_sales_avg) ELSE NULL END, pm.weeks_of_inventory = CASE WHEN COALESCE(sm.weekly_sales_avg, 0) > 0 THEN FLOOR(p.stock_quantity / sm.weekly_sales_avg) ELSE NULL END, pm.stock_status = CASE WHEN p.stock_quantity <= 0 THEN 'Out of Stock' WHEN COALESCE(sm.daily_sales_avg, 0) = 0 AND p.stock_quantity <= ? THEN 'Low Stock' WHEN COALESCE(sm.daily_sales_avg, 0) = 0 THEN 'In Stock' WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) <= ? THEN 'Critical' WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) <= ? THEN 'Reorder' WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) > ? THEN 'Overstocked' ELSE 'Healthy' END, pm.reorder_qty = CASE WHEN COALESCE(sm.daily_sales_avg, 0) > 0 THEN GREATEST( CEIL(sm.daily_sales_avg * COALESCE(lm.avg_lead_time_days, 30) * 1.96), ? ) ELSE ? END, pm.overstocked_amt = CASE WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) > ? THEN GREATEST(0, p.stock_quantity - CEIL(sm.daily_sales_avg * ?)) ELSE 0 END, pm.last_calculated_at = NOW() WHERE p.pid IN (?) `, [ defaultThresholds.low_stock_threshold, defaultThresholds.critical_days, defaultThresholds.reorder_days, defaultThresholds.overstock_days, defaultThresholds.low_stock_threshold, defaultThresholds.low_stock_threshold, defaultThresholds.overstock_days, defaultThresholds.overstock_days, batch.map(row => row.pid) ]); lastPid = batch[batch.length - 1].pid; processedCount += batch.length; outputProgress({ status: 'running', operation: 'Processing base metrics batch', current: processedCount, total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, processedCount, totalProducts), rate: calculateRate(startTime, processedCount), percentage: ((processedCount / totalProducts) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); } // Calculate forecast accuracy and bias in batches lastPid = 0; while (true) { if (isCancelled) break; const [batch] = await connection.query( 'SELECT pid FROM products WHERE pid > ? ORDER BY pid LIMIT ?', [lastPid, BATCH_SIZE] ); if (batch.length === 0) break; await connection.query(` UPDATE product_metrics pm JOIN ( SELECT sf.pid, AVG(CASE WHEN o.quantity > 0 THEN ABS(sf.forecast_units - o.quantity) / o.quantity * 100 ELSE 100 END) as avg_forecast_error, AVG(CASE WHEN o.quantity > 0 THEN (sf.forecast_units - o.quantity) / o.quantity * 100 ELSE 0 END) as avg_forecast_bias, MAX(sf.forecast_date) as last_forecast_date FROM sales_forecasts sf JOIN orders o ON sf.pid = o.pid AND DATE(o.date) = sf.forecast_date WHERE o.canceled = false AND sf.forecast_date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY) AND sf.pid IN (?) GROUP BY sf.pid ) fa ON pm.pid = fa.pid SET pm.forecast_accuracy = GREATEST(0, 100 - LEAST(fa.avg_forecast_error, 100)), pm.forecast_bias = GREATEST(-100, LEAST(fa.avg_forecast_bias, 100)), pm.last_forecast_date = fa.last_forecast_date, pm.last_calculated_at = NOW() WHERE pm.pid IN (?) `, [batch.map(row => row.pid), batch.map(row => row.pid)]); lastPid = batch[batch.length - 1].pid; } } // Calculate product time aggregates if (!SKIP_PRODUCT_TIME_AGGREGATES) { outputProgress({ status: 'running', operation: 'Starting product time aggregates calculation', current: processedCount || 0, total: totalProducts || 0, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0), rate: calculateRate(startTime, processedCount || 0), percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); // Calculate time-based aggregates await connection.query(` INSERT INTO product_time_aggregates ( pid, year, month, total_quantity_sold, total_revenue, total_cost, order_count, avg_price, profit_margin, inventory_value, gmroi ) SELECT p.pid, YEAR(o.date) as year, MONTH(o.date) as month, SUM(o.quantity) as total_quantity_sold, SUM(o.quantity * o.price) as total_revenue, SUM(o.quantity * p.cost_price) as total_cost, COUNT(DISTINCT o.order_number) as order_count, AVG(o.price) as avg_price, CASE WHEN SUM(o.quantity * o.price) > 0 THEN ((SUM(o.quantity * o.price) - SUM(o.quantity * p.cost_price)) / SUM(o.quantity * o.price)) * 100 ELSE 0 END as profit_margin, p.cost_price * p.stock_quantity as inventory_value, CASE WHEN p.cost_price * p.stock_quantity > 0 THEN (SUM(o.quantity * (o.price - p.cost_price))) / (p.cost_price * p.stock_quantity) ELSE 0 END as gmroi FROM products p LEFT JOIN orders o ON p.pid = o.pid AND o.canceled = false WHERE o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH) GROUP BY p.pid, YEAR(o.date), MONTH(o.date) ON DUPLICATE KEY UPDATE total_quantity_sold = VALUES(total_quantity_sold), total_revenue = VALUES(total_revenue), total_cost = VALUES(total_cost), order_count = VALUES(order_count), avg_price = VALUES(avg_price), profit_margin = VALUES(profit_margin), inventory_value = VALUES(inventory_value), gmroi = VALUES(gmroi) `); processedCount = Math.floor(totalProducts * 0.6); outputProgress({ status: 'running', operation: 'Product time aggregates calculated', current: processedCount || 0, total: totalProducts || 0, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0), rate: calculateRate(startTime, processedCount || 0), percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); } else { processedCount = Math.floor(totalProducts * 0.6); outputProgress({ status: 'running', operation: 'Skipping product time aggregates calculation', current: processedCount || 0, total: totalProducts || 0, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0), rate: calculateRate(startTime, processedCount || 0), percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); } // Calculate ABC classification outputProgress({ status: 'running', operation: 'Starting ABC classification', current: processedCount, total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, processedCount, totalProducts), rate: calculateRate(startTime, processedCount), percentage: ((processedCount / totalProducts) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); if (isCancelled) return { processedProducts: processedCount, processedOrders, processedPurchaseOrders: 0, // This module doesn't process POs success }; const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1'); const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 }; // First, create and populate the rankings table with an index await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks'); await connection.query(` CREATE TEMPORARY TABLE temp_revenue_ranks ( pid BIGINT NOT NULL, total_revenue DECIMAL(10,3), rank_num INT, dense_rank_num INT, percentile DECIMAL(5,2), total_count INT, PRIMARY KEY (pid), INDEX (rank_num), INDEX (dense_rank_num), INDEX (percentile) ) ENGINE=MEMORY `); // Calculate rankings with proper tie handling await connection.query(` INSERT INTO temp_revenue_ranks WITH revenue_data AS ( SELECT pid, total_revenue, COUNT(*) OVER () as total_count, PERCENT_RANK() OVER (ORDER BY total_revenue DESC) * 100 as percentile, RANK() OVER (ORDER BY total_revenue DESC) as rank_num, DENSE_RANK() OVER (ORDER BY total_revenue DESC) as dense_rank_num FROM product_metrics WHERE total_revenue > 0 ) SELECT pid, total_revenue, rank_num, dense_rank_num, percentile, total_count FROM revenue_data `); // Get total count for percentage calculation const [rankingCount] = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks'); const totalCount = rankingCount[0].total_count || 1; const max_rank = totalCount; // Process updates in batches let abcProcessedCount = 0; const batchSize = 5000; while (true) { if (isCancelled) return { processedProducts: processedCount, processedOrders, processedPurchaseOrders: 0, // This module doesn't process POs success }; // Get a batch of PIDs that need updating const [pids] = await connection.query(` SELECT pm.pid FROM product_metrics pm LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid WHERE pm.abc_class IS NULL OR pm.abc_class != CASE WHEN tr.pid IS NULL THEN 'C' WHEN tr.percentile <= ? THEN 'A' WHEN tr.percentile <= ? THEN 'B' ELSE 'C' END LIMIT ? `, [abcThresholds.a_threshold, abcThresholds.b_threshold, batchSize]); if (pids.length === 0) break; await connection.query(` UPDATE product_metrics pm LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid SET pm.abc_class = CASE WHEN tr.pid IS NULL THEN 'C' WHEN tr.percentile <= ? THEN 'A' WHEN tr.percentile <= ? THEN 'B' ELSE 'C' END, pm.last_calculated_at = NOW() WHERE pm.pid IN (?) `, [abcThresholds.a_threshold, abcThresholds.b_threshold, pids.map(row => row.pid)]); // Now update turnover rate with proper handling of zero inventory periods await connection.query(` UPDATE product_metrics pm JOIN ( SELECT o.pid, SUM(o.quantity) as total_sold, COUNT(DISTINCT DATE(o.date)) as active_days, AVG(CASE WHEN p.stock_quantity > 0 THEN p.stock_quantity ELSE NULL END) as avg_nonzero_stock FROM orders o JOIN products p ON o.pid = p.pid WHERE o.canceled = false AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY) AND o.pid IN (?) GROUP BY o.pid ) sales ON pm.pid = sales.pid SET pm.turnover_rate = CASE WHEN sales.avg_nonzero_stock > 0 AND sales.active_days > 0 THEN LEAST( (sales.total_sold / sales.avg_nonzero_stock) * (365.0 / sales.active_days), 999.99 ) ELSE 0 END, pm.last_calculated_at = NOW() WHERE pm.pid IN (?) `, [pids.map(row => row.pid), pids.map(row => row.pid)]); } // If we get here, everything completed successfully success = true; // Update calculate_status with current timestamp await connection.query(` INSERT INTO calculate_status (module_name, last_calculation_timestamp) VALUES ('product_metrics', NOW()) ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW() `); return { processedProducts: processedCount || 0, processedOrders: processedOrders || 0, processedPurchaseOrders: 0, // This module doesn't process POs success }; } catch (error) { success = false; logError(error, 'Error calculating product metrics'); throw error; } finally { if (connection) { connection.release(); } } } function calculateStockStatus(stock, config, daily_sales_avg, weekly_sales_avg, monthly_sales_avg) { if (stock <= 0) { return 'Out of Stock'; } // Use the most appropriate sales average based on data quality let sales_avg = daily_sales_avg; if (sales_avg === 0) { sales_avg = weekly_sales_avg / 7; } if (sales_avg === 0) { sales_avg = monthly_sales_avg / 30; } if (sales_avg === 0) { return stock <= config.low_stock_threshold ? 'Low Stock' : 'In Stock'; } const days_of_stock = stock / sales_avg; if (days_of_stock <= config.critical_days) { return 'Critical'; } else if (days_of_stock <= config.reorder_days) { return 'Reorder'; } else if (days_of_stock > config.overstock_days) { return 'Overstocked'; } return 'Healthy'; } function calculateReorderQuantities(stock, stock_status, daily_sales_avg, avg_lead_time, config) { // Calculate safety stock based on service level and lead time const z_score = 1.96; // 95% service level const lead_time = avg_lead_time || config.target_days; const safety_stock = Math.ceil(daily_sales_avg * Math.sqrt(lead_time) * z_score); // Calculate reorder point const lead_time_demand = daily_sales_avg * lead_time; const reorder_point = Math.ceil(lead_time_demand + safety_stock); // Calculate reorder quantity using EOQ formula if we have the necessary data let reorder_qty = 0; if (daily_sales_avg > 0) { const annual_demand = daily_sales_avg * 365; const order_cost = 25; // Fixed cost per order const holding_cost_percent = 0.25; // 25% annual holding cost reorder_qty = Math.ceil(Math.sqrt((2 * annual_demand * order_cost) / holding_cost_percent)); } else { // If no sales data, use a basic calculation reorder_qty = Math.max(safety_stock, config.low_stock_threshold); } // Calculate overstocked amount const overstocked_amt = stock_status === 'Overstocked' ? stock - Math.ceil(daily_sales_avg * config.overstock_days) : 0; return { safety_stock, reorder_point, reorder_qty, overstocked_amt }; } module.exports = calculateProductMetrics;