Optimize and fix calculate scripts
This commit is contained in:
@@ -5,8 +5,15 @@ process.chdir(path.dirname(__filename));
|
||||
|
||||
require('dotenv').config({ path: path.resolve(__dirname, '..', '.env') });
|
||||
|
||||
// Set to 1 to skip product metrics and only calculate the remaining metrics
|
||||
const SKIP_PRODUCT_METRICS = 0;
|
||||
// Configuration flags for controlling which metrics to calculate
|
||||
// Set to 1 to skip the corresponding calculation, 0 to run it
|
||||
const SKIP_PRODUCT_METRICS = 1; // Skip all product metrics
|
||||
const SKIP_TIME_AGGREGATES = 1; // Skip time aggregates
|
||||
const SKIP_FINANCIAL_METRICS = 1; // Skip financial metrics
|
||||
const SKIP_VENDOR_METRICS = 1; // Skip vendor metrics
|
||||
const SKIP_CATEGORY_METRICS = 1; // Skip category metrics
|
||||
const SKIP_BRAND_METRICS = 1; // Skip brand metrics
|
||||
const SKIP_SALES_FORECASTS = 1; // Skip sales forecasts
|
||||
|
||||
// Add error handler for uncaught exceptions
|
||||
process.on('uncaughtException', (error) => {
|
||||
@@ -137,50 +144,136 @@ async function calculateMetrics() {
|
||||
}
|
||||
|
||||
// Calculate time-based aggregates
|
||||
processedCount = await calculateTimeAggregates(startTime, totalProducts, processedCount);
|
||||
if (!SKIP_TIME_AGGREGATES) {
|
||||
processedCount = await calculateTimeAggregates(startTime, totalProducts, processedCount);
|
||||
} else {
|
||||
console.log('Skipping time aggregates calculation');
|
||||
}
|
||||
|
||||
// Calculate financial metrics
|
||||
processedCount = await calculateFinancialMetrics(startTime, totalProducts, processedCount);
|
||||
if (!SKIP_FINANCIAL_METRICS) {
|
||||
processedCount = await calculateFinancialMetrics(startTime, totalProducts, processedCount);
|
||||
} else {
|
||||
console.log('Skipping financial metrics calculation');
|
||||
}
|
||||
|
||||
// Calculate vendor metrics
|
||||
processedCount = await calculateVendorMetrics(startTime, totalProducts, processedCount);
|
||||
if (!SKIP_VENDOR_METRICS) {
|
||||
processedCount = await calculateVendorMetrics(startTime, totalProducts, processedCount);
|
||||
} else {
|
||||
console.log('Skipping vendor metrics calculation');
|
||||
}
|
||||
|
||||
// Calculate category metrics
|
||||
processedCount = await calculateCategoryMetrics(startTime, totalProducts, processedCount);
|
||||
if (!SKIP_CATEGORY_METRICS) {
|
||||
processedCount = await calculateCategoryMetrics(startTime, totalProducts, processedCount);
|
||||
} else {
|
||||
console.log('Skipping category metrics calculation');
|
||||
}
|
||||
|
||||
// Calculate brand metrics
|
||||
processedCount = await calculateBrandMetrics(startTime, totalProducts, processedCount);
|
||||
if (!SKIP_BRAND_METRICS) {
|
||||
processedCount = await calculateBrandMetrics(startTime, totalProducts, processedCount);
|
||||
} else {
|
||||
console.log('Skipping brand metrics calculation');
|
||||
}
|
||||
|
||||
// Calculate sales forecasts
|
||||
processedCount = await calculateSalesForecasts(startTime, totalProducts, processedCount);
|
||||
if (!SKIP_SALES_FORECASTS) {
|
||||
processedCount = await calculateSalesForecasts(startTime, totalProducts, processedCount);
|
||||
} else {
|
||||
console.log('Skipping sales forecasts calculation');
|
||||
}
|
||||
|
||||
// Calculate ABC classification
|
||||
const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1');
|
||||
const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 };
|
||||
|
||||
// First, create and populate the rankings table with an index
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks');
|
||||
await connection.query(`
|
||||
WITH revenue_rankings AS (
|
||||
SELECT
|
||||
product_id,
|
||||
total_revenue,
|
||||
PERCENT_RANK() OVER (ORDER BY COALESCE(total_revenue, 0) DESC) * 100 as revenue_percentile
|
||||
FROM product_metrics
|
||||
),
|
||||
classification_update AS (
|
||||
SELECT
|
||||
product_id,
|
||||
CREATE TEMPORARY TABLE temp_revenue_ranks (
|
||||
pid BIGINT NOT NULL,
|
||||
total_revenue DECIMAL(10,3),
|
||||
rank_num INT,
|
||||
total_count INT,
|
||||
PRIMARY KEY (pid),
|
||||
INDEX (rank_num)
|
||||
) ENGINE=MEMORY
|
||||
`);
|
||||
|
||||
await connection.query(`
|
||||
INSERT INTO temp_revenue_ranks
|
||||
SELECT
|
||||
pid,
|
||||
total_revenue,
|
||||
@rank := @rank + 1 as rank_num,
|
||||
@total_count := @rank as total_count
|
||||
FROM (
|
||||
SELECT pid, total_revenue
|
||||
FROM product_metrics
|
||||
WHERE total_revenue > 0
|
||||
ORDER BY total_revenue DESC
|
||||
) ranked,
|
||||
(SELECT @rank := 0) r
|
||||
`);
|
||||
|
||||
// Get total count for percentage calculation
|
||||
const [rankingCount] = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks');
|
||||
const totalCount = rankingCount[0].total_count || 1;
|
||||
|
||||
// Process updates in batches
|
||||
let abcProcessedCount = 0;
|
||||
const batchSize = 5000;
|
||||
|
||||
while (true) {
|
||||
// First get a batch of PIDs that need updating
|
||||
const [pids] = await connection.query(`
|
||||
SELECT pm.pid
|
||||
FROM product_metrics pm
|
||||
LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid
|
||||
WHERE pm.abc_class IS NULL
|
||||
OR pm.abc_class !=
|
||||
CASE
|
||||
WHEN revenue_percentile <= ? THEN 'A'
|
||||
WHEN revenue_percentile <= ? THEN 'B'
|
||||
WHEN tr.rank_num IS NULL THEN 'C'
|
||||
WHEN (tr.rank_num / ?) * 100 <= ? THEN 'A'
|
||||
WHEN (tr.rank_num / ?) * 100 <= ? THEN 'B'
|
||||
ELSE 'C'
|
||||
END as abc_class
|
||||
FROM revenue_rankings
|
||||
)
|
||||
UPDATE product_metrics pm
|
||||
JOIN classification_update cu ON pm.product_id = cu.product_id
|
||||
SET pm.abc_class = cu.abc_class,
|
||||
pm.last_calculated_at = NOW()
|
||||
`, [abcThresholds.a_threshold, abcThresholds.b_threshold]);
|
||||
END
|
||||
LIMIT ?
|
||||
`, [totalCount, abcThresholds.a_threshold,
|
||||
totalCount, abcThresholds.b_threshold,
|
||||
batchSize]);
|
||||
|
||||
if (pids.length === 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Then update just those PIDs
|
||||
const [result] = await connection.query(`
|
||||
UPDATE product_metrics pm
|
||||
LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid
|
||||
SET pm.abc_class =
|
||||
CASE
|
||||
WHEN tr.rank_num IS NULL THEN 'C'
|
||||
WHEN (tr.rank_num / ?) * 100 <= ? THEN 'A'
|
||||
WHEN (tr.rank_num / ?) * 100 <= ? THEN 'B'
|
||||
ELSE 'C'
|
||||
END,
|
||||
pm.last_calculated_at = NOW()
|
||||
WHERE pm.pid IN (?)
|
||||
`, [totalCount, abcThresholds.a_threshold,
|
||||
totalCount, abcThresholds.b_threshold,
|
||||
pids.map(row => row.pid)]);
|
||||
|
||||
abcProcessedCount += result.affectedRows;
|
||||
|
||||
// Small delay between batches to allow other transactions
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
}
|
||||
|
||||
// Clean up
|
||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks');
|
||||
|
||||
// Final success message
|
||||
global.outputProgress({
|
||||
|
||||
Reference in New Issue
Block a user