// File: inventory/inventory-server/scripts/calculate-metrics.js
// (JavaScript, 639 lines, 27 KiB)

const path = require('path');

// Run from this script's own directory, whatever directory it was launched from.
// (__dirname is the directory part of __filename.)
process.chdir(__dirname);

// Environment variables come from the .env file one level above this script.
require('dotenv').config({ path: path.resolve(__dirname, '..', '.env') });

// Per-stage switches: 1 skips the corresponding calculation, 0 runs it.
// calculateMetrics() also records these values in calculate_history.additional_info.
const SKIP_PRODUCT_METRICS = 0;
const SKIP_TIME_AGGREGATES = 0;
const SKIP_FINANCIAL_METRICS = 0;
const SKIP_VENDOR_METRICS = 0;
const SKIP_CATEGORY_METRICS = 0;
const SKIP_BRAND_METRICS = 0;
const SKIP_SALES_FORECASTS = 0;
/**
 * Last-resort handler for exceptions nothing else caught:
 * logs the error and terminates the process with exit code 1.
 */
function handleUncaughtException(error) {
  console.error('Uncaught Exception:', error);
  process.exit(1);
}

/**
 * Last-resort handler for promise rejections without a handler:
 * logs the promise and its rejection reason, then exits with code 1.
 */
function handleUnhandledRejection(reason, promise) {
  console.error('Unhandled Rejection at:', promise, 'reason:', reason);
  process.exit(1);
}

process.on('uncaughtException', handleUncaughtException);
process.on('unhandledRejection', handleUnhandledRejection);
const progress = require('./metrics/utils/progress');

// Startup diagnostics: where the progress module resolved from and what it exports.
console.log('Progress module loaded:', {
  modulePath: require.resolve('./metrics/utils/progress'),
  exports: Object.keys(progress),
  currentDir: process.cwd(),
  scriptDir: __dirname
});

// Publish the progress helpers on `global`; calculateMetrics() and
// cancelCalculation() in this file call them as global.<name>.
[
  'formatElapsedTime',
  'estimateRemaining',
  'calculateRate',
  'outputProgress',
  'clearProgress',
  'getProgress',
  'logError'
].forEach((helperName) => {
  global[helperName] = progress[helperName];
});
const { getConnection, closePool } = require('./metrics/utils/db');
const calculateProductMetrics = require('./metrics/product-metrics');
const calculateTimeAggregates = require('./metrics/time-aggregates');
const calculateFinancialMetrics = require('./metrics/financial-metrics');
const calculateVendorMetrics = require('./metrics/vendor-metrics');
const calculateCategoryMetrics = require('./metrics/category-metrics');
const calculateBrandMetrics = require('./metrics/brand-metrics');
const calculateSalesForecasts = require('./metrics/sales-forecasts');
// Add cancel handler
// Cancellation flag: set by cancelCalculation(), reset and read by calculateMetrics().
let isCancelled = false;

/**
 * Cancels the calculation.
 *
 * Sets the cancellation flag, clears the stored progress, writes one
 * newline-terminated JSON progress event with status 'cancelled' to stdout,
 * and then exits the process with code 0.
 */
function cancelCalculation() {
  isCancelled = true;
  global.clearProgress();

  const cancelledProgress = {
    status: 'cancelled',
    operation: 'Calculation cancelled',
    current: 0,
    total: 0,
    elapsed: null,
    remaining: null,
    rate: 0,
    timestamp: Date.now()
  };
  const line = `${JSON.stringify({ progress: cancelledProgress })}\n`;

  process.stdout.write(line);
  process.exit(0);
}

// Treat SIGTERM as a cancellation request.
process.on('SIGTERM', cancelCalculation);
/**
 * Reads the `total` column of the first row from a COUNT query result.
 *
 * connection.query() resolves to `[rows, fields]` (the rest of this file relies
 * on that shape, e.g. `const [historyResult] = await connection.query(...)`),
 * so the count lives at result[0][0].total. A missing row or a non-numeric
 * value yields 0.
 *
 * @param {Array} queryResult - Resolved value of connection.query().
 * @returns {number} The numeric total, or 0 when it cannot be read.
 */
function readCountTotal(queryResult) {
  const rows = Array.isArray(queryResult) ? queryResult[0] : queryResult;
  const firstRow = Array.isArray(rows) ? rows[0] : rows;
  return Number(firstRow && firstRow.total) || 0;
}

