295 lines
12 KiB
JavaScript
295 lines
12 KiB
JavaScript
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
|
|
const { getConnection } = require('./utils/db');
|
|
|
|
async function calculateBrandMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
|
|
const connection = await getConnection();
|
|
let success = false;
|
|
const BATCH_SIZE = 5000;
|
|
let myProcessedProducts = 0; // Not *directly* processing products, tracking brands
|
|
|
|
try {
|
|
// Get last calculation timestamp
|
|
const [lastCalc] = await connection.query(`
|
|
SELECT last_calculation_timestamp
|
|
FROM calculate_status
|
|
WHERE module_name = 'brand_metrics'
|
|
`);
|
|
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
|
|
|
|
// Get total count of brands needing updates
|
|
const [brandCount] = await connection.query(`
|
|
SELECT COUNT(DISTINCT p.brand) as count
|
|
FROM products p
|
|
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
|
|
WHERE p.brand IS NOT NULL
|
|
AND (
|
|
p.updated > ?
|
|
OR o.id IS NOT NULL
|
|
)
|
|
`, [lastCalculationTime, lastCalculationTime]);
|
|
const totalBrands = brandCount[0].count; // Track total *brands*
|
|
|
|
if (totalBrands === 0) {
|
|
console.log('No brands need metric updates');
|
|
return {
|
|
processedProducts: 0, // Not directly processing products
|
|
processedOrders: 0,
|
|
processedPurchaseOrders: 0,
|
|
success: true
|
|
};
|
|
}
|
|
|
|
if (isCancelled) {
|
|
outputProgress({
|
|
status: 'cancelled',
|
|
operation: 'Brand metrics calculation cancelled',
|
|
current: processedCount, // Use passed-in value
|
|
total: totalBrands, // Report total *brands*
|
|
elapsed: formatElapsedTime(startTime),
|
|
remaining: null,
|
|
rate: calculateRate(startTime, processedCount),
|
|
percentage: ((processedCount / totalBrands) * 100).toFixed(1), // Base on brands
|
|
timing: {
|
|
start_time: new Date(startTime).toISOString(),
|
|
end_time: new Date().toISOString(),
|
|
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
}
|
|
});
|
|
return {
|
|
processedProducts: 0, // Not directly processing products
|
|
processedOrders: 0,
|
|
processedPurchaseOrders: 0,
|
|
success
|
|
};
|
|
}
|
|
|
|
outputProgress({
|
|
status: 'running',
|
|
operation: 'Starting brand metrics calculation',
|
|
current: processedCount, // Use passed-in value
|
|
total: totalBrands, // Report total *brands*
|
|
elapsed: formatElapsedTime(startTime),
|
|
remaining: estimateRemaining(startTime, processedCount, totalBrands),
|
|
rate: calculateRate(startTime, processedCount),
|
|
percentage: ((processedCount / totalBrands) * 100).toFixed(1), // Base on brands
|
|
timing: {
|
|
start_time: new Date(startTime).toISOString(),
|
|
end_time: new Date().toISOString(),
|
|
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
}
|
|
});
|
|
|
|
// Process in batches
|
|
let lastBrand = '';
|
|
let processedBrands = 0; // Track processed brands
|
|
while (true) {
|
|
if (isCancelled) break;
|
|
|
|
const [batch] = await connection.query(`
|
|
SELECT DISTINCT p.brand
|
|
FROM products p
|
|
FORCE INDEX (idx_brand)
|
|
LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid AND o.updated > ?
|
|
WHERE p.brand IS NOT NULL
|
|
AND p.brand > ?
|
|
AND (
|
|
p.updated > ?
|
|
OR o.id IS NOT NULL
|
|
)
|
|
ORDER BY p.brand
|
|
LIMIT ?
|
|
`, [lastCalculationTime, lastBrand, lastCalculationTime, BATCH_SIZE]);
|
|
|
|
if (batch.length === 0) break;
|
|
|
|
// Create temporary tables for better performance
|
|
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
|
|
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
|
|
|
|
await connection.query(`
|
|
CREATE TEMPORARY TABLE temp_product_stats (
|
|
brand VARCHAR(100) NOT NULL,
|
|
product_count INT,
|
|
active_products INT,
|
|
total_stock_units INT,
|
|
total_stock_cost DECIMAL(15,2),
|
|
total_stock_retail DECIMAL(15,2),
|
|
total_revenue DECIMAL(15,2),
|
|
avg_margin DECIMAL(5,2),
|
|
PRIMARY KEY (brand),
|
|
INDEX (total_revenue),
|
|
INDEX (product_count)
|
|
) ENGINE=MEMORY
|
|
`);
|
|
|
|
await connection.query(`
|
|
CREATE TEMPORARY TABLE temp_sales_stats (
|
|
brand VARCHAR(100) NOT NULL,
|
|
current_period_sales DECIMAL(15,2),
|
|
previous_period_sales DECIMAL(15,2),
|
|
PRIMARY KEY (brand),
|
|
INDEX (current_period_sales),
|
|
INDEX (previous_period_sales)
|
|
) ENGINE=MEMORY
|
|
`);
|
|
|
|
// Populate product stats with optimized index usage
|
|
await connection.query(`
|
|
INSERT INTO temp_product_stats
|
|
SELECT
|
|
p.brand,
|
|
COUNT(DISTINCT p.pid) as product_count,
|
|
COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
|
|
COALESCE(SUM(p.stock_quantity), 0) as total_stock_units,
|
|
COALESCE(SUM(p.stock_quantity * p.cost_price), 0) as total_stock_cost,
|
|
COALESCE(SUM(p.stock_quantity * p.price), 0) as total_stock_retail,
|
|
COALESCE(SUM(pm.total_revenue), 0) as total_revenue,
|
|
COALESCE(AVG(NULLIF(pm.avg_margin_percent, 0)), 0) as avg_margin
|
|
FROM products p
|
|
FORCE INDEX (idx_brand)
|
|
LEFT JOIN product_metrics pm FORCE INDEX (PRIMARY) ON p.pid = pm.pid
|
|
WHERE p.brand IN (?)
|
|
AND (
|
|
p.updated > ?
|
|
OR EXISTS (
|
|
SELECT 1 FROM orders o FORCE INDEX (idx_orders_metrics)
|
|
WHERE o.pid = p.pid
|
|
AND o.updated > ?
|
|
)
|
|
)
|
|
GROUP BY p.brand
|
|
`, [batch.map(row => row.brand), lastCalculationTime, lastCalculationTime]);
|
|
|
|
// Populate sales stats with optimized date handling
|
|
await connection.query(`
|
|
INSERT INTO temp_sales_stats
|
|
WITH date_ranges AS (
|
|
SELECT
|
|
DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) as current_start,
|
|
CURRENT_DATE as current_end,
|
|
DATE_SUB(CURRENT_DATE, INTERVAL 60 DAY) as previous_start,
|
|
DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) as previous_end
|
|
)
|
|
SELECT
|
|
p.brand,
|
|
COALESCE(SUM(
|
|
CASE WHEN o.date >= dr.current_start
|
|
THEN o.quantity * o.price
|
|
ELSE 0
|
|
END
|
|
), 0) as current_period_sales,
|
|
COALESCE(SUM(
|
|
CASE WHEN o.date >= dr.previous_start AND o.date < dr.current_start
|
|
THEN o.quantity * o.price
|
|
ELSE 0
|
|
END
|
|
), 0) as previous_period_sales
|
|
FROM products p
|
|
FORCE INDEX (idx_brand)
|
|
INNER JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
|
|
CROSS JOIN date_ranges dr
|
|
WHERE p.brand IN (?)
|
|
AND o.canceled = false
|
|
AND o.date >= dr.previous_start
|
|
AND o.updated > ?
|
|
GROUP BY p.brand
|
|
`, [batch.map(row => row.brand), lastCalculationTime]);
|
|
|
|
// Update metrics using temp tables with optimized calculations
|
|
await connection.query(`
|
|
INSERT INTO brand_metrics (
|
|
brand,
|
|
product_count,
|
|
active_products,
|
|
total_stock_units,
|
|
total_stock_cost,
|
|
total_stock_retail,
|
|
total_revenue,
|
|
avg_margin,
|
|
growth_rate,
|
|
last_calculated_at
|
|
)
|
|
SELECT
|
|
ps.brand,
|
|
ps.product_count,
|
|
ps.active_products,
|
|
ps.total_stock_units,
|
|
ps.total_stock_cost,
|
|
ps.total_stock_retail,
|
|
ps.total_revenue,
|
|
ps.avg_margin,
|
|
CASE
|
|
WHEN COALESCE(ss.previous_period_sales, 0) = 0 AND COALESCE(ss.current_period_sales, 0) > 0 THEN 100
|
|
WHEN COALESCE(ss.previous_period_sales, 0) = 0 THEN 0
|
|
ELSE ROUND(LEAST(999.99, GREATEST(-100,
|
|
((ss.current_period_sales / NULLIF(ss.previous_period_sales, 0)) - 1) * 100
|
|
)), 2)
|
|
END as growth_rate,
|
|
NOW() as last_calculated_at
|
|
FROM temp_product_stats ps
|
|
LEFT JOIN temp_sales_stats ss ON ps.brand = ss.brand
|
|
ON DUPLICATE KEY UPDATE
|
|
product_count = VALUES(product_count),
|
|
active_products = VALUES(active_products),
|
|
total_stock_units = VALUES(total_stock_units),
|
|
total_stock_cost = VALUES(total_stock_cost),
|
|
total_stock_retail = VALUES(total_stock_retail),
|
|
total_revenue = VALUES(total_revenue),
|
|
avg_margin = VALUES(avg_margin),
|
|
growth_rate = VALUES(growth_rate),
|
|
last_calculated_at = NOW()
|
|
`);
|
|
|
|
// Clean up temp tables
|
|
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
|
|
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
|
|
|
|
lastBrand = batch[batch.length - 1].brand;
|
|
processedBrands += batch.length; // Increment processed *brands*
|
|
|
|
outputProgress({
|
|
status: 'running',
|
|
operation: 'Processing brand metrics batch',
|
|
current: processedCount + processedBrands, // Use cumulative brand count
|
|
total: totalBrands, // Report total *brands*
|
|
elapsed: formatElapsedTime(startTime),
|
|
remaining: estimateRemaining(startTime, processedCount + processedBrands, totalBrands),
|
|
rate: calculateRate(startTime, processedCount + processedBrands),
|
|
percentage: (((processedCount + processedBrands) / totalBrands) * 100).toFixed(1), // Base on brands
|
|
timing: {
|
|
start_time: new Date(startTime).toISOString(),
|
|
end_time: new Date().toISOString(),
|
|
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
}
|
|
});
|
|
}
|
|
|
|
// If we get here, everything completed successfully
|
|
success = true;
|
|
|
|
// Update calculate_status
|
|
await connection.query(`
|
|
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
|
|
VALUES ('brand_metrics', NOW())
|
|
ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
|
|
`);
|
|
|
|
return {
|
|
processedProducts: 0, // Not directly processing products
|
|
processedOrders: 0,
|
|
processedPurchaseOrders: 0,
|
|
success
|
|
};
|
|
|
|
} catch (error) {
|
|
success = false;
|
|
logError(error, 'Error calculating brand metrics');
|
|
throw error;
|
|
} finally {
|
|
if (connection) {
|
|
connection.release();
|
|
}
|
|
}
|
|
}
|
|
|
|
module.exports = calculateBrandMetrics;
|