Optimize imports, fix up tracking records and time overall

This commit is contained in:
2025-06-19 11:15:04 -04:00
parent a97819f4a6
commit 8606a90e34
16 changed files with 601 additions and 85 deletions

View File

@@ -169,6 +169,9 @@ CREATE TABLE IF NOT EXISTS import_history (
duration_minutes DECIMAL(10,2) GENERATED ALWAYS AS (duration_seconds::decimal / 60.0) STORED,
records_added INTEGER DEFAULT 0,
records_updated INTEGER DEFAULT 0,
records_deleted INTEGER DEFAULT 0,
records_skipped INTEGER DEFAULT 0,
total_processed INTEGER DEFAULT 0,
is_incremental BOOLEAN DEFAULT FALSE,
status calculation_status DEFAULT 'running',
error_message TEXT,
@@ -178,4 +181,16 @@ CREATE TABLE IF NOT EXISTS import_history (
-- Create all indexes after tables are fully created
CREATE INDEX IF NOT EXISTS idx_last_calc ON calculate_status(last_calculation_timestamp);
CREATE INDEX IF NOT EXISTS idx_last_sync ON sync_status(last_sync_timestamp);
CREATE INDEX IF NOT EXISTS idx_table_time ON import_history(table_name, start_time);
CREATE INDEX IF NOT EXISTS idx_table_time ON import_history(table_name, start_time);
CREATE INDEX IF NOT EXISTS idx_import_history_status ON import_history(status);
CREATE INDEX IF NOT EXISTS idx_calculate_history_status ON calculate_history(status);
-- Add comments for documentation
COMMENT ON TABLE import_history IS 'Tracks history of data import operations with detailed statistics';
COMMENT ON COLUMN import_history.records_deleted IS 'Number of records deleted during this import';
COMMENT ON COLUMN import_history.records_skipped IS 'Number of records skipped (e.g., unchanged, invalid)';
COMMENT ON COLUMN import_history.total_processed IS 'Total number of records examined/processed, including skipped';
COMMENT ON TABLE calculate_history IS 'Tracks history of metrics calculation runs with performance data';
COMMENT ON COLUMN calculate_history.duration_seconds IS 'Total duration of the calculation in seconds';
COMMENT ON COLUMN calculate_history.additional_info IS 'JSON object containing step timings, row counts, and other detailed metrics';

View File

@@ -357,7 +357,7 @@ async function syncSettingsProductTable() {
* @param {string} config.historyType - Type identifier for calculate_history.
* @param {string} config.statusModule - Module name for calculate_status.
* @param {object} progress - Progress utility functions.
* @returns {Promise<{success: boolean, message: string, duration: number}>}
* @returns {Promise<{success: boolean, message: string, duration: number, rowsAffected: number}>}
*/
async function executeSqlStep(config, progress) {
if (isCancelled) throw new Error(`Calculation skipped step ${config.name} due to prior cancellation.`);
@@ -366,6 +366,7 @@ async function executeSqlStep(config, progress) {
console.log(`\n--- Starting Step: ${config.name} ---`);
const stepStartTime = Date.now();
let connection = null;
let rowsAffected = 0; // Track rows affected by this step
// Set timeout for this specific step
if (stepTimeoutHandle) clearTimeout(stepTimeoutHandle); // Clear previous step's timeout
@@ -414,7 +415,10 @@ async function executeSqlStep(config, progress) {
current: 0, total: 100,
elapsed: progress.formatElapsedTime(stepStartTime),
remaining: 'Calculating...', rate: 0, percentage: '0',
timing: { start_time: new Date(stepStartTime).toISOString() }
timing: {
start_time: new Date(stepStartTime).toISOString(),
step_start_ms: stepStartTime
}
});
// 5. Execute the Main SQL Query
@@ -423,15 +427,36 @@ async function executeSqlStep(config, progress) {
operation: `Executing SQL: ${config.name}`,
current: 25, total: 100,
elapsed: progress.formatElapsedTime(stepStartTime),
remaining: 'Executing...', rate: 0, percentage: '25',
timing: { start_time: new Date(stepStartTime).toISOString() }
remaining: 'Executing query...', rate: 0, percentage: '25',
timing: {
start_time: new Date(stepStartTime).toISOString(),
step_start_ms: stepStartTime
}
});
console.log(`Executing SQL for ${config.name}...`);
try {
// Try executing exactly as individual scripts do
console.log('Executing SQL with simple query method...');
await connection.query(sqlQuery);
const result = await connection.query(sqlQuery);
// Try to extract row count from result
if (result && result.rowCount !== undefined) {
rowsAffected = result.rowCount;
} else if (Array.isArray(result) && result[0] && result[0].rowCount !== undefined) {
rowsAffected = result[0].rowCount;
}
// Check if the query returned a result set with row count info
if (result && result.rows && result.rows.length > 0 && result.rows[0].rows_processed) {
rowsAffected = parseInt(result.rows[0].rows_processed) || rowsAffected;
console.log(`SQL returned metrics: ${JSON.stringify(result.rows[0])}`);
} else if (Array.isArray(result) && result[0] && result[0].rows && result[0].rows[0] && result[0].rows[0].rows_processed) {
rowsAffected = parseInt(result[0].rows[0].rows_processed) || rowsAffected;
console.log(`SQL returned metrics: ${JSON.stringify(result[0].rows[0])}`);
}
console.log(`SQL affected ${rowsAffected} rows`);
} catch (sqlError) {
if (sqlError.message.includes('could not determine data type of parameter')) {
console.log('Simple query failed with parameter type error, trying alternative method...');
@@ -492,7 +517,8 @@ async function executeSqlStep(config, progress) {
return {
success: true,
message: `${config.name} completed successfully`,
duration: stepDuration
duration: stepDuration,
rowsAffected: rowsAffected
};
} catch (error) {
@@ -664,6 +690,17 @@ async function runAllCalculations() {
combinedHistoryId = historyResult.rows[0].id;
console.log(`Created combined history record ID: ${combinedHistoryId}`);
// Get initial counts for tracking
const productCount = await connection.query('SELECT COUNT(*) as count FROM products');
const totalProducts = parseInt(productCount.rows[0].count);
// Update history with initial counts
await connection.query(`
UPDATE calculate_history
SET additional_info = additional_info || jsonb_build_object('total_products', $1::integer)
WHERE id = $2
`, [totalProducts, combinedHistoryId]);
connection.release();
} catch (historyError) {
console.error('Error creating combined history record:', historyError);
@@ -692,28 +729,49 @@ async function runAllCalculations() {
// Track completed steps
const completedSteps = [];
const stepTimings = {};
const stepRowCounts = {};
let currentStepIndex = 0;
// Now run the calculation steps
for (const step of steps) {
if (step.run) {
if (isCancelled) {
console.log(`Skipping step "${step.name}" due to cancellation.`);
overallSuccess = false; // Mark as not fully successful if steps are skipped due to cancel
continue; // Skip to next step
for (const step of stepsToRun) {
if (isCancelled) {
console.log(`Skipping step "${step.name}" due to cancellation.`);
overallSuccess = false; // Mark as not fully successful if steps are skipped due to cancel
continue; // Skip to next step
}
currentStepIndex++;
// Update overall progress
progressUtils.outputProgress({
status: 'running',
operation: 'Running calculations',
message: `Step ${currentStepIndex} of ${stepsToRun.length}: ${step.name}`,
current: currentStepIndex - 1,
total: stepsToRun.length,
elapsed: progressUtils.formatElapsedTime(overallStartTime),
remaining: progressUtils.estimateRemaining(overallStartTime, currentStepIndex - 1, stepsToRun.length),
percentage: Math.round(((currentStepIndex - 1) / stepsToRun.length) * 100).toString(),
timing: {
overall_start_time: new Date(overallStartTime).toISOString(),
current_step: step.name,
completed_steps: completedSteps.length
}
// Pass the progress utilities to the step executor
const result = await executeSqlStep(step, progressUtils);
if (result.success) {
completedSteps.push({
name: step.name,
duration: result.duration,
status: 'completed'
});
}
} else {
console.log(`Skipping step "${step.name}" (disabled by configuration).`);
});
// Pass the progress utilities to the step executor
const result = await executeSqlStep(step, progressUtils);
if (result.success) {
completedSteps.push({
name: step.name,
duration: result.duration,
status: 'completed',
rowsAffected: result.rowsAffected
});
stepTimings[step.name] = result.duration;
stepRowCounts[step.name] = result.rowsAffected;
}
}
@@ -726,18 +784,32 @@ async function runAllCalculations() {
connection = await getConnection();
const totalDuration = Math.round((Date.now() - overallStartTime) / 1000);
// Get final processed counts
const processedCounts = await connection.query(`
SELECT
(SELECT COUNT(*) FROM product_metrics WHERE last_calculated >= $1) as processed_products
`, [new Date(overallStartTime)]);
await connection.query(`
UPDATE calculate_history
SET
end_time = NOW(),
duration_seconds = $1::integer,
status = $2::calculation_status,
additional_info = additional_info || jsonb_build_object('completed_steps', $3::jsonb)
WHERE id = $4::integer;
additional_info = additional_info || jsonb_build_object(
'processed_products', $3::integer,
'completed_steps', $4::jsonb,
'step_timings', $5::jsonb,
'step_row_counts', $6::jsonb
)
WHERE id = $7::integer;
`, [
totalDuration,
isCancelled ? 'cancelled' : 'completed',
JSON.stringify(completedSteps),
isCancelled ? 'cancelled' : 'completed',
processedCounts.rows[0].processed_products,
JSON.stringify(completedSteps),
JSON.stringify(stepTimings),
JSON.stringify(stepRowCounts),
combinedHistoryId
]);
@@ -753,6 +825,26 @@ async function runAllCalculations() {
overallSuccess = false;
} else {
console.log("\n--- All enabled calculations finished successfully ---");
// Send final completion progress
progressUtils.outputProgress({
status: 'complete',
operation: 'All calculations completed',
message: `Successfully completed ${completedSteps.length} of ${stepsToRun.length} steps`,
current: stepsToRun.length,
total: stepsToRun.length,
elapsed: progressUtils.formatElapsedTime(overallStartTime),
remaining: '0s',
percentage: '100',
timing: {
overall_start_time: new Date(overallStartTime).toISOString(),
overall_end_time: new Date().toISOString(),
total_duration_seconds: Math.round((Date.now() - overallStartTime) / 1000),
step_timings: stepTimings,
completed_steps: completedSteps.length
}
});
progressUtils.clearProgress(); // Clear progress only on full success
}

View File

@@ -151,10 +151,16 @@ async function main() {
let totalRecordsAdded = 0;
let totalRecordsUpdated = 0;
let totalRecordsDeleted = 0; // Add tracking for deleted records
let totalRecordsSkipped = 0; // Track skipped/filtered records
const stepTimings = {};
// Run each import based on constants
if (IMPORT_CATEGORIES) {
const stepStart = Date.now();
results.categories = await importCategories(prodConnection, localConnection);
stepTimings.categories = Math.round((Date.now() - stepStart) / 1000);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
console.log('Categories import result:', results.categories);
@@ -163,26 +169,37 @@ async function main() {
}
if (IMPORT_PRODUCTS) {
const stepStart = Date.now();
results.products = await importProducts(prodConnection, localConnection, INCREMENTAL_UPDATE);
stepTimings.products = Math.round((Date.now() - stepStart) / 1000);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
console.log('Products import result:', results.products);
totalRecordsAdded += parseInt(results.products?.recordsAdded || 0);
totalRecordsUpdated += parseInt(results.products?.recordsUpdated || 0);
totalRecordsSkipped += parseInt(results.products?.skippedUnchanged || 0);
}
if (IMPORT_ORDERS) {
const stepStart = Date.now();
results.orders = await importOrders(prodConnection, localConnection, INCREMENTAL_UPDATE);
stepTimings.orders = Math.round((Date.now() - stepStart) / 1000);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
console.log('Orders import result:', results.orders);
totalRecordsAdded += parseInt(results.orders?.recordsAdded || 0);
totalRecordsUpdated += parseInt(results.orders?.recordsUpdated || 0);
totalRecordsSkipped += parseInt(results.orders?.totalSkipped || 0);
}
if (IMPORT_PURCHASE_ORDERS) {
try {
const stepStart = Date.now();
results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection, INCREMENTAL_UPDATE);
stepTimings.purchaseOrders = Math.round((Date.now() - stepStart) / 1000);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
console.log('Purchase orders import result:', results.purchaseOrders);
@@ -193,6 +210,7 @@ async function main() {
} else {
totalRecordsAdded += parseInt(results.purchaseOrders?.recordsAdded || 0);
totalRecordsUpdated += parseInt(results.purchaseOrders?.recordsUpdated || 0);
totalRecordsDeleted += parseInt(results.purchaseOrders?.recordsDeleted || 0);
}
} catch (error) {
console.error('Error during purchase orders import:', error);
@@ -226,9 +244,12 @@ async function main() {
'categories_result', COALESCE($8::jsonb, 'null'::jsonb),
'products_result', COALESCE($9::jsonb, 'null'::jsonb),
'orders_result', COALESCE($10::jsonb, 'null'::jsonb),
'purchase_orders_result', COALESCE($11::jsonb, 'null'::jsonb)
'purchase_orders_result', COALESCE($11::jsonb, 'null'::jsonb),
'total_deleted', $12::integer,
'total_skipped', $13::integer,
'step_timings', $14::jsonb
)
WHERE id = $12
WHERE id = $15
`, [
totalElapsedSeconds,
parseInt(totalRecordsAdded),
@@ -241,6 +262,9 @@ async function main() {
JSON.stringify(results.products),
JSON.stringify(results.orders),
JSON.stringify(results.purchaseOrders),
totalRecordsDeleted,
totalRecordsSkipped,
JSON.stringify(stepTimings),
importHistoryId
]);

View File

@@ -92,6 +92,12 @@ async function importCategories(prodConnection, localConnection) {
description = EXCLUDED.description,
status = EXCLUDED.status,
updated_at = EXCLUDED.updated_at
WHERE -- Only update if at least one field has changed
categories.name IS DISTINCT FROM EXCLUDED.name OR
categories.type IS DISTINCT FROM EXCLUDED.type OR
categories.parent_id IS DISTINCT FROM EXCLUDED.parent_id OR
categories.description IS DISTINCT FROM EXCLUDED.description OR
categories.status IS DISTINCT FROM EXCLUDED.status
RETURNING
cat_id,
CASE
@@ -133,7 +139,7 @@ async function importCategories(prodConnection, localConnection) {
message: `Imported ${inserted} (updated ${updated}) categories of type ${type}`,
current: totalInserted + totalUpdated,
total: categories.length,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
elapsed: formatElapsedTime(startTime),
});
} catch (error) {
// Rollback to the savepoint for this type
@@ -161,7 +167,7 @@ async function importCategories(prodConnection, localConnection) {
operation: "Categories import completed",
current: totalInserted + totalUpdated,
total: totalInserted + totalUpdated,
duration: formatElapsedTime((Date.now() - startTime) / 1000),
duration: formatElapsedTime(startTime),
warnings: skippedCategories.length > 0 ? {
message: "Some categories were skipped due to missing parents",
skippedCategories

View File

@@ -221,8 +221,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
message: `Loading order items: ${processedCount} of ${totalOrderItems}`,
current: processedCount,
total: totalOrderItems,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, processedCount, totalOrderItems),
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalOrderItems),
rate: calculateRate(startTime, processedCount)
});
} catch (error) {
@@ -530,8 +530,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
message: `Loading order data: ${processedCount} of ${totalUniqueOrders}`,
current: processedCount,
total: totalUniqueOrders,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, processedCount, totalUniqueOrders),
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalUniqueOrders),
rate: calculateRate(startTime, processedCount)
});
}
@@ -681,6 +681,15 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
status = EXCLUDED.status,
canceled = EXCLUDED.canceled,
costeach = EXCLUDED.costeach
WHERE -- Only update if at least one key field has changed
orders.price IS DISTINCT FROM EXCLUDED.price OR
orders.quantity IS DISTINCT FROM EXCLUDED.quantity OR
orders.discount IS DISTINCT FROM EXCLUDED.discount OR
orders.tax IS DISTINCT FROM EXCLUDED.tax OR
orders.status IS DISTINCT FROM EXCLUDED.status OR
orders.canceled IS DISTINCT FROM EXCLUDED.canceled OR
orders.costeach IS DISTINCT FROM EXCLUDED.costeach OR
orders.date IS DISTINCT FROM EXCLUDED.date
RETURNING xmax = 0 as inserted
)
SELECT
@@ -704,7 +713,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
message: `Importing orders: ${cumulativeProcessedOrders} of ${totalUniqueOrders}`,
current: cumulativeProcessedOrders,
total: totalUniqueOrders,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, cumulativeProcessedOrders, totalUniqueOrders),
rate: calculateRate(startTime, cumulativeProcessedOrders)
});
@@ -751,8 +760,15 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
recordsUpdated: parseInt(recordsUpdated) || 0,
totalSkipped: skippedOrders.size || 0,
missingProducts: missingProducts.size || 0,
totalProcessed: orderItems.length, // Total order items in source
incrementalUpdate,
lastSyncTime
lastSyncTime,
details: {
uniqueOrdersProcessed: cumulativeProcessedOrders,
totalOrderItems: orderItems.length,
skippedDueToMissingProducts: skippedOrders.size,
missingProductIds: Array.from(missingProducts).slice(0, 100) // First 100 for debugging
}
};
} catch (error) {
console.error("Error during orders import:", error);

View File

@@ -576,8 +576,8 @@ async function materializeCalculations(prodConnection, localConnection, incremen
message: `Imported ${i + batch.length} of ${prodData.length} products`,
current: i + batch.length,
total: prodData.length,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, i + batch.length, prodData.length),
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, i + batch.length, prodData.length),
rate: calculateRate(startTime, i + batch.length)
});
}
@@ -587,6 +587,59 @@ async function materializeCalculations(prodConnection, localConnection, incremen
operation: "Products import",
message: "Finished materializing calculations"
});
// Add step to identify which products actually need updating
outputProgress({
status: "running",
operation: "Products import",
message: "Identifying changed products"
});
// Mark products that haven't changed as needs_update = false
await localConnection.query(`
UPDATE temp_products t
SET needs_update = FALSE
FROM products p
WHERE t.pid = p.pid
AND t.title IS NOT DISTINCT FROM p.title
AND t.description IS NOT DISTINCT FROM p.description
AND t.sku IS NOT DISTINCT FROM p.sku
AND t.stock_quantity = p.stock_quantity
AND t.price = p.price
AND t.regular_price = p.regular_price
AND t.cost_price IS NOT DISTINCT FROM p.cost_price
AND t.vendor IS NOT DISTINCT FROM p.vendor
AND t.brand IS NOT DISTINCT FROM p.brand
AND t.visible = p.visible
AND t.replenishable = p.replenishable
AND t.barcode IS NOT DISTINCT FROM p.barcode
AND t.updated_at IS NOT DISTINCT FROM p.updated_at
AND t.total_sold IS NOT DISTINCT FROM p.total_sold
-- Check key fields that are likely to change
-- We don't need to check every single field, just the important ones
`);
// Get count of products that need updating
const [countResult] = await localConnection.query(`
SELECT
COUNT(*) FILTER (WHERE needs_update = true) as update_count,
COUNT(*) FILTER (WHERE needs_update = false) as skip_count,
COUNT(*) as total_count
FROM temp_products
`);
outputProgress({
status: "running",
operation: "Products import",
message: `Found ${countResult.rows[0].update_count} products that need updating, ${countResult.rows[0].skip_count} unchanged`
});
// Return the total products processed
return {
totalProcessed: prodData.length,
needsUpdate: parseInt(countResult.rows[0].update_count),
skipped: parseInt(countResult.rows[0].skip_count)
};
}
async function importProducts(prodConnection, localConnection, incrementalUpdate = true) {
@@ -612,7 +665,7 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
await setupTemporaryTables(localConnection);
// Materialize calculations into temp table
await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime, startTime);
const materializeResult = await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime, startTime);
// Get the list of products that need updating
const [products] = await localConnection.query(`
@@ -847,8 +900,8 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
message: `Processing products: ${i + batch.length} of ${products.rows.length}`,
current: i + batch.length,
total: products.rows.length,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, i + batch.length, products.rows.length),
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, i + batch.length, products.rows.length),
rate: calculateRate(startTime, i + batch.length)
});
}
@@ -872,7 +925,10 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
recordsAdded,
recordsUpdated,
totalRecords: products.rows.length,
duration: formatElapsedTime(Date.now() - startTime)
totalProcessed: materializeResult.totalProcessed,
duration: formatElapsedTime(startTime),
needsUpdate: materializeResult.needsUpdate,
skippedUnchanged: materializeResult.skipped
};
} catch (error) {
// Rollback on error

View File

@@ -398,7 +398,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
message: `Processed ${offset} of ${totalPOs} purchase orders (${totalProcessed} line items)`,
current: offset,
total: totalPOs,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, offset, totalPOs),
rate: calculateRate(startTime, offset)
});
@@ -605,7 +605,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
message: `Processed ${offset} of ${totalReceivings} receivings (${totalProcessed} line items total)`,
current: offset,
total: totalReceivings,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, offset, totalReceivings),
rate: calculateRate(startTime, offset)
});
@@ -730,6 +730,13 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
date_created = EXCLUDED.date_created,
date_ordered = EXCLUDED.date_ordered,
updated = CURRENT_TIMESTAMP
WHERE -- Only update if at least one key field has changed
purchase_orders.ordered IS DISTINCT FROM EXCLUDED.ordered OR
purchase_orders.po_cost_price IS DISTINCT FROM EXCLUDED.po_cost_price OR
purchase_orders.status IS DISTINCT FROM EXCLUDED.status OR
purchase_orders.expected_date IS DISTINCT FROM EXCLUDED.expected_date OR
purchase_orders.date IS DISTINCT FROM EXCLUDED.date OR
purchase_orders.vendor IS DISTINCT FROM EXCLUDED.vendor
RETURNING (xmax = 0) as inserted
`);
@@ -806,6 +813,12 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
supplier_id = EXCLUDED.supplier_id,
status = EXCLUDED.status,
updated = CURRENT_TIMESTAMP
WHERE -- Only update if at least one key field has changed
receivings.qty_each IS DISTINCT FROM EXCLUDED.qty_each OR
receivings.cost_each IS DISTINCT FROM EXCLUDED.cost_each OR
receivings.status IS DISTINCT FROM EXCLUDED.status OR
receivings.received_date IS DISTINCT FROM EXCLUDED.received_date OR
receivings.received_by IS DISTINCT FROM EXCLUDED.received_by
RETURNING (xmax = 0) as inserted
`);

View File

@@ -110,4 +110,26 @@ BEGIN
ON CONFLICT (module_name) DO UPDATE SET last_calculation_timestamp = _start_time;
RAISE NOTICE 'Finished % calculation. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
END $$;
-- Return metrics about the update operation for tracking
WITH update_stats AS (
SELECT
COUNT(*) as total_brands,
COUNT(*) FILTER (WHERE last_calculated >= NOW() - INTERVAL '5 minutes') as rows_processed,
SUM(product_count) as total_products,
SUM(active_product_count) as total_active_products,
SUM(sales_30d) as total_sales_30d,
SUM(revenue_30d) as total_revenue_30d,
AVG(avg_margin_30d) as overall_avg_margin_30d
FROM public.brand_metrics
)
SELECT
rows_processed,
total_brands,
total_products::int,
total_active_products::int,
total_sales_30d::int,
ROUND(total_revenue_30d, 2) as total_revenue_30d,
ROUND(overall_avg_margin_30d, 2) as overall_avg_margin_30d
FROM update_stats;

View File

@@ -284,4 +284,26 @@ BEGIN
ON CONFLICT (module_name) DO UPDATE SET last_calculation_timestamp = _start_time;
RAISE NOTICE 'Finished % calculation. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
END $$;
-- Return metrics about the update operation for tracking
WITH update_stats AS (
SELECT
COUNT(*) as total_categories,
COUNT(*) FILTER (WHERE last_calculated >= NOW() - INTERVAL '5 minutes') as rows_processed,
COUNT(*) FILTER (WHERE category_type = 11) as main_categories, -- 11 = category
COUNT(*) FILTER (WHERE category_type = 12) as subcategories, -- 12 = subcategory
SUM(product_count) as total_products,
SUM(active_product_count) as total_active_products,
SUM(current_stock_units) as total_stock_units
FROM public.category_metrics
)
SELECT
rows_processed,
total_categories,
main_categories,
subcategories,
total_products::int,
total_active_products::int,
total_stock_units::int
FROM update_stats;

View File

@@ -140,4 +140,24 @@ BEGIN
ON CONFLICT (module_name) DO UPDATE SET last_calculation_timestamp = _start_time;
RAISE NOTICE 'Finished % calculation. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
END $$;
-- Return metrics about the update operation for tracking
WITH update_stats AS (
SELECT
COUNT(*) as total_vendors,
COUNT(*) FILTER (WHERE last_calculated >= NOW() - INTERVAL '5 minutes') as rows_processed,
SUM(product_count) as total_products,
SUM(active_product_count) as total_active_products,
SUM(po_count_365d) as total_pos_365d,
AVG(avg_lead_time_days) as overall_avg_lead_time
FROM public.vendor_metrics
)
SELECT
rows_processed,
total_vendors,
total_products::int,
total_active_products::int,
total_pos_365d::int,
ROUND(overall_avg_lead_time, 1) as overall_avg_lead_time
FROM update_stats;

View File

@@ -201,4 +201,15 @@ BEGIN
RAISE NOTICE 'Finished % processing for multiple dates. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
END $$;
-- Return the total records processed for tracking
SELECT
COUNT(*) as rows_processed,
COUNT(DISTINCT snapshot_date) as days_processed,
MIN(snapshot_date) as earliest_date,
MAX(snapshot_date) as latest_date,
SUM(units_sold) as total_units_sold,
SUM(units_received) as total_units_received
FROM public.daily_product_snapshots
WHERE calculation_timestamp >= (NOW() - INTERVAL '5 minutes'); -- Recent updates only

View File

@@ -114,4 +114,26 @@ BEGIN
RAISE NOTICE 'Finished % module. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
END $$;
-- Return metrics about the update operation for tracking
WITH update_stats AS (
SELECT
COUNT(*) as total_products,
COUNT(*) FILTER (WHERE last_calculated >= NOW() - INTERVAL '5 minutes') as rows_processed,
COUNT(*) FILTER (WHERE abc_class = 'A') as abc_a_count,
COUNT(*) FILTER (WHERE abc_class = 'B') as abc_b_count,
COUNT(*) FILTER (WHERE abc_class = 'C') as abc_c_count,
COUNT(*) FILTER (WHERE avg_lead_time_days IS NOT NULL) as products_with_lead_time,
AVG(avg_lead_time_days) as overall_avg_lead_time
FROM public.product_metrics
)
SELECT
rows_processed,
total_products,
abc_a_count,
abc_b_count,
abc_c_count,
products_with_lead_time,
ROUND(overall_avg_lead_time, 1) as overall_avg_lead_time
FROM update_stats;

View File

@@ -760,4 +760,29 @@ BEGIN
RAISE NOTICE 'Finished % module. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
END $$;
-- Return metrics about the update operation
WITH update_stats AS (
SELECT
COUNT(*) as total_products,
COUNT(*) FILTER (WHERE last_calculated >= NOW() - INTERVAL '5 minutes') as rows_processed,
COUNT(*) FILTER (WHERE status = 'Critical') as critical_count,
COUNT(*) FILTER (WHERE status = 'Reorder Soon') as reorder_soon_count,
COUNT(*) FILTER (WHERE status = 'Healthy') as healthy_count,
COUNT(*) FILTER (WHERE status = 'Overstock') as overstock_count,
COUNT(*) FILTER (WHERE status = 'At Risk') as at_risk_count,
COUNT(*) FILTER (WHERE status = 'New') as new_count
FROM public.product_metrics
)
SELECT
rows_processed,
total_products,
critical_count,
reorder_soon_count,
healthy_count,
overstock_count,
at_risk_count,
new_count,
ROUND((rows_processed::numeric / NULLIF(total_products, 0)) * 100, 2) as update_percentage
FROM update_stats;

View File

@@ -2,13 +2,23 @@ const fs = require('fs');
const path = require('path');
// Helper function to format elapsed time
function formatElapsedTime(elapsed) {
// If elapsed is a timestamp, convert to elapsed milliseconds
if (elapsed instanceof Date || elapsed > 1000000000000) {
elapsed = Date.now() - elapsed;
function formatElapsedTime(startTime) {
let elapsed;
// If startTime is a timestamp (number representing milliseconds since epoch)
if (typeof startTime === 'number') {
// Check if it's a timestamp (will be a large number like 1700000000000)
if (startTime > 1000000000) { // timestamps are in milliseconds since 1970
elapsed = Date.now() - startTime;
} else {
// Assume it's already elapsed milliseconds
elapsed = startTime;
}
} else if (startTime instanceof Date) {
elapsed = Date.now() - startTime.getTime();
} else {
// If elapsed is in seconds, convert to milliseconds
elapsed = elapsed * 1000;
// Default to 0 if invalid input
elapsed = 0;
}
const seconds = Math.floor(elapsed / 1000);
@@ -16,7 +26,7 @@ function formatElapsedTime(elapsed) {
const hours = Math.floor(minutes / 60);
if (hours > 0) {
return `${hours}h ${minutes % 60}m`;
return `${hours}h ${minutes % 60}m ${seconds % 60}s`;
} else if (minutes > 0) {
return `${minutes}m ${seconds % 60}s`;
} else {
@@ -26,16 +36,31 @@ function formatElapsedTime(elapsed) {
// Helper function to estimate remaining time
function estimateRemaining(startTime, current, total) {
if (current === 0) return null;
// Handle edge cases
if (!current || current === 0 || !total || total === 0 || current >= total) {
return null;
}
// Calculate elapsed time in milliseconds
const elapsed = Date.now() - startTime;
if (elapsed <= 0) return null;
// Calculate rate (items per millisecond)
const rate = current / elapsed;
if (rate <= 0) return null;
// Calculate remaining time in milliseconds
const remaining = (total - current) / rate;
const minutes = Math.floor(remaining / 60000);
const seconds = Math.floor((remaining % 60000) / 1000);
// Convert to readable format
const seconds = Math.floor(remaining / 1000);
const minutes = Math.floor(seconds / 60);
const hours = Math.floor(minutes / 60);
if (minutes > 0) {
return `${minutes}m ${seconds}s`;
if (hours > 0) {
return `${hours}h ${minutes % 60}m`;
} else if (minutes > 0) {
return `${minutes}m ${seconds % 60}s`;
} else {
return `${seconds}s`;
}

View File

@@ -286,7 +286,21 @@ router.post('/full-reset', async (req, res) => {
router.get('/history/import', async (req, res) => {
try {
const pool = req.app.locals.pool;
const { rows } = await pool.query(`
// First check which columns exist
const { rows: columns } = await pool.query(`
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'import_history'
AND column_name IN ('records_deleted', 'records_skipped', 'total_processed')
`);
const hasDeletedColumn = columns.some(col => col.column_name === 'records_deleted');
const hasSkippedColumn = columns.some(col => col.column_name === 'records_skipped');
const hasTotalProcessedColumn = columns.some(col => col.column_name === 'total_processed');
// Build query dynamically based on available columns
const query = `
SELECT
id,
start_time,
@@ -294,11 +308,19 @@ router.get('/history/import', async (req, res) => {
status,
error_message,
records_added::integer,
records_updated::integer
records_updated::integer,
${hasDeletedColumn ? 'records_deleted::integer,' : '0 as records_deleted,'}
${hasSkippedColumn ? 'records_skipped::integer,' : '0 as records_skipped,'}
${hasTotalProcessedColumn ? 'total_processed::integer,' : '0 as total_processed,'}
is_incremental,
additional_info,
EXTRACT(EPOCH FROM (COALESCE(end_time, NOW()) - start_time)) / 60 as duration_minutes
FROM import_history
ORDER BY start_time DESC
LIMIT 20
`);
`;
const { rows } = await pool.query(query);
res.json(rows || []);
} catch (error) {
console.error('Error fetching import history:', error);
@@ -315,7 +337,8 @@ router.get('/history/calculate', async (req, res) => {
id,
start_time,
end_time,
duration_minutes,
EXTRACT(EPOCH FROM (COALESCE(end_time, NOW()) - start_time)) / 60 as duration_minutes,
duration_seconds,
status,
error_message,
total_products,