Change calculate metrics script to only record one entry in database per run
This commit is contained in:
@@ -156,6 +156,7 @@ let currentStep = ''; // Track which step is running for cancellation message
|
|||||||
let overallStartTime = null;
|
let overallStartTime = null;
|
||||||
let mainTimeoutHandle = null;
|
let mainTimeoutHandle = null;
|
||||||
let stepTimeoutHandle = null;
|
let stepTimeoutHandle = null;
|
||||||
|
let combinedHistoryId = null; // ID for the combined history record
|
||||||
|
|
||||||
async function cancelCalculation(reason = 'cancelled by user') {
|
async function cancelCalculation(reason = 'cancelled by user') {
|
||||||
if (isCancelled) return; // Prevent multiple cancellations
|
if (isCancelled) return; // Prevent multiple cancellations
|
||||||
@@ -181,6 +182,22 @@ async function cancelCalculation(reason = 'cancelled by user') {
|
|||||||
AND pid <> pg_backend_pid(); -- Don't cancel self
|
AND pid <> pg_backend_pid(); -- Don't cancel self
|
||||||
`);
|
`);
|
||||||
console.log(`Sent ${result.rowCount} cancellation signal(s).`);
|
console.log(`Sent ${result.rowCount} cancellation signal(s).`);
|
||||||
|
|
||||||
|
// Update the combined history record to show cancellation
|
||||||
|
if (combinedHistoryId) {
|
||||||
|
const totalDuration = Math.round((Date.now() - overallStartTime) / 1000);
|
||||||
|
await conn.query(`
|
||||||
|
UPDATE calculate_history
|
||||||
|
SET
|
||||||
|
status = 'cancelled'::calculation_status,
|
||||||
|
end_time = NOW(),
|
||||||
|
duration_seconds = $1::integer,
|
||||||
|
error_message = $2::text
|
||||||
|
WHERE id = $3::integer;
|
||||||
|
`, [totalDuration, `Calculation ${reason} during step: ${currentStep}`, combinedHistoryId]);
|
||||||
|
console.log(`Updated combined history record ${combinedHistoryId} with cancellation status`);
|
||||||
|
}
|
||||||
|
|
||||||
conn.release();
|
conn.release();
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error('Error during database query cancellation:', err.message);
|
console.error('Error during database query cancellation:', err.message);
|
||||||
@@ -349,7 +366,6 @@ async function executeSqlStep(config, progress) {
|
|||||||
console.log(`\n--- Starting Step: ${config.name} ---`);
|
console.log(`\n--- Starting Step: ${config.name} ---`);
|
||||||
const stepStartTime = Date.now();
|
const stepStartTime = Date.now();
|
||||||
let connection = null;
|
let connection = null;
|
||||||
let calculateHistoryId = null;
|
|
||||||
|
|
||||||
// Set timeout for this specific step
|
// Set timeout for this specific step
|
||||||
if (stepTimeoutHandle) clearTimeout(stepTimeoutHandle); // Clear previous step's timeout
|
if (stepTimeoutHandle) clearTimeout(stepTimeoutHandle); // Clear previous step's timeout
|
||||||
@@ -383,10 +399,7 @@ async function executeSqlStep(config, progress) {
|
|||||||
connection = await getConnection();
|
connection = await getConnection();
|
||||||
console.log("Database connection acquired.");
|
console.log("Database connection acquired.");
|
||||||
|
|
||||||
// 3. Clean up Previous Runs & Create History Record (within a transaction)
|
// 3. Ensure calculate_status table exists
|
||||||
await connection.query('BEGIN');
|
|
||||||
|
|
||||||
// Ensure calculate_status table exists
|
|
||||||
await connection.query(`
|
await connection.query(`
|
||||||
CREATE TABLE IF NOT EXISTS calculate_status (
|
CREATE TABLE IF NOT EXISTS calculate_status (
|
||||||
module_name TEXT PRIMARY KEY,
|
module_name TEXT PRIMARY KEY,
|
||||||
@@ -394,57 +407,6 @@ async function executeSqlStep(config, progress) {
|
|||||||
);
|
);
|
||||||
`);
|
`);
|
||||||
|
|
||||||
// Ensure calculate_history table exists (basic structure)
|
|
||||||
await connection.query(`
|
|
||||||
CREATE TABLE IF NOT EXISTS calculate_history (
|
|
||||||
id SERIAL PRIMARY KEY,
|
|
||||||
start_time TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
end_time TIMESTAMP WITH TIME ZONE,
|
|
||||||
duration_seconds INTEGER,
|
|
||||||
status TEXT, -- Will be altered to enum if needed below
|
|
||||||
error_message TEXT,
|
|
||||||
additional_info JSONB
|
|
||||||
);
|
|
||||||
`);
|
|
||||||
|
|
||||||
// Ensure the calculation_status enum type exists if needed
|
|
||||||
await connection.query(`
|
|
||||||
DO $$
|
|
||||||
BEGIN
|
|
||||||
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'calculation_status') THEN
|
|
||||||
CREATE TYPE calculation_status AS ENUM ('running', 'completed', 'failed', 'cancelled');
|
|
||||||
|
|
||||||
-- If needed, alter the existing table to use the enum
|
|
||||||
ALTER TABLE calculate_history
|
|
||||||
ALTER COLUMN status TYPE calculation_status
|
|
||||||
USING status::calculation_status;
|
|
||||||
END IF;
|
|
||||||
END
|
|
||||||
$$;
|
|
||||||
`);
|
|
||||||
|
|
||||||
// Mark previous runs of this type as cancelled
|
|
||||||
await connection.query(`
|
|
||||||
UPDATE calculate_history
|
|
||||||
SET
|
|
||||||
status = 'cancelled'::calculation_status,
|
|
||||||
end_time = NOW(),
|
|
||||||
duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER,
|
|
||||||
error_message = 'Previous calculation was not completed properly or was superseded.'
|
|
||||||
WHERE status = 'running'::calculation_status AND additional_info->>'type' = $1::text;
|
|
||||||
`, [config.historyType]);
|
|
||||||
|
|
||||||
// Create history record for this run
|
|
||||||
const historyResult = await connection.query(`
|
|
||||||
INSERT INTO calculate_history (status, additional_info)
|
|
||||||
VALUES ('running'::calculation_status, jsonb_build_object('type', $1::text, 'sql_file', $2::text))
|
|
||||||
RETURNING id;
|
|
||||||
`, [config.historyType, config.sqlFile]);
|
|
||||||
calculateHistoryId = historyResult.rows[0].id;
|
|
||||||
|
|
||||||
await connection.query('COMMIT');
|
|
||||||
console.log(`Created history record ID: ${calculateHistoryId}`);
|
|
||||||
|
|
||||||
// 4. Initial Progress Update
|
// 4. Initial Progress Update
|
||||||
progress.outputProgress({
|
progress.outputProgress({
|
||||||
status: 'running',
|
status: 'running',
|
||||||
@@ -502,9 +464,7 @@ async function executeSqlStep(config, progress) {
|
|||||||
|
|
||||||
console.log(`SQL execution finished for ${config.name}.`);
|
console.log(`SQL execution finished for ${config.name}.`);
|
||||||
|
|
||||||
// 6. Update Status & History (within a transaction)
|
// 6. Update Status table only
|
||||||
await connection.query('BEGIN');
|
|
||||||
|
|
||||||
await connection.query(`
|
await connection.query(`
|
||||||
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
|
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
|
||||||
VALUES ($1::text, NOW())
|
VALUES ($1::text, NOW())
|
||||||
@@ -513,16 +473,6 @@ async function executeSqlStep(config, progress) {
|
|||||||
`, [config.statusModule]);
|
`, [config.statusModule]);
|
||||||
|
|
||||||
const stepDuration = Math.round((Date.now() - stepStartTime) / 1000);
|
const stepDuration = Math.round((Date.now() - stepStartTime) / 1000);
|
||||||
await connection.query(`
|
|
||||||
UPDATE calculate_history
|
|
||||||
SET
|
|
||||||
end_time = NOW(),
|
|
||||||
duration_seconds = $1::integer,
|
|
||||||
status = 'completed'::calculation_status
|
|
||||||
WHERE id = $2::integer;
|
|
||||||
`, [stepDuration, calculateHistoryId]);
|
|
||||||
|
|
||||||
await connection.query('COMMIT');
|
|
||||||
|
|
||||||
// 7. Final Progress Update for Step
|
// 7. Final Progress Update for Step
|
||||||
progress.outputProgress({
|
progress.outputProgress({
|
||||||
@@ -556,31 +506,6 @@ async function executeSqlStep(config, progress) {
|
|||||||
console.error(error); // Log the full error
|
console.error(error); // Log the full error
|
||||||
console.error(`------------------------------------`);
|
console.error(`------------------------------------`);
|
||||||
|
|
||||||
// Update history with error/cancellation status
|
|
||||||
if (connection && calculateHistoryId) {
|
|
||||||
try {
|
|
||||||
// Use a separate transaction for error logging
|
|
||||||
await connection.query('ROLLBACK'); // Rollback any partial transaction from try block
|
|
||||||
await connection.query('BEGIN');
|
|
||||||
await connection.query(`
|
|
||||||
UPDATE calculate_history
|
|
||||||
SET
|
|
||||||
end_time = NOW(),
|
|
||||||
duration_seconds = $1::integer,
|
|
||||||
status = $2::calculation_status,
|
|
||||||
error_message = $3::text
|
|
||||||
WHERE id = $4::integer;
|
|
||||||
`, [errorDuration, finalStatus, errorMessage.substring(0, 1000), calculateHistoryId]); // Limit error message size
|
|
||||||
await connection.query('COMMIT');
|
|
||||||
console.log(`Updated history record ID ${calculateHistoryId} with status: ${finalStatus}`);
|
|
||||||
} catch (historyError) {
|
|
||||||
console.error("FATAL: Failed to update history record on error:", historyError);
|
|
||||||
// Cannot rollback here if already rolled back or commit failed
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
console.warn("Could not update history record on error (no connection or history ID).");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update progress file with error/cancellation
|
// Update progress file with error/cancellation
|
||||||
progress.outputProgress({
|
progress.outputProgress({
|
||||||
status: finalStatus,
|
status: finalStatus,
|
||||||
@@ -672,9 +597,80 @@ async function runAllCalculations() {
|
|||||||
}
|
}
|
||||||
];
|
];
|
||||||
|
|
||||||
|
// Build a list of steps we will actually run
|
||||||
|
const stepsToRun = steps.filter(step => step.run);
|
||||||
|
const stepNames = stepsToRun.map(step => step.name);
|
||||||
|
const sqlFiles = stepsToRun.map(step => step.sqlFile);
|
||||||
|
|
||||||
let overallSuccess = true;
|
let overallSuccess = true;
|
||||||
|
let connection = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
// Create a single history record before starting all calculations
|
||||||
|
try {
|
||||||
|
connection = await getConnection();
|
||||||
|
|
||||||
|
// Ensure calculate_history table exists (basic structure)
|
||||||
|
await connection.query(`
|
||||||
|
CREATE TABLE IF NOT EXISTS calculate_history (
|
||||||
|
id SERIAL PRIMARY KEY,
|
||||||
|
start_time TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
end_time TIMESTAMP WITH TIME ZONE,
|
||||||
|
duration_seconds INTEGER,
|
||||||
|
status TEXT, -- Will be altered to enum if needed below
|
||||||
|
error_message TEXT,
|
||||||
|
additional_info JSONB
|
||||||
|
);
|
||||||
|
`);
|
||||||
|
|
||||||
|
// Ensure the calculation_status enum type exists if needed
|
||||||
|
await connection.query(`
|
||||||
|
DO $$
|
||||||
|
BEGIN
|
||||||
|
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'calculation_status') THEN
|
||||||
|
CREATE TYPE calculation_status AS ENUM ('running', 'completed', 'failed', 'cancelled');
|
||||||
|
|
||||||
|
-- If needed, alter the existing table to use the enum
|
||||||
|
ALTER TABLE calculate_history
|
||||||
|
ALTER COLUMN status TYPE calculation_status
|
||||||
|
USING status::calculation_status;
|
||||||
|
END IF;
|
||||||
|
END
|
||||||
|
$$;
|
||||||
|
`);
|
||||||
|
|
||||||
|
// Mark any previous running combined calculations as cancelled
|
||||||
|
await connection.query(`
|
||||||
|
UPDATE calculate_history
|
||||||
|
SET
|
||||||
|
status = 'cancelled'::calculation_status,
|
||||||
|
end_time = NOW(),
|
||||||
|
duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER,
|
||||||
|
error_message = 'Previous calculation was not completed properly or was superseded.'
|
||||||
|
WHERE status = 'running'::calculation_status AND additional_info->>'type' = 'combined_metrics';
|
||||||
|
`);
|
||||||
|
|
||||||
|
// Create a single history record for this run
|
||||||
|
const historyResult = await connection.query(`
|
||||||
|
INSERT INTO calculate_history (status, additional_info)
|
||||||
|
VALUES ('running'::calculation_status, jsonb_build_object(
|
||||||
|
'type', 'combined_metrics',
|
||||||
|
'steps', $1::jsonb,
|
||||||
|
'sql_files', $2::jsonb
|
||||||
|
))
|
||||||
|
RETURNING id;
|
||||||
|
`, [JSON.stringify(stepNames), JSON.stringify(sqlFiles)]);
|
||||||
|
|
||||||
|
combinedHistoryId = historyResult.rows[0].id;
|
||||||
|
console.log(`Created combined history record ID: ${combinedHistoryId}`);
|
||||||
|
|
||||||
|
connection.release();
|
||||||
|
} catch (historyError) {
|
||||||
|
console.error('Error creating combined history record:', historyError);
|
||||||
|
if (connection) connection.release();
|
||||||
|
// Continue without history tracking if it fails
|
||||||
|
}
|
||||||
|
|
||||||
// First, sync the settings_product table to ensure all products have entries
|
// First, sync the settings_product table to ensure all products have entries
|
||||||
progressUtils.outputProgress({
|
progressUtils.outputProgress({
|
||||||
operation: 'Starting metrics calculation',
|
operation: 'Starting metrics calculation',
|
||||||
@@ -694,6 +690,9 @@ async function runAllCalculations() {
|
|||||||
// Don't fail the entire process if settings sync fails
|
// Don't fail the entire process if settings sync fails
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Track completed steps
|
||||||
|
const completedSteps = [];
|
||||||
|
|
||||||
// Now run the calculation steps
|
// Now run the calculation steps
|
||||||
for (const step of steps) {
|
for (const step of steps) {
|
||||||
if (step.run) {
|
if (step.run) {
|
||||||
@@ -702,8 +701,17 @@ async function runAllCalculations() {
|
|||||||
overallSuccess = false; // Mark as not fully successful if steps are skipped due to cancel
|
overallSuccess = false; // Mark as not fully successful if steps are skipped due to cancel
|
||||||
continue; // Skip to next step
|
continue; // Skip to next step
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pass the progress utilities to the step executor
|
// Pass the progress utilities to the step executor
|
||||||
await executeSqlStep(step, progressUtils);
|
const result = await executeSqlStep(step, progressUtils);
|
||||||
|
|
||||||
|
if (result.success) {
|
||||||
|
completedSteps.push({
|
||||||
|
name: step.name,
|
||||||
|
duration: result.duration,
|
||||||
|
status: 'completed'
|
||||||
|
});
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
console.log(`Skipping step "${step.name}" (disabled by configuration).`);
|
console.log(`Skipping step "${step.name}" (disabled by configuration).`);
|
||||||
}
|
}
|
||||||
@@ -712,6 +720,34 @@ async function runAllCalculations() {
|
|||||||
// If we finished naturally (no errors thrown out)
|
// If we finished naturally (no errors thrown out)
|
||||||
clearTimeout(mainTimeoutHandle); // Clear the main timeout
|
clearTimeout(mainTimeoutHandle); // Clear the main timeout
|
||||||
|
|
||||||
|
// Update the combined history record on successful completion
|
||||||
|
if (combinedHistoryId) {
|
||||||
|
try {
|
||||||
|
connection = await getConnection();
|
||||||
|
const totalDuration = Math.round((Date.now() - overallStartTime) / 1000);
|
||||||
|
|
||||||
|
await connection.query(`
|
||||||
|
UPDATE calculate_history
|
||||||
|
SET
|
||||||
|
end_time = NOW(),
|
||||||
|
duration_seconds = $1::integer,
|
||||||
|
status = $2::calculation_status,
|
||||||
|
additional_info = additional_info || jsonb_build_object('completed_steps', $3::jsonb)
|
||||||
|
WHERE id = $4::integer;
|
||||||
|
`, [
|
||||||
|
totalDuration,
|
||||||
|
isCancelled ? 'cancelled' : 'completed',
|
||||||
|
JSON.stringify(completedSteps),
|
||||||
|
combinedHistoryId
|
||||||
|
]);
|
||||||
|
|
||||||
|
connection.release();
|
||||||
|
} catch (historyError) {
|
||||||
|
console.error('Error updating combined history record on completion:', historyError);
|
||||||
|
if (connection) connection.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (isCancelled) {
|
if (isCancelled) {
|
||||||
console.log("\n--- Calculation finished with cancellation ---");
|
console.log("\n--- Calculation finished with cancellation ---");
|
||||||
overallSuccess = false;
|
overallSuccess = false;
|
||||||
@@ -725,8 +761,34 @@ async function runAllCalculations() {
|
|||||||
console.error("\n--- SCRIPT EXECUTION FAILED ---");
|
console.error("\n--- SCRIPT EXECUTION FAILED ---");
|
||||||
// Error details were already logged by executeSqlStep or global handlers
|
// Error details were already logged by executeSqlStep or global handlers
|
||||||
overallSuccess = false;
|
overallSuccess = false;
|
||||||
// Don't re-log the error here unless adding context
|
|
||||||
// console.error("Overall failure reason:", error.message);
|
// Update the combined history record on error
|
||||||
|
if (combinedHistoryId) {
|
||||||
|
try {
|
||||||
|
connection = await getConnection();
|
||||||
|
const totalDuration = Math.round((Date.now() - overallStartTime) / 1000);
|
||||||
|
|
||||||
|
await connection.query(`
|
||||||
|
UPDATE calculate_history
|
||||||
|
SET
|
||||||
|
end_time = NOW(),
|
||||||
|
duration_seconds = $1::integer,
|
||||||
|
status = $2::calculation_status,
|
||||||
|
error_message = $3::text
|
||||||
|
WHERE id = $4::integer;
|
||||||
|
`, [
|
||||||
|
totalDuration,
|
||||||
|
isCancelled ? 'cancelled' : 'failed',
|
||||||
|
error.message.substring(0, 1000),
|
||||||
|
combinedHistoryId
|
||||||
|
]);
|
||||||
|
|
||||||
|
connection.release();
|
||||||
|
} catch (historyError) {
|
||||||
|
console.error('Error updating combined history record on error:', historyError);
|
||||||
|
if (connection) connection.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
await closePool();
|
await closePool();
|
||||||
console.log(`Total execution time: ${progressUtils.formatElapsedTime(overallStartTime)}`);
|
console.log(`Total execution time: ${progressUtils.formatElapsedTime(overallStartTime)}`);
|
||||||
|
|||||||
Reference in New Issue
Block a user