Fixes and enhancements for the calculate-metrics script

This commit is contained in:
2025-01-12 18:13:05 -05:00
parent 4d3f956a03
commit 35105f9474
3 changed files with 337 additions and 60 deletions

View File

@@ -1,6 +1,7 @@
const mysql = require('mysql2/promise');
const path = require('path');
require('dotenv').config({ path: path.resolve(__dirname, '..', '.env') });
const fs = require('fs');
// Helper function to format elapsed time
function formatElapsedTime(startTime) {
@@ -43,22 +44,59 @@ function calculateRate(startTime, current) {
// Emit one progress event: persist it for resumption, stream it to the
// frontend over stdout, and record milestone events in the import log.
function outputProgress(data) {
  // Persist the latest snapshot so an interrupted run can be resumed.
  saveProgress(data);

  // Stream to the frontend as a single JSON line (consumed as an SSE payload).
  process.stdout.write(`${JSON.stringify({ progress: data })}\n`);

  // Only milestone events get written to the on-disk import log:
  // operation starts (no `current` counter yet), terminal statuses,
  // and the named major-phase transitions below.
  const phaseMarkers = [
    'Starting ABC classification',
    'Starting time-based aggregates',
    'Starting vendor metrics',
  ];
  const isMilestone =
    (Boolean(data.operation) && !data.current) ||
    data.status === 'complete' ||
    data.status === 'error' ||
    phaseMarkers.some((marker) => data.operation?.includes(marker));

  if (isMilestone) {
    const parts = [data.operation || 'Operation'];
    if (data.message) parts.push(`: ${data.message}`);
    if (data.error) parts.push(` Error: ${data.error}`);
    if (data.status) parts.push(` Status: ${data.status}`);
    logImport(parts.join(''));
  }
}
// Set up logging: both log files live under <project>/logs.
const LOG_DIR = path.join(__dirname, '../logs');
const ERROR_LOG = path.join(LOG_DIR, 'import-errors.log'); // errors with stack traces (see logError)
const IMPORT_LOG = path.join(LOG_DIR, 'import.log');       // milestone events (see logImport)
// Ensure log directory exists before any append/write happens.
if (!fs.existsSync(LOG_DIR)) {
  fs.mkdirSync(LOG_DIR, { recursive: true });
}
// Helper function to log errors
function logError(error, context) {
console.error(JSON.stringify({
progress: {
status: 'error',
error: error.message || error,
context
}
}));
// Append an error (with context and stack trace) to the error log file and
// echo a short form to the console.
// `error` may be an Error instance or any other value (e.g. a plain string):
// the previous implementation assumed an Error and logged literal
// "undefined" for message/stack otherwise.
function logError(error, context = '') {
  const timestamp = new Date().toISOString();
  const message = error?.message ?? String(error);
  const stack = error?.stack ?? 'No stack trace available';
  const errorMessage = `[${timestamp}] ${context}\nError: ${message}\nStack: ${stack}\n\n`;
  // Log to error file
  fs.appendFileSync(ERROR_LOG, errorMessage);
  // Also log to console
  console.error(`\n${context}\nError: ${message}`);
}
// Helper function to log import progress
// Append a timestamped entry to the import log.
// NOTE(review): `isSignificant` is accepted but never read here — the
// significance filtering happens in outputProgress; confirm the parameter
// is still needed by callers.
function logImport(message, isSignificant = true) {
  const stamp = new Date().toISOString();
  fs.appendFileSync(IMPORT_LOG, `[${stamp}] ${message}\n`);
}
// Database configuration
@@ -75,8 +113,52 @@ const dbConfig = {
// Add cancel handler
let isCancelled = false;
// Add status file handling for progress resumption
const STATUS_FILE = path.join(__dirname, '..', 'logs', 'metrics-status.json');
// Persist the latest progress snapshot (stamped with the current time) so a
// restarted server can resume reporting. Failures are reported to the
// console but never thrown — progress persistence is best-effort.
function saveProgress(progress) {
  try {
    const snapshot = { ...progress, timestamp: Date.now() };
    fs.writeFileSync(STATUS_FILE, JSON.stringify(snapshot));
  } catch (err) {
    console.error('Failed to save progress:', err);
  }
}
// Delete the persisted progress file if present. Like saveProgress, this is
// best-effort: failures are logged, never thrown.
function clearProgress() {
  try {
    if (!fs.existsSync(STATUS_FILE)) return;
    fs.unlinkSync(STATUS_FILE);
  } catch (err) {
    console.error('Failed to clear progress:', err);
  }
}
// Read the persisted progress snapshot, if one exists and is still fresh.
// Returns the parsed progress object, or null when the snapshot is absent,
// stale, or unreadable. Stale or corrupt snapshots are deleted as a side
// effect so the next read starts clean.
function getProgress() {
  const PROGRESS_TTL_MS = 60 * 60 * 1000; // snapshots older than 1 hour are stale
  try {
    if (fs.existsSync(STATUS_FILE)) {
      const progress = JSON.parse(fs.readFileSync(STATUS_FILE, 'utf8'));
      const isFresh =
        progress.timestamp && Date.now() - progress.timestamp < PROGRESS_TTL_MS;
      if (isFresh) {
        return progress;
      }
      // Missing or expired timestamp: discard the stale snapshot.
      clearProgress();
    }
  } catch (err) {
    // Corrupt JSON or fs failure: treat as "no progress" and clean up.
    console.error('Failed to read progress:', err);
    clearProgress();
  }
  return null;
}
function cancelCalculation() {
isCancelled = true;
clearProgress();
// Format as SSE event
const event = {
progress: {
@@ -86,18 +168,22 @@ function cancelCalculation() {
total: 0,
elapsed: null,
remaining: null,
rate: 0
rate: 0,
timestamp: Date.now()
}
};
process.stdout.write(JSON.stringify(event) + '\n');
process.exit(0);
}
// Handle SIGTERM signal for cancellation
process.on('SIGTERM', cancelCalculation);
async function calculateMetrics() {
let pool;
const startTime = Date.now();
let processedCount = 0;
let totalProducts = 0; // Initialize at the top
let totalProducts = 0;
try {
isCancelled = false;
@@ -431,7 +517,13 @@ async function calculateMetrics() {
throw err;
});
const threshold = thresholds[0] || { critical_days: 7, reorder_days: 14, overstock_days: 90 };
const threshold = thresholds[0] || {
critical_days: 7,
reorder_days: 14,
overstock_days: 90,
safety_stock_days: 14, // Add default safety stock days
service_level: 95.0 // Add default service level
};
// Calculate metrics
const metrics = salesMetrics[0] || {};
@@ -452,13 +544,13 @@ async function calculateMetrics() {
// Calculate stock status using configurable thresholds with proper handling of zero sales
const stock_status = daily_sales_avg === 0 ? 'New' :
stock.stock_quantity <= Math.max(1, Math.ceil(daily_sales_avg * config.critical_days)) ? 'Critical' :
stock.stock_quantity <= Math.max(1, Math.ceil(daily_sales_avg * config.reorder_days)) ? 'Reorder' :
stock.stock_quantity > Math.max(1, daily_sales_avg * config.overstock_days) ? 'Overstocked' : 'Healthy';
stock.stock_quantity <= Math.max(1, Math.ceil(daily_sales_avg * threshold.critical_days)) ? 'Critical' :
stock.stock_quantity <= Math.max(1, Math.ceil(daily_sales_avg * threshold.reorder_days)) ? 'Reorder' :
stock.stock_quantity > Math.max(1, daily_sales_avg * threshold.overstock_days) ? 'Overstocked' : 'Healthy';
// Calculate safety stock using configured values
// Calculate safety stock using configured values with proper defaults
const safety_stock = daily_sales_avg > 0 ?
Math.max(1, Math.ceil(daily_sales_avg * config.safety_stock_days * (config.service_level / 100))) :
Math.max(1, Math.ceil(daily_sales_avg * (threshold.safety_stock_days || 14) * ((threshold.service_level || 95.0) / 100))) :
null;
// Add to batch update
@@ -473,8 +565,8 @@ async function calculateMetrics() {
metrics.last_sale_date || null,
daily_sales_avg > 0 ? stock.stock_quantity / daily_sales_avg : null,
weekly_sales_avg > 0 ? stock.stock_quantity / weekly_sales_avg : null,
daily_sales_avg > 0 ? Math.max(1, Math.ceil(daily_sales_avg * config.reorder_days)) : null,
daily_sales_avg > 0 ? Math.max(1, Math.ceil(daily_sales_avg * config.critical_days)) : null,
daily_sales_avg > 0 ? Math.max(1, Math.ceil(daily_sales_avg * threshold.reorder_days)) : null,
daily_sales_avg > 0 ? Math.max(1, Math.ceil(daily_sales_avg * threshold.critical_days)) : null,
margin_percent,
metrics.total_revenue || 0,
inventory_value || 0,
@@ -542,18 +634,38 @@ async function calculateMetrics() {
}
// Update progress for ABC classification
if (isCancelled) {
throw new Error('Operation cancelled');
}
outputProgress({
status: 'running',
operation: 'Calculating ABC classification',
current: totalProducts,
operation: 'Starting ABC classification',
current: Math.floor(totalProducts * 0.7), // Start from 70% after product processing
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, totalProducts, totalProducts),
rate: calculateRate(startTime, totalProducts),
percentage: '100'
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.7), totalProducts),
rate: calculateRate(startTime, Math.floor(totalProducts * 0.7)),
percentage: '70'
});
// Calculate ABC classification using configured thresholds
if (isCancelled) {
throw new Error('Operation cancelled');
}
const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1');
const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 };
outputProgress({
status: 'running',
operation: 'Calculating ABC rankings',
current: Math.floor(totalProducts * 0.8),
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.8), totalProducts),
rate: calculateRate(startTime, Math.floor(totalProducts * 0.8)),
percentage: '80'
});
await connection.query(`
WITH revenue_rankings AS (
SELECT
@@ -576,23 +688,40 @@ async function calculateMetrics() {
JOIN classification_update cu ON pm.product_id = cu.product_id
SET pm.abc_class = cu.abc_class,
pm.last_calculated_at = NOW()
`, [config.abc_a_threshold, config.abc_b_threshold]);
`, [abcThresholds.a_threshold, abcThresholds.b_threshold]);
// Update progress for time-based aggregates
if (isCancelled) {
throw new Error('Operation cancelled');
}
outputProgress({
status: 'running',
operation: 'Calculating time-based aggregates',
current: totalProducts,
operation: 'Starting time-based aggregates calculation',
current: Math.floor(totalProducts * 0.85),
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, totalProducts, totalProducts),
rate: calculateRate(startTime, totalProducts),
percentage: '100'
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.85), totalProducts),
rate: calculateRate(startTime, Math.floor(totalProducts * 0.85)),
percentage: '85'
});
// Calculate time-based aggregates
if (isCancelled) {
throw new Error('Operation cancelled');
}
await connection.query('TRUNCATE TABLE product_time_aggregates;');
outputProgress({
status: 'running',
operation: 'Calculating sales aggregates',
current: Math.floor(totalProducts * 0.9),
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.9), totalProducts),
rate: calculateRate(startTime, Math.floor(totalProducts * 0.9)),
percentage: '90'
});
await connection.query(`
INSERT INTO product_time_aggregates (
product_id,
@@ -677,10 +806,9 @@ async function calculateMetrics() {
WHERE s.product_id IS NULL
`);
// Update progress for vendor metrics
outputProgress({
status: 'running',
operation: 'Calculating vendor metrics',
operation: 'Time-based aggregates complete',
current: totalProducts,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
@@ -689,7 +817,25 @@ async function calculateMetrics() {
percentage: '100'
});
// Calculate vendor metrics
// Update progress for vendor metrics
if (isCancelled) {
throw new Error('Operation cancelled');
}
outputProgress({
status: 'running',
operation: 'Starting vendor metrics calculation',
current: Math.floor(totalProducts * 0.95),
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.95), totalProducts),
rate: calculateRate(startTime, Math.floor(totalProducts * 0.95)),
percentage: '95'
});
// Calculate vendor metrics with fixed order fill rate calculation
if (isCancelled) {
throw new Error('Operation cancelled');
}
await connection.query(`
INSERT INTO vendor_metrics (
vendor,
@@ -704,8 +850,14 @@ async function calculateMetrics() {
vendor,
NOW() as last_calculated_at,
COALESCE(AVG(DATEDIFF(received_date, date)), 0) as avg_lead_time_days,
COALESCE((COUNT(CASE WHEN DATEDIFF(received_date, date) <= 14 THEN 1 END) * 100.0 / NULLIF(COUNT(*), 0)), 0) as on_time_delivery_rate,
COALESCE((SUM(received) * 100.0 / NULLIF(SUM(ordered), 0)), 0) as order_fill_rate,
COALESCE(
(COUNT(CASE WHEN DATEDIFF(received_date, date) <= 14 THEN 1 END) * 100.0 / NULLIF(COUNT(*), 0)),
0
) as on_time_delivery_rate,
CASE
WHEN SUM(ordered) = 0 THEN 0
ELSE LEAST(100, GREATEST(0, (SUM(CASE WHEN received >= 0 THEN received ELSE 0 END) * 100.0 / SUM(ordered))))
END as order_fill_rate,
COUNT(DISTINCT po_id) as total_orders,
COUNT(CASE WHEN DATEDIFF(received_date, date) > 14 THEN 1 END) as total_late_orders
FROM purchase_orders
@@ -720,6 +872,17 @@ async function calculateMetrics() {
total_late_orders = VALUES(total_late_orders)
`);
outputProgress({
status: 'running',
operation: 'Vendor metrics complete',
current: Math.floor(totalProducts * 0.98),
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.98), totalProducts),
rate: calculateRate(startTime, Math.floor(totalProducts * 0.98)),
percentage: '98'
});
// Final success message
outputProgress({
status: 'complete',
@@ -732,13 +895,16 @@ async function calculateMetrics() {
percentage: '100'
});
// Clear progress file on successful completion
clearProgress();
} catch (error) {
if (isCancelled) {
outputProgress({
status: 'cancelled',
operation: 'Calculation cancelled',
current: processedCount,
total: totalProducts || 0, // Use 0 if not yet defined
total: totalProducts || 0,
elapsed: formatElapsedTime(startTime),
remaining: null,
rate: calculateRate(startTime, processedCount),
@@ -749,7 +915,7 @@ async function calculateMetrics() {
status: 'error',
operation: 'Error: ' + error.message,
current: processedCount,
total: totalProducts || 0, // Use 0 if not yet defined
total: totalProducts || 0,
elapsed: formatElapsedTime(startTime),
remaining: null,
rate: calculateRate(startTime, processedCount),
@@ -767,9 +933,10 @@ async function calculateMetrics() {
}
}
// Export both functions
// Export both functions and progress checker
module.exports = calculateMetrics;
module.exports.cancelCalculation = cancelCalculation;
module.exports.getProgress = getProgress;
// Run directly if called from command line
if (require.main === module) {

View File

@@ -141,8 +141,12 @@ router.get('/calculate-metrics/progress', (req, res) => {
'Access-Control-Allow-Credentials': 'true'
});
// Send an initial message to test the connection
res.write('data: {"status":"running","operation":"Initializing connection..."}\n\n');
// Send current progress if it exists
if (importProgress) {
res.write(`data: ${JSON.stringify(importProgress)}\n\n`);
} else {
res.write('data: {"status":"running","operation":"Initializing connection..."}\n\n');
}
// Add this client to the calculate-metrics set
calculateMetricsClients.add(res);
@@ -168,6 +172,21 @@ router.get('/status', (req, res) => {
});
});
// Status endpoint: reports whether a metrics calculation is in flight plus
// its most recently persisted progress snapshot.
// NOTE(review): an identical route is registered again later in this file —
// Express dispatches to the first matching registration, so the later copy
// is dead code; one of the two should be removed.
router.get('/calculate-metrics/status', (req, res) => {
  console.log('Calculate metrics status endpoint hit');
  // Lazy-require so the script module (and its side effects) load on demand.
  const calculateMetrics = require('../../scripts/calculate-metrics');
  const progress = calculateMetrics.getProgress();
  // Only consider it active if both the process is running and we have progress
  const isActive = !!activeImport && !!progress;
  res.json({
    active: isActive,
    progress: isActive ? progress : null
  });
});
// Route to update CSV files
router.post('/update', async (req, res, next) => {
if (activeImport) {
@@ -532,6 +551,20 @@ router.post('/reset-metrics', async (req, res) => {
}
});
// NOTE(review): duplicate of the /calculate-metrics/status route registered
// earlier in this file. Express only ever dispatches to the first matching
// registration, so this handler is unreachable — delete one of the two.
router.get('/calculate-metrics/status', (req, res) => {
  const calculateMetrics = require('../../scripts/calculate-metrics');
  const progress = calculateMetrics.getProgress();
  // Only consider it active if both the process is running and we have progress
  const isActive = !!activeImport && !!progress;
  res.json({
    active: isActive,
    progress: isActive ? progress : null
  });
});
// Add calculate-metrics endpoint
router.post('/calculate-metrics', async (req, res) => {
if (activeImport) {
@@ -554,16 +587,18 @@ router.post('/calculate-metrics', async (req, res) => {
try {
// Try to parse as JSON
const jsonData = JSON.parse(output);
sendProgressToClients(calculateMetricsClients, {
importProgress = {
status: 'running',
...jsonData
});
...jsonData.progress
};
sendProgressToClients(calculateMetricsClients, importProgress);
} catch (e) {
// If not JSON, send as plain progress
sendProgressToClients(calculateMetricsClients, {
importProgress = {
status: 'running',
progress: output
});
};
sendProgressToClients(calculateMetricsClients, importProgress);
}
});
@@ -574,15 +609,17 @@ router.post('/calculate-metrics', async (req, res) => {
try {
// Try to parse as JSON
const jsonData = JSON.parse(error);
sendProgressToClients(calculateMetricsClients, {
importProgress = {
status: 'error',
...jsonData
});
...jsonData.progress
};
sendProgressToClients(calculateMetricsClients, importProgress);
} catch {
sendProgressToClients(calculateMetricsClients, {
importProgress = {
status: 'error',
error
});
};
sendProgressToClients(calculateMetricsClients, importProgress);
}
});
@@ -590,22 +627,24 @@ router.post('/calculate-metrics', async (req, res) => {
// When the metrics child process exits, clear the active handles and
// broadcast a terminal status to every connected SSE client.
activeImport.on('close', (code, signal) => {
  // SIGTERM (or exit code 143 = 128 + 15) means the user cancelled the run.
  wasCancelled = signal === 'SIGTERM' || code === 143;
  activeImport = null;
  importProgress = null;
  if (code === 0 || wasCancelled) {
    if (wasCancelled) {
      importProgress = {
        status: 'cancelled',
        operation: 'Operation cancelled'
      };
      sendProgressToClients(calculateMetricsClients, importProgress);
    } else {
      importProgress = {
        status: 'complete',
        operation: 'Metrics calculation complete'
      };
      sendProgressToClients(calculateMetricsClients, importProgress);
    }
    resolve();
  } else {
    // importProgress was already cleared unconditionally above, so the
    // previous duplicate `importProgress = null;` here was redundant.
    reject(new Error(`Metrics calculation process exited with code ${code}`));
  }
});

View File

@@ -1,4 +1,4 @@
import { useState } from 'react';
import { useState, useEffect } from 'react';
import { Button } from "@/components/ui/button";
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card";
import { Progress } from "@/components/ui/progress";
@@ -61,6 +61,77 @@ export function DataManagement() {
const [isCalculatingMetrics, setIsCalculatingMetrics] = useState(false);
const [metricsProgress, setMetricsProgress] = useState<ImportProgress | null>(null);
// True while any long-running data operation is active; used to disable the
// action buttons so operations cannot overlap.
const isAnyOperationRunning = () =>
  [isUpdating, isImporting, isResetting, isResettingMetrics, isCalculatingMetrics].some(Boolean);
// Check backend status once on mount so that a page reload re-attaches the
// UI to any operation that is still running server-side.
useEffect(() => {
  const checkStatus = async () => {
    try {
      // Metrics calculation is checked first: if active, re-attach to its
      // SSE stream and skip the generic status check entirely.
      const metricsResponse = await fetch(`${config.apiUrl}/csv/calculate-metrics/status`, {
        credentials: 'include'
      });
      const metricsData = await metricsResponse.json();
      if (metricsData.active && metricsData.progress) {
        setIsCalculatingMetrics(true);
        setMetricsProgress(metricsData.progress);
        connectToEventSource('calculate-metrics');
        return;
      } else {
        setIsCalculatingMetrics(false);
        setMetricsProgress(null);
      }
      // Check other operations via the shared status endpoint; the
      // operation label decides which UI state to restore.
      // NOTE(review): matching on substrings of `operation` is fragile —
      // confirm the backend's operation labels stay stable.
      const response = await fetch(`${config.apiUrl}/csv/status`, {
        credentials: 'include'
      });
      const data = await response.json();
      if (data.active && data.progress) {
        if (data.progress?.operation?.toLowerCase().includes('import')) {
          setIsImporting(true);
          setImportProgress(data.progress);
          connectToEventSource('import');
        } else if (data.progress?.operation?.toLowerCase().includes('update')) {
          setIsUpdating(true);
          setUpdateProgress(data.progress);
          connectToEventSource('update');
        } else if (data.progress?.operation?.toLowerCase().includes('reset')) {
          setIsResetting(true);
          setResetProgress(data.progress);
          connectToEventSource('reset');
        }
      } else {
        // Reset all states if no active process
        setIsImporting(false);
        setIsUpdating(false);
        setIsResetting(false);
        setImportProgress(null);
        setUpdateProgress(null);
        setResetProgress(null);
      }
    } catch (error) {
      console.error('Error checking status:', error);
      // Reset all states on error so the UI never sticks in a busy state.
      setIsCalculatingMetrics(false);
      setIsImporting(false);
      setIsUpdating(false);
      setIsResetting(false);
      setMetricsProgress(null);
      setImportProgress(null);
      setUpdateProgress(null);
      setResetProgress(null);
    }
  };
  checkStatus();
}, []);
// Helper to connect to event source
const connectToEventSource = (type: 'update' | 'import' | 'reset' | 'reset-metrics' | 'calculate-metrics') => {
console.log(`Setting up EventSource for ${type}...`);
@@ -541,7 +612,7 @@ export function DataManagement() {
<Button
className="flex-1"
onClick={handleUpdateCSV}
disabled={isUpdating || isImporting}
disabled={isAnyOperationRunning()}
>
{isUpdating ? (
<>
@@ -581,7 +652,7 @@ export function DataManagement() {
<Button
className="flex-1"
onClick={handleImportCSV}
disabled={isImporting || isUpdating || isResetting}
disabled={isAnyOperationRunning()}
>
{isImporting ? (
<>
@@ -626,7 +697,7 @@ export function DataManagement() {
<Button
className="flex-1"
onClick={handleCalculateMetrics}
disabled={isCalculatingMetrics || isImporting || isUpdating || isResetting || isResettingMetrics}
disabled={isAnyOperationRunning()}
>
{isCalculatingMetrics ? (
<>
@@ -668,7 +739,7 @@ export function DataManagement() {
<Button
variant="destructive"
className="flex-1 min-w-[140px]"
disabled={isResetting || isImporting || isUpdating || isResettingMetrics}
disabled={isAnyOperationRunning()}
>
{isResetting ? (
<>
@@ -699,7 +770,7 @@ export function DataManagement() {
<Button
variant="destructive"
className="flex-1 min-w-[140px]"
disabled={isResetting || isImporting || isUpdating || isResettingMetrics}
disabled={isAnyOperationRunning()}
>
Reset Metrics Only
</Button>