Speed up the calculate script and apply related fixes

This commit is contained in:
2025-02-10 01:29:01 -05:00
parent 7ff757203f
commit 610e26689c
7 changed files with 520 additions and 260 deletions

View File

@@ -107,22 +107,32 @@ async function calculateMetrics() {
// Get counts of records that need updating based on last calculation time
const [[productCount], [orderCount], [poCount]] = await Promise.all([
connection.query(`
SELECT COUNT(*) as total
SELECT COUNT(DISTINCT p.pid) as total
FROM products p
LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics'
FORCE INDEX (PRIMARY)
LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics'
LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
AND o.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
AND o.canceled = false
LEFT JOIN purchase_orders po FORCE INDEX (idx_purchase_orders_metrics) ON p.pid = po.pid
AND po.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
WHERE p.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
OR o.pid IS NOT NULL
OR po.pid IS NOT NULL
`),
connection.query(`
SELECT COUNT(*) as total
SELECT COUNT(DISTINCT o.id) as total
FROM orders o
LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics'
FORCE INDEX (idx_orders_metrics)
LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics'
WHERE o.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
AND o.canceled = false
AND o.canceled = false
`),
connection.query(`
SELECT COUNT(*) as total
SELECT COUNT(DISTINCT po.id) as total
FROM purchase_orders po
LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics'
FORCE INDEX (idx_purchase_orders_metrics)
LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics'
WHERE po.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
`)
]);

View File

@@ -86,23 +86,104 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount =
const [batch] = await connection.query(`
SELECT DISTINCT p.brand
FROM products p
FORCE INDEX (idx_brand)
LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid AND o.updated > ?
WHERE p.brand IS NOT NULL
AND p.brand > ?
AND (
p.updated > ?
OR EXISTS (
SELECT 1 FROM orders o
WHERE o.pid = p.pid
AND o.updated > ?
)
OR o.id IS NOT NULL
)
ORDER BY p.brand
LIMIT ?
`, [lastBrand, lastCalculationTime, lastCalculationTime, BATCH_SIZE]);
`, [lastCalculationTime, lastBrand, lastCalculationTime, BATCH_SIZE]);
if (batch.length === 0) break;
// Update brand metrics for this batch
// Create temporary tables for better performance
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
await connection.query(`
CREATE TEMPORARY TABLE temp_product_stats (
brand VARCHAR(100) NOT NULL,
product_count INT,
active_products INT,
total_stock_units INT,
total_stock_cost DECIMAL(15,2),
total_stock_retail DECIMAL(15,2),
total_revenue DECIMAL(15,2),
avg_margin DECIMAL(5,2),
PRIMARY KEY (brand),
INDEX (total_revenue),
INDEX (product_count)
) ENGINE=MEMORY
`);
await connection.query(`
CREATE TEMPORARY TABLE temp_sales_stats (
brand VARCHAR(100) NOT NULL,
current_period_sales DECIMAL(15,2),
previous_period_sales DECIMAL(15,2),
PRIMARY KEY (brand),
INDEX (current_period_sales),
INDEX (previous_period_sales)
) ENGINE=MEMORY
`);
// Populate product stats with optimized index usage
await connection.query(`
INSERT INTO temp_product_stats
SELECT
p.brand,
COUNT(DISTINCT p.pid) as product_count,
COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
SUM(p.stock_quantity) as total_stock_units,
SUM(p.stock_quantity * p.cost_price) as total_stock_cost,
SUM(p.stock_quantity * p.price) as total_stock_retail,
SUM(pm.total_revenue) as total_revenue,
AVG(pm.avg_margin_percent) as avg_margin
FROM products p
FORCE INDEX (idx_brand)
LEFT JOIN product_metrics pm FORCE INDEX (PRIMARY) ON p.pid = pm.pid
WHERE p.brand IN (?)
AND (
p.updated > ?
OR EXISTS (
SELECT 1 FROM orders o FORCE INDEX (idx_orders_metrics)
WHERE o.pid = p.pid
AND o.updated > ?
)
)
GROUP BY p.brand
`, [batch.map(row => row.brand), lastCalculationTime, lastCalculationTime]);
// Populate sales stats with optimized index usage
await connection.query(`
INSERT INTO temp_sales_stats
SELECT
p.brand,
SUM(CASE
WHEN o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
THEN o.quantity * o.price
ELSE 0
END) as current_period_sales,
SUM(CASE
WHEN o.date BETWEEN DATE_SUB(CURRENT_DATE, INTERVAL 60 DAY) AND DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
THEN o.quantity * o.price
ELSE 0
END) as previous_period_sales
FROM products p
FORCE INDEX (idx_brand)
INNER JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
AND o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 60 DAY)
AND o.updated > ?
WHERE p.brand IN (?)
GROUP BY p.brand
`, [lastCalculationTime, batch.map(row => row.brand)]);
// Update metrics using temp tables
await connection.query(`
INSERT INTO brand_metrics (
brand,
@@ -116,50 +197,6 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount =
growth_rate,
last_calculated_at
)
WITH product_stats AS (
SELECT
p.brand,
COUNT(DISTINCT p.pid) as product_count,
COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
SUM(p.stock_quantity) as total_stock_units,
SUM(p.stock_quantity * p.cost_price) as total_stock_cost,
SUM(p.stock_quantity * p.price) as total_stock_retail,
SUM(pm.total_revenue) as total_revenue,
AVG(pm.avg_margin_percent) as avg_margin
FROM products p
LEFT JOIN product_metrics pm ON p.pid = pm.pid
WHERE p.brand IN (?)
AND (
p.updated > ?
OR EXISTS (
SELECT 1 FROM orders o
WHERE o.pid = p.pid
AND o.updated > ?
)
)
GROUP BY p.brand
),
sales_periods AS (
SELECT
p.brand,
SUM(CASE
WHEN o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
THEN o.quantity * o.price
ELSE 0
END) as current_period_sales,
SUM(CASE
WHEN o.date BETWEEN DATE_SUB(CURRENT_DATE, INTERVAL 60 DAY) AND DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
THEN o.quantity * o.price
ELSE 0
END) as previous_period_sales
FROM products p
INNER JOIN orders o ON p.pid = o.pid
AND o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 60 DAY)
AND o.updated > ?
WHERE p.brand IN (?)
GROUP BY p.brand
)
SELECT
ps.brand,
COALESCE(ps.product_count, 0) as product_count,
@@ -170,15 +207,15 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount =
COALESCE(ps.total_revenue, 0) as total_revenue,
COALESCE(ps.avg_margin, 0) as avg_margin,
CASE
WHEN COALESCE(sp.previous_period_sales, 0) = 0 AND COALESCE(sp.current_period_sales, 0) > 0 THEN 100
WHEN COALESCE(sp.previous_period_sales, 0) = 0 THEN 0
WHEN COALESCE(ss.previous_period_sales, 0) = 0 AND COALESCE(ss.current_period_sales, 0) > 0 THEN 100
WHEN COALESCE(ss.previous_period_sales, 0) = 0 THEN 0
ELSE LEAST(999.99, GREATEST(-100,
((COALESCE(sp.current_period_sales, 0) / sp.previous_period_sales) - 1) * 100
((COALESCE(ss.current_period_sales, 0) / ss.previous_period_sales) - 1) * 100
))
END as growth_rate,
NOW() as last_calculated_at
FROM product_stats ps
LEFT JOIN sales_periods sp ON ps.brand = sp.brand
FROM temp_product_stats ps
LEFT JOIN temp_sales_stats ss ON ps.brand = ss.brand
ON DUPLICATE KEY UPDATE
product_count = VALUES(product_count),
active_products = VALUES(active_products),
@@ -189,13 +226,11 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount =
avg_margin = VALUES(avg_margin),
growth_rate = VALUES(growth_rate),
last_calculated_at = NOW()
`, [
batch.map(row => row.brand), // For first IN clause
lastCalculationTime, // For p.updated > ?
lastCalculationTime, // For o.updated > ? in EXISTS
lastCalculationTime, // For o.updated > ? in sales_periods
batch.map(row => row.brand) // For second IN clause
]);
`);
// Clean up temp tables
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
lastBrand = batch[batch.length - 1].brand;
processedCount += batch.length;

