const mysql = require('mysql2/promise'); const path = require('path'); require('dotenv').config({ path: path.resolve(__dirname, '..', '.env') }); const fs = require('fs'); // Configuration flags const SKIP_PRODUCT_METRICS = 0; // Helper function to format elapsed time function formatElapsedTime(startTime) { const elapsed = Date.now() - startTime; const seconds = Math.floor(elapsed / 1000); const minutes = Math.floor(seconds / 60); const hours = Math.floor(minutes / 60); if (hours > 0) { return `${hours}h ${minutes % 60}m`; } else if (minutes > 0) { return `${minutes}m ${seconds % 60}s`; } else { return `${seconds}s`; } } // Helper function to estimate remaining time function estimateRemaining(startTime, current, total) { if (current === 0) return null; const elapsed = Date.now() - startTime; const rate = current / elapsed; const remaining = (total - current) / rate; const minutes = Math.floor(remaining / 60000); const seconds = Math.floor((remaining % 60000) / 1000); if (minutes > 0) { return `${minutes}m ${seconds}s`; } else { return `${seconds}s`; } } // Helper function to calculate rate function calculateRate(startTime, current) { const elapsed = (Date.now() - startTime) / 1000; // Convert to seconds return elapsed > 0 ? Math.round(current / elapsed) : 0; } // Helper function to output progress function outputProgress(data) { // Save progress to file for resumption saveProgress(data); // Format as SSE event const event = { progress: data }; // Always send to stdout for frontend process.stdout.write(JSON.stringify(event) + '\n'); // Log significant events to disk const isSignificant = // Operation starts (data.operation && !data.current) || // Operation completions and errors data.status === 'complete' || data.status === 'error' || // Major phase changes data.operation?.includes('Starting ABC classification') || data.operation?.includes('Starting time-based aggregates') || data.operation?.includes('Starting vendor metrics'); if (isSignificant) { logImport(`${data.operation || 'Operation'}${data.message ? ': ' + data.message : ''}${data.error ? ' Error: ' + data.error : ''}${data.status ? ' Status: ' + data.status : ''}`); } } // Set up logging const LOG_DIR = path.join(__dirname, '../logs'); const ERROR_LOG = path.join(LOG_DIR, 'import-errors.log'); const IMPORT_LOG = path.join(LOG_DIR, 'import.log'); // Ensure log directory exists if (!fs.existsSync(LOG_DIR)) { fs.mkdirSync(LOG_DIR, { recursive: true }); } // Helper function to log errors function logError(error, context = '') { const timestamp = new Date().toISOString(); const errorMessage = `[${timestamp}] ${context}\nError: ${error.message}\nStack: ${error.stack}\n\n`; // Log to error file fs.appendFileSync(ERROR_LOG, errorMessage); // Also log to console console.error(`\n${context}\nError: ${error.message}`); } // Helper function to log import progress function logImport(message, isSignificant = true) { const timestamp = new Date().toISOString(); const logMessage = `[${timestamp}] ${message}\n`; fs.appendFileSync(IMPORT_LOG, logMessage); } // Database configuration const dbConfig = { host: process.env.DB_HOST, user: process.env.DB_USER, password: process.env.DB_PASSWORD, database: process.env.DB_NAME, waitForConnections: true, connectionLimit: 10, queueLimit: 0, // Add performance optimizations namedPlaceholders: true, maxPreparedStatements: 256, enableKeepAlive: true, keepAliveInitialDelay: 0, // Add memory optimizations flags: [ 'FOUND_ROWS', 'LONG_PASSWORD', 'PROTOCOL_41', 'TRANSACTIONS', 'SECURE_CONNECTION', 'MULTI_RESULTS', 'PS_MULTI_RESULTS', 'PLUGIN_AUTH', 'CONNECT_ATTRS', 'PLUGIN_AUTH_LENENC_CLIENT_DATA', 'SESSION_TRACK', 'MULTI_STATEMENTS' ] }; // Add cancel handler let isCancelled = false; // Add status file handling for progress resumption const STATUS_FILE = path.join(__dirname, '..', 'logs', 'metrics-status.json'); function saveProgress(progress) { try { fs.writeFileSync(STATUS_FILE, JSON.stringify({ ...progress, timestamp: Date.now() })); } catch (err) { console.error('Failed to save progress:', err); } } function clearProgress() { try { if (fs.existsSync(STATUS_FILE)) { fs.unlinkSync(STATUS_FILE); } } catch (err) { console.error('Failed to clear progress:', err); } } function getProgress() { try { if (fs.existsSync(STATUS_FILE)) { const progress = JSON.parse(fs.readFileSync(STATUS_FILE, 'utf8')); // Check if the progress is still valid (less than 1 hour old) if (progress.timestamp && Date.now() - progress.timestamp < 3600000) { return progress; } else { // Clear old progress clearProgress(); } } } catch (err) { console.error('Failed to read progress:', err); clearProgress(); } return null; } function cancelCalculation() { isCancelled = true; clearProgress(); // Format as SSE event const event = { progress: { status: 'cancelled', operation: 'Calculation cancelled', current: 0, total: 0, elapsed: null, remaining: null, rate: 0, timestamp: Date.now() } }; process.stdout.write(JSON.stringify(event) + '\n'); process.exit(0); } // Handle SIGTERM signal for cancellation process.on('SIGTERM', cancelCalculation); // Calculate GMROI and other financial metrics async function calculateFinancialMetrics(connection, startTime, totalProducts) { outputProgress({ status: 'running', operation: 'Calculating financial metrics', current: Math.floor(totalProducts * 0.6), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.6), totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.6)), percentage: '60' }); await connection.query(` UPDATE product_metrics pm JOIN ( SELECT p.product_id, p.cost_price * p.stock_quantity as inventory_value, SUM(o.quantity * o.price) as total_revenue, SUM(o.quantity * p.cost_price) as cost_of_goods_sold, SUM(o.quantity * (o.price - p.cost_price)) as gross_profit, MIN(o.date) as first_sale_date, MAX(o.date) as last_sale_date, DATEDIFF(MAX(o.date), MIN(o.date)) + 1 as calculation_period_days FROM products p LEFT JOIN orders o ON p.product_id = o.product_id WHERE o.canceled = false AND DATE(o.date) >= DATE_SUB(CURDATE(), INTERVAL 12 MONTH) GROUP BY p.product_id ) fin ON pm.product_id = fin.product_id SET pm.inventory_value = COALESCE(fin.inventory_value, 0), pm.total_revenue = COALESCE(fin.total_revenue, 0), pm.cost_of_goods_sold = COALESCE(fin.cost_of_goods_sold, 0), pm.gross_profit = COALESCE(fin.gross_profit, 0), pm.gmroi = CASE WHEN COALESCE(fin.inventory_value, 0) > 0 AND fin.calculation_period_days > 0 THEN (COALESCE(fin.gross_profit, 0) * (365.0 / fin.calculation_period_days)) / COALESCE(fin.inventory_value, 0) ELSE 0 END `); // Update time-based aggregates with financial metrics await connection.query(` UPDATE product_time_aggregates pta JOIN ( SELECT p.product_id, YEAR(o.date) as year, MONTH(o.date) as month, p.cost_price * p.stock_quantity as inventory_value, SUM(o.quantity * (o.price - p.cost_price)) as gross_profit, COUNT(DISTINCT DATE(o.date)) as days_in_period FROM products p LEFT JOIN orders o ON p.product_id = o.product_id WHERE o.canceled = false GROUP BY p.product_id, YEAR(o.date), MONTH(o.date) ) fin ON pta.product_id = fin.product_id AND pta.year = fin.year AND pta.month = fin.month SET pta.inventory_value = COALESCE(fin.inventory_value, 0), pta.gmroi = CASE WHEN COALESCE(fin.inventory_value, 0) > 0 AND fin.days_in_period > 0 THEN (COALESCE(fin.gross_profit, 0) * (365.0 / fin.days_in_period)) / COALESCE(fin.inventory_value, 0) ELSE 0 END `); } // Calculate vendor metrics async function calculateVendorMetrics(connection, startTime, totalProducts) { outputProgress({ status: 'running', operation: 'Calculating vendor metrics', current: Math.floor(totalProducts * 0.7), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.7), totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.7)), percentage: '70' }); // First, ensure all vendors exist in vendor_details await connection.query(` INSERT IGNORE INTO vendor_details (vendor, status) SELECT DISTINCT vendor, 'active' as status FROM products WHERE vendor IS NOT NULL AND vendor NOT IN (SELECT vendor FROM vendor_details) `); // Calculate vendor performance metrics await connection.query(` INSERT INTO vendor_metrics ( vendor, avg_lead_time_days, on_time_delivery_rate, order_fill_rate, total_orders, total_late_orders, total_purchase_value, avg_order_value, active_products, total_products, total_revenue, avg_margin_percent, status ) WITH vendor_orders AS ( SELECT po.vendor, AVG(DATEDIFF(po.received_date, po.date)) as avg_lead_time_days, COUNT(*) as total_orders, COUNT(CASE WHEN po.received_date > po.expected_date THEN 1 END) as total_late_orders, SUM(po.cost_price * po.ordered) as total_purchase_value, AVG(po.cost_price * po.ordered) as avg_order_value, CASE WHEN COUNT(*) > 0 THEN (COUNT(CASE WHEN po.received = po.ordered THEN 1 END) * 100.0) / COUNT(*) ELSE 0 END as order_fill_rate FROM purchase_orders po WHERE po.status = 'closed' GROUP BY po.vendor ), vendor_products AS ( SELECT p.vendor, COUNT(DISTINCT p.product_id) as total_products, COUNT(DISTINCT CASE WHEN p.visible = true THEN p.product_id END) as active_products, SUM(o.price * o.quantity) as total_revenue, CASE WHEN SUM(o.price * o.quantity) > 0 THEN (SUM((o.price - p.cost_price) * o.quantity) * 100.0) / SUM(o.price * o.quantity) ELSE 0 END as avg_margin_percent FROM products p LEFT JOIN orders o ON p.product_id = o.product_id AND o.canceled = false GROUP BY p.vendor ) SELECT vd.vendor, COALESCE(vo.avg_lead_time_days, 0) as avg_lead_time_days, CASE WHEN COALESCE(vo.total_orders, 0) > 0 THEN ((COALESCE(vo.total_orders, 0) - COALESCE(vo.total_late_orders, 0)) * 100.0) / COALESCE(vo.total_orders, 1) ELSE 0 END as on_time_delivery_rate, COALESCE(vo.order_fill_rate, 0) as order_fill_rate, COALESCE(vo.total_orders, 0) as total_orders, COALESCE(vo.total_late_orders, 0) as total_late_orders, COALESCE(vo.total_purchase_value, 0) as total_purchase_value, COALESCE(vo.avg_order_value, 0) as avg_order_value, COALESCE(vp.active_products, 0) as active_products, COALESCE(vp.total_products, 0) as total_products, COALESCE(vp.total_revenue, 0) as total_revenue, COALESCE(vp.avg_margin_percent, 0) as avg_margin_percent, vd.status FROM vendor_details vd LEFT JOIN vendor_orders vo ON vd.vendor = vo.vendor LEFT JOIN vendor_products vp ON vd.vendor = vp.vendor ON DUPLICATE KEY UPDATE avg_lead_time_days = VALUES(avg_lead_time_days), on_time_delivery_rate = VALUES(on_time_delivery_rate), order_fill_rate = VALUES(order_fill_rate), total_orders = VALUES(total_orders), total_late_orders = VALUES(total_late_orders), total_purchase_value = VALUES(total_purchase_value), avg_order_value = VALUES(avg_order_value), active_products = VALUES(active_products), total_products = VALUES(total_products), total_revenue = VALUES(total_revenue), avg_margin_percent = VALUES(avg_margin_percent), status = VALUES(status), last_calculated_at = CURRENT_TIMESTAMP `); // Calculate vendor time-based metrics await connection.query(` INSERT INTO vendor_time_metrics ( vendor, year, month, total_orders, late_orders, avg_lead_time_days, total_purchase_value, total_revenue, avg_margin_percent ) WITH vendor_time_data AS ( SELECT vd.vendor, YEAR(po.date) as year, MONTH(po.date) as month, COUNT(DISTINCT po.po_id) as total_orders, COUNT(DISTINCT CASE WHEN po.received_date > po.expected_date THEN po.po_id END) as late_orders, AVG(DATEDIFF(po.received_date, po.date)) as avg_lead_time_days, SUM(po.cost_price * po.ordered) as total_purchase_value, SUM(o.price * o.quantity) as total_revenue, CASE WHEN SUM(o.price * o.quantity) > 0 THEN (SUM((o.price - p.cost_price) * o.quantity) * 100.0) / SUM(o.price * o.quantity) ELSE 0 END as avg_margin_percent FROM vendor_details vd LEFT JOIN products p ON vd.vendor = p.vendor LEFT JOIN purchase_orders po ON p.product_id = po.product_id LEFT JOIN orders o ON p.product_id = o.product_id AND o.canceled = false WHERE po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH) GROUP BY vd.vendor, YEAR(po.date), MONTH(po.date) ) SELECT vendor, year, month, COALESCE(total_orders, 0) as total_orders, COALESCE(late_orders, 0) as late_orders, COALESCE(avg_lead_time_days, 0) as avg_lead_time_days, COALESCE(total_purchase_value, 0) as total_purchase_value, COALESCE(total_revenue, 0) as total_revenue, COALESCE(avg_margin_percent, 0) as avg_margin_percent FROM vendor_time_data ON DUPLICATE KEY UPDATE total_orders = VALUES(total_orders), late_orders = VALUES(late_orders), avg_lead_time_days = VALUES(avg_lead_time_days), total_purchase_value = VALUES(total_purchase_value), total_revenue = VALUES(total_revenue), avg_margin_percent = VALUES(avg_margin_percent) `); } async function calculateCategoryMetrics(connection, startTime, totalProducts) { outputProgress({ status: 'running', operation: 'Calculating category metrics', current: Math.floor(totalProducts * 0.85), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.85), totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.85)), percentage: '85' }); // Calculate category performance metrics await connection.query(` INSERT INTO category_metrics ( category_id, product_count, active_products, total_value, avg_margin, turnover_rate, growth_rate, status ) WITH category_sales AS ( SELECT c.id as category_id, COUNT(DISTINCT p.product_id) as product_count, COUNT(DISTINCT CASE WHEN p.visible = true THEN p.product_id END) as active_products, SUM(p.stock_quantity * p.cost_price) as total_value, CASE WHEN SUM(o.price * o.quantity) > 0 THEN (SUM((o.price - p.cost_price) * o.quantity) * 100.0) / SUM(o.price * o.quantity) ELSE 0 END as avg_margin, CASE WHEN AVG(GREATEST(p.stock_quantity, 0)) >= 0.01 THEN LEAST( SUM(CASE WHEN o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 1 YEAR) THEN COALESCE(o.quantity, 0) ELSE 0 END) / GREATEST( AVG(GREATEST(p.stock_quantity, 0)), 1.0 ), 999.99 ) ELSE 0 END as turnover_rate, -- Current period (last 3 months) SUM(CASE WHEN o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 3 MONTH) THEN COALESCE(o.quantity * o.price, 0) ELSE 0 END) as current_period_sales, -- Previous year same period SUM(CASE WHEN o.date BETWEEN DATE_SUB(CURRENT_DATE, INTERVAL 15 MONTH) AND DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH) THEN COALESCE(o.quantity * o.price, 0) ELSE 0 END) as previous_year_period_sales, c.status FROM categories c LEFT JOIN product_categories pc ON c.id = pc.category_id LEFT JOIN products p ON pc.product_id = p.product_id LEFT JOIN orders o ON p.product_id = o.product_id AND o.canceled = false GROUP BY c.id, c.status ) SELECT category_id, product_count, active_products, total_value, COALESCE(avg_margin, 0) as avg_margin, COALESCE(turnover_rate, 0) as turnover_rate, -- Enhanced YoY growth rate calculation CASE WHEN previous_year_period_sales = 0 AND current_period_sales > 0 THEN 100.0 WHEN previous_year_period_sales = 0 THEN 0.0 ELSE LEAST( GREATEST( ((current_period_sales - previous_year_period_sales) / NULLIF(previous_year_period_sales, 0)) * 100.0, -100.0 ), 999.99 ) END as growth_rate, status FROM category_sales ON DUPLICATE KEY UPDATE product_count = VALUES(product_count), active_products = VALUES(active_products), total_value = VALUES(total_value), avg_margin = VALUES(avg_margin), turnover_rate = VALUES(turnover_rate), growth_rate = VALUES(growth_rate), status = VALUES(status), last_calculated_at = CURRENT_TIMESTAMP `); // Calculate category time-based metrics await connection.query(` INSERT INTO category_time_metrics ( category_id, year, month, product_count, active_products, total_value, total_revenue, avg_margin, turnover_rate ) SELECT c.id as category_id, YEAR(o.date) as year, MONTH(o.date) as month, COUNT(DISTINCT p.product_id) as product_count, COUNT(DISTINCT CASE WHEN p.visible = true THEN p.product_id END) as active_products, SUM(p.stock_quantity * p.cost_price) as total_value, SUM(o.price * o.quantity) as total_revenue, CASE WHEN SUM(o.price * o.quantity) > 0 THEN (SUM((o.price - p.cost_price) * o.quantity) * 100.0) / SUM(o.price * o.quantity) ELSE 0 END as avg_margin, CASE WHEN AVG(p.stock_quantity) > 0 THEN SUM(o.quantity) / AVG(p.stock_quantity) ELSE 0 END as turnover_rate FROM categories c LEFT JOIN product_categories pc ON c.id = pc.category_id LEFT JOIN products p ON pc.product_id = p.product_id LEFT JOIN orders o ON p.product_id = o.product_id AND o.canceled = false WHERE o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH) GROUP BY c.id, YEAR(o.date), MONTH(o.date) ON DUPLICATE KEY UPDATE product_count = VALUES(product_count), active_products = VALUES(active_products), total_value = VALUES(total_value), total_revenue = VALUES(total_revenue), avg_margin = VALUES(avg_margin), turnover_rate = VALUES(turnover_rate) `); } // Calculate turnover rate metrics async function calculateTurnoverMetrics(connection, startTime, totalProducts) { outputProgress({ status: 'running', operation: 'Calculating turnover metrics', current: Math.floor(totalProducts * 0.75), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.75), totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.75)), percentage: '75' }); await connection.query(` WITH product_turnover AS ( SELECT p.product_id, p.vendor, pc.category_id, COALESCE( (SELECT calculation_period_days FROM turnover_config tc WHERE tc.category_id = pc.category_id AND tc.vendor = p.vendor LIMIT 1), COALESCE( (SELECT calculation_period_days FROM turnover_config tc WHERE tc.category_id = pc.category_id AND tc.vendor IS NULL LIMIT 1), COALESCE( (SELECT calculation_period_days FROM turnover_config tc WHERE tc.category_id IS NULL AND tc.vendor = p.vendor LIMIT 1), (SELECT calculation_period_days FROM turnover_config WHERE category_id IS NULL AND vendor IS NULL LIMIT 1) ) ) ) as calculation_period_days, -- Calculate average daily sales over the calculation period SUM(o.quantity) / GREATEST(DATEDIFF(CURDATE(), DATE_SUB(CURDATE(), INTERVAL 30 DAY)), 1) as avg_daily_sales, -- Calculate average stock level, excluding zero stock periods AVG(CASE WHEN p.stock_quantity > 0 THEN p.stock_quantity ELSE NULL END) as avg_stock_level FROM products p LEFT JOIN product_categories pc ON p.product_id = pc.product_id LEFT JOIN orders o ON p.product_id = o.product_id WHERE o.date >= DATE_SUB(CURDATE(), INTERVAL 30 DAY) AND o.canceled = 0 GROUP BY p.product_id, p.vendor, pc.category_id ) UPDATE product_metrics pm JOIN product_turnover pt ON pm.product_id = pt.product_id SET pm.turnover_rate = CASE WHEN pt.avg_stock_level > 0 AND pt.avg_daily_sales > 0 THEN -- Calculate annualized turnover rate LEAST( (pt.avg_daily_sales * 365) / GREATEST(pt.avg_stock_level, 1), 999999.999 ) ELSE 0 END, pm.last_calculated_at = NOW() `); } // Enhance lead time calculations async function calculateLeadTimeMetrics(connection, startTime, totalProducts) { outputProgress({ status: 'running', operation: 'Calculating lead time metrics', current: Math.floor(totalProducts * 0.8), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.8), totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.8)), percentage: '80' }); await connection.query(` WITH lead_time_stats AS ( SELECT p.product_id, p.vendor, pc.category_id, AVG(DATEDIFF(po.received_date, po.date)) as current_lead_time, ( SELECT target_days FROM lead_time_thresholds lt WHERE lt.category_id = pc.category_id AND lt.vendor = p.vendor LIMIT 1 ) as target_lead_time_cat_vendor, ( SELECT target_days FROM lead_time_thresholds lt WHERE lt.category_id = pc.category_id AND lt.vendor IS NULL LIMIT 1 ) as target_lead_time_cat, ( SELECT target_days FROM lead_time_thresholds lt WHERE lt.category_id IS NULL AND lt.vendor = p.vendor LIMIT 1 ) as target_lead_time_vendor, ( SELECT target_days FROM lead_time_thresholds WHERE category_id IS NULL AND vendor IS NULL LIMIT 1 ) as target_lead_time_default, ( SELECT warning_days FROM lead_time_thresholds lt WHERE lt.category_id = pc.category_id AND lt.vendor = p.vendor LIMIT 1 ) as warning_lead_time_cat_vendor, ( SELECT warning_days FROM lead_time_thresholds lt WHERE lt.category_id = pc.category_id AND lt.vendor IS NULL LIMIT 1 ) as warning_lead_time_cat, ( SELECT warning_days FROM lead_time_thresholds lt WHERE lt.category_id IS NULL AND lt.vendor = p.vendor LIMIT 1 ) as warning_lead_time_vendor, ( SELECT warning_days FROM lead_time_thresholds WHERE category_id IS NULL AND vendor IS NULL LIMIT 1 ) as warning_lead_time_default FROM products p LEFT JOIN product_categories pc ON p.product_id = pc.product_id LEFT JOIN purchase_orders po ON p.product_id = po.product_id WHERE po.status = 'completed' AND po.received_date IS NOT NULL AND po.date >= DATE_SUB(CURDATE(), INTERVAL 90 DAY) GROUP BY p.product_id, p.vendor, pc.category_id ), lead_time_final AS ( SELECT product_id, current_lead_time, COALESCE( target_lead_time_cat_vendor, target_lead_time_cat, target_lead_time_vendor, target_lead_time_default, 14 ) as target_lead_time, COALESCE( warning_lead_time_cat_vendor, warning_lead_time_cat, warning_lead_time_vendor, warning_lead_time_default, 21 ) as warning_lead_time FROM lead_time_stats ) UPDATE product_metrics pm JOIN lead_time_final lt ON pm.product_id = lt.product_id SET pm.current_lead_time = lt.current_lead_time, pm.target_lead_time = lt.target_lead_time, pm.lead_time_status = CASE WHEN lt.current_lead_time <= lt.target_lead_time THEN 'On Target' WHEN lt.current_lead_time <= lt.warning_lead_time THEN 'Warning' ELSE 'Critical' END, pm.last_calculated_at = NOW() `); } // Add new function for category sales metrics async function calculateCategorySalesMetrics(connection, startTime, totalProducts) { outputProgress({ status: 'running', operation: 'Calculating category sales metrics', current: Math.floor(totalProducts * 0.9), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.9), totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.9)), percentage: '90' }); await connection.query(` INSERT INTO category_sales_metrics ( category_id, brand, period_start, period_end, avg_daily_sales, total_sold, num_products, avg_price, last_calculated_at ) WITH date_ranges AS ( SELECT DATE_SUB(CURDATE(), INTERVAL 30 DAY) as period_start, CURDATE() as period_end UNION ALL SELECT DATE_SUB(CURDATE(), INTERVAL 90 DAY), CURDATE() UNION ALL SELECT DATE_SUB(CURDATE(), INTERVAL 180 DAY), CURDATE() UNION ALL SELECT DATE_SUB(CURDATE(), INTERVAL 365 DAY), CURDATE() ), category_metrics AS ( SELECT c.id as category_id, p.brand, dr.period_start, dr.period_end, COUNT(DISTINCT p.product_id) as num_products, COALESCE(SUM(o.quantity), 0) / DATEDIFF(dr.period_end, dr.period_start) as avg_daily_sales, COALESCE(SUM(o.quantity), 0) as total_sold, COALESCE(AVG(o.price), 0) as avg_price FROM categories c JOIN product_categories pc ON c.id = pc.category_id JOIN products p ON pc.product_id = p.product_id CROSS JOIN date_ranges dr LEFT JOIN orders o ON p.product_id = o.product_id AND o.date BETWEEN dr.period_start AND dr.period_end AND o.canceled = false GROUP BY c.id, p.brand, dr.period_start, dr.period_end ) SELECT category_id, brand, period_start, period_end, avg_daily_sales, total_sold, num_products, avg_price, NOW() as last_calculated_at FROM category_metrics ON DUPLICATE KEY UPDATE avg_daily_sales = VALUES(avg_daily_sales), total_sold = VALUES(total_sold), num_products = VALUES(num_products), avg_price = VALUES(avg_price), last_calculated_at = NOW() `); } // Add new function for safety stock calculation async function calculateSafetyStock(connection, startTime, totalProducts) { outputProgress({ status: 'running', operation: 'Calculating safety stock levels', current: Math.floor(totalProducts * 0.7), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.7), totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.7)), percentage: '70' }); await connection.query(` WITH daily_sales AS ( SELECT o.product_id, DATE(o.date) as sale_date, SUM(o.quantity) as daily_quantity FROM orders o WHERE o.canceled = false AND o.date >= DATE_SUB(CURDATE(), INTERVAL 90 DAY) GROUP BY o.product_id, DATE(o.date) ), sales_stats AS ( SELECT ds.product_id, AVG(ds.daily_quantity) as avg_daily_sales, STDDEV_SAMP(ds.daily_quantity) as daily_sales_stddev, COUNT(DISTINCT ds.sale_date) as days_with_sales FROM daily_sales ds GROUP BY ds.product_id ), product_config AS ( SELECT p.product_id, p.vendor, pc.category_id, COALESCE( (SELECT service_level FROM safety_stock_config sc WHERE sc.category_id = pc.category_id AND sc.vendor = p.vendor LIMIT 1), COALESCE( (SELECT service_level FROM safety_stock_config sc WHERE sc.category_id = pc.category_id AND sc.vendor IS NULL LIMIT 1), COALESCE( (SELECT service_level FROM safety_stock_config sc WHERE sc.category_id IS NULL AND sc.vendor = p.vendor LIMIT 1), (SELECT service_level FROM safety_stock_config WHERE category_id IS NULL AND vendor IS NULL LIMIT 1) ) ) ) as service_level, COALESCE( (SELECT coverage_days FROM safety_stock_config sc WHERE sc.category_id = pc.category_id AND sc.vendor = p.vendor LIMIT 1), COALESCE( (SELECT coverage_days FROM safety_stock_config sc WHERE sc.category_id = pc.category_id AND sc.vendor IS NULL LIMIT 1), COALESCE( (SELECT coverage_days FROM safety_stock_config sc WHERE sc.category_id IS NULL AND sc.vendor = p.vendor LIMIT 1), (SELECT coverage_days FROM safety_stock_config WHERE category_id IS NULL AND vendor IS NULL LIMIT 1) ) ) ) as coverage_days FROM products p LEFT JOIN product_categories pc ON p.product_id = pc.product_id ) UPDATE product_metrics pm JOIN sales_stats ss ON pm.product_id = ss.product_id JOIN product_config pc ON pm.product_id = pc.product_id SET pm.safety_stock = GREATEST(1, CEIL( ss.avg_daily_sales * pc.coverage_days * (1 + ( COALESCE(ss.daily_sales_stddev, 0) * CASE WHEN pc.service_level >= 99.9 THEN 3.1 WHEN pc.service_level >= 99 THEN 2.33 WHEN pc.service_level >= 95 THEN 1.65 WHEN pc.service_level >= 90 THEN 1.29 ELSE 1 END )) ) ), pm.last_calculated_at = NOW() WHERE ss.days_with_sales > 0 `); } // Update the main calculation function to include the new metrics async function calculateMetrics() { let pool; const startTime = Date.now(); let processedCount = 0; let totalProducts = 0; try { isCancelled = false; pool = mysql.createPool(dbConfig); const connection = await pool.getConnection(); try { outputProgress({ status: 'running', operation: 'Starting metrics calculation', current: 0, total: 100, elapsed: '0s', remaining: 'Calculating...', rate: 0, percentage: '0' }); // Get total number of products const [countResult] = await connection.query('SELECT COUNT(*) as total FROM products') .catch(err => { logError(err, 'Failed to count products'); throw err; }); totalProducts = countResult[0].total; if (!SKIP_PRODUCT_METRICS) { // Initial progress with percentage outputProgress({ status: 'running', operation: 'Processing sales and stock metrics', current: processedCount, total: totalProducts, elapsed: '0s', remaining: 'Calculating...', rate: 0, percentage: '0' }); // Process in batches of 250 const batchSize = 250; for (let offset = 0; offset < totalProducts; offset += batchSize) { if (isCancelled) { throw new Error('Operation cancelled'); } const [products] = await connection.query('SELECT product_id, vendor FROM products LIMIT ? OFFSET ?', [batchSize, offset]) .catch(err => { logError(err, `Failed to fetch products batch at offset ${offset}`); throw err; }); processedCount += products.length; // Update progress after each batch outputProgress({ status: 'running', operation: 'Processing products', current: processedCount, total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, processedCount, totalProducts), rate: calculateRate(startTime, processedCount), percentage: ((processedCount / totalProducts) * 100).toFixed(1) }); // Process the batch const metricsUpdates = []; for (const product of products) { try { // Get configuration values for this product const [configs] = await connection.query(` WITH product_info AS ( SELECT p.product_id, p.vendor, pc.category_id FROM products p LEFT JOIN product_categories pc ON p.product_id = pc.product_id WHERE p.product_id = ? ), threshold_options AS ( SELECT st.*, CASE WHEN st.category_id = pi.category_id AND st.vendor = pi.vendor THEN 1 -- Category + vendor match WHEN st.category_id = pi.category_id AND st.vendor IS NULL THEN 2 -- Category match WHEN st.category_id IS NULL AND st.vendor = pi.vendor THEN 3 -- Vendor match WHEN st.category_id IS NULL AND st.vendor IS NULL THEN 4 -- Default ELSE 5 END as priority FROM product_info pi CROSS JOIN stock_thresholds st WHERE (st.category_id = pi.category_id OR st.category_id IS NULL) AND (st.vendor = pi.vendor OR st.vendor IS NULL) ), velocity_options AS ( SELECT sv.*, CASE WHEN sv.category_id = pi.category_id AND sv.vendor = pi.vendor THEN 1 WHEN sv.category_id = pi.category_id AND sv.vendor IS NULL THEN 2 WHEN sv.category_id IS NULL AND sv.vendor = pi.vendor THEN 3 WHEN sv.category_id IS NULL AND sv.vendor IS NULL THEN 4 ELSE 5 END as priority FROM product_info pi CROSS JOIN sales_velocity_config sv WHERE (sv.category_id = pi.category_id OR sv.category_id IS NULL) AND (sv.vendor = pi.vendor OR sv.vendor IS NULL) ), safety_options AS ( SELECT ss.*, CASE WHEN ss.category_id = pi.category_id AND ss.vendor = pi.vendor THEN 1 WHEN ss.category_id = pi.category_id AND ss.vendor IS NULL THEN 2 WHEN ss.category_id IS NULL AND ss.vendor = pi.vendor THEN 3 WHEN ss.category_id IS NULL AND ss.vendor IS NULL THEN 4 ELSE 5 END as priority FROM product_info pi CROSS JOIN safety_stock_config ss WHERE (ss.category_id = pi.category_id OR ss.category_id IS NULL) AND (ss.vendor = pi.vendor OR ss.vendor IS NULL) ) SELECT -- Stock thresholds COALESCE( (SELECT critical_days FROM threshold_options ORDER BY priority LIMIT 1), 7 ) as critical_days, COALESCE( (SELECT reorder_days FROM threshold_options ORDER BY priority LIMIT 1), 14 ) as reorder_days, COALESCE( (SELECT overstock_days FROM threshold_options ORDER BY priority LIMIT 1), 90 ) as overstock_days, COALESCE( (SELECT low_stock_threshold FROM threshold_options ORDER BY priority LIMIT 1), 5 ) as low_stock_threshold, -- Sales velocity windows COALESCE( (SELECT daily_window_days FROM velocity_options ORDER BY priority LIMIT 1), 30 ) as daily_window_days, COALESCE( (SELECT weekly_window_days FROM velocity_options ORDER BY priority LIMIT 1), 7 ) as weekly_window_days, COALESCE( (SELECT monthly_window_days FROM velocity_options ORDER BY priority LIMIT 1), 90 ) as monthly_window_days, -- Safety stock config COALESCE( (SELECT coverage_days FROM safety_options ORDER BY priority LIMIT 1), 14 ) as safety_stock_days, COALESCE( (SELECT service_level FROM safety_options ORDER BY priority LIMIT 1), 95.0 ) as service_level, -- ABC Classification (SELECT a_threshold FROM abc_classification_config WHERE id = 1) as abc_a_threshold, (SELECT b_threshold FROM abc_classification_config WHERE id = 1) as abc_b_threshold, (SELECT classification_period_days FROM abc_classification_config WHERE id = 1) as abc_period_days `, [product.product_id]); const config = configs[0] || { critical_days: 7, reorder_days: 14, overstock_days: 90, low_stock_threshold: 5, daily_window_days: 30, weekly_window_days: 7, monthly_window_days: 90, safety_stock_days: 14, service_level: 95.0, abc_a_threshold: 20.0, abc_b_threshold: 50.0, abc_period_days: 90 }; // Calculate sales metrics with trends using configured windows const [salesMetrics] = await connection.query(` WITH sales_summary AS ( SELECT SUM(o.quantity) as total_quantity_sold, SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) as total_revenue, SUM(COALESCE(p.cost_price, 0) * o.quantity) as total_cost, MAX(o.date) as last_sale_date, MIN(o.date) as first_sale_date, COUNT(DISTINCT o.order_number) as number_of_orders, AVG(o.quantity) as avg_quantity_per_order, -- Calculate rolling averages using configured windows SUM(CASE WHEN o.date >= DATE_SUB(CURDATE(), INTERVAL ? DAY) THEN o.quantity ELSE 0 END) as last_30_days_qty, CASE WHEN SUM(CASE WHEN o.date >= DATE_SUB(CURDATE(), INTERVAL ? DAY) THEN o.quantity ELSE 0 END) IS NULL THEN 0 ELSE SUM(CASE WHEN o.date >= DATE_SUB(CURDATE(), INTERVAL ? DAY) THEN o.quantity ELSE 0 END) END as rolling_weekly_avg, SUM(CASE WHEN o.date >= DATE_SUB(CURDATE(), INTERVAL ? DAY) THEN o.quantity ELSE 0 END) as last_month_qty FROM orders o JOIN products p ON o.product_id = p.product_id WHERE o.canceled = 0 AND o.product_id = ? GROUP BY o.product_id ) SELECT total_quantity_sold, total_revenue, total_cost, last_sale_date, first_sale_date, number_of_orders, avg_quantity_per_order, last_30_days_qty / ? as rolling_daily_avg, rolling_weekly_avg / ? as rolling_weekly_avg, last_month_qty / ? as rolling_monthly_avg, total_quantity_sold as total_sales_to_date FROM sales_summary `, [ config.daily_window_days, config.weekly_window_days, config.weekly_window_days, config.monthly_window_days, product.product_id, config.daily_window_days, config.weekly_window_days, config.monthly_window_days ]).catch(err => { logError(err, `Failed to calculate sales metrics for product ${product.product_id}`); throw err; }); // Calculate purchase metrics with proper handling of negative quantities const [purchaseMetrics] = await connection.query(` WITH recent_orders AS ( SELECT date, received_date, received, cost_price, DATEDIFF(received_date, date) as lead_time_days, ROW_NUMBER() OVER (ORDER BY date DESC) as order_rank FROM purchase_orders WHERE status = 'closed' AND product_id = ? AND received > 0 AND received_date IS NOT NULL ), lead_time_orders AS ( SELECT * FROM recent_orders WHERE order_rank <= 5 -- Last 5 orders OR date >= DATE_SUB(CURDATE(), INTERVAL 90 DAY) -- Or orders from last 90 days ) SELECT SUM(CASE WHEN received >= 0 THEN received ELSE 0 END) as total_quantity_purchased, SUM(CASE WHEN received >= 0 THEN cost_price * received ELSE 0 END) as total_cost, MAX(date) as last_purchase_date, MIN(received_date) as first_received_date, MAX(received_date) as last_received_date, AVG(lead_time_days) as avg_lead_time_days, COUNT(*) as orders_analyzed FROM lead_time_orders `, [product.product_id]).catch(err => { logError(err, `Failed to calculate purchase metrics for product ${product.product_id}`); throw err; }); // Get current stock and stock age const [stockInfo] = await connection.query(` SELECT p.stock_quantity, p.cost_price, p.created_at, p.replenishable, p.moq, DATEDIFF(CURDATE(), MIN(po.received_date)) as days_since_first_stock, DATEDIFF(CURDATE(), COALESCE( (SELECT MAX(o2.date) FROM orders o2 WHERE o2.product_id = p.product_id AND o2.canceled = false), CURDATE() -- If no sales, use current date )) as days_since_last_sale, (SELECT SUM(quantity) FROM orders o3 WHERE o3.product_id = p.product_id AND o3.canceled = false) as total_quantity_sold, CASE WHEN EXISTS ( SELECT 1 FROM orders o WHERE o.product_id = p.product_id AND o.date >= DATE_SUB(CURDATE(), INTERVAL 30 DAY) AND o.canceled = false AND (SELECT SUM(quantity) FROM orders o2 WHERE o2.product_id = p.product_id AND o2.date >= o.date AND o2.canceled = false) = 0 ) THEN true ELSE false END as had_recent_stockout FROM products p LEFT JOIN purchase_orders po ON p.product_id = po.product_id AND po.status = 'closed' AND po.received > 0 WHERE p.product_id = ? GROUP BY p.product_id `, [product.product_id]).catch(err => { logError(err, `Failed to get stock info for product ${product.product_id}`); throw err; }); // Calculate metrics const metrics = salesMetrics[0] || {}; const purchases = purchaseMetrics[0] || {}; const stock = stockInfo[0] || {}; const daily_sales_avg = metrics.rolling_daily_avg || 0; const weekly_sales_avg = metrics.rolling_weekly_avg || 0; const monthly_sales_avg = metrics.total_quantity_sold ? metrics.total_quantity_sold / 30 : 0; // Calculate days of inventory with safety factor and lead times const days_of_inventory = daily_sales_avg > 0 ? Math.ceil( (stock.stock_quantity / daily_sales_avg) + (purchases.avg_lead_time_days || config.reorder_days) * (1 + (config.service_level / 100)) ) : null; const weeks_of_inventory = days_of_inventory ? Math.ceil(days_of_inventory / 7) : null; // Calculate margin percent with proper handling of zero revenue const margin_percent = metrics.total_revenue > 0 ? ((metrics.total_revenue - metrics.total_cost) / metrics.total_revenue) * 100 : null; // Calculate current inventory value const inventory_value = (stock.stock_quantity || 0) * (stock.cost_price || 0); // Calculate stock status with improved handling const stock_status = (() => { const days_since_first_stock = stockInfo[0]?.days_since_first_stock || 0; const days_since_last_sale = stockInfo[0]?.days_since_last_sale || 9999; const total_quantity_sold = stockInfo[0]?.total_quantity_sold || 0; const had_recent_stockout = stockInfo[0]?.had_recent_stockout || false; const dq = stock.stock_quantity || 0; const ds = daily_sales_avg || 0; const ws = weekly_sales_avg || 0; const ms = monthly_sales_avg || 0; // If no stock, return immediately if (dq === 0) { return had_recent_stockout ? 'Critical' : 'Out of Stock'; } // 1. Check if truly "New" (≤30 days and no sales) if (days_since_first_stock <= 30 && total_quantity_sold === 0) { return 'New'; } // 2. Handle zero or very low sales velocity cases if (ds === 0 || (ds < 0.1 && ws < 0.5)) { // Less than 1 sale per 10 days and less than 0.5 per week if (days_since_first_stock > config.overstock_days) { return 'Overstocked'; } if (days_since_first_stock > 30) { return 'At Risk'; } } // 3. Calculate days of supply and check velocity trends const days_of_supply = ds > 0 ? dq / ds : 999; const velocity_trend = ds > 0 ? (ds / (ms || ds) - 1) * 100 : 0; // Percent change from monthly to daily avg // Critical stock level if (days_of_supply <= config.critical_days) { return 'Critical'; } // Reorder cases if (days_of_supply <= config.reorder_days || (had_recent_stockout && days_of_supply <= config.reorder_days * 1.5)) { return 'Reorder'; } // At Risk cases (multiple scenarios) if ( // Approaching overstock threshold (days_of_supply >= config.overstock_days * 0.8) || // Significant sales decline (velocity_trend <= -50 && days_of_supply > config.reorder_days * 2) || // No recent sales (days_since_last_sale > 45 && dq > 0) || // Very low velocity with significant stock (ds > 0 && ds < 0.2 && dq > ds * config.overstock_days * 0.5) ) { return 'At Risk'; } // Overstock cases if (days_of_supply >= config.overstock_days) { return 'Overstocked'; } // If none of the above conditions are met return 'Healthy'; })(); // Calculate reorder quantity and overstocked amount let reorder_qty = 0; let overstocked_amt = 0; // Only calculate reorder quantity for replenishable products if (stock.replenishable && (stock_status === 'Critical' || stock_status === 'Reorder')) { const ds = daily_sales_avg || 0; const lt = purchases.avg_lead_time_days || 14; // Default to 14 days if no lead time data const sc = config.safety_stock_days || 14; const ss = config.safety_stock_days || 14; const dq = stock.stock_quantity || 0; const moq = stock.moq || 1; // Calculate desired stock level based on daily sales, lead time, coverage days, and safety stock const desired_stock = (ds * (lt + sc)) + ss; // Calculate raw reorder amount const raw_reorder = Math.max(0, desired_stock - dq); // Round up to nearest MOQ reorder_qty = Math.ceil(raw_reorder / moq) * moq; } // Calculate overstocked amount for overstocked products if (stock_status === 'Overstocked') { const ds = daily_sales_avg || 0; const dq = stock.stock_quantity || 0; const lt = purchases.avg_lead_time_days || 14; const sc = config.safety_stock_days || 14; const ss = config.safety_stock_days || 14; // Calculate maximum desired stock based on overstock days configuration const max_desired_stock = (ds * config.overstock_days) + ss; // Calculate excess inventory overstocked_amt = Math.max(0, dq - max_desired_stock); } // Add to batch update (remove safety_stock from the array since it's calculated separately) metricsUpdates.push([ product.product_id, daily_sales_avg || null, weekly_sales_avg || null, monthly_sales_avg || null, metrics.avg_quantity_per_order || null, metrics.number_of_orders || 0, metrics.first_sale_date || null, metrics.last_sale_date || null, days_of_inventory, weeks_of_inventory, daily_sales_avg > 0 ? Math.max(1, Math.ceil(daily_sales_avg * config.reorder_days)) : null, margin_percent, metrics.total_revenue || 0, inventory_value || 0, purchases.avg_lead_time_days || null, purchases.last_purchase_date || null, purchases.first_received_date || null, purchases.last_received_date || null, stock_status, reorder_qty, overstocked_amt ]); } catch (err) { logError(err, `Failed processing product ${product.product_id}`); // Continue with next product instead of failing entire batch continue; } } // Batch update metrics if (metricsUpdates.length > 0) { await connection.query(` INSERT INTO product_metrics ( product_id, daily_sales_avg, weekly_sales_avg, monthly_sales_avg, avg_quantity_per_order, number_of_orders, first_sale_date, last_sale_date, days_of_inventory, weeks_of_inventory, reorder_point, avg_margin_percent, total_revenue, inventory_value, avg_lead_time_days, last_purchase_date, first_received_date, last_received_date, stock_status, reorder_qty, overstocked_amt ) VALUES ? ON DUPLICATE KEY UPDATE last_calculated_at = NOW(), daily_sales_avg = VALUES(daily_sales_avg), weekly_sales_avg = VALUES(weekly_sales_avg), monthly_sales_avg = VALUES(monthly_sales_avg), avg_quantity_per_order = VALUES(avg_quantity_per_order), number_of_orders = VALUES(number_of_orders), first_sale_date = VALUES(first_sale_date), last_sale_date = VALUES(last_sale_date), days_of_inventory = VALUES(days_of_inventory), weeks_of_inventory = VALUES(weeks_of_inventory), reorder_point = VALUES(reorder_point), avg_margin_percent = VALUES(avg_margin_percent), total_revenue = VALUES(total_revenue), inventory_value = VALUES(inventory_value), avg_lead_time_days = VALUES(avg_lead_time_days), last_purchase_date = VALUES(last_purchase_date), first_received_date = VALUES(first_received_date), last_received_date = VALUES(last_received_date), stock_status = VALUES(stock_status), reorder_qty = VALUES(reorder_qty), overstocked_amt = VALUES(overstocked_amt) `, [metricsUpdates]).catch(err => { logError(err, `Failed to batch update metrics for ${metricsUpdates.length} products`); throw err; }); } } } else { console.log('Skipping product metrics calculation...'); outputProgress({ status: 'running', operation: 'Skipping product metrics calculation', current: Math.floor(totalProducts * 0.6), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.6), totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.6)), percentage: '60' }); } outputProgress({ status: 'running', operation: 'Starting financial metrics calculation', current: Math.floor(totalProducts * 0.6), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.6), totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.6)), percentage: '60' }); await calculateFinancialMetrics(connection, startTime, totalProducts); outputProgress({ status: 'running', operation: 'Starting vendor metrics calculation', current: Math.floor(totalProducts * 0.7), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.7), totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.7)), percentage: '70' }); await calculateVendorMetrics(connection, startTime, totalProducts); outputProgress({ status: 'running', operation: 'Starting turnover metrics calculation', current: Math.floor(totalProducts * 0.75), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.75), totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.75)), percentage: '75' }); await calculateTurnoverMetrics(connection, startTime, totalProducts); outputProgress({ status: 'running', operation: 'Starting lead time metrics calculation', current: Math.floor(totalProducts * 0.8), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.8), totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.8)), percentage: '80' }); await calculateLeadTimeMetrics(connection, startTime, totalProducts); // Calculate category metrics await calculateCategoryMetrics(connection, startTime, totalProducts); // Calculate category sales metrics await calculateCategorySalesMetrics(connection, startTime, totalProducts); // Calculate ABC classification const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1'); const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 }; await connection.query(` WITH revenue_rankings AS ( SELECT product_id, total_revenue, PERCENT_RANK() OVER (ORDER BY COALESCE(total_revenue, 0) DESC) * 100 as revenue_percentile FROM product_metrics ), classification_update AS ( SELECT product_id, CASE WHEN revenue_percentile <= ? THEN 'A' WHEN revenue_percentile <= ? THEN 'B' ELSE 'C' END as abc_class FROM revenue_rankings ) UPDATE product_metrics pm JOIN classification_update cu ON pm.product_id = cu.product_id SET pm.abc_class = cu.abc_class, pm.last_calculated_at = NOW() `, [abcThresholds.a_threshold, abcThresholds.b_threshold]); // Calculate time-based aggregates outputProgress({ status: 'running', operation: 'Starting time-based aggregates calculation', current: Math.floor(totalProducts * 0.85), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.85), totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.85)), percentage: '85' }); await connection.query('TRUNCATE TABLE product_time_aggregates;'); await connection.query(` INSERT INTO product_time_aggregates ( product_id, year, month, total_quantity_sold, total_revenue, total_cost, order_count, stock_received, stock_ordered, avg_price, profit_margin ) WITH sales_data AS ( SELECT o.product_id, YEAR(o.date) as year, MONTH(o.date) as month, SUM(o.quantity) as total_quantity_sold, SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) as total_revenue, SUM(COALESCE(p.cost_price, 0) * o.quantity) as total_cost, COUNT(DISTINCT o.order_number) as order_count, AVG(o.price - COALESCE(o.discount, 0)) as avg_price, CASE WHEN SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) = 0 THEN 0 ELSE ((SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) - SUM(COALESCE(p.cost_price, 0) * o.quantity)) / SUM((o.price - COALESCE(o.discount, 0)) * o.quantity)) * 100 END as profit_margin FROM orders o JOIN products p ON o.product_id = p.product_id WHERE o.canceled = 0 GROUP BY o.product_id, YEAR(o.date), MONTH(o.date) ), purchase_data AS ( SELECT product_id, YEAR(date) as year, MONTH(date) as month, SUM(received) as stock_received, SUM(ordered) as stock_ordered FROM purchase_orders WHERE status = 'closed' GROUP BY product_id, YEAR(date), MONTH(date) ) SELECT s.product_id, s.year, s.month, s.total_quantity_sold, s.total_revenue, s.total_cost, s.order_count, COALESCE(p.stock_received, 0) as stock_received, COALESCE(p.stock_ordered, 0) as stock_ordered, 0 as avg_price, 0 as profit_margin FROM sales_data s LEFT JOIN purchase_data p ON s.product_id = p.product_id AND s.year = p.year AND s.month = p.month UNION SELECT p.product_id, p.year, p.month, 0 as total_quantity_sold, 0 as total_revenue, 0 as total_cost, 0 as order_count, p.stock_received, p.stock_ordered, 0 as avg_price, 0 as profit_margin FROM purchase_data p LEFT JOIN sales_data s ON p.product_id = s.product_id AND p.year = s.year AND p.month = s.month WHERE s.product_id IS NULL `); // Final success message outputProgress({ status: 'complete', operation: 'Metrics calculation complete', current: totalProducts, total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: '0s', rate: calculateRate(startTime, totalProducts), percentage: '100' }); // Clear progress file on successful completion clearProgress(); } catch (error) { if (isCancelled) { outputProgress({ status: 'cancelled', operation: 'Calculation cancelled', current: processedCount, total: totalProducts || 0, elapsed: formatElapsedTime(startTime), remaining: null, rate: calculateRate(startTime, processedCount), percentage: ((processedCount / (totalProducts || 1)) * 100).toFixed(1) }); } else { outputProgress({ status: 'error', operation: 'Error: ' + error.message, current: processedCount, total: totalProducts || 0, elapsed: formatElapsedTime(startTime), remaining: null, rate: calculateRate(startTime, processedCount), percentage: ((processedCount / (totalProducts || 1)) * 100).toFixed(1) }); } throw error; } finally { connection.release(); } } finally { if (pool) { await pool.end(); } } } // Export both functions and progress checker module.exports = calculateMetrics; module.exports.cancelCalculation = cancelCalculation; module.exports.getProgress = getProgress; // Run directly if called from command line if (require.main === module) { calculateMetrics().catch(error => { if (!error.message.includes('Operation cancelled')) { console.error('Error:', error); } process.exit(1); }); }