const mysql = require('mysql2/promise');
const path = require('path');
require('dotenv').config({ path: path.resolve(__dirname, '..', '.env') });

/**
 * Write a progress record to stdout as a single line of JSON.
 *
 * @param {object} data - Progress payload (status, operation, percentage).
 */
function outputProgress(data) {
  console.log(JSON.stringify(data));
}

/**
 * Write an error record to stderr as a single line of JSON.
 *
 * @param {Error|string} error - The failure; `error.message` is used when present.
 * @param {string} context - Short description of what was being attempted.
 */
function logError(error, context) {
  console.error(JSON.stringify({
    status: 'error',
    error: error.message || error,
    context
  }));
}

// Database configuration, read from the .env file loaded above.
const dbConfig = {
  host: process.env.DB_HOST,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0
};

/**
 * Run SQL statements one at a time, in order, on the given connection.
 *
 * The pool is created without `multipleStatements`, so each statement has to
 * be sent as its own query instead of one `;`-separated string.
 *
 * @param {object} connection - A mysql2 promise connection.
 * @param {string[]} statements - SQL statements to execute sequentially.
 */
async function runStatements(connection, statements) {
  for (const sql of statements) {
    // Sequential on purpose: later statements depend on earlier ones.
    await connection.query(sql);
  }
}

/**
 * Recalculate all derived metrics tables from `orders`, `purchase_orders`
 * and `products`:
 *
 *   1. rebuild the temp_sales_metrics / temp_purchase_metrics work tables,
 *   2. upsert per-product rows into product_metrics,
 *   3. assign the ABC class by revenue percentile,
 *   4. rebuild product_time_aggregates (daily / weekly / monthly),
 *   5. upsert per-vendor rows into vendor_metrics.
 *
 * Progress is reported through outputProgress(); failures are logged through
 * logError() and then rethrown. The pool is always closed before returning.
 *
 * @returns {Promise<void>}
 * @throws Rethrows any error raised while connecting or querying.
 */
