More try to speed up time-aggregate calcs

This commit is contained in:
2025-02-10 14:30:52 -05:00
parent 619409847d
commit d8fd64cf62

View File

@@ -103,8 +103,39 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
if (batch.length === 0) break; if (batch.length === 0) break;
// Calculate and update time aggregates for this batch using temporary table // Create temporary tables for better performance
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_order_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_purchase_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_time_aggregates'); await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_time_aggregates');
// Create optimized temporary tables
await connection.query(`
CREATE TEMPORARY TABLE temp_order_stats (
pid BIGINT NOT NULL,
year INT NOT NULL,
month INT NOT NULL,
total_quantity_sold INT DEFAULT 0,
total_revenue DECIMAL(10,3) DEFAULT 0,
total_cost DECIMAL(10,3) DEFAULT 0,
order_count INT DEFAULT 0,
avg_price DECIMAL(10,3),
PRIMARY KEY (pid, year, month),
INDEX (pid)
) ENGINE=MEMORY
`);
await connection.query(`
CREATE TEMPORARY TABLE temp_purchase_stats (
pid BIGINT NOT NULL,
year INT NOT NULL,
month INT NOT NULL,
stock_received INT DEFAULT 0,
stock_ordered INT DEFAULT 0,
PRIMARY KEY (pid, year, month),
INDEX (pid)
) ENGINE=MEMORY
`);
await connection.query(` await connection.query(`
CREATE TEMPORARY TABLE temp_time_aggregates ( CREATE TEMPORARY TABLE temp_time_aggregates (
pid BIGINT NOT NULL, pid BIGINT NOT NULL,
@@ -121,14 +152,13 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
inventory_value DECIMAL(10,3), inventory_value DECIMAL(10,3),
gmroi DECIMAL(10,3), gmroi DECIMAL(10,3),
PRIMARY KEY (pid, year, month), PRIMARY KEY (pid, year, month),
INDEX (pid), INDEX (pid)
INDEX (year, month)
) ENGINE=MEMORY ) ENGINE=MEMORY
`); `);
// Populate temporary table // Populate order stats
await connection.query(` await connection.query(`
INSERT INTO temp_time_aggregates INSERT INTO temp_order_stats
SELECT SELECT
p.pid, p.pid,
YEAR(o.date) as year, YEAR(o.date) as year,
@@ -137,32 +167,64 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
SUM(o.quantity * o.price) as total_revenue, SUM(o.quantity * o.price) as total_revenue,
SUM(o.quantity * p.cost_price) as total_cost, SUM(o.quantity * p.cost_price) as total_cost,
COUNT(DISTINCT o.order_number) as order_count, COUNT(DISTINCT o.order_number) as order_count,
COALESCE(SUM(CASE WHEN po.received_date IS NOT NULL THEN po.received ELSE 0 END), 0) as stock_received, AVG(o.price) as avg_price
COALESCE(SUM(po.ordered), 0) as stock_ordered,
AVG(o.price) as avg_price,
CASE
WHEN SUM(o.quantity * o.price) > 0
THEN ((SUM(o.quantity * o.price) - SUM(o.quantity * p.cost_price)) / SUM(o.quantity * o.price)) * 100
ELSE 0
END as profit_margin,
p.cost_price * p.stock_quantity as inventory_value,
CASE
WHEN p.cost_price * p.stock_quantity > 0
THEN (SUM(o.quantity * (o.price - p.cost_price))) / (p.cost_price * p.stock_quantity)
ELSE 0
END as gmroi
FROM products p FROM products p
FORCE INDEX (PRIMARY) FORCE INDEX (PRIMARY)
INNER JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid INNER JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
LEFT JOIN purchase_orders po FORCE INDEX (idx_po_metrics) ON p.pid = po.pid
AND po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
WHERE p.pid IN (?) WHERE p.pid IN (?)
AND o.canceled = false AND o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH) AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
GROUP BY p.pid, YEAR(o.date), MONTH(o.date) GROUP BY p.pid, YEAR(o.date), MONTH(o.date)
`, [batch.map(row => row.pid)]); `, [batch.map(row => row.pid)]);
// Update from temporary table // Populate purchase stats
await connection.query(`
INSERT INTO temp_purchase_stats
SELECT
p.pid,
YEAR(po.date) as year,
MONTH(po.date) as month,
COALESCE(SUM(CASE WHEN po.received_date IS NOT NULL THEN po.received ELSE 0 END), 0) as stock_received,
COALESCE(SUM(po.ordered), 0) as stock_ordered
FROM products p
FORCE INDEX (PRIMARY)
INNER JOIN purchase_orders po FORCE INDEX (idx_po_metrics) ON p.pid = po.pid
WHERE p.pid IN (?)
AND po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
GROUP BY p.pid, YEAR(po.date), MONTH(po.date)
`, [batch.map(row => row.pid)]);
// Combine stats and calculate metrics
await connection.query(`
INSERT INTO temp_time_aggregates
SELECT
o.pid,
o.year,
o.month,
o.total_quantity_sold,
o.total_revenue,
o.total_cost,
o.order_count,
COALESCE(ps.stock_received, 0) as stock_received,
COALESCE(ps.stock_ordered, 0) as stock_ordered,
o.avg_price,
CASE
WHEN o.total_revenue > 0
THEN ((o.total_revenue - o.total_cost) / o.total_revenue) * 100
ELSE 0
END as profit_margin,
p.cost_price * p.stock_quantity as inventory_value,
CASE
WHEN (p.cost_price * p.stock_quantity) > 0
THEN (o.total_revenue - o.total_cost) / (p.cost_price * p.stock_quantity)
ELSE 0
END as gmroi
FROM temp_order_stats o
LEFT JOIN temp_purchase_stats ps ON o.pid = ps.pid AND o.year = ps.year AND o.month = ps.month
JOIN products p FORCE INDEX (PRIMARY) ON o.pid = p.pid
`);
// Update final table with optimized batch update
await connection.query(` await connection.query(`
INSERT INTO product_time_aggregates ( INSERT INTO product_time_aggregates (
pid, year, month, pid, year, month,
@@ -170,11 +232,7 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
order_count, stock_received, stock_ordered, order_count, stock_received, stock_ordered,
avg_price, profit_margin, inventory_value, gmroi avg_price, profit_margin, inventory_value, gmroi
) )
SELECT SELECT *
pid, year, month,
total_quantity_sold, total_revenue, total_cost,
order_count, stock_received, stock_ordered,
avg_price, profit_margin, inventory_value, gmroi
FROM temp_time_aggregates FROM temp_time_aggregates
ON DUPLICATE KEY UPDATE ON DUPLICATE KEY UPDATE
total_quantity_sold = VALUES(total_quantity_sold), total_quantity_sold = VALUES(total_quantity_sold),
@@ -189,6 +247,9 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
gmroi = VALUES(gmroi) gmroi = VALUES(gmroi)
`); `);
// Clean up temp tables
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_order_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_purchase_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_time_aggregates'); await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_time_aggregates');
lastPid = batch[batch.length - 1].pid; lastPid = batch[batch.length - 1].pid;