297 lines
12 KiB
JavaScript
297 lines
12 KiB
JavaScript
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
|
|
const { getConnection } = require('./utils/db');
|
|
|
|
/**
 * Recalculates aggregate metrics for categories affected since the last run.
 *
 * Reads the previous run time from `calculate_status`, selects active
 * categories linked to products updated after that time, and walks them in
 * batches ordered by `cat_id`. For each batch, product and sales figures are
 * staged in two MEMORY temporary tables and then upserted into
 * `category_metrics`. After the last batch the `category_metrics` row in
 * `calculate_status` is stamped with NOW().
 *
 * @param {number} startTime - Start of the overall run in epoch milliseconds;
 *   used only for progress/timing output.
 * @param {number} totalProducts - Not used by this function; kept so the
 *   signature stays compatible with callers.
 * @param {number} [processedCount=0] - Initial value of the progress counter;
 *   the number of categories in each finished batch is added to it.
 * @param {boolean} [isCancelled=false] - When truthy, a `cancelled` progress
 *   event is emitted and the function returns before processing any batch.
 *   NOTE(review): the value is passed by value, so the check inside the batch
 *   loop cannot observe a cancellation requested after this call started.
 * @returns {Promise<{processedProducts: number, processedOrders: number,
 *   processedPurchaseOrders: number, success: boolean}>} Summary of the run.
 *   `processedProducts` carries the progress counter described above.
 * @throws Re-throws any error raised by a query after logging it via `logError`.
 */
