const path = require('path');
const fs = require('fs');
const os = require('os'); // For detecting CPU cores (currently unused; kept because other parts of the file may be out of view)

// Get the base directory (the directory containing the inventory-server folder)
const baseDir = path.resolve(__dirname, '../../..');

// Load environment variables from the inventory-server directory
require('dotenv').config({ path: path.resolve(__dirname, '../..', '.env') });

// Configure statement timeout (30 minutes)
const PG_STATEMENT_TIMEOUT_MS = 1800000;

// Fail fast on programming errors so the process never hangs in a broken state.
process.on('uncaughtException', (error) => {
  console.error('Uncaught Exception:', error);
  process.exit(1);
});

process.on('unhandledRejection', (reason, promise) => {
  console.error('Unhandled Rejection at:', promise, 'reason:', reason);
  process.exit(1);
});

// Load progress module and expose its helpers globally so they are available
// everywhere in this process without re-requiring.
const progress = require('../utils/progress');
global.formatElapsedTime = progress.formatElapsedTime;
global.estimateRemaining = progress.estimateRemaining;
global.calculateRate = progress.calculateRate;
global.outputProgress = progress.outputProgress;
global.clearProgress = progress.clearProgress;
global.getProgress = progress.getProgress;
global.logError = progress.logError;

// Load database module
const { getConnection, closePool } = require('../utils/db');

// --- Cancellation handling ---------------------------------------------------
let isCancelled = false;
let runningQueryPromise = null;

/**
 * Mark the run as cancelled and ask PostgreSQL to cancel any long-running
 * query issued by this application (application_name = 'populate_metrics').
 * Registered as the SIGTERM/SIGINT handler; pg_cancel_backend() interrupts
 * the server-side query, so the awaiting caller rejects and flows into its
 * normal error path.
 *
 * @returns {{success: boolean, message: string}} status object (ignored when
 *          invoked as a signal handler)
 */
function cancelCalculation() {
  if (!isCancelled) {
    isCancelled = true;
    console.log('Calculation has been cancelled by user');

    // Store the query promise to potentially cancel it
    const queryToCancel = runningQueryPromise;
    if (queryToCancel) {
      console.log('Attempting to cancel the running query...');
    }

    // Ask the server to cancel any of our queries running for more than
    // 5 seconds. Deliberately fire-and-forget: we only log failures here.
    try {
      const connection = getConnection();
      connection.then(async (conn) => {
        try {
          // Identify and cancel long-running queries from our application.
          // The NOT LIKE clause keeps this statement from cancelling itself.
          await conn.query(`
            SELECT pg_cancel_backend(pid)
            FROM pg_stat_activity
            WHERE query_start < now() - interval '5 seconds'
              AND application_name = 'populate_metrics'
              AND query NOT LIKE '%pg_cancel_backend%'
          `);
          conn.release();
        } catch (err) {
          console.error('Error during force cancellation:', err);
          conn.release();
        }
      }).catch(err => {
        console.error('Could not get connection for cancellation:', err);
      });
    } catch (err) {
      console.error('Failed to terminate running queries:', err);
    }
  }
  return { success: true, message: 'Calculation has been cancelled' };
}

// Handle termination signals for cancellation
process.on('SIGTERM', cancelCalculation);
process.on('SIGINT', cancelCalculation);

/**
 * Rebuild daily_product_snapshots and product_metrics with a single inline
 * SQL script.
 *
 * NOTE(review): this function is never called in this file — the actual run
 * reads populate_initial_product_metrics.sql from disk instead. Kept in case
 * it is referenced from code outside this view; confirm before deleting.
 *
 * NOTE(review): in PostgreSQL, data-modifying CTEs cannot see each other's
 * effects on target tables — MetricsInsert's joins/subqueries against
 * public.daily_product_snapshots read the table as it was at statement start
 * (i.e. freshly truncated), NOT the rows SnapshotInsert adds. If this query
 * is ever put back into use, that likely needs to be split into separate
 * statements — verify against the external SQL file.
 *
 * @param {object} client - a pg client/connection with a query() method
 * @param {Function} onProgress - progress callback (currently unused)
 * @returns {Promise} resolves with the result of the final SELECT COUNT(*)
 */
