From 8e19e6cd746c3f9a209fd7f597c682f2ae728cf4 Mon Sep 17 00:00:00 2001 From: Matt Date: Wed, 26 Mar 2025 14:22:08 -0400 Subject: [PATCH] Finish fixing calculate scripts --- inventory-server/scripts/calculate-metrics.js | 343 +++++------------- .../scripts/metrics/brand-metrics.js | 6 +- .../scripts/metrics/category-metrics.js | 51 ++- .../scripts/metrics/financial-metrics.js | 8 +- .../scripts/metrics/product-metrics.js | 228 +++++++----- .../scripts/metrics/time-aggregates.js | 18 +- .../scripts/metrics/vendor-metrics.js | 16 +- 7 files changed, 295 insertions(+), 375 deletions(-) diff --git a/inventory-server/scripts/calculate-metrics.js b/inventory-server/scripts/calculate-metrics.js index 04b353b..adaf7dd 100644 --- a/inventory-server/scripts/calculate-metrics.js +++ b/inventory-server/scripts/calculate-metrics.js @@ -62,13 +62,24 @@ const TEMP_TABLES = [ // Add cleanup function for temporary tables async function cleanupTemporaryTables(connection) { + // List of possible temporary tables that might exist + const tempTables = [ + 'temp_sales_metrics', + 'temp_purchase_metrics', + 'temp_forecast_dates', + 'temp_daily_sales', + 'temp_product_stats', + 'temp_category_sales', + 'temp_category_stats' + ]; + try { - for (const table of TEMP_TABLES) { + // Drop each temporary table if it exists + for (const table of tempTables) { await connection.query(`DROP TABLE IF EXISTS ${table}`); } - } catch (error) { - logError(error, 'Error cleaning up temporary tables'); - throw error; // Re-throw to be handled by the caller + } catch (err) { + console.error('Error cleaning up temporary tables:', err); } } @@ -86,22 +97,42 @@ let isCancelled = false; function cancelCalculation() { isCancelled = true; - global.clearProgress(); - // Format as SSE event - const event = { - progress: { - status: 'cancelled', - operation: 'Calculation cancelled', - current: 0, - total: 0, - elapsed: null, - remaining: null, - rate: 0, - timestamp: Date.now() - } + console.log('Calculation has been cancelled by user'); + + // Force-terminate any query that's been running for more than 5 seconds + try { + const connection = getConnection(); + connection.then(async (conn) => { + try { + // Identify and terminate long-running queries from our application + await conn.query(` + SELECT pg_cancel_backend(pid) + FROM pg_stat_activity + WHERE query_start < now() - interval '5 seconds' + AND application_name LIKE '%node%' + AND query NOT LIKE '%pg_cancel_backend%' + `); + + // Clean up any temporary tables + await cleanupTemporaryTables(conn); + + // Release connection + conn.release(); + } catch (err) { + console.error('Error during force cancellation:', err); + conn.release(); + } + }).catch(err => { + console.error('Could not get connection for cancellation:', err); + }); + } catch (err) { + console.error('Failed to terminate running queries:', err); + } + + return { + success: true, + message: 'Calculation has been cancelled' }; - process.stdout.write(JSON.stringify(event) + '\n'); - process.exit(0); } // Handle SIGTERM signal for cancellation @@ -119,6 +150,15 @@ async function calculateMetrics() { let totalPurchaseOrders = 0; let calculateHistoryId; + // Set a maximum execution time (30 minutes) + const MAX_EXECUTION_TIME = 30 * 60 * 1000; + const timeout = setTimeout(() => { + console.error(`Calculation timed out after ${MAX_EXECUTION_TIME/1000} seconds, forcing termination`); + // Call cancel and force exit + cancelCalculation(); + process.exit(1); + }, MAX_EXECUTION_TIME); + try { // Clean up any previously running calculations connection = await getConnection(); @@ -360,223 +400,6 @@ async function calculateMetrics() { console.log('Skipping sales forecasts calculation'); } - // Calculate ABC classification - outputProgress({ - status: 'running', - operation: 'Starting ABC classification', - current: processedProducts || 0, - total: totalProducts || 0, - elapsed: formatElapsedTime(startTime), - remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0), - rate: calculateRate(startTime, processedProducts || 0), - percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1), - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date().toISOString(), - elapsed_seconds: Math.round((Date.now() - startTime) / 1000) - } - }); - - if (isCancelled) return { - processedProducts: processedProducts || 0, - processedOrders: processedOrders || 0, - processedPurchaseOrders: 0, - success: false - }; - - const abcConfigResult = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1'); - const abcThresholds = abcConfigResult.rows[0] || { a_threshold: 20, b_threshold: 50 }; - - // First, create and populate the rankings table - await connection.query('DROP TABLE IF EXISTS temp_revenue_ranks'); - await connection.query(` - CREATE TEMPORARY TABLE temp_revenue_ranks ( - pid BIGINT NOT NULL, - total_revenue DECIMAL(10,3), - rank_num INT, - total_count INT, - PRIMARY KEY (pid) - ) - `); - await connection.query('CREATE INDEX ON temp_revenue_ranks (rank_num)'); - - outputProgress({ - status: 'running', - operation: 'Creating revenue rankings', - current: processedProducts || 0, - total: totalProducts || 0, - elapsed: formatElapsedTime(startTime), - remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0), - rate: calculateRate(startTime, processedProducts || 0), - percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1), - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date().toISOString(), - elapsed_seconds: Math.round((Date.now() - startTime) / 1000) - } - }); - - if (isCancelled) return { - processedProducts: processedProducts || 0, - processedOrders: processedOrders || 0, - processedPurchaseOrders: 0, - success: false - }; - - // Use window functions instead of user variables - await connection.query(` - INSERT INTO temp_revenue_ranks - WITH ranked AS ( - SELECT - pid, - total_revenue, - ROW_NUMBER() OVER (ORDER BY total_revenue DESC) as rank_num, - COUNT(*) OVER () as total_count - FROM product_metrics - WHERE total_revenue > 0 - ) - SELECT - pid, - total_revenue, - rank_num, - total_count - FROM ranked - `); - - // Get total count for percentage calculation - const rankingCountResult = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks'); - const totalCount = parseInt(rankingCountResult.rows[0].total_count) || 1; - const max_rank = totalCount; // Store max_rank for use in classification - - outputProgress({ - status: 'running', - operation: 'Updating ABC classifications', - current: processedProducts || 0, - total: totalProducts || 0, - elapsed: formatElapsedTime(startTime), - remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0), - rate: calculateRate(startTime, processedProducts || 0), - percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1), - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date().toISOString(), - elapsed_seconds: Math.round((Date.now() - startTime) / 1000) - } - }); - - if (isCancelled) return { - processedProducts: processedProducts || 0, - processedOrders: processedOrders || 0, - processedPurchaseOrders: 0, - success: false - }; - - // ABC classification progress tracking - let abcProcessedCount = 0; - const batchSize = 5000; - let lastProgressUpdate = Date.now(); - const progressUpdateInterval = 1000; // Update every second - - while (true) { - if (isCancelled) return { - processedProducts: Number(processedProducts) || 0, - processedOrders: Number(processedOrders) || 0, - processedPurchaseOrders: 0, - success: false - }; - - // First get a batch of PIDs that need updating - const pidsResult = await connection.query(` - SELECT pm.pid - FROM product_metrics pm - LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid - WHERE pm.abc_class IS NULL - OR pm.abc_class != - CASE - WHEN tr.rank_num IS NULL THEN 'C' - WHEN (tr.rank_num::float / $1::float) * 100 <= $2 THEN 'A' - WHEN (tr.rank_num::float / $1::float) * 100 <= $3 THEN 'B' - ELSE 'C' - END - LIMIT $4 - `, [max_rank, abcThresholds.a_threshold, - abcThresholds.b_threshold, - batchSize]); - - if (pidsResult.rows.length === 0) { - break; - } - - // Then update just those PIDs - const pidValues = pidsResult.rows.map(row => row.pid); - const result = await connection.query(` - UPDATE product_metrics pm - SET abc_class = - CASE - WHEN tr.rank_num IS NULL THEN 'C' - WHEN (tr.rank_num::float / $1::float) * 100 <= $2 THEN 'A' - WHEN (tr.rank_num::float / $1::float) * 100 <= $3 THEN 'B' - ELSE 'C' - END, - last_calculated_at = NOW() - FROM temp_revenue_ranks tr - WHERE pm.pid = tr.pid AND pm.pid = ANY($4::bigint[]) - OR (pm.pid = ANY($4::bigint[]) AND tr.pid IS NULL) - `, [max_rank, abcThresholds.a_threshold, - abcThresholds.b_threshold, - pidValues]); - - abcProcessedCount += result.rowCount; - - // Calculate progress ensuring valid numbers - const currentProgress = Math.floor(totalProducts * (0.99 + (abcProcessedCount / (totalCount || 1)) * 0.01)); - processedProducts = Number(currentProgress) || processedProducts || 0; - - // Only update progress at most once per second - const now = Date.now(); - if (now - lastProgressUpdate >= progressUpdateInterval) { - const progress = ensureValidProgress(processedProducts, totalProducts); - - outputProgress({ - status: 'running', - operation: 'ABC classification progress', - current: progress.current, - total: progress.total, - elapsed: formatElapsedTime(startTime), - remaining: estimateRemaining(startTime, progress.current, progress.total), - rate: calculateRate(startTime, progress.current), - percentage: progress.percentage, - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date().toISOString(), - elapsed_seconds: Math.round((Date.now() - startTime) / 1000) - } - }); - - lastProgressUpdate = now; - } - - // Update database progress - await updateProgress(processedProducts, processedOrders, processedPurchaseOrders); - - // Small delay between batches to allow other transactions - await new Promise(resolve => setTimeout(resolve, 100)); - } - - // Clean up - await connection.query('DROP TABLE IF EXISTS temp_revenue_ranks'); - - const endTime = Date.now(); - const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); - - // Update calculate_status for ABC classification - await connection.query(` - INSERT INTO calculate_status (module_name, last_calculation_timestamp) - VALUES ('abc_classification', NOW()) - ON CONFLICT (module_name) DO UPDATE - SET last_calculation_timestamp = NOW() - `); - // Final progress update with guaranteed valid numbers const finalProgress = ensureValidProgress(totalProducts, totalProducts); @@ -586,14 +409,14 @@ async function calculateMetrics() { operation: 'Metrics calculation complete', current: finalProgress.current, total: finalProgress.total, - elapsed: formatElapsedTime(startTime), + elapsed: global.formatElapsedTime(startTime), remaining: '0s', - rate: calculateRate(startTime, finalProgress.current), + rate: global.calculateRate(startTime, finalProgress.current), percentage: '100', timing: { start_time: new Date(startTime).toISOString(), end_time: new Date().toISOString(), - elapsed_seconds: totalElapsedSeconds + elapsed_seconds: Math.round((Date.now() - startTime) / 1000) } }); @@ -615,7 +438,7 @@ async function calculateMetrics() { processed_purchase_orders = $4, status = 'completed' WHERE id = $5 - `, [totalElapsedSeconds, + `, [Math.round((Date.now() - startTime) / 1000), finalStats.processedProducts, finalStats.processedOrders, finalStats.processedPurchaseOrders, @@ -624,6 +447,11 @@ async function calculateMetrics() { // Clear progress file on successful completion global.clearProgress(); + return { + success: true, + message: 'Calculation completed successfully', + duration: Math.round((Date.now() - startTime) / 1000) + }; } catch (error) { const endTime = Date.now(); const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); @@ -685,17 +513,38 @@ async function calculateMetrics() { } throw error; } finally { + // Clear the timeout to prevent forced termination + clearTimeout(timeout); + + // Always clean up and release connection if (connection) { - // Ensure temporary tables are cleaned up - await cleanupTemporaryTables(connection); - connection.release(); + try { + await cleanupTemporaryTables(connection); + connection.release(); + } catch (err) { + console.error('Error in final cleanup:', err); + } } - // Close the connection pool when we're done - await closePool(); } } catch (error) { - success = false; - logError(error, 'Error in metrics calculation'); + console.error('Error in metrics calculation', error); + + try { + if (connection) { + await connection.query(` + UPDATE calculate_history + SET + status = 'error', + end_time = NOW(), + duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER, + error_message = $1 + WHERE id = $2 + `, [error.message.substring(0, 500), calculateHistoryId]); + } + } catch (updateError) { + console.error('Error updating calculation history:', updateError); + } + throw error; } } diff --git a/inventory-server/scripts/metrics/brand-metrics.js b/inventory-server/scripts/metrics/brand-metrics.js index 2b8bb60..8e7c4dc 100644 --- a/inventory-server/scripts/metrics/brand-metrics.js +++ b/inventory-server/scripts/metrics/brand-metrics.js @@ -231,8 +231,8 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount = monthly_metrics AS ( SELECT p.brand, - EXTRACT(YEAR FROM o.date) as year, - EXTRACT(MONTH FROM o.date) as month, + EXTRACT(YEAR FROM o.date::timestamp with time zone) as year, + EXTRACT(MONTH FROM o.date::timestamp with time zone) as month, COUNT(DISTINCT p.valid_pid) as product_count, COUNT(DISTINCT p.active_pid) as active_products, SUM(p.valid_stock) as total_stock_units, @@ -257,7 +257,7 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount = FROM filtered_products p LEFT JOIN orders o ON p.pid = o.pid AND o.canceled = false WHERE o.date >= CURRENT_DATE - INTERVAL '12 months' - GROUP BY p.brand, EXTRACT(YEAR FROM o.date), EXTRACT(MONTH FROM o.date) + GROUP BY p.brand, EXTRACT(YEAR FROM o.date::timestamp with time zone), EXTRACT(MONTH FROM o.date::timestamp with time zone) ) SELECT * FROM monthly_metrics diff --git a/inventory-server/scripts/metrics/category-metrics.js b/inventory-server/scripts/metrics/category-metrics.js index 0ac71f9..027dd8e 100644 --- a/inventory-server/scripts/metrics/category-metrics.js +++ b/inventory-server/scripts/metrics/category-metrics.js @@ -131,7 +131,7 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount AND o.date >= CURRENT_DATE - (COALESCE(tc.calculation_period_days, 30) || ' days')::INTERVAL GROUP BY pc.cat_id ) - UPDATE category_metrics cm + UPDATE category_metrics SET avg_margin = COALESCE(cs.total_margin * 100.0 / NULLIF(cs.total_sales, 0), 0), turnover_rate = CASE @@ -144,10 +144,7 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount END, last_calculated_at = NOW() FROM category_sales cs - LEFT JOIN turnover_config tc ON - (tc.category_id = cm.category_id AND tc.vendor IS NULL) OR - (tc.category_id IS NULL AND tc.vendor IS NULL) - WHERE cm.category_id = cs.cat_id + WHERE category_id = cs.cat_id `); processedCount = Math.floor(totalProducts * 0.95); @@ -265,6 +262,36 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount WHERE o.canceled = false AND o.date >= CURRENT_DATE - INTERVAL '3 months' GROUP BY pc.cat_id + ), + combined_metrics AS ( + SELECT + COALESCE(cp.cat_id, pp.cat_id) as category_id, + CASE + WHEN pp.revenue = 0 AND COALESCE(cp.revenue, 0) > 0 THEN 100.0 + WHEN pp.revenue = 0 OR cp.revenue IS NULL THEN 0.0 + WHEN ta.trend_slope IS NOT NULL THEN + GREATEST( + -100.0, + LEAST( + (ta.trend_slope / NULLIF(ta.avg_daily_revenue, 0)) * 365 * 100, + 999.99 + ) + ) + ELSE + GREATEST( + -100.0, + LEAST( + ((COALESCE(cp.revenue, 0) - pp.revenue) / + NULLIF(ABS(pp.revenue), 0)) * 100.0, + 999.99 + ) + ) + END as growth_rate, + mc.avg_margin + FROM current_period cp + FULL OUTER JOIN previous_period pp ON cp.cat_id = pp.cat_id + LEFT JOIN trend_analysis ta ON COALESCE(cp.cat_id, pp.cat_id) = ta.cat_id + LEFT JOIN margin_calc mc ON COALESCE(cp.cat_id, pp.cat_id) = mc.cat_id ) UPDATE category_metrics cm SET @@ -292,10 +319,10 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount avg_margin = COALESCE(mc.avg_margin, cm.avg_margin), last_calculated_at = NOW() FROM current_period cp - FULL OUTER JOIN previous_period pp ON cm.category_id = pp.cat_id - LEFT JOIN trend_analysis ta ON cm.category_id = ta.cat_id - LEFT JOIN margin_calc mc ON cm.category_id = mc.cat_id - WHERE cm.category_id = cp.cat_id OR cm.category_id = pp.cat_id + FULL OUTER JOIN previous_period pp ON cp.cat_id = pp.cat_id + LEFT JOIN trend_analysis ta ON COALESCE(cp.cat_id, pp.cat_id) = ta.cat_id + LEFT JOIN margin_calc mc ON COALESCE(cp.cat_id, pp.cat_id) = mc.cat_id + WHERE cm.category_id = COALESCE(cp.cat_id, pp.cat_id) `); processedCount = Math.floor(totalProducts * 0.97); @@ -337,8 +364,8 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount ) SELECT pc.cat_id, - EXTRACT(YEAR FROM o.date) as year, - EXTRACT(MONTH FROM o.date) as month, + EXTRACT(YEAR FROM o.date::timestamp with time zone) as year, + EXTRACT(MONTH FROM o.date::timestamp with time zone) as month, COUNT(DISTINCT p.pid) as product_count, COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products, SUM(p.stock_quantity * p.cost_price) as total_value, @@ -367,7 +394,7 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount JOIN orders o ON p.pid = o.pid WHERE o.canceled = false AND o.date >= CURRENT_DATE - INTERVAL '12 months' - GROUP BY pc.cat_id, EXTRACT(YEAR FROM o.date), EXTRACT(MONTH FROM o.date) + GROUP BY pc.cat_id, EXTRACT(YEAR FROM o.date::timestamp with time zone), EXTRACT(MONTH FROM o.date::timestamp with time zone) ON CONFLICT (category_id, year, month) DO UPDATE SET product_count = EXCLUDED.product_count, diff --git a/inventory-server/scripts/metrics/financial-metrics.js b/inventory-server/scripts/metrics/financial-metrics.js index e12c72e..1cba3ff 100644 --- a/inventory-server/scripts/metrics/financial-metrics.js +++ b/inventory-server/scripts/metrics/financial-metrics.js @@ -67,7 +67,7 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun SUM(o.quantity * (o.price - p.cost_price)) as gross_profit, MIN(o.date) as first_sale_date, MAX(o.date) as last_sale_date, - EXTRACT(DAY FROM (MAX(o.date) - MIN(o.date))) + 1 as calculation_period_days, + EXTRACT(DAY FROM (MAX(o.date)::timestamp with time zone - MIN(o.date)::timestamp with time zone)) + 1 as calculation_period_days, COUNT(DISTINCT DATE(o.date)) as active_days FROM products p LEFT JOIN orders o ON p.pid = o.pid @@ -120,8 +120,8 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun WITH monthly_financials AS ( SELECT p.pid, - EXTRACT(YEAR FROM o.date) as year, - EXTRACT(MONTH FROM o.date) as month, + EXTRACT(YEAR FROM o.date::timestamp with time zone) as year, + EXTRACT(MONTH FROM o.date::timestamp with time zone) as month, p.cost_price * p.stock_quantity as inventory_value, SUM(o.quantity * (o.price - p.cost_price)) as gross_profit, COUNT(DISTINCT DATE(o.date)) as active_days, @@ -130,7 +130,7 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun FROM products p LEFT JOIN orders o ON p.pid = o.pid WHERE o.canceled = false - GROUP BY p.pid, EXTRACT(YEAR FROM o.date), EXTRACT(MONTH FROM o.date), p.cost_price, p.stock_quantity + GROUP BY p.pid, EXTRACT(YEAR FROM o.date::timestamp with time zone), EXTRACT(MONTH FROM o.date::timestamp with time zone), p.cost_price, p.stock_quantity ) UPDATE product_time_aggregates pta SET diff --git a/inventory-server/scripts/metrics/product-metrics.js b/inventory-server/scripts/metrics/product-metrics.js index f5b4920..18f703c 100644 --- a/inventory-server/scripts/metrics/product-metrics.js +++ b/inventory-server/scripts/metrics/product-metrics.js @@ -10,12 +10,13 @@ function sanitizeValue(value) { } async function calculateProductMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) { - const connection = await getConnection(); + let connection; let success = false; let processedOrders = 0; const BATCH_SIZE = 5000; try { + connection = await getConnection(); // Skip flags are inherited from the parent scope const SKIP_PRODUCT_BASE_METRICS = 0; const SKIP_PRODUCT_TIME_AGGREGATES = 0; @@ -147,33 +148,58 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount GROUP BY p.pid `); - // Populate temp_purchase_metrics - await connection.query(` - INSERT INTO temp_purchase_metrics - SELECT - p.pid, - AVG( - CASE - WHEN po.received_date IS NOT NULL AND po.date IS NOT NULL - THEN EXTRACT(EPOCH FROM (po.received_date::timestamp with time zone - po.date::timestamp with time zone)) / 86400.0 - ELSE NULL - END - ) as avg_lead_time_days, - MAX(po.date) as last_purchase_date, - MIN(po.received_date) as first_received_date, - MAX(po.received_date) as last_received_date - FROM products p - LEFT JOIN purchase_orders po ON p.pid = po.pid - AND po.received_date IS NOT NULL - AND po.date >= CURRENT_DATE - INTERVAL '365 days' - GROUP BY p.pid - `); + // Populate temp_purchase_metrics with timeout protection + await Promise.race([ + connection.query(` + INSERT INTO temp_purchase_metrics + SELECT + p.pid, + AVG( + CASE + WHEN po.received_date IS NOT NULL AND po.date IS NOT NULL + THEN EXTRACT(EPOCH FROM (po.received_date::timestamp with time zone - po.date::timestamp with time zone)) / 86400.0 + ELSE NULL + END + ) as avg_lead_time_days, + MAX(po.date) as last_purchase_date, + MIN(po.received_date) as first_received_date, + MAX(po.received_date) as last_received_date + FROM products p + LEFT JOIN purchase_orders po ON p.pid = po.pid + AND po.received_date IS NOT NULL + AND po.date IS NOT NULL + AND po.date >= CURRENT_DATE - INTERVAL '365 days' + GROUP BY p.pid + `), + new Promise((_, reject) => + setTimeout(() => reject(new Error('Timeout: temp_purchase_metrics query took too long')), 60000) + ) + ]).catch(async (err) => { + logError(err, 'Error populating temp_purchase_metrics, continuing with empty table'); + // Create an empty fallback to continue processing + await connection.query(` + INSERT INTO temp_purchase_metrics + SELECT + p.pid, + 30.0 as avg_lead_time_days, + NULL as last_purchase_date, + NULL as first_received_date, + NULL as last_received_date + FROM products p + LEFT JOIN temp_purchase_metrics tpm ON p.pid = tpm.pid + WHERE tpm.pid IS NULL + `); + }); // Process updates in batches let lastPid = 0; - while (true) { + let batchCount = 0; + const MAX_BATCHES = 1000; // Safety limit for number of batches to prevent infinite loops + + while (batchCount < MAX_BATCHES) { if (isCancelled) break; - + + batchCount++; const batch = await connection.query( 'SELECT pid FROM products WHERE pid > $1 ORDER BY pid LIMIT $2', [lastPid, BATCH_SIZE] @@ -181,6 +207,7 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount if (batch.rows.length === 0) break; + // Process the entire batch in a single efficient query await connection.query(` UPDATE product_metrics pm SET @@ -241,6 +268,7 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount LEFT JOIN temp_sales_metrics sm ON p.pid = sm.pid LEFT JOIN temp_purchase_metrics lm ON p.pid = lm.pid WHERE p.pid = ANY($11::bigint[]) + AND pm.pid = p.pid `, [ defaultThresholds.low_stock_threshold, @@ -254,8 +282,7 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount defaultThresholds.overstock_days, defaultThresholds.overstock_days, batch.rows.map(row => row.pid) - ] - ); + ]); lastPid = batch.rows[batch.rows.length - 1].pid; processedCount += batch.rows.length; @@ -277,54 +304,59 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount }); } - // Calculate forecast accuracy and bias in batches - lastPid = 0; - while (true) { - if (isCancelled) break; - - const batch = await connection.query( - 'SELECT pid FROM products WHERE pid > $1 ORDER BY pid LIMIT $2', - [lastPid, BATCH_SIZE] - ); - - if (batch.rows.length === 0) break; - - await connection.query(` - UPDATE product_metrics pm - SET - forecast_accuracy = GREATEST(0, 100 - LEAST(fa.avg_forecast_error, 100)), - forecast_bias = GREATEST(-100, LEAST(fa.avg_forecast_bias, 100)), - last_forecast_date = fa.last_forecast_date, - last_calculated_at = NOW() - FROM ( - SELECT - sf.pid, - AVG(CASE - WHEN o.quantity > 0 - THEN ABS(sf.forecast_quantity - o.quantity) / o.quantity * 100 - ELSE 100 - END) as avg_forecast_error, - AVG(CASE - WHEN o.quantity > 0 - THEN (sf.forecast_quantity - o.quantity) / o.quantity * 100 - ELSE 0 - END) as avg_forecast_bias, - MAX(sf.forecast_date) as last_forecast_date - FROM sales_forecasts sf - JOIN orders o ON sf.pid = o.pid - AND DATE(o.date) = sf.forecast_date - WHERE o.canceled = false - AND sf.forecast_date >= CURRENT_DATE - INTERVAL '90 days' - AND sf.pid = ANY($1::bigint[]) - GROUP BY sf.pid - ) fa - WHERE pm.pid = fa.pid - `, [batch.rows.map(row => row.pid)]); - - lastPid = batch.rows[batch.rows.length - 1].pid; + // Add safety check if the loop processed MAX_BATCHES + if (batchCount >= MAX_BATCHES) { + logError(new Error(`Reached maximum batch count (${MAX_BATCHES}). Process may have entered an infinite loop.`), 'Batch processing safety limit reached'); } } + // Calculate forecast accuracy and bias in batches + lastPid = 0; + while (true) { + if (isCancelled) break; + + const batch = await connection.query( + 'SELECT pid FROM products WHERE pid > $1 ORDER BY pid LIMIT $2', + [lastPid, BATCH_SIZE] + ); + + if (batch.rows.length === 0) break; + + await connection.query(` + UPDATE product_metrics pm + SET + forecast_accuracy = GREATEST(0, 100 - LEAST(fa.avg_forecast_error, 100)), + forecast_bias = GREATEST(-100, LEAST(fa.avg_forecast_bias, 100)), + last_forecast_date = fa.last_forecast_date, + last_calculated_at = NOW() + FROM ( + SELECT + sf.pid, + AVG(CASE + WHEN o.quantity > 0 + THEN ABS(sf.forecast_quantity - o.quantity) / o.quantity * 100 + ELSE 100 + END) as avg_forecast_error, + AVG(CASE + WHEN o.quantity > 0 + THEN (sf.forecast_quantity - o.quantity) / o.quantity * 100 + ELSE 0 + END) as avg_forecast_bias, + MAX(sf.forecast_date) as last_forecast_date + FROM sales_forecasts sf + JOIN orders o ON sf.pid = o.pid + AND DATE(o.date) = sf.forecast_date + WHERE o.canceled = false + AND sf.forecast_date >= CURRENT_DATE - INTERVAL '90 days' + AND sf.pid = ANY($1::bigint[]) + GROUP BY sf.pid + ) fa + WHERE pm.pid = fa.pid + `, [batch.rows.map(row => row.pid)]); + + lastPid = batch.rows[batch.rows.length - 1].pid; + } + // Calculate product time aggregates if (!SKIP_PRODUCT_TIME_AGGREGATES) { outputProgress({ @@ -360,11 +392,11 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount ) SELECT p.pid, - EXTRACT(YEAR FROM o.date) as year, - EXTRACT(MONTH FROM o.date) as month, + EXTRACT(YEAR FROM o.date::timestamp with time zone) as year, + EXTRACT(MONTH FROM o.date::timestamp with time zone) as month, SUM(o.quantity) as total_quantity_sold, - SUM(o.quantity * o.price) as total_revenue, - SUM(o.quantity * p.cost_price) as total_cost, + SUM(o.price * o.quantity) as total_revenue, + SUM(p.cost_price * o.quantity) as total_cost, COUNT(DISTINCT o.order_number) as order_count, AVG(o.price) as avg_price, CASE @@ -381,7 +413,7 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount FROM products p LEFT JOIN orders o ON p.pid = o.pid AND o.canceled = false WHERE o.date >= CURRENT_DATE - INTERVAL '12 months' - GROUP BY p.pid, EXTRACT(YEAR FROM o.date), EXTRACT(MONTH FROM o.date) + GROUP BY p.pid, EXTRACT(YEAR FROM o.date::timestamp with time zone), EXTRACT(MONTH FROM o.date::timestamp with time zone) ON CONFLICT (pid, year, month) DO UPDATE SET total_quantity_sold = EXCLUDED.total_quantity_sold, @@ -500,17 +532,18 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount // Get total count for percentage calculation const rankingCount = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks'); const totalCount = parseInt(rankingCount.rows[0].total_count) || 1; - const max_rank = totalCount; // Process updates in batches let abcProcessedCount = 0; const batchSize = 5000; + const maxPid = await connection.query('SELECT MAX(pid) as max_pid FROM products'); + const maxProductId = parseInt(maxPid.rows[0].max_pid); - while (true) { + while (abcProcessedCount < maxProductId) { if (isCancelled) return { processedProducts: processedCount, processedOrders, - processedPurchaseOrders: 0, // This module doesn't process POs + processedPurchaseOrders: 0, success }; @@ -519,16 +552,18 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount SELECT pm.pid FROM product_metrics pm LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid - WHERE pm.abc_class IS NULL - OR pm.abc_class != - CASE - WHEN tr.pid IS NULL THEN 'C' - WHEN tr.percentile <= $1 THEN 'A' - WHEN tr.percentile <= $2 THEN 'B' - ELSE 'C' - END - LIMIT $3 - `, [abcThresholds.a_threshold, abcThresholds.b_threshold, batchSize]); + WHERE pm.pid > $1 + AND (pm.abc_class IS NULL + OR pm.abc_class != + CASE + WHEN tr.pid IS NULL THEN 'C' + WHEN tr.percentile <= $2 THEN 'A' + WHEN tr.percentile <= $3 THEN 'B' + ELSE 'C' + END) + ORDER BY pm.pid + LIMIT $4 + `, [abcProcessedCount, abcThresholds.a_threshold, abcThresholds.b_threshold, batchSize]); if (pids.rows.length === 0) break; @@ -581,10 +616,10 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount WHERE pm.pid = sales.pid `, [pidValues]); - abcProcessedCount += pids.rows.length; + abcProcessedCount = pids.rows[pids.rows.length - 1].pid; - // Calculate progress proportionally to batch size - processedCount = Math.floor(totalProducts * (0.60 + (abcProcessedCount / totalProducts) * 0.2)); + // Calculate progress proportionally to total products + processedCount = Math.floor(totalProducts * (0.60 + (abcProcessedCount / maxProductId) * 0.2)); outputProgress({ status: 'running', @@ -626,7 +661,16 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount logError(error, 'Error calculating product metrics'); throw error; } finally { + // Always clean up temporary tables, even if an error occurred if (connection) { + try { + await connection.query('DROP TABLE IF EXISTS temp_sales_metrics'); + await connection.query('DROP TABLE IF EXISTS temp_purchase_metrics'); + } catch (err) { + console.error('Error cleaning up temporary tables:', err); + } + + // Make sure to release the connection connection.release(); } } diff --git a/inventory-server/scripts/metrics/time-aggregates.js b/inventory-server/scripts/metrics/time-aggregates.js index 81bfc4d..492a03e 100644 --- a/inventory-server/scripts/metrics/time-aggregates.js +++ b/inventory-server/scripts/metrics/time-aggregates.js @@ -75,8 +75,8 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount WITH monthly_sales AS ( SELECT o.pid, - EXTRACT(YEAR FROM o.date) as year, - EXTRACT(MONTH FROM o.date) as month, + EXTRACT(YEAR FROM o.date::timestamp with time zone) as year, + EXTRACT(MONTH FROM o.date::timestamp with time zone) as month, SUM(o.quantity) as total_quantity_sold, SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) as total_revenue, SUM(COALESCE(p.cost_price, 0) * o.quantity) as total_cost, @@ -93,17 +93,17 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount FROM orders o JOIN products p ON o.pid = p.pid WHERE o.canceled = false - GROUP BY o.pid, EXTRACT(YEAR FROM o.date), EXTRACT(MONTH FROM o.date), p.cost_price, p.stock_quantity + GROUP BY o.pid, EXTRACT(YEAR FROM o.date::timestamp with time zone), EXTRACT(MONTH FROM o.date::timestamp with time zone), p.cost_price, p.stock_quantity ), monthly_stock AS ( SELECT pid, - EXTRACT(YEAR FROM date) as year, - EXTRACT(MONTH FROM date) as month, + EXTRACT(YEAR FROM date::timestamp with time zone) as year, + EXTRACT(MONTH FROM date::timestamp with time zone) as month, SUM(received) as stock_received, SUM(ordered) as stock_ordered FROM purchase_orders - GROUP BY pid, EXTRACT(YEAR FROM date), EXTRACT(MONTH FROM date) + GROUP BY pid, EXTRACT(YEAR FROM date::timestamp with time zone), EXTRACT(MONTH FROM date::timestamp with time zone) ), base_products AS ( SELECT @@ -242,15 +242,15 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount FROM ( SELECT p.pid, - EXTRACT(YEAR FROM o.date) as year, - EXTRACT(MONTH FROM o.date) as month, + EXTRACT(YEAR FROM o.date::timestamp with time zone) as year, + EXTRACT(MONTH FROM o.date::timestamp with time zone) as month, p.cost_price * p.stock_quantity as inventory_value, SUM(o.quantity * (o.price - p.cost_price)) as gross_profit, COUNT(DISTINCT DATE(o.date)) as active_days FROM products p LEFT JOIN orders o ON p.pid = o.pid WHERE o.canceled = false - GROUP BY p.pid, EXTRACT(YEAR FROM o.date), EXTRACT(MONTH FROM o.date), p.cost_price, p.stock_quantity + GROUP BY p.pid, EXTRACT(YEAR FROM o.date::timestamp with time zone), EXTRACT(MONTH FROM o.date::timestamp with time zone), p.cost_price, p.stock_quantity ) fin WHERE pta.pid = fin.pid AND pta.year = fin.year diff --git a/inventory-server/scripts/metrics/vendor-metrics.js b/inventory-server/scripts/metrics/vendor-metrics.js index fb123ae..b1aa08b 100644 --- a/inventory-server/scripts/metrics/vendor-metrics.js +++ b/inventory-server/scripts/metrics/vendor-metrics.js @@ -141,7 +141,7 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount = WHEN po.receiving_status = 40 AND po.received_date IS NOT NULL AND po.date IS NOT NULL - THEN EXTRACT(EPOCH FROM (po.received_date - po.date)) / 86400.0 + THEN EXTRACT(EPOCH FROM (po.received_date::timestamp with time zone - po.date::timestamp with time zone)) / 86400.0 ELSE NULL END) as avg_lead_time_days, SUM(po.ordered * po.po_cost_price) as total_purchase_value @@ -249,8 +249,8 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount = WITH monthly_orders AS ( SELECT p.vendor, - EXTRACT(YEAR FROM o.date) as year, - EXTRACT(MONTH FROM o.date) as month, + EXTRACT(YEAR FROM o.date::timestamp with time zone) as year, + EXTRACT(MONTH FROM o.date::timestamp with time zone) as month, COUNT(DISTINCT o.id) as total_orders, SUM(o.quantity * o.price) as total_revenue, SUM(o.quantity * (o.price - p.cost_price)) as total_margin @@ -259,13 +259,13 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount = WHERE o.canceled = false AND o.date >= CURRENT_DATE - INTERVAL '12 months' AND p.vendor IS NOT NULL - GROUP BY p.vendor, EXTRACT(YEAR FROM o.date), EXTRACT(MONTH FROM o.date) + GROUP BY p.vendor, EXTRACT(YEAR FROM o.date::timestamp with time zone), EXTRACT(MONTH FROM o.date::timestamp with time zone) ), monthly_po AS ( SELECT p.vendor, - EXTRACT(YEAR FROM po.date) as year, - EXTRACT(MONTH FROM po.date) as month, + EXTRACT(YEAR FROM po.date::timestamp with time zone) as year, + EXTRACT(MONTH FROM po.date::timestamp with time zone) as month, COUNT(DISTINCT po.id) as total_po, COUNT(DISTINCT CASE WHEN po.receiving_status = 40 AND po.received_date > po.expected_date @@ -275,7 +275,7 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount = WHEN po.receiving_status = 40 AND po.received_date IS NOT NULL AND po.date IS NOT NULL - THEN EXTRACT(EPOCH FROM (po.received_date - po.date)) / 86400.0 + THEN EXTRACT(EPOCH FROM (po.received_date::timestamp with time zone - po.date::timestamp with time zone)) / 86400.0 ELSE NULL END) as avg_lead_time_days, SUM(po.ordered * po.po_cost_price) as total_purchase_value @@ -283,7 +283,7 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount = JOIN purchase_orders po ON p.pid = po.pid WHERE po.date >= CURRENT_DATE - INTERVAL '12 months' AND p.vendor IS NOT NULL - GROUP BY p.vendor, EXTRACT(YEAR FROM po.date), EXTRACT(MONTH FROM po.date) + GROUP BY p.vendor, EXTRACT(YEAR FROM po.date::timestamp with time zone), EXTRACT(MONTH FROM po.date::timestamp with time zone) ) SELECT mo.vendor,