const path = require('path'); const fs = require('fs'); // Change working directory to script directory process.chdir(path.dirname(__filename)); // Load environment variables require('dotenv').config({ path: path.resolve(__dirname, '../..', '.env') }); // Add error handler for uncaught exceptions process.on('uncaughtException', (error) => { console.error('Uncaught Exception:', error); process.exit(1); }); // Add error handler for unhandled promise rejections process.on('unhandledRejection', (reason, promise) => { console.error('Unhandled Rejection at:', promise, 'reason:', reason); process.exit(1); }); // Load progress module const progress = require('./utils/progress'); // Store progress functions in global scope to ensure availability global.formatElapsedTime = progress.formatElapsedTime; global.estimateRemaining = progress.estimateRemaining; global.calculateRate = progress.calculateRate; global.outputProgress = progress.outputProgress; global.clearProgress = progress.clearProgress; global.getProgress = progress.getProgress; global.logError = progress.logError; // Load database module const { getConnection, closePool } = require('./utils/db'); // Add cancel handler let isCancelled = false; function cancelCalculation() { isCancelled = true; console.log('Calculation has been cancelled by user'); // Force-terminate any query that's been running for more than 5 seconds try { const connection = getConnection(); connection.then(async (conn) => { try { // Identify and terminate long-running queries from our application await conn.query(` SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE query_start < now() - interval '5 seconds' AND application_name LIKE '%node%' AND query NOT LIKE '%pg_cancel_backend%' `); // Release connection conn.release(); } catch (err) { console.error('Error during force cancellation:', err); conn.release(); } }).catch(err => { console.error('Could not get connection for cancellation:', err); }); } catch (err) { console.error('Failed to terminate running queries:', err); } return { success: true, message: 'Calculation has been cancelled' }; } // Handle SIGTERM signal for cancellation process.on('SIGTERM', cancelCalculation); async function updateProductMetrics() { let connection; const startTime = Date.now(); let calculateHistoryId; // Set a maximum execution time (30 minutes) const MAX_EXECUTION_TIME = 30 * 60 * 1000; const timeout = setTimeout(() => { console.error(`Calculation timed out after ${MAX_EXECUTION_TIME/1000} seconds, forcing termination`); // Call cancel and force exit cancelCalculation(); process.exit(1); }, MAX_EXECUTION_TIME); try { // Read the SQL file const sqlFilePath = path.resolve(__dirname, 'update_product_metrics.sql'); const sqlQuery = fs.readFileSync(sqlFilePath, 'utf8'); // Clean up any previously running calculations connection = await getConnection(); // Ensure the calculate_status table exists and has the correct structure await connection.query(` CREATE TABLE IF NOT EXISTS calculate_status ( module_name TEXT PRIMARY KEY, last_calculation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP ) `); await connection.query(` UPDATE calculate_history SET status = 'cancelled', end_time = NOW(), duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER, error_message = 'Previous calculation was not completed properly' WHERE status = 'running' AND additional_info->>'type' = 'product_metrics' `); // Create history record for this calculation const historyResult = await connection.query(` INSERT INTO calculate_history ( start_time, status, additional_info ) VALUES ( NOW(), 'running', jsonb_build_object( 'type', 'product_metrics', 'sql_file', 'update_product_metrics.sql' ) ) RETURNING id `); calculateHistoryId = historyResult.rows[0].id; // Initialize progress global.outputProgress({ status: 'running', operation: 'Starting product metrics calculation', current: 0, total: 100, elapsed: '0s', remaining: 'Calculating...', rate: 0, percentage: '0', timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); // Execute the SQL query global.outputProgress({ status: 'running', operation: 'Executing product metrics SQL query', current: 25, total: 100, elapsed: global.formatElapsedTime(startTime), remaining: 'Calculating...', rate: 0, percentage: '25', timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); await connection.query(sqlQuery); // Update calculate_status table await connection.query(` INSERT INTO calculate_status (module_name, last_calculation_timestamp) VALUES ($1, $2) ON CONFLICT (module_name) DO UPDATE SET last_calculation_timestamp = EXCLUDED.last_calculation_timestamp `, ['product_metrics', new Date()]); // Update progress to 100% global.outputProgress({ status: 'complete', operation: 'Product metrics calculation complete', current: 100, total: 100, elapsed: global.formatElapsedTime(startTime), remaining: '0s', rate: 0, percentage: '100', timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); // Update history with completion await connection.query(` UPDATE calculate_history SET end_time = NOW(), duration_seconds = $1, status = 'completed' WHERE id = $2 `, [Math.round((Date.now() - startTime) / 1000), calculateHistoryId]); // Clear progress file on successful completion global.clearProgress(); return { success: true, message: 'Product metrics calculation completed successfully', duration: Math.round((Date.now() - startTime) / 1000) }; } catch (error) { const endTime = Date.now(); const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); // Update history with error if (connection && calculateHistoryId) { await connection.query(` UPDATE calculate_history SET end_time = NOW(), duration_seconds = $1, status = $2, error_message = $3 WHERE id = $4 `, [ totalElapsedSeconds, isCancelled ? 'cancelled' : 'failed', error.message, calculateHistoryId ]); } if (isCancelled) { global.outputProgress({ status: 'cancelled', operation: 'Calculation cancelled', current: 50, total: 100, elapsed: global.formatElapsedTime(startTime), remaining: null, rate: 0, percentage: '50', timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); } else { global.outputProgress({ status: 'error', operation: 'Error: ' + error.message, current: 50, total: 100, elapsed: global.formatElapsedTime(startTime), remaining: null, rate: 0, percentage: '50', timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); } throw error; } finally { // Clear the timeout to prevent forced termination clearTimeout(timeout); // Always release connection if (connection) { try { connection.release(); } catch (err) { console.error('Error in final cleanup:', err); } } } } // Export as a module with all necessary functions module.exports = { updateProductMetrics, cancelCalculation, getProgress: global.getProgress }; // Run directly if called from command line if (require.main === module) { updateProductMetrics().then(() => { closePool().then(() => { process.exit(0); }); }).catch(error => { console.error('Error:', error); closePool().then(() => { process.exit(1); }); }); }