908 lines
38 KiB
JavaScript
908 lines
38 KiB
JavaScript
// run-all-updates.js
|
|
const path = require('path');
|
|
const fs = require('fs');
|
|
const { Pool } = require('pg'); // Assuming you use 'pg'
|
|
|
|
// --- Configuration ---
|
|
// Step switches: set any of these to false to skip that step (handy while testing).
const RUN_DAILY_SNAPSHOTS = true;
const RUN_PRODUCT_METRICS = true;
const RUN_PERIODIC_METRICS = true;
const RUN_BRAND_METRICS = true;
const RUN_VENDOR_METRICS = true;
const RUN_CATEGORY_METRICS = true;

// Time budget for the entire sequence of steps, in milliseconds (90 minutes).
const MAX_EXECUTION_TIME_TOTAL = 1000 * 60 * 90;

// Time budget for one individual SQL step, in milliseconds (30 minutes).
const MAX_EXECUTION_TIME_PER_STEP = 1000 * 60 * 30;

// Age, in seconds, a query must exceed before a cancellation request targets it.
const CANCEL_QUERY_AFTER_SECONDS = 5;
|
|
// --- End Configuration ---
|
|
|
|
// Run relative to this script's own directory, whatever the caller's cwd was.
process.chdir(path.dirname(__filename));

// Record where the script lives to help debug path problems.
console.log('Script running from:', __dirname);

// Candidate .env locations in priority order; only the first one that exists is loaded.
const envPaths = [
  path.resolve(__dirname, '../..', '.env'), // Two levels up (inventory/.env)
  path.resolve(__dirname, '..', '.env'), // One level up (inventory-server/.env)
  path.resolve(__dirname, '.env'), // Same directory
  '/var/www/html/inventory/.env', // Server absolute path
];

const firstExistingEnvPath = envPaths.find((candidate) => fs.existsSync(candidate));
const envLoaded = firstExistingEnvPath !== undefined;

if (envLoaded) {
  console.log(`Loading environment from: ${firstExistingEnvPath}`);
  require('dotenv').config({ path: firstExistingEnvPath });
} else {
  // Not fatal: the variables may already be present in the process environment.
  console.warn('WARNING: Could not find .env file in any of the expected locations.');
  console.warn('Checked paths:', envPaths);
}
|
|
|
|
// --- Database Setup ---
|
|
// Warn early (without aborting) when the connection settings look incomplete.
if (!process.env.DB_HOST && !process.env.DATABASE_URL) {
  console.error('WARNING: Neither DB_HOST nor DATABASE_URL environment variables found');
}

// Individual parameters only matter when no connection string is supplied.
if (!process.env.DATABASE_URL) {
  if (!process.env.DB_USER) console.error('WARNING: DB_USER environment variable is missing');
  if (!process.env.DB_NAME) console.error('WARNING: DB_NAME environment variable is missing');

  // Password must be a string for PostgreSQL SCRAM authentication
  if (!process.env.DB_PASSWORD || typeof process.env.DB_PASSWORD !== 'string') {
    console.error('WARNING: DB_PASSWORD environment variable is missing or not a string');
  }
}

// Name reported to PostgreSQL for every connection opened by this script.
// cancelCalculation() selects the sessions to cancel with
// `application_name = 'node-metrics-calculator'`, so the two values must match;
// without this setting the cancellation query never finds any session.
const DB_APPLICATION_NAME = 'node-metrics-calculator';

// Pool sizing and timeouts applied to every variant of the connection config.
const sharedPoolOptions = {
  application_name: DB_APPLICATION_NAME,
  max: 10, // connection pool max size
  idleTimeoutMillis: 30000,
  connectionTimeoutMillis: 60000,
  // Timeouts for long-running queries
  statement_timeout: 1800000, // 30 minutes
  query_timeout: 1800000, // 30 minutes
};

// Configure database connection to match individual scripts
let dbConfig;

if (process.env.DATABASE_URL && typeof process.env.DATABASE_URL === 'string') {
  // A DATABASE_URL is common in production environments and takes precedence.
  console.log('Using DATABASE_URL for connection');
  dbConfig = {
    connectionString: process.env.DATABASE_URL,
    ssl: process.env.DB_SSL === 'true' ? { rejectUnauthorized: false } : false,
    ...sharedPoolOptions,
  };
} else {
  // Use individual connection parameters
  dbConfig = {
    host: process.env.DB_HOST,
    user: process.env.DB_USER,
    password: process.env.DB_PASSWORD,
    database: process.env.DB_NAME,
    port: process.env.DB_PORT || 5432,
    ssl: process.env.DB_SSL === 'true',
    ...sharedPoolOptions,
  };
}