const calculateInitialMetrics = (client, onProgress) => {
  return client.query(`
    -- Truncate the existing metrics tables to ensure clean data
    TRUNCATE TABLE public.daily_product_snapshots;
    TRUNCATE TABLE public.product_metrics;

    -- First let's create daily snapshots for all products with order activity
    WITH SalesData AS (
      SELECT p.pid, p.sku, o.date::date AS order_date,
        -- Count orders to ensure we only include products with real activity
        COUNT(o.id) as order_count,
        -- Aggregate Sales (Quantity > 0, Status not Canceled/Returned)
        COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN o.quantity ELSE 0 END), 0) AS units_sold,
        COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN o.price * o.quantity ELSE 0 END), 0.00) AS gross_revenue_unadjusted,
        COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN o.discount ELSE 0 END), 0.00) AS discounts,
        COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN COALESCE(o.costeach, p.landing_cost_price, p.cost_price) * o.quantity ELSE 0 END), 0.00) AS cogs,
        COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN p.regular_price * o.quantity ELSE 0 END), 0.00) AS gross_regular_revenue,
        -- Aggregate Returns (Quantity < 0 or Status = Returned)
        COALESCE(SUM(CASE WHEN o.quantity < 0 OR COALESCE(o.status, 'pending') = 'returned' THEN ABS(o.quantity) ELSE 0 END), 0) AS units_returned,
        COALESCE(SUM(CASE WHEN o.quantity < 0 OR COALESCE(o.status, 'pending') = 'returned' THEN o.price * ABS(o.quantity) ELSE 0 END), 0.00) AS returns_revenue
      FROM public.products p
      LEFT JOIN public.orders o ON p.pid = o.pid
      GROUP BY p.pid, p.sku, o.date::date
      HAVING COUNT(o.id) > 0 -- Only include products with actual orders
    ),
    ReceivingData AS (
      SELECT r.pid, r.received_date::date AS receiving_date,
        -- Count receiving documents to ensure we only include products with real activity
        COUNT(DISTINCT r.receiving_id) as receiving_count,
        -- Calculate received quantity for this day
        SUM(r.received_quantity) AS units_received,
        -- Calculate received cost for this day
        SUM(r.received_quantity * r.unit_cost) AS cost_received
      FROM public.receivings r
      GROUP BY r.pid, r.received_date::date
      HAVING COUNT(DISTINCT r.receiving_id) > 0 OR SUM(r.received_quantity) > 0
    ),
    -- Get current stock quantities
    StockData AS (
      SELECT p.pid, p.stock_quantity,
        COALESCE(p.landing_cost_price, p.cost_price, 0.00) as effective_cost_price,
        COALESCE(p.price, 0.00) as current_price,
        COALESCE(p.regular_price, 0.00) as current_regular_price
      FROM public.products p
    ),
    -- Combine sales and receiving dates to get all activity dates
    DatePidCombos AS (
      SELECT DISTINCT pid, order_date AS activity_date FROM SalesData
      UNION
      SELECT DISTINCT pid, receiving_date FROM ReceivingData
    ),
    -- Insert daily snapshots for all product-date combinations
    SnapshotInsert AS (
      INSERT INTO public.daily_product_snapshots (
        snapshot_date, pid, sku, eod_stock_quantity, eod_stock_cost, eod_stock_retail,
        eod_stock_gross, stockout_flag, units_sold, units_returned, gross_revenue,
        discounts, returns_revenue, net_revenue, cogs, gross_regular_revenue,
        profit, units_received, cost_received, calculation_timestamp
      )
      SELECT
        d.activity_date AS snapshot_date,
        d.pid,
        p.sku,
        -- Use current stock as approximation, since historical stock data is not available
        s.stock_quantity AS eod_stock_quantity,
        s.stock_quantity * s.effective_cost_price AS eod_stock_cost,
        s.stock_quantity * s.current_price AS eod_stock_retail,
        s.stock_quantity * s.current_regular_price AS eod_stock_gross,
        (s.stock_quantity <= 0) AS stockout_flag,
        -- Sales metrics
        COALESCE(sd.units_sold, 0),
        COALESCE(sd.units_returned, 0),
        COALESCE(sd.gross_revenue_unadjusted, 0.00),
        COALESCE(sd.discounts, 0.00),
        COALESCE(sd.returns_revenue, 0.00),
        COALESCE(sd.gross_revenue_unadjusted, 0.00) - COALESCE(sd.discounts, 0.00) AS net_revenue,
        COALESCE(sd.cogs, 0.00),
        COALESCE(sd.gross_regular_revenue, 0.00),
        (COALESCE(sd.gross_revenue_unadjusted, 0.00) - COALESCE(sd.discounts, 0.00)) - COALESCE(sd.cogs, 0.00) AS profit,
        -- Receiving metrics
        COALESCE(rd.units_received, 0),
        COALESCE(rd.cost_received, 0.00),
        now() -- calculation timestamp
      FROM DatePidCombos d
      JOIN public.products p ON d.pid = p.pid
      LEFT JOIN SalesData sd ON d.pid = sd.pid AND d.activity_date = sd.order_date
      LEFT JOIN ReceivingData rd ON d.pid = rd.pid AND d.activity_date = rd.receiving_date
      LEFT JOIN StockData s ON d.pid = s.pid
      RETURNING pid, snapshot_date
    ),
    -- Now build the aggregated product metrics from the daily snapshots
    MetricsInsert AS (
      INSERT INTO public.product_metrics (
        pid, sku, current_stock_quantity, current_stock_cost, current_stock_retail,
        current_stock_msrp, is_out_of_stock, total_units_sold, total_units_returned,
        return_rate, gross_revenue, total_discounts, total_returns, net_revenue,
        total_cogs, total_gross_revenue, total_profit, profit_margin, avg_daily_units,
        reorder_point, reorder_alert, days_of_supply, sales_velocity,
        sales_velocity_score, rank_by_revenue, rank_by_quantity, rank_by_profit,
        total_received_quantity, total_received_cost, last_sold_date, last_received_date,
        days_since_last_sale, days_since_last_received, calculation_timestamp
      )
      SELECT
        p.pid,
        p.sku,
        p.stock_quantity AS current_stock_quantity,
        p.stock_quantity * COALESCE(p.landing_cost_price, p.cost_price, 0) AS current_stock_cost,
        p.stock_quantity * COALESCE(p.price, 0) AS current_stock_retail,
        p.stock_quantity * COALESCE(p.regular_price, 0) AS current_stock_msrp,
        (p.stock_quantity <= 0) AS is_out_of_stock,
        -- Aggregate metrics
        COALESCE(SUM(ds.units_sold), 0) AS total_units_sold,
        COALESCE(SUM(ds.units_returned), 0) AS total_units_returned,
        CASE WHEN COALESCE(SUM(ds.units_sold), 0) > 0
             THEN COALESCE(SUM(ds.units_returned), 0)::float / NULLIF(COALESCE(SUM(ds.units_sold), 0), 0)
             ELSE 0 END AS return_rate,
        COALESCE(SUM(ds.gross_revenue), 0) AS gross_revenue,
        COALESCE(SUM(ds.discounts), 0) AS total_discounts,
        COALESCE(SUM(ds.returns_revenue), 0) AS total_returns,
        COALESCE(SUM(ds.net_revenue), 0) AS net_revenue,
        COALESCE(SUM(ds.cogs), 0) AS total_cogs,
        COALESCE(SUM(ds.gross_regular_revenue), 0) AS total_gross_revenue,
        COALESCE(SUM(ds.profit), 0) AS total_profit,
        CASE WHEN COALESCE(SUM(ds.net_revenue), 0) > 0
             THEN COALESCE(SUM(ds.profit), 0) / NULLIF(COALESCE(SUM(ds.net_revenue), 0), 0)
             ELSE 0 END AS profit_margin,
        -- Calculate average daily units
        COALESCE(AVG(ds.units_sold), 0) AS avg_daily_units,
        -- Calculate reorder point (simplified, can be enhanced with lead time and safety stock)
        CEILING(COALESCE(AVG(ds.units_sold) * 14, 0)) AS reorder_point,
        (p.stock_quantity <= CEILING(COALESCE(AVG(ds.units_sold) * 14, 0))) AS reorder_alert,
        -- Days of supply based on average daily sales
        CASE WHEN COALESCE(AVG(ds.units_sold), 0) > 0
             THEN p.stock_quantity / NULLIF(COALESCE(AVG(ds.units_sold), 0), 0)
             ELSE NULL END AS days_of_supply,
        -- Sales velocity (average units sold per day over last 30 days)
        (SELECT COALESCE(AVG(recent.units_sold), 0)
         FROM public.daily_product_snapshots recent
         WHERE recent.pid = p.pid
           AND recent.snapshot_date >= CURRENT_DATE - INTERVAL '30 days'
        ) AS sales_velocity,
        -- Placeholder for sales velocity score (can be calculated based on velocity)
        0 AS sales_velocity_score,
        -- Will be updated later by ranking procedure
        0 AS rank_by_revenue,
        0 AS rank_by_quantity,
        0 AS rank_by_profit,
        -- Receiving data
        COALESCE(SUM(ds.units_received), 0) AS total_received_quantity,
        COALESCE(SUM(ds.cost_received), 0) AS total_received_cost,
        -- Date metrics
        (SELECT MAX(sd.snapshot_date) FROM public.daily_product_snapshots sd
         WHERE sd.pid = p.pid AND sd.units_sold > 0
        ) AS last_sold_date,
        (SELECT MAX(rd.snapshot_date) FROM public.daily_product_snapshots rd
         WHERE rd.pid = p.pid AND rd.units_received > 0
        ) AS last_received_date,
        -- Calculate days since last sale/received
        CASE WHEN (SELECT MAX(sd.snapshot_date) FROM public.daily_product_snapshots sd
                   WHERE sd.pid = p.pid AND sd.units_sold > 0) IS NOT NULL
             THEN (CURRENT_DATE - (SELECT MAX(sd.snapshot_date) FROM public.daily_product_snapshots sd
                                   WHERE sd.pid = p.pid AND sd.units_sold > 0))::integer
             ELSE NULL END AS days_since_last_sale,
        CASE WHEN (SELECT MAX(rd.snapshot_date) FROM public.daily_product_snapshots rd
                   WHERE rd.pid = p.pid AND rd.units_received > 0) IS NOT NULL
             THEN (CURRENT_DATE - (SELECT MAX(rd.snapshot_date) FROM public.daily_product_snapshots rd
                                   WHERE rd.pid = p.pid AND rd.units_received > 0))::integer
             ELSE NULL END AS days_since_last_received,
        now() -- calculation timestamp
      FROM public.products p
      LEFT JOIN public.daily_product_snapshots ds ON p.pid = ds.pid
      GROUP BY p.pid, p.sku, p.stock_quantity, p.landing_cost_price, p.cost_price, p.price, p.regular_price
    )
    -- Update the calculate_status table
    INSERT INTO public.calculate_status (module_name, last_calculation_timestamp)
    VALUES ('daily_snapshots', now()), ('product_metrics', now())
    ON CONFLICT (module_name) DO UPDATE SET last_calculation_timestamp = now();

    -- Finally, update the ranks for products
    UPDATE public.product_metrics pm
    SET rank_by_revenue = rev_ranks.rank
    FROM (
      SELECT pid, RANK() OVER (ORDER BY net_revenue DESC) AS rank
      FROM public.product_metrics
      WHERE net_revenue > 0
    ) rev_ranks
    WHERE pm.pid = rev_ranks.pid;

    UPDATE public.product_metrics pm
    SET rank_by_quantity = qty_ranks.rank
    FROM (
      SELECT pid, RANK() OVER (ORDER BY total_units_sold DESC) AS rank
      FROM public.product_metrics
      WHERE total_units_sold > 0
    ) qty_ranks
    WHERE pm.pid = qty_ranks.pid;

    UPDATE public.product_metrics pm
    SET rank_by_profit = profit_ranks.rank
    FROM (
      SELECT pid, RANK() OVER (ORDER BY total_profit DESC) AS rank
      FROM public.product_metrics
      WHERE total_profit > 0
    ) profit_ranks
    WHERE pm.pid = profit_ranks.pid;

    -- Return count of products with metrics
    SELECT COUNT(*) AS product_count FROM public.product_metrics
  `);
};

