/*
 * Files
 * inventory/inventory-server/scripts/calculate-metrics.js
 * 2025-02-02 21:22:46 -05:00
 *
 * 566 lines
 * 23 KiB
 * JavaScript
 */

const path = require('path');

// Run with the script's own directory as the working directory.
process.chdir(__dirname);

// Load environment variables from the .env file one level above this script.
require('dotenv').config({ path: path.join(__dirname, '..', '.env') });
// Configuration flags for controlling which metrics to calculate.
// Set to 1 to skip the corresponding calculation, 0 to run it.
// calculateMetrics() tests each flag with `!FLAG` before invoking the matching
// calculator, and also stores all seven values in the `additional_info` JSON
// column of the calculate_history row it creates for the run.
const SKIP_PRODUCT_METRICS = 1;
const SKIP_TIME_AGGREGATES = 1;
const SKIP_FINANCIAL_METRICS = 1;
const SKIP_VENDOR_METRICS = 1;
const SKIP_CATEGORY_METRICS = 1;
const SKIP_BRAND_METRICS = 1;
// The only stage enabled in this configuration.
const SKIP_SALES_FORECASTS = 0;
// Last-resort handler for synchronous errors nobody caught: log, then exit non-zero.
process.on('uncaughtException', function onUncaughtException(error) {
  console.error('Uncaught Exception:', error);
  process.exit(1);
});

// Same policy for promise rejections that were never handled.
process.on('unhandledRejection', function onUnhandledRejection(reason, promise) {
  console.error('Unhandled Rejection at:', promise, 'reason:', reason);
  process.exit(1);
});
// Load the shared progress helpers and log where they were resolved from.
const progress = require('./metrics/utils/progress');
console.log('Progress module loaded:', {
  modulePath: require.resolve('./metrics/utils/progress'),
  exports: Object.keys(progress),
  currentDir: process.cwd(),
  scriptDir: __dirname
});

// Mirror the progress helpers onto the global object so they are reachable
// as globals from this script (and from anything else running in the process).
for (const helperName of [
  'formatElapsedTime',
  'estimateRemaining',
  'calculateRate',
  'outputProgress',
  'clearProgress',
  'getProgress',
  'logError'
]) {
  global[helperName] = progress[helperName];
}
const { getConnection, closePool } = require('./metrics/utils/db');
const calculateProductMetrics = require('./metrics/product-metrics');
const calculateTimeAggregates = require('./metrics/time-aggregates');
const calculateFinancialMetrics = require('./metrics/financial-metrics');
const calculateVendorMetrics = require('./metrics/vendor-metrics');
const calculateCategoryMetrics = require('./metrics/category-metrics');
const calculateBrandMetrics = require('./metrics/brand-metrics');
const calculateSalesForecasts = require('./metrics/sales-forecasts');
// Cancellation flag; calculateMetrics() checks it between ABC-classification steps.
let isCancelled = false;

/**
 * Cancel the running calculation.
 *
 * Raises the cancellation flag, clears the stored progress, writes a single
 * JSON line describing the cancelled state to stdout, and then terminates the
 * process with exit code 0. This function does not return to its caller.
 */
function cancelCalculation() {
  isCancelled = true;
  global.clearProgress();

  // Payload shape mirrors the other progress events (formatted as an SSE event).
  const cancelledProgress = {
    status: 'cancelled',
    operation: 'Calculation cancelled',
    current: 0,
    total: 0,
    elapsed: null,
    remaining: null,
    rate: 0,
    timestamp: Date.now()
  };
  const line = `${JSON.stringify({ progress: cancelledProgress })}\n`;

  process.stdout.write(line);
  process.exit(0);
}

// SIGTERM is treated as a cancellation request.
process.on('SIGTERM', cancelCalculation);
/**
 * Build the `timing` section shared by every progress payload.
 *
 * @param {number} startTime - Epoch milliseconds at which the run started.
 * @returns {{start_time: string, end_time: string, elapsed_seconds: number}}
 */
function timingSnapshot(startTime) {
  const now = Date.now();
  return {
    start_time: new Date(startTime).toISOString(),
    end_time: new Date(now).toISOString(),
    elapsed_seconds: Math.round((now - startTime) / 1000)
  };
}

/**
 * Format `current / total` as a percentage string with one decimal place.
 * A zero (or missing) total is treated as 1 so the result is never "NaN".
 *
 * @param {number} current
 * @param {number} total
 * @returns {string}
 */
function percentOf(current, total) {
  return ((current / (total || 1)) * 100).toFixed(1);
}

