2 Commits

19 changed files with 660 additions and 136 deletions

View File

@@ -169,6 +169,9 @@ CREATE TABLE IF NOT EXISTS import_history (
duration_minutes DECIMAL(10,2) GENERATED ALWAYS AS (duration_seconds::decimal / 60.0) STORED,
records_added INTEGER DEFAULT 0,
records_updated INTEGER DEFAULT 0,
+records_deleted INTEGER DEFAULT 0,
+records_skipped INTEGER DEFAULT 0,
+total_processed INTEGER DEFAULT 0,
is_incremental BOOLEAN DEFAULT FALSE,
status calculation_status DEFAULT 'running',
error_message TEXT,
@@ -179,3 +182,15 @@ CREATE TABLE IF NOT EXISTS import_history (
CREATE INDEX IF NOT EXISTS idx_last_calc ON calculate_status(last_calculation_timestamp);
CREATE INDEX IF NOT EXISTS idx_last_sync ON sync_status(last_sync_timestamp);
CREATE INDEX IF NOT EXISTS idx_table_time ON import_history(table_name, start_time);
+CREATE INDEX IF NOT EXISTS idx_import_history_status ON import_history(status);
+CREATE INDEX IF NOT EXISTS idx_calculate_history_status ON calculate_history(status);
+-- Add comments for documentation
+COMMENT ON TABLE import_history IS 'Tracks history of data import operations with detailed statistics';
+COMMENT ON COLUMN import_history.records_deleted IS 'Number of records deleted during this import';
+COMMENT ON COLUMN import_history.records_skipped IS 'Number of records skipped (e.g., unchanged, invalid)';
+COMMENT ON COLUMN import_history.total_processed IS 'Total number of records examined/processed, including skipped';
+COMMENT ON TABLE calculate_history IS 'Tracks history of metrics calculation runs with performance data';
+COMMENT ON COLUMN calculate_history.duration_seconds IS 'Total duration of the calculation in seconds';
+COMMENT ON COLUMN calculate_history.additional_info IS 'JSON object containing step timings, row counts, and other detailed metrics';
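Because the new columns are declared inside CREATE TABLE IF NOT EXISTS, a database that already has import_history will not pick them up from this script alone (which is why the API route later in this diff checks information_schema before selecting them). A minimal migration sketch for such databases, assuming PostgreSQL 9.6+ and the same pg pool the scripts use; the function name is illustrative:

// Hypothetical one-off migration for databases created before this change.
// ADD COLUMN IF NOT EXISTS is a no-op when the column is already present.
async function migrateImportHistory(pool) {
  await pool.query(`
    ALTER TABLE import_history ADD COLUMN IF NOT EXISTS records_deleted INTEGER DEFAULT 0;
    ALTER TABLE import_history ADD COLUMN IF NOT EXISTS records_skipped INTEGER DEFAULT 0;
    ALTER TABLE import_history ADD COLUMN IF NOT EXISTS total_processed INTEGER DEFAULT 0;
  `);
}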

View File

@@ -1,4 +1,4 @@
-const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics-new/utils/progress');
+const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../scripts/metrics-new/utils/progress');
const fs = require('fs');
const path = require('path');
const { pipeline } = require('stream');

View File

@@ -24,7 +24,7 @@ process.on('unhandledRejection', (reason, promise) => {
});
// Load progress module
-const progress = require('../utils/progress');
+const progress = require('../scripts/metrics-new/utils/progress');
// Store progress functions in global scope to ensure availability
global.formatElapsedTime = progress.formatElapsedTime;
@@ -36,7 +36,7 @@ global.getProgress = progress.getProgress;
global.logError = progress.logError;
// Load database module
-const { getConnection, closePool } = require('../utils/db');
+const { getConnection, closePool } = require('../scripts/metrics-new/utils/db');
// Add cancel handler
let isCancelled = false;

View File

@@ -357,7 +357,7 @@ async function syncSettingsProductTable() {
* @param {string} config.historyType - Type identifier for calculate_history.
* @param {string} config.statusModule - Module name for calculate_status.
* @param {object} progress - Progress utility functions.
-* @returns {Promise<{success: boolean, message: string, duration: number}>}
+* @returns {Promise<{success: boolean, message: string, duration: number, rowsAffected: number}>}
*/
async function executeSqlStep(config, progress) {
if (isCancelled) throw new Error(`Calculation skipped step ${config.name} due to prior cancellation.`);
@@ -366,6 +366,7 @@ async function executeSqlStep(config, progress) {
console.log(`\n--- Starting Step: ${config.name} ---`);
const stepStartTime = Date.now();
let connection = null;
+let rowsAffected = 0; // Track rows affected by this step
// Set timeout for this specific step
if (stepTimeoutHandle) clearTimeout(stepTimeoutHandle); // Clear previous step's timeout
@@ -414,7 +415,10 @@ async function executeSqlStep(config, progress) {
current: 0, total: 100,
elapsed: progress.formatElapsedTime(stepStartTime),
remaining: 'Calculating...', rate: 0, percentage: '0',
-timing: { start_time: new Date(stepStartTime).toISOString() }
+timing: {
+  start_time: new Date(stepStartTime).toISOString(),
+  step_start_ms: stepStartTime
+}
});
// 5. Execute the Main SQL Query
@@ -423,15 +427,36 @@ async function executeSqlStep(config, progress) {
operation: `Executing SQL: ${config.name}`,
current: 25, total: 100,
elapsed: progress.formatElapsedTime(stepStartTime),
-remaining: 'Executing...', rate: 0, percentage: '25',
+remaining: 'Executing query...', rate: 0, percentage: '25',
-timing: { start_time: new Date(stepStartTime).toISOString() }
+timing: {
+  start_time: new Date(stepStartTime).toISOString(),
+  step_start_ms: stepStartTime
+}
});
console.log(`Executing SQL for ${config.name}...`);
try {
// Try executing exactly as individual scripts do
console.log('Executing SQL with simple query method...');
-await connection.query(sqlQuery);
+const result = await connection.query(sqlQuery);
+// Try to extract row count from result
+if (result && result.rowCount !== undefined) {
+  rowsAffected = result.rowCount;
+} else if (Array.isArray(result) && result[0] && result[0].rowCount !== undefined) {
+  rowsAffected = result[0].rowCount;
+}
+// Check if the query returned a result set with row count info
+if (result && result.rows && result.rows.length > 0 && result.rows[0].rows_processed) {
+  rowsAffected = parseInt(result.rows[0].rows_processed) || rowsAffected;
+  console.log(`SQL returned metrics: ${JSON.stringify(result.rows[0])}`);
+} else if (Array.isArray(result) && result[0] && result[0].rows && result[0].rows[0] && result[0].rows[0].rows_processed) {
+  rowsAffected = parseInt(result[0].rows[0].rows_processed) || rowsAffected;
+  console.log(`SQL returned metrics: ${JSON.stringify(result[0].rows[0])}`);
+}
+console.log(`SQL affected ${rowsAffected} rows`);
} catch (sqlError) {
if (sqlError.message.includes('could not determine data type of parameter')) {
console.log('Simple query failed with parameter type error, trying alternative method...');
@@ -492,7 +517,8 @@ async function executeSqlStep(config, progress) {
return {
success: true,
message: `${config.name} completed successfully`,
-duration: stepDuration
+duration: stepDuration,
+rowsAffected: rowsAffected
};
} catch (error) {
@@ -664,6 +690,17 @@ async function runAllCalculations() {
combinedHistoryId = historyResult.rows[0].id;
console.log(`Created combined history record ID: ${combinedHistoryId}`);
+// Get initial counts for tracking
+const productCount = await connection.query('SELECT COUNT(*) as count FROM products');
+const totalProducts = parseInt(productCount.rows[0].count);
+// Update history with initial counts
+await connection.query(`
+  UPDATE calculate_history
+  SET additional_info = additional_info || jsonb_build_object('total_products', $1::integer)
+  WHERE id = $2
+`, [totalProducts, combinedHistoryId]);
connection.release();
} catch (historyError) {
console.error('Error creating combined history record:', historyError);
@@ -692,28 +729,49 @@ async function runAllCalculations() {
// Track completed steps
const completedSteps = [];
+const stepTimings = {};
+const stepRowCounts = {};
+let currentStepIndex = 0;
// Now run the calculation steps
-for (const step of steps) {
-  if (step.run) {
-    if (isCancelled) {
-      console.log(`Skipping step "${step.name}" due to cancellation.`);
-      overallSuccess = false; // Mark as not fully successful if steps are skipped due to cancel
-      continue; // Skip to next step
-    }
-    // Pass the progress utilities to the step executor
-    const result = await executeSqlStep(step, progressUtils);
-    if (result.success) {
-      completedSteps.push({
-        name: step.name,
-        duration: result.duration,
-        status: 'completed'
-      });
-    }
-  } else {
-    console.log(`Skipping step "${step.name}" (disabled by configuration).`);
-  }
-}
+for (const step of stepsToRun) {
+  if (isCancelled) {
+    console.log(`Skipping step "${step.name}" due to cancellation.`);
+    overallSuccess = false; // Mark as not fully successful if steps are skipped due to cancel
+    continue; // Skip to next step
+  }
+  currentStepIndex++;
+  // Update overall progress
+  progressUtils.outputProgress({
+    status: 'running',
+    operation: 'Running calculations',
+    message: `Step ${currentStepIndex} of ${stepsToRun.length}: ${step.name}`,
+    current: currentStepIndex - 1,
+    total: stepsToRun.length,
+    elapsed: progressUtils.formatElapsedTime(overallStartTime),
+    remaining: progressUtils.estimateRemaining(overallStartTime, currentStepIndex - 1, stepsToRun.length),
+    percentage: Math.round(((currentStepIndex - 1) / stepsToRun.length) * 100).toString(),
+    timing: {
+      overall_start_time: new Date(overallStartTime).toISOString(),
+      current_step: step.name,
+      completed_steps: completedSteps.length
+    }
+  });
+  // Pass the progress utilities to the step executor
+  const result = await executeSqlStep(step, progressUtils);
+  if (result.success) {
+    completedSteps.push({
+      name: step.name,
+      duration: result.duration,
+      status: 'completed',
+      rowsAffected: result.rowsAffected
+    });
+    stepTimings[step.name] = result.duration;
+    stepRowCounts[step.name] = result.rowsAffected;
+  }
+}
@@ -726,18 +784,32 @@ async function runAllCalculations() {
connection = await getConnection();
const totalDuration = Math.round((Date.now() - overallStartTime) / 1000);
+// Get final processed counts
+const processedCounts = await connection.query(`
+  SELECT
+    (SELECT COUNT(*) FROM product_metrics WHERE last_calculated >= $1) as processed_products
+`, [new Date(overallStartTime)]);
await connection.query(`
  UPDATE calculate_history
  SET
    end_time = NOW(),
    duration_seconds = $1::integer,
    status = $2::calculation_status,
-    additional_info = additional_info || jsonb_build_object('completed_steps', $3::jsonb)
-  WHERE id = $4::integer;
+    additional_info = additional_info || jsonb_build_object(
+      'processed_products', $3::integer,
+      'completed_steps', $4::jsonb,
+      'step_timings', $5::jsonb,
+      'step_row_counts', $6::jsonb
+    )
+  WHERE id = $7::integer;
`, [
  totalDuration,
  isCancelled ? 'cancelled' : 'completed',
+  processedCounts.rows[0].processed_products,
  JSON.stringify(completedSteps),
+  JSON.stringify(stepTimings),
+  JSON.stringify(stepRowCounts),
  combinedHistoryId
]);
@@ -753,6 +825,26 @@ async function runAllCalculations() {
overallSuccess = false;
} else {
console.log("\n--- All enabled calculations finished successfully ---");
+// Send final completion progress
+progressUtils.outputProgress({
+  status: 'complete',
+  operation: 'All calculations completed',
+  message: `Successfully completed ${completedSteps.length} of ${stepsToRun.length} steps`,
+  current: stepsToRun.length,
+  total: stepsToRun.length,
+  elapsed: progressUtils.formatElapsedTime(overallStartTime),
+  remaining: '0s',
+  percentage: '100',
+  timing: {
+    overall_start_time: new Date(overallStartTime).toISOString(),
+    overall_end_time: new Date().toISOString(),
+    total_duration_seconds: Math.round((Date.now() - overallStartTime) / 1000),
+    step_timings: stepTimings,
+    completed_steps: completedSteps.length
+  }
+});
progressUtils.clearProgress(); // Clear progress only on full success
}
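The row-count extraction added in executeSqlStep has to cope with node-postgres resolving a single-statement query to one result object and a multi-statement script (like the calculation SQL files later in this diff, which end in a stats SELECT) to an array of results. A minimal sketch of that handling, assuming the same pg connection returned by getConnection(); the helper name is illustrative:

// Sketch: pick the rows-affected figure out of a pg query result, whether the
// script was a single statement or a DO block followed by a stats SELECT.
async function runCalculationScript(connection, sqlText) {
  const result = await connection.query(sqlText);
  const results = Array.isArray(result) ? result : [result]; // multi-statement text yields an array
  const last = results[results.length - 1];
  // rowCount covers plain INSERT/UPDATE statements; rows_processed comes from the
  // stats SELECT the calculation scripts append after their DO $$ block.
  const rowsAffected = (last.rows && last.rows[0] && last.rows[0].rows_processed) || last.rowCount || 0;
  return Number(rowsAffected) || 0;
}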

View File

@@ -6,7 +6,6 @@ const importCategories = require('./import/categories');
const { importProducts } = require('./import/products');
const importOrders = require('./import/orders');
const importPurchaseOrders = require('./import/purchase-orders');
-const importHistoricalData = require('./import/historical-data');
dotenv.config({ path: path.join(__dirname, "../.env") });
@@ -15,7 +14,6 @@ const IMPORT_CATEGORIES = true;
const IMPORT_PRODUCTS = true;
const IMPORT_ORDERS = true;
const IMPORT_PURCHASE_ORDERS = true;
-const IMPORT_HISTORICAL_DATA = false;
// Add flag for incremental updates
const INCREMENTAL_UPDATE = process.env.INCREMENTAL_UPDATE !== 'false'; // Default to true unless explicitly set to false
@@ -80,8 +78,7 @@ async function main() {
IMPORT_CATEGORIES,
IMPORT_PRODUCTS,
IMPORT_ORDERS,
-IMPORT_PURCHASE_ORDERS,
-IMPORT_HISTORICAL_DATA
+IMPORT_PURCHASE_ORDERS
].filter(Boolean).length;
try {
@@ -129,11 +126,10 @@ async function main() {
'categories_enabled', $2::boolean,
'products_enabled', $3::boolean,
'orders_enabled', $4::boolean,
-'purchase_orders_enabled', $5::boolean,
-'historical_data_enabled', $6::boolean
+'purchase_orders_enabled', $5::boolean
)
) RETURNING id
-`, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS, IMPORT_HISTORICAL_DATA]);
+`, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS]);
importHistoryId = historyResult.rows[0].id;
} catch (error) {
console.error("Error creating import history record:", error);
@@ -150,16 +146,21 @@ async function main() {
categories: null,
products: null,
orders: null,
-purchaseOrders: null,
-historicalData: null
+purchaseOrders: null
};
let totalRecordsAdded = 0;
let totalRecordsUpdated = 0;
+let totalRecordsDeleted = 0; // Add tracking for deleted records
+let totalRecordsSkipped = 0; // Track skipped/filtered records
+const stepTimings = {};
// Run each import based on constants
if (IMPORT_CATEGORIES) {
+  const stepStart = Date.now();
  results.categories = await importCategories(prodConnection, localConnection);
+  stepTimings.categories = Math.round((Date.now() - stepStart) / 1000);
  if (isImportCancelled) throw new Error("Import cancelled");
  completedSteps++;
  console.log('Categories import result:', results.categories);
@@ -168,26 +169,37 @@ async function main() {
}
if (IMPORT_PRODUCTS) {
+  const stepStart = Date.now();
  results.products = await importProducts(prodConnection, localConnection, INCREMENTAL_UPDATE);
+  stepTimings.products = Math.round((Date.now() - stepStart) / 1000);
  if (isImportCancelled) throw new Error("Import cancelled");
  completedSteps++;
  console.log('Products import result:', results.products);
  totalRecordsAdded += parseInt(results.products?.recordsAdded || 0);
  totalRecordsUpdated += parseInt(results.products?.recordsUpdated || 0);
+  totalRecordsSkipped += parseInt(results.products?.skippedUnchanged || 0);
}
if (IMPORT_ORDERS) {
+  const stepStart = Date.now();
  results.orders = await importOrders(prodConnection, localConnection, INCREMENTAL_UPDATE);
+  stepTimings.orders = Math.round((Date.now() - stepStart) / 1000);
  if (isImportCancelled) throw new Error("Import cancelled");
  completedSteps++;
  console.log('Orders import result:', results.orders);
  totalRecordsAdded += parseInt(results.orders?.recordsAdded || 0);
  totalRecordsUpdated += parseInt(results.orders?.recordsUpdated || 0);
+  totalRecordsSkipped += parseInt(results.orders?.totalSkipped || 0);
}
if (IMPORT_PURCHASE_ORDERS) {
  try {
+    const stepStart = Date.now();
    results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection, INCREMENTAL_UPDATE);
+    stepTimings.purchaseOrders = Math.round((Date.now() - stepStart) / 1000);
    if (isImportCancelled) throw new Error("Import cancelled");
    completedSteps++;
    console.log('Purchase orders import result:', results.purchaseOrders);
@@ -198,6 +210,7 @@ async function main() {
} else {
  totalRecordsAdded += parseInt(results.purchaseOrders?.recordsAdded || 0);
  totalRecordsUpdated += parseInt(results.purchaseOrders?.recordsUpdated || 0);
+  totalRecordsDeleted += parseInt(results.purchaseOrders?.recordsDeleted || 0);
}
} catch (error) {
console.error('Error during purchase orders import:', error);
@@ -211,32 +224,6 @@ async function main() {
}
}
-if (IMPORT_HISTORICAL_DATA) {
-  try {
-    results.historicalData = await importHistoricalData(prodConnection, localConnection, INCREMENTAL_UPDATE);
-    if (isImportCancelled) throw new Error("Import cancelled");
-    completedSteps++;
-    console.log('Historical data import result:', results.historicalData);
-    // Handle potential error status
-    if (results.historicalData?.status === 'error') {
-      console.error('Historical data import had an error:', results.historicalData.error);
-    } else {
-      totalRecordsAdded += parseInt(results.historicalData?.recordsAdded || 0);
-      totalRecordsUpdated += parseInt(results.historicalData?.recordsUpdated || 0);
-    }
-  } catch (error) {
-    console.error('Error during historical data import:', error);
-    // Continue with other imports, don't fail the whole process
-    results.historicalData = {
-      status: 'error',
-      error: error.message,
-      recordsAdded: 0,
-      recordsUpdated: 0
-    };
-  }
-}
const endTime = Date.now();
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);
@@ -254,14 +241,15 @@ async function main() {
'products_enabled', $5::boolean,
'orders_enabled', $6::boolean,
'purchase_orders_enabled', $7::boolean,
-'historical_data_enabled', $8::boolean,
-'categories_result', COALESCE($9::jsonb, 'null'::jsonb),
-'products_result', COALESCE($10::jsonb, 'null'::jsonb),
-'orders_result', COALESCE($11::jsonb, 'null'::jsonb),
-'purchase_orders_result', COALESCE($12::jsonb, 'null'::jsonb),
-'historical_data_result', COALESCE($13::jsonb, 'null'::jsonb)
+'categories_result', COALESCE($8::jsonb, 'null'::jsonb),
+'products_result', COALESCE($9::jsonb, 'null'::jsonb),
+'orders_result', COALESCE($10::jsonb, 'null'::jsonb),
+'purchase_orders_result', COALESCE($11::jsonb, 'null'::jsonb),
+'total_deleted', $12::integer,
+'total_skipped', $13::integer,
+'step_timings', $14::jsonb
)
-WHERE id = $14
+WHERE id = $15
`, [
totalElapsedSeconds,
parseInt(totalRecordsAdded),
@@ -270,12 +258,13 @@ async function main() {
IMPORT_PRODUCTS,
IMPORT_ORDERS,
IMPORT_PURCHASE_ORDERS,
-IMPORT_HISTORICAL_DATA,
JSON.stringify(results.categories),
JSON.stringify(results.products),
JSON.stringify(results.orders),
JSON.stringify(results.purchaseOrders),
-JSON.stringify(results.historicalData),
+totalRecordsDeleted,
+totalRecordsSkipped,
+JSON.stringify(stepTimings),
importHistoryId
]);
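The history updates above rely on merging new keys into the existing jsonb column rather than overwriting it. A minimal sketch of that pattern, assuming a pg connection and an existing import_history row; the COALESCE guard is a defensive variation I am adding (a NULL additional_info would otherwise swallow the merge) and the helper name is illustrative:

// Sketch: append keys to additional_info without clobbering what is already there.
// jsonb || jsonb merges top-level keys; the right-hand side wins on conflicts.
async function appendAdditionalInfo(connection, importHistoryId, info) {
  await connection.query(
    `UPDATE import_history
        SET additional_info = COALESCE(additional_info, '{}'::jsonb) || $1::jsonb
      WHERE id = $2`,
    [JSON.stringify(info), importHistoryId]
  );
}

// e.g. appendAdditionalInfo(conn, id, { step_timings: stepTimings, total_skipped: totalRecordsSkipped });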

View File

@@ -92,6 +92,12 @@ async function importCategories(prodConnection, localConnection) {
  description = EXCLUDED.description,
  status = EXCLUDED.status,
  updated_at = EXCLUDED.updated_at
+WHERE -- Only update if at least one field has changed
+  categories.name IS DISTINCT FROM EXCLUDED.name OR
+  categories.type IS DISTINCT FROM EXCLUDED.type OR
+  categories.parent_id IS DISTINCT FROM EXCLUDED.parent_id OR
+  categories.description IS DISTINCT FROM EXCLUDED.description OR
+  categories.status IS DISTINCT FROM EXCLUDED.status
RETURNING
  cat_id,
  CASE
@@ -133,7 +139,7 @@ async function importCategories(prodConnection, localConnection) {
message: `Imported ${inserted} (updated ${updated}) categories of type ${type}`,
current: totalInserted + totalUpdated,
total: categories.length,
-elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
+elapsed: formatElapsedTime(startTime),
});
} catch (error) {
// Rollback to the savepoint for this type
@@ -161,7 +167,7 @@ async function importCategories(prodConnection, localConnection) {
operation: "Categories import completed", operation: "Categories import completed",
current: totalInserted + totalUpdated, current: totalInserted + totalUpdated,
total: totalInserted + totalUpdated, total: totalInserted + totalUpdated,
duration: formatElapsedTime((Date.now() - startTime) / 1000), duration: formatElapsedTime(startTime),
warnings: skippedCategories.length > 0 ? { warnings: skippedCategories.length > 0 ? {
message: "Some categories were skipped due to missing parents", message: "Some categories were skipped due to missing parents",
skippedCategories skippedCategories
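The conditional ON CONFLICT ... WHERE clause added here (and in the orders, purchase-order, and metrics scripts below) turns unchanged rows into no-ops, so RETURNING only reports rows that were actually written. A standalone sketch of the pattern against a hypothetical items table with a unique id, showing how inserts, updates, and skipped rows can be counted; none of these names come from the PR:

// Sketch: skip no-op updates and classify the writes that did happen.
// xmax = 0 is true for freshly inserted rows, so the RETURNING set splits into
// inserts and real updates; rows filtered out by the WHERE clause are not returned.
async function upsertItems(connection, items) {
  const { rows } = await connection.query(
    `INSERT INTO items (id, name, price)
     SELECT * FROM jsonb_to_recordset($1::jsonb) AS x(id int, name text, price numeric)
     ON CONFLICT (id) DO UPDATE SET
       name = EXCLUDED.name,
       price = EXCLUDED.price
     WHERE items.name IS DISTINCT FROM EXCLUDED.name
        OR items.price IS DISTINCT FROM EXCLUDED.price
     RETURNING (xmax = 0) AS inserted`,
    [JSON.stringify(items)]
  );
  const inserted = rows.filter(r => r.inserted).length;
  const updated = rows.length - inserted;
  const skippedUnchanged = items.length - rows.length; // conflicting rows with nothing to change
  return { inserted, updated, skippedUnchanged };
}

IS DISTINCT FROM is used instead of <> so that NULLs compare predictably, which is why the generated counts feed cleanly into the new records_skipped statistics.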

View File

@@ -221,8 +221,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
message: `Loading order items: ${processedCount} of ${totalOrderItems}`,
current: processedCount,
total: totalOrderItems,
-elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
+elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalOrderItems),
rate: calculateRate(startTime, processedCount)
});
} catch (error) {
@@ -530,8 +530,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
message: `Loading order data: ${processedCount} of ${totalUniqueOrders}`,
current: processedCount,
total: totalUniqueOrders,
-elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
+elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalUniqueOrders),
rate: calculateRate(startTime, processedCount)
});
}
@@ -681,6 +681,15 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
  status = EXCLUDED.status,
  canceled = EXCLUDED.canceled,
  costeach = EXCLUDED.costeach
+WHERE -- Only update if at least one key field has changed
+  orders.price IS DISTINCT FROM EXCLUDED.price OR
+  orders.quantity IS DISTINCT FROM EXCLUDED.quantity OR
+  orders.discount IS DISTINCT FROM EXCLUDED.discount OR
+  orders.tax IS DISTINCT FROM EXCLUDED.tax OR
+  orders.status IS DISTINCT FROM EXCLUDED.status OR
+  orders.canceled IS DISTINCT FROM EXCLUDED.canceled OR
+  orders.costeach IS DISTINCT FROM EXCLUDED.costeach OR
+  orders.date IS DISTINCT FROM EXCLUDED.date
RETURNING xmax = 0 as inserted
)
SELECT
@@ -704,7 +713,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
message: `Importing orders: ${cumulativeProcessedOrders} of ${totalUniqueOrders}`,
current: cumulativeProcessedOrders,
total: totalUniqueOrders,
-elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
+elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, cumulativeProcessedOrders, totalUniqueOrders),
rate: calculateRate(startTime, cumulativeProcessedOrders)
});
@@ -751,8 +760,15 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
recordsUpdated: parseInt(recordsUpdated) || 0,
totalSkipped: skippedOrders.size || 0,
missingProducts: missingProducts.size || 0,
+totalProcessed: orderItems.length, // Total order items in source
incrementalUpdate,
-lastSyncTime
+lastSyncTime,
+details: {
+  uniqueOrdersProcessed: cumulativeProcessedOrders,
+  totalOrderItems: orderItems.length,
+  skippedDueToMissingProducts: skippedOrders.size,
+  missingProductIds: Array.from(missingProducts).slice(0, 100) // First 100 for debugging
+}
};
} catch (error) {
console.error("Error during orders import:", error);

View File

@@ -576,8 +576,8 @@ async function materializeCalculations(prodConnection, localConnection, incremen
message: `Imported ${i + batch.length} of ${prodData.length} products`,
current: i + batch.length,
total: prodData.length,
-elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
+elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, i + batch.length, prodData.length),
rate: calculateRate(startTime, i + batch.length)
});
}
@@ -587,6 +587,59 @@ async function materializeCalculations(prodConnection, localConnection, incremen
operation: "Products import", operation: "Products import",
message: "Finished materializing calculations" message: "Finished materializing calculations"
}); });
// Add step to identify which products actually need updating
outputProgress({
status: "running",
operation: "Products import",
message: "Identifying changed products"
});
// Mark products that haven't changed as needs_update = false
await localConnection.query(`
UPDATE temp_products t
SET needs_update = FALSE
FROM products p
WHERE t.pid = p.pid
AND t.title IS NOT DISTINCT FROM p.title
AND t.description IS NOT DISTINCT FROM p.description
AND t.sku IS NOT DISTINCT FROM p.sku
AND t.stock_quantity = p.stock_quantity
AND t.price = p.price
AND t.regular_price = p.regular_price
AND t.cost_price IS NOT DISTINCT FROM p.cost_price
AND t.vendor IS NOT DISTINCT FROM p.vendor
AND t.brand IS NOT DISTINCT FROM p.brand
AND t.visible = p.visible
AND t.replenishable = p.replenishable
AND t.barcode IS NOT DISTINCT FROM p.barcode
AND t.updated_at IS NOT DISTINCT FROM p.updated_at
AND t.total_sold IS NOT DISTINCT FROM p.total_sold
-- Check key fields that are likely to change
-- We don't need to check every single field, just the important ones
`);
// Get count of products that need updating
const [countResult] = await localConnection.query(`
SELECT
COUNT(*) FILTER (WHERE needs_update = true) as update_count,
COUNT(*) FILTER (WHERE needs_update = false) as skip_count,
COUNT(*) as total_count
FROM temp_products
`);
outputProgress({
status: "running",
operation: "Products import",
message: `Found ${countResult.rows[0].update_count} products that need updating, ${countResult.rows[0].skip_count} unchanged`
});
// Return the total products processed
return {
totalProcessed: prodData.length,
needsUpdate: parseInt(countResult.rows[0].update_count),
skipped: parseInt(countResult.rows[0].skip_count)
};
}
async function importProducts(prodConnection, localConnection, incrementalUpdate = true) {
@@ -612,7 +665,7 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
await setupTemporaryTables(localConnection);
// Materialize calculations into temp table
-await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime, startTime);
+const materializeResult = await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime, startTime);
// Get the list of products that need updating
const [products] = await localConnection.query(`
@@ -847,8 +900,8 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
message: `Processing products: ${i + batch.length} of ${products.rows.length}`,
current: i + batch.length,
total: products.rows.length,
-elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
+elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, i + batch.length, products.rows.length),
rate: calculateRate(startTime, i + batch.length)
});
}
@@ -872,7 +925,10 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
recordsAdded,
recordsUpdated,
totalRecords: products.rows.length,
-duration: formatElapsedTime(Date.now() - startTime)
+totalProcessed: materializeResult.totalProcessed,
+duration: formatElapsedTime(startTime),
+needsUpdate: materializeResult.needsUpdate,
+skippedUnchanged: materializeResult.skipped
};
} catch (error) {
// Rollback on error
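The products import takes a different route from the ON CONFLICT ... WHERE approach: incoming rows are staged in temp_products, unchanged rows are flagged with needs_update = FALSE, and only flagged rows go on to the upsert. A compact sketch of that flow under hypothetical table and column names (the bulk load is elided):

// Sketch: stage incoming rows, mark the unchanged ones, then act only on the rest.
async function stageAndMarkUnchanged(connection) {
  await connection.query('CREATE TEMP TABLE IF NOT EXISTS temp_items (LIKE items INCLUDING DEFAULTS)');
  await connection.query('ALTER TABLE temp_items ADD COLUMN IF NOT EXISTS needs_update BOOLEAN DEFAULT TRUE');
  // ... bulk-load the source rows into temp_items here ...
  await connection.query(`
    UPDATE temp_items t
       SET needs_update = FALSE
      FROM items i
     WHERE t.id = i.id
       AND t.name  IS NOT DISTINCT FROM i.name
       AND t.price IS NOT DISTINCT FROM i.price`);
  const { rows } = await connection.query(`
    SELECT COUNT(*) FILTER (WHERE needs_update)     AS update_count,
           COUNT(*) FILTER (WHERE NOT needs_update) AS skip_count
      FROM temp_items`);
  return rows[0]; // feeds the needsUpdate / skippedUnchanged figures returned above
}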

View File

@@ -398,7 +398,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
message: `Processed ${offset} of ${totalPOs} purchase orders (${totalProcessed} line items)`,
current: offset,
total: totalPOs,
-elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
+elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, offset, totalPOs),
rate: calculateRate(startTime, offset)
});
@@ -605,7 +605,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
message: `Processed ${offset} of ${totalReceivings} receivings (${totalProcessed} line items total)`,
current: offset,
total: totalReceivings,
-elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
+elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, offset, totalReceivings),
rate: calculateRate(startTime, offset)
});
@@ -730,6 +730,13 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
  date_created = EXCLUDED.date_created,
  date_ordered = EXCLUDED.date_ordered,
  updated = CURRENT_TIMESTAMP
+WHERE -- Only update if at least one key field has changed
+  purchase_orders.ordered IS DISTINCT FROM EXCLUDED.ordered OR
+  purchase_orders.po_cost_price IS DISTINCT FROM EXCLUDED.po_cost_price OR
+  purchase_orders.status IS DISTINCT FROM EXCLUDED.status OR
+  purchase_orders.expected_date IS DISTINCT FROM EXCLUDED.expected_date OR
+  purchase_orders.date IS DISTINCT FROM EXCLUDED.date OR
+  purchase_orders.vendor IS DISTINCT FROM EXCLUDED.vendor
RETURNING (xmax = 0) as inserted
`);
@@ -806,6 +813,12 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
  supplier_id = EXCLUDED.supplier_id,
  status = EXCLUDED.status,
  updated = CURRENT_TIMESTAMP
+WHERE -- Only update if at least one key field has changed
+  receivings.qty_each IS DISTINCT FROM EXCLUDED.qty_each OR
+  receivings.cost_each IS DISTINCT FROM EXCLUDED.cost_each OR
+  receivings.status IS DISTINCT FROM EXCLUDED.status OR
+  receivings.received_date IS DISTINCT FROM EXCLUDED.received_date OR
+  receivings.received_by IS DISTINCT FROM EXCLUDED.received_by
RETURNING (xmax = 0) as inserted
`);

View File

@@ -95,7 +95,14 @@ BEGIN
  profit_30d = EXCLUDED.profit_30d, cogs_30d = EXCLUDED.cogs_30d,
  sales_365d = EXCLUDED.sales_365d, revenue_365d = EXCLUDED.revenue_365d,
  lifetime_sales = EXCLUDED.lifetime_sales, lifetime_revenue = EXCLUDED.lifetime_revenue,
-  avg_margin_30d = EXCLUDED.avg_margin_30d;
+  avg_margin_30d = EXCLUDED.avg_margin_30d
+WHERE -- Only update if at least one value has changed
+  brand_metrics.product_count IS DISTINCT FROM EXCLUDED.product_count OR
+  brand_metrics.active_product_count IS DISTINCT FROM EXCLUDED.active_product_count OR
+  brand_metrics.current_stock_units IS DISTINCT FROM EXCLUDED.current_stock_units OR
+  brand_metrics.sales_30d IS DISTINCT FROM EXCLUDED.sales_30d OR
+  brand_metrics.revenue_30d IS DISTINCT FROM EXCLUDED.revenue_30d OR
+  brand_metrics.lifetime_sales IS DISTINCT FROM EXCLUDED.lifetime_sales;
-- Update calculate_status
INSERT INTO public.calculate_status (module_name, last_calculation_timestamp)
@@ -104,3 +111,25 @@ BEGIN
RAISE NOTICE 'Finished % calculation. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
-- Return metrics about the update operation for tracking
WITH update_stats AS (
SELECT
COUNT(*) as total_brands,
COUNT(*) FILTER (WHERE last_calculated >= NOW() - INTERVAL '5 minutes') as rows_processed,
SUM(product_count) as total_products,
SUM(active_product_count) as total_active_products,
SUM(sales_30d) as total_sales_30d,
SUM(revenue_30d) as total_revenue_30d,
AVG(avg_margin_30d) as overall_avg_margin_30d
FROM public.brand_metrics
)
SELECT
rows_processed,
total_brands,
total_products::int,
total_active_products::int,
total_sales_30d::int,
ROUND(total_revenue_30d, 2) as total_revenue_30d,
ROUND(overall_avg_margin_30d, 2) as overall_avg_margin_30d
FROM update_stats;

View File

@@ -238,7 +238,8 @@ BEGIN
  category_type = EXCLUDED.category_type,
  parent_id = EXCLUDED.parent_id,
  last_calculated = EXCLUDED.last_calculated,
-  -- Update rolled-up metrics
+  -- ROLLED-UP METRICS (includes this category + all descendants)
  product_count = EXCLUDED.product_count,
  active_product_count = EXCLUDED.active_product_count,
  replenishable_product_count = EXCLUDED.replenishable_product_count,
@@ -250,7 +251,8 @@ BEGIN
  profit_30d = EXCLUDED.profit_30d, cogs_30d = EXCLUDED.cogs_30d,
  sales_365d = EXCLUDED.sales_365d, revenue_365d = EXCLUDED.revenue_365d,
  lifetime_sales = EXCLUDED.lifetime_sales, lifetime_revenue = EXCLUDED.lifetime_revenue,
-  -- Update direct metrics
+  -- DIRECT METRICS (only products directly in this category)
  direct_product_count = EXCLUDED.direct_product_count,
  direct_active_product_count = EXCLUDED.direct_active_product_count,
  direct_replenishable_product_count = EXCLUDED.direct_replenishable_product_count,
@@ -262,9 +264,19 @@ BEGIN
  direct_profit_30d = EXCLUDED.direct_profit_30d, direct_cogs_30d = EXCLUDED.direct_cogs_30d,
  direct_sales_365d = EXCLUDED.direct_sales_365d, direct_revenue_365d = EXCLUDED.direct_revenue_365d,
  direct_lifetime_sales = EXCLUDED.direct_lifetime_sales, direct_lifetime_revenue = EXCLUDED.direct_lifetime_revenue,
-  -- Update KPIs
+  -- Calculated KPIs
  avg_margin_30d = EXCLUDED.avg_margin_30d,
-  stock_turn_30d = EXCLUDED.stock_turn_30d;
+  stock_turn_30d = EXCLUDED.stock_turn_30d
+WHERE -- Only update if at least one value has changed
+  category_metrics.product_count IS DISTINCT FROM EXCLUDED.product_count OR
+  category_metrics.active_product_count IS DISTINCT FROM EXCLUDED.active_product_count OR
+  category_metrics.current_stock_units IS DISTINCT FROM EXCLUDED.current_stock_units OR
+  category_metrics.sales_30d IS DISTINCT FROM EXCLUDED.sales_30d OR
+  category_metrics.revenue_30d IS DISTINCT FROM EXCLUDED.revenue_30d OR
+  category_metrics.lifetime_sales IS DISTINCT FROM EXCLUDED.lifetime_sales OR
+  category_metrics.direct_product_count IS DISTINCT FROM EXCLUDED.direct_product_count OR
+  category_metrics.direct_sales_30d IS DISTINCT FROM EXCLUDED.direct_sales_30d;
-- Update calculate_status
INSERT INTO public.calculate_status (module_name, last_calculation_timestamp)
@@ -273,3 +285,25 @@ BEGIN
RAISE NOTICE 'Finished % calculation. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
-- Return metrics about the update operation for tracking
WITH update_stats AS (
SELECT
COUNT(*) as total_categories,
COUNT(*) FILTER (WHERE last_calculated >= NOW() - INTERVAL '5 minutes') as rows_processed,
COUNT(*) FILTER (WHERE category_type = 11) as main_categories, -- 11 = category
COUNT(*) FILTER (WHERE category_type = 12) as subcategories, -- 12 = subcategory
SUM(product_count) as total_products,
SUM(active_product_count) as total_active_products,
SUM(current_stock_units) as total_stock_units
FROM public.category_metrics
)
SELECT
rows_processed,
total_categories,
main_categories,
subcategories,
total_products::int,
total_active_products::int,
total_stock_units::int
FROM update_stats;

View File

@@ -124,7 +124,15 @@ BEGIN
  profit_30d = EXCLUDED.profit_30d, cogs_30d = EXCLUDED.cogs_30d,
  sales_365d = EXCLUDED.sales_365d, revenue_365d = EXCLUDED.revenue_365d,
  lifetime_sales = EXCLUDED.lifetime_sales, lifetime_revenue = EXCLUDED.lifetime_revenue,
-  avg_margin_30d = EXCLUDED.avg_margin_30d;
+  avg_margin_30d = EXCLUDED.avg_margin_30d
+WHERE -- Only update if at least one value has changed
+  vendor_metrics.product_count IS DISTINCT FROM EXCLUDED.product_count OR
+  vendor_metrics.active_product_count IS DISTINCT FROM EXCLUDED.active_product_count OR
+  vendor_metrics.current_stock_units IS DISTINCT FROM EXCLUDED.current_stock_units OR
+  vendor_metrics.on_order_units IS DISTINCT FROM EXCLUDED.on_order_units OR
+  vendor_metrics.sales_30d IS DISTINCT FROM EXCLUDED.sales_30d OR
+  vendor_metrics.revenue_30d IS DISTINCT FROM EXCLUDED.revenue_30d OR
+  vendor_metrics.lifetime_sales IS DISTINCT FROM EXCLUDED.lifetime_sales;
-- Update calculate_status
INSERT INTO public.calculate_status (module_name, last_calculation_timestamp)
@@ -133,3 +141,23 @@ BEGIN
RAISE NOTICE 'Finished % calculation. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
-- Return metrics about the update operation for tracking
WITH update_stats AS (
SELECT
COUNT(*) as total_vendors,
COUNT(*) FILTER (WHERE last_calculated >= NOW() - INTERVAL '5 minutes') as rows_processed,
SUM(product_count) as total_products,
SUM(active_product_count) as total_active_products,
SUM(po_count_365d) as total_pos_365d,
AVG(avg_lead_time_days) as overall_avg_lead_time
FROM public.vendor_metrics
)
SELECT
rows_processed,
total_vendors,
total_products::int,
total_active_products::int,
total_pos_365d::int,
ROUND(overall_avg_lead_time, 1) as overall_avg_lead_time
FROM update_stats;

View File

@@ -202,3 +202,14 @@ BEGIN
RAISE NOTICE 'Finished % processing for multiple dates. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
-- Return the total records processed for tracking
SELECT
COUNT(*) as rows_processed,
COUNT(DISTINCT snapshot_date) as days_processed,
MIN(snapshot_date) as earliest_date,
MAX(snapshot_date) as latest_date,
SUM(units_sold) as total_units_sold,
SUM(units_received) as total_units_received
FROM public.daily_product_snapshots
WHERE calculation_timestamp >= (NOW() - INTERVAL '5 minutes'); -- Recent updates only

View File

@@ -115,3 +115,25 @@ BEGIN
RAISE NOTICE 'Finished % module. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
-- Return metrics about the update operation for tracking
WITH update_stats AS (
SELECT
COUNT(*) as total_products,
COUNT(*) FILTER (WHERE last_calculated >= NOW() - INTERVAL '5 minutes') as rows_processed,
COUNT(*) FILTER (WHERE abc_class = 'A') as abc_a_count,
COUNT(*) FILTER (WHERE abc_class = 'B') as abc_b_count,
COUNT(*) FILTER (WHERE abc_class = 'C') as abc_c_count,
COUNT(*) FILTER (WHERE avg_lead_time_days IS NOT NULL) as products_with_lead_time,
AVG(avg_lead_time_days) as overall_avg_lead_time
FROM public.product_metrics
)
SELECT
rows_processed,
total_products,
abc_a_count,
abc_b_count,
abc_c_count,
products_with_lead_time,
ROUND(overall_avg_lead_time, 1) as overall_avg_lead_time
FROM update_stats;

View File

@@ -735,6 +735,22 @@ BEGIN
  overstocked_units = EXCLUDED.overstocked_units, overstocked_cost = EXCLUDED.overstocked_cost, overstocked_retail = EXCLUDED.overstocked_retail, is_old_stock = EXCLUDED.is_old_stock,
  yesterday_sales = EXCLUDED.yesterday_sales,
  status = EXCLUDED.status
WHERE -- Only update if at least one key metric has changed
product_metrics.current_stock IS DISTINCT FROM EXCLUDED.current_stock OR
product_metrics.current_price IS DISTINCT FROM EXCLUDED.current_price OR
product_metrics.current_cost_price IS DISTINCT FROM EXCLUDED.current_cost_price OR
product_metrics.on_order_qty IS DISTINCT FROM EXCLUDED.on_order_qty OR
product_metrics.sales_7d IS DISTINCT FROM EXCLUDED.sales_7d OR
product_metrics.sales_30d IS DISTINCT FROM EXCLUDED.sales_30d OR
product_metrics.revenue_30d IS DISTINCT FROM EXCLUDED.revenue_30d OR
product_metrics.status IS DISTINCT FROM EXCLUDED.status OR
product_metrics.replenishment_units IS DISTINCT FROM EXCLUDED.replenishment_units OR
product_metrics.stock_cover_in_days IS DISTINCT FROM EXCLUDED.stock_cover_in_days OR
product_metrics.yesterday_sales IS DISTINCT FROM EXCLUDED.yesterday_sales OR
-- Check a few other important fields that might change
product_metrics.date_last_sold IS DISTINCT FROM EXCLUDED.date_last_sold OR
product_metrics.earliest_expected_date IS DISTINCT FROM EXCLUDED.earliest_expected_date OR
product_metrics.lifetime_sales IS DISTINCT FROM EXCLUDED.lifetime_sales
;
-- Update the status table with the timestamp from the START of this run
@@ -745,3 +761,28 @@ BEGIN
RAISE NOTICE 'Finished % module. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
-- Return metrics about the update operation
WITH update_stats AS (
SELECT
COUNT(*) as total_products,
COUNT(*) FILTER (WHERE last_calculated >= NOW() - INTERVAL '5 minutes') as rows_processed,
COUNT(*) FILTER (WHERE status = 'Critical') as critical_count,
COUNT(*) FILTER (WHERE status = 'Reorder Soon') as reorder_soon_count,
COUNT(*) FILTER (WHERE status = 'Healthy') as healthy_count,
COUNT(*) FILTER (WHERE status = 'Overstock') as overstock_count,
COUNT(*) FILTER (WHERE status = 'At Risk') as at_risk_count,
COUNT(*) FILTER (WHERE status = 'New') as new_count
FROM public.product_metrics
)
SELECT
rows_processed,
total_products,
critical_count,
reorder_soon_count,
healthy_count,
overstock_count,
at_risk_count,
new_count,
ROUND((rows_processed::numeric / NULLIF(total_products, 0)) * 100, 2) as update_percentage
FROM update_stats;

View File

@@ -2,13 +2,23 @@ const fs = require('fs');
const path = require('path');
// Helper function to format elapsed time
-function formatElapsedTime(elapsed) {
-  // If elapsed is a timestamp, convert to elapsed milliseconds
-  if (elapsed instanceof Date || elapsed > 1000000000000) {
-    elapsed = Date.now() - elapsed;
-  } else {
-    // If elapsed is in seconds, convert to milliseconds
-    elapsed = elapsed * 1000;
-  }
+function formatElapsedTime(startTime) {
+  let elapsed;
+  // If startTime is a timestamp (number representing milliseconds since epoch)
+  if (typeof startTime === 'number') {
+    // Check if it's a timestamp (will be a large number like 1700000000000)
+    if (startTime > 1000000000) { // timestamps are in milliseconds since 1970
+      elapsed = Date.now() - startTime;
+    } else {
+      // Assume it's already elapsed milliseconds
+      elapsed = startTime;
+    }
+  } else if (startTime instanceof Date) {
+    elapsed = Date.now() - startTime.getTime();
+  } else {
+    // Default to 0 if invalid input
+    elapsed = 0;
+  }
  const seconds = Math.floor(elapsed / 1000);
@@ -16,7 +26,7 @@ function formatElapsedTime(elapsed) {
const hours = Math.floor(minutes / 60);
if (hours > 0) {
-  return `${hours}h ${minutes % 60}m`;
+  return `${hours}h ${minutes % 60}m ${seconds % 60}s`;
} else if (minutes > 0) {
  return `${minutes}m ${seconds % 60}s`;
} else {
@@ -26,16 +36,31 @@ function formatElapsedTime(elapsed) {
// Helper function to estimate remaining time
function estimateRemaining(startTime, current, total) {
-  if (current === 0) return null;
+  // Handle edge cases
+  if (!current || current === 0 || !total || total === 0 || current >= total) {
+    return null;
+  }
+  // Calculate elapsed time in milliseconds
  const elapsed = Date.now() - startTime;
+  if (elapsed <= 0) return null;
+  // Calculate rate (items per millisecond)
  const rate = current / elapsed;
+  if (rate <= 0) return null;
+  // Calculate remaining time in milliseconds
  const remaining = (total - current) / rate;
-  const minutes = Math.floor(remaining / 60000);
-  const seconds = Math.floor((remaining % 60000) / 1000);
-  if (minutes > 0) {
-    return `${minutes}m ${seconds}s`;
+  // Convert to readable format
+  const seconds = Math.floor(remaining / 1000);
+  const minutes = Math.floor(seconds / 60);
+  const hours = Math.floor(minutes / 60);
+  if (hours > 0) {
+    return `${hours}h ${minutes % 60}m`;
+  } else if (minutes > 0) {
+    return `${minutes}m ${seconds % 60}s`;
  } else {
    return `${seconds}s`;
  }
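Both helpers now take the job's start timestamp rather than a pre-computed elapsed value, which is exactly what the call-site changes throughout the import scripts switch to. A small usage sketch, assuming the module is required as in those scripts; the numbers are illustrative:

// Sketch: call the helpers with the Date.now() captured when the job started.
const { formatElapsedTime, estimateRemaining } = require('./utils/progress');

const startTime = Date.now();
// ... after processing `current` of `total` items ...
const current = 250;
const total = 1000;

console.log(formatElapsedTime(startTime));                 // e.g. "1m 5s"
console.log(estimateRemaining(startTime, current, total)); // e.g. "3m 15s", or null at the edges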

View File

@@ -286,7 +286,21 @@ router.post('/full-reset', async (req, res) => {
router.get('/history/import', async (req, res) => {
try {
const pool = req.app.locals.pool;
-const { rows } = await pool.query(`
+// First check which columns exist
+const { rows: columns } = await pool.query(`
+  SELECT column_name
+  FROM information_schema.columns
+  WHERE table_name = 'import_history'
+  AND column_name IN ('records_deleted', 'records_skipped', 'total_processed')
+`);
+const hasDeletedColumn = columns.some(col => col.column_name === 'records_deleted');
+const hasSkippedColumn = columns.some(col => col.column_name === 'records_skipped');
+const hasTotalProcessedColumn = columns.some(col => col.column_name === 'total_processed');
+// Build query dynamically based on available columns
+const query = `
  SELECT
    id,
    start_time,
@@ -294,11 +308,19 @@ router.get('/history/import', async (req, res) => {
    status,
    error_message,
    records_added::integer,
-    records_updated::integer
+    records_updated::integer,
+    ${hasDeletedColumn ? 'records_deleted::integer,' : '0 as records_deleted,'}
+    ${hasSkippedColumn ? 'records_skipped::integer,' : '0 as records_skipped,'}
+    ${hasTotalProcessedColumn ? 'total_processed::integer,' : '0 as total_processed,'}
+    is_incremental,
+    additional_info,
+    EXTRACT(EPOCH FROM (COALESCE(end_time, NOW()) - start_time)) / 60 as duration_minutes
  FROM import_history
  ORDER BY start_time DESC
  LIMIT 20
-`);
+`;
+const { rows } = await pool.query(query);
res.json(rows || []);
} catch (error) {
console.error('Error fetching import history:', error);
@@ -315,7 +337,8 @@ router.get('/history/calculate', async (req, res) => {
  id,
  start_time,
  end_time,
-  duration_minutes,
+  EXTRACT(EPOCH FROM (COALESCE(end_time, NOW()) - start_time)) / 60 as duration_minutes,
+  duration_seconds,
  status,
  error_message,
  total_products,
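Because the import-history endpoint now falls back to literal zeros when the new columns are missing, callers always see the same response shape regardless of whether the database has been migrated. A hedged usage sketch; the mount prefix for this router is not shown in the diff, so the URL is an assumption:

// Sketch: consuming the import-history endpoint. records_deleted, records_skipped and
// total_processed are 0 for databases that predate the new columns.
const res = await fetch('/api/history/import'); // mount prefix is illustrative
const records = await res.json();
for (const r of records) {
  console.log(
    `${r.start_time}: +${r.records_added} ~${r.records_updated} -${r.records_deleted}` +
    ` skipped ${r.records_skipped} (${Math.round(r.duration_minutes)} min)`
  );
}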

View File

@@ -44,17 +44,44 @@ interface HistoryRecord {
interface ImportHistoryRecord extends HistoryRecord {
  records_added: number;
  records_updated: number;
+  records_deleted?: number;
+  records_skipped?: number;
  is_incremental?: boolean;
total_processed?: number;
additional_info?: {
step_timings?: Record<string, number>;
details?: {
categories?: { recordsAdded: number; recordsUpdated: number; skippedCategories?: number };
products?: { totalProcessed: number; needsUpdate: number; skippedUnchanged?: number };
orders?: { totalProcessed: number; totalSkipped: number; missingProducts: number };
purchaseOrders?: { recordsAdded: number; recordsUpdated: number; recordsDeleted: number; skippedProducts: number };
};
[key: string]: any;
};
}
interface CalculateHistoryRecord extends HistoryRecord {
-  total_products: number;
-  total_orders: number;
-  total_purchase_orders: number;
-  processed_products: number;
-  processed_orders: number;
-  processed_purchase_orders: number;
+  total_products?: number;
+  total_orders?: number;
+  total_purchase_orders?: number;
+  processed_products?: number;
+  processed_orders?: number;
+  processed_purchase_orders?: number;
  duration_minutes?: number;
duration_seconds?: number;
additional_info?: {
type?: string;
steps?: string[];
completed_steps?: Array<{
name: string;
duration: number;
status: string;
rowsAffected?: number;
}>;
step_timings?: Record<string, number>;
step_row_counts?: Record<string, number>;
[key: string]: any;
};
}
interface ModuleStatus {
@@ -369,6 +396,82 @@ export function DataManagement() {
const formatJsonData = (data: Record<string, any>) => {
if (!data) return null;
// Special handling for completed_steps
if (data.completed_steps && Array.isArray(data.completed_steps)) {
return (
<div className="space-y-2 mt-2">
<div className="text-sm font-semibold text-gray-700">Completed Steps:</div>
<div className="space-y-1 bg-gray-50 p-2 rounded">
{data.completed_steps.map((step: any, idx: number) => (
<div key={idx} className="flex justify-between text-sm">
<span className="font-medium">{step.name}</span>
<div className="flex gap-4 text-gray-600">
<span>{step.duration}s</span>
{step.rowsAffected !== undefined && (
<span>{formatNumber(step.rowsAffected)} rows</span>
)}
<span className={step.status === 'completed' ? 'text-green-600' : 'text-red-600'}>
{step.status}
</span>
</div>
</div>
))}
</div>
{/* Show other data if present */}
{Object.keys(data).filter(k => k !== 'completed_steps').length > 0 && (
<div className="mt-2">
{formatJsonDataSimple(Object.fromEntries(
Object.entries(data).filter(([k]) => k !== 'completed_steps')
))}
</div>
)}
</div>
);
}
// Special handling for import details
if (data.details) {
return (
<div className="space-y-2 mt-2">
<div className="text-sm font-semibold text-gray-700">Import Details:</div>
<div className="space-y-2 bg-gray-50 p-2 rounded">
{Object.entries(data.details).map(([table, stats]: [string, any]) => (
<div key={table} className="border-b last:border-0 pb-2 last:pb-0">
<div className="font-medium text-sm capitalize mb-1">{table}:</div>
<div className="grid grid-cols-2 gap-x-4 gap-y-1 text-xs">
{Object.entries(stats).map(([key, value]) => (
<div key={key} className="flex justify-between">
<span className="text-gray-600">{key}:</span>
<span className="font-mono">{typeof value === 'number' ? formatNumber(value) : String(value)}</span>
</div>
))}
</div>
</div>
))}
</div>
{/* Show step timings if present */}
{data.step_timings && (
<div className="mt-2">
<div className="text-sm font-semibold text-gray-700">Step Timings:</div>
<div className="space-y-1 bg-gray-50 p-2 rounded text-sm">
{Object.entries(data.step_timings).map(([step, duration]) => (
<div key={step} className="flex justify-between">
<span className="text-gray-600">{step}:</span>
<span className="font-mono">{String(duration)}s</span>
</div>
))}
</div>
</div>
)}
</div>
);
}
// Default simple format
return formatJsonDataSimple(data);
};
const formatJsonDataSimple = (data: Record<string, any>) => {
// Find the longest key length
const maxKeyLength = Object.keys(data).reduce(
  (max, key) => Math.max(max, key.length),
@@ -384,13 +487,13 @@ export function DataManagement() {
style={{ width: `${maxKeyLength + 2}ch` }}
>
  {key}:
</span>
<span className="break-all">
  {typeof value === "object"
    ? JSON.stringify(value)
    : value?.toString()}
</span>
</div>
))}
</div>
);
@@ -1133,12 +1236,33 @@ export function DataManagement() {
: "N/A"} : "N/A"}
</span> </span>
</div> </div>
<div className="flex justify-between text-sm"> <div className="grid grid-cols-2 gap-2 text-sm">
<span className="text-gray-600">Records:</span> <div className="flex justify-between">
<span> <span className="text-gray-600">Added:</span>
{record.records_added} added,{" "} <span className="text-green-600 font-medium">{formatNumber(record.records_added)}</span>
{record.records_updated} updated </div>
</span> <div className="flex justify-between">
<span className="text-gray-600">Updated:</span>
<span className="text-blue-600 font-medium">{formatNumber(record.records_updated)}</span>
</div>
{record.records_deleted !== undefined && (
<div className="flex justify-between">
<span className="text-gray-600">Deleted:</span>
<span className="text-red-600 font-medium">{formatNumber(record.records_deleted)}</span>
</div>
)}
{record.records_skipped !== undefined && (
<div className="flex justify-between">
<span className="text-gray-600">Skipped:</span>
<span className="text-yellow-600 font-medium">{formatNumber(record.records_skipped)}</span>
</div>
)}
{record.total_processed !== undefined && (
<div className="flex justify-between col-span-2">
<span className="text-gray-600">Total Processed:</span>
<span className="font-medium">{formatNumber(record.total_processed)}</span>
</div>
)}
</div> </div>
{record.error_message && ( {record.error_message && (
<div className="text-sm text-red-600 mt-2"> <div className="text-sm text-red-600 mt-2">