const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db');

// Maximum number of product ids selected and aggregated per loop iteration.
const BATCH_SIZE = 5000;

// Session-scoped scratch tables used while aggregating one batch, in drop order.
const TEMP_TABLES = ['temp_order_stats', 'temp_purchase_stats', 'temp_time_aggregates'];

/**
 * Builds the progress payload used by this module and hands it to outputProgress.
 *
 * @param {string} status - 'running' or 'cancelled'.
 * @param {string} operation - Human-readable description of the current step.
 * @param {number} startTime - Start of the overall calculation (epoch milliseconds,
 *   as it is passed to `new Date()` and subtracted from `Date.now()`).
 * @param {number} current - Cumulative number of products processed so far.
 * @param {number} total - Total number of products expected.
 */
function reportProgress(status, operation, startTime, current, total) {
  outputProgress({
    status,
    operation,
    current,
    total,
    elapsed: formatElapsedTime(startTime),
    // A cancelled run has no meaningful remaining-time estimate.
    remaining: status === 'cancelled' ? null : estimateRemaining(startTime, current, total),
    rate: calculateRate(startTime, current),
    percentage: ((current / total) * 100).toFixed(1),
    timing: {
      start_time: new Date(startTime).toISOString(),
      end_time: new Date().toISOString(),
      elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
    }
  });
}

/**
 * Drops every scratch table in TEMP_TABLES on the given connection (no-op for
 * tables that do not exist).
 *
 * @param {object} connection - Connection obtained from getConnection().
 */
async function dropTempTables(connection) {
  for (const table of TEMP_TABLES) {
    await connection.query(`DROP TEMPORARY TABLE IF EXISTS ${table}`);
  }
}

/**
 * Recomputes the monthly aggregates of the last 12 months for one batch of
 * products and upserts them into product_time_aggregates.
 *
 * @param {object} connection - Connection obtained from getConnection().
 * @param {Array} pids - Product ids of the batch (must be non-empty).
 */
