2 Commits

19 changed files with 660 additions and 136 deletions

View File

@@ -169,6 +169,9 @@ CREATE TABLE IF NOT EXISTS import_history (
duration_minutes DECIMAL(10,2) GENERATED ALWAYS AS (duration_seconds::decimal / 60.0) STORED,
records_added INTEGER DEFAULT 0,
records_updated INTEGER DEFAULT 0,
records_deleted INTEGER DEFAULT 0,
records_skipped INTEGER DEFAULT 0,
total_processed INTEGER DEFAULT 0,
is_incremental BOOLEAN DEFAULT FALSE,
status calculation_status DEFAULT 'running',
error_message TEXT,
@@ -179,3 +182,15 @@ CREATE TABLE IF NOT EXISTS import_history (
CREATE INDEX IF NOT EXISTS idx_last_calc ON calculate_status(last_calculation_timestamp);
CREATE INDEX IF NOT EXISTS idx_last_sync ON sync_status(last_sync_timestamp);
CREATE INDEX IF NOT EXISTS idx_table_time ON import_history(table_name, start_time);
CREATE INDEX IF NOT EXISTS idx_import_history_status ON import_history(status);
CREATE INDEX IF NOT EXISTS idx_calculate_history_status ON calculate_history(status);
-- Add comments for documentation
COMMENT ON TABLE import_history IS 'Tracks history of data import operations with detailed statistics';
COMMENT ON COLUMN import_history.records_deleted IS 'Number of records deleted during this import';
COMMENT ON COLUMN import_history.records_skipped IS 'Number of records skipped (e.g., unchanged, invalid)';
COMMENT ON COLUMN import_history.total_processed IS 'Total number of records examined/processed, including skipped';
COMMENT ON TABLE calculate_history IS 'Tracks history of metrics calculation runs with performance data';
COMMENT ON COLUMN calculate_history.duration_seconds IS 'Total duration of the calculation in seconds';
COMMENT ON COLUMN calculate_history.additional_info IS 'JSON object containing step timings, row counts, and other detailed metrics';

View File

@@ -1,4 +1,4 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics-new/utils/progress');
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../scripts/metrics-new/utils/progress');
const fs = require('fs');
const path = require('path');
const { pipeline } = require('stream');

View File

@@ -24,7 +24,7 @@ process.on('unhandledRejection', (reason, promise) => {
});
// Load progress module
const progress = require('../utils/progress');
const progress = require('../scripts/metrics-new/utils/progress');
// Store progress functions in global scope to ensure availability
global.formatElapsedTime = progress.formatElapsedTime;
@@ -36,7 +36,7 @@ global.getProgress = progress.getProgress;
global.logError = progress.logError;
// Load database module
const { getConnection, closePool } = require('../utils/db');
const { getConnection, closePool } = require('../scripts/metrics-new/utils/db');
// Add cancel handler
let isCancelled = false;

View File

@@ -357,7 +357,7 @@ async function syncSettingsProductTable() {
* @param {string} config.historyType - Type identifier for calculate_history.
* @param {string} config.statusModule - Module name for calculate_status.
* @param {object} progress - Progress utility functions.
* @returns {Promise<{success: boolean, message: string, duration: number}>}
* @returns {Promise<{success: boolean, message: string, duration: number, rowsAffected: number}>}
*/
async function executeSqlStep(config, progress) {
if (isCancelled) throw new Error(`Calculation skipped step ${config.name} due to prior cancellation.`);
@@ -366,6 +366,7 @@ async function executeSqlStep(config, progress) {
console.log(`\n--- Starting Step: ${config.name} ---`);
const stepStartTime = Date.now();
let connection = null;
let rowsAffected = 0; // Track rows affected by this step
// Set timeout for this specific step
if (stepTimeoutHandle) clearTimeout(stepTimeoutHandle); // Clear previous step's timeout
@@ -414,7 +415,10 @@ async function executeSqlStep(config, progress) {
current: 0, total: 100,
elapsed: progress.formatElapsedTime(stepStartTime),
remaining: 'Calculating...', rate: 0, percentage: '0',
timing: { start_time: new Date(stepStartTime).toISOString() }
timing: {
start_time: new Date(stepStartTime).toISOString(),
step_start_ms: stepStartTime
}
});
// 5. Execute the Main SQL Query
@@ -423,15 +427,36 @@ async function executeSqlStep(config, progress) {
operation: `Executing SQL: ${config.name}`,
current: 25, total: 100,
elapsed: progress.formatElapsedTime(stepStartTime),
remaining: 'Executing...', rate: 0, percentage: '25',
timing: { start_time: new Date(stepStartTime).toISOString() }
remaining: 'Executing query...', rate: 0, percentage: '25',
timing: {
start_time: new Date(stepStartTime).toISOString(),
step_start_ms: stepStartTime
}
});
console.log(`Executing SQL for ${config.name}...`);
try {
// Try executing exactly as individual scripts do
console.log('Executing SQL with simple query method...');
await connection.query(sqlQuery);
const result = await connection.query(sqlQuery);
// Try to extract row count from result
if (result && result.rowCount !== undefined) {
rowsAffected = result.rowCount;
} else if (Array.isArray(result) && result[0] && result[0].rowCount !== undefined) {
rowsAffected = result[0].rowCount;
}
// Check if the query returned a result set with row count info
if (result && result.rows && result.rows.length > 0 && result.rows[0].rows_processed) {
rowsAffected = parseInt(result.rows[0].rows_processed) || rowsAffected;
console.log(`SQL returned metrics: ${JSON.stringify(result.rows[0])}`);
} else if (Array.isArray(result) && result[0] && result[0].rows && result[0].rows[0] && result[0].rows[0].rows_processed) {
rowsAffected = parseInt(result[0].rows[0].rows_processed) || rowsAffected;
console.log(`SQL returned metrics: ${JSON.stringify(result[0].rows[0])}`);
}
console.log(`SQL affected ${rowsAffected} rows`);
} catch (sqlError) {
if (sqlError.message.includes('could not determine data type of parameter')) {
console.log('Simple query failed with parameter type error, trying alternative method...');
@@ -492,7 +517,8 @@ async function executeSqlStep(config, progress) {
return {
success: true,
message: `${config.name} completed successfully`,
duration: stepDuration
duration: stepDuration,
rowsAffected: rowsAffected
};
} catch (error) {
@@ -664,6 +690,17 @@ async function runAllCalculations() {
combinedHistoryId = historyResult.rows[0].id;
console.log(`Created combined history record ID: ${combinedHistoryId}`);
// Get initial counts for tracking
const productCount = await connection.query('SELECT COUNT(*) as count FROM products');
const totalProducts = parseInt(productCount.rows[0].count);
// Update history with initial counts
await connection.query(`
UPDATE calculate_history
SET additional_info = additional_info || jsonb_build_object('total_products', $1::integer)
WHERE id = $2
`, [totalProducts, combinedHistoryId]);
connection.release();
} catch (historyError) {
console.error('Error creating combined history record:', historyError);
@@ -692,28 +729,49 @@ async function runAllCalculations() {
// Track completed steps
const completedSteps = [];
const stepTimings = {};
const stepRowCounts = {};
let currentStepIndex = 0;
// Now run the calculation steps
for (const step of steps) {
if (step.run) {
if (isCancelled) {
console.log(`Skipping step "${step.name}" due to cancellation.`);
overallSuccess = false; // Mark as not fully successful if steps are skipped due to cancel
continue; // Skip to next step
}
for (const step of stepsToRun) {
if (isCancelled) {
console.log(`Skipping step "${step.name}" due to cancellation.`);
overallSuccess = false; // Mark as not fully successful if steps are skipped due to cancel
continue; // Skip to next step
}
// Pass the progress utilities to the step executor
const result = await executeSqlStep(step, progressUtils);
currentStepIndex++;
if (result.success) {
completedSteps.push({
name: step.name,
duration: result.duration,
status: 'completed'
});
// Update overall progress
progressUtils.outputProgress({
status: 'running',
operation: 'Running calculations',
message: `Step ${currentStepIndex} of ${stepsToRun.length}: ${step.name}`,
current: currentStepIndex - 1,
total: stepsToRun.length,
elapsed: progressUtils.formatElapsedTime(overallStartTime),
remaining: progressUtils.estimateRemaining(overallStartTime, currentStepIndex - 1, stepsToRun.length),
percentage: Math.round(((currentStepIndex - 1) / stepsToRun.length) * 100).toString(),
timing: {
overall_start_time: new Date(overallStartTime).toISOString(),
current_step: step.name,
completed_steps: completedSteps.length
}
} else {
console.log(`Skipping step "${step.name}" (disabled by configuration).`);
});
// Pass the progress utilities to the step executor
const result = await executeSqlStep(step, progressUtils);
if (result.success) {
completedSteps.push({
name: step.name,
duration: result.duration,
status: 'completed',
rowsAffected: result.rowsAffected
});
stepTimings[step.name] = result.duration;
stepRowCounts[step.name] = result.rowsAffected;
}
}
@@ -726,18 +784,32 @@ async function runAllCalculations() {
connection = await getConnection();
const totalDuration = Math.round((Date.now() - overallStartTime) / 1000);
// Get final processed counts
const processedCounts = await connection.query(`
SELECT
(SELECT COUNT(*) FROM product_metrics WHERE last_calculated >= $1) as processed_products
`, [new Date(overallStartTime)]);
await connection.query(`
UPDATE calculate_history
SET
end_time = NOW(),
duration_seconds = $1::integer,
status = $2::calculation_status,
additional_info = additional_info || jsonb_build_object('completed_steps', $3::jsonb)
WHERE id = $4::integer;
additional_info = additional_info || jsonb_build_object(
'processed_products', $3::integer,
'completed_steps', $4::jsonb,
'step_timings', $5::jsonb,
'step_row_counts', $6::jsonb
)
WHERE id = $7::integer;
`, [
totalDuration,
isCancelled ? 'cancelled' : 'completed',
processedCounts.rows[0].processed_products,
JSON.stringify(completedSteps),
JSON.stringify(stepTimings),
JSON.stringify(stepRowCounts),
combinedHistoryId
]);
@@ -753,6 +825,26 @@ async function runAllCalculations() {
overallSuccess = false;
} else {
console.log("\n--- All enabled calculations finished successfully ---");
// Send final completion progress
progressUtils.outputProgress({
status: 'complete',
operation: 'All calculations completed',
message: `Successfully completed ${completedSteps.length} of ${stepsToRun.length} steps`,
current: stepsToRun.length,
total: stepsToRun.length,
elapsed: progressUtils.formatElapsedTime(overallStartTime),
remaining: '0s',
percentage: '100',
timing: {
overall_start_time: new Date(overallStartTime).toISOString(),
overall_end_time: new Date().toISOString(),
total_duration_seconds: Math.round((Date.now() - overallStartTime) / 1000),
step_timings: stepTimings,
completed_steps: completedSteps.length
}
});
progressUtils.clearProgress(); // Clear progress only on full success
}

View File

@@ -6,7 +6,6 @@ const importCategories = require('./import/categories');
const { importProducts } = require('./import/products');
const importOrders = require('./import/orders');
const importPurchaseOrders = require('./import/purchase-orders');
const importHistoricalData = require('./import/historical-data');
dotenv.config({ path: path.join(__dirname, "../.env") });
@@ -15,7 +14,6 @@ const IMPORT_CATEGORIES = true;
const IMPORT_PRODUCTS = true;
const IMPORT_ORDERS = true;
const IMPORT_PURCHASE_ORDERS = true;
const IMPORT_HISTORICAL_DATA = false;
// Add flag for incremental updates
const INCREMENTAL_UPDATE = process.env.INCREMENTAL_UPDATE !== 'false'; // Default to true unless explicitly set to false
@@ -80,8 +78,7 @@ async function main() {
IMPORT_CATEGORIES,
IMPORT_PRODUCTS,
IMPORT_ORDERS,
IMPORT_PURCHASE_ORDERS,
IMPORT_HISTORICAL_DATA
IMPORT_PURCHASE_ORDERS
].filter(Boolean).length;
try {
@@ -129,11 +126,10 @@ async function main() {
'categories_enabled', $2::boolean,
'products_enabled', $3::boolean,
'orders_enabled', $4::boolean,
'purchase_orders_enabled', $5::boolean,
'historical_data_enabled', $6::boolean
'purchase_orders_enabled', $5::boolean
)
) RETURNING id
`, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS, IMPORT_HISTORICAL_DATA]);
`, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS]);
importHistoryId = historyResult.rows[0].id;
} catch (error) {
console.error("Error creating import history record:", error);
@@ -150,16 +146,21 @@ async function main() {
categories: null,
products: null,
orders: null,
purchaseOrders: null,
historicalData: null
purchaseOrders: null
};
let totalRecordsAdded = 0;
let totalRecordsUpdated = 0;
let totalRecordsDeleted = 0; // Add tracking for deleted records
let totalRecordsSkipped = 0; // Track skipped/filtered records
const stepTimings = {};
// Run each import based on constants
if (IMPORT_CATEGORIES) {
const stepStart = Date.now();
results.categories = await importCategories(prodConnection, localConnection);
stepTimings.categories = Math.round((Date.now() - stepStart) / 1000);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
console.log('Categories import result:', results.categories);
@@ -168,26 +169,37 @@ async function main() {
}
if (IMPORT_PRODUCTS) {
const stepStart = Date.now();
results.products = await importProducts(prodConnection, localConnection, INCREMENTAL_UPDATE);
stepTimings.products = Math.round((Date.now() - stepStart) / 1000);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
console.log('Products import result:', results.products);
totalRecordsAdded += parseInt(results.products?.recordsAdded || 0);
totalRecordsUpdated += parseInt(results.products?.recordsUpdated || 0);
totalRecordsSkipped += parseInt(results.products?.skippedUnchanged || 0);
}
if (IMPORT_ORDERS) {
const stepStart = Date.now();
results.orders = await importOrders(prodConnection, localConnection, INCREMENTAL_UPDATE);
stepTimings.orders = Math.round((Date.now() - stepStart) / 1000);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
console.log('Orders import result:', results.orders);
totalRecordsAdded += parseInt(results.orders?.recordsAdded || 0);
totalRecordsUpdated += parseInt(results.orders?.recordsUpdated || 0);
totalRecordsSkipped += parseInt(results.orders?.totalSkipped || 0);
}
if (IMPORT_PURCHASE_ORDERS) {
try {
const stepStart = Date.now();
results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection, INCREMENTAL_UPDATE);
stepTimings.purchaseOrders = Math.round((Date.now() - stepStart) / 1000);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
console.log('Purchase orders import result:', results.purchaseOrders);
@@ -198,6 +210,7 @@ async function main() {
} else {
totalRecordsAdded += parseInt(results.purchaseOrders?.recordsAdded || 0);
totalRecordsUpdated += parseInt(results.purchaseOrders?.recordsUpdated || 0);
totalRecordsDeleted += parseInt(results.purchaseOrders?.recordsDeleted || 0);
}
} catch (error) {
console.error('Error during purchase orders import:', error);
@@ -211,32 +224,6 @@ async function main() {
}
}
if (IMPORT_HISTORICAL_DATA) {
try {
results.historicalData = await importHistoricalData(prodConnection, localConnection, INCREMENTAL_UPDATE);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
console.log('Historical data import result:', results.historicalData);
// Handle potential error status
if (results.historicalData?.status === 'error') {
console.error('Historical data import had an error:', results.historicalData.error);
} else {
totalRecordsAdded += parseInt(results.historicalData?.recordsAdded || 0);
totalRecordsUpdated += parseInt(results.historicalData?.recordsUpdated || 0);
}
} catch (error) {
console.error('Error during historical data import:', error);
// Continue with other imports, don't fail the whole process
results.historicalData = {
status: 'error',
error: error.message,
recordsAdded: 0,
recordsUpdated: 0
};
}
}
const endTime = Date.now();
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);
@@ -254,14 +241,15 @@ async function main() {
'products_enabled', $5::boolean,
'orders_enabled', $6::boolean,
'purchase_orders_enabled', $7::boolean,
'historical_data_enabled', $8::boolean,
'categories_result', COALESCE($9::jsonb, 'null'::jsonb),
'products_result', COALESCE($10::jsonb, 'null'::jsonb),
'orders_result', COALESCE($11::jsonb, 'null'::jsonb),
'purchase_orders_result', COALESCE($12::jsonb, 'null'::jsonb),
'historical_data_result', COALESCE($13::jsonb, 'null'::jsonb)
'categories_result', COALESCE($8::jsonb, 'null'::jsonb),
'products_result', COALESCE($9::jsonb, 'null'::jsonb),
'orders_result', COALESCE($10::jsonb, 'null'::jsonb),
'purchase_orders_result', COALESCE($11::jsonb, 'null'::jsonb),
'total_deleted', $12::integer,
'total_skipped', $13::integer,
'step_timings', $14::jsonb
)
WHERE id = $14
WHERE id = $15
`, [
totalElapsedSeconds,
parseInt(totalRecordsAdded),
@@ -270,12 +258,13 @@ async function main() {
IMPORT_PRODUCTS,
IMPORT_ORDERS,
IMPORT_PURCHASE_ORDERS,
IMPORT_HISTORICAL_DATA,
JSON.stringify(results.categories),
JSON.stringify(results.products),
JSON.stringify(results.orders),
JSON.stringify(results.purchaseOrders),
JSON.stringify(results.historicalData),
totalRecordsDeleted,
totalRecordsSkipped,
JSON.stringify(stepTimings),
importHistoryId
]);

View File

@@ -92,6 +92,12 @@ async function importCategories(prodConnection, localConnection) {
description = EXCLUDED.description,
status = EXCLUDED.status,
updated_at = EXCLUDED.updated_at
WHERE -- Only update if at least one field has changed
categories.name IS DISTINCT FROM EXCLUDED.name OR
categories.type IS DISTINCT FROM EXCLUDED.type OR
categories.parent_id IS DISTINCT FROM EXCLUDED.parent_id OR
categories.description IS DISTINCT FROM EXCLUDED.description OR
categories.status IS DISTINCT FROM EXCLUDED.status
RETURNING
cat_id,
CASE
@@ -133,7 +139,7 @@ async function importCategories(prodConnection, localConnection) {
message: `Imported ${inserted} (updated ${updated}) categories of type ${type}`,
current: totalInserted + totalUpdated,
total: categories.length,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
elapsed: formatElapsedTime(startTime),
});
} catch (error) {
// Rollback to the savepoint for this type
@@ -161,7 +167,7 @@ async function importCategories(prodConnection, localConnection) {
operation: "Categories import completed",
current: totalInserted + totalUpdated,
total: totalInserted + totalUpdated,
duration: formatElapsedTime((Date.now() - startTime) / 1000),
duration: formatElapsedTime(startTime),
warnings: skippedCategories.length > 0 ? {
message: "Some categories were skipped due to missing parents",
skippedCategories

View File

@@ -221,8 +221,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
message: `Loading order items: ${processedCount} of ${totalOrderItems}`,
current: processedCount,
total: totalOrderItems,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, processedCount, totalOrderItems),
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalOrderItems),
rate: calculateRate(startTime, processedCount)
});
} catch (error) {
@@ -530,8 +530,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
message: `Loading order data: ${processedCount} of ${totalUniqueOrders}`,
current: processedCount,
total: totalUniqueOrders,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, processedCount, totalUniqueOrders),
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalUniqueOrders),
rate: calculateRate(startTime, processedCount)
});
}
@@ -681,6 +681,15 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
status = EXCLUDED.status,
canceled = EXCLUDED.canceled,
costeach = EXCLUDED.costeach
WHERE -- Only update if at least one key field has changed
orders.price IS DISTINCT FROM EXCLUDED.price OR
orders.quantity IS DISTINCT FROM EXCLUDED.quantity OR
orders.discount IS DISTINCT FROM EXCLUDED.discount OR
orders.tax IS DISTINCT FROM EXCLUDED.tax OR
orders.status IS DISTINCT FROM EXCLUDED.status OR
orders.canceled IS DISTINCT FROM EXCLUDED.canceled OR
orders.costeach IS DISTINCT FROM EXCLUDED.costeach OR
orders.date IS DISTINCT FROM EXCLUDED.date
RETURNING xmax = 0 as inserted
)
SELECT
@@ -704,7 +713,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
message: `Importing orders: ${cumulativeProcessedOrders} of ${totalUniqueOrders}`,
current: cumulativeProcessedOrders,
total: totalUniqueOrders,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, cumulativeProcessedOrders, totalUniqueOrders),
rate: calculateRate(startTime, cumulativeProcessedOrders)
});
@@ -751,8 +760,15 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
recordsUpdated: parseInt(recordsUpdated) || 0,
totalSkipped: skippedOrders.size || 0,
missingProducts: missingProducts.size || 0,
totalProcessed: orderItems.length, // Total order items in source
incrementalUpdate,
lastSyncTime
lastSyncTime,
details: {
uniqueOrdersProcessed: cumulativeProcessedOrders,
totalOrderItems: orderItems.length,
skippedDueToMissingProducts: skippedOrders.size,
missingProductIds: Array.from(missingProducts).slice(0, 100) // First 100 for debugging
}
};
} catch (error) {
console.error("Error during orders import:", error);

View File

@@ -576,8 +576,8 @@ async function materializeCalculations(prodConnection, localConnection, incremen
message: `Imported ${i + batch.length} of ${prodData.length} products`,
current: i + batch.length,
total: prodData.length,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, i + batch.length, prodData.length),
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, i + batch.length, prodData.length),
rate: calculateRate(startTime, i + batch.length)
});
}
@@ -587,6 +587,59 @@ async function materializeCalculations(prodConnection, localConnection, incremen
operation: "Products import",
message: "Finished materializing calculations"
});
// Add step to identify which products actually need updating
outputProgress({
status: "running",
operation: "Products import",
message: "Identifying changed products"
});
// Mark products that haven't changed as needs_update = false
await localConnection.query(`
UPDATE temp_products t
SET needs_update = FALSE
FROM products p
WHERE t.pid = p.pid
AND t.title IS NOT DISTINCT FROM p.title
AND t.description IS NOT DISTINCT FROM p.description
AND t.sku IS NOT DISTINCT FROM p.sku
AND t.stock_quantity = p.stock_quantity
AND t.price = p.price
AND t.regular_price = p.regular_price
AND t.cost_price IS NOT DISTINCT FROM p.cost_price
AND t.vendor IS NOT DISTINCT FROM p.vendor
AND t.brand IS NOT DISTINCT FROM p.brand
AND t.visible = p.visible
AND t.replenishable = p.replenishable
AND t.barcode IS NOT DISTINCT FROM p.barcode
AND t.updated_at IS NOT DISTINCT FROM p.updated_at
AND t.total_sold IS NOT DISTINCT FROM p.total_sold
-- Check key fields that are likely to change
-- We don't need to check every single field, just the important ones
`);
// Get count of products that need updating
const [countResult] = await localConnection.query(`
SELECT
COUNT(*) FILTER (WHERE needs_update = true) as update_count,
COUNT(*) FILTER (WHERE needs_update = false) as skip_count,
COUNT(*) as total_count
FROM temp_products
`);
outputProgress({
status: "running",
operation: "Products import",
message: `Found ${countResult.rows[0].update_count} products that need updating, ${countResult.rows[0].skip_count} unchanged`
});
// Return the total products processed
return {
totalProcessed: prodData.length,
needsUpdate: parseInt(countResult.rows[0].update_count),
skipped: parseInt(countResult.rows[0].skip_count)
};
}
async function importProducts(prodConnection, localConnection, incrementalUpdate = true) {
@@ -612,7 +665,7 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
await setupTemporaryTables(localConnection);
// Materialize calculations into temp table
await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime, startTime);
const materializeResult = await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime, startTime);
// Get the list of products that need updating
const [products] = await localConnection.query(`
@@ -847,8 +900,8 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
message: `Processing products: ${i + batch.length} of ${products.rows.length}`,
current: i + batch.length,
total: products.rows.length,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, i + batch.length, products.rows.length),
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, i + batch.length, products.rows.length),
rate: calculateRate(startTime, i + batch.length)
});
}
@@ -872,7 +925,10 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
recordsAdded,
recordsUpdated,
totalRecords: products.rows.length,
duration: formatElapsedTime(Date.now() - startTime)
totalProcessed: materializeResult.totalProcessed,
duration: formatElapsedTime(startTime),
needsUpdate: materializeResult.needsUpdate,
skippedUnchanged: materializeResult.skipped
};
} catch (error) {
// Rollback on error

View File

@@ -398,7 +398,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
message: `Processed ${offset} of ${totalPOs} purchase orders (${totalProcessed} line items)`,
current: offset,
total: totalPOs,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, offset, totalPOs),
rate: calculateRate(startTime, offset)
});
@@ -605,7 +605,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
message: `Processed ${offset} of ${totalReceivings} receivings (${totalProcessed} line items total)`,
current: offset,
total: totalReceivings,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, offset, totalReceivings),
rate: calculateRate(startTime, offset)
});
@@ -730,6 +730,13 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
date_created = EXCLUDED.date_created,
date_ordered = EXCLUDED.date_ordered,
updated = CURRENT_TIMESTAMP
WHERE -- Only update if at least one key field has changed
purchase_orders.ordered IS DISTINCT FROM EXCLUDED.ordered OR
purchase_orders.po_cost_price IS DISTINCT FROM EXCLUDED.po_cost_price OR
purchase_orders.status IS DISTINCT FROM EXCLUDED.status OR
purchase_orders.expected_date IS DISTINCT FROM EXCLUDED.expected_date OR
purchase_orders.date IS DISTINCT FROM EXCLUDED.date OR
purchase_orders.vendor IS DISTINCT FROM EXCLUDED.vendor
RETURNING (xmax = 0) as inserted
`);
@@ -806,6 +813,12 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
supplier_id = EXCLUDED.supplier_id,
status = EXCLUDED.status,
updated = CURRENT_TIMESTAMP
WHERE -- Only update if at least one key field has changed
receivings.qty_each IS DISTINCT FROM EXCLUDED.qty_each OR
receivings.cost_each IS DISTINCT FROM EXCLUDED.cost_each OR
receivings.status IS DISTINCT FROM EXCLUDED.status OR
receivings.received_date IS DISTINCT FROM EXCLUDED.received_date OR
receivings.received_by IS DISTINCT FROM EXCLUDED.received_by
RETURNING (xmax = 0) as inserted
`);