/**
 * Builds the `timing` object attached to every progress event.
 *
 * @param {number} startTime - Epoch milliseconds at which the run started.
 * @returns {{start_time: string, end_time: string, elapsed_seconds: number}}
 */
function buildTimingSnapshot(startTime) {
  return {
    start_time: new Date(startTime).toISOString(),
    end_time: new Date().toISOString(),
    elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
  };
}

/**
 * Coerces a current/total pair into numbers that are safe to report.
 *
 * @param {*} current - Items processed so far.
 * @param {*} total - Items expected; falls back to 1 to avoid division by zero.
 * @returns {{current: number, total: number, percentage: string}}
 */
function toValidProgress(current, total) {
  const safeCurrent = Number(current) || 0;
  const safeTotal = Number(total) || 1;
  return {
    current: safeCurrent,
    total: safeTotal,
    percentage: ((safeCurrent / safeTotal) * 100).toFixed(1)
  };
}

/**
 * Runs the full metrics calculation.
 *
 * Steps:
 *   1. Mark calculate_history rows still flagged 'running' as cancelled and
 *      count products / orders / purchase orders changed since the last
 *      'product_metrics' calculation timestamp.
 *   2. Return early when nothing changed.
 *   3. Insert a 'running' calculate_history row, then run each metric stage
 *      that is not disabled by its SKIP_* flag, persisting the processed
 *      counters after every stage.
 *   4. Recompute ABC classes and turnover rates, mark the history row
 *      'completed', and clear the stored progress.
 *
 * On failure the history row is marked 'failed' (or 'cancelled' when the
 * cancellation flag is set), a final progress event is emitted, and the
 * original error is rethrown. The connection pool is always closed on exit.
 *
 * @returns {Promise<{processedProducts: number, processedOrders: number,
 *   processedPurchaseOrders: number, success: boolean}>}
 * @throws {Error} When a stage reports `success: false` or any query fails.
 */
