const path = require('path');
const fs = require('fs');
const os = require('os'); // For detecting number of CPU cores
const progress = require('./utils/progress'); // Progress-reporting helpers
const { getConnection, closePool } = require('./utils/db'); // PG pool helpers

// --- Configuration ---
const BATCH_SIZE_DAYS = 1; // Days processed per database function call
const SQL_FUNCTION_FILE = path.resolve(__dirname, 'backfill_historical_snapshots.sql');
const LOG_PROGRESS_INTERVAL_MS = 5000; // Reserved: console progress cadence (currently unused)
const HISTORY_TYPE = 'backfill_snapshots'; // Identifier stored in calculate_history.additional_info
const MAX_WORKERS = Math.max(1, Math.floor(os.cpus().length / 2)); // Reserved for parallel mode (not implemented)
const USE_PARALLEL = false; // Reserved: set to true to enable parallel processing (not implemented)
const PG_STATEMENT_TIMEOUT_MS = 1800000; // 30 minutes max per query

const MS_PER_DAY = 1000 * 60 * 60 * 24;

// --- Cancellation Handling ---
let isCancelled = false;
let runningQueryPromise = null; // Tracks the active query (informational; not used to kill the backend)

/**
 * Marks the run as cancelled. The batch currently executing is allowed to
 * finish; the backend query is NOT forcefully terminated.
 */
function requestCancellation() {
  if (!isCancelled) {
    isCancelled = true;
    console.warn('\nCancellation requested. Finishing current batch then stopping...');
    // Note: We are NOT forcefully cancelling the backend query anymore.
  }
}
process.on('SIGINT', requestCancellation); // Handle Ctrl+C
process.on('SIGTERM', requestCancellation); // Handle termination signals

/** Formats a Date as YYYY-MM-DD using UTC (matches how date strings are parsed below). */
function toUtcDateString(d) {
  return d.toISOString().split('T')[0];
}

// --- Main Backfill Function ---
/**
 * Backfills daily product snapshots by repeatedly invoking the
 * `backfill_daily_snapshots_range_final` SQL function, one batch of
 * BATCH_SIZE_DAYS days at a time, with progress and history bookkeeping
 * in `public.calculate_history`.
 *
 * @param {string} [cmdStartDate] YYYY-MM-DD start; auto-detected from import tables when omitted.
 * @param {string} [cmdEndDate]   YYYY-MM-DD end; auto-detected and clamped to yesterday when omitted.
 * @param {number} [cmdStartBatch=1] 1-based batch number to resume from.
 * @returns {Promise<{success: boolean, status?: string, message?: string, error?: string, duration: number}>}
 */
