245 lines
10 KiB
JavaScript
245 lines
10 KiB
JavaScript
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
|
|
const { getConnection } = require('./utils/db');
|
|
|
|
async function calculateSalesForecasts(startTime, totalProducts, processedCount = 0, isCancelled = false) {
|
|
const connection = await getConnection();
|
|
let success = false;
|
|
const BATCH_SIZE = 5000;
|
|
|
|
try {
|
|
// Get last calculation timestamp
|
|
const [lastCalc] = await connection.query(`
|
|
SELECT last_calculation_timestamp
|
|
FROM calculate_status
|
|
WHERE module_name = 'sales_forecasts'
|
|
`);
|
|
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
|
|
|
|
// Get total count of products needing updates
|
|
const [productCount] = await connection.query(`
|
|
SELECT COUNT(DISTINCT p.pid) as count
|
|
FROM products p
|
|
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
|
|
WHERE p.visible = true
|
|
AND (
|
|
p.updated > ?
|
|
OR o.id IS NOT NULL
|
|
)
|
|
`, [lastCalculationTime, lastCalculationTime]);
|
|
const totalProductsToUpdate = productCount[0].count;
|
|
|
|
if (totalProductsToUpdate === 0) {
|
|
console.log('No products need forecast updates');
|
|
return {
|
|
processedProducts: 0,
|
|
processedOrders: 0,
|
|
processedPurchaseOrders: 0,
|
|
success: true
|
|
};
|
|
}
|
|
|
|
if (isCancelled) {
|
|
outputProgress({
|
|
status: 'cancelled',
|
|
operation: 'Sales forecast calculation cancelled',
|
|
current: processedCount,
|
|
total: totalProductsToUpdate,
|
|
elapsed: formatElapsedTime(startTime),
|
|
remaining: null,
|
|
rate: calculateRate(startTime, processedCount),
|
|
percentage: ((processedCount / totalProductsToUpdate) * 100).toFixed(1),
|
|
timing: {
|
|
start_time: new Date(startTime).toISOString(),
|
|
end_time: new Date().toISOString(),
|
|
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
}
|
|
});
|
|
return {
|
|
processedProducts: processedCount,
|
|
processedOrders: 0,
|
|
processedPurchaseOrders: 0,
|
|
success
|
|
};
|
|
}
|
|
|
|
outputProgress({
|
|
status: 'running',
|
|
operation: 'Starting sales forecast calculation',
|
|
current: processedCount,
|
|
total: totalProductsToUpdate,
|
|
elapsed: formatElapsedTime(startTime),
|
|
remaining: estimateRemaining(startTime, processedCount, totalProductsToUpdate),
|
|
rate: calculateRate(startTime, processedCount),
|
|
percentage: ((processedCount / totalProductsToUpdate) * 100).toFixed(1),
|
|
timing: {
|
|
start_time: new Date(startTime).toISOString(),
|
|
end_time: new Date().toISOString(),
|
|
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
}
|
|
});
|
|
|
|
// Process in batches
|
|
let lastPid = '';
|
|
while (true) {
|
|
if (isCancelled) break;
|
|
|
|
const [batch] = await connection.query(`
|
|
SELECT DISTINCT p.pid
|
|
FROM products p
|
|
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
|
|
WHERE p.visible = true
|
|
AND p.pid > ?
|
|
AND (
|
|
p.updated > ?
|
|
OR o.id IS NOT NULL
|
|
)
|
|
ORDER BY p.pid
|
|
LIMIT ?
|
|
`, [lastCalculationTime, lastPid, lastCalculationTime, BATCH_SIZE]);
|
|
|
|
if (batch.length === 0) break;
|
|
|
|
// Calculate forecasts for this batch
|
|
await connection.query(`
|
|
INSERT INTO sales_forecasts (
|
|
pid,
|
|
forecast_date,
|
|
forecast_units,
|
|
forecast_revenue,
|
|
confidence_level,
|
|
last_calculated_at
|
|
)
|
|
WITH historical_sales AS (
|
|
SELECT
|
|
o.pid,
|
|
DATE(o.date) as sale_date,
|
|
SUM(o.quantity) as daily_quantity,
|
|
SUM(o.quantity * o.price) as daily_revenue
|
|
FROM orders o
|
|
WHERE o.canceled = false
|
|
AND o.pid IN (?)
|
|
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 180 DAY)
|
|
GROUP BY o.pid, DATE(o.date)
|
|
),
|
|
sales_stats AS (
|
|
SELECT
|
|
pid,
|
|
AVG(daily_quantity) as avg_daily_units,
|
|
AVG(daily_revenue) as avg_daily_revenue,
|
|
STDDEV(daily_quantity) as std_daily_units,
|
|
COUNT(*) as days_with_sales,
|
|
MIN(sale_date) as first_sale,
|
|
MAX(sale_date) as last_sale
|
|
FROM historical_sales
|
|
GROUP BY pid
|
|
),
|
|
recent_trend AS (
|
|
SELECT
|
|
h.pid,
|
|
AVG(h.daily_quantity) as recent_avg_units,
|
|
AVG(h.daily_revenue) as recent_avg_revenue
|
|
FROM historical_sales h
|
|
WHERE h.sale_date >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
|
|
GROUP BY h.pid
|
|
),
|
|
confidence_calc AS (
|
|
SELECT
|
|
s.pid,
|
|
LEAST(100, GREATEST(0, ROUND(
|
|
(s.days_with_sales / 180.0 * 50) + -- Up to 50 points for history length
|
|
(CASE
|
|
WHEN s.std_daily_units = 0 OR s.avg_daily_units = 0 THEN 0
|
|
WHEN (s.std_daily_units / s.avg_daily_units) <= 0.5 THEN 30
|
|
WHEN (s.std_daily_units / s.avg_daily_units) <= 1.0 THEN 20
|
|
WHEN (s.std_daily_units / s.avg_daily_units) <= 2.0 THEN 10
|
|
ELSE 0
|
|
END) + -- Up to 30 points for consistency
|
|
(CASE
|
|
WHEN DATEDIFF(CURRENT_DATE, s.last_sale) <= 7 THEN 20
|
|
WHEN DATEDIFF(CURRENT_DATE, s.last_sale) <= 30 THEN 10
|
|
ELSE 0
|
|
END) -- Up to 20 points for recency
|
|
))) as confidence_level
|
|
FROM sales_stats s
|
|
)
|
|
(SELECT
|
|
s.pid,
|
|
DATE_ADD(CURRENT_DATE, INTERVAL n.days DAY) as forecast_date,
|
|
GREATEST(0, ROUND(
|
|
CASE
|
|
WHEN s.days_with_sales >= n.days THEN
|
|
COALESCE(t.recent_avg_units, s.avg_daily_units)
|
|
ELSE s.avg_daily_units * (s.days_with_sales / n.days)
|
|
END
|
|
)) as forecast_units,
|
|
GREATEST(0, ROUND(
|
|
CASE
|
|
WHEN s.days_with_sales >= n.days THEN
|
|
COALESCE(t.recent_avg_revenue, s.avg_daily_revenue)
|
|
ELSE s.avg_daily_revenue * (s.days_with_sales / n.days)
|
|
END
|
|
, 2)) as forecast_revenue,
|
|
c.confidence_level,
|
|
NOW() as last_calculated_at
|
|
FROM sales_stats s
|
|
CROSS JOIN (
|
|
SELECT 30 as days UNION SELECT 60 UNION SELECT 90
|
|
) n
|
|
LEFT JOIN recent_trend t ON s.pid = t.pid
|
|
LEFT JOIN confidence_calc c ON s.pid = c.pid)
|
|
ON DUPLICATE KEY UPDATE
|
|
forecast_units = VALUES(forecast_units),
|
|
forecast_revenue = VALUES(forecast_revenue),
|
|
confidence_level = VALUES(confidence_level),
|
|
last_calculated_at = NOW()
|
|
`, [batch.map(row => row.pid)]);
|
|
|
|
lastPid = batch[batch.length - 1].pid;
|
|
processedCount += batch.length;
|
|
|
|
outputProgress({
|
|
status: 'running',
|
|
operation: 'Processing sales forecast batch',
|
|
current: processedCount,
|
|
total: totalProductsToUpdate,
|
|
elapsed: formatElapsedTime(startTime),
|
|
remaining: estimateRemaining(startTime, processedCount, totalProductsToUpdate),
|
|
rate: calculateRate(startTime, processedCount),
|
|
percentage: ((processedCount / totalProductsToUpdate) * 100).toFixed(1),
|
|
timing: {
|
|
start_time: new Date(startTime).toISOString(),
|
|
end_time: new Date().toISOString(),
|
|
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
}
|
|
});
|
|
}
|
|
|
|
// If we get here, everything completed successfully
|
|
success = true;
|
|
|
|
// Update calculate_status
|
|
await connection.query(`
|
|
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
|
|
VALUES ('sales_forecasts', NOW())
|
|
ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
|
|
`);
|
|
|
|
return {
|
|
processedProducts: processedCount,
|
|
processedOrders: 0,
|
|
processedPurchaseOrders: 0,
|
|
success
|
|
};
|
|
|
|
} catch (error) {
|
|
success = false;
|
|
logError(error, 'Error calculating sales forecasts');
|
|
throw error;
|
|
} finally {
|
|
if (connection) {
|
|
connection.release();
|
|
}
|
|
}
|
|
}
|
|
|
|
module.exports = calculateSalesForecasts;
|