const mysql = require('mysql2/promise');
const path = require('path');
require('dotenv').config({ path: path.resolve(__dirname, '..', '.env') });

/**
 * Format the time elapsed since `startTime` as a short human-readable string.
 *
 * @param {number} startTime - Epoch milliseconds (as returned by `Date.now()`).
 * @returns {string} `"Xh Ym"`, `"Xm Ys"` or `"Xs"` depending on magnitude.
 */
function formatElapsedTime(startTime) {
  const elapsed = Date.now() - startTime;
  const seconds = Math.floor(elapsed / 1000);
  const minutes = Math.floor(seconds / 60);
  const hours = Math.floor(minutes / 60);

  if (hours > 0) {
    return `${hours}h ${minutes % 60}m`;
  }
  if (minutes > 0) {
    return `${minutes}m ${seconds % 60}s`;
  }
  return `${seconds}s`;
}

/**
 * Estimate the remaining time by extrapolating the average rate so far.
 *
 * @param {number} startTime - Epoch milliseconds when processing started.
 * @param {number} current - Number of items processed so far.
 * @param {number} total - Total number of items to process.
 * @returns {string|null} `"Xm Ys"` / `"Xs"`, or `null` when nothing has been
 *   processed yet (no rate can be derived).
 */
function estimateRemaining(startTime, current, total) {
  if (current === 0) return null;

  const elapsed = Date.now() - startTime;
  const rate = current / elapsed; // items per millisecond
  const remaining = (total - current) / rate; // milliseconds

  const minutes = Math.floor(remaining / 60000);
  const seconds = Math.floor((remaining % 60000) / 1000);

  if (minutes > 0) {
    return `${minutes}m ${seconds}s`;
  }
  return `${seconds}s`;
}

/**
 * Compute the average processing rate in items per second.
 *
 * @param {number} startTime - Epoch milliseconds when processing started.
 * @param {number} current - Number of items processed so far.
 * @returns {number} Rounded items/second, or 0 when no time has elapsed.
 */
function calculateRate(startTime, current) {
  const elapsedSeconds = (Date.now() - startTime) / 1000;
  return elapsedSeconds > 0 ? Math.round(current / elapsedSeconds) : 0;
}

/**
 * Write a progress update to stdout as one line of JSON: `{"progress": data}`.
 *
 * @param {object} data - Progress payload.
 */
function outputProgress(data) {
  const event = { progress: data };
  process.stdout.write(JSON.stringify(event) + '\n');
}

/**
 * Write an error report to stderr as one line of JSON.
 *
 * @param {Error|string} error - The error (its `message` is used when present).
 * @param {string} context - Description of what was being attempted.
 */
function logError(error, context) {
  console.error(JSON.stringify({
    progress: {
      status: 'error',
      error: error.message || error,
      context
    }
  }));
}

// Database configuration (values come from the .env file loaded above)
const dbConfig = {
  host: process.env.DB_HOST,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0
};

// Fallbacks used when the configuration query returns no row.
const DEFAULT_PRODUCT_CONFIG = Object.freeze({
  critical_days: 7,
  reorder_days: 14,
  overstock_days: 90,
  low_stock_threshold: 5,
  daily_window_days: 30,
  weekly_window_days: 7,
  monthly_window_days: 90,
  safety_stock_days: 14,
  service_level: 95.0
});

// Fallback ABC percentile cut-offs when abc_classification_config has no row id = 1.
const DEFAULT_ABC_A_THRESHOLD = 20.0;
const DEFAULT_ABC_B_THRESHOLD = 50.0;

// Number of products fetched and upserted per batch.
const BATCH_SIZE = 100;

// Cancellation flag checked between batches by calculateMetrics().
let isCancelled = false;

/**
 * Cancel a running calculation: sets the cancellation flag, emits a
 * `cancelled` progress line on stdout and terminates the process with code 0.
 */
function cancelCalculation() {
  isCancelled = true;

  const event = {
    progress: {
      status: 'cancelled',
      operation: 'Calculation cancelled',
      current: 0,
      total: 0,
      elapsed: null,
      remaining: null,
      rate: 0
    }
  };
  process.stdout.write(JSON.stringify(event) + '\n');
  process.exit(0);
}