View File

@@ -95,7 +95,14 @@ BEGIN
profit_30d = EXCLUDED.profit_30d, cogs_30d = EXCLUDED.cogs_30d,
sales_365d = EXCLUDED.sales_365d, revenue_365d = EXCLUDED.revenue_365d,
lifetime_sales = EXCLUDED.lifetime_sales, lifetime_revenue = EXCLUDED.lifetime_revenue,
avg_margin_30d = EXCLUDED.avg_margin_30d;
avg_margin_30d = EXCLUDED.avg_margin_30d
WHERE -- Only update if at least one value has changed
brand_metrics.product_count IS DISTINCT FROM EXCLUDED.product_count OR
brand_metrics.active_product_count IS DISTINCT FROM EXCLUDED.active_product_count OR
brand_metrics.current_stock_units IS DISTINCT FROM EXCLUDED.current_stock_units OR
brand_metrics.sales_30d IS DISTINCT FROM EXCLUDED.sales_30d OR
brand_metrics.revenue_30d IS DISTINCT FROM EXCLUDED.revenue_30d OR
brand_metrics.lifetime_sales IS DISTINCT FROM EXCLUDED.lifetime_sales;
-- Update calculate_status
INSERT INTO public.calculate_status (module_name, last_calculation_timestamp)
@@ -104,3 +111,25 @@ BEGIN
RAISE NOTICE 'Finished % calculation. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
-- Return metrics about the update operation for tracking
WITH update_stats AS (
SELECT
COUNT(*) as total_brands,
COUNT(*) FILTER (WHERE last_calculated >= NOW() - INTERVAL '5 minutes') as rows_processed,
SUM(product_count) as total_products,
SUM(active_product_count) as total_active_products,
SUM(sales_30d) as total_sales_30d,
SUM(revenue_30d) as total_revenue_30d,
AVG(avg_margin_30d) as overall_avg_margin_30d
FROM public.brand_metrics
)
SELECT
rows_processed,
total_brands,
total_products::int,
total_active_products::int,
total_sales_30d::int,
ROUND(total_revenue_30d, 2) as total_revenue_30d,
ROUND(overall_avg_margin_30d, 2) as overall_avg_margin_30d
FROM update_stats;

