243 lines
10 KiB
JavaScript
243 lines
10 KiB
JavaScript
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
|
|
const { getConnection } = require('./utils/db');
|
|
|
|
async function calculateTimeAggregates(startTime, totalProducts, processedCount = 0, isCancelled = false) {
|
|
const connection = await getConnection();
|
|
let success = false;
|
|
const BATCH_SIZE = 5000;
|
|
|
|
try {
|
|
// Get last calculation timestamp
|
|
const [lastCalc] = await connection.query(`
|
|
SELECT last_calculation_timestamp
|
|
FROM calculate_status
|
|
WHERE module_name = 'time_aggregates'
|
|
`);
|
|
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
|
|
|
|
// Get total count of products needing updates
|
|
if (!totalProducts) {
|
|
const [productCount] = await connection.query(`
|
|
SELECT COUNT(DISTINCT p.pid) as count
|
|
FROM products p
|
|
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
|
|
WHERE p.updated > ?
|
|
OR o.pid IS NOT NULL
|
|
`, [lastCalculationTime, lastCalculationTime]);
|
|
totalProducts = productCount[0].count;
|
|
}
|
|
|
|
if (totalProducts === 0) {
|
|
console.log('No products need time aggregate updates');
|
|
return {
|
|
processedProducts: 0,
|
|
processedOrders: 0,
|
|
processedPurchaseOrders: 0,
|
|
success: true
|
|
};
|
|
}
|
|
|
|
if (isCancelled) {
|
|
outputProgress({
|
|
status: 'cancelled',
|
|
operation: 'Time aggregates calculation cancelled',
|
|
current: processedCount,
|
|
total: totalProducts,
|
|
elapsed: formatElapsedTime(startTime),
|
|
remaining: null,
|
|
rate: calculateRate(startTime, processedCount),
|
|
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
|
timing: {
|
|
start_time: new Date(startTime).toISOString(),
|
|
end_time: new Date().toISOString(),
|
|
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
}
|
|
});
|
|
return {
|
|
processedProducts: processedCount,
|
|
processedOrders: 0,
|
|
processedPurchaseOrders: 0,
|
|
success
|
|
};
|
|
}
|
|
|
|
outputProgress({
|
|
status: 'running',
|
|
operation: 'Starting time aggregates calculation',
|
|
current: processedCount,
|
|
total: totalProducts,
|
|
elapsed: formatElapsedTime(startTime),
|
|
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
|
rate: calculateRate(startTime, processedCount),
|
|
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
|
timing: {
|
|
start_time: new Date(startTime).toISOString(),
|
|
end_time: new Date().toISOString(),
|
|
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
}
|
|
});
|
|
|
|
// Process in batches
|
|
let lastPid = 0;
|
|
while (true) {
|
|
if (isCancelled) break;
|
|
|
|
const [batch] = await connection.query(`
|
|
SELECT DISTINCT p.pid
|
|
FROM products p
|
|
FORCE INDEX (PRIMARY)
|
|
LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
|
|
WHERE p.pid > ?
|
|
AND (
|
|
p.updated > ?
|
|
OR EXISTS (
|
|
SELECT 1
|
|
FROM orders o2 FORCE INDEX (idx_orders_metrics)
|
|
WHERE o2.pid = p.pid
|
|
AND o2.updated > ?
|
|
)
|
|
)
|
|
ORDER BY p.pid
|
|
LIMIT ?
|
|
`, [lastPid, lastCalculationTime, lastCalculationTime, BATCH_SIZE]);
|
|
|
|
if (batch.length === 0) break;
|
|
|
|
// Calculate and update time aggregates for this batch using temporary table
|
|
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_time_aggregates');
|
|
await connection.query(`
|
|
CREATE TEMPORARY TABLE temp_time_aggregates (
|
|
pid BIGINT NOT NULL,
|
|
year INT NOT NULL,
|
|
month INT NOT NULL,
|
|
total_quantity_sold INT DEFAULT 0,
|
|
total_revenue DECIMAL(10,3) DEFAULT 0,
|
|
total_cost DECIMAL(10,3) DEFAULT 0,
|
|
order_count INT DEFAULT 0,
|
|
stock_received INT DEFAULT 0,
|
|
stock_ordered INT DEFAULT 0,
|
|
avg_price DECIMAL(10,3),
|
|
profit_margin DECIMAL(10,3),
|
|
inventory_value DECIMAL(10,3),
|
|
gmroi DECIMAL(10,3),
|
|
PRIMARY KEY (pid, year, month),
|
|
INDEX (pid),
|
|
INDEX (year, month)
|
|
) ENGINE=MEMORY
|
|
`);
|
|
|
|
// Populate temporary table
|
|
await connection.query(`
|
|
INSERT INTO temp_time_aggregates
|
|
SELECT
|
|
p.pid,
|
|
YEAR(o.date) as year,
|
|
MONTH(o.date) as month,
|
|
SUM(o.quantity) as total_quantity_sold,
|
|
SUM(o.quantity * o.price) as total_revenue,
|
|
SUM(o.quantity * p.cost_price) as total_cost,
|
|
COUNT(DISTINCT o.order_number) as order_count,
|
|
COALESCE(SUM(CASE WHEN po.received_date IS NOT NULL THEN po.received ELSE 0 END), 0) as stock_received,
|
|
COALESCE(SUM(po.ordered), 0) as stock_ordered,
|
|
AVG(o.price) as avg_price,
|
|
CASE
|
|
WHEN SUM(o.quantity * o.price) > 0
|
|
THEN ((SUM(o.quantity * o.price) - SUM(o.quantity * p.cost_price)) / SUM(o.quantity * o.price)) * 100
|
|
ELSE 0
|
|
END as profit_margin,
|
|
p.cost_price * p.stock_quantity as inventory_value,
|
|
CASE
|
|
WHEN p.cost_price * p.stock_quantity > 0
|
|
THEN (SUM(o.quantity * (o.price - p.cost_price))) / (p.cost_price * p.stock_quantity)
|
|
ELSE 0
|
|
END as gmroi
|
|
FROM products p
|
|
FORCE INDEX (PRIMARY)
|
|
INNER JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
|
|
AND o.canceled = false
|
|
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
|
|
LEFT JOIN purchase_orders po FORCE INDEX (idx_po_metrics) ON p.pid = po.pid
|
|
AND po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
|
|
WHERE p.pid IN (?)
|
|
GROUP BY p.pid, YEAR(o.date), MONTH(o.date)
|
|
HAVING year IS NOT NULL AND month IS NOT NULL
|
|
`, [batch.map(row => row.pid)]);
|
|
|
|
// Update from temporary table
|
|
await connection.query(`
|
|
INSERT INTO product_time_aggregates (
|
|
pid, year, month,
|
|
total_quantity_sold, total_revenue, total_cost,
|
|
order_count, stock_received, stock_ordered,
|
|
avg_price, profit_margin, inventory_value, gmroi
|
|
)
|
|
SELECT
|
|
pid, year, month,
|
|
total_quantity_sold, total_revenue, total_cost,
|
|
order_count, stock_received, stock_ordered,
|
|
avg_price, profit_margin, inventory_value, gmroi
|
|
FROM temp_time_aggregates
|
|
ON DUPLICATE KEY UPDATE
|
|
total_quantity_sold = VALUES(total_quantity_sold),
|
|
total_revenue = VALUES(total_revenue),
|
|
total_cost = VALUES(total_cost),
|
|
order_count = VALUES(order_count),
|
|
stock_received = VALUES(stock_received),
|
|
stock_ordered = VALUES(stock_ordered),
|
|
avg_price = VALUES(avg_price),
|
|
profit_margin = VALUES(profit_margin),
|
|
inventory_value = VALUES(inventory_value),
|
|
gmroi = VALUES(gmroi)
|
|
`);
|
|
|
|
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_time_aggregates');
|
|
|
|
lastPid = batch[batch.length - 1].pid;
|
|
processedCount += batch.length;
|
|
|
|
outputProgress({
|
|
status: 'running',
|
|
operation: 'Processing time aggregates batch',
|
|
current: processedCount,
|
|
total: totalProducts,
|
|
elapsed: formatElapsedTime(startTime),
|
|
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
|
rate: calculateRate(startTime, processedCount),
|
|
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
|
timing: {
|
|
start_time: new Date(startTime).toISOString(),
|
|
end_time: new Date().toISOString(),
|
|
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
}
|
|
});
|
|
}
|
|
|
|
// If we get here, everything completed successfully
|
|
success = true;
|
|
|
|
// Update calculate_status
|
|
await connection.query(`
|
|
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
|
|
VALUES ('time_aggregates', NOW())
|
|
ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
|
|
`);
|
|
|
|
return {
|
|
processedProducts: processedCount,
|
|
processedOrders: 0,
|
|
processedPurchaseOrders: 0,
|
|
success
|
|
};
|
|
|
|
} catch (error) {
|
|
success = false;
|
|
logError(error, 'Error calculating time aggregates');
|
|
throw error;
|
|
} finally {
|
|
if (connection) {
|
|
connection.release();
|
|
}
|
|
}
|
|
}
|
|
|
|
module.exports = calculateTimeAggregates;
|