/**
 * Resolve the effective thresholds / velocity windows / safety-stock settings
 * for one product, preferring category+vendor, then category, then vendor,
 * then the default row of each config table.
 */
async function fetchProductConfig(connection, productId) {
  const [configs] = await connection.query(`
    WITH product_info AS (
      SELECT p.product_id, p.vendor, pc.category_id
      FROM products p
      LEFT JOIN product_categories pc ON p.product_id = pc.product_id
      WHERE p.product_id = ?
    ),
    threshold_options AS (
      SELECT st.*,
        CASE
          WHEN st.category_id = pi.category_id AND st.vendor = pi.vendor THEN 1 -- Category + vendor match
          WHEN st.category_id = pi.category_id AND st.vendor IS NULL THEN 2 -- Category match
          WHEN st.category_id IS NULL AND st.vendor = pi.vendor THEN 3 -- Vendor match
          WHEN st.category_id IS NULL AND st.vendor IS NULL THEN 4 -- Default
          ELSE 5
        END as priority
      FROM product_info pi
      CROSS JOIN stock_thresholds st
      WHERE (st.category_id = pi.category_id OR st.category_id IS NULL)
        AND (st.vendor = pi.vendor OR st.vendor IS NULL)
    ),
    velocity_options AS (
      SELECT sv.*,
        CASE
          WHEN sv.category_id = pi.category_id AND sv.vendor = pi.vendor THEN 1
          WHEN sv.category_id = pi.category_id AND sv.vendor IS NULL THEN 2
          WHEN sv.category_id IS NULL AND sv.vendor = pi.vendor THEN 3
          WHEN sv.category_id IS NULL AND sv.vendor IS NULL THEN 4
          ELSE 5
        END as priority
      FROM product_info pi
      CROSS JOIN sales_velocity_config sv
      WHERE (sv.category_id = pi.category_id OR sv.category_id IS NULL)
        AND (sv.vendor = pi.vendor OR sv.vendor IS NULL)
    ),
    safety_options AS (
      SELECT ss.*,
        CASE
          WHEN ss.category_id = pi.category_id AND ss.vendor = pi.vendor THEN 1
          WHEN ss.category_id = pi.category_id AND ss.vendor IS NULL THEN 2
          WHEN ss.category_id IS NULL AND ss.vendor = pi.vendor THEN 3
          WHEN ss.category_id IS NULL AND ss.vendor IS NULL THEN 4
          ELSE 5
        END as priority
      FROM product_info pi
      CROSS JOIN safety_stock_config ss
      WHERE (ss.category_id = pi.category_id OR ss.category_id IS NULL)
        AND (ss.vendor = pi.vendor OR ss.vendor IS NULL)
    )
    SELECT
      -- Stock thresholds
      COALESCE((SELECT critical_days FROM threshold_options ORDER BY priority LIMIT 1), 7) as critical_days,
      COALESCE((SELECT reorder_days FROM threshold_options ORDER BY priority LIMIT 1), 14) as reorder_days,
      COALESCE((SELECT overstock_days FROM threshold_options ORDER BY priority LIMIT 1), 90) as overstock_days,
      COALESCE((SELECT low_stock_threshold FROM threshold_options ORDER BY priority LIMIT 1), 5) as low_stock_threshold,
      -- Sales velocity windows
      COALESCE((SELECT daily_window_days FROM velocity_options ORDER BY priority LIMIT 1), 30) as daily_window_days,
      COALESCE((SELECT weekly_window_days FROM velocity_options ORDER BY priority LIMIT 1), 7) as weekly_window_days,
      COALESCE((SELECT monthly_window_days FROM velocity_options ORDER BY priority LIMIT 1), 90) as monthly_window_days,
      -- Safety stock config
      COALESCE((SELECT coverage_days FROM safety_options ORDER BY priority LIMIT 1), 14) as safety_stock_days,
      COALESCE((SELECT service_level FROM safety_options ORDER BY priority LIMIT 1), 95.0) as service_level
  `, [productId]).catch(err => {
    logError(err, `Failed to get configuration for product ${productId}`);
    throw err;
  });

  return configs[0] || DEFAULT_PRODUCT_CONFIG;
}

