Compare commits
6 Commits
add-produc
...
eea57528ab
| Author | SHA1 | Date | |
|---|---|---|---|
| eea57528ab | |||
| 3d2d1b3946 | |||
| d936d50f83 | |||
| 610e26689c | |||
| 7ff757203f | |||
| 843ce71506 |
@@ -126,13 +126,13 @@ CREATE TABLE IF NOT EXISTS vendor_metrics (
|
||||
order_fill_rate DECIMAL(5,2),
|
||||
total_orders INT DEFAULT 0,
|
||||
total_late_orders INT DEFAULT 0,
|
||||
total_purchase_value DECIMAL(10,3) DEFAULT 0,
|
||||
avg_order_value DECIMAL(10,3),
|
||||
total_purchase_value DECIMAL(15,3) DEFAULT 0,
|
||||
avg_order_value DECIMAL(15,3),
|
||||
-- Product metrics
|
||||
active_products INT DEFAULT 0,
|
||||
total_products INT DEFAULT 0,
|
||||
-- Financial metrics
|
||||
total_revenue DECIMAL(10,3) DEFAULT 0,
|
||||
total_revenue DECIMAL(15,3) DEFAULT 0,
|
||||
avg_margin_percent DECIMAL(5,2),
|
||||
-- Status
|
||||
status VARCHAR(20) DEFAULT 'active',
|
||||
|
||||
@@ -104,17 +104,54 @@ async function calculateMetrics() {
|
||||
WHERE status = 'running'
|
||||
`);
|
||||
|
||||
// Get counts from all relevant tables
|
||||
// Get counts of records that need updating based on last calculation time
|
||||
const [[productCount], [orderCount], [poCount]] = await Promise.all([
|
||||
connection.query('SELECT COUNT(*) as total FROM products'),
|
||||
connection.query('SELECT COUNT(*) as total FROM orders'),
|
||||
connection.query('SELECT COUNT(*) as total FROM purchase_orders')
|
||||
connection.query(`
|
||||
SELECT COUNT(DISTINCT p.pid) as total
|
||||
FROM products p
|
||||
FORCE INDEX (PRIMARY)
|
||||
LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics'
|
||||
LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
|
||||
AND o.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
|
||||
AND o.canceled = false
|
||||
LEFT JOIN purchase_orders po FORCE INDEX (idx_purchase_orders_metrics) ON p.pid = po.pid
|
||||
AND po.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
|
||||
WHERE p.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
|
||||
OR o.pid IS NOT NULL
|
||||
OR po.pid IS NOT NULL
|
||||
`),
|
||||
connection.query(`
|
||||
SELECT COUNT(DISTINCT o.id) as total
|
||||
FROM orders o
|
||||
FORCE INDEX (idx_orders_metrics)
|
||||
LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics'
|
||||
WHERE o.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
|
||||
AND o.canceled = false
|
||||
`),
|
||||
connection.query(`
|
||||
SELECT COUNT(DISTINCT po.id) as total
|
||||
FROM purchase_orders po
|
||||
FORCE INDEX (idx_purchase_orders_metrics)
|
||||
LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics'
|
||||
WHERE po.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
|
||||
`)
|
||||
]);
|
||||
|
||||
totalProducts = productCount.total;
|
||||
totalOrders = orderCount.total;
|
||||
totalPurchaseOrders = poCount.total;
|
||||
|
||||
// If nothing needs updating, we can exit early
|
||||
if (totalProducts === 0 && totalOrders === 0 && totalPurchaseOrders === 0) {
|
||||
console.log('No records need updating');
|
||||
return {
|
||||
processedProducts: 0,
|
||||
processedOrders: 0,
|
||||
processedPurchaseOrders: 0,
|
||||
success: true
|
||||
};
|
||||
}
|
||||
|
||||
// Create history record for this calculation
|
||||
const [historyResult] = await connection.query(`
|
||||
INSERT INTO calculate_history (
|
||||
@@ -239,7 +276,7 @@ async function calculateMetrics() {
|
||||
});
|
||||
|
||||
if (!SKIP_PRODUCT_METRICS) {
|
||||
const result = await calculateProductMetrics(startTime, totalProducts);
|
||||
const result = await calculateProductMetrics(startTime, totalProducts, processedProducts, isCancelled);
|
||||
await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
|
||||
if (!result.success) {
|
||||
throw new Error('Product metrics calculation failed');
|
||||
|
||||
@@ -4,19 +4,50 @@ const { getConnection } = require('./utils/db');
|
||||
async function calculateBrandMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
|
||||
const connection = await getConnection();
|
||||
let success = false;
|
||||
let processedOrders = 0;
|
||||
const BATCH_SIZE = 5000;
|
||||
|
||||
try {
|
||||
// Get last calculation timestamp
|
||||
const [lastCalc] = await connection.query(`
|
||||
SELECT last_calculation_timestamp
|
||||
FROM calculate_status
|
||||
WHERE module_name = 'brand_metrics'
|
||||
`);
|
||||
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
|
||||
|
||||
// Get total count of brands needing updates
|
||||
const [brandCount] = await connection.query(`
|
||||
SELECT COUNT(DISTINCT p.brand) as count
|
||||
FROM products p
|
||||
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
|
||||
WHERE p.brand IS NOT NULL
|
||||
AND (
|
||||
p.updated > ?
|
||||
OR o.id IS NOT NULL
|
||||
)
|
||||
`, [lastCalculationTime, lastCalculationTime]);
|
||||
const totalBrands = brandCount[0].count;
|
||||
|
||||
if (totalBrands === 0) {
|
||||
console.log('No brands need metric updates');
|
||||
return {
|
||||
processedProducts: 0,
|
||||
processedOrders: 0,
|
||||
processedPurchaseOrders: 0,
|
||||
success: true
|
||||
};
|
||||
}
|
||||
|
||||
if (isCancelled) {
|
||||
outputProgress({
|
||||
status: 'cancelled',
|
||||
operation: 'Brand metrics calculation cancelled',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
total: totalBrands,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: null,
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
percentage: ((processedCount / totalBrands) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
@@ -31,23 +62,15 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount =
|
||||
};
|
||||
}
|
||||
|
||||
// Get order count that will be processed
|
||||
const [orderCount] = await connection.query(`
|
||||
SELECT COUNT(*) as count
|
||||
FROM orders o
|
||||
WHERE o.canceled = false
|
||||
`);
|
||||
processedOrders = orderCount[0].count;
|
||||
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Starting brand metrics calculation',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
total: totalBrands,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalBrands),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
percentage: ((processedCount / totalBrands) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
@@ -55,7 +78,122 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount =
|
||||
}
|
||||
});
|
||||
|
||||
// Calculate brand metrics with optimized queries
|
||||
// Process in batches
|
||||
let lastBrand = '';
|
||||
while (true) {
|
||||
if (isCancelled) break;
|
||||
|
||||
const [batch] = await connection.query(`
|
||||
SELECT DISTINCT p.brand
|
||||
FROM products p
|
||||
FORCE INDEX (idx_brand)
|
||||
LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid AND o.updated > ?
|
||||
WHERE p.brand IS NOT NULL
|
||||
AND p.brand > ?
|
||||
AND (
|
||||
p.updated > ?
|
||||
OR o.id IS NOT NULL
|
||||
)
|
||||
ORDER BY p.brand
|
||||
LIMIT ?
|
||||
`, [lastCalculationTime, lastBrand, lastCalculationTime, BATCH_SIZE]);
|
||||
|
||||
if (batch.length === 0) break;
|
||||
|
||||
// Create temporary tables for better performance
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
|
||||
|
||||
await connection.query(`
|
||||
CREATE TEMPORARY TABLE temp_product_stats (
|
||||
brand VARCHAR(100) NOT NULL,
|
||||
product_count INT,
|
||||
active_products INT,
|
||||
total_stock_units INT,
|
||||
total_stock_cost DECIMAL(15,2),
|
||||
total_stock_retail DECIMAL(15,2),
|
||||
total_revenue DECIMAL(15,2),
|
||||
avg_margin DECIMAL(5,2),
|
||||
PRIMARY KEY (brand),
|
||||
INDEX (total_revenue),
|
||||
INDEX (product_count)
|
||||
) ENGINE=MEMORY
|
||||
`);
|
||||
|
||||
await connection.query(`
|
||||
CREATE TEMPORARY TABLE temp_sales_stats (
|
||||
brand VARCHAR(100) NOT NULL,
|
||||
current_period_sales DECIMAL(15,2),
|
||||
previous_period_sales DECIMAL(15,2),
|
||||
PRIMARY KEY (brand),
|
||||
INDEX (current_period_sales),
|
||||
INDEX (previous_period_sales)
|
||||
) ENGINE=MEMORY
|
||||
`);
|
||||
|
||||
// Populate product stats with optimized index usage
|
||||
await connection.query(`
|
||||
INSERT INTO temp_product_stats
|
||||
SELECT
|
||||
p.brand,
|
||||
COUNT(DISTINCT p.pid) as product_count,
|
||||
COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
|
||||
COALESCE(SUM(p.stock_quantity), 0) as total_stock_units,
|
||||
COALESCE(SUM(p.stock_quantity * p.cost_price), 0) as total_stock_cost,
|
||||
COALESCE(SUM(p.stock_quantity * p.price), 0) as total_stock_retail,
|
||||
COALESCE(SUM(pm.total_revenue), 0) as total_revenue,
|
||||
COALESCE(AVG(NULLIF(pm.avg_margin_percent, 0)), 0) as avg_margin
|
||||
FROM products p
|
||||
FORCE INDEX (idx_brand)
|
||||
LEFT JOIN product_metrics pm FORCE INDEX (PRIMARY) ON p.pid = pm.pid
|
||||
WHERE p.brand IN (?)
|
||||
AND (
|
||||
p.updated > ?
|
||||
OR EXISTS (
|
||||
SELECT 1 FROM orders o FORCE INDEX (idx_orders_metrics)
|
||||
WHERE o.pid = p.pid
|
||||
AND o.updated > ?
|
||||
)
|
||||
)
|
||||
GROUP BY p.brand
|
||||
`, [batch.map(row => row.brand), lastCalculationTime, lastCalculationTime]);
|
||||
|
||||
// Populate sales stats with optimized date handling
|
||||
await connection.query(`
|
||||
INSERT INTO temp_sales_stats
|
||||
WITH date_ranges AS (
|
||||
SELECT
|
||||
DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) as current_start,
|
||||
CURRENT_DATE as current_end,
|
||||
DATE_SUB(CURRENT_DATE, INTERVAL 60 DAY) as previous_start,
|
||||
DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) as previous_end
|
||||
)
|
||||
SELECT
|
||||
p.brand,
|
||||
COALESCE(SUM(
|
||||
CASE WHEN o.date >= dr.current_start
|
||||
THEN o.quantity * o.price
|
||||
ELSE 0
|
||||
END
|
||||
), 0) as current_period_sales,
|
||||
COALESCE(SUM(
|
||||
CASE WHEN o.date >= dr.previous_start AND o.date < dr.current_start
|
||||
THEN o.quantity * o.price
|
||||
ELSE 0
|
||||
END
|
||||
), 0) as previous_period_sales
|
||||
FROM products p
|
||||
FORCE INDEX (idx_brand)
|
||||
INNER JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
|
||||
CROSS JOIN date_ranges dr
|
||||
WHERE p.brand IN (?)
|
||||
AND o.canceled = false
|
||||
AND o.date >= dr.previous_start
|
||||
AND o.updated > ?
|
||||
GROUP BY p.brand
|
||||
`, [batch.map(row => row.brand), lastCalculationTime]);
|
||||
|
||||
// Update metrics using temp tables with optimized calculations
|
||||
await connection.query(`
|
||||
INSERT INTO brand_metrics (
|
||||
brand,
|
||||
@@ -66,105 +204,28 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount =
|
||||
total_stock_retail,
|
||||
total_revenue,
|
||||
avg_margin,
|
||||
growth_rate
|
||||
)
|
||||
WITH filtered_products AS (
|
||||
SELECT
|
||||
p.*,
|
||||
CASE
|
||||
WHEN p.stock_quantity <= 5000 AND p.stock_quantity >= 0
|
||||
THEN p.pid
|
||||
END as valid_pid,
|
||||
CASE
|
||||
WHEN p.visible = true
|
||||
AND p.stock_quantity <= 5000
|
||||
AND p.stock_quantity >= 0
|
||||
THEN p.pid
|
||||
END as active_pid,
|
||||
CASE
|
||||
WHEN p.stock_quantity IS NULL
|
||||
OR p.stock_quantity < 0
|
||||
OR p.stock_quantity > 5000
|
||||
THEN 0
|
||||
ELSE p.stock_quantity
|
||||
END as valid_stock
|
||||
FROM products p
|
||||
WHERE p.brand IS NOT NULL
|
||||
),
|
||||
sales_periods AS (
|
||||
SELECT
|
||||
p.brand,
|
||||
SUM(o.quantity * (o.price - COALESCE(o.discount, 0))) as period_revenue,
|
||||
SUM(o.quantity * (o.price - COALESCE(o.discount, 0) - p.cost_price)) as period_margin,
|
||||
COUNT(DISTINCT DATE(o.date)) as period_days,
|
||||
CASE
|
||||
WHEN o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 3 MONTH) THEN 'current'
|
||||
WHEN o.date BETWEEN DATE_SUB(CURRENT_DATE, INTERVAL 15 MONTH)
|
||||
AND DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH) THEN 'previous'
|
||||
END as period_type
|
||||
FROM filtered_products p
|
||||
JOIN orders o ON p.pid = o.pid
|
||||
WHERE o.canceled = false
|
||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 15 MONTH)
|
||||
GROUP BY p.brand, period_type
|
||||
),
|
||||
brand_data AS (
|
||||
SELECT
|
||||
p.brand,
|
||||
COUNT(DISTINCT p.valid_pid) as product_count,
|
||||
COUNT(DISTINCT p.active_pid) as active_products,
|
||||
SUM(p.valid_stock) as total_stock_units,
|
||||
SUM(p.valid_stock * p.cost_price) as total_stock_cost,
|
||||
SUM(p.valid_stock * p.price) as total_stock_retail,
|
||||
COALESCE(SUM(o.quantity * (o.price - COALESCE(o.discount, 0))), 0) as total_revenue,
|
||||
CASE
|
||||
WHEN SUM(o.quantity * o.price) > 0
|
||||
THEN GREATEST(
|
||||
-100.0,
|
||||
LEAST(
|
||||
100.0,
|
||||
(
|
||||
SUM(o.quantity * o.price) - -- Use gross revenue (before discounts)
|
||||
SUM(o.quantity * COALESCE(p.cost_price, 0)) -- Total costs
|
||||
) * 100.0 /
|
||||
NULLIF(SUM(o.quantity * o.price), 0) -- Divide by gross revenue
|
||||
)
|
||||
)
|
||||
ELSE 0
|
||||
END as avg_margin
|
||||
FROM filtered_products p
|
||||
LEFT JOIN orders o ON p.pid = o.pid AND o.canceled = false
|
||||
GROUP BY p.brand
|
||||
growth_rate,
|
||||
last_calculated_at
|
||||
)
|
||||
SELECT
|
||||
bd.brand,
|
||||
bd.product_count,
|
||||
bd.active_products,
|
||||
bd.total_stock_units,
|
||||
bd.total_stock_cost,
|
||||
bd.total_stock_retail,
|
||||
bd.total_revenue,
|
||||
bd.avg_margin,
|
||||
ps.brand,
|
||||
ps.product_count,
|
||||
ps.active_products,
|
||||
ps.total_stock_units,
|
||||
ps.total_stock_cost,
|
||||
ps.total_stock_retail,
|
||||
ps.total_revenue,
|
||||
ps.avg_margin,
|
||||
CASE
|
||||
WHEN MAX(CASE WHEN sp.period_type = 'previous' THEN sp.period_revenue END) = 0
|
||||
AND MAX(CASE WHEN sp.period_type = 'current' THEN sp.period_revenue END) > 0
|
||||
THEN 100.0
|
||||
WHEN MAX(CASE WHEN sp.period_type = 'previous' THEN sp.period_revenue END) = 0
|
||||
THEN 0.0
|
||||
ELSE GREATEST(
|
||||
-100.0,
|
||||
LEAST(
|
||||
((MAX(CASE WHEN sp.period_type = 'current' THEN sp.period_revenue END) -
|
||||
MAX(CASE WHEN sp.period_type = 'previous' THEN sp.period_revenue END)) /
|
||||
NULLIF(ABS(MAX(CASE WHEN sp.period_type = 'previous' THEN sp.period_revenue END)), 0)) * 100.0,
|
||||
999.99
|
||||
)
|
||||
)
|
||||
END as growth_rate
|
||||
FROM brand_data bd
|
||||
LEFT JOIN sales_periods sp ON bd.brand = sp.brand
|
||||
GROUP BY bd.brand, bd.product_count, bd.active_products, bd.total_stock_units,
|
||||
bd.total_stock_cost, bd.total_stock_retail, bd.total_revenue, bd.avg_margin
|
||||
WHEN COALESCE(ss.previous_period_sales, 0) = 0 AND COALESCE(ss.current_period_sales, 0) > 0 THEN 100
|
||||
WHEN COALESCE(ss.previous_period_sales, 0) = 0 THEN 0
|
||||
ELSE ROUND(LEAST(999.99, GREATEST(-100,
|
||||
((ss.current_period_sales / NULLIF(ss.previous_period_sales, 0)) - 1) * 100
|
||||
)), 2)
|
||||
END as growth_rate,
|
||||
NOW() as last_calculated_at
|
||||
FROM temp_product_stats ps
|
||||
LEFT JOIN temp_sales_stats ss ON ps.brand = ss.brand
|
||||
ON DUPLICATE KEY UPDATE
|
||||
product_count = VALUES(product_count),
|
||||
active_products = VALUES(active_products),
|
||||
@@ -174,118 +235,32 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount =
|
||||
total_revenue = VALUES(total_revenue),
|
||||
avg_margin = VALUES(avg_margin),
|
||||
growth_rate = VALUES(growth_rate),
|
||||
last_calculated_at = CURRENT_TIMESTAMP
|
||||
last_calculated_at = NOW()
|
||||
`);
|
||||
|
||||
processedCount = Math.floor(totalProducts * 0.97);
|
||||
// Clean up temp tables
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
|
||||
|
||||
lastBrand = batch[batch.length - 1].brand;
|
||||
processedCount += batch.length;
|
||||
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Brand metrics calculated, starting time-based metrics',
|
||||
operation: 'Processing brand metrics batch',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
total: totalBrands,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalBrands),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
percentage: ((processedCount / totalBrands) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
}
|
||||
});
|
||||
|
||||
if (isCancelled) return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedPurchaseOrders: 0,
|
||||
success
|
||||
};
|
||||
|
||||
// Calculate brand time-based metrics with optimized query
|
||||
await connection.query(`
|
||||
INSERT INTO brand_time_metrics (
|
||||
brand,
|
||||
year,
|
||||
month,
|
||||
product_count,
|
||||
active_products,
|
||||
total_stock_units,
|
||||
total_stock_cost,
|
||||
total_stock_retail,
|
||||
total_revenue,
|
||||
avg_margin
|
||||
)
|
||||
WITH filtered_products AS (
|
||||
SELECT
|
||||
p.*,
|
||||
CASE WHEN p.stock_quantity <= 5000 THEN p.pid END as valid_pid,
|
||||
CASE WHEN p.visible = true AND p.stock_quantity <= 5000 THEN p.pid END as active_pid,
|
||||
CASE
|
||||
WHEN p.stock_quantity IS NULL OR p.stock_quantity < 0 OR p.stock_quantity > 5000 THEN 0
|
||||
ELSE p.stock_quantity
|
||||
END as valid_stock
|
||||
FROM products p
|
||||
WHERE p.brand IS NOT NULL
|
||||
),
|
||||
monthly_metrics AS (
|
||||
SELECT
|
||||
p.brand,
|
||||
YEAR(o.date) as year,
|
||||
MONTH(o.date) as month,
|
||||
COUNT(DISTINCT p.valid_pid) as product_count,
|
||||
COUNT(DISTINCT p.active_pid) as active_products,
|
||||
SUM(p.valid_stock) as total_stock_units,
|
||||
SUM(p.valid_stock * p.cost_price) as total_stock_cost,
|
||||
SUM(p.valid_stock * p.price) as total_stock_retail,
|
||||
SUM(o.quantity * o.price) as total_revenue,
|
||||
CASE
|
||||
WHEN SUM(o.quantity * o.price) > 0
|
||||
THEN GREATEST(
|
||||
-100.0,
|
||||
LEAST(
|
||||
100.0,
|
||||
(
|
||||
SUM(o.quantity * o.price) - -- Use gross revenue (before discounts)
|
||||
SUM(o.quantity * COALESCE(p.cost_price, 0)) -- Total costs
|
||||
) * 100.0 /
|
||||
NULLIF(SUM(o.quantity * o.price), 0) -- Divide by gross revenue
|
||||
)
|
||||
)
|
||||
ELSE 0
|
||||
END as avg_margin
|
||||
FROM filtered_products p
|
||||
LEFT JOIN orders o ON p.pid = o.pid AND o.canceled = false
|
||||
WHERE o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
|
||||
GROUP BY p.brand, YEAR(o.date), MONTH(o.date)
|
||||
)
|
||||
SELECT *
|
||||
FROM monthly_metrics
|
||||
ON DUPLICATE KEY UPDATE
|
||||
product_count = VALUES(product_count),
|
||||
active_products = VALUES(active_products),
|
||||
total_stock_units = VALUES(total_stock_units),
|
||||
total_stock_cost = VALUES(total_stock_cost),
|
||||
total_stock_retail = VALUES(total_stock_retail),
|
||||
total_revenue = VALUES(total_revenue),
|
||||
avg_margin = VALUES(avg_margin)
|
||||
`);
|
||||
|
||||
processedCount = Math.floor(totalProducts * 0.99);
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Brand time-based metrics calculated',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
}
|
||||
});
|
||||
|
||||
// If we get here, everything completed successfully
|
||||
success = true;
|
||||
@@ -299,7 +274,7 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount =
|
||||
|
||||
return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedOrders: 0,
|
||||
processedPurchaseOrders: 0,
|
||||
success
|
||||
};
|
||||
|
||||
@@ -4,19 +4,52 @@ const { getConnection } = require('./utils/db');
|
||||
async function calculateCategoryMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
|
||||
const connection = await getConnection();
|
||||
let success = false;
|
||||
let processedOrders = 0;
|
||||
const BATCH_SIZE = 5000;
|
||||
|
||||
try {
|
||||
// Get last calculation timestamp
|
||||
const [lastCalc] = await connection.query(`
|
||||
SELECT last_calculation_timestamp
|
||||
FROM calculate_status
|
||||
WHERE module_name = 'category_metrics'
|
||||
`);
|
||||
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
|
||||
|
||||
// Get total count of categories needing updates
|
||||
const [categoryCount] = await connection.query(`
|
||||
SELECT COUNT(DISTINCT c.cat_id) as count
|
||||
FROM categories c
|
||||
JOIN product_categories pc ON c.cat_id = pc.cat_id
|
||||
LEFT JOIN products p ON pc.pid = p.pid AND p.updated > ?
|
||||
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
|
||||
WHERE c.status = 'active'
|
||||
AND (
|
||||
p.pid IS NOT NULL
|
||||
OR o.id IS NOT NULL
|
||||
)
|
||||
`, [lastCalculationTime, lastCalculationTime]);
|
||||
const totalCategories = categoryCount[0].count;
|
||||
|
||||
if (totalCategories === 0) {
|
||||
console.log('No categories need metric updates');
|
||||
return {
|
||||
processedProducts: 0,
|
||||
processedOrders: 0,
|
||||
processedPurchaseOrders: 0,
|
||||
success: true
|
||||
};
|
||||
}
|
||||
|
||||
if (isCancelled) {
|
||||
outputProgress({
|
||||
status: 'cancelled',
|
||||
operation: 'Category metrics calculation cancelled',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
total: totalCategories,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: null,
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
percentage: ((processedCount / totalCategories) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
@@ -31,23 +64,15 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
|
||||
};
|
||||
}
|
||||
|
||||
// Get order count that will be processed
|
||||
const [orderCount] = await connection.query(`
|
||||
SELECT COUNT(*) as count
|
||||
FROM orders o
|
||||
WHERE o.canceled = false
|
||||
`);
|
||||
processedOrders = orderCount[0].count;
|
||||
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Starting category metrics calculation',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
total: totalCategories,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalCategories),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
percentage: ((processedCount / totalCategories) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
@@ -55,441 +80,191 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
|
||||
}
|
||||
});
|
||||
|
||||
// First, calculate base category metrics
|
||||
// Process in batches
|
||||
let lastCatId = 0;
|
||||
while (true) {
|
||||
if (isCancelled) break;
|
||||
|
||||
const [batch] = await connection.query(`
|
||||
SELECT DISTINCT c.cat_id
|
||||
FROM categories c
|
||||
FORCE INDEX (PRIMARY)
|
||||
JOIN product_categories pc FORCE INDEX (idx_category) ON c.cat_id = pc.cat_id
|
||||
LEFT JOIN products p FORCE INDEX (PRIMARY) ON pc.pid = p.pid AND p.updated > ?
|
||||
LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid AND o.updated > ?
|
||||
WHERE c.status = 'active'
|
||||
AND c.cat_id > ?
|
||||
AND (
|
||||
p.pid IS NOT NULL
|
||||
OR o.id IS NOT NULL
|
||||
)
|
||||
ORDER BY c.cat_id
|
||||
LIMIT ?
|
||||
`, [lastCalculationTime, lastCalculationTime, lastCatId, BATCH_SIZE]);
|
||||
|
||||
if (batch.length === 0) break;
|
||||
|
||||
// Create temporary tables for better performance
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
|
||||
|
||||
await connection.query(`
|
||||
CREATE TEMPORARY TABLE temp_product_stats (
|
||||
cat_id BIGINT NOT NULL,
|
||||
product_count INT,
|
||||
active_products INT,
|
||||
total_value DECIMAL(15,2),
|
||||
avg_margin DECIMAL(5,2),
|
||||
turnover_rate DECIMAL(10,2),
|
||||
PRIMARY KEY (cat_id),
|
||||
INDEX (product_count),
|
||||
INDEX (total_value)
|
||||
) ENGINE=MEMORY
|
||||
`);
|
||||
|
||||
await connection.query(`
|
||||
CREATE TEMPORARY TABLE temp_sales_stats (
|
||||
cat_id BIGINT NOT NULL,
|
||||
recent_revenue DECIMAL(15,2),
|
||||
previous_revenue DECIMAL(15,2),
|
||||
PRIMARY KEY (cat_id),
|
||||
INDEX (recent_revenue),
|
||||
INDEX (previous_revenue)
|
||||
) ENGINE=MEMORY
|
||||
`);
|
||||
|
||||
// Populate product stats with optimized index usage
|
||||
await connection.query(`
|
||||
INSERT INTO temp_product_stats
|
||||
SELECT
|
||||
c.cat_id,
|
||||
COUNT(DISTINCT p.pid) as product_count,
|
||||
COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
|
||||
COALESCE(SUM(p.stock_quantity * p.cost_price), 0) as total_value,
|
||||
COALESCE(AVG(NULLIF(pm.avg_margin_percent, 0)), 0) as avg_margin,
|
||||
COALESCE(AVG(NULLIF(pm.turnover_rate, 0)), 0) as turnover_rate
|
||||
FROM categories c
|
||||
FORCE INDEX (PRIMARY)
|
||||
INNER JOIN product_categories pc FORCE INDEX (idx_category) ON c.cat_id = pc.cat_id
|
||||
LEFT JOIN products p FORCE INDEX (PRIMARY) ON pc.pid = p.pid
|
||||
LEFT JOIN product_metrics pm FORCE INDEX (PRIMARY) ON p.pid = pm.pid
|
||||
WHERE c.cat_id IN (?)
|
||||
AND (
|
||||
p.updated > ?
|
||||
OR EXISTS (
|
||||
SELECT 1 FROM orders o FORCE INDEX (idx_orders_metrics)
|
||||
WHERE o.pid = p.pid
|
||||
AND o.updated > ?
|
||||
)
|
||||
)
|
||||
GROUP BY c.cat_id
|
||||
`, [batch.map(row => row.cat_id), lastCalculationTime, lastCalculationTime]);
|
||||
|
||||
// Populate sales stats with optimized date handling
|
||||
await connection.query(`
|
||||
INSERT INTO temp_sales_stats
|
||||
WITH date_ranges AS (
|
||||
SELECT
|
||||
DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) as current_start,
|
||||
CURRENT_DATE as current_end,
|
||||
DATE_SUB(CURRENT_DATE, INTERVAL 60 DAY) as previous_start,
|
||||
DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) as previous_end
|
||||
)
|
||||
SELECT
|
||||
c.cat_id,
|
||||
COALESCE(SUM(
|
||||
CASE WHEN o.date >= dr.current_start
|
||||
THEN o.quantity * o.price
|
||||
ELSE 0
|
||||
END
|
||||
), 0) as recent_revenue,
|
||||
COALESCE(SUM(
|
||||
CASE WHEN o.date >= dr.previous_start AND o.date < dr.current_start
|
||||
THEN o.quantity * o.price
|
||||
ELSE 0
|
||||
END
|
||||
), 0) as previous_revenue
|
||||
FROM categories c
|
||||
FORCE INDEX (PRIMARY)
|
||||
INNER JOIN product_categories pc FORCE INDEX (idx_category) ON c.cat_id = pc.cat_id
|
||||
INNER JOIN products p FORCE INDEX (PRIMARY) ON pc.pid = p.pid
|
||||
INNER JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
|
||||
CROSS JOIN date_ranges dr
|
||||
WHERE c.cat_id IN (?)
|
||||
AND o.canceled = false
|
||||
AND o.date >= dr.previous_start
|
||||
AND o.updated > ?
|
||||
GROUP BY c.cat_id
|
||||
`, [batch.map(row => row.cat_id), lastCalculationTime]);
|
||||
|
||||
// Update metrics using temp tables with optimized calculations
|
||||
await connection.query(`
|
||||
INSERT INTO category_metrics (
|
||||
category_id,
|
||||
product_count,
|
||||
active_products,
|
||||
total_value,
|
||||
avg_margin,
|
||||
turnover_rate,
|
||||
growth_rate,
|
||||
status,
|
||||
last_calculated_at
|
||||
)
|
||||
SELECT
|
||||
c.cat_id,
|
||||
COUNT(DISTINCT p.pid) as product_count,
|
||||
COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
|
||||
COALESCE(SUM(p.stock_quantity * p.cost_price), 0) as total_value,
|
||||
ps.product_count,
|
||||
ps.active_products,
|
||||
ps.total_value,
|
||||
ps.avg_margin,
|
||||
ps.turnover_rate,
|
||||
CASE
|
||||
WHEN COALESCE(ss.previous_revenue, 0) = 0 AND COALESCE(ss.recent_revenue, 0) > 0 THEN 100
|
||||
WHEN COALESCE(ss.previous_revenue, 0) = 0 THEN 0
|
||||
ELSE ROUND(LEAST(999.99, GREATEST(-100,
|
||||
((ss.recent_revenue / NULLIF(ss.previous_revenue, 0)) - 1) * 100
|
||||
)), 2)
|
||||
END as growth_rate,
|
||||
c.status,
|
||||
NOW() as last_calculated_at
|
||||
FROM categories c
|
||||
LEFT JOIN product_categories pc ON c.cat_id = pc.cat_id
|
||||
LEFT JOIN products p ON pc.pid = p.pid
|
||||
GROUP BY c.cat_id, c.status
|
||||
FORCE INDEX (PRIMARY)
|
||||
LEFT JOIN temp_product_stats ps ON c.cat_id = ps.cat_id
|
||||
LEFT JOIN temp_sales_stats ss ON c.cat_id = ss.cat_id
|
||||
WHERE c.cat_id IN (?)
|
||||
ON DUPLICATE KEY UPDATE
|
||||
product_count = VALUES(product_count),
|
||||
active_products = VALUES(active_products),
|
||||
total_value = VALUES(total_value),
|
||||
status = VALUES(status),
|
||||
last_calculated_at = VALUES(last_calculated_at)
|
||||
`);
|
||||
|
||||
processedCount = Math.floor(totalProducts * 0.90);
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Base category metrics calculated, updating with margin data',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
}
|
||||
});
|
||||
|
||||
if (isCancelled) return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedPurchaseOrders: 0,
|
||||
success
|
||||
};
|
||||
|
||||
// Then update with margin and turnover data
|
||||
await connection.query(`
|
||||
WITH category_sales AS (
|
||||
SELECT
|
||||
pc.cat_id,
|
||||
SUM(o.quantity * o.price) as total_sales,
|
||||
SUM(o.quantity * (o.price - p.cost_price)) as total_margin,
|
||||
SUM(o.quantity) as units_sold,
|
||||
AVG(GREATEST(p.stock_quantity, 0)) as avg_stock,
|
||||
COUNT(DISTINCT DATE(o.date)) as active_days
|
||||
FROM product_categories pc
|
||||
JOIN products p ON pc.pid = p.pid
|
||||
JOIN orders o ON p.pid = o.pid
|
||||
LEFT JOIN turnover_config tc ON
|
||||
(tc.category_id = pc.cat_id AND tc.vendor = p.vendor) OR
|
||||
(tc.category_id = pc.cat_id AND tc.vendor IS NULL) OR
|
||||
(tc.category_id IS NULL AND tc.vendor = p.vendor) OR
|
||||
(tc.category_id IS NULL AND tc.vendor IS NULL)
|
||||
WHERE o.canceled = false
|
||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL COALESCE(tc.calculation_period_days, 30) DAY)
|
||||
GROUP BY pc.cat_id
|
||||
)
|
||||
UPDATE category_metrics cm
|
||||
JOIN category_sales cs ON cm.category_id = cs.cat_id
|
||||
LEFT JOIN turnover_config tc ON
|
||||
(tc.category_id = cm.category_id AND tc.vendor IS NULL) OR
|
||||
(tc.category_id IS NULL AND tc.vendor IS NULL)
|
||||
SET
|
||||
cm.avg_margin = COALESCE(cs.total_margin * 100.0 / NULLIF(cs.total_sales, 0), 0),
|
||||
cm.turnover_rate = CASE
|
||||
WHEN cs.avg_stock > 0 AND cs.active_days > 0
|
||||
THEN LEAST(
|
||||
(cs.units_sold / cs.avg_stock) * (365.0 / cs.active_days),
|
||||
999.99
|
||||
)
|
||||
ELSE 0
|
||||
END,
|
||||
cm.last_calculated_at = NOW()
|
||||
`);
|
||||
|
||||
processedCount = Math.floor(totalProducts * 0.95);
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Margin data updated, calculating growth rates',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
}
|
||||
});
|
||||
|
||||
if (isCancelled) return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedPurchaseOrders: 0,
|
||||
success
|
||||
};
|
||||
|
||||
// Finally update growth rates
|
||||
await connection.query(`
|
||||
WITH current_period AS (
|
||||
SELECT
|
||||
pc.cat_id,
|
||||
SUM(o.quantity * (o.price - COALESCE(o.discount, 0)) /
|
||||
(1 + COALESCE(ss.seasonality_factor, 0))) as revenue,
|
||||
SUM(o.quantity * (o.price - COALESCE(o.discount, 0) - p.cost_price)) as gross_profit,
|
||||
COUNT(DISTINCT DATE(o.date)) as days
|
||||
FROM product_categories pc
|
||||
JOIN products p ON pc.pid = p.pid
|
||||
JOIN orders o ON p.pid = o.pid
|
||||
LEFT JOIN sales_seasonality ss ON MONTH(o.date) = ss.month
|
||||
WHERE o.canceled = false
|
||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 3 MONTH)
|
||||
GROUP BY pc.cat_id
|
||||
),
|
||||
previous_period AS (
|
||||
SELECT
|
||||
pc.cat_id,
|
||||
SUM(o.quantity * (o.price - COALESCE(o.discount, 0)) /
|
||||
(1 + COALESCE(ss.seasonality_factor, 0))) as revenue,
|
||||
COUNT(DISTINCT DATE(o.date)) as days
|
||||
FROM product_categories pc
|
||||
JOIN products p ON pc.pid = p.pid
|
||||
JOIN orders o ON p.pid = o.pid
|
||||
LEFT JOIN sales_seasonality ss ON MONTH(o.date) = ss.month
|
||||
WHERE o.canceled = false
|
||||
AND o.date BETWEEN DATE_SUB(CURRENT_DATE, INTERVAL 15 MONTH)
|
||||
AND DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
|
||||
GROUP BY pc.cat_id
|
||||
),
|
||||
trend_data AS (
|
||||
SELECT
|
||||
pc.cat_id,
|
||||
MONTH(o.date) as month,
|
||||
SUM(o.quantity * (o.price - COALESCE(o.discount, 0)) /
|
||||
(1 + COALESCE(ss.seasonality_factor, 0))) as revenue,
|
||||
COUNT(DISTINCT DATE(o.date)) as days_in_month
|
||||
FROM product_categories pc
|
||||
JOIN products p ON pc.pid = p.pid
|
||||
JOIN orders o ON p.pid = o.pid
|
||||
LEFT JOIN sales_seasonality ss ON MONTH(o.date) = ss.month
|
||||
WHERE o.canceled = false
|
||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 15 MONTH)
|
||||
GROUP BY pc.cat_id, MONTH(o.date)
|
||||
),
|
||||
trend_stats AS (
|
||||
SELECT
|
||||
cat_id,
|
||||
COUNT(*) as n,
|
||||
AVG(month) as avg_x,
|
||||
AVG(revenue / NULLIF(days_in_month, 0)) as avg_y,
|
||||
SUM(month * (revenue / NULLIF(days_in_month, 0))) as sum_xy,
|
||||
SUM(month * month) as sum_xx
|
||||
FROM trend_data
|
||||
GROUP BY cat_id
|
||||
HAVING COUNT(*) >= 6
|
||||
),
|
||||
trend_analysis AS (
|
||||
SELECT
|
||||
cat_id,
|
||||
((n * sum_xy) - (avg_x * n * avg_y)) /
|
||||
NULLIF((n * sum_xx) - (n * avg_x * avg_x), 0) as trend_slope,
|
||||
avg_y as avg_daily_revenue
|
||||
FROM trend_stats
|
||||
),
|
||||
margin_calc AS (
|
||||
SELECT
|
||||
pc.cat_id,
|
||||
CASE
|
||||
WHEN SUM(o.quantity * o.price) > 0 THEN
|
||||
GREATEST(
|
||||
-100.0,
|
||||
LEAST(
|
||||
100.0,
|
||||
(
|
||||
SUM(o.quantity * o.price) - -- Use gross revenue (before discounts)
|
||||
SUM(o.quantity * COALESCE(p.cost_price, 0)) -- Total costs
|
||||
) * 100.0 /
|
||||
NULLIF(SUM(o.quantity * o.price), 0) -- Divide by gross revenue
|
||||
)
|
||||
)
|
||||
ELSE NULL
|
||||
END as avg_margin
|
||||
FROM product_categories pc
|
||||
JOIN products p ON pc.pid = p.pid
|
||||
JOIN orders o ON p.pid = o.pid
|
||||
WHERE o.canceled = false
|
||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 3 MONTH)
|
||||
GROUP BY pc.cat_id
|
||||
)
|
||||
UPDATE category_metrics cm
|
||||
LEFT JOIN current_period cp ON cm.category_id = cp.cat_id
|
||||
LEFT JOIN previous_period pp ON cm.category_id = pp.cat_id
|
||||
LEFT JOIN trend_analysis ta ON cm.category_id = ta.cat_id
|
||||
LEFT JOIN margin_calc mc ON cm.category_id = mc.cat_id
|
||||
SET
|
||||
cm.growth_rate = CASE
|
||||
WHEN pp.revenue = 0 AND COALESCE(cp.revenue, 0) > 0 THEN 100.0
|
||||
WHEN pp.revenue = 0 OR cp.revenue IS NULL THEN 0.0
|
||||
WHEN ta.trend_slope IS NOT NULL THEN
|
||||
GREATEST(
|
||||
-100.0,
|
||||
LEAST(
|
||||
(ta.trend_slope / NULLIF(ta.avg_daily_revenue, 0)) * 365 * 100,
|
||||
999.99
|
||||
)
|
||||
)
|
||||
ELSE
|
||||
GREATEST(
|
||||
-100.0,
|
||||
LEAST(
|
||||
((COALESCE(cp.revenue, 0) - pp.revenue) /
|
||||
NULLIF(ABS(pp.revenue), 0)) * 100.0,
|
||||
999.99
|
||||
)
|
||||
)
|
||||
END,
|
||||
cm.avg_margin = COALESCE(mc.avg_margin, cm.avg_margin),
|
||||
cm.last_calculated_at = NOW()
|
||||
WHERE cp.cat_id IS NOT NULL OR pp.cat_id IS NOT NULL
|
||||
`);
|
||||
|
||||
processedCount = Math.floor(totalProducts * 0.97);
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Growth rates calculated, updating time-based metrics',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
}
|
||||
});
|
||||
|
||||
if (isCancelled) return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedPurchaseOrders: 0,
|
||||
success
|
||||
};
|
||||
|
||||
// Calculate time-based metrics
|
||||
await connection.query(`
|
||||
INSERT INTO category_time_metrics (
|
||||
category_id,
|
||||
year,
|
||||
month,
|
||||
product_count,
|
||||
active_products,
|
||||
total_value,
|
||||
total_revenue,
|
||||
avg_margin,
|
||||
turnover_rate
|
||||
)
|
||||
SELECT
|
||||
pc.cat_id,
|
||||
YEAR(o.date) as year,
|
||||
MONTH(o.date) as month,
|
||||
COUNT(DISTINCT p.pid) as product_count,
|
||||
COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
|
||||
SUM(p.stock_quantity * p.cost_price) as total_value,
|
||||
SUM(o.quantity * o.price) as total_revenue,
|
||||
CASE
|
||||
WHEN SUM(o.quantity * o.price) > 0 THEN
|
||||
LEAST(
|
||||
GREATEST(
|
||||
SUM(o.quantity * (o.price - GREATEST(p.cost_price, 0))) * 100.0 /
|
||||
SUM(o.quantity * o.price),
|
||||
-100
|
||||
),
|
||||
100
|
||||
)
|
||||
ELSE 0
|
||||
END as avg_margin,
|
||||
COALESCE(
|
||||
LEAST(
|
||||
SUM(o.quantity) / NULLIF(AVG(GREATEST(p.stock_quantity, 0)), 0),
|
||||
999.99
|
||||
),
|
||||
0
|
||||
) as turnover_rate
|
||||
FROM product_categories pc
|
||||
JOIN products p ON pc.pid = p.pid
|
||||
JOIN orders o ON p.pid = o.pid
|
||||
WHERE o.canceled = false
|
||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
|
||||
GROUP BY pc.cat_id, YEAR(o.date), MONTH(o.date)
|
||||
ON DUPLICATE KEY UPDATE
|
||||
product_count = VALUES(product_count),
|
||||
active_products = VALUES(active_products),
|
||||
total_value = VALUES(total_value),
|
||||
total_revenue = VALUES(total_revenue),
|
||||
avg_margin = VALUES(avg_margin),
|
||||
turnover_rate = VALUES(turnover_rate)
|
||||
`);
|
||||
turnover_rate = VALUES(turnover_rate),
|
||||
growth_rate = VALUES(growth_rate),
|
||||
status = VALUES(status),
|
||||
last_calculated_at = NOW()
|
||||
`, [batch.map(row => row.cat_id)]);
|
||||
|
||||
// Clean up temp tables
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
|
||||
|
||||
lastCatId = batch[batch.length - 1].cat_id;
|
||||
processedCount += batch.length;
|
||||
|
||||
processedCount = Math.floor(totalProducts * 0.99);
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Time-based metrics calculated, updating category-sales metrics',
|
||||
operation: 'Processing category metrics batch',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
total: totalCategories,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalCategories),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
percentage: ((processedCount / totalCategories) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
}
|
||||
});
|
||||
|
||||
if (isCancelled) return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedPurchaseOrders: 0,
|
||||
success
|
||||
};
|
||||
|
||||
// Calculate category-sales metrics
|
||||
await connection.query(`
|
||||
INSERT INTO category_sales_metrics (
|
||||
category_id,
|
||||
brand,
|
||||
period_start,
|
||||
period_end,
|
||||
avg_daily_sales,
|
||||
total_sold,
|
||||
num_products,
|
||||
avg_price,
|
||||
last_calculated_at
|
||||
)
|
||||
WITH date_ranges AS (
|
||||
SELECT
|
||||
DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) as period_start,
|
||||
CURRENT_DATE as period_end
|
||||
UNION ALL
|
||||
SELECT
|
||||
DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY),
|
||||
DATE_SUB(CURRENT_DATE, INTERVAL 31 DAY)
|
||||
UNION ALL
|
||||
SELECT
|
||||
DATE_SUB(CURRENT_DATE, INTERVAL 180 DAY),
|
||||
DATE_SUB(CURRENT_DATE, INTERVAL 91 DAY)
|
||||
UNION ALL
|
||||
SELECT
|
||||
DATE_SUB(CURRENT_DATE, INTERVAL 365 DAY),
|
||||
DATE_SUB(CURRENT_DATE, INTERVAL 181 DAY)
|
||||
),
|
||||
sales_data AS (
|
||||
SELECT
|
||||
pc.cat_id,
|
||||
COALESCE(p.brand, 'Unknown') as brand,
|
||||
dr.period_start,
|
||||
dr.period_end,
|
||||
COUNT(DISTINCT p.pid) as num_products,
|
||||
SUM(o.quantity) as total_sold,
|
||||
SUM(o.quantity * o.price) as total_revenue,
|
||||
COUNT(DISTINCT DATE(o.date)) as num_days
|
||||
FROM products p
|
||||
JOIN product_categories pc ON p.pid = pc.pid
|
||||
JOIN orders o ON p.pid = o.pid
|
||||
CROSS JOIN date_ranges dr
|
||||
WHERE o.canceled = false
|
||||
AND o.date BETWEEN dr.period_start AND dr.period_end
|
||||
GROUP BY pc.cat_id, p.brand, dr.period_start, dr.period_end
|
||||
)
|
||||
SELECT
|
||||
cat_id as category_id,
|
||||
brand,
|
||||
period_start,
|
||||
period_end,
|
||||
CASE
|
||||
WHEN num_days > 0
|
||||
THEN total_sold / num_days
|
||||
ELSE 0
|
||||
END as avg_daily_sales,
|
||||
total_sold,
|
||||
num_products,
|
||||
CASE
|
||||
WHEN total_sold > 0
|
||||
THEN total_revenue / total_sold
|
||||
ELSE 0
|
||||
END as avg_price,
|
||||
NOW() as last_calculated_at
|
||||
FROM sales_data
|
||||
ON DUPLICATE KEY UPDATE
|
||||
avg_daily_sales = VALUES(avg_daily_sales),
|
||||
total_sold = VALUES(total_sold),
|
||||
num_products = VALUES(num_products),
|
||||
avg_price = VALUES(avg_price),
|
||||
last_calculated_at = VALUES(last_calculated_at)
|
||||
`);
|
||||
|
||||
processedCount = Math.floor(totalProducts * 1.0);
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Category-sales metrics calculated',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
}
|
||||
});
|
||||
|
||||
// If we get here, everything completed successfully
|
||||
success = true;
|
||||
@@ -503,7 +278,7 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
|
||||
|
||||
return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedOrders: 0,
|
||||
processedPurchaseOrders: 0,
|
||||
success
|
||||
};
|
||||
|
||||
@@ -4,9 +4,39 @@ const { getConnection } = require('./utils/db');
|
||||
async function calculateFinancialMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
|
||||
const connection = await getConnection();
|
||||
let success = false;
|
||||
let processedOrders = 0;
|
||||
const BATCH_SIZE = 5000;
|
||||
|
||||
try {
|
||||
// Get last calculation timestamp
|
||||
const [lastCalc] = await connection.query(`
|
||||
SELECT last_calculation_timestamp
|
||||
FROM calculate_status
|
||||
WHERE module_name = 'financial_metrics'
|
||||
`);
|
||||
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
|
||||
|
||||
// Get total count of products needing updates
|
||||
if (!totalProducts) {
|
||||
const [productCount] = await connection.query(`
|
||||
SELECT COUNT(DISTINCT p.pid) as count
|
||||
FROM products p
|
||||
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
|
||||
WHERE p.updated > ?
|
||||
OR o.pid IS NOT NULL
|
||||
`, [lastCalculationTime, lastCalculationTime]);
|
||||
totalProducts = productCount[0].count;
|
||||
}
|
||||
|
||||
if (totalProducts === 0) {
|
||||
console.log('No products need financial metric updates');
|
||||
return {
|
||||
processedProducts: 0,
|
||||
processedOrders: 0,
|
||||
processedPurchaseOrders: 0,
|
||||
success: true
|
||||
};
|
||||
}
|
||||
|
||||
if (isCancelled) {
|
||||
outputProgress({
|
||||
status: 'cancelled',
|
||||
@@ -31,15 +61,6 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun
|
||||
};
|
||||
}
|
||||
|
||||
// Get order count that will be processed
|
||||
const [orderCount] = await connection.query(`
|
||||
SELECT COUNT(*) as count
|
||||
FROM orders o
|
||||
WHERE o.canceled = false
|
||||
AND DATE(o.date) >= DATE_SUB(CURDATE(), INTERVAL 12 MONTH)
|
||||
`);
|
||||
processedOrders = orderCount[0].count;
|
||||
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Starting financial metrics calculation',
|
||||
@@ -56,44 +77,67 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun
|
||||
}
|
||||
});
|
||||
|
||||
// Calculate financial metrics with optimized query
|
||||
// Process in batches
|
||||
let lastPid = 0;
|
||||
while (true) {
|
||||
if (isCancelled) break;
|
||||
|
||||
const [batch] = await connection.query(`
|
||||
SELECT DISTINCT p.pid
|
||||
FROM products p
|
||||
LEFT JOIN orders o ON p.pid = o.pid
|
||||
WHERE p.pid > ?
|
||||
AND (
|
||||
p.updated > ?
|
||||
OR EXISTS (
|
||||
SELECT 1 FROM orders o2
|
||||
WHERE o2.pid = p.pid
|
||||
AND o2.updated > ?
|
||||
)
|
||||
)
|
||||
ORDER BY p.pid
|
||||
LIMIT ?
|
||||
`, [lastPid, lastCalculationTime, lastCalculationTime, BATCH_SIZE]);
|
||||
|
||||
if (batch.length === 0) break;
|
||||
|
||||
// Update financial metrics for this batch
|
||||
await connection.query(`
|
||||
WITH product_financials AS (
|
||||
UPDATE product_metrics pm
|
||||
JOIN (
|
||||
SELECT
|
||||
p.pid,
|
||||
p.cost_price * p.stock_quantity as inventory_value,
|
||||
SUM(o.quantity * o.price) as total_revenue,
|
||||
SUM(o.quantity * p.cost_price) as cost_of_goods_sold,
|
||||
SUM(o.quantity * (o.price - p.cost_price)) as gross_profit,
|
||||
MIN(o.date) as first_sale_date,
|
||||
MAX(o.date) as last_sale_date,
|
||||
DATEDIFF(MAX(o.date), MIN(o.date)) + 1 as calculation_period_days,
|
||||
COUNT(DISTINCT DATE(o.date)) as active_days
|
||||
FROM products p
|
||||
LEFT JOIN orders o ON p.pid = o.pid
|
||||
WHERE o.canceled = false
|
||||
AND DATE(o.date) >= DATE_SUB(CURDATE(), INTERVAL 12 MONTH)
|
||||
AND o.canceled = false
|
||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
|
||||
WHERE p.pid IN (?)
|
||||
GROUP BY p.pid
|
||||
)
|
||||
UPDATE product_metrics pm
|
||||
JOIN product_financials pf ON pm.pid = pf.pid
|
||||
) fin ON pm.pid = fin.pid
|
||||
SET
|
||||
pm.inventory_value = COALESCE(pf.inventory_value, 0),
|
||||
pm.total_revenue = COALESCE(pf.total_revenue, 0),
|
||||
pm.cost_of_goods_sold = COALESCE(pf.cost_of_goods_sold, 0),
|
||||
pm.gross_profit = COALESCE(pf.gross_profit, 0),
|
||||
pm.inventory_value = COALESCE(fin.inventory_value, 0),
|
||||
pm.total_revenue = COALESCE(fin.total_revenue, 0),
|
||||
pm.cost_of_goods_sold = COALESCE(fin.cost_of_goods_sold, 0),
|
||||
pm.gross_profit = COALESCE(fin.gross_profit, 0),
|
||||
pm.gmroi = CASE
|
||||
WHEN COALESCE(pf.inventory_value, 0) > 0 AND pf.active_days > 0 THEN
|
||||
(COALESCE(pf.gross_profit, 0) * (365.0 / pf.active_days)) / COALESCE(pf.inventory_value, 0)
|
||||
WHEN COALESCE(fin.inventory_value, 0) > 0 AND fin.active_days > 0
|
||||
THEN (COALESCE(fin.gross_profit, 0) * (365.0 / fin.active_days)) / COALESCE(fin.inventory_value, 0)
|
||||
ELSE 0
|
||||
END,
|
||||
pm.last_calculated_at = CURRENT_TIMESTAMP
|
||||
`);
|
||||
pm.last_calculated_at = NOW()
|
||||
`, [batch.map(row => row.pid)]);
|
||||
|
||||
lastPid = batch[batch.length - 1].pid;
|
||||
processedCount += batch.length;
|
||||
|
||||
processedCount = Math.floor(totalProducts * 0.65);
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Base financial metrics calculated, updating time aggregates',
|
||||
operation: 'Processing financial metrics batch',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
@@ -106,60 +150,7 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
}
|
||||
});
|
||||
|
||||
if (isCancelled) return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedPurchaseOrders: 0,
|
||||
success
|
||||
};
|
||||
|
||||
// Update time-based aggregates with optimized query
|
||||
await connection.query(`
|
||||
WITH monthly_financials AS (
|
||||
SELECT
|
||||
p.pid,
|
||||
YEAR(o.date) as year,
|
||||
MONTH(o.date) as month,
|
||||
p.cost_price * p.stock_quantity as inventory_value,
|
||||
SUM(o.quantity * (o.price - p.cost_price)) as gross_profit,
|
||||
COUNT(DISTINCT DATE(o.date)) as active_days,
|
||||
MIN(o.date) as period_start,
|
||||
MAX(o.date) as period_end
|
||||
FROM products p
|
||||
LEFT JOIN orders o ON p.pid = o.pid
|
||||
WHERE o.canceled = false
|
||||
GROUP BY p.pid, YEAR(o.date), MONTH(o.date)
|
||||
)
|
||||
UPDATE product_time_aggregates pta
|
||||
JOIN monthly_financials mf ON pta.pid = mf.pid
|
||||
AND pta.year = mf.year
|
||||
AND pta.month = mf.month
|
||||
SET
|
||||
pta.inventory_value = COALESCE(mf.inventory_value, 0),
|
||||
pta.gmroi = CASE
|
||||
WHEN COALESCE(mf.inventory_value, 0) > 0 AND mf.active_days > 0 THEN
|
||||
(COALESCE(mf.gross_profit, 0) * (365.0 / mf.active_days)) / COALESCE(mf.inventory_value, 0)
|
||||
ELSE 0
|
||||
END
|
||||
`);
|
||||
|
||||
processedCount = Math.floor(totalProducts * 0.70);
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Time-based aggregates updated',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
}
|
||||
});
|
||||
|
||||
// If we get here, everything completed successfully
|
||||
success = true;
|
||||
@@ -173,7 +164,7 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun
|
||||
|
||||
return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedOrders: 0,
|
||||
processedPurchaseOrders: 0,
|
||||
success
|
||||
};
|
||||
|
||||
@@ -16,16 +16,42 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
|
||||
const BATCH_SIZE = 5000;
|
||||
|
||||
try {
|
||||
// Skip flags are inherited from the parent scope
|
||||
const SKIP_PRODUCT_BASE_METRICS = 0;
|
||||
const SKIP_PRODUCT_TIME_AGGREGATES = 0;
|
||||
// Get last calculation timestamp
|
||||
const [lastCalc] = await connection.query(`
|
||||
SELECT last_calculation_timestamp
|
||||
FROM calculate_status
|
||||
WHERE module_name = 'product_metrics'
|
||||
`);
|
||||
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
|
||||
|
||||
// Get total product count if not provided
|
||||
if (!totalProducts) {
|
||||
const [productCount] = await connection.query('SELECT COUNT(*) as count FROM products');
|
||||
const [productCount] = await connection.query(`
|
||||
SELECT COUNT(DISTINCT p.pid) as count
|
||||
FROM products p
|
||||
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
|
||||
LEFT JOIN purchase_orders po ON p.pid = po.pid AND po.updated > ?
|
||||
WHERE p.updated > ?
|
||||
OR o.pid IS NOT NULL
|
||||
OR po.pid IS NOT NULL
|
||||
`, [lastCalculationTime, lastCalculationTime, lastCalculationTime]);
|
||||
totalProducts = productCount[0].count;
|
||||
}
|
||||
|
||||
if (totalProducts === 0) {
|
||||
console.log('No products need updating');
|
||||
return {
|
||||
processedProducts: 0,
|
||||
processedOrders: 0,
|
||||
processedPurchaseOrders: 0,
|
||||
success: true
|
||||
};
|
||||
}
|
||||
|
||||
// Skip flags are inherited from the parent scope
|
||||
const SKIP_PRODUCT_BASE_METRICS = 0;
|
||||
const SKIP_PRODUCT_TIME_AGGREGATES = 0;
|
||||
|
||||
if (isCancelled) {
|
||||
outputProgress({
|
||||
status: 'cancelled',
|
||||
@@ -93,10 +119,39 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
|
||||
processedOrders = orderCount[0].count;
|
||||
|
||||
// Clear temporary tables
|
||||
await connection.query('TRUNCATE TABLE temp_sales_metrics');
|
||||
await connection.query('TRUNCATE TABLE temp_purchase_metrics');
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_metrics');
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_purchase_metrics');
|
||||
|
||||
// Populate temp_sales_metrics with base stats and sales averages
|
||||
// Create optimized temporary tables with indexes
|
||||
await connection.query(`
|
||||
CREATE TEMPORARY TABLE temp_sales_metrics (
|
||||
pid BIGINT NOT NULL,
|
||||
daily_sales_avg DECIMAL(10,3),
|
||||
weekly_sales_avg DECIMAL(10,3),
|
||||
monthly_sales_avg DECIMAL(10,3),
|
||||
total_revenue DECIMAL(10,2),
|
||||
avg_margin_percent DECIMAL(5,2),
|
||||
first_sale_date DATE,
|
||||
last_sale_date DATE,
|
||||
PRIMARY KEY (pid),
|
||||
INDEX (daily_sales_avg),
|
||||
INDEX (total_revenue)
|
||||
) ENGINE=MEMORY
|
||||
`);
|
||||
|
||||
await connection.query(`
|
||||
CREATE TEMPORARY TABLE temp_purchase_metrics (
|
||||
pid BIGINT NOT NULL,
|
||||
avg_lead_time_days DECIMAL(5,1),
|
||||
last_purchase_date DATE,
|
||||
first_received_date DATE,
|
||||
last_received_date DATE,
|
||||
PRIMARY KEY (pid),
|
||||
INDEX (avg_lead_time_days)
|
||||
) ENGINE=MEMORY
|
||||
`);
|
||||
|
||||
// Populate temp_sales_metrics with base stats and sales averages using FORCE INDEX
|
||||
await connection.query(`
|
||||
INSERT INTO temp_sales_metrics
|
||||
SELECT
|
||||
@@ -113,13 +168,21 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
|
||||
MIN(o.date) as first_sale_date,
|
||||
MAX(o.date) as last_sale_date
|
||||
FROM products p
|
||||
LEFT JOIN orders o ON p.pid = o.pid
|
||||
FORCE INDEX (PRIMARY)
|
||||
LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
|
||||
AND o.canceled = false
|
||||
AND o.date >= DATE_SUB(CURDATE(), INTERVAL 90 DAY)
|
||||
WHERE p.updated > ?
|
||||
OR EXISTS (
|
||||
SELECT 1 FROM orders o2 FORCE INDEX (idx_orders_metrics)
|
||||
WHERE o2.pid = p.pid
|
||||
AND o2.canceled = false
|
||||
AND o2.updated > ?
|
||||
)
|
||||
GROUP BY p.pid
|
||||
`);
|
||||
`, [lastCalculationTime, lastCalculationTime]);
|
||||
|
||||
// Populate temp_purchase_metrics
|
||||
// Populate temp_purchase_metrics with optimized index usage
|
||||
await connection.query(`
|
||||
INSERT INTO temp_purchase_metrics
|
||||
SELECT
|
||||
@@ -129,21 +192,38 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
|
||||
MIN(po.received_date) as first_received_date,
|
||||
MAX(po.received_date) as last_received_date
|
||||
FROM products p
|
||||
LEFT JOIN purchase_orders po ON p.pid = po.pid
|
||||
FORCE INDEX (PRIMARY)
|
||||
LEFT JOIN purchase_orders po FORCE INDEX (idx_po_metrics) ON p.pid = po.pid
|
||||
AND po.received_date IS NOT NULL
|
||||
AND po.date >= DATE_SUB(CURDATE(), INTERVAL 365 DAY)
|
||||
WHERE p.updated > ?
|
||||
OR EXISTS (
|
||||
SELECT 1 FROM purchase_orders po2 FORCE INDEX (idx_po_metrics)
|
||||
WHERE po2.pid = p.pid
|
||||
AND po2.updated > ?
|
||||
)
|
||||
GROUP BY p.pid
|
||||
`);
|
||||
`, [lastCalculationTime, lastCalculationTime]);
|
||||
|
||||
// Process updates in batches
|
||||
// Process updates in batches, but only for affected products
|
||||
let lastPid = 0;
|
||||
while (true) {
|
||||
if (isCancelled) break;
|
||||
|
||||
const [batch] = await connection.query(
|
||||
'SELECT pid FROM products WHERE pid > ? ORDER BY pid LIMIT ?',
|
||||
[lastPid, BATCH_SIZE]
|
||||
);
|
||||
const [batch] = await connection.query(`
|
||||
SELECT DISTINCT p.pid
|
||||
FROM products p
|
||||
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
|
||||
LEFT JOIN purchase_orders po ON p.pid = po.pid AND po.updated > ?
|
||||
WHERE p.pid > ?
|
||||
AND (
|
||||
p.updated > ?
|
||||
OR o.pid IS NOT NULL
|
||||
OR po.pid IS NOT NULL
|
||||
)
|
||||
ORDER BY p.pid
|
||||
LIMIT ?
|
||||
`, [lastCalculationTime, lastCalculationTime, lastPid, lastCalculationTime, BATCH_SIZE]);
|
||||
|
||||
if (batch.length === 0) break;
|
||||
|
||||
@@ -532,7 +612,7 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
|
||||
// If we get here, everything completed successfully
|
||||
success = true;
|
||||
|
||||
// Update calculate_status
|
||||
// Update calculate_status with current timestamp
|
||||
await connection.query(`
|
||||
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
|
||||
VALUES ('product_metrics', NOW())
|
||||
|
||||
@@ -4,19 +4,50 @@ const { getConnection } = require('./utils/db');
|
||||
async function calculateSalesForecasts(startTime, totalProducts, processedCount = 0, isCancelled = false) {
|
||||
const connection = await getConnection();
|
||||
let success = false;
|
||||
let processedOrders = 0;
|
||||
const BATCH_SIZE = 5000;
|
||||
|
||||
try {
|
||||
// Get last calculation timestamp
|
||||
const [lastCalc] = await connection.query(`
|
||||
SELECT last_calculation_timestamp
|
||||
FROM calculate_status
|
||||
WHERE module_name = 'sales_forecasts'
|
||||
`);
|
||||
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
|
||||
|
||||
// Get total count of products needing updates
|
||||
const [productCount] = await connection.query(`
|
||||
SELECT COUNT(DISTINCT p.pid) as count
|
||||
FROM products p
|
||||
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
|
||||
WHERE p.visible = true
|
||||
AND (
|
||||
p.updated > ?
|
||||
OR o.id IS NOT NULL
|
||||
)
|
||||
`, [lastCalculationTime, lastCalculationTime]);
|
||||
const totalProductsToUpdate = productCount[0].count;
|
||||
|
||||
if (totalProductsToUpdate === 0) {
|
||||
console.log('No products need forecast updates');
|
||||
return {
|
||||
processedProducts: 0,
|
||||
processedOrders: 0,
|
||||
processedPurchaseOrders: 0,
|
||||
success: true
|
||||
};
|
||||
}
|
||||
|
||||
if (isCancelled) {
|
||||
outputProgress({
|
||||
status: 'cancelled',
|
||||
operation: 'Sales forecasts calculation cancelled',
|
||||
operation: 'Sales forecast calculation cancelled',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
total: totalProductsToUpdate,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: null,
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
percentage: ((processedCount / totalProductsToUpdate) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
@@ -31,24 +62,15 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
|
||||
};
|
||||
}
|
||||
|
||||
// Get order count that will be processed
|
||||
const [orderCount] = await connection.query(`
|
||||
SELECT COUNT(*) as count
|
||||
FROM orders o
|
||||
WHERE o.canceled = false
|
||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
|
||||
`);
|
||||
processedOrders = orderCount[0].count;
|
||||
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Starting sales forecasts calculation',
|
||||
operation: 'Starting sales forecast calculation',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
total: totalProductsToUpdate,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProductsToUpdate),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
percentage: ((processedCount / totalProductsToUpdate) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
@@ -56,365 +78,201 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
|
||||
}
|
||||
});
|
||||
|
||||
// First, create a temporary table for forecast dates
|
||||
await connection.query(`
|
||||
CREATE TEMPORARY TABLE IF NOT EXISTS temp_forecast_dates (
|
||||
forecast_date DATE,
|
||||
day_of_week INT,
|
||||
month INT,
|
||||
PRIMARY KEY (forecast_date)
|
||||
// Process in batches
|
||||
let lastPid = '';
|
||||
while (true) {
|
||||
if (isCancelled) break;
|
||||
|
||||
const [batch] = await connection.query(`
|
||||
SELECT DISTINCT p.pid
|
||||
FROM products p
|
||||
FORCE INDEX (PRIMARY)
|
||||
LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid AND o.updated > ?
|
||||
WHERE p.visible = true
|
||||
AND p.pid > ?
|
||||
AND (
|
||||
p.updated > ?
|
||||
OR o.id IS NOT NULL
|
||||
)
|
||||
ORDER BY p.pid
|
||||
LIMIT ?
|
||||
`, [lastCalculationTime, lastPid, lastCalculationTime, BATCH_SIZE]);
|
||||
|
||||
if (batch.length === 0) break;
|
||||
|
||||
// Create temporary tables for better performance
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_historical_sales');
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_recent_trend');
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_confidence_calc');
|
||||
|
||||
// Create optimized temporary tables with indexes
|
||||
await connection.query(`
|
||||
CREATE TEMPORARY TABLE temp_historical_sales (
|
||||
pid BIGINT NOT NULL,
|
||||
sale_date DATE NOT NULL,
|
||||
daily_quantity INT,
|
||||
daily_revenue DECIMAL(15,2),
|
||||
PRIMARY KEY (pid, sale_date),
|
||||
INDEX (sale_date)
|
||||
) ENGINE=MEMORY
|
||||
`);
|
||||
|
||||
await connection.query(`
|
||||
INSERT INTO temp_forecast_dates
|
||||
SELECT
|
||||
DATE_ADD(CURRENT_DATE, INTERVAL n DAY) as forecast_date,
|
||||
DAYOFWEEK(DATE_ADD(CURRENT_DATE, INTERVAL n DAY)) as day_of_week,
|
||||
MONTH(DATE_ADD(CURRENT_DATE, INTERVAL n DAY)) as month
|
||||
FROM (
|
||||
SELECT a.N + b.N * 10 as n
|
||||
FROM
|
||||
(SELECT 0 as N UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION
|
||||
SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) a,
|
||||
(SELECT 0 as N UNION SELECT 1 UNION SELECT 2) b
|
||||
ORDER BY n
|
||||
LIMIT 31
|
||||
) numbers
|
||||
CREATE TEMPORARY TABLE temp_sales_stats (
|
||||
pid BIGINT NOT NULL,
|
||||
avg_daily_units DECIMAL(10,2),
|
||||
avg_daily_revenue DECIMAL(15,2),
|
||||
std_daily_units DECIMAL(10,2),
|
||||
days_with_sales INT,
|
||||
first_sale DATE,
|
||||
last_sale DATE,
|
||||
PRIMARY KEY (pid),
|
||||
INDEX (days_with_sales),
|
||||
INDEX (last_sale)
|
||||
) ENGINE=MEMORY
|
||||
`);
|
||||
|
||||
processedCount = Math.floor(totalProducts * 0.92);
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Forecast dates prepared, calculating daily sales stats',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
}
|
||||
});
|
||||
|
||||
if (isCancelled) return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedPurchaseOrders: 0,
|
||||
success
|
||||
};
|
||||
|
||||
// Create temporary table for daily sales stats
|
||||
await connection.query(`
|
||||
CREATE TEMPORARY TABLE IF NOT EXISTS temp_daily_sales AS
|
||||
CREATE TEMPORARY TABLE temp_recent_trend (
|
||||
pid BIGINT NOT NULL,
|
||||
recent_avg_units DECIMAL(10,2),
|
||||
recent_avg_revenue DECIMAL(15,2),
|
||||
PRIMARY KEY (pid)
|
||||
) ENGINE=MEMORY
|
||||
`);
|
||||
|
||||
await connection.query(`
|
||||
CREATE TEMPORARY TABLE temp_confidence_calc (
|
||||
pid BIGINT NOT NULL,
|
||||
confidence_level TINYINT,
|
||||
PRIMARY KEY (pid)
|
||||
) ENGINE=MEMORY
|
||||
`);
|
||||
|
||||
// Populate historical sales with optimized index usage
|
||||
await connection.query(`
|
||||
INSERT INTO temp_historical_sales
|
||||
SELECT
|
||||
o.pid,
|
||||
DAYOFWEEK(o.date) as day_of_week,
|
||||
DATE(o.date) as sale_date,
|
||||
SUM(o.quantity) as daily_quantity,
|
||||
SUM(o.price * o.quantity) as daily_revenue,
|
||||
COUNT(DISTINCT DATE(o.date)) as day_count
|
||||
SUM(o.quantity * o.price) as daily_revenue
|
||||
FROM orders o
|
||||
FORCE INDEX (idx_orders_metrics)
|
||||
WHERE o.canceled = false
|
||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
|
||||
GROUP BY o.pid, DAYOFWEEK(o.date)
|
||||
`);
|
||||
AND o.pid IN (?)
|
||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 180 DAY)
|
||||
GROUP BY o.pid, DATE(o.date)
|
||||
`, [batch.map(row => row.pid)]);
|
||||
|
||||
processedCount = Math.floor(totalProducts * 0.94);
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Daily sales stats calculated, preparing product stats',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
}
|
||||
});
|
||||
|
||||
if (isCancelled) return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedPurchaseOrders: 0,
|
||||
success
|
||||
};
|
||||
|
||||
// Create temporary table for product stats
|
||||
// Populate sales stats
|
||||
await connection.query(`
|
||||
CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_stats AS
|
||||
INSERT INTO temp_sales_stats
|
||||
SELECT
|
||||
pid,
|
||||
AVG(daily_revenue) as overall_avg_revenue,
|
||||
SUM(day_count) as total_days
|
||||
FROM temp_daily_sales
|
||||
AVG(daily_quantity) as avg_daily_units,
|
||||
AVG(daily_revenue) as avg_daily_revenue,
|
||||
STDDEV(daily_quantity) as std_daily_units,
|
||||
COUNT(*) as days_with_sales,
|
||||
MIN(sale_date) as first_sale,
|
||||
MAX(sale_date) as last_sale
|
||||
FROM temp_historical_sales
|
||||
GROUP BY pid
|
||||
`);
|
||||
|
||||
processedCount = Math.floor(totalProducts * 0.96);
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Product stats prepared, calculating product-level forecasts',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
}
|
||||
});
|
||||
|
||||
if (isCancelled) return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedPurchaseOrders: 0,
|
||||
success
|
||||
};
|
||||
|
||||
// Calculate product-level forecasts
|
||||
// Populate recent trend
|
||||
await connection.query(`
|
||||
INSERT INTO sales_forecasts (
|
||||
pid,
|
||||
forecast_date,
|
||||
forecast_units,
|
||||
forecast_revenue,
|
||||
confidence_level,
|
||||
last_calculated_at
|
||||
)
|
||||
WITH daily_stats AS (
|
||||
INSERT INTO temp_recent_trend
|
||||
SELECT
|
||||
ds.pid,
|
||||
AVG(ds.daily_quantity) as avg_daily_qty,
|
||||
STDDEV(ds.daily_quantity) as std_daily_qty,
|
||||
COUNT(DISTINCT ds.day_count) as data_points,
|
||||
SUM(ds.day_count) as total_days,
|
||||
AVG(ds.daily_revenue) as avg_daily_revenue,
|
||||
STDDEV(ds.daily_revenue) as std_daily_revenue,
|
||||
MIN(ds.daily_quantity) as min_daily_qty,
|
||||
MAX(ds.daily_quantity) as max_daily_qty,
|
||||
-- Calculate variance without using LAG
|
||||
COALESCE(
|
||||
STDDEV(ds.daily_quantity) / NULLIF(AVG(ds.daily_quantity), 0),
|
||||
0
|
||||
) as daily_variance_ratio
|
||||
FROM temp_daily_sales ds
|
||||
GROUP BY ds.pid
|
||||
HAVING AVG(ds.daily_quantity) > 0
|
||||
)
|
||||
h.pid,
|
||||
AVG(h.daily_quantity) as recent_avg_units,
|
||||
AVG(h.daily_revenue) as recent_avg_revenue
|
||||
FROM temp_historical_sales h
|
||||
WHERE h.sale_date >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
|
||||
GROUP BY h.pid
|
||||
`);
|
||||
|
||||
// Calculate confidence levels
|
||||
await connection.query(`
|
||||
INSERT INTO temp_confidence_calc
|
||||
SELECT
|
||||
ds.pid,
|
||||
fd.forecast_date,
|
||||
GREATEST(0,
|
||||
ROUND(
|
||||
ds.avg_daily_qty *
|
||||
(1 + COALESCE(sf.seasonality_factor, 0)) *
|
||||
s.pid,
|
||||
LEAST(100, GREATEST(0, ROUND(
|
||||
(s.days_with_sales / 180.0 * 50) + -- Up to 50 points for history length
|
||||
(CASE
|
||||
WHEN s.std_daily_units = 0 OR s.avg_daily_units = 0 THEN 0
|
||||
WHEN (s.std_daily_units / s.avg_daily_units) <= 0.5 THEN 30
|
||||
WHEN (s.std_daily_units / s.avg_daily_units) <= 1.0 THEN 20
|
||||
WHEN (s.std_daily_units / s.avg_daily_units) <= 2.0 THEN 10
|
||||
ELSE 0
|
||||
END) + -- Up to 30 points for consistency
|
||||
(CASE
|
||||
WHEN DATEDIFF(CURRENT_DATE, s.last_sale) <= 7 THEN 20
|
||||
WHEN DATEDIFF(CURRENT_DATE, s.last_sale) <= 30 THEN 10
|
||||
ELSE 0
|
||||
END) -- Up to 20 points for recency
|
||||
))) as confidence_level
|
||||
FROM temp_sales_stats s
|
||||
`);
|
||||
|
||||
// Generate forecasts using temp tables
|
||||
await connection.query(`
|
||||
REPLACE INTO sales_forecasts
|
||||
(pid, forecast_date, forecast_units, forecast_revenue, confidence_level, last_calculated_at)
|
||||
SELECT
|
||||
s.pid,
|
||||
DATE_ADD(CURRENT_DATE, INTERVAL n.days DAY),
|
||||
GREATEST(0, ROUND(
|
||||
CASE
|
||||
WHEN ds.std_daily_qty / NULLIF(ds.avg_daily_qty, 0) > 1.5 THEN 0.85
|
||||
WHEN ds.std_daily_qty / NULLIF(ds.avg_daily_qty, 0) > 1.0 THEN 0.9
|
||||
WHEN ds.std_daily_qty / NULLIF(ds.avg_daily_qty, 0) > 0.5 THEN 0.95
|
||||
ELSE 1.0
|
||||
WHEN s.days_with_sales >= n.days THEN COALESCE(t.recent_avg_units, s.avg_daily_units)
|
||||
ELSE s.avg_daily_units * (s.days_with_sales / n.days)
|
||||
END
|
||||
)),
|
||||
GREATEST(0, ROUND(
|
||||
CASE
|
||||
WHEN s.days_with_sales >= n.days THEN COALESCE(t.recent_avg_revenue, s.avg_daily_revenue)
|
||||
ELSE s.avg_daily_revenue * (s.days_with_sales / n.days)
|
||||
END,
|
||||
2
|
||||
)
|
||||
) as forecast_units,
|
||||
GREATEST(0,
|
||||
ROUND(
|
||||
COALESCE(
|
||||
CASE
|
||||
WHEN ds.data_points >= 4 THEN ds.avg_daily_revenue
|
||||
ELSE ps.overall_avg_revenue
|
||||
END *
|
||||
(1 + COALESCE(sf.seasonality_factor, 0)) *
|
||||
CASE
|
||||
WHEN ds.std_daily_revenue / NULLIF(ds.avg_daily_revenue, 0) > 1.5 THEN 0.85
|
||||
WHEN ds.std_daily_revenue / NULLIF(ds.avg_daily_revenue, 0) > 1.0 THEN 0.9
|
||||
WHEN ds.std_daily_revenue / NULLIF(ds.avg_daily_revenue, 0) > 0.5 THEN 0.95
|
||||
ELSE 1.0
|
||||
END,
|
||||
0
|
||||
),
|
||||
2
|
||||
)
|
||||
) as forecast_revenue,
|
||||
CASE
|
||||
WHEN ds.total_days >= 60 AND ds.daily_variance_ratio < 0.5 THEN 90
|
||||
WHEN ds.total_days >= 60 THEN 85
|
||||
WHEN ds.total_days >= 30 AND ds.daily_variance_ratio < 0.5 THEN 80
|
||||
WHEN ds.total_days >= 30 THEN 75
|
||||
WHEN ds.total_days >= 14 AND ds.daily_variance_ratio < 0.5 THEN 70
|
||||
WHEN ds.total_days >= 14 THEN 65
|
||||
ELSE 60
|
||||
END as confidence_level,
|
||||
NOW() as last_calculated_at
|
||||
FROM daily_stats ds
|
||||
JOIN temp_product_stats ps ON ds.pid = ps.pid
|
||||
CROSS JOIN temp_forecast_dates fd
|
||||
LEFT JOIN sales_seasonality sf ON fd.month = sf.month
|
||||
GROUP BY ds.pid, fd.forecast_date, ps.overall_avg_revenue, sf.seasonality_factor
|
||||
ON DUPLICATE KEY UPDATE
|
||||
forecast_units = VALUES(forecast_units),
|
||||
forecast_revenue = VALUES(forecast_revenue),
|
||||
confidence_level = VALUES(confidence_level),
|
||||
last_calculated_at = NOW()
|
||||
)),
|
||||
c.confidence_level,
|
||||
NOW()
|
||||
FROM temp_sales_stats s
|
||||
CROSS JOIN (
|
||||
SELECT 30 as days
|
||||
UNION SELECT 60
|
||||
UNION SELECT 90
|
||||
) n
|
||||
LEFT JOIN temp_recent_trend t ON s.pid = t.pid
|
||||
LEFT JOIN temp_confidence_calc c ON s.pid = c.pid;
|
||||
`);
|
||||
|
||||
processedCount = Math.floor(totalProducts * 0.98);
|
||||
// Clean up temp tables
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_historical_sales');
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_recent_trend');
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_confidence_calc');
|
||||
|
||||
lastPid = batch[batch.length - 1].pid;
|
||||
processedCount += batch.length;
|
||||
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Product forecasts calculated, preparing category stats',
|
||||
operation: 'Processing sales forecast batch',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
total: totalProductsToUpdate,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProductsToUpdate),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
percentage: ((processedCount / totalProductsToUpdate) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
}
|
||||
});
|
||||
|
||||
if (isCancelled) return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedPurchaseOrders: 0,
|
||||
success
|
||||
};
|
||||
|
||||
// Create temporary table for category stats
|
||||
await connection.query(`
|
||||
CREATE TEMPORARY TABLE IF NOT EXISTS temp_category_sales AS
|
||||
SELECT
|
||||
pc.cat_id,
|
||||
DAYOFWEEK(o.date) as day_of_week,
|
||||
SUM(o.quantity) as daily_quantity,
|
||||
SUM(o.price * o.quantity) as daily_revenue,
|
||||
COUNT(DISTINCT DATE(o.date)) as day_count
|
||||
FROM orders o
|
||||
JOIN product_categories pc ON o.pid = pc.pid
|
||||
WHERE o.canceled = false
|
||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
|
||||
GROUP BY pc.cat_id, DAYOFWEEK(o.date)
|
||||
`);
|
||||
|
||||
await connection.query(`
|
||||
CREATE TEMPORARY TABLE IF NOT EXISTS temp_category_stats AS
|
||||
SELECT
|
||||
cat_id,
|
||||
AVG(daily_revenue) as overall_avg_revenue,
|
||||
SUM(day_count) as total_days
|
||||
FROM temp_category_sales
|
||||
GROUP BY cat_id
|
||||
`);
|
||||
|
||||
processedCount = Math.floor(totalProducts * 0.99);
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Category stats prepared, calculating category-level forecasts',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
}
|
||||
});
|
||||
|
||||
if (isCancelled) return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedPurchaseOrders: 0,
|
||||
success
|
||||
};
|
||||
|
||||
// Calculate category-level forecasts
|
||||
await connection.query(`
|
||||
INSERT INTO category_forecasts (
|
||||
category_id,
|
||||
forecast_date,
|
||||
forecast_units,
|
||||
forecast_revenue,
|
||||
confidence_level,
|
||||
last_calculated_at
|
||||
)
|
||||
SELECT
|
||||
cs.cat_id as category_id,
|
||||
fd.forecast_date,
|
||||
GREATEST(0,
|
||||
AVG(cs.daily_quantity) *
|
||||
(1 + COALESCE(sf.seasonality_factor, 0))
|
||||
) as forecast_units,
|
||||
GREATEST(0,
|
||||
COALESCE(
|
||||
CASE
|
||||
WHEN SUM(cs.day_count) >= 4 THEN AVG(cs.daily_revenue)
|
||||
ELSE ct.overall_avg_revenue
|
||||
END *
|
||||
(1 + COALESCE(sf.seasonality_factor, 0)) *
|
||||
(0.95 + (RAND() * 0.1)),
|
||||
0
|
||||
)
|
||||
) as forecast_revenue,
|
||||
CASE
|
||||
WHEN ct.total_days >= 60 THEN 90
|
||||
WHEN ct.total_days >= 30 THEN 80
|
||||
WHEN ct.total_days >= 14 THEN 70
|
||||
ELSE 60
|
||||
END as confidence_level,
|
||||
NOW() as last_calculated_at
|
||||
FROM temp_category_sales cs
|
||||
JOIN temp_category_stats ct ON cs.cat_id = ct.cat_id
|
||||
CROSS JOIN temp_forecast_dates fd
|
||||
LEFT JOIN sales_seasonality sf ON fd.month = sf.month
|
||||
GROUP BY cs.cat_id, fd.forecast_date, ct.overall_avg_revenue, ct.total_days, sf.seasonality_factor
|
||||
HAVING AVG(cs.daily_quantity) > 0
|
||||
ON DUPLICATE KEY UPDATE
|
||||
forecast_units = VALUES(forecast_units),
|
||||
forecast_revenue = VALUES(forecast_revenue),
|
||||
confidence_level = VALUES(confidence_level),
|
||||
last_calculated_at = NOW()
|
||||
`);
|
||||
|
||||
// Clean up temporary tables
|
||||
await connection.query(`
|
||||
DROP TEMPORARY TABLE IF EXISTS temp_forecast_dates;
|
||||
DROP TEMPORARY TABLE IF EXISTS temp_daily_sales;
|
||||
DROP TEMPORARY TABLE IF EXISTS temp_product_stats;
|
||||
DROP TEMPORARY TABLE IF EXISTS temp_category_sales;
|
||||
DROP TEMPORARY TABLE IF EXISTS temp_category_stats;
|
||||
`);
|
||||
|
||||
processedCount = Math.floor(totalProducts * 1.0);
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Category forecasts calculated and temporary tables cleaned up',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
}
|
||||
});
|
||||
|
||||
// If we get here, everything completed successfully
|
||||
success = true;
|
||||
@@ -428,7 +286,7 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
|
||||
|
||||
return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedOrders: 0,
|
||||
processedPurchaseOrders: 0,
|
||||
success
|
||||
};
|
||||
|
||||
@@ -4,9 +4,39 @@ const { getConnection } = require('./utils/db');
|
||||
async function calculateTimeAggregates(startTime, totalProducts, processedCount = 0, isCancelled = false) {
|
||||
const connection = await getConnection();
|
||||
let success = false;
|
||||
let processedOrders = 0;
|
||||
const BATCH_SIZE = 5000;
|
||||
|
||||
try {
|
||||
// Get last calculation timestamp
|
||||
const [lastCalc] = await connection.query(`
|
||||
SELECT last_calculation_timestamp
|
||||
FROM calculate_status
|
||||
WHERE module_name = 'time_aggregates'
|
||||
`);
|
||||
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
|
||||
|
||||
// Get total count of products needing updates
|
||||
if (!totalProducts) {
|
||||
const [productCount] = await connection.query(`
|
||||
SELECT COUNT(DISTINCT p.pid) as count
|
||||
FROM products p
|
||||
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
|
||||
WHERE p.updated > ?
|
||||
OR o.pid IS NOT NULL
|
||||
`, [lastCalculationTime, lastCalculationTime]);
|
||||
totalProducts = productCount[0].count;
|
||||
}
|
||||
|
||||
if (totalProducts === 0) {
|
||||
console.log('No products need time aggregate updates');
|
||||
return {
|
||||
processedProducts: 0,
|
||||
processedOrders: 0,
|
||||
processedPurchaseOrders: 0,
|
||||
success: true
|
||||
};
|
||||
}
|
||||
|
||||
if (isCancelled) {
|
||||
outputProgress({
|
||||
status: 'cancelled',
|
||||
@@ -31,14 +61,6 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
|
||||
};
|
||||
}
|
||||
|
||||
// Get order count that will be processed
|
||||
const [orderCount] = await connection.query(`
|
||||
SELECT COUNT(*) as count
|
||||
FROM orders o
|
||||
WHERE o.canceled = false
|
||||
`);
|
||||
processedOrders = orderCount[0].count;
|
||||
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Starting time aggregates calculation',
|
||||
@@ -55,100 +77,106 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
|
||||
}
|
||||
});
|
||||
|
||||
// Initial insert of time-based aggregates
|
||||
await connection.query(`
|
||||
INSERT INTO product_time_aggregates (
|
||||
pid,
|
||||
year,
|
||||
month,
|
||||
total_quantity_sold,
|
||||
total_revenue,
|
||||
total_cost,
|
||||
order_count,
|
||||
stock_received,
|
||||
stock_ordered,
|
||||
avg_price,
|
||||
profit_margin,
|
||||
inventory_value,
|
||||
gmroi
|
||||
// Process in batches
|
||||
let lastPid = 0;
|
||||
while (true) {
|
||||
if (isCancelled) break;
|
||||
|
||||
const [batch] = await connection.query(`
|
||||
SELECT DISTINCT p.pid
|
||||
FROM products p
|
||||
FORCE INDEX (PRIMARY)
|
||||
LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
|
||||
WHERE p.pid > ?
|
||||
AND (
|
||||
p.updated > ?
|
||||
OR EXISTS (
|
||||
SELECT 1
|
||||
FROM orders o2 FORCE INDEX (idx_orders_metrics)
|
||||
WHERE o2.pid = p.pid
|
||||
AND o2.updated > ?
|
||||
)
|
||||
WITH monthly_sales AS (
|
||||
)
|
||||
ORDER BY p.pid
|
||||
LIMIT ?
|
||||
`, [lastPid, lastCalculationTime, lastCalculationTime, BATCH_SIZE]);
|
||||
|
||||
if (batch.length === 0) break;
|
||||
|
||||
// Calculate and update time aggregates for this batch using temporary table
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_time_aggregates');
|
||||
await connection.query(`
|
||||
CREATE TEMPORARY TABLE temp_time_aggregates (
|
||||
pid BIGINT NOT NULL,
|
||||
year INT NOT NULL,
|
||||
month INT NOT NULL,
|
||||
total_quantity_sold INT DEFAULT 0,
|
||||
total_revenue DECIMAL(10,3) DEFAULT 0,
|
||||
total_cost DECIMAL(10,3) DEFAULT 0,
|
||||
order_count INT DEFAULT 0,
|
||||
stock_received INT DEFAULT 0,
|
||||
stock_ordered INT DEFAULT 0,
|
||||
avg_price DECIMAL(10,3),
|
||||
profit_margin DECIMAL(10,3),
|
||||
inventory_value DECIMAL(10,3),
|
||||
gmroi DECIMAL(10,3),
|
||||
PRIMARY KEY (pid, year, month),
|
||||
INDEX (pid),
|
||||
INDEX (year, month)
|
||||
) ENGINE=MEMORY
|
||||
`);
|
||||
|
||||
// Populate temporary table
|
||||
await connection.query(`
|
||||
INSERT INTO temp_time_aggregates
|
||||
SELECT
|
||||
o.pid,
|
||||
p.pid,
|
||||
YEAR(o.date) as year,
|
||||
MONTH(o.date) as month,
|
||||
SUM(o.quantity) as total_quantity_sold,
|
||||
SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) as total_revenue,
|
||||
SUM(COALESCE(p.cost_price, 0) * o.quantity) as total_cost,
|
||||
SUM(o.quantity * o.price) as total_revenue,
|
||||
SUM(o.quantity * p.cost_price) as total_cost,
|
||||
COUNT(DISTINCT o.order_number) as order_count,
|
||||
AVG(o.price - COALESCE(o.discount, 0)) as avg_price,
|
||||
COALESCE(SUM(CASE WHEN po.received_date IS NOT NULL THEN po.received ELSE 0 END), 0) as stock_received,
|
||||
COALESCE(SUM(po.ordered), 0) as stock_ordered,
|
||||
AVG(o.price) as avg_price,
|
||||
CASE
|
||||
WHEN SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) > 0
|
||||
THEN ((SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) - SUM(COALESCE(p.cost_price, 0) * o.quantity))
|
||||
/ SUM((o.price - COALESCE(o.discount, 0)) * o.quantity)) * 100
|
||||
WHEN SUM(o.quantity * o.price) > 0
|
||||
THEN ((SUM(o.quantity * o.price) - SUM(o.quantity * p.cost_price)) / SUM(o.quantity * o.price)) * 100
|
||||
ELSE 0
|
||||
END as profit_margin,
|
||||
p.cost_price * p.stock_quantity as inventory_value,
|
||||
COUNT(DISTINCT DATE(o.date)) as active_days
|
||||
FROM orders o
|
||||
JOIN products p ON o.pid = p.pid
|
||||
WHERE o.canceled = false
|
||||
GROUP BY o.pid, YEAR(o.date), MONTH(o.date)
|
||||
),
|
||||
monthly_stock AS (
|
||||
SELECT
|
||||
pid,
|
||||
YEAR(date) as year,
|
||||
MONTH(date) as month,
|
||||
SUM(received) as stock_received,
|
||||
SUM(ordered) as stock_ordered
|
||||
FROM purchase_orders
|
||||
GROUP BY pid, YEAR(date), MONTH(date)
|
||||
)
|
||||
SELECT
|
||||
s.pid,
|
||||
s.year,
|
||||
s.month,
|
||||
s.total_quantity_sold,
|
||||
s.total_revenue,
|
||||
s.total_cost,
|
||||
s.order_count,
|
||||
COALESCE(ms.stock_received, 0) as stock_received,
|
||||
COALESCE(ms.stock_ordered, 0) as stock_ordered,
|
||||
s.avg_price,
|
||||
s.profit_margin,
|
||||
s.inventory_value,
|
||||
CASE
|
||||
WHEN s.inventory_value > 0 THEN
|
||||
(s.total_revenue - s.total_cost) / s.inventory_value
|
||||
WHEN p.cost_price * p.stock_quantity > 0
|
||||
THEN (SUM(o.quantity * (o.price - p.cost_price))) / (p.cost_price * p.stock_quantity)
|
||||
ELSE 0
|
||||
END as gmroi
|
||||
FROM monthly_sales s
|
||||
LEFT JOIN monthly_stock ms
|
||||
ON s.pid = ms.pid
|
||||
AND s.year = ms.year
|
||||
AND s.month = ms.month
|
||||
UNION
|
||||
FROM products p
|
||||
FORCE INDEX (PRIMARY)
|
||||
INNER JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
|
||||
AND o.canceled = false
|
||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
|
||||
LEFT JOIN purchase_orders po FORCE INDEX (idx_po_metrics) ON p.pid = po.pid
|
||||
AND po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
|
||||
WHERE p.pid IN (?)
|
||||
GROUP BY p.pid, YEAR(o.date), MONTH(o.date)
|
||||
HAVING year IS NOT NULL AND month IS NOT NULL
|
||||
`, [batch.map(row => row.pid)]);
|
||||
|
||||
// Update from temporary table
|
||||
await connection.query(`
|
||||
INSERT INTO product_time_aggregates (
|
||||
pid, year, month,
|
||||
total_quantity_sold, total_revenue, total_cost,
|
||||
order_count, stock_received, stock_ordered,
|
||||
avg_price, profit_margin, inventory_value, gmroi
|
||||
)
|
||||
SELECT
|
||||
p.pid,
|
||||
p.year,
|
||||
p.month,
|
||||
0 as total_quantity_sold,
|
||||
0 as total_revenue,
|
||||
0 as total_cost,
|
||||
0 as order_count,
|
||||
p.stock_received,
|
||||
p.stock_ordered,
|
||||
0 as avg_price,
|
||||
0 as profit_margin,
|
||||
(SELECT cost_price * stock_quantity FROM products WHERE pid = p.pid) as inventory_value,
|
||||
0 as gmroi
|
||||
FROM monthly_stock p
|
||||
LEFT JOIN monthly_sales s
|
||||
ON p.pid = s.pid
|
||||
AND p.year = s.year
|
||||
AND p.month = s.month
|
||||
WHERE s.pid IS NULL
|
||||
pid, year, month,
|
||||
total_quantity_sold, total_revenue, total_cost,
|
||||
order_count, stock_received, stock_ordered,
|
||||
avg_price, profit_margin, inventory_value, gmroi
|
||||
FROM temp_time_aggregates
|
||||
ON DUPLICATE KEY UPDATE
|
||||
total_quantity_sold = VALUES(total_quantity_sold),
|
||||
total_revenue = VALUES(total_revenue),
|
||||
@@ -162,10 +190,14 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
|
||||
gmroi = VALUES(gmroi)
|
||||
`);
|
||||
|
||||
processedCount = Math.floor(totalProducts * 0.60);
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_time_aggregates');
|
||||
|
||||
lastPid = batch[batch.length - 1].pid;
|
||||
processedCount += batch.length;
|
||||
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Base time aggregates calculated, updating financial metrics',
|
||||
operation: 'Processing time aggregates batch',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
@@ -178,57 +210,7 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
}
|
||||
});
|
||||
|
||||
if (isCancelled) return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedPurchaseOrders: 0,
|
||||
success
|
||||
};
|
||||
|
||||
// Update with financial metrics
|
||||
await connection.query(`
|
||||
UPDATE product_time_aggregates pta
|
||||
JOIN (
|
||||
SELECT
|
||||
p.pid,
|
||||
YEAR(o.date) as year,
|
||||
MONTH(o.date) as month,
|
||||
p.cost_price * p.stock_quantity as inventory_value,
|
||||
SUM(o.quantity * (o.price - p.cost_price)) as gross_profit,
|
||||
COUNT(DISTINCT DATE(o.date)) as days_in_period
|
||||
FROM products p
|
||||
LEFT JOIN orders o ON p.pid = o.pid
|
||||
WHERE o.canceled = false
|
||||
GROUP BY p.pid, YEAR(o.date), MONTH(o.date)
|
||||
) fin ON pta.pid = fin.pid
|
||||
AND pta.year = fin.year
|
||||
AND pta.month = fin.month
|
||||
SET
|
||||
pta.inventory_value = COALESCE(fin.inventory_value, 0),
|
||||
pta.gmroi = CASE
|
||||
WHEN COALESCE(fin.inventory_value, 0) > 0 AND fin.days_in_period > 0 THEN
|
||||
(COALESCE(fin.gross_profit, 0) * (365.0 / fin.days_in_period)) / COALESCE(fin.inventory_value, 0)
|
||||
ELSE 0
|
||||
END
|
||||
`);
|
||||
|
||||
processedCount = Math.floor(totalProducts * 0.65);
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Financial metrics updated',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
}
|
||||
});
|
||||
|
||||
// If we get here, everything completed successfully
|
||||
success = true;
|
||||
@@ -242,7 +224,7 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
|
||||
|
||||
return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedOrders: 0,
|
||||
processedPurchaseOrders: 0,
|
||||
success
|
||||
};
|
||||
|
||||
@@ -4,20 +4,57 @@ const { getConnection } = require('./utils/db');
|
||||
async function calculateVendorMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
|
||||
const connection = await getConnection();
|
||||
let success = false;
|
||||
let processedOrders = 0;
|
||||
let processedPurchaseOrders = 0;
|
||||
const BATCH_SIZE = 5000;
|
||||
|
||||
try {
|
||||
// Get last calculation timestamp
|
||||
const [lastCalc] = await connection.query(`
|
||||
SELECT last_calculation_timestamp
|
||||
FROM calculate_status
|
||||
WHERE module_name = 'vendor_metrics'
|
||||
`);
|
||||
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
|
||||
|
||||
// Get total count of vendors needing updates using EXISTS for better performance
|
||||
const [vendorCount] = await connection.query(`
|
||||
SELECT COUNT(DISTINCT v.vendor) as count
|
||||
FROM vendor_details v
|
||||
WHERE v.status = 'active'
|
||||
AND (
|
||||
EXISTS (
|
||||
SELECT 1 FROM products p
|
||||
WHERE p.vendor = v.vendor
|
||||
AND p.updated > ?
|
||||
)
|
||||
OR EXISTS (
|
||||
SELECT 1 FROM purchase_orders po
|
||||
WHERE po.vendor = v.vendor
|
||||
AND po.updated > ?
|
||||
)
|
||||
)
|
||||
`, [lastCalculationTime, lastCalculationTime]);
|
||||
const totalVendors = vendorCount[0].count;
|
||||
|
||||
if (totalVendors === 0) {
|
||||
console.log('No vendors need metric updates');
|
||||
return {
|
||||
processedProducts: 0,
|
||||
processedOrders: 0,
|
||||
processedPurchaseOrders: 0,
|
||||
success: true
|
||||
};
|
||||
}
|
||||
|
||||
if (isCancelled) {
|
||||
outputProgress({
|
||||
status: 'cancelled',
|
||||
operation: 'Vendor metrics calculation cancelled',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
total: totalVendors,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: null,
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
percentage: ((processedCount / totalVendors) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
@@ -26,37 +63,21 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
|
||||
});
|
||||
return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedPurchaseOrders,
|
||||
processedOrders: 0,
|
||||
processedPurchaseOrders: 0,
|
||||
success
|
||||
};
|
||||
}
|
||||
|
||||
// Get counts of records that will be processed
|
||||
const [[orderCount], [poCount]] = await Promise.all([
|
||||
connection.query(`
|
||||
SELECT COUNT(*) as count
|
||||
FROM orders o
|
||||
WHERE o.canceled = false
|
||||
`),
|
||||
connection.query(`
|
||||
SELECT COUNT(*) as count
|
||||
FROM purchase_orders po
|
||||
WHERE po.status != 0
|
||||
`)
|
||||
]);
|
||||
processedOrders = orderCount.count;
|
||||
processedPurchaseOrders = poCount.count;
|
||||
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Starting vendor metrics calculation',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
total: totalVendors,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalVendors),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
percentage: ((processedCount / totalVendors) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
@@ -64,278 +85,192 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
|
||||
}
|
||||
});
|
||||
|
||||
// First ensure all vendors exist in vendor_details
|
||||
// Process in batches
|
||||
let lastVendor = '';
|
||||
while (true) {
|
||||
if (isCancelled) break;
|
||||
|
||||
// Get batch of vendors using EXISTS for better performance
|
||||
const [batch] = await connection.query(`
|
||||
SELECT DISTINCT v.vendor
|
||||
FROM vendor_details v
|
||||
WHERE v.status = 'active'
|
||||
AND v.vendor > ?
|
||||
AND (
|
||||
EXISTS (
|
||||
SELECT 1
|
||||
FROM products p
|
||||
WHERE p.vendor = v.vendor
|
||||
AND p.updated > ?
|
||||
)
|
||||
OR EXISTS (
|
||||
SELECT 1
|
||||
FROM purchase_orders po
|
||||
WHERE po.vendor = v.vendor
|
||||
AND po.updated > ?
|
||||
)
|
||||
)
|
||||
ORDER BY v.vendor
|
||||
LIMIT ?
|
||||
`, [lastVendor, lastCalculationTime, lastCalculationTime, BATCH_SIZE]);
|
||||
|
||||
if (batch.length === 0) break;
|
||||
|
||||
// Create temporary tables with optimized structure and indexes
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_purchase_stats');
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
|
||||
|
||||
await connection.query(`
|
||||
INSERT IGNORE INTO vendor_details (vendor, status, created_at, updated_at)
|
||||
SELECT DISTINCT
|
||||
vendor,
|
||||
'active' as status,
|
||||
NOW() as created_at,
|
||||
NOW() as updated_at
|
||||
FROM products
|
||||
WHERE vendor IS NOT NULL
|
||||
CREATE TEMPORARY TABLE temp_purchase_stats (
|
||||
vendor VARCHAR(100) NOT NULL,
|
||||
avg_lead_time_days DECIMAL(10,2),
|
||||
total_orders INT,
|
||||
total_late_orders INT,
|
||||
total_purchase_value DECIMAL(15,2),
|
||||
avg_order_value DECIMAL(15,2),
|
||||
on_time_delivery_rate DECIMAL(5,2),
|
||||
order_fill_rate DECIMAL(5,2),
|
||||
PRIMARY KEY (vendor),
|
||||
INDEX (total_orders),
|
||||
INDEX (total_purchase_value)
|
||||
) ENGINE=MEMORY
|
||||
`);
|
||||
|
||||
processedCount = Math.floor(totalProducts * 0.8);
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Vendor details updated, calculating metrics',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
}
|
||||
});
|
||||
await connection.query(`
|
||||
CREATE TEMPORARY TABLE temp_product_stats (
|
||||
vendor VARCHAR(100) NOT NULL,
|
||||
total_products INT,
|
||||
active_products INT,
|
||||
avg_margin_percent DECIMAL(5,2),
|
||||
total_revenue DECIMAL(15,2),
|
||||
PRIMARY KEY (vendor),
|
||||
INDEX (total_products),
|
||||
INDEX (total_revenue)
|
||||
) ENGINE=MEMORY
|
||||
`);
|
||||
|
||||
if (isCancelled) return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedPurchaseOrders,
|
||||
success
|
||||
};
|
||||
// Populate purchase_stats temp table with optimized index usage
|
||||
await connection.query(`
|
||||
INSERT INTO temp_purchase_stats
|
||||
SELECT
|
||||
po.vendor,
|
||||
AVG(DATEDIFF(po.received_date, po.date)) as avg_lead_time_days,
|
||||
COUNT(DISTINCT po.po_id) as total_orders,
|
||||
COUNT(CASE WHEN DATEDIFF(po.received_date, po.date) > 30 THEN 1 END) as total_late_orders,
|
||||
SUM(po.ordered * po.po_cost_price) as total_purchase_value,
|
||||
AVG(po.ordered * po.po_cost_price) as avg_order_value,
|
||||
(COUNT(CASE WHEN DATEDIFF(po.received_date, po.date) <= 30 THEN 1 END) / COUNT(*)) * 100 as on_time_delivery_rate,
|
||||
(SUM(LEAST(po.received, po.ordered)) / NULLIF(SUM(po.ordered), 0)) * 100 as order_fill_rate
|
||||
FROM purchase_orders po
|
||||
FORCE INDEX (idx_vendor)
|
||||
WHERE po.vendor IN (?)
|
||||
AND po.received_date IS NOT NULL
|
||||
AND po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 365 DAY)
|
||||
AND po.updated > ?
|
||||
GROUP BY po.vendor
|
||||
`, [batch.map(row => row.vendor), lastCalculationTime]);
|
||||
|
||||
// Now calculate vendor metrics
|
||||
// Populate product stats with optimized index usage
|
||||
await connection.query(`
|
||||
INSERT INTO temp_product_stats
|
||||
SELECT
|
||||
p.vendor,
|
||||
COUNT(DISTINCT p.pid) as product_count,
|
||||
COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
|
||||
AVG(pm.avg_margin_percent) as avg_margin,
|
||||
SUM(pm.total_revenue) as total_revenue
|
||||
FROM products p
|
||||
FORCE INDEX (idx_vendor)
|
||||
LEFT JOIN product_metrics pm FORCE INDEX (PRIMARY) ON p.pid = pm.pid
|
||||
WHERE p.vendor IN (?)
|
||||
AND (
|
||||
p.updated > ?
|
||||
OR EXISTS (
|
||||
SELECT 1 FROM orders o FORCE INDEX (idx_orders_metrics)
|
||||
WHERE o.pid = p.pid
|
||||
AND o.updated > ?
|
||||
)
|
||||
)
|
||||
GROUP BY p.vendor
|
||||
`, [batch.map(row => row.vendor), lastCalculationTime, lastCalculationTime]);
|
||||
|
||||
// Update metrics using temp tables with optimized join order
|
||||
await connection.query(`
|
||||
INSERT INTO vendor_metrics (
|
||||
vendor,
|
||||
total_revenue,
|
||||
total_orders,
|
||||
total_late_orders,
|
||||
avg_lead_time_days,
|
||||
on_time_delivery_rate,
|
||||
order_fill_rate,
|
||||
total_orders,
|
||||
total_late_orders,
|
||||
total_purchase_value,
|
||||
avg_order_value,
|
||||
active_products,
|
||||
total_products,
|
||||
total_purchase_value,
|
||||
total_revenue,
|
||||
avg_margin_percent,
|
||||
status,
|
||||
last_calculated_at
|
||||
)
|
||||
WITH vendor_sales AS (
|
||||
SELECT
|
||||
p.vendor,
|
||||
SUM(o.quantity * o.price) as total_revenue,
|
||||
COUNT(DISTINCT o.id) as total_orders,
|
||||
COUNT(DISTINCT p.pid) as active_products,
|
||||
SUM(o.quantity * (o.price - p.cost_price)) as total_margin
|
||||
FROM products p
|
||||
JOIN orders o ON p.pid = o.pid
|
||||
WHERE o.canceled = false
|
||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
|
||||
GROUP BY p.vendor
|
||||
),
|
||||
vendor_po AS (
|
||||
SELECT
|
||||
p.vendor,
|
||||
COUNT(DISTINCT CASE WHEN po.receiving_status = 40 THEN po.id END) as received_orders,
|
||||
COUNT(DISTINCT po.id) as total_orders,
|
||||
AVG(CASE
|
||||
WHEN po.receiving_status = 40
|
||||
THEN DATEDIFF(po.received_date, po.date)
|
||||
END) as avg_lead_time_days,
|
||||
SUM(po.ordered * po.po_cost_price) as total_purchase_value
|
||||
FROM products p
|
||||
JOIN purchase_orders po ON p.pid = po.pid
|
||||
WHERE po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
|
||||
GROUP BY p.vendor
|
||||
),
|
||||
vendor_products AS (
|
||||
SELECT
|
||||
vendor,
|
||||
COUNT(DISTINCT pid) as total_products
|
||||
FROM products
|
||||
GROUP BY vendor
|
||||
)
|
||||
SELECT
|
||||
vs.vendor,
|
||||
COALESCE(vs.total_revenue, 0) as total_revenue,
|
||||
COALESCE(vp.total_orders, 0) as total_orders,
|
||||
COALESCE(vp.total_orders - vp.received_orders, 0) as total_late_orders,
|
||||
COALESCE(vp.avg_lead_time_days, 0) as avg_lead_time_days,
|
||||
CASE
|
||||
WHEN vp.total_orders > 0
|
||||
THEN (vp.received_orders / vp.total_orders) * 100
|
||||
ELSE 0
|
||||
END as on_time_delivery_rate,
|
||||
CASE
|
||||
WHEN vp.total_orders > 0
|
||||
THEN (vp.received_orders / vp.total_orders) * 100
|
||||
ELSE 0
|
||||
END as order_fill_rate,
|
||||
CASE
|
||||
WHEN vs.total_orders > 0
|
||||
THEN vs.total_revenue / vs.total_orders
|
||||
ELSE 0
|
||||
END as avg_order_value,
|
||||
COALESCE(vs.active_products, 0) as active_products,
|
||||
COALESCE(vpr.total_products, 0) as total_products,
|
||||
COALESCE(vp.total_purchase_value, 0) as total_purchase_value,
|
||||
CASE
|
||||
WHEN vs.total_revenue > 0
|
||||
THEN (vs.total_margin / vs.total_revenue) * 100
|
||||
ELSE 0
|
||||
END as avg_margin_percent,
|
||||
'active' as status,
|
||||
v.vendor,
|
||||
COALESCE(ps.avg_lead_time_days, 0) as avg_lead_time_days,
|
||||
COALESCE(ps.on_time_delivery_rate, 0) as on_time_delivery_rate,
|
||||
COALESCE(ps.order_fill_rate, 0) as order_fill_rate,
|
||||
COALESCE(ps.total_orders, 0) as total_orders,
|
||||
COALESCE(ps.total_late_orders, 0) as total_late_orders,
|
||||
COALESCE(ps.total_purchase_value, 0) as total_purchase_value,
|
||||
COALESCE(ps.avg_order_value, 0) as avg_order_value,
|
||||
COALESCE(prs.active_products, 0) as active_products,
|
||||
COALESCE(prs.total_products, 0) as total_products,
|
||||
COALESCE(prs.total_revenue, 0) as total_revenue,
|
||||
COALESCE(prs.avg_margin_percent, 0) as avg_margin_percent,
|
||||
v.status,
|
||||
NOW() as last_calculated_at
|
||||
FROM vendor_sales vs
|
||||
LEFT JOIN vendor_po vp ON vs.vendor = vp.vendor
|
||||
LEFT JOIN vendor_products vpr ON vs.vendor = vpr.vendor
|
||||
WHERE vs.vendor IS NOT NULL
|
||||
FROM vendor_details v
|
||||
FORCE INDEX (PRIMARY)
|
||||
LEFT JOIN temp_purchase_stats ps ON v.vendor = ps.vendor
|
||||
LEFT JOIN temp_product_stats prs ON v.vendor = prs.vendor
|
||||
WHERE v.vendor IN (?)
|
||||
ON DUPLICATE KEY UPDATE
|
||||
total_revenue = VALUES(total_revenue),
|
||||
total_orders = VALUES(total_orders),
|
||||
total_late_orders = VALUES(total_late_orders),
|
||||
avg_lead_time_days = VALUES(avg_lead_time_days),
|
||||
on_time_delivery_rate = VALUES(on_time_delivery_rate),
|
||||
order_fill_rate = VALUES(order_fill_rate),
|
||||
total_orders = VALUES(total_orders),
|
||||
total_late_orders = VALUES(total_late_orders),
|
||||
total_purchase_value = VALUES(total_purchase_value),
|
||||
avg_order_value = VALUES(avg_order_value),
|
||||
active_products = VALUES(active_products),
|
||||
total_products = VALUES(total_products),
|
||||
total_purchase_value = VALUES(total_purchase_value),
|
||||
total_revenue = VALUES(total_revenue),
|
||||
avg_margin_percent = VALUES(avg_margin_percent),
|
||||
status = VALUES(status),
|
||||
last_calculated_at = VALUES(last_calculated_at)
|
||||
`);
|
||||
last_calculated_at = NOW()
|
||||
`, [batch.map(row => row.vendor)]);
|
||||
|
||||
// Clean up temp tables
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_purchase_stats');
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
|
||||
|
||||
lastVendor = batch[batch.length - 1].vendor;
|
||||
processedCount += batch.length;
|
||||
|
||||
processedCount = Math.floor(totalProducts * 0.9);
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Vendor metrics calculated, updating time-based metrics',
|
||||
operation: 'Processing vendor metrics batch',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
total: totalVendors,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalVendors),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
percentage: ((processedCount / totalVendors) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
}
|
||||
});
|
||||
|
||||
if (isCancelled) return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedPurchaseOrders,
|
||||
success
|
||||
};
|
||||
|
||||
// Calculate time-based metrics
|
||||
await connection.query(`
|
||||
INSERT INTO vendor_time_metrics (
|
||||
vendor,
|
||||
year,
|
||||
month,
|
||||
total_orders,
|
||||
late_orders,
|
||||
avg_lead_time_days,
|
||||
total_purchase_value,
|
||||
total_revenue,
|
||||
avg_margin_percent
|
||||
)
|
||||
WITH monthly_orders AS (
|
||||
SELECT
|
||||
p.vendor,
|
||||
YEAR(o.date) as year,
|
||||
MONTH(o.date) as month,
|
||||
COUNT(DISTINCT o.id) as total_orders,
|
||||
SUM(o.quantity * o.price) as total_revenue,
|
||||
SUM(o.quantity * (o.price - p.cost_price)) as total_margin
|
||||
FROM products p
|
||||
JOIN orders o ON p.pid = o.pid
|
||||
WHERE o.canceled = false
|
||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
|
||||
AND p.vendor IS NOT NULL
|
||||
GROUP BY p.vendor, YEAR(o.date), MONTH(o.date)
|
||||
),
|
||||
monthly_po AS (
|
||||
SELECT
|
||||
p.vendor,
|
||||
YEAR(po.date) as year,
|
||||
MONTH(po.date) as month,
|
||||
COUNT(DISTINCT po.id) as total_po,
|
||||
COUNT(DISTINCT CASE
|
||||
WHEN po.receiving_status = 40 AND po.received_date > po.expected_date
|
||||
THEN po.id
|
||||
END) as late_orders,
|
||||
AVG(CASE
|
||||
WHEN po.receiving_status = 40
|
||||
THEN DATEDIFF(po.received_date, po.date)
|
||||
END) as avg_lead_time_days,
|
||||
SUM(po.ordered * po.po_cost_price) as total_purchase_value
|
||||
FROM products p
|
||||
JOIN purchase_orders po ON p.pid = po.pid
|
||||
WHERE po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
|
||||
AND p.vendor IS NOT NULL
|
||||
GROUP BY p.vendor, YEAR(po.date), MONTH(po.date)
|
||||
)
|
||||
SELECT
|
||||
mo.vendor,
|
||||
mo.year,
|
||||
mo.month,
|
||||
COALESCE(mp.total_po, 0) as total_orders,
|
||||
COALESCE(mp.late_orders, 0) as late_orders,
|
||||
COALESCE(mp.avg_lead_time_days, 0) as avg_lead_time_days,
|
||||
COALESCE(mp.total_purchase_value, 0) as total_purchase_value,
|
||||
mo.total_revenue,
|
||||
CASE
|
||||
WHEN mo.total_revenue > 0
|
||||
THEN (mo.total_margin / mo.total_revenue) * 100
|
||||
ELSE 0
|
||||
END as avg_margin_percent
|
||||
FROM monthly_orders mo
|
||||
LEFT JOIN monthly_po mp ON mo.vendor = mp.vendor
|
||||
AND mo.year = mp.year
|
||||
AND mo.month = mp.month
|
||||
UNION
|
||||
SELECT
|
||||
mp.vendor,
|
||||
mp.year,
|
||||
mp.month,
|
||||
mp.total_po as total_orders,
|
||||
mp.late_orders,
|
||||
mp.avg_lead_time_days,
|
||||
mp.total_purchase_value,
|
||||
0 as total_revenue,
|
||||
0 as avg_margin_percent
|
||||
FROM monthly_po mp
|
||||
LEFT JOIN monthly_orders mo ON mp.vendor = mo.vendor
|
||||
AND mp.year = mo.year
|
||||
AND mp.month = mo.month
|
||||
WHERE mo.vendor IS NULL
|
||||
ON DUPLICATE KEY UPDATE
|
||||
total_orders = VALUES(total_orders),
|
||||
late_orders = VALUES(late_orders),
|
||||
avg_lead_time_days = VALUES(avg_lead_time_days),
|
||||
total_purchase_value = VALUES(total_purchase_value),
|
||||
total_revenue = VALUES(total_revenue),
|
||||
avg_margin_percent = VALUES(avg_margin_percent)
|
||||
`);
|
||||
|
||||
processedCount = Math.floor(totalProducts * 0.95);
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Time-based vendor metrics calculated',
|
||||
current: processedCount,
|
||||
total: totalProducts,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||
rate: calculateRate(startTime, processedCount),
|
||||
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
}
|
||||
});
|
||||
|
||||
// If we get here, everything completed successfully
|
||||
success = true;
|
||||
@@ -349,8 +284,8 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
|
||||
|
||||
return {
|
||||
processedProducts: processedCount,
|
||||
processedOrders,
|
||||
processedPurchaseOrders,
|
||||
processedOrders: 0,
|
||||
processedPurchaseOrders: 0,
|
||||
success
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user