const path = require('path'); // Change working directory to script directory process.chdir(path.dirname(__filename)); require('dotenv').config({ path: path.resolve(__dirname, '..', '.env') }); // Configuration flags for controlling which metrics to calculate // Set to 1 to skip the corresponding calculation, 0 to run it const SKIP_PRODUCT_METRICS = 0; const SKIP_TIME_AGGREGATES = 0; const SKIP_FINANCIAL_METRICS = 0; const SKIP_VENDOR_METRICS = 0; const SKIP_CATEGORY_METRICS = 0; const SKIP_BRAND_METRICS = 0; const SKIP_SALES_FORECASTS = 0; // Add error handler for uncaught exceptions process.on('uncaughtException', (error) => { console.error('Uncaught Exception:', error); process.exit(1); }); // Add error handler for unhandled promise rejections process.on('unhandledRejection', (reason, promise) => { console.error('Unhandled Rejection at:', promise, 'reason:', reason); process.exit(1); }); const progress = require('./metrics/utils/progress'); console.log('Progress module loaded:', { modulePath: require.resolve('./metrics/utils/progress'), exports: Object.keys(progress), currentDir: process.cwd(), scriptDir: __dirname }); // Store progress functions in global scope to ensure availability global.formatElapsedTime = progress.formatElapsedTime; global.estimateRemaining = progress.estimateRemaining; global.calculateRate = progress.calculateRate; global.outputProgress = progress.outputProgress; global.clearProgress = progress.clearProgress; global.getProgress = progress.getProgress; global.logError = progress.logError; // List of temporary tables used in the calculation process const TEMP_TABLES = [ 'temp_revenue_ranks', 'temp_sales_metrics', 'temp_purchase_metrics', 'temp_product_metrics', 'temp_vendor_metrics', 'temp_category_metrics', 'temp_brand_metrics', 'temp_forecast_dates', 'temp_daily_sales', 'temp_product_stats', 'temp_category_sales', 'temp_category_stats' ]; // Add cleanup function for temporary tables async function cleanupTemporaryTables(connection) { try { for (const table of TEMP_TABLES) { await connection.query(`DROP TEMPORARY TABLE IF EXISTS ${table}`); } } catch (error) { logError(error, 'Error cleaning up temporary tables'); throw error; // Re-throw to be handled by the caller } } const { getConnection, closePool } = require('./metrics/utils/db'); const calculateProductMetrics = require('./metrics/product-metrics'); const calculateTimeAggregates = require('./metrics/time-aggregates'); const calculateFinancialMetrics = require('./metrics/financial-metrics'); const calculateVendorMetrics = require('./metrics/vendor-metrics'); const calculateCategoryMetrics = require('./metrics/category-metrics'); const calculateBrandMetrics = require('./metrics/brand-metrics'); const calculateSalesForecasts = require('./metrics/sales-forecasts'); // Add cancel handler let isCancelled = false; function cancelCalculation() { isCancelled = true; global.clearProgress(); // Format as SSE event const event = { progress: { status: 'cancelled', operation: 'Calculation cancelled', current: 0, total: 0, elapsed: null, remaining: null, rate: 0, timestamp: Date.now() } }; process.stdout.write(JSON.stringify(event) + '\n'); process.exit(0); } // Handle SIGTERM signal for cancellation process.on('SIGTERM', cancelCalculation); // Update the main calculation function to use the new modular structure async function calculateMetrics() { let connection; const startTime = Date.now(); let processedProducts = 0; let processedOrders = 0; let processedPurchaseOrders = 0; let totalProducts = 0; let totalOrders = 0; let totalPurchaseOrders = 0; let calculateHistoryId; try { // Clean up any previously running calculations connection = await getConnection(); await connection.query(` UPDATE calculate_history SET status = 'cancelled', end_time = NOW(), duration_seconds = TIMESTAMPDIFF(SECOND, start_time, NOW()), error_message = 'Previous calculation was not completed properly' WHERE status = 'running' `); // Get counts from all relevant tables const [[productCount], [orderCount], [poCount]] = await Promise.all([ connection.query('SELECT COUNT(*) as total FROM products'), connection.query('SELECT COUNT(*) as total FROM orders'), connection.query('SELECT COUNT(*) as total FROM purchase_orders') ]); totalProducts = productCount.total; totalOrders = orderCount.total; totalPurchaseOrders = poCount.total; // Create history record for this calculation const [historyResult] = await connection.query(` INSERT INTO calculate_history ( start_time, status, total_products, total_orders, total_purchase_orders, additional_info ) VALUES ( NOW(), 'running', ?, ?, ?, JSON_OBJECT( 'skip_product_metrics', ?, 'skip_time_aggregates', ?, 'skip_financial_metrics', ?, 'skip_vendor_metrics', ?, 'skip_category_metrics', ?, 'skip_brand_metrics', ?, 'skip_sales_forecasts', ? ) ) `, [ totalProducts, totalOrders, totalPurchaseOrders, SKIP_PRODUCT_METRICS, SKIP_TIME_AGGREGATES, SKIP_FINANCIAL_METRICS, SKIP_VENDOR_METRICS, SKIP_CATEGORY_METRICS, SKIP_BRAND_METRICS, SKIP_SALES_FORECASTS ]); calculateHistoryId = historyResult.insertId; connection.release(); // Add debug logging for the progress functions console.log('Debug - Progress functions:', { formatElapsedTime: typeof global.formatElapsedTime, estimateRemaining: typeof global.estimateRemaining, calculateRate: typeof global.calculateRate, startTime: startTime }); try { const elapsed = global.formatElapsedTime(startTime); console.log('Debug - formatElapsedTime test successful:', elapsed); } catch (err) { console.error('Debug - Error testing formatElapsedTime:', err); throw err; } isCancelled = false; connection = await getConnection(); try { global.outputProgress({ status: 'running', operation: 'Starting metrics calculation', current: 0, total: 100, elapsed: '0s', remaining: 'Calculating...', rate: 0, percentage: '0', timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); // Update progress periodically const updateProgress = async (products = null, orders = null, purchaseOrders = null) => { // Ensure all values are valid numbers or default to previous value if (products !== null) processedProducts = Number(products) || processedProducts || 0; if (orders !== null) processedOrders = Number(orders) || processedOrders || 0; if (purchaseOrders !== null) processedPurchaseOrders = Number(purchaseOrders) || processedPurchaseOrders || 0; // Ensure we never send NaN to the database const safeProducts = Number(processedProducts) || 0; const safeOrders = Number(processedOrders) || 0; const safePurchaseOrders = Number(processedPurchaseOrders) || 0; await connection.query(` UPDATE calculate_history SET processed_products = ?, processed_orders = ?, processed_purchase_orders = ? WHERE id = ? `, [safeProducts, safeOrders, safePurchaseOrders, calculateHistoryId]); }; // Helper function to ensure valid progress numbers const ensureValidProgress = (current, total) => ({ current: Number(current) || 0, total: Number(total) || 1, // Default to 1 to avoid division by zero percentage: (((Number(current) || 0) / (Number(total) || 1)) * 100).toFixed(1) }); // Initial progress const initialProgress = ensureValidProgress(0, totalProducts); global.outputProgress({ status: 'running', operation: 'Starting metrics calculation', current: initialProgress.current, total: initialProgress.total, elapsed: '0s', remaining: 'Calculating...', rate: 0, percentage: initialProgress.percentage, timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); if (!SKIP_PRODUCT_METRICS) { const result = await calculateProductMetrics(startTime, totalProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders); if (!result.success) { throw new Error('Product metrics calculation failed'); } } else { console.log('Skipping product metrics calculation...'); processedProducts = Math.floor(totalProducts * 0.6); await updateProgress(processedProducts); global.outputProgress({ status: 'running', operation: 'Skipping product metrics calculation', current: processedProducts, total: totalProducts, elapsed: global.formatElapsedTime(startTime), remaining: global.estimateRemaining(startTime, processedProducts, totalProducts), rate: global.calculateRate(startTime, processedProducts), percentage: '60', timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); } // Calculate time-based aggregates if (!SKIP_TIME_AGGREGATES) { const result = await calculateTimeAggregates(startTime, totalProducts, processedProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders); if (!result.success) { throw new Error('Time aggregates calculation failed'); } } else { console.log('Skipping time aggregates calculation'); } // Calculate financial metrics if (!SKIP_FINANCIAL_METRICS) { const result = await calculateFinancialMetrics(startTime, totalProducts, processedProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders); if (!result.success) { throw new Error('Financial metrics calculation failed'); } } else { console.log('Skipping financial metrics calculation'); } // Calculate vendor metrics if (!SKIP_VENDOR_METRICS) { const result = await calculateVendorMetrics(startTime, totalProducts, processedProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders); if (!result.success) { throw new Error('Vendor metrics calculation failed'); } } else { console.log('Skipping vendor metrics calculation'); } // Calculate category metrics if (!SKIP_CATEGORY_METRICS) { const result = await calculateCategoryMetrics(startTime, totalProducts, processedProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders); if (!result.success) { throw new Error('Category metrics calculation failed'); } } else { console.log('Skipping category metrics calculation'); } // Calculate brand metrics if (!SKIP_BRAND_METRICS) { const result = await calculateBrandMetrics(startTime, totalProducts, processedProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders); if (!result.success) { throw new Error('Brand metrics calculation failed'); } } else { console.log('Skipping brand metrics calculation'); } // Calculate sales forecasts if (!SKIP_SALES_FORECASTS) { const result = await calculateSalesForecasts(startTime, totalProducts, processedProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders); if (!result.success) { throw new Error('Sales forecasts calculation failed'); } } else { console.log('Skipping sales forecasts calculation'); } // Calculate ABC classification outputProgress({ status: 'running', operation: 'Starting ABC classification', current: processedProducts || 0, total: totalProducts || 0, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0), rate: calculateRate(startTime, processedProducts || 0), percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); if (isCancelled) return { processedProducts: processedProducts || 0, processedOrders: processedOrders || 0, processedPurchaseOrders: 0, success: false }; const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1'); const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 }; // First, create and populate the rankings table with an index await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks'); await connection.query(` CREATE TEMPORARY TABLE temp_revenue_ranks ( pid BIGINT NOT NULL, total_revenue DECIMAL(10,3), rank_num INT, total_count INT, PRIMARY KEY (pid), INDEX (rank_num) ) ENGINE=MEMORY `); outputProgress({ status: 'running', operation: 'Creating revenue rankings', current: processedProducts || 0, total: totalProducts || 0, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0), rate: calculateRate(startTime, processedProducts || 0), percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); if (isCancelled) return { processedProducts: processedProducts || 0, processedOrders: processedOrders || 0, processedPurchaseOrders: 0, success: false }; await connection.query(` INSERT INTO temp_revenue_ranks SELECT pid, total_revenue, @rank := @rank + 1 as rank_num, @total_count := @rank as total_count FROM ( SELECT pid, total_revenue FROM product_metrics WHERE total_revenue > 0 ORDER BY total_revenue DESC ) ranked, (SELECT @rank := 0) r `); // Get total count for percentage calculation const [rankingCount] = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks'); const totalCount = rankingCount[0].total_count || 1; const max_rank = totalCount; // Store max_rank for use in classification outputProgress({ status: 'running', operation: 'Updating ABC classifications', current: processedProducts || 0, total: totalProducts || 0, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0), rate: calculateRate(startTime, processedProducts || 0), percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); if (isCancelled) return { processedProducts: processedProducts || 0, processedOrders: processedOrders || 0, processedPurchaseOrders: 0, success: false }; // ABC classification progress tracking let abcProcessedCount = 0; const batchSize = 5000; let lastProgressUpdate = Date.now(); const progressUpdateInterval = 1000; // Update every second while (true) { if (isCancelled) return { processedProducts: Number(processedProducts) || 0, processedOrders: Number(processedOrders) || 0, processedPurchaseOrders: 0, success: false }; // First get a batch of PIDs that need updating const [pids] = await connection.query(` SELECT pm.pid FROM product_metrics pm LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid WHERE pm.abc_class IS NULL OR pm.abc_class != CASE WHEN tr.rank_num IS NULL THEN 'C' WHEN (tr.rank_num / ?) * 100 <= ? THEN 'A' WHEN (tr.rank_num / ?) * 100 <= ? THEN 'B' ELSE 'C' END LIMIT ? `, [max_rank, abcThresholds.a_threshold, max_rank, abcThresholds.b_threshold, batchSize]); if (pids.length === 0) { break; } // Then update just those PIDs const [result] = await connection.query(` UPDATE product_metrics pm LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid SET pm.abc_class = CASE WHEN tr.rank_num IS NULL THEN 'C' WHEN (tr.rank_num / ?) * 100 <= ? THEN 'A' WHEN (tr.rank_num / ?) * 100 <= ? THEN 'B' ELSE 'C' END, pm.last_calculated_at = NOW() WHERE pm.pid IN (?) `, [max_rank, abcThresholds.a_threshold, max_rank, abcThresholds.b_threshold, pids.map(row => row.pid)]); abcProcessedCount += result.affectedRows; // Calculate progress ensuring valid numbers const currentProgress = Math.floor(totalProducts * (0.99 + (abcProcessedCount / (totalCount || 1)) * 0.01)); processedProducts = Number(currentProgress) || processedProducts || 0; // Only update progress at most once per second const now = Date.now(); if (now - lastProgressUpdate >= progressUpdateInterval) { const progress = ensureValidProgress(processedProducts, totalProducts); outputProgress({ status: 'running', operation: 'ABC classification progress', current: progress.current, total: progress.total, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, progress.current, progress.total), rate: calculateRate(startTime, progress.current), percentage: progress.percentage, timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); lastProgressUpdate = now; } // Update database progress await updateProgress(processedProducts, processedOrders, processedPurchaseOrders); // Small delay between batches to allow other transactions await new Promise(resolve => setTimeout(resolve, 100)); } // Clean up await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks'); const endTime = Date.now(); const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); // Update calculate_status for ABC classification await connection.query(` INSERT INTO calculate_status (module_name, last_calculation_timestamp) VALUES ('abc_classification', NOW()) ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW() `); // Final progress update with guaranteed valid numbers const finalProgress = ensureValidProgress(totalProducts, totalProducts); // Final success message outputProgress({ status: 'complete', operation: 'Metrics calculation complete', current: finalProgress.current, total: finalProgress.total, elapsed: formatElapsedTime(startTime), remaining: '0s', rate: calculateRate(startTime, finalProgress.current), percentage: '100', timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: totalElapsedSeconds } }); // Ensure all values are valid numbers before final update const finalStats = { processedProducts: Number(processedProducts) || 0, processedOrders: Number(processedOrders) || 0, processedPurchaseOrders: Number(processedPurchaseOrders) || 0 }; // Update history with completion await connection.query(` UPDATE calculate_history SET end_time = NOW(), duration_seconds = ?, processed_products = ?, processed_orders = ?, processed_purchase_orders = ?, status = 'completed' WHERE id = ? `, [totalElapsedSeconds, finalStats.processedProducts, finalStats.processedOrders, finalStats.processedPurchaseOrders, calculateHistoryId]); // Clear progress file on successful completion global.clearProgress(); } catch (error) { const endTime = Date.now(); const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); // Update history with error await connection.query(` UPDATE calculate_history SET end_time = NOW(), duration_seconds = ?, processed_products = ?, processed_orders = ?, processed_purchase_orders = ?, status = ?, error_message = ? WHERE id = ? `, [ totalElapsedSeconds, processedProducts || 0, // Ensure we have a valid number processedOrders || 0, // Ensure we have a valid number processedPurchaseOrders || 0, // Ensure we have a valid number isCancelled ? 'cancelled' : 'failed', error.message, calculateHistoryId ]); if (isCancelled) { global.outputProgress({ status: 'cancelled', operation: 'Calculation cancelled', current: processedProducts, total: totalProducts || 0, elapsed: global.formatElapsedTime(startTime), remaining: null, rate: global.calculateRate(startTime, processedProducts), percentage: ((processedProducts / (totalProducts || 1)) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); } else { global.outputProgress({ status: 'error', operation: 'Error: ' + error.message, current: processedProducts, total: totalProducts || 0, elapsed: global.formatElapsedTime(startTime), remaining: null, rate: global.calculateRate(startTime, processedProducts), percentage: ((processedProducts / (totalProducts || 1)) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); } throw error; } finally { if (connection) { // Ensure temporary tables are cleaned up await cleanupTemporaryTables(connection); connection.release(); } // Close the connection pool when we're done await closePool(); } } catch (error) { success = false; logError(error, 'Error in metrics calculation'); throw error; } } // Export as a module with all necessary functions module.exports = { calculateMetrics, cancelCalculation, getProgress: global.getProgress }; // Run directly if called from command line if (require.main === module) { calculateMetrics().catch(error => { if (!error.message.includes('Operation cancelled')) { console.error('Error:', error); } process.exit(1); }); }