/**
 * Aggregate non-cancelled order lines for one product, including rolling
 * averages over the configured daily/weekly/monthly windows.
 */
async function fetchSalesMetrics(connection, productId, config) {
  const [salesMetrics] = await connection.query(`
    WITH sales_summary AS (
      SELECT
        SUM(o.quantity) as total_quantity_sold,
        SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) as total_revenue,
        SUM(COALESCE(p.cost_price, 0) * o.quantity) as total_cost,
        MAX(o.date) as last_sale_date,
        MIN(o.date) as first_sale_date,
        COUNT(DISTINCT o.order_number) as number_of_orders,
        AVG(o.quantity) as avg_quantity_per_order,
        -- Calculate rolling averages using configured windows
        SUM(CASE WHEN o.date >= DATE_SUB(CURDATE(), INTERVAL ? DAY) THEN o.quantity ELSE 0 END) as last_30_days_qty,
        SUM(CASE WHEN o.date >= DATE_SUB(CURDATE(), INTERVAL ? DAY) THEN o.quantity ELSE 0 END) as last_7_days_qty,
        SUM(CASE WHEN o.date >= DATE_SUB(CURDATE(), INTERVAL ? DAY) THEN o.quantity ELSE 0 END) as last_month_qty
      FROM orders o
      JOIN products p ON o.product_id = p.product_id
      WHERE o.canceled = 0 AND o.product_id = ?
      GROUP BY o.product_id
    )
    SELECT
      total_quantity_sold,
      total_revenue,
      total_cost,
      last_sale_date,
      first_sale_date,
      number_of_orders,
      avg_quantity_per_order,
      last_30_days_qty / ? as rolling_daily_avg,
      last_7_days_qty / ? as rolling_weekly_avg,
      last_month_qty / ? as rolling_monthly_avg,
      total_quantity_sold as total_sales_to_date
    FROM sales_summary
  `, [
    config.daily_window_days,
    config.weekly_window_days,
    config.monthly_window_days,
    productId,
    config.daily_window_days,
    config.weekly_window_days,
    config.monthly_window_days
  ]).catch(err => {
    logError(err, `Failed to calculate sales metrics for product ${productId}`);
    throw err;
  });

  return salesMetrics[0] || {};
}

/**
 * Aggregate closed, received purchase orders for one product, limited to the
 * five most recent orders or those dated within the last 90 days.
 */
async function fetchPurchaseMetrics(connection, productId) {
  const [purchaseMetrics] = await connection.query(`
    WITH recent_orders AS (
      SELECT
        date,
        received_date,
        received,
        cost_price,
        DATEDIFF(received_date, date) as lead_time_days,
        ROW_NUMBER() OVER (ORDER BY date DESC) as order_rank
      FROM purchase_orders
      WHERE status = 'closed'
        AND product_id = ?
        AND received > 0
        AND received_date IS NOT NULL
    ),
    lead_time_orders AS (
      SELECT *
      FROM recent_orders
      WHERE order_rank <= 5 -- Last 5 orders
        OR date >= DATE_SUB(CURDATE(), INTERVAL 90 DAY) -- Or orders from last 90 days
    )
    SELECT
      SUM(CASE WHEN received >= 0 THEN received ELSE 0 END) as total_quantity_purchased,
      SUM(CASE WHEN received >= 0 THEN cost_price * received ELSE 0 END) as total_cost,
      MAX(date) as last_purchase_date,
      MAX(received_date) as last_received_date,
      AVG(lead_time_days) as avg_lead_time_days,
      COUNT(*) as orders_analyzed
    FROM lead_time_orders
  `, [productId]).catch(err => {
    logError(err, `Failed to calculate purchase metrics for product ${productId}`);
    throw err;
  });

  return purchaseMetrics[0] || {};
}