async function calculateMetrics() {
  const startTime = Date.now();
  let processedProducts = 0;
  let processedOrders = 0;
  let processedPurchaseOrders = 0;
  let totalProducts = 0;
  let totalOrders = 0;
  let totalPurchaseOrders = 0;
  let calculateHistoryId;
  let connection;

  try {
    // --- Phase 1: clean up stale runs and count what needs recalculating ---
    connection = await getConnection();
    try {
      await connection.query(`
        UPDATE calculate_history
        SET
          status = 'cancelled',
          end_time = NOW(),
          duration_seconds = TIMESTAMPDIFF(SECOND, start_time, NOW()),
          error_message = 'Previous calculation was not completed properly'
        WHERE status = 'running'
      `);

      const [productResult, orderResult, poResult] = await Promise.all([
        connection.query(`
          SELECT COUNT(DISTINCT p.pid) as total
          FROM products p
          FORCE INDEX (PRIMARY)
          LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics'
          LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
            AND o.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
            AND o.canceled = false
          LEFT JOIN purchase_orders po FORCE INDEX (idx_purchase_orders_metrics) ON p.pid = po.pid
            AND po.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
          WHERE p.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
            OR o.pid IS NOT NULL
            OR po.pid IS NOT NULL
        `),
        connection.query(`
          SELECT COUNT(DISTINCT o.id) as total
          FROM orders o
          FORCE INDEX (idx_orders_metrics)
          LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics'
          WHERE o.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
            AND o.canceled = false
        `),
        connection.query(`
          SELECT COUNT(DISTINCT po.id) as total
          FROM purchase_orders po
          FORCE INDEX (idx_purchase_orders_metrics)
          LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics'
          WHERE po.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
        `)
      ]);

      // Each result is [rows, fields]; the count is in the first row.
      totalProducts = readCountTotal(productResult);
      totalOrders = readCountTotal(orderResult);
      totalPurchaseOrders = readCountTotal(poResult);
    } finally {
      connection.release();
    }

    // If nothing needs updating, we can exit early
    if (totalProducts === 0 && totalOrders === 0 && totalPurchaseOrders === 0) {
      console.log('No records need updating');
      return {
        processedProducts: 0,
        processedOrders: 0,
        processedPurchaseOrders: 0,
        success: true
      };
    }

    // --- Phase 2: create the history record for this run ---
    connection = await getConnection();
    try {
      const [historyResult] = await connection.query(`
        INSERT INTO calculate_history (
          start_time,
          status,
          total_products,
          total_orders,
          total_purchase_orders,
          additional_info
        ) VALUES (
          NOW(),
          'running',
          ?,
          ?,
          ?,
          JSON_OBJECT(
            'skip_product_metrics', ?,
            'skip_time_aggregates', ?,
            'skip_financial_metrics', ?,
            'skip_vendor_metrics', ?,
            'skip_category_metrics', ?,
            'skip_brand_metrics', ?,
            'skip_sales_forecasts', ?
          )
        )
      `, [
        totalProducts,
        totalOrders,
        totalPurchaseOrders,
        SKIP_PRODUCT_METRICS,
        SKIP_TIME_AGGREGATES,
        SKIP_FINANCIAL_METRICS,
        SKIP_VENDOR_METRICS,
        SKIP_CATEGORY_METRICS,
        SKIP_BRAND_METRICS,
        SKIP_SALES_FORECASTS
      ]);
      calculateHistoryId = historyResult.insertId;
    } finally {
      connection.release();
    }

    // Debug logging: confirm the progress helpers are reachable before the long run.
    console.log('Debug - Progress functions:', {
      formatElapsedTime: typeof global.formatElapsedTime,
      estimateRemaining: typeof global.estimateRemaining,
      calculateRate: typeof global.calculateRate,
      startTime: startTime
    });
    try {
      const elapsed = global.formatElapsedTime(startTime);
      console.log('Debug - formatElapsedTime test successful:', elapsed);
    } catch (err) {
      console.error('Debug - Error testing formatElapsedTime:', err);
      throw err;
    }

    // --- Phase 3: main processing on a fresh connection ---
    isCancelled = false;
    connection = await getConnection();
    try {
      global.outputProgress({
        status: 'running',
        operation: 'Starting metrics calculation',
        current: 0,
        total: 100,
        elapsed: '0s',
        remaining: 'Calculating...',
        rate: 0,
        percentage: '0',
        timing: buildTimingSnapshot(startTime)
      });

      // Writes the current counters to this run's history row.
      const persistProgress = async () => {
        processedProducts = Number(processedProducts) || 0;
        processedOrders = Number(processedOrders) || 0;
        processedPurchaseOrders = Number(processedPurchaseOrders) || 0;
        await connection.query(`
          UPDATE calculate_history
          SET
            processed_products = ?,
            processed_orders = ?,
            processed_purchase_orders = ?
          WHERE id = ?
        `, [processedProducts, processedOrders, processedPurchaseOrders, calculateHistoryId]);
      };

      // Result returned when the cancellation flag is observed mid-run.
      const cancelledResult = () => ({
        processedProducts: processedProducts || 0,
        processedOrders: processedOrders || 0,
        processedPurchaseOrders: processedPurchaseOrders || 0,
        success: false
      });

      const initialProgress = toValidProgress(0, totalProducts);
      global.outputProgress({
        status: 'running',
        operation: 'Starting metrics calculation',
        current: initialProgress.current,
        total: initialProgress.total,
        elapsed: '0s',
        remaining: 'Calculating...',
        rate: 0,
        percentage: initialProgress.percentage,
        timing: buildTimingSnapshot(startTime)
      });

      // --- Run each stage in order, accumulating the processed counts ---
      const stages = [
        { skip: SKIP_PRODUCT_METRICS, run: calculateProductMetrics, label: 'Product metrics', skipMessage: 'Skipping product metrics calculation...' },
        { skip: SKIP_TIME_AGGREGATES, run: calculateTimeAggregates, label: 'Time aggregates', skipMessage: 'Skipping time aggregates calculation' },
        { skip: SKIP_FINANCIAL_METRICS, run: calculateFinancialMetrics, label: 'Financial metrics', skipMessage: 'Skipping financial metrics calculation' },
        { skip: SKIP_VENDOR_METRICS, run: calculateVendorMetrics, label: 'Vendor metrics', skipMessage: 'Skipping vendor metrics calculation' },
        { skip: SKIP_CATEGORY_METRICS, run: calculateCategoryMetrics, label: 'Category metrics', skipMessage: 'Skipping category metrics calculation' },
        { skip: SKIP_BRAND_METRICS, run: calculateBrandMetrics, label: 'Brand metrics', skipMessage: 'Skipping brand metrics calculation' },
        { skip: SKIP_SALES_FORECASTS, run: calculateSalesForecasts, label: 'Sales forecasts', skipMessage: 'Skipping sales forecasts calculation' }
      ];

      for (const stage of stages) {
        if (stage.skip) {
          // Skipped stages do not contribute to the processed counters.
          console.log(stage.skipMessage);
          continue;
        }
        // isCancelled is a boolean, so the stage receives its value as of this call.
        const result = await stage.run(startTime, totalProducts, processedProducts, isCancelled);
        // Treat a missing or non-numeric count as 0 so the running totals never become NaN.
        processedProducts += Number(result.processedProducts) || 0;
        processedOrders += Number(result.processedOrders) || 0;
        processedPurchaseOrders += Number(result.processedPurchaseOrders) || 0;
        await persistProgress();
        if (!result.success) {
          throw new Error(`${stage.label} calculation failed`);
        }
      }

      // --- ABC classification ---
      if (isCancelled) return cancelledResult();

      const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1');
      // Fall back to 20 / 50 when no configuration row with id = 1 exists.
      const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 };

      // First, create and populate the rankings table with an index
      await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks');
      await connection.query(`
        CREATE TEMPORARY TABLE temp_revenue_ranks (
          pid BIGINT NOT NULL,
          total_revenue DECIMAL(10,3),
          rank_num INT,
          dense_rank_num INT,
          percentile DECIMAL(5,2),
          total_count INT,
          PRIMARY KEY (pid),
          INDEX (rank_num),
          INDEX (dense_rank_num),
          INDEX (percentile)
        ) ENGINE=MEMORY
      `);

      const processedCount = processedProducts;
      global.outputProgress({
        status: 'running',
        operation: 'Creating revenue rankings',
        current: processedCount,
        total: totalProducts,
        elapsed: global.formatElapsedTime(startTime),
        remaining: global.estimateRemaining(startTime, processedCount, totalProducts),
        rate: global.calculateRate(startTime, processedCount),
        // Guard the divisor: totalProducts can be 0 when only orders changed.
        percentage: ((processedCount / (totalProducts || 1)) * 100).toFixed(1),
        timing: buildTimingSnapshot(startTime)
      });

      if (isCancelled) return cancelledResult();

      // Rank products with revenue > 0; ties share a rank, and total_count comes from the same query.
      await connection.query(`
        INSERT INTO temp_revenue_ranks
        WITH revenue_data AS (
          SELECT
            pid,
            total_revenue,
            COUNT(*) OVER () as total_count,
            PERCENT_RANK() OVER (ORDER BY total_revenue DESC) * 100 as percentile,
            RANK() OVER (ORDER BY total_revenue DESC) as rank_num,
            DENSE_RANK() OVER (ORDER BY total_revenue DESC) as dense_rank_num
          FROM product_metrics
          WHERE total_revenue > 0
        )
        SELECT
          pid,
          total_revenue,
          rank_num,
          dense_rank_num,
          percentile,
          total_count
        FROM revenue_data
      `);

      // Classify every product in one UPDATE; products without a ranking row get 'C'.
      await connection.query(`
        UPDATE product_metrics pm
        LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid
        SET pm.abc_class =
          CASE
            WHEN tr.pid IS NULL THEN 'C'
            WHEN tr.percentile <= ? THEN 'A'
            WHEN tr.percentile <= ? THEN 'B'
            ELSE 'C'
          END,
          pm.last_calculated_at = NOW()
      `, [abcThresholds.a_threshold, abcThresholds.b_threshold]);

      // Turnover rate from the last 90 days of non-cancelled orders, capped at 999.99.
      await connection.query(`
        UPDATE product_metrics pm
        JOIN (
          SELECT
            o.pid,
            SUM(o.quantity) as total_sold,
            COUNT(DISTINCT DATE(o.date)) as active_days,
            AVG(CASE
              WHEN p.stock_quantity > 0 THEN p.stock_quantity
              ELSE NULL
            END) as avg_nonzero_stock
          FROM orders o
          JOIN products p ON o.pid = p.pid
          WHERE o.canceled = false
            AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
          GROUP BY o.pid
        ) sales ON pm.pid = sales.pid
        SET
          pm.turnover_rate = CASE
            WHEN sales.avg_nonzero_stock > 0 AND sales.active_days > 0
            THEN LEAST(
              (sales.total_sold / sales.avg_nonzero_stock) * (365.0 / sales.active_days),
              999.99
            )
            ELSE 0
          END,
          pm.last_calculated_at = NOW()
      `);

      processedProducts = totalProducts;
      await persistProgress();

      // Clean up
      await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks');

      const totalElapsedSeconds = Math.round((Date.now() - startTime) / 1000);

      // Update calculate_status for ABC classification
      await connection.query(`
        INSERT INTO calculate_status (module_name, last_calculation_timestamp)
        VALUES ('abc_classification', NOW())
        ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
      `);

      // Final success event with guaranteed valid numbers
      const finalProgress = toValidProgress(totalProducts, totalProducts);
      global.outputProgress({
        status: 'complete',
        operation: 'Metrics calculation complete',
        current: finalProgress.current,
        total: finalProgress.total,
        elapsed: global.formatElapsedTime(startTime),
        remaining: '0s',
        rate: global.calculateRate(startTime, finalProgress.current),
        percentage: '100',
        timing: {
          start_time: new Date(startTime).toISOString(),
          end_time: new Date().toISOString(),
          elapsed_seconds: totalElapsedSeconds
        }
      });

      const finalStats = {
        processedProducts: Number(processedProducts) || 0,
        processedOrders: Number(processedOrders) || 0,
        processedPurchaseOrders: Number(processedPurchaseOrders) || 0
      };

      // Update history with completion
      await connection.query(`
        UPDATE calculate_history
        SET
          end_time = NOW(),
          duration_seconds = ?,
          processed_products = ?,
          processed_orders = ?,
          processed_purchase_orders = ?,
          status = 'completed'
        WHERE id = ?
      `, [
        totalElapsedSeconds,
        finalStats.processedProducts,
        finalStats.processedOrders,
        finalStats.processedPurchaseOrders,
        calculateHistoryId
      ]);

      // Clear progress file on successful completion
      global.clearProgress();

      // Same result shape as the early-exit and cancellation paths.
      return {
        processedProducts: finalStats.processedProducts,
        processedOrders: finalStats.processedOrders,
        processedPurchaseOrders: finalStats.processedPurchaseOrders,
        success: true
      };
    } catch (error) {
      const totalElapsedSeconds = Math.round((Date.now() - startTime) / 1000);
      const errorMessage = error && typeof error.message === 'string' ? error.message : String(error);

      // Record the failure, but never let a bookkeeping error replace the
      // original one: the caller needs to see what actually went wrong.
      try {
        await connection.query(`
          UPDATE calculate_history
          SET
            end_time = NOW(),
            duration_seconds = ?,
            processed_products = ?,
            processed_orders = ?,
            processed_purchase_orders = ?,
            status = ?,
            error_message = ?
          WHERE id = ?
        `, [
          totalElapsedSeconds,
          processedProducts || 0,
          processedOrders || 0,
          processedPurchaseOrders || 0,
          isCancelled ? 'cancelled' : 'failed',
          errorMessage,
          calculateHistoryId
        ]);
      } catch (historyError) {
        console.error('Failed to record calculation failure in calculate_history:', historyError);
      }

      global.outputProgress({
        status: isCancelled ? 'cancelled' : 'error',
        operation: isCancelled ? 'Calculation cancelled' : 'Error: ' + errorMessage,
        current: processedProducts,
        total: totalProducts || 0,
        elapsed: global.formatElapsedTime(startTime),
        remaining: null,
        rate: global.calculateRate(startTime, processedProducts),
        percentage: ((processedProducts / (totalProducts || 1)) * 100).toFixed(1),
        timing: buildTimingSnapshot(startTime)
      });
      throw error;
    } finally {
      connection.release();
    }
  } finally {
    // Close the connection pool when we're done
    await closePool();
  }
}
// Public API: the module itself is calculateMetrics(), with cancelCalculation()
// and the progress reader attached as properties.
module.exports = calculateMetrics;
module.exports.cancelCalculation = cancelCalculation;
module.exports.getProgress = global.getProgress;

// Run directly if called from command line
if (require.main === module) {
  calculateMetrics().catch((error) => {
    // A rejection reason is not guaranteed to be an Error; read the message
    // defensively so this handler cannot throw and hide the real reason.
    const message = error && typeof error.message === 'string' ? error.message : String(error);
    if (!message.includes('Operation cancelled')) {
      console.error('Error:', error);
    }
    process.exit(1);
  });
}