/**
 * Build a `status: 'running'` progress payload for the given operation.
 *
 * @param {string} operation - Human-readable description of the current step.
 * @param {number} startTime - Epoch milliseconds at which the run started.
 * @param {number} current - Products processed so far.
 * @param {number} total - Total number of products.
 * @returns {object} Payload suitable for global.outputProgress().
 */
function runningProgress(operation, startTime, current, total) {
  return {
    status: 'running',
    operation,
    current,
    total,
    elapsed: global.formatElapsedTime(startTime),
    remaining: global.estimateRemaining(startTime, current, total),
    rate: global.calculateRate(startTime, current),
    percentage: percentOf(current, total),
    timing: timingSnapshot(startTime)
  };
}

/**
 * Run the full metrics calculation.
 *
 * Steps: mark stale 'running' history rows as cancelled, record a new
 * calculate_history row, run every metric stage whose SKIP_* flag is 0,
 * recompute the ABC classification in batches, then mark the history row
 * completed. Progress is reported through global.outputProgress() throughout.
 * The connection pool is always closed before the returned promise settles.
 *
 * @returns {Promise<number|undefined>} Resolves with undefined on completion,
 *   or with the processed-product count when the cancellation flag is seen
 *   during ABC classification.
 * @throws Rethrows any error raised by a stage or query, after recording it
 *   in calculate_history (best effort) and emitting an error/cancelled event.
 */