/** Read the current stock quantity and cost price of one product. */
async function fetchStockInfo(connection, productId) {
  const [stockInfo] = await connection.query(`
    SELECT stock_quantity, cost_price
    FROM products
    WHERE product_id = ?
  `, [productId]).catch(err => {
    logError(err, `Failed to get stock info for product ${productId}`);
    throw err;
  });

  return stockInfo[0] || {};
}

/**
 * Build one row for the product_metrics upsert from already-fetched data.
 * The returned array follows the column order used by upsertProductMetrics().
 */
function buildMetricsRow(productId, config, metrics, purchases, stock) {
  // mysql2 returns DECIMAL results (SUM, division, ...) as strings unless the
  // connection enables decimalNumbers, so coerce before any comparison:
  // "0.0000" is truthy and never === 0.
  const dailySalesAvg = Number(metrics.rolling_daily_avg) || 0;
  const weeklySalesAvg = Number(metrics.rolling_weekly_avg) || 0;
  // NOTE(review): this divides the lifetime quantity sold by 30 and ignores
  // the queried rolling_monthly_avg. Kept as-is because the intended
  // definition cannot be determined from this file - confirm.
  const monthlySalesAvg = (Number(metrics.total_quantity_sold) || 0) / 30;

  const totalRevenue = Number(metrics.total_revenue) || 0;
  const totalCost = Number(metrics.total_cost) || 0;
  // No margin can be stated without revenue.
  const marginPercent = totalRevenue > 0
    ? ((totalRevenue - totalCost) / totalRevenue) * 100
    : null;

  const stockQuantity = Number(stock.stock_quantity) || 0;
  const inventoryValue = stockQuantity * (Number(stock.cost_price) || 0);

  const hasSales = dailySalesAvg > 0;
  const criticalQty = Math.max(1, Math.ceil(dailySalesAvg * config.critical_days));
  const reorderQty = Math.max(1, Math.ceil(dailySalesAvg * config.reorder_days));
  const overstockQty = Math.max(1, dailySalesAvg * config.overstock_days);

  let stockStatus;
  if (dailySalesAvg === 0) {
    stockStatus = 'New';
  } else if (stockQuantity <= criticalQty) {
    stockStatus = 'Critical';
  } else if (stockQuantity <= reorderQty) {
    stockStatus = 'Reorder';
  } else if (stockQuantity > overstockQty) {
    stockStatus = 'Overstocked';
  } else {
    stockStatus = 'Healthy';
  }

  const safetyStock = hasSales
    ? Math.max(1, Math.ceil(dailySalesAvg * config.safety_stock_days * (config.service_level / 100)))
    : null;

  return [
    productId,
    dailySalesAvg || null,
    weeklySalesAvg || null,
    monthlySalesAvg || null,
    metrics.avg_quantity_per_order || null,
    metrics.number_of_orders || 0,
    metrics.first_sale_date || null,
    metrics.last_sale_date || null,
    hasSales ? stockQuantity / dailySalesAvg : null, // days_of_inventory
    weeklySalesAvg > 0 ? stockQuantity / weeklySalesAvg : null, // weeks_of_inventory
    hasSales ? reorderQty : null, // reorder_point
    safetyStock, // safety_stock
    marginPercent,
    metrics.total_revenue || 0,
    inventoryValue || 0,
    purchases.avg_lead_time_days || null,
    purchases.last_purchase_date || null,
    purchases.last_received_date || null,
    stockStatus
  ];
}

