diff --git a/inventory-server/db/metrics-schema-new.sql b/inventory-server/db/metrics-schema-new.sql index 46866c9..a7f852f 100644 --- a/inventory-server/db/metrics-schema-new.sql +++ b/inventory-server/db/metrics-schema-new.sql @@ -107,16 +107,16 @@ CREATE TABLE public.product_metrics ( avg_ros_30d NUMERIC(10, 4), -- profit_30d / sales_30d avg_sales_per_day_30d NUMERIC(10, 2), -- sales_30d / 30.0 avg_sales_per_month_30d NUMERIC(10, 2), -- sales_30d (assuming 30d = 1 month for this metric) - margin_30d NUMERIC(5, 2), -- (profit_30d / revenue_30d) * 100 - markup_30d NUMERIC(5, 2), -- (profit_30d / cogs_30d) * 100 + margin_30d NUMERIC(8, 2), -- (profit_30d / revenue_30d) * 100 + markup_30d NUMERIC(8, 2), -- (profit_30d / cogs_30d) * 100 gmroi_30d NUMERIC(10, 2), -- profit_30d / avg_stock_cost_30d stockturn_30d NUMERIC(10, 2), -- sales_30d / avg_stock_units_30d - return_rate_30d NUMERIC(5, 2), -- returns_units_30d / (sales_30d + returns_units_30d) * 100 - discount_rate_30d NUMERIC(5, 2), -- discounts_30d / gross_revenue_30d * 100 - stockout_rate_30d NUMERIC(5, 2), -- stockout_days_30d / 30.0 * 100 + return_rate_30d NUMERIC(8, 2), -- returns_units_30d / (sales_30d + returns_units_30d) * 100 + discount_rate_30d NUMERIC(8, 2), -- discounts_30d / gross_revenue_30d * 100 + stockout_rate_30d NUMERIC(8, 2), -- stockout_days_30d / 30.0 * 100 markdown_30d NUMERIC(14, 4), -- gross_regular_revenue_30d - gross_revenue_30d - markdown_rate_30d NUMERIC(5, 2), -- markdown_30d / gross_regular_revenue_30d * 100 - sell_through_30d NUMERIC(5, 2), -- sales_30d / (current_stock + sales_30d) * 100 + markdown_rate_30d NUMERIC(8, 2), -- markdown_30d / gross_regular_revenue_30d * 100 + sell_through_30d NUMERIC(8, 2), -- sales_30d / (current_stock + sales_30d) * 100 avg_lead_time_days INT, -- Calculated Periodically from purchase_orders -- Forecasting & Replenishment (Refreshed Hourly) diff --git a/inventory-server/scripts/calculate-metrics.js b/inventory-server/old/calculate-metrics.js similarity index 100% rename from inventory-server/scripts/calculate-metrics.js rename to inventory-server/old/calculate-metrics.js diff --git a/inventory-server/scripts/metrics/brand-metrics.js b/inventory-server/old/metrics/brand-metrics.js similarity index 100% rename from inventory-server/scripts/metrics/brand-metrics.js rename to inventory-server/old/metrics/brand-metrics.js diff --git a/inventory-server/scripts/metrics/category-metrics.js b/inventory-server/old/metrics/category-metrics.js similarity index 100% rename from inventory-server/scripts/metrics/category-metrics.js rename to inventory-server/old/metrics/category-metrics.js diff --git a/inventory-server/scripts/metrics/financial-metrics.js b/inventory-server/old/metrics/financial-metrics.js similarity index 100% rename from inventory-server/scripts/metrics/financial-metrics.js rename to inventory-server/old/metrics/financial-metrics.js diff --git a/inventory-server/scripts/metrics/product-metrics.js b/inventory-server/old/metrics/product-metrics.js similarity index 100% rename from inventory-server/scripts/metrics/product-metrics.js rename to inventory-server/old/metrics/product-metrics.js diff --git a/inventory-server/scripts/metrics/sales-forecasts.js b/inventory-server/old/metrics/sales-forecasts.js similarity index 100% rename from inventory-server/scripts/metrics/sales-forecasts.js rename to inventory-server/old/metrics/sales-forecasts.js diff --git a/inventory-server/scripts/metrics/time-aggregates.js b/inventory-server/old/metrics/time-aggregates.js similarity index 
100% rename from inventory-server/scripts/metrics/time-aggregates.js rename to inventory-server/old/metrics/time-aggregates.js diff --git a/inventory-server/scripts/metrics/utils/db.js b/inventory-server/old/metrics/utils/db.js similarity index 100% rename from inventory-server/scripts/metrics/utils/db.js rename to inventory-server/old/metrics/utils/db.js diff --git a/inventory-server/scripts/metrics/utils/progress.js b/inventory-server/old/metrics/utils/progress.js similarity index 100% rename from inventory-server/scripts/metrics/utils/progress.js rename to inventory-server/old/metrics/utils/progress.js diff --git a/inventory-server/scripts/metrics/vendor-metrics.js b/inventory-server/old/metrics/vendor-metrics.js similarity index 100% rename from inventory-server/scripts/metrics/vendor-metrics.js rename to inventory-server/old/metrics/vendor-metrics.js diff --git a/inventory-server/scripts/reset-metrics.js b/inventory-server/old/reset-metrics.js similarity index 100% rename from inventory-server/scripts/reset-metrics.js rename to inventory-server/old/reset-metrics.js diff --git a/inventory-server/scripts/update-order-costs.js b/inventory-server/old/update-order-costs.js similarity index 98% rename from inventory-server/scripts/update-order-costs.js rename to inventory-server/old/update-order-costs.js index 135af46..3cf6674 100644 --- a/inventory-server/scripts/update-order-costs.js +++ b/inventory-server/old/update-order-costs.js @@ -5,7 +5,7 @@ const dotenv = require("dotenv"); const path = require("path"); const fs = require("fs"); -const { setupConnections, closeConnections } = require('./import/utils'); +const { setupConnections, closeConnections } = require('../scripts/import/utils'); const { outputProgress, formatElapsedTime } = require('./metrics/utils/progress'); dotenv.config({ path: path.join(__dirname, "../.env") }); diff --git a/inventory-server/package-lock.json b/inventory-server/package-lock.json index 97e0479..e44b10b 100755 --- a/inventory-server/package-lock.json +++ b/inventory-server/package-lock.json @@ -12,6 +12,7 @@ "@types/diff": "^7.0.1", "axios": "^1.8.1", "bcrypt": "^5.1.1", + "commander": "^13.1.0", "cors": "^2.8.5", "csv-parse": "^5.6.0", "diff": "^7.0.0", @@ -20,7 +21,7 @@ "multer": "^1.4.5-lts.1", "mysql2": "^3.12.0", "openai": "^4.85.3", - "pg": "^8.13.3", + "pg": "^8.14.1", "pm2": "^5.3.0", "ssh2": "^1.16.0", "uuid": "^9.0.1" @@ -922,10 +923,13 @@ } }, "node_modules/commander": { - "version": "2.15.1", - "resolved": "https://registry.npmjs.org/commander/-/commander-2.15.1.tgz", - "integrity": "sha512-VlfT9F3V0v+jr4yxPc5gg9s62/fIVWsd2Bk2iD435um1NlGMYdVCq+MjcXnhYq2icNOizHr1kK+5TI6H0Hy0ag==", - "license": "MIT" + "version": "13.1.0", + "resolved": "https://registry.npmjs.org/commander/-/commander-13.1.0.tgz", + "integrity": "sha512-/rFeCpNJQbhSZjGVwO9RFV3xPqbnERS8MmIQzCtD/zl6gpJuV/bMLuN92oG3F7d8oDEHHRrujSXNUr8fpjntKw==", + "license": "MIT", + "engines": { + "node": ">=18" + } }, "node_modules/concat-map": { "version": "0.0.1", @@ -2739,14 +2743,14 @@ "license": "MIT" }, "node_modules/pg": { - "version": "8.13.3", - "resolved": "https://registry.npmjs.org/pg/-/pg-8.13.3.tgz", - "integrity": "sha512-P6tPt9jXbL9HVu/SSRERNYaYG++MjnscnegFh9pPHihfoBSujsrka0hyuymMzeJKFWrcG8wvCKy8rCe8e5nDUQ==", + "version": "8.14.1", + "resolved": "https://registry.npmjs.org/pg/-/pg-8.14.1.tgz", + "integrity": "sha512-0TdbqfjwIun9Fm/r89oB7RFQ0bLgduAhiIqIXOsyKoiC/L54DbuAAzIEN/9Op0f1Po9X7iCPXGoa/Ah+2aI8Xw==", "license": "MIT", "dependencies": { "pg-connection-string": "^2.7.0", - 
"pg-pool": "^3.7.1", - "pg-protocol": "^1.7.1", + "pg-pool": "^3.8.0", + "pg-protocol": "^1.8.0", "pg-types": "^2.1.0", "pgpass": "1.x" }, @@ -2788,18 +2792,18 @@ } }, "node_modules/pg-pool": { - "version": "3.7.1", - "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.7.1.tgz", - "integrity": "sha512-xIOsFoh7Vdhojas6q3596mXFsR8nwBQBXX5JiV7p9buEVAGqYL4yFzclON5P9vFrpu1u7Zwl2oriyDa89n0wbw==", + "version": "3.8.0", + "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.8.0.tgz", + "integrity": "sha512-VBw3jiVm6ZOdLBTIcXLNdSotb6Iy3uOCwDGFAksZCXmi10nyRvnP2v3jl4d+IsLYRyXf6o9hIm/ZtUzlByNUdw==", "license": "MIT", "peerDependencies": { "pg": ">=8.0" } }, "node_modules/pg-protocol": { - "version": "1.7.1", - "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.7.1.tgz", - "integrity": "sha512-gjTHWGYWsEgy9MsY0Gp6ZJxV24IjDqdpTW7Eh0x+WfJLFsm/TJx1MzL6T0D88mBvkpxotCQ6TwW6N+Kko7lhgQ==", + "version": "1.8.0", + "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.8.0.tgz", + "integrity": "sha512-jvuYlEkL03NRvOoyoRktBK7+qU5kOvlAwvmrH8sr3wbLrOdVWsRxQfz8mMy9sZFsqJ1hEWNfdWKI4SAmoL+j7g==", "license": "MIT" }, "node_modules/pg-types": { @@ -3047,6 +3051,12 @@ "node": ">=8" } }, + "node_modules/pm2/node_modules/commander": { + "version": "2.15.1", + "resolved": "https://registry.npmjs.org/commander/-/commander-2.15.1.tgz", + "integrity": "sha512-VlfT9F3V0v+jr4yxPc5gg9s62/fIVWsd2Bk2iD435um1NlGMYdVCq+MjcXnhYq2icNOizHr1kK+5TI6H0Hy0ag==", + "license": "MIT" + }, "node_modules/pm2/node_modules/debug": { "version": "4.4.0", "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.0.tgz", diff --git a/inventory-server/package.json b/inventory-server/package.json index ac667aa..dc359e7 100755 --- a/inventory-server/package.json +++ b/inventory-server/package.json @@ -21,6 +21,7 @@ "@types/diff": "^7.0.1", "axios": "^1.8.1", "bcrypt": "^5.1.1", + "commander": "^13.1.0", "cors": "^2.8.5", "csv-parse": "^5.6.0", "diff": "^7.0.0", @@ -29,7 +30,7 @@ "multer": "^1.4.5-lts.1", "mysql2": "^3.12.0", "openai": "^4.85.3", - "pg": "^8.13.3", + "pg": "^8.14.1", "pm2": "^5.3.0", "ssh2": "^1.16.0", "uuid": "^9.0.1" diff --git a/inventory-server/scripts/calculate-metrics-new.js b/inventory-server/scripts/calculate-metrics-new.js new file mode 100644 index 0000000..7a9fa0a --- /dev/null +++ b/inventory-server/scripts/calculate-metrics-new.js @@ -0,0 +1,608 @@ +// run-all-updates.js +const path = require('path'); +const fs = require('fs'); +const { Pool } = require('pg'); // Assuming you use 'pg' + +// --- Configuration --- +// Toggle these constants to enable/disable specific steps for testing +const RUN_DAILY_SNAPSHOTS = true; +const RUN_PRODUCT_METRICS = true; +const RUN_PERIODIC_METRICS = true; + +// Maximum execution time for the entire sequence (e.g., 90 minutes) +const MAX_EXECUTION_TIME_TOTAL = 90 * 60 * 1000; +// Maximum execution time per individual SQL step (e.g., 30 minutes) +const MAX_EXECUTION_TIME_PER_STEP = 30 * 60 * 1000; +// Query cancellation timeout +const CANCEL_QUERY_AFTER_SECONDS = 5; +// --- End Configuration --- + +// Change working directory to script directory +process.chdir(path.dirname(__filename)); + +// Log script path for debugging +console.log('Script running from:', __dirname); + +// Try to load environment variables from multiple locations +const envPaths = [ + path.resolve(__dirname, '../..', '.env'), // Two levels up (inventory/.env) + path.resolve(__dirname, '..', '.env'), // One level up (inventory-server/.env) + path.resolve(__dirname, '.env'), // Same 
directory + '/var/www/html/inventory/.env' // Server absolute path +]; + +let envLoaded = false; +for (const envPath of envPaths) { + if (fs.existsSync(envPath)) { + console.log(`Loading environment from: ${envPath}`); + require('dotenv').config({ path: envPath }); + envLoaded = true; + break; + } +} + +if (!envLoaded) { + console.warn('WARNING: Could not find .env file in any of the expected locations.'); + console.warn('Checked paths:', envPaths); +} + +// --- Database Setup --- +// Make sure we have the required DB credentials +if (!process.env.DB_HOST && !process.env.DATABASE_URL) { + console.error('WARNING: Neither DB_HOST nor DATABASE_URL environment variables found'); +} + +// Only validate individual parameters if not using connection string +if (!process.env.DATABASE_URL) { + if (!process.env.DB_USER) console.error('WARNING: DB_USER environment variable is missing'); + if (!process.env.DB_NAME) console.error('WARNING: DB_NAME environment variable is missing'); + + // Password must be a string for PostgreSQL SCRAM authentication + if (!process.env.DB_PASSWORD || typeof process.env.DB_PASSWORD !== 'string') { + console.error('WARNING: DB_PASSWORD environment variable is missing or not a string'); + } +} + +// Configure database connection to match individual scripts +let dbConfig; + +// Check if a DATABASE_URL exists (common in production environments) +if (process.env.DATABASE_URL && typeof process.env.DATABASE_URL === 'string') { + console.log('Using DATABASE_URL for connection'); + dbConfig = { + connectionString: process.env.DATABASE_URL, + ssl: process.env.DB_SSL === 'true' ? { rejectUnauthorized: false } : false, + // Add performance optimizations + max: 10, // connection pool max size + idleTimeoutMillis: 30000, + connectionTimeoutMillis: 60000, + // Set timeouts for long-running queries + statement_timeout: 1800000, // 30 minutes + query_timeout: 1800000 // 30 minutes + }; +} else { + // Use individual connection parameters + dbConfig = { + host: process.env.DB_HOST, + user: process.env.DB_USER, + password: process.env.DB_PASSWORD, + database: process.env.DB_NAME, + port: process.env.DB_PORT || 5432, + ssl: process.env.DB_SSL === 'true', + // Add performance optimizations + max: 10, // connection pool max size + idleTimeoutMillis: 30000, + connectionTimeoutMillis: 60000, + // Set timeouts for long-running queries + statement_timeout: 1800000, // 30 minutes + query_timeout: 1800000 // 30 minutes + }; +} + +// Try to load from utils DB module as a last resort +try { + if (!process.env.DB_HOST && !process.env.DATABASE_URL) { + console.log('Attempting to load DB config from individual script modules...'); + const dbModule = require('./metrics-new/utils/db'); + if (dbModule && dbModule.dbConfig) { + console.log('Found DB config in individual script module'); + dbConfig = { + ...dbModule.dbConfig, + // Add performance optimizations if not present + max: dbModule.dbConfig.max || 10, + idleTimeoutMillis: dbModule.dbConfig.idleTimeoutMillis || 30000, + connectionTimeoutMillis: dbModule.dbConfig.connectionTimeoutMillis || 60000, + statement_timeout: 1800000, + query_timeout: 1800000 + }; + } + } +} catch (err) { + console.warn('Could not load DB config from individual script modules:', err.message); +} + +// Debug log connection info (without password) +console.log('DB Connection Info:', { + connectionString: dbConfig.connectionString ? 'PROVIDED' : undefined, + host: dbConfig.host, + user: dbConfig.user, + database: dbConfig.database, + port: dbConfig.port, + ssl: dbConfig.ssl ? 
'ENABLED' : 'DISABLED', + password: (dbConfig.password || dbConfig.connectionString) ? '****' : 'MISSING' // Only show if credentials exist +}); + +const pool = new Pool(dbConfig); + +const getConnection = () => { + return pool.connect(); +}; + +const closePool = () => { + console.log("Closing database connection pool."); + return pool.end(); +}; + +// --- Progress Utilities --- +// Using functions directly instead of globals +const progressUtils = require('./metrics-new/utils/progress'); // Assuming utils/progress.js exports these + +// --- State & Cancellation --- +let isCancelled = false; +let currentStep = ''; // Track which step is running for cancellation message +let overallStartTime = null; +let mainTimeoutHandle = null; +let stepTimeoutHandle = null; + +async function cancelCalculation(reason = 'cancelled by user') { + if (isCancelled) return; // Prevent multiple cancellations + isCancelled = true; + console.log(`Calculation ${reason}. Attempting to cancel active step: ${currentStep}`); + + // Clear timeouts + if (mainTimeoutHandle) clearTimeout(mainTimeoutHandle); + if (stepTimeoutHandle) clearTimeout(stepTimeoutHandle); + + // Attempt to cancel the long-running query in Postgres + let conn = null; + try { + console.log(`Attempting to cancel queries running longer than ${CANCEL_QUERY_AFTER_SECONDS} seconds...`); + conn = await getConnection(); + const result = await conn.query(` + SELECT pg_cancel_backend(pid) + FROM pg_stat_activity + WHERE query_start < now() - interval '${CANCEL_QUERY_AFTER_SECONDS} seconds' + AND application_name = 'node-metrics-calculator' -- Match specific app name + AND state = 'active' -- Only cancel active queries + AND query NOT LIKE '%pg_cancel_backend%' + AND pid <> pg_backend_pid(); -- Don't cancel self + `); + console.log(`Sent ${result.rowCount} cancellation signal(s).`); + conn.release(); + } catch (err) { + console.error('Error during database query cancellation:', err.message); + if (conn) { + try { conn.release(); } catch (e) { console.error("Error releasing cancellation connection", e); } + } + // Proceed with script termination attempt even if DB cancel fails + } finally { + // Update progress to show cancellation + progressUtils.outputProgress({ + status: 'cancelled', + operation: `Calculation ${reason} during step: ${currentStep}`, + current: 0, // Reset progress indicators + total: 100, + elapsed: overallStartTime ? progressUtils.formatElapsedTime(overallStartTime) : 'N/A', + remaining: null, + rate: 0, + percentage: '0', // Or keep last known percentage? + timing: { + start_time: overallStartTime ? new Date(overallStartTime).toISOString() : 'N/A', + end_time: new Date().toISOString(), + elapsed_seconds: overallStartTime ? Math.round((Date.now() - overallStartTime) / 1000) : 0 + } + }); + } + + // Note: We don't force exit here anymore. We let the main function's error + // handling catch the cancellation error thrown by executeSqlStep or the timeout. 
+ return { + success: true, // Indicates cancellation was initiated + message: `Calculation ${reason}` + }; +} + +// Handle SIGINT (Ctrl+C) and SIGTERM (kill) signals +process.on('SIGINT', () => { + console.log('\nReceived SIGINT (Ctrl+C).'); + cancelCalculation('cancelled by user (SIGINT)'); + // Give cancellation a moment to propagate before force-exiting if needed + setTimeout(() => process.exit(1), 2000); +}); +process.on('SIGTERM', () => { + console.log('Received SIGTERM.'); + cancelCalculation('cancelled by system (SIGTERM)'); + // Give cancellation a moment to propagate before force-exiting if needed + setTimeout(() => process.exit(1), 2000); +}); + +// Add error handlers for uncaught exceptions/rejections +process.on('uncaughtException', (error) => { + console.error('Uncaught Exception:', error); + // Attempt graceful shutdown/logging if possible, then exit + cancelCalculation('failed due to uncaught exception').finally(() => { + closePool().finally(() => process.exit(1)); + }); +}); + +process.on('unhandledRejection', (reason, promise) => { + console.error('Unhandled Rejection at:', promise, 'reason:', reason); + // Attempt graceful shutdown/logging if possible, then exit + cancelCalculation('failed due to unhandled rejection').finally(() => { + closePool().finally(() => process.exit(1)); + }); +}); + + +// --- Core Logic --- + +/** + * Executes a single SQL calculation step. + * @param {object} config - Configuration for the step. + * @param {string} config.name - User-friendly name of the step. + * @param {string} config.sqlFile - Path to the SQL file. + * @param {string} config.historyType - Type identifier for calculate_history. + * @param {string} config.statusModule - Module name for calculate_status. + * @param {object} progress - Progress utility functions. + * @returns {Promise<{success: boolean, message: string, duration: number}>} + */ +async function executeSqlStep(config, progress) { + if (isCancelled) throw new Error(`Calculation skipped step ${config.name} due to prior cancellation.`); + + currentStep = config.name; // Update global state + console.log(`\n--- Starting Step: ${config.name} ---`); + const stepStartTime = Date.now(); + let connection = null; + let calculateHistoryId = null; + + // Set timeout for this specific step + if (stepTimeoutHandle) clearTimeout(stepTimeoutHandle); // Clear previous step's timeout + stepTimeoutHandle = setTimeout(() => { + // Don't exit directly; initiate cancellation so the in-flight query is cancelled and the step fails cleanly. + // The step's catch block (or the post-query isCancelled check) then handles cleanup and history updates. + cancelCalculation(`timed out during step: ${config.name} after ${MAX_EXECUTION_TIME_PER_STEP / 1000} seconds`); + }, MAX_EXECUTION_TIME_PER_STEP); + + + try { + // 1.
Read SQL File + const sqlFilePath = path.resolve(__dirname, config.sqlFile); + if (!fs.existsSync(sqlFilePath)) { + throw new Error(`SQL file not found: ${sqlFilePath}`); + } + const sqlQuery = fs.readFileSync(sqlFilePath, 'utf8'); + console.log(`Read SQL file: ${config.sqlFile}`); + + // Check for potential parameter references that might cause issues + const parameterMatches = sqlQuery.match(/\$\d+(?!\:\:)/g); + if (parameterMatches && parameterMatches.length > 0) { + console.warn(`WARNING: Found ${parameterMatches.length} untyped parameters in SQL: ${parameterMatches.slice(0, 5).join(', ')}${parameterMatches.length > 5 ? '...' : ''}`); + console.warn('These might cause "could not determine data type of parameter" errors.'); + } + + // 2. Get Database Connection + connection = await getConnection(); + console.log("Database connection acquired."); + + // 3. Clean up Previous Runs & Create History Record (within a transaction) + await connection.query('BEGIN'); + + // Ensure calculate_status table exists + await connection.query(` + CREATE TABLE IF NOT EXISTS calculate_status ( + module_name TEXT PRIMARY KEY, + last_calculation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + `); + + // Ensure calculate_history table exists (basic structure) + await connection.query(` + CREATE TABLE IF NOT EXISTS calculate_history ( + id SERIAL PRIMARY KEY, + start_time TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + end_time TIMESTAMP WITH TIME ZONE, + duration_seconds INTEGER, + status TEXT, -- 'running', 'completed', 'failed', 'cancelled' + error_message TEXT, + additional_info JSONB + ); + `); + + // Mark previous runs of this type as cancelled + await connection.query(` + UPDATE calculate_history + SET + status = 'cancelled', + end_time = NOW(), + duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER, + error_message = 'Previous calculation was not completed properly or was superseded.' + WHERE status = 'running' AND additional_info->>'type' = $1::text; + `, [config.historyType]); + + // Create history record for this run + const historyResult = await connection.query(` + INSERT INTO calculate_history (status, additional_info) + VALUES ('running', jsonb_build_object('type', $1::text, 'sql_file', $2::text)) + RETURNING id; + `, [config.historyType, config.sqlFile]); + calculateHistoryId = historyResult.rows[0].id; + + await connection.query('COMMIT'); + console.log(`Created history record ID: ${calculateHistoryId}`); + + // 4. Initial Progress Update + progress.outputProgress({ + status: 'running', + operation: `Starting: ${config.name}`, + current: 0, total: 100, + elapsed: progress.formatElapsedTime(stepStartTime), + remaining: 'Calculating...', rate: 0, percentage: '0', + timing: { start_time: new Date(stepStartTime).toISOString() } + }); + + // 5. 
Execute the Main SQL Query + progress.outputProgress({ + status: 'running', + operation: `Executing SQL: ${config.name}`, + current: 25, total: 100, + elapsed: progress.formatElapsedTime(stepStartTime), + remaining: 'Executing...', rate: 0, percentage: '25', + timing: { start_time: new Date(stepStartTime).toISOString() } + }); + console.log(`Executing SQL for ${config.name}...`); + + try { + // Try executing exactly as individual scripts do + console.log('Executing SQL with simple query method...'); + await connection.query(sqlQuery); + } catch (sqlError) { + if (sqlError.message.includes('could not determine data type of parameter')) { + console.log('Simple query failed with parameter type error, trying alternative method...'); + try { + // Execute with explicit text mode to avoid parameter confusion + await connection.query({ + text: sqlQuery, + rowMode: 'text' + }); + } catch (altError) { + console.error('Alternative execution method also failed:', altError.message); + throw altError; // Re-throw the alternative error + } + } else { + console.error('SQL Execution Error:', sqlError.message); + if (sqlError.position) { + // If the error has a position, try to show the relevant part of the SQL query + const position = parseInt(sqlError.position, 10); + const startPos = Math.max(0, position - 100); + const endPos = Math.min(sqlQuery.length, position + 100); + console.error('SQL Error Context:'); + console.error('...' + sqlQuery.substring(startPos, position) + ' [ERROR HERE] ' + sqlQuery.substring(position, endPos) + '...'); + } + throw sqlError; // Re-throw to be caught by the main try/catch + } + } + + // Check for cancellation immediately after query finishes + if (isCancelled) throw new Error(`Calculation cancelled during SQL execution for ${config.name}`); + + console.log(`SQL execution finished for ${config.name}.`); + + // 6. Update Status & History (within a transaction) + await connection.query('BEGIN'); + + await connection.query(` + INSERT INTO calculate_status (module_name, last_calculation_timestamp) + VALUES ($1::text, NOW()) + ON CONFLICT (module_name) DO UPDATE + SET last_calculation_timestamp = EXCLUDED.last_calculation_timestamp; + `, [config.statusModule]); + + const stepDuration = Math.round((Date.now() - stepStartTime) / 1000); + await connection.query(` + UPDATE calculate_history + SET + end_time = NOW(), + duration_seconds = $1::integer, + status = 'completed' + WHERE id = $2::integer; + `, [stepDuration, calculateHistoryId]); + + await connection.query('COMMIT'); + + // 7. Final Progress Update for Step + progress.outputProgress({ + status: 'complete', + operation: `Completed: ${config.name}`, + current: 100, total: 100, + elapsed: progress.formatElapsedTime(stepStartTime), + remaining: '0s', rate: 0, percentage: '100', + timing: { + start_time: new Date(stepStartTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: stepDuration + } + }); + console.log(`--- Finished Step: ${config.name} (Duration: ${progress.formatElapsedTime(stepStartTime)}) ---`); + + return { + success: true, + message: `${config.name} completed successfully`, + duration: stepDuration + }; + + } catch (error) { + clearTimeout(stepTimeoutHandle); // Clear timeout on error + const errorEndTime = Date.now(); + const errorDuration = Math.round((errorEndTime - stepStartTime) / 1000); + const finalStatus = isCancelled ? 
'cancelled' : 'failed'; + const errorMessage = error.message || 'Unknown error'; + + console.error(`--- ERROR in Step: ${config.name} ---`); + console.error(error); // Log the full error + console.error(`------------------------------------`); + + // Update history with error/cancellation status + if (connection && calculateHistoryId) { + try { + // Use a separate transaction for error logging + await connection.query('ROLLBACK'); // Rollback any partial transaction from try block + await connection.query('BEGIN'); + await connection.query(` + UPDATE calculate_history + SET + end_time = NOW(), + duration_seconds = $1::integer, + status = $2::text, + error_message = $3::text + WHERE id = $4::integer; + `, [errorDuration, finalStatus, errorMessage.substring(0, 1000), calculateHistoryId]); // Limit error message size + await connection.query('COMMIT'); + console.log(`Updated history record ID ${calculateHistoryId} with status: ${finalStatus}`); + } catch (historyError) { + console.error("FATAL: Failed to update history record on error:", historyError); + // Cannot rollback here if already rolled back or commit failed + } + } else { + console.warn("Could not update history record on error (no connection or history ID)."); + } + + // Update progress file with error/cancellation + progress.outputProgress({ + status: finalStatus, + operation: `Error in ${config.name}: ${errorMessage.split('\n')[0]}`, // Show first line of error + current: 50, total: 100, // Indicate partial completion + elapsed: progress.formatElapsedTime(stepStartTime), + remaining: null, rate: 0, percentage: '50', + timing: { + start_time: new Date(stepStartTime).toISOString(), + end_time: new Date(errorEndTime).toISOString(), + elapsed_seconds: errorDuration + } + }); + + // Rethrow the error to be caught by the main runCalculations function + throw error; // Add context if needed: new Error(`Step ${config.name} failed: ${errorMessage}`) + + } finally { + clearTimeout(stepTimeoutHandle); // Ensure timeout is cleared + currentStep = ''; // Reset current step + if (connection) { + try { + await connection.release(); + console.log("Database connection released."); + } catch (releaseError) { + console.error("Error releasing database connection:", releaseError); + } + } + } +} + +/** + * Main function to run all calculation steps sequentially. + */ +async function runAllCalculations() { + overallStartTime = Date.now(); + isCancelled = false; // Reset cancellation flag at start + + // Overall timeout for the entire script + mainTimeoutHandle = setTimeout(() => { + console.error(`--- OVERALL TIMEOUT REACHED (${MAX_EXECUTION_TIME_TOTAL / 1000}s) ---`); + cancelCalculation(`overall timeout reached`); + // The process should exit via the unhandled rejection/exception handlers + // or the SIGTERM/SIGINT handlers after cancellation attempt. 
+ }, MAX_EXECUTION_TIME_TOTAL); + + const steps = [ + { + run: RUN_DAILY_SNAPSHOTS, + name: 'Daily Snapshots Update', + sqlFile: 'metrics-new/update_daily_snapshots.sql', + historyType: 'daily_snapshots', + statusModule: 'daily_snapshots' + }, + { + run: RUN_PRODUCT_METRICS, + name: 'Product Metrics Update', + sqlFile: 'metrics-new/update_product_metrics.sql', // ASSUMING the initial population is now part of a regular update + historyType: 'product_metrics', + statusModule: 'product_metrics' + }, + { + run: RUN_PERIODIC_METRICS, + name: 'Periodic Metrics Update', + sqlFile: 'metrics-new/update_periodic_metrics.sql', + historyType: 'periodic_metrics', + statusModule: 'periodic_metrics' + } + ]; + + let overallSuccess = true; + + try { + for (const step of steps) { + if (step.run) { + if (isCancelled) { + console.log(`Skipping step "${step.name}" due to cancellation.`); + overallSuccess = false; // Mark as not fully successful if steps are skipped due to cancel + continue; // Skip to next step + } + // Pass the progress utilities to the step executor + await executeSqlStep(step, progressUtils); + } else { + console.log(`Skipping step "${step.name}" (disabled by configuration).`); + } + } + + // If we finished naturally (no errors thrown out) + clearTimeout(mainTimeoutHandle); // Clear the main timeout + + if (isCancelled) { + console.log("\n--- Calculation finished with cancellation ---"); + overallSuccess = false; + } else { + console.log("\n--- All enabled calculations finished successfully ---"); + progressUtils.clearProgress(); // Clear progress only on full success + } + + } catch (error) { + clearTimeout(mainTimeoutHandle); // Clear the main timeout + console.error("\n--- SCRIPT EXECUTION FAILED ---"); + // Error details were already logged by executeSqlStep or global handlers + overallSuccess = false; + // Don't re-log the error here unless adding context + // console.error("Overall failure reason:", error.message); + } finally { + await closePool(); + console.log(`Total execution time: ${progressUtils.formatElapsedTime(overallStartTime)}`); + process.exit(overallSuccess ? 
0 : 1); + } +} + +// --- Script Execution --- +if (require.main === module) { + runAllCalculations(); +} else { + // Export functions if needed as a module (e.g., for testing or API) + module.exports = { + runAllCalculations, + cancelCalculation, + // Expose individual steps if useful, wrapping them slightly + // Note: paths include the metrics-new/ prefix so path.resolve(__dirname, ...) in executeSqlStep finds the SQL files + runDailySnapshots: () => executeSqlStep({ name: 'Daily Snapshots Update', sqlFile: 'metrics-new/update_daily_snapshots.sql', historyType: 'daily_snapshots', statusModule: 'daily_snapshots' }, progressUtils), + runProductMetrics: () => executeSqlStep({ name: 'Product Metrics Update', sqlFile: 'metrics-new/update_product_metrics.sql', historyType: 'product_metrics', statusModule: 'product_metrics' }, progressUtils), + runPeriodicMetrics: () => executeSqlStep({ name: 'Periodic Metrics Update', sqlFile: 'metrics-new/update_periodic_metrics.sql', historyType: 'periodic_metrics', statusModule: 'periodic_metrics' }, progressUtils), + getProgress: progressUtils.getProgress + }; +} \ No newline at end of file diff --git a/inventory-server/scripts/metrics-new/backfill-snapshots.js b/inventory-server/scripts/metrics-new/backfill-snapshots.js new file mode 100644 index 0000000..e7b7015 --- /dev/null +++ b/inventory-server/scripts/metrics-new/backfill-snapshots.js @@ -0,0 +1,426 @@ +const path = require('path'); +const fs = require('fs'); +const progress = require('./utils/progress'); // Assuming progress utils are here +const { getConnection, closePool } = require('./utils/db'); // Assuming db utils are here +const os = require('os'); // For detecting number of CPU cores + +// --- Configuration --- +const BATCH_SIZE_DAYS = 1; // Process 1 day per database function call +const SQL_FUNCTION_FILE = path.resolve(__dirname, 'backfill_historical_snapshots.sql'); // Correct path +const LOG_PROGRESS_INTERVAL_MS = 5000; // Update console progress roughly every 5 seconds +const HISTORY_TYPE = 'backfill_snapshots'; // Identifier for history table +const MAX_WORKERS = Math.max(1, Math.floor(os.cpus().length / 2)); // Use half of available CPU cores +const USE_PARALLEL = false; // Set to true to enable parallel processing +const PG_STATEMENT_TIMEOUT_MS = 1800000; // 30 minutes max per query + +// --- Cancellation Handling --- +let isCancelled = false; +let runningQueryPromise = null; // To potentially track the active query + +function requestCancellation() { + if (!isCancelled) { + isCancelled = true; + console.warn('\nCancellation requested. Finishing current batch then stopping...'); + // Note: We are NOT forcefully cancelling the backend query anymore. + } +} + +process.on('SIGINT', requestCancellation); // Handle Ctrl+C +process.on('SIGTERM', requestCancellation); // Handle termination signals + +// --- Main Backfill Function --- +async function backfillSnapshots(cmdStartDate, cmdEndDate, cmdStartBatch = 1) { + let connection; + const overallStartTime = Date.now(); + let calculateHistoryId = null; + let processedDaysTotal = 0; // Track total days processed across all batches executed in this run + let currentBatchNum = cmdStartBatch > 0 ?
cmdStartBatch : 1; + let totalBatches = 0; // Initialize totalBatches + let totalDays = 0; // Initialize totalDays + + console.log(`Starting snapshot backfill process...`); + console.log(`SQL Function definition file: ${SQL_FUNCTION_FILE}`); + if (!fs.existsSync(SQL_FUNCTION_FILE)) { + console.error(`FATAL: SQL file not found at ${SQL_FUNCTION_FILE}`); + process.exit(1); // Exit early if file doesn't exist + } + + try { + // Set up a connection with higher memory limits + connection = await getConnection({ + // Add performance-related settings + application_name: 'backfill_snapshots', + statement_timeout: PG_STATEMENT_TIMEOUT_MS, // 30 min timeout per statement + // These parameters may need to be configured in your database: + // work_mem: '1GB', + // maintenance_work_mem: '2GB', + // temp_buffers: '1GB', + }); + + console.log('Database connection acquired.'); + + // --- Ensure Function Exists --- + console.log('Ensuring database function is up-to-date...'); + try { + const sqlFunctionDef = fs.readFileSync(SQL_FUNCTION_FILE, 'utf8'); + if (!sqlFunctionDef.includes('CREATE OR REPLACE FUNCTION backfill_daily_snapshots_range_final')) { + throw new Error(`SQL file ${SQL_FUNCTION_FILE} does not seem to contain the function definition.`); + } + await connection.query(sqlFunctionDef); // Execute the whole file + console.log('Database function `backfill_daily_snapshots_range_final` created/updated.'); + + // Add performance query hints to the database + await connection.query(` + -- Analyze tables for better query planning + ANALYZE public.products; + ANALYZE public.imported_daily_inventory; + ANALYZE public.imported_product_stat_history; + ANALYZE public.daily_product_snapshots; + ANALYZE public.imported_product_current_prices; + `).catch(err => { + // Non-fatal if analyze fails + console.warn('Failed to analyze tables (non-fatal):', err.message); + }); + + } catch (err) { + console.error(`Error processing SQL function file ${SQL_FUNCTION_FILE}:`, err); + throw new Error(`Failed to create or replace DB function: ${err.message}`); + } + + // --- Prepare History Record --- + console.log('Preparing calculation history record...'); + // Ensure history table exists (optional, could be done elsewhere) + await connection.query(` + CREATE TABLE IF NOT EXISTS public.calculate_history ( + id SERIAL PRIMARY KEY, + start_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), + end_time TIMESTAMPTZ, + duration_seconds INTEGER, + status VARCHAR(20) NOT NULL, -- e.g., 'running', 'completed', 'failed', 'cancelled' + error_message TEXT, + additional_info JSONB -- Store type, file, batch info etc. + ); + `); + // Mark previous runs of this type as potentially failed if they were left 'running' + await connection.query(` + UPDATE public.calculate_history + SET status = 'failed', error_message = 'Interrupted by new run.' 
+ WHERE status = 'running' AND additional_info->>'type' = $1; + `, [HISTORY_TYPE]); + + // Create new history record + const historyResult = await connection.query(` + INSERT INTO public.calculate_history (start_time, status, additional_info) + VALUES (NOW(), 'running', jsonb_build_object('type', $1::text, 'sql_file', $2::text, 'start_batch', $3::integer)) + RETURNING id; + `, [HISTORY_TYPE, path.basename(SQL_FUNCTION_FILE), cmdStartBatch]); + calculateHistoryId = historyResult.rows[0].id; + console.log(`Calculation history record created with ID: ${calculateHistoryId}`); + + + // --- Determine Date Range --- + console.log('Determining date range...'); + let effectiveStartDate, effectiveEndDate; + + // Use command-line dates if provided, otherwise query DB + if (cmdStartDate) { + effectiveStartDate = cmdStartDate; + } else { + const minDateResult = await connection.query(` + SELECT LEAST( + COALESCE((SELECT MIN(date) FROM public.imported_daily_inventory WHERE date > '1970-01-01'), CURRENT_DATE), + COALESCE((SELECT MIN(date) FROM public.imported_product_stat_history WHERE date > '1970-01-01'), CURRENT_DATE) + )::date as min_date; + `); + effectiveStartDate = minDateResult.rows[0]?.min_date || new Date().toISOString().split('T')[0]; // Fallback + console.log(`Auto-detected start date: ${effectiveStartDate}`); + } + + if (cmdEndDate) { + effectiveEndDate = cmdEndDate; + } else { + const maxDateResult = await connection.query(` + SELECT GREATEST( + COALESCE((SELECT MAX(date) FROM public.imported_daily_inventory WHERE date < CURRENT_DATE), '1970-01-01'::date), + COALESCE((SELECT MAX(date) FROM public.imported_product_stat_history WHERE date < CURRENT_DATE), '1970-01-01'::date) + )::date as max_date; + `); + // Ensure end date is not today or in the future + effectiveEndDate = maxDateResult.rows[0]?.max_date || new Date(Date.now() - 86400000).toISOString().split('T')[0]; // Default yesterday + if (new Date(effectiveEndDate) >= new Date(new Date().toISOString().split('T')[0])) { + effectiveEndDate = new Date(Date.now() - 86400000).toISOString().split('T')[0]; // Set to yesterday if >= today + } + console.log(`Auto-detected end date: ${effectiveEndDate}`); + } + + // Validate dates + const dStart = new Date(effectiveStartDate); + const dEnd = new Date(effectiveEndDate); + if (isNaN(dStart.getTime()) || isNaN(dEnd.getTime()) || dStart > dEnd) { + throw new Error(`Invalid date range: Start "${effectiveStartDate}", End "${effectiveEndDate}"`); + } + + // --- Batch Processing --- + totalDays = Math.ceil((dEnd - dStart) / (1000 * 60 * 60 * 24)) + 1; // Inclusive + totalBatches = Math.ceil(totalDays / BATCH_SIZE_DAYS); + + console.log(`Target Date Range: ${effectiveStartDate} to ${effectiveEndDate} (${totalDays} days)`); + console.log(`Total Batches: ${totalBatches} (Batch Size: ${BATCH_SIZE_DAYS} days)`); + console.log(`Starting from Batch: ${currentBatchNum}`); + + // Initial progress update + progress.outputProgress({ + status: 'running', + operation: 'Starting Batch Processing', + currentBatch: currentBatchNum, + totalBatches: totalBatches, + totalDays: totalDays, + elapsed: '0s', + remaining: 'Calculating...', + rate: 0, + historyId: calculateHistoryId // Include history ID in the object + }); + + while (currentBatchNum <= totalBatches && !isCancelled) { + const batchOffset = (currentBatchNum - 1) * BATCH_SIZE_DAYS; + const batchStartDate = new Date(dStart); + batchStartDate.setDate(dStart.getDate() + batchOffset); + + const batchEndDate = new Date(batchStartDate); + 
batchEndDate.setDate(batchStartDate.getDate() + BATCH_SIZE_DAYS - 1); + + // Clamp batch end date to the overall effective end date + if (batchEndDate > dEnd) { + batchEndDate.setTime(dEnd.getTime()); + } + + const batchStartDateStr = batchStartDate.toISOString().split('T')[0]; + const batchEndDateStr = batchEndDate.toISOString().split('T')[0]; + const batchStartTime = Date.now(); + + console.log(`\n--- Processing Batch ${currentBatchNum} / ${totalBatches} ---`); + console.log(` Dates: ${batchStartDateStr} to ${batchEndDateStr}`); + + // Execute the function for the batch + try { + progress.outputProgress({ + status: 'running', + operation: `Executing DB function for batch ${currentBatchNum}...`, + currentBatch: currentBatchNum, + totalBatches: totalBatches, + totalDays: totalDays, + elapsed: progress.formatElapsedTime(overallStartTime), + remaining: 'Executing...', + rate: 0, + historyId: calculateHistoryId + }); + + // Performance improvement: Add batch processing hint + await connection.query('SET LOCAL enable_parallel_append = on; SET LOCAL enable_parallel_hash = on; SET LOCAL max_parallel_workers_per_gather = 4;'); + + // Store promise in case we need to try and cancel (though not implemented forcefully) + runningQueryPromise = connection.query( + `SELECT backfill_daily_snapshots_range_final($1::date, $2::date);`, + [batchStartDateStr, batchEndDateStr] + ); + await runningQueryPromise; // Wait for the function call to complete + runningQueryPromise = null; // Clear the promise + + const batchDurationMs = Date.now() - batchStartTime; + const daysInThisBatch = Math.ceil((batchEndDate - batchStartDate) / (1000 * 60 * 60 * 24)) + 1; + processedDaysTotal += daysInThisBatch; + + console.log(` Batch ${currentBatchNum} completed in ${progress.formatElapsedTime(batchStartTime)}.`); + + // --- Update Progress & History --- + const overallElapsedSec = Math.round((Date.now() - overallStartTime) / 1000); + progress.outputProgress({ + status: 'running', + operation: `Completed batch ${currentBatchNum}`, + currentBatch: currentBatchNum, + totalBatches: totalBatches, + totalDays: totalDays, + processedDays: processedDaysTotal, + elapsed: progress.formatElapsedTime(overallStartTime), + remaining: progress.estimateRemaining(overallStartTime, processedDaysTotal, totalDays), + rate: progress.calculateRate(overallStartTime, processedDaysTotal), + batchDuration: progress.formatElapsedTime(batchStartTime), + historyId: calculateHistoryId + }); + + // Save checkpoint in history + await connection.query(` + UPDATE public.calculate_history + SET additional_info = jsonb_set(additional_info, '{last_completed_batch}', $1::jsonb) + || jsonb_build_object('last_processed_date', $2::text) + WHERE id = $3::integer; + `, [JSON.stringify(currentBatchNum), batchEndDateStr, calculateHistoryId]); + + + } catch (batchError) { + console.error(`\n--- ERROR in Batch ${currentBatchNum} (${batchStartDateStr} to ${batchEndDateStr}) ---`); + console.error(' Database Error:', batchError.message); + console.error(' DB Error Code:', batchError.code); + // Log detailed error to history and re-throw to stop the process + await connection.query(` + UPDATE public.calculate_history + SET status = 'failed', + end_time = NOW(), + duration_seconds = $1::integer, + error_message = $2::text, + additional_info = additional_info || jsonb_build_object('failed_batch', $3::integer, 'failed_date_range', $4::text) + WHERE id = $5::integer; + `, [ + Math.round((Date.now() - overallStartTime) / 1000), + `Batch ${currentBatchNum} failed: 
${batchError.message} (Code: ${batchError.code || 'N/A'})`, + currentBatchNum, + `${batchStartDateStr} to ${batchEndDateStr}`, + calculateHistoryId + ]); + throw batchError; // Stop execution + } + + currentBatchNum++; + // Optional delay between batches + // await new Promise(resolve => setTimeout(resolve, 500)); + + } // End while loop + + // --- Final Outcome --- + const finalStatus = isCancelled ? 'cancelled' : 'completed'; + const finalMessage = isCancelled ? `Calculation stopped after completing batch ${currentBatchNum - 1}.` : 'Historical snapshots backfill completed successfully.'; + const finalDurationSec = Math.round((Date.now() - overallStartTime) / 1000); + + console.log(`\n--- Backfill ${finalStatus.toUpperCase()} ---`); + console.log(finalMessage); + console.log(`Total duration: ${progress.formatElapsedTime(overallStartTime)}`); + + // Update history record + await connection.query(` + UPDATE public.calculate_history SET status = $1::calculation_status, end_time = NOW(), duration_seconds = $2::integer, error_message = $3 + WHERE id = $4::integer; + `, [finalStatus, finalDurationSec, (isCancelled ? 'User cancelled' : null), calculateHistoryId]); + + if (!isCancelled) { + progress.clearProgress(); // Clear progress state only on successful completion + } else { + progress.outputProgress({ // Final cancelled status update + status: 'cancelled', + operation: finalMessage, + currentBatch: currentBatchNum - 1, + totalBatches: totalBatches, + totalDays: totalDays, + processedDays: processedDaysTotal, + elapsed: progress.formatElapsedTime(overallStartTime), + remaining: 'Cancelled', + rate: 0, + historyId: calculateHistoryId + }); + } + + return { success: true, status: finalStatus, message: finalMessage, duration: finalDurationSec }; + + } catch (error) { + console.error('\n--- Backfill encountered an unrecoverable error ---'); + console.error(error.message); + const finalDurationSec = Math.round((Date.now() - overallStartTime) / 1000); + + // Update history if possible + if (connection && calculateHistoryId) { + try { + await connection.query(` + UPDATE public.calculate_history + SET status = $1::calculation_status, end_time = NOW(), duration_seconds = $2::integer, error_message = $3::text + WHERE id = $4::integer; + `, [ + isCancelled ? 'cancelled' : 'failed', + finalDurationSec, + error.message, + calculateHistoryId + ]); + } catch (histError) { + console.error("Failed to update history record with error state:", histError); + } + } else { + console.error("Could not update history record (no ID or connection)."); + } + + // FIX: Use initialized value or a default if loop never started + const batchNumForError = currentBatchNum > cmdStartBatch ? 
currentBatchNum - 1 : cmdStartBatch - 1; + + // Update progress.outputProgress call to match actual function signature + try { + // Create progress data object + const progressData = { + status: 'failed', + operation: 'Backfill failed', + message: error.message, + currentBatch: batchNumForError, + totalBatches: totalBatches, + totalDays: totalDays, + processedDays: processedDaysTotal, + elapsed: progress.formatElapsedTime(overallStartTime), + remaining: 'Failed', + rate: 0, + // Include history ID in progress data if needed + historyId: calculateHistoryId + }; + + // Call with single object parameter (not separate historyId) + progress.outputProgress(progressData); + } catch (progressError) { + console.error('Failed to report progress:', progressError); + } + + return { success: false, status: 'failed', error: error.message, duration: finalDurationSec }; + + } finally { + if (connection) { + console.log('Releasing database connection.'); + connection.release(); + } + // Close pool only if this script is meant to be standalone + // If part of a larger app, the app should manage pool closure + // console.log('Closing database pool.'); + // await closePool(); + } +} + +// --- Script Execution --- + +// Parse command-line arguments +const args = process.argv.slice(2); +let cmdStartDateArg, cmdEndDateArg, cmdStartBatchArg = 1; // Default start batch is 1 + +for (let i = 0; i < args.length; i++) { + if (args[i] === '--start-date' && args[i+1]) cmdStartDateArg = args[++i]; + else if (args[i] === '--end-date' && args[i+1]) cmdEndDateArg = args[++i]; + else if (args[i] === '--start-batch' && args[i+1]) cmdStartBatchArg = parseInt(args[++i], 10); +} + +if (isNaN(cmdStartBatchArg) || cmdStartBatchArg < 1) { + console.warn(`Invalid --start-batch value. Defaulting to 1.`); + cmdStartBatchArg = 1; +} + +// Run the backfill process +backfillSnapshots(cmdStartDateArg, cmdEndDateArg, cmdStartBatchArg) + .then(result => { + if (result.success) { + console.log(`\n✅ ${result.message} (Duration: ${result.duration}s)`); + process.exitCode = 0; // Success + } else { + console.error(`\n❌ Backfill failed: ${result.error || 'Unknown error'} (Duration: ${result.duration}s)`); + process.exitCode = 1; // Failure + } + }) + .catch(err => { + console.error('\n❌ Unexpected error during backfill execution:', err); + process.exitCode = 1; // Failure + }) + .finally(async () => { + // Ensure pool is closed if run standalone + console.log('Backfill script finished. Closing pool.'); + await closePool(); // Make sure closePool exists and works in your db utils + process.exit(process.exitCode); // Exit with appropriate code + }); \ No newline at end of file diff --git a/inventory-server/scripts/metrics-new/backfill_historical_snapshots.sql b/inventory-server/scripts/metrics-new/backfill_historical_snapshots.sql new file mode 100644 index 0000000..f0c3037 --- /dev/null +++ b/inventory-server/scripts/metrics-new/backfill_historical_snapshots.sql @@ -0,0 +1,161 @@ +-- Description: Backfills the daily_product_snapshots table using imported historical unit data +-- (daily inventory/stats) and historical price data (current prices table). +-- - Uses imported daily sales/receipt UNIT counts for accuracy. +-- - ESTIMATES historical stock levels using a forward calculation. +-- - APPROXIMATES historical REVENUE using looked-up historical base prices. +-- - APPROXIMATES historical COGS, PROFIT, and STOCK VALUE using CURRENT product costs/prices. +-- Run ONCE after importing historical data and before initial product_metrics population. 
+-- Dependencies: Core import tables (products), imported history tables (imported_daily_inventory, +-- imported_product_stat_history, imported_product_current_prices), +-- daily_product_snapshots table must exist. +-- Frequency: Run ONCE. + +CREATE OR REPLACE FUNCTION backfill_daily_snapshots_range_final( + _start_date DATE, + _end_date DATE +) +RETURNS VOID AS $$ +DECLARE + _current_processing_date DATE := _start_date; + _batch_start_time TIMESTAMPTZ; + _row_count INTEGER; +BEGIN + RAISE NOTICE 'Starting FINAL historical snapshot backfill from % to %.', _start_date, _end_date; + RAISE NOTICE 'Using historical units and historical prices (for revenue approximation).'; + RAISE NOTICE 'WARNING: Historical COGS, Profit, and Stock Value use CURRENT product costs/prices.'; + + -- Ensure end date is not in the future + IF _end_date >= CURRENT_DATE THEN + _end_date := CURRENT_DATE - INTERVAL '1 day'; + RAISE NOTICE 'Adjusted end date to % to avoid conflict with hourly script.', _end_date; + END IF; + + -- Performance: Create temporary table with product info to avoid repeated lookups + CREATE TEMP TABLE IF NOT EXISTS temp_product_info AS + SELECT + pid, + sku, + COALESCE(landing_cost_price, cost_price, 0.00) as effective_cost_price, + COALESCE(price, 0.00) as current_price, + COALESCE(regular_price, 0.00) as current_regular_price + FROM public.products; + + -- Performance: Create index on temporary table + CREATE INDEX IF NOT EXISTS temp_product_info_pid_idx ON temp_product_info(pid); + + ANALYZE temp_product_info; + + RAISE NOTICE 'Created temporary product info table with % products', (SELECT COUNT(*) FROM temp_product_info); + + WHILE _current_processing_date <= _end_date LOOP + _batch_start_time := clock_timestamp(); + RAISE NOTICE 'Processing date: %', _current_processing_date; + + -- Get Daily Transaction Unit Info from imported history + WITH DailyHistoryUnits AS ( + SELECT + pids.pid, + -- Prioritize daily_inventory, fallback to product_stat_history for sold qty + COALESCE(di.amountsold, ps.qty_sold, 0)::integer as units_sold_today, + COALESCE(di.qtyreceived, 0)::integer as units_received_today + FROM + (SELECT DISTINCT pid FROM temp_product_info) pids -- Ensure all products are considered + LEFT JOIN public.imported_daily_inventory di + ON pids.pid = di.pid AND di.date = _current_processing_date + LEFT JOIN public.imported_product_stat_history ps + ON pids.pid = ps.pid AND ps.date = _current_processing_date + -- Removed WHERE clause to ensure snapshots are created even for days with 0 activity, + -- allowing stock carry-over. The main query will handle products properly. 
+ ), + HistoricalPrice AS ( + -- Find the base price (qty_buy=1) active on the processing date + SELECT DISTINCT ON (pid) + pid, + price_each + FROM public.imported_product_current_prices + WHERE + qty_buy = 1 + -- Use TIMESTAMPTZ comparison logic: + AND date_active <= (_current_processing_date + interval '1 day' - interval '1 second') -- Active sometime on or before end of processing day + AND (date_deactive IS NULL OR date_deactive > _current_processing_date) -- Not deactivated before start of processing day + -- Assuming 'active' flag isn't needed if dates are correct; add 'AND active != 0' if necessary + ORDER BY + pid, date_active DESC -- Get the most recently activated price + ), + PreviousStock AS ( + -- Get the estimated stock from the PREVIOUS day snapshot + SELECT pid, eod_stock_quantity + FROM public.daily_product_snapshots + WHERE snapshot_date = _current_processing_date - INTERVAL '1 day' + ) + -- Insert into the daily snapshots table + INSERT INTO public.daily_product_snapshots ( + snapshot_date, pid, sku, + eod_stock_quantity, eod_stock_cost, eod_stock_retail, eod_stock_gross, stockout_flag, + units_sold, units_returned, + gross_revenue, discounts, returns_revenue, + net_revenue, cogs, gross_regular_revenue, profit, + units_received, cost_received, + calculation_timestamp + ) + SELECT + _current_processing_date AS snapshot_date, + p.pid, + p.sku, + -- Estimated EOD Stock (using historical daily units) + -- Handle potential NULL from joins with COALESCE 0 + COALESCE(ps.eod_stock_quantity, 0) + COALESCE(dh.units_received_today, 0) - COALESCE(dh.units_sold_today, 0) AS estimated_eod_stock, + -- Valued Stock (using estimated stock and CURRENT prices/costs - APPROXIMATION) + GREATEST(0, COALESCE(ps.eod_stock_quantity, 0) + COALESCE(dh.units_received_today, 0) - COALESCE(dh.units_sold_today, 0)) * p.effective_cost_price AS eod_stock_cost, + GREATEST(0, COALESCE(ps.eod_stock_quantity, 0) + COALESCE(dh.units_received_today, 0) - COALESCE(dh.units_sold_today, 0)) * p.current_price AS eod_stock_retail, -- Stock retail uses current price + GREATEST(0, COALESCE(ps.eod_stock_quantity, 0) + COALESCE(dh.units_received_today, 0) - COALESCE(dh.units_sold_today, 0)) * p.current_regular_price AS eod_stock_gross, -- Stock gross uses current regular price + -- Stockout Flag (based on estimated stock) + (COALESCE(ps.eod_stock_quantity, 0) + COALESCE(dh.units_received_today, 0) - COALESCE(dh.units_sold_today, 0)) <= 0 AS stockout_flag, + + -- Today's Unit Aggregates from History + COALESCE(dh.units_sold_today, 0) as units_sold, + 0 AS units_returned, -- Placeholder: Cannot determine returns from daily summary + + -- Monetary Values using looked-up Historical Price and CURRENT Cost/RegPrice + COALESCE(dh.units_sold_today, 0) * COALESCE(hp.price_each, p.current_price) AS gross_revenue, -- Approx Revenue + 0 AS discounts, -- Placeholder + 0 AS returns_revenue, -- Placeholder + COALESCE(dh.units_sold_today, 0) * COALESCE(hp.price_each, p.current_price) AS net_revenue, -- Approx Net Revenue + COALESCE(dh.units_sold_today, 0) * p.effective_cost_price AS cogs, -- Approx COGS (uses CURRENT cost) + COALESCE(dh.units_sold_today, 0) * p.current_regular_price AS gross_regular_revenue, -- Approx Gross Regular Revenue + -- Approx Profit + (COALESCE(dh.units_sold_today, 0) * COALESCE(hp.price_each, p.current_price)) - (COALESCE(dh.units_sold_today, 0) * p.effective_cost_price) AS profit, + + COALESCE(dh.units_received_today, 0) as units_received, + -- Estimate received cost using CURRENT product cost + 
COALESCE(dh.units_received_today, 0) * p.effective_cost_price AS cost_received, -- Approx + + clock_timestamp() -- Timestamp of this specific calculation + FROM temp_product_info p -- Use the temp table for better performance + LEFT JOIN PreviousStock ps ON p.pid = ps.pid + LEFT JOIN DailyHistoryUnits dh ON p.pid = dh.pid -- Join today's historical activity + LEFT JOIN HistoricalPrice hp ON p.pid = hp.pid -- Join the looked-up historical price + -- Optimization: Only process products with activity or previous stock + WHERE (dh.units_sold_today > 0 OR dh.units_received_today > 0 OR COALESCE(ps.eod_stock_quantity, 0) > 0) + + ON CONFLICT (snapshot_date, pid) DO NOTHING; -- Avoid errors if rerunning parts, but prefer clean runs + + GET DIAGNOSTICS _row_count = ROW_COUNT; + RAISE NOTICE 'Processed %: Inserted/Skipped % rows. Duration: %', + _current_processing_date, + _row_count, + clock_timestamp() - _batch_start_time; + + _current_processing_date := _current_processing_date + INTERVAL '1 day'; + + END LOOP; + + -- Clean up temporary tables + DROP TABLE IF EXISTS temp_product_info; + + RAISE NOTICE 'Finished FINAL historical snapshot backfill.'; +END; +$$ LANGUAGE plpgsql; + +-- Example usage: +-- SELECT backfill_daily_snapshots_range_final('2023-01-01'::date, '2023-12-31'::date); \ No newline at end of file diff --git a/inventory-server/scripts/metrics-new/populate-initial-metrics.js b/inventory-server/scripts/metrics-new/populate-initial-metrics.js new file mode 100644 index 0000000..74d4ea4 --- /dev/null +++ b/inventory-server/scripts/metrics-new/populate-initial-metrics.js @@ -0,0 +1,396 @@ +const path = require('path'); +const fs = require('fs'); +const os = require('os'); // For detecting CPU cores + +// Get the base directory (the directory containing the inventory-server folder) +const baseDir = path.resolve(__dirname, '../../..'); + +// Load environment variables from the inventory-server directory +require('dotenv').config({ path: path.resolve(__dirname, '../..', '.env') }); + +// Configure statement timeout (30 minutes) +const PG_STATEMENT_TIMEOUT_MS = 1800000; + +// Add error handler for uncaught exceptions +process.on('uncaughtException', (error) => { + console.error('Uncaught Exception:', error); + process.exit(1); +}); + +// Add error handler for unhandled promise rejections +process.on('unhandledRejection', (reason, promise) => { + console.error('Unhandled Rejection at:', promise, 'reason:', reason); + process.exit(1); +}); + +// Load progress module +const progress = require('./utils/progress'); + +// Store progress functions in global scope to ensure availability +global.formatElapsedTime = progress.formatElapsedTime; +global.estimateRemaining = progress.estimateRemaining; +global.calculateRate = progress.calculateRate; +global.outputProgress = progress.outputProgress; +global.clearProgress = progress.clearProgress; +global.getProgress = progress.getProgress; +global.logError = progress.logError; + +// Load database module +const { getConnection, closePool } = require('./utils/db'); + +// Add cancel handler +let isCancelled = false; +let runningQueryPromise = null; + +function cancelCalculation() { + if (!isCancelled) { + isCancelled = true; + console.log('Calculation has been cancelled by user'); + + // Store the query promise to potentially cancel it + const queryToCancel = runningQueryPromise; + if (queryToCancel) { + console.log('Attempting to cancel the running query...'); + } + + // Force-terminate any query that's been running for more than 5 seconds + try { + const 
connection = getConnection(); + connection.then(async (conn) => { + try { + // Identify and terminate long-running queries from our application + await conn.query(` + SELECT pg_cancel_backend(pid) + FROM pg_stat_activity + WHERE query_start < now() - interval '5 seconds' + AND application_name = 'populate_metrics' + AND query NOT LIKE '%pg_cancel_backend%' + `); + + // Release connection + conn.release(); + } catch (err) { + console.error('Error during force cancellation:', err); + conn.release(); + } + }).catch(err => { + console.error('Could not get connection for cancellation:', err); + }); + } catch (err) { + console.error('Failed to terminate running queries:', err); + } + } + + return { + success: true, + message: 'Calculation has been cancelled' + }; +} + +// Handle SIGTERM signal for cancellation +process.on('SIGTERM', cancelCalculation); +process.on('SIGINT', cancelCalculation); + +async function populateInitialMetrics() { + let connection; + const startTime = Date.now(); + let calculateHistoryId; + + try { + // Clean up any previously running calculations + connection = await getConnection({ + // Add performance-related settings + application_name: 'populate_metrics', + statement_timeout: PG_STATEMENT_TIMEOUT_MS, // 30 min timeout per statement + }); + + // Ensure the calculate_status table exists and has the correct structure + await connection.query(` + CREATE TABLE IF NOT EXISTS calculate_status ( + module_name TEXT PRIMARY KEY, + last_calculation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP + ) + `); + + await connection.query(` + UPDATE calculate_history + SET + status = 'cancelled', + end_time = NOW(), + duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER, + error_message = 'Previous calculation was not completed properly' + WHERE status = 'running' AND additional_info->>'type' = 'populate_initial_metrics' + `); + + // Create history record for this calculation + const historyResult = await connection.query(` + INSERT INTO calculate_history ( + start_time, + status, + additional_info + ) VALUES ( + NOW(), + 'running', + jsonb_build_object( + 'type', 'populate_initial_metrics', + 'sql_file', 'populate_initial_product_metrics.sql' + ) + ) RETURNING id + `); + calculateHistoryId = historyResult.rows[0].id; + + // Initialize progress + global.outputProgress({ + status: 'running', + operation: 'Starting initial product metrics population', + current: 0, + total: 100, + elapsed: '0s', + remaining: 'Calculating... 
(this may take a while)', + rate: 0, + percentage: '0', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: Math.round((Date.now() - startTime) / 1000) + }, + historyId: calculateHistoryId + }); + + // Prepare the database - analyze tables + global.outputProgress({ + status: 'running', + operation: 'Analyzing database tables for better query performance', + current: 2, + total: 100, + elapsed: global.formatElapsedTime(startTime), + remaining: 'Analyzing...', + rate: 0, + percentage: '2', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: Math.round((Date.now() - startTime) / 1000) + }, + historyId: calculateHistoryId + }); + + // Enable better query planning and parallel operations + await connection.query(` + -- Analyze tables for better query planning + ANALYZE public.products; + ANALYZE public.purchase_orders; + ANALYZE public.daily_product_snapshots; + ANALYZE public.orders; + + -- Enable parallel operations + SET LOCAL enable_parallel_append = on; + SET LOCAL enable_parallel_hash = on; + SET LOCAL max_parallel_workers_per_gather = 4; + + -- Larger work memory for complex sorts/joins + SET LOCAL work_mem = '128MB'; + `).catch(err => { + // Non-fatal if analyze fails + console.warn('Failed to analyze tables (non-fatal):', err.message); + }); + + // Execute the SQL query + global.outputProgress({ + status: 'running', + operation: 'Executing initial metrics SQL query', + current: 5, + total: 100, + elapsed: global.formatElapsedTime(startTime), + remaining: 'Calculating... (this could take several hours with 150M+ records)', + rate: 0, + percentage: '5', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: Math.round((Date.now() - startTime) / 1000) + }, + historyId: calculateHistoryId + }); + + // Read the SQL file + const sqlFilePath = path.resolve(__dirname, 'populate_initial_product_metrics.sql'); + console.log('Base directory:', baseDir); + console.log('Script directory:', __dirname); + console.log('SQL file path:', sqlFilePath); + console.log('Current working directory:', process.cwd()); + + if (!fs.existsSync(sqlFilePath)) { + throw new Error(`SQL file not found at ${sqlFilePath}`); + } + + // Read and clean up the SQL (Slightly more robust cleaning) + const sqlQuery = fs.readFileSync(sqlFilePath, 'utf8') + .replace(/\r\n/g, '\n') // Handle Windows endings + .replace(/\r/g, '\n') // Handle old Mac endings + .trim(); // Remove leading/trailing whitespace VERY IMPORTANT + + // Log details again AFTER cleaning + console.log('SQL Query length (cleaned):', sqlQuery.length); + console.log('SQL Query structure validation:'); + console.log('- Contains DO block:', sqlQuery.includes('DO $$') || sqlQuery.includes('DO $')); // Check both types of tag start + console.log('- Contains BEGIN:', sqlQuery.includes('BEGIN')); + console.log('- Contains END:', sqlQuery.includes('END $$;') || sqlQuery.includes('END $')); // Check both types of tag end + console.log('- First 50 chars:', JSON.stringify(sqlQuery.slice(0, 50))); + console.log('- Last 100 chars (cleaned):', JSON.stringify(sqlQuery.slice(-100))); + + // Final check to ensure clean SQL ending + if (!sqlQuery.endsWith('END $$;')) { + console.warn('WARNING: SQL does not end with "END $$;". 
This might cause issues.'); + console.log('Exact ending:', JSON.stringify(sqlQuery.slice(-20))); + } + + // Execute the script + console.log('Starting initial product metrics population...'); + + // Track the query promise for potential cancellation + runningQueryPromise = connection.query({ + text: sqlQuery, + rowMode: 'array' + }); + await runningQueryPromise; + runningQueryPromise = null; + + // Update progress to 100% + global.outputProgress({ + status: 'complete', + operation: 'Initial product metrics population complete', + current: 100, + total: 100, + elapsed: global.formatElapsedTime(startTime), + remaining: '0s', + rate: 0, + percentage: '100', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: Math.round((Date.now() - startTime) / 1000) + }, + historyId: calculateHistoryId + }); + + // Update history with completion + await connection.query(` + UPDATE calculate_history + SET + end_time = NOW(), + duration_seconds = $1, + status = 'completed' + WHERE id = $2 + `, [Math.round((Date.now() - startTime) / 1000), calculateHistoryId]); + + // Clear progress file on successful completion + global.clearProgress(); + + return { + success: true, + message: 'Initial product metrics population completed successfully', + duration: Math.round((Date.now() - startTime) / 1000) + }; + } catch (error) { + const endTime = Date.now(); + const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); + + // Enhanced error logging + console.error('Error details:', { + message: error.message, + code: error.code, + hint: error.hint, + position: error.position, + detail: error.detail, + where: error.where ? error.where.substring(0, 500) + '...' : undefined, // Truncate to avoid huge logs + severity: error.severity, + file: error.file, + line: error.line, + routine: error.routine + }); + + // Update history with error + if (connection && calculateHistoryId) { + await connection.query(` + UPDATE calculate_history + SET + end_time = NOW(), + duration_seconds = $1, + status = $2, + error_message = $3 + WHERE id = $4 + `, [ + totalElapsedSeconds, + isCancelled ? 
'cancelled' : 'failed', + error.message, + calculateHistoryId + ]); + } + + if (isCancelled) { + global.outputProgress({ + status: 'cancelled', + operation: 'Calculation cancelled', + current: 50, + total: 100, + elapsed: global.formatElapsedTime(startTime), + remaining: null, + rate: 0, + percentage: '50', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: totalElapsedSeconds + }, + historyId: calculateHistoryId + }); + } else { + global.outputProgress({ + status: 'error', + operation: 'Error during initial product metrics population', + message: error.message, + current: 0, + total: 100, + elapsed: global.formatElapsedTime(startTime), + remaining: null, + rate: 0, + percentage: '0', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: totalElapsedSeconds + }, + historyId: calculateHistoryId + }); + } + + console.error('Error during initial product metrics population:', error); + return { + success: false, + error: error.message, + duration: totalElapsedSeconds + }; + } finally { + if (connection) { + connection.release(); + } + await closePool(); + } +} + +// Start population process +populateInitialMetrics() + .then(result => { + if (result.success) { + console.log(`Initial product metrics population completed successfully in ${result.duration} seconds`); + process.exit(0); + } else { + console.error(`Initial product metrics population failed: ${result.error}`); + process.exit(1); + } + }) + .catch(err => { + console.error('Unexpected error:', err); + process.exit(1); + }); \ No newline at end of file diff --git a/inventory-server/scripts/metrics-new/populate_initial_product_metrics.sql b/inventory-server/scripts/metrics-new/populate_initial_product_metrics.sql new file mode 100644 index 0000000..a1a3f41 --- /dev/null +++ b/inventory-server/scripts/metrics-new/populate_initial_product_metrics.sql @@ -0,0 +1,448 @@ +-- Description: Performs the first population OR full recalculation of the product_metrics table based on +-- historically backfilled daily_product_snapshots and current product/PO data. +-- Calculates all metrics considering the full available history up to 'yesterday'. +-- Run ONCE after backfill_historical_snapshots_final.sql completes successfully. +-- Dependencies: Core import tables (products, purchase_orders), daily_product_snapshots (historically populated), +-- configuration tables (settings_*), product_metrics table must exist. +-- Frequency: Run ONCE. +DO $$ +DECLARE + _module_name VARCHAR := 'product_metrics_population'; -- Generic name + _start_time TIMESTAMPTZ := clock_timestamp(); + -- Calculate metrics AS OF the end of the last fully completed day + _calculation_date DATE := CURRENT_DATE - INTERVAL '1 day'; +BEGIN + RAISE NOTICE 'Running % module. Calculating AS OF: %. Start Time: %', _module_name, _calculation_date, _start_time; + + -- Optional: Consider TRUNCATE if you want a completely fresh start, + -- otherwise ON CONFLICT will update existing rows if this is rerun. + -- TRUNCATE TABLE public.product_metrics; + RAISE NOTICE 'Populating product_metrics table. 
This may take some time...'; + + -- CTEs to gather necessary information AS OF _calculation_date + WITH CurrentInfo AS ( + -- Fetches current product details, including costs/prices used for forecasting & fallbacks + SELECT + p.pid, p.sku, p.title, p.brand, p.vendor, COALESCE(p.image_175, p.image) as image_url, + p.visible as is_visible, p.replenishable, + COALESCE(p.price, 0.00) as current_price, COALESCE(p.regular_price, 0.00) as current_regular_price, + COALESCE(p.cost_price, 0.00) as current_cost_price, + COALESCE(p.landing_cost_price, p.cost_price, 0.00) as current_effective_cost, -- Use landing if available, else cost + p.stock_quantity as current_stock, -- Use actual current stock for forecast base + p.created_at, p.first_received, p.date_last_sold, + p.moq, + p.uom + FROM public.products p + ), + OnOrderInfo AS ( + -- Calculates current on-order quantities and costs + SELECT + pid, + COALESCE(SUM(ordered - received), 0) AS on_order_qty, + COALESCE(SUM((ordered - received) * cost_price), 0.00) AS on_order_cost, + MIN(expected_date) AS earliest_expected_date + FROM public.purchase_orders + -- Use the most common statuses representing active, unfulfilled POs + WHERE status IN ('open', 'partially_received', 'ordered', 'preordered', 'receiving_started', 'electronically_sent', 'electronically_ready_send') + AND (ordered - received) > 0 + GROUP BY pid + ), + HistoricalDates AS ( + -- Determines key historical dates from orders and PO history (receiving_history) + SELECT + p.pid, + MIN(o.date)::date AS date_first_sold, + MAX(o.date)::date AS max_order_date, -- Used as fallback for date_last_sold + MIN(rh.first_receipt_date) AS date_first_received_calc, + MAX(rh.last_receipt_date) AS date_last_received_calc + FROM public.products p + LEFT JOIN public.orders o ON p.pid = o.pid AND o.quantity > 0 AND o.status NOT IN ('canceled', 'returned') + LEFT JOIN ( + SELECT + po.pid, + MIN((rh.item->>'received_at')::date) as first_receipt_date, + MAX((rh.item->>'received_at')::date) as last_receipt_date + FROM public.purchase_orders po + CROSS JOIN LATERAL jsonb_array_elements(po.receiving_history) AS rh(item) + WHERE jsonb_typeof(po.receiving_history) = 'array' AND jsonb_array_length(po.receiving_history) > 0 + GROUP BY po.pid + ) rh ON p.pid = rh.pid + GROUP BY p.pid + ), + SnapshotAggregates AS ( + -- Aggregates metrics from historical snapshots up to the _calculation_date + SELECT + pid, + -- Rolling periods relative to _calculation_date + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '6 days' AND _calculation_date THEN units_sold ELSE 0 END) AS sales_7d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '6 days' AND _calculation_date THEN net_revenue ELSE 0 END) AS revenue_7d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '13 days' AND _calculation_date THEN units_sold ELSE 0 END) AS sales_14d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '13 days' AND _calculation_date THEN net_revenue ELSE 0 END) AS revenue_14d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '29 days' AND _calculation_date THEN units_sold ELSE 0 END) AS sales_30d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '29 days' AND _calculation_date THEN net_revenue ELSE 0 END) AS revenue_30d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '29 days' AND _calculation_date THEN cogs ELSE 0 END) AS cogs_30d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '29 days' AND 
_calculation_date THEN profit ELSE 0 END) AS profit_30d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '29 days' AND _calculation_date THEN units_returned ELSE 0 END) AS returns_units_30d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '29 days' AND _calculation_date THEN returns_revenue ELSE 0 END) AS returns_revenue_30d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '29 days' AND _calculation_date THEN discounts ELSE 0 END) AS discounts_30d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '29 days' AND _calculation_date THEN gross_revenue ELSE 0 END) AS gross_revenue_30d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '29 days' AND _calculation_date THEN gross_regular_revenue ELSE 0 END) AS gross_regular_revenue_30d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '29 days' AND _calculation_date AND stockout_flag THEN 1 ELSE 0 END) AS stockout_days_30d, + -- Add 90-day aggregates if needed + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '89 days' AND _calculation_date THEN units_sold ELSE 0 END) AS sales_90d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '89 days' AND _calculation_date THEN net_revenue ELSE 0 END) AS revenue_90d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '89 days' AND _calculation_date THEN cogs ELSE 0 END) AS cogs_90d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '89 days' AND _calculation_date THEN profit ELSE 0 END) AS profit_90d, + -- Add 60-day aggregates if needed + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '59 days' AND _calculation_date THEN units_sold ELSE 0 END) AS sales_60d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '59 days' AND _calculation_date THEN net_revenue ELSE 0 END) AS revenue_60d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '59 days' AND _calculation_date THEN cogs ELSE 0 END) AS cogs_60d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '59 days' AND _calculation_date THEN profit ELSE 0 END) AS profit_60d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '364 days' AND _calculation_date THEN units_sold ELSE 0 END) AS sales_365d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '364 days' AND _calculation_date THEN net_revenue ELSE 0 END) AS revenue_365d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '29 days' AND _calculation_date THEN units_received ELSE 0 END) AS received_qty_30d, + SUM(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '29 days' AND _calculation_date THEN cost_received ELSE 0 END) AS received_cost_30d, + + -- Averages over the last 30 days ending _calculation_date + AVG(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '29 days' AND _calculation_date THEN eod_stock_quantity END) AS avg_stock_units_30d, + AVG(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '29 days' AND _calculation_date THEN eod_stock_cost END) AS avg_stock_cost_30d, + AVG(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '29 days' AND _calculation_date THEN eod_stock_retail END) AS avg_stock_retail_30d, + AVG(CASE WHEN snapshot_date BETWEEN _calculation_date - INTERVAL '29 days' AND _calculation_date THEN eod_stock_gross END) AS avg_stock_gross_30d, + + -- Lifetime (Sum over ALL available snapshots up to calculation date) + SUM(units_sold) AS lifetime_sales, + 
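+            -- Note: "lifetime" here means the full span of backfilled snapshots (snapshot_date <= _calculation_date),
+            -- so it reaches back only as far as the earliest snapshot, not necessarily to the product's true first sale.
+            -- The SUM(CASE WHEN ...) windows above could equivalently be written with an aggregate FILTER clause,
+            -- e.g. (sketch only):
+            --   SUM(units_sold) FILTER (WHERE snapshot_date >= _calculation_date - INTERVAL '29 days') AS sales_30d,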
SUM(net_revenue) AS lifetime_revenue, + + -- Yesterday (Sales for the specific _calculation_date) + SUM(CASE WHEN snapshot_date = _calculation_date THEN units_sold ELSE 0 END) as yesterday_sales + + FROM public.daily_product_snapshots + WHERE snapshot_date <= _calculation_date -- Ensure we only use data up to the calculation point + GROUP BY pid + ), + FirstPeriodMetrics AS ( + -- Calculates sales/revenue for first X days after first sale date + -- Uses HistoricalDates CTE to get the first sale date + SELECT + pid, date_first_sold, + SUM(CASE WHEN snapshot_date BETWEEN date_first_sold AND date_first_sold + INTERVAL '6 days' THEN units_sold ELSE 0 END) AS first_7_days_sales, + SUM(CASE WHEN snapshot_date BETWEEN date_first_sold AND date_first_sold + INTERVAL '6 days' THEN net_revenue ELSE 0 END) AS first_7_days_revenue, + SUM(CASE WHEN snapshot_date BETWEEN date_first_sold AND date_first_sold + INTERVAL '29 days' THEN units_sold ELSE 0 END) AS first_30_days_sales, + SUM(CASE WHEN snapshot_date BETWEEN date_first_sold AND date_first_sold + INTERVAL '29 days' THEN net_revenue ELSE 0 END) AS first_30_days_revenue, + SUM(CASE WHEN snapshot_date BETWEEN date_first_sold AND date_first_sold + INTERVAL '59 days' THEN units_sold ELSE 0 END) AS first_60_days_sales, + SUM(CASE WHEN snapshot_date BETWEEN date_first_sold AND date_first_sold + INTERVAL '59 days' THEN net_revenue ELSE 0 END) AS first_60_days_revenue, + SUM(CASE WHEN snapshot_date BETWEEN date_first_sold AND date_first_sold + INTERVAL '89 days' THEN units_sold ELSE 0 END) AS first_90_days_sales, + SUM(CASE WHEN snapshot_date BETWEEN date_first_sold AND date_first_sold + INTERVAL '89 days' THEN net_revenue ELSE 0 END) AS first_90_days_revenue + FROM public.daily_product_snapshots ds + JOIN HistoricalDates hd USING(pid) + WHERE date_first_sold IS NOT NULL + AND snapshot_date >= date_first_sold -- Only consider snapshots after first sale + AND snapshot_date <= _calculation_date -- Only up to the overall calculation date + GROUP BY pid, date_first_sold + ), + Settings AS ( + -- Fetches effective configuration settings (Product > Vendor > Global) + SELECT + p.pid, + COALESCE(sp.lead_time_days, sv.default_lead_time_days, (SELECT setting_value FROM settings_global WHERE setting_key = 'default_lead_time_days')::int, 14) AS effective_lead_time, + COALESCE(sp.days_of_stock, sv.default_days_of_stock, (SELECT setting_value FROM settings_global WHERE setting_key = 'default_days_of_stock')::int, 30) AS effective_days_of_stock, + COALESCE(sp.safety_stock, (SELECT setting_value::int FROM settings_global WHERE setting_key = 'default_safety_stock_units'), 0) AS effective_safety_stock, + COALESCE(sp.exclude_from_forecast, FALSE) AS exclude_forecast + FROM public.products p + LEFT JOIN public.settings_product sp ON p.pid = sp.pid + LEFT JOIN public.settings_vendor sv ON p.vendor = sv.vendor + ), + AvgLeadTime AS ( + -- Calculate Average Lead Time from historical POs + SELECT + pid, + AVG(GREATEST(1, + CASE + WHEN last_received_date IS NOT NULL AND date IS NOT NULL + THEN (last_received_date::date - date::date) + ELSE 1 + END + ))::int AS avg_lead_time_days_calc + FROM public.purchase_orders + WHERE status = 'received' -- Assumes 'received' marks full receipt + AND last_received_date IS NOT NULL + AND date IS NOT NULL + AND last_received_date >= date + GROUP BY pid + ), + RankedForABC AS ( + -- Ranks products based on the configured ABC metric (using historical data) + SELECT + p.pid, + CASE COALESCE((SELECT setting_value FROM settings_global WHERE setting_key 
= 'abc_calculation_basis'), 'revenue_30d') + WHEN 'sales_30d' THEN COALESCE(sa.sales_30d, 0) + WHEN 'lifetime_revenue' THEN COALESCE(sa.lifetime_revenue, 0)::numeric + ELSE COALESCE(sa.revenue_30d, 0) -- Default to revenue_30d + END AS metric_value + FROM public.products p -- Use products as the base + JOIN SnapshotAggregates sa ON p.pid = sa.pid + WHERE p.replenishable = TRUE -- Only rank replenishable products + AND (CASE COALESCE((SELECT setting_value FROM settings_global WHERE setting_key = 'abc_calculation_basis'), 'revenue_30d') + WHEN 'sales_30d' THEN COALESCE(sa.sales_30d, 0) + WHEN 'lifetime_revenue' THEN COALESCE(sa.lifetime_revenue, 0)::numeric + ELSE COALESCE(sa.revenue_30d, 0) + END) > 0 -- Exclude zero-value products from ranking + ), + CumulativeABC AS ( + -- Calculates cumulative metric values for ABC ranking + SELECT + pid, metric_value, + SUM(metric_value) OVER (ORDER BY metric_value DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as cumulative_metric, + SUM(metric_value) OVER () as total_metric + FROM RankedForABC + ), + FinalABC AS ( + -- Assigns A, B, or C class based on thresholds + SELECT + pid, + CASE + WHEN cumulative_metric / NULLIF(total_metric, 0) <= COALESCE((SELECT setting_value::numeric FROM settings_global WHERE setting_key = 'abc_revenue_threshold_a'), 0.8) THEN 'A'::char(1) + WHEN cumulative_metric / NULLIF(total_metric, 0) <= COALESCE((SELECT setting_value::numeric FROM settings_global WHERE setting_key = 'abc_revenue_threshold_b'), 0.95) THEN 'B'::char(1) + ELSE 'C'::char(1) + END AS abc_class_calc + FROM CumulativeABC + ) + -- Final INSERT/UPDATE statement using all the prepared CTEs + INSERT INTO public.product_metrics ( + pid, last_calculated, sku, title, brand, vendor, image_url, is_visible, is_replenishable, + current_price, current_regular_price, current_cost_price, current_landing_cost_price, + current_stock, current_stock_cost, current_stock_retail, current_stock_gross, + on_order_qty, on_order_cost, on_order_retail, earliest_expected_date, + date_created, date_first_received, date_last_received, date_first_sold, date_last_sold, age_days, + sales_7d, revenue_7d, sales_14d, revenue_14d, sales_30d, revenue_30d, cogs_30d, profit_30d, + sales_60d, revenue_60d, cogs_60d, profit_60d, + sales_90d, revenue_90d, cogs_90d, profit_90d, + returns_units_30d, returns_revenue_30d, discounts_30d, gross_revenue_30d, gross_regular_revenue_30d, + stockout_days_30d, sales_365d, revenue_365d, + avg_stock_units_30d, avg_stock_cost_30d, avg_stock_retail_30d, avg_stock_gross_30d, + received_qty_30d, received_cost_30d, + lifetime_sales, lifetime_revenue, + first_7_days_sales, first_7_days_revenue, first_30_days_sales, first_30_days_revenue, + first_60_days_sales, first_60_days_revenue, first_90_days_sales, first_90_days_revenue, + asp_30d, acp_30d, avg_ros_30d, avg_sales_per_day_30d, + margin_30d, markup_30d, gmroi_30d, stockturn_30d, return_rate_30d, discount_rate_30d, + stockout_rate_30d, markdown_30d, markdown_rate_30d, sell_through_30d, + avg_lead_time_days, abc_class, + sales_velocity_daily, config_lead_time, config_days_of_stock, config_safety_stock, + planning_period_days, lead_time_forecast_units, days_of_stock_forecast_units, + planning_period_forecast_units, lead_time_closing_stock, days_of_stock_closing_stock, + replenishment_needed_raw, replenishment_units, replenishment_cost, replenishment_retail, replenishment_profit, + to_order_units, forecast_lost_sales_units, forecast_lost_revenue, + stock_cover_in_days, po_cover_in_days, 
sells_out_in_days, replenish_date, + overstocked_units, overstocked_cost, overstocked_retail, is_old_stock, + yesterday_sales + ) + SELECT + -- Select columns in order, joining all CTEs by pid + ci.pid, _start_time, ci.sku, ci.title, ci.brand, ci.vendor, ci.image_url, ci.is_visible, ci.replenishable, + ci.current_price, ci.current_regular_price, ci.current_cost_price, ci.current_effective_cost, + ci.current_stock, (ci.current_stock * COALESCE(ci.current_effective_cost, 0.00))::numeric(12,2), (ci.current_stock * COALESCE(ci.current_price, 0.00))::numeric(12,2), (ci.current_stock * COALESCE(ci.current_regular_price, 0.00))::numeric(12,2), + COALESCE(ooi.on_order_qty, 0), COALESCE(ooi.on_order_cost, 0.00)::numeric(12,2), (COALESCE(ooi.on_order_qty, 0) * COALESCE(ci.current_price, 0.00))::numeric(12,2), ooi.earliest_expected_date, + + -- Fix type issue with date calculation - properly cast timestamps to dates before arithmetic + ci.created_at::date, + COALESCE(ci.first_received::date, hd.date_first_received_calc), + hd.date_last_received_calc, + hd.date_first_sold, + COALESCE(ci.date_last_sold, hd.max_order_date), + -- Fix timestamp + integer error by ensuring we work only with dates + CASE + WHEN LEAST(ci.created_at::date, COALESCE(hd.date_first_sold, ci.created_at::date)) IS NOT NULL + THEN (_calculation_date::date - LEAST(ci.created_at::date, COALESCE(hd.date_first_sold, ci.created_at::date)))::int + ELSE NULL + END, + + COALESCE(sa.sales_7d, 0), COALESCE(sa.revenue_7d, 0), COALESCE(sa.sales_14d, 0), COALESCE(sa.revenue_14d, 0), COALESCE(sa.sales_30d, 0), COALESCE(sa.revenue_30d, 0), COALESCE(sa.cogs_30d, 0), COALESCE(sa.profit_30d, 0), + COALESCE(sa.sales_60d, 0), COALESCE(sa.revenue_60d, 0), COALESCE(sa.cogs_60d, 0), COALESCE(sa.profit_60d, 0), + COALESCE(sa.sales_90d, 0), COALESCE(sa.revenue_90d, 0), COALESCE(sa.cogs_90d, 0), COALESCE(sa.profit_90d, 0), + COALESCE(sa.returns_units_30d, 0), COALESCE(sa.returns_revenue_30d, 0), COALESCE(sa.discounts_30d, 0), COALESCE(sa.gross_revenue_30d, 0), COALESCE(sa.gross_regular_revenue_30d, 0), + COALESCE(sa.stockout_days_30d, 0), COALESCE(sa.sales_365d, 0), COALESCE(sa.revenue_365d, 0), + sa.avg_stock_units_30d, sa.avg_stock_cost_30d, sa.avg_stock_retail_30d, sa.avg_stock_gross_30d, -- Averages can be NULL if no data + COALESCE(sa.received_qty_30d, 0), COALESCE(sa.received_cost_30d, 0), + COALESCE(sa.lifetime_sales, 0), COALESCE(sa.lifetime_revenue, 0), + fpm.first_7_days_sales, fpm.first_7_days_revenue, fpm.first_30_days_sales, fpm.first_30_days_revenue, + fpm.first_60_days_sales, fpm.first_60_days_revenue, fpm.first_90_days_sales, fpm.first_90_days_revenue, + + -- Calculated KPIs (using COALESCE on inputs where appropriate) + sa.revenue_30d / NULLIF(sa.sales_30d, 0) AS asp_30d, + sa.cogs_30d / NULLIF(sa.sales_30d, 0) AS acp_30d, + sa.profit_30d / NULLIF(sa.sales_30d, 0) AS avg_ros_30d, + COALESCE(sa.sales_30d, 0) / 30.0 AS avg_sales_per_day_30d, + + -- Fix for percentages - cast to numeric with appropriate precision + ((sa.profit_30d / NULLIF(sa.revenue_30d, 0)) * 100)::numeric(8,2) AS margin_30d, + ((sa.profit_30d / NULLIF(sa.cogs_30d, 0)) * 100)::numeric(8,2) AS markup_30d, + sa.profit_30d / NULLIF(sa.avg_stock_cost_30d, 0) AS gmroi_30d, + sa.sales_30d / NULLIF(sa.avg_stock_units_30d, 0) AS stockturn_30d, + ((sa.returns_units_30d / NULLIF(COALESCE(sa.sales_30d, 0) + COALESCE(sa.returns_units_30d, 0), 0)) * 100)::numeric(8,2) AS return_rate_30d, + ((sa.discounts_30d / NULLIF(sa.gross_revenue_30d, 0)) * 100)::numeric(8,2) AS discount_rate_30d, + 
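+            -- Note: the NULLIF guards in these rate KPIs make a zero denominator yield NULL rather than 0
+            -- (e.g. margin_30d is NULL when there was no revenue in the window); the ::numeric(8,2) casts
+            -- match the target column precision.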
((COALESCE(sa.stockout_days_30d, 0) / 30.0) * 100)::numeric(8,2) AS stockout_rate_30d, + GREATEST(0, sa.gross_regular_revenue_30d - sa.gross_revenue_30d) AS markdown_30d, -- Ensure markdown isn't negative + ((GREATEST(0, sa.gross_regular_revenue_30d - sa.gross_revenue_30d) / NULLIF(sa.gross_regular_revenue_30d, 0)) * 100)::numeric(8,2) AS markdown_rate_30d, + -- Sell Through Rate: Sales / (Stock at end of period + Sales). This is one definition proxying for Sales / Beginning Stock. + ((sa.sales_30d / NULLIF( + (SELECT eod_stock_quantity FROM daily_product_snapshots WHERE snapshot_date = _calculation_date AND pid = ci.pid LIMIT 1) + COALESCE(sa.sales_30d, 0) + , 0)) * 100)::numeric(8,2) AS sell_through_30d, + + -- Use calculated periodic metrics + alt.avg_lead_time_days_calc, + CASE + WHEN ci.replenishable = FALSE THEN NULL -- Non-replenishable don't get a class + ELSE COALESCE(fa.abc_class_calc, 'C') -- Default ranked replenishable but non-contributing to C + END, + + -- Forecasting intermediate values (based on historical aggregates ending _calculation_date) + (COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) AS sales_velocity_daily, -- Ensure divisor > 0 + s.effective_lead_time AS config_lead_time, s.effective_days_of_stock AS config_days_of_stock, s.effective_safety_stock AS config_safety_stock, + (s.effective_lead_time + s.effective_days_of_stock) AS planning_period_days, + -- Calculate raw forecast need components (using safe velocity) + (COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_lead_time AS lead_time_forecast_units, + (COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_days_of_stock AS days_of_stock_forecast_units, + -- Planning period forecast units (sum of lead time and DOS units) + CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_lead_time) + + CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_days_of_stock) AS planning_period_forecast_units, + -- Closing stock calculations (using raw forecast components for accuracy before rounding) + (ci.current_stock + COALESCE(ooi.on_order_qty, 0) - ((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_lead_time)) AS lead_time_closing_stock, + ((ci.current_stock + COALESCE(ooi.on_order_qty, 0) - ((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_lead_time))) + - ((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_days_of_stock) AS days_of_stock_closing_stock, + -- Raw replenishment needed + (CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_lead_time) -- Use rounded forecast units + + CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_days_of_stock)) + + s.effective_safety_stock - ci.current_stock - COALESCE(ooi.on_order_qty, 0) AS replenishment_needed_raw, + + -- Final Forecasting Metrics + -- Replenishment Units (calculated need, before MOQ) + CEILING(GREATEST(0, + (CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_lead_time) + + 
CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_days_of_stock)) + + s.effective_safety_stock - ci.current_stock - COALESCE(ooi.on_order_qty, 0) + ))::int AS replenishment_units, + -- Replenishment Cost/Retail/Profit (based on replenishment_units) + (CEILING(GREATEST(0, + (CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_lead_time) + + CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_days_of_stock)) + + s.effective_safety_stock - ci.current_stock - COALESCE(ooi.on_order_qty, 0) + ))::int) * COALESCE(ci.current_effective_cost, 0.00)::numeric(12,2) AS replenishment_cost, + (CEILING(GREATEST(0, + (CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_lead_time) + + CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_days_of_stock)) + + s.effective_safety_stock - ci.current_stock - COALESCE(ooi.on_order_qty, 0) + ))::int) * COALESCE(ci.current_price, 0.00)::numeric(12,2) AS replenishment_retail, + (CEILING(GREATEST(0, + (CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_lead_time) + + CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_days_of_stock)) + + s.effective_safety_stock - ci.current_stock - COALESCE(ooi.on_order_qty, 0) + ))::int) * (COALESCE(ci.current_price, 0.00) - COALESCE(ci.current_effective_cost, 0.00))::numeric(12,2) AS replenishment_profit, + + -- *** FIX: To Order Units (Apply MOQ rounding) *** + CASE + WHEN COALESCE(ci.moq, 0) <= 1 THEN -- Treat no/invalid MOQ or MOQ=1 as no rounding needed + CEILING(GREATEST(0, + (CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_lead_time) + + CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_days_of_stock)) + + s.effective_safety_stock - ci.current_stock - COALESCE(ooi.on_order_qty, 0) + ))::int + ELSE -- Apply MOQ rounding: Round UP to nearest multiple of MOQ + (CEILING(GREATEST(0, + (CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_lead_time) + + CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_days_of_stock)) + + s.effective_safety_stock - ci.current_stock - COALESCE(ooi.on_order_qty, 0) + ) / NULLIF(ci.moq::numeric, 0)) * COALESCE(ci.moq, 1))::int + END AS to_order_units, + + -- Forecast Lost Sales (Units occurring during lead time if current+on_order is insufficient) + CEILING(GREATEST(0, + ((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_lead_time) -- Demand during lead time + - (ci.current_stock + COALESCE(ooi.on_order_qty, 0)) -- Supply available before order arrives + ))::int AS forecast_lost_sales_units, + -- Forecast Lost Revenue + (CEILING(GREATEST(0, + ((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_lead_time) + - (ci.current_stock + COALESCE(ooi.on_order_qty, 0)) + ))::int) * COALESCE(ci.current_price, 0.00)::numeric(12,2) AS forecast_lost_revenue, + + -- 
Stock Cover etc (using safe velocity) + ci.current_stock / NULLIF((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)), 0) AS stock_cover_in_days, + COALESCE(ooi.on_order_qty, 0) / NULLIF((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)), 0) AS po_cover_in_days, + (ci.current_stock + COALESCE(ooi.on_order_qty, 0)) / NULLIF((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)), 0) AS sells_out_in_days, + -- Replenish Date (Project forward from 'today', which is _calculation_date + 1 day) + CASE + WHEN (COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) > 0 -- Check for positive velocity + THEN + _calculation_date + INTERVAL '1 day' -- Today + + FLOOR(GREATEST(0, ci.current_stock - s.effective_safety_stock) -- Stock above safety + / (COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) -- divided by velocity + )::integer * INTERVAL '1 day' -- Gives date safety stock is hit + - s.effective_lead_time * INTERVAL '1 day' -- Subtract lead time + ELSE NULL -- Cannot calculate if no sales velocity + END AS replenish_date, + -- Overstocked Units (Stock above safety + planning period demand) + GREATEST(0, ci.current_stock - s.effective_safety_stock - + (CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_lead_time) -- Demand during lead time + + CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_days_of_stock)) -- Demand during DOS + )::int AS overstocked_units, + (GREATEST(0, ci.current_stock - s.effective_safety_stock - + (CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_lead_time) + + CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_days_of_stock)) + )::int) * COALESCE(ci.current_effective_cost, 0.00)::numeric(12,2) AS overstocked_cost, + (GREATEST(0, ci.current_stock - s.effective_safety_stock - + (CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_lead_time) + + CEILING((COALESCE(sa.sales_30d, 0) / NULLIF(GREATEST(1.0, 30.0 - COALESCE(sa.stockout_days_30d, 0)), 0)) * s.effective_days_of_stock)) + )::int) * COALESCE(ci.current_price, 0.00)::numeric(12,2) AS overstocked_retail, + -- Old Stock Flag + (ci.created_at::date < (_calculation_date - INTERVAL '60 day')::date) AND + (COALESCE(ci.date_last_sold, hd.max_order_date) IS NULL OR COALESCE(ci.date_last_sold, hd.max_order_date) < (_calculation_date - INTERVAL '60 day')::date) AND + (hd.date_last_received_calc IS NULL OR hd.date_last_received_calc < (_calculation_date - INTERVAL '60 day')::date) AND + COALESCE(ooi.on_order_qty, 0) = 0 AS is_old_stock, + COALESCE(sa.yesterday_sales, 0) -- Sales for _calculation_date + + FROM CurrentInfo ci + LEFT JOIN OnOrderInfo ooi ON ci.pid = ooi.pid + LEFT JOIN HistoricalDates hd ON ci.pid = hd.pid + LEFT JOIN SnapshotAggregates sa ON ci.pid = sa.pid + LEFT JOIN FirstPeriodMetrics fpm ON ci.pid = fpm.pid + LEFT JOIN Settings s ON ci.pid = s.pid + LEFT JOIN AvgLeadTime alt ON ci.pid = alt.pid -- Join calculated avg lead time + LEFT JOIN FinalABC fa ON ci.pid = fa.pid -- Join calculated ABC class + WHERE s.exclude_forecast IS FALSE OR s.exclude_forecast IS 
NULL + + ON CONFLICT (pid) DO UPDATE SET + -- *** IMPORTANT: List ALL columns here, ensuring order matches INSERT list *** + -- Update ALL columns to ensure entire row is refreshed + last_calculated = EXCLUDED.last_calculated, sku = EXCLUDED.sku, title = EXCLUDED.title, brand = EXCLUDED.brand, vendor = EXCLUDED.vendor, image_url = EXCLUDED.image_url, is_visible = EXCLUDED.is_visible, is_replenishable = EXCLUDED.is_replenishable, + current_price = EXCLUDED.current_price, current_regular_price = EXCLUDED.current_regular_price, current_cost_price = EXCLUDED.current_cost_price, current_landing_cost_price = EXCLUDED.current_landing_cost_price, + current_stock = EXCLUDED.current_stock, current_stock_cost = EXCLUDED.current_stock_cost, current_stock_retail = EXCLUDED.current_stock_retail, current_stock_gross = EXCLUDED.current_stock_gross, + on_order_qty = EXCLUDED.on_order_qty, on_order_cost = EXCLUDED.on_order_cost, on_order_retail = EXCLUDED.on_order_retail, earliest_expected_date = EXCLUDED.earliest_expected_date, + date_created = EXCLUDED.date_created, date_first_received = EXCLUDED.date_first_received, date_last_received = EXCLUDED.date_last_received, date_first_sold = EXCLUDED.date_first_sold, date_last_sold = EXCLUDED.date_last_sold, age_days = EXCLUDED.age_days, + sales_7d = EXCLUDED.sales_7d, revenue_7d = EXCLUDED.revenue_7d, sales_14d = EXCLUDED.sales_14d, revenue_14d = EXCLUDED.revenue_14d, sales_30d = EXCLUDED.sales_30d, revenue_30d = EXCLUDED.revenue_30d, cogs_30d = EXCLUDED.cogs_30d, profit_30d = EXCLUDED.profit_30d, + -- Add 60d/90d columns + sales_60d = EXCLUDED.sales_60d, revenue_60d = EXCLUDED.revenue_60d, cogs_60d = EXCLUDED.cogs_60d, profit_60d = EXCLUDED.profit_60d, + sales_90d = EXCLUDED.sales_90d, revenue_90d = EXCLUDED.revenue_90d, cogs_90d = EXCLUDED.cogs_90d, profit_90d = EXCLUDED.profit_90d, + returns_units_30d = EXCLUDED.returns_units_30d, returns_revenue_30d = EXCLUDED.returns_revenue_30d, discounts_30d = EXCLUDED.discounts_30d, gross_revenue_30d = EXCLUDED.gross_revenue_30d, gross_regular_revenue_30d = EXCLUDED.gross_regular_revenue_30d, + stockout_days_30d = EXCLUDED.stockout_days_30d, sales_365d = EXCLUDED.sales_365d, revenue_365d = EXCLUDED.revenue_365d, + avg_stock_units_30d = EXCLUDED.avg_stock_units_30d, avg_stock_cost_30d = EXCLUDED.avg_stock_cost_30d, avg_stock_retail_30d = EXCLUDED.avg_stock_retail_30d, avg_stock_gross_30d = EXCLUDED.avg_stock_gross_30d, + received_qty_30d = EXCLUDED.received_qty_30d, received_cost_30d = EXCLUDED.received_cost_30d, + lifetime_sales = EXCLUDED.lifetime_sales, lifetime_revenue = EXCLUDED.lifetime_revenue, + first_7_days_sales = EXCLUDED.first_7_days_sales, first_7_days_revenue = EXCLUDED.first_7_days_revenue, first_30_days_sales = EXCLUDED.first_30_days_sales, first_30_days_revenue = EXCLUDED.first_30_days_revenue, + first_60_days_sales = EXCLUDED.first_60_days_sales, first_60_days_revenue = EXCLUDED.first_60_days_revenue, first_90_days_sales = EXCLUDED.first_90_days_sales, first_90_days_revenue = EXCLUDED.first_90_days_revenue, + asp_30d = EXCLUDED.asp_30d, acp_30d = EXCLUDED.acp_30d, avg_ros_30d = EXCLUDED.avg_ros_30d, avg_sales_per_day_30d = EXCLUDED.avg_sales_per_day_30d, + -- *** REMOVED avg_sales_per_month_30d *** + margin_30d = EXCLUDED.margin_30d, markup_30d = EXCLUDED.markup_30d, gmroi_30d = EXCLUDED.gmroi_30d, stockturn_30d = EXCLUDED.stockturn_30d, return_rate_30d = EXCLUDED.return_rate_30d, discount_rate_30d = EXCLUDED.discount_rate_30d, + stockout_rate_30d = EXCLUDED.stockout_rate_30d, markdown_30d = 
EXCLUDED.markdown_30d, markdown_rate_30d = EXCLUDED.markdown_rate_30d, sell_through_30d = EXCLUDED.sell_through_30d, + avg_lead_time_days = EXCLUDED.avg_lead_time_days, abc_class = EXCLUDED.abc_class, + sales_velocity_daily = EXCLUDED.sales_velocity_daily, config_lead_time = EXCLUDED.config_lead_time, config_days_of_stock = EXCLUDED.config_days_of_stock, config_safety_stock = EXCLUDED.config_safety_stock, + planning_period_days = EXCLUDED.planning_period_days, lead_time_forecast_units = EXCLUDED.lead_time_forecast_units, days_of_stock_forecast_units = EXCLUDED.days_of_stock_forecast_units, + planning_period_forecast_units = EXCLUDED.planning_period_forecast_units, lead_time_closing_stock = EXCLUDED.lead_time_closing_stock, days_of_stock_closing_stock = EXCLUDED.days_of_stock_closing_stock, + replenishment_needed_raw = EXCLUDED.replenishment_needed_raw, replenishment_units = EXCLUDED.replenishment_units, replenishment_cost = EXCLUDED.replenishment_cost, replenishment_retail = EXCLUDED.replenishment_retail, replenishment_profit = EXCLUDED.replenishment_profit, + to_order_units = EXCLUDED.to_order_units, -- *** Update to use EXCLUDED *** + forecast_lost_sales_units = EXCLUDED.forecast_lost_sales_units, forecast_lost_revenue = EXCLUDED.forecast_lost_revenue, + stock_cover_in_days = EXCLUDED.stock_cover_in_days, po_cover_in_days = EXCLUDED.po_cover_in_days, sells_out_in_days = EXCLUDED.sells_out_in_days, replenish_date = EXCLUDED.replenish_date, + overstocked_units = EXCLUDED.overstocked_units, overstocked_cost = EXCLUDED.overstocked_cost, overstocked_retail = EXCLUDED.overstocked_retail, is_old_stock = EXCLUDED.is_old_stock, + yesterday_sales = EXCLUDED.yesterday_sales; + RAISE NOTICE 'Finished % module. Duration: %', _module_name, clock_timestamp() - _start_time; +END $$; \ No newline at end of file diff --git a/inventory-server/scripts/metrics-new/update-daily-snapshots.js b/inventory-server/scripts/metrics-new/update-daily-snapshots.js deleted file mode 100644 index 44c84b5..0000000 --- a/inventory-server/scripts/metrics-new/update-daily-snapshots.js +++ /dev/null @@ -1,306 +0,0 @@ -const path = require('path'); -const fs = require('fs'); - -// Change working directory to script directory -process.chdir(path.dirname(__filename)); - -// Load environment variables -require('dotenv').config({ path: path.resolve(__dirname, '../..', '.env') }); - -// Add error handler for uncaught exceptions -process.on('uncaughtException', (error) => { - console.error('Uncaught Exception:', error); - process.exit(1); -}); - -// Add error handler for unhandled promise rejections -process.on('unhandledRejection', (reason, promise) => { - console.error('Unhandled Rejection at:', promise, 'reason:', reason); - process.exit(1); -}); - -// Load progress module -const progress = require('./utils/progress'); - -// Store progress functions in global scope to ensure availability -global.formatElapsedTime = progress.formatElapsedTime; -global.estimateRemaining = progress.estimateRemaining; -global.calculateRate = progress.calculateRate; -global.outputProgress = progress.outputProgress; -global.clearProgress = progress.clearProgress; -global.getProgress = progress.getProgress; -global.logError = progress.logError; - -// Load database module -const { getConnection, closePool } = require('./utils/db'); - -// Add cancel handler -let isCancelled = false; - -function cancelCalculation() { - isCancelled = true; - console.log('Calculation has been cancelled by user'); - - // Force-terminate any query that's been running for more 
than 5 seconds - try { - const connection = getConnection(); - connection.then(async (conn) => { - try { - // Identify and terminate long-running queries from our application - await conn.query(` - SELECT pg_cancel_backend(pid) - FROM pg_stat_activity - WHERE query_start < now() - interval '5 seconds' - AND application_name LIKE '%node%' - AND query NOT LIKE '%pg_cancel_backend%' - `); - - // Release connection - conn.release(); - } catch (err) { - console.error('Error during force cancellation:', err); - conn.release(); - } - }).catch(err => { - console.error('Could not get connection for cancellation:', err); - }); - } catch (err) { - console.error('Failed to terminate running queries:', err); - } - - return { - success: true, - message: 'Calculation has been cancelled' - }; -} - -// Handle SIGTERM signal for cancellation -process.on('SIGTERM', cancelCalculation); - -async function updateDailySnapshots() { - let connection; - const startTime = Date.now(); - let calculateHistoryId; - - // Set a maximum execution time (30 minutes) - const MAX_EXECUTION_TIME = 30 * 60 * 1000; - const timeout = setTimeout(() => { - console.error(`Calculation timed out after ${MAX_EXECUTION_TIME/1000} seconds, forcing termination`); - // Call cancel and force exit - cancelCalculation(); - process.exit(1); - }, MAX_EXECUTION_TIME); - - try { - // Read the SQL file - const sqlFilePath = path.resolve(__dirname, 'update_daily_snapshots.sql'); - const sqlQuery = fs.readFileSync(sqlFilePath, 'utf8'); - - // Clean up any previously running calculations - connection = await getConnection(); - - // Ensure the calculate_status table exists and has the correct structure - await connection.query(` - CREATE TABLE IF NOT EXISTS calculate_status ( - module_name TEXT PRIMARY KEY, - last_calculation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP - ) - `); - - await connection.query(` - UPDATE calculate_history - SET - status = 'cancelled', - end_time = NOW(), - duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER, - error_message = 'Previous calculation was not completed properly' - WHERE status = 'running' AND additional_info->>'type' = 'daily_snapshots' - `); - - // Create history record for this calculation - const historyResult = await connection.query(` - INSERT INTO calculate_history ( - start_time, - status, - additional_info - ) VALUES ( - NOW(), - 'running', - jsonb_build_object( - 'type', 'daily_snapshots', - 'sql_file', 'update_daily_snapshots.sql' - ) - ) RETURNING id - `); - calculateHistoryId = historyResult.rows[0].id; - - // Initialize progress - global.outputProgress({ - status: 'running', - operation: 'Starting daily snapshots calculation', - current: 0, - total: 100, - elapsed: '0s', - remaining: 'Calculating...', - rate: 0, - percentage: '0', - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date().toISOString(), - elapsed_seconds: Math.round((Date.now() - startTime) / 1000) - } - }); - - // Execute the SQL query - global.outputProgress({ - status: 'running', - operation: 'Executing daily snapshots SQL query', - current: 25, - total: 100, - elapsed: global.formatElapsedTime(startTime), - remaining: 'Calculating...', - rate: 0, - percentage: '25', - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date().toISOString(), - elapsed_seconds: Math.round((Date.now() - startTime) / 1000) - } - }); - - await connection.query(sqlQuery); - - // Update calculate_status table - await connection.query(` - INSERT INTO 
calculate_status (module_name, last_calculation_timestamp) - VALUES ($1, $2) - ON CONFLICT (module_name) DO UPDATE - SET last_calculation_timestamp = EXCLUDED.last_calculation_timestamp - `, ['daily_snapshots', new Date()]); - - // Update progress to 100% - global.outputProgress({ - status: 'complete', - operation: 'Daily snapshots calculation complete', - current: 100, - total: 100, - elapsed: global.formatElapsedTime(startTime), - remaining: '0s', - rate: 0, - percentage: '100', - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date().toISOString(), - elapsed_seconds: Math.round((Date.now() - startTime) / 1000) - } - }); - - // Update history with completion - await connection.query(` - UPDATE calculate_history - SET - end_time = NOW(), - duration_seconds = $1, - status = 'completed' - WHERE id = $2 - `, [Math.round((Date.now() - startTime) / 1000), calculateHistoryId]); - - // Clear progress file on successful completion - global.clearProgress(); - - return { - success: true, - message: 'Daily snapshots calculation completed successfully', - duration: Math.round((Date.now() - startTime) / 1000) - }; - } catch (error) { - const endTime = Date.now(); - const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); - - // Update history with error - if (connection && calculateHistoryId) { - await connection.query(` - UPDATE calculate_history - SET - end_time = NOW(), - duration_seconds = $1, - status = $2, - error_message = $3 - WHERE id = $4 - `, [ - totalElapsedSeconds, - isCancelled ? 'cancelled' : 'failed', - error.message, - calculateHistoryId - ]); - } - - if (isCancelled) { - global.outputProgress({ - status: 'cancelled', - operation: 'Calculation cancelled', - current: 50, - total: 100, - elapsed: global.formatElapsedTime(startTime), - remaining: null, - rate: 0, - percentage: '50', - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date().toISOString(), - elapsed_seconds: Math.round((Date.now() - startTime) / 1000) - } - }); - } else { - global.outputProgress({ - status: 'error', - operation: 'Error: ' + error.message, - current: 50, - total: 100, - elapsed: global.formatElapsedTime(startTime), - remaining: null, - rate: 0, - percentage: '50', - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date().toISOString(), - elapsed_seconds: Math.round((Date.now() - startTime) / 1000) - } - }); - } - throw error; - } finally { - // Clear the timeout to prevent forced termination - clearTimeout(timeout); - - // Always release connection - if (connection) { - try { - connection.release(); - } catch (err) { - console.error('Error in final cleanup:', err); - } - } - } -} - -// Export as a module with all necessary functions -module.exports = { - updateDailySnapshots, - cancelCalculation, - getProgress: global.getProgress -}; - -// Run directly if called from command line -if (require.main === module) { - updateDailySnapshots().then(() => { - closePool().then(() => { - process.exit(0); - }); - }).catch(error => { - console.error('Error:', error); - closePool().then(() => { - process.exit(1); - }); - }); -} \ No newline at end of file diff --git a/inventory-server/scripts/metrics-new/update-periodic-metrics.js b/inventory-server/scripts/metrics-new/update-periodic-metrics.js deleted file mode 100644 index 33617a7..0000000 --- a/inventory-server/scripts/metrics-new/update-periodic-metrics.js +++ /dev/null @@ -1,306 +0,0 @@ -const path = require('path'); -const fs = require('fs'); - -// Change working directory 
to script directory -process.chdir(path.dirname(__filename)); - -// Load environment variables -require('dotenv').config({ path: path.resolve(__dirname, '../..', '.env') }); - -// Add error handler for uncaught exceptions -process.on('uncaughtException', (error) => { - console.error('Uncaught Exception:', error); - process.exit(1); -}); - -// Add error handler for unhandled promise rejections -process.on('unhandledRejection', (reason, promise) => { - console.error('Unhandled Rejection at:', promise, 'reason:', reason); - process.exit(1); -}); - -// Load progress module -const progress = require('./utils/progress'); - -// Store progress functions in global scope to ensure availability -global.formatElapsedTime = progress.formatElapsedTime; -global.estimateRemaining = progress.estimateRemaining; -global.calculateRate = progress.calculateRate; -global.outputProgress = progress.outputProgress; -global.clearProgress = progress.clearProgress; -global.getProgress = progress.getProgress; -global.logError = progress.logError; - -// Load database module -const { getConnection, closePool } = require('./utils/db'); - -// Add cancel handler -let isCancelled = false; - -function cancelCalculation() { - isCancelled = true; - console.log('Calculation has been cancelled by user'); - - // Force-terminate any query that's been running for more than 5 seconds - try { - const connection = getConnection(); - connection.then(async (conn) => { - try { - // Identify and terminate long-running queries from our application - await conn.query(` - SELECT pg_cancel_backend(pid) - FROM pg_stat_activity - WHERE query_start < now() - interval '5 seconds' - AND application_name LIKE '%node%' - AND query NOT LIKE '%pg_cancel_backend%' - `); - - // Release connection - conn.release(); - } catch (err) { - console.error('Error during force cancellation:', err); - conn.release(); - } - }).catch(err => { - console.error('Could not get connection for cancellation:', err); - }); - } catch (err) { - console.error('Failed to terminate running queries:', err); - } - - return { - success: true, - message: 'Calculation has been cancelled' - }; -} - -// Handle SIGTERM signal for cancellation -process.on('SIGTERM', cancelCalculation); - -async function updatePeriodicMetrics() { - let connection; - const startTime = Date.now(); - let calculateHistoryId; - - // Set a maximum execution time (30 minutes) - const MAX_EXECUTION_TIME = 30 * 60 * 1000; - const timeout = setTimeout(() => { - console.error(`Calculation timed out after ${MAX_EXECUTION_TIME/1000} seconds, forcing termination`); - // Call cancel and force exit - cancelCalculation(); - process.exit(1); - }, MAX_EXECUTION_TIME); - - try { - // Read the SQL file - const sqlFilePath = path.resolve(__dirname, 'update_periodic_metrics.sql'); - const sqlQuery = fs.readFileSync(sqlFilePath, 'utf8'); - - // Clean up any previously running calculations - connection = await getConnection(); - - // Ensure the calculate_status table exists and has the correct structure - await connection.query(` - CREATE TABLE IF NOT EXISTS calculate_status ( - module_name TEXT PRIMARY KEY, - last_calculation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP - ) - `); - - await connection.query(` - UPDATE calculate_history - SET - status = 'cancelled', - end_time = NOW(), - duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER, - error_message = 'Previous calculation was not completed properly' - WHERE status = 'running' AND additional_info->>'type' = 'periodic_metrics' - `); - 
- // Create history record for this calculation - const historyResult = await connection.query(` - INSERT INTO calculate_history ( - start_time, - status, - additional_info - ) VALUES ( - NOW(), - 'running', - jsonb_build_object( - 'type', 'periodic_metrics', - 'sql_file', 'update_periodic_metrics.sql' - ) - ) RETURNING id - `); - calculateHistoryId = historyResult.rows[0].id; - - // Initialize progress - global.outputProgress({ - status: 'running', - operation: 'Starting periodic metrics calculation', - current: 0, - total: 100, - elapsed: '0s', - remaining: 'Calculating...', - rate: 0, - percentage: '0', - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date().toISOString(), - elapsed_seconds: Math.round((Date.now() - startTime) / 1000) - } - }); - - // Execute the SQL query - global.outputProgress({ - status: 'running', - operation: 'Executing periodic metrics SQL query', - current: 25, - total: 100, - elapsed: global.formatElapsedTime(startTime), - remaining: 'Calculating...', - rate: 0, - percentage: '25', - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date().toISOString(), - elapsed_seconds: Math.round((Date.now() - startTime) / 1000) - } - }); - - await connection.query(sqlQuery); - - // Update calculate_status table - await connection.query(` - INSERT INTO calculate_status (module_name, last_calculation_timestamp) - VALUES ($1, $2) - ON CONFLICT (module_name) DO UPDATE - SET last_calculation_timestamp = EXCLUDED.last_calculation_timestamp - `, ['periodic_metrics', new Date()]); - - // Update progress to 100% - global.outputProgress({ - status: 'complete', - operation: 'Periodic metrics calculation complete', - current: 100, - total: 100, - elapsed: global.formatElapsedTime(startTime), - remaining: '0s', - rate: 0, - percentage: '100', - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date().toISOString(), - elapsed_seconds: Math.round((Date.now() - startTime) / 1000) - } - }); - - // Update history with completion - await connection.query(` - UPDATE calculate_history - SET - end_time = NOW(), - duration_seconds = $1, - status = 'completed' - WHERE id = $2 - `, [Math.round((Date.now() - startTime) / 1000), calculateHistoryId]); - - // Clear progress file on successful completion - global.clearProgress(); - - return { - success: true, - message: 'Periodic metrics calculation completed successfully', - duration: Math.round((Date.now() - startTime) / 1000) - }; - } catch (error) { - const endTime = Date.now(); - const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); - - // Update history with error - if (connection && calculateHistoryId) { - await connection.query(` - UPDATE calculate_history - SET - end_time = NOW(), - duration_seconds = $1, - status = $2, - error_message = $3 - WHERE id = $4 - `, [ - totalElapsedSeconds, - isCancelled ? 
'cancelled' : 'failed', - error.message, - calculateHistoryId - ]); - } - - if (isCancelled) { - global.outputProgress({ - status: 'cancelled', - operation: 'Calculation cancelled', - current: 50, - total: 100, - elapsed: global.formatElapsedTime(startTime), - remaining: null, - rate: 0, - percentage: '50', - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date().toISOString(), - elapsed_seconds: Math.round((Date.now() - startTime) / 1000) - } - }); - } else { - global.outputProgress({ - status: 'error', - operation: 'Error: ' + error.message, - current: 50, - total: 100, - elapsed: global.formatElapsedTime(startTime), - remaining: null, - rate: 0, - percentage: '50', - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date().toISOString(), - elapsed_seconds: Math.round((Date.now() - startTime) / 1000) - } - }); - } - throw error; - } finally { - // Clear the timeout to prevent forced termination - clearTimeout(timeout); - - // Always release connection - if (connection) { - try { - connection.release(); - } catch (err) { - console.error('Error in final cleanup:', err); - } - } - } -} - -// Export as a module with all necessary functions -module.exports = { - updatePeriodicMetrics, - cancelCalculation, - getProgress: global.getProgress -}; - -// Run directly if called from command line -if (require.main === module) { - updatePeriodicMetrics().then(() => { - closePool().then(() => { - process.exit(0); - }); - }).catch(error => { - console.error('Error:', error); - closePool().then(() => { - process.exit(1); - }); - }); -} \ No newline at end of file diff --git a/inventory-server/scripts/metrics-new/update-product-metrics.js b/inventory-server/scripts/metrics-new/update-product-metrics.js deleted file mode 100644 index a6c00d8..0000000 --- a/inventory-server/scripts/metrics-new/update-product-metrics.js +++ /dev/null @@ -1,306 +0,0 @@ -const path = require('path'); -const fs = require('fs'); - -// Change working directory to script directory -process.chdir(path.dirname(__filename)); - -// Load environment variables -require('dotenv').config({ path: path.resolve(__dirname, '../..', '.env') }); - -// Add error handler for uncaught exceptions -process.on('uncaughtException', (error) => { - console.error('Uncaught Exception:', error); - process.exit(1); -}); - -// Add error handler for unhandled promise rejections -process.on('unhandledRejection', (reason, promise) => { - console.error('Unhandled Rejection at:', promise, 'reason:', reason); - process.exit(1); -}); - -// Load progress module -const progress = require('./utils/progress'); - -// Store progress functions in global scope to ensure availability -global.formatElapsedTime = progress.formatElapsedTime; -global.estimateRemaining = progress.estimateRemaining; -global.calculateRate = progress.calculateRate; -global.outputProgress = progress.outputProgress; -global.clearProgress = progress.clearProgress; -global.getProgress = progress.getProgress; -global.logError = progress.logError; - -// Load database module -const { getConnection, closePool } = require('./utils/db'); - -// Add cancel handler -let isCancelled = false; - -function cancelCalculation() { - isCancelled = true; - console.log('Calculation has been cancelled by user'); - - // Force-terminate any query that's been running for more than 5 seconds - try { - const connection = getConnection(); - connection.then(async (conn) => { - try { - // Identify and terminate long-running queries from our application - await conn.query(` - 
SELECT pg_cancel_backend(pid) - FROM pg_stat_activity - WHERE query_start < now() - interval '5 seconds' - AND application_name LIKE '%node%' - AND query NOT LIKE '%pg_cancel_backend%' - `); - - // Release connection - conn.release(); - } catch (err) { - console.error('Error during force cancellation:', err); - conn.release(); - } - }).catch(err => { - console.error('Could not get connection for cancellation:', err); - }); - } catch (err) { - console.error('Failed to terminate running queries:', err); - } - - return { - success: true, - message: 'Calculation has been cancelled' - }; -} - -// Handle SIGTERM signal for cancellation -process.on('SIGTERM', cancelCalculation); - -async function updateProductMetrics() { - let connection; - const startTime = Date.now(); - let calculateHistoryId; - - // Set a maximum execution time (30 minutes) - const MAX_EXECUTION_TIME = 30 * 60 * 1000; - const timeout = setTimeout(() => { - console.error(`Calculation timed out after ${MAX_EXECUTION_TIME/1000} seconds, forcing termination`); - // Call cancel and force exit - cancelCalculation(); - process.exit(1); - }, MAX_EXECUTION_TIME); - - try { - // Read the SQL file - const sqlFilePath = path.resolve(__dirname, 'update_product_metrics.sql'); - const sqlQuery = fs.readFileSync(sqlFilePath, 'utf8'); - - // Clean up any previously running calculations - connection = await getConnection(); - - // Ensure the calculate_status table exists and has the correct structure - await connection.query(` - CREATE TABLE IF NOT EXISTS calculate_status ( - module_name TEXT PRIMARY KEY, - last_calculation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP - ) - `); - - await connection.query(` - UPDATE calculate_history - SET - status = 'cancelled', - end_time = NOW(), - duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER, - error_message = 'Previous calculation was not completed properly' - WHERE status = 'running' AND additional_info->>'type' = 'product_metrics' - `); - - // Create history record for this calculation - const historyResult = await connection.query(` - INSERT INTO calculate_history ( - start_time, - status, - additional_info - ) VALUES ( - NOW(), - 'running', - jsonb_build_object( - 'type', 'product_metrics', - 'sql_file', 'update_product_metrics.sql' - ) - ) RETURNING id - `); - calculateHistoryId = historyResult.rows[0].id; - - // Initialize progress - global.outputProgress({ - status: 'running', - operation: 'Starting product metrics calculation', - current: 0, - total: 100, - elapsed: '0s', - remaining: 'Calculating...', - rate: 0, - percentage: '0', - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date().toISOString(), - elapsed_seconds: Math.round((Date.now() - startTime) / 1000) - } - }); - - // Execute the SQL query - global.outputProgress({ - status: 'running', - operation: 'Executing product metrics SQL query', - current: 25, - total: 100, - elapsed: global.formatElapsedTime(startTime), - remaining: 'Calculating...', - rate: 0, - percentage: '25', - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date().toISOString(), - elapsed_seconds: Math.round((Date.now() - startTime) / 1000) - } - }); - - await connection.query(sqlQuery); - - // Update calculate_status table - await connection.query(` - INSERT INTO calculate_status (module_name, last_calculation_timestamp) - VALUES ($1, $2) - ON CONFLICT (module_name) DO UPDATE - SET last_calculation_timestamp = EXCLUDED.last_calculation_timestamp - `, ['product_metrics', 
new Date()]); - - // Update progress to 100% - global.outputProgress({ - status: 'complete', - operation: 'Product metrics calculation complete', - current: 100, - total: 100, - elapsed: global.formatElapsedTime(startTime), - remaining: '0s', - rate: 0, - percentage: '100', - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date().toISOString(), - elapsed_seconds: Math.round((Date.now() - startTime) / 1000) - } - }); - - // Update history with completion - await connection.query(` - UPDATE calculate_history - SET - end_time = NOW(), - duration_seconds = $1, - status = 'completed' - WHERE id = $2 - `, [Math.round((Date.now() - startTime) / 1000), calculateHistoryId]); - - // Clear progress file on successful completion - global.clearProgress(); - - return { - success: true, - message: 'Product metrics calculation completed successfully', - duration: Math.round((Date.now() - startTime) / 1000) - }; - } catch (error) { - const endTime = Date.now(); - const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); - - // Update history with error - if (connection && calculateHistoryId) { - await connection.query(` - UPDATE calculate_history - SET - end_time = NOW(), - duration_seconds = $1, - status = $2, - error_message = $3 - WHERE id = $4 - `, [ - totalElapsedSeconds, - isCancelled ? 'cancelled' : 'failed', - error.message, - calculateHistoryId - ]); - } - - if (isCancelled) { - global.outputProgress({ - status: 'cancelled', - operation: 'Calculation cancelled', - current: 50, - total: 100, - elapsed: global.formatElapsedTime(startTime), - remaining: null, - rate: 0, - percentage: '50', - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date().toISOString(), - elapsed_seconds: Math.round((Date.now() - startTime) / 1000) - } - }); - } else { - global.outputProgress({ - status: 'error', - operation: 'Error: ' + error.message, - current: 50, - total: 100, - elapsed: global.formatElapsedTime(startTime), - remaining: null, - rate: 0, - percentage: '50', - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date().toISOString(), - elapsed_seconds: Math.round((Date.now() - startTime) / 1000) - } - }); - } - throw error; - } finally { - // Clear the timeout to prevent forced termination - clearTimeout(timeout); - - // Always release connection - if (connection) { - try { - connection.release(); - } catch (err) { - console.error('Error in final cleanup:', err); - } - } - } -} - -// Export as a module with all necessary functions -module.exports = { - updateProductMetrics, - cancelCalculation, - getProgress: global.getProgress -}; - -// Run directly if called from command line -if (require.main === module) { - updateProductMetrics().then(() => { - closePool().then(() => { - process.exit(0); - }); - }).catch(error => { - console.error('Error:', error); - closePool().then(() => { - process.exit(1); - }); - }); -} \ No newline at end of file diff --git a/inventory-server/scripts/metrics-new/update_daily_snapshots.sql b/inventory-server/scripts/metrics-new/update_daily_snapshots.sql index b0f9f8a..1094a39 100644 --- a/inventory-server/scripts/metrics-new/update_daily_snapshots.sql +++ b/inventory-server/scripts/metrics-new/update_daily_snapshots.sql @@ -35,18 +35,59 @@ BEGIN FROM public.products p -- Start from products to include those with no orders today LEFT JOIN public.orders o ON p.pid = o.pid - AND o.date >= _target_date -- Filter orders for the target date - AND o.date < _target_date + INTERVAL '1 day' + AND 
o.date::date = _target_date -- Cast to date to ensure compatibility regardless of original type GROUP BY p.pid, p.sku ), ReceivingData AS ( SELECT po.pid, - COALESCE(SUM((rh.item->>'qty')::numeric), 0) AS units_received, - COALESCE(SUM((rh.item->>'qty')::numeric * COALESCE((rh.item->>'cost')::numeric, po.cost_price)), 0.00) AS cost_received + -- Prioritize the actual table fields over the JSON data + COALESCE( + -- First try the received field from purchase_orders table + SUM(CASE WHEN po.date::date = _target_date THEN po.received ELSE 0 END), + + -- Otherwise fall back to the receiving_history JSON as secondary source + SUM( + CASE + WHEN (rh.item->>'date')::date = _target_date THEN (rh.item->>'qty')::numeric + WHEN (rh.item->>'received_at')::date = _target_date THEN (rh.item->>'qty')::numeric + WHEN (rh.item->>'receipt_date')::date = _target_date THEN (rh.item->>'qty')::numeric + ELSE 0 + END + ), + 0 + ) AS units_received, + + COALESCE( + -- First try the actual cost_price from purchase_orders + SUM(CASE WHEN po.date::date = _target_date THEN po.received * po.cost_price ELSE 0 END), + + -- Otherwise fall back to receiving_history JSON + SUM( + CASE + WHEN (rh.item->>'date')::date = _target_date THEN (rh.item->>'qty')::numeric + WHEN (rh.item->>'received_at')::date = _target_date THEN (rh.item->>'qty')::numeric + WHEN (rh.item->>'receipt_date')::date = _target_date THEN (rh.item->>'qty')::numeric + ELSE 0 + END + * COALESCE((rh.item->>'cost')::numeric, po.cost_price) + ), + 0.00 + ) AS cost_received FROM public.purchase_orders po - CROSS JOIN LATERAL jsonb_array_elements(po.receiving_history) AS rh(item) - WHERE (rh.item->>'received_at')::date = _target_date -- Filter receipts for the target date + LEFT JOIN LATERAL jsonb_array_elements(po.receiving_history) AS rh(item) ON + jsonb_typeof(po.receiving_history) = 'array' AND + jsonb_array_length(po.receiving_history) > 0 AND + ( + (rh.item->>'date')::date = _target_date OR + (rh.item->>'received_at')::date = _target_date OR + (rh.item->>'receipt_date')::date = _target_date + ) + -- Include POs with the current date or relevant receiving_history + WHERE + po.date::date = _target_date OR + jsonb_typeof(po.receiving_history) = 'array' AND + jsonb_array_length(po.receiving_history) > 0 GROUP BY po.pid ), CurrentStock AS ( diff --git a/inventory-server/scripts/metrics-new/update_product_metrics.sql b/inventory-server/scripts/metrics-new/update_product_metrics.sql index 6f28a94..7409ce6 100644 --- a/inventory-server/scripts/metrics-new/update_product_metrics.sql +++ b/inventory-server/scripts/metrics-new/update_product_metrics.sql @@ -58,15 +58,41 @@ BEGIN p.pid, MIN(o.date)::date AS date_first_sold, MAX(o.date)::date AS max_order_date, -- Use MAX for potential recalc of date_last_sold - MIN(rh.first_receipt_date) AS date_first_received_calc, - MAX(rh.last_receipt_date) AS date_last_received_calc + + -- For first received date, try table data first then fall back to JSON + COALESCE( + MIN(po.date)::date, -- Try purchase_order date first + MIN(rh.first_receipt_date) -- Fall back to JSON data if needed + ) AS date_first_received_calc, + + -- If we only have one receipt date (first = last), use that for last_received too + COALESCE( + MAX(po.date)::date, -- Try purchase_order date first + NULLIF(MAX(rh.last_receipt_date), NULL), + MIN(rh.first_receipt_date) + ) AS date_last_received_calc FROM public.products p LEFT JOIN public.orders o ON p.pid = o.pid AND o.quantity > 0 AND o.status NOT IN ('canceled', 'returned') + LEFT JOIN 
public.purchase_orders po ON p.pid = po.pid AND po.received > 0 LEFT JOIN ( SELECT po.pid, - MIN((rh.item->>'received_at')::date) as first_receipt_date, - MAX((rh.item->>'received_at')::date) as last_receipt_date + MIN( + CASE + WHEN rh.item->>'date' IS NOT NULL THEN (rh.item->>'date')::date + WHEN rh.item->>'received_at' IS NOT NULL THEN (rh.item->>'received_at')::date + WHEN rh.item->>'receipt_date' IS NOT NULL THEN (rh.item->>'receipt_date')::date + ELSE NULL + END + ) as first_receipt_date, + MAX( + CASE + WHEN rh.item->>'date' IS NOT NULL THEN (rh.item->>'date')::date + WHEN rh.item->>'received_at' IS NOT NULL THEN (rh.item->>'received_at')::date + WHEN rh.item->>'receipt_date' IS NOT NULL THEN (rh.item->>'receipt_date')::date + ELSE NULL + END + ) as last_receipt_date FROM public.purchase_orders po CROSS JOIN LATERAL jsonb_array_elements(po.receiving_history) AS rh(item) WHERE jsonb_typeof(po.receiving_history) = 'array' AND jsonb_array_length(po.receiving_history) > 0 @@ -77,7 +103,14 @@ BEGIN SnapshotAggregates AS ( SELECT pid, - -- Rolling periods (ensure dates are inclusive/exclusive as needed) + -- Get the counts of all available data + COUNT(DISTINCT snapshot_date) AS available_days, + + -- Rolling periods with no time constraint - just sum everything we have + SUM(units_sold) AS total_units_sold, + SUM(net_revenue) AS total_net_revenue, + + -- Specific time windows if we have enough data SUM(CASE WHEN snapshot_date >= _current_date - INTERVAL '6 days' THEN units_sold ELSE 0 END) AS sales_7d, SUM(CASE WHEN snapshot_date >= _current_date - INTERVAL '6 days' THEN net_revenue ELSE 0 END) AS revenue_7d, SUM(CASE WHEN snapshot_date >= _current_date - INTERVAL '13 days' THEN units_sold ELSE 0 END) AS sales_14d, @@ -103,7 +136,7 @@ BEGIN AVG(CASE WHEN snapshot_date >= _current_date - INTERVAL '29 days' THEN eod_stock_retail END) AS avg_stock_retail_30d, AVG(CASE WHEN snapshot_date >= _current_date - INTERVAL '29 days' THEN eod_stock_gross END) AS avg_stock_gross_30d, - -- Lifetime + -- Lifetime - should match total values above SUM(units_sold) AS lifetime_sales, SUM(net_revenue) AS lifetime_revenue, @@ -111,8 +144,6 @@ BEGIN SUM(CASE WHEN snapshot_date = _current_date - INTERVAL '1 day' THEN units_sold ELSE 0 END) as yesterday_sales FROM public.daily_product_snapshots - WHERE snapshot_date <= _current_date -- Include today's snapshot - AND snapshot_date >= _current_date - INTERVAL '365 days' -- Limit history scan slightly GROUP BY pid ), FirstPeriodMetrics AS ( @@ -191,7 +222,9 @@ BEGIN sa.stockout_days_30d, sa.sales_365d, sa.revenue_365d, sa.avg_stock_units_30d, sa.avg_stock_cost_30d, sa.avg_stock_retail_30d, sa.avg_stock_gross_30d, sa.received_qty_30d, sa.received_cost_30d, - sa.lifetime_sales, sa.lifetime_revenue, + -- Use total counts for lifetime values to ensure we have data even with limited history + COALESCE(sa.total_units_sold, sa.lifetime_sales) AS lifetime_sales, + COALESCE(sa.total_net_revenue, sa.lifetime_revenue) AS lifetime_revenue, fpm.first_7_days_sales, fpm.first_7_days_revenue, fpm.first_30_days_sales, fpm.first_30_days_revenue, fpm.first_60_days_sales, fpm.first_60_days_revenue, fpm.first_90_days_sales, fpm.first_90_days_revenue, diff --git a/inventory-server/scripts/psql-csv-import.sh b/inventory-server/scripts/psql-csv-import.sh new file mode 100755 index 0000000..e616011 --- /dev/null +++ b/inventory-server/scripts/psql-csv-import.sh @@ -0,0 +1,428 @@ +#!/bin/bash + +# Simple script to import CSV to PostgreSQL using psql +# Usage: ./psql-csv-import.sh 
[start-batch] + +# Exit on error +set -e + +# Get arguments +CSV_FILE=$1 +TABLE_NAME=$2 +BATCH_SIZE=500000 # Process 500,000 rows at a time +START_BATCH=${3:-1} # Optional third parameter to start from a specific batch + +if [ -z "$CSV_FILE" ] || [ -z "$TABLE_NAME" ]; then + echo "Usage: ./psql-csv-import.sh [start-batch]" + exit 1 +fi + +# Check if file exists (only needed for batch 1) +if [ "$START_BATCH" -eq 1 ] && [ ! -f "$CSV_FILE" ]; then + echo "Error: CSV file '$CSV_FILE' not found" + exit 1 +fi + +# Load environment variables +if [ -f "../.env" ]; then + source "../.env" +else + echo "Warning: .env file not found, using default connection parameters" +fi + +# Set default connection parameters if not from .env +DB_HOST=${DB_HOST:-localhost} +DB_PORT=${DB_PORT:-5432} +DB_NAME=${DB_NAME:-inventory_db} +DB_USER=${DB_USER:-postgres} +export PGPASSWORD=${DB_PASSWORD:-} # Export password for psql + +# Common psql parameters +PSQL_OPTS="-h $DB_HOST -p $DB_PORT -U $DB_USER -d $DB_NAME" + +# Function to clean up database state +cleanup_and_optimize() { + echo "Cleaning up and optimizing database state..." + + # Analyze the target table to update statistics + psql $PSQL_OPTS -c "ANALYZE $TABLE_NAME;" + + # Perform vacuum to reclaim space and update stats + psql $PSQL_OPTS -c "VACUUM $TABLE_NAME;" + + # Reset connection pool + psql $PSQL_OPTS -c "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = current_database() AND pid <> pg_backend_pid();" + + # Clean up shared memory + psql $PSQL_OPTS -c "DISCARD ALL;" + + echo "Optimization complete." +} + +# Show connection info +echo "Importing $CSV_FILE into $TABLE_NAME" +echo "Database: $DB_NAME on $DB_HOST:$DB_PORT with batch size: $BATCH_SIZE starting at batch $START_BATCH" + +# Start timer +START_TIME=$(date +%s) + +# Create progress tracking file +PROGRESS_FILE="/tmp/import_progress_${TABLE_NAME}.txt" +touch "$PROGRESS_FILE" +echo "Starting import at $(date), batch $START_BATCH" >> "$PROGRESS_FILE" + +# If we're resuming, run cleanup first +if [ "$START_BATCH" -gt 1 ]; then + cleanup_and_optimize +fi + +# For imported_product_stat_history, use optimized approach with hardcoded column names +if [ "$TABLE_NAME" = "imported_product_stat_history" ]; then + echo "Using optimized import for $TABLE_NAME" + + # Only drop constraints/indexes and create staging table for batch 1 + if [ "$START_BATCH" -eq 1 ]; then + # Extract CSV header + CSV_HEADER=$(head -n 1 "$CSV_FILE") + echo "CSV header: $CSV_HEADER" + + # Step 1: Drop constraints and indexes + echo "Dropping constraints and indexes..." + psql $PSQL_OPTS -c " + DO \$\$ + DECLARE + constraint_name TEXT; + BEGIN + -- Drop primary key constraint if exists + SELECT conname INTO constraint_name + FROM pg_constraint + WHERE conrelid = '$TABLE_NAME'::regclass AND contype = 'p'; + + IF FOUND THEN + EXECUTE 'ALTER TABLE $TABLE_NAME DROP CONSTRAINT IF EXISTS ' || constraint_name; + RAISE NOTICE 'Dropped primary key constraint: %', constraint_name; + END IF; + END \$\$; + " + + # Drop all indexes on the table + psql $PSQL_OPTS -c " + DO \$\$ + DECLARE + index_name TEXT; + index_record RECORD; + BEGIN + FOR index_record IN + SELECT indexname + FROM pg_indexes + WHERE tablename = '$TABLE_NAME' + LOOP + EXECUTE 'DROP INDEX IF EXISTS ' || index_record.indexname; + RAISE NOTICE 'Dropped index: %', index_record.indexname; + END LOOP; + END \$\$; + " + + # Step 2: Set maintenance_work_mem and disable triggers + echo "Setting maintenance_work_mem and disabling triggers..." 
+ psql $PSQL_OPTS -c " + SET maintenance_work_mem = '1GB'; + ALTER TABLE $TABLE_NAME DISABLE TRIGGER ALL; + " + + # Step 3: Create staging table + echo "Creating staging table..." + psql $PSQL_OPTS -c " + DROP TABLE IF EXISTS staging_import; + CREATE UNLOGGED TABLE staging_import ( + pid TEXT, + date TEXT, + score TEXT, + score2 TEXT, + qty_in_baskets TEXT, + qty_sold TEXT, + notifies_set TEXT, + visibility_score TEXT, + health_score TEXT, + sold_view_score TEXT + ); + + -- Create an index on staging_import to improve OFFSET performance + CREATE INDEX ON staging_import (pid); + " + + # Step 4: Import CSV into staging table + echo "Importing CSV into staging table..." + psql $PSQL_OPTS -c "\copy staging_import FROM '$CSV_FILE' WITH CSV HEADER DELIMITER ','" + else + echo "Resuming import from batch $START_BATCH - skipping table creation and CSV import" + + # Check if staging table exists + STAGING_EXISTS=$(psql $PSQL_OPTS -t -c "SELECT EXISTS(SELECT 1 FROM pg_tables WHERE tablename='staging_import');" | tr -d '[:space:]') + + if [ "$STAGING_EXISTS" != "t" ]; then + echo "Error: Staging table 'staging_import' does not exist. Run without batch parameter first." + exit 1 + fi + + # Ensure triggers are disabled + psql $PSQL_OPTS -c "ALTER TABLE $TABLE_NAME DISABLE TRIGGER ALL;" + + # Optimize PostgreSQL for better performance + psql $PSQL_OPTS -c " + -- Increase work mem for this session + SET work_mem = '256MB'; + SET maintenance_work_mem = '1GB'; + " + fi + + # Step 5: Get total row count + TOTAL_ROWS=$(psql $PSQL_OPTS -t -c "SELECT COUNT(*) FROM staging_import;" | tr -d '[:space:]') + echo "Total rows to import: $TOTAL_ROWS" + + # Calculate starting point + PROCESSED=$(( ($START_BATCH - 1) * $BATCH_SIZE )) + if [ $PROCESSED -ge $TOTAL_ROWS ]; then + echo "Error: Start batch $START_BATCH is beyond the available rows ($TOTAL_ROWS)" + exit 1 + fi + + # Step 6: Process in batches with shell loop + BATCH_NUM=$(( $START_BATCH - 1 )) + + # We'll process batches in chunks of 10 before cleaning up + CHUNKS_SINCE_CLEANUP=0 + + while [ $PROCESSED -lt $TOTAL_ROWS ]; do + BATCH_NUM=$(( $BATCH_NUM + 1 )) + BATCH_START=$(date +%s) + MAX_ROWS=$(( $PROCESSED + $BATCH_SIZE )) + if [ $MAX_ROWS -gt $TOTAL_ROWS ]; then + MAX_ROWS=$TOTAL_ROWS + fi + + echo "Processing batch $BATCH_NUM (rows $PROCESSED to $MAX_ROWS)..." 
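The batch INSERT that follows pages through staging_import with LIMIT/OFFSET and no ORDER BY; PostgreSQL does not guarantee a stable row order across repeated scans, so in principle rows could be skipped or duplicated between batches, and each OFFSET rescans the rows it skips. A hedged sketch of a keyset-style alternative, assuming a surrogate batch key is added to the staging table (batch_id is not part of the script):

```sql
-- Illustrative only: give staging rows a monotonically increasing key,
-- then page on that key instead of OFFSET.
ALTER TABLE staging_import ADD COLUMN batch_id BIGINT GENERATED ALWAYS AS IDENTITY;
CREATE INDEX ON staging_import (batch_id);

-- Per batch, the driving script would substitute the previous high-water mark
-- (0 for the first batch) for the placeholder in the WHERE clause.
INSERT INTO imported_product_stat_history (
    pid, date, score, score2, qty_in_baskets, qty_sold,
    notifies_set, visibility_score, health_score, sold_view_score
)
SELECT
    pid::bigint, date::date, score::numeric, score2::numeric,
    qty_in_baskets::smallint, qty_sold::smallint, notifies_set::smallint,
    visibility_score::numeric, health_score::varchar, sold_view_score::numeric
FROM staging_import
WHERE batch_id > 0            -- previous high-water mark
ORDER BY batch_id
LIMIT 500000;
```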
+ + # Optimize query buffer for this batch + psql $PSQL_OPTS -c "SET work_mem = '256MB';" + + # Insert batch with type casts + psql $PSQL_OPTS -c " + INSERT INTO $TABLE_NAME ( + pid, date, score, score2, qty_in_baskets, qty_sold, + notifies_set, visibility_score, health_score, sold_view_score + ) + SELECT + pid::bigint, + date::date, + score::numeric, + score2::numeric, + qty_in_baskets::smallint, + qty_sold::smallint, + notifies_set::smallint, + visibility_score::numeric, + health_score::varchar, + sold_view_score::numeric + FROM staging_import + LIMIT $BATCH_SIZE + OFFSET $PROCESSED; + " + + # Update progress + BATCH_END=$(date +%s) + BATCH_ELAPSED=$(( $BATCH_END - $BATCH_START )) + PROGRESS_PCT=$(echo "scale=2; $MAX_ROWS * 100 / $TOTAL_ROWS" | bc) + + echo "Batch $BATCH_NUM committed in ${BATCH_ELAPSED}s, $MAX_ROWS of $TOTAL_ROWS rows processed ($PROGRESS_PCT%)" | tee -a "$PROGRESS_FILE" + + # Increment counter + PROCESSED=$(( $PROCESSED + $BATCH_SIZE )) + CHUNKS_SINCE_CLEANUP=$(( $CHUNKS_SINCE_CLEANUP + 1 )) + + # Check current row count every 10 batches + if [ $(( $BATCH_NUM % 10 )) -eq 0 ]; then + CURRENT_COUNT=$(psql $PSQL_OPTS -t -c "SELECT COUNT(*) FROM $TABLE_NAME;" | tr -d '[:space:]') + echo "Current row count in $TABLE_NAME: $CURRENT_COUNT" | tee -a "$PROGRESS_FILE" + + # Every 10 batches, run an intermediate cleanup + if [ $CHUNKS_SINCE_CLEANUP -ge 10 ]; then + echo "Running intermediate cleanup and optimization..." + psql $PSQL_OPTS -c "VACUUM $TABLE_NAME;" + CHUNKS_SINCE_CLEANUP=0 + fi + fi + + # Optional - write a checkpoint file to know where to restart + echo "$BATCH_NUM" > "/tmp/import_last_batch_${TABLE_NAME}.txt" + done + + # Only recreate indexes if we've completed the import + if [ $PROCESSED -ge $TOTAL_ROWS ]; then + # Step 7: Re-enable triggers and recreate primary key + echo "Re-enabling triggers and recreating primary key..." + psql $PSQL_OPTS -c " + ALTER TABLE $TABLE_NAME ENABLE TRIGGER ALL; + ALTER TABLE $TABLE_NAME ADD PRIMARY KEY (pid, date); + " + + # Step 8: Clean up and get final count + echo "Cleaning up and getting final count..." + psql $PSQL_OPTS -c " + DROP TABLE staging_import; + VACUUM ANALYZE $TABLE_NAME; + SELECT COUNT(*) AS \"Total rows in $TABLE_NAME\" FROM $TABLE_NAME; + " + else + echo "Import interrupted at batch $BATCH_NUM. To resume, run:" + echo "./psql-csv-import.sh $CSV_FILE $TABLE_NAME $BATCH_NUM" + fi + +else + # Generic approach for other tables + if [ "$START_BATCH" -eq 1 ]; then + # Extract CSV header + CSV_HEADER=$(head -n 1 "$CSV_FILE") + echo "CSV header: $CSV_HEADER" + + # Extract CSV header and format it for SQL + CSV_COLUMNS=$(echo "$CSV_HEADER" | tr ',' '\n' | sed 's/^/"/;s/$/"/' | tr '\n' ',' | sed 's/,$//') + TEMP_COLUMNS=$(echo "$CSV_HEADER" | tr ',' '\n' | sed 's/$/ TEXT/' | tr '\n' ',' | sed 's/,$//') + + echo "Importing columns: $CSV_COLUMNS" + + # Step 1: Set maintenance_work_mem and disable triggers + echo "Setting maintenance_work_mem and disabling triggers..." + psql $PSQL_OPTS -c " + SET maintenance_work_mem = '1GB'; + ALTER TABLE $TABLE_NAME DISABLE TRIGGER ALL; + " + + # Step 2: Create temp table + echo "Creating temporary table..." + psql $PSQL_OPTS -c " + DROP TABLE IF EXISTS temp_import; + CREATE UNLOGGED TABLE temp_import ($TEMP_COLUMNS); + + -- Create an index on temp_import to improve OFFSET performance + CREATE INDEX ON temp_import ((1)); -- Index on first column + " + + # Step 3: Import CSV into temp table + echo "Importing CSV into temporary table..." 
+ psql $PSQL_OPTS -c "\copy temp_import FROM '$CSV_FILE' WITH CSV HEADER DELIMITER ','" + else + echo "Resuming import from batch $START_BATCH - skipping table creation and CSV import" + + # Check if temp table exists + TEMP_EXISTS=$(psql $PSQL_OPTS -t -c "SELECT EXISTS(SELECT 1 FROM pg_tables WHERE tablename='temp_import');" | tr -d '[:space:]') + + if [ "$TEMP_EXISTS" != "t" ]; then + echo "Error: Temporary table 'temp_import' does not exist. Run without batch parameter first." + exit 1 + fi + + # Ensure triggers are disabled + psql $PSQL_OPTS -c "ALTER TABLE $TABLE_NAME DISABLE TRIGGER ALL;" + + # Optimize PostgreSQL for better performance + psql $PSQL_OPTS -c " + -- Increase work mem for this session + SET work_mem = '256MB'; + SET maintenance_work_mem = '1GB'; + " + + # Hard-code columns since we know them + CSV_COLUMNS='"pid","date","score","score2","qty_in_baskets","qty_sold","notifies_set","visibility_score","health_score","sold_view_score"' + + echo "Using standard columns: $CSV_COLUMNS" + fi + + # Step 4: Get total row count + TOTAL_ROWS=$(psql $PSQL_OPTS -t -c "SELECT COUNT(*) FROM temp_import;" | tr -d '[:space:]') + echo "Total rows to import: $TOTAL_ROWS" + + # Calculate starting point + PROCESSED=$(( ($START_BATCH - 1) * $BATCH_SIZE )) + if [ $PROCESSED -ge $TOTAL_ROWS ]; then + echo "Error: Start batch $START_BATCH is beyond the available rows ($TOTAL_ROWS)" + exit 1 + fi + + # Step 5: Process in batches with shell loop + BATCH_NUM=$(( $START_BATCH - 1 )) + + # We'll process batches in chunks of 10 before cleaning up + CHUNKS_SINCE_CLEANUP=0 + + while [ $PROCESSED -lt $TOTAL_ROWS ]; do + BATCH_NUM=$(( $BATCH_NUM + 1 )) + BATCH_START=$(date +%s) + MAX_ROWS=$(( $PROCESSED + $BATCH_SIZE )) + if [ $MAX_ROWS -gt $TOTAL_ROWS ]; then + MAX_ROWS=$TOTAL_ROWS + fi + + echo "Processing batch $BATCH_NUM (rows $PROCESSED to $MAX_ROWS)..." + + # Optimize query buffer for this batch + psql $PSQL_OPTS -c "SET work_mem = '256MB';" + + # Insert batch + psql $PSQL_OPTS -c " + INSERT INTO $TABLE_NAME ($CSV_COLUMNS) + SELECT $CSV_COLUMNS + FROM temp_import + LIMIT $BATCH_SIZE + OFFSET $PROCESSED; + " + + # Update progress + BATCH_END=$(date +%s) + BATCH_ELAPSED=$(( $BATCH_END - $BATCH_START )) + PROGRESS_PCT=$(echo "scale=2; $MAX_ROWS * 100 / $TOTAL_ROWS" | bc) + + echo "Batch $BATCH_NUM committed in ${BATCH_ELAPSED}s, $MAX_ROWS of $TOTAL_ROWS rows processed ($PROGRESS_PCT%)" | tee -a "$PROGRESS_FILE" + + # Increment counter + PROCESSED=$(( $PROCESSED + $BATCH_SIZE )) + CHUNKS_SINCE_CLEANUP=$(( $CHUNKS_SINCE_CLEANUP + 1 )) + + # Check current row count every 10 batches + if [ $(( $BATCH_NUM % 10 )) -eq 0 ]; then + CURRENT_COUNT=$(psql $PSQL_OPTS -t -c "SELECT COUNT(*) FROM $TABLE_NAME;" | tr -d '[:space:]') + echo "Current row count in $TABLE_NAME: $CURRENT_COUNT" | tee -a "$PROGRESS_FILE" + + # Every 10 batches, run an intermediate cleanup + if [ $CHUNKS_SINCE_CLEANUP -ge 10 ]; then + echo "Running intermediate cleanup and optimization..." + psql $PSQL_OPTS -c "VACUUM $TABLE_NAME;" + CHUNKS_SINCE_CLEANUP=0 + fi + fi + + # Optional - write a checkpoint file to know where to restart + echo "$BATCH_NUM" > "/tmp/import_last_batch_${TABLE_NAME}.txt" + done + + # Only clean up if we've completed the import + if [ $PROCESSED -ge $TOTAL_ROWS ]; then + # Step 6: Re-enable triggers and clean up + echo "Re-enabling triggers and cleaning up..." 
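In the generic branch above, every temp_import column is TEXT and is selected straight into the destination table; depending on the destination column types, PostgreSQL may reject that assignment without explicit casts (the hard-coded branch casts each column for exactly this reason). One possible way to derive a cast list from the catalog is sketched below; the destination table name is an assumption, and the query is illustrative rather than part of the script:

```sql
-- Builds an explicitly cast SELECT list for the generic INSERT,
-- e.g. "pid::bigint, date::date, score::numeric, ..."
SELECT string_agg(
           format('%I::%s', column_name, data_type),
           ', ' ORDER BY ordinal_position
       ) AS cast_select_list
FROM information_schema.columns
WHERE table_schema = 'public'
  AND table_name   = 'imported_product_stat_history';  -- assumed destination table
```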
+ psql $PSQL_OPTS -c " + ALTER TABLE $TABLE_NAME ENABLE TRIGGER ALL; + DROP TABLE temp_import; + SELECT COUNT(*) AS \"Total rows in $TABLE_NAME\" FROM $TABLE_NAME; + " + + # VACUUM cannot run inside a transaction block, and psql wraps a multi-statement -c string in one implicit transaction, so run it on its own + psql $PSQL_OPTS -c "VACUUM ANALYZE $TABLE_NAME;" + else + echo "Import interrupted at batch $BATCH_NUM. To resume, run:" + echo "./psql-csv-import.sh $CSV_FILE $TABLE_NAME $BATCH_NUM" + fi +fi + +# Calculate elapsed time +END_TIME=$(date +%s) +ELAPSED=$((END_TIME - START_TIME)) + +echo "Import completed successfully in ${ELAPSED}s ($(($ELAPSED / 60)) minutes)" +echo "Progress log saved to $PROGRESS_FILE" \ No newline at end of file
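Because the optimized branch drops the primary key before loading and recreates it afterwards, a pre-flight check once the load finishes can confirm that row counts match and that (pid, date) is still unique before the ADD PRIMARY KEY step runs. A minimal sketch, assuming the table names used by the script:

```sql
-- Compare staged vs. loaded row counts (staging_import still exists at this
-- point in the optimized branch, just before it is dropped).
SELECT
    (SELECT COUNT(*) FROM staging_import)                AS staged_rows,
    (SELECT COUNT(*) FROM imported_product_stat_history) AS loaded_rows;

-- Any rows returned here would make ADD PRIMARY KEY (pid, date) fail.
SELECT pid, date, COUNT(*) AS duplicates
FROM imported_product_stat_history
GROUP BY pid, date
HAVING COUNT(*) > 1
LIMIT 20;
```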