440 lines
17 KiB
JavaScript
440 lines
17 KiB
JavaScript
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
|
|
const { getConnection } = require('./utils/db');
|
|
|
|
/**
 * Statements that drop every temporary table created while forecasting.
 * Sent as one multi-statement query, both at the end of a successful run and
 * from the `finally` handler so failed or cancelled runs leave nothing behind.
 */
const FORECAST_TEMP_TABLE_CLEANUP_SQL = `
  DROP TABLE IF EXISTS temp_forecast_dates;
  DROP TABLE IF EXISTS temp_daily_sales;
  DROP TABLE IF EXISTS temp_product_stats;
  DROP TABLE IF EXISTS temp_category_sales;
  DROP TABLE IF EXISTS temp_category_stats;
`;

/**
 * Resolve the `isCancelled` argument to a boolean.
 *
 * A plain value is a snapshot taken by the caller; a function is invoked on
 * every call so the caller can signal cancellation while the run is underway.
 *
 * @param {boolean|function(): boolean} isCancelled - Flag or flag provider.
 * @returns {boolean} True when the run should stop.
 */
function isForecastCancelled(isCancelled) {
  return typeof isCancelled === 'function' ? Boolean(isCancelled()) : Boolean(isCancelled);
}

/**
 * Emit one progress message through `outputProgress`.
 *
 * @param {object} details
 * @param {string} details.status - Value for the `status` field.
 * @param {string} details.operation - Human-readable description of the step.
 * @param {number} details.startTime - Run start, as accepted by `new Date()`.
 * @param {number} details.current - Value for the `current` field.
 * @param {number} details.totalProducts - Value for the `total` field.
 * @param {boolean} details.withRemaining - When false, `remaining` is null
 *   and `estimateRemaining` is not called.
 * @returns {void}
 */
function reportForecastProgress({ status, operation, startTime, current, totalProducts, withRemaining }) {
  // Guard the division: a zero total would otherwise be reported as "NaN" or "Infinity".
  const percent = totalProducts > 0 ? (current / totalProducts) * 100 : 0;

  outputProgress({
    status,
    operation,
    current,
    total: totalProducts,
    elapsed: formatElapsedTime(startTime),
    remaining: withRemaining ? estimateRemaining(startTime, current, totalProducts) : null,
    rate: calculateRate(startTime, current),
    percentage: percent.toFixed(1),
    timing: {
      start_time: new Date(startTime).toISOString(),
      end_time: new Date().toISOString(),
      elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
    }
  });
}

/**
 * Rebuild product-level and category-level sales forecasts from the last
 * 90 days of non-cancelled orders and record the run in `calculate_status`.
 *
 * Progress is reported through `outputProgress` after each step; the reported
 * `current` value is set to a fixed fraction of `totalProducts` (92% .. 100%)
 * rather than an actual per-product count.
 *
 * @param {number} startTime - Run start, as accepted by `new Date()`.
 * @param {number} totalProducts - Total used for progress reporting.
 * @param {number} [processedCount=0] - Progress value reported before the
 *   first step completes.
 * @param {boolean|function(): boolean} [isCancelled=false] - Cancellation
 *   flag. A boolean is only meaningful at entry (it cannot change afterwards);
 *   a function is re-evaluated at every checkpoint between steps.
 * @returns {Promise<{processedProducts: number, processedOrders: number,
 *   processedPurchaseOrders: number, success: boolean}>} `success` is true
 *   only when every step ran; a cancelled run resolves with `success: false`.
 * @throws Re-throws any error raised by a query after passing it to `logError`.
 */
