From 6c524aa3a96a6ae05c050ede883a39f18328c960 Mon Sep 17 00:00:00 2001 From: Matt Date: Sun, 12 Jan 2025 13:09:40 -0500 Subject: [PATCH] Fix time out error on data import and fix regression on progress display + sort out core and metrics schemas better --- inventory-server/db/metrics-schema.sql | 22 +- inventory-server/db/schema.sql | 19 +- inventory-server/scripts/import-csv.js | 177 ++---------- inventory-server/scripts/reset-metrics.js | 8 + inventory/src/pages/Settings.tsx | 326 +++++++++++----------- 5 files changed, 211 insertions(+), 341 deletions(-) diff --git a/inventory-server/db/metrics-schema.sql b/inventory-server/db/metrics-schema.sql index ebf1e63..0925982 100644 --- a/inventory-server/db/metrics-schema.sql +++ b/inventory-server/db/metrics-schema.sql @@ -107,4 +107,24 @@ FROM LEFT JOIN product_metrics pm ON p.product_id = pm.product_id WHERE - p.managing_stock = true; \ No newline at end of file + p.managing_stock = true; + +-- Create view for sales trends analysis +CREATE OR REPLACE VIEW product_sales_trends AS +SELECT + p.product_id, + p.SKU, + p.title, + COALESCE(SUM(o.quantity), 0) as total_sold, + COALESCE(AVG(o.quantity), 0) as avg_quantity_per_order, + COALESCE(COUNT(DISTINCT o.order_number), 0) as number_of_orders, + MIN(o.date) as first_sale_date, + MAX(o.date) as last_sale_date +FROM + products p +LEFT JOIN + orders o ON p.product_id = o.product_id +WHERE + o.canceled = false +GROUP BY + p.product_id, p.SKU, p.title; \ No newline at end of file diff --git a/inventory-server/db/schema.sql b/inventory-server/db/schema.sql index c681001..cc6e0bc 100644 --- a/inventory-server/db/schema.sql +++ b/inventory-server/db/schema.sql @@ -110,21 +110,4 @@ CREATE TABLE purchase_orders ( SET FOREIGN_KEY_CHECKS = 1; -- Create views for common calculations -CREATE OR REPLACE VIEW product_sales_trends AS -SELECT - p.product_id, - p.SKU, - p.title, - COALESCE(SUM(o.quantity), 0) as total_sold, - COALESCE(AVG(o.quantity), 0) as avg_quantity_per_order, - COALESCE(COUNT(DISTINCT o.order_number), 0) as number_of_orders, - MIN(o.date) as first_sale_date, - MAX(o.date) as last_sale_date -FROM - products p -LEFT JOIN - orders o ON p.product_id = o.product_id -WHERE - o.canceled = false -GROUP BY - p.product_id, p.SKU, p.title; \ No newline at end of file +-- product_sales_trends view moved to metrics-schema.sql \ No newline at end of file diff --git a/inventory-server/scripts/import-csv.js b/inventory-server/scripts/import-csv.js index 10d8117..fe9345a 100644 --- a/inventory-server/scripts/import-csv.js +++ b/inventory-server/scripts/import-csv.js @@ -54,20 +54,23 @@ function logImport(message) { fs.appendFileSync(IMPORT_LOG, logMessage); } -// Helper function to output progress in JSON format +// Helper function to format duration +function formatDuration(seconds) { + const hours = Math.floor(seconds / 3600); + const minutes = Math.floor((seconds % 3600) / 60); + seconds = Math.floor(seconds % 60); + + const parts = []; + if (hours > 0) parts.push(`${hours}h`); + if (minutes > 0) parts.push(`${minutes}m`); + if (seconds > 0 || parts.length === 0) parts.push(`${seconds}s`); + + return parts.join(' '); +} + +// Helper function to output progress function outputProgress(data) { - if (!data.status) { - data = { - status: 'running', - ...data - }; - } - - // Log progress to import log - logImport(JSON.stringify(data)); - - // Output to console - console.log(JSON.stringify(data)); + process.stdout.write(JSON.stringify(data) + '\n'); } // Helper function to count total rows in a CSV file @@ -82,14 +85,6 @@ async function countRows(filePath) { }); } -// Helper function to format time duration -function formatDuration(seconds) { - if (seconds < 60) return `${Math.round(seconds)}s`; - const minutes = Math.floor(seconds / 60); - seconds = Math.round(seconds % 60); - return `${minutes}m ${seconds}s`; -} - // Helper function to update progress with time estimate function updateProgress(current, total, operation, startTime) { const elapsed = (Date.now() - startTime) / 1000; @@ -401,118 +396,6 @@ async function calculateVendorMetrics(connection) { } } -// Helper function to calculate metrics in batches -async function calculateMetricsInBatch(connection) { - try { - // Clear temporary tables - await connection.query('TRUNCATE TABLE temp_sales_metrics'); - await connection.query('TRUNCATE TABLE temp_purchase_metrics'); - - // Calculate sales metrics for all products in one go - await connection.query(` - INSERT INTO temp_sales_metrics - SELECT - o.product_id, - COUNT(*) / NULLIF(DATEDIFF(MAX(o.date), MIN(o.date)), 0) as daily_sales_avg, - SUM(o.quantity) / NULLIF(DATEDIFF(MAX(o.date), MIN(o.date)), 0) * 7 as weekly_sales_avg, - SUM(o.quantity) / NULLIF(DATEDIFF(MAX(o.date), MIN(o.date)), 0) * 30 as monthly_sales_avg, - SUM(o.price * o.quantity) as total_revenue, - AVG((o.price - p.cost_price) / o.price * 100) as avg_margin_percent, - MIN(o.date) as first_sale_date, - MAX(o.date) as last_sale_date - FROM orders o - JOIN products p ON o.product_id = p.product_id - WHERE o.canceled = false - GROUP BY o.product_id - `); - - // Calculate purchase metrics for all products in one go - await connection.query(` - INSERT INTO temp_purchase_metrics - SELECT - product_id, - AVG(DATEDIFF(received_date, date)) as avg_lead_time_days, - MAX(date) as last_purchase_date, - MAX(received_date) as last_received_date - FROM purchase_orders - WHERE status = 'closed' - GROUP BY product_id - `); - - // Update product_metrics table with all metrics at once - await connection.query(` - INSERT INTO product_metrics ( - product_id, daily_sales_avg, weekly_sales_avg, monthly_sales_avg, - days_of_inventory, weeks_of_inventory, safety_stock, reorder_point, - avg_margin_percent, total_revenue, avg_lead_time_days, - last_purchase_date, last_received_date - ) - SELECT - p.product_id, - COALESCE(s.daily_sales_avg, 0), - COALESCE(s.weekly_sales_avg, 0), - COALESCE(s.monthly_sales_avg, 0), - CASE - WHEN s.daily_sales_avg > 0 THEN FLOOR(p.stock_quantity / s.daily_sales_avg) - ELSE 999 - END as days_of_inventory, - CASE - WHEN s.daily_sales_avg > 0 THEN FLOOR(p.stock_quantity / s.daily_sales_avg / 7) - ELSE 999 - END as weeks_of_inventory, - CEIL(COALESCE(s.daily_sales_avg, 0) * 14) as safety_stock, - CEIL(COALESCE(s.daily_sales_avg, 0) * 21) as reorder_point, - COALESCE(s.avg_margin_percent, 0), - COALESCE(s.total_revenue, 0), - COALESCE(pm.avg_lead_time_days, 0), - pm.last_purchase_date, - pm.last_received_date - FROM products p - LEFT JOIN temp_sales_metrics s ON p.product_id = s.product_id - LEFT JOIN temp_purchase_metrics pm ON p.product_id = pm.product_id - ON DUPLICATE KEY UPDATE - daily_sales_avg = VALUES(daily_sales_avg), - weekly_sales_avg = VALUES(weekly_sales_avg), - monthly_sales_avg = VALUES(monthly_sales_avg), - days_of_inventory = VALUES(days_of_inventory), - weeks_of_inventory = VALUES(weeks_of_inventory), - safety_stock = VALUES(safety_stock), - reorder_point = VALUES(reorder_point), - avg_margin_percent = VALUES(avg_margin_percent), - total_revenue = VALUES(total_revenue), - avg_lead_time_days = VALUES(avg_lead_time_days), - last_purchase_date = VALUES(last_purchase_date), - last_received_date = VALUES(last_received_date), - last_calculated_at = CURRENT_TIMESTAMP - `); - - // Calculate ABC classification in one go - await connection.query(` - WITH revenue_ranks AS ( - SELECT - product_id, - total_revenue, - total_revenue / SUM(total_revenue) OVER () * 100 as revenue_percent, - ROW_NUMBER() OVER (ORDER BY total_revenue DESC) as rank - FROM product_metrics - WHERE total_revenue > 0 - ) - UPDATE product_metrics pm - JOIN revenue_ranks r ON pm.product_id = r.product_id - SET abc_class = - CASE - WHEN r.revenue_percent >= 20 THEN 'A' - WHEN r.revenue_percent >= 5 THEN 'B' - ELSE 'C' - END - `); - - } catch (error) { - logError(error, 'Error in batch metrics calculation'); - throw error; - } -} - async function importProducts(pool, filePath) { const parser = fs.createReadStream(filePath).pipe(csv.parse({ columns: true, trim: true })); const totalRows = PRODUCTS_TEST_LIMIT > 0 ? Math.min(await countRows(filePath), PRODUCTS_TEST_LIMIT) : await countRows(filePath); @@ -1085,7 +968,7 @@ async function main() { connection.release(); } - // Step 1: Import all data first + // Import all data try { // Import products first since they're referenced by other tables await importProducts(pool, path.join(__dirname, '../csv/39f2x83-products.csv')); @@ -1101,34 +984,16 @@ async function main() { importPurchaseOrders(pool, path.join(__dirname, '../csv/39f2x83-purchase_orders.csv')) ]); - // Step 2: Calculate all metrics after imports are complete outputProgress({ - operation: 'Starting metrics calculation', - message: 'Calculating metrics for all products and vendors...' + status: 'complete', + operation: 'Import process completed', + duration: formatDuration((Date.now() - startTime) / 1000) }); - const connection = await pool.getConnection(); - try { - // Calculate metrics in batches - await calculateMetricsInBatch(connection); - - // Calculate vendor metrics - await calculateVendorMetrics(connection); - - } finally { - connection.release(); - } - } catch (error) { - logError(error, 'Error during import/metrics calculation'); + logError(error, 'Error during import'); throw error; } - - outputProgress({ - status: 'complete', - operation: 'Import process completed', - duration: formatDuration((Date.now() - startTime) / 1000) - }); } catch (error) { logError(error, 'Fatal error during import process'); outputProgress({ diff --git a/inventory-server/scripts/reset-metrics.js b/inventory-server/scripts/reset-metrics.js index 443655f..287fb1b 100644 --- a/inventory-server/scripts/reset-metrics.js +++ b/inventory-server/scripts/reset-metrics.js @@ -84,6 +84,14 @@ async function resetMetrics() { // Disable foreign key checks first await connection.query('SET FOREIGN_KEY_CHECKS = 0'); + // Drop the metrics views first + outputProgress({ + status: 'running', + operation: 'Dropping metrics views', + percentage: '15' + }); + await connection.query('DROP VIEW IF EXISTS inventory_health, product_sales_trends'); + // Drop only the metrics tables if they exist const [existing] = await connection.query(` SELECT GROUP_CONCAT(table_name) as tables diff --git a/inventory/src/pages/Settings.tsx b/inventory/src/pages/Settings.tsx index 093148d..c5c0ed2 100644 --- a/inventory/src/pages/Settings.tsx +++ b/inventory/src/pages/Settings.tsx @@ -94,119 +94,102 @@ export function Settings() { setEventSource(null); } - try { - console.log(`Creating new EventSource for ${config.apiUrl}/csv/${type}/progress`); - const source = new EventSource(`${config.apiUrl}/csv/${type}/progress`, { - withCredentials: true - }); - - // Set up handlers before setting state - source.onopen = () => { - console.log('EventSource connected successfully'); - // Set event source state only after successful connection - setEventSource(source); - }; + let retryCount = 0; + const MAX_RETRIES = 3; + const RETRY_DELAY = 2000; // 2 seconds - source.onerror = (event) => { - console.error('EventSource failed:', event); - source.close(); + const setupConnection = () => { + try { + console.log(`Creating new EventSource for ${config.apiUrl}/csv/${type}/progress`); + const source = new EventSource(`${config.apiUrl}/csv/${type}/progress`, { + withCredentials: true + }); - // Reset states based on type - switch (type) { - case 'update': - setIsUpdating(false); - setUpdateProgress(null); - break; - case 'import': - setIsImporting(false); - setImportProgress(null); - break; - case 'reset': - setIsResetting(false); - setResetProgress(null); - break; - case 'reset-metrics': - setIsResettingMetrics(false); - setResetMetricsProgress(null); - break; - case 'calculate-metrics': - setIsCalculatingMetrics(false); - setMetricsProgress(null); - break; - } - - setEventSource(null); - handleError(type.charAt(0).toUpperCase() + type.slice(1), 'Lost connection to server'); - }; + source.onopen = () => { + console.log('EventSource connected successfully'); + retryCount = 0; // Reset retry count on successful connection + setEventSource(source); + }; - source.onmessage = (event) => { - try { - console.log(`Received message for ${type}:`, event.data); + source.onerror = async (event) => { + console.error('EventSource error:', event); + source.close(); - // First parse the outer message - const data = JSON.parse(event.data); - - // If we have a progress field that's a string containing multiple JSON objects - if (data.progress && typeof data.progress === 'string') { - // Split the progress string into separate JSON objects - const progressMessages = data.progress - .split('\n') - .filter(Boolean) - .map((message: string) => { - try { - return JSON.parse(message); - } catch (err) { - console.warn('Failed to parse progress message:', message, err); - return null; - } - }) - .filter((msg: unknown): msg is ImportProgress => msg !== null); + // Only retry if we haven't exceeded max retries and we're still in the active state + const isActive = type === 'import' ? isImporting : + type === 'update' ? isUpdating : + type === 'reset' ? isResetting : + type === 'reset-metrics' ? isResettingMetrics : + type === 'calculate-metrics' ? isCalculatingMetrics : false; - // Process each progress message - progressMessages.forEach((progressData: ImportProgress) => { - handleProgressUpdate(type, progressData, source); - }); - } else { - // Handle single message case - const progressData = data.progress || data; - handleProgressUpdate(type, progressData, source); + if (retryCount < MAX_RETRIES && isActive) { + console.log(`Retrying connection (${retryCount + 1}/${MAX_RETRIES})...`); + retryCount++; + // Wait before retrying + await new Promise(resolve => setTimeout(resolve, RETRY_DELAY)); + setupConnection(); + } else if (retryCount >= MAX_RETRIES) { + console.log('Max retries exceeded, but operation may still be running...'); + // Don't reset states or show error, just log it + console.warn(`Lost connection to ${type} progress stream after ${MAX_RETRIES} retries`); } - } catch (error) { - console.error('Error parsing event data:', error, event.data); - handleError(type.charAt(0).toUpperCase() + type.slice(1), 'Failed to parse server response'); - } - }; + + setEventSource(null); + }; - } catch (error) { - console.error('Failed to set up EventSource:', error); - - // Reset operation state - switch (type) { - case 'update': - setIsUpdating(false); - setUpdateProgress(null); - break; - case 'import': - setIsImporting(false); - setImportProgress(null); - break; - case 'reset': - setIsResetting(false); - setResetProgress(null); - break; - case 'reset-metrics': - setIsResettingMetrics(false); - setResetMetricsProgress(null); - break; - case 'calculate-metrics': - setIsCalculatingMetrics(false); - setMetricsProgress(null); - break; + source.onmessage = (event) => { + try { + console.log(`Received message for ${type}:`, event.data); + + // First parse the outer message + const data = JSON.parse(event.data); + + // If we have a progress field that's a string containing multiple JSON objects + if (data.progress && typeof data.progress === 'string') { + // Split the progress string into separate JSON objects + const progressMessages = data.progress + .split('\n') + .filter(Boolean) + .map((message: string) => { + try { + return JSON.parse(message); + } catch (err) { + console.warn('Failed to parse progress message:', message, err); + return null; + } + }) + .filter((msg: unknown): msg is ImportProgress => msg !== null); + + // Process each progress message + progressMessages.forEach((progressData: ImportProgress) => { + handleProgressUpdate(type, progressData, source); + }); + } else { + // Handle single message case + const progressData = data.progress || data; + handleProgressUpdate(type, progressData, source); + } + } catch (error) { + console.error('Error parsing event data:', error, event.data); + // Don't show error to user for parsing issues, just log them + console.warn('Failed to parse server response:', error); + } + }; + + } catch (error) { + console.error('Failed to set up EventSource:', error); + if (retryCount < MAX_RETRIES) { + console.log(`Retrying connection (${retryCount + 1}/${MAX_RETRIES})...`); + retryCount++; + setTimeout(setupConnection, RETRY_DELAY); + } else { + console.log('Max retries exceeded, but operation may still be running...'); + } } - - handleError(type.charAt(0).toUpperCase() + type.slice(1), 'Failed to connect to server'); - } - }, [eventSource, handleComplete, handleError]); + }; + + setupConnection(); + }, [eventSource, handleComplete, handleError, isImporting, isUpdating, isResetting, isResettingMetrics, isCalculatingMetrics]); // Helper function to process a single progress update const handleProgressUpdate = ( @@ -225,6 +208,18 @@ export function Settings() { status: progressData.status || 'running' }; + // For import type, handle orders and purchase orders separately + if (type === 'import' && progressData.operation) { + const operation = progressData.operation.toLowerCase(); + if (operation.includes('purchase orders')) { + setPurchaseOrdersProgress(prev => ({ + ...prev, + ...processedData + })); + return; // Don't update main import progress for PO updates + } + } + // Update progress state based on type switch (type) { case 'update': @@ -261,6 +256,29 @@ export function Settings() { // Handle completion if (progressData.status === 'complete') { console.log(`Operation ${type} completed`); + + // For import, only close connection when both operations are complete + if (type === 'import') { + const operation = progressData.operation?.toLowerCase() || ''; + if (operation.includes('purchase orders')) { + setPurchaseOrdersProgress(null); + } else { + setImportProgress(null); + } + + // Only fully complete if both are done + if (!importProgress || !purchaseOrdersProgress) { + source.close(); + setEventSource(null); + setIsImporting(false); + if (!progressData.operation?.includes('cancelled')) { + handleComplete(`${type.charAt(0).toUpperCase() + type.slice(1)}`); + } + } + return; + } + + // For other operations, close immediately source.close(); setEventSource(null); @@ -270,11 +288,6 @@ export function Settings() { setIsUpdating(false); setUpdateProgress(null); break; - case 'import': - setIsImporting(false); - setImportProgress(null); - setPurchaseOrdersProgress(null); - break; case 'reset': setIsResetting(false); setResetProgress(null); @@ -306,6 +319,8 @@ export function Settings() { break; case 'import': setIsImporting(false); + setImportProgress(null); + setPurchaseOrdersProgress(null); break; case 'reset': setIsResetting(false); @@ -489,7 +504,7 @@ export function Settings() { // Connect to SSE for progress updates first connectToEventSource('import'); - // Make the import request + // Make the import request - no timeout needed since this just starts the process const response = await fetch(`${config.apiUrl}/csv/import`, { method: 'POST', headers: { @@ -497,8 +512,22 @@ export function Settings() { }, credentials: 'include', body: JSON.stringify(limits) + }).catch(error => { + // If we already have progress, ignore network errors + if ((importProgress?.current || purchaseOrdersProgress?.current) && + (error.name === 'TypeError')) { + console.log('Request error but import is in progress:', error); + return null; + } + throw error; }); + // If response is null, it means we caught an error but have progress + if (!response) { + console.log('Continuing with existing progress...'); + return; + } + if (!response.ok) { const data = await response.json().catch(() => ({})); // Only handle error if we don't have any progress yet @@ -507,17 +536,23 @@ export function Settings() { } } } catch (error) { - // Only clean up if we don't have any progress - if (!importProgress?.current && !purchaseOrdersProgress?.current) { - if (eventSource) { - eventSource.close(); - setEventSource(null); - } - setIsImporting(false); - setImportProgress(null); - setPurchaseOrdersProgress(null); - handleError('Data import', error instanceof Error ? error.message : 'Unknown error'); + console.log('Import error:', error); + + // If we have any progress, assume the import is still running + if (importProgress?.current || purchaseOrdersProgress?.current) { + console.log('Error occurred but import appears to be running'); + return; } + + // Only clean up if we don't have any progress + if (eventSource) { + eventSource.close(); + setEventSource(null); + } + setIsImporting(false); + setImportProgress(null); + setPurchaseOrdersProgress(null); + handleError('Data import', error instanceof Error ? error.message : 'Unknown error'); } }; @@ -783,12 +818,12 @@ export function Settings() {