Enhance metrics calculation scripts with improved progress tracking and cancellation support

This commit is contained in:
2025-01-28 20:54:05 -05:00
parent a1e3803ca3
commit 9c34e24909
12 changed files with 915 additions and 327 deletions

View File

@@ -186,6 +186,19 @@ async function calculateMetrics() {
} }
// Calculate ABC classification // Calculate ABC classification
outputProgress({
status: 'running',
operation: 'Starting ABC classification',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1'); const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1');
const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 }; const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 };
@@ -202,6 +215,19 @@ async function calculateMetrics() {
) ENGINE=MEMORY ) ENGINE=MEMORY
`); `);
outputProgress({
status: 'running',
operation: 'Creating revenue rankings',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
await connection.query(` await connection.query(`
INSERT INTO temp_revenue_ranks INSERT INTO temp_revenue_ranks
SELECT SELECT
@@ -222,11 +248,26 @@ async function calculateMetrics() {
const [rankingCount] = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks'); const [rankingCount] = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks');
const totalCount = rankingCount[0].total_count || 1; const totalCount = rankingCount[0].total_count || 1;
outputProgress({
status: 'running',
operation: 'Updating ABC classifications',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Process updates in batches // Process updates in batches
let abcProcessedCount = 0; let abcProcessedCount = 0;
const batchSize = 5000; const batchSize = 5000;
while (true) { while (true) {
if (isCancelled) return processedCount;
// First get a batch of PIDs that need updating // First get a batch of PIDs that need updating
const [pids] = await connection.query(` const [pids] = await connection.query(`
SELECT pm.pid SELECT pm.pid
@@ -267,6 +308,18 @@ async function calculateMetrics() {
pids.map(row => row.pid)]); pids.map(row => row.pid)]);
abcProcessedCount += result.affectedRows; abcProcessedCount += result.affectedRows;
processedCount = Math.floor(totalProducts * (0.99 + (abcProcessedCount / totalCount) * 0.01));
outputProgress({
status: 'running',
operation: 'ABC classification progress',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
// Small delay between batches to allow other transactions // Small delay between batches to allow other transactions
await new Promise(resolve => setTimeout(resolve, 100)); await new Promise(resolve => setTimeout(resolve, 100));
@@ -276,14 +329,14 @@ async function calculateMetrics() {
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks'); await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks');
// Final success message // Final success message
global.outputProgress({ outputProgress({
status: 'complete', status: 'complete',
operation: 'Metrics calculation complete', operation: 'Metrics calculation complete',
current: totalProducts, current: totalProducts,
total: totalProducts, total: totalProducts,
elapsed: global.formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: '0s', remaining: '0s',
rate: global.calculateRate(startTime, totalProducts), rate: calculateRate(startTime, totalProducts),
percentage: '100' percentage: '100'
}); });

View File

@@ -3,6 +3,7 @@ const path = require('path');
const csv = require('csv-parse'); const csv = require('csv-parse');
const mysql = require('mysql2/promise'); const mysql = require('mysql2/promise');
const dotenv = require('dotenv'); const dotenv = require('dotenv');
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('./metrics/utils/progress');
// Get test limits from environment variables // Get test limits from environment variables
const PRODUCTS_TEST_LIMIT = parseInt(process.env.PRODUCTS_TEST_LIMIT || '0'); const PRODUCTS_TEST_LIMIT = parseInt(process.env.PRODUCTS_TEST_LIMIT || '0');
@@ -106,20 +107,19 @@ async function countRows(filePath) {
} }
// Helper function to update progress with time estimate // Helper function to update progress with time estimate
function updateProgress(current, total, operation, startTime) { function updateProgress(current, total, operation, startTime, added = 0, updated = 0, skipped = 0) {
const elapsed = (Date.now() - startTime) / 1000;
const rate = current / elapsed; // rows per second
const remaining = (total - current) / rate;
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation, operation,
current, current,
total, total,
rate, rate: calculateRate(startTime, current),
elapsed: formatDuration(elapsed), elapsed: formatElapsedTime(startTime),
remaining: formatDuration(remaining), remaining: estimateRemaining(startTime, current, total),
percentage: ((current / total) * 100).toFixed(1) percentage: ((current / total) * 100).toFixed(1),
added,
updated,
skipped
}); });
} }
@@ -474,7 +474,7 @@ async function importProducts(pool, filePath) {
// Update progress every 100ms to avoid console flooding // Update progress every 100ms to avoid console flooding
const now = Date.now(); const now = Date.now();
if (now - lastUpdate > 100) { if (now - lastUpdate > 100) {
updateProgress(rowCount, totalRows, 'Products import', startTime); updateProgress(rowCount, totalRows, 'Products import', startTime, added, updated, 0);
lastUpdate = now; lastUpdate = now;
} }
@@ -678,7 +678,7 @@ async function importOrders(pool, filePath) {
// Update progress every 100ms // Update progress every 100ms
const now = Date.now(); const now = Date.now();
if (now - lastUpdate > 100) { if (now - lastUpdate > 100) {
updateProgress(rowCount, totalRows, 'Orders import', startTime); updateProgress(rowCount, totalRows, 'Orders import', startTime, added, updated, skipped);
lastUpdate = now; lastUpdate = now;
} }
@@ -845,7 +845,7 @@ async function importPurchaseOrders(pool, filePath) {
// Update progress every 100ms // Update progress every 100ms
const now = Date.now(); const now = Date.now();
if (now - lastUpdate > 100) { if (now - lastUpdate > 100) {
updateProgress(rowCount, totalRows, 'Purchase orders import', startTime); updateProgress(rowCount, totalRows, 'Purchase orders import', startTime, added, updated, skipped);
lastUpdate = now; lastUpdate = now;
} }

View File

@@ -2,6 +2,7 @@ const mysql = require("mysql2/promise");
const { Client } = require("ssh2"); const { Client } = require("ssh2");
const dotenv = require("dotenv"); const dotenv = require("dotenv");
const path = require("path"); const path = require("path");
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('./metrics/utils/progress');
dotenv.config({ path: path.join(__dirname, "../.env") }); dotenv.config({ path: path.join(__dirname, "../.env") });
@@ -43,42 +44,9 @@ const localDbConfig = {
namedPlaceholders: true, namedPlaceholders: true,
}; };
// Helper function to output progress // Constants
function outputProgress(data) { const BATCH_SIZE = 1000;
process.stdout.write(JSON.stringify(data) + "\n"); const PROGRESS_INTERVAL = 1000; // Update progress every second
}
// Helper function to format duration
function formatDuration(seconds) {
const hours = Math.floor(seconds / 3600);
const minutes = Math.floor((seconds % 3600) / 60);
seconds = Math.floor(seconds % 60);
const parts = [];
if (hours > 0) parts.push(`${hours}h`);
if (minutes > 0) parts.push(`${minutes}m`);
if (seconds > 0 || parts.length === 0) parts.push(`${seconds}s`);
return parts.join(" ");
}
// Helper function to update progress with time estimate
function updateProgress(current, total, operation, startTime) {
const elapsed = (Date.now() - startTime) / 1000;
const rate = current / elapsed;
const remaining = (total - current) / rate;
outputProgress({
status: "running",
operation,
current,
total,
rate,
elapsed: formatDuration(elapsed),
remaining: formatDuration(remaining),
percentage: ((current / total) * 100).toFixed(1),
});
}
let isImportCancelled = false; let isImportCancelled = false;
@@ -86,8 +54,27 @@ let isImportCancelled = false;
function cancelImport() { function cancelImport() {
isImportCancelled = true; isImportCancelled = true;
outputProgress({ outputProgress({
status: "cancelled", status: 'cancelled',
operation: "Import cancelled", operation: 'Import cancelled',
current: 0,
total: 0,
elapsed: null,
remaining: null,
rate: 0
});
}
// Helper function to update progress with time estimate
function updateProgress(current, total, operation, startTime) {
outputProgress({
status: 'running',
operation,
current,
total,
rate: calculateRate(startTime, current),
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, current, total),
percentage: ((current / total) * 100).toFixed(1)
}); });
} }
@@ -276,7 +263,7 @@ async function importCategories(prodConnection, localConnection) {
operation: "Categories import completed", operation: "Categories import completed",
current: totalInserted, current: totalInserted,
total: totalInserted, total: totalInserted,
duration: formatDuration((Date.now() - startTime) / 1000), duration: formatElapsedTime((Date.now() - startTime) / 1000),
}); });
} catch (error) { } catch (error) {
console.error("Error importing categories:", error); console.error("Error importing categories:", error);
@@ -510,7 +497,6 @@ async function importProducts(prodConnection, localConnection) {
const total = rows.length; const total = rows.length;
// Process products in batches // Process products in batches
const BATCH_SIZE = 100;
for (let i = 0; i < rows.length; i += BATCH_SIZE) { for (let i = 0; i < rows.length; i += BATCH_SIZE) {
let batch = rows.slice(i, i + BATCH_SIZE); let batch = rows.slice(i, i + BATCH_SIZE);
@@ -641,7 +627,7 @@ async function importProducts(prodConnection, localConnection) {
operation: "Products import completed", operation: "Products import completed",
current: total, current: total,
total, total,
duration: formatDuration((Date.now() - startTime) / 1000), duration: formatElapsedTime((Date.now() - startTime) / 1000),
}); });
} catch (error) { } catch (error) {
console.error("Error importing products:", error); console.error("Error importing products:", error);
@@ -1384,7 +1370,7 @@ async function importPurchaseOrders(prodConnection, localConnection) {
timing: { timing: {
start_time: new Date(startTime).toISOString(), start_time: new Date(startTime).toISOString(),
end_time: new Date(endTime).toISOString(), end_time: new Date(endTime).toISOString(),
elapsed_time: formatDuration((endTime - startTime) / 1000), elapsed_time: formatElapsedTime((endTime - startTime) / 1000),
elapsed_seconds: Math.round((endTime - startTime) / 1000) elapsed_seconds: Math.round((endTime - startTime) / 1000)
} }
}); });
@@ -1459,7 +1445,7 @@ async function main() {
timing: { timing: {
start_time: new Date(startTime).toISOString(), start_time: new Date(startTime).toISOString(),
end_time: new Date(endTime).toISOString(), end_time: new Date(endTime).toISOString(),
elapsed_time: formatDuration((endTime - startTime) / 1000), elapsed_time: formatElapsedTime((endTime - startTime) / 1000),
elapsed_seconds: Math.round((endTime - startTime) / 1000) elapsed_seconds: Math.round((endTime - startTime) / 1000)
} }
}); });
@@ -1473,7 +1459,7 @@ async function main() {
timing: { timing: {
start_time: new Date(startTime).toISOString(), start_time: new Date(startTime).toISOString(),
end_time: new Date(endTime).toISOString(), end_time: new Date(endTime).toISOString(),
elapsed_time: formatDuration((endTime - startTime) / 1000), elapsed_time: formatElapsedTime((endTime - startTime) / 1000),
elapsed_seconds: Math.round((endTime - startTime) / 1000) elapsed_seconds: Math.round((endTime - startTime) / 1000)
} }
}); });