// Last resort: borrow the config exported by the individual scripts' db module.
try {
  if (!process.env.DB_HOST && !process.env.DATABASE_URL) {
    console.log('Attempting to load DB config from individual script modules...');
    const dbModule = require('./metrics-new/utils/db');
    if (dbModule && dbModule.dbConfig) {
      console.log('Found DB config in individual script module');
      dbConfig = {
        ...dbModule.dbConfig,
        // Keep the module's pool settings when present, otherwise use the defaults above.
        max: dbModule.dbConfig.max || sharedPoolOptions.max,
        idleTimeoutMillis: dbModule.dbConfig.idleTimeoutMillis || sharedPoolOptions.idleTimeoutMillis,
        connectionTimeoutMillis:
          dbModule.dbConfig.connectionTimeoutMillis || sharedPoolOptions.connectionTimeoutMillis,
        statement_timeout: sharedPoolOptions.statement_timeout,
        query_timeout: sharedPoolOptions.query_timeout,
        // Always forced so query cancellation can identify this script's sessions.
        application_name: DB_APPLICATION_NAME,
      };
    }
  }
} catch (err) {
  console.warn('Could not load DB config from individual script modules:', err.message);
}

// Debug log connection info (without password)
console.log('DB Connection Info:', {
  connectionString: dbConfig.connectionString ? 'PROVIDED' : undefined,
  host: dbConfig.host,
  user: dbConfig.user,
  database: dbConfig.database,
  port: dbConfig.port,
  ssl: dbConfig.ssl ? 'ENABLED' : 'DISABLED',
  password: (dbConfig.password || dbConfig.connectionString) ? '****' : 'MISSING', // Only show if credentials exist
});

const pool = new Pool(dbConfig);
|
|
|
|
// Checks a client out of the shared pool; the caller must release it when done.
const getConnection = () => pool.connect();
|
|
|
|
// Logs the shutdown and ends the shared pool, returning whatever pool.end() returns.
const closePool = function closePool() {
  console.log("Closing database connection pool.");
  const poolEnded = pool.end();
  return poolEnded;
};
|
|
|
|
// --- Progress Utilities ---
|
|
// Using functions directly instead of globals
|
|
const progressUtils = require('./metrics-new/utils/progress'); // Assuming utils/progress.js exports these
|
|
|
|
// --- State & Cancellation ---
|
|
// True once a cancellation has been requested; steps check it before and after their query.
let isCancelled = false;

// Name of the step currently executing, used in cancellation messages ('' when idle).
let currentStep = '';

// Epoch milliseconds at which the overall run started (null until a run begins).
let overallStartTime = null;

// Timer handle for the whole-run timeout.
let mainTimeoutHandle = null;

// Timer handle for the timeout of the step currently running.
let stepTimeoutHandle = null;

// Primary key of the combined calculate_history row for this run (null when not recorded).
let combinedHistoryId = null;
|
|
|
|
/**
 * Flags the run as cancelled, asks PostgreSQL to cancel this application's
 * long-running queries, marks the combined history record as cancelled and
 * reports the cancellation through the progress utilities.
 *
 * Only the first call does any work; later calls return `undefined` at once.
 * Database errors are logged and swallowed so that a failed cancel attempt
 * never prevents the caller from shutting down.
 *
 * @param {string} [reason='cancelled by user'] - Text appended to "Calculation " in logs and records.
 * @returns {Promise<{success: boolean, message: string}|undefined>} `success: true`
 *   means cancellation was initiated, not that any query was actually stopped.
 */