View File

@@ -238,7 +238,8 @@ BEGIN
category_type = EXCLUDED.category_type,
parent_id = EXCLUDED.parent_id,
last_calculated = EXCLUDED.last_calculated,
-- Update rolled-up metrics
-- ROLLED-UP METRICS (includes this category + all descendants)
product_count = EXCLUDED.product_count,
active_product_count = EXCLUDED.active_product_count,
replenishable_product_count = EXCLUDED.replenishable_product_count,
@@ -250,7 +251,8 @@ BEGIN
profit_30d = EXCLUDED.profit_30d, cogs_30d = EXCLUDED.cogs_30d,
sales_365d = EXCLUDED.sales_365d, revenue_365d = EXCLUDED.revenue_365d,
lifetime_sales = EXCLUDED.lifetime_sales, lifetime_revenue = EXCLUDED.lifetime_revenue,
-- Update direct metrics
-- DIRECT METRICS (only products directly in this category)
direct_product_count = EXCLUDED.direct_product_count,
direct_active_product_count = EXCLUDED.direct_active_product_count,
direct_replenishable_product_count = EXCLUDED.direct_replenishable_product_count,
@@ -262,9 +264,19 @@ BEGIN
direct_profit_30d = EXCLUDED.direct_profit_30d, direct_cogs_30d = EXCLUDED.direct_cogs_30d,
direct_sales_365d = EXCLUDED.direct_sales_365d, direct_revenue_365d = EXCLUDED.direct_revenue_365d,
direct_lifetime_sales = EXCLUDED.direct_lifetime_sales, direct_lifetime_revenue = EXCLUDED.direct_lifetime_revenue,
-- Update KPIs
-- Calculated KPIs
avg_margin_30d = EXCLUDED.avg_margin_30d,
stock_turn_30d = EXCLUDED.stock_turn_30d;
stock_turn_30d = EXCLUDED.stock_turn_30d
WHERE -- Only update if at least one value has changed
category_metrics.product_count IS DISTINCT FROM EXCLUDED.product_count OR
category_metrics.active_product_count IS DISTINCT FROM EXCLUDED.active_product_count OR
category_metrics.current_stock_units IS DISTINCT FROM EXCLUDED.current_stock_units OR
category_metrics.sales_30d IS DISTINCT FROM EXCLUDED.sales_30d OR
category_metrics.revenue_30d IS DISTINCT FROM EXCLUDED.revenue_30d OR
category_metrics.lifetime_sales IS DISTINCT FROM EXCLUDED.lifetime_sales OR
category_metrics.direct_product_count IS DISTINCT FROM EXCLUDED.direct_product_count OR
category_metrics.direct_sales_30d IS DISTINCT FROM EXCLUDED.direct_sales_30d;
-- Update calculate_status
INSERT INTO public.calculate_status (module_name, last_calculation_timestamp)
@@ -273,3 +285,25 @@ BEGIN
RAISE NOTICE 'Finished % calculation. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
-- Return metrics about the update operation for tracking
WITH update_stats AS (
SELECT
COUNT(*) as total_categories,
COUNT(*) FILTER (WHERE last_calculated >= NOW() - INTERVAL '5 minutes') as rows_processed,
COUNT(*) FILTER (WHERE category_type = 11) as main_categories, -- 11 = category
COUNT(*) FILTER (WHERE category_type = 12) as subcategories, -- 12 = subcategory
SUM(product_count) as total_products,
SUM(active_product_count) as total_active_products,
SUM(current_stock_units) as total_stock_units
FROM public.category_metrics
)
SELECT
rows_processed,
total_categories,
main_categories,
subcategories,
total_products::int,
total_active_products::int,
total_stock_units::int
FROM update_stats;