async function calculateMetrics() {
  let pool;
  try {
    pool = mysql.createPool(dbConfig);
    const connection = await pool.getConnection();

    try {
      // Create (or reuse) and empty the work tables for metrics calculations
      outputProgress({
        status: 'running',
        operation: 'Creating temporary tables',
        percentage: '0'
      });

      await runStatements(connection, [
        `CREATE TABLE IF NOT EXISTS temp_sales_metrics (
          product_id INT PRIMARY KEY,
          total_quantity_sold INT DEFAULT 0,
          total_revenue DECIMAL(10,2) DEFAULT 0.00,
          average_price DECIMAL(10,2) DEFAULT 0.00,
          last_sale_date DATE,
          sales_rank INT
        )`,
        `CREATE TABLE IF NOT EXISTS temp_purchase_metrics (
          product_id INT PRIMARY KEY,
          total_quantity_purchased INT DEFAULT 0,
          total_cost DECIMAL(10,2) DEFAULT 0.00,
          average_cost DECIMAL(10,2) DEFAULT 0.00,
          last_purchase_date DATE,
          purchase_rank INT
        )`,
        'TRUNCATE TABLE temp_sales_metrics',
        'TRUNCATE TABLE temp_purchase_metrics'
      ]);

      // Calculate sales metrics
      outputProgress({
        status: 'running',
        operation: 'Calculating sales metrics',
        percentage: '20'
      });

      await runStatements(connection, [
        `INSERT INTO temp_sales_metrics (
          product_id,
          total_quantity_sold,
          total_revenue,
          average_price,
          last_sale_date
        )
        SELECT
          product_id,
          SUM(quantity) AS total_quantity_sold,
          SUM((price - COALESCE(discount, 0)) * quantity) AS total_revenue,
          AVG(price - COALESCE(discount, 0)) AS average_price,
          MAX(date) AS last_sale_date
        FROM orders
        WHERE canceled = 0
        GROUP BY product_id`,
        // The alias is not called "rank": RANK is a reserved word in MySQL 8.
        `UPDATE temp_sales_metrics
        SET sales_rank = (
          SELECT rank_position
          FROM (
            SELECT
              product_id,
              RANK() OVER (ORDER BY total_revenue DESC) AS rank_position
            FROM temp_sales_metrics
          ) rankings
          WHERE rankings.product_id = temp_sales_metrics.product_id
        )`
      ]);

      // Calculate purchase metrics
      outputProgress({
        status: 'running',
        operation: 'Calculating purchase metrics',
        percentage: '40'
      });

      await runStatements(connection, [
        `INSERT INTO temp_purchase_metrics (
          product_id,
          total_quantity_purchased,
          total_cost,
          average_cost,
          last_purchase_date
        )
        SELECT
          product_id,
          SUM(received) AS total_quantity_purchased,
          SUM(cost_price * received) AS total_cost,
          AVG(cost_price) AS average_cost,
          MAX(received_date) AS last_purchase_date
        FROM purchase_orders
        WHERE status = 'closed' AND received > 0
        GROUP BY product_id`,
        `UPDATE temp_purchase_metrics
        SET purchase_rank = (
          SELECT rank_position
          FROM (
            SELECT
              product_id,
              RANK() OVER (ORDER BY total_cost DESC) AS rank_position
            FROM temp_purchase_metrics
          ) rankings
          WHERE rankings.product_id = temp_purchase_metrics.product_id
        )`
      ]);

      // Update product metrics
      outputProgress({
        status: 'running',
        operation: 'Updating product metrics',
        percentage: '60'
      });

      // The LEFT JOINs leave s.* / po.* NULL for products without sales or
      // purchases. The ratio expressions therefore COALESCE their operands so
      // they agree with the total_cost / total_quantity_sold values stored in
      // the same row instead of evaluating to NULL.
      await connection.query(`
        INSERT INTO product_metrics (
          product_id,
          total_quantity_sold,
          total_revenue,
          average_price,
          total_quantity_purchased,
          total_cost,
          average_cost,
          profit_margin,
          turnover_rate,
          last_sale_date,
          last_purchase_date,
          sales_rank,
          purchase_rank,
          last_calculated_at
        )
        SELECT
          p.product_id,
          COALESCE(s.total_quantity_sold, 0),
          COALESCE(s.total_revenue, 0.00),
          COALESCE(s.average_price, 0.00),
          COALESCE(po.total_quantity_purchased, 0),
          COALESCE(po.total_cost, 0.00),
          COALESCE(po.average_cost, 0.00),
          CASE
            WHEN COALESCE(s.total_revenue, 0) = 0 THEN 0
            ELSE ((s.total_revenue - COALESCE(po.total_cost, 0)) / s.total_revenue) * 100
          END AS profit_margin,
          CASE
            WHEN COALESCE(po.total_quantity_purchased, 0) = 0 THEN 0
            ELSE (COALESCE(s.total_quantity_sold, 0) / po.total_quantity_purchased) * 100
          END AS turnover_rate,
          s.last_sale_date,
          po.last_purchase_date,
          s.sales_rank,
          po.purchase_rank,
          NOW()
        FROM products p
        LEFT JOIN temp_sales_metrics s ON p.product_id = s.product_id
        LEFT JOIN temp_purchase_metrics po ON p.product_id = po.product_id
        ON DUPLICATE KEY UPDATE
          total_quantity_sold = VALUES(total_quantity_sold),
          total_revenue = VALUES(total_revenue),
          average_price = VALUES(average_price),
          total_quantity_purchased = VALUES(total_quantity_purchased),
          total_cost = VALUES(total_cost),
          average_cost = VALUES(average_cost),
          profit_margin = VALUES(profit_margin),
          turnover_rate = VALUES(turnover_rate),
          last_sale_date = VALUES(last_sale_date),
          last_purchase_date = VALUES(last_purchase_date),
          sales_rank = VALUES(sales_rank),
          purchase_rank = VALUES(purchase_rank),
          last_calculated_at = VALUES(last_calculated_at)
      `);

      // Calculate ABC classification (only products with revenue > 0 are classified)
      outputProgress({
        status: 'running',
        operation: 'Calculating ABC classification',
        percentage: '80'
      });

      await connection.query(`
        WITH revenue_percentiles AS (
          SELECT
            product_id,
            total_revenue,
            PERCENT_RANK() OVER (ORDER BY total_revenue DESC) AS revenue_percentile
          FROM product_metrics
          WHERE total_revenue > 0
        )
        UPDATE product_metrics pm
        JOIN revenue_percentiles rp ON pm.product_id = rp.product_id
        SET pm.abc_class = CASE
          WHEN rp.revenue_percentile < 0.2 THEN 'A'
          WHEN rp.revenue_percentile < 0.5 THEN 'B'
          ELSE 'C'
        END
      `);

      // Calculate time-based aggregates
      outputProgress({
        status: 'running',
        operation: 'Calculating time aggregates',
        percentage: '90'
      });

      await runStatements(connection, [
        'TRUNCATE TABLE product_time_aggregates',
        // Daily aggregates
        `INSERT INTO product_time_aggregates
          (product_id, period_type, period_start, quantity_sold, revenue)
        SELECT
          product_id,
          'daily' AS period_type,
          DATE(date) AS period_start,
          SUM(quantity) AS quantity_sold,
          SUM((price - COALESCE(discount, 0)) * quantity) AS revenue
        FROM orders
        WHERE canceled = 0
        GROUP BY product_id, DATE(date)`,
        // Weekly aggregates (period starts on the Monday of each week)
        `INSERT INTO product_time_aggregates
          (product_id, period_type, period_start, quantity_sold, revenue)
        SELECT
          product_id,
          'weekly' AS period_type,
          DATE(DATE_SUB(date, INTERVAL WEEKDAY(date) DAY)) AS period_start,
          SUM(quantity) AS quantity_sold,
          SUM((price - COALESCE(discount, 0)) * quantity) AS revenue
        FROM orders
        WHERE canceled = 0
        GROUP BY product_id, DATE(DATE_SUB(date, INTERVAL WEEKDAY(date) DAY))`,
        // Monthly aggregates (period starts on the first day of each month)
        `INSERT INTO product_time_aggregates
          (product_id, period_type, period_start, quantity_sold, revenue)
        SELECT
          product_id,
          'monthly' AS period_type,
          DATE(DATE_SUB(date, INTERVAL DAY(date)-1 DAY)) AS period_start,
          SUM(quantity) AS quantity_sold,
          SUM((price - COALESCE(discount, 0)) * quantity) AS revenue
        FROM orders
        WHERE canceled = 0
        GROUP BY product_id, DATE(DATE_SUB(date, INTERVAL DAY(date)-1 DAY))`
      ]);

      // Calculate vendor metrics
      outputProgress({
        status: 'running',
        operation: 'Calculating vendor metrics',
        percentage: '95'
      });

      await connection.query(`
        INSERT INTO vendor_metrics (
          vendor,
          total_orders,
          total_items_ordered,
          total_items_received,
          total_spend,
          average_order_value,
          fulfillment_rate,
          average_delivery_days,
          last_order_date,
          last_delivery_date
        )
        SELECT
          vendor,
          COUNT(DISTINCT po_id) AS total_orders,
          SUM(ordered) AS total_items_ordered,
          SUM(received) AS total_items_received,
          SUM(cost_price * received) AS total_spend,
          AVG(cost_price * ordered) AS average_order_value,
          (SUM(received) / NULLIF(SUM(ordered), 0)) * 100 AS fulfillment_rate,
          AVG(DATEDIFF(received_date, date)) AS average_delivery_days,
          MAX(date) AS last_order_date,
          MAX(received_date) AS last_delivery_date
        FROM purchase_orders
        WHERE status = 'closed'
        GROUP BY vendor
        ON DUPLICATE KEY UPDATE
          total_orders = VALUES(total_orders),
          total_items_ordered = VALUES(total_items_ordered),
          total_items_received = VALUES(total_items_received),
          total_spend = VALUES(total_spend),
          average_order_value = VALUES(average_order_value),
          fulfillment_rate = VALUES(fulfillment_rate),
          average_delivery_days = VALUES(average_delivery_days),
          last_order_date = VALUES(last_order_date),
          last_delivery_date = VALUES(last_delivery_date)
      `);

      outputProgress({
        status: 'complete',
        operation: 'Metrics calculation completed',
        percentage: '100'
      });
    } catch (error) {
      logError(error, 'Error calculating metrics');
      throw error;
    } finally {
      // Always hand the connection back, even when a query failed.
      connection.release();
    }
  } catch (error) {
    logError(error, 'Fatal error during metrics calculation');
    throw error;
  } finally {
    if (pool) {
      await pool.end();
    }
  }
}

// Export the function if being required as a module
if (typeof module !== 'undefined' && module.exports) {
  module.exports = calculateMetrics;
}

// Run directly if called from command line
if (require.main === module) {
  calculateMetrics().catch(error => {
    console.error('Error:', error);
    process.exit(1);
  });
}