async function cancelCalculation(reason = 'cancelled by user') {
  if (isCancelled) return; // Prevent multiple cancellations
  isCancelled = true;

  // Snapshot the step name now: the running step clears `currentStep` in its
  // own cleanup, which can happen while this function is awaiting the database.
  const stepAtCancellation = currentStep;
  const cancellationMessage = `Calculation ${reason} during step: ${stepAtCancellation}`;
  console.log(`Calculation ${reason}. Attempting to cancel active step: ${stepAtCancellation}`);

  // Clear timeouts
  if (mainTimeoutHandle) clearTimeout(mainTimeoutHandle);
  if (stepTimeoutHandle) clearTimeout(stepTimeoutHandle);

  // Attempt to cancel the long-running query in Postgres
  let conn = null;
  try {
    console.log(`Attempting to cancel queries running longer than ${CANCEL_QUERY_AFTER_SECONDS} seconds...`);
    conn = await getConnection();

    // Only sessions whose application_name is 'node-metrics-calculator' are
    // targeted, so connections must be opened under that name for this to match.
    const result = await conn.query(`
      SELECT pg_cancel_backend(pid)
      FROM pg_stat_activity
      WHERE query_start < now() - interval '${CANCEL_QUERY_AFTER_SECONDS} seconds'
      AND application_name = 'node-metrics-calculator' -- Match specific app name
      AND state = 'active' -- Only cancel active queries
      AND query NOT LIKE '%pg_cancel_backend%'
      AND pid <> pg_backend_pid(); -- Don't cancel self
    `);
    console.log(`Sent ${result.rowCount} cancellation signal(s).`);

    // Update the combined history record to show cancellation
    if (combinedHistoryId) {
      const totalDuration = overallStartTime
        ? Math.round((Date.now() - overallStartTime) / 1000)
        : 0;
      await conn.query(`
        UPDATE calculate_history
        SET
          status = 'cancelled'::calculation_status,
          end_time = NOW(),
          duration_seconds = $1::integer,
          error_message = $2::text
        WHERE id = $3::integer;
      `, [totalDuration, cancellationMessage, combinedHistoryId]);
      console.log(`Updated combined history record ${combinedHistoryId} with cancellation status`);
    }
  } catch (err) {
    // Proceed with script termination attempt even if DB cancel fails
    console.error('Error during database query cancellation:', err.message);
  } finally {
    // Release exactly once, whether or not the queries above succeeded.
    if (conn) {
      try { conn.release(); } catch (e) { console.error("Error releasing cancellation connection", e); }
    }

    // Update progress to show cancellation
    progressUtils.outputProgress({
      status: 'cancelled',
      operation: cancellationMessage,
      current: 0, // Reset progress indicators
      total: 100,
      elapsed: overallStartTime ? progressUtils.formatElapsedTime(overallStartTime) : 'N/A',
      remaining: null,
      rate: 0,
      percentage: '0', // Or keep last known percentage?
      timing: {
        start_time: overallStartTime ? new Date(overallStartTime).toISOString() : 'N/A',
        end_time: new Date().toISOString(),
        elapsed_seconds: overallStartTime ? Math.round((Date.now() - overallStartTime) / 1000) : 0,
      },
    });
  }

  // Note: We don't force exit here. The main function's error handling catches
  // the cancellation error thrown by executeSqlStep.
  return {
    success: true, // Indicates cancellation was initiated
    message: `Calculation ${reason}`,
  };
}
|
|
|
|
// Termination signals: SIGINT (Ctrl+C) and SIGTERM (kill). Each one logs a
// banner, starts cancellation, and schedules a forced exit.
const terminationSignals = [
  { signal: 'SIGINT', banner: '\nReceived SIGINT (Ctrl+C).', reason: 'cancelled by user (SIGINT)' },
  { signal: 'SIGTERM', banner: 'Received SIGTERM.', reason: 'cancelled by system (SIGTERM)' },
];

for (const { signal, banner, reason } of terminationSignals) {
  process.on(signal, () => {
    console.log(banner);
    cancelCalculation(reason);
    // Give cancellation a moment to propagate before force-exiting if needed
    setTimeout(() => process.exit(1), 2000);
  });
}

// Shared tail for fatal errors: attempt cancellation, close the pool, then
// exit with status 1 whether or not those two steps succeeded.
function shutdownAfterFatal(reason) {
  cancelCalculation(reason).finally(() => {
    closePool().finally(() => process.exit(1));
  });
}

// Uncaught exceptions and unhandled rejections both end the process.
process.on('uncaughtException', (error) => {
  console.error('Uncaught Exception:', error);
  shutdownAfterFatal('failed due to uncaught exception');
});

process.on('unhandledRejection', (reason, promise) => {
  console.error('Unhandled Rejection at:', promise, 'reason:', reason);
  shutdownAfterFatal('failed due to unhandled rejection');
});
|
|
|
|
|
|
// --- Core Logic ---
|
|
|
|
/**
 * Ensures every row in `products` has a matching row in `settings_product`.
 * This is important after importing new products.
 *
 * Missing rows are inserted with NULL lead time, days of stock and forecast
 * method, `exclude_from_forecast = FALSE`, and a safety stock taken from the
 * `default_safety_stock_units` global setting (0 when that setting is absent).
 *
 * @returns {Promise<number>} Difference between the `settings_product` row
 *   counts measured after and before the insert.
 * @throws Re-throws any database error after reporting it as an 'error' progress event.
 */
async function syncSettingsProductTable() {
  let conn = null;
  try {
    currentStep = 'Syncing settings_product table';
    progressUtils.outputProgress({
      operation: 'Syncing product settings',
      message: 'Ensuring all products have settings entries',
    });

    conn = await getConnection();

    // Get counts before sync
    const beforeCounts = await conn.query(`
      SELECT
        (SELECT COUNT(*) FROM products) AS products_count,
        (SELECT COUNT(*) FROM settings_product) AS settings_count
    `);

    // COUNT(*) values are parsed explicitly as base-10 integers.
    const productsCount = Number.parseInt(beforeCounts.rows[0].products_count, 10);
    const settingsCount = Number.parseInt(beforeCounts.rows[0].settings_count, 10);

    progressUtils.outputProgress({
      operation: 'Settings product sync',
      message: `Found ${productsCount} products and ${settingsCount} settings entries`,
    });

    // Insert missing product settings
    await conn.query(`
      INSERT INTO settings_product (
        pid,
        lead_time_days,
        days_of_stock,
        safety_stock,
        forecast_method,
        exclude_from_forecast
      )
      SELECT
        p.pid,
        CAST(NULL AS INTEGER),
        CAST(NULL AS INTEGER),
        COALESCE((SELECT setting_value::int FROM settings_global WHERE setting_key = 'default_safety_stock_units'), 0),
        CAST(NULL AS VARCHAR),
        FALSE
      FROM
        public.products p
      WHERE
        NOT EXISTS (
          SELECT 1 FROM settings_product sp WHERE sp.pid = p.pid
        )
      ON CONFLICT (pid) DO NOTHING
    `);

    // Get counts after sync
    const afterCounts = await conn.query(`
      SELECT COUNT(*) AS settings_count FROM settings_product
    `);

    const newSettingsCount = Number.parseInt(afterCounts.rows[0].settings_count, 10);
    const addedCount = newSettingsCount - settingsCount;

    progressUtils.outputProgress({
      operation: 'Settings product sync',
      message: `Added ${addedCount} new settings entries. Now have ${newSettingsCount} total entries.`,
      status: 'complete',
    });

    return addedCount;
  } catch (err) {
    progressUtils.outputProgress({
      status: 'error',
      operation: 'Settings product sync failed',
      error: err.message,
    });
    throw err;
  } finally {
    // Always hand the client back, even if progress reporting itself throws.
    if (conn) conn.release();
  }
}
|
|
|
|
/**
 * Lists `$n` placeholders in a SQL script that are not immediately followed by
 * a `::type` cast. The `\d` in the lookahead stops the match from backtracking
 * inside a multi-digit placeholder (so `$12::int` is not reported as `$1`).
 */
