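// Backfill daily product snapshots over a historical date range by calling a
// PostgreSQL function once per batch of days, recording progress and
// checkpoints in public.calculate_history so an interrupted run can be resumed.
//
// Usage (flag names come from the argument parser at the bottom of this file;
// the script filename shown here is illustrative):
//   node backfill_snapshots.js --start-date 2023-01-01 --end-date 2023-03-31
//   node backfill_snapshots.js --start-batch 42
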
const path = require('path');
const fs = require('fs');
const progress = require('../utils/progress'); // Assuming progress utils are here
const { getConnection, closePool } = require('../utils/db'); // Assuming db utils are here
const os = require('os'); // For detecting number of CPU cores

// --- Configuration ---
const BATCH_SIZE_DAYS = 1; // Process 1 day per database function call
const SQL_FUNCTION_FILE = path.resolve(__dirname, 'backfill_historical_snapshots.sql'); // Function definition lives alongside this script
const LOG_PROGRESS_INTERVAL_MS = 5000; // Update console progress roughly every 5 seconds
const HISTORY_TYPE = 'backfill_snapshots'; // Identifier for history table
const MAX_WORKERS = Math.max(1, Math.floor(os.cpus().length / 2)); // Use half of available CPU cores (not yet used; reserved for parallel mode)
const USE_PARALLEL = false; // Set to true to enable parallel processing (not yet wired up in this script)
const PG_STATEMENT_TIMEOUT_MS = 1800000; // 30 minutes max per query

// --- Cancellation Handling ---
let isCancelled = false;
let runningQueryPromise = null; // To potentially track the active query

function requestCancellation() {
  if (!isCancelled) {
    isCancelled = true;
    console.warn('\nCancellation requested. Finishing current batch then stopping...');
    // Note: We are NOT forcefully cancelling the backend query anymore.
  }
}

process.on('SIGINT', requestCancellation); // Handle Ctrl+C
process.on('SIGTERM', requestCancellation); // Handle termination signals

// --- Main Backfill Function ---
async function backfillSnapshots(cmdStartDate, cmdEndDate, cmdStartBatch = 1) {
  let connection;
  const overallStartTime = Date.now();
  let calculateHistoryId = null;
  let processedDaysTotal = 0; // Track total days processed across all batches executed in this run
  let currentBatchNum = cmdStartBatch > 0 ? cmdStartBatch : 1;
  let totalBatches = 0; // Initialize totalBatches
  let totalDays = 0; // Initialize totalDays

  console.log(`Starting snapshot backfill process...`);
  console.log(`SQL Function definition file: ${SQL_FUNCTION_FILE}`);
  if (!fs.existsSync(SQL_FUNCTION_FILE)) {
    console.error(`FATAL: SQL file not found at ${SQL_FUNCTION_FILE}`);
    process.exit(1); // Exit early if file doesn't exist
  }

  try {
    // Set up a connection with performance-related settings
    connection = await getConnection({
      application_name: 'backfill_snapshots',
      statement_timeout: PG_STATEMENT_TIMEOUT_MS, // 30 min timeout per statement
      // These parameters may need to be configured in your database:
      // work_mem: '1GB',
      // maintenance_work_mem: '2GB',
      // temp_buffers: '1GB',
    });

    console.log('Database connection acquired.');

    // --- Ensure Function Exists ---
    console.log('Ensuring database function is up-to-date...');
    try {
      const sqlFunctionDef = fs.readFileSync(SQL_FUNCTION_FILE, 'utf8');
      if (!sqlFunctionDef.includes('CREATE OR REPLACE FUNCTION backfill_daily_snapshots_range_final')) {
        throw new Error(`SQL file ${SQL_FUNCTION_FILE} does not seem to contain the function definition.`);
      }
      await connection.query(sqlFunctionDef); // Execute the whole file
      console.log('Database function `backfill_daily_snapshots_range_final` created/updated.');

      // Refresh planner statistics on the involved tables for better query planning
      await connection.query(`
        ANALYZE public.products;
        ANALYZE public.imported_daily_inventory;
        ANALYZE public.imported_product_stat_history;
        ANALYZE public.daily_product_snapshots;
        ANALYZE public.imported_product_current_prices;
      `).catch(err => {
        // Non-fatal if ANALYZE fails
        console.warn('Failed to analyze tables (non-fatal):', err.message);
      });

    } catch (err) {
      console.error(`Error processing SQL function file ${SQL_FUNCTION_FILE}:`, err);
      throw new Error(`Failed to create or replace DB function: ${err.message}`);
    }

    // --- Prepare History Record ---
    console.log('Preparing calculation history record...');
    // Ensure history table exists (optional, could be done elsewhere)
    await connection.query(`
      CREATE TABLE IF NOT EXISTS public.calculate_history (
        id SERIAL PRIMARY KEY,
        start_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        end_time TIMESTAMPTZ,
        duration_seconds INTEGER,
        status VARCHAR(20) NOT NULL, -- e.g., 'running', 'completed', 'failed', 'cancelled'
        error_message TEXT,
        additional_info JSONB -- Store type, file, batch info etc.
      );
    `);
    // Mark previous runs of this type as potentially failed if they were left 'running'
    await connection.query(`
      UPDATE public.calculate_history
      SET status = 'failed', error_message = 'Interrupted by new run.'
      WHERE status = 'running' AND additional_info->>'type' = $1;
    `, [HISTORY_TYPE]);

    // Create new history record
    const historyResult = await connection.query(`
      INSERT INTO public.calculate_history (start_time, status, additional_info)
      VALUES (NOW(), 'running', jsonb_build_object('type', $1::text, 'sql_file', $2::text, 'start_batch', $3::integer))
      RETURNING id;
    `, [HISTORY_TYPE, path.basename(SQL_FUNCTION_FILE), cmdStartBatch]);
    calculateHistoryId = historyResult.rows[0].id;
    console.log(`Calculation history record created with ID: ${calculateHistoryId}`);

    // --- Determine Date Range ---
    console.log('Determining date range...');
    let effectiveStartDate, effectiveEndDate;

    // Use command-line dates if provided, otherwise query DB
    if (cmdStartDate) {
      effectiveStartDate = cmdStartDate;
    } else {
      const minDateResult = await connection.query(`
        SELECT LEAST(
          COALESCE((SELECT MIN(date) FROM public.imported_daily_inventory WHERE date > '1970-01-01'), CURRENT_DATE),
          COALESCE((SELECT MIN(date) FROM public.imported_product_stat_history WHERE date > '1970-01-01'), CURRENT_DATE)
        )::date as min_date;
      `);
      effectiveStartDate = minDateResult.rows[0]?.min_date || new Date().toISOString().split('T')[0]; // Fallback
      console.log(`Auto-detected start date: ${effectiveStartDate}`);
    }

    if (cmdEndDate) {
      effectiveEndDate = cmdEndDate;
    } else {
      const maxDateResult = await connection.query(`
        SELECT GREATEST(
          COALESCE((SELECT MAX(date) FROM public.imported_daily_inventory WHERE date < CURRENT_DATE), '1970-01-01'::date),
          COALESCE((SELECT MAX(date) FROM public.imported_product_stat_history WHERE date < CURRENT_DATE), '1970-01-01'::date)
        )::date as max_date;
      `);
      // Ensure end date is not today or in the future
      effectiveEndDate = maxDateResult.rows[0]?.max_date || new Date(Date.now() - 86400000).toISOString().split('T')[0]; // Default yesterday
      if (new Date(effectiveEndDate) >= new Date(new Date().toISOString().split('T')[0])) {
        effectiveEndDate = new Date(Date.now() - 86400000).toISOString().split('T')[0]; // Set to yesterday if >= today
      }
      console.log(`Auto-detected end date: ${effectiveEndDate}`);
    }

    // Validate dates
    const dStart = new Date(effectiveStartDate);
    const dEnd = new Date(effectiveEndDate);
    if (isNaN(dStart.getTime()) || isNaN(dEnd.getTime()) || dStart > dEnd) {
      throw new Error(`Invalid date range: Start "${effectiveStartDate}", End "${effectiveEndDate}"`);
    }

    // --- Batch Processing ---
    totalDays = Math.ceil((dEnd - dStart) / (1000 * 60 * 60 * 24)) + 1; // Inclusive of both endpoints
    totalBatches = Math.ceil(totalDays / BATCH_SIZE_DAYS);
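    // Worked example (illustrative): for 2023-01-01 through 2023-01-31 the
    // difference is 30 days, so totalDays = 30 + 1 = 31, and with
    // BATCH_SIZE_DAYS = 1 that yields totalBatches = ceil(31 / 1) = 31.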

    console.log(`Target Date Range: ${effectiveStartDate} to ${effectiveEndDate} (${totalDays} days)`);
    console.log(`Total Batches: ${totalBatches} (Batch Size: ${BATCH_SIZE_DAYS} days)`);
    console.log(`Starting from Batch: ${currentBatchNum}`);

    // Initial progress update
    progress.outputProgress({
      status: 'running',
      operation: 'Starting Batch Processing',
      currentBatch: currentBatchNum,
      totalBatches: totalBatches,
      totalDays: totalDays,
      elapsed: '0s',
      remaining: 'Calculating...',
      rate: 0,
      historyId: calculateHistoryId // Include history ID in the object
    });

    while (currentBatchNum <= totalBatches && !isCancelled) {
      const batchOffset = (currentBatchNum - 1) * BATCH_SIZE_DAYS;
      const batchStartDate = new Date(dStart);
      batchStartDate.setDate(dStart.getDate() + batchOffset);

      const batchEndDate = new Date(batchStartDate);
      batchEndDate.setDate(batchStartDate.getDate() + BATCH_SIZE_DAYS - 1);

      // Clamp batch end date to the overall effective end date
      if (batchEndDate > dEnd) {
        batchEndDate.setTime(dEnd.getTime());
      }

      const batchStartDateStr = batchStartDate.toISOString().split('T')[0];
      const batchEndDateStr = batchEndDate.toISOString().split('T')[0];
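      // Worked example (illustrative): with dStart = 2023-01-01 and
      // BATCH_SIZE_DAYS = 1, batch 3 gives batchOffset = 2 and a window of
      // 2023-01-03 .. 2023-01-03; the clamp above only matters for the final
      // batch, when the window would otherwise run past dEnd.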

      const batchStartTime = Date.now();

      console.log(`\n--- Processing Batch ${currentBatchNum} / ${totalBatches} ---`);
      console.log(` Dates: ${batchStartDateStr} to ${batchEndDateStr}`);

      // Execute the function for the batch
      try {
        progress.outputProgress({
          status: 'running',
          operation: `Executing DB function for batch ${currentBatchNum}...`,
          currentBatch: currentBatchNum,
          totalBatches: totalBatches,
          totalDays: totalDays,
          elapsed: progress.formatElapsedTime(overallStartTime),
          remaining: 'Executing...',
          rate: 0,
          historyId: calculateHistoryId
        });

        // Encourage parallel plans for the batch. SET LOCAL only takes effect
        // inside an explicit transaction block, so plain SET is used here to
        // configure this pooled connection for the duration of the run.
        await connection.query('SET enable_parallel_append = on; SET enable_parallel_hash = on; SET max_parallel_workers_per_gather = 4;');
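
        // The heavy lifting happens inside the SQL function defined in
        // SQL_FUNCTION_FILE. From this script's point of view it is assumed to
        // accept an inclusive (start_date, end_date) pair and populate
        // public.daily_product_snapshots for those days; the exact logic lives
        // in the .sql file.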
        // Store promise in case we need to try and cancel (though not implemented forcefully)
        runningQueryPromise = connection.query(
          `SELECT backfill_daily_snapshots_range_final($1::date, $2::date);`,
          [batchStartDateStr, batchEndDateStr]
        );
        await runningQueryPromise; // Wait for the function call to complete
        runningQueryPromise = null; // Clear the promise

        const batchDurationMs = Date.now() - batchStartTime;
        const daysInThisBatch = Math.ceil((batchEndDate - batchStartDate) / (1000 * 60 * 60 * 24)) + 1;
        processedDaysTotal += daysInThisBatch;

        console.log(` Batch ${currentBatchNum} completed in ${progress.formatElapsedTime(batchStartTime)}.`);

        // --- Update Progress & History ---
        const overallElapsedSec = Math.round((Date.now() - overallStartTime) / 1000);
        progress.outputProgress({
          status: 'running',
          operation: `Completed batch ${currentBatchNum}`,
          currentBatch: currentBatchNum,
          totalBatches: totalBatches,
          totalDays: totalDays,
          processedDays: processedDaysTotal,
          elapsed: progress.formatElapsedTime(overallStartTime),
          remaining: progress.estimateRemaining(overallStartTime, processedDaysTotal, totalDays),
          rate: progress.calculateRate(overallStartTime, processedDaysTotal),
          batchDuration: progress.formatElapsedTime(batchStartTime),
          historyId: calculateHistoryId
        });

        // Save checkpoint in history
        await connection.query(`
          UPDATE public.calculate_history
          SET additional_info = jsonb_set(additional_info, '{last_completed_batch}', $1::jsonb)
                              || jsonb_build_object('last_processed_date', $2::text)
          WHERE id = $3::integer;
        `, [JSON.stringify(currentBatchNum), batchEndDateStr, calculateHistoryId]);
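
        // After a successful batch, additional_info for this run's history row
        // looks roughly like (values illustrative):
        //   { "type": "backfill_snapshots", "sql_file": "backfill_historical_snapshots.sql",
        //     "start_batch": 1, "last_completed_batch": 7, "last_processed_date": "2023-01-07" }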

      } catch (batchError) {
        console.error(`\n--- ERROR in Batch ${currentBatchNum} (${batchStartDateStr} to ${batchEndDateStr}) ---`);
        console.error(' Database Error:', batchError.message);
        console.error(' DB Error Code:', batchError.code);
        // Log detailed error to history and re-throw to stop the process
        await connection.query(`
          UPDATE public.calculate_history
          SET status = 'failed',
              end_time = NOW(),
              duration_seconds = $1::integer,
              error_message = $2::text,
              additional_info = additional_info || jsonb_build_object('failed_batch', $3::integer, 'failed_date_range', $4::text)
          WHERE id = $5::integer;
        `, [
          Math.round((Date.now() - overallStartTime) / 1000),
          `Batch ${currentBatchNum} failed: ${batchError.message} (Code: ${batchError.code || 'N/A'})`,
          currentBatchNum,
          `${batchStartDateStr} to ${batchEndDateStr}`,
          calculateHistoryId
        ]);
        throw batchError; // Stop execution
      }

      currentBatchNum++;
      // Optional delay between batches
      // await new Promise(resolve => setTimeout(resolve, 500));

    } // End while loop

    // --- Final Outcome ---
    const finalStatus = isCancelled ? 'cancelled' : 'completed';
    const finalMessage = isCancelled
      ? `Calculation stopped after completing batch ${currentBatchNum - 1}.`
      : 'Historical snapshots backfill completed successfully.';
    const finalDurationSec = Math.round((Date.now() - overallStartTime) / 1000);

    console.log(`\n--- Backfill ${finalStatus.toUpperCase()} ---`);
    console.log(finalMessage);
    console.log(`Total duration: ${progress.formatElapsedTime(overallStartTime)}`);

    // Update history record (status is a VARCHAR column in the table created above, so cast to text)
    await connection.query(`
      UPDATE public.calculate_history
      SET status = $1::text, end_time = NOW(), duration_seconds = $2::integer, error_message = $3
      WHERE id = $4::integer;
    `, [finalStatus, finalDurationSec, (isCancelled ? 'User cancelled' : null), calculateHistoryId]);

    if (!isCancelled) {
      progress.clearProgress(); // Clear progress state only on successful completion
    } else {
      progress.outputProgress({ // Final cancelled status update
        status: 'cancelled',
        operation: finalMessage,
        currentBatch: currentBatchNum - 1,
        totalBatches: totalBatches,
        totalDays: totalDays,
        processedDays: processedDaysTotal,
        elapsed: progress.formatElapsedTime(overallStartTime),
        remaining: 'Cancelled',
        rate: 0,
        historyId: calculateHistoryId
      });
    }

    return { success: true, status: finalStatus, message: finalMessage, duration: finalDurationSec };

  } catch (error) {
    console.error('\n--- Backfill encountered an unrecoverable error ---');
    console.error(error.message);
    const finalDurationSec = Math.round((Date.now() - overallStartTime) / 1000);

    // Update history if possible
    if (connection && calculateHistoryId) {
      try {
        await connection.query(`
          UPDATE public.calculate_history
          SET status = $1::text, end_time = NOW(), duration_seconds = $2::integer, error_message = $3::text
          WHERE id = $4::integer;
        `, [
          isCancelled ? 'cancelled' : 'failed',
          finalDurationSec,
          error.message,
          calculateHistoryId
        ]);
      } catch (histError) {
        console.error('Failed to update history record with error state:', histError);
      }
    } else {
      console.error('Could not update history record (no ID or connection).');
    }

    // Report the last completed batch, or the batch before the starting one if the loop never ran
    const batchNumForError = currentBatchNum > cmdStartBatch ? currentBatchNum - 1 : cmdStartBatch - 1;

    // Report the failure via the progress utility (single object argument, matching its signature)
    try {
      const progressData = {
        status: 'failed',
        operation: 'Backfill failed',
        message: error.message,
        currentBatch: batchNumForError,
        totalBatches: totalBatches,
        totalDays: totalDays,
        processedDays: processedDaysTotal,
        elapsed: progress.formatElapsedTime(overallStartTime),
        remaining: 'Failed',
        rate: 0,
        historyId: calculateHistoryId
      };
      progress.outputProgress(progressData);
    } catch (progressError) {
      console.error('Failed to report progress:', progressError);
    }

    return { success: false, status: 'failed', error: error.message, duration: finalDurationSec };

  } finally {
    if (connection) {
      console.log('Releasing database connection.');
      connection.release();
    }
    // Close the pool only if this script is meant to run standalone;
    // if it is part of a larger app, the app should manage pool closure.
    // console.log('Closing database pool.');
    // await closePool();
  }
}

// --- Script Execution ---

// Parse command-line arguments
const args = process.argv.slice(2);
let cmdStartDateArg, cmdEndDateArg, cmdStartBatchArg = 1; // Default start batch is 1

for (let i = 0; i < args.length; i++) {
  if (args[i] === '--start-date' && args[i + 1]) cmdStartDateArg = args[++i];
  else if (args[i] === '--end-date' && args[i + 1]) cmdEndDateArg = args[++i];
  else if (args[i] === '--start-batch' && args[i + 1]) cmdStartBatchArg = parseInt(args[++i], 10);
}

if (isNaN(cmdStartBatchArg) || cmdStartBatchArg < 1) {
  console.warn(`Invalid --start-batch value. Defaulting to 1.`);
  cmdStartBatchArg = 1;
}
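
// To resume an interrupted run, pass the batch after the one recorded in
// calculate_history.additional_info.last_completed_batch, e.g. (illustrative):
//   node backfill_snapshots.js --start-batch 8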

// Run the backfill process
backfillSnapshots(cmdStartDateArg, cmdEndDateArg, cmdStartBatchArg)
  .then(result => {
    if (result.success) {
      console.log(`\n✅ ${result.message} (Duration: ${result.duration}s)`);
      process.exitCode = 0; // Success
    } else {
      console.error(`\n❌ Backfill failed: ${result.error || 'Unknown error'} (Duration: ${result.duration}s)`);
      process.exitCode = 1; // Failure
    }
  })
  .catch(err => {
    console.error('\n❌ Unexpected error during backfill execution:', err);
    process.exitCode = 1; // Failure
  })
  .finally(async () => {
    // Ensure the pool is closed when run standalone
    console.log('Backfill script finished. Closing pool.');
    await closePool(); // Make sure closePool exists and works in your db utils
    process.exit(process.exitCode); // Exit with the appropriate code
  });