/** Insert or update a batch of product_metrics rows built by buildMetricsRow(). */
async function upsertProductMetrics(connection, rows) {
  await connection.query(`
    INSERT INTO product_metrics (
      product_id,
      daily_sales_avg,
      weekly_sales_avg,
      monthly_sales_avg,
      avg_quantity_per_order,
      number_of_orders,
      first_sale_date,
      last_sale_date,
      days_of_inventory,
      weeks_of_inventory,
      reorder_point,
      safety_stock,
      avg_margin_percent,
      total_revenue,
      inventory_value,
      avg_lead_time_days,
      last_purchase_date,
      last_received_date,
      stock_status
    ) VALUES ?
    ON DUPLICATE KEY UPDATE
      last_calculated_at = NOW(),
      daily_sales_avg = VALUES(daily_sales_avg),
      weekly_sales_avg = VALUES(weekly_sales_avg),
      monthly_sales_avg = VALUES(monthly_sales_avg),
      avg_quantity_per_order = VALUES(avg_quantity_per_order),
      number_of_orders = VALUES(number_of_orders),
      first_sale_date = VALUES(first_sale_date),
      last_sale_date = VALUES(last_sale_date),
      days_of_inventory = VALUES(days_of_inventory),
      weeks_of_inventory = VALUES(weeks_of_inventory),
      reorder_point = VALUES(reorder_point),
      safety_stock = VALUES(safety_stock),
      avg_margin_percent = VALUES(avg_margin_percent),
      total_revenue = VALUES(total_revenue),
      inventory_value = VALUES(inventory_value),
      avg_lead_time_days = VALUES(avg_lead_time_days),
      last_purchase_date = VALUES(last_purchase_date),
      last_received_date = VALUES(last_received_date),
      stock_status = VALUES(stock_status)
  `, [rows]).catch(err => {
    logError(err, `Failed to batch update metrics for ${rows.length} products`);
    throw err;
  });
}

/**
 * Assign abc_class to every product_metrics row by revenue percentile, using
 * the cut-offs stored in abc_classification_config (row id = 1).
 */
async function updateAbcClassification(connection) {
  // The thresholds are global, so they are read once here rather than from
  // any per-product configuration.
  const [abcRows] = await connection.query(
    'SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1'
  );
  const abcConfig = abcRows[0] || {};
  const aThreshold = abcConfig.a_threshold == null ? DEFAULT_ABC_A_THRESHOLD : abcConfig.a_threshold;
  const bThreshold = abcConfig.b_threshold == null ? DEFAULT_ABC_B_THRESHOLD : abcConfig.b_threshold;

  await connection.query(`
    WITH revenue_rankings AS (
      SELECT
        product_id,
        total_revenue,
        PERCENT_RANK() OVER (ORDER BY COALESCE(total_revenue, 0) DESC) * 100 as revenue_percentile
      FROM product_metrics
    ),
    classification_update AS (
      SELECT
        product_id,
        CASE
          WHEN revenue_percentile <= ? THEN 'A'
          WHEN revenue_percentile <= ? THEN 'B'
          ELSE 'C'
        END as abc_class
      FROM revenue_rankings
    )
    UPDATE product_metrics pm
    JOIN classification_update cu ON pm.product_id = cu.product_id
    SET pm.abc_class = cu.abc_class,
        pm.last_calculated_at = NOW()
  `, [aThreshold, bThreshold]);
}