function findUntypedParameters(sqlText) {
  return sqlText.match(/\$\d+(?!\d|::)/g) || [];
}

/**
 * Derives a row count from a query result. When the driver returns an array of
 * results, the first one is used. A `rows_processed` value in the first
 * returned row, when present and numeric, takes precedence over `rowCount`.
 * A missing or null `rowCount` counts as 0.
 */
function extractRowsAffected(result) {
  const primary = Array.isArray(result) ? result[0] : result;
  if (!primary) return 0;

  let rows = typeof primary.rowCount === 'number' ? primary.rowCount : 0;
  const metricsRow = primary.rows && primary.rows.length > 0 ? primary.rows[0] : null;
  if (metricsRow && metricsRow.rows_processed) {
    rows = Number.parseInt(metricsRow.rows_processed, 10) || rows;
    console.log(`SQL returned metrics: ${JSON.stringify(metricsRow)}`);
  }
  return rows;
}

/**
 * Logs up to 100 characters of SQL on either side of a PostgreSQL error
 * position. PostgreSQL reports positions as 1-based character indexes, so the
 * value is shifted by one to index the JavaScript string.
 */
function logSqlErrorContext(sqlText, rawPosition) {
  const position = Number.parseInt(rawPosition, 10);
  if (Number.isNaN(position)) return;

  const offset = Math.min(sqlText.length, Math.max(0, position - 1));
  const startPos = Math.max(0, offset - 100);
  const endPos = Math.min(sqlText.length, offset + 100);
  console.error('SQL Error Context:');
  console.error('...' + sqlText.substring(startPos, offset) + ' [ERROR HERE] ' + sqlText.substring(offset, endPos) + '...');
}

/**
 * Executes a single SQL calculation step.
 *
 * Reads the step's SQL file, runs it on a pooled connection, records the run
 * in `calculate_status`, and reports progress before, during and after.
 * A per-step timer initiates cancellation if the step exceeds
 * MAX_EXECUTION_TIME_PER_STEP.
 *
 * @param {object} config - Configuration for the step.
 * @param {string} config.name - User-friendly name of the step.
 * @param {string} config.sqlFile - Path to the SQL file, resolved against this script's directory.
 * @param {string} config.historyType - Type identifier for calculate_history (not read by this function).
 * @param {string} config.statusModule - Module name for calculate_status.
 * @param {object} progress - Progress utilities providing `outputProgress` and `formatElapsedTime`.
 * @returns {Promise<{success: boolean, message: string, duration: number, rowsAffected: number}>}
 *   `duration` is in whole seconds.
 * @throws {Error} When the run was already cancelled, the SQL file is missing,
 *   a query fails, or cancellation happened while the query was running.
 */