View File

@@ -124,7 +124,15 @@ BEGIN
profit_30d = EXCLUDED.profit_30d, cogs_30d = EXCLUDED.cogs_30d,
sales_365d = EXCLUDED.sales_365d, revenue_365d = EXCLUDED.revenue_365d,
lifetime_sales = EXCLUDED.lifetime_sales, lifetime_revenue = EXCLUDED.lifetime_revenue,
avg_margin_30d = EXCLUDED.avg_margin_30d;
avg_margin_30d = EXCLUDED.avg_margin_30d
WHERE -- Only update if at least one value has changed
vendor_metrics.product_count IS DISTINCT FROM EXCLUDED.product_count OR
vendor_metrics.active_product_count IS DISTINCT FROM EXCLUDED.active_product_count OR
vendor_metrics.current_stock_units IS DISTINCT FROM EXCLUDED.current_stock_units OR
vendor_metrics.on_order_units IS DISTINCT FROM EXCLUDED.on_order_units OR
vendor_metrics.sales_30d IS DISTINCT FROM EXCLUDED.sales_30d OR
vendor_metrics.revenue_30d IS DISTINCT FROM EXCLUDED.revenue_30d OR
vendor_metrics.lifetime_sales IS DISTINCT FROM EXCLUDED.lifetime_sales;
-- Update calculate_status
INSERT INTO public.calculate_status (module_name, last_calculation_timestamp)
@@ -133,3 +141,23 @@ BEGIN
RAISE NOTICE 'Finished % calculation. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
-- Return metrics about the update operation for tracking
WITH update_stats AS (
SELECT
COUNT(*) as total_vendors,
COUNT(*) FILTER (WHERE last_calculated >= NOW() - INTERVAL '5 minutes') as rows_processed,
SUM(product_count) as total_products,
SUM(active_product_count) as total_active_products,
SUM(po_count_365d) as total_pos_365d,
AVG(avg_lead_time_days) as overall_avg_lead_time
FROM public.vendor_metrics
)
SELECT
rows_processed,
total_vendors,
total_products::int,
total_active_products::int,
total_pos_365d::int,
ROUND(overall_avg_lead_time, 1) as overall_avg_lead_time
FROM update_stats;

