Add calculate history tracking

This commit is contained in:
2025-02-02 20:41:23 -05:00
parent e62c6ac8ee
commit b926aba9ff
2 changed files with 201 additions and 50 deletions

View File

@@ -171,6 +171,32 @@ ORDER BY
c.name,
st.vendor;
-- Update calculate_history table to track all record types
ALTER TABLE calculate_history
ADD COLUMN total_orders INT DEFAULT 0 AFTER total_products,
ADD COLUMN total_purchase_orders INT DEFAULT 0 AFTER total_orders,
CHANGE COLUMN products_processed processed_products INT DEFAULT 0,
ADD COLUMN processed_orders INT DEFAULT 0 AFTER processed_products,
ADD COLUMN processed_purchase_orders INT DEFAULT 0 AFTER processed_orders;
CREATE TABLE IF NOT EXISTS calculate_history (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
start_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
end_time TIMESTAMP NULL,
duration_seconds INT,
duration_minutes DECIMAL(10,2) GENERATED ALWAYS AS (duration_seconds / 60.0) STORED,
total_products INT DEFAULT 0,
total_orders INT DEFAULT 0,
total_purchase_orders INT DEFAULT 0,
processed_products INT DEFAULT 0,
processed_orders INT DEFAULT 0,
processed_purchase_orders INT DEFAULT 0,
status ENUM('running', 'completed', 'failed', 'cancelled') DEFAULT 'running',
error_message TEXT,
additional_info JSON,
INDEX idx_status_time (status, start_time)
);
CREATE TABLE IF NOT EXISTS sync_status (
table_name VARCHAR(50) PRIMARY KEY,
last_sync_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,

View File

@@ -83,10 +83,78 @@ process.on('SIGTERM', cancelCalculation);
async function calculateMetrics() {
let connection;
const startTime = Date.now();
let processedCount = 0;
let processedProducts = 0;
let processedOrders = 0;
let processedPurchaseOrders = 0;
let totalProducts = 0;
let totalOrders = 0;
let totalPurchaseOrders = 0;
let calculateHistoryId;
try {
// Clean up any previously running calculations
connection = await getConnection();
await connection.query(`
UPDATE calculate_history
SET
status = 'cancelled',
end_time = NOW(),
duration_seconds = TIMESTAMPDIFF(SECOND, start_time, NOW()),
error_message = 'Previous calculation was not completed properly'
WHERE status = 'running'
`);
// Get counts from all relevant tables
const [[productCount], [orderCount], [poCount]] = await Promise.all([
connection.query('SELECT COUNT(*) as total FROM products'),
connection.query('SELECT COUNT(*) as total FROM orders'),
connection.query('SELECT COUNT(*) as total FROM purchase_orders')
]);
totalProducts = productCount.total;
totalOrders = orderCount.total;
totalPurchaseOrders = poCount.total;
// Create history record for this calculation
const [historyResult] = await connection.query(`
INSERT INTO calculate_history (
start_time,
status,
total_products,
total_orders,
total_purchase_orders,
additional_info
) VALUES (
NOW(),
'running',
?,
?,
?,
JSON_OBJECT(
'skip_product_metrics', ?,
'skip_time_aggregates', ?,
'skip_financial_metrics', ?,
'skip_vendor_metrics', ?,
'skip_category_metrics', ?,
'skip_brand_metrics', ?,
'skip_sales_forecasts', ?
)
)
`, [
totalProducts,
totalOrders,
totalPurchaseOrders,
SKIP_PRODUCT_METRICS,
SKIP_TIME_AGGREGATES,
SKIP_FINANCIAL_METRICS,
SKIP_VENDOR_METRICS,
SKIP_CATEGORY_METRICS,
SKIP_BRAND_METRICS,
SKIP_SALES_FORECASTS
]);
calculateHistoryId = historyResult.insertId;
connection.release();
// Add debug logging for the progress functions
console.log('Debug - Progress functions:', {
formatElapsedTime: typeof global.formatElapsedTime,
@@ -123,27 +191,37 @@ async function calculateMetrics() {
}
});
// Get total number of products
const [countResult] = await connection.query('SELECT COUNT(*) as total FROM products')
.catch(err => {
global.logError(err, 'Failed to count products');
throw err;
});
totalProducts = countResult[0].total;
// Update progress periodically
const updateProgress = async (products = null, orders = null, purchaseOrders = null) => {
if (products !== null) processedProducts = products;
if (orders !== null) processedOrders = orders;
if (purchaseOrders !== null) processedPurchaseOrders = purchaseOrders;
await connection.query(`
UPDATE calculate_history
SET
processed_products = ?,
processed_orders = ?,
processed_purchase_orders = ?
WHERE id = ?
`, [processedProducts, processedOrders, processedPurchaseOrders, calculateHistoryId]);
};
if (!SKIP_PRODUCT_METRICS) {
processedCount = await calculateProductMetrics(startTime, totalProducts);
processedProducts = await calculateProductMetrics(startTime, totalProducts);
await updateProgress(processedProducts);
} else {
console.log('Skipping product metrics calculation...');
processedCount = Math.floor(totalProducts * 0.6);
processedProducts = Math.floor(totalProducts * 0.6);
await updateProgress(processedProducts);
global.outputProgress({
status: 'running',
operation: 'Skipping product metrics calculation',
current: processedCount,
current: processedProducts,
total: totalProducts,
elapsed: global.formatElapsedTime(startTime),
remaining: global.estimateRemaining(startTime, processedCount, totalProducts),
rate: global.calculateRate(startTime, processedCount),
remaining: global.estimateRemaining(startTime, processedProducts, totalProducts),
rate: global.calculateRate(startTime, processedProducts),
percentage: '60',
timing: {
start_time: new Date(startTime).toISOString(),
@@ -155,42 +233,48 @@ async function calculateMetrics() {
// Calculate time-based aggregates
if (!SKIP_TIME_AGGREGATES) {
processedCount = await calculateTimeAggregates(startTime, totalProducts, processedCount);
processedProducts = await calculateTimeAggregates(startTime, totalProducts, processedProducts);
await updateProgress(processedProducts);
} else {
console.log('Skipping time aggregates calculation');
}
// Calculate financial metrics
if (!SKIP_FINANCIAL_METRICS) {
processedCount = await calculateFinancialMetrics(startTime, totalProducts, processedCount);
processedProducts = await calculateFinancialMetrics(startTime, totalProducts, processedProducts);
await updateProgress(processedProducts);
} else {
console.log('Skipping financial metrics calculation');
}
// Calculate vendor metrics
if (!SKIP_VENDOR_METRICS) {
processedCount = await calculateVendorMetrics(startTime, totalProducts, processedCount);
processedProducts = await calculateVendorMetrics(startTime, totalProducts, processedProducts);
await updateProgress(processedProducts);
} else {
console.log('Skipping vendor metrics calculation');
}
// Calculate category metrics
if (!SKIP_CATEGORY_METRICS) {
processedCount = await calculateCategoryMetrics(startTime, totalProducts, processedCount);
processedProducts = await calculateCategoryMetrics(startTime, totalProducts, processedProducts);
await updateProgress(processedProducts);
} else {
console.log('Skipping category metrics calculation');
}
// Calculate brand metrics
if (!SKIP_BRAND_METRICS) {
processedCount = await calculateBrandMetrics(startTime, totalProducts, processedCount);
processedProducts = await calculateBrandMetrics(startTime, totalProducts, processedProducts);
await updateProgress(processedProducts);
} else {
console.log('Skipping brand metrics calculation');
}
// Calculate sales forecasts
if (!SKIP_SALES_FORECASTS) {
processedCount = await calculateSalesForecasts(startTime, totalProducts, processedCount);
processedProducts = await calculateSalesForecasts(startTime, totalProducts, processedProducts);
await updateProgress(processedProducts);
} else {
console.log('Skipping sales forecasts calculation');
}
@@ -199,12 +283,12 @@ async function calculateMetrics() {
outputProgress({
status: 'running',
operation: 'Starting ABC classification',
current: processedCount,
current: processedProducts,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
remaining: estimateRemaining(startTime, processedProducts, totalProducts),
rate: calculateRate(startTime, processedProducts),
percentage: ((processedProducts / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
@@ -212,7 +296,7 @@ async function calculateMetrics() {
}
});
if (isCancelled) return processedCount;
if (isCancelled) return processedProducts;
const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1');
const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 };
@@ -233,12 +317,12 @@ async function calculateMetrics() {
outputProgress({
status: 'running',
operation: 'Creating revenue rankings',
current: processedCount,
current: processedProducts,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
remaining: estimateRemaining(startTime, processedProducts, totalProducts),
rate: calculateRate(startTime, processedProducts),
percentage: ((processedProducts / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
@@ -246,7 +330,7 @@ async function calculateMetrics() {
}
});
if (isCancelled) return processedCount;
if (isCancelled) return processedProducts;
await connection.query(`
INSERT INTO temp_revenue_ranks
@@ -272,12 +356,12 @@ async function calculateMetrics() {
outputProgress({
status: 'running',
operation: 'Updating ABC classifications',
current: processedCount,
current: processedProducts,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
remaining: estimateRemaining(startTime, processedProducts, totalProducts),
rate: calculateRate(startTime, processedProducts),
percentage: ((processedProducts / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
@@ -285,14 +369,14 @@ async function calculateMetrics() {
}
});
if (isCancelled) return processedCount;
if (isCancelled) return processedProducts;
// Process updates in batches
let abcProcessedCount = 0;
let abcProcessedProducts = 0;
const batchSize = 5000;
while (true) {
if (isCancelled) return processedCount;
if (isCancelled) return processedProducts;
// First get a batch of PIDs that need updating
const [pids] = await connection.query(`
@@ -333,18 +417,18 @@ async function calculateMetrics() {
max_rank, abcThresholds.b_threshold,
pids.map(row => row.pid)]);
abcProcessedCount += result.affectedRows;
processedCount = Math.floor(totalProducts * (0.99 + (abcProcessedCount / totalCount) * 0.01));
abcProcessedProducts += result.affectedRows;
processedProducts = Math.floor(totalProducts * (0.99 + (abcProcessedProducts / totalCount) * 0.01));
outputProgress({
status: 'running',
operation: 'ABC classification progress',
current: processedCount,
current: processedProducts,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
remaining: estimateRemaining(startTime, processedProducts, totalProducts),
rate: calculateRate(startTime, processedProducts),
percentage: ((processedProducts / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
@@ -359,6 +443,22 @@ async function calculateMetrics() {
// Clean up
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks');
const endTime = Date.now();
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);
// Update history with completion
await connection.query(`
UPDATE calculate_history
SET
end_time = NOW(),
duration_seconds = ?,
processed_products = ?,
processed_orders = ?,
processed_purchase_orders = ?,
status = 'completed'
WHERE id = ?
`, [totalElapsedSeconds, processedProducts, processedOrders, processedPurchaseOrders, calculateHistoryId]);
// Final success message
outputProgress({
status: 'complete',
@@ -380,16 +480,41 @@ async function calculateMetrics() {
global.clearProgress();
} catch (error) {
const endTime = Date.now();
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);
// Update history with error
await connection.query(`
UPDATE calculate_history
SET
end_time = NOW(),
duration_seconds = ?,
processed_products = ?,
processed_orders = ?,
processed_purchase_orders = ?,
status = ?,
error_message = ?
WHERE id = ?
`, [
totalElapsedSeconds,
processedProducts,
processedOrders,
processedPurchaseOrders,
isCancelled ? 'cancelled' : 'failed',
error.message,
calculateHistoryId
]);
if (isCancelled) {
global.outputProgress({
status: 'cancelled',
operation: 'Calculation cancelled',
current: processedCount,
current: processedProducts,
total: totalProducts || 0,
elapsed: global.formatElapsedTime(startTime),
remaining: null,
rate: global.calculateRate(startTime, processedCount),
percentage: ((processedCount / (totalProducts || 1)) * 100).toFixed(1),
rate: global.calculateRate(startTime, processedProducts),
percentage: ((processedProducts / (totalProducts || 1)) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
@@ -400,12 +525,12 @@ async function calculateMetrics() {
global.outputProgress({
status: 'error',
operation: 'Error: ' + error.message,
current: processedCount,
current: processedProducts,
total: totalProducts || 0,
elapsed: global.formatElapsedTime(startTime),
remaining: null,
rate: global.calculateRate(startTime, processedCount),
percentage: ((processedCount / (totalProducts || 1)) * 100).toFixed(1),
rate: global.calculateRate(startTime, processedProducts),
percentage: ((processedProducts / (totalProducts || 1)) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),