async function executeSqlStep(config, progress) {
  if (isCancelled) throw new Error(`Calculation skipped step ${config.name} due to prior cancellation.`);

  currentStep = config.name; // Update global state
  console.log(`\n--- Starting Step: ${config.name} ---`);
  const stepStartTime = Date.now();
  let connection = null;
  let rowsAffected = 0; // Track rows affected by this step

  // Set timeout for this specific step
  if (stepTimeoutHandle) clearTimeout(stepTimeoutHandle); // Clear previous step's timeout
  stepTimeoutHandle = setTimeout(() => {
    // The timer only initiates cancellation. The step itself fails later, when
    // its query is cancelled or when the post-query cancellation check runs.
    cancelCalculation(`timed out during step: ${config.name}`).catch((cancelError) => {
      console.error('Error while cancelling timed-out step:', cancelError);
    });
  }, MAX_EXECUTION_TIME_PER_STEP);

  try {
    // 1. Read SQL File
    const sqlFilePath = path.resolve(__dirname, config.sqlFile);
    if (!fs.existsSync(sqlFilePath)) {
      throw new Error(`SQL file not found: ${sqlFilePath}`);
    }
    const sqlQuery = fs.readFileSync(sqlFilePath, 'utf8');
    console.log(`Read SQL file: ${config.sqlFile}`);

    // Check for potential parameter references that might cause issues
    const parameterMatches = findUntypedParameters(sqlQuery);
    if (parameterMatches.length > 0) {
      console.warn(`WARNING: Found ${parameterMatches.length} untyped parameters in SQL: ${parameterMatches.slice(0, 5).join(', ')}${parameterMatches.length > 5 ? '...' : ''}`);
      console.warn('These might cause "could not determine data type of parameter" errors.');
    }

    // 2. Get Database Connection
    connection = await getConnection();
    console.log("Database connection acquired.");

    // 3. Ensure calculate_status table exists
    await connection.query(`
      CREATE TABLE IF NOT EXISTS calculate_status (
        module_name TEXT PRIMARY KEY,
        last_calculation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
      );
    `);

    // 4. Initial Progress Update
    progress.outputProgress({
      status: 'running',
      operation: `Starting: ${config.name}`,
      current: 0, total: 100,
      elapsed: progress.formatElapsedTime(stepStartTime),
      remaining: 'Calculating...', rate: 0, percentage: '0',
      timing: {
        start_time: new Date(stepStartTime).toISOString(),
        step_start_ms: stepStartTime,
      },
    });

    // 5. Execute the Main SQL Query
    progress.outputProgress({
      status: 'running',
      operation: `Executing SQL: ${config.name}`,
      current: 25, total: 100,
      elapsed: progress.formatElapsedTime(stepStartTime),
      remaining: 'Executing query...', rate: 0, percentage: '25',
      timing: {
        start_time: new Date(stepStartTime).toISOString(),
        step_start_ms: stepStartTime,
      },
    });
    console.log(`Executing SQL for ${config.name}...`);

    try {
      // Try executing exactly as individual scripts do
      const result = await connection.query(sqlQuery);
      rowsAffected = extractRowsAffected(result);
      console.log(`SQL affected ${rowsAffected} rows`);
    } catch (sqlError) {
      const sqlErrorMessage = (sqlError && sqlError.message) || '';
      if (sqlErrorMessage.includes('could not determine data type of parameter')) {
        console.log('Simple query failed with parameter type error, trying alternative method...');
        try {
          // NOTE(review): node-postgres documents only rowMode 'array'; confirm
          // that 'text' changes how this retry is executed.
          const retryResult = await connection.query({
            text: sqlQuery,
            rowMode: 'text',
          });
          // Record rows for the retry too, so a successful fallback is not reported as 0 rows.
          rowsAffected = extractRowsAffected(retryResult);
          console.log(`SQL affected ${rowsAffected} rows`);
        } catch (altError) {
          console.error('Alternative execution method also failed:', altError.message);
          throw altError; // Re-throw the alternative error
        }
      } else {
        console.error('SQL Execution Error:', sqlErrorMessage);
        if (sqlError && sqlError.position) {
          // If the error has a position, show the relevant part of the SQL query
          logSqlErrorContext(sqlQuery, sqlError.position);
        }
        throw sqlError; // Re-throw to be caught by the main try/catch
      }
    }

    // Check for cancellation immediately after query finishes
    if (isCancelled) throw new Error(`Calculation cancelled during SQL execution for ${config.name}`);

    console.log(`SQL execution finished for ${config.name}.`);

    // 6. Update Status table only
    await connection.query(`
      INSERT INTO calculate_status (module_name, last_calculation_timestamp)
      VALUES ($1::text, NOW())
      ON CONFLICT (module_name) DO UPDATE
      SET last_calculation_timestamp = EXCLUDED.last_calculation_timestamp;
    `, [config.statusModule]);

    const stepDuration = Math.round((Date.now() - stepStartTime) / 1000);

    // 7. Final Progress Update for Step
    progress.outputProgress({
      status: 'complete',
      operation: `Completed: ${config.name}`,
      current: 100, total: 100,
      elapsed: progress.formatElapsedTime(stepStartTime),
      remaining: '0s', rate: 0, percentage: '100',
      timing: {
        start_time: new Date(stepStartTime).toISOString(),
        end_time: new Date().toISOString(),
        elapsed_seconds: stepDuration,
      },
    });
    console.log(`--- Finished Step: ${config.name} (Duration: ${progress.formatElapsedTime(stepStartTime)}) ---`);

    return {
      success: true,
      message: `${config.name} completed successfully`,
      duration: stepDuration,
      rowsAffected: rowsAffected,
    };
  } catch (error) {
    clearTimeout(stepTimeoutHandle); // Clear timeout on error
    const errorEndTime = Date.now();
    const errorDuration = Math.round((errorEndTime - stepStartTime) / 1000);
    const finalStatus = isCancelled ? 'cancelled' : 'failed';
    const errorMessage = error.message || 'Unknown error';

    console.error(`--- ERROR in Step: ${config.name} ---`);
    console.error(error); // Log the full error
    console.error(`------------------------------------`);

    // Update progress file with error/cancellation
    progress.outputProgress({
      status: finalStatus,
      operation: `Error in ${config.name}: ${errorMessage.split('\n')[0]}`, // Show first line of error
      current: 50, total: 100, // Indicate partial completion
      elapsed: progress.formatElapsedTime(stepStartTime),
      remaining: null, rate: 0, percentage: '50',
      timing: {
        start_time: new Date(stepStartTime).toISOString(),
        end_time: new Date(errorEndTime).toISOString(),
        elapsed_seconds: errorDuration,
      },
    });

    // Rethrow the error to be caught by the main runCalculations function
    throw error;
  } finally {
    clearTimeout(stepTimeoutHandle); // Ensure timeout is cleared
    currentStep = ''; // Reset current step
    if (connection) {
      try {
        connection.release();
        console.log("Database connection released.");
      } catch (releaseError) {
        console.error("Error releasing database connection:", releaseError);
      }
    }
  }
}
|
|
|
|
/**
 * Returns the full, ordered list of calculation steps together with the
 * configuration switch that enables each one.
 */