/** Rebuild product_time_aggregates (per product / year / month) from scratch. */
async function rebuildTimeAggregates(connection) {
  await connection.query('TRUNCATE TABLE product_time_aggregates;');
  await connection.query(`
    INSERT INTO product_time_aggregates (
      product_id,
      year,
      month,
      total_quantity_sold,
      total_revenue,
      total_cost,
      order_count,
      stock_received,
      stock_ordered,
      avg_price,
      profit_margin
    )
    WITH sales_data AS (
      SELECT
        o.product_id,
        YEAR(o.date) as year,
        MONTH(o.date) as month,
        SUM(o.quantity) as total_quantity_sold,
        SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) as total_revenue,
        SUM(COALESCE(p.cost_price, 0) * o.quantity) as total_cost,
        COUNT(DISTINCT o.order_number) as order_count,
        AVG(o.price - COALESCE(o.discount, 0)) as avg_price,
        CASE
          WHEN SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) = 0 THEN 0
          ELSE ((SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) -
                 SUM(COALESCE(p.cost_price, 0) * o.quantity)) /
                SUM((o.price - COALESCE(o.discount, 0)) * o.quantity)) * 100
        END as profit_margin
      FROM orders o
      JOIN products p ON o.product_id = p.product_id
      WHERE o.canceled = 0
      GROUP BY o.product_id, YEAR(o.date), MONTH(o.date)
    ),
    purchase_data AS (
      SELECT
        product_id,
        YEAR(date) as year,
        MONTH(date) as month,
        SUM(received) as stock_received,
        SUM(ordered) as stock_ordered
      FROM purchase_orders
      WHERE status = 'closed'
      GROUP BY product_id, YEAR(date), MONTH(date)
    )
    SELECT
      s.product_id,
      s.year,
      s.month,
      s.total_quantity_sold,
      s.total_revenue,
      s.total_cost,
      s.order_count,
      COALESCE(p.stock_received, 0) as stock_received,
      COALESCE(p.stock_ordered, 0) as stock_ordered,
      s.avg_price,
      s.profit_margin
    FROM sales_data s
    LEFT JOIN purchase_data p
      ON s.product_id = p.product_id
      AND s.year = p.year
      AND s.month = p.month
    UNION
    SELECT
      p.product_id,
      p.year,
      p.month,
      0 as total_quantity_sold,
      0 as total_revenue,
      0 as total_cost,
      0 as order_count,
      p.stock_received,
      p.stock_ordered,
      0 as avg_price,
      0 as profit_margin
    FROM purchase_data p
    LEFT JOIN sales_data s
      ON p.product_id = s.product_id
      AND p.year = s.year
      AND p.month = s.month
    WHERE s.product_id IS NULL
  `);
}

/** Upsert per-vendor lead-time, on-time and fill-rate figures from closed purchase orders. */
async function updateVendorMetrics(connection) {
  await connection.query(`
    INSERT INTO vendor_metrics (
      vendor,
      last_calculated_at,
      avg_lead_time_days,
      on_time_delivery_rate,
      order_fill_rate,
      total_orders,
      total_late_orders
    )
    SELECT
      vendor,
      NOW() as last_calculated_at,
      COALESCE(AVG(DATEDIFF(received_date, date)), 0) as avg_lead_time_days,
      COALESCE((COUNT(CASE WHEN DATEDIFF(received_date, date) <= 14 THEN 1 END) * 100.0 / NULLIF(COUNT(*), 0)), 0) as on_time_delivery_rate,
      COALESCE((SUM(received) * 100.0 / NULLIF(SUM(ordered), 0)), 0) as order_fill_rate,
      COUNT(DISTINCT po_id) as total_orders,
      COUNT(CASE WHEN DATEDIFF(received_date, date) > 14 THEN 1 END) as total_late_orders
    FROM purchase_orders
    WHERE status = 'closed'
    GROUP BY vendor
    ON DUPLICATE KEY UPDATE
      last_calculated_at = VALUES(last_calculated_at),
      avg_lead_time_days = VALUES(avg_lead_time_days),
      on_time_delivery_rate = VALUES(on_time_delivery_rate),
      order_fill_rate = VALUES(order_fill_rate),
      total_orders = VALUES(total_orders),
      total_late_orders = VALUES(total_late_orders)
  `);
}

/**
 * Recalculate all derived metrics: per-product metrics (in batches), ABC
 * classification, monthly time aggregates and vendor metrics. Progress is
 * reported as JSON lines on stdout, errors as JSON lines on stderr.
 *
 * A failure while processing a single product is logged and that product is
 * skipped; any other failure emits an `error` (or `cancelled`) progress line
 * and is rethrown. The connection and pool are always released.
 *
 * @returns {Promise<void>}
 * @throws {Error} On database failures or when the run is cancelled
 *   (message `'Operation cancelled'`).
 */
