diff --git a/inventory-server/db/schema.sql b/inventory-server/db/schema.sql index 779e3a1..fa32a8e 100644 --- a/inventory-server/db/schema.sql +++ b/inventory-server/db/schema.sql @@ -211,4 +211,57 @@ CREATE INDEX idx_po_updated ON purchase_orders(updated); SET session_replication_role = 'origin'; -- Re-enable foreign key checks -- Create views for common calculations --- product_sales_trends view moved to metrics-schema.sql \ No newline at end of file +-- product_sales_trends view moved to metrics-schema.sql + +-- Historical data tables imported from production +CREATE TABLE imported_product_current_prices ( + price_id BIGSERIAL PRIMARY KEY, + pid BIGINT NOT NULL, + qty_buy SMALLINT NOT NULL, + is_min_qty_buy BOOLEAN NOT NULL, + price_each NUMERIC(10,3) NOT NULL, + qty_limit SMALLINT NOT NULL, + no_promo BOOLEAN NOT NULL, + checkout_offer BOOLEAN NOT NULL, + active BOOLEAN NOT NULL, + date_active TIMESTAMP WITH TIME ZONE, + date_deactive TIMESTAMP WITH TIME ZONE, + updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX idx_imported_product_current_prices_pid ON imported_product_current_prices(pid, active, qty_buy); +CREATE INDEX idx_imported_product_current_prices_checkout ON imported_product_current_prices(checkout_offer, active); +CREATE INDEX idx_imported_product_current_prices_deactive ON imported_product_current_prices(date_deactive, active); +CREATE INDEX idx_imported_product_current_prices_active ON imported_product_current_prices(date_active, active); + +CREATE TABLE imported_daily_inventory ( + date DATE NOT NULL, + pid BIGINT NOT NULL, + amountsold SMALLINT NOT NULL DEFAULT 0, + times_sold SMALLINT NOT NULL DEFAULT 0, + qtyreceived SMALLINT NOT NULL DEFAULT 0, + price NUMERIC(7,2) NOT NULL DEFAULT 0, + costeach NUMERIC(7,2) NOT NULL DEFAULT 0, + stamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (date, pid) +); + +CREATE INDEX idx_imported_daily_inventory_pid ON imported_daily_inventory(pid); + +CREATE TABLE imported_product_stat_history ( + pid BIGINT NOT NULL, + date DATE NOT NULL, + score NUMERIC(10,2) NOT NULL, + score2 NUMERIC(10,2) NOT NULL, + qty_in_baskets SMALLINT NOT NULL, + qty_sold SMALLINT NOT NULL, + notifies_set SMALLINT NOT NULL, + visibility_score NUMERIC(10,2) NOT NULL, + health_score VARCHAR(5) NOT NULL, + sold_view_score NUMERIC(6,3) NOT NULL, + updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (pid, date) +); + +CREATE INDEX idx_imported_product_stat_history_date ON imported_product_stat_history(date); \ No newline at end of file diff --git a/inventory-server/scripts/import-from-prod.js b/inventory-server/scripts/import-from-prod.js index 7c0538a..da2d15d 100644 --- a/inventory-server/scripts/import-from-prod.js +++ b/inventory-server/scripts/import-from-prod.js @@ -6,6 +6,7 @@ const importCategories = require('./import/categories'); const { importProducts } = require('./import/products'); const importOrders = require('./import/orders'); const importPurchaseOrders = require('./import/purchase-orders'); +const importHistoricalData = require('./import/historical-data'); dotenv.config({ path: path.join(__dirname, "../.env") }); @@ -13,7 +14,8 @@ dotenv.config({ path: path.join(__dirname, "../.env") }); const IMPORT_CATEGORIES = false; const IMPORT_PRODUCTS = false; const IMPORT_ORDERS = false; -const IMPORT_PURCHASE_ORDERS = true; +const IMPORT_PURCHASE_ORDERS = false; +const IMPORT_HISTORICAL_DATA = true; // Add flag for incremental updates const INCREMENTAL_UPDATE = process.env.INCREMENTAL_UPDATE !== 'false'; // Default to true unless explicitly set to false @@ -78,7 +80,8 @@ async function main() { IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, - IMPORT_PURCHASE_ORDERS + IMPORT_PURCHASE_ORDERS, + IMPORT_HISTORICAL_DATA ].filter(Boolean).length; try { @@ -108,17 +111,6 @@ async function main() { WHERE status = 'running' `); - // Initialize sync_status table if it doesn't exist - await localConnection.query(` - CREATE TABLE IF NOT EXISTS sync_status ( - table_name TEXT PRIMARY KEY, - last_sync_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_sync_id BIGINT - ); - - CREATE INDEX IF NOT EXISTS idx_last_sync ON sync_status (last_sync_timestamp); - `); - // Create import history record for the overall session try { const [historyResult] = await localConnection.query(` @@ -137,10 +129,11 @@ async function main() { 'categories_enabled', $2::boolean, 'products_enabled', $3::boolean, 'orders_enabled', $4::boolean, - 'purchase_orders_enabled', $5::boolean + 'purchase_orders_enabled', $5::boolean, + 'historical_data_enabled', $6::boolean ) ) RETURNING id - `, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS]); + `, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS, IMPORT_HISTORICAL_DATA]); importHistoryId = historyResult.rows[0].id; } catch (error) { console.error("Error creating import history record:", error); @@ -157,7 +150,8 @@ async function main() { categories: null, products: null, orders: null, - purchaseOrders: null + purchaseOrders: null, + historicalData: null }; let totalRecordsAdded = 0; @@ -217,6 +211,32 @@ async function main() { } } + if (IMPORT_HISTORICAL_DATA) { + try { + results.historicalData = await importHistoricalData(prodConnection, localConnection, INCREMENTAL_UPDATE); + if (isImportCancelled) throw new Error("Import cancelled"); + completedSteps++; + console.log('Historical data import result:', results.historicalData); + + // Handle potential error status + if (results.historicalData?.status === 'error') { + console.error('Historical data import had an error:', results.historicalData.error); + } else { + totalRecordsAdded += parseInt(results.historicalData?.recordsAdded || 0); + totalRecordsUpdated += parseInt(results.historicalData?.recordsUpdated || 0); + } + } catch (error) { + console.error('Error during historical data import:', error); + // Continue with other imports, don't fail the whole process + results.historicalData = { + status: 'error', + error: error.message, + recordsAdded: 0, + recordsUpdated: 0 + }; + } + } + const endTime = Date.now(); const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); @@ -234,12 +254,14 @@ async function main() { 'products_enabled', $5::boolean, 'orders_enabled', $6::boolean, 'purchase_orders_enabled', $7::boolean, - 'categories_result', COALESCE($8::jsonb, 'null'::jsonb), - 'products_result', COALESCE($9::jsonb, 'null'::jsonb), - 'orders_result', COALESCE($10::jsonb, 'null'::jsonb), - 'purchase_orders_result', COALESCE($11::jsonb, 'null'::jsonb) + 'historical_data_enabled', $8::boolean, + 'categories_result', COALESCE($9::jsonb, 'null'::jsonb), + 'products_result', COALESCE($10::jsonb, 'null'::jsonb), + 'orders_result', COALESCE($11::jsonb, 'null'::jsonb), + 'purchase_orders_result', COALESCE($12::jsonb, 'null'::jsonb), + 'historical_data_result', COALESCE($13::jsonb, 'null'::jsonb) ) - WHERE id = $12 + WHERE id = $14 `, [ totalElapsedSeconds, parseInt(totalRecordsAdded), @@ -248,10 +270,12 @@ async function main() { IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS, + IMPORT_HISTORICAL_DATA, JSON.stringify(results.categories), JSON.stringify(results.products), JSON.stringify(results.orders), JSON.stringify(results.purchaseOrders), + JSON.stringify(results.historicalData), importHistoryId ]); diff --git a/inventory-server/scripts/import/historical-data.js b/inventory-server/scripts/import/historical-data.js new file mode 100644 index 0000000..2eee159 --- /dev/null +++ b/inventory-server/scripts/import/historical-data.js @@ -0,0 +1,961 @@ +const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress'); +const fs = require('fs'); +const path = require('path'); +const { pipeline } = require('stream'); +const { promisify } = require('util'); + +// Configuration constants to control which tables get imported +const IMPORT_PRODUCT_CURRENT_PRICES = false; +const IMPORT_DAILY_INVENTORY = false; +const IMPORT_PRODUCT_STAT_HISTORY = true; + +// For product stat history, limit to more recent data for faster initial import +const USE_RECENT_MONTHS = 12; // Just use the most recent months for product_stat_history + +/** + * Validates a date from MySQL before inserting it into PostgreSQL + * @param {string|Date|null} mysqlDate - Date string or object from MySQL + * @returns {string|null} Valid date string or null if invalid + */ +function validateDate(mysqlDate) { + // Handle null, undefined, or empty values + if (!mysqlDate) { + return null; + } + + // Convert to string if it's not already + const dateStr = String(mysqlDate); + + // Handle MySQL zero dates and empty values + if (dateStr === '0000-00-00' || + dateStr === '0000-00-00 00:00:00' || + dateStr.indexOf('0000-00-00') !== -1 || + dateStr === '') { + return null; + } + + // Check if the date is valid + const date = new Date(mysqlDate); + + // If the date is invalid or suspiciously old (pre-1970), return null + if (isNaN(date.getTime()) || date.getFullYear() < 1970) { + return null; + } + + return mysqlDate; +} + +/** + * Imports historical data from MySQL to PostgreSQL + */ +async function importHistoricalData( + prodConnection, + localConnection, + options = {} +) { + const { + incrementalUpdate = true, + oneYearAgo = new Date(new Date().setFullYear(new Date().getFullYear() - 1)) + } = options; + + const oneYearAgoStr = oneYearAgo.toISOString().split('T')[0]; + const startTime = Date.now(); + + // Use larger batch sizes to improve performance + const BATCH_SIZE = 5000; // For fetching from small tables + const INSERT_BATCH_SIZE = 500; // For inserting to small tables + const LARGE_BATCH_SIZE = 10000; // For fetching from large tables + const LARGE_INSERT_BATCH_SIZE = 1000; // For inserting to large tables + + // Calculate date for recent data + const recentDateStr = new Date( + new Date().setMonth(new Date().getMonth() - USE_RECENT_MONTHS) + ).toISOString().split('T')[0]; + + console.log(`Starting import with: + - One year ago date: ${oneYearAgoStr} + - Recent months date: ${recentDateStr} (for product_stat_history) + - Incremental update: ${incrementalUpdate} + - Standard batch size: ${BATCH_SIZE} + - Standard insert batch size: ${INSERT_BATCH_SIZE} + - Large table batch size: ${LARGE_BATCH_SIZE} + - Large table insert batch size: ${LARGE_INSERT_BATCH_SIZE} + - Import product_current_prices: ${IMPORT_PRODUCT_CURRENT_PRICES} + - Import daily_inventory: ${IMPORT_DAILY_INVENTORY} + - Import product_stat_history: ${IMPORT_PRODUCT_STAT_HISTORY}`); + + try { + // Get last sync time for incremental updates + const lastSyncTimes = {}; + + if (incrementalUpdate) { + try { + const syncResult = await localConnection.query(` + SELECT table_name, last_sync_timestamp + FROM sync_status + WHERE table_name IN ( + 'imported_product_current_prices', + 'imported_daily_inventory', + 'imported_product_stat_history' + ) + `); + + // Add check for rows existence and type + if (syncResult && Array.isArray(syncResult.rows)) { + for (const row of syncResult.rows) { + lastSyncTimes[row.table_name] = row.last_sync_timestamp; + console.log(`Last sync time for ${row.table_name}: ${row.last_sync_timestamp}`); + } + } else { + console.warn('Sync status query did not return expected rows. Proceeding without last sync times.'); + } + } catch (error) { + console.error('Error fetching sync status:', error); + } + } + + // Determine how many tables will be imported + const tablesCount = [ + IMPORT_PRODUCT_CURRENT_PRICES, + IMPORT_DAILY_INVENTORY, + IMPORT_PRODUCT_STAT_HISTORY + ].filter(Boolean).length; + + // Run all imports sequentially for better reliability + console.log(`Starting sequential imports for ${tablesCount} tables...`); + outputProgress({ + status: "running", + operation: "Historical data import", + message: `Starting sequential imports for ${tablesCount} tables...`, + current: 0, + total: tablesCount, + elapsed: formatElapsedTime(startTime) + }); + + let progressCount = 0; + let productCurrentPricesResult = { recordsAdded: 0, recordsUpdated: 0, totalProcessed: 0, errors: [] }; + let dailyInventoryResult = { recordsAdded: 0, recordsUpdated: 0, totalProcessed: 0, errors: [] }; + let productStatHistoryResult = { recordsAdded: 0, recordsUpdated: 0, totalProcessed: 0, errors: [] }; + + // Import product current prices + if (IMPORT_PRODUCT_CURRENT_PRICES) { + console.log('Importing product current prices...'); + productCurrentPricesResult = await importProductCurrentPrices( + prodConnection, + localConnection, + oneYearAgoStr, + lastSyncTimes['imported_product_current_prices'], + BATCH_SIZE, + INSERT_BATCH_SIZE, + incrementalUpdate, + startTime + ); + progressCount++; + + outputProgress({ + status: "running", + operation: "Historical data import", + message: `Completed import ${progressCount} of ${tablesCount}`, + current: progressCount, + total: tablesCount, + elapsed: formatElapsedTime(startTime) + }); + } + + // Import daily inventory + if (IMPORT_DAILY_INVENTORY) { + console.log('Importing daily inventory...'); + dailyInventoryResult = await importDailyInventory( + prodConnection, + localConnection, + oneYearAgoStr, + lastSyncTimes['imported_daily_inventory'], + BATCH_SIZE, + INSERT_BATCH_SIZE, + incrementalUpdate, + startTime + ); + progressCount++; + + outputProgress({ + status: "running", + operation: "Historical data import", + message: `Completed import ${progressCount} of ${tablesCount}`, + current: progressCount, + total: tablesCount, + elapsed: formatElapsedTime(startTime) + }); + } + + // Import product stat history - using optimized approach + if (IMPORT_PRODUCT_STAT_HISTORY) { + console.log('Importing product stat history...'); + productStatHistoryResult = await importProductStatHistory( + prodConnection, + localConnection, + recentDateStr, // Use more recent date for this massive table + lastSyncTimes['imported_product_stat_history'], + LARGE_BATCH_SIZE, + LARGE_INSERT_BATCH_SIZE, + incrementalUpdate, + startTime, + USE_RECENT_MONTHS // Pass the recent months constant + ); + progressCount++; + + outputProgress({ + status: "running", + operation: "Historical data import", + message: `Completed import ${progressCount} of ${tablesCount}`, + current: progressCount, + total: tablesCount, + elapsed: formatElapsedTime(startTime) + }); + } + + // Aggregate results + const totalRecordsAdded = + productCurrentPricesResult.recordsAdded + + dailyInventoryResult.recordsAdded + + productStatHistoryResult.recordsAdded; + + const totalRecordsUpdated = + productCurrentPricesResult.recordsUpdated + + dailyInventoryResult.recordsUpdated + + productStatHistoryResult.recordsUpdated; + + const totalProcessed = + productCurrentPricesResult.totalProcessed + + dailyInventoryResult.totalProcessed + + productStatHistoryResult.totalProcessed; + + const allErrors = [ + ...productCurrentPricesResult.errors, + ...dailyInventoryResult.errors, + ...productStatHistoryResult.errors + ]; + + // Log import summary + console.log(` +Historical data import complete: +------------------------------- +Records added: ${totalRecordsAdded} +Records updated: ${totalRecordsUpdated} +Total processed: ${totalProcessed} +Errors: ${allErrors.length} +Time taken: ${formatElapsedTime(startTime)} + `); + + // Final progress update + outputProgress({ + status: "complete", + operation: "Historical data import", + message: `Import complete. Added: ${totalRecordsAdded}, Updated: ${totalRecordsUpdated}, Errors: ${allErrors.length}`, + current: tablesCount, + total: tablesCount, + elapsed: formatElapsedTime(startTime) + }); + + // Log any errors + if (allErrors.length > 0) { + console.log('Errors encountered during import:'); + console.log(JSON.stringify(allErrors, null, 2)); + } + + // Calculate duration + const endTime = Date.now(); + const durationSeconds = Math.round((endTime - startTime) / 1000); + const finalStatus = allErrors.length === 0 ? 'complete' : 'failed'; + const errorMessage = allErrors.length > 0 ? JSON.stringify(allErrors) : null; + + // Update import history + await localConnection.query(` + INSERT INTO import_history ( + table_name, + end_time, + duration_seconds, + records_added, + records_updated, + is_incremental, + status, + error_message, + additional_info + ) + VALUES ($1, NOW(), $2, $3, $4, $5, $6, $7, $8) + `, [ + 'historical_data_combined', + durationSeconds, + totalRecordsAdded, + totalRecordsUpdated, + incrementalUpdate, + finalStatus, + errorMessage, + JSON.stringify({ + totalProcessed, + tablesImported: { + imported_product_current_prices: IMPORT_PRODUCT_CURRENT_PRICES, + imported_daily_inventory: IMPORT_DAILY_INVENTORY, + imported_product_stat_history: IMPORT_PRODUCT_STAT_HISTORY + } + }) + ]); + + // Return summary + return { + recordsAdded: totalRecordsAdded, + recordsUpdated: totalRecordsUpdated, + totalProcessed, + errors: allErrors, + timeTaken: formatElapsedTime(startTime) + }; + } catch (error) { + console.error('Error importing historical data:', error); + + // Final progress update on error + outputProgress({ + status: "failed", + operation: "Historical data import", + message: `Import failed: ${error.message}`, + elapsed: formatElapsedTime(startTime) + }); + + throw error; + } +} + +/** + * Imports product_current_prices data from MySQL to PostgreSQL + */ +async function importProductCurrentPrices( + prodConnection, + localConnection, + oneYearAgoStr, + lastSyncTime, + batchSize, + insertBatchSize, + incrementalUpdate, + startTime +) { + let recordsAdded = 0; + let recordsUpdated = 0; + let totalProcessed = 0; + let errors = []; + let offset = 0; + let allProcessed = false; + + try { + // Get total count for progress reporting + const [countResult] = await prodConnection.query(` + SELECT COUNT(*) as total + FROM product_current_prices + WHERE (date_active >= ? OR date_deactive >= ?) + ${incrementalUpdate && lastSyncTime ? `AND date_deactive > ?` : ''} + `, [oneYearAgoStr, oneYearAgoStr, ...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : [])]); + + const totalCount = countResult[0].total; + + outputProgress({ + status: "running", + operation: "Historical data import - Product Current Prices", + message: `Found ${totalCount} records to process`, + current: 0, + total: totalCount, + elapsed: formatElapsedTime(startTime) + }); + + // Process in batches for better performance + while (!allProcessed) { + try { + // Fetch batch from production + const [rows] = await prodConnection.query(` + SELECT + price_id, + pid, + qty_buy, + is_min_qty_buy, + price_each, + qty_limit, + no_promo, + checkout_offer, + active, + date_active, + date_deactive + FROM product_current_prices + WHERE (date_active >= ? OR date_deactive >= ?) + ${incrementalUpdate && lastSyncTime ? `AND date_deactive > ?` : ''} + ORDER BY price_id + LIMIT ? OFFSET ? + `, [ + oneYearAgoStr, + oneYearAgoStr, + ...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : []), + batchSize, + offset + ]); + + if (rows.length === 0) { + allProcessed = true; + break; + } + + // Process rows in smaller batches for better performance + for (let i = 0; i < rows.length; i += insertBatchSize) { + const batch = rows.slice(i, i + insertBatchSize); + + if (batch.length === 0) continue; + + try { + // Build parameterized query to handle NULL values properly + const values = []; + const placeholders = []; + let placeholderIndex = 1; + + for (const row of batch) { + const rowPlaceholders = [ + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}` + ]; + + placeholders.push(`(${rowPlaceholders.join(', ')})`); + + values.push( + row.price_id, + row.pid, + row.qty_buy, + row.is_min_qty_buy ? true : false, + row.price_each, + row.qty_limit, // PostgreSQL will handle null values properly + row.no_promo ? true : false, + row.checkout_offer ? true : false, + row.active ? true : false, + validateDate(row.date_active), + validateDate(row.date_deactive) + ); + } + + // Execute batch insert + const result = await localConnection.query(` + WITH ins AS ( + INSERT INTO imported_product_current_prices ( + price_id, pid, qty_buy, is_min_qty_buy, price_each, qty_limit, + no_promo, checkout_offer, active, date_active, date_deactive + ) + VALUES ${placeholders.join(',\n')} + ON CONFLICT (price_id) DO UPDATE SET + pid = EXCLUDED.pid, + qty_buy = EXCLUDED.qty_buy, + is_min_qty_buy = EXCLUDED.is_min_qty_buy, + price_each = EXCLUDED.price_each, + qty_limit = EXCLUDED.qty_limit, + no_promo = EXCLUDED.no_promo, + checkout_offer = EXCLUDED.checkout_offer, + active = EXCLUDED.active, + date_active = EXCLUDED.date_active, + date_deactive = EXCLUDED.date_deactive, + updated = CURRENT_TIMESTAMP + RETURNING (xmax = 0) AS inserted + ) + SELECT + COUNT(*) FILTER (WHERE inserted) AS inserted_count, + COUNT(*) FILTER (WHERE NOT inserted) AS updated_count + FROM ins + `, values); + + // Safely update counts based on the result + if (result && result.rows && result.rows.length > 0) { + const insertedCount = parseInt(result.rows[0].inserted_count || 0); + const updatedCount = parseInt(result.rows[0].updated_count || 0); + + recordsAdded += insertedCount; + recordsUpdated += updatedCount; + } + } catch (error) { + console.error(`Error in batch import of product_current_prices at offset ${i}:`, error); + errors.push({ + table: 'imported_product_current_prices', + batchOffset: i, + batchSize: batch.length, + error: error.message + }); + } + } + + totalProcessed += rows.length; + offset += rows.length; + + // Update progress + outputProgress({ + status: "running", + operation: "Historical data import - Product Current Prices", + message: `Processed ${totalProcessed} of ${totalCount} records`, + current: totalProcessed, + total: totalCount, + elapsed: formatElapsedTime(startTime), + remaining: estimateRemaining(startTime, totalProcessed, totalCount), + rate: calculateRate(startTime, totalProcessed) + }); + } catch (error) { + console.error('Error in batch import of product_current_prices:', error); + errors.push({ + table: 'imported_product_current_prices', + error: error.message, + offset: offset, + batchSize: batchSize + }); + + // Try to continue with next batch + offset += batchSize; + } + } + + // Update sync status + await localConnection.query(` + INSERT INTO sync_status (table_name, last_sync_timestamp) + VALUES ('imported_product_current_prices', NOW()) + ON CONFLICT (table_name) DO UPDATE SET + last_sync_timestamp = NOW() + `); + + return { recordsAdded, recordsUpdated, totalProcessed, errors }; + } catch (error) { + console.error('Error in product current prices import:', error); + return { + recordsAdded, + recordsUpdated, + totalProcessed, + errors: [...errors, { + table: 'imported_product_current_prices', + error: error.message + }] + }; + } +} + +/** + * Imports daily_inventory data from MySQL to PostgreSQL + */ +async function importDailyInventory( + prodConnection, + localConnection, + oneYearAgoStr, + lastSyncTime, + batchSize, + insertBatchSize, + incrementalUpdate, + startTime +) { + let recordsAdded = 0; + let recordsUpdated = 0; + let totalProcessed = 0; + let errors = []; + let offset = 0; + let allProcessed = false; + + try { + // Get total count for progress reporting + const [countResult] = await prodConnection.query(` + SELECT COUNT(*) as total + FROM daily_inventory + WHERE date >= ? + ${incrementalUpdate && lastSyncTime ? `AND stamp > ?` : ''} + `, [oneYearAgoStr, ...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : [])]); + + const totalCount = countResult[0].total; + + outputProgress({ + status: "running", + operation: "Historical data import - Daily Inventory", + message: `Found ${totalCount} records to process`, + current: 0, + total: totalCount, + elapsed: formatElapsedTime(startTime) + }); + + // Process in batches for better performance + while (!allProcessed) { + try { + // Fetch batch from production + const [rows] = await prodConnection.query(` + SELECT + date, + pid, + amountsold, + times_sold, + qtyreceived, + price, + costeach, + stamp + FROM daily_inventory + WHERE date >= ? + ${incrementalUpdate && lastSyncTime ? `AND stamp > ?` : ''} + ORDER BY date, pid + LIMIT ? OFFSET ? + `, [ + oneYearAgoStr, + ...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : []), + batchSize, + offset + ]); + + if (rows.length === 0) { + allProcessed = true; + break; + } + + // Process rows in smaller batches for better performance + for (let i = 0; i < rows.length; i += insertBatchSize) { + const batch = rows.slice(i, i + insertBatchSize); + + if (batch.length === 0) continue; + + try { + // Build parameterized query to handle NULL values properly + const values = []; + const placeholders = []; + let placeholderIndex = 1; + + for (const row of batch) { + const rowPlaceholders = [ + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}` + ]; + + placeholders.push(`(${rowPlaceholders.join(', ')})`); + + values.push( + validateDate(row.date), + row.pid, + row.amountsold || 0, + row.times_sold || 0, + row.qtyreceived || 0, + row.price || 0, + row.costeach || 0, + validateDate(row.stamp) + ); + } + + // Execute batch insert + const result = await localConnection.query(` + WITH ins AS ( + INSERT INTO imported_daily_inventory ( + date, pid, amountsold, times_sold, qtyreceived, price, costeach, stamp + ) + VALUES ${placeholders.join(',\n')} + ON CONFLICT (date, pid) DO UPDATE SET + amountsold = EXCLUDED.amountsold, + times_sold = EXCLUDED.times_sold, + qtyreceived = EXCLUDED.qtyreceived, + price = EXCLUDED.price, + costeach = EXCLUDED.costeach, + stamp = EXCLUDED.stamp, + updated = CURRENT_TIMESTAMP + RETURNING (xmax = 0) AS inserted + ) + SELECT + COUNT(*) FILTER (WHERE inserted) AS inserted_count, + COUNT(*) FILTER (WHERE NOT inserted) AS updated_count + FROM ins + `, values); + + // Safely update counts based on the result + if (result && result.rows && result.rows.length > 0) { + const insertedCount = parseInt(result.rows[0].inserted_count || 0); + const updatedCount = parseInt(result.rows[0].updated_count || 0); + + recordsAdded += insertedCount; + recordsUpdated += updatedCount; + } + } catch (error) { + console.error(`Error in batch import of daily_inventory at offset ${i}:`, error); + errors.push({ + table: 'imported_daily_inventory', + batchOffset: i, + batchSize: batch.length, + error: error.message + }); + } + } + + totalProcessed += rows.length; + offset += rows.length; + + // Update progress + outputProgress({ + status: "running", + operation: "Historical data import - Daily Inventory", + message: `Processed ${totalProcessed} of ${totalCount} records`, + current: totalProcessed, + total: totalCount, + elapsed: formatElapsedTime(startTime), + remaining: estimateRemaining(startTime, totalProcessed, totalCount), + rate: calculateRate(startTime, totalProcessed) + }); + } catch (error) { + console.error('Error in batch import of daily_inventory:', error); + errors.push({ + table: 'imported_daily_inventory', + error: error.message, + offset: offset, + batchSize: batchSize + }); + + // Try to continue with next batch + offset += batchSize; + } + } + + // Update sync status + await localConnection.query(` + INSERT INTO sync_status (table_name, last_sync_timestamp) + VALUES ('imported_daily_inventory', NOW()) + ON CONFLICT (table_name) DO UPDATE SET + last_sync_timestamp = NOW() + `); + + return { recordsAdded, recordsUpdated, totalProcessed, errors }; + } catch (error) { + console.error('Error in daily inventory import:', error); + return { + recordsAdded, + recordsUpdated, + totalProcessed, + errors: [...errors, { + table: 'imported_daily_inventory', + error: error.message + }] + }; + } +} + +/** + * Imports product_stat_history data from MySQL to PostgreSQL + * Using fast direct inserts without conflict checking + */ +async function importProductStatHistory( + prodConnection, + localConnection, + recentDateStr, // Use more recent date instead of one year ago + lastSyncTime, + batchSize, + insertBatchSize, + incrementalUpdate, + startTime, + recentMonths // Add parameter for recent months +) { + let recordsAdded = 0; + let recordsUpdated = 0; + let totalProcessed = 0; + let errors = []; + let offset = 0; + let allProcessed = false; + let lastRateCheck = Date.now(); + let lastProcessed = 0; + + try { + // Get total count for progress reporting + const [countResult] = await prodConnection.query(` + SELECT COUNT(*) as total + FROM product_stat_history + WHERE date >= ? + ${incrementalUpdate && lastSyncTime ? `AND date > ?` : ''} + `, [recentDateStr, ...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : [])]); + + const totalCount = countResult[0].total; + console.log(`Found ${totalCount} records to process in product_stat_history (using recent date: ${recentDateStr})`); + + // Progress indicator + outputProgress({ + status: "running", + operation: "Historical data import - Product Stat History", + message: `Found ${totalCount} records to process (last ${recentMonths} months only)`, + current: 0, + total: totalCount, + elapsed: formatElapsedTime(startTime) + }); + + // If not incremental, truncate the table first for better performance + if (!incrementalUpdate) { + console.log('Truncating imported_product_stat_history for full import...'); + await localConnection.query('TRUNCATE TABLE imported_product_stat_history'); + } else if (lastSyncTime) { + // For incremental updates, delete records that will be reimported + console.log(`Deleting records from imported_product_stat_history since ${lastSyncTime}...`); + await localConnection.query('DELETE FROM imported_product_stat_history WHERE date > $1', [lastSyncTime]); + } + + // Process in batches for better performance + while (!allProcessed) { + try { + // Fetch batch from production with minimal filtering and no sorting + const [rows] = await prodConnection.query(` + SELECT + pid, + date, + COALESCE(score, 0) as score, + COALESCE(score2, 0) as score2, + COALESCE(qty_in_baskets, 0) as qty_in_baskets, + COALESCE(qty_sold, 0) as qty_sold, + COALESCE(notifies_set, 0) as notifies_set, + COALESCE(visibility_score, 0) as visibility_score, + COALESCE(health_score, 0) as health_score, + COALESCE(sold_view_score, 0) as sold_view_score + FROM product_stat_history + WHERE date >= ? + ${incrementalUpdate && lastSyncTime ? `AND date > ?` : ''} + LIMIT ? OFFSET ? + `, [ + recentDateStr, + ...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : []), + batchSize, + offset + ]); + + if (rows.length === 0) { + allProcessed = true; + break; + } + + // Process rows in smaller batches for better performance + for (let i = 0; i < rows.length; i += insertBatchSize) { + const batch = rows.slice(i, i + insertBatchSize); + + if (batch.length === 0) continue; + + try { + // Build parameterized query to handle NULL values properly + const values = []; + const placeholders = []; + let placeholderIndex = 1; + + for (const row of batch) { + const rowPlaceholders = [ + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}`, + `$${placeholderIndex++}` + ]; + + placeholders.push(`(${rowPlaceholders.join(', ')})`); + + values.push( + row.pid, + validateDate(row.date), + row.score, + row.score2, + row.qty_in_baskets, + row.qty_sold, + row.notifies_set, + row.visibility_score, + row.health_score, + row.sold_view_score + ); + } + + // Execute direct batch insert without conflict checking + await localConnection.query(` + INSERT INTO imported_product_stat_history ( + pid, date, score, score2, qty_in_baskets, qty_sold, notifies_set, + visibility_score, health_score, sold_view_score + ) + VALUES ${placeholders.join(',\n')} + `, values); + + // All inserts are new records when using this approach + recordsAdded += batch.length; + } catch (error) { + console.error(`Error in batch insert of product_stat_history at offset ${i}:`, error); + errors.push({ + table: 'imported_product_stat_history', + batchOffset: i, + batchSize: batch.length, + error: error.message + }); + } + } + + totalProcessed += rows.length; + offset += rows.length; + + // Calculate current rate every 10 seconds or 100,000 records + const now = Date.now(); + if (now - lastRateCheck > 10000 || totalProcessed - lastProcessed > 100000) { + const timeElapsed = (now - lastRateCheck) / 1000; // seconds + const recordsProcessed = totalProcessed - lastProcessed; + const currentRate = Math.round(recordsProcessed / timeElapsed); + + console.log(`Current import rate: ${currentRate} records/second`); + + lastRateCheck = now; + lastProcessed = totalProcessed; + } + + // Update progress + outputProgress({ + status: "running", + operation: "Historical data import - Product Stat History", + message: `Processed ${totalProcessed} of ${totalCount} records`, + current: totalProcessed, + total: totalCount, + elapsed: formatElapsedTime(startTime), + remaining: estimateRemaining(startTime, totalProcessed, totalCount), + rate: calculateRate(startTime, totalProcessed) + }); + } catch (error) { + console.error('Error in batch import of product_stat_history:', error); + errors.push({ + table: 'imported_product_stat_history', + error: error.message, + offset: offset, + batchSize: batchSize + }); + + // Try to continue with next batch + offset += batchSize; + } + } + + // Update sync status + await localConnection.query(` + INSERT INTO sync_status (table_name, last_sync_timestamp) + VALUES ('imported_product_stat_history', NOW()) + ON CONFLICT (table_name) DO UPDATE SET + last_sync_timestamp = NOW() + `); + + return { recordsAdded, recordsUpdated, totalProcessed, errors }; + } catch (error) { + console.error('Error in product stat history import:', error); + return { + recordsAdded, + recordsUpdated, + totalProcessed, + errors: [...errors, { + table: 'imported_product_stat_history', + error: error.message + }] + }; + } +} + +module.exports = importHistoricalData; \ No newline at end of file diff --git a/inventory-server/scripts/metrics-new/update-daily-snapshots.js b/inventory-server/scripts/metrics-new/update-daily-snapshots.js new file mode 100644 index 0000000..44c84b5 --- /dev/null +++ b/inventory-server/scripts/metrics-new/update-daily-snapshots.js @@ -0,0 +1,306 @@ +const path = require('path'); +const fs = require('fs'); + +// Change working directory to script directory +process.chdir(path.dirname(__filename)); + +// Load environment variables +require('dotenv').config({ path: path.resolve(__dirname, '../..', '.env') }); + +// Add error handler for uncaught exceptions +process.on('uncaughtException', (error) => { + console.error('Uncaught Exception:', error); + process.exit(1); +}); + +// Add error handler for unhandled promise rejections +process.on('unhandledRejection', (reason, promise) => { + console.error('Unhandled Rejection at:', promise, 'reason:', reason); + process.exit(1); +}); + +// Load progress module +const progress = require('./utils/progress'); + +// Store progress functions in global scope to ensure availability +global.formatElapsedTime = progress.formatElapsedTime; +global.estimateRemaining = progress.estimateRemaining; +global.calculateRate = progress.calculateRate; +global.outputProgress = progress.outputProgress; +global.clearProgress = progress.clearProgress; +global.getProgress = progress.getProgress; +global.logError = progress.logError; + +// Load database module +const { getConnection, closePool } = require('./utils/db'); + +// Add cancel handler +let isCancelled = false; + +function cancelCalculation() { + isCancelled = true; + console.log('Calculation has been cancelled by user'); + + // Force-terminate any query that's been running for more than 5 seconds + try { + const connection = getConnection(); + connection.then(async (conn) => { + try { + // Identify and terminate long-running queries from our application + await conn.query(` + SELECT pg_cancel_backend(pid) + FROM pg_stat_activity + WHERE query_start < now() - interval '5 seconds' + AND application_name LIKE '%node%' + AND query NOT LIKE '%pg_cancel_backend%' + `); + + // Release connection + conn.release(); + } catch (err) { + console.error('Error during force cancellation:', err); + conn.release(); + } + }).catch(err => { + console.error('Could not get connection for cancellation:', err); + }); + } catch (err) { + console.error('Failed to terminate running queries:', err); + } + + return { + success: true, + message: 'Calculation has been cancelled' + }; +} + +// Handle SIGTERM signal for cancellation +process.on('SIGTERM', cancelCalculation); + +async function updateDailySnapshots() { + let connection; + const startTime = Date.now(); + let calculateHistoryId; + + // Set a maximum execution time (30 minutes) + const MAX_EXECUTION_TIME = 30 * 60 * 1000; + const timeout = setTimeout(() => { + console.error(`Calculation timed out after ${MAX_EXECUTION_TIME/1000} seconds, forcing termination`); + // Call cancel and force exit + cancelCalculation(); + process.exit(1); + }, MAX_EXECUTION_TIME); + + try { + // Read the SQL file + const sqlFilePath = path.resolve(__dirname, 'update_daily_snapshots.sql'); + const sqlQuery = fs.readFileSync(sqlFilePath, 'utf8'); + + // Clean up any previously running calculations + connection = await getConnection(); + + // Ensure the calculate_status table exists and has the correct structure + await connection.query(` + CREATE TABLE IF NOT EXISTS calculate_status ( + module_name TEXT PRIMARY KEY, + last_calculation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP + ) + `); + + await connection.query(` + UPDATE calculate_history + SET + status = 'cancelled', + end_time = NOW(), + duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER, + error_message = 'Previous calculation was not completed properly' + WHERE status = 'running' AND additional_info->>'type' = 'daily_snapshots' + `); + + // Create history record for this calculation + const historyResult = await connection.query(` + INSERT INTO calculate_history ( + start_time, + status, + additional_info + ) VALUES ( + NOW(), + 'running', + jsonb_build_object( + 'type', 'daily_snapshots', + 'sql_file', 'update_daily_snapshots.sql' + ) + ) RETURNING id + `); + calculateHistoryId = historyResult.rows[0].id; + + // Initialize progress + global.outputProgress({ + status: 'running', + operation: 'Starting daily snapshots calculation', + current: 0, + total: 100, + elapsed: '0s', + remaining: 'Calculating...', + rate: 0, + percentage: '0', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: Math.round((Date.now() - startTime) / 1000) + } + }); + + // Execute the SQL query + global.outputProgress({ + status: 'running', + operation: 'Executing daily snapshots SQL query', + current: 25, + total: 100, + elapsed: global.formatElapsedTime(startTime), + remaining: 'Calculating...', + rate: 0, + percentage: '25', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: Math.round((Date.now() - startTime) / 1000) + } + }); + + await connection.query(sqlQuery); + + // Update calculate_status table + await connection.query(` + INSERT INTO calculate_status (module_name, last_calculation_timestamp) + VALUES ($1, $2) + ON CONFLICT (module_name) DO UPDATE + SET last_calculation_timestamp = EXCLUDED.last_calculation_timestamp + `, ['daily_snapshots', new Date()]); + + // Update progress to 100% + global.outputProgress({ + status: 'complete', + operation: 'Daily snapshots calculation complete', + current: 100, + total: 100, + elapsed: global.formatElapsedTime(startTime), + remaining: '0s', + rate: 0, + percentage: '100', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: Math.round((Date.now() - startTime) / 1000) + } + }); + + // Update history with completion + await connection.query(` + UPDATE calculate_history + SET + end_time = NOW(), + duration_seconds = $1, + status = 'completed' + WHERE id = $2 + `, [Math.round((Date.now() - startTime) / 1000), calculateHistoryId]); + + // Clear progress file on successful completion + global.clearProgress(); + + return { + success: true, + message: 'Daily snapshots calculation completed successfully', + duration: Math.round((Date.now() - startTime) / 1000) + }; + } catch (error) { + const endTime = Date.now(); + const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); + + // Update history with error + if (connection && calculateHistoryId) { + await connection.query(` + UPDATE calculate_history + SET + end_time = NOW(), + duration_seconds = $1, + status = $2, + error_message = $3 + WHERE id = $4 + `, [ + totalElapsedSeconds, + isCancelled ? 'cancelled' : 'failed', + error.message, + calculateHistoryId + ]); + } + + if (isCancelled) { + global.outputProgress({ + status: 'cancelled', + operation: 'Calculation cancelled', + current: 50, + total: 100, + elapsed: global.formatElapsedTime(startTime), + remaining: null, + rate: 0, + percentage: '50', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: Math.round((Date.now() - startTime) / 1000) + } + }); + } else { + global.outputProgress({ + status: 'error', + operation: 'Error: ' + error.message, + current: 50, + total: 100, + elapsed: global.formatElapsedTime(startTime), + remaining: null, + rate: 0, + percentage: '50', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: Math.round((Date.now() - startTime) / 1000) + } + }); + } + throw error; + } finally { + // Clear the timeout to prevent forced termination + clearTimeout(timeout); + + // Always release connection + if (connection) { + try { + connection.release(); + } catch (err) { + console.error('Error in final cleanup:', err); + } + } + } +} + +// Export as a module with all necessary functions +module.exports = { + updateDailySnapshots, + cancelCalculation, + getProgress: global.getProgress +}; + +// Run directly if called from command line +if (require.main === module) { + updateDailySnapshots().then(() => { + closePool().then(() => { + process.exit(0); + }); + }).catch(error => { + console.error('Error:', error); + closePool().then(() => { + process.exit(1); + }); + }); +} \ No newline at end of file diff --git a/inventory-server/scripts/metrics-new/update-periodic-metrics.js b/inventory-server/scripts/metrics-new/update-periodic-metrics.js new file mode 100644 index 0000000..33617a7 --- /dev/null +++ b/inventory-server/scripts/metrics-new/update-periodic-metrics.js @@ -0,0 +1,306 @@ +const path = require('path'); +const fs = require('fs'); + +// Change working directory to script directory +process.chdir(path.dirname(__filename)); + +// Load environment variables +require('dotenv').config({ path: path.resolve(__dirname, '../..', '.env') }); + +// Add error handler for uncaught exceptions +process.on('uncaughtException', (error) => { + console.error('Uncaught Exception:', error); + process.exit(1); +}); + +// Add error handler for unhandled promise rejections +process.on('unhandledRejection', (reason, promise) => { + console.error('Unhandled Rejection at:', promise, 'reason:', reason); + process.exit(1); +}); + +// Load progress module +const progress = require('./utils/progress'); + +// Store progress functions in global scope to ensure availability +global.formatElapsedTime = progress.formatElapsedTime; +global.estimateRemaining = progress.estimateRemaining; +global.calculateRate = progress.calculateRate; +global.outputProgress = progress.outputProgress; +global.clearProgress = progress.clearProgress; +global.getProgress = progress.getProgress; +global.logError = progress.logError; + +// Load database module +const { getConnection, closePool } = require('./utils/db'); + +// Add cancel handler +let isCancelled = false; + +function cancelCalculation() { + isCancelled = true; + console.log('Calculation has been cancelled by user'); + + // Force-terminate any query that's been running for more than 5 seconds + try { + const connection = getConnection(); + connection.then(async (conn) => { + try { + // Identify and terminate long-running queries from our application + await conn.query(` + SELECT pg_cancel_backend(pid) + FROM pg_stat_activity + WHERE query_start < now() - interval '5 seconds' + AND application_name LIKE '%node%' + AND query NOT LIKE '%pg_cancel_backend%' + `); + + // Release connection + conn.release(); + } catch (err) { + console.error('Error during force cancellation:', err); + conn.release(); + } + }).catch(err => { + console.error('Could not get connection for cancellation:', err); + }); + } catch (err) { + console.error('Failed to terminate running queries:', err); + } + + return { + success: true, + message: 'Calculation has been cancelled' + }; +} + +// Handle SIGTERM signal for cancellation +process.on('SIGTERM', cancelCalculation); + +async function updatePeriodicMetrics() { + let connection; + const startTime = Date.now(); + let calculateHistoryId; + + // Set a maximum execution time (30 minutes) + const MAX_EXECUTION_TIME = 30 * 60 * 1000; + const timeout = setTimeout(() => { + console.error(`Calculation timed out after ${MAX_EXECUTION_TIME/1000} seconds, forcing termination`); + // Call cancel and force exit + cancelCalculation(); + process.exit(1); + }, MAX_EXECUTION_TIME); + + try { + // Read the SQL file + const sqlFilePath = path.resolve(__dirname, 'update_periodic_metrics.sql'); + const sqlQuery = fs.readFileSync(sqlFilePath, 'utf8'); + + // Clean up any previously running calculations + connection = await getConnection(); + + // Ensure the calculate_status table exists and has the correct structure + await connection.query(` + CREATE TABLE IF NOT EXISTS calculate_status ( + module_name TEXT PRIMARY KEY, + last_calculation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP + ) + `); + + await connection.query(` + UPDATE calculate_history + SET + status = 'cancelled', + end_time = NOW(), + duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER, + error_message = 'Previous calculation was not completed properly' + WHERE status = 'running' AND additional_info->>'type' = 'periodic_metrics' + `); + + // Create history record for this calculation + const historyResult = await connection.query(` + INSERT INTO calculate_history ( + start_time, + status, + additional_info + ) VALUES ( + NOW(), + 'running', + jsonb_build_object( + 'type', 'periodic_metrics', + 'sql_file', 'update_periodic_metrics.sql' + ) + ) RETURNING id + `); + calculateHistoryId = historyResult.rows[0].id; + + // Initialize progress + global.outputProgress({ + status: 'running', + operation: 'Starting periodic metrics calculation', + current: 0, + total: 100, + elapsed: '0s', + remaining: 'Calculating...', + rate: 0, + percentage: '0', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: Math.round((Date.now() - startTime) / 1000) + } + }); + + // Execute the SQL query + global.outputProgress({ + status: 'running', + operation: 'Executing periodic metrics SQL query', + current: 25, + total: 100, + elapsed: global.formatElapsedTime(startTime), + remaining: 'Calculating...', + rate: 0, + percentage: '25', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: Math.round((Date.now() - startTime) / 1000) + } + }); + + await connection.query(sqlQuery); + + // Update calculate_status table + await connection.query(` + INSERT INTO calculate_status (module_name, last_calculation_timestamp) + VALUES ($1, $2) + ON CONFLICT (module_name) DO UPDATE + SET last_calculation_timestamp = EXCLUDED.last_calculation_timestamp + `, ['periodic_metrics', new Date()]); + + // Update progress to 100% + global.outputProgress({ + status: 'complete', + operation: 'Periodic metrics calculation complete', + current: 100, + total: 100, + elapsed: global.formatElapsedTime(startTime), + remaining: '0s', + rate: 0, + percentage: '100', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: Math.round((Date.now() - startTime) / 1000) + } + }); + + // Update history with completion + await connection.query(` + UPDATE calculate_history + SET + end_time = NOW(), + duration_seconds = $1, + status = 'completed' + WHERE id = $2 + `, [Math.round((Date.now() - startTime) / 1000), calculateHistoryId]); + + // Clear progress file on successful completion + global.clearProgress(); + + return { + success: true, + message: 'Periodic metrics calculation completed successfully', + duration: Math.round((Date.now() - startTime) / 1000) + }; + } catch (error) { + const endTime = Date.now(); + const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); + + // Update history with error + if (connection && calculateHistoryId) { + await connection.query(` + UPDATE calculate_history + SET + end_time = NOW(), + duration_seconds = $1, + status = $2, + error_message = $3 + WHERE id = $4 + `, [ + totalElapsedSeconds, + isCancelled ? 'cancelled' : 'failed', + error.message, + calculateHistoryId + ]); + } + + if (isCancelled) { + global.outputProgress({ + status: 'cancelled', + operation: 'Calculation cancelled', + current: 50, + total: 100, + elapsed: global.formatElapsedTime(startTime), + remaining: null, + rate: 0, + percentage: '50', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: Math.round((Date.now() - startTime) / 1000) + } + }); + } else { + global.outputProgress({ + status: 'error', + operation: 'Error: ' + error.message, + current: 50, + total: 100, + elapsed: global.formatElapsedTime(startTime), + remaining: null, + rate: 0, + percentage: '50', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: Math.round((Date.now() - startTime) / 1000) + } + }); + } + throw error; + } finally { + // Clear the timeout to prevent forced termination + clearTimeout(timeout); + + // Always release connection + if (connection) { + try { + connection.release(); + } catch (err) { + console.error('Error in final cleanup:', err); + } + } + } +} + +// Export as a module with all necessary functions +module.exports = { + updatePeriodicMetrics, + cancelCalculation, + getProgress: global.getProgress +}; + +// Run directly if called from command line +if (require.main === module) { + updatePeriodicMetrics().then(() => { + closePool().then(() => { + process.exit(0); + }); + }).catch(error => { + console.error('Error:', error); + closePool().then(() => { + process.exit(1); + }); + }); +} \ No newline at end of file diff --git a/inventory-server/scripts/metrics-new/update-product-metrics.js b/inventory-server/scripts/metrics-new/update-product-metrics.js new file mode 100644 index 0000000..a6c00d8 --- /dev/null +++ b/inventory-server/scripts/metrics-new/update-product-metrics.js @@ -0,0 +1,306 @@ +const path = require('path'); +const fs = require('fs'); + +// Change working directory to script directory +process.chdir(path.dirname(__filename)); + +// Load environment variables +require('dotenv').config({ path: path.resolve(__dirname, '../..', '.env') }); + +// Add error handler for uncaught exceptions +process.on('uncaughtException', (error) => { + console.error('Uncaught Exception:', error); + process.exit(1); +}); + +// Add error handler for unhandled promise rejections +process.on('unhandledRejection', (reason, promise) => { + console.error('Unhandled Rejection at:', promise, 'reason:', reason); + process.exit(1); +}); + +// Load progress module +const progress = require('./utils/progress'); + +// Store progress functions in global scope to ensure availability +global.formatElapsedTime = progress.formatElapsedTime; +global.estimateRemaining = progress.estimateRemaining; +global.calculateRate = progress.calculateRate; +global.outputProgress = progress.outputProgress; +global.clearProgress = progress.clearProgress; +global.getProgress = progress.getProgress; +global.logError = progress.logError; + +// Load database module +const { getConnection, closePool } = require('./utils/db'); + +// Add cancel handler +let isCancelled = false; + +function cancelCalculation() { + isCancelled = true; + console.log('Calculation has been cancelled by user'); + + // Force-terminate any query that's been running for more than 5 seconds + try { + const connection = getConnection(); + connection.then(async (conn) => { + try { + // Identify and terminate long-running queries from our application + await conn.query(` + SELECT pg_cancel_backend(pid) + FROM pg_stat_activity + WHERE query_start < now() - interval '5 seconds' + AND application_name LIKE '%node%' + AND query NOT LIKE '%pg_cancel_backend%' + `); + + // Release connection + conn.release(); + } catch (err) { + console.error('Error during force cancellation:', err); + conn.release(); + } + }).catch(err => { + console.error('Could not get connection for cancellation:', err); + }); + } catch (err) { + console.error('Failed to terminate running queries:', err); + } + + return { + success: true, + message: 'Calculation has been cancelled' + }; +} + +// Handle SIGTERM signal for cancellation +process.on('SIGTERM', cancelCalculation); + +async function updateProductMetrics() { + let connection; + const startTime = Date.now(); + let calculateHistoryId; + + // Set a maximum execution time (30 minutes) + const MAX_EXECUTION_TIME = 30 * 60 * 1000; + const timeout = setTimeout(() => { + console.error(`Calculation timed out after ${MAX_EXECUTION_TIME/1000} seconds, forcing termination`); + // Call cancel and force exit + cancelCalculation(); + process.exit(1); + }, MAX_EXECUTION_TIME); + + try { + // Read the SQL file + const sqlFilePath = path.resolve(__dirname, 'update_product_metrics.sql'); + const sqlQuery = fs.readFileSync(sqlFilePath, 'utf8'); + + // Clean up any previously running calculations + connection = await getConnection(); + + // Ensure the calculate_status table exists and has the correct structure + await connection.query(` + CREATE TABLE IF NOT EXISTS calculate_status ( + module_name TEXT PRIMARY KEY, + last_calculation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP + ) + `); + + await connection.query(` + UPDATE calculate_history + SET + status = 'cancelled', + end_time = NOW(), + duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER, + error_message = 'Previous calculation was not completed properly' + WHERE status = 'running' AND additional_info->>'type' = 'product_metrics' + `); + + // Create history record for this calculation + const historyResult = await connection.query(` + INSERT INTO calculate_history ( + start_time, + status, + additional_info + ) VALUES ( + NOW(), + 'running', + jsonb_build_object( + 'type', 'product_metrics', + 'sql_file', 'update_product_metrics.sql' + ) + ) RETURNING id + `); + calculateHistoryId = historyResult.rows[0].id; + + // Initialize progress + global.outputProgress({ + status: 'running', + operation: 'Starting product metrics calculation', + current: 0, + total: 100, + elapsed: '0s', + remaining: 'Calculating...', + rate: 0, + percentage: '0', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: Math.round((Date.now() - startTime) / 1000) + } + }); + + // Execute the SQL query + global.outputProgress({ + status: 'running', + operation: 'Executing product metrics SQL query', + current: 25, + total: 100, + elapsed: global.formatElapsedTime(startTime), + remaining: 'Calculating...', + rate: 0, + percentage: '25', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: Math.round((Date.now() - startTime) / 1000) + } + }); + + await connection.query(sqlQuery); + + // Update calculate_status table + await connection.query(` + INSERT INTO calculate_status (module_name, last_calculation_timestamp) + VALUES ($1, $2) + ON CONFLICT (module_name) DO UPDATE + SET last_calculation_timestamp = EXCLUDED.last_calculation_timestamp + `, ['product_metrics', new Date()]); + + // Update progress to 100% + global.outputProgress({ + status: 'complete', + operation: 'Product metrics calculation complete', + current: 100, + total: 100, + elapsed: global.formatElapsedTime(startTime), + remaining: '0s', + rate: 0, + percentage: '100', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: Math.round((Date.now() - startTime) / 1000) + } + }); + + // Update history with completion + await connection.query(` + UPDATE calculate_history + SET + end_time = NOW(), + duration_seconds = $1, + status = 'completed' + WHERE id = $2 + `, [Math.round((Date.now() - startTime) / 1000), calculateHistoryId]); + + // Clear progress file on successful completion + global.clearProgress(); + + return { + success: true, + message: 'Product metrics calculation completed successfully', + duration: Math.round((Date.now() - startTime) / 1000) + }; + } catch (error) { + const endTime = Date.now(); + const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); + + // Update history with error + if (connection && calculateHistoryId) { + await connection.query(` + UPDATE calculate_history + SET + end_time = NOW(), + duration_seconds = $1, + status = $2, + error_message = $3 + WHERE id = $4 + `, [ + totalElapsedSeconds, + isCancelled ? 'cancelled' : 'failed', + error.message, + calculateHistoryId + ]); + } + + if (isCancelled) { + global.outputProgress({ + status: 'cancelled', + operation: 'Calculation cancelled', + current: 50, + total: 100, + elapsed: global.formatElapsedTime(startTime), + remaining: null, + rate: 0, + percentage: '50', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: Math.round((Date.now() - startTime) / 1000) + } + }); + } else { + global.outputProgress({ + status: 'error', + operation: 'Error: ' + error.message, + current: 50, + total: 100, + elapsed: global.formatElapsedTime(startTime), + remaining: null, + rate: 0, + percentage: '50', + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date().toISOString(), + elapsed_seconds: Math.round((Date.now() - startTime) / 1000) + } + }); + } + throw error; + } finally { + // Clear the timeout to prevent forced termination + clearTimeout(timeout); + + // Always release connection + if (connection) { + try { + connection.release(); + } catch (err) { + console.error('Error in final cleanup:', err); + } + } + } +} + +// Export as a module with all necessary functions +module.exports = { + updateProductMetrics, + cancelCalculation, + getProgress: global.getProgress +}; + +// Run directly if called from command line +if (require.main === module) { + updateProductMetrics().then(() => { + closePool().then(() => { + process.exit(0); + }); + }).catch(error => { + console.error('Error:', error); + closePool().then(() => { + process.exit(1); + }); + }); +} \ No newline at end of file diff --git a/inventory-server/scripts/metrics-new/update_daily_snapshots.sql b/inventory-server/scripts/metrics-new/update_daily_snapshots.sql index c99c0d2..b0f9f8a 100644 --- a/inventory-server/scripts/metrics-new/update_daily_snapshots.sql +++ b/inventory-server/scripts/metrics-new/update_daily_snapshots.sql @@ -5,7 +5,7 @@ DO $$ DECLARE - _module_name VARCHAR := 'daily_snapshots'; + _module_name TEXT := 'daily_snapshots'; _start_time TIMESTAMPTZ := clock_timestamp(); -- Time execution started _last_calc_time TIMESTAMPTZ; _target_date DATE := CURRENT_DATE; -- Always recalculate today for simplicity with hourly runs diff --git a/inventory-server/scripts/metrics-new/update_periodic_metrics.sql b/inventory-server/scripts/metrics-new/update_periodic_metrics.sql index 5988c81..46b0591 100644 --- a/inventory-server/scripts/metrics-new/update_periodic_metrics.sql +++ b/inventory-server/scripts/metrics-new/update_periodic_metrics.sql @@ -5,7 +5,7 @@ DO $$ DECLARE - _module_name VARCHAR := 'periodic_metrics'; + _module_name TEXT := 'periodic_metrics'; _start_time TIMESTAMPTZ := clock_timestamp(); _last_calc_time TIMESTAMPTZ; _abc_basis VARCHAR; @@ -25,7 +25,7 @@ BEGIN WITH LeadTimes AS ( SELECT pid, - AVG(GREATEST(1, DATE_PART('day', last_received_date - date))) AS avg_days -- Use GREATEST(1,...) to avoid 0 or negative days + AVG(GREATEST(1, (last_received_date::date - date::date))) AS avg_days -- Use GREATEST(1,...) to avoid 0 or negative days FROM public.purchase_orders WHERE status = 'received' -- Or potentially 'full_received' if using that status AND last_received_date IS NOT NULL diff --git a/inventory-server/scripts/metrics-new/update_product_metrics.sql b/inventory-server/scripts/metrics-new/update_product_metrics.sql index bac9653..6f28a94 100644 --- a/inventory-server/scripts/metrics-new/update_product_metrics.sql +++ b/inventory-server/scripts/metrics-new/update_product_metrics.sql @@ -5,7 +5,7 @@ DO $$ DECLARE - _module_name VARCHAR := 'product_metrics'; + _module_name TEXT := 'product_metrics'; _start_time TIMESTAMPTZ := clock_timestamp(); _last_calc_time TIMESTAMPTZ; _current_date DATE := CURRENT_DATE; @@ -179,7 +179,13 @@ BEGIN ci.current_price, ci.current_regular_price, ci.current_cost_price, ci.current_effective_cost, ci.current_stock, ci.current_stock * ci.current_effective_cost, ci.current_stock * ci.current_price, ci.current_stock * ci.current_regular_price, COALESCE(ooi.on_order_qty, 0), COALESCE(ooi.on_order_cost, 0.00), COALESCE(ooi.on_order_qty, 0) * ci.current_price, ooi.earliest_expected_date, - ci.created_at::date, COALESCE(ci.first_received::date, hd.date_first_received_calc), hd.date_last_received_calc, hd.date_first_sold, COALESCE(ci.date_last_sold, hd.max_order_date), DATE_PART('day', _current_date - LEAST(ci.created_at::date, hd.date_first_sold)), + ci.created_at::date, COALESCE(ci.first_received::date, hd.date_first_received_calc), hd.date_last_received_calc, hd.date_first_sold, COALESCE(ci.date_last_sold, hd.max_order_date), + CASE + WHEN ci.created_at IS NULL AND hd.date_first_sold IS NULL THEN 0 + WHEN ci.created_at IS NULL THEN (_current_date - hd.date_first_sold)::integer + WHEN hd.date_first_sold IS NULL THEN (_current_date - ci.created_at::date)::integer + ELSE (_current_date - LEAST(ci.created_at::date, hd.date_first_sold))::integer + END AS age_days, sa.sales_7d, sa.revenue_7d, sa.sales_14d, sa.revenue_14d, sa.sales_30d, sa.revenue_30d, sa.cogs_30d, sa.profit_30d, sa.returns_units_30d, sa.returns_revenue_30d, sa.discounts_30d, sa.gross_revenue_30d, sa.gross_regular_revenue_30d, sa.stockout_days_30d, sa.sales_365d, sa.revenue_365d, @@ -287,7 +293,7 @@ BEGIN sales_velocity_daily = EXCLUDED.sales_velocity_daily, config_lead_time = EXCLUDED.config_lead_time, config_days_of_stock = EXCLUDED.config_days_of_stock, config_safety_stock = EXCLUDED.config_safety_stock, planning_period_days = EXCLUDED.planning_period_days, lead_time_forecast_units = EXCLUDED.lead_time_forecast_units, days_of_stock_forecast_units = EXCLUDED.days_of_stock_forecast_units, planning_period_forecast_units = EXCLUDED.planning_period_forecast_units, lead_time_closing_stock = EXCLUDED.lead_time_closing_stock, days_of_stock_closing_stock = EXCLUDED.days_of_stock_closing_stock, - replenishment_needed_raw = EXCLUDED.replenishment_needed_raw, replenishment_units = EXCLUDED.replenishment_units, replenishment_cost = EXCLUDED.replenishment_cost, replenishment_retail = EXcluded.replenishment_retail, replenishment_profit = EXCLUDED.replenishment_profit, + replenishment_needed_raw = EXCLUDED.replenishment_needed_raw, replenishment_units = EXCLUDED.replenishment_units, replenishment_cost = EXCLUDED.replenishment_cost, replenishment_retail = EXCLUDED.replenishment_retail, replenishment_profit = EXCLUDED.replenishment_profit, to_order_units = EXCLUDED.to_order_units, forecast_lost_sales_units = EXCLUDED.forecast_lost_sales_units, forecast_lost_revenue = EXCLUDED.forecast_lost_revenue, stock_cover_in_days = EXCLUDED.stock_cover_in_days, po_cover_in_days = EXCLUDED.po_cover_in_days, sells_out_in_days = EXCLUDED.sells_out_in_days, replenish_date = EXCLUDED.replenish_date, overstocked_units = EXCLUDED.overstocked_units, overstocked_cost = EXCLUDED.overstocked_cost, overstocked_retail = EXCLUDED.overstocked_retail, is_old_stock = EXCLUDED.is_old_stock,