View File

@@ -202,3 +202,14 @@ BEGIN
RAISE NOTICE 'Finished % processing for multiple dates. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
-- Return the total records processed for tracking
SELECT
COUNT(*) as rows_processed,
COUNT(DISTINCT snapshot_date) as days_processed,
MIN(snapshot_date) as earliest_date,
MAX(snapshot_date) as latest_date,
SUM(units_sold) as total_units_sold,
SUM(units_received) as total_units_received
FROM public.daily_product_snapshots
WHERE calculation_timestamp >= (NOW() - INTERVAL '5 minutes'); -- Recent updates only

View File

@@ -115,3 +115,25 @@ BEGIN
RAISE NOTICE 'Finished % module. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
-- Return metrics about the update operation for tracking
WITH update_stats AS (
SELECT
COUNT(*) as total_products,
COUNT(*) FILTER (WHERE last_calculated >= NOW() - INTERVAL '5 minutes') as rows_processed,
COUNT(*) FILTER (WHERE abc_class = 'A') as abc_a_count,
COUNT(*) FILTER (WHERE abc_class = 'B') as abc_b_count,
COUNT(*) FILTER (WHERE abc_class = 'C') as abc_c_count,
COUNT(*) FILTER (WHERE avg_lead_time_days IS NOT NULL) as products_with_lead_time,
AVG(avg_lead_time_days) as overall_avg_lead_time
FROM public.product_metrics
)
SELECT
rows_processed,
total_products,
abc_a_count,
abc_b_count,
abc_c_count,
products_with_lead_time,
ROUND(overall_avg_lead_time, 1) as overall_avg_lead_time
FROM update_stats;

