Finish fixing calculate scripts

This commit is contained in:
2025-03-26 14:22:08 -04:00
parent 749907bd30
commit 8e19e6cd74
7 changed files with 295 additions and 375 deletions

View File

@@ -62,13 +62,24 @@ const TEMP_TABLES = [
// Add cleanup function for temporary tables
async function cleanupTemporaryTables(connection) {
// List of possible temporary tables that might exist
const tempTables = [
'temp_sales_metrics',
'temp_purchase_metrics',
'temp_forecast_dates',
'temp_daily_sales',
'temp_product_stats',
'temp_category_sales',
'temp_category_stats'
];
try {
for (const table of TEMP_TABLES) {
// Drop each temporary table if it exists
for (const table of tempTables) {
await connection.query(`DROP TABLE IF EXISTS ${table}`);
}
} catch (error) {
logError(error, 'Error cleaning up temporary tables');
throw error; // Re-throw to be handled by the caller
} catch (err) {
console.error('Error cleaning up temporary tables:', err);
}
}
@@ -86,22 +97,42 @@ let isCancelled = false;
function cancelCalculation() {
isCancelled = true;
global.clearProgress();
// Format as SSE event
const event = {
progress: {
status: 'cancelled',
operation: 'Calculation cancelled',
current: 0,
total: 0,
elapsed: null,
remaining: null,
rate: 0,
timestamp: Date.now()
}
console.log('Calculation has been cancelled by user');
// Force-terminate any query that's been running for more than 5 seconds
try {
const connection = getConnection();
connection.then(async (conn) => {
try {
// Identify and terminate long-running queries from our application
await conn.query(`
SELECT pg_cancel_backend(pid)
FROM pg_stat_activity
WHERE query_start < now() - interval '5 seconds'
AND application_name LIKE '%node%'
AND query NOT LIKE '%pg_cancel_backend%'
`);
// Clean up any temporary tables
await cleanupTemporaryTables(conn);
// Release connection
conn.release();
} catch (err) {
console.error('Error during force cancellation:', err);
conn.release();
}
}).catch(err => {
console.error('Could not get connection for cancellation:', err);
});
} catch (err) {
console.error('Failed to terminate running queries:', err);
}
return {
success: true,
message: 'Calculation has been cancelled'
};
process.stdout.write(JSON.stringify(event) + '\n');
process.exit(0);
}
// Handle SIGTERM signal for cancellation
@@ -119,6 +150,15 @@ async function calculateMetrics() {
let totalPurchaseOrders = 0;
let calculateHistoryId;
// Set a maximum execution time (30 minutes)
const MAX_EXECUTION_TIME = 30 * 60 * 1000;
const timeout = setTimeout(() => {
console.error(`Calculation timed out after ${MAX_EXECUTION_TIME/1000} seconds, forcing termination`);
// Call cancel and force exit
cancelCalculation();
process.exit(1);
}, MAX_EXECUTION_TIME);
try {
// Clean up any previously running calculations
connection = await getConnection();
@@ -360,223 +400,6 @@ async function calculateMetrics() {
console.log('Skipping sales forecasts calculation');
}
// Calculate ABC classification
outputProgress({
status: 'running',
operation: 'Starting ABC classification',
current: processedProducts || 0,
total: totalProducts || 0,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0),
rate: calculateRate(startTime, processedProducts || 0),
percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedProducts || 0,
processedOrders: processedOrders || 0,
processedPurchaseOrders: 0,
success: false
};
const abcConfigResult = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1');
const abcThresholds = abcConfigResult.rows[0] || { a_threshold: 20, b_threshold: 50 };
// First, create and populate the rankings table
await connection.query('DROP TABLE IF EXISTS temp_revenue_ranks');
await connection.query(`
CREATE TEMPORARY TABLE temp_revenue_ranks (
pid BIGINT NOT NULL,
total_revenue DECIMAL(10,3),
rank_num INT,
total_count INT,
PRIMARY KEY (pid)
)
`);
await connection.query('CREATE INDEX ON temp_revenue_ranks (rank_num)');
outputProgress({
status: 'running',
operation: 'Creating revenue rankings',
current: processedProducts || 0,
total: totalProducts || 0,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0),
rate: calculateRate(startTime, processedProducts || 0),
percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedProducts || 0,
processedOrders: processedOrders || 0,
processedPurchaseOrders: 0,
success: false
};
// Use window functions instead of user variables
await connection.query(`
INSERT INTO temp_revenue_ranks
WITH ranked AS (
SELECT
pid,
total_revenue,
ROW_NUMBER() OVER (ORDER BY total_revenue DESC) as rank_num,
COUNT(*) OVER () as total_count
FROM product_metrics
WHERE total_revenue > 0
)
SELECT
pid,
total_revenue,
rank_num,
total_count
FROM ranked
`);
// Get total count for percentage calculation
const rankingCountResult = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks');
const totalCount = parseInt(rankingCountResult.rows[0].total_count) || 1;
const max_rank = totalCount; // Store max_rank for use in classification
outputProgress({
status: 'running',
operation: 'Updating ABC classifications',
current: processedProducts || 0,
total: totalProducts || 0,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0),
rate: calculateRate(startTime, processedProducts || 0),
percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedProducts || 0,
processedOrders: processedOrders || 0,
processedPurchaseOrders: 0,
success: false
};
// ABC classification progress tracking
let abcProcessedCount = 0;
const batchSize = 5000;
let lastProgressUpdate = Date.now();
const progressUpdateInterval = 1000; // Update every second
while (true) {
if (isCancelled) return {
processedProducts: Number(processedProducts) || 0,
processedOrders: Number(processedOrders) || 0,
processedPurchaseOrders: 0,
success: false
};
// First get a batch of PIDs that need updating
const pidsResult = await connection.query(`
SELECT pm.pid
FROM product_metrics pm
LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid
WHERE pm.abc_class IS NULL
OR pm.abc_class !=
CASE
WHEN tr.rank_num IS NULL THEN 'C'
WHEN (tr.rank_num::float / $1::float) * 100 <= $2 THEN 'A'
WHEN (tr.rank_num::float / $1::float) * 100 <= $3 THEN 'B'
ELSE 'C'
END
LIMIT $4
`, [max_rank, abcThresholds.a_threshold,
abcThresholds.b_threshold,
batchSize]);
if (pidsResult.rows.length === 0) {
break;
}
// Then update just those PIDs
const pidValues = pidsResult.rows.map(row => row.pid);
const result = await connection.query(`
UPDATE product_metrics pm
SET abc_class =
CASE
WHEN tr.rank_num IS NULL THEN 'C'
WHEN (tr.rank_num::float / $1::float) * 100 <= $2 THEN 'A'
WHEN (tr.rank_num::float / $1::float) * 100 <= $3 THEN 'B'
ELSE 'C'
END,
last_calculated_at = NOW()
FROM temp_revenue_ranks tr
WHERE pm.pid = tr.pid AND pm.pid = ANY($4::bigint[])
OR (pm.pid = ANY($4::bigint[]) AND tr.pid IS NULL)
`, [max_rank, abcThresholds.a_threshold,
abcThresholds.b_threshold,
pidValues]);
abcProcessedCount += result.rowCount;
// Calculate progress ensuring valid numbers
const currentProgress = Math.floor(totalProducts * (0.99 + (abcProcessedCount / (totalCount || 1)) * 0.01));
processedProducts = Number(currentProgress) || processedProducts || 0;
// Only update progress at most once per second
const now = Date.now();
if (now - lastProgressUpdate >= progressUpdateInterval) {
const progress = ensureValidProgress(processedProducts, totalProducts);
outputProgress({
status: 'running',
operation: 'ABC classification progress',
current: progress.current,
total: progress.total,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, progress.current, progress.total),
rate: calculateRate(startTime, progress.current),
percentage: progress.percentage,
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
lastProgressUpdate = now;
}
// Update database progress
await updateProgress(processedProducts, processedOrders, processedPurchaseOrders);
// Small delay between batches to allow other transactions
await new Promise(resolve => setTimeout(resolve, 100));
}
// Clean up
await connection.query('DROP TABLE IF EXISTS temp_revenue_ranks');
const endTime = Date.now();
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);
// Update calculate_status for ABC classification
await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
VALUES ('abc_classification', NOW())
ON CONFLICT (module_name) DO UPDATE
SET last_calculation_timestamp = NOW()
`);
// Final progress update with guaranteed valid numbers
const finalProgress = ensureValidProgress(totalProducts, totalProducts);
@@ -586,14 +409,14 @@ async function calculateMetrics() {
operation: 'Metrics calculation complete',
current: finalProgress.current,
total: finalProgress.total,
elapsed: formatElapsedTime(startTime),
elapsed: global.formatElapsedTime(startTime),
remaining: '0s',
rate: calculateRate(startTime, finalProgress.current),
rate: global.calculateRate(startTime, finalProgress.current),
percentage: '100',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: totalElapsedSeconds
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
@@ -615,7 +438,7 @@ async function calculateMetrics() {
processed_purchase_orders = $4,
status = 'completed'
WHERE id = $5
`, [totalElapsedSeconds,
`, [Math.round((Date.now() - startTime) / 1000),
finalStats.processedProducts,
finalStats.processedOrders,
finalStats.processedPurchaseOrders,
@@ -624,6 +447,11 @@ async function calculateMetrics() {
// Clear progress file on successful completion
global.clearProgress();
return {
success: true,
message: 'Calculation completed successfully',
duration: Math.round((Date.now() - startTime) / 1000)
};
} catch (error) {
const endTime = Date.now();
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);
@@ -685,17 +513,38 @@ async function calculateMetrics() {
}
throw error;
} finally {
// Clear the timeout to prevent forced termination
clearTimeout(timeout);
// Always clean up and release connection
if (connection) {
// Ensure temporary tables are cleaned up
await cleanupTemporaryTables(connection);
connection.release();
try {
await cleanupTemporaryTables(connection);
connection.release();
} catch (err) {
console.error('Error in final cleanup:', err);
}
}
// Close the connection pool when we're done
await closePool();
}
} catch (error) {
success = false;
logError(error, 'Error in metrics calculation');
console.error('Error in metrics calculation', error);
try {
if (connection) {
await connection.query(`
UPDATE calculate_history
SET
status = 'error',
end_time = NOW(),
duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER,
error_message = $1
WHERE id = $2
`, [error.message.substring(0, 500), calculateHistoryId]);
}
} catch (updateError) {
console.error('Error updating calculation history:', updateError);
}
throw error;
}
}