async function processBatch(connection, pids) {
  // Start from a clean slate in case an earlier run left tables behind on this session.
  await dropTempTables(connection);

  await connection.query(`
    CREATE TEMPORARY TABLE temp_order_stats (
      pid BIGINT NOT NULL,
      year INT NOT NULL,
      month INT NOT NULL,
      total_quantity_sold INT DEFAULT 0,
      total_revenue DECIMAL(10,3) DEFAULT 0,
      total_cost DECIMAL(10,3) DEFAULT 0,
      order_count INT DEFAULT 0,
      avg_price DECIMAL(10,3),
      PRIMARY KEY (pid, year, month),
      INDEX (pid)
    ) ENGINE=MEMORY
  `);

  await connection.query(`
    CREATE TEMPORARY TABLE temp_purchase_stats (
      pid BIGINT NOT NULL,
      year INT NOT NULL,
      month INT NOT NULL,
      stock_received INT DEFAULT 0,
      stock_ordered INT DEFAULT 0,
      PRIMARY KEY (pid, year, month),
      INDEX (pid)
    ) ENGINE=MEMORY
  `);

  await connection.query(`
    CREATE TEMPORARY TABLE temp_time_aggregates (
      pid BIGINT NOT NULL,
      year INT NOT NULL,
      month INT NOT NULL,
      total_quantity_sold INT DEFAULT 0,
      total_revenue DECIMAL(10,3) DEFAULT 0,
      total_cost DECIMAL(10,3) DEFAULT 0,
      order_count INT DEFAULT 0,
      stock_received INT DEFAULT 0,
      stock_ordered INT DEFAULT 0,
      avg_price DECIMAL(10,3),
      profit_margin DECIMAL(10,3),
      inventory_value DECIMAL(10,3),
      gmroi DECIMAL(10,3),
      PRIMARY KEY (pid, year, month),
      INDEX (pid)
    ) ENGINE=MEMORY
  `);

  // Monthly sales figures from non-cancelled orders of the last 12 months.
  await connection.query(`
    INSERT INTO temp_order_stats
    SELECT
      p.pid,
      YEAR(o.date) as year,
      MONTH(o.date) as month,
      SUM(o.quantity) as total_quantity_sold,
      SUM(o.quantity * o.price) as total_revenue,
      SUM(o.quantity * p.cost_price) as total_cost,
      COUNT(DISTINCT o.order_number) as order_count,
      AVG(o.price) as avg_price
    FROM products p
    FORCE INDEX (PRIMARY)
    INNER JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
    WHERE p.pid IN (?)
    AND o.canceled = false
    AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
    GROUP BY p.pid, YEAR(o.date), MONTH(o.date)
  `, [pids]);

  // Monthly ordered/received stock from purchase orders of the last 12 months.
  await connection.query(`
    INSERT INTO temp_purchase_stats
    SELECT
      p.pid,
      YEAR(po.date) as year,
      MONTH(po.date) as month,
      COALESCE(SUM(CASE WHEN po.received_date IS NOT NULL THEN po.received ELSE 0 END), 0) as stock_received,
      COALESCE(SUM(po.ordered), 0) as stock_ordered
    FROM products p
    FORCE INDEX (PRIMARY)
    INNER JOIN purchase_orders po FORCE INDEX (idx_po_metrics) ON p.pid = po.pid
    WHERE p.pid IN (?)
    AND po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
    GROUP BY p.pid, YEAR(po.date), MONTH(po.date)
  `, [pids]);

  // Combine both sources. Rows are driven by temp_order_stats, so a month with
  // purchases but no orders produces no aggregate row.
  await connection.query(`
    INSERT INTO temp_time_aggregates
    SELECT
      o.pid,
      o.year,
      o.month,
      o.total_quantity_sold,
      o.total_revenue,
      o.total_cost,
      o.order_count,
      COALESCE(ps.stock_received, 0) as stock_received,
      COALESCE(ps.stock_ordered, 0) as stock_ordered,
      o.avg_price,
      CASE
        WHEN o.total_revenue > 0
        THEN ((o.total_revenue - o.total_cost) / o.total_revenue) * 100
        ELSE 0
      END as profit_margin,
      p.cost_price * p.stock_quantity as inventory_value,
      CASE
        WHEN (p.cost_price * p.stock_quantity) > 0
        THEN (o.total_revenue - o.total_cost) / (p.cost_price * p.stock_quantity)
        ELSE 0
      END as gmroi
    FROM temp_order_stats o
    LEFT JOIN temp_purchase_stats ps
      ON o.pid = ps.pid
      AND o.year = ps.year
      AND o.month = ps.month
    JOIN products p FORCE INDEX (PRIMARY) ON o.pid = p.pid
  `);

  // Upsert into the final table; existing (pid, year, month) rows are overwritten.
  await connection.query(`
    INSERT INTO product_time_aggregates (
      pid, year, month,
      total_quantity_sold, total_revenue, total_cost,
      order_count, stock_received, stock_ordered,
      avg_price, profit_margin, inventory_value, gmroi
    )
    SELECT * FROM temp_time_aggregates
    ON DUPLICATE KEY UPDATE
      total_quantity_sold = VALUES(total_quantity_sold),
      total_revenue = VALUES(total_revenue),
      total_cost = VALUES(total_cost),
      order_count = VALUES(order_count),
      stock_received = VALUES(stock_received),
      stock_ordered = VALUES(stock_ordered),
      avg_price = VALUES(avg_price),
      profit_margin = VALUES(profit_margin),
      inventory_value = VALUES(inventory_value),
      gmroi = VALUES(gmroi)
  `);

  await dropTempTables(connection);
}

/**
 * Incrementally recalculates monthly time aggregates for every product whose
 * row, or one of whose orders, was updated since the last recorded
 * 'time_aggregates' calculation, working in batches of BATCH_SIZE products.
 *
 * @param {number} startTime - Start of the overall calculation (epoch milliseconds).
 * @param {number} totalProducts - Total number of products, used for progress
 *   reporting; when it is 0 the function returns immediately.
 * @param {number} [processedCount=0] - Products already processed by earlier
 *   steps; added to this module's own count in progress output.
 * @param {boolean|Function} [isCancelled=false] - Cancellation flag. A boolean is
 *   evaluated as before; a function is called before each batch so that a caller
 *   can request cancellation while the calculation is running.
 * @returns {Promise<{processedProducts: number, processedOrders: number,
 *   processedPurchaseOrders: number, success: boolean}>} Counts for this module
 *   only. `success` is false when the run was cancelled.
 * @throws Rethrows any error raised by a query after logging it with logError.
 */