View File

@@ -735,6 +735,22 @@ BEGIN
overstocked_units = EXCLUDED.overstocked_units, overstocked_cost = EXCLUDED.overstocked_cost, overstocked_retail = EXCLUDED.overstocked_retail, is_old_stock = EXCLUDED.is_old_stock,
yesterday_sales = EXCLUDED.yesterday_sales,
status = EXCLUDED.status
WHERE -- Only update if at least one key metric has changed
product_metrics.current_stock IS DISTINCT FROM EXCLUDED.current_stock OR
product_metrics.current_price IS DISTINCT FROM EXCLUDED.current_price OR
product_metrics.current_cost_price IS DISTINCT FROM EXCLUDED.current_cost_price OR
product_metrics.on_order_qty IS DISTINCT FROM EXCLUDED.on_order_qty OR
product_metrics.sales_7d IS DISTINCT FROM EXCLUDED.sales_7d OR
product_metrics.sales_30d IS DISTINCT FROM EXCLUDED.sales_30d OR
product_metrics.revenue_30d IS DISTINCT FROM EXCLUDED.revenue_30d OR
product_metrics.status IS DISTINCT FROM EXCLUDED.status OR
product_metrics.replenishment_units IS DISTINCT FROM EXCLUDED.replenishment_units OR
product_metrics.stock_cover_in_days IS DISTINCT FROM EXCLUDED.stock_cover_in_days OR
product_metrics.yesterday_sales IS DISTINCT FROM EXCLUDED.yesterday_sales OR
-- Check a few other important fields that might change
product_metrics.date_last_sold IS DISTINCT FROM EXCLUDED.date_last_sold OR
product_metrics.earliest_expected_date IS DISTINCT FROM EXCLUDED.earliest_expected_date OR
product_metrics.lifetime_sales IS DISTINCT FROM EXCLUDED.lifetime_sales
;
-- Update the status table with the timestamp from the START of this run
@@ -745,3 +761,28 @@ BEGIN
RAISE NOTICE 'Finished % module. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
-- Return metrics about the update operation
WITH update_stats AS (
SELECT
COUNT(*) as total_products,
COUNT(*) FILTER (WHERE last_calculated >= NOW() - INTERVAL '5 minutes') as rows_processed,
COUNT(*) FILTER (WHERE status = 'Critical') as critical_count,
COUNT(*) FILTER (WHERE status = 'Reorder Soon') as reorder_soon_count,
COUNT(*) FILTER (WHERE status = 'Healthy') as healthy_count,
COUNT(*) FILTER (WHERE status = 'Overstock') as overstock_count,
COUNT(*) FILTER (WHERE status = 'At Risk') as at_risk_count,
COUNT(*) FILTER (WHERE status = 'New') as new_count
FROM public.product_metrics
)
SELECT
rows_processed,
total_products,
critical_count,
reorder_soon_count,
healthy_count,
overstock_count,
at_risk_count,
new_count,
ROUND((rows_processed::numeric / NULLIF(total_products, 0)) * 100, 2) as update_percentage
FROM update_stats;

