const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db');

// Fraction of totalProducts reported as `current` once each stage has finished.
const BASE_AGGREGATES_PROGRESS = 0.60;
const FINANCIAL_METRICS_PROGRESS = 0.65;

// Upserts one row per (pid, year, month) built from non-cancelled orders and
// from purchase orders. Months that only have purchase activity are added by
// the second UNION branch with zeroed sales figures.
//
// The SELECT lists return exactly the 11 columns named in the INSERT column
// list. Earlier revisions also selected fulfilment, lead-time and stock-trend
// figures that had no matching target column, which makes MySQL reject the
// statement with a column-count mismatch.
// NOTE(review): if those extra metrics are meant to be persisted, add them to
// the INSERT column list, both SELECT branches and the ON DUPLICATE KEY clause
// together - confirm against the table schema.
const UPSERT_TIME_AGGREGATES_SQL = `
  INSERT INTO product_time_aggregates (
    pid,
    year,
    month,
    total_quantity_sold,
    total_revenue,
    total_cost,
    order_count,
    stock_received,
    stock_ordered,
    avg_price,
    profit_margin
  )
  WITH sales_data AS (
    SELECT
      o.pid,
      YEAR(o.date) as year,
      MONTH(o.date) as month,
      SUM(o.quantity) as total_quantity_sold,
      SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) as total_revenue,
      SUM(COALESCE(p.cost_price, 0) * o.quantity) as total_cost,
      COUNT(DISTINCT o.order_number) as order_count,
      AVG(o.price - COALESCE(o.discount, 0)) as avg_price,
      CASE
        WHEN SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) = 0 THEN 0
        ELSE ((SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) -
               SUM(COALESCE(p.cost_price, 0) * o.quantity)) /
              SUM((o.price - COALESCE(o.discount, 0)) * o.quantity)) * 100
      END as profit_margin
    FROM orders o
    JOIN products p ON o.pid = p.pid
    WHERE o.canceled = 0
    GROUP BY o.pid, YEAR(o.date), MONTH(o.date)
  ),
  purchase_data AS (
    SELECT
      pid,
      YEAR(date) as year,
      MONTH(date) as month,
      SUM(received) as stock_received,
      SUM(ordered) as stock_ordered
    FROM purchase_orders
    GROUP BY pid, YEAR(date), MONTH(date)
  )
  SELECT
    s.pid,
    s.year,
    s.month,
    s.total_quantity_sold,
    s.total_revenue,
    s.total_cost,
    s.order_count,
    COALESCE(p.stock_received, 0) as stock_received,
    COALESCE(p.stock_ordered, 0) as stock_ordered,
    s.avg_price,
    s.profit_margin
  FROM sales_data s
  LEFT JOIN purchase_data p
    ON s.pid = p.pid
    AND s.year = p.year
    AND s.month = p.month
  UNION
  SELECT
    p.pid,
    p.year,
    p.month,
    0 as total_quantity_sold,
    0 as total_revenue,
    0 as total_cost,
    0 as order_count,
    p.stock_received,
    p.stock_ordered,
    0 as avg_price,
    0 as profit_margin
  FROM purchase_data p
  LEFT JOIN sales_data s
    ON p.pid = s.pid
    AND p.year = s.year
    AND p.month = s.month
  WHERE s.pid IS NULL
  ON DUPLICATE KEY UPDATE
    total_quantity_sold = VALUES(total_quantity_sold),
    total_revenue = VALUES(total_revenue),
    total_cost = VALUES(total_cost),
    order_count = VALUES(order_count),
    stock_received = VALUES(stock_received),
    stock_ordered = VALUES(stock_ordered),
    avg_price = VALUES(avg_price),
    profit_margin = VALUES(profit_margin),
    last_calculated_at = CURRENT_TIMESTAMP
`;

