Fix timeout error on data import, fix regression in progress display, and separate core and metrics schemas more cleanly

2025-01-12 13:09:40 -05:00
parent 6e1a8cf17d
commit 6c524aa3a9
5 changed files with 211 additions and 341 deletions

View File

@@ -54,20 +54,23 @@ function logImport(message) {
fs.appendFileSync(IMPORT_LOG, logMessage);
}
// Helper function to output progress in JSON format
// Helper function to format duration
function formatDuration(seconds) {
const hours = Math.floor(seconds / 3600);
const minutes = Math.floor((seconds % 3600) / 60);
seconds = Math.floor(seconds % 60);
const parts = [];
if (hours > 0) parts.push(`${hours}h`);
if (minutes > 0) parts.push(`${minutes}m`);
if (seconds > 0 || parts.length === 0) parts.push(`${seconds}s`);
return parts.join(' ');
}
// Helper function to output progress
function outputProgress(data) {
if (!data.status) {
data = {
status: 'running',
...data
};
}
// Log progress to import log
logImport(JSON.stringify(data));
// Output to console
console.log(JSON.stringify(data));
process.stdout.write(JSON.stringify(data) + '\n');
}
// Helper function to count total rows in a CSV file
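Note: with console.log replaced by an explicit process.stdout.write, every progress record is one line of newline-delimited JSON, and the new formatDuration also reports hours (formatDuration(3725) yields '1h 2m 5s' where the old helper gave '62m 5s'). A minimal sketch of a consumer for that stream, assuming the import script is spawned as a child process (the script filename and handler below are illustrative, not part of this commit):

// Illustrative only: a parent process consuming the NDJSON progress stream.
const { spawn } = require('child_process');
const readline = require('readline');

const child = spawn('node', ['import.js']); // hypothetical script name
const rl = readline.createInterface({ input: child.stdout });

rl.on('line', (line) => {
  let progress;
  try {
    progress = JSON.parse(line); // each line is one progress object
  } catch {
    return; // skip any non-JSON output
  }
  if (progress.status === 'complete') {
    console.log(`Import finished in ${progress.duration}`);
  }
});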
@@ -82,14 +85,6 @@ async function countRows(filePath) {
});
}
// Helper function to format time duration
function formatDuration(seconds) {
if (seconds < 60) return `${Math.round(seconds)}s`;
const minutes = Math.floor(seconds / 60);
seconds = Math.round(seconds % 60);
return `${minutes}m ${seconds}s`;
}
// Helper function to update progress with time estimate
function updateProgress(current, total, operation, startTime) {
const elapsed = (Date.now() - startTime) / 1000;
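Only the opening of updateProgress is visible in this hunk; a plausible completion, assuming the remaining time is extrapolated linearly from rows processed so far (the field names below are illustrative, not the committed code):

// Sketch only: linear ETA from elapsed time and rows completed.
function updateProgress(current, total, operation, startTime) {
  const elapsed = (Date.now() - startTime) / 1000;
  const rate = current / elapsed;                      // rows per second so far
  const remaining = rate > 0 ? (total - current) / rate : 0;
  outputProgress({
    operation,
    current,
    total,
    percentage: ((current / total) * 100).toFixed(1),
    elapsed: formatDuration(elapsed),
    remaining: formatDuration(remaining)               // linear extrapolation
  });
}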
@@ -401,118 +396,6 @@ async function calculateVendorMetrics(connection) {
}
}
// Helper function to calculate metrics in batches
async function calculateMetricsInBatch(connection) {
try {
// Clear temporary tables
await connection.query('TRUNCATE TABLE temp_sales_metrics');
await connection.query('TRUNCATE TABLE temp_purchase_metrics');
// Calculate sales metrics for all products in one go
await connection.query(`
INSERT INTO temp_sales_metrics
SELECT
o.product_id,
COUNT(*) / NULLIF(DATEDIFF(MAX(o.date), MIN(o.date)), 0) as daily_sales_avg,
SUM(o.quantity) / NULLIF(DATEDIFF(MAX(o.date), MIN(o.date)), 0) * 7 as weekly_sales_avg,
SUM(o.quantity) / NULLIF(DATEDIFF(MAX(o.date), MIN(o.date)), 0) * 30 as monthly_sales_avg,
SUM(o.price * o.quantity) as total_revenue,
AVG((o.price - p.cost_price) / o.price * 100) as avg_margin_percent,
MIN(o.date) as first_sale_date,
MAX(o.date) as last_sale_date
FROM orders o
JOIN products p ON o.product_id = p.product_id
WHERE o.canceled = false
GROUP BY o.product_id
`);
// Calculate purchase metrics for all products in one go
await connection.query(`
INSERT INTO temp_purchase_metrics
SELECT
product_id,
AVG(DATEDIFF(received_date, date)) as avg_lead_time_days,
MAX(date) as last_purchase_date,
MAX(received_date) as last_received_date
FROM purchase_orders
WHERE status = 'closed'
GROUP BY product_id
`);
// Update product_metrics table with all metrics at once
await connection.query(`
INSERT INTO product_metrics (
product_id, daily_sales_avg, weekly_sales_avg, monthly_sales_avg,
days_of_inventory, weeks_of_inventory, safety_stock, reorder_point,
avg_margin_percent, total_revenue, avg_lead_time_days,
last_purchase_date, last_received_date
)
SELECT
p.product_id,
COALESCE(s.daily_sales_avg, 0),
COALESCE(s.weekly_sales_avg, 0),
COALESCE(s.monthly_sales_avg, 0),
CASE
WHEN s.daily_sales_avg > 0 THEN FLOOR(p.stock_quantity / s.daily_sales_avg)
ELSE 999
END as days_of_inventory,
CASE
WHEN s.daily_sales_avg > 0 THEN FLOOR(p.stock_quantity / s.daily_sales_avg / 7)
ELSE 999
END as weeks_of_inventory,
CEIL(COALESCE(s.daily_sales_avg, 0) * 14) as safety_stock,
CEIL(COALESCE(s.daily_sales_avg, 0) * 21) as reorder_point,
COALESCE(s.avg_margin_percent, 0),
COALESCE(s.total_revenue, 0),
COALESCE(pm.avg_lead_time_days, 0),
pm.last_purchase_date,
pm.last_received_date
FROM products p
LEFT JOIN temp_sales_metrics s ON p.product_id = s.product_id
LEFT JOIN temp_purchase_metrics pm ON p.product_id = pm.product_id
ON DUPLICATE KEY UPDATE
daily_sales_avg = VALUES(daily_sales_avg),
weekly_sales_avg = VALUES(weekly_sales_avg),
monthly_sales_avg = VALUES(monthly_sales_avg),
days_of_inventory = VALUES(days_of_inventory),
weeks_of_inventory = VALUES(weeks_of_inventory),
safety_stock = VALUES(safety_stock),
reorder_point = VALUES(reorder_point),
avg_margin_percent = VALUES(avg_margin_percent),
total_revenue = VALUES(total_revenue),
avg_lead_time_days = VALUES(avg_lead_time_days),
last_purchase_date = VALUES(last_purchase_date),
last_received_date = VALUES(last_received_date),
last_calculated_at = CURRENT_TIMESTAMP
`);
// Calculate ABC classification in one go
await connection.query(`
WITH revenue_ranks AS (
SELECT
product_id,
total_revenue,
total_revenue / SUM(total_revenue) OVER () * 100 as revenue_percent,
ROW_NUMBER() OVER (ORDER BY total_revenue DESC) as rank
FROM product_metrics
WHERE total_revenue > 0
)
UPDATE product_metrics pm
JOIN revenue_ranks r ON pm.product_id = r.product_id
SET abc_class =
CASE
WHEN r.revenue_percent >= 20 THEN 'A'
WHEN r.revenue_percent >= 5 THEN 'B'
ELSE 'C'
END
`);
} catch (error) {
logError(error, 'Error in batch metrics calculation');
throw error;
}
}
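For reference, the stocking thresholds in the removed SQL are simple multiples of average daily sales: 14 days of cover for safety stock, 21 for the reorder point, with 999 as a sentinel when a product has no sales history. The same arithmetic stated standalone in JavaScript (the function name is illustrative):

// Mirrors the removed SQL's threshold arithmetic for clarity.
function stockingThresholds(dailySalesAvg, stockQuantity) {
  const safetyStock = Math.ceil(dailySalesAvg * 14);   // CEIL(daily_avg * 14)
  const reorderPoint = Math.ceil(dailySalesAvg * 21);  // CEIL(daily_avg * 21)
  const daysOfInventory = dailySalesAvg > 0
    ? Math.floor(stockQuantity / dailySalesAvg)
    : 999;                                             // sentinel for "no sales"
  return { safetyStock, reorderPoint, daysOfInventory };
}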
async function importProducts(pool, filePath) {
const parser = fs.createReadStream(filePath).pipe(csv.parse({ columns: true, trim: true }));
const totalRows = PRODUCTS_TEST_LIMIT > 0 ? Math.min(await countRows(filePath), PRODUCTS_TEST_LIMIT) : await countRows(filePath);
@@ -1085,7 +968,7 @@ async function main() {
connection.release();
}
// Step 1: Import all data first
// Import all data
try {
// Import products first since they're referenced by other tables
await importProducts(pool, path.join(__dirname, '../csv/39f2x83-products.csv'));
@@ -1101,34 +984,16 @@ async function main() {
importPurchaseOrders(pool, path.join(__dirname, '../csv/39f2x83-purchase_orders.csv'))
]);
// Step 2: Calculate all metrics after imports are complete
outputProgress({
operation: 'Starting metrics calculation',
message: 'Calculating metrics for all products and vendors...'
status: 'complete',
operation: 'Import process completed',
duration: formatDuration((Date.now() - startTime) / 1000)
});
const connection = await pool.getConnection();
try {
// Calculate metrics in batches
await calculateMetricsInBatch(connection);
// Calculate vendor metrics
await calculateVendorMetrics(connection);
} finally {
connection.release();
}
} catch (error) {
logError(error, 'Error during import/metrics calculation');
logError(error, 'Error during import');
throw error;
}
outputProgress({
status: 'complete',
operation: 'Import process completed',
duration: formatDuration((Date.now() - startTime) / 1000)
});
} catch (error) {
logError(error, 'Fatal error during import process');
outputProgress({

View File

@@ -84,6 +84,14 @@ async function resetMetrics() {
// Disable foreign key checks first
await connection.query('SET FOREIGN_KEY_CHECKS = 0');
// Drop the metrics views first
outputProgress({
status: 'running',
operation: 'Dropping metrics views',
percentage: '15'
});
await connection.query('DROP VIEW IF EXISTS inventory_health, product_sales_trends');
// Drop only the metrics tables if they exist
const [existing] = await connection.query(`
SELECT GROUP_CONCAT(table_name) as tables
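The query above is truncated here, but it appears to enumerate the existing metrics tables from information_schema before dropping them. A generic sketch of that pattern, assuming mysql2-style placeholders (the table list is an assumption, not the committed code):

// Illustrative pattern: drop only the metrics tables that actually exist.
const metricsTables = ['product_metrics', 'vendor_metrics']; // assumed names
const [rows] = await connection.query(
  `SELECT table_name AS name
   FROM information_schema.tables
   WHERE table_schema = DATABASE() AND table_name IN (?)`,
  [metricsTables]
);
for (const { name } of rows) {
  await connection.query(`DROP TABLE IF EXISTS \`${name}\``);
}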