async function backfillSnapshots(cmdStartDate, cmdEndDate, cmdStartBatch = 1) {
  let connection;
  const overallStartTime = Date.now();
  let calculateHistoryId = null;
  let processedDaysTotal = 0; // Total days completed across batches in this run
  let currentBatchNum = cmdStartBatch > 0 ? cmdStartBatch : 1;
  let totalBatches = 0;
  let totalDays = 0;

  console.log(`Starting snapshot backfill process...`);
  console.log(`SQL Function definition file: ${SQL_FUNCTION_FILE}`);
  if (!fs.existsSync(SQL_FUNCTION_FILE)) {
    console.error(`FATAL: SQL file not found at ${SQL_FUNCTION_FILE}`);
    process.exit(1); // Exit early if file doesn't exist
  }

  try {
    // Set up a connection with performance-related settings.
    connection = await getConnection({
      application_name: 'backfill_snapshots',
      statement_timeout: PG_STATEMENT_TIMEOUT_MS, // 30 min timeout per statement
      // These parameters may need to be configured in your database:
      // work_mem: '1GB',
      // maintenance_work_mem: '2GB',
      // temp_buffers: '1GB',
    });
    console.log('Database connection acquired.');

    // --- Ensure Function Exists ---
    console.log('Ensuring database function is up-to-date...');
    try {
      const sqlFunctionDef = fs.readFileSync(SQL_FUNCTION_FILE, 'utf8');
      if (!sqlFunctionDef.includes('CREATE OR REPLACE FUNCTION backfill_daily_snapshots_range_final')) {
        throw new Error(`SQL file ${SQL_FUNCTION_FILE} does not seem to contain the function definition.`);
      }
      await connection.query(sqlFunctionDef); // Execute the whole file
      console.log('Database function `backfill_daily_snapshots_range_final` created/updated.');

      // Refresh planner statistics on the tables the function touches.
      await connection.query(`
        ANALYZE public.products;
        ANALYZE public.imported_daily_inventory;
        ANALYZE public.imported_product_stat_history;
        ANALYZE public.daily_product_snapshots;
        ANALYZE public.imported_product_current_prices;
      `).catch((err) => {
        // Non-fatal if analyze fails
        console.warn('Failed to analyze tables (non-fatal):', err.message);
      });
    } catch (err) {
      console.error(`Error processing SQL function file ${SQL_FUNCTION_FILE}:`, err);
      throw new Error(`Failed to create or replace DB function: ${err.message}`);
    }

    // --- Prepare History Record ---
    console.log('Preparing calculation history record...');
    // Ensure history table exists (optional, could be done elsewhere)
    await connection.query(`
      CREATE TABLE IF NOT EXISTS public.calculate_history (
        id SERIAL PRIMARY KEY,
        start_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        end_time TIMESTAMPTZ,
        duration_seconds INTEGER,
        status VARCHAR(20) NOT NULL, -- e.g., 'running', 'completed', 'failed', 'cancelled'
        error_message TEXT,
        additional_info JSONB -- Store type, file, batch info etc.
      );
    `);

    // Mark previous runs of this type as failed if they were left 'running'
    await connection.query(`
      UPDATE public.calculate_history
      SET status = 'failed', error_message = 'Interrupted by new run.'
      WHERE status = 'running' AND additional_info->>'type' = $1;
    `, [HISTORY_TYPE]);

    // Create new history record
    const historyResult = await connection.query(`
      INSERT INTO public.calculate_history (start_time, status, additional_info)
      VALUES (NOW(), 'running', jsonb_build_object('type', $1::text, 'sql_file', $2::text, 'start_batch', $3::integer))
      RETURNING id;
    `, [HISTORY_TYPE, path.basename(SQL_FUNCTION_FILE), cmdStartBatch]);
    calculateHistoryId = historyResult.rows[0].id;
    console.log(`Calculation history record created with ID: ${calculateHistoryId}`);

    // --- Determine Date Range ---
    console.log('Determining date range...');
    let effectiveStartDate;
    let effectiveEndDate;

    // Use command-line dates if provided, otherwise query DB.
    if (cmdStartDate) {
      effectiveStartDate = cmdStartDate;
    } else {
      const minDateResult = await connection.query(`
        SELECT LEAST(
          COALESCE((SELECT MIN(date) FROM public.imported_daily_inventory WHERE date > '1970-01-01'), CURRENT_DATE),
          COALESCE((SELECT MIN(date) FROM public.imported_product_stat_history WHERE date > '1970-01-01'), CURRENT_DATE)
        )::date as min_date;
      `);
      effectiveStartDate = minDateResult.rows[0]?.min_date || toUtcDateString(new Date()); // Fallback
      console.log(`Auto-detected start date: ${effectiveStartDate}`);
    }

    if (cmdEndDate) {
      effectiveEndDate = cmdEndDate;
    } else {
      const maxDateResult = await connection.query(`
        SELECT GREATEST(
          COALESCE((SELECT MAX(date) FROM public.imported_daily_inventory WHERE date < CURRENT_DATE), '1970-01-01'::date),
          COALESCE((SELECT MAX(date) FROM public.imported_product_stat_history WHERE date < CURRENT_DATE), '1970-01-01'::date)
        )::date as max_date;
      `);
      // Ensure end date is not today or in the future; default to yesterday.
      effectiveEndDate = maxDateResult.rows[0]?.max_date || toUtcDateString(new Date(Date.now() - MS_PER_DAY));
      if (new Date(effectiveEndDate) >= new Date(toUtcDateString(new Date()))) {
        effectiveEndDate = toUtcDateString(new Date(Date.now() - MS_PER_DAY)); // Set to yesterday if >= today
      }
      console.log(`Auto-detected end date: ${effectiveEndDate}`);
    }

    // Validate dates. Bare YYYY-MM-DD strings parse as UTC midnight.
    const dStart = new Date(effectiveStartDate);
    const dEnd = new Date(effectiveEndDate);
    if (isNaN(dStart.getTime()) || isNaN(dEnd.getTime()) || dStart > dEnd) {
      throw new Error(`Invalid date range: Start "${effectiveStartDate}", End "${effectiveEndDate}"`);
    }

    // --- Batch Processing ---
    totalDays = Math.ceil((dEnd - dStart) / MS_PER_DAY) + 1; // Inclusive
    totalBatches = Math.ceil(totalDays / BATCH_SIZE_DAYS);
    console.log(`Target Date Range: ${effectiveStartDate} to ${effectiveEndDate} (${totalDays} days)`);
    console.log(`Total Batches: ${totalBatches} (Batch Size: ${BATCH_SIZE_DAYS} days)`);
    console.log(`Starting from Batch: ${currentBatchNum}`);

    // Initial progress update
    progress.outputProgress({
      status: 'running',
      operation: 'Starting Batch Processing',
      currentBatch: currentBatchNum,
      totalBatches: totalBatches,
      totalDays: totalDays,
      elapsed: '0s',
      remaining: 'Calculating...',
      rate: 0,
      historyId: calculateHistoryId, // Include history ID in the object
    });

    while (currentBatchNum <= totalBatches && !isCancelled) {
      const batchOffset = (currentBatchNum - 1) * BATCH_SIZE_DAYS;
      const batchStartDate = new Date(dStart);
      // FIX: use UTC accessors. The original mixed local-time getDate()/setDate()
      // with UTC toISOString() formatting, shifting every batch date by one day
      // for processes running in negative-UTC-offset timezones.
      batchStartDate.setUTCDate(dStart.getUTCDate() + batchOffset);
      const batchEndDate = new Date(batchStartDate);
      batchEndDate.setUTCDate(batchStartDate.getUTCDate() + BATCH_SIZE_DAYS - 1);

      // Clamp batch end date to the overall effective end date
      if (batchEndDate > dEnd) {
        batchEndDate.setTime(dEnd.getTime());
      }

      const batchStartDateStr = toUtcDateString(batchStartDate);
      const batchEndDateStr = toUtcDateString(batchEndDate);
      const batchStartTime = Date.now();

      console.log(`\n--- Processing Batch ${currentBatchNum} / ${totalBatches} ---`);
      console.log(`  Dates: ${batchStartDateStr} to ${batchEndDateStr}`);

      // Execute the function for the batch
      try {
        progress.outputProgress({
          status: 'running',
          operation: `Executing DB function for batch ${currentBatchNum}...`,
          currentBatch: currentBatchNum,
          totalBatches: totalBatches,
          totalDays: totalDays,
          elapsed: progress.formatElapsedTime(overallStartTime),
          remaining: 'Executing...',
          rate: 0,
          historyId: calculateHistoryId,
        });

        // FIX: the original used SET LOCAL outside a transaction block, which
        // PostgreSQL ignores with a warning, so the hints never took effect.
        // Plain SET applies for the rest of this session/connection.
        await connection.query(
          'SET enable_parallel_append = on; SET enable_parallel_hash = on; SET max_parallel_workers_per_gather = 4;'
        );

        // Store promise in case we need to try and cancel (not forcefully implemented)
        runningQueryPromise = connection.query(
          `SELECT backfill_daily_snapshots_range_final($1::date, $2::date);`,
          [batchStartDateStr, batchEndDateStr]
        );
        try {
          await runningQueryPromise; // Wait for the function call to complete
        } finally {
          runningQueryPromise = null; // Clear even when the query fails
        }

        const daysInThisBatch = Math.ceil((batchEndDate - batchStartDate) / MS_PER_DAY) + 1;
        processedDaysTotal += daysInThisBatch;
        console.log(`  Batch ${currentBatchNum} completed in ${progress.formatElapsedTime(batchStartTime)}.`);

        // --- Update Progress & History ---
        progress.outputProgress({
          status: 'running',
          operation: `Completed batch ${currentBatchNum}`,
          currentBatch: currentBatchNum,
          totalBatches: totalBatches,
          totalDays: totalDays,
          processedDays: processedDaysTotal,
          elapsed: progress.formatElapsedTime(overallStartTime),
          remaining: progress.estimateRemaining(overallStartTime, processedDaysTotal, totalDays),
          rate: progress.calculateRate(overallStartTime, processedDaysTotal),
          batchDuration: progress.formatElapsedTime(batchStartTime),
          historyId: calculateHistoryId,
        });

        // Save checkpoint in history so a later run can resume with --start-batch
        await connection.query(`
          UPDATE public.calculate_history
          SET additional_info = jsonb_set(additional_info, '{last_completed_batch}', $1::jsonb)
                                || jsonb_build_object('last_processed_date', $2::text)
          WHERE id = $3::integer;
        `, [JSON.stringify(currentBatchNum), batchEndDateStr, calculateHistoryId]);
      } catch (batchError) {
        console.error(`\n--- ERROR in Batch ${currentBatchNum} (${batchStartDateStr} to ${batchEndDateStr}) ---`);
        console.error('  Database Error:', batchError.message);
        console.error('  DB Error Code:', batchError.code);

        // Log detailed error to history and re-throw to stop the process
        await connection.query(`
          UPDATE public.calculate_history
          SET status = 'failed', end_time = NOW(), duration_seconds = $1::integer, error_message = $2::text,
              additional_info = additional_info || jsonb_build_object('failed_batch', $3::integer, 'failed_date_range', $4::text)
          WHERE id = $5::integer;
        `, [
          Math.round((Date.now() - overallStartTime) / 1000),
          `Batch ${currentBatchNum} failed: ${batchError.message} (Code: ${batchError.code || 'N/A'})`,
          currentBatchNum,
          `${batchStartDateStr} to ${batchEndDateStr}`,
          calculateHistoryId,
        ]);
        throw batchError; // Stop execution
      }

      currentBatchNum++;
      // Optional delay between batches
      // await new Promise(resolve => setTimeout(resolve, 500));
    } // End while loop

    // --- Final Outcome ---
    const finalStatus = isCancelled ? 'cancelled' : 'completed';
    const finalMessage = isCancelled
      ? `Calculation stopped after completing batch ${currentBatchNum - 1}.`
      : 'Historical snapshots backfill completed successfully.';
    const finalDurationSec = Math.round((Date.now() - overallStartTime) / 1000);

    console.log(`\n--- Backfill ${finalStatus.toUpperCase()} ---`);
    console.log(finalMessage);
    console.log(`Total duration: ${progress.formatElapsedTime(overallStartTime)}`);

    // Update history record.
    // FIX: removed the $1::calculation_status cast — the table created above
    // declares status as VARCHAR(20) and this script never creates that enum
    // type, so the cast failed on a fresh database.
    await connection.query(`
      UPDATE public.calculate_history
      SET status = $1, end_time = NOW(), duration_seconds = $2::integer, error_message = $3
      WHERE id = $4::integer;
    `, [finalStatus, finalDurationSec, (isCancelled ? 'User cancelled' : null), calculateHistoryId]);

    if (!isCancelled) {
      progress.clearProgress(); // Clear progress state only on successful completion
    } else {
      progress.outputProgress({ // Final cancelled status update
        status: 'cancelled',
        operation: finalMessage,
        currentBatch: currentBatchNum - 1,
        totalBatches: totalBatches,
        totalDays: totalDays,
        processedDays: processedDaysTotal,
        elapsed: progress.formatElapsedTime(overallStartTime),
        remaining: 'Cancelled',
        rate: 0,
        historyId: calculateHistoryId,
      });
    }

    return { success: true, status: finalStatus, message: finalMessage, duration: finalDurationSec };
  } catch (error) {
    console.error('\n--- Backfill encountered an unrecoverable error ---');
    console.error(error.message);
    const finalDurationSec = Math.round((Date.now() - overallStartTime) / 1000);

    // Update history if possible (same VARCHAR fix as the success path: no enum cast)
    if (connection && calculateHistoryId) {
      try {
        await connection.query(`
          UPDATE public.calculate_history
          SET status = $1, end_time = NOW(), duration_seconds = $2::integer, error_message = $3::text
          WHERE id = $4::integer;
        `, [isCancelled ? 'cancelled' : 'failed', finalDurationSec, error.message, calculateHistoryId]);
      } catch (histError) {
        console.error("Failed to update history record with error state:", histError);
      }
    } else {
      console.error("Could not update history record (no ID or connection).");
    }

    // Use initialized value or a default if loop never started
    const batchNumForError = currentBatchNum > cmdStartBatch ? currentBatchNum - 1 : cmdStartBatch - 1;

    try {
      progress.outputProgress({
        status: 'failed',
        operation: 'Backfill failed',
        message: error.message,
        currentBatch: batchNumForError,
        totalBatches: totalBatches,
        totalDays: totalDays,
        processedDays: processedDaysTotal,
        elapsed: progress.formatElapsedTime(overallStartTime),
        remaining: 'Failed',
        rate: 0,
        historyId: calculateHistoryId,
      });
    } catch (progressError) {
      console.error('Failed to report progress:', progressError);
    }

    return { success: false, status: 'failed', error: error.message, duration: finalDurationSec };
  } finally {
    if (connection) {
      console.log('Releasing database connection.');
      connection.release();
    }
    // Close pool only if this script is meant to be standalone.
    // If part of a larger app, the app should manage pool closure.
    // console.log('Closing database pool.');
    // await closePool();
  }
}