async function calculateMetrics() {
  let pool;
  const startTime = Date.now();
  let processedCount = 0;
  let totalProducts = 0;

  // Emits a progress line for the post-processing phases, where every
  // product has already been handled.
  const reportPhase = (operation) => {
    outputProgress({
      status: 'running',
      operation,
      current: totalProducts,
      total: totalProducts,
      elapsed: formatElapsedTime(startTime),
      remaining: estimateRemaining(startTime, totalProducts, totalProducts),
      rate: calculateRate(startTime, totalProducts),
      percentage: '100'
    });
  };

  try {
    isCancelled = false;
    pool = mysql.createPool(dbConfig);
    const connection = await pool.getConnection();

    try {
      // Get total number of products
      const [countResult] = await connection.query('SELECT COUNT(*) as total FROM products')
        .catch(err => {
          logError(err, 'Failed to count products');
          throw err;
        });
      totalProducts = countResult[0].total;

      // Initial progress with percentage
      outputProgress({
        status: 'running',
        operation: 'Processing products',
        current: processedCount,
        total: totalProducts,
        elapsed: '0s',
        remaining: 'Calculating...',
        rate: 0,
        percentage: '0'
      });

      for (let offset = 0; offset < totalProducts; offset += BATCH_SIZE) {
        if (isCancelled) {
          throw new Error('Operation cancelled');
        }

        // ORDER BY makes LIMIT/OFFSET paging stable; without it MySQL may
        // return rows in a different order per query, skipping or repeating
        // products across batches.
        const [products] = await connection.query(
          'SELECT product_id, vendor FROM products ORDER BY product_id LIMIT ? OFFSET ?',
          [BATCH_SIZE, offset]
        ).catch(err => {
          logError(err, `Failed to fetch products batch at offset ${offset}`);
          throw err;
        });

        const metricsUpdates = [];
        for (const product of products) {
          try {
            const productId = product.product_id;
            const config = await fetchProductConfig(connection, productId);
            const metrics = await fetchSalesMetrics(connection, productId, config);
            const purchases = await fetchPurchaseMetrics(connection, productId);
            const stock = await fetchStockInfo(connection, productId);
            metricsUpdates.push(buildMetricsRow(productId, config, metrics, purchases, stock));
          } catch (err) {
            // Skip this product instead of failing the entire batch.
            logError(err, `Failed processing product ${product.product_id}`);
          }
        }

        if (metricsUpdates.length > 0) {
          await upsertProductMetrics(connection, metricsUpdates);
        }

        // Report progress only once the batch has actually been persisted.
        processedCount += products.length;
        outputProgress({
          status: 'running',
          operation: 'Processing products',
          current: processedCount,
          total: totalProducts,
          elapsed: formatElapsedTime(startTime),
          remaining: estimateRemaining(startTime, processedCount, totalProducts),
          rate: calculateRate(startTime, processedCount),
          percentage: ((processedCount / totalProducts) * 100).toFixed(1)
        });
      }

      reportPhase('Calculating ABC classification');
      await updateAbcClassification(connection);

      reportPhase('Calculating time-based aggregates');
      await rebuildTimeAggregates(connection);

      reportPhase('Calculating vendor metrics');
      await updateVendorMetrics(connection);

      // Final success message
      outputProgress({
        status: 'complete',
        operation: 'Metrics calculation complete',
        current: totalProducts,
        total: totalProducts,
        elapsed: formatElapsedTime(startTime),
        remaining: '0s',
        rate: calculateRate(startTime, totalProducts),
        percentage: '100'
      });
    } catch (error) {
      outputProgress({
        status: isCancelled ? 'cancelled' : 'error',
        operation: isCancelled ? 'Calculation cancelled' : `Error: ${error.message}`,
        current: processedCount,
        total: totalProducts || 0, // 0 when the count query itself failed
        elapsed: formatElapsedTime(startTime),
        remaining: null,
        rate: calculateRate(startTime, processedCount),
        percentage: ((processedCount / (totalProducts || 1)) * 100).toFixed(1)
      });
      throw error;
    } finally {
      connection.release();
    }
  } finally {
    if (pool) {
      await pool.end();
    }
  }
}

// Export both functions
module.exports = calculateMetrics;
module.exports.cancelCalculation = cancelCalculation;

// Run directly if called from command line
if (require.main === module) {
  calculateMetrics().catch(error => {
    const message = String(error && error.message);
    if (!message.includes('Operation cancelled')) {
      console.error('Error:', error);
    }
    process.exit(1);
  });
}