const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db');

/**
 * Rebuilds product-level and category-level sales forecasts.
 *
 * Reads non-canceled rows from `orders` dated within the last 90 days (plus
 * `product_categories` and `sales_seasonality`), upserts into
 * `sales_forecasts` and `category_forecasts`, and finally stamps
 * `calculate_status` for the 'sales_forecasts' module. Intermediate results
 * live in temporary tables that are dropped on every exit path.
 *
 * The SQL uses `::INTERVAL` casts and `ON CONFLICT`, so the connection is
 * presumably a PostgreSQL client (results are read via `.rows`) - confirm
 * against ./utils/db.
 *
 * @param {number} startTime - Run start in epoch milliseconds; used for elapsed/rate reporting.
 * @param {number} totalProducts - Denominator for progress reporting.
 * @param {number} [processedCount=0] - Progress counter value on entry.
 * @param {boolean|Function} [isCancelled=false] - Cancellation flag. A boolean is
 *   evaluated as before; a function is called at each checkpoint so that a
 *   cancellation requested mid-run can actually be observed.
 * @returns {Promise<{processedProducts: number, processedOrders: number, processedPurchaseOrders: number, success: boolean}>}
 *   `success` is true only when every step, including the status update, completed.
 * @throws Re-throws any query error after passing it to `logError`.
 */