function buildCalculationSteps() {
  return [
    {
      run: RUN_DAILY_SNAPSHOTS,
      name: 'Daily Snapshots Update',
      sqlFile: 'metrics-new/update_daily_snapshots.sql',
      historyType: 'daily_snapshots',
      statusModule: 'daily_snapshots',
    },
    {
      run: RUN_PRODUCT_METRICS,
      name: 'Product Metrics Update',
      sqlFile: 'metrics-new/update_product_metrics.sql', // ASSUMING the initial population is now part of a regular update
      historyType: 'product_metrics',
      statusModule: 'product_metrics',
    },
    {
      run: RUN_PERIODIC_METRICS,
      name: 'Periodic Metrics Update',
      sqlFile: 'metrics-new/update_periodic_metrics.sql',
      historyType: 'periodic_metrics',
      statusModule: 'periodic_metrics',
    },
    {
      run: RUN_BRAND_METRICS,
      name: 'Brand Metrics Update',
      sqlFile: 'metrics-new/calculate_brand_metrics.sql',
      historyType: 'brand_metrics',
      statusModule: 'brand_metrics',
    },
    {
      run: RUN_VENDOR_METRICS,
      name: 'Vendor Metrics Update',
      sqlFile: 'metrics-new/calculate_vendor_metrics.sql',
      historyType: 'vendor_metrics',
      statusModule: 'vendor_metrics',
    },
    {
      run: RUN_CATEGORY_METRICS,
      name: 'Category Metrics Update',
      sqlFile: 'metrics-new/calculate_category_metrics.sql',
      historyType: 'category_metrics',
      statusModule: 'category_metrics',
    },
  ];
}

/**
 * Runs best-effort history bookkeeping on a freshly acquired client.
 * Failures are logged with `failureLabel` and swallowed, so history tracking
 * never aborts the calculations. The client is acquired and released inside
 * this call, so a failed acquisition can never re-release a client that an
 * earlier call already returned to the pool.
 */
async function withHistoryConnection(failureLabel, work) {
  let client = null;
  try {
    client = await getConnection();
    await work(client);
  } catch (historyError) {
    console.error(failureLabel, historyError);
  } finally {
    if (client) client.release();
  }
}

/**
 * Main function to run all calculation steps sequentially.
 *
 * Creates a combined `calculate_history` record, syncs `settings_product`,
 * executes every enabled step in order, then finalises the history record.
 * Failures in history tracking or in the settings sync are logged and do not
 * stop the run; the first failing calculation step does.
 *
 * The pool is closed and `process.exit` is always called at the end (0 on
 * full success, 1 otherwise), so callers never observe the returned promise settling.
 *
 * @returns {Promise<void>}
 */
