const path = require('path'); // Change working directory to script directory process.chdir(path.dirname(__filename)); require('dotenv').config({ path: path.resolve(__dirname, '..', '.env') }); // Configuration flags for controlling which metrics to calculate // Set to 1 to skip the corresponding calculation, 0 to run it const SKIP_PRODUCT_METRICS = 0; const SKIP_TIME_AGGREGATES = 0; const SKIP_FINANCIAL_METRICS = 0; const SKIP_VENDOR_METRICS = 0; const SKIP_CATEGORY_METRICS = 0; const SKIP_BRAND_METRICS = 0; const SKIP_SALES_FORECASTS = 0; // Add error handler for uncaught exceptions process.on('uncaughtException', (error) => { console.error('Uncaught Exception:', error); process.exit(1); }); // Add error handler for unhandled promise rejections process.on('unhandledRejection', (reason, promise) => { console.error('Unhandled Rejection at:', promise, 'reason:', reason); process.exit(1); }); const progress = require('./metrics/utils/progress'); console.log('Progress module loaded:', { modulePath: require.resolve('./metrics/utils/progress'), exports: Object.keys(progress), currentDir: process.cwd(), scriptDir: __dirname }); // Store progress functions in global scope to ensure availability global.formatElapsedTime = progress.formatElapsedTime; global.estimateRemaining = progress.estimateRemaining; global.calculateRate = progress.calculateRate; global.outputProgress = progress.outputProgress; global.clearProgress = progress.clearProgress; global.getProgress = progress.getProgress; global.logError = progress.logError; const { getConnection, closePool } = require('./metrics/utils/db'); const calculateProductMetrics = require('./metrics/product-metrics'); const calculateTimeAggregates = require('./metrics/time-aggregates'); const calculateFinancialMetrics = require('./metrics/financial-metrics'); const calculateVendorMetrics = require('./metrics/vendor-metrics'); const calculateCategoryMetrics = require('./metrics/category-metrics'); const calculateBrandMetrics = require('./metrics/brand-metrics'); const calculateSalesForecasts = require('./metrics/sales-forecasts'); // Add cancel handler let isCancelled = false; function cancelCalculation() { isCancelled = true; global.clearProgress(); // Format as SSE event const event = { progress: { status: 'cancelled', operation: 'Calculation cancelled', current: 0, total: 0, elapsed: null, remaining: null, rate: 0, timestamp: Date.now() } }; process.stdout.write(JSON.stringify(event) + '\n'); process.exit(0); } // Handle SIGTERM signal for cancellation process.on('SIGTERM', cancelCalculation); // Update the main calculation function to use the new modular structure async function calculateMetrics() { let connection; const startTime = Date.now(); // Initialize all counts to 0 let processedProducts = 0; let processedOrders = 0; let processedPurchaseOrders = 0; let totalProducts = 0; let totalOrders = 0; let totalPurchaseOrders = 0; let calculateHistoryId; try { // Clean up any previously running calculations connection = await getConnection(); await connection.query(` UPDATE calculate_history SET status = 'cancelled', end_time = NOW(), duration_seconds = TIMESTAMPDIFF(SECOND, start_time, NOW()), error_message = 'Previous calculation was not completed properly' WHERE status = 'running' `); // Get counts of records that need updating based on last calculation time const [[productCount], [orderCount], [poCount]] = await Promise.all([ connection.query(` SELECT COUNT(DISTINCT p.pid) as total FROM products p FORCE INDEX (PRIMARY) LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics' LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid AND o.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01') AND o.canceled = false LEFT JOIN purchase_orders po FORCE INDEX (idx_purchase_orders_metrics) ON p.pid = po.pid AND po.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01') WHERE p.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01') OR o.pid IS NOT NULL OR po.pid IS NOT NULL `), connection.query(` SELECT COUNT(DISTINCT o.id) as total FROM orders o FORCE INDEX (idx_orders_metrics) LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics' WHERE o.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01') AND o.canceled = false `), connection.query(` SELECT COUNT(DISTINCT po.id) as total FROM purchase_orders po FORCE INDEX (idx_purchase_orders_metrics) LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics' WHERE po.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01') `) ]); totalProducts = productCount.total; totalOrders = orderCount.total; totalPurchaseOrders = poCount.total; connection.release(); // If nothing needs updating, we can exit early if (totalProducts === 0 && totalOrders === 0 && totalPurchaseOrders === 0) { console.log('No records need updating'); return { processedProducts: 0, processedOrders: 0, processedPurchaseOrders: 0, success: true }; } // Create history record for this calculation connection = await getConnection(); // Re-establish connection const [historyResult] = await connection.query(` INSERT INTO calculate_history ( start_time, status, total_products, total_orders, total_purchase_orders, additional_info ) VALUES ( NOW(), 'running', ?, ?, ?, JSON_OBJECT( 'skip_product_metrics', ?, 'skip_time_aggregates', ?, 'skip_financial_metrics', ?, 'skip_vendor_metrics', ?, 'skip_category_metrics', ?, 'skip_brand_metrics', ?, 'skip_sales_forecasts', ? ) ) `, [ totalProducts, totalOrders, totalPurchaseOrders, SKIP_PRODUCT_METRICS, SKIP_TIME_AGGREGATES, SKIP_FINANCIAL_METRICS, SKIP_VENDOR_METRICS, SKIP_CATEGORY_METRICS, SKIP_BRAND_METRICS, SKIP_SALES_FORECASTS ]); calculateHistoryId = historyResult.insertId; connection.release(); // Add debug logging for the progress functions console.log('Debug - Progress functions:', { formatElapsedTime: typeof global.formatElapsedTime, estimateRemaining: typeof global.estimateRemaining, calculateRate: typeof global.calculateRate, startTime: startTime }); try { const elapsed = global.formatElapsedTime(startTime); console.log('Debug - formatElapsedTime test successful:', elapsed); } catch (err) { console.error('Debug - Error testing formatElapsedTime:', err); throw err; } isCancelled = false; connection = await getConnection(); // Get a new connection for the main processing try { global.outputProgress({ status: 'running', operation: 'Starting metrics calculation', current: 0, total: 100, elapsed: '0s', remaining: 'Calculating...', rate: 0, percentage: '0', timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); // Update progress periodically - REFACTORED const updateProgress = async (products = null, orders = null, purchaseOrders = null) => { if (products !== null) processedProducts = Number(products) || processedProducts || 0; if (orders !== null) processedOrders = Number(orders) || processedOrders || 0; if (purchaseOrders !== null) processedPurchaseOrders = Number(purchaseOrders) || processedPurchaseOrders || 0; const safeProducts = Number(processedProducts) || 0; const safeOrders = Number(processedOrders) || 0; const safePurchaseOrders = Number(processedPurchaseOrders) || 0; await connection.query(` UPDATE calculate_history SET processed_products = ?, processed_orders = ?, processed_purchase_orders = ? WHERE id = ? `, [safeProducts, safeOrders, safePurchaseOrders, calculateHistoryId]); }; // Helper function to ensure valid progress numbers - this is fine const ensureValidProgress = (current, total) => ({ current: Number(current) || 0, total: Number(total) || 1, // Default to 1 to avoid division by zero percentage: (((Number(current) || 0) / (Number(total) || 1)) * 100).toFixed(1) }); // Initial progress - this is fine const initialProgress = ensureValidProgress(0, totalProducts); global.outputProgress({ status: 'running', operation: 'Starting metrics calculation', current: initialProgress.current, total: initialProgress.total, elapsed: '0s', remaining: 'Calculating...', rate: 0, percentage: initialProgress.percentage, timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); // --- Call each module, passing totals and accumulating processed counts --- if (!SKIP_PRODUCT_METRICS) { const result = await calculateProductMetrics(startTime, totalProducts, processedProducts, isCancelled); // Pass totals processedProducts += result.processedProducts; // Accumulate processedOrders += result.processedOrders; processedPurchaseOrders += result.processedPurchaseOrders; await updateProgress(processedProducts, processedOrders, processedPurchaseOrders); // Update with accumulated values if (!result.success) { throw new Error('Product metrics calculation failed'); } } else { console.log('Skipping product metrics calculation...'); // Don't artificially inflate processedProducts if skipping } if (!SKIP_TIME_AGGREGATES) { const result = await calculateTimeAggregates(startTime, totalProducts, processedProducts, isCancelled); // Pass totals processedProducts += result.processedProducts; // Accumulate processedOrders += result.processedOrders; processedPurchaseOrders += result.processedPurchaseOrders; await updateProgress(processedProducts, processedOrders, processedPurchaseOrders); if (!result.success) { throw new Error('Time aggregates calculation failed'); } } else { console.log('Skipping time aggregates calculation'); } if (!SKIP_FINANCIAL_METRICS) { const result = await calculateFinancialMetrics(startTime, totalProducts, processedProducts, isCancelled); // Pass totals processedProducts += result.processedProducts; // Accumulate processedOrders += result.processedOrders; processedPurchaseOrders += result.processedPurchaseOrders; await updateProgress(processedProducts, processedOrders, processedPurchaseOrders); if (!result.success) { throw new Error('Financial metrics calculation failed'); } } else { console.log('Skipping financial metrics calculation'); } if (!SKIP_VENDOR_METRICS) { const result = await calculateVendorMetrics(startTime, totalProducts, processedProducts, isCancelled); // Pass totals processedProducts += result.processedProducts; // Accumulate processedOrders += result.processedOrders; processedPurchaseOrders += result.processedPurchaseOrders; await updateProgress(processedProducts, processedOrders, processedPurchaseOrders); if (!result.success) { throw new Error('Vendor metrics calculation failed'); } } else { console.log('Skipping vendor metrics calculation'); } if (!SKIP_CATEGORY_METRICS) { const result = await calculateCategoryMetrics(startTime, totalProducts, processedProducts, isCancelled); // Pass totals processedProducts += result.processedProducts; // Accumulate processedOrders += result.processedOrders; processedPurchaseOrders += result.processedPurchaseOrders; await updateProgress(processedProducts, processedOrders, processedPurchaseOrders); if (!result.success) { throw new Error('Category metrics calculation failed'); } } else { console.log('Skipping category metrics calculation'); } if (!SKIP_BRAND_METRICS) { const result = await calculateBrandMetrics(startTime, totalProducts, processedProducts, isCancelled); // Pass totals processedProducts += result.processedProducts; // Accumulate processedOrders += result.processedOrders; processedPurchaseOrders += result.processedPurchaseOrders; await updateProgress(processedProducts, processedOrders, processedPurchaseOrders); if (!result.success) { throw new Error('Brand metrics calculation failed'); } } else { console.log('Skipping brand metrics calculation'); } if (!SKIP_SALES_FORECASTS) { const result = await calculateSalesForecasts(startTime, totalProducts, processedProducts, isCancelled); // Pass totals processedProducts += result.processedProducts; // Accumulate processedOrders += result.processedOrders; processedPurchaseOrders += result.processedPurchaseOrders; await updateProgress(processedProducts, processedOrders, processedPurchaseOrders); if (!result.success) { throw new Error('Sales forecasts calculation failed'); } } else { console.log('Skipping sales forecasts calculation'); } // --- ABC Classification (Refactored) --- if (isCancelled) return { processedProducts: processedProducts || 0, processedOrders: processedOrders || 0, processedPurchaseOrders: 0, success: false }; const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1'); const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 }; // First, create and populate the rankings table with an index await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks'); await connection.query(` CREATE TEMPORARY TABLE temp_revenue_ranks ( pid BIGINT NOT NULL, total_revenue DECIMAL(10,3), rank_num INT, dense_rank_num INT, percentile DECIMAL(5,2), total_count INT, PRIMARY KEY (pid), INDEX (rank_num), INDEX (dense_rank_num), INDEX (percentile) ) ENGINE=MEMORY `); let processedCount = processedProducts; outputProgress({ status: 'running', operation: 'Creating revenue rankings', current: processedCount, total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, processedCount, totalProducts), rate: calculateRate(startTime, processedCount), percentage: ((processedCount / totalProducts) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); if (isCancelled) return { processedProducts: processedProducts || 0, processedOrders: processedOrders || 0, processedPurchaseOrders: 0, success: false }; // Calculate rankings with proper tie handling and get total count in one go. await connection.query(` INSERT INTO temp_revenue_ranks WITH revenue_data AS ( SELECT pid, total_revenue, COUNT(*) OVER () as total_count, PERCENT_RANK() OVER (ORDER BY total_revenue DESC) * 100 as percentile, RANK() OVER (ORDER BY total_revenue DESC) as rank_num, DENSE_RANK() OVER (ORDER BY total_revenue DESC) as dense_rank_num FROM product_metrics WHERE total_revenue > 0 ) SELECT pid, total_revenue, rank_num, dense_rank_num, percentile, total_count FROM revenue_data `); // Get total count for percentage calculation (already done in the above query) // No need for this separate query: // const [rankingCount] = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks'); // const totalCount = rankingCount[0].total_count || 1; // const max_rank = totalCount; // Store max_rank for use in classification // ABC classification progress tracking let abcProcessedCount = 0; const batchSize = 5000; let lastProgressUpdate = Date.now(); const progressUpdateInterval = 1000; // Update every second while (true) { if (isCancelled) return { processedProducts: Number(processedProducts) || 0, processedOrders: Number(processedOrders) || 0, processedPurchaseOrders: 0, success: false }; // Get a batch of PIDs that need updating - REFACTORED to use percentile const [pids] = await connection.query(` SELECT pm.pid FROM product_metrics pm LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid WHERE pm.abc_class IS NULL OR pm.abc_class != CASE WHEN tr.pid IS NULL THEN 'C' WHEN tr.percentile <= ? THEN 'A' WHEN tr.percentile <= ? THEN 'B' ELSE 'C' END LIMIT ? `, [abcThresholds.a_threshold, abcThresholds.b_threshold, batchSize]); if (pids.length === 0) { break; } // Update just those PIDs - REFACTORED to use percentile await connection.query(` UPDATE product_metrics pm LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid SET pm.abc_class = CASE WHEN tr.pid IS NULL THEN 'C' WHEN tr.percentile <= ? THEN 'A' WHEN tr.percentile <= ? THEN 'B' ELSE 'C' END, pm.last_calculated_at = NOW() WHERE pm.pid IN (?) `, [abcThresholds.a_threshold, abcThresholds.b_threshold, pids.map(row => row.pid)]); abcProcessedCount += pids.length; // Use pids.length, more accurate processedProducts += pids.length; // Add to the main processedProducts // Calculate progress ensuring valid numbers const currentProgress = Math.floor(totalProducts * (0.99 + (abcProcessedCount / (totalProducts || 1)) * 0.01)); processedProducts = Number(currentProgress) || processedProducts || 0; // Only update progress at most once per second const now = Date.now(); if (now - lastProgressUpdate >= progressUpdateInterval) { const progress = ensureValidProgress(processedProducts, totalProducts); outputProgress({ status: 'running', operation: 'ABC classification progress', current: progress.current, total: progress.total, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, progress.current, progress.total), rate: calculateRate(startTime, progress.current), percentage: progress.percentage, timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); lastProgressUpdate = now; } // Update database progress await updateProgress(processedProducts, processedOrders, processedPurchaseOrders); // Small delay between batches to allow other transactions await new Promise(resolve => setTimeout(resolve, 100)); } // Clean up await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks'); const endTime = Date.now(); const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); // Update calculate_status for ABC classification await connection.query(` INSERT INTO calculate_status (module_name, last_calculation_timestamp) VALUES ('abc_classification', NOW()) ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW() `); // Final progress update with guaranteed valid numbers const finalProgress = ensureValidProgress(totalProducts, totalProducts); // Final success message outputProgress({ status: 'complete', operation: 'Metrics calculation complete', current: finalProgress.current, total: finalProgress.total, elapsed: formatElapsedTime(startTime), remaining: '0s', rate: calculateRate(startTime, finalProgress.current), percentage: '100', timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: totalElapsedSeconds } }); // Ensure all values are valid numbers before final update const finalStats = { processedProducts: Number(processedProducts) || 0, processedOrders: Number(processedOrders) || 0, processedPurchaseOrders: Number(processedPurchaseOrders) || 0 }; // Update history with completion await connection.query(` UPDATE calculate_history SET end_time = NOW(), duration_seconds = ?, processed_products = ?, processed_orders = ?, processed_purchase_orders = ?, status = 'completed' WHERE id = ? `, [totalElapsedSeconds, finalStats.processedProducts, finalStats.processedOrders, finalStats.processedPurchaseOrders, calculateHistoryId]); // Clear progress file on successful completion global.clearProgress(); } catch (error) { const endTime = Date.now(); const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); // Update history with error await connection.query(` UPDATE calculate_history SET end_time = NOW(), duration_seconds = ?, processed_products = ?, processed_orders = ?, processed_purchase_orders = ?, status = ?, error_message = ? WHERE id = ? `, [ totalElapsedSeconds, processedProducts || 0, // Ensure we have a valid number processedOrders || 0, // Ensure we have a valid number processedPurchaseOrders || 0, // Ensure we have a valid number isCancelled ? 'cancelled' : 'failed', error.message, calculateHistoryId ]); if (isCancelled) { global.outputProgress({ status: 'cancelled', operation: 'Calculation cancelled', current: processedProducts, total: totalProducts || 0, elapsed: global.formatElapsedTime(startTime), remaining: null, rate: global.calculateRate(startTime, processedProducts), percentage: ((processedProducts / (totalProducts || 1)) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); } else { global.outputProgress({ status: 'error', operation: 'Error: ' + error.message, current: processedProducts, total: totalProducts || 0, elapsed: global.formatElapsedTime(startTime), remaining: null, rate: global.calculateRate(startTime, processedProducts), percentage: ((processedProducts / (totalProducts || 1)) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); } throw error; } finally { if (connection) { connection.release(); } } } finally { // Close the connection pool when we're done await closePool(); } } // Export both functions and progress checker module.exports = calculateMetrics; module.exports.cancelCalculation = cancelCalculation; module.exports.getProgress = global.getProgress; // Run directly if called from command line if (require.main === module) { calculateMetrics().catch(error => { if (!error.message.includes('Operation cancelled')) { console.error('Error:', error); } process.exit(1); }); }