async function calculateSalesForecasts(startTime, totalProducts, processedCount = 0, isCancelled = false) {
  const connection = await getConnection();
  let success = false;
  let processedOrders = 0;
  // Local progress counter so the caller-visible parameter is never reassigned.
  let current = processedCount;

  const dropTempTablesSql = `
    DROP TABLE IF EXISTS temp_forecast_dates;
    DROP TABLE IF EXISTS temp_daily_sales;
    DROP TABLE IF EXISTS temp_product_stats;
    DROP TABLE IF EXISTS temp_category_sales;
    DROP TABLE IF EXISTS temp_category_stats;
  `;

  // A plain boolean can never change during the run, so a predicate is also
  // accepted; booleans keep their original truthiness semantics.
  const cancelRequested = () =>
    (typeof isCancelled === 'function' ? Boolean(isCancelled()) : Boolean(isCancelled));

  // Guard against totalProducts being 0/undefined, which would otherwise emit "NaN".
  const formatPercentage = () => {
    const ratio = (current / totalProducts) * 100;
    return (Number.isFinite(ratio) ? ratio : 0).toFixed(1);
  };

  // Emits one progress record; only the 'cancelled' record reports no remaining estimate.
  const reportProgress = (status, operation) => {
    outputProgress({
      status,
      operation,
      current,
      total: totalProducts,
      elapsed: formatElapsedTime(startTime),
      remaining: status === 'cancelled' ? null : estimateRemaining(startTime, current, totalProducts),
      rate: calculateRate(startTime, current),
      percentage: formatPercentage(),
      timing: {
        start_time: new Date(startTime).toISOString(),
        end_time: new Date().toISOString(),
        elapsed_seconds: Math.round((Date.now() - startTime) / 1000),
      },
    });
  };

  // Moves the counter to a fixed fraction of the total and reports it.
  const advanceTo = (fraction, operation) => {
    current = Math.floor(totalProducts * fraction);
    reportProgress('running', operation);
  };

  // Result object shared by every return path.
  const buildResult = () => ({
    processedProducts: current,
    processedOrders,
    processedPurchaseOrders: 0,
    success,
  });

  try {
    if (cancelRequested()) {
      reportProgress('cancelled', 'Sales forecasts calculation cancelled');
      return buildResult();
    }

    // Get order count that will be processed
    const orderCount = await connection.query(`
      SELECT COUNT(*) as count
      FROM orders o
      WHERE o.canceled = false
      AND o.date >= CURRENT_DATE - INTERVAL '90 days'
    `);
    processedOrders = Number.parseInt(orderCount.rows[0].count, 10);

    reportProgress('running', 'Starting sales forecasts calculation');

    // First, create a temporary table for forecast dates
    await connection.query(`
      CREATE TEMPORARY TABLE IF NOT EXISTS temp_forecast_dates (
        forecast_date DATE,
        day_of_week INT,
        month INT,
        PRIMARY KEY (forecast_date)
      )
    `);

    // NOTE(review): the digit cross join below only yields offsets 0..29
    // (a: 0-9, b: 0-2), so LIMIT 31 never takes effect and 30 dates are
    // produced. Left unchanged; confirm the intended horizon.
    await connection.query(`
      INSERT INTO temp_forecast_dates
      SELECT
        CURRENT_DATE + (n || ' days')::INTERVAL as forecast_date,
        EXTRACT(DOW FROM CURRENT_DATE + (n || ' days')::INTERVAL) + 1 as day_of_week,
        EXTRACT(MONTH FROM CURRENT_DATE + (n || ' days')::INTERVAL) as month
      FROM (
        SELECT a.n + b.n * 10 as n
        FROM
          (SELECT 0 as n UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION
           SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) a,
          (SELECT 0 as n UNION SELECT 1 UNION SELECT 2) b
        ORDER BY n
        LIMIT 31
      ) numbers
    `);

    advanceTo(0.92, 'Forecast dates prepared, calculating daily sales stats');
    if (cancelRequested()) return buildResult();

    // Create temporary table for daily sales stats
    await connection.query(`
      CREATE TEMPORARY TABLE temp_daily_sales AS
      SELECT
        o.pid,
        EXTRACT(DOW FROM o.date) + 1 as day_of_week,
        SUM(o.quantity) as daily_quantity,
        SUM(o.price * o.quantity) as daily_revenue,
        COUNT(DISTINCT DATE(o.date)) as day_count
      FROM orders o
      WHERE o.canceled = false
      AND o.date >= CURRENT_DATE - INTERVAL '90 days'
      GROUP BY o.pid, EXTRACT(DOW FROM o.date) + 1
    `);

    advanceTo(0.94, 'Daily sales stats calculated, preparing product stats');
    if (cancelRequested()) return buildResult();

    // Create temporary table for product stats
    await connection.query(`
      CREATE TEMPORARY TABLE temp_product_stats AS
      SELECT
        pid,
        AVG(daily_revenue) as overall_avg_revenue,
        SUM(day_count) as total_days
      FROM temp_daily_sales
      GROUP BY pid
    `);

    advanceTo(0.96, 'Product stats prepared, calculating product-level forecasts');
    if (cancelRequested()) return buildResult();

    // Calculate product-level forecasts
    await connection.query(`
      INSERT INTO sales_forecasts (
        pid,
        forecast_date,
        forecast_quantity,
        confidence_level,
        created_at
      )
      WITH daily_stats AS (
        SELECT
          ds.pid,
          AVG(ds.daily_quantity) as avg_daily_qty,
          STDDEV(ds.daily_quantity) as std_daily_qty,
          COUNT(DISTINCT ds.day_count) as data_points,
          SUM(ds.day_count) as total_days,
          AVG(ds.daily_revenue) as avg_daily_revenue,
          STDDEV(ds.daily_revenue) as std_daily_revenue,
          MIN(ds.daily_quantity) as min_daily_qty,
          MAX(ds.daily_quantity) as max_daily_qty,
          -- Calculate variance without using LAG
          COALESCE(
            STDDEV(ds.daily_quantity) / NULLIF(AVG(ds.daily_quantity), 0),
            0
          ) as daily_variance_ratio
        FROM temp_daily_sales ds
        GROUP BY ds.pid
        HAVING AVG(ds.daily_quantity) > 0
      )
      SELECT
        ds.pid,
        fd.forecast_date,
        GREATEST(0,
          ROUND(
            ds.avg_daily_qty *
            (1 + COALESCE(sf.seasonality_factor, 0))
          )
        ) as forecast_quantity,
        CASE
          WHEN ds.total_days >= 60 AND ds.daily_variance_ratio < 0.5 THEN 90
          WHEN ds.total_days >= 60 THEN 85
          WHEN ds.total_days >= 30 AND ds.daily_variance_ratio < 0.5 THEN 80
          WHEN ds.total_days >= 30 THEN 75
          WHEN ds.total_days >= 14 AND ds.daily_variance_ratio < 0.5 THEN 70
          WHEN ds.total_days >= 14 THEN 65
          ELSE 60
        END as confidence_level,
        NOW() as created_at
      FROM daily_stats ds
      JOIN temp_product_stats ps ON ds.pid = ps.pid
      CROSS JOIN temp_forecast_dates fd
      LEFT JOIN sales_seasonality sf ON fd.month = sf.month
      GROUP BY ds.pid, fd.forecast_date, ps.overall_avg_revenue, sf.seasonality_factor, ds.avg_daily_qty, ds.std_daily_qty, ds.avg_daily_qty, ds.total_days, ds.daily_variance_ratio
      ON CONFLICT (pid, forecast_date) DO UPDATE
      SET
        forecast_quantity = EXCLUDED.forecast_quantity,
        confidence_level = EXCLUDED.confidence_level,
        created_at = NOW()
    `);

    advanceTo(0.98, 'Product forecasts calculated, preparing category stats');
    if (cancelRequested()) return buildResult();

    // Create temporary table for category stats
    await connection.query(`
      CREATE TEMPORARY TABLE temp_category_sales AS
      SELECT
        pc.cat_id,
        EXTRACT(DOW FROM o.date) + 1 as day_of_week,
        SUM(o.quantity) as daily_quantity,
        SUM(o.price * o.quantity) as daily_revenue,
        COUNT(DISTINCT DATE(o.date)) as day_count
      FROM orders o
      JOIN product_categories pc ON o.pid = pc.pid
      WHERE o.canceled = false
      AND o.date >= CURRENT_DATE - INTERVAL '90 days'
      GROUP BY pc.cat_id, EXTRACT(DOW FROM o.date) + 1
    `);

    await connection.query(`
      CREATE TEMPORARY TABLE temp_category_stats AS
      SELECT
        cat_id,
        AVG(daily_revenue) as overall_avg_revenue,
        SUM(day_count) as total_days
      FROM temp_category_sales
      GROUP BY cat_id
    `);

    advanceTo(0.99, 'Category stats prepared, calculating category-level forecasts');
    if (cancelRequested()) return buildResult();

    // Calculate category-level forecasts
    await connection.query(`
      INSERT INTO category_forecasts (
        category_id,
        forecast_date,
        forecast_units,
        forecast_revenue,
        confidence_level,
        created_at
      )
      SELECT
        cs.cat_id::bigint as category_id,
        fd.forecast_date,
        GREATEST(0,
          ROUND(AVG(cs.daily_quantity) * (1 + COALESCE(sf.seasonality_factor, 0)))
        ) as forecast_units,
        GREATEST(0,
          COALESCE(
            CASE
              WHEN SUM(cs.day_count) >= 4 THEN AVG(cs.daily_revenue)
              ELSE ct.overall_avg_revenue
            END *
            (1 + COALESCE(sf.seasonality_factor, 0)),
            0
          )
        ) as forecast_revenue,
        CASE
          WHEN ct.total_days >= 60 THEN 90
          WHEN ct.total_days >= 30 THEN 80
          WHEN ct.total_days >= 14 THEN 70
          ELSE 60
        END as confidence_level,
        NOW() as created_at
      FROM temp_category_sales cs
      JOIN temp_category_stats ct ON cs.cat_id = ct.cat_id
      CROSS JOIN temp_forecast_dates fd
      LEFT JOIN sales_seasonality sf ON fd.month = sf.month
      GROUP BY
        cs.cat_id,
        fd.forecast_date,
        ct.overall_avg_revenue,
        ct.total_days,
        sf.seasonality_factor,
        sf.month
      HAVING AVG(cs.daily_quantity) > 0
      ON CONFLICT (category_id, forecast_date) DO UPDATE
      SET
        forecast_units = EXCLUDED.forecast_units,
        forecast_revenue = EXCLUDED.forecast_revenue,
        confidence_level = EXCLUDED.confidence_level,
        created_at = NOW()
    `);

    // Clean up temporary tables
    await connection.query(dropTempTablesSql);

    advanceTo(1.0, 'Category forecasts calculated and temporary tables cleaned up');

    // If we get here, everything completed successfully
    success = true;

    // Update calculate_status
    await connection.query(`
      INSERT INTO calculate_status (module_name, last_calculation_timestamp)
      VALUES ('sales_forecasts', NOW())
      ON CONFLICT (module_name) DO UPDATE
      SET last_calculation_timestamp = NOW()
    `);

    return buildResult();
  } catch (error) {
    success = false;
    logError(error, 'Error calculating sales forecasts');
    throw error;
  } finally {
    if (connection) {
      try {
        // Ensure temporary tables are cleaned up on every exit path
        // (early cancellation and errors included) before the connection is released.
        await connection.query(dropTempTablesSql);
      } catch (err) {
        // Best-effort cleanup: report the failure but still release the connection.
        console.error('Error cleaning up temporary tables:', err);
      }
      connection.release();
    }
  }
}

module.exports = calculateSalesForecasts;