// Fills inventory_value and gmroi on the rows written by the upsert above.
// The join to orders is written as an inner JOIN: the original LEFT JOIN was
// followed by a WHERE filter on o.canceled, which discards the NULL-extended
// rows anyway, so the result set is the same.
// NOTE(review): gross_profit uses o.price without the discount and cost_price
// without COALESCE, unlike total_revenue/total_cost in the upsert - confirm
// whether that difference is intended.
const UPDATE_FINANCIAL_METRICS_SQL = `
  UPDATE product_time_aggregates pta
  JOIN (
    SELECT
      p.pid,
      YEAR(o.date) as year,
      MONTH(o.date) as month,
      p.cost_price * p.stock_quantity as inventory_value,
      SUM(o.quantity * (o.price - p.cost_price)) as gross_profit,
      COUNT(DISTINCT DATE(o.date)) as days_in_period
    FROM products p
    JOIN orders o ON p.pid = o.pid
    WHERE o.canceled = false
    GROUP BY p.pid, YEAR(o.date), MONTH(o.date)
  ) fin
    ON pta.pid = fin.pid
    AND pta.year = fin.year
    AND pta.month = fin.month
  SET
    pta.inventory_value = COALESCE(fin.inventory_value, 0),
    pta.gmroi = CASE
      WHEN COALESCE(fin.inventory_value, 0) > 0 AND fin.days_in_period > 0
        THEN (COALESCE(fin.gross_profit, 0) * (365.0 / fin.days_in_period)) / COALESCE(fin.inventory_value, 0)
      ELSE 0
    END,
    pta.last_calculated_at = CURRENT_TIMESTAMP
`;

/**
 * Builds the progress payload passed to outputProgress.
 *
 * @param {string} status - 'running' or 'cancelled'.
 * @param {string} operation - Human-readable description of the current step.
 * @param {number} startTime - Epoch milliseconds at which the overall run started.
 * @param {number} processedCount - Value reported as `current`.
 * @param {number} totalProducts - Value reported as `total`.
 * @returns {object} Payload with counters, elapsed/remaining/rate and timing info.
 */
function buildProgress(status, operation, startTime, processedCount, totalProducts) {
  const now = Date.now();
  // Guard the division so an empty catalogue reports 0.0 instead of "NaN"/"Infinity".
  const percentage = totalProducts > 0
    ? ((processedCount / totalProducts) * 100).toFixed(1)
    : '0.0';

  return {
    status,
    operation,
    current: processedCount,
    total: totalProducts,
    elapsed: formatElapsedTime(startTime),
    // A cancelled run has no meaningful remaining-time estimate.
    remaining: status === 'cancelled'
      ? null
      : estimateRemaining(startTime, processedCount, totalProducts),
    rate: calculateRate(startTime, processedCount),
    percentage,
    timing: {
      start_time: new Date(startTime).toISOString(),
      end_time: new Date(now).toISOString(),
      elapsed_seconds: Math.round((now - startTime) / 1000),
    },
  };
}

/**
 * Recalculates the per-product monthly rows in product_time_aggregates and
 * then updates their inventory value and GMROI, emitting progress along the way.
 *
 * Cancellation is evaluated once, on entry: `isCancelled` is a plain value
 * parameter, so it cannot change while the queries are running.
 *
 * @param {number} startTime - Epoch milliseconds at which the overall run started.
 * @param {number} totalProducts - Total used as the denominator for progress.
 * @param {number} processedCount - Progress counter carried in from earlier steps.
 * @param {boolean} [isCancelled=false] - When truthy, report cancellation and do no work.
 * @returns {Promise<number>} The incoming processedCount when cancelled, otherwise
 *   Math.floor(totalProducts * 0.65).
 * @throws Rethrows any error raised while acquiring the connection or running
 *   the queries, after passing it to logError.
 */
async function calculateTimeAggregates(startTime, totalProducts, processedCount, isCancelled = false) {
  let connection;
  try {
    if (isCancelled) {
      outputProgress(buildProgress(
        'cancelled',
        'Time aggregates calculation cancelled',
        startTime,
        processedCount,
        totalProducts
      ));
      return processedCount;
    }

    // Acquire the connection only once we know there is work to do.
    connection = await getConnection();

    outputProgress(buildProgress(
      'running',
      'Starting time aggregates calculation',
      startTime,
      processedCount,
      totalProducts
    ));

    // Initial insert of time-based aggregates
    await connection.query(UPSERT_TIME_AGGREGATES_SQL);

    processedCount = Math.floor(totalProducts * BASE_AGGREGATES_PROGRESS);
    outputProgress(buildProgress(
      'running',
      'Base time aggregates calculated, updating financial metrics',
      startTime,
      processedCount,
      totalProducts
    ));

    // Update with financial metrics
    await connection.query(UPDATE_FINANCIAL_METRICS_SQL);

    processedCount = Math.floor(totalProducts * FINANCIAL_METRICS_PROGRESS);
    outputProgress(buildProgress(
      'running',
      'Financial metrics updated',
      startTime,
      processedCount,
      totalProducts
    ));

    return processedCount;
  } catch (error) {
    logError(error, 'Error calculating time aggregates');
    throw error;
  } finally {
    if (connection) {
      connection.release();
    }
  }
}

module.exports = calculateTimeAggregates;