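// This script runs the populate_initial_product_metrics.sql DO block against
// PostgreSQL to build the initial product metrics. It records each run in the
// calculate_history table, emits progress updates through the shared progress
// helpers, and supports cancellation via SIGTERM/SIGINT, which also cancels the
// in-flight query on the server.
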
const path = require('path');
const fs = require('fs');

// Get the base directory (the directory containing the inventory-server folder)
const baseDir = path.resolve(__dirname, '../../..');

// Load environment variables from the inventory-server directory
require('dotenv').config({ path: path.resolve(__dirname, '../..', '.env') });

// Configure the statement timeout (30 minutes)
const PG_STATEMENT_TIMEOUT_MS = 1800000;

// Error handler for uncaught exceptions
process.on('uncaughtException', (error) => {
  console.error('Uncaught Exception:', error);
  process.exit(1);
});

// Error handler for unhandled promise rejections
process.on('unhandledRejection', (reason, promise) => {
  console.error('Unhandled Rejection at:', promise, 'reason:', reason);
  process.exit(1);
});

// Load progress module
const progress = require('../utils/progress');

// Expose the progress helpers on the global object so they are available throughout this script
global.formatElapsedTime = progress.formatElapsedTime;
global.estimateRemaining = progress.estimateRemaining;
global.calculateRate = progress.calculateRate;
global.outputProgress = progress.outputProgress;
global.clearProgress = progress.clearProgress;
global.getProgress = progress.getProgress;
global.logError = progress.logError;
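
// For reference, the progress helpers above are assumed to look roughly like this
// (a sketch inferred from how they are used in this file; the actual ../utils/progress
// module may differ):
//   formatElapsedTime(startMs)  -> human-readable elapsed time, e.g. '42s'
//   estimateRemaining(...)      -> estimated time remaining for the current step
//   calculateRate(...)          -> processing rate used in progress payloads
//   outputProgress(payload)     -> writes/emits a progress payload like the ones below
//   clearProgress()             -> clears the persisted progress state
//   getProgress()               -> reads the current progress state
//   logError(...)               -> error logging helper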

// Load database module
const { getConnection, closePool } = require('../utils/db');
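
// ../utils/db is assumed to be a thin wrapper around a node-postgres (pg) Pool,
// roughly like the sketch below (an assumption based on how it is used in this file:
// a client with query()/release(), per-connection options, and a closePool() helper):
//
//   const { Pool } = require('pg');
//   const pool = new Pool();
//
//   async function getConnection(options = {}) {
//     const client = await pool.connect();
//     if (options.application_name) {
//       await client.query(`SET application_name = '${options.application_name}'`);
//     }
//     if (options.statement_timeout) {
//       await client.query(`SET statement_timeout = ${options.statement_timeout}`);
//     }
//     return client; // caller is responsible for client.release()
//   }
//
//   async function closePool() {
//     await pool.end();
//   }
//
//   module.exports = { getConnection, closePool };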

// Cancellation state and handler
let isCancelled = false;
let runningQueryPromise = null;

function cancelCalculation() {
  if (!isCancelled) {
    isCancelled = true;
    console.log('Calculation has been cancelled by the user');

    // Store the query promise to potentially cancel it
    const queryToCancel = runningQueryPromise;
    if (queryToCancel) {
      console.log('Attempting to cancel the running query...');
    }

    // Ask PostgreSQL to cancel any of this application's queries that have been running for more than 5 seconds
    try {
      const connection = getConnection();
      connection.then(async (conn) => {
        try {
          // pg_cancel_backend cancels the current query on each matching backend
          await conn.query(`
            SELECT pg_cancel_backend(pid)
            FROM pg_stat_activity
            WHERE query_start < now() - interval '5 seconds'
            AND application_name = 'populate_metrics'
            AND query NOT LIKE '%pg_cancel_backend%'
          `);

          // Release connection
          conn.release();
        } catch (err) {
          console.error('Error during force cancellation:', err);
          conn.release();
        }
      }).catch(err => {
        console.error('Could not get connection for cancellation:', err);
      });
    } catch (err) {
      console.error('Failed to cancel running queries:', err);
    }
  }

  return {
    success: true,
    message: 'Calculation has been cancelled'
  };
}

// Handle SIGTERM/SIGINT signals for cancellation
process.on('SIGTERM', cancelCalculation);
process.on('SIGINT', cancelCalculation);
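
// Cancellation flow: SIGTERM/SIGINT -> cancelCalculation() sets isCancelled and asks
// PostgreSQL to cancel this application's long-running queries via pg_cancel_backend.
// The awaited metrics query then fails (typically with PostgreSQL error code 57014,
// query_canceled), which is handled in populateInitialMetrics()'s catch block and
// recorded as 'cancelled' rather than 'failed'.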

async function populateInitialMetrics() {
  let connection;
  const startTime = Date.now();
  let calculateHistoryId;

  try {
    // Acquire a dedicated connection for this run
    connection = await getConnection({
      // Performance-related settings
      application_name: 'populate_metrics',
      statement_timeout: PG_STATEMENT_TIMEOUT_MS, // 30 min timeout per statement
    });

    // Ensure the calculate_status table exists and has the correct structure
    await connection.query(`
      CREATE TABLE IF NOT EXISTS calculate_status (
        module_name TEXT PRIMARY KEY,
        last_calculation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
      )
    `);

    // Clean up any previously running calculation of this type
    await connection.query(`
      UPDATE calculate_history
      SET
        status = 'cancelled',
        end_time = NOW(),
        duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER,
        error_message = 'Previous calculation was not completed properly'
      WHERE status = 'running' AND additional_info->>'type' = 'populate_initial_metrics'
    `);

    // Create history record for this calculation
    const historyResult = await connection.query(`
      INSERT INTO calculate_history (
        start_time,
        status,
        additional_info
      ) VALUES (
        NOW(),
        'running',
        jsonb_build_object(
          'type', 'populate_initial_metrics',
          'sql_file', 'populate_initial_product_metrics.sql'
        )
      ) RETURNING id
    `);
    calculateHistoryId = historyResult.rows[0].id;
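
    // For reference, calculate_history is assumed to have at least the columns below
    // (inferred from the queries in this script; the real DDL may differ):
    //
    //   CREATE TABLE calculate_history (
    //     id               SERIAL PRIMARY KEY,   -- or an identity/bigserial column
    //     start_time       TIMESTAMPTZ NOT NULL,
    //     end_time         TIMESTAMPTZ,
    //     duration_seconds INTEGER,
    //     status           TEXT,                 -- 'running' | 'completed' | 'failed' | 'cancelled'
    //     error_message    TEXT,
    //     additional_info  JSONB
    //   );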

    // Initialize progress
    global.outputProgress({
      status: 'running',
      operation: 'Starting initial product metrics population',
      current: 0,
      total: 100,
      elapsed: '0s',
      remaining: 'Calculating... (this may take a while)',
      rate: 0,
      percentage: '0',
      timing: {
        start_time: new Date(startTime).toISOString(),
        end_time: new Date().toISOString(),
        elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
      },
      historyId: calculateHistoryId
    });

    // Prepare the database - analyze tables
    global.outputProgress({
      status: 'running',
      operation: 'Analyzing database tables for better query performance',
      current: 2,
      total: 100,
      elapsed: global.formatElapsedTime(startTime),
      remaining: 'Analyzing...',
      rate: 0,
      percentage: '2',
      timing: {
        start_time: new Date(startTime).toISOString(),
        end_time: new Date().toISOString(),
        elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
      },
      historyId: calculateHistoryId
    });

    // Enable better query planning and parallel operations
    await connection.query(`
      -- Analyze tables for better query planning
      ANALYZE public.products;
      ANALYZE public.purchase_orders;
      ANALYZE public.daily_product_snapshots;
      ANALYZE public.orders;

      -- Enable parallel operations for this session.
      -- Plain SET (not SET LOCAL) is used so the settings survive until the metrics
      -- query below; SET LOCAL would be discarded at the end of this statement's
      -- implicit transaction.
      SET enable_parallel_append = on;
      SET enable_parallel_hash = on;
      SET max_parallel_workers_per_gather = 4;

      -- Larger work memory for complex sorts/joins
      SET work_mem = '128MB';
    `).catch(err => {
      // Non-fatal if the ANALYZE/settings step fails
      console.warn('Failed to analyze tables (non-fatal):', err.message);
    });

    // Execute the SQL query
    global.outputProgress({
      status: 'running',
      operation: 'Executing initial metrics SQL query',
      current: 5,
      total: 100,
      elapsed: global.formatElapsedTime(startTime),
      remaining: 'Calculating... (this could take several hours with 150M+ records)',
      rate: 0,
      percentage: '5',
      timing: {
        start_time: new Date(startTime).toISOString(),
        end_time: new Date().toISOString(),
        elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
      },
      historyId: calculateHistoryId
    });

    // Read the SQL file
    const sqlFilePath = path.resolve(__dirname, 'populate_initial_product_metrics.sql');
    console.log('Base directory:', baseDir);
    console.log('Script directory:', __dirname);
    console.log('SQL file path:', sqlFilePath);
    console.log('Current working directory:', process.cwd());

    if (!fs.existsSync(sqlFilePath)) {
      throw new Error(`SQL file not found at ${sqlFilePath}`);
    }

    // Read the SQL and normalize line endings and surrounding whitespace
    const sqlQuery = fs.readFileSync(sqlFilePath, 'utf8')
      .replace(/\r\n/g, '\n') // Handle Windows line endings
      .replace(/\r/g, '\n') // Handle old Mac line endings
      .trim(); // Remove leading/trailing whitespace (required for the ending check below)

    // Log SQL details after cleaning
    console.log('SQL Query length (cleaned):', sqlQuery.length);
    console.log('SQL Query structure validation:');
    console.log('- Contains DO block:', sqlQuery.includes('DO $$') || sqlQuery.includes('DO $')); // Matches both $$ and named dollar-quote tags
    console.log('- Contains BEGIN:', sqlQuery.includes('BEGIN'));
    console.log('- Contains END:', sqlQuery.includes('END $$;') || sqlQuery.includes('END $')); // Matches both $$ and named dollar-quote tags
    console.log('- First 50 chars:', JSON.stringify(sqlQuery.slice(0, 50)));
    console.log('- Last 100 chars (cleaned):', JSON.stringify(sqlQuery.slice(-100)));

    // Final check to ensure a clean SQL ending
    if (!sqlQuery.endsWith('END $$;')) {
      console.warn('WARNING: SQL does not end with "END $$;". This might cause issues.');
      console.log('Exact ending:', JSON.stringify(sqlQuery.slice(-20)));
    }
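
    // The checks above assume populate_initial_product_metrics.sql is a single
    // PL/pgSQL DO block, roughly of this shape (sketch only; the real file contains
    // the actual metrics logic):
    //
    //   DO $$
    //   BEGIN
    //     -- build the initial per-product metrics here
    //   END $$;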

    // Execute the script
    console.log('Starting initial product metrics population...');

    // Track the query promise for potential cancellation
    runningQueryPromise = connection.query({
      text: sqlQuery,
      rowMode: 'array' // return any rows as arrays; the DO block itself returns no rows
    });
    await runningQueryPromise;
    runningQueryPromise = null;

    // Update progress to 100%
    global.outputProgress({
      status: 'complete',
      operation: 'Initial product metrics population complete',
      current: 100,
      total: 100,
      elapsed: global.formatElapsedTime(startTime),
      remaining: '0s',
      rate: 0,
      percentage: '100',
      timing: {
        start_time: new Date(startTime).toISOString(),
        end_time: new Date().toISOString(),
        elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
      },
      historyId: calculateHistoryId
    });

    // Update history with completion
    await connection.query(`
      UPDATE calculate_history
      SET
        end_time = NOW(),
        duration_seconds = $1,
        status = 'completed'
      WHERE id = $2
    `, [Math.round((Date.now() - startTime) / 1000), calculateHistoryId]);

    // Clear progress file on successful completion
    global.clearProgress();

    return {
      success: true,
      message: 'Initial product metrics population completed successfully',
      duration: Math.round((Date.now() - startTime) / 1000)
    };
  } catch (error) {
    const endTime = Date.now();
    const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);

    // Enhanced error logging
    console.error('Error details:', {
      message: error.message,
      code: error.code,
      hint: error.hint,
      position: error.position,
      detail: error.detail,
      where: error.where ? error.where.substring(0, 500) + '...' : undefined, // Truncate to avoid huge logs
      severity: error.severity,
      file: error.file,
      line: error.line,
      routine: error.routine
    });

    // Update history with the error (best effort: a failed update must not mask the original error)
    if (connection && calculateHistoryId) {
      try {
        await connection.query(`
          UPDATE calculate_history
          SET
            end_time = NOW(),
            duration_seconds = $1,
            status = $2,
            error_message = $3
          WHERE id = $4
        `, [
          totalElapsedSeconds,
          isCancelled ? 'cancelled' : 'failed',
          error.message,
          calculateHistoryId
        ]);
      } catch (updateErr) {
        console.error('Failed to update calculate_history with error status:', updateErr);
      }
    }

    if (isCancelled) {
      global.outputProgress({
        status: 'cancelled',
        operation: 'Calculation cancelled',
        current: 50,
        total: 100,
        elapsed: global.formatElapsedTime(startTime),
        remaining: null,
        rate: 0,
        percentage: '50',
        timing: {
          start_time: new Date(startTime).toISOString(),
          end_time: new Date().toISOString(),
          elapsed_seconds: totalElapsedSeconds
        },
        historyId: calculateHistoryId
      });
    } else {
      global.outputProgress({
        status: 'error',
        operation: 'Error during initial product metrics population',
        message: error.message,
        current: 0,
        total: 100,
        elapsed: global.formatElapsedTime(startTime),
        remaining: null,
        rate: 0,
        percentage: '0',
        timing: {
          start_time: new Date(startTime).toISOString(),
          end_time: new Date().toISOString(),
          elapsed_seconds: totalElapsedSeconds
        },
        historyId: calculateHistoryId
      });
    }

    console.error('Error during initial product metrics population:', error);
    return {
      success: false,
      error: error.message,
      duration: totalElapsedSeconds
    };
  } finally {
    if (connection) {
      connection.release();
    }
    await closePool();
  }
}

// Start population process
populateInitialMetrics()
  .then(result => {
    if (result.success) {
      console.log(`Initial product metrics population completed successfully in ${result.duration} seconds`);
      process.exit(0);
    } else {
      console.error(`Initial product metrics population failed: ${result.error}`);
      process.exit(1);
    }
  })
  .catch(err => {
    console.error('Unexpected error:', err);
    process.exit(1);
  });
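
// Note: this script is intended to run as a standalone Node process; it exits with
// code 0 on success and code 1 on failure, cancellation, or unexpected errors.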