const path = require('path'); // Change working directory to script directory process.chdir(path.dirname(__filename)); require('dotenv').config({ path: path.resolve(__dirname, '..', '.env') }); // Configuration flags for controlling which metrics to calculate // Set to 1 to skip the corresponding calculation, 0 to run it const SKIP_PRODUCT_METRICS = 0; const SKIP_TIME_AGGREGATES = 0; const SKIP_FINANCIAL_METRICS = 0; const SKIP_VENDOR_METRICS = 0; const SKIP_CATEGORY_METRICS = 0; const SKIP_BRAND_METRICS = 0; const SKIP_SALES_FORECASTS = 0; // Add error handler for uncaught exceptions process.on('uncaughtException', (error) => { console.error('Uncaught Exception:', error); process.exit(1); }); // Add error handler for unhandled promise rejections process.on('unhandledRejection', (reason, promise) => { console.error('Unhandled Rejection at:', promise, 'reason:', reason); process.exit(1); }); const progress = require('./metrics/utils/progress'); console.log('Progress module loaded:', { modulePath: require.resolve('./metrics/utils/progress'), exports: Object.keys(progress), currentDir: process.cwd(), scriptDir: __dirname }); // Store progress functions in global scope to ensure availability global.formatElapsedTime = progress.formatElapsedTime; global.estimateRemaining = progress.estimateRemaining; global.calculateRate = progress.calculateRate; global.outputProgress = progress.outputProgress; global.clearProgress = progress.clearProgress; global.getProgress = progress.getProgress; global.logError = progress.logError; // List of temporary tables used in the calculation process const TEMP_TABLES = [ 'temp_revenue_ranks', 'temp_sales_metrics', 'temp_purchase_metrics', 'temp_product_metrics', 'temp_vendor_metrics', 'temp_category_metrics', 'temp_brand_metrics', 'temp_forecast_dates', 'temp_daily_sales', 'temp_product_stats', 'temp_category_sales', 'temp_category_stats' ]; // Add cleanup function for temporary tables async function cleanupTemporaryTables(connection) { // List of possible temporary tables that might exist const tempTables = [ 'temp_sales_metrics', 'temp_purchase_metrics', 'temp_forecast_dates', 'temp_daily_sales', 'temp_product_stats', 'temp_category_sales', 'temp_category_stats' ]; try { // Drop each temporary table if it exists for (const table of tempTables) { await connection.query(`DROP TABLE IF EXISTS ${table}`); } } catch (err) { console.error('Error cleaning up temporary tables:', err); } } const { getConnection, closePool } = require('./metrics/utils/db'); const calculateProductMetrics = require('./metrics/product-metrics'); const calculateTimeAggregates = require('./metrics/time-aggregates'); const calculateFinancialMetrics = require('./metrics/financial-metrics'); const calculateVendorMetrics = require('./metrics/vendor-metrics'); const calculateCategoryMetrics = require('./metrics/category-metrics'); const calculateBrandMetrics = require('./metrics/brand-metrics'); const calculateSalesForecasts = require('./metrics/sales-forecasts'); // Add cancel handler let isCancelled = false; function cancelCalculation() { isCancelled = true; console.log('Calculation has been cancelled by user'); // Force-terminate any query that's been running for more than 5 seconds try { const connection = getConnection(); connection.then(async (conn) => { try { // Identify and terminate long-running queries from our application await conn.query(` SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE query_start < now() - interval '5 seconds' AND application_name LIKE '%node%' AND query NOT LIKE '%pg_cancel_backend%' `); // Clean up any temporary tables await cleanupTemporaryTables(conn); // Release connection conn.release(); } catch (err) { console.error('Error during force cancellation:', err); conn.release(); } }).catch(err => { console.error('Could not get connection for cancellation:', err); }); } catch (err) { console.error('Failed to terminate running queries:', err); } return { success: true, message: 'Calculation has been cancelled' }; } // Handle SIGTERM signal for cancellation process.on('SIGTERM', cancelCalculation); // Update the main calculation function to use the new modular structure async function calculateMetrics() { let connection; const startTime = Date.now(); let processedProducts = 0; let processedOrders = 0; let processedPurchaseOrders = 0; let totalProducts = 0; let totalOrders = 0; let totalPurchaseOrders = 0; let calculateHistoryId; // Set a maximum execution time (30 minutes) const MAX_EXECUTION_TIME = 30 * 60 * 1000; const timeout = setTimeout(() => { console.error(`Calculation timed out after ${MAX_EXECUTION_TIME/1000} seconds, forcing termination`); // Call cancel and force exit cancelCalculation(); process.exit(1); }, MAX_EXECUTION_TIME); try { // Clean up any previously running calculations connection = await getConnection(); await connection.query(` UPDATE calculate_history SET status = 'cancelled', end_time = NOW(), duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER, error_message = 'Previous calculation was not completed properly' WHERE status = 'running' `); // Get counts from all relevant tables const [productCountResult, orderCountResult, poCountResult] = await Promise.all([ connection.query('SELECT COUNT(*) as total FROM products'), connection.query('SELECT COUNT(*) as total FROM orders'), connection.query('SELECT COUNT(*) as total FROM purchase_orders') ]); totalProducts = parseInt(productCountResult.rows[0].total); totalOrders = parseInt(orderCountResult.rows[0].total); totalPurchaseOrders = parseInt(poCountResult.rows[0].total); // Create history record for this calculation const historyResult = await connection.query(` INSERT INTO calculate_history ( start_time, status, total_products, total_orders, total_purchase_orders, additional_info ) VALUES ( NOW(), 'running', $1, $2, $3, jsonb_build_object( 'skip_product_metrics', ($4::int > 0), 'skip_time_aggregates', ($5::int > 0), 'skip_financial_metrics', ($6::int > 0), 'skip_vendor_metrics', ($7::int > 0), 'skip_category_metrics', ($8::int > 0), 'skip_brand_metrics', ($9::int > 0), 'skip_sales_forecasts', ($10::int > 0) ) ) RETURNING id `, [ totalProducts, totalOrders, totalPurchaseOrders, SKIP_PRODUCT_METRICS, SKIP_TIME_AGGREGATES, SKIP_FINANCIAL_METRICS, SKIP_VENDOR_METRICS, SKIP_CATEGORY_METRICS, SKIP_BRAND_METRICS, SKIP_SALES_FORECASTS ]); calculateHistoryId = historyResult.rows[0].id; // Add debug logging for the progress functions console.log('Debug - Progress functions:', { formatElapsedTime: typeof global.formatElapsedTime, estimateRemaining: typeof global.estimateRemaining, calculateRate: typeof global.calculateRate, startTime: startTime }); try { const elapsed = global.formatElapsedTime(startTime); console.log('Debug - formatElapsedTime test successful:', elapsed); } catch (err) { console.error('Debug - Error testing formatElapsedTime:', err); throw err; } // Release the connection before getting a new one connection.release(); isCancelled = false; connection = await getConnection(); try { global.outputProgress({ status: 'running', operation: 'Starting metrics calculation', current: 0, total: 100, elapsed: '0s', remaining: 'Calculating...', rate: 0, percentage: '0', timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); // Update progress periodically const updateProgress = async (products = null, orders = null, purchaseOrders = null) => { // Ensure all values are valid numbers or default to previous value if (products !== null) processedProducts = Number(products) || processedProducts || 0; if (orders !== null) processedOrders = Number(orders) || processedOrders || 0; if (purchaseOrders !== null) processedPurchaseOrders = Number(purchaseOrders) || processedPurchaseOrders || 0; // Ensure we never send NaN to the database const safeProducts = Number(processedProducts) || 0; const safeOrders = Number(processedOrders) || 0; const safePurchaseOrders = Number(processedPurchaseOrders) || 0; await connection.query(` UPDATE calculate_history SET processed_products = $1, processed_orders = $2, processed_purchase_orders = $3 WHERE id = $4 `, [safeProducts, safeOrders, safePurchaseOrders, calculateHistoryId]); }; // Helper function to ensure valid progress numbers const ensureValidProgress = (current, total) => ({ current: Number(current) || 0, total: Number(total) || 1, // Default to 1 to avoid division by zero percentage: (((Number(current) || 0) / (Number(total) || 1)) * 100).toFixed(1) }); // Initial progress const initialProgress = ensureValidProgress(0, totalProducts); global.outputProgress({ status: 'running', operation: 'Starting metrics calculation', current: initialProgress.current, total: initialProgress.total, elapsed: '0s', remaining: 'Calculating...', rate: 0, percentage: initialProgress.percentage, timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); if (!SKIP_PRODUCT_METRICS) { const result = await calculateProductMetrics(startTime, totalProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders); if (!result.success) { throw new Error('Product metrics calculation failed'); } } else { console.log('Skipping product metrics calculation...'); processedProducts = Math.floor(totalProducts * 0.6); await updateProgress(processedProducts); global.outputProgress({ status: 'running', operation: 'Skipping product metrics calculation', current: processedProducts, total: totalProducts, elapsed: global.formatElapsedTime(startTime), remaining: global.estimateRemaining(startTime, processedProducts, totalProducts), rate: global.calculateRate(startTime, processedProducts), percentage: '60', timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); } // Calculate time-based aggregates if (!SKIP_TIME_AGGREGATES) { const result = await calculateTimeAggregates(startTime, totalProducts, processedProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders); if (!result.success) { throw new Error('Time aggregates calculation failed'); } } else { console.log('Skipping time aggregates calculation'); } // Calculate financial metrics if (!SKIP_FINANCIAL_METRICS) { const result = await calculateFinancialMetrics(startTime, totalProducts, processedProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders); if (!result.success) { throw new Error('Financial metrics calculation failed'); } } else { console.log('Skipping financial metrics calculation'); } // Calculate vendor metrics if (!SKIP_VENDOR_METRICS) { const result = await calculateVendorMetrics(startTime, totalProducts, processedProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders); if (!result.success) { throw new Error('Vendor metrics calculation failed'); } } else { console.log('Skipping vendor metrics calculation'); } // Calculate category metrics if (!SKIP_CATEGORY_METRICS) { const result = await calculateCategoryMetrics(startTime, totalProducts, processedProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders); if (!result.success) { throw new Error('Category metrics calculation failed'); } } else { console.log('Skipping category metrics calculation'); } // Calculate brand metrics if (!SKIP_BRAND_METRICS) { const result = await calculateBrandMetrics(startTime, totalProducts, processedProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders); if (!result.success) { throw new Error('Brand metrics calculation failed'); } } else { console.log('Skipping brand metrics calculation'); } // Calculate sales forecasts if (!SKIP_SALES_FORECASTS) { const result = await calculateSalesForecasts(startTime, totalProducts, processedProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders); if (!result.success) { throw new Error('Sales forecasts calculation failed'); } } else { console.log('Skipping sales forecasts calculation'); } // Final progress update with guaranteed valid numbers const finalProgress = ensureValidProgress(totalProducts, totalProducts); // Final success message outputProgress({ status: 'complete', operation: 'Metrics calculation complete', current: finalProgress.current, total: finalProgress.total, elapsed: global.formatElapsedTime(startTime), remaining: '0s', rate: global.calculateRate(startTime, finalProgress.current), percentage: '100', timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); // Ensure all values are valid numbers before final update const finalStats = { processedProducts: Number(processedProducts) || 0, processedOrders: Number(processedOrders) || 0, processedPurchaseOrders: Number(processedPurchaseOrders) || 0 }; // Update history with completion await connection.query(` UPDATE calculate_history SET end_time = NOW(), duration_seconds = $1, processed_products = $2, processed_orders = $3, processed_purchase_orders = $4, status = 'completed' WHERE id = $5 `, [Math.round((Date.now() - startTime) / 1000), finalStats.processedProducts, finalStats.processedOrders, finalStats.processedPurchaseOrders, calculateHistoryId]); // Clear progress file on successful completion global.clearProgress(); return { success: true, message: 'Calculation completed successfully', duration: Math.round((Date.now() - startTime) / 1000) }; } catch (error) { const endTime = Date.now(); const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); // Update history with error await connection.query(` UPDATE calculate_history SET end_time = NOW(), duration_seconds = $1, processed_products = $2, processed_orders = $3, processed_purchase_orders = $4, status = $5, error_message = $6 WHERE id = $7 `, [ totalElapsedSeconds, processedProducts || 0, // Ensure we have a valid number processedOrders || 0, // Ensure we have a valid number processedPurchaseOrders || 0, // Ensure we have a valid number isCancelled ? 'cancelled' : 'failed', error.message, calculateHistoryId ]); if (isCancelled) { global.outputProgress({ status: 'cancelled', operation: 'Calculation cancelled', current: processedProducts, total: totalProducts || 0, elapsed: global.formatElapsedTime(startTime), remaining: null, rate: global.calculateRate(startTime, processedProducts), percentage: ((processedProducts / (totalProducts || 1)) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); } else { global.outputProgress({ status: 'error', operation: 'Error: ' + error.message, current: processedProducts, total: totalProducts || 0, elapsed: global.formatElapsedTime(startTime), remaining: null, rate: global.calculateRate(startTime, processedProducts), percentage: ((processedProducts / (totalProducts || 1)) * 100).toFixed(1), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); } throw error; } finally { // Clear the timeout to prevent forced termination clearTimeout(timeout); // Always clean up and release connection if (connection) { try { await cleanupTemporaryTables(connection); connection.release(); } catch (err) { console.error('Error in final cleanup:', err); } } } } catch (error) { console.error('Error in metrics calculation', error); try { if (connection) { await connection.query(` UPDATE calculate_history SET status = 'error', end_time = NOW(), duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER, error_message = $1 WHERE id = $2 `, [error.message.substring(0, 500), calculateHistoryId]); } } catch (updateError) { console.error('Error updating calculation history:', updateError); } throw error; } } // Export as a module with all necessary functions module.exports = { calculateMetrics, cancelCalculation, getProgress: global.getProgress }; // Run directly if called from command line if (require.main === module) { calculateMetrics().catch(error => { if (!error.message.includes('Operation cancelled')) { console.error('Error:', error); } process.exit(1); }); }