async function calculateMetrics() {
  let connection;
  const startTime = Date.now();
  let processedProducts = 0;
  let processedOrders = 0;
  let processedPurchaseOrders = 0;
  let totalProducts = 0;
  let totalOrders = 0;
  let totalPurchaseOrders = 0;
  let calculateHistoryId;

  try {
    connection = await getConnection();
    try {
      // Clean up any previously running calculations
      await connection.query(`
        UPDATE calculate_history
        SET
          status = 'cancelled',
          end_time = NOW(),
          duration_seconds = TIMESTAMPDIFF(SECOND, start_time, NOW()),
          error_message = 'Previous calculation was not completed properly'
        WHERE status = 'running'
      `);

      // Get counts from all relevant tables. Each query resolves to
      // [rows, fields] (the same shape every other query in this function
      // relies on), so the COUNT(*) row is rows[0].
      const countResults = await Promise.all([
        connection.query('SELECT COUNT(*) as total FROM products'),
        connection.query('SELECT COUNT(*) as total FROM orders'),
        connection.query('SELECT COUNT(*) as total FROM purchase_orders')
      ]);
      const [productCount, orderCount, poCount] = countResults.map(([rows]) => rows[0]);
      totalProducts = Number(productCount.total);
      totalOrders = Number(orderCount.total);
      totalPurchaseOrders = Number(poCount.total);

      // Create history record for this calculation
      const [historyResult] = await connection.query(`
        INSERT INTO calculate_history (
          start_time,
          status,
          total_products,
          total_orders,
          total_purchase_orders,
          additional_info
        ) VALUES (
          NOW(),
          'running',
          ?,
          ?,
          ?,
          JSON_OBJECT(
            'skip_product_metrics', ?,
            'skip_time_aggregates', ?,
            'skip_financial_metrics', ?,
            'skip_vendor_metrics', ?,
            'skip_category_metrics', ?,
            'skip_brand_metrics', ?,
            'skip_sales_forecasts', ?
          )
        )
      `, [
        totalProducts,
        totalOrders,
        totalPurchaseOrders,
        SKIP_PRODUCT_METRICS,
        SKIP_TIME_AGGREGATES,
        SKIP_FINANCIAL_METRICS,
        SKIP_VENDOR_METRICS,
        SKIP_CATEGORY_METRICS,
        SKIP_BRAND_METRICS,
        SKIP_SALES_FORECASTS
      ]);
      calculateHistoryId = historyResult.insertId;
    } finally {
      // Hand the setup connection back even when a setup query fails.
      connection.release();
    }

    // Add debug logging for the progress functions
    console.log('Debug - Progress functions:', {
      formatElapsedTime: typeof global.formatElapsedTime,
      estimateRemaining: typeof global.estimateRemaining,
      calculateRate: typeof global.calculateRate,
      startTime: startTime
    });
    try {
      const elapsed = global.formatElapsedTime(startTime);
      console.log('Debug - formatElapsedTime test successful:', elapsed);
    } catch (err) {
      console.error('Debug - Error testing formatElapsedTime:', err);
      throw err;
    }

    isCancelled = false;
    connection = await getConnection();
    try {
      global.outputProgress({
        status: 'running',
        operation: 'Starting metrics calculation',
        current: 0,
        total: 100,
        elapsed: '0s',
        remaining: 'Calculating...',
        rate: 0,
        percentage: '0',
        timing: timingSnapshot(startTime)
      });

      // Persist the running counters on the history row; a null argument
      // leaves the corresponding counter unchanged.
      const updateProgress = async (products = null, orders = null, purchaseOrders = null) => {
        if (products !== null) processedProducts = products;
        if (orders !== null) processedOrders = orders;
        if (purchaseOrders !== null) processedPurchaseOrders = purchaseOrders;
        await connection.query(`
          UPDATE calculate_history
          SET
            processed_products = ?,
            processed_orders = ?,
            processed_purchase_orders = ?
          WHERE id = ?
        `, [processedProducts, processedOrders, processedPurchaseOrders, calculateHistoryId]);
      };

      if (!SKIP_PRODUCT_METRICS) {
        processedProducts = await calculateProductMetrics(startTime, totalProducts);
        await updateProgress(processedProducts);
      } else {
        console.log('Skipping product metrics calculation...');
        // A skipped product stage is reported as 60% of the products done.
        processedProducts = Math.floor(totalProducts * 0.6);
        await updateProgress(processedProducts);
        global.outputProgress(Object.assign(
          runningProgress('Skipping product metrics calculation', startTime, processedProducts, totalProducts),
          { percentage: '60' }
        ));
      }

      // Remaining stages share one calling convention:
      // (startTime, totalProducts, processedProducts) -> processedProducts.
      const laterStages = [
        { skip: SKIP_TIME_AGGREGATES, run: calculateTimeAggregates, skipMessage: 'Skipping time aggregates calculation' },
        { skip: SKIP_FINANCIAL_METRICS, run: calculateFinancialMetrics, skipMessage: 'Skipping financial metrics calculation' },
        { skip: SKIP_VENDOR_METRICS, run: calculateVendorMetrics, skipMessage: 'Skipping vendor metrics calculation' },
        { skip: SKIP_CATEGORY_METRICS, run: calculateCategoryMetrics, skipMessage: 'Skipping category metrics calculation' },
        { skip: SKIP_BRAND_METRICS, run: calculateBrandMetrics, skipMessage: 'Skipping brand metrics calculation' },
        { skip: SKIP_SALES_FORECASTS, run: calculateSalesForecasts, skipMessage: 'Skipping sales forecasts calculation' }
      ];
      for (const stage of laterStages) {
        if (!stage.skip) {
          processedProducts = await stage.run(startTime, totalProducts, processedProducts);
          await updateProgress(processedProducts);
        } else {
          console.log(stage.skipMessage);
        }
      }

      // Calculate ABC classification
      global.outputProgress(runningProgress('Starting ABC classification', startTime, processedProducts, totalProducts));
      // NOTE(review): the early returns below leave the history row in
      // 'running'; the next run's clean-up query marks it cancelled.
      if (isCancelled) return processedProducts;

      const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1');
      const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 };

      // First, create and populate the rankings table with an index
      await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks');
      await connection.query(`
        CREATE TEMPORARY TABLE temp_revenue_ranks (
          pid BIGINT NOT NULL,
          total_revenue DECIMAL(10,3),
          rank_num INT,
          total_count INT,
          PRIMARY KEY (pid),
          INDEX (rank_num)
        ) ENGINE=MEMORY
      `);

      global.outputProgress(runningProgress('Creating revenue rankings', startTime, processedProducts, totalProducts));
      if (isCancelled) return processedProducts;

      await connection.query(`
        INSERT INTO temp_revenue_ranks
        SELECT
          pid,
          total_revenue,
          @rank := @rank + 1 as rank_num,
          @total_count := @rank as total_count
        FROM (
          SELECT pid, total_revenue
          FROM product_metrics
          WHERE total_revenue > 0
          ORDER BY total_revenue DESC
        ) ranked,
        (SELECT @rank := 0) r
      `);

      // Highest rank == number of ranked products; fall back to 1 so the
      // rank-percentage maths never divides by zero.
      const [rankingCount] = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks');
      const maxRank = rankingCount[0].total_count || 1;

      global.outputProgress(runningProgress('Updating ABC classifications', startTime, processedProducts, totalProducts));
      if (isCancelled) return processedProducts;

      // The same CASE expression decides which rows are stale and what they
      // are updated to, so it is defined once and reused by both statements.
      const abcCaseSql = `
        CASE
          WHEN tr.rank_num IS NULL THEN 'C'
          WHEN (tr.rank_num / ?) * 100 <= ? THEN 'A'
          WHEN (tr.rank_num / ?) * 100 <= ? THEN 'B'
          ELSE 'C'
        END`;
      const abcCaseParams = [
        maxRank, abcThresholds.a_threshold,
        maxRank, abcThresholds.b_threshold
      ];

      // Process updates in batches
      let abcProcessedProducts = 0;
      const batchSize = 5000;
      while (true) {
        if (isCancelled) return processedProducts;

        // First get a batch of PIDs that need updating
        const [pids] = await connection.query(`
          SELECT pm.pid
          FROM product_metrics pm
          LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid
          WHERE pm.abc_class IS NULL
            OR pm.abc_class != ${abcCaseSql}
          LIMIT ?
        `, [...abcCaseParams, batchSize]);
        if (pids.length === 0) {
          break;
        }

        // Then update just those PIDs
        const [result] = await connection.query(`
          UPDATE product_metrics pm
          LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid
          SET pm.abc_class = ${abcCaseSql},
            pm.last_calculated_at = NOW()
          WHERE pm.pid IN (?)
        `, [...abcCaseParams, pids.map(row => row.pid)]);

        abcProcessedProducts += result.affectedRows;
        // Updated rows include unranked products, so the ratio against the
        // ranked count can exceed 1; clamp it to keep progress <= 100%.
        const abcFraction = Math.min(1, abcProcessedProducts / maxRank);
        processedProducts = Math.floor(totalProducts * (0.99 + abcFraction * 0.01));

        global.outputProgress(runningProgress('ABC classification progress', startTime, processedProducts, totalProducts));

        // Small delay between batches to allow other transactions
        await new Promise(resolve => setTimeout(resolve, 100));
      }

      // Clean up
      await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks');

      const totalElapsedSeconds = Math.round((Date.now() - startTime) / 1000);

      // Update history with completion
      await connection.query(`
        UPDATE calculate_history
        SET
          end_time = NOW(),
          duration_seconds = ?,
          processed_products = ?,
          processed_orders = ?,
          processed_purchase_orders = ?,
          status = 'completed'
        WHERE id = ?
      `, [totalElapsedSeconds, processedProducts, processedOrders, processedPurchaseOrders, calculateHistoryId]);

      // Final success message
      global.outputProgress({
        status: 'complete',
        operation: 'Metrics calculation complete',
        current: totalProducts,
        total: totalProducts,
        elapsed: global.formatElapsedTime(startTime),
        remaining: '0s',
        rate: global.calculateRate(startTime, totalProducts),
        percentage: '100',
        timing: timingSnapshot(startTime)
      });

      // Clear progress file on successful completion
      global.clearProgress();
    } catch (error) {
      const totalElapsedSeconds = Math.round((Date.now() - startTime) / 1000);

      // Update history with error. Best effort only: if the database itself
      // is the problem, this must not replace the original error or prevent
      // the progress event below from being emitted.
      try {
        await connection.query(`
          UPDATE calculate_history
          SET
            end_time = NOW(),
            duration_seconds = ?,
            processed_products = ?,
            processed_orders = ?,
            processed_purchase_orders = ?,
            status = ?,
            error_message = ?
          WHERE id = ?
        `, [
          totalElapsedSeconds,
          processedProducts || 0, // Ensure we have a valid number
          processedOrders || 0, // Ensure we have a valid number
          processedPurchaseOrders || 0, // Ensure we have a valid number
          isCancelled ? 'cancelled' : 'failed',
          error.message,
          calculateHistoryId
        ]);
      } catch (historyError) {
        console.error('Failed to record calculation failure in calculate_history:', historyError);
      }

      global.outputProgress({
        status: isCancelled ? 'cancelled' : 'error',
        operation: isCancelled ? 'Calculation cancelled' : 'Error: ' + error.message,
        current: processedProducts,
        total: totalProducts || 0,
        elapsed: global.formatElapsedTime(startTime),
        remaining: null,
        rate: global.calculateRate(startTime, processedProducts),
        percentage: percentOf(processedProducts, totalProducts),
        timing: timingSnapshot(startTime)
      });
      throw error;
    } finally {
      if (connection) {
        connection.release();
      }
    }
  } finally {
    // Close the connection pool when we're done
    await closePool();
  }
}
// Export the entry point, with the cancel hook and progress reader attached
module.exports = calculateMetrics;
module.exports.cancelCalculation = cancelCalculation;
module.exports.getProgress = global.getProgress;

// Run directly if called from command line
if (require.main === module) {
  calculateMetrics().catch((error) => {
    // A rejection reason is not guaranteed to be an Error; reading
    // `.message.includes` unguarded would throw inside this handler and
    // hide the real reason.
    const message = error && typeof error.message === 'string' ? error.message : '';
    // Cancellation is an expected outcome, so it is not logged as an error.
    if (!message.includes('Operation cancelled')) {
      console.error('Error:', error);
    }
    process.exit(1);
  });
}