View File

@@ -2,13 +2,23 @@ const fs = require('fs');
const path = require('path');
// Helper function to format elapsed time
function formatElapsedTime(elapsed) {
// If elapsed is a timestamp, convert to elapsed milliseconds
if (elapsed instanceof Date || elapsed > 1000000000000) {
elapsed = Date.now() - elapsed;
function formatElapsedTime(startTime) {
let elapsed;
// If startTime is a timestamp (number representing milliseconds since epoch)
if (typeof startTime === 'number') {
// Check if it's a timestamp (will be a large number like 1700000000000)
if (startTime > 1000000000) { // timestamps are in milliseconds since 1970
elapsed = Date.now() - startTime;
} else {
// Assume it's already elapsed milliseconds
elapsed = startTime;
}
} else if (startTime instanceof Date) {
elapsed = Date.now() - startTime.getTime();
} else {
// If elapsed is in seconds, convert to milliseconds
elapsed = elapsed * 1000;
// Default to 0 if invalid input
elapsed = 0;
}
const seconds = Math.floor(elapsed / 1000);
@@ -16,7 +26,7 @@ function formatElapsedTime(elapsed) {
const hours = Math.floor(minutes / 60);
if (hours > 0) {
return `${hours}h ${minutes % 60}m`;
return `${hours}h ${minutes % 60}m ${seconds % 60}s`;
} else if (minutes > 0) {
return `${minutes}m ${seconds % 60}s`;
} else {
@@ -26,16 +36,31 @@ function formatElapsedTime(elapsed) {
// Helper function to estimate remaining time
function estimateRemaining(startTime, current, total) {
if (current === 0) return null;
// Handle edge cases
if (!current || current === 0 || !total || total === 0 || current >= total) {
return null;
}
// Calculate elapsed time in milliseconds
const elapsed = Date.now() - startTime;
if (elapsed <= 0) return null;
// Calculate rate (items per millisecond)
const rate = current / elapsed;
if (rate <= 0) return null;
// Calculate remaining time in milliseconds
const remaining = (total - current) / rate;
const minutes = Math.floor(remaining / 60000);
const seconds = Math.floor((remaining % 60000) / 1000);
// Convert to readable format
const seconds = Math.floor(remaining / 1000);
const minutes = Math.floor(seconds / 60);
const hours = Math.floor(minutes / 60);
if (minutes > 0) {
return `${minutes}m ${seconds}s`;
if (hours > 0) {
return `${hours}h ${minutes % 60}m`;
} else if (minutes > 0) {
return `${minutes}m ${seconds % 60}s`;
} else {
return `${seconds}s`;
}

View File

@@ -286,7 +286,21 @@ router.post('/full-reset', async (req, res) => {
router.get('/history/import', async (req, res) => {
try {
const pool = req.app.locals.pool;
const { rows } = await pool.query(`
// First check which columns exist
const { rows: columns } = await pool.query(`
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'import_history'
AND column_name IN ('records_deleted', 'records_skipped', 'total_processed')
`);
const hasDeletedColumn = columns.some(col => col.column_name === 'records_deleted');
const hasSkippedColumn = columns.some(col => col.column_name === 'records_skipped');
const hasTotalProcessedColumn = columns.some(col => col.column_name === 'total_processed');
// Build query dynamically based on available columns
const query = `
SELECT
id,
start_time,
@@ -294,11 +308,19 @@ router.get('/history/import', async (req, res) => {
status,
error_message,
records_added::integer,
records_updated::integer
records_updated::integer,
${hasDeletedColumn ? 'records_deleted::integer,' : '0 as records_deleted,'}
${hasSkippedColumn ? 'records_skipped::integer,' : '0 as records_skipped,'}
${hasTotalProcessedColumn ? 'total_processed::integer,' : '0 as total_processed,'}
is_incremental,
additional_info,
EXTRACT(EPOCH FROM (COALESCE(end_time, NOW()) - start_time)) / 60 as duration_minutes
FROM import_history
ORDER BY start_time DESC
LIMIT 20
`);
`;
const { rows } = await pool.query(query);
res.json(rows || []);
} catch (error) {
console.error('Error fetching import history:', error);
@@ -315,7 +337,8 @@ router.get('/history/calculate', async (req, res) => {
id,
start_time,
end_time,
duration_minutes,
EXTRACT(EPOCH FROM (COALESCE(end_time, NOW()) - start_time)) / 60 as duration_minutes,
duration_seconds,
status,
error_message,
total_products,

View File

@@ -44,17 +44,44 @@ interface HistoryRecord {
interface ImportHistoryRecord extends HistoryRecord {
records_added: number;
records_updated: number;
records_deleted?: number;
records_skipped?: number;
is_incremental?: boolean;
total_processed?: number;
additional_info?: {
step_timings?: Record<string, number>;
details?: {
categories?: { recordsAdded: number; recordsUpdated: number; skippedCategories?: number };
products?: { totalProcessed: number; needsUpdate: number; skippedUnchanged?: number };
orders?: { totalProcessed: number; totalSkipped: number; missingProducts: number };
purchaseOrders?: { recordsAdded: number; recordsUpdated: number; recordsDeleted: number; skippedProducts: number };
};
[key: string]: any;
};
}
interface CalculateHistoryRecord extends HistoryRecord {
total_products: number;
total_orders: number;
total_purchase_orders: number;
processed_products: number;
processed_orders: number;
processed_purchase_orders: number;
total_products?: number;
total_orders?: number;
total_purchase_orders?: number;
processed_products?: number;
processed_orders?: number;
processed_purchase_orders?: number;
duration_minutes?: number;
duration_seconds?: number;
additional_info?: {
type?: string;
steps?: string[];
completed_steps?: Array<{
name: string;
duration: number;
status: string;
rowsAffected?: number;
}>;
step_timings?: Record<string, number>;
step_row_counts?: Record<string, number>;
[key: string]: any;
};
}
interface ModuleStatus {
@@ -369,6 +396,82 @@ export function DataManagement() {
const formatJsonData = (data: Record<string, any>) => {
if (!data) return null;
// Special handling for completed_steps
if (data.completed_steps && Array.isArray(data.completed_steps)) {
return (
<div className="space-y-2 mt-2">
<div className="text-sm font-semibold text-gray-700">Completed Steps:</div>
<div className="space-y-1 bg-gray-50 p-2 rounded">
{data.completed_steps.map((step: any, idx: number) => (
<div key={idx} className="flex justify-between text-sm">
<span className="font-medium">{step.name}</span>
<div className="flex gap-4 text-gray-600">
<span>{step.duration}s</span>
{step.rowsAffected !== undefined && (
<span>{formatNumber(step.rowsAffected)} rows</span>
)}
<span className={step.status === 'completed' ? 'text-green-600' : 'text-red-600'}>
{step.status}
</span>
</div>
</div>
))}
</div>
{/* Show other data if present */}
{Object.keys(data).filter(k => k !== 'completed_steps').length > 0 && (
<div className="mt-2">
{formatJsonDataSimple(Object.fromEntries(
Object.entries(data).filter(([k]) => k !== 'completed_steps')
))}
</div>
)}
</div>
);
}
// Special handling for import details
if (data.details) {
return (
<div className="space-y-2 mt-2">
<div className="text-sm font-semibold text-gray-700">Import Details:</div>
<div className="space-y-2 bg-gray-50 p-2 rounded">
{Object.entries(data.details).map(([table, stats]: [string, any]) => (
<div key={table} className="border-b last:border-0 pb-2 last:pb-0">
<div className="font-medium text-sm capitalize mb-1">{table}:</div>
<div className="grid grid-cols-2 gap-x-4 gap-y-1 text-xs">
{Object.entries(stats).map(([key, value]) => (
<div key={key} className="flex justify-between">
<span className="text-gray-600">{key}:</span>
<span className="font-mono">{typeof value === 'number' ? formatNumber(value) : String(value)}</span>
</div>
))}
</div>
</div>
))}
</div>
{/* Show step timings if present */}
{data.step_timings && (
<div className="mt-2">
<div className="text-sm font-semibold text-gray-700">Step Timings:</div>
<div className="space-y-1 bg-gray-50 p-2 rounded text-sm">
{Object.entries(data.step_timings).map(([step, duration]) => (
<div key={step} className="flex justify-between">
<span className="text-gray-600">{step}:</span>
<span className="font-mono">{String(duration)}s</span>
</div>
))}
</div>
</div>
)}
</div>
);
}
// Default simple format
return formatJsonDataSimple(data);
};
const formatJsonDataSimple = (data: Record<string, any>) => {
// Find the longest key length
const maxKeyLength = Object.keys(data).reduce(
(max, key) => Math.max(max, key.length),
@@ -384,13 +487,13 @@ export function DataManagement() {
style={{ width: `${maxKeyLength + 2}ch` }}
>
{key}:
</span>
</span>
<span className="break-all">
{typeof value === "object"
? JSON.stringify(value)
: value?.toString()}
</span>
</div>
</span>
</div>
))}
</div>
);
@@ -1133,12 +1236,33 @@ export function DataManagement() {
: "N/A"}
</span>
</div>
<div className="flex justify-between text-sm">
<span className="text-gray-600">Records:</span>
<span>
{record.records_added} added,{" "}
{record.records_updated} updated
</span>
<div className="grid grid-cols-2 gap-2 text-sm">
<div className="flex justify-between">
<span className="text-gray-600">Added:</span>
<span className="text-green-600 font-medium">{formatNumber(record.records_added)}</span>
</div>
<div className="flex justify-between">
<span className="text-gray-600">Updated:</span>
<span className="text-blue-600 font-medium">{formatNumber(record.records_updated)}</span>
</div>
{record.records_deleted !== undefined && (
<div className="flex justify-between">
<span className="text-gray-600">Deleted:</span>
<span className="text-red-600 font-medium">{formatNumber(record.records_deleted)}</span>
</div>
)}
{record.records_skipped !== undefined && (
<div className="flex justify-between">
<span className="text-gray-600">Skipped:</span>
<span className="text-yellow-600 font-medium">{formatNumber(record.records_skipped)}</span>
</div>
)}
{record.total_processed !== undefined && (
<div className="flex justify-between col-span-2">
<span className="text-gray-600">Total Processed:</span>
<span className="font-medium">{formatNumber(record.total_processed)}</span>
</div>
)}
</div>
{record.error_message && (
<div className="text-sm text-red-600 mt-2">