Enhance calculate scripts to deal with times and counts + fix regressions

This commit is contained in:
2025-02-03 22:21:39 -05:00
parent 5676e9094d
commit ebffb8f912
2 changed files with 224 additions and 109 deletions

View File

@@ -7,12 +7,12 @@ require('dotenv').config({ path: path.resolve(__dirname, '..', '.env') });
// Configuration flags for controlling which metrics to calculate // Configuration flags for controlling which metrics to calculate
// Set to 1 to skip the corresponding calculation, 0 to run it // Set to 1 to skip the corresponding calculation, 0 to run it
const SKIP_PRODUCT_METRICS = 1; const SKIP_PRODUCT_METRICS = 0;
const SKIP_TIME_AGGREGATES = 1; const SKIP_TIME_AGGREGATES = 0;
const SKIP_FINANCIAL_METRICS = 1; const SKIP_FINANCIAL_METRICS = 0;
const SKIP_VENDOR_METRICS = 1; const SKIP_VENDOR_METRICS = 0;
const SKIP_CATEGORY_METRICS = 1; const SKIP_CATEGORY_METRICS = 0;
const SKIP_BRAND_METRICS = 1; const SKIP_BRAND_METRICS = 0;
const SKIP_SALES_FORECASTS = 0; const SKIP_SALES_FORECASTS = 0;
// Add error handler for uncaught exceptions // Add error handler for uncaught exceptions
@@ -193,9 +193,15 @@ async function calculateMetrics() {
// Update progress periodically // Update progress periodically
const updateProgress = async (products = null, orders = null, purchaseOrders = null) => { const updateProgress = async (products = null, orders = null, purchaseOrders = null) => {
if (products !== null) processedProducts = products; // Ensure all values are valid numbers or default to previous value
if (orders !== null) processedOrders = orders; if (products !== null) processedProducts = Number(products) || processedProducts || 0;
if (purchaseOrders !== null) processedPurchaseOrders = purchaseOrders; if (orders !== null) processedOrders = Number(orders) || processedOrders || 0;
if (purchaseOrders !== null) processedPurchaseOrders = Number(purchaseOrders) || processedPurchaseOrders || 0;
// Ensure we never send NaN to the database
const safeProducts = Number(processedProducts) || 0;
const safeOrders = Number(processedOrders) || 0;
const safePurchaseOrders = Number(processedPurchaseOrders) || 0;
await connection.query(` await connection.query(`
UPDATE calculate_history UPDATE calculate_history
@@ -204,12 +210,40 @@ async function calculateMetrics() {
processed_orders = ?, processed_orders = ?,
processed_purchase_orders = ? processed_purchase_orders = ?
WHERE id = ? WHERE id = ?
`, [processedProducts, processedOrders, processedPurchaseOrders, calculateHistoryId]); `, [safeProducts, safeOrders, safePurchaseOrders, calculateHistoryId]);
}; };
// Helper function to ensure valid progress numbers
const ensureValidProgress = (current, total) => ({
current: Number(current) || 0,
total: Number(total) || 1, // Default to 1 to avoid division by zero
percentage: (((Number(current) || 0) / (Number(total) || 1)) * 100).toFixed(1)
});
// Initial progress
const initialProgress = ensureValidProgress(0, totalProducts);
global.outputProgress({
status: 'running',
operation: 'Starting metrics calculation',
current: initialProgress.current,
total: initialProgress.total,
elapsed: '0s',
remaining: 'Calculating...',
rate: 0,
percentage: initialProgress.percentage,
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (!SKIP_PRODUCT_METRICS) { if (!SKIP_PRODUCT_METRICS) {
processedProducts = await calculateProductMetrics(startTime, totalProducts); const result = await calculateProductMetrics(startTime, totalProducts);
await updateProgress(processedProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
if (!result.success) {
throw new Error('Product metrics calculation failed');
}
} else { } else {
console.log('Skipping product metrics calculation...'); console.log('Skipping product metrics calculation...');
processedProducts = Math.floor(totalProducts * 0.6); processedProducts = Math.floor(totalProducts * 0.6);
@@ -233,48 +267,66 @@ async function calculateMetrics() {
// Calculate time-based aggregates // Calculate time-based aggregates
if (!SKIP_TIME_AGGREGATES) { if (!SKIP_TIME_AGGREGATES) {
processedProducts = await calculateTimeAggregates(startTime, totalProducts, processedProducts); const result = await calculateTimeAggregates(startTime, totalProducts, processedProducts);
await updateProgress(processedProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
if (!result.success) {
throw new Error('Time aggregates calculation failed');
}
} else { } else {
console.log('Skipping time aggregates calculation'); console.log('Skipping time aggregates calculation');
} }
// Calculate financial metrics // Calculate financial metrics
if (!SKIP_FINANCIAL_METRICS) { if (!SKIP_FINANCIAL_METRICS) {
processedProducts = await calculateFinancialMetrics(startTime, totalProducts, processedProducts); const result = await calculateFinancialMetrics(startTime, totalProducts, processedProducts);
await updateProgress(processedProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
if (!result.success) {
throw new Error('Financial metrics calculation failed');
}
} else { } else {
console.log('Skipping financial metrics calculation'); console.log('Skipping financial metrics calculation');
} }
// Calculate vendor metrics // Calculate vendor metrics
if (!SKIP_VENDOR_METRICS) { if (!SKIP_VENDOR_METRICS) {
processedProducts = await calculateVendorMetrics(startTime, totalProducts, processedProducts); const result = await calculateVendorMetrics(startTime, totalProducts, processedProducts);
await updateProgress(processedProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
if (!result.success) {
throw new Error('Vendor metrics calculation failed');
}
} else { } else {
console.log('Skipping vendor metrics calculation'); console.log('Skipping vendor metrics calculation');
} }
// Calculate category metrics // Calculate category metrics
if (!SKIP_CATEGORY_METRICS) { if (!SKIP_CATEGORY_METRICS) {
processedProducts = await calculateCategoryMetrics(startTime, totalProducts, processedProducts); const result = await calculateCategoryMetrics(startTime, totalProducts, processedProducts);
await updateProgress(processedProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
if (!result.success) {
throw new Error('Category metrics calculation failed');
}
} else { } else {
console.log('Skipping category metrics calculation'); console.log('Skipping category metrics calculation');
} }
// Calculate brand metrics // Calculate brand metrics
if (!SKIP_BRAND_METRICS) { if (!SKIP_BRAND_METRICS) {
processedProducts = await calculateBrandMetrics(startTime, totalProducts, processedProducts); const result = await calculateBrandMetrics(startTime, totalProducts, processedProducts);
await updateProgress(processedProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
if (!result.success) {
throw new Error('Brand metrics calculation failed');
}
} else { } else {
console.log('Skipping brand metrics calculation'); console.log('Skipping brand metrics calculation');
} }
// Calculate sales forecasts // Calculate sales forecasts
if (!SKIP_SALES_FORECASTS) { if (!SKIP_SALES_FORECASTS) {
processedProducts = await calculateSalesForecasts(startTime, totalProducts, processedProducts); const result = await calculateSalesForecasts(startTime, totalProducts, processedProducts);
await updateProgress(processedProducts); await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
if (!result.success) {
throw new Error('Sales forecasts calculation failed');
}
} else { } else {
console.log('Skipping sales forecasts calculation'); console.log('Skipping sales forecasts calculation');
} }
@@ -283,12 +335,12 @@ async function calculateMetrics() {
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Starting ABC classification', operation: 'Starting ABC classification',
current: processedProducts, current: processedProducts || 0,
total: totalProducts, total: totalProducts || 0,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedProducts, totalProducts), remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0),
rate: calculateRate(startTime, processedProducts), rate: calculateRate(startTime, processedProducts || 0),
percentage: ((processedProducts / totalProducts) * 100).toFixed(1), percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1),
timing: { timing: {
start_time: new Date(startTime).toISOString(), start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(), end_time: new Date().toISOString(),
@@ -296,7 +348,12 @@ async function calculateMetrics() {
} }
}); });
if (isCancelled) return processedProducts; if (isCancelled) return {
processedProducts: processedProducts || 0,
processedOrders: processedOrders || 0,
processedPurchaseOrders: 0,
success: false
};
const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1'); const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1');
const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 }; const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 };
@@ -317,12 +374,12 @@ async function calculateMetrics() {
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Creating revenue rankings', operation: 'Creating revenue rankings',
current: processedProducts, current: processedProducts || 0,
total: totalProducts, total: totalProducts || 0,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedProducts, totalProducts), remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0),
rate: calculateRate(startTime, processedProducts), rate: calculateRate(startTime, processedProducts || 0),
percentage: ((processedProducts / totalProducts) * 100).toFixed(1), percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1),
timing: { timing: {
start_time: new Date(startTime).toISOString(), start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(), end_time: new Date().toISOString(),
@@ -330,7 +387,12 @@ async function calculateMetrics() {
} }
}); });
if (isCancelled) return processedProducts; if (isCancelled) return {
processedProducts: processedProducts || 0,
processedOrders: processedOrders || 0,
processedPurchaseOrders: 0,
success: false
};
await connection.query(` await connection.query(`
INSERT INTO temp_revenue_ranks INSERT INTO temp_revenue_ranks
@@ -356,12 +418,12 @@ async function calculateMetrics() {
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Updating ABC classifications', operation: 'Updating ABC classifications',
current: processedProducts, current: processedProducts || 0,
total: totalProducts, total: totalProducts || 0,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedProducts, totalProducts), remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0),
rate: calculateRate(startTime, processedProducts), rate: calculateRate(startTime, processedProducts || 0),
percentage: ((processedProducts / totalProducts) * 100).toFixed(1), percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1),
timing: { timing: {
start_time: new Date(startTime).toISOString(), start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(), end_time: new Date().toISOString(),
@@ -369,14 +431,26 @@ async function calculateMetrics() {
} }
}); });
if (isCancelled) return processedProducts; if (isCancelled) return {
processedProducts: processedProducts || 0,
processedOrders: processedOrders || 0,
processedPurchaseOrders: 0,
success: false
};
// Process updates in batches // ABC classification progress tracking
let abcProcessedProducts = 0; let abcProcessedCount = 0;
const batchSize = 5000; const batchSize = 5000;
let lastProgressUpdate = Date.now();
const progressUpdateInterval = 1000; // Update every second
while (true) { while (true) {
if (isCancelled) return processedProducts; if (isCancelled) return {
processedProducts: Number(processedProducts) || 0,
processedOrders: Number(processedOrders) || 0,
processedPurchaseOrders: 0,
success: false
};
// First get a batch of PIDs that need updating // First get a batch of PIDs that need updating
const [pids] = await connection.query(` const [pids] = await connection.query(`
@@ -417,24 +491,38 @@ async function calculateMetrics() {
max_rank, abcThresholds.b_threshold, max_rank, abcThresholds.b_threshold,
pids.map(row => row.pid)]); pids.map(row => row.pid)]);
abcProcessedProducts += result.affectedRows; abcProcessedCount += result.affectedRows;
processedProducts = Math.floor(totalProducts * (0.99 + (abcProcessedProducts / totalCount) * 0.01));
// Calculate progress ensuring valid numbers
const currentProgress = Math.floor(totalProducts * (0.99 + (abcProcessedCount / (totalCount || 1)) * 0.01));
processedProducts = Number(currentProgress) || processedProducts || 0;
outputProgress({ // Only update progress at most once per second
status: 'running', const now = Date.now();
operation: 'ABC classification progress', if (now - lastProgressUpdate >= progressUpdateInterval) {
current: processedProducts, const progress = ensureValidProgress(processedProducts, totalProducts);
total: totalProducts,
elapsed: formatElapsedTime(startTime), outputProgress({
remaining: estimateRemaining(startTime, processedProducts, totalProducts), status: 'running',
rate: calculateRate(startTime, processedProducts), operation: 'ABC classification progress',
percentage: ((processedProducts / totalProducts) * 100).toFixed(1), current: progress.current,
timing: { total: progress.total,
start_time: new Date(startTime).toISOString(), elapsed: formatElapsedTime(startTime),
end_time: new Date().toISOString(), remaining: estimateRemaining(startTime, progress.current, progress.total),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000) rate: calculateRate(startTime, progress.current),
} percentage: progress.percentage,
}); timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
lastProgressUpdate = now;
}
// Update database progress
await updateProgress(processedProducts, processedOrders, processedPurchaseOrders);
// Small delay between batches to allow other transactions // Small delay between batches to allow other transactions
await new Promise(resolve => setTimeout(resolve, 100)); await new Promise(resolve => setTimeout(resolve, 100));
@@ -446,6 +534,40 @@ async function calculateMetrics() {
const endTime = Date.now(); const endTime = Date.now();
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);
// Update calculate_status for ABC classification
await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
VALUES ('abc_classification', NOW())
ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
`);
// Final progress update with guaranteed valid numbers
const finalProgress = ensureValidProgress(totalProducts, totalProducts);
// Final success message
outputProgress({
status: 'complete',
operation: 'Metrics calculation complete',
current: finalProgress.current,
total: finalProgress.total,
elapsed: formatElapsedTime(startTime),
remaining: '0s',
rate: calculateRate(startTime, finalProgress.current),
percentage: '100',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: totalElapsedSeconds
}
});
// Ensure all values are valid numbers before final update
const finalStats = {
processedProducts: Number(processedProducts) || 0,
processedOrders: Number(processedOrders) || 0,
processedPurchaseOrders: Number(processedPurchaseOrders) || 0
};
// Update history with completion // Update history with completion
await connection.query(` await connection.query(`
UPDATE calculate_history UPDATE calculate_history
@@ -457,24 +579,11 @@ async function calculateMetrics() {
processed_purchase_orders = ?, processed_purchase_orders = ?,
status = 'completed' status = 'completed'
WHERE id = ? WHERE id = ?
`, [totalElapsedSeconds, processedProducts, processedOrders, processedPurchaseOrders, calculateHistoryId]); `, [totalElapsedSeconds,
finalStats.processedProducts,
// Final success message finalStats.processedOrders,
outputProgress({ finalStats.processedPurchaseOrders,
status: 'complete', calculateHistoryId]);
operation: 'Metrics calculation complete',
current: totalProducts,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: '0s',
rate: calculateRate(startTime, totalProducts),
percentage: '100',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// Clear progress file on successful completion // Clear progress file on successful completion
global.clearProgress(); global.clearProgress();

View File

@@ -19,6 +19,12 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
const SKIP_PRODUCT_BASE_METRICS = 0; const SKIP_PRODUCT_BASE_METRICS = 0;
const SKIP_PRODUCT_TIME_AGGREGATES = 0; const SKIP_PRODUCT_TIME_AGGREGATES = 0;
// Get total product count if not provided
if (!totalProducts) {
const [productCount] = await connection.query('SELECT COUNT(*) as count FROM products');
totalProducts = productCount[0].count;
}
if (isCancelled) { if (isCancelled) {
outputProgress({ outputProgress({
status: 'cancelled', status: 'cancelled',
@@ -166,12 +172,12 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Base product metrics calculated', operation: 'Base product metrics calculated',
current: processedCount, current: processedCount || 0,
total: totalProducts, total: totalProducts || 0,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts), remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0),
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount || 0),
percentage: ((processedCount / totalProducts) * 100).toFixed(1), percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1),
timing: { timing: {
start_time: new Date(startTime).toISOString(), start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(), end_time: new Date().toISOString(),
@@ -183,12 +189,12 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Skipping base product metrics calculation', operation: 'Skipping base product metrics calculation',
current: processedCount, current: processedCount || 0,
total: totalProducts, total: totalProducts || 0,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts), remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0),
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount || 0),
percentage: ((processedCount / totalProducts) * 100).toFixed(1), percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1),
timing: { timing: {
start_time: new Date(startTime).toISOString(), start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(), end_time: new Date().toISOString(),
@@ -198,8 +204,8 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
} }
if (isCancelled) return { if (isCancelled) return {
processedProducts: processedCount, processedProducts: processedCount || 0,
processedOrders, processedOrders: processedOrders || 0,
processedPurchaseOrders: 0, // This module doesn't process POs processedPurchaseOrders: 0, // This module doesn't process POs
success success
}; };
@@ -209,12 +215,12 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Starting product time aggregates calculation', operation: 'Starting product time aggregates calculation',
current: processedCount, current: processedCount || 0,
total: totalProducts, total: totalProducts || 0,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts), remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0),
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount || 0),
percentage: ((processedCount / totalProducts) * 100).toFixed(1), percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1),
timing: { timing: {
start_time: new Date(startTime).toISOString(), start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(), end_time: new Date().toISOString(),
@@ -276,12 +282,12 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Product time aggregates calculated', operation: 'Product time aggregates calculated',
current: processedCount, current: processedCount || 0,
total: totalProducts, total: totalProducts || 0,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts), remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0),
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount || 0),
percentage: ((processedCount / totalProducts) * 100).toFixed(1), percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1),
timing: { timing: {
start_time: new Date(startTime).toISOString(), start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(), end_time: new Date().toISOString(),
@@ -293,12 +299,12 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Skipping product time aggregates calculation', operation: 'Skipping product time aggregates calculation',
current: processedCount, current: processedCount || 0,
total: totalProducts, total: totalProducts || 0,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts), remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0),
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount || 0),
percentage: ((processedCount / totalProducts) * 100).toFixed(1), percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1),
timing: { timing: {
start_time: new Date(startTime).toISOString(), start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(), end_time: new Date().toISOString(),
@@ -468,8 +474,8 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
`); `);
return { return {
processedProducts: processedCount, processedProducts: processedCount || 0,
processedOrders, processedOrders: processedOrders || 0,
processedPurchaseOrders: 0, // This module doesn't process POs processedPurchaseOrders: 0, // This module doesn't process POs
success success
}; };