async function calculateSalesForecasts(startTime, totalProducts, processedCount = 0, isCancelled = false) {
  const connection = await getConnection();
  let success = false;
  let processedOrders = 0;
  let current = processedCount;

  const cancelled = () => isForecastCancelled(isCancelled);

  // Snapshot of the counters in the shape every exit path returns.
  const snapshot = () => ({
    processedProducts: current,
    processedOrders,
    processedPurchaseOrders: 0,
    success
  });

  const report = (operation, status = 'running', withRemaining = true) =>
    reportForecastProgress({ status, operation, startTime, current, totalProducts, withRemaining });

  // Move the reported progress to a fixed fraction of the total and announce it.
  const advance = (fraction, operation) => {
    current = Math.floor(totalProducts * fraction);
    report(operation);
  };

  try {
    if (cancelled()) {
      report('Sales forecasts calculation cancelled', 'cancelled', false);
      return snapshot();
    }

    // Get order count that will be processed
    const orderCount = await connection.query(`
      SELECT COUNT(*) as count
      FROM orders o
      WHERE o.canceled = false
      AND o.date >= CURRENT_DATE - INTERVAL '90 days'
    `);
    processedOrders = parseInt(orderCount.rows[0].count, 10);

    report('Starting sales forecasts calculation');

    // First, create a temporary table for forecast dates
    await connection.query(`
      CREATE TEMPORARY TABLE IF NOT EXISTS temp_forecast_dates (
        forecast_date DATE,
        day_of_week INT,
        month INT,
        PRIMARY KEY (forecast_date)
      )
    `);

    // NOTE(review): the a x b number generator below yields n = 0..29 (30 rows),
    // so LIMIT 31 never truncates - confirm whether a 30- or 31-day horizon is intended.
    await connection.query(`
      INSERT INTO temp_forecast_dates
      SELECT
        CURRENT_DATE + (n || ' days')::INTERVAL as forecast_date,
        EXTRACT(DOW FROM CURRENT_DATE + (n || ' days')::INTERVAL) + 1 as day_of_week,
        EXTRACT(MONTH FROM CURRENT_DATE + (n || ' days')::INTERVAL) as month
      FROM (
        SELECT a.n + b.n * 10 as n
        FROM
          (SELECT 0 as n UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION
           SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) a,
          (SELECT 0 as n UNION SELECT 1 UNION SELECT 2) b
        ORDER BY n
        LIMIT 31
      ) numbers
    `);

    advance(0.92, 'Forecast dates prepared, calculating daily sales stats');
    if (cancelled()) return snapshot();

    // Create temporary table for daily sales stats
    await connection.query(`
      CREATE TEMPORARY TABLE temp_daily_sales AS
      SELECT
        o.pid,
        EXTRACT(DOW FROM o.date) + 1 as day_of_week,
        SUM(o.quantity) as daily_quantity,
        SUM(o.price * o.quantity) as daily_revenue,
        COUNT(DISTINCT DATE(o.date)) as day_count
      FROM orders o
      WHERE o.canceled = false
      AND o.date >= CURRENT_DATE - INTERVAL '90 days'
      GROUP BY o.pid, EXTRACT(DOW FROM o.date) + 1
    `);

    advance(0.94, 'Daily sales stats calculated, preparing product stats');
    if (cancelled()) return snapshot();

    // Create temporary table for product stats
    await connection.query(`
      CREATE TEMPORARY TABLE temp_product_stats AS
      SELECT
        pid,
        AVG(daily_revenue) as overall_avg_revenue,
        SUM(day_count) as total_days
      FROM temp_daily_sales
      GROUP BY pid
    `);

    advance(0.96, 'Product stats prepared, calculating product-level forecasts');
    if (cancelled()) return snapshot();

    // Calculate product-level forecasts
    await connection.query(`
      INSERT INTO sales_forecasts (
        pid,
        forecast_date,
        forecast_quantity,
        confidence_level,
        created_at
      )
      WITH daily_stats AS (
        SELECT
          ds.pid,
          AVG(ds.daily_quantity) as avg_daily_qty,
          STDDEV(ds.daily_quantity) as std_daily_qty,
          COUNT(DISTINCT ds.day_count) as data_points,
          SUM(ds.day_count) as total_days,
          AVG(ds.daily_revenue) as avg_daily_revenue,
          STDDEV(ds.daily_revenue) as std_daily_revenue,
          MIN(ds.daily_quantity) as min_daily_qty,
          MAX(ds.daily_quantity) as max_daily_qty,
          -- Calculate variance without using LAG
          COALESCE(
            STDDEV(ds.daily_quantity) / NULLIF(AVG(ds.daily_quantity), 0),
            0
          ) as daily_variance_ratio
        FROM temp_daily_sales ds
        GROUP BY ds.pid
        HAVING AVG(ds.daily_quantity) > 0
      )
      SELECT
        ds.pid,
        fd.forecast_date,
        GREATEST(0,
          ROUND(
            ds.avg_daily_qty *
            (1 + COALESCE(sf.seasonality_factor, 0))
          )
        ) as forecast_quantity,
        CASE
          WHEN ds.total_days >= 60 AND ds.daily_variance_ratio < 0.5 THEN 90
          WHEN ds.total_days >= 60 THEN 85
          WHEN ds.total_days >= 30 AND ds.daily_variance_ratio < 0.5 THEN 80
          WHEN ds.total_days >= 30 THEN 75
          WHEN ds.total_days >= 14 AND ds.daily_variance_ratio < 0.5 THEN 70
          WHEN ds.total_days >= 14 THEN 65
          ELSE 60
        END as confidence_level,
        NOW() as created_at
      FROM daily_stats ds
      JOIN temp_product_stats ps ON ds.pid = ps.pid
      CROSS JOIN temp_forecast_dates fd
      LEFT JOIN sales_seasonality sf ON fd.month = sf.month
      GROUP BY ds.pid, fd.forecast_date, ps.overall_avg_revenue, sf.seasonality_factor,
        ds.avg_daily_qty, ds.std_daily_qty, ds.avg_daily_qty, ds.total_days, ds.daily_variance_ratio
      ON CONFLICT (pid, forecast_date) DO UPDATE
      SET
        forecast_quantity = EXCLUDED.forecast_quantity,
        confidence_level = EXCLUDED.confidence_level,
        created_at = NOW()
    `);

    advance(0.98, 'Product forecasts calculated, preparing category stats');
    if (cancelled()) return snapshot();

    // Create temporary table for category stats
    await connection.query(`
      CREATE TEMPORARY TABLE temp_category_sales AS
      SELECT
        pc.cat_id,
        EXTRACT(DOW FROM o.date) + 1 as day_of_week,
        SUM(o.quantity) as daily_quantity,
        SUM(o.price * o.quantity) as daily_revenue,
        COUNT(DISTINCT DATE(o.date)) as day_count
      FROM orders o
      JOIN product_categories pc ON o.pid = pc.pid
      WHERE o.canceled = false
      AND o.date >= CURRENT_DATE - INTERVAL '90 days'
      GROUP BY pc.cat_id, EXTRACT(DOW FROM o.date) + 1
    `);

    await connection.query(`
      CREATE TEMPORARY TABLE temp_category_stats AS
      SELECT
        cat_id,
        AVG(daily_revenue) as overall_avg_revenue,
        SUM(day_count) as total_days
      FROM temp_category_sales
      GROUP BY cat_id
    `);

    advance(0.99, 'Category stats prepared, calculating category-level forecasts');
    if (cancelled()) return snapshot();

    // Calculate category-level forecasts
    await connection.query(`
      INSERT INTO category_forecasts (
        category_id,
        forecast_date,
        forecast_units,
        forecast_revenue,
        confidence_level,
        created_at
      )
      SELECT
        cs.cat_id::bigint as category_id,
        fd.forecast_date,
        GREATEST(0,
          ROUND(AVG(cs.daily_quantity) *
            (1 + COALESCE(sf.seasonality_factor, 0)))
        ) as forecast_units,
        GREATEST(0,
          COALESCE(
            CASE
              WHEN SUM(cs.day_count) >= 4 THEN AVG(cs.daily_revenue)
              ELSE ct.overall_avg_revenue
            END *
            (1 + COALESCE(sf.seasonality_factor, 0)),
            0
          )
        ) as forecast_revenue,
        CASE
          WHEN ct.total_days >= 60 THEN 90
          WHEN ct.total_days >= 30 THEN 80
          WHEN ct.total_days >= 14 THEN 70
          ELSE 60
        END as confidence_level,
        NOW() as created_at
      FROM temp_category_sales cs
      JOIN temp_category_stats ct ON cs.cat_id = ct.cat_id
      CROSS JOIN temp_forecast_dates fd
      LEFT JOIN sales_seasonality sf ON fd.month = sf.month
      GROUP BY
        cs.cat_id,
        fd.forecast_date,
        ct.overall_avg_revenue,
        ct.total_days,
        sf.seasonality_factor,
        sf.month
      HAVING AVG(cs.daily_quantity) > 0
      ON CONFLICT (category_id, forecast_date) DO UPDATE
      SET
        forecast_units = EXCLUDED.forecast_units,
        forecast_revenue = EXCLUDED.forecast_revenue,
        confidence_level = EXCLUDED.confidence_level,
        created_at = NOW()
    `);

    // Clean up temporary tables
    await connection.query(FORECAST_TEMP_TABLE_CLEANUP_SQL);

    advance(1.0, 'Category forecasts calculated and temporary tables cleaned up');

    // If we get here, everything completed successfully
    success = true;

    // Update calculate_status
    await connection.query(`
      INSERT INTO calculate_status (module_name, last_calculation_timestamp)
      VALUES ('sales_forecasts', NOW())
      ON CONFLICT (module_name) DO UPDATE
      SET last_calculation_timestamp = NOW()
    `);

    return snapshot();
  } catch (error) {
    logError(error, 'Error calculating sales forecasts');
    throw error;
  } finally {
    if (connection) {
      try {
        // Ensure temporary tables are cleaned up on every exit path,
        // including cancellation and errors raised mid-run.
        await connection.query(FORECAST_TEMP_TABLE_CLEANUP_SQL);
      } catch (err) {
        // Best effort: a cleanup failure must not mask the original outcome.
        console.error('Error cleaning up temporary tables:', err);
      }
      connection.release();
    }
  }
}
|
|
|
|
// Expose the forecast calculation as this module's only export.
module.exports = calculateSalesForecasts;
|