View File

@@ -1,18 +1,32 @@
const { outputProgress } = require('./utils/progress'); const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db'); const { getConnection } = require('./utils/db');
async function calculateBrandMetrics(startTime, totalProducts, processedCount) { async function calculateBrandMetrics(startTime, totalProducts, processedCount, isCancelled = false) {
const connection = await getConnection(); const connection = await getConnection();
try { try {
if (isCancelled) {
outputProgress({ outputProgress({
status: 'running', status: 'cancelled',
operation: 'Calculating brand metrics', operation: 'Brand metrics calculation cancelled',
current: Math.floor(totalProducts * 0.95), current: processedCount,
total: totalProducts, total: totalProducts,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.95), totalProducts), remaining: null,
rate: calculateRate(startTime, Math.floor(totalProducts * 0.95)), rate: calculateRate(startTime, processedCount),
percentage: '95' percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
}
outputProgress({
status: 'running',
operation: 'Starting brand metrics calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
}); });
// Calculate brand metrics with optimized queries // Calculate brand metrics with optimized queries
@@ -111,6 +125,20 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount) {
last_calculated_at = CURRENT_TIMESTAMP last_calculated_at = CURRENT_TIMESTAMP
`); `);
processedCount = Math.floor(totalProducts * 0.97);
outputProgress({
status: 'running',
operation: 'Brand metrics calculated, starting time-based metrics',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Calculate brand time-based metrics with optimized query // Calculate brand time-based metrics with optimized query
await connection.query(` await connection.query(`
INSERT INTO brand_time_metrics ( INSERT INTO brand_time_metrics (
@@ -170,10 +198,27 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount) {
avg_margin = VALUES(avg_margin) avg_margin = VALUES(avg_margin)
`); `);
return Math.floor(totalProducts * 0.98); processedCount = Math.floor(totalProducts * 0.99);
outputProgress({
status: 'running',
operation: 'Brand time-based metrics calculated',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
} catch (error) {
logError(error, 'Error calculating brand metrics');
throw error;
} finally { } finally {
if (connection) {
connection.release(); connection.release();
} }
}
} }
module.exports = calculateBrandMetrics; module.exports = calculateBrandMetrics;

View File

@@ -1,18 +1,32 @@
const { outputProgress } = require('./utils/progress'); const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db'); const { getConnection } = require('./utils/db');
async function calculateCategoryMetrics(startTime, totalProducts, processedCount) { async function calculateCategoryMetrics(startTime, totalProducts, processedCount, isCancelled = false) {
const connection = await getConnection(); const connection = await getConnection();
try { try {
if (isCancelled) {
outputProgress({ outputProgress({
status: 'running', status: 'cancelled',
operation: 'Calculating category metrics', operation: 'Category metrics calculation cancelled',
current: Math.floor(totalProducts * 0.85), current: processedCount,
total: totalProducts, total: totalProducts,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.85), totalProducts), remaining: null,
rate: calculateRate(startTime, Math.floor(totalProducts * 0.85)), rate: calculateRate(startTime, processedCount),
percentage: '85' percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
}
outputProgress({
status: 'running',
operation: 'Starting category metrics calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
}); });
// First, calculate base category metrics // First, calculate base category metrics
@@ -44,6 +58,20 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
last_calculated_at = VALUES(last_calculated_at) last_calculated_at = VALUES(last_calculated_at)
`); `);
processedCount = Math.floor(totalProducts * 0.90);
outputProgress({
status: 'running',
operation: 'Base category metrics calculated, updating with margin data',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Then update with margin and turnover data // Then update with margin and turnover data
await connection.query(` await connection.query(`
WITH category_sales AS ( WITH category_sales AS (
@@ -68,6 +96,20 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
cm.last_calculated_at = NOW() cm.last_calculated_at = NOW()
`); `);
processedCount = Math.floor(totalProducts * 0.95);
outputProgress({
status: 'running',
operation: 'Margin data updated, calculating growth rates',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Finally update growth rates // Finally update growth rates
await connection.query(` await connection.query(`
WITH current_period AS ( WITH current_period AS (
@@ -112,6 +154,20 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
WHERE cp.cat_id IS NOT NULL OR pp.cat_id IS NOT NULL WHERE cp.cat_id IS NOT NULL OR pp.cat_id IS NOT NULL
`); `);
processedCount = Math.floor(totalProducts * 0.97);
outputProgress({
status: 'running',
operation: 'Growth rates calculated, updating time-based metrics',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Calculate time-based metrics // Calculate time-based metrics
await connection.query(` await connection.query(`
INSERT INTO category_time_metrics ( INSERT INTO category_time_metrics (
@@ -157,50 +213,27 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
turnover_rate = VALUES(turnover_rate) turnover_rate = VALUES(turnover_rate)
`); `);
// Calculate sales metrics for different time periods processedCount = Math.floor(totalProducts * 0.99);
const periods = [30, 90, 180, 365]; outputProgress({
for (const days of periods) { status: 'running',
await connection.query(` operation: 'Time-based metrics calculated',
INSERT INTO category_sales_metrics ( current: processedCount,
category_id, total: totalProducts,
brand, elapsed: formatElapsedTime(startTime),
period_start, remaining: estimateRemaining(startTime, processedCount, totalProducts),
period_end, rate: calculateRate(startTime, processedCount),
avg_daily_sales, percentage: ((processedCount / totalProducts) * 100).toFixed(1)
total_sold, });
num_products,
avg_price,
last_calculated_at
)
SELECT
pc.cat_id as category_id,
COALESCE(p.brand, 'Unbranded') as brand,
DATE_SUB(CURDATE(), INTERVAL ? DAY) as period_start,
CURDATE() as period_end,
COALESCE(SUM(o.quantity), 0) / ? as avg_daily_sales,
COALESCE(SUM(o.quantity), 0) as total_sold,
COUNT(DISTINCT p.pid) as num_products,
COALESCE(AVG(o.price), 0) as avg_price,
NOW() as last_calculated_at
FROM product_categories pc
JOIN products p ON pc.pid = p.pid
LEFT JOIN orders o ON p.pid = o.pid
AND o.date >= DATE_SUB(CURDATE(), INTERVAL ? DAY)
AND o.canceled = false
GROUP BY pc.cat_id, p.brand
ON DUPLICATE KEY UPDATE
avg_daily_sales = VALUES(avg_daily_sales),
total_sold = VALUES(total_sold),
num_products = VALUES(num_products),
avg_price = VALUES(avg_price),
last_calculated_at = NOW()
`, [days, days, days]);
}
return Math.floor(totalProducts * 0.9); return processedCount;
} catch (error) {
logError(error, 'Error calculating category metrics');
throw error;
} finally { } finally {
if (connection) {
connection.release(); connection.release();
} }
}
} }
module.exports = calculateCategoryMetrics; module.exports = calculateCategoryMetrics;

View File

@@ -1,18 +1,32 @@
const { outputProgress } = require('./utils/progress'); const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db'); const { getConnection } = require('./utils/db');
async function calculateFinancialMetrics(startTime, totalProducts, processedCount) { async function calculateFinancialMetrics(startTime, totalProducts, processedCount, isCancelled = false) {
const connection = await getConnection(); const connection = await getConnection();
try { try {
if (isCancelled) {
outputProgress({ outputProgress({
status: 'running', status: 'cancelled',
operation: 'Calculating financial metrics', operation: 'Financial metrics calculation cancelled',
current: Math.floor(totalProducts * 0.6), current: processedCount,
total: totalProducts, total: totalProducts,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.6), totalProducts), remaining: null,
rate: calculateRate(startTime, Math.floor(totalProducts * 0.6)), rate: calculateRate(startTime, processedCount),
percentage: '60' percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
}
outputProgress({
status: 'running',
operation: 'Starting financial metrics calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
}); });
// Calculate financial metrics with optimized query // Calculate financial metrics with optimized query
@@ -48,6 +62,20 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun
END END
`); `);
processedCount = Math.floor(totalProducts * 0.65);
outputProgress({
status: 'running',
operation: 'Base financial metrics calculated, updating time aggregates',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Update time-based aggregates with optimized query // Update time-based aggregates with optimized query
await connection.query(` await connection.query(`
WITH monthly_financials AS ( WITH monthly_financials AS (
@@ -78,10 +106,27 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun
END END
`); `);
return Math.floor(totalProducts * 0.7); processedCount = Math.floor(totalProducts * 0.70);
outputProgress({
status: 'running',
operation: 'Time-based aggregates updated',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
} catch (error) {
logError(error, 'Error calculating financial metrics');
throw error;
} finally { } finally {
if (connection) {
connection.release(); connection.release();
} }
}
} }
module.exports = calculateFinancialMetrics; module.exports = calculateFinancialMetrics;

View File

@@ -1,4 +1,4 @@
const { outputProgress, logError } = require('./utils/progress'); const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db'); const { getConnection } = require('./utils/db');
// Helper function to handle NaN and undefined values // Helper function to handle NaN and undefined values
@@ -9,24 +9,38 @@ function sanitizeValue(value) {
return value; return value;
} }
async function calculateProductMetrics(startTime, totalProducts, processedCount = 0) { async function calculateProductMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
const connection = await getConnection(); const connection = await getConnection();
try { try {
// Skip flags are inherited from the parent scope // Skip flags are inherited from the parent scope
const SKIP_PRODUCT_BASE_METRICS = 0; const SKIP_PRODUCT_BASE_METRICS = 0;
const SKIP_PRODUCT_TIME_AGGREGATES =0; const SKIP_PRODUCT_TIME_AGGREGATES = 0;
if (isCancelled) {
outputProgress({
status: 'cancelled',
operation: 'Product metrics calculation cancelled',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: null,
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
}
// Calculate base product metrics // Calculate base product metrics
if (!SKIP_PRODUCT_BASE_METRICS) { if (!SKIP_PRODUCT_BASE_METRICS) {
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Calculating base product metrics', operation: 'Starting base product metrics calculation',
current: Math.floor(totalProducts * 0.2), current: processedCount,
total: totalProducts, total: totalProducts,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.2), totalProducts), remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, Math.floor(totalProducts * 0.2)), rate: calculateRate(startTime, processedCount),
percentage: '20' percentage: ((processedCount / totalProducts) * 100).toFixed(1)
}); });
// Calculate base metrics // Calculate base metrics
@@ -72,8 +86,17 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
`); `);
processedCount = Math.floor(totalProducts * 0.4); processedCount = Math.floor(totalProducts * 0.4);
outputProgress({
status: 'running',
operation: 'Base product metrics calculated',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
} else { } else {
console.log('Skipping base product metrics calculation');
processedCount = Math.floor(totalProducts * 0.4); processedCount = Math.floor(totalProducts * 0.4);
outputProgress({ outputProgress({
status: 'running', status: 'running',
@@ -83,21 +106,23 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts), remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount),
percentage: '40' percentage: ((processedCount / totalProducts) * 100).toFixed(1)
}); });
} }
if (isCancelled) return processedCount;
// Calculate product time aggregates // Calculate product time aggregates
if (!SKIP_PRODUCT_TIME_AGGREGATES) { if (!SKIP_PRODUCT_TIME_AGGREGATES) {
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Calculating product time aggregates', operation: 'Starting product time aggregates calculation',
current: Math.floor(totalProducts * 0.4), current: processedCount,
total: totalProducts, total: totalProducts,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.4), totalProducts), remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, Math.floor(totalProducts * 0.4)), rate: calculateRate(startTime, processedCount),
percentage: '40' percentage: ((processedCount / totalProducts) * 100).toFixed(1)
}); });
// Calculate time-based aggregates // Calculate time-based aggregates
@@ -151,8 +176,17 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
`); `);
processedCount = Math.floor(totalProducts * 0.6); processedCount = Math.floor(totalProducts * 0.6);
outputProgress({
status: 'running',
operation: 'Product time aggregates calculated',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
} else { } else {
console.log('Skipping product time aggregates calculation');
processedCount = Math.floor(totalProducts * 0.6); processedCount = Math.floor(totalProducts * 0.6);
outputProgress({ outputProgress({
status: 'running', status: 'running',
@@ -162,11 +196,14 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts), remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount),
percentage: '60' percentage: ((processedCount / totalProducts) * 100).toFixed(1)
}); });
} }
return processedCount; return processedCount;
} catch (error) {
logError(error, 'Error calculating product metrics');
throw error;
} finally { } finally {
if (connection) { if (connection) {
connection.release(); connection.release();

View File

@@ -1,18 +1,32 @@
const { outputProgress } = require('./utils/progress'); const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db'); const { getConnection } = require('./utils/db');
async function calculateSalesForecasts(startTime, totalProducts, processedCount) { async function calculateSalesForecasts(startTime, totalProducts, processedCount, isCancelled = false) {
const connection = await getConnection(); const connection = await getConnection();
try { try {
if (isCancelled) {
outputProgress({ outputProgress({
status: 'running', status: 'cancelled',
operation: 'Calculating sales forecasts', operation: 'Sales forecasts calculation cancelled',
current: Math.floor(totalProducts * 0.98), current: processedCount,
total: totalProducts, total: totalProducts,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.98), totalProducts), remaining: null,
rate: calculateRate(startTime, Math.floor(totalProducts * 0.98)), rate: calculateRate(startTime, processedCount),
percentage: '98' percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
}
outputProgress({
status: 'running',
operation: 'Starting sales forecasts calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
}); });
// First, create a temporary table for forecast dates // First, create a temporary table for forecast dates
@@ -42,6 +56,20 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount)
) numbers ) numbers
`); `);
processedCount = Math.floor(totalProducts * 0.92);
outputProgress({
status: 'running',
operation: 'Forecast dates prepared, calculating daily sales stats',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Create temporary table for daily sales stats // Create temporary table for daily sales stats
await connection.query(` await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_daily_sales AS CREATE TEMPORARY TABLE IF NOT EXISTS temp_daily_sales AS
@@ -57,6 +85,20 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount)
GROUP BY o.pid, DAYOFWEEK(o.date) GROUP BY o.pid, DAYOFWEEK(o.date)
`); `);
processedCount = Math.floor(totalProducts * 0.94);
outputProgress({
status: 'running',
operation: 'Daily sales stats calculated, preparing product stats',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Create temporary table for product stats // Create temporary table for product stats
await connection.query(` await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_stats AS CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_stats AS
@@ -68,6 +110,20 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount)
GROUP BY pid GROUP BY pid
`); `);
processedCount = Math.floor(totalProducts * 0.96);
outputProgress({
status: 'running',
operation: 'Product stats prepared, calculating product-level forecasts',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Calculate product-level forecasts // Calculate product-level forecasts
await connection.query(` await connection.query(`
INSERT INTO sales_forecasts ( INSERT INTO sales_forecasts (
@@ -116,6 +172,20 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount)
last_calculated_at = NOW() last_calculated_at = NOW()
`); `);
processedCount = Math.floor(totalProducts * 0.98);
outputProgress({
status: 'running',
operation: 'Product forecasts calculated, preparing category stats',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Create temporary table for category stats // Create temporary table for category stats
await connection.query(` await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_category_sales AS CREATE TEMPORARY TABLE IF NOT EXISTS temp_category_sales AS
@@ -142,6 +212,20 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount)
GROUP BY cat_id GROUP BY cat_id
`); `);
processedCount = Math.floor(totalProducts * 0.99);
outputProgress({
status: 'running',
operation: 'Category stats prepared, calculating category-level forecasts',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Calculate category-level forecasts // Calculate category-level forecasts
await connection.query(` await connection.query(`
INSERT INTO category_forecasts ( INSERT INTO category_forecasts (
@@ -199,10 +283,27 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount)
DROP TEMPORARY TABLE IF EXISTS temp_category_stats; DROP TEMPORARY TABLE IF EXISTS temp_category_stats;
`); `);
return Math.floor(totalProducts * 1.0); processedCount = Math.floor(totalProducts * 1.0);
outputProgress({
status: 'running',
operation: 'Category forecasts calculated and temporary tables cleaned up',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
} catch (error) {
logError(error, 'Error calculating sales forecasts');
throw error;
} finally { } finally {
if (connection) {
connection.release(); connection.release();
} }
}
} }
module.exports = calculateSalesForecasts; module.exports = calculateSalesForecasts;

View File

@@ -1,18 +1,32 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('./utils/progress'); const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db'); const { getConnection } = require('./utils/db');
async function calculateTimeAggregates(startTime, totalProducts, processedCount) { async function calculateTimeAggregates(startTime, totalProducts, processedCount, isCancelled = false) {
const connection = await getConnection(); const connection = await getConnection();
try { try {
if (isCancelled) {
outputProgress({ outputProgress({
status: 'running', status: 'cancelled',
operation: 'Calculating time aggregates', operation: 'Time aggregates calculation cancelled',
current: Math.floor(totalProducts * 0.95), current: processedCount,
total: totalProducts, total: totalProducts,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.95), totalProducts), remaining: null,
rate: calculateRate(startTime, Math.floor(totalProducts * 0.95)), rate: calculateRate(startTime, processedCount),
percentage: '95' percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
}
outputProgress({
status: 'running',
operation: 'Starting time aggregates calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
}); });
// Initial insert of time-based aggregates // Initial insert of time-based aggregates
@@ -109,6 +123,20 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount)
profit_margin = VALUES(profit_margin) profit_margin = VALUES(profit_margin)
`); `);
processedCount = Math.floor(totalProducts * 0.60);
outputProgress({
status: 'running',
operation: 'Base time aggregates calculated, updating financial metrics',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Update with financial metrics // Update with financial metrics
await connection.query(` await connection.query(`
UPDATE product_time_aggregates pta UPDATE product_time_aggregates pta
@@ -136,7 +164,22 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount)
END END
`); `);
return Math.floor(totalProducts * 0.65); processedCount = Math.floor(totalProducts * 0.65);
outputProgress({
status: 'running',
operation: 'Financial metrics updated',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
} catch (error) {
logError(error, 'Error calculating time aggregates');
throw error;
} finally { } finally {
if (connection) { if (connection) {
connection.release(); connection.release();

View File

@@ -1,18 +1,32 @@
const { outputProgress } = require('./utils/progress'); const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db'); const { getConnection } = require('./utils/db');
async function calculateVendorMetrics(startTime, totalProducts, processedCount) { async function calculateVendorMetrics(startTime, totalProducts, processedCount, isCancelled = false) {
const connection = await getConnection(); const connection = await getConnection();
try { try {
if (isCancelled) {
outputProgress({ outputProgress({
status: 'running', status: 'cancelled',
operation: 'Ensuring vendors exist in vendor_details', operation: 'Vendor metrics calculation cancelled',
current: Math.floor(totalProducts * 0.7), current: processedCount,
total: totalProducts, total: totalProducts,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.7), totalProducts), remaining: null,
rate: calculateRate(startTime, Math.floor(totalProducts * 0.7)), rate: calculateRate(startTime, processedCount),
percentage: '70' percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
}
outputProgress({
status: 'running',
operation: 'Starting vendor metrics calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
}); });
// First ensure all vendors exist in vendor_details // First ensure all vendors exist in vendor_details
@@ -27,17 +41,20 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount)
WHERE vendor IS NOT NULL WHERE vendor IS NOT NULL
`); `);
processedCount = Math.floor(totalProducts * 0.8);
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Calculating vendor metrics', operation: 'Vendor details updated, calculating metrics',
current: Math.floor(totalProducts * 0.8), current: processedCount,
total: totalProducts, total: totalProducts,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.8), totalProducts), remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, Math.floor(totalProducts * 0.8)), rate: calculateRate(startTime, processedCount),
percentage: '80' percentage: ((processedCount / totalProducts) * 100).toFixed(1)
}); });
if (isCancelled) return processedCount;
// Now calculate vendor metrics // Now calculate vendor metrics
await connection.query(` await connection.query(`
INSERT INTO vendor_metrics ( INSERT INTO vendor_metrics (
@@ -130,10 +147,27 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount)
last_calculated_at = VALUES(last_calculated_at) last_calculated_at = VALUES(last_calculated_at)
`); `);
return Math.floor(totalProducts * 0.9); processedCount = Math.floor(totalProducts * 0.9);
outputProgress({
status: 'running',
operation: 'Vendor metrics calculated',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
} catch (error) {
logError(error, 'Error calculating vendor metrics');
throw error;
} finally { } finally {
if (connection) {
connection.release(); connection.release();
} }
}
} }
module.exports = calculateVendorMetrics; module.exports = calculateVendorMetrics;

View File

@@ -12,6 +12,12 @@ const dbConfig = {
}; };
function outputProgress(data) { function outputProgress(data) {
if (!data.status) {
data = {
status: 'running',
...data
};
}
console.log(JSON.stringify(data)); console.log(JSON.stringify(data));
} }
@@ -51,36 +57,228 @@ const REQUIRED_CORE_TABLES = [
'purchase_orders' 'purchase_orders'
]; ];
// Split SQL into individual statements
function splitSQLStatements(sql) {
sql = sql.replace(/\r\n/g, '\n');
let statements = [];
let currentStatement = '';
let inString = false;
let stringChar = '';
for (let i = 0; i < sql.length; i++) {
const char = sql[i];
const nextChar = sql[i + 1] || '';
if ((char === "'" || char === '"') && sql[i - 1] !== '\\') {
if (!inString) {
inString = true;
stringChar = char;
} else if (char === stringChar) {
inString = false;
}
}
if (!inString && char === '-' && nextChar === '-') {
while (i < sql.length && sql[i] !== '\n') i++;
continue;
}
if (!inString && char === '/' && nextChar === '*') {
i += 2;
while (i < sql.length && (sql[i] !== '*' || sql[i + 1] !== '/')) i++;
i++;
continue;
}
if (!inString && char === ';') {
if (currentStatement.trim()) {
statements.push(currentStatement.trim());
}
currentStatement = '';
} else {
currentStatement += char;
}
}
if (currentStatement.trim()) {
statements.push(currentStatement.trim());
}
return statements;
}
async function resetMetrics() { async function resetMetrics() {
let connection; let connection;
try { try {
outputProgress({
operation: 'Starting metrics reset',
message: 'Connecting to database...'
});
connection = await mysql.createConnection(dbConfig); connection = await mysql.createConnection(dbConfig);
await connection.beginTransaction(); await connection.beginTransaction();
// Verify required core tables exist
outputProgress({
operation: 'Verifying core tables',
message: 'Checking required tables exist...'
});
const [tables] = await connection.query(`
SELECT table_name
FROM information_schema.tables
WHERE table_schema = DATABASE()
AND table_name IN (?)
`, [REQUIRED_CORE_TABLES]);
const existingCoreTables = tables.map(t => t.table_name);
const missingCoreTables = REQUIRED_CORE_TABLES.filter(t => !existingCoreTables.includes(t));
if (missingCoreTables.length > 0) {
throw new Error(`Required core tables missing: ${missingCoreTables.join(', ')}`);
}
// Verify config tables exist
outputProgress({
operation: 'Verifying config tables',
message: 'Checking configuration tables exist...'
});
const [configTables] = await connection.query(`
SELECT table_name
FROM information_schema.tables
WHERE table_schema = DATABASE()
AND table_name IN (?)
`, [CONFIG_TABLES]);
const existingConfigTables = configTables.map(t => t.table_name);
const missingConfigTables = CONFIG_TABLES.filter(t => !existingConfigTables.includes(t));
if (missingConfigTables.length > 0) {
throw new Error(`Required config tables missing: ${missingConfigTables.join(', ')}`);
}
// Drop all metrics tables // Drop all metrics tables
outputProgress({
operation: 'Dropping metrics tables',
message: 'Removing existing metrics tables...'
});
for (const table of METRICS_TABLES) { for (const table of METRICS_TABLES) {
console.log(`Dropping table: ${table}`);
try { try {
await connection.query(`DROP TABLE IF EXISTS ${table}`); await connection.query(`DROP TABLE IF EXISTS ${table}`);
console.log(`Successfully dropped: ${table}`); outputProgress({
operation: 'Table dropped',
message: `Successfully dropped table: ${table}`
});
} catch (err) { } catch (err) {
console.error(`Error dropping ${table}:`, err.message); outputProgress({
status: 'error',
operation: 'Drop table error',
message: `Error dropping table ${table}: ${err.message}`
});
throw err; throw err;
} }
} }
// Recreate all metrics tables from schema // Read metrics schema
const schemaSQL = fs.readFileSync(path.resolve(__dirname, '../db/metrics-schema.sql'), 'utf8'); outputProgress({
await connection.query(schemaSQL); operation: 'Reading schema',
console.log('All metrics tables recreated successfully'); message: 'Loading metrics schema file...'
});
const schemaPath = path.resolve(__dirname, '../db/metrics-schema.sql');
if (!fs.existsSync(schemaPath)) {
throw new Error(`Schema file not found at: ${schemaPath}`);
}
const schemaSQL = fs.readFileSync(schemaPath, 'utf8');
const statements = splitSQLStatements(schemaSQL);
outputProgress({
operation: 'Schema loaded',
message: `Found ${statements.length} SQL statements to execute`
});
// Execute schema statements
for (let i = 0; i < statements.length; i++) {
const stmt = statements[i];
try {
const [result] = await connection.query(stmt);
// Check for warnings
const [warnings] = await connection.query('SHOW WARNINGS');
if (warnings && warnings.length > 0) {
outputProgress({
status: 'warning',
operation: 'SQL Warning',
message: warnings
});
}
outputProgress({
operation: 'SQL Progress',
message: {
statement: i + 1,
total: statements.length,
preview: stmt.substring(0, 100) + (stmt.length > 100 ? '...' : ''),
affectedRows: result.affectedRows
}
});
} catch (sqlError) {
outputProgress({
status: 'error',
operation: 'SQL Error',
message: {
error: sqlError.message,
sqlState: sqlError.sqlState,
errno: sqlError.errno,
statement: stmt,
statementNumber: i + 1
}
});
throw sqlError;
}
}
// Verify metrics tables were created
outputProgress({
operation: 'Verifying metrics tables',
message: 'Checking all metrics tables were created...'
});
const [metricsTablesResult] = await connection.query(`
SELECT table_name
FROM information_schema.tables
WHERE table_schema = DATABASE()
AND table_name IN (?)
`, [METRICS_TABLES]);
const existingMetricsTables = metricsTablesResult.map(t => t.table_name);
const missingMetricsTables = METRICS_TABLES.filter(t => !existingMetricsTables.includes(t));
if (missingMetricsTables.length > 0) {
throw new Error(`Failed to create metrics tables: ${missingMetricsTables.join(', ')}`);
}
await connection.commit(); await connection.commit();
console.log('All metrics tables reset successfully');
outputProgress({
status: 'complete',
operation: 'Reset complete',
message: 'All metrics tables have been reset successfully'
});
} catch (error) { } catch (error) {
outputProgress({
status: 'error',
operation: 'Reset failed',
message: error.message,
stack: error.stack
});
if (connection) { if (connection) {
await connection.rollback(); await connection.rollback();
} }
console.error('Error resetting metrics:', error);
throw error; throw error;
} finally { } finally {
if (connection) { if (connection) {

View File

@@ -1,167 +1,180 @@
const fs = require('fs');
const path = require('path'); const path = require('path');
const https = require('https'); const fs = require('fs');
const axios = require('axios');
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('./metrics/utils/progress');
// Change working directory to script directory
process.chdir(path.dirname(__filename));
require('dotenv').config({ path: path.resolve(__dirname, '..', '.env') });
// Configuration
const FILES = [ const FILES = [
{ {
name: '39f2x83-products.csv', name: '39f2x83-products.csv',
url: 'https://feeds.acherryontop.com/39f2x83-products.csv' url: process.env.PRODUCTS_CSV_URL
}, },
{ {
name: '39f2x83-orders.csv', name: '39f2x83-orders.csv',
url: 'https://feeds.acherryontop.com/39f2x83-orders.csv' url: process.env.ORDERS_CSV_URL
}, },
{ {
name: '39f2x83-purchase_orders.csv', name: '39f2x83-purchase_orders.csv',
url: 'https://feeds.acherryontop.com/39f2x83-purchase_orders.csv' url: process.env.PURCHASE_ORDERS_CSV_URL
} }
]; ];
const CSV_DIR = path.join(__dirname, '..', 'csv'); let isCancelled = false;
// Ensure CSV directory exists function cancelUpdate() {
if (!fs.existsSync(CSV_DIR)) { isCancelled = true;
fs.mkdirSync(CSV_DIR, { recursive: true }); outputProgress({
status: 'cancelled',
operation: 'CSV update cancelled',
current: 0,
total: FILES.length,
elapsed: null,
remaining: null,
rate: 0
});
} }
// Function to download a file async function downloadFile(file, index, startTime) {
function downloadFile(url, filePath) { if (isCancelled) return;
return new Promise((resolve, reject) => {
const file = fs.createWriteStream(filePath);
https.get(url, response => { const csvDir = path.join(__dirname, '../csv');
if (response.statusCode !== 200) { if (!fs.existsSync(csvDir)) {
reject(new Error(`Failed to download: ${response.statusCode} ${response.statusMessage}`)); fs.mkdirSync(csvDir, { recursive: true });
}
const writer = fs.createWriteStream(path.join(csvDir, file.name));
try {
const response = await axios({
url: file.url,
method: 'GET',
responseType: 'stream'
});
const totalLength = response.headers['content-length'];
let downloadedLength = 0;
let lastProgressUpdate = Date.now();
const PROGRESS_INTERVAL = 1000; // Update progress every second
response.data.on('data', (chunk) => {
if (isCancelled) {
writer.end();
return; return;
} }
const totalSize = parseInt(response.headers['content-length'], 10); downloadedLength += chunk.length;
let downloadedSize = 0;
let lastProgressUpdate = Date.now();
const startTime = Date.now();
response.on('data', chunk => { // Update progress based on time interval
downloadedSize += chunk.length;
const now = Date.now(); const now = Date.now();
// Update progress at most every 100ms to avoid console flooding if (now - lastProgressUpdate >= PROGRESS_INTERVAL) {
if (now - lastProgressUpdate > 100) { const progress = (downloadedLength / totalLength) * 100;
const elapsed = (now - startTime) / 1000; outputProgress({
const rate = downloadedSize / elapsed;
const remaining = (totalSize - downloadedSize) / rate;
console.log(JSON.stringify({
status: 'running', status: 'running',
operation: `Downloading ${path.basename(filePath)}`, operation: `Downloading ${file.name}`,
current: downloadedSize, current: index + (downloadedLength / totalLength),
total: totalSize, total: FILES.length,
rate: (rate / 1024 / 1024).toFixed(2), // MB/s elapsed: formatElapsedTime(startTime),
elapsed: formatDuration(elapsed), remaining: estimateRemaining(startTime, index + (downloadedLength / totalLength), FILES.length),
remaining: formatDuration(remaining), rate: calculateRate(startTime, index + (downloadedLength / totalLength)),
percentage: ((downloadedSize / totalSize) * 100).toFixed(1) percentage: progress.toFixed(1),
})); file_progress: {
name: file.name,
downloaded: downloadedLength,
total: totalLength,
percentage: progress.toFixed(1)
}
});
lastProgressUpdate = now; lastProgressUpdate = now;
} }
}); });
response.pipe(file); response.data.pipe(writer);
file.on('finish', () => { return new Promise((resolve, reject) => {
console.log(JSON.stringify({ writer.on('finish', resolve);
status: 'running', writer.on('error', reject);
operation: `Completed ${path.basename(filePath)}`,
current: totalSize,
total: totalSize,
percentage: '100'
}));
file.close();
resolve();
}); });
}).on('error', error => { } catch (error) {
fs.unlink(filePath, () => {}); // Delete the file if download failed fs.unlinkSync(path.join(csvDir, file.name));
reject(error); throw error;
}); }
file.on('error', error => {
fs.unlink(filePath, () => {}); // Delete the file if there was an error
reject(error);
});
});
}
// Helper function to format duration
function formatDuration(seconds) {
if (seconds < 60) return `${Math.round(seconds)}s`;
const minutes = Math.floor(seconds / 60);
seconds = Math.round(seconds % 60);
return `${minutes}m ${seconds}s`;
} }
// Main function to update all files // Main function to update all files
async function updateFiles() { async function updateFiles() {
console.log(JSON.stringify({ const startTime = Date.now();
status: 'running',
operation: 'Starting CSV file updates',
total: FILES.length,
current: 0
}));
for (let i = 0; i < FILES.length; i++) { outputProgress({
const file = FILES[i]; status: 'running',
const filePath = path.join(CSV_DIR, file.name); operation: 'Starting CSV update',
current: 0,
total: FILES.length,
elapsed: '0s',
remaining: null,
rate: 0,
percentage: '0'
});
try { try {
// Delete existing file if it exists for (let i = 0; i < FILES.length; i++) {
if (fs.existsSync(filePath)) { if (isCancelled) {
console.log(JSON.stringify({ return;
status: 'running',
operation: `Removing existing file: ${file.name}`,
current: i,
total: FILES.length,
percentage: ((i / FILES.length) * 100).toFixed(1)
}));
fs.unlinkSync(filePath);
} }
// Download new file const file = FILES[i];
console.log(JSON.stringify({ await downloadFile(file, i, startTime);
outputProgress({
status: 'running', status: 'running',
operation: `Starting download: ${file.name}`, operation: 'CSV update in progress',
current: i,
total: FILES.length,
percentage: ((i / FILES.length) * 100).toFixed(1)
}));
await downloadFile(file.url, filePath);
console.log(JSON.stringify({
status: 'running',
operation: `Successfully updated ${file.name}`,
current: i + 1, current: i + 1,
total: FILES.length, total: FILES.length,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, i + 1, FILES.length),
rate: calculateRate(startTime, i + 1),
percentage: (((i + 1) / FILES.length) * 100).toFixed(1) percentage: (((i + 1) / FILES.length) * 100).toFixed(1)
})); });
} catch (error) {
console.error(JSON.stringify({
status: 'error',
operation: `Error updating ${file.name}`,
error: error.message
}));
throw error;
}
} }
console.log(JSON.stringify({ outputProgress({
status: 'complete', status: 'complete',
operation: 'CSV file update complete', operation: 'CSV update complete',
current: FILES.length, current: FILES.length,
total: FILES.length, total: FILES.length,
elapsed: formatElapsedTime(startTime),
remaining: '0s',
rate: calculateRate(startTime, FILES.length),
percentage: '100' percentage: '100'
})); });
} catch (error) {
outputProgress({
status: 'error',
operation: 'CSV update failed',
error: error.message,
current: 0,
total: FILES.length,
elapsed: formatElapsedTime(startTime),
remaining: null,
rate: 0
});
throw error;
}
} }
// Run the update // Run the update only if this is the main module
updateFiles().catch(error => { if (require.main === module) {
console.error(JSON.stringify({ updateFiles().catch((error) => {
error: `Update failed: ${error.message}` console.error('Error updating CSV files:', error);
}));
process.exit(1); process.exit(1);
}); });
}
// Export the functions needed by the route
module.exports = {
updateFiles,
cancelUpdate
};