View File

@@ -88,9 +88,10 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
const [batch] = await connection.query(`
SELECT DISTINCT c.cat_id
FROM categories c
JOIN product_categories pc ON c.cat_id = pc.cat_id
LEFT JOIN products p ON pc.pid = p.pid AND p.updated > ?
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
FORCE INDEX (PRIMARY)
JOIN product_categories pc FORCE INDEX (idx_category) ON c.cat_id = pc.cat_id
LEFT JOIN products p FORCE INDEX (PRIMARY) ON pc.pid = p.pid AND p.updated > ?
LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid AND o.updated > ?
WHERE c.status = 'active'
AND c.cat_id > ?
AND (
@@ -103,7 +104,81 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
if (batch.length === 0) break;
// Update category metrics for this batch
// Create temporary tables for better performance
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
await connection.query(`
CREATE TEMPORARY TABLE temp_product_stats (
cat_id BIGINT NOT NULL,
product_count INT,
active_products INT,
total_value DECIMAL(15,2),
avg_margin DECIMAL(5,2),
turnover_rate DECIMAL(10,2),
PRIMARY KEY (cat_id),
INDEX (product_count),
INDEX (total_value)
) ENGINE=MEMORY
`);
await connection.query(`
CREATE TEMPORARY TABLE temp_sales_stats (
cat_id BIGINT NOT NULL,
recent_revenue DECIMAL(15,2),
previous_revenue DECIMAL(15,2),
PRIMARY KEY (cat_id),
INDEX (recent_revenue),
INDEX (previous_revenue)
) ENGINE=MEMORY
`);
// Populate product stats
await connection.query(`
INSERT INTO temp_product_stats
SELECT
c.cat_id,
COUNT(DISTINCT p.pid) as product_count,
COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
SUM(p.stock_quantity * p.cost_price) as total_value,
AVG(pm.avg_margin_percent) as avg_margin,
AVG(pm.turnover_rate) as turnover_rate
FROM categories c
FORCE INDEX (PRIMARY)
JOIN product_categories pc FORCE INDEX (idx_category) ON c.cat_id = pc.cat_id
LEFT JOIN products p FORCE INDEX (PRIMARY) ON pc.pid = p.pid
LEFT JOIN product_metrics pm FORCE INDEX (PRIMARY) ON p.pid = pm.pid
WHERE c.cat_id IN (?)
GROUP BY c.cat_id
`, [batch.map(row => row.cat_id)]);
// Populate sales stats
await connection.query(`
INSERT INTO temp_sales_stats
SELECT
c.cat_id,
COALESCE(SUM(CASE
WHEN o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
THEN o.quantity * o.price
ELSE 0
END), 0) as recent_revenue,
COALESCE(SUM(CASE
WHEN o.date BETWEEN DATE_SUB(CURRENT_DATE, INTERVAL 60 DAY) AND DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
THEN o.quantity * o.price
ELSE 0
END), 0) as previous_revenue
FROM categories c
FORCE INDEX (PRIMARY)
JOIN product_categories pc FORCE INDEX (idx_category) ON c.cat_id = pc.cat_id
LEFT JOIN products p FORCE INDEX (PRIMARY) ON pc.pid = p.pid
LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
AND o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 60 DAY)
WHERE c.cat_id IN (?)
GROUP BY c.cat_id
`, [batch.map(row => row.cat_id)]);
// Update metrics using temp tables
await connection.query(`
INSERT INTO category_metrics (
category_id,
@@ -118,49 +193,25 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
)
SELECT
c.cat_id,
COUNT(DISTINCT p.pid) as product_count,
COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
SUM(p.stock_quantity * p.cost_price) as total_value,
AVG(pm.avg_margin_percent) as avg_margin,
AVG(pm.turnover_rate) as turnover_rate,
COALESCE(ps.product_count, 0) as product_count,
COALESCE(ps.active_products, 0) as active_products,
COALESCE(ps.total_value, 0) as total_value,
COALESCE(ps.avg_margin, 0) as avg_margin,
COALESCE(ps.turnover_rate, 0) as turnover_rate,
CASE
WHEN COALESCE(SUM(CASE
WHEN o.date BETWEEN DATE_SUB(CURRENT_DATE, INTERVAL 60 DAY) AND DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
THEN o.quantity * o.price
ELSE 0
END), 0) = 0 AND COALESCE(SUM(CASE
WHEN o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
THEN o.quantity * o.price
ELSE 0
END), 0) > 0 THEN 100
WHEN COALESCE(SUM(CASE
WHEN o.date BETWEEN DATE_SUB(CURRENT_DATE, INTERVAL 60 DAY) AND DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
THEN o.quantity * o.price
ELSE 0
END), 0) = 0 THEN 0
WHEN ss.previous_revenue = 0 AND ss.recent_revenue > 0 THEN 100
WHEN ss.previous_revenue = 0 THEN 0
ELSE LEAST(999.99, GREATEST(-100,
((SUM(CASE
WHEN o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
THEN o.quantity * o.price
ELSE 0
END) / NULLIF(SUM(CASE
WHEN o.date BETWEEN DATE_SUB(CURRENT_DATE, INTERVAL 60 DAY) AND DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
THEN o.quantity * o.price
ELSE 0
END), 0) - 1) * 100)
((ss.recent_revenue / NULLIF(ss.previous_revenue, 0) - 1) * 100)
))
END as growth_rate,
c.status,
NOW() as last_calculated_at
FROM categories c
JOIN product_categories pc ON c.cat_id = pc.cat_id
LEFT JOIN products p ON pc.pid = p.pid
LEFT JOIN product_metrics pm ON p.pid = pm.pid
LEFT JOIN orders o ON p.pid = o.pid
AND o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 60 DAY)
FORCE INDEX (PRIMARY)
LEFT JOIN temp_product_stats ps ON c.cat_id = ps.cat_id
LEFT JOIN temp_sales_stats ss ON c.cat_id = ss.cat_id
WHERE c.cat_id IN (?)
GROUP BY c.cat_id, c.status
ON DUPLICATE KEY UPDATE
product_count = VALUES(product_count),
active_products = VALUES(active_products),
@@ -172,6 +223,10 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
last_calculated_at = NOW()
`, [batch.map(row => row.cat_id)]);
// Clean up temp tables
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
lastCatId = batch[batch.length - 1].cat_id;
processedCount += batch.length;

View File

@@ -119,10 +119,39 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
processedOrders = orderCount[0].count;
// Clear temporary tables
await connection.query('TRUNCATE TABLE temp_sales_metrics');
await connection.query('TRUNCATE TABLE temp_purchase_metrics');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_metrics');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_purchase_metrics');
// Populate temp_sales_metrics with base stats and sales averages
// Create optimized temporary tables with indexes
await connection.query(`
CREATE TEMPORARY TABLE temp_sales_metrics (
pid BIGINT NOT NULL,
daily_sales_avg DECIMAL(10,3),
weekly_sales_avg DECIMAL(10,3),
monthly_sales_avg DECIMAL(10,3),
total_revenue DECIMAL(10,2),
avg_margin_percent DECIMAL(5,2),
first_sale_date DATE,
last_sale_date DATE,
PRIMARY KEY (pid),
INDEX (daily_sales_avg),
INDEX (total_revenue)
) ENGINE=MEMORY
`);
await connection.query(`
CREATE TEMPORARY TABLE temp_purchase_metrics (
pid BIGINT NOT NULL,
avg_lead_time_days DECIMAL(5,1),
last_purchase_date DATE,
first_received_date DATE,
last_received_date DATE,
PRIMARY KEY (pid),
INDEX (avg_lead_time_days)
) ENGINE=MEMORY
`);
// Populate temp_sales_metrics with base stats and sales averages using FORCE INDEX
await connection.query(`
INSERT INTO temp_sales_metrics
SELECT
@@ -139,12 +168,13 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
MIN(o.date) as first_sale_date,
MAX(o.date) as last_sale_date
FROM products p
LEFT JOIN orders o ON p.pid = o.pid
AND o.canceled = false
AND o.date >= DATE_SUB(CURDATE(), INTERVAL 90 DAY)
FORCE INDEX (PRIMARY)
LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
AND o.canceled = false
AND o.date >= DATE_SUB(CURDATE(), INTERVAL 90 DAY)
WHERE p.updated > ?
OR EXISTS (
SELECT 1 FROM orders o2
SELECT 1 FROM orders o2 FORCE INDEX (idx_orders_metrics)
WHERE o2.pid = p.pid
AND o2.canceled = false
AND o2.updated > ?
@@ -152,7 +182,7 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
GROUP BY p.pid
`, [lastCalculationTime, lastCalculationTime]);
// Populate temp_purchase_metrics
// Populate temp_purchase_metrics with optimized index usage
await connection.query(`
INSERT INTO temp_purchase_metrics
SELECT
@@ -162,12 +192,13 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
MIN(po.received_date) as first_received_date,
MAX(po.received_date) as last_received_date
FROM products p
LEFT JOIN purchase_orders po ON p.pid = po.pid
AND po.received_date IS NOT NULL
AND po.date >= DATE_SUB(CURDATE(), INTERVAL 365 DAY)
FORCE INDEX (PRIMARY)
LEFT JOIN purchase_orders po FORCE INDEX (idx_po_metrics) ON p.pid = po.pid
AND po.received_date IS NOT NULL
AND po.date >= DATE_SUB(CURDATE(), INTERVAL 365 DAY)
WHERE p.updated > ?
OR EXISTS (
SELECT 1 FROM purchase_orders po2
SELECT 1 FROM purchase_orders po2 FORCE INDEX (idx_po_metrics)
WHERE po2.pid = p.pid
AND po2.updated > ?
)

View File

@@ -86,7 +86,8 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
const [batch] = await connection.query(`
SELECT DISTINCT p.pid
FROM products p
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
FORCE INDEX (PRIMARY)
LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid AND o.updated > ?
WHERE p.visible = true
AND p.pid > ?
AND (
@@ -99,101 +100,160 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
if (batch.length === 0) break;
// Calculate forecasts for this batch
// Create temporary tables for better performance
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_historical_sales');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_recent_trend');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_confidence_calc');
// Create optimized temporary tables with indexes
await connection.query(`
INSERT INTO sales_forecasts (
pid,
forecast_date,
forecast_units,
forecast_revenue,
confidence_level,
last_calculated_at
)
WITH historical_sales AS (
SELECT
o.pid,
DATE(o.date) as sale_date,
SUM(o.quantity) as daily_quantity,
SUM(o.quantity * o.price) as daily_revenue
FROM orders o
WHERE o.canceled = false
AND o.pid IN (?)
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 180 DAY)
GROUP BY o.pid, DATE(o.date)
),
sales_stats AS (
SELECT
pid,
AVG(daily_quantity) as avg_daily_units,
AVG(daily_revenue) as avg_daily_revenue,
STDDEV(daily_quantity) as std_daily_units,
COUNT(*) as days_with_sales,
MIN(sale_date) as first_sale,
MAX(sale_date) as last_sale
FROM historical_sales
GROUP BY pid
),
recent_trend AS (
SELECT
h.pid,
AVG(h.daily_quantity) as recent_avg_units,
AVG(h.daily_revenue) as recent_avg_revenue
FROM historical_sales h
WHERE h.sale_date >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
GROUP BY h.pid
),
confidence_calc AS (
SELECT
s.pid,
LEAST(100, GREATEST(0, ROUND(
(s.days_with_sales / 180.0 * 50) + -- Up to 50 points for history length
(CASE
WHEN s.std_daily_units = 0 OR s.avg_daily_units = 0 THEN 0
WHEN (s.std_daily_units / s.avg_daily_units) <= 0.5 THEN 30
WHEN (s.std_daily_units / s.avg_daily_units) <= 1.0 THEN 20
WHEN (s.std_daily_units / s.avg_daily_units) <= 2.0 THEN 10
ELSE 0
END) + -- Up to 30 points for consistency
(CASE
WHEN DATEDIFF(CURRENT_DATE, s.last_sale) <= 7 THEN 20
WHEN DATEDIFF(CURRENT_DATE, s.last_sale) <= 30 THEN 10
ELSE 0
END) -- Up to 20 points for recency
))) as confidence_level
FROM sales_stats s
)
(SELECT
s.pid,
DATE_ADD(CURRENT_DATE, INTERVAL n.days DAY) as forecast_date,
GREATEST(0, ROUND(
CASE
WHEN s.days_with_sales >= n.days THEN
COALESCE(t.recent_avg_units, s.avg_daily_units)
ELSE s.avg_daily_units * (s.days_with_sales / n.days)
END
)) as forecast_units,
GREATEST(0, ROUND(
CASE
WHEN s.days_with_sales >= n.days THEN
COALESCE(t.recent_avg_revenue, s.avg_daily_revenue)
ELSE s.avg_daily_revenue * (s.days_with_sales / n.days)
END
, 2)) as forecast_revenue,
c.confidence_level,
NOW() as last_calculated_at
FROM sales_stats s
CROSS JOIN (
SELECT 30 as days UNION SELECT 60 UNION SELECT 90
) n
LEFT JOIN recent_trend t ON s.pid = t.pid
LEFT JOIN confidence_calc c ON s.pid = c.pid)
ON DUPLICATE KEY UPDATE
forecast_units = VALUES(forecast_units),
forecast_revenue = VALUES(forecast_revenue),
confidence_level = VALUES(confidence_level),
last_calculated_at = NOW()
CREATE TEMPORARY TABLE temp_historical_sales (
pid BIGINT NOT NULL,
sale_date DATE NOT NULL,
daily_quantity INT,
daily_revenue DECIMAL(15,2),
PRIMARY KEY (pid, sale_date),
INDEX (sale_date)
) ENGINE=MEMORY
`);
await connection.query(`
CREATE TEMPORARY TABLE temp_sales_stats (
pid BIGINT NOT NULL,
avg_daily_units DECIMAL(10,2),
avg_daily_revenue DECIMAL(15,2),
std_daily_units DECIMAL(10,2),
days_with_sales INT,
first_sale DATE,
last_sale DATE,
PRIMARY KEY (pid),
INDEX (days_with_sales),
INDEX (last_sale)
) ENGINE=MEMORY
`);
await connection.query(`
CREATE TEMPORARY TABLE temp_recent_trend (
pid BIGINT NOT NULL,
recent_avg_units DECIMAL(10,2),
recent_avg_revenue DECIMAL(15,2),
PRIMARY KEY (pid)
) ENGINE=MEMORY
`);
await connection.query(`
CREATE TEMPORARY TABLE temp_confidence_calc (
pid BIGINT NOT NULL,
confidence_level TINYINT,
PRIMARY KEY (pid)
) ENGINE=MEMORY
`);
// Populate historical sales with optimized index usage
await connection.query(`
INSERT INTO temp_historical_sales
SELECT
o.pid,
DATE(o.date) as sale_date,
SUM(o.quantity) as daily_quantity,
SUM(o.quantity * o.price) as daily_revenue
FROM orders o
FORCE INDEX (idx_orders_metrics)
WHERE o.canceled = false
AND o.pid IN (?)
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 180 DAY)
GROUP BY o.pid, DATE(o.date)
`, [batch.map(row => row.pid)]);
// Populate sales stats
await connection.query(`
INSERT INTO temp_sales_stats
SELECT
pid,
AVG(daily_quantity) as avg_daily_units,
AVG(daily_revenue) as avg_daily_revenue,
STDDEV(daily_quantity) as std_daily_units,
COUNT(*) as days_with_sales,
MIN(sale_date) as first_sale,
MAX(sale_date) as last_sale
FROM temp_historical_sales
GROUP BY pid
`);
// Populate recent trend
await connection.query(`
INSERT INTO temp_recent_trend
SELECT
h.pid,
AVG(h.daily_quantity) as recent_avg_units,
AVG(h.daily_revenue) as recent_avg_revenue
FROM temp_historical_sales h
WHERE h.sale_date >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
GROUP BY h.pid
`);
// Calculate confidence levels
await connection.query(`
INSERT INTO temp_confidence_calc
SELECT
s.pid,
LEAST(100, GREATEST(0, ROUND(
(s.days_with_sales / 180.0 * 50) + -- Up to 50 points for history length
(CASE
WHEN s.std_daily_units = 0 OR s.avg_daily_units = 0 THEN 0
WHEN (s.std_daily_units / s.avg_daily_units) <= 0.5 THEN 30
WHEN (s.std_daily_units / s.avg_daily_units) <= 1.0 THEN 20
WHEN (s.std_daily_units / s.avg_daily_units) <= 2.0 THEN 10
ELSE 0
END) + -- Up to 30 points for consistency
(CASE
WHEN DATEDIFF(CURRENT_DATE, s.last_sale) <= 7 THEN 20
WHEN DATEDIFF(CURRENT_DATE, s.last_sale) <= 30 THEN 10
ELSE 0
END) -- Up to 20 points for recency
))) as confidence_level
FROM temp_sales_stats s
`);
// Generate forecasts using temp tables
await connection.query(`
REPLACE INTO sales_forecasts
(pid, forecast_date, forecast_units, forecast_revenue, confidence_level, last_calculated_at)
SELECT
s.pid,
DATE_ADD(CURRENT_DATE, INTERVAL n.days DAY),
GREATEST(0, ROUND(
CASE
WHEN s.days_with_sales >= n.days THEN COALESCE(t.recent_avg_units, s.avg_daily_units)
ELSE s.avg_daily_units * (s.days_with_sales / n.days)
END
)),
GREATEST(0, ROUND(
CASE
WHEN s.days_with_sales >= n.days THEN COALESCE(t.recent_avg_revenue, s.avg_daily_revenue)
ELSE s.avg_daily_revenue * (s.days_with_sales / n.days)
END,
2
)),
c.confidence_level,
NOW()
FROM temp_sales_stats s
CROSS JOIN (
SELECT 30 as days
UNION SELECT 60
UNION SELECT 90
) n
LEFT JOIN temp_recent_trend t ON s.pid = t.pid
LEFT JOIN temp_confidence_calc c ON s.pid = c.pid;
`);
// Clean up temp tables
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_historical_sales');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_recent_trend');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_confidence_calc');
lastPid = batch[batch.length - 1].pid;
processedCount += batch.length;

View File

@@ -85,12 +85,14 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
const [batch] = await connection.query(`
SELECT DISTINCT p.pid
FROM products p
LEFT JOIN orders o ON p.pid = o.pid
FORCE INDEX (PRIMARY)
LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
WHERE p.pid > ?
AND (
p.updated > ?
OR EXISTS (
SELECT 1 FROM orders o2
SELECT 1
FROM orders o2 FORCE INDEX (idx_orders_metrics)
WHERE o2.pid = p.pid
AND o2.updated > ?
)
@@ -101,21 +103,32 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
if (batch.length === 0) break;
// Calculate and update time aggregates for this batch
// Calculate and update time aggregates for this batch using temporary table
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_time_aggregates');
await connection.query(`
INSERT INTO product_time_aggregates (
pid,
year,
month,
total_quantity_sold,
total_revenue,
total_cost,
order_count,
avg_price,
profit_margin,
inventory_value,
gmroi
)
CREATE TEMPORARY TABLE temp_time_aggregates (
pid BIGINT NOT NULL,
year INT NOT NULL,
month INT NOT NULL,
total_quantity_sold INT DEFAULT 0,
total_revenue DECIMAL(10,3) DEFAULT 0,
total_cost DECIMAL(10,3) DEFAULT 0,
order_count INT DEFAULT 0,
stock_received INT DEFAULT 0,
stock_ordered INT DEFAULT 0,
avg_price DECIMAL(10,3),
profit_margin DECIMAL(10,3),
inventory_value DECIMAL(10,3),
gmroi DECIMAL(10,3),
PRIMARY KEY (pid, year, month),
INDEX (pid),
INDEX (year, month)
) ENGINE=MEMORY
`);
// Populate temporary table
await connection.query(`
INSERT INTO temp_time_aggregates
SELECT
p.pid,
YEAR(o.date) as year,
@@ -124,6 +137,8 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
SUM(o.quantity * o.price) as total_revenue,
SUM(o.quantity * p.cost_price) as total_cost,
COUNT(DISTINCT o.order_number) as order_count,
COALESCE(SUM(CASE WHEN po.received_date IS NOT NULL THEN po.received ELSE 0 END), 0) as stock_received,
COALESCE(SUM(po.ordered), 0) as stock_ordered,
AVG(o.price) as avg_price,
CASE
WHEN SUM(o.quantity * o.price) > 0
@@ -137,22 +152,45 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
ELSE 0
END as gmroi
FROM products p
INNER JOIN orders o ON p.pid = o.pid
AND o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
FORCE INDEX (PRIMARY)
INNER JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
AND o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
LEFT JOIN purchase_orders po FORCE INDEX (idx_po_metrics) ON p.pid = po.pid
AND po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
WHERE p.pid IN (?)
GROUP BY p.pid, YEAR(o.date), MONTH(o.date)
HAVING year IS NOT NULL AND month IS NOT NULL
`, [batch.map(row => row.pid)]);
// Update from temporary table
await connection.query(`
INSERT INTO product_time_aggregates (
pid, year, month,
total_quantity_sold, total_revenue, total_cost,
order_count, stock_received, stock_ordered,
avg_price, profit_margin, inventory_value, gmroi
)
SELECT
pid, year, month,
total_quantity_sold, total_revenue, total_cost,
order_count, stock_received, stock_ordered,
avg_price, profit_margin, inventory_value, gmroi
FROM temp_time_aggregates
ON DUPLICATE KEY UPDATE
total_quantity_sold = VALUES(total_quantity_sold),
total_revenue = VALUES(total_revenue),
total_cost = VALUES(total_cost),
order_count = VALUES(order_count),
stock_received = VALUES(stock_received),
stock_ordered = VALUES(stock_ordered),
avg_price = VALUES(avg_price),
profit_margin = VALUES(profit_margin),
inventory_value = VALUES(inventory_value),
gmroi = VALUES(gmroi)
`, [batch.map(row => row.pid)]);
`);
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_time_aggregates');
lastPid = batch[batch.length - 1].pid;
processedCount += batch.length;

View File

@@ -94,33 +94,54 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
const [batch] = await connection.query(`
SELECT DISTINCT v.vendor
FROM vendor_details v
FORCE INDEX (PRIMARY)
LEFT JOIN products p FORCE INDEX (idx_vendor) ON p.vendor = v.vendor AND p.updated > ?
LEFT JOIN purchase_orders po FORCE INDEX (idx_vendor) ON po.vendor = v.vendor AND po.updated > ?
WHERE v.status = 'active'
AND v.vendor > ?
AND (
EXISTS (
SELECT 1 FROM products p
WHERE p.vendor = v.vendor
AND p.updated > ?
)
OR EXISTS (
SELECT 1 FROM purchase_orders po
WHERE po.vendor = v.vendor
AND po.updated > ?
)
)
AND (p.pid IS NOT NULL OR po.po_id IS NOT NULL)
ORDER BY v.vendor
LIMIT ?
`, [lastVendor, lastCalculationTime, lastCalculationTime, BATCH_SIZE]);
`, [lastCalculationTime, lastCalculationTime, lastVendor, BATCH_SIZE]);
if (batch.length === 0) break;
// Create temporary tables for better performance
// Create temporary tables with optimized structure and indexes
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_purchase_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
// Create and populate purchase_stats temp table
await connection.query(`
CREATE TEMPORARY TABLE temp_purchase_stats AS
CREATE TEMPORARY TABLE temp_purchase_stats (
vendor VARCHAR(100) NOT NULL,
avg_lead_time_days DECIMAL(10,2),
total_orders INT,
total_late_orders INT,
total_purchase_value DECIMAL(15,2),
avg_order_value DECIMAL(15,2),
on_time_delivery_rate DECIMAL(5,2),
order_fill_rate DECIMAL(5,2),
PRIMARY KEY (vendor),
INDEX (total_orders),
INDEX (total_purchase_value)
) ENGINE=MEMORY
`);
await connection.query(`
CREATE TEMPORARY TABLE temp_product_stats (
vendor VARCHAR(100) NOT NULL,
total_products INT,
active_products INT,
avg_margin_percent DECIMAL(5,2),
total_revenue DECIMAL(15,2),
PRIMARY KEY (vendor),
INDEX (total_products),
INDEX (total_revenue)
) ENGINE=MEMORY
`);
// Populate purchase_stats temp table with optimized index usage
await connection.query(`
INSERT INTO temp_purchase_stats
SELECT
po.vendor,
AVG(DATEDIFF(po.received_date, po.date)) as avg_lead_time_days,
@@ -131,6 +152,7 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
(COUNT(CASE WHEN DATEDIFF(po.received_date, po.date) <= 30 THEN 1 END) / COUNT(*)) * 100 as on_time_delivery_rate,
(SUM(LEAST(po.received, po.ordered)) / NULLIF(SUM(po.ordered), 0)) * 100 as order_fill_rate
FROM purchase_orders po
FORCE INDEX (idx_vendor)
WHERE po.vendor IN (?)
AND po.received_date IS NOT NULL
AND po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 365 DAY)
@@ -138,23 +160,31 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
GROUP BY po.vendor
`, [batch.map(row => row.vendor), lastCalculationTime]);
// Create and populate product_stats temp table
// Populate product stats with optimized index usage
await connection.query(`
CREATE TEMPORARY TABLE temp_product_stats AS
INSERT INTO temp_product_stats
SELECT
p.vendor,
COUNT(DISTINCT p.pid) as total_products,
COUNT(DISTINCT p.pid) as product_count,
COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
AVG(pm.avg_margin_percent) as avg_margin_percent,
AVG(pm.avg_margin_percent) as avg_margin,
SUM(pm.total_revenue) as total_revenue
FROM products p
LEFT JOIN product_metrics pm ON p.pid = pm.pid
FORCE INDEX (idx_vendor)
LEFT JOIN product_metrics pm FORCE INDEX (PRIMARY) ON p.pid = pm.pid
WHERE p.vendor IN (?)
AND p.updated > ?
AND (
p.updated > ?
OR EXISTS (
SELECT 1 FROM orders o FORCE INDEX (idx_orders_metrics)
WHERE o.pid = p.pid
AND o.updated > ?
)
)
GROUP BY p.vendor
`, [batch.map(row => row.vendor), lastCalculationTime]);
`, [batch.map(row => row.vendor), lastCalculationTime, lastCalculationTime]);
// Update metrics using temp tables
// Update metrics using temp tables with optimized join order
await connection.query(`
INSERT INTO vendor_metrics (
vendor,
@@ -188,8 +218,9 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
v.status,
NOW() as last_calculated_at
FROM vendor_details v
LEFT JOIN temp_purchase_stats ps ON v.vendor = ps.vendor
LEFT JOIN temp_product_stats prs ON v.vendor = prs.vendor
FORCE INDEX (PRIMARY)
LEFT JOIN temp_purchase_stats ps ON v.vendor = ps.vendor
LEFT JOIN temp_product_stats prs ON v.vendor = prs.vendor
WHERE v.vendor IN (?)
ON DUPLICATE KEY UPDATE
avg_lead_time_days = VALUES(avg_lead_time_days),