/**
 * Run the full initial product-metrics population:
 *   1. Ensure calculate_status exists and mark stale 'running' history rows cancelled.
 *   2. Record a new 'running' row in calculate_history.
 *   3. ANALYZE the main tables and apply session-level tuning (best effort).
 *   4. Execute populate_initial_product_metrics.sql read from this directory.
 *   5. Record completion/failure in calculate_history and report progress throughout.
 *
 * @returns {Promise<{success: boolean, message?: string, error?: string, duration: number}>}
 *          structured outcome; never rejects for expected DB failures (they are
 *          caught and returned as { success: false }).
 */
async function populateInitialMetrics() {
  let connection;
  const startTime = Date.now();
  let calculateHistoryId;

  // Helper: emit a progress record with the shared boilerplate filled in.
  // Callers override any field (status, current, remaining, message, ...).
  const reportProgress = (fields) => {
    global.outputProgress({
      status: 'running',
      total: 100,
      rate: 0,
      elapsed: global.formatElapsedTime(startTime),
      timing: {
        start_time: new Date(startTime).toISOString(),
        end_time: new Date().toISOString(),
        elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
      },
      historyId: calculateHistoryId,
      ...fields
    });
  };

  try {
    connection = await getConnection({
      // Performance-related settings; application_name lets cancelCalculation
      // find and cancel our queries via pg_stat_activity.
      application_name: 'populate_metrics',
      statement_timeout: PG_STATEMENT_TIMEOUT_MS, // 30 min timeout per statement
    });

    // Ensure the calculate_status table exists and has the correct structure
    await connection.query(`
      CREATE TABLE IF NOT EXISTS calculate_status (
        module_name TEXT PRIMARY KEY,
        last_calculation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
      )
    `);

    // Clean up any previously running calculation that never finished
    await connection.query(`
      UPDATE calculate_history
      SET status = 'cancelled',
          end_time = NOW(),
          duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER,
          error_message = 'Previous calculation was not completed properly'
      WHERE status = 'running'
        AND additional_info->>'type' = 'populate_initial_metrics'
    `);

    // Create history record for this calculation
    const historyResult = await connection.query(`
      INSERT INTO calculate_history (
        start_time, status, additional_info
      ) VALUES (
        NOW(), 'running',
        jsonb_build_object(
          'type', 'populate_initial_metrics',
          'sql_file', 'populate_initial_product_metrics.sql'
        )
      ) RETURNING id
    `);
    calculateHistoryId = historyResult.rows[0].id;

    // Initialize progress
    reportProgress({
      operation: 'Starting initial product metrics population',
      current: 0,
      percentage: '0',
      elapsed: '0s',
      remaining: 'Calculating... (this may take a while)'
    });

    // Prepare the database - analyze tables
    reportProgress({
      operation: 'Analyzing database tables for better query performance',
      current: 2,
      percentage: '2',
      remaining: 'Analyzing...'
    });

    // Analyze tables and apply tuning for the big query.
    // FIX: the original used SET LOCAL here, but node-postgres executes a
    // multi-statement query in its own implicit transaction, so the LOCAL
    // settings reverted before the main query ran. Session-level SET persists
    // for this connection; the pool is closed when the script exits, so the
    // settings cannot leak into unrelated work.
    await connection.query(`
      -- Analyze tables for better query planning
      ANALYZE public.products;
      ANALYZE public.purchase_orders;
      ANALYZE public.daily_product_snapshots;
      ANALYZE public.orders;

      -- Enable parallel operations for this session
      SET enable_parallel_append = on;
      SET enable_parallel_hash = on;
      SET max_parallel_workers_per_gather = 4;
      -- Larger work memory for complex sorts/joins
      SET work_mem = '128MB';
    `).catch(err => {
      // Non-fatal: the main query still runs, just without the tuned settings.
      console.warn('Failed to analyze tables (non-fatal):', err.message);
    });

    // Execute the SQL query
    reportProgress({
      operation: 'Executing initial metrics SQL query',
      current: 5,
      percentage: '5',
      remaining: 'Calculating... (this could take several hours with 150M+ records)'
    });

    // Read the SQL file
    const sqlFilePath = path.resolve(__dirname, 'populate_initial_product_metrics.sql');
    console.log('Base directory:', baseDir);
    console.log('Script directory:', __dirname);
    console.log('SQL file path:', sqlFilePath);
    console.log('Current working directory:', process.cwd());

    if (!fs.existsSync(sqlFilePath)) {
      throw new Error(`SQL file not found at ${sqlFilePath}`);
    }

    // Read and normalize line endings; trailing whitespace after the final
    // "END $$;" would confuse the ending check below.
    const sqlQuery = fs.readFileSync(sqlFilePath, 'utf8')
      .replace(/\r\n/g, '\n') // Handle Windows endings
      .replace(/\r/g, '\n')   // Handle old Mac endings
      .trim();

    // Log structural diagnostics AFTER cleaning
    console.log('SQL Query length (cleaned):', sqlQuery.length);
    console.log('SQL Query structure validation:');
    console.log('- Contains DO block:', sqlQuery.includes('DO $$') || sqlQuery.includes('DO $')); // Check both types of tag start
    console.log('- Contains BEGIN:', sqlQuery.includes('BEGIN'));
    console.log('- Contains END:', sqlQuery.includes('END $$;') || sqlQuery.includes('END $')); // Check both types of tag end
    console.log('- First 50 chars:', JSON.stringify(sqlQuery.slice(0, 50)));
    console.log('- Last 100 chars (cleaned):', JSON.stringify(sqlQuery.slice(-100)));

    // Final check to ensure clean SQL ending
    if (!sqlQuery.endsWith('END $$;')) {
      console.warn('WARNING: SQL does not end with "END $$;". This might cause issues.');
      console.log('Exact ending:', JSON.stringify(sqlQuery.slice(-20)));
    }

    // Execute the script
    console.log('Starting initial product metrics population...');

    // Track the query promise for potential cancellation.
    // FIX: clear it in finally so it is also reset when the query rejects.
    runningQueryPromise = connection.query({ text: sqlQuery, rowMode: 'array' });
    try {
      await runningQueryPromise;
    } finally {
      runningQueryPromise = null;
    }

    // Update progress to 100%
    reportProgress({
      status: 'complete',
      operation: 'Initial product metrics population complete',
      current: 100,
      percentage: '100',
      remaining: '0s'
    });

    // Update history with completion
    await connection.query(`
      UPDATE calculate_history
      SET end_time = NOW(), duration_seconds = $1, status = 'completed'
      WHERE id = $2
    `, [Math.round((Date.now() - startTime) / 1000), calculateHistoryId]);

    // Clear progress file on successful completion
    global.clearProgress();

    return {
      success: true,
      message: 'Initial product metrics population completed successfully',
      duration: Math.round((Date.now() - startTime) / 1000)
    };
  } catch (error) {
    const endTime = Date.now();
    const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);

    // Enhanced error logging (pg error fields when available)
    console.error('Error details:', {
      message: error.message,
      code: error.code,
      hint: error.hint,
      position: error.position,
      detail: error.detail,
      where: error.where ? error.where.substring(0, 500) + '...' : undefined, // Truncate to avoid huge logs
      severity: error.severity,
      file: error.file,
      line: error.line,
      routine: error.routine
    });

    // Update history with error.
    // FIX: guarded so a failure here cannot throw out of the catch block and
    // skip the progress report and the structured { success: false } return.
    if (connection && calculateHistoryId) {
      try {
        await connection.query(`
          UPDATE calculate_history
          SET end_time = NOW(), duration_seconds = $1, status = $2, error_message = $3
          WHERE id = $4
        `, [
          totalElapsedSeconds,
          isCancelled ? 'cancelled' : 'failed',
          error.message,
          calculateHistoryId
        ]);
      } catch (updateErr) {
        console.error('Failed to record failure in calculate_history:', updateErr);
      }
    }

    if (isCancelled) {
      reportProgress({
        status: 'cancelled',
        operation: 'Calculation cancelled',
        current: 50,
        percentage: '50',
        remaining: null
      });
    } else {
      reportProgress({
        status: 'error',
        operation: 'Error during initial product metrics population',
        message: error.message,
        current: 0,
        percentage: '0',
        remaining: null
      });
    }

    console.error('Error during initial product metrics population:', error);
    return {
      success: false,
      error: error.message,
      duration: totalElapsedSeconds
    };
  } finally {
    if (connection) {
      connection.release();
    }
    await closePool();
  }
}

// Start population process; the exit code reflects the structured outcome.
populateInitialMetrics()
  .then(result => {
    if (result.success) {
      console.log(`Initial product metrics population completed successfully in ${result.duration} seconds`);
      process.exit(0);
    } else {
      console.error(`Initial product metrics population failed: ${result.error}`);
      process.exit(1);
    }
  })
  .catch(err => {
    console.error('Unexpected error:', err);
    process.exit(1);
  });