const mysql = require('mysql2/promise'); const path = require('path'); require('dotenv').config({ path: path.resolve(__dirname, '..', '.env') }); const fs = require('fs'); // Set to 1 to skip product metrics and only calculate the remaining metrics const SKIP_PRODUCT_METRICS = 0; // Helper function to format elapsed time function formatElapsedTime(startTime) { const elapsed = Date.now() - startTime; const seconds = Math.floor(elapsed / 1000); const minutes = Math.floor(seconds / 60); const hours = Math.floor(minutes / 60); if (hours > 0) { return `${hours}h ${minutes % 60}m`; } else if (minutes > 0) { return `${minutes}m ${seconds % 60}s`; } else { return `${seconds}s`; } } // Helper function to estimate remaining time function estimateRemaining(startTime, current, total) { if (current === 0) return null; const elapsed = Date.now() - startTime; const rate = current / elapsed; const remaining = (total - current) / rate; const minutes = Math.floor(remaining / 60000); const seconds = Math.floor((remaining % 60000) / 1000); if (minutes > 0) { return `${minutes}m ${seconds}s`; } else { return `${seconds}s`; } } // Helper function to calculate rate function calculateRate(startTime, current) { const elapsed = (Date.now() - startTime) / 1000; // Convert to seconds return elapsed > 0 ? 
Math.round(current / elapsed) : 0; } // Helper function to output progress function outputProgress(data) { // Save progress to file for resumption saveProgress(data); // Format as SSE event const event = { progress: data }; // Always send to stdout for frontend process.stdout.write(JSON.stringify(event) + '\n'); // Log significant events to disk const isSignificant = // Operation starts (data.operation && !data.current) || // Operation completions and errors data.status === 'complete' || data.status === 'error' || // Major phase changes data.operation?.includes('Starting ABC classification') || data.operation?.includes('Starting time-based aggregates') || data.operation?.includes('Starting vendor metrics'); if (isSignificant) { logImport(`${data.operation || 'Operation'}${data.message ? ': ' + data.message : ''}${data.error ? ' Error: ' + data.error : ''}${data.status ? ' Status: ' + data.status : ''}`); } } // Set up logging const LOG_DIR = path.join(__dirname, '../logs'); const ERROR_LOG = path.join(LOG_DIR, 'import-errors.log'); const IMPORT_LOG = path.join(LOG_DIR, 'import.log'); // Ensure log directory exists if (!fs.existsSync(LOG_DIR)) { fs.mkdirSync(LOG_DIR, { recursive: true }); } // Helper function to log errors function logError(error, context = '') { const timestamp = new Date().toISOString(); const errorMessage = `[${timestamp}] ${context}\nError: ${error.message}\nStack: ${error.stack}\n\n`; // Log to error file fs.appendFileSync(ERROR_LOG, errorMessage); // Also log to console console.error(`\n${context}\nError: ${error.message}`); } // Helper function to log import progress function logImport(message, isSignificant = true) { const timestamp = new Date().toISOString(); const logMessage = `[${timestamp}] ${message}\n`; fs.appendFileSync(IMPORT_LOG, logMessage); } // Database configuration const dbConfig = { host: process.env.DB_HOST, user: process.env.DB_USER, password: process.env.DB_PASSWORD, database: process.env.DB_NAME, waitForConnections: true, 
connectionLimit: 10, queueLimit: 0, // Add performance optimizations namedPlaceholders: true, maxPreparedStatements: 256, enableKeepAlive: true, keepAliveInitialDelay: 0, // Add memory optimizations flags: [ 'FOUND_ROWS', 'LONG_PASSWORD', 'PROTOCOL_41', 'TRANSACTIONS', 'SECURE_CONNECTION', 'MULTI_RESULTS', 'PS_MULTI_RESULTS', 'PLUGIN_AUTH', 'CONNECT_ATTRS', 'PLUGIN_AUTH_LENENC_CLIENT_DATA', 'SESSION_TRACK', 'MULTI_STATEMENTS' ] }; // Add cancel handler let isCancelled = false; // Add status file handling for progress resumption const STATUS_FILE = path.join(__dirname, '..', 'logs', 'metrics-status.json'); function saveProgress(progress) { try { fs.writeFileSync(STATUS_FILE, JSON.stringify({ ...progress, timestamp: Date.now() })); } catch (err) { console.error('Failed to save progress:', err); } } function clearProgress() { try { if (fs.existsSync(STATUS_FILE)) { fs.unlinkSync(STATUS_FILE); } } catch (err) { console.error('Failed to clear progress:', err); } } function getProgress() { try { if (fs.existsSync(STATUS_FILE)) { const progress = JSON.parse(fs.readFileSync(STATUS_FILE, 'utf8')); // Check if the progress is still valid (less than 1 hour old) if (progress.timestamp && Date.now() - progress.timestamp < 3600000) { return progress; } else { // Clear old progress clearProgress(); } } } catch (err) { console.error('Failed to read progress:', err); clearProgress(); } return null; } function cancelCalculation() { isCancelled = true; clearProgress(); // Format as SSE event const event = { progress: { status: 'cancelled', operation: 'Calculation cancelled', current: 0, total: 0, elapsed: null, remaining: null, rate: 0, timestamp: Date.now() } }; process.stdout.write(JSON.stringify(event) + '\n'); process.exit(0); } // Handle SIGTERM signal for cancellation process.on('SIGTERM', cancelCalculation); // Calculate GMROI and other financial metrics async function calculateFinancialMetrics(connection, startTime, totalProducts) { outputProgress({ status: 'running', 
operation: 'Calculating financial metrics', current: Math.floor(totalProducts * 0.6), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.6), totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.6)), percentage: '60' }); await connection.query(` UPDATE product_metrics pm JOIN ( SELECT p.product_id, p.cost_price * p.stock_quantity as inventory_value, SUM(o.quantity * o.price) as total_revenue, SUM(o.quantity * p.cost_price) as cost_of_goods_sold, SUM(o.quantity * (o.price - p.cost_price)) as gross_profit, MIN(o.date) as first_sale_date, MAX(o.date) as last_sale_date, DATEDIFF(MAX(o.date), MIN(o.date)) + 1 as calculation_period_days FROM products p LEFT JOIN orders o ON p.product_id = o.product_id WHERE o.canceled = false AND DATE(o.date) >= DATE_SUB(CURDATE(), INTERVAL 12 MONTH) GROUP BY p.product_id ) fin ON pm.product_id = fin.product_id SET pm.inventory_value = COALESCE(fin.inventory_value, 0), pm.total_revenue = COALESCE(fin.total_revenue, 0), pm.cost_of_goods_sold = COALESCE(fin.cost_of_goods_sold, 0), pm.gross_profit = COALESCE(fin.gross_profit, 0), pm.gmroi = CASE WHEN COALESCE(fin.inventory_value, 0) > 0 AND fin.calculation_period_days > 0 THEN (COALESCE(fin.gross_profit, 0) * (365.0 / fin.calculation_period_days)) / COALESCE(fin.inventory_value, 0) ELSE 0 END `); // Update time-based aggregates with financial metrics await connection.query(` UPDATE product_time_aggregates pta JOIN ( SELECT p.product_id, YEAR(o.date) as year, MONTH(o.date) as month, p.cost_price * p.stock_quantity as inventory_value, SUM(o.quantity * (o.price - p.cost_price)) as gross_profit, COUNT(DISTINCT DATE(o.date)) as days_in_period FROM products p LEFT JOIN orders o ON p.product_id = o.product_id WHERE o.canceled = false GROUP BY p.product_id, YEAR(o.date), MONTH(o.date) ) fin ON pta.product_id = fin.product_id AND pta.year = fin.year AND pta.month = fin.month SET pta.inventory_value = 
COALESCE(fin.inventory_value, 0), pta.gmroi = CASE WHEN COALESCE(fin.inventory_value, 0) > 0 AND fin.days_in_period > 0 THEN (COALESCE(fin.gross_profit, 0) * (365.0 / fin.days_in_period)) / COALESCE(fin.inventory_value, 0) ELSE 0 END `); } // Calculate vendor metrics async function calculateVendorMetrics(connection, startTime, totalProducts) { outputProgress({ status: 'running', operation: 'Calculating vendor metrics', current: Math.floor(totalProducts * 0.7), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.7), totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.7)), percentage: '70' }); // First, ensure all vendors exist in vendor_details await connection.query(` INSERT IGNORE INTO vendor_details (vendor, status) SELECT DISTINCT vendor, 'active' as status FROM products WHERE vendor IS NOT NULL AND vendor NOT IN (SELECT vendor FROM vendor_details) `); // Calculate vendor performance metrics await connection.query(` INSERT INTO vendor_metrics ( vendor, avg_lead_time_days, on_time_delivery_rate, order_fill_rate, total_orders, total_late_orders, total_purchase_value, avg_order_value, active_products, total_products, total_revenue, avg_margin_percent, status ) WITH vendor_orders AS ( SELECT po.vendor, AVG(DATEDIFF(po.received_date, po.date)) as avg_lead_time_days, COUNT(*) as total_orders, COUNT(CASE WHEN po.received_date > po.expected_date THEN 1 END) as total_late_orders, SUM(po.cost_price * po.ordered) as total_purchase_value, AVG(po.cost_price * po.ordered) as avg_order_value, CASE WHEN COUNT(*) > 0 THEN (COUNT(CASE WHEN po.received = po.ordered THEN 1 END) * 100.0) / COUNT(*) ELSE 0 END as order_fill_rate FROM purchase_orders po WHERE po.status = 'closed' GROUP BY po.vendor ), vendor_products AS ( SELECT p.vendor, COUNT(DISTINCT p.product_id) as total_products, COUNT(DISTINCT CASE WHEN p.visible = true THEN p.product_id END) as active_products, SUM(o.price * 
o.quantity) as total_revenue, CASE WHEN SUM(o.price * o.quantity) > 0 THEN (SUM((o.price - p.cost_price) * o.quantity) * 100.0) / SUM(o.price * o.quantity) ELSE 0 END as avg_margin_percent FROM products p LEFT JOIN orders o ON p.product_id = o.product_id AND o.canceled = false GROUP BY p.vendor ) SELECT vd.vendor, COALESCE(vo.avg_lead_time_days, 0) as avg_lead_time_days, CASE WHEN COALESCE(vo.total_orders, 0) > 0 THEN ((COALESCE(vo.total_orders, 0) - COALESCE(vo.total_late_orders, 0)) * 100.0) / COALESCE(vo.total_orders, 1) ELSE 0 END as on_time_delivery_rate, COALESCE(vo.order_fill_rate, 0) as order_fill_rate, COALESCE(vo.total_orders, 0) as total_orders, COALESCE(vo.total_late_orders, 0) as total_late_orders, COALESCE(vo.total_purchase_value, 0) as total_purchase_value, COALESCE(vo.avg_order_value, 0) as avg_order_value, COALESCE(vp.active_products, 0) as active_products, COALESCE(vp.total_products, 0) as total_products, COALESCE(vp.total_revenue, 0) as total_revenue, COALESCE(vp.avg_margin_percent, 0) as avg_margin_percent, vd.status FROM vendor_details vd LEFT JOIN vendor_orders vo ON vd.vendor = vo.vendor LEFT JOIN vendor_products vp ON vd.vendor = vp.vendor ON DUPLICATE KEY UPDATE avg_lead_time_days = VALUES(avg_lead_time_days), on_time_delivery_rate = VALUES(on_time_delivery_rate), order_fill_rate = VALUES(order_fill_rate), total_orders = VALUES(total_orders), total_late_orders = VALUES(total_late_orders), total_purchase_value = VALUES(total_purchase_value), avg_order_value = VALUES(avg_order_value), active_products = VALUES(active_products), total_products = VALUES(total_products), total_revenue = VALUES(total_revenue), avg_margin_percent = VALUES(avg_margin_percent), status = VALUES(status), last_calculated_at = CURRENT_TIMESTAMP `); // Calculate vendor time-based metrics await connection.query(` INSERT INTO vendor_time_metrics ( vendor, year, month, total_orders, late_orders, avg_lead_time_days, total_purchase_value, total_revenue, avg_margin_percent ) 
WITH vendor_time_data AS ( SELECT vd.vendor, YEAR(po.date) as year, MONTH(po.date) as month, COUNT(DISTINCT po.po_id) as total_orders, COUNT(DISTINCT CASE WHEN po.received_date > po.expected_date THEN po.po_id END) as late_orders, AVG(DATEDIFF(po.received_date, po.date)) as avg_lead_time_days, SUM(po.cost_price * po.ordered) as total_purchase_value, SUM(o.price * o.quantity) as total_revenue, CASE WHEN SUM(o.price * o.quantity) > 0 THEN (SUM((o.price - p.cost_price) * o.quantity) * 100.0) / SUM(o.price * o.quantity) ELSE 0 END as avg_margin_percent FROM vendor_details vd LEFT JOIN products p ON vd.vendor = p.vendor LEFT JOIN purchase_orders po ON p.product_id = po.product_id LEFT JOIN orders o ON p.product_id = o.product_id AND o.canceled = false WHERE po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH) GROUP BY vd.vendor, YEAR(po.date), MONTH(po.date) ) SELECT vendor, year, month, COALESCE(total_orders, 0) as total_orders, COALESCE(late_orders, 0) as late_orders, COALESCE(avg_lead_time_days, 0) as avg_lead_time_days, COALESCE(total_purchase_value, 0) as total_purchase_value, COALESCE(total_revenue, 0) as total_revenue, COALESCE(avg_margin_percent, 0) as avg_margin_percent FROM vendor_time_data ON DUPLICATE KEY UPDATE total_orders = VALUES(total_orders), late_orders = VALUES(late_orders), avg_lead_time_days = VALUES(avg_lead_time_days), total_purchase_value = VALUES(total_purchase_value), total_revenue = VALUES(total_revenue), avg_margin_percent = VALUES(avg_margin_percent) `); } async function calculateCategoryMetrics(connection, startTime, totalProducts) { outputProgress({ status: 'running', operation: 'Calculating category metrics', current: Math.floor(totalProducts * 0.85), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.85), totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.85)), percentage: '85' }); // Calculate category performance metrics await 
connection.query(` INSERT INTO category_metrics ( category_id, product_count, active_products, total_value, avg_margin, turnover_rate, growth_rate, status ) WITH category_sales AS ( SELECT c.id as category_id, COUNT(DISTINCT p.product_id) as product_count, COUNT(DISTINCT CASE WHEN p.visible = true THEN p.product_id END) as active_products, SUM(p.stock_quantity * p.cost_price) as total_value, CASE WHEN SUM(o.price * o.quantity) > 0 THEN (SUM((o.price - p.cost_price) * o.quantity) * 100.0) / SUM(o.price * o.quantity) ELSE 0 END as avg_margin, CASE WHEN AVG(GREATEST(p.stock_quantity, 0)) >= 0.01 THEN LEAST( SUM(CASE WHEN o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 1 YEAR) THEN COALESCE(o.quantity, 0) ELSE 0 END) / GREATEST( AVG(GREATEST(p.stock_quantity, 0)), 1.0 ), 999.99 ) ELSE 0 END as turnover_rate, -- Current period (last 3 months) SUM(CASE WHEN o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 3 MONTH) THEN COALESCE(o.quantity * o.price, 0) ELSE 0 END) as current_period_sales, -- Previous year same period SUM(CASE WHEN o.date BETWEEN DATE_SUB(CURRENT_DATE, INTERVAL 15 MONTH) AND DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH) THEN COALESCE(o.quantity * o.price, 0) ELSE 0 END) as previous_year_period_sales, c.status FROM categories c LEFT JOIN product_categories pc ON c.id = pc.category_id LEFT JOIN products p ON pc.product_id = p.product_id LEFT JOIN orders o ON p.product_id = o.product_id AND o.canceled = false GROUP BY c.id, c.status ) SELECT category_id, product_count, active_products, total_value, COALESCE(avg_margin, 0) as avg_margin, COALESCE(turnover_rate, 0) as turnover_rate, -- Enhanced YoY growth rate calculation CASE WHEN previous_year_period_sales = 0 AND current_period_sales > 0 THEN 100.0 WHEN previous_year_period_sales = 0 THEN 0.0 ELSE LEAST( GREATEST( ((current_period_sales - previous_year_period_sales) / NULLIF(previous_year_period_sales, 0)) * 100.0, -100.0 ), 999.99 ) END as growth_rate, status FROM category_sales ON DUPLICATE KEY UPDATE product_count = 
VALUES(product_count), active_products = VALUES(active_products), total_value = VALUES(total_value), avg_margin = VALUES(avg_margin), turnover_rate = VALUES(turnover_rate), growth_rate = VALUES(growth_rate), status = VALUES(status), last_calculated_at = CURRENT_TIMESTAMP `); // Calculate category time-based metrics await connection.query(` INSERT INTO category_time_metrics ( category_id, year, month, product_count, active_products, total_value, total_revenue, avg_margin, turnover_rate ) SELECT c.id as category_id, YEAR(o.date) as year, MONTH(o.date) as month, COUNT(DISTINCT p.product_id) as product_count, COUNT(DISTINCT CASE WHEN p.visible = true THEN p.product_id END) as active_products, SUM(p.stock_quantity * p.cost_price) as total_value, SUM(o.price * o.quantity) as total_revenue, CASE WHEN SUM(o.price * o.quantity) > 0 THEN (SUM((o.price - p.cost_price) * o.quantity) * 100.0) / SUM(o.price * o.quantity) ELSE 0 END as avg_margin, CASE WHEN AVG(p.stock_quantity) > 0 THEN SUM(o.quantity) / AVG(p.stock_quantity) ELSE 0 END as turnover_rate FROM categories c LEFT JOIN product_categories pc ON c.id = pc.category_id LEFT JOIN products p ON pc.product_id = p.product_id LEFT JOIN orders o ON p.product_id = o.product_id AND o.canceled = false WHERE o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH) GROUP BY c.id, YEAR(o.date), MONTH(o.date) ON DUPLICATE KEY UPDATE product_count = VALUES(product_count), active_products = VALUES(active_products), total_value = VALUES(total_value), total_revenue = VALUES(total_revenue), avg_margin = VALUES(avg_margin), turnover_rate = VALUES(turnover_rate) `); } // Calculate turnover rate metrics async function calculateTurnoverMetrics(connection, startTime, totalProducts) { outputProgress({ status: 'running', operation: 'Calculating turnover metrics', current: Math.floor(totalProducts * 0.75), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.75), 
totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.75)), percentage: '75' }); await connection.query(` WITH product_turnover AS ( SELECT p.product_id, p.vendor, pc.category_id, COALESCE( (SELECT calculation_period_days FROM turnover_config tc WHERE tc.category_id = pc.category_id AND tc.vendor = p.vendor LIMIT 1), COALESCE( (SELECT calculation_period_days FROM turnover_config tc WHERE tc.category_id = pc.category_id AND tc.vendor IS NULL LIMIT 1), COALESCE( (SELECT calculation_period_days FROM turnover_config tc WHERE tc.category_id IS NULL AND tc.vendor = p.vendor LIMIT 1), (SELECT calculation_period_days FROM turnover_config WHERE category_id IS NULL AND vendor IS NULL LIMIT 1) ) ) ) as calculation_period_days, -- Calculate average daily sales over the calculation period SUM(o.quantity) / GREATEST(DATEDIFF(CURDATE(), DATE_SUB(CURDATE(), INTERVAL 30 DAY)), 1) as avg_daily_sales, -- Calculate average stock level, excluding zero stock periods AVG(CASE WHEN p.stock_quantity > 0 THEN p.stock_quantity ELSE NULL END) as avg_stock_level FROM products p LEFT JOIN product_categories pc ON p.product_id = pc.product_id LEFT JOIN orders o ON p.product_id = o.product_id WHERE o.date >= DATE_SUB(CURDATE(), INTERVAL 30 DAY) AND o.canceled = 0 GROUP BY p.product_id, p.vendor, pc.category_id ) UPDATE product_metrics pm JOIN product_turnover pt ON pm.product_id = pt.product_id SET pm.turnover_rate = CASE WHEN pt.avg_stock_level > 0 AND pt.avg_daily_sales > 0 THEN -- Calculate annualized turnover rate LEAST( (pt.avg_daily_sales * 365) / GREATEST(pt.avg_stock_level, 1), 999999.999 ) ELSE 0 END, pm.last_calculated_at = NOW() `); } // Enhance lead time calculations async function calculateLeadTimeMetrics(connection, startTime, totalProducts) { outputProgress({ status: 'running', operation: 'Calculating lead time metrics', current: Math.floor(totalProducts * 0.8), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: 
estimateRemaining(startTime, Math.floor(totalProducts * 0.8), totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.8)), percentage: '80' }); await connection.query(` WITH lead_time_stats AS ( SELECT p.product_id, p.vendor, pc.category_id, AVG(DATEDIFF(po.received_date, po.date)) as current_lead_time, ( SELECT target_days FROM lead_time_thresholds lt WHERE lt.category_id = pc.category_id AND lt.vendor = p.vendor LIMIT 1 ) as target_lead_time_cat_vendor, ( SELECT target_days FROM lead_time_thresholds lt WHERE lt.category_id = pc.category_id AND lt.vendor IS NULL LIMIT 1 ) as target_lead_time_cat, ( SELECT target_days FROM lead_time_thresholds lt WHERE lt.category_id IS NULL AND lt.vendor = p.vendor LIMIT 1 ) as target_lead_time_vendor, ( SELECT target_days FROM lead_time_thresholds WHERE category_id IS NULL AND vendor IS NULL LIMIT 1 ) as target_lead_time_default, ( SELECT warning_days FROM lead_time_thresholds lt WHERE lt.category_id = pc.category_id AND lt.vendor = p.vendor LIMIT 1 ) as warning_lead_time_cat_vendor, ( SELECT warning_days FROM lead_time_thresholds lt WHERE lt.category_id = pc.category_id AND lt.vendor IS NULL LIMIT 1 ) as warning_lead_time_cat, ( SELECT warning_days FROM lead_time_thresholds lt WHERE lt.category_id IS NULL AND lt.vendor = p.vendor LIMIT 1 ) as warning_lead_time_vendor, ( SELECT warning_days FROM lead_time_thresholds WHERE category_id IS NULL AND vendor IS NULL LIMIT 1 ) as warning_lead_time_default FROM products p LEFT JOIN product_categories pc ON p.product_id = pc.product_id LEFT JOIN purchase_orders po ON p.product_id = po.product_id WHERE po.status = 'completed' AND po.received_date IS NOT NULL AND po.date >= DATE_SUB(CURDATE(), INTERVAL 90 DAY) GROUP BY p.product_id, p.vendor, pc.category_id ), lead_time_final AS ( SELECT product_id, current_lead_time, COALESCE( target_lead_time_cat_vendor, target_lead_time_cat, target_lead_time_vendor, target_lead_time_default, 14 ) as target_lead_time, COALESCE( 
warning_lead_time_cat_vendor, warning_lead_time_cat, warning_lead_time_vendor, warning_lead_time_default, 21 ) as warning_lead_time FROM lead_time_stats ) UPDATE product_metrics pm JOIN lead_time_final lt ON pm.product_id = lt.product_id SET pm.current_lead_time = lt.current_lead_time, pm.target_lead_time = lt.target_lead_time, pm.lead_time_status = CASE WHEN lt.current_lead_time <= lt.target_lead_time THEN 'On Target' WHEN lt.current_lead_time <= lt.warning_lead_time THEN 'Warning' ELSE 'Critical' END, pm.last_calculated_at = NOW() `); } // Add new function for category sales metrics async function calculateCategorySalesMetrics(connection, startTime, totalProducts) { outputProgress({ status: 'running', operation: 'Calculating category sales metrics', current: Math.floor(totalProducts * 0.9), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.9), totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.9)), percentage: '90' }); await connection.query(` INSERT INTO category_sales_metrics ( category_id, brand, period_start, period_end, avg_daily_sales, total_sold, num_products, avg_price, last_calculated_at ) WITH date_ranges AS ( SELECT DATE_SUB(CURDATE(), INTERVAL 30 DAY) as period_start, CURDATE() as period_end UNION ALL SELECT DATE_SUB(CURDATE(), INTERVAL 90 DAY), CURDATE() UNION ALL SELECT DATE_SUB(CURDATE(), INTERVAL 180 DAY), CURDATE() UNION ALL SELECT DATE_SUB(CURDATE(), INTERVAL 365 DAY), CURDATE() ), category_metrics AS ( SELECT c.id as category_id, p.brand, dr.period_start, dr.period_end, COUNT(DISTINCT p.product_id) as num_products, COALESCE(SUM(o.quantity), 0) / DATEDIFF(dr.period_end, dr.period_start) as avg_daily_sales, COALESCE(SUM(o.quantity), 0) as total_sold, COALESCE(AVG(o.price), 0) as avg_price FROM categories c JOIN product_categories pc ON c.id = pc.category_id JOIN products p ON pc.product_id = p.product_id CROSS JOIN date_ranges dr LEFT JOIN 
orders o ON p.product_id = o.product_id AND o.date BETWEEN dr.period_start AND dr.period_end AND o.canceled = false GROUP BY c.id, p.brand, dr.period_start, dr.period_end ) SELECT category_id, brand, period_start, period_end, avg_daily_sales, total_sold, num_products, avg_price, NOW() as last_calculated_at FROM category_metrics ON DUPLICATE KEY UPDATE avg_daily_sales = VALUES(avg_daily_sales), total_sold = VALUES(total_sold), num_products = VALUES(num_products), avg_price = VALUES(avg_price), last_calculated_at = NOW() `); } // Add new function for safety stock calculation async function calculateSafetyStock(connection, startTime, totalProducts) { outputProgress({ status: 'running', operation: 'Calculating safety stock levels', current: Math.floor(totalProducts * 0.7), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.7), totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.7)), percentage: '70' }); await connection.query(` WITH daily_sales AS ( SELECT o.product_id, DATE(o.date) as sale_date, SUM(o.quantity) as daily_quantity FROM orders o WHERE o.canceled = false AND o.date >= DATE_SUB(CURDATE(), INTERVAL 90 DAY) GROUP BY o.product_id, DATE(o.date) ), sales_stats AS ( SELECT ds.product_id, AVG(ds.daily_quantity) as avg_daily_sales, STDDEV_SAMP(ds.daily_quantity) as daily_sales_stddev, COUNT(DISTINCT ds.sale_date) as days_with_sales FROM daily_sales ds GROUP BY ds.product_id ), product_config AS ( SELECT p.product_id, p.vendor, pc.category_id, COALESCE( (SELECT service_level FROM safety_stock_config sc WHERE sc.category_id = pc.category_id AND sc.vendor = p.vendor LIMIT 1), COALESCE( (SELECT service_level FROM safety_stock_config sc WHERE sc.category_id = pc.category_id AND sc.vendor IS NULL LIMIT 1), COALESCE( (SELECT service_level FROM safety_stock_config sc WHERE sc.category_id IS NULL AND sc.vendor = p.vendor LIMIT 1), (SELECT service_level FROM 
safety_stock_config WHERE category_id IS NULL AND vendor IS NULL LIMIT 1) ) ) ) as service_level, COALESCE( (SELECT coverage_days FROM safety_stock_config sc WHERE sc.category_id = pc.category_id AND sc.vendor = p.vendor LIMIT 1), COALESCE( (SELECT coverage_days FROM safety_stock_config sc WHERE sc.category_id = pc.category_id AND sc.vendor IS NULL LIMIT 1), COALESCE( (SELECT coverage_days FROM safety_stock_config sc WHERE sc.category_id IS NULL AND sc.vendor = p.vendor LIMIT 1), (SELECT coverage_days FROM safety_stock_config WHERE category_id IS NULL AND vendor IS NULL LIMIT 1) ) ) ) as coverage_days FROM products p LEFT JOIN product_categories pc ON p.product_id = pc.product_id ) UPDATE product_metrics pm JOIN sales_stats ss ON pm.product_id = ss.product_id JOIN product_config pc ON pm.product_id = pc.product_id SET pm.safety_stock = GREATEST(1, CEIL( ss.avg_daily_sales * pc.coverage_days * (1 + ( COALESCE(ss.daily_sales_stddev, 0) * CASE WHEN pc.service_level >= 99.9 THEN 3.1 WHEN pc.service_level >= 99 THEN 2.33 WHEN pc.service_level >= 95 THEN 1.65 WHEN pc.service_level >= 90 THEN 1.29 ELSE 1 END )) ) ), pm.last_calculated_at = NOW() WHERE ss.days_with_sales > 0 `); } // Add new function for brand metrics calculation async function calculateBrandMetrics(connection, startTime, totalProducts) { outputProgress({ status: 'running', operation: 'Calculating brand metrics', current: Math.floor(totalProducts * 0.95), total: totalProducts, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.95), totalProducts), rate: calculateRate(startTime, Math.floor(totalProducts * 0.95)), percentage: '95' }); // Calculate brand metrics await connection.query(` INSERT INTO brand_metrics ( brand, product_count, active_products, total_stock_units, total_stock_cost, total_stock_retail, total_revenue, avg_margin, growth_rate ) WITH brand_data AS ( SELECT p.brand, COUNT(DISTINCT p.product_id) as product_count, COUNT(DISTINCT CASE 
WHEN p.visible = true THEN p.product_id END) as active_products, SUM(p.stock_quantity) as total_stock_units, SUM(p.stock_quantity * p.cost_price) as total_stock_cost, SUM(p.stock_quantity * p.price) as total_stock_retail, SUM(o.price * o.quantity) as total_revenue, CASE WHEN SUM(o.price * o.quantity) > 0 THEN (SUM((o.price - p.cost_price) * o.quantity) * 100.0) / SUM(o.price * o.quantity) ELSE 0 END as avg_margin, -- Current period (last 3 months) SUM(CASE WHEN o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 3 MONTH) THEN COALESCE(o.quantity * o.price, 0) ELSE 0 END) as current_period_sales, -- Previous year same period SUM(CASE WHEN o.date BETWEEN DATE_SUB(CURRENT_DATE, INTERVAL 15 MONTH) AND DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH) THEN COALESCE(o.quantity * o.price, 0) ELSE 0 END) as previous_year_period_sales FROM products p LEFT JOIN orders o ON p.product_id = o.product_id AND o.canceled = false WHERE p.brand IS NOT NULL GROUP BY p.brand ) SELECT brand, product_count, active_products, total_stock_units, total_stock_cost, total_stock_retail, total_revenue, avg_margin, CASE WHEN previous_year_period_sales = 0 AND current_period_sales > 0 THEN 100.0 WHEN previous_year_period_sales = 0 THEN 0.0 ELSE LEAST( GREATEST( ((current_period_sales - previous_year_period_sales) / NULLIF(previous_year_period_sales, 0)) * 100.0, -100.0 ), 999.99 ) END as growth_rate FROM brand_data ON DUPLICATE KEY UPDATE product_count = VALUES(product_count), active_products = VALUES(active_products), total_stock_units = VALUES(total_stock_units), total_stock_cost = VALUES(total_stock_cost), total_stock_retail = VALUES(total_stock_retail), total_revenue = VALUES(total_revenue), avg_margin = VALUES(avg_margin), growth_rate = VALUES(growth_rate), last_calculated_at = CURRENT_TIMESTAMP `); // Calculate brand time-based metrics await connection.query(` INSERT INTO brand_time_metrics ( brand, year, month, product_count, active_products, total_stock_units, total_stock_cost, total_stock_retail, 
/**
 * Refresh the 31-day (today through today + 30) sales forecasts.
 *
 * Emits a single progress event pinned at 98% of the run, then issues two
 * upserts on the supplied connection: one into `sales_forecasts` (per product)
 * and one into `category_forecasts` (per category). Both statements derive
 * their inputs from non-cancelled orders of the last 90 days.
 *
 * @param {object} connection - Connection exposing a promise-returning `query(sql)`.
 * @param {number} startTime - Epoch milliseconds at which the overall run started.
 * @param {number} totalProducts - Total product count used to scale progress figures.
 * @returns {Promise<void>} Resolves once both upserts have completed.
 */
async function calculateSalesForecasts(connection, startTime, totalProducts) {
  // This stage always reports itself at 98% of the product total.
  const progressCount = Math.floor(totalProducts * 0.98);

  outputProgress({
    status: 'running',
    operation: 'Calculating sales forecasts',
    current: progressCount,
    total: totalProducts,
    elapsed: formatElapsedTime(startTime),
    remaining: estimateRemaining(startTime, progressCount, totalProducts),
    rate: calculateRate(startTime, progressCount),
    percentage: '98'
  });

  // Product-level forecast: per-product daily averages, day-of-week revenue
  // averages, a monthly seasonality factor and a small random revenue jitter.
  const productForecastSql = `
    INSERT INTO sales_forecasts (
      product_id,
      forecast_date,
      forecast_units,
      forecast_revenue,
      confidence_level,
      last_calculated_at
    )
    WITH daily_sales AS (
      SELECT
        o.product_id,
        DATE(o.date) as sale_date,
        SUM(o.quantity) as daily_quantity,
        SUM(o.price * o.quantity) as daily_revenue
      FROM orders o
      WHERE o.canceled = false
        AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
      GROUP BY o.product_id, DATE(o.date)
    ),
    forecast_dates AS (
      SELECT DATE_ADD(CURRENT_DATE, INTERVAL n DAY) as forecast_date
      FROM (
        SELECT 0 as n UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION
        SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9 UNION
        SELECT 10 UNION SELECT 11 UNION SELECT 12 UNION SELECT 13 UNION SELECT 14 UNION
        SELECT 15 UNION SELECT 16 UNION SELECT 17 UNION SELECT 18 UNION SELECT 19 UNION
        SELECT 20 UNION SELECT 21 UNION SELECT 22 UNION SELECT 23 UNION SELECT 24 UNION
        SELECT 25 UNION SELECT 26 UNION SELECT 27 UNION SELECT 28 UNION SELECT 29 UNION
        SELECT 30
      ) numbers
    ),
    product_stats AS (
      SELECT
        ds.product_id,
        AVG(ds.daily_quantity) as avg_daily_quantity,
        STDDEV_SAMP(ds.daily_quantity) as std_daily_quantity,
        AVG(ds.daily_revenue) as avg_daily_revenue,
        STDDEV_SAMP(ds.daily_revenue) as std_daily_revenue,
        COUNT(*) as data_points,
        -- Calculate day-of-week averages
        AVG(CASE WHEN DAYOFWEEK(ds.sale_date) = 1 THEN ds.daily_revenue END) as sunday_avg,
        AVG(CASE WHEN DAYOFWEEK(ds.sale_date) = 2 THEN ds.daily_revenue END) as monday_avg,
        AVG(CASE WHEN DAYOFWEEK(ds.sale_date) = 3 THEN ds.daily_revenue END) as tuesday_avg,
        AVG(CASE WHEN DAYOFWEEK(ds.sale_date) = 4 THEN ds.daily_revenue END) as wednesday_avg,
        AVG(CASE WHEN DAYOFWEEK(ds.sale_date) = 5 THEN ds.daily_revenue END) as thursday_avg,
        AVG(CASE WHEN DAYOFWEEK(ds.sale_date) = 6 THEN ds.daily_revenue END) as friday_avg,
        AVG(CASE WHEN DAYOFWEEK(ds.sale_date) = 7 THEN ds.daily_revenue END) as saturday_avg
      FROM daily_sales ds
      GROUP BY ds.product_id
    )
    SELECT
      ps.product_id,
      fd.forecast_date,
      GREATEST(0,
        ps.avg_daily_quantity *
        (1 + COALESCE(
          (SELECT seasonality_factor
           FROM sales_seasonality
           WHERE MONTH(fd.forecast_date) = month
           LIMIT 1),
          0
        ))
      ) as forecast_units,
      GREATEST(0,
        CASE DAYOFWEEK(fd.forecast_date)
          WHEN 1 THEN COALESCE(ps.sunday_avg, ps.avg_daily_revenue)
          WHEN 2 THEN COALESCE(ps.monday_avg, ps.avg_daily_revenue)
          WHEN 3 THEN COALESCE(ps.tuesday_avg, ps.avg_daily_revenue)
          WHEN 4 THEN COALESCE(ps.wednesday_avg, ps.avg_daily_revenue)
          WHEN 5 THEN COALESCE(ps.thursday_avg, ps.avg_daily_revenue)
          WHEN 6 THEN COALESCE(ps.friday_avg, ps.avg_daily_revenue)
          WHEN 7 THEN COALESCE(ps.saturday_avg, ps.avg_daily_revenue)
        END *
        (1 + COALESCE(
          (SELECT seasonality_factor
           FROM sales_seasonality
           WHERE MONTH(fd.forecast_date) = month
           LIMIT 1),
          0
        )) *
        -- Add some randomness within a small range (±5%)
        (0.95 + (RAND() * 0.1))
      ) as forecast_revenue,
      CASE
        WHEN ps.data_points >= 60 THEN 90
        WHEN ps.data_points >= 30 THEN 80
        WHEN ps.data_points >= 14 THEN 70
        ELSE 60
      END as confidence_level,
      NOW() as last_calculated_at
    FROM product_stats ps
    CROSS JOIN forecast_dates fd
    WHERE ps.avg_daily_quantity > 0
    ON DUPLICATE KEY UPDATE
      forecast_units = VALUES(forecast_units),
      forecast_revenue = VALUES(forecast_revenue),
      confidence_level = VALUES(confidence_level),
      last_calculated_at = NOW()
  `;

  // Category-level forecast: same model, with orders rolled up through
  // product_categories before the statistics are taken.
  const categoryForecastSql = `
    INSERT INTO category_forecasts (
      category_id,
      forecast_date,
      forecast_units,
      forecast_revenue,
      confidence_level,
      last_calculated_at
    )
    WITH category_daily_sales AS (
      SELECT
        pc.category_id,
        DATE(o.date) as sale_date,
        SUM(o.quantity) as daily_quantity,
        SUM(o.price * o.quantity) as daily_revenue
      FROM orders o
      JOIN product_categories pc ON o.product_id = pc.product_id
      WHERE o.canceled = false
        AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
      GROUP BY pc.category_id, DATE(o.date)
    ),
    forecast_dates AS (
      SELECT DATE_ADD(CURRENT_DATE, INTERVAL n DAY) as forecast_date
      FROM (
        SELECT 0 as n UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION
        SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9 UNION
        SELECT 10 UNION SELECT 11 UNION SELECT 12 UNION SELECT 13 UNION SELECT 14 UNION
        SELECT 15 UNION SELECT 16 UNION SELECT 17 UNION SELECT 18 UNION SELECT 19 UNION
        SELECT 20 UNION SELECT 21 UNION SELECT 22 UNION SELECT 23 UNION SELECT 24 UNION
        SELECT 25 UNION SELECT 26 UNION SELECT 27 UNION SELECT 28 UNION SELECT 29 UNION
        SELECT 30
      ) numbers
    ),
    category_stats AS (
      SELECT
        cds.category_id,
        AVG(cds.daily_quantity) as avg_daily_quantity,
        STDDEV_SAMP(cds.daily_quantity) as std_daily_quantity,
        AVG(cds.daily_revenue) as avg_daily_revenue,
        STDDEV_SAMP(cds.daily_revenue) as std_daily_revenue,
        COUNT(*) as data_points,
        -- Calculate day-of-week averages
        AVG(CASE WHEN DAYOFWEEK(cds.sale_date) = 1 THEN cds.daily_revenue END) as sunday_avg,
        AVG(CASE WHEN DAYOFWEEK(cds.sale_date) = 2 THEN cds.daily_revenue END) as monday_avg,
        AVG(CASE WHEN DAYOFWEEK(cds.sale_date) = 3 THEN cds.daily_revenue END) as tuesday_avg,
        AVG(CASE WHEN DAYOFWEEK(cds.sale_date) = 4 THEN cds.daily_revenue END) as wednesday_avg,
        AVG(CASE WHEN DAYOFWEEK(cds.sale_date) = 5 THEN cds.daily_revenue END) as thursday_avg,
        AVG(CASE WHEN DAYOFWEEK(cds.sale_date) = 6 THEN cds.daily_revenue END) as friday_avg,
        AVG(CASE WHEN DAYOFWEEK(cds.sale_date) = 7 THEN cds.daily_revenue END) as saturday_avg
      FROM category_daily_sales cds
      GROUP BY cds.category_id
    )
    SELECT
      cs.category_id,
      fd.forecast_date,
      GREATEST(0,
        cs.avg_daily_quantity *
        (1 + COALESCE(
          (SELECT seasonality_factor
           FROM sales_seasonality
           WHERE MONTH(fd.forecast_date) = month
           LIMIT 1),
          0
        ))
      ) as forecast_units,
      GREATEST(0,
        CASE DAYOFWEEK(fd.forecast_date)
          WHEN 1 THEN COALESCE(cs.sunday_avg, cs.avg_daily_revenue)
          WHEN 2 THEN COALESCE(cs.monday_avg, cs.avg_daily_revenue)
          WHEN 3 THEN COALESCE(cs.tuesday_avg, cs.avg_daily_revenue)
          WHEN 4 THEN COALESCE(cs.wednesday_avg, cs.avg_daily_revenue)
          WHEN 5 THEN COALESCE(cs.thursday_avg, cs.avg_daily_revenue)
          WHEN 6 THEN COALESCE(cs.friday_avg, cs.avg_daily_revenue)
          WHEN 7 THEN COALESCE(cs.saturday_avg, cs.avg_daily_revenue)
        END *
        (1 + COALESCE(
          (SELECT seasonality_factor
           FROM sales_seasonality
           WHERE MONTH(fd.forecast_date) = month
           LIMIT 1),
          0
        )) *
        -- Add some randomness within a small range (±5%)
        (0.95 + (RAND() * 0.1))
      ) as forecast_revenue,
      CASE
        WHEN cs.data_points >= 60 THEN 90
        WHEN cs.data_points >= 30 THEN 80
        WHEN cs.data_points >= 14 THEN 70
        ELSE 60
      END as confidence_level,
      NOW() as last_calculated_at
    FROM category_stats cs
    CROSS JOIN forecast_dates fd
    WHERE cs.avg_daily_quantity > 0
    ON DUPLICATE KEY UPDATE
      forecast_units = VALUES(forecast_units),
      forecast_revenue = VALUES(forecast_revenue),
      confidence_level = VALUES(confidence_level),
      last_calculated_at = NOW()
  `;

  await connection.query(productForecastSql);
  await connection.query(categoryForecastSql);
}
/**
 * Coerce a value returned by the database driver into a finite number.
 *
 * Aggregates such as AVG()/SUM() may arrive as DECIMAL strings (for example
 * "12.0000") depending on the driver configuration; adding such a string to a
 * number with `+` would concatenate instead of summing.
 *
 * @param {*} value - Raw column value (number, numeric string, null or undefined).
 * @returns {number} The numeric value, or 0 when it is missing or not finite.
 */
function asNumber(value) {
  const parsed = Number(value);
  return Number.isFinite(parsed) ? parsed : 0;
}

/**
 * Build the progress payload for a pipeline stage pinned at a fixed percentage.
 *
 * @param {string} operation - Human-readable stage name.
 * @param {number} percent - Whole-number percentage of the run this stage represents.
 * @param {number} startTime - Epoch milliseconds at which the run started.
 * @param {number} totalProducts - Total number of products in the run.
 * @returns {object} Payload suitable for `outputProgress`.
 */
function stageProgress(operation, percent, startTime, totalProducts) {
  const current = Math.floor(totalProducts * (percent / 100));
  return {
    status: 'running',
    operation,
    current,
    total: totalProducts,
    elapsed: formatElapsedTime(startTime),
    remaining: estimateRemaining(startTime, current, totalProducts),
    rate: calculateRate(startTime, current),
    percentage: String(percent)
  };
}

/**
 * Classify a product's stock position.
 *
 * @param {object} args
 * @param {object} args.stock - Row from the stock-info query (may be `{}`).
 * @param {object} args.config - Resolved thresholds (critical/reorder/overstock days).
 * @param {number} args.dailyAvg - Units per day over the daily window.
 * @param {number} args.weeklyAvg - Units per day over the weekly window.
 * @param {number} args.monthlyAvg - Units per day over the monthly window.
 * @returns {string} One of 'Out of Stock', 'Critical', 'New', 'Overstocked',
 *   'At Risk', 'Reorder' or 'Healthy'.
 */
function classifyStockStatus({ stock, config, dailyAvg, weeklyAvg, monthlyAvg }) {
  const daysSinceFirstStock = stock.days_since_first_stock || 0;
  const totalQuantitySold = asNumber(stock.total_quantity_sold);
  const hadRecentStockout = Boolean(stock.had_recent_stockout);
  const onHand = stock.stock_quantity || 0;

  // The stock query reports 0 both for "sold today" and for "never sold"
  // (it coalesces a missing last sale to CURDATE()). Only the never-sold case
  // should be treated as "very long ago"; a sale today is genuinely recent.
  const rawDaysSinceLastSale = stock.days_since_last_sale;
  const daysSinceLastSale = rawDaysSinceLastSale === 0 && totalQuantitySold > 0
    ? 0
    : (rawDaysSinceLastSale || 9999);

  // If no stock, return immediately
  if (onHand === 0) {
    return hadRecentStockout ? 'Critical' : 'Out of Stock';
  }

  // 1. Check if truly "New" (<=30 days and no sales)
  if (daysSinceFirstStock <= 30 && totalQuantitySold === 0) {
    return 'New';
  }

  // 2. Handle zero or very low sales velocity cases
  //    (less than 1 sale per 10 days and less than 0.5 per week)
  if (dailyAvg === 0 || (dailyAvg < 0.1 && weeklyAvg < 0.5)) {
    if (daysSinceFirstStock > config.overstock_days) {
      return 'Overstocked';
    }
    if (daysSinceFirstStock > 30) {
      return 'At Risk';
    }
  }

  // 3. Calculate days of supply and check velocity trends
  const daysOfSupply = dailyAvg > 0 ? onHand / dailyAvg : 999;
  // Percent change from monthly to daily avg
  const velocityTrend = dailyAvg > 0 ? (dailyAvg / (monthlyAvg || dailyAvg) - 1) * 100 : 0;

  // Critical stock level
  if (daysOfSupply <= config.critical_days) {
    return 'Critical';
  }

  // Reorder cases
  if (
    daysOfSupply <= config.reorder_days ||
    (hadRecentStockout && daysOfSupply <= config.reorder_days * 1.5)
  ) {
    return 'Reorder';
  }

  // Overstock must be tested before the "approaching overstock" At Risk rule
  // (80% of the same threshold), otherwise it can never be reached. It is
  // limited to measured velocity so the 999 "unknown supply" sentinel keeps
  // falling through to At Risk as before.
  if (dailyAvg > 0 && daysOfSupply >= config.overstock_days) {
    return 'Overstocked';
  }

  // At Risk cases (multiple scenarios)
  if (
    // Approaching overstock threshold
    (daysOfSupply >= config.overstock_days * 0.8) ||
    // Significant sales decline
    (velocityTrend <= -50 && daysOfSupply > config.reorder_days * 2) ||
    // No recent sales
    (daysSinceLastSale > 45 && onHand > 0) ||
    // Very low velocity with significant stock
    (dailyAvg > 0 && dailyAvg < 0.2 && onHand > dailyAvg * config.overstock_days * 0.5)
  ) {
    return 'At Risk';
  }

  // If none of the above conditions are met
  return 'Healthy';
}

/**
 * Compute the suggested reorder quantity and the overstocked amount.
 *
 * @param {object} args
 * @param {object} args.stock - Row from the stock-info query (may be `{}`).
 * @param {object} args.config - Resolved thresholds and safety-stock settings.
 * @param {string} args.stockStatus - Result of `classifyStockStatus`.
 * @param {number} args.dailyAvg - Units per day over the daily window.
 * @param {number} args.avgLeadTimeDays - Average lead time in days, 0 when unknown.
 * @returns {{reorderQty: number, overstockedAmt: number}}
 */
function computeReplenishment({ stock, config, stockStatus, dailyAvg, avgLeadTimeDays }) {
  // The configured coverage days are used both as extra coverage days and as
  // the flat safety buffer, exactly as the previous inline implementation did.
  const safetyDays = asNumber(config.safety_stock_days) || 14;
  const onHand = stock.stock_quantity || 0;
  let reorderQty = 0;
  let overstockedAmt = 0;

  // Only calculate reorder quantity for replenishable products
  if (stock.replenishable && (stockStatus === 'Critical' || stockStatus === 'Reorder')) {
    const leadTime = avgLeadTimeDays || 14; // Default to 14 days if no lead time data
    const moq = asNumber(stock.moq) || 1;
    const desiredStock = (dailyAvg * (leadTime + safetyDays)) + safetyDays;
    const rawReorder = Math.max(0, desiredStock - onHand);
    // Round up to nearest MOQ
    reorderQty = Math.ceil(rawReorder / moq) * moq;
  }

  // Calculate overstocked amount for overstocked products
  if (stockStatus === 'Overstocked') {
    const maxDesiredStock = (dailyAvg * config.overstock_days) + safetyDays;
    overstockedAmt = Math.max(0, onHand - maxDesiredStock);
  }

  return { reorderQty, overstockedAmt };
}

/**
 * Turn the per-product query results into one `product_metrics` row.
 *
 * The returned array is ordered to match the column list used by
 * `upsertProductMetrics`.
 *
 * @param {object} product - Row with at least `product_id`.
 * @param {object} config - Resolved configuration for the product.
 * @param {object} metrics - Sales summary row (may be `{}`).
 * @param {object} purchases - Purchase summary row (may be `{}`).
 * @param {object} stock - Stock-info row (may be `{}`).
 * @returns {Array} Values for a single bulk-insert row.
 */
function buildProductMetricsRow(product, config, metrics, purchases, stock) {
  const daily_sales_avg = asNumber(metrics.rolling_daily_avg);
  const weekly_sales_avg = asNumber(metrics.rolling_weekly_avg);
  // Use the rolling average over the configured monthly window. The previous
  // code divided lifetime units by 30, which is not comparable with the daily
  // average and distorted the velocity trend.
  const monthly_sales_avg = asNumber(metrics.rolling_monthly_avg);
  const avg_lead_time_days = asNumber(purchases.avg_lead_time_days);

  // Calculate days of inventory with safety factor and lead times
  const days_of_inventory = daily_sales_avg > 0
    ? Math.ceil(
        (stock.stock_quantity / daily_sales_avg) +
        (avg_lead_time_days || config.reorder_days) * (1 + (config.service_level / 100))
      )
    : null;
  const weeks_of_inventory = days_of_inventory ? Math.ceil(days_of_inventory / 7) : null;

  // Calculate margin percent with proper handling of zero revenue
  const margin_percent = metrics.total_revenue > 0
    ? ((metrics.total_revenue - metrics.total_cost) / metrics.total_revenue) * 100
    : null;

  // Calculate current inventory value
  const inventory_value = (stock.stock_quantity || 0) * (stock.cost_price || 0);

  const stock_status = classifyStockStatus({
    stock,
    config,
    dailyAvg: daily_sales_avg,
    weeklyAvg: weekly_sales_avg,
    monthlyAvg: monthly_sales_avg
  });

  const { reorderQty, overstockedAmt } = computeReplenishment({
    stock,
    config,
    stockStatus: stock_status,
    dailyAvg: daily_sales_avg,
    avgLeadTimeDays: avg_lead_time_days
  });

  return [
    product.product_id,
    daily_sales_avg || null,
    weekly_sales_avg || null,
    monthly_sales_avg || null,
    metrics.avg_quantity_per_order || null,
    metrics.number_of_orders || 0,
    metrics.first_sale_date || null,
    metrics.last_sale_date || null,
    days_of_inventory,
    weeks_of_inventory,
    daily_sales_avg > 0 ? Math.max(1, Math.ceil(daily_sales_avg * config.reorder_days)) : null,
    margin_percent,
    metrics.total_revenue || 0,
    inventory_value || 0,
    purchases.avg_lead_time_days || null,
    purchases.last_purchase_date || null,
    purchases.first_received_date || null,
    purchases.last_received_date || null,
    stock_status,
    reorderQty,
    overstockedAmt
  ];
}

/**
 * Resolve thresholds, velocity windows and safety-stock settings for a product,
 * preferring category + vendor matches over category, vendor and default rows.
 *
 * @param {object} connection - Connection exposing a promise-returning `query`.
 * @param {*} productId - Product identifier.
 * @returns {Promise<object>} Configuration row, or hard-coded defaults when the query returns none.
 */
async function fetchProductConfig(connection, productId) {
  const [configs] = await connection.query(`
    WITH product_info AS (
      SELECT
        p.product_id,
        p.vendor,
        pc.category_id
      FROM products p
      LEFT JOIN product_categories pc ON p.product_id = pc.product_id
      WHERE p.product_id = ?
    ),
    threshold_options AS (
      SELECT
        st.*,
        CASE
          WHEN st.category_id = pi.category_id AND st.vendor = pi.vendor THEN 1 -- Category + vendor match
          WHEN st.category_id = pi.category_id AND st.vendor IS NULL THEN 2 -- Category match
          WHEN st.category_id IS NULL AND st.vendor = pi.vendor THEN 3 -- Vendor match
          WHEN st.category_id IS NULL AND st.vendor IS NULL THEN 4 -- Default
          ELSE 5
        END as priority
      FROM product_info pi
      CROSS JOIN stock_thresholds st
      WHERE (st.category_id = pi.category_id OR st.category_id IS NULL)
        AND (st.vendor = pi.vendor OR st.vendor IS NULL)
    ),
    velocity_options AS (
      SELECT
        sv.*,
        CASE
          WHEN sv.category_id = pi.category_id AND sv.vendor = pi.vendor THEN 1
          WHEN sv.category_id = pi.category_id AND sv.vendor IS NULL THEN 2
          WHEN sv.category_id IS NULL AND sv.vendor = pi.vendor THEN 3
          WHEN sv.category_id IS NULL AND sv.vendor IS NULL THEN 4
          ELSE 5
        END as priority
      FROM product_info pi
      CROSS JOIN sales_velocity_config sv
      WHERE (sv.category_id = pi.category_id OR sv.category_id IS NULL)
        AND (sv.vendor = pi.vendor OR sv.vendor IS NULL)
    ),
    safety_options AS (
      SELECT
        ss.*,
        CASE
          WHEN ss.category_id = pi.category_id AND ss.vendor = pi.vendor THEN 1
          WHEN ss.category_id = pi.category_id AND ss.vendor IS NULL THEN 2
          WHEN ss.category_id IS NULL AND ss.vendor = pi.vendor THEN 3
          WHEN ss.category_id IS NULL AND ss.vendor IS NULL THEN 4
          ELSE 5
        END as priority
      FROM product_info pi
      CROSS JOIN safety_stock_config ss
      WHERE (ss.category_id = pi.category_id OR ss.category_id IS NULL)
        AND (ss.vendor = pi.vendor OR ss.vendor IS NULL)
    )
    SELECT
      -- Stock thresholds
      COALESCE(
        (SELECT critical_days FROM threshold_options ORDER BY priority LIMIT 1),
        7
      ) as critical_days,
      COALESCE(
        (SELECT reorder_days FROM threshold_options ORDER BY priority LIMIT 1),
        14
      ) as reorder_days,
      COALESCE(
        (SELECT overstock_days FROM threshold_options ORDER BY priority LIMIT 1),
        90
      ) as overstock_days,
      COALESCE(
        (SELECT low_stock_threshold FROM threshold_options ORDER BY priority LIMIT 1),
        5
      ) as low_stock_threshold,
      -- Sales velocity windows
      COALESCE(
        (SELECT daily_window_days FROM velocity_options ORDER BY priority LIMIT 1),
        30
      ) as daily_window_days,
      COALESCE(
        (SELECT weekly_window_days FROM velocity_options ORDER BY priority LIMIT 1),
        7
      ) as weekly_window_days,
      COALESCE(
        (SELECT monthly_window_days FROM velocity_options ORDER BY priority LIMIT 1),
        90
      ) as monthly_window_days,
      -- Safety stock config
      COALESCE(
        (SELECT coverage_days FROM safety_options ORDER BY priority LIMIT 1),
        14
      ) as safety_stock_days,
      COALESCE(
        (SELECT service_level FROM safety_options ORDER BY priority LIMIT 1),
        95.0
      ) as service_level,
      -- ABC Classification
      (SELECT a_threshold FROM abc_classification_config WHERE id = 1) as abc_a_threshold,
      (SELECT b_threshold FROM abc_classification_config WHERE id = 1) as abc_b_threshold,
      (SELECT classification_period_days FROM abc_classification_config WHERE id = 1) as abc_period_days
  `, [productId]);

  return configs[0] || {
    critical_days: 7,
    reorder_days: 14,
    overstock_days: 90,
    low_stock_threshold: 5,
    daily_window_days: 30,
    weekly_window_days: 7,
    monthly_window_days: 90,
    safety_stock_days: 14,
    service_level: 95.0,
    abc_a_threshold: 20.0,
    abc_b_threshold: 50.0,
    abc_period_days: 90
  };
}

/**
 * Summarise non-cancelled orders for a product, including rolling per-day
 * averages over the configured daily, weekly and monthly windows.
 *
 * @param {object} connection - Connection exposing a promise-returning `query`.
 * @param {*} productId - Product identifier.
 * @param {object} config - Configuration providing the `*_window_days` values.
 * @returns {Promise<object>} Summary row, or `{}` when the product has no orders.
 */
async function fetchSalesMetrics(connection, productId, config) {
  const [salesMetrics] = await connection.query(`
    WITH sales_summary AS (
      SELECT
        SUM(o.quantity) as total_quantity_sold,
        SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) as total_revenue,
        SUM(COALESCE(p.cost_price, 0) * o.quantity) as total_cost,
        MAX(o.date) as last_sale_date,
        MIN(o.date) as first_sale_date,
        COUNT(DISTINCT o.order_number) as number_of_orders,
        AVG(o.quantity) as avg_quantity_per_order,
        -- Calculate rolling averages using configured windows
        SUM(CASE WHEN o.date >= DATE_SUB(CURDATE(), INTERVAL ? DAY) THEN o.quantity ELSE 0 END) as last_30_days_qty,
        CASE
          WHEN SUM(CASE WHEN o.date >= DATE_SUB(CURDATE(), INTERVAL ? DAY) THEN o.quantity ELSE 0 END) IS NULL THEN 0
          ELSE SUM(CASE WHEN o.date >= DATE_SUB(CURDATE(), INTERVAL ? DAY) THEN o.quantity ELSE 0 END)
        END as rolling_weekly_avg,
        SUM(CASE WHEN o.date >= DATE_SUB(CURDATE(), INTERVAL ? DAY) THEN o.quantity ELSE 0 END) as last_month_qty
      FROM orders o
      JOIN products p ON o.product_id = p.product_id
      WHERE o.canceled = 0 AND o.product_id = ?
      GROUP BY o.product_id
    )
    SELECT
      total_quantity_sold,
      total_revenue,
      total_cost,
      last_sale_date,
      first_sale_date,
      number_of_orders,
      avg_quantity_per_order,
      last_30_days_qty / ? as rolling_daily_avg,
      rolling_weekly_avg / ? as rolling_weekly_avg,
      last_month_qty / ? as rolling_monthly_avg,
      total_quantity_sold as total_sales_to_date
    FROM sales_summary
  `, [
    config.daily_window_days,
    config.weekly_window_days,
    config.weekly_window_days,
    config.monthly_window_days,
    productId,
    config.daily_window_days,
    config.weekly_window_days,
    config.monthly_window_days
  ]).catch(err => {
    logError(err, `Failed to calculate sales metrics for product ${productId}`);
    throw err;
  });

  return salesMetrics[0] || {};
}

/**
 * Summarise closed, received purchase orders for a product, using the last
 * five orders plus any from the last 90 days.
 *
 * @param {object} connection - Connection exposing a promise-returning `query`.
 * @param {*} productId - Product identifier.
 * @returns {Promise<object>} Summary row, or `{}` when the query returns none.
 */
async function fetchPurchaseMetrics(connection, productId) {
  const [purchaseMetrics] = await connection.query(`
    WITH recent_orders AS (
      SELECT
        date,
        received_date,
        received,
        cost_price,
        DATEDIFF(received_date, date) as lead_time_days,
        ROW_NUMBER() OVER (ORDER BY date DESC) as order_rank
      FROM purchase_orders
      WHERE status = 'closed'
        AND product_id = ?
        AND received > 0
        AND received_date IS NOT NULL
    ),
    lead_time_orders AS (
      SELECT *
      FROM recent_orders
      WHERE order_rank <= 5 -- Last 5 orders
        OR date >= DATE_SUB(CURDATE(), INTERVAL 90 DAY) -- Or orders from last 90 days
    )
    SELECT
      SUM(CASE WHEN received >= 0 THEN received ELSE 0 END) as total_quantity_purchased,
      SUM(CASE WHEN received >= 0 THEN cost_price * received ELSE 0 END) as total_cost,
      MAX(date) as last_purchase_date,
      MIN(received_date) as first_received_date,
      MAX(received_date) as last_received_date,
      AVG(lead_time_days) as avg_lead_time_days,
      COUNT(*) as orders_analyzed
    FROM lead_time_orders
  `, [productId]).catch(err => {
    logError(err, `Failed to calculate purchase metrics for product ${productId}`);
    throw err;
  });

  return purchaseMetrics[0] || {};
}

/**
 * Load current stock, stock age, last-sale age and the recent-stockout flag
 * for a product.
 *
 * @param {object} connection - Connection exposing a promise-returning `query`.
 * @param {*} productId - Product identifier.
 * @returns {Promise<object>} Stock row, or `{}` when the product is not found.
 */
async function fetchStockInfo(connection, productId) {
  const [stockInfo] = await connection.query(`
    SELECT
      p.stock_quantity,
      p.cost_price,
      p.created_at,
      p.replenishable,
      p.moq,
      DATEDIFF(CURDATE(), MIN(po.received_date)) as days_since_first_stock,
      DATEDIFF(CURDATE(), COALESCE(
        (SELECT MAX(o2.date)
         FROM orders o2
         WHERE o2.product_id = p.product_id
           AND o2.canceled = false),
        CURDATE() -- If no sales, use current date
      )) as days_since_last_sale,
      (SELECT SUM(quantity)
       FROM orders o3
       WHERE o3.product_id = p.product_id
         AND o3.canceled = false) as total_quantity_sold,
      CASE
        WHEN EXISTS (
          SELECT 1 FROM orders o
          WHERE o.product_id = p.product_id
            AND o.date >= DATE_SUB(CURDATE(), INTERVAL 30 DAY)
            AND o.canceled = false
            AND (SELECT SUM(quantity) FROM orders o2
                 WHERE o2.product_id = p.product_id
                   AND o2.date >= o.date
                   AND o2.canceled = false) = 0
        ) THEN true
        ELSE false
      END as had_recent_stockout
    FROM products p
    LEFT JOIN purchase_orders po ON p.product_id = po.product_id
      AND po.status = 'closed'
      AND po.received > 0
    WHERE p.product_id = ?
    GROUP BY p.product_id
  `, [productId]).catch(err => {
    logError(err, `Failed to get stock info for product ${productId}`);
    throw err;
  });

  return stockInfo[0] || {};
}

/**
 * Bulk upsert computed rows into `product_metrics`.
 *
 * @param {object} connection - Connection exposing a promise-returning `query`.
 * @param {Array<Array>} rows - Rows produced by `buildProductMetricsRow`.
 * @returns {Promise<void>}
 */
async function upsertProductMetrics(connection, rows) {
  await connection.query(`
    INSERT INTO product_metrics (
      product_id,
      daily_sales_avg,
      weekly_sales_avg,
      monthly_sales_avg,
      avg_quantity_per_order,
      number_of_orders,
      first_sale_date,
      last_sale_date,
      days_of_inventory,
      weeks_of_inventory,
      reorder_point,
      avg_margin_percent,
      total_revenue,
      inventory_value,
      avg_lead_time_days,
      last_purchase_date,
      first_received_date,
      last_received_date,
      stock_status,
      reorder_qty,
      overstocked_amt
    ) VALUES ?
    ON DUPLICATE KEY UPDATE
      last_calculated_at = NOW(),
      daily_sales_avg = VALUES(daily_sales_avg),
      weekly_sales_avg = VALUES(weekly_sales_avg),
      monthly_sales_avg = VALUES(monthly_sales_avg),
      avg_quantity_per_order = VALUES(avg_quantity_per_order),
      number_of_orders = VALUES(number_of_orders),
      first_sale_date = VALUES(first_sale_date),
      last_sale_date = VALUES(last_sale_date),
      days_of_inventory = VALUES(days_of_inventory),
      weeks_of_inventory = VALUES(weeks_of_inventory),
      reorder_point = VALUES(reorder_point),
      avg_margin_percent = VALUES(avg_margin_percent),
      total_revenue = VALUES(total_revenue),
      inventory_value = VALUES(inventory_value),
      avg_lead_time_days = VALUES(avg_lead_time_days),
      last_purchase_date = VALUES(last_purchase_date),
      first_received_date = VALUES(first_received_date),
      last_received_date = VALUES(last_received_date),
      stock_status = VALUES(stock_status),
      reorder_qty = VALUES(reorder_qty),
      overstocked_amt = VALUES(overstocked_amt)
  `, [rows]).catch(err => {
    logError(err, `Failed to batch update metrics for ${rows.length} products`);
    throw err;
  });
}

/**
 * Assign ABC classes in `product_metrics` from revenue percentile, using the
 * thresholds stored in `abc_classification_config` (defaults 20 / 50).
 *
 * @param {object} connection - Connection exposing a promise-returning `query`.
 * @returns {Promise<void>}
 */
async function updateAbcClassification(connection) {
  const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1');
  const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 };

  await connection.query(`
    WITH revenue_rankings AS (
      SELECT
        product_id,
        total_revenue,
        PERCENT_RANK() OVER (ORDER BY COALESCE(total_revenue, 0) DESC) * 100 as revenue_percentile
      FROM product_metrics
    ),
    classification_update AS (
      SELECT
        product_id,
        CASE
          WHEN revenue_percentile <= ? THEN 'A'
          WHEN revenue_percentile <= ? THEN 'B'
          ELSE 'C'
        END as abc_class
      FROM revenue_rankings
    )
    UPDATE product_metrics pm
    JOIN classification_update cu ON pm.product_id = cu.product_id
    SET pm.abc_class = cu.abc_class,
        pm.last_calculated_at = NOW()
  `, [abcThresholds.a_threshold, abcThresholds.b_threshold]);
}