async function runAllCalculations() {
  overallStartTime = Date.now();
  isCancelled = false; // Reset cancellation flag at start

  // Overall timeout for the entire script
  mainTimeoutHandle = setTimeout(() => {
    console.error(`--- OVERALL TIMEOUT REACHED (${MAX_EXECUTION_TIME_TOTAL / 1000}s) ---`);
    // Cancellation makes the running or next step throw, which ends the run below.
    cancelCalculation(`overall timeout reached`).catch((cancelError) => {
      console.error('Error while cancelling after overall timeout:', cancelError);
    });
  }, MAX_EXECUTION_TIME_TOTAL);

  // Build a list of steps we will actually run
  const stepsToRun = buildCalculationSteps().filter((step) => step.run);
  const stepNames = stepsToRun.map((step) => step.name);
  const sqlFiles = stepsToRun.map((step) => step.sqlFile);

  let overallSuccess = true;

  try {
    // Create a single history record before starting all calculations.
    // If this fails, the run continues without history tracking.
    await withHistoryConnection('Error creating combined history record:', async (client) => {
      // Ensure calculate_history table exists (basic structure)
      await client.query(`
        CREATE TABLE IF NOT EXISTS calculate_history (
          id SERIAL PRIMARY KEY,
          start_time TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
          end_time TIMESTAMP WITH TIME ZONE,
          duration_seconds INTEGER,
          status TEXT, -- Will be altered to enum if needed below
          error_message TEXT,
          additional_info JSONB
        );
      `);

      // Ensure the calculation_status enum type exists if needed
      await client.query(`
        DO $$
        BEGIN
          IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'calculation_status') THEN
            CREATE TYPE calculation_status AS ENUM ('running', 'completed', 'failed', 'cancelled');

            -- If needed, alter the existing table to use the enum
            ALTER TABLE calculate_history
              ALTER COLUMN status TYPE calculation_status
              USING status::calculation_status;
          END IF;
        END
        $$;
      `);

      // Mark any previous running combined calculations as cancelled
      await client.query(`
        UPDATE calculate_history
        SET
          status = 'cancelled'::calculation_status,
          end_time = NOW(),
          duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER,
          error_message = 'Previous calculation was not completed properly or was superseded.'
        WHERE status = 'running'::calculation_status AND additional_info->>'type' = 'combined_metrics';
      `);

      // Create a single history record for this run
      const historyResult = await client.query(`
        INSERT INTO calculate_history (status, additional_info)
        VALUES ('running'::calculation_status, jsonb_build_object(
          'type', 'combined_metrics',
          'steps', $1::jsonb,
          'sql_files', $2::jsonb
        ))
        RETURNING id;
      `, [JSON.stringify(stepNames), JSON.stringify(sqlFiles)]);

      combinedHistoryId = historyResult.rows[0].id;
      console.log(`Created combined history record ID: ${combinedHistoryId}`);

      // Get initial counts for tracking
      const productCount = await client.query('SELECT COUNT(*) as count FROM products');
      const totalProducts = Number.parseInt(productCount.rows[0].count, 10);

      // Update history with initial counts
      await client.query(`
        UPDATE calculate_history
        SET additional_info = additional_info || jsonb_build_object('total_products', $1::integer)
        WHERE id = $2
      `, [totalProducts, combinedHistoryId]);
    });

    // First, sync the settings_product table to ensure all products have entries
    progressUtils.outputProgress({
      operation: 'Starting metrics calculation',
      message: 'Preparing product settings...',
    });

    try {
      const addedCount = await syncSettingsProductTable();
      progressUtils.outputProgress({
        operation: 'Preparation complete',
        message: `Added ${addedCount} missing product settings entries`,
        status: 'complete',
      });
    } catch (syncError) {
      // Don't fail the entire process if settings sync fails
      console.error('Warning: Failed to sync product settings, continuing with metrics calculations:', syncError);
    }

    // Track completed steps
    const completedSteps = [];
    const stepTimings = {};
    const stepRowCounts = {};
    let currentStepIndex = 0;

    // Now run the calculation steps
    for (const step of stepsToRun) {
      if (isCancelled) {
        console.log(`Skipping step "${step.name}" due to cancellation.`);
        overallSuccess = false; // Mark as not fully successful if steps are skipped due to cancel
        continue; // Skip to next step
      }

      currentStepIndex++;
      const stepsFinished = currentStepIndex - 1;

      // Update overall progress
      progressUtils.outputProgress({
        status: 'running',
        operation: 'Running calculations',
        message: `Step ${currentStepIndex} of ${stepsToRun.length}: ${step.name}`,
        current: stepsFinished,
        total: stepsToRun.length,
        elapsed: progressUtils.formatElapsedTime(overallStartTime),
        remaining: progressUtils.estimateRemaining(overallStartTime, stepsFinished, stepsToRun.length),
        percentage: Math.round((stepsFinished / stepsToRun.length) * 100).toString(),
        timing: {
          overall_start_time: new Date(overallStartTime).toISOString(),
          current_step: step.name,
          completed_steps: completedSteps.length,
        },
      });

      // Pass the progress utilities to the step executor; a failing step throws
      // and ends the run via the catch block below.
      const result = await executeSqlStep(step, progressUtils);

      if (result.success) {
        completedSteps.push({
          name: step.name,
          duration: result.duration,
          status: 'completed',
          rowsAffected: result.rowsAffected,
        });
        stepTimings[step.name] = result.duration;
        stepRowCounts[step.name] = result.rowsAffected;
      }
    }

    // If we finished naturally (no errors thrown out)
    clearTimeout(mainTimeoutHandle); // Clear the main timeout

    // Update the combined history record on successful completion
    if (combinedHistoryId) {
      await withHistoryConnection('Error updating combined history record on completion:', async (client) => {
        const totalDuration = Math.round((Date.now() - overallStartTime) / 1000);

        // Get final processed counts
        const processedCounts = await client.query(`
          SELECT
            (SELECT COUNT(*) FROM product_metrics WHERE last_calculated >= $1) as processed_products
        `, [new Date(overallStartTime)]);

        await client.query(`
          UPDATE calculate_history
          SET
            end_time = NOW(),
            duration_seconds = $1::integer,
            status = $2::calculation_status,
            additional_info = additional_info || jsonb_build_object(
              'processed_products', $3::integer,
              'completed_steps', $4::jsonb,
              'step_timings', $5::jsonb,
              'step_row_counts', $6::jsonb
            )
          WHERE id = $7::integer;
        `, [
          totalDuration,
          isCancelled ? 'cancelled' : 'completed',
          processedCounts.rows[0].processed_products,
          JSON.stringify(completedSteps),
          JSON.stringify(stepTimings),
          JSON.stringify(stepRowCounts),
          combinedHistoryId,
        ]);
      });
    }

    if (isCancelled) {
      console.log("\n--- Calculation finished with cancellation ---");
      overallSuccess = false;
    } else {
      console.log("\n--- All enabled calculations finished successfully ---");

      // Send final completion progress
      progressUtils.outputProgress({
        status: 'complete',
        operation: 'All calculations completed',
        message: `Successfully completed ${completedSteps.length} of ${stepsToRun.length} steps`,
        current: stepsToRun.length,
        total: stepsToRun.length,
        elapsed: progressUtils.formatElapsedTime(overallStartTime),
        remaining: '0s',
        percentage: '100',
        timing: {
          overall_start_time: new Date(overallStartTime).toISOString(),
          overall_end_time: new Date().toISOString(),
          total_duration_seconds: Math.round((Date.now() - overallStartTime) / 1000),
          step_timings: stepTimings,
          completed_steps: completedSteps.length,
        },
      });

      progressUtils.clearProgress(); // Clear progress only on full success
    }
  } catch (error) {
    clearTimeout(mainTimeoutHandle); // Clear the main timeout
    const failureMessage = String(error && error.message ? error.message : error);
    console.error("\n--- SCRIPT EXECUTION FAILED ---");
    // Step errors were already logged in full by executeSqlStep; this line also
    // covers failures raised anywhere else in the sequence.
    console.error(failureMessage);
    overallSuccess = false;

    // Update the combined history record on error
    if (combinedHistoryId) {
      await withHistoryConnection('Error updating combined history record on error:', async (client) => {
        const totalDuration = Math.round((Date.now() - overallStartTime) / 1000);
        await client.query(`
          UPDATE calculate_history
          SET
            end_time = NOW(),
            duration_seconds = $1::integer,
            status = $2::calculation_status,
            error_message = $3::text
          WHERE id = $4::integer;
        `, [
          totalDuration,
          isCancelled ? 'cancelled' : 'failed',
          failureMessage.substring(0, 1000),
          combinedHistoryId,
        ]);
      });
    }
  } finally {
    try {
      await closePool();
    } catch (closeError) {
      // E.g. the pool was already ended by a fatal-error handler; still exit
      // with the status computed above.
      console.error('Error closing database connection pool:', closeError);
    }
    console.log(`Total execution time: ${progressUtils.formatElapsedTime(overallStartTime)}`);
    process.exit(overallSuccess ? 0 : 1);
  }
}
|
|
|
|
// --- Script Execution ---
|
|
if (require.main === module) {
|
|
runAllCalculations();
|
|
} else {
|
|
// Export functions if needed as a module (e.g., for testing or API)
|
|
module.exports = {
|
|
runAllCalculations,
|
|
cancelCalculation,
|
|
syncSettingsProductTable,
|
|
// Expose individual steps if useful, wrapping them slightly
|
|
runDailySnapshots: () => executeSqlStep({ name: 'Daily Snapshots Update', sqlFile: 'update_daily_snapshots.sql', historyType: 'daily_snapshots', statusModule: 'daily_snapshots' }, progressUtils),
|
|
runProductMetrics: () => executeSqlStep({ name: 'Product Metrics Update', sqlFile: 'update_product_metrics.sql', historyType: 'product_metrics', statusModule: 'product_metrics' }, progressUtils),
|
|
runPeriodicMetrics: () => executeSqlStep({ name: 'Periodic Metrics Update', sqlFile: 'update_periodic_metrics.sql', historyType: 'periodic_metrics', statusModule: 'periodic_metrics' }, progressUtils),
|
|
runBrandMetrics: () => executeSqlStep({ name: 'Brand Metrics Update', sqlFile: 'calculate_brand_metrics.sql', historyType: 'brand_metrics', statusModule: 'brand_metrics' }, progressUtils),
|
|
runVendorMetrics: () => executeSqlStep({ name: 'Vendor Metrics Update', sqlFile: 'calculate_vendor_metrics.sql', historyType: 'vendor_metrics', statusModule: 'vendor_metrics' }, progressUtils),
|
|
runCategoryMetrics: () => executeSqlStep({ name: 'Category Metrics Update', sqlFile: 'calculate_category_metrics.sql', historyType: 'category_metrics', statusModule: 'category_metrics' }, progressUtils),
|
|
getProgress: progressUtils.getProgress
|
|
};
|
|
} |