Make calculations incremental

This commit is contained in:
2025-02-09 13:35:44 -05:00
parent 2a6a0d0a87
commit 843ce71506
9 changed files with 935 additions and 1654 deletions

View File

@@ -4,19 +4,50 @@ const { getConnection } = require('./utils/db');
async function calculateSalesForecasts(startTime, totalProducts, processedCount = 0, isCancelled = false) {
const connection = await getConnection();
let success = false;
let processedOrders = 0;
const BATCH_SIZE = 5000;
try {
// Get last calculation timestamp
const [lastCalc] = await connection.query(`
SELECT last_calculation_timestamp
FROM calculate_status
WHERE module_name = 'sales_forecasts'
`);
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
// Get total count of products needing updates
const [productCount] = await connection.query(`
SELECT COUNT(DISTINCT p.pid) as count
FROM products p
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
WHERE p.visible = true
AND (
p.updated > ?
OR o.id IS NOT NULL
)
`, [lastCalculationTime, lastCalculationTime]);
const totalProductsToUpdate = productCount[0].count;
if (totalProductsToUpdate === 0) {
console.log('No products need forecast updates');
return {
processedProducts: 0,
processedOrders: 0,
processedPurchaseOrders: 0,
success: true
};
}
if (isCancelled) {
outputProgress({
status: 'cancelled',
operation: 'Sales forecasts calculation cancelled',
operation: 'Sales forecast calculation cancelled',
current: processedCount,
total: totalProducts,
total: totalProductsToUpdate,
elapsed: formatElapsedTime(startTime),
remaining: null,
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
percentage: ((processedCount / totalProductsToUpdate) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
@@ -31,24 +62,15 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
};
}
// Get order count that will be processed
const [orderCount] = await connection.query(`
SELECT COUNT(*) as count
FROM orders o
WHERE o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
`);
processedOrders = orderCount[0].count;
outputProgress({
status: 'running',
operation: 'Starting sales forecasts calculation',
operation: 'Starting sales forecast calculation',
current: processedCount,
total: totalProducts,
total: totalProductsToUpdate,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
remaining: estimateRemaining(startTime, processedCount, totalProductsToUpdate),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
percentage: ((processedCount / totalProductsToUpdate) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
@@ -56,365 +78,141 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
}
});
// First, create a temporary table for forecast dates
await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_forecast_dates (
forecast_date DATE,
day_of_week INT,
month INT,
PRIMARY KEY (forecast_date)
)
`);
// Process in batches
let lastPid = '';
while (true) {
if (isCancelled) break;
await connection.query(`
INSERT INTO temp_forecast_dates
SELECT
DATE_ADD(CURRENT_DATE, INTERVAL n DAY) as forecast_date,
DAYOFWEEK(DATE_ADD(CURRENT_DATE, INTERVAL n DAY)) as day_of_week,
MONTH(DATE_ADD(CURRENT_DATE, INTERVAL n DAY)) as month
FROM (
SELECT a.N + b.N * 10 as n
FROM
(SELECT 0 as N UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION
SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) a,
(SELECT 0 as N UNION SELECT 1 UNION SELECT 2) b
ORDER BY n
LIMIT 31
) numbers
`);
const [batch] = await connection.query(`
SELECT DISTINCT p.pid
FROM products p
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
WHERE p.visible = true
AND p.pid > ?
AND (
p.updated > ?
OR o.id IS NOT NULL
)
ORDER BY p.pid
LIMIT ?
`, [lastCalculationTime, lastPid, lastCalculationTime, BATCH_SIZE]);
processedCount = Math.floor(totalProducts * 0.92);
outputProgress({
status: 'running',
operation: 'Forecast dates prepared, calculating daily sales stats',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (batch.length === 0) break;
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Create temporary table for daily sales stats
await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_daily_sales AS
SELECT
o.pid,
DAYOFWEEK(o.date) as day_of_week,
SUM(o.quantity) as daily_quantity,
SUM(o.price * o.quantity) as daily_revenue,
COUNT(DISTINCT DATE(o.date)) as day_count
FROM orders o
WHERE o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
GROUP BY o.pid, DAYOFWEEK(o.date)
`);
processedCount = Math.floor(totalProducts * 0.94);
outputProgress({
status: 'running',
operation: 'Daily sales stats calculated, preparing product stats',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Create temporary table for product stats
await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_stats AS
SELECT
pid,
AVG(daily_revenue) as overall_avg_revenue,
SUM(day_count) as total_days
FROM temp_daily_sales
GROUP BY pid
`);
processedCount = Math.floor(totalProducts * 0.96);
outputProgress({
status: 'running',
operation: 'Product stats prepared, calculating product-level forecasts',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Calculate product-level forecasts
await connection.query(`
INSERT INTO sales_forecasts (
pid,
forecast_date,
forecast_units,
forecast_revenue,
confidence_level,
last_calculated_at
)
WITH daily_stats AS (
SELECT
ds.pid,
AVG(ds.daily_quantity) as avg_daily_qty,
STDDEV(ds.daily_quantity) as std_daily_qty,
COUNT(DISTINCT ds.day_count) as data_points,
SUM(ds.day_count) as total_days,
AVG(ds.daily_revenue) as avg_daily_revenue,
STDDEV(ds.daily_revenue) as std_daily_revenue,
MIN(ds.daily_quantity) as min_daily_qty,
MAX(ds.daily_quantity) as max_daily_qty,
-- Calculate variance without using LAG
COALESCE(
STDDEV(ds.daily_quantity) / NULLIF(AVG(ds.daily_quantity), 0),
0
) as daily_variance_ratio
FROM temp_daily_sales ds
GROUP BY ds.pid
HAVING AVG(ds.daily_quantity) > 0
)
SELECT
ds.pid,
fd.forecast_date,
GREATEST(0,
ROUND(
ds.avg_daily_qty *
(1 + COALESCE(sf.seasonality_factor, 0)) *
CASE
WHEN ds.std_daily_qty / NULLIF(ds.avg_daily_qty, 0) > 1.5 THEN 0.85
WHEN ds.std_daily_qty / NULLIF(ds.avg_daily_qty, 0) > 1.0 THEN 0.9
WHEN ds.std_daily_qty / NULLIF(ds.avg_daily_qty, 0) > 0.5 THEN 0.95
ELSE 1.0
END,
2
)
) as forecast_units,
GREATEST(0,
ROUND(
COALESCE(
CASE
WHEN ds.data_points >= 4 THEN ds.avg_daily_revenue
ELSE ps.overall_avg_revenue
END *
(1 + COALESCE(sf.seasonality_factor, 0)) *
CASE
WHEN ds.std_daily_revenue / NULLIF(ds.avg_daily_revenue, 0) > 1.5 THEN 0.85
WHEN ds.std_daily_revenue / NULLIF(ds.avg_daily_revenue, 0) > 1.0 THEN 0.9
WHEN ds.std_daily_revenue / NULLIF(ds.avg_daily_revenue, 0) > 0.5 THEN 0.95
ELSE 1.0
END,
0
),
2
)
) as forecast_revenue,
CASE
WHEN ds.total_days >= 60 AND ds.daily_variance_ratio < 0.5 THEN 90
WHEN ds.total_days >= 60 THEN 85
WHEN ds.total_days >= 30 AND ds.daily_variance_ratio < 0.5 THEN 80
WHEN ds.total_days >= 30 THEN 75
WHEN ds.total_days >= 14 AND ds.daily_variance_ratio < 0.5 THEN 70
WHEN ds.total_days >= 14 THEN 65
ELSE 60
END as confidence_level,
NOW() as last_calculated_at
FROM daily_stats ds
JOIN temp_product_stats ps ON ds.pid = ps.pid
CROSS JOIN temp_forecast_dates fd
LEFT JOIN sales_seasonality sf ON fd.month = sf.month
GROUP BY ds.pid, fd.forecast_date, ps.overall_avg_revenue, sf.seasonality_factor
ON DUPLICATE KEY UPDATE
forecast_units = VALUES(forecast_units),
forecast_revenue = VALUES(forecast_revenue),
confidence_level = VALUES(confidence_level),
last_calculated_at = NOW()
`);
processedCount = Math.floor(totalProducts * 0.98);
outputProgress({
status: 'running',
operation: 'Product forecasts calculated, preparing category stats',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Create temporary table for category stats
await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_category_sales AS
SELECT
pc.cat_id,
DAYOFWEEK(o.date) as day_of_week,
SUM(o.quantity) as daily_quantity,
SUM(o.price * o.quantity) as daily_revenue,
COUNT(DISTINCT DATE(o.date)) as day_count
FROM orders o
JOIN product_categories pc ON o.pid = pc.pid
WHERE o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
GROUP BY pc.cat_id, DAYOFWEEK(o.date)
`);
await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_category_stats AS
SELECT
cat_id,
AVG(daily_revenue) as overall_avg_revenue,
SUM(day_count) as total_days
FROM temp_category_sales
GROUP BY cat_id
`);
processedCount = Math.floor(totalProducts * 0.99);
outputProgress({
status: 'running',
operation: 'Category stats prepared, calculating category-level forecasts',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Calculate category-level forecasts
await connection.query(`
INSERT INTO category_forecasts (
category_id,
forecast_date,
forecast_units,
forecast_revenue,
confidence_level,
last_calculated_at
)
SELECT
cs.cat_id as category_id,
fd.forecast_date,
GREATEST(0,
AVG(cs.daily_quantity) *
(1 + COALESCE(sf.seasonality_factor, 0))
) as forecast_units,
GREATEST(0,
COALESCE(
// Calculate forecasts for this batch
await connection.query(`
INSERT INTO sales_forecasts (
pid,
forecast_date,
forecast_units,
forecast_revenue,
confidence_level,
last_calculated_at
)
WITH historical_sales AS (
SELECT
o.pid,
DATE(o.date) as sale_date,
SUM(o.quantity) as daily_quantity,
SUM(o.quantity * o.price) as daily_revenue
FROM orders o
WHERE o.canceled = false
AND o.pid IN (?)
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 180 DAY)
GROUP BY o.pid, DATE(o.date)
),
sales_stats AS (
SELECT
pid,
AVG(daily_quantity) as avg_daily_units,
AVG(daily_revenue) as avg_daily_revenue,
STDDEV(daily_quantity) as std_daily_units,
COUNT(*) as days_with_sales,
MIN(sale_date) as first_sale,
MAX(sale_date) as last_sale
FROM historical_sales
GROUP BY pid
),
recent_trend AS (
SELECT
h.pid,
AVG(h.daily_quantity) as recent_avg_units,
AVG(h.daily_revenue) as recent_avg_revenue
FROM historical_sales h
WHERE h.sale_date >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
GROUP BY h.pid
),
confidence_calc AS (
SELECT
s.pid,
LEAST(100, GREATEST(0, ROUND(
(s.days_with_sales / 180.0 * 50) + -- Up to 50 points for history length
(CASE
WHEN s.std_daily_units = 0 OR s.avg_daily_units = 0 THEN 0
WHEN (s.std_daily_units / s.avg_daily_units) <= 0.5 THEN 30
WHEN (s.std_daily_units / s.avg_daily_units) <= 1.0 THEN 20
WHEN (s.std_daily_units / s.avg_daily_units) <= 2.0 THEN 10
ELSE 0
END) + -- Up to 30 points for consistency
(CASE
WHEN DATEDIFF(CURRENT_DATE, s.last_sale) <= 7 THEN 20
WHEN DATEDIFF(CURRENT_DATE, s.last_sale) <= 30 THEN 10
ELSE 0
END) -- Up to 20 points for recency
))) as confidence_level
FROM sales_stats s
)
(SELECT
s.pid,
DATE_ADD(CURRENT_DATE, INTERVAL n.days DAY) as forecast_date,
GREATEST(0, ROUND(
CASE
WHEN SUM(cs.day_count) >= 4 THEN AVG(cs.daily_revenue)
ELSE ct.overall_avg_revenue
END *
(1 + COALESCE(sf.seasonality_factor, 0)) *
(0.95 + (RAND() * 0.1)),
0
)
) as forecast_revenue,
CASE
WHEN ct.total_days >= 60 THEN 90
WHEN ct.total_days >= 30 THEN 80
WHEN ct.total_days >= 14 THEN 70
ELSE 60
END as confidence_level,
NOW() as last_calculated_at
FROM temp_category_sales cs
JOIN temp_category_stats ct ON cs.cat_id = ct.cat_id
CROSS JOIN temp_forecast_dates fd
LEFT JOIN sales_seasonality sf ON fd.month = sf.month
GROUP BY cs.cat_id, fd.forecast_date, ct.overall_avg_revenue, ct.total_days, sf.seasonality_factor
HAVING AVG(cs.daily_quantity) > 0
ON DUPLICATE KEY UPDATE
forecast_units = VALUES(forecast_units),
forecast_revenue = VALUES(forecast_revenue),
confidence_level = VALUES(confidence_level),
last_calculated_at = NOW()
`);
WHEN s.days_with_sales >= n.days THEN
COALESCE(t.recent_avg_units, s.avg_daily_units)
ELSE s.avg_daily_units * (s.days_with_sales / n.days)
END
)) as forecast_units,
GREATEST(0, ROUND(
CASE
WHEN s.days_with_sales >= n.days THEN
COALESCE(t.recent_avg_revenue, s.avg_daily_revenue)
ELSE s.avg_daily_revenue * (s.days_with_sales / n.days)
END
, 2)) as forecast_revenue,
c.confidence_level,
NOW() as last_calculated_at
FROM sales_stats s
CROSS JOIN (
SELECT 30 as days UNION SELECT 60 UNION SELECT 90
) n
LEFT JOIN recent_trend t ON s.pid = t.pid
LEFT JOIN confidence_calc c ON s.pid = c.pid)
ON DUPLICATE KEY UPDATE
forecast_units = VALUES(forecast_units),
forecast_revenue = VALUES(forecast_revenue),
confidence_level = VALUES(confidence_level),
last_calculated_at = NOW()
`, [batch.map(row => row.pid)]);
// Clean up temporary tables
await connection.query(`
DROP TEMPORARY TABLE IF EXISTS temp_forecast_dates;
DROP TEMPORARY TABLE IF EXISTS temp_daily_sales;
DROP TEMPORARY TABLE IF EXISTS temp_product_stats;
DROP TEMPORARY TABLE IF EXISTS temp_category_sales;
DROP TEMPORARY TABLE IF EXISTS temp_category_stats;
`);
lastPid = batch[batch.length - 1].pid;
processedCount += batch.length;
processedCount = Math.floor(totalProducts * 1.0);
outputProgress({
status: 'running',
operation: 'Category forecasts calculated and temporary tables cleaned up',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
outputProgress({
status: 'running',
operation: 'Processing sales forecast batch',
current: processedCount,
total: totalProductsToUpdate,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProductsToUpdate),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProductsToUpdate) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
}
// If we get here, everything completed successfully
success = true;
@@ -428,7 +226,7 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
return {
processedProducts: processedCount,
processedOrders,
processedOrders: 0,
processedPurchaseOrders: 0,
success
};