/**
 * Rebuild `product_time_aggregates` from scratch: one row per product and
 * calendar month that has sales and/or closed purchase orders.
 *
 * @param {object} connection - Connection exposing a promise-returning `query`.
 * @returns {Promise<void>}
 */
async function rebuildTimeAggregates(connection) {
  await connection.query('TRUNCATE TABLE product_time_aggregates;');

  await connection.query(`
    INSERT INTO product_time_aggregates (
      product_id,
      year,
      month,
      total_quantity_sold,
      total_revenue,
      total_cost,
      order_count,
      stock_received,
      stock_ordered,
      avg_price,
      profit_margin
    )
    WITH sales_data AS (
      SELECT
        o.product_id,
        YEAR(o.date) as year,
        MONTH(o.date) as month,
        SUM(o.quantity) as total_quantity_sold,
        SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) as total_revenue,
        SUM(COALESCE(p.cost_price, 0) * o.quantity) as total_cost,
        COUNT(DISTINCT o.order_number) as order_count,
        AVG(o.price - COALESCE(o.discount, 0)) as avg_price,
        CASE
          WHEN SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) = 0 THEN 0
          ELSE ((SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) -
                 SUM(COALESCE(p.cost_price, 0) * o.quantity)) /
                SUM((o.price - COALESCE(o.discount, 0)) * o.quantity)) * 100
        END as profit_margin
      FROM orders o
      JOIN products p ON o.product_id = p.product_id
      WHERE o.canceled = 0
      GROUP BY o.product_id, YEAR(o.date), MONTH(o.date)
    ),
    purchase_data AS (
      SELECT
        product_id,
        YEAR(date) as year,
        MONTH(date) as month,
        SUM(received) as stock_received,
        SUM(ordered) as stock_ordered
      FROM purchase_orders
      WHERE status = 'closed'
      GROUP BY product_id, YEAR(date), MONTH(date)
    )
    SELECT
      s.product_id,
      s.year,
      s.month,
      s.total_quantity_sold,
      s.total_revenue,
      s.total_cost,
      s.order_count,
      COALESCE(p.stock_received, 0) as stock_received,
      COALESCE(p.stock_ordered, 0) as stock_ordered,
      -- Persist the values computed in sales_data instead of discarding them
      s.avg_price,
      s.profit_margin
    FROM sales_data s
    LEFT JOIN purchase_data p
      ON s.product_id = p.product_id
      AND s.year = p.year
      AND s.month = p.month
    UNION
    SELECT
      p.product_id,
      p.year,
      p.month,
      0 as total_quantity_sold,
      0 as total_revenue,
      0 as total_cost,
      0 as order_count,
      p.stock_received,
      p.stock_ordered,
      0 as avg_price,
      0 as profit_margin
    FROM purchase_data p
    LEFT JOIN sales_data s
      ON p.product_id = s.product_id
      AND p.year = s.year
      AND p.month = s.month
    WHERE s.product_id IS NULL
  `);
}

