const path = require('path'); // Change working directory to script directory process.chdir(path.dirname(__filename)); require('dotenv').config({ path: path.resolve(__dirname, '..', '.env') }); // Configuration flags for controlling which metrics to calculate // Set to 1 to skip the corresponding calculation, 0 to run it const SKIP_PRODUCT_METRICS = 0; const SKIP_TIME_AGGREGATES = 0; const SKIP_FINANCIAL_METRICS = 0; const SKIP_VENDOR_METRICS = 0; const SKIP_CATEGORY_METRICS = 0; const SKIP_BRAND_METRICS = 0; const SKIP_SALES_FORECASTS = 0; // Add error handler for uncaught exceptions process.on('uncaughtException', (error) => { console.error('Uncaught Exception:', error); process.exit(1); }); // Add error handler for unhandled promise rejections process.on('unhandledRejection', (reason, promise) => { console.error('Unhandled Rejection at:', promise, 'reason:', reason); process.exit(1); }); const progress = require('./metrics/utils/progress'); console.log('Progress module loaded:', { modulePath: require.resolve('./metrics/utils/progress'), exports: Object.keys(progress), currentDir: process.cwd(), scriptDir: __dirname }); // Store progress functions in global scope to ensure availability global.formatElapsedTime = progress.formatElapsedTime; global.estimateRemaining = progress.estimateRemaining; global.calculateRate = progress.calculateRate; global.outputProgress = progress.outputProgress; global.clearProgress = progress.clearProgress; global.getProgress = progress.getProgress; global.logError = progress.logError; const { getConnection, closePool } = require('./metrics/utils/db'); const calculateProductMetrics = require('./metrics/product-metrics'); const calculateTimeAggregates = require('./metrics/time-aggregates'); const calculateFinancialMetrics = require('./metrics/financial-metrics'); const calculateVendorMetrics = require('./metrics/vendor-metrics'); const calculateCategoryMetrics = require('./metrics/category-metrics'); const calculateBrandMetrics = require('./metrics/brand-metrics'); const calculateSalesForecasts = require('./metrics/sales-forecasts'); // Add cancel handler let isCancelled = false; function cancelCalculation() { isCancelled = true; global.clearProgress(); // Format as SSE event const event = { progress: { status: 'cancelled', operation: 'Calculation cancelled', current: 0, total: 0, elapsed: null, remaining: null, rate: 0, timestamp: Date.now() } }; process.stdout.write(JSON.stringify(event) + '\n'); process.exit(0); } // Handle SIGTERM signal for cancellation process.on('SIGTERM', cancelCalculation); // Update the main calculation function to use the new modular structure async function calculateMetrics() { let connection; const startTime = Date.now(); let processedCount = 0; let totalProducts = 0; try { // Add debug logging for the progress functions console.log('Debug - Progress functions:', { formatElapsedTime: typeof global.formatElapsedTime, estimateRemaining: typeof global.estimateRemaining, calculateRate: typeof global.calculateRate, startTime: startTime }); try { const elapsed = global.formatElapsedTime(startTime); console.log('Debug - formatElapsedTime test successful:', elapsed); } catch (err) { console.error('Debug - Error testing formatElapsedTime:', err); throw err; } isCancelled = false; connection = await getConnection(); try { global.outputProgress({ status: 'running', operation: 'Starting metrics calculation', current: 0, total: 100, elapsed: '0s', remaining: 'Calculating...', rate: 0, percentage: '0', timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); // Get total number of products const [countResult] = await connection.query('SELECT COUNT(*) as total FROM products') .catch(err => { global.logError(err, 'Failed to count products'); throw err; }); totalProducts = countResult[0].total; if (!SKIP_PRODUCT_METRICS) { processedCount = await calculateProductMetrics(startTime, totalProducts); } else { console.log('Skipping product metrics calculation...'); processedCount = Math.floor(totalProducts * 0.6); global.outputProgress({ status: 'running', operation: 'Skipping product metrics calculation', current: processedCount, total: totalProducts, elapsed: global.formatElapsedTime(startTime), remaining: global.estimateRemaining(startTime, processedCount, totalProducts), rate: global.calculateRate(startTime, processedCount), percentage: '60', timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); } // Calculate time-based aggregates if (!SKIP_TIME_AGGREGATES) { processedCount = await calculateTimeAggregates(startTime, totalProducts, processedCount); } else { console.log('Skipping time aggregates calculation'); } // Calculate financial metrics if (!SKIP_FINANCIAL_METRICS) { processedCount = await calculateFinancialMetrics(startTime, totalProducts, processedCount); } else { console.log('Skipping financial metrics calculation'); } // Calculate vendor metrics if (!SKIP_VENDOR_METRICS) { processedCount = await calculateVendorMetrics(startTime, totalProducts, processedCount); } else { console.log('Skipping vendor metrics calculation'); } // Calculate category metrics if (!SKIP_CATEGORY_METRICS) { processedCount = await calculateCategoryMetrics(startTime, totalProducts, processedCount); } else { console.log('Skipping category metrics calculation'); } // Calculate brand metrics if (!SKIP_BRAND_METRICS) { processedCount = await calculateBrandMetrics(startTime, totalProducts, processedCount); } else { console.log('Skipping brand metrics calculation'); } // Calculate sales forecasts if (!SKIP_SALES_FORECASTS) { processedCount = await calculateSalesForecasts(startTime, totalProducts, processedCount); } else { console.log('Skipping sales forecasts calculation'); } // Calculate ABC classification outputProgress({ status: 'running', operation: 'Starting ABC classification', current: processedCount, total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, processedCount, totalProducts), rate: calculateRate(startTime, processedCount), percentage: ((processedCount / totalProducts) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); if (isCancelled) return processedCount; const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1'); const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 }; // First, create and populate the rankings table with an index await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks'); await connection.query(` CREATE TEMPORARY TABLE temp_revenue_ranks ( pid BIGINT NOT NULL, total_revenue DECIMAL(10,3), rank_num INT, total_count INT, PRIMARY KEY (pid), INDEX (rank_num) ) ENGINE=MEMORY `); outputProgress({ status: 'running', operation: 'Creating revenue rankings', current: processedCount, total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, processedCount, totalProducts), rate: calculateRate(startTime, processedCount), percentage: ((processedCount / totalProducts) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); if (isCancelled) return processedCount; await connection.query(` INSERT INTO temp_revenue_ranks SELECT pid, total_revenue, @rank := @rank + 1 as rank_num, @total_count := @rank as total_count FROM ( SELECT pid, total_revenue FROM product_metrics WHERE total_revenue > 0 ORDER BY total_revenue DESC ) ranked, (SELECT @rank := 0) r `); // Get total count for percentage calculation const [rankingCount] = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks'); const totalCount = rankingCount[0].total_count || 1; const max_rank = totalCount; // Store max_rank for use in classification outputProgress({ status: 'running', operation: 'Updating ABC classifications', current: processedCount, total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, processedCount, totalProducts), rate: calculateRate(startTime, processedCount), percentage: ((processedCount / totalProducts) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); if (isCancelled) return processedCount; // Process updates in batches let abcProcessedCount = 0; const batchSize = 5000; while (true) { if (isCancelled) return processedCount; // First get a batch of PIDs that need updating const [pids] = await connection.query(` SELECT pm.pid FROM product_metrics pm LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid WHERE pm.abc_class IS NULL OR pm.abc_class != CASE WHEN tr.rank_num IS NULL THEN 'C' WHEN (tr.rank_num / ?) * 100 <= ? THEN 'A' WHEN (tr.rank_num / ?) * 100 <= ? THEN 'B' ELSE 'C' END LIMIT ? `, [max_rank, abcThresholds.a_threshold, max_rank, abcThresholds.b_threshold, batchSize]); if (pids.length === 0) { break; } // Then update just those PIDs const [result] = await connection.query(` UPDATE product_metrics pm LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid SET pm.abc_class = CASE WHEN tr.rank_num IS NULL THEN 'C' WHEN (tr.rank_num / ?) * 100 <= ? THEN 'A' WHEN (tr.rank_num / ?) * 100 <= ? THEN 'B' ELSE 'C' END, pm.last_calculated_at = NOW() WHERE pm.pid IN (?) `, [max_rank, abcThresholds.a_threshold, max_rank, abcThresholds.b_threshold, pids.map(row => row.pid)]); abcProcessedCount += result.affectedRows; processedCount = Math.floor(totalProducts * (0.99 + (abcProcessedCount / totalCount) * 0.01)); outputProgress({ status: 'running', operation: 'ABC classification progress', current: processedCount, total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, processedCount, totalProducts), rate: calculateRate(startTime, processedCount), percentage: ((processedCount / totalProducts) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); // Small delay between batches to allow other transactions await new Promise(resolve => setTimeout(resolve, 100)); } // Clean up await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks'); // Final success message outputProgress({ status: 'complete', operation: 'Metrics calculation complete', current: totalProducts, total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: '0s', rate: calculateRate(startTime, totalProducts), percentage: '100', timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); // Clear progress file on successful completion global.clearProgress(); } catch (error) { if (isCancelled) { global.outputProgress({ status: 'cancelled', operation: 'Calculation cancelled', current: processedCount, total: totalProducts || 0, elapsed: global.formatElapsedTime(startTime), remaining: null, rate: global.calculateRate(startTime, processedCount), percentage: ((processedCount / (totalProducts || 1)) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); } else { global.outputProgress({ status: 'error', operation: 'Error: ' + error.message, current: processedCount, total: totalProducts || 0, elapsed: global.formatElapsedTime(startTime), remaining: null, rate: global.calculateRate(startTime, processedCount), percentage: ((processedCount / (totalProducts || 1)) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); } throw error; } finally { if (connection) { connection.release(); } } } finally { // Close the connection pool when we're done await closePool(); } } // Export both functions and progress checker module.exports = calculateMetrics; module.exports.cancelCalculation = cancelCalculation; module.exports.getProgress = global.getProgress; // Run directly if called from command line if (require.main === module) { calculateMetrics().catch(error => { if (!error.message.includes('Operation cancelled')) { console.error('Error:', error); } process.exit(1); }); }