// --- Script Execution ---
// Parse command-line arguments
const args = process.argv.slice(2);
let cmdStartDateArg;
let cmdEndDateArg;
let cmdStartBatchArg = 1; // Default start batch is 1

for (let i = 0; i < args.length; i++) {
  if (args[i] === '--start-date' && args[i + 1]) cmdStartDateArg = args[++i];
  else if (args[i] === '--end-date' && args[i + 1]) cmdEndDateArg = args[++i];
  else if (args[i] === '--start-batch' && args[i + 1]) cmdStartBatchArg = Number.parseInt(args[++i], 10);
}

if (Number.isNaN(cmdStartBatchArg) || cmdStartBatchArg < 1) {
  console.warn(`Invalid --start-batch value. Defaulting to 1.`);
  cmdStartBatchArg = 1;
}

// Run the backfill process
backfillSnapshots(cmdStartDateArg, cmdEndDateArg, cmdStartBatchArg)
  .then((result) => {
    if (result.success) {
      console.log(`\n✅ ${result.message} (Duration: ${result.duration}s)`);
      process.exitCode = 0; // Success
    } else {
      console.error(`\n❌ Backfill failed: ${result.error || 'Unknown error'} (Duration: ${result.duration}s)`);
      process.exitCode = 1; // Failure
    }
  })
  .catch((err) => {
    console.error('\n❌ Unexpected error during backfill execution:', err);
    process.exitCode = 1; // Failure
  })
  .finally(async () => {
    // Ensure pool is closed if run standalone
    console.log('Backfill script finished. Closing pool.');
    await closePool(); // Make sure closePool exists and works in your db utils
    process.exit(process.exitCode); // Exit with appropriate code
  });