async function calculateTimeAggregates(startTime, totalProducts, processedCount = 0, isCancelled = false) {
  const connection = await getConnection();
  let myProcessedProducts = 0; // Products processed *within this module*
  let tempTablesPending = false; // True while a batch may have scratch tables alive

  // A plain boolean can never change after the call, so a callback is also accepted.
  const cancellationRequested = () =>
    (typeof isCancelled === 'function' ? Boolean(isCancelled()) : Boolean(isCancelled));

  const buildResult = (success) => ({
    processedProducts: myProcessedProducts, // Only what *this* module processed
    processedOrders: 0,
    processedPurchaseOrders: 0,
    success
  });

  try {
    // Get last calculation timestamp; fall back to the epoch on the first run.
    const [lastCalc] = await connection.query(`
      SELECT last_calculation_timestamp
      FROM calculate_status
      WHERE module_name = 'time_aggregates'
    `);
    const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';

    // totalProducts is supplied by the caller, so it is not queried here.
    if (totalProducts === 0) {
      console.log('No products need time aggregate updates');
      return buildResult(true);
    }

    if (cancellationRequested()) {
      reportProgress('cancelled', 'Time aggregates calculation cancelled', startTime, processedCount, totalProducts);
      return buildResult(false);
    }

    reportProgress('running', 'Starting time aggregates calculation', startTime, processedCount, totalProducts);

    // Process in batches, paging by primary key.
    let lastPid = 0;
    let cancelledMidRun = false;
    while (true) {
      if (cancellationRequested()) {
        cancelledMidRun = true;
        break;
      }

      const [batch] = await connection.query(`
        SELECT DISTINCT p.pid
        FROM products p
        FORCE INDEX (PRIMARY)
        LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
        WHERE p.pid > ?
        AND (
          p.updated > ?
          OR EXISTS (
            SELECT 1
            FROM orders o2 FORCE INDEX (idx_orders_metrics)
            WHERE o2.pid = p.pid
            AND o2.updated > ?
          )
        )
        ORDER BY p.pid
        LIMIT ?
      `, [lastPid, lastCalculationTime, lastCalculationTime, BATCH_SIZE]);

      if (batch.length === 0) break;

      const pids = batch.map((row) => row.pid);

      tempTablesPending = true;
      await processBatch(connection, pids);
      tempTablesPending = false;

      lastPid = pids[pids.length - 1];
      myProcessedProducts += pids.length;

      reportProgress(
        'running',
        'Processing time aggregates batch',
        startTime,
        processedCount + myProcessedProducts, // Show cumulative progress
        totalProducts
      );
    }

    if (cancelledMidRun) {
      // Do NOT advance calculate_status: products that were never reached must
      // still be picked up by the next incremental run.
      reportProgress(
        'cancelled',
        'Time aggregates calculation cancelled',
        startTime,
        processedCount + myProcessedProducts,
        totalProducts
      );
      return buildResult(false);
    }

    // NOTE(review): the timestamp is taken at the END of the run. Rows updated
    // after their batch was read but before this statement would not be seen by
    // the next run - consider recording a timestamp captured before the loop.
    await connection.query(`
      INSERT INTO calculate_status (module_name, last_calculation_timestamp)
      VALUES ('time_aggregates', NOW())
      ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
    `);

    return buildResult(true);
  } catch (error) {
    logError(error, 'Error calculating time aggregates');
    throw error;
  } finally {
    if (connection) {
      if (tempTablesPending) {
        // A batch failed part-way: drop its scratch tables before the connection
        // is released so they do not stay alive on the session. A cleanup
        // failure is only logged so it cannot mask the original error.
        try {
          await dropTempTables(connection);
        } catch (cleanupError) {
          logError(cleanupError, 'Error dropping time aggregate temp tables');
        }
      }
      connection.release();
    }
  }
}

module.exports = calculateTimeAggregates;