/**
 * Run the full metrics pipeline.
 *
 * Creates a pool from `dbConfig`, computes per-product metrics in batches of
 * 250 (unless `SKIP_PRODUCT_METRICS` is set), then runs the financial, vendor,
 * turnover, lead-time, category, ABC, time-aggregate, brand and forecast
 * stages in that order, reporting progress through `outputProgress`.
 *
 * A failure while processing a single product is logged and that product is
 * skipped; any other failure emits a 'cancelled' or 'error' progress event and
 * is rethrown. The connection is released and the pool closed in all cases.
 *
 * @returns {Promise<void>} Resolves after the 'complete' event has been emitted.
 * @throws {Error} 'Operation cancelled' when the cancel flag is set between
 *   batches, or whatever error a stage or query raised.
 */
async function calculateMetrics() {
  const startTime = Date.now();
  let pool;
  let processedCount = 0;
  let totalProducts = 0;

  try {
    isCancelled = false;
    pool = mysql.createPool(dbConfig);
    const connection = await pool.getConnection();

    try {
      outputProgress({
        status: 'running',
        operation: 'Starting metrics calculation',
        current: 0,
        total: 100,
        elapsed: '0s',
        remaining: 'Calculating...',
        rate: 0,
        percentage: '0'
      });

      // Get total number of products
      const [countResult] = await connection.query('SELECT COUNT(*) as total FROM products')
        .catch(err => {
          logError(err, 'Failed to count products');
          throw err;
        });
      totalProducts = countResult[0].total;

      if (!SKIP_PRODUCT_METRICS) {
        // Initial progress with percentage
        outputProgress({
          status: 'running',
          operation: 'Processing sales and stock metrics',
          current: processedCount,
          total: totalProducts,
          elapsed: '0s',
          remaining: 'Calculating...',
          rate: 0,
          percentage: '0'
        });

        // Process in batches of 250
        const batchSize = 250;
        for (let offset = 0; offset < totalProducts; offset += batchSize) {
          if (isCancelled) {
            throw new Error('Operation cancelled');
          }

          // ORDER BY keeps LIMIT/OFFSET pages stable; without it the server may
          // return rows in a different order per page, skipping or repeating products.
          const [products] = await connection.query(
            'SELECT product_id, vendor FROM products ORDER BY product_id LIMIT ? OFFSET ?',
            [batchSize, offset]
          ).catch(err => {
            logError(err, `Failed to fetch products batch at offset ${offset}`);
            throw err;
          });
          processedCount += products.length;

          // Update progress after each batch
          outputProgress({
            status: 'running',
            operation: 'Processing products',
            current: processedCount,
            total: totalProducts,
            elapsed: formatElapsedTime(startTime),
            remaining: estimateRemaining(startTime, processedCount, totalProducts),
            rate: calculateRate(startTime, processedCount),
            percentage: ((processedCount / totalProducts) * 100).toFixed(1)
          });

          // Process the batch
          const metricsUpdates = [];
          for (const product of products) {
            try {
              const config = await fetchProductConfig(connection, product.product_id);
              const metrics = await fetchSalesMetrics(connection, product.product_id, config);
              const purchases = await fetchPurchaseMetrics(connection, product.product_id);
              const stock = await fetchStockInfo(connection, product.product_id);
              metricsUpdates.push(buildProductMetricsRow(product, config, metrics, purchases, stock));
            } catch (err) {
              // Continue with next product instead of failing entire batch
              logError(err, `Failed processing product ${product.product_id}`);
            }
          }

          // Batch update metrics
          if (metricsUpdates.length > 0) {
            await upsertProductMetrics(connection, metricsUpdates);
          }
        }
      } else {
        console.log('Skipping product metrics calculation...');
        outputProgress(stageProgress('Skipping product metrics calculation', 60, startTime, totalProducts));
      }

      outputProgress(stageProgress('Starting financial metrics calculation', 60, startTime, totalProducts));
      await calculateFinancialMetrics(connection, startTime, totalProducts);

      outputProgress(stageProgress('Starting vendor metrics calculation', 70, startTime, totalProducts));
      await calculateVendorMetrics(connection, startTime, totalProducts);

      outputProgress(stageProgress('Starting turnover metrics calculation', 75, startTime, totalProducts));
      await calculateTurnoverMetrics(connection, startTime, totalProducts);

      outputProgress(stageProgress('Starting lead time metrics calculation', 80, startTime, totalProducts));
      await calculateLeadTimeMetrics(connection, startTime, totalProducts);

      // Calculate category metrics
      await calculateCategoryMetrics(connection, startTime, totalProducts);

      // Calculate category sales metrics
      await calculateCategorySalesMetrics(connection, startTime, totalProducts);

      // Calculate ABC classification
      await updateAbcClassification(connection);

      // Calculate time-based aggregates
      outputProgress(stageProgress('Starting time-based aggregates calculation', 85, startTime, totalProducts));
      await rebuildTimeAggregates(connection);

      // Add new metric calculations before final success message
      await calculateBrandMetrics(connection, startTime, totalProducts);
      await calculateSalesForecasts(connection, startTime, totalProducts);

      // Final success message
      outputProgress({
        status: 'complete',
        operation: 'Metrics calculation complete',
        current: totalProducts,
        total: totalProducts,
        elapsed: formatElapsedTime(startTime),
        remaining: '0s',
        rate: calculateRate(startTime, totalProducts),
        percentage: '100'
      });

      // Clear progress file on successful completion
      clearProgress();
    } catch (error) {
      const cancelled = Boolean(isCancelled);
      outputProgress({
        status: cancelled ? 'cancelled' : 'error',
        operation: cancelled ? 'Calculation cancelled' : 'Error: ' + error.message,
        current: processedCount,
        total: totalProducts || 0,
        elapsed: formatElapsedTime(startTime),
        remaining: null,
        rate: calculateRate(startTime, processedCount),
        percentage: ((processedCount / (totalProducts || 1)) * 100).toFixed(1)
      });
      throw error;
    } finally {
      connection.release();
    }
  } finally {
    if (pool) {
      await pool.end();
    }
  }
}
functions and progress checker module.exports = calculateMetrics; module.exports.cancelCalculation = cancelCalculation; module.exports.getProgress = getProgress; // Run directly if called from command line if (require.main === module) { calculateMetrics().catch(error => { if (!error.message.includes('Operation cancelled')) { console.error('Error:', error); } process.exit(1); }); }