async function calculateCategoryMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
  const connection = await getConnection();
  const BATCH_SIZE = 5000;

  let success = false;
  // Local counter so the caller-visible parameter is never reassigned.
  let processed = processedCount;
  let totalCategories = 0;
  // True while the staging tables may exist on this connection.
  let tempTablesLive = false;

  // Removes both per-batch staging tables (no-op if they do not exist).
  const dropTempTables = async () => {
    await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
    await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
  };

  // Emits one progress event; a cancelled event reports no remaining-time estimate.
  const reportProgress = (status, operation) => {
    outputProgress({
      status,
      operation,
      current: processed,
      total: totalCategories,
      elapsed: formatElapsedTime(startTime),
      remaining: status === 'cancelled'
        ? null
        : estimateRemaining(startTime, processed, totalCategories),
      rate: calculateRate(startTime, processed),
      percentage: ((processed / totalCategories) * 100).toFixed(1),
      timing: {
        start_time: new Date(startTime).toISOString(),
        end_time: new Date().toISOString(),
        elapsed_seconds: Math.round((Date.now() - startTime) / 1000),
      },
    });
  };

  try {
    // Get last calculation timestamp
    const [lastCalc] = await connection.query(`
      SELECT last_calculation_timestamp
      FROM calculate_status
      WHERE module_name = 'category_metrics'
    `);
    const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';

    // Get total count of categories needing updates.
    // NOTE(review): orders are joined through the already-filtered products
    // row, so `o.id IS NOT NULL` can only hold when `p.pid IS NOT NULL` does;
    // categories whose only change is a new order on an unchanged product do
    // not appear to be selected - confirm this is intended.
    const [categoryCount] = await connection.query(`
      SELECT COUNT(DISTINCT c.cat_id) as count
      FROM categories c
      JOIN product_categories pc ON c.cat_id = pc.cat_id
      LEFT JOIN products p ON pc.pid = p.pid AND p.updated > ?
      LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
      WHERE c.status = 'active'
        AND (
          p.pid IS NOT NULL
          OR o.id IS NOT NULL
        )
    `, [lastCalculationTime, lastCalculationTime]);
    // Number() keeps the zero check working if the driver is configured to
    // return COUNT() as a string.
    totalCategories = Number(categoryCount[0].count);

    if (totalCategories === 0) {
      console.log('No categories need metric updates');
      return {
        processedProducts: 0,
        processedOrders: 0,
        processedPurchaseOrders: 0,
        success: true
      };
    }

    if (isCancelled) {
      reportProgress('cancelled', 'Category metrics calculation cancelled');
      return {
        processedProducts: processed,
        processedOrders: 0,
        processedPurchaseOrders: 0,
        success
      };
    }

    reportProgress('running', 'Starting category metrics calculation');

    // Process in batches, resuming after the highest cat_id already handled.
    let lastCatId = 0;
    while (true) {
      if (isCancelled) break;

      const [batch] = await connection.query(`
        SELECT DISTINCT c.cat_id
        FROM categories c
        FORCE INDEX (PRIMARY)
        JOIN product_categories pc FORCE INDEX (idx_category) ON c.cat_id = pc.cat_id
        LEFT JOIN products p FORCE INDEX (PRIMARY) ON pc.pid = p.pid AND p.updated > ?
        LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid AND o.updated > ?
        WHERE c.status = 'active'
          AND c.cat_id > ?
          AND (
            p.pid IS NOT NULL
            OR o.id IS NOT NULL
          )
        ORDER BY c.cat_id
        LIMIT ?
      `, [lastCalculationTime, lastCalculationTime, lastCatId, BATCH_SIZE]);

      if (batch.length === 0) break;

      const categoryIds = batch.map((row) => row.cat_id);

      // Start from a clean slate in case an earlier run left tables behind
      // on this pooled connection.
      await dropTempTables();

      // From here until the end-of-batch drop the tables may exist, so the
      // finally block must clean them up if anything throws.
      tempTablesLive = true;

      await connection.query(`
        CREATE TEMPORARY TABLE temp_product_stats (
          cat_id BIGINT NOT NULL,
          product_count INT,
          active_products INT,
          total_value DECIMAL(15,2),
          avg_margin DECIMAL(5,2),
          turnover_rate DECIMAL(10,2),
          PRIMARY KEY (cat_id),
          INDEX (product_count),
          INDEX (total_value)
        ) ENGINE=MEMORY
      `);

      await connection.query(`
        CREATE TEMPORARY TABLE temp_sales_stats (
          cat_id BIGINT NOT NULL,
          recent_revenue DECIMAL(15,2),
          previous_revenue DECIMAL(15,2),
          PRIMARY KEY (cat_id),
          INDEX (recent_revenue),
          INDEX (previous_revenue)
        ) ENGINE=MEMORY
      `);

      // Populate product stats for the batch.
      // NOTE(review): the WHERE filter limits the aggregation to products
      // changed (or with orders changed) since the last run, so product_count
      // and total_value appear to cover only those products rather than the
      // whole category - confirm this is intended.
      await connection.query(`
        INSERT INTO temp_product_stats
        SELECT
          c.cat_id,
          COUNT(DISTINCT p.pid) as product_count,
          COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
          COALESCE(SUM(p.stock_quantity * p.cost_price), 0) as total_value,
          COALESCE(AVG(NULLIF(pm.avg_margin_percent, 0)), 0) as avg_margin,
          COALESCE(AVG(NULLIF(pm.turnover_rate, 0)), 0) as turnover_rate
        FROM categories c
        FORCE INDEX (PRIMARY)
        INNER JOIN product_categories pc FORCE INDEX (idx_category) ON c.cat_id = pc.cat_id
        LEFT JOIN products p FORCE INDEX (PRIMARY) ON pc.pid = p.pid
        LEFT JOIN product_metrics pm FORCE INDEX (PRIMARY) ON p.pid = pm.pid
        WHERE c.cat_id IN (?)
          AND (
            p.updated > ?
            OR EXISTS (
              SELECT 1 FROM orders o FORCE INDEX (idx_orders_metrics)
              WHERE o.pid = p.pid
                AND o.updated > ?
            )
          )
        GROUP BY c.cat_id
      `, [categoryIds, lastCalculationTime, lastCalculationTime]);

      // Populate sales stats: revenue for the last 30 days and for the
      // 30 days before that, from non-cancelled orders.
      await connection.query(`
        INSERT INTO temp_sales_stats
        WITH date_ranges AS (
          SELECT
            DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) as current_start,
            CURRENT_DATE as current_end,
            DATE_SUB(CURRENT_DATE, INTERVAL 60 DAY) as previous_start,
            DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) as previous_end
        )
        SELECT
          c.cat_id,
          COALESCE(SUM(
            CASE WHEN o.date >= dr.current_start
              THEN o.quantity * o.price
              ELSE 0
            END
          ), 0) as recent_revenue,
          COALESCE(SUM(
            CASE WHEN o.date >= dr.previous_start AND o.date < dr.current_start
              THEN o.quantity * o.price
              ELSE 0
            END
          ), 0) as previous_revenue
        FROM categories c
        FORCE INDEX (PRIMARY)
        INNER JOIN product_categories pc FORCE INDEX (idx_category) ON c.cat_id = pc.cat_id
        INNER JOIN products p FORCE INDEX (PRIMARY) ON pc.pid = p.pid
        INNER JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
        CROSS JOIN date_ranges dr
        WHERE c.cat_id IN (?)
          AND o.canceled = false
          AND o.date >= dr.previous_start
          AND o.updated > ?
        GROUP BY c.cat_id
      `, [categoryIds, lastCalculationTime]);

      // Upsert the batch into category_metrics. growth_rate is the percentage
      // change of recent over previous revenue, clamped to [-100, 999.99];
      // with no previous revenue it is 100 if there is recent revenue, else 0.
      // Both staging tables are LEFT JOINed, so a category missing from one
      // of them gets NULLs for that table's columns.
      await connection.query(`
        INSERT INTO category_metrics (
          category_id,
          product_count,
          active_products,
          total_value,
          avg_margin,
          turnover_rate,
          growth_rate,
          status,
          last_calculated_at
        )
        SELECT
          c.cat_id,
          ps.product_count,
          ps.active_products,
          ps.total_value,
          ps.avg_margin,
          ps.turnover_rate,
          CASE
            WHEN COALESCE(ss.previous_revenue, 0) = 0 AND COALESCE(ss.recent_revenue, 0) > 0 THEN 100
            WHEN COALESCE(ss.previous_revenue, 0) = 0 THEN 0
            ELSE ROUND(LEAST(999.99, GREATEST(-100,
              ((ss.recent_revenue / NULLIF(ss.previous_revenue, 0)) - 1) * 100
            )), 2)
          END as growth_rate,
          c.status,
          NOW() as last_calculated_at
        FROM categories c
        FORCE INDEX (PRIMARY)
        LEFT JOIN temp_product_stats ps ON c.cat_id = ps.cat_id
        LEFT JOIN temp_sales_stats ss ON c.cat_id = ss.cat_id
        WHERE c.cat_id IN (?)
        ON DUPLICATE KEY UPDATE
          product_count = VALUES(product_count),
          active_products = VALUES(active_products),
          total_value = VALUES(total_value),
          avg_margin = VALUES(avg_margin),
          turnover_rate = VALUES(turnover_rate),
          growth_rate = VALUES(growth_rate),
          status = VALUES(status),
          last_calculated_at = NOW()
      `, [categoryIds]);

      // Clean up temp tables
      await dropTempTables();
      tempTablesLive = false;

      lastCatId = batch[batch.length - 1].cat_id;
      processed += batch.length;

      reportProgress('running', 'Processing category metrics batch');
    }

    // If we get here, everything completed successfully
    success = true;

    // Update calculate_status
    await connection.query(`
      INSERT INTO calculate_status (module_name, last_calculation_timestamp)
      VALUES ('category_metrics', NOW())
      ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
    `);

    return {
      processedProducts: processed,
      processedOrders: 0,
      processedPurchaseOrders: 0,
      success
    };
  } catch (error) {
    success = false;
    logError(error, 'Error calculating category metrics');
    throw error;
  } finally {
    if (connection) {
      if (tempTablesLive) {
        // A batch failed part-way: drop the staging tables so they do not
        // stay alive on a connection that is about to be reused. A failure
        // here is logged, not thrown, so it cannot mask the original error.
        try {
          await dropTempTables();
        } catch (cleanupError) {
          logError(cleanupError, 'Error dropping category metrics temp tables');
        }
      }
      connection.release();
    }
  }
}
|
|
|
|
// Export the category metrics calculator as this module's export.
module.exports = calculateCategoryMetrics;
|