Put back files

commit e84c7e568f (parent 4953355b91)
2025-10-04 16:14:09 -04:00
166 changed files with 61620 additions and 259 deletions

@@ -0,0 +1,321 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db');
async function calculateBrandMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
const connection = await getConnection();
let success = false;
let processedOrders = 0;
try {
if (isCancelled) {
outputProgress({
status: 'cancelled',
operation: 'Brand metrics calculation cancelled',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: null,
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
return {
processedProducts: processedCount,
processedOrders: 0,
processedPurchaseOrders: 0,
success
};
}
// Get order count that will be processed
const orderCount = await connection.query(`
SELECT COUNT(*) as count
FROM orders o
WHERE o.canceled = false
`);
processedOrders = parseInt(orderCount.rows[0].count);
outputProgress({
status: 'running',
operation: 'Starting brand metrics calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// Calculate brand metrics with optimized queries
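// The single INSERT ... ON CONFLICT below does all of the work in one pass:
// - filtered_products: products with a brand, treating stock outside 0..5000 as invalid
// - sales_periods: discounted revenue for the last 3 months ('current') versus the same
//   window one year earlier, 12-15 months back ('previous')
// - brand_data: product/stock counts, stock valuation, revenue, and gross margin % clamped to [-100, 100]
// - growth_rate: period-over-period revenue change, clamped to [-100, 999.99]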
await connection.query(`
INSERT INTO brand_metrics (
brand,
product_count,
active_products,
total_stock_units,
total_stock_cost,
total_stock_retail,
total_revenue,
avg_margin,
growth_rate
)
WITH filtered_products AS (
SELECT
p.*,
CASE
WHEN p.stock_quantity <= 5000 AND p.stock_quantity >= 0
THEN p.pid
END as valid_pid,
CASE
WHEN p.visible = true
AND p.stock_quantity <= 5000
AND p.stock_quantity >= 0
THEN p.pid
END as active_pid,
CASE
WHEN p.stock_quantity IS NULL
OR p.stock_quantity < 0
OR p.stock_quantity > 5000
THEN 0
ELSE p.stock_quantity
END as valid_stock
FROM products p
WHERE p.brand IS NOT NULL
),
sales_periods AS (
SELECT
p.brand,
SUM(o.quantity * (o.price - COALESCE(o.discount, 0))) as period_revenue,
SUM(o.quantity * (o.price - COALESCE(o.discount, 0) - p.cost_price)) as period_margin,
COUNT(DISTINCT DATE(o.date)) as period_days,
CASE
WHEN o.date >= CURRENT_DATE - INTERVAL '3 months' THEN 'current'
WHEN o.date BETWEEN CURRENT_DATE - INTERVAL '15 months'
AND CURRENT_DATE - INTERVAL '12 months' THEN 'previous'
END as period_type
FROM filtered_products p
JOIN orders o ON p.pid = o.pid
WHERE o.canceled = false
AND o.date >= CURRENT_DATE - INTERVAL '15 months'
GROUP BY p.brand, period_type
),
brand_data AS (
SELECT
p.brand,
COUNT(DISTINCT p.valid_pid) as product_count,
COUNT(DISTINCT p.active_pid) as active_products,
SUM(p.valid_stock) as total_stock_units,
SUM(p.valid_stock * p.cost_price) as total_stock_cost,
SUM(p.valid_stock * p.price) as total_stock_retail,
COALESCE(SUM(o.quantity * (o.price - COALESCE(o.discount, 0))), 0) as total_revenue,
CASE
WHEN SUM(o.quantity * o.price) > 0
THEN GREATEST(
-100.0,
LEAST(
100.0,
(
SUM(o.quantity * o.price) - -- Use gross revenue (before discounts)
SUM(o.quantity * COALESCE(p.cost_price, 0)) -- Total costs
) * 100.0 /
NULLIF(SUM(o.quantity * o.price), 0) -- Divide by gross revenue
)
)
ELSE 0
END as avg_margin
FROM filtered_products p
LEFT JOIN orders o ON p.pid = o.pid AND o.canceled = false
GROUP BY p.brand
)
SELECT
bd.brand,
bd.product_count,
bd.active_products,
bd.total_stock_units,
bd.total_stock_cost,
bd.total_stock_retail,
bd.total_revenue,
bd.avg_margin,
CASE
WHEN MAX(CASE WHEN sp.period_type = 'previous' THEN sp.period_revenue END) = 0
AND MAX(CASE WHEN sp.period_type = 'current' THEN sp.period_revenue END) > 0
THEN 100.0
WHEN MAX(CASE WHEN sp.period_type = 'previous' THEN sp.period_revenue END) = 0
THEN 0.0
ELSE GREATEST(
-100.0,
LEAST(
((MAX(CASE WHEN sp.period_type = 'current' THEN sp.period_revenue END) -
MAX(CASE WHEN sp.period_type = 'previous' THEN sp.period_revenue END)) /
NULLIF(ABS(MAX(CASE WHEN sp.period_type = 'previous' THEN sp.period_revenue END)), 0)) * 100.0,
999.99
)
)
END as growth_rate
FROM brand_data bd
LEFT JOIN sales_periods sp ON bd.brand = sp.brand
GROUP BY bd.brand, bd.product_count, bd.active_products, bd.total_stock_units,
bd.total_stock_cost, bd.total_stock_retail, bd.total_revenue, bd.avg_margin
ON CONFLICT (brand) DO UPDATE
SET
product_count = EXCLUDED.product_count,
active_products = EXCLUDED.active_products,
total_stock_units = EXCLUDED.total_stock_units,
total_stock_cost = EXCLUDED.total_stock_cost,
total_stock_retail = EXCLUDED.total_stock_retail,
total_revenue = EXCLUDED.total_revenue,
avg_margin = EXCLUDED.avg_margin,
growth_rate = EXCLUDED.growth_rate,
last_calculated_at = CURRENT_TIMESTAMP
`);
processedCount = Math.floor(totalProducts * 0.97);
outputProgress({
status: 'running',
operation: 'Brand metrics calculated, starting time-based metrics',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Calculate brand time-based metrics with optimized query
await connection.query(`
INSERT INTO brand_time_metrics (
brand,
year,
month,
product_count,
active_products,
total_stock_units,
total_stock_cost,
total_stock_retail,
total_revenue,
avg_margin
)
WITH filtered_products AS (
SELECT
p.*,
CASE WHEN p.stock_quantity <= 5000 THEN p.pid END as valid_pid,
CASE WHEN p.visible = true AND p.stock_quantity <= 5000 THEN p.pid END as active_pid,
CASE
WHEN p.stock_quantity IS NULL OR p.stock_quantity < 0 OR p.stock_quantity > 5000 THEN 0
ELSE p.stock_quantity
END as valid_stock
FROM products p
WHERE p.brand IS NOT NULL
),
monthly_metrics AS (
SELECT
p.brand,
EXTRACT(YEAR FROM o.date::timestamp with time zone) as year,
EXTRACT(MONTH FROM o.date::timestamp with time zone) as month,
COUNT(DISTINCT p.valid_pid) as product_count,
COUNT(DISTINCT p.active_pid) as active_products,
SUM(p.valid_stock) as total_stock_units,
SUM(p.valid_stock * p.cost_price) as total_stock_cost,
SUM(p.valid_stock * p.price) as total_stock_retail,
SUM(o.quantity * o.price) as total_revenue,
CASE
WHEN SUM(o.quantity * o.price) > 0
THEN GREATEST(
-100.0,
LEAST(
100.0,
(
SUM(o.quantity * o.price) - -- Use gross revenue (before discounts)
SUM(o.quantity * COALESCE(p.cost_price, 0)) -- Total costs
) * 100.0 /
NULLIF(SUM(o.quantity * o.price), 0) -- Divide by gross revenue
)
)
ELSE 0
END as avg_margin
FROM filtered_products p
LEFT JOIN orders o ON p.pid = o.pid AND o.canceled = false
WHERE o.date >= CURRENT_DATE - INTERVAL '12 months'
GROUP BY p.brand, EXTRACT(YEAR FROM o.date::timestamp with time zone), EXTRACT(MONTH FROM o.date::timestamp with time zone)
)
SELECT *
FROM monthly_metrics
ON CONFLICT (brand, year, month) DO UPDATE
SET
product_count = EXCLUDED.product_count,
active_products = EXCLUDED.active_products,
total_stock_units = EXCLUDED.total_stock_units,
total_stock_cost = EXCLUDED.total_stock_cost,
total_stock_retail = EXCLUDED.total_stock_retail,
total_revenue = EXCLUDED.total_revenue,
avg_margin = EXCLUDED.avg_margin
`);
processedCount = Math.floor(totalProducts * 0.99);
outputProgress({
status: 'running',
operation: 'Brand time-based metrics calculated',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// If we get here, everything completed successfully
success = true;
// Update calculate_status
await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
VALUES ('brand_metrics', NOW())
ON CONFLICT (module_name) DO UPDATE
SET last_calculation_timestamp = NOW()
`);
return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
} catch (error) {
success = false;
logError(error, 'Error calculating brand metrics');
throw error;
} finally {
if (connection) {
connection.release();
}
}
}
module.exports = calculateBrandMetrics;
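
The shared helpers required at the top of this file (./utils/progress and ./utils/db) are not shown in this excerpt. The sketch below is a rough guess at the shapes the calculators assume, inferred from the call sites; the repository's real implementations may differ.

// utils/progress.js (assumed shapes, not the project's actual code)
function formatElapsedTime(startTime) {
  const seconds = Math.round((Date.now() - startTime) / 1000);
  return `${Math.floor(seconds / 60)}m ${seconds % 60}s`;
}
function calculateRate(startTime, processedCount) {
  const elapsedSeconds = (Date.now() - startTime) / 1000;
  return elapsedSeconds > 0 ? +(processedCount / elapsedSeconds).toFixed(2) : 0;
}
function estimateRemaining(startTime, processedCount, total) {
  const rate = calculateRate(startTime, processedCount);
  return rate > 0 ? Math.round((total - processedCount) / rate) : null; // seconds
}
function outputProgress(progress) {
  // The calculators only need progress objects emitted somewhere observable.
  process.stdout.write(JSON.stringify(progress) + '\n');
}
function logError(error, context) {
  console.error(`${context}:`, error);
}
// utils/db.js (assumed to wrap a pg Pool; getConnection resolves to a client with query/release)
// const { Pool } = require('pg');
// const pool = new Pool();
// async function getConnection() { return pool.connect(); }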

@@ -0,0 +1,554 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db');
async function calculateCategoryMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
const connection = await getConnection();
let success = false;
let processedOrders = 0;
try {
if (isCancelled) {
outputProgress({
status: 'cancelled',
operation: 'Category metrics calculation cancelled',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: null,
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
return {
processedProducts: processedCount,
processedOrders: 0,
processedPurchaseOrders: 0,
success
};
}
// Get order count that will be processed
const orderCount = await connection.query(`
SELECT COUNT(*) as count
FROM orders o
WHERE o.canceled = false
`);
processedOrders = parseInt(orderCount.rows[0].count);
outputProgress({
status: 'running',
operation: 'Starting category metrics calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// First, calculate base category metrics
await connection.query(`
INSERT INTO category_metrics (
category_id,
product_count,
active_products,
total_value,
status,
last_calculated_at
)
SELECT
c.cat_id,
COUNT(DISTINCT p.pid) as product_count,
COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
COALESCE(SUM(p.stock_quantity * p.cost_price), 0) as total_value,
c.status,
NOW() as last_calculated_at
FROM categories c
LEFT JOIN product_categories pc ON c.cat_id = pc.cat_id
LEFT JOIN products p ON pc.pid = p.pid
GROUP BY c.cat_id, c.status
ON CONFLICT (category_id) DO UPDATE
SET
product_count = EXCLUDED.product_count,
active_products = EXCLUDED.active_products,
total_value = EXCLUDED.total_value,
status = EXCLUDED.status,
last_calculated_at = EXCLUDED.last_calculated_at
`);
processedCount = Math.floor(totalProducts * 0.90);
outputProgress({
status: 'running',
operation: 'Base category metrics calculated, updating with margin data',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Then update with margin and turnover data
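// avg_margin = total margin / total sales * 100 over the lookback window.
// turnover_rate = (units sold / average stock) annualized via 365 / active days, capped at 999.99.
// turnover_config supplies the lookback window in days (default 30), matched by category and/or vendor.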
await connection.query(`
WITH category_sales AS (
SELECT
pc.cat_id,
SUM(o.quantity * o.price) as total_sales,
SUM(o.quantity * (o.price - p.cost_price)) as total_margin,
SUM(o.quantity) as units_sold,
AVG(GREATEST(p.stock_quantity, 0)) as avg_stock,
COUNT(DISTINCT DATE(o.date)) as active_days
FROM product_categories pc
JOIN products p ON pc.pid = p.pid
JOIN orders o ON p.pid = o.pid
LEFT JOIN turnover_config tc ON
(tc.category_id = pc.cat_id AND tc.vendor = p.vendor) OR
(tc.category_id = pc.cat_id AND tc.vendor IS NULL) OR
(tc.category_id IS NULL AND tc.vendor = p.vendor) OR
(tc.category_id IS NULL AND tc.vendor IS NULL)
WHERE o.canceled = false
AND o.date >= CURRENT_DATE - (COALESCE(tc.calculation_period_days, 30) || ' days')::INTERVAL
GROUP BY pc.cat_id
)
UPDATE category_metrics
SET
avg_margin = COALESCE(cs.total_margin * 100.0 / NULLIF(cs.total_sales, 0), 0),
turnover_rate = CASE
WHEN cs.avg_stock > 0 AND cs.active_days > 0
THEN LEAST(
(cs.units_sold / cs.avg_stock) * (365.0 / cs.active_days),
999.99
)
ELSE 0
END,
last_calculated_at = NOW()
FROM category_sales cs
WHERE category_id = cs.cat_id
`);
processedCount = Math.floor(totalProducts * 0.95);
outputProgress({
status: 'running',
operation: 'Margin data updated, calculating growth rates',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Finally update growth rates
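// Growth-rate strategy: compare seasonality-adjusted revenue for the last 3 months against the
// same window one year earlier (12-15 months back). When at least 6 months of monthly data exist,
// an annualized least-squares trend slope is used instead. Results are clamped to [-100, 999.99],
// and avg_margin is refreshed from the last 3 months of gross margin.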
await connection.query(`
WITH current_period AS (
SELECT
pc.cat_id,
SUM(o.quantity * (o.price - COALESCE(o.discount, 0)) /
(1 + COALESCE(ss.seasonality_factor, 0))) as revenue,
SUM(o.quantity * (o.price - COALESCE(o.discount, 0) - p.cost_price)) as gross_profit,
COUNT(DISTINCT DATE(o.date)) as days
FROM product_categories pc
JOIN products p ON pc.pid = p.pid
JOIN orders o ON p.pid = o.pid
LEFT JOIN sales_seasonality ss ON EXTRACT(MONTH FROM o.date) = ss.month
WHERE o.canceled = false
AND o.date >= CURRENT_DATE - INTERVAL '3 months'
GROUP BY pc.cat_id
),
previous_period AS (
SELECT
pc.cat_id,
SUM(o.quantity * (o.price - COALESCE(o.discount, 0)) /
(1 + COALESCE(ss.seasonality_factor, 0))) as revenue,
COUNT(DISTINCT DATE(o.date)) as days
FROM product_categories pc
JOIN products p ON pc.pid = p.pid
JOIN orders o ON p.pid = o.pid
LEFT JOIN sales_seasonality ss ON EXTRACT(MONTH FROM o.date) = ss.month
WHERE o.canceled = false
AND o.date BETWEEN CURRENT_DATE - INTERVAL '15 months'
AND CURRENT_DATE - INTERVAL '12 months'
GROUP BY pc.cat_id
),
trend_data AS (
SELECT
pc.cat_id,
EXTRACT(MONTH FROM o.date) as month,
SUM(o.quantity * (o.price - COALESCE(o.discount, 0)) /
(1 + COALESCE(ss.seasonality_factor, 0))) as revenue,
COUNT(DISTINCT DATE(o.date)) as days_in_month
FROM product_categories pc
JOIN products p ON pc.pid = p.pid
JOIN orders o ON p.pid = o.pid
LEFT JOIN sales_seasonality ss ON EXTRACT(MONTH FROM o.date) = ss.month
WHERE o.canceled = false
AND o.date >= CURRENT_DATE - INTERVAL '15 months'
GROUP BY pc.cat_id, EXTRACT(MONTH FROM o.date)
),
trend_stats AS (
SELECT
cat_id,
COUNT(*) as n,
AVG(month) as avg_x,
AVG(revenue / NULLIF(days_in_month, 0)) as avg_y,
SUM(month * (revenue / NULLIF(days_in_month, 0))) as sum_xy,
SUM(month * month) as sum_xx
FROM trend_data
GROUP BY cat_id
HAVING COUNT(*) >= 6
),
trend_analysis AS (
SELECT
cat_id,
(sum_xy - (n * avg_x * avg_y)) /
NULLIF(sum_xx - (n * avg_x * avg_x), 0) as trend_slope,
avg_y as avg_daily_revenue
FROM trend_stats
),
margin_calc AS (
SELECT
pc.cat_id,
CASE
WHEN SUM(o.quantity * o.price) > 0 THEN
GREATEST(
-100.0,
LEAST(
100.0,
(
SUM(o.quantity * o.price) - -- Use gross revenue (before discounts)
SUM(o.quantity * COALESCE(p.cost_price, 0)) -- Total costs
) * 100.0 /
NULLIF(SUM(o.quantity * o.price), 0) -- Divide by gross revenue
)
)
ELSE NULL
END as avg_margin
FROM product_categories pc
JOIN products p ON pc.pid = p.pid
JOIN orders o ON p.pid = o.pid
WHERE o.canceled = false
AND o.date >= CURRENT_DATE - INTERVAL '3 months'
GROUP BY pc.cat_id
),
combined_metrics AS (
SELECT
COALESCE(cp.cat_id, pp.cat_id) as category_id,
CASE
WHEN pp.revenue = 0 AND COALESCE(cp.revenue, 0) > 0 THEN 100.0
WHEN pp.revenue = 0 OR cp.revenue IS NULL THEN 0.0
WHEN ta.trend_slope IS NOT NULL THEN
GREATEST(
-100.0,
LEAST(
(ta.trend_slope / NULLIF(ta.avg_daily_revenue, 0)) * 365 * 100,
999.99
)
)
ELSE
GREATEST(
-100.0,
LEAST(
((COALESCE(cp.revenue, 0) - pp.revenue) /
NULLIF(ABS(pp.revenue), 0)) * 100.0,
999.99
)
)
END as growth_rate,
mc.avg_margin
FROM current_period cp
FULL OUTER JOIN previous_period pp ON cp.cat_id = pp.cat_id
LEFT JOIN trend_analysis ta ON COALESCE(cp.cat_id, pp.cat_id) = ta.cat_id
LEFT JOIN margin_calc mc ON COALESCE(cp.cat_id, pp.cat_id) = mc.cat_id
)
UPDATE category_metrics cm
SET
growth_rate = cmb.growth_rate,
avg_margin = COALESCE(cmb.avg_margin, cm.avg_margin),
last_calculated_at = NOW()
FROM combined_metrics cmb
WHERE cm.category_id = cmb.category_id
`);
processedCount = Math.floor(totalProducts * 0.97);
outputProgress({
status: 'running',
operation: 'Growth rates calculated, updating time-based metrics',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Calculate time-based metrics
await connection.query(`
INSERT INTO category_time_metrics (
category_id,
year,
month,
product_count,
active_products,
total_value,
total_revenue,
avg_margin,
turnover_rate
)
SELECT
pc.cat_id,
EXTRACT(YEAR FROM o.date::timestamp with time zone) as year,
EXTRACT(MONTH FROM o.date::timestamp with time zone) as month,
COUNT(DISTINCT p.pid) as product_count,
COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
SUM(p.stock_quantity * p.cost_price) as total_value,
SUM(o.quantity * o.price) as total_revenue,
CASE
WHEN SUM(o.quantity * o.price) > 0 THEN
LEAST(
GREATEST(
SUM(o.quantity * (o.price - GREATEST(p.cost_price, 0))) * 100.0 /
SUM(o.quantity * o.price),
-100
),
100
)
ELSE 0
END as avg_margin,
COALESCE(
LEAST(
SUM(o.quantity) / NULLIF(AVG(GREATEST(p.stock_quantity, 0)), 0),
999.99
),
0
) as turnover_rate
FROM product_categories pc
JOIN products p ON pc.pid = p.pid
JOIN orders o ON p.pid = o.pid
WHERE o.canceled = false
AND o.date >= CURRENT_DATE - INTERVAL '12 months'
GROUP BY pc.cat_id, EXTRACT(YEAR FROM o.date::timestamp with time zone), EXTRACT(MONTH FROM o.date::timestamp with time zone)
ON CONFLICT (category_id, year, month) DO UPDATE
SET
product_count = EXCLUDED.product_count,
active_products = EXCLUDED.active_products,
total_value = EXCLUDED.total_value,
total_revenue = EXCLUDED.total_revenue,
avg_margin = EXCLUDED.avg_margin,
turnover_rate = EXCLUDED.turnover_rate
`);
processedCount = Math.floor(totalProducts * 0.99);
outputProgress({
status: 'running',
operation: 'Time-based metrics calculated, updating category-sales metrics',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Calculate category-sales metrics
await connection.query(`
INSERT INTO category_sales_metrics (
category_id,
brand,
period_start,
period_end,
avg_daily_sales,
total_sold,
num_products,
avg_price,
last_calculated_at
)
WITH date_ranges AS (
SELECT
CURRENT_DATE - INTERVAL '30 days' as period_start,
CURRENT_DATE as period_end
UNION ALL
SELECT
CURRENT_DATE - INTERVAL '90 days',
CURRENT_DATE - INTERVAL '31 days'
UNION ALL
SELECT
CURRENT_DATE - INTERVAL '180 days',
CURRENT_DATE - INTERVAL '91 days'
UNION ALL
SELECT
CURRENT_DATE - INTERVAL '365 days',
CURRENT_DATE - INTERVAL '181 days'
),
sales_data AS (
SELECT
pc.cat_id,
COALESCE(p.brand, 'Unknown') as brand,
dr.period_start,
dr.period_end,
COUNT(DISTINCT p.pid) as num_products,
SUM(o.quantity) as total_sold,
SUM(o.quantity * o.price) as total_revenue,
COUNT(DISTINCT DATE(o.date)) as num_days
FROM products p
JOIN product_categories pc ON p.pid = pc.pid
JOIN orders o ON p.pid = o.pid
CROSS JOIN date_ranges dr
WHERE o.canceled = false
AND o.date BETWEEN dr.period_start AND dr.period_end
GROUP BY pc.cat_id, p.brand, dr.period_start, dr.period_end
)
SELECT
cat_id as category_id,
brand,
period_start,
period_end,
CASE
WHEN num_days > 0
THEN total_sold / num_days
ELSE 0
END as avg_daily_sales,
total_sold,
num_products,
CASE
WHEN total_sold > 0
THEN total_revenue / total_sold
ELSE 0
END as avg_price,
NOW() as last_calculated_at
FROM sales_data
ON CONFLICT (category_id, brand, period_start, period_end) DO UPDATE
SET
avg_daily_sales = EXCLUDED.avg_daily_sales,
total_sold = EXCLUDED.total_sold,
num_products = EXCLUDED.num_products,
avg_price = EXCLUDED.avg_price,
last_calculated_at = EXCLUDED.last_calculated_at
`);
processedCount = Math.floor(totalProducts * 1.0);
outputProgress({
status: 'running',
operation: 'Category-sales metrics calculated',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// If we get here, everything completed successfully
success = true;
// Update calculate_status
await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
VALUES ('category_metrics', NOW())
ON CONFLICT (module_name) DO UPDATE
SET last_calculation_timestamp = NOW()
`);
return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
} catch (error) {
success = false;
logError(error, 'Error calculating category metrics');
throw error;
} finally {
if (connection) {
connection.release();
}
}
}
module.exports = calculateCategoryMetrics;
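
For context, this is one way the calculators above might be driven by a parent job. The module paths, the product-count query, and the chaining of processedProducts between steps are assumptions for illustration; the commit's real entry point is not shown here and may differ.

// Hypothetical driver (illustrative only; file names are assumed).
const calculateBrandMetrics = require('./brand-metrics');
const calculateCategoryMetrics = require('./category-metrics');
const { getConnection } = require('./utils/db');
async function runMetrics() {
  const startTime = Date.now();
  const connection = await getConnection();
  let totalProducts;
  try {
    const res = await connection.query('SELECT COUNT(*) AS count FROM products');
    totalProducts = parseInt(res.rows[0].count, 10);
  } finally {
    connection.release();
  }
  const brand = await calculateBrandMetrics(startTime, totalProducts);
  const category = await calculateCategoryMetrics(startTime, totalProducts, brand.processedProducts);
  return { brand, category };
}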

@@ -0,0 +1,214 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db');
async function calculateFinancialMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
const connection = await getConnection();
let success = false;
let processedOrders = 0;
try {
if (isCancelled) {
outputProgress({
status: 'cancelled',
operation: 'Financial metrics calculation cancelled',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: null,
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
return {
processedProducts: processedCount,
processedOrders: 0,
processedPurchaseOrders: 0,
success
};
}
// Get order count that will be processed
const orderCount = await connection.query(`
SELECT COUNT(*) as count
FROM orders o
WHERE o.canceled = false
AND DATE(o.date) >= CURRENT_DATE - INTERVAL '12 months'
`);
processedOrders = parseInt(orderCount.rows[0].count);
outputProgress({
status: 'running',
operation: 'Starting financial metrics calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// First, calculate beginning inventory values (12 months ago)
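// Beginning inventory is reconstructed as: current stock + units sold - units received over the
// last 12 months, floored at 0. Average inventory value is then the simple mean of the beginning
// and current valuations, which is what the turnover and GMROI calculations below divide by.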
await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_beginning_inventory AS
WITH beginning_inventory_calc AS (
SELECT
p.pid,
p.stock_quantity as current_quantity,
COALESCE(SUM(o.quantity), 0) as sold_quantity,
COALESCE(SUM(po.received), 0) as received_quantity,
GREATEST(0, (p.stock_quantity + COALESCE(SUM(o.quantity), 0) - COALESCE(SUM(po.received), 0))) as beginning_quantity,
p.cost_price
FROM
products p
LEFT JOIN
orders o ON p.pid = o.pid
AND o.canceled = false
AND o.date >= CURRENT_DATE - INTERVAL '12 months'::interval
LEFT JOIN
purchase_orders po ON p.pid = po.pid
AND po.received_date IS NOT NULL
AND po.received_date >= CURRENT_DATE - INTERVAL '12 months'::interval
GROUP BY
p.pid, p.stock_quantity, p.cost_price
)
SELECT
pid,
beginning_quantity,
beginning_quantity * cost_price as beginning_value,
current_quantity * cost_price as current_value,
((beginning_quantity * cost_price) + (current_quantity * cost_price)) / 2 as average_inventory_value
FROM
beginning_inventory_calc
`);
processedCount = Math.floor(totalProducts * 0.60);
outputProgress({
status: 'running',
operation: 'Beginning inventory values calculated, computing financial metrics',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// Calculate financial metrics with optimized query and standard formulas
await connection.query(`
WITH product_financials AS (
SELECT
p.pid,
COALESCE(bi.average_inventory_value, p.cost_price * p.stock_quantity) as avg_inventory_value,
p.cost_price * p.stock_quantity as current_inventory_value,
SUM(o.quantity * (o.price - COALESCE(o.discount, 0))) as total_revenue,
SUM(o.quantity * COALESCE(o.costeach, 0)) as cost_of_goods_sold,
SUM(o.quantity * (o.price - COALESCE(o.discount, 0) - COALESCE(o.costeach, 0))) as gross_profit,
MIN(o.date) as first_sale_date,
MAX(o.date) as last_sale_date,
EXTRACT(DAY FROM (MAX(o.date)::timestamp with time zone - MIN(o.date)::timestamp with time zone)) + 1 as calculation_period_days,
COUNT(DISTINCT DATE(o.date)) as active_days
FROM products p
LEFT JOIN orders o ON p.pid = o.pid
LEFT JOIN temp_beginning_inventory bi ON p.pid = bi.pid
WHERE o.canceled = false
AND DATE(o.date) >= CURRENT_DATE - INTERVAL '12 months'::interval
GROUP BY p.pid, p.cost_price, p.stock_quantity, bi.average_inventory_value
)
UPDATE product_metrics pm
SET
inventory_value = COALESCE(pf.current_inventory_value, 0)::decimal(10,3),
total_revenue = COALESCE(pf.total_revenue, 0)::decimal(10,3),
cost_of_goods_sold = COALESCE(pf.cost_of_goods_sold, 0)::decimal(10,3),
gross_profit = COALESCE(pf.gross_profit, 0)::decimal(10,3),
turnover_rate = CASE
WHEN COALESCE(pf.avg_inventory_value, 0) > 0 THEN
COALESCE(pf.cost_of_goods_sold, 0) / NULLIF(pf.avg_inventory_value, 0)
ELSE 0
END::decimal(12,3),
gmroi = CASE
WHEN COALESCE(pf.avg_inventory_value, 0) > 0 THEN
COALESCE(pf.gross_profit, 0) / NULLIF(pf.avg_inventory_value, 0)
ELSE 0
END::decimal(10,3),
last_calculated_at = CURRENT_TIMESTAMP
FROM product_financials pf
WHERE pm.pid = pf.pid
`);
processedCount = Math.floor(totalProducts * 0.65);
outputProgress({
status: 'running',
operation: 'Base financial metrics calculated, updating time aggregates',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Clean up temporary tables
await connection.query('DROP TABLE IF EXISTS temp_beginning_inventory');
// If we get here, everything completed successfully
success = true;
// Update calculate_status
await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
VALUES ('financial_metrics', NOW())
ON CONFLICT (module_name) DO UPDATE
SET last_calculation_timestamp = NOW()
`);
return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
} catch (error) {
success = false;
logError(error, 'Error calculating financial metrics');
throw error;
} finally {
if (connection) {
try {
// Make sure temporary tables are always cleaned up
await connection.query('DROP TABLE IF EXISTS temp_beginning_inventory');
} catch (err) {
console.error('Error cleaning up temp tables:', err);
}
connection.release();
}
}
}
module.exports = calculateFinancialMetrics;
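
The UPDATE above applies the standard ratios directly in SQL: turnover is cost of goods sold divided by average inventory value, and GMROI is gross profit divided by average inventory value. A small JavaScript restatement of that arithmetic, purely for reference (field names mirror the product_financials CTE):

// Mirrors the turnover_rate and gmroi expressions in the UPDATE above.
function financialRatios({ costOfGoodsSold, grossProfit, avgInventoryValue }) {
  if (!avgInventoryValue || avgInventoryValue <= 0) {
    return { turnoverRate: 0, gmroi: 0 };
  }
  return {
    turnoverRate: costOfGoodsSold / avgInventoryValue,
    gmroi: grossProfit / avgInventoryValue,
  };
}
// Example: 40000 in COGS and 16000 gross profit on 10000 average inventory
// => turnover 4.0, GMROI 1.6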

@@ -0,0 +1,736 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db');
// Helper function to handle NaN and undefined values
function sanitizeValue(value) {
if (value === undefined || value === null || Number.isNaN(value)) {
return null;
}
return value;
}
async function calculateProductMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
let connection;
let success = false;
let processedOrders = 0;
const BATCH_SIZE = 5000;
try {
connection = await getConnection();
// Skip flags for this module (0 = run the step, a non-zero value skips it)
const SKIP_PRODUCT_BASE_METRICS = 0;
const SKIP_PRODUCT_TIME_AGGREGATES = 0;
// Get total product count if not provided
if (!totalProducts) {
const productCount = await connection.query('SELECT COUNT(*) as count FROM products');
totalProducts = parseInt(productCount.rows[0].count);
}
if (isCancelled) {
outputProgress({
status: 'cancelled',
operation: 'Product metrics calculation cancelled',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: null,
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
}
// First ensure all products have a metrics record
await connection.query(`
INSERT INTO product_metrics (pid, last_calculated_at)
SELECT pid, NOW()
FROM products
ON CONFLICT (pid) DO NOTHING
`);
// Get threshold settings once
const thresholds = await connection.query(`
SELECT critical_days, reorder_days, overstock_days, low_stock_threshold
FROM stock_thresholds
WHERE category_id IS NULL AND vendor IS NULL
LIMIT 1
`);
// Check if threshold data was returned
if (!thresholds.rows || thresholds.rows.length === 0) {
console.warn('No default thresholds found in the database. Falling back to hard-coded default values.');
}
const defaultThresholds = thresholds.rows[0];
// Get financial calculation configuration parameters
const financialConfig = await connection.query(`
SELECT
order_cost,
holding_rate,
service_level_z_score,
min_reorder_qty,
default_reorder_qty,
default_safety_stock
FROM financial_calc_config
WHERE id = 1
LIMIT 1
`);
const finConfig = financialConfig.rows[0] || {
order_cost: 25.00,
holding_rate: 0.25,
service_level_z_score: 1.96,
min_reorder_qty: 1,
default_reorder_qty: 5,
default_safety_stock: 5
};
// Calculate base product metrics
if (!SKIP_PRODUCT_BASE_METRICS) {
outputProgress({
status: 'running',
operation: 'Starting base product metrics calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// Get order count that will be processed
const orderCount = await connection.query(`
SELECT COUNT(*) as count
FROM orders o
WHERE o.canceled = false
`);
processedOrders = parseInt(orderCount.rows[0].count);
// Clear temporary tables
await connection.query('DROP TABLE IF EXISTS temp_sales_metrics');
await connection.query('DROP TABLE IF EXISTS temp_purchase_metrics');
// Create temp_sales_metrics
await connection.query(`
CREATE TEMPORARY TABLE temp_sales_metrics (
pid BIGINT NOT NULL,
daily_sales_avg DECIMAL(10,3),
weekly_sales_avg DECIMAL(10,3),
monthly_sales_avg DECIMAL(10,3),
total_revenue DECIMAL(10,3),
avg_margin_percent DECIMAL(10,3),
first_sale_date DATE,
last_sale_date DATE,
stddev_daily_sales DECIMAL(10,3),
PRIMARY KEY (pid)
)
`);
// Create temp_purchase_metrics
await connection.query(`
CREATE TEMPORARY TABLE temp_purchase_metrics (
pid BIGINT NOT NULL,
avg_lead_time_days DECIMAL(10,2),
last_purchase_date DATE,
first_received_date DATE,
last_received_date DATE,
stddev_lead_time_days DECIMAL(10,2),
PRIMARY KEY (pid)
)
`);
// Populate temp_sales_metrics with base stats and sales averages
await connection.query(`
INSERT INTO temp_sales_metrics
SELECT
p.pid,
COALESCE(SUM(o.quantity) / NULLIF(COUNT(DISTINCT DATE(o.date)), 0), 0) as daily_sales_avg,
COALESCE(SUM(o.quantity) / NULLIF(CEIL(COUNT(DISTINCT DATE(o.date)) / 7.0), 0), 0) as weekly_sales_avg,
COALESCE(SUM(o.quantity) / NULLIF(CEIL(COUNT(DISTINCT DATE(o.date)) / 30.0), 0), 0) as monthly_sales_avg,
COALESCE(SUM(o.quantity * o.price), 0) as total_revenue,
CASE
WHEN SUM(o.quantity * o.price) > 0
THEN ((SUM(o.quantity * o.price) - SUM(o.quantity * p.cost_price)) / SUM(o.quantity * o.price)) * 100
ELSE 0
END as avg_margin_percent,
MIN(o.date) as first_sale_date,
MAX(o.date) as last_sale_date,
COALESCE(STDDEV_SAMP(daily_qty.quantity), 0) as stddev_daily_sales
FROM products p
LEFT JOIN orders o ON p.pid = o.pid
AND o.canceled = false
AND o.date >= CURRENT_DATE - INTERVAL '90 days'
LEFT JOIN (
SELECT
pid,
DATE(date) as sale_date,
SUM(quantity) as quantity
FROM orders
WHERE canceled = false
AND date >= CURRENT_DATE - INTERVAL '90 days'
GROUP BY pid, DATE(date)
) daily_qty ON p.pid = daily_qty.pid
GROUP BY p.pid
`);
// Populate temp_purchase_metrics with timeout protection
await Promise.race([
connection.query(`
INSERT INTO temp_purchase_metrics
SELECT
p.pid,
AVG(
CASE
WHEN po.received_date IS NOT NULL AND po.date IS NOT NULL
THEN EXTRACT(EPOCH FROM (po.received_date::timestamp with time zone - po.date::timestamp with time zone)) / 86400.0
ELSE NULL
END
) as avg_lead_time_days,
MAX(po.date) as last_purchase_date,
MIN(po.received_date) as first_received_date,
MAX(po.received_date) as last_received_date,
STDDEV_SAMP(
CASE
WHEN po.received_date IS NOT NULL AND po.date IS NOT NULL
THEN EXTRACT(EPOCH FROM (po.received_date::timestamp with time zone - po.date::timestamp with time zone)) / 86400.0
ELSE NULL
END
) as stddev_lead_time_days
FROM products p
LEFT JOIN purchase_orders po ON p.pid = po.pid
AND po.received_date IS NOT NULL
AND po.date IS NOT NULL
AND po.date >= CURRENT_DATE - INTERVAL '365 days'
GROUP BY p.pid
`),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Timeout: temp_purchase_metrics query took too long')), 60000)
)
]).catch(async (err) => {
logError(err, 'Error populating temp_purchase_metrics, continuing with empty table');
// Create an empty fallback to continue processing
await connection.query(`
INSERT INTO temp_purchase_metrics
SELECT
p.pid,
30.0 as avg_lead_time_days,
NULL as last_purchase_date,
NULL as first_received_date,
NULL as last_received_date,
0.0 as stddev_lead_time_days
FROM products p
LEFT JOIN temp_purchase_metrics tpm ON p.pid = tpm.pid
WHERE tpm.pid IS NULL
`);
});
// Process updates in batches
let lastPid = 0;
let batchCount = 0;
const MAX_BATCHES = 1000; // Safety limit for number of batches to prevent infinite loops
while (batchCount < MAX_BATCHES) {
if (isCancelled) break;
batchCount++;
const batch = await connection.query(
'SELECT pid FROM products WHERE pid > $1 ORDER BY pid LIMIT $2',
[lastPid, BATCH_SIZE]
);
if (batch.rows.length === 0) break;
// Process the entire batch in a single efficient query
const lowStockThreshold = parseInt(defaultThresholds?.low_stock_threshold) || 5;
const criticalDays = parseInt(defaultThresholds?.critical_days) || 7;
const reorderDays = parseInt(defaultThresholds?.reorder_days) || 14;
const overstockDays = parseInt(defaultThresholds?.overstock_days) || 90;
const serviceLevel = parseFloat(finConfig?.service_level_z_score) || 1.96;
const defaultSafetyStock = parseInt(finConfig?.default_safety_stock) || 5;
const defaultReorderQty = parseInt(finConfig?.default_reorder_qty) || 5;
const orderCost = parseFloat(finConfig?.order_cost) || 25.00;
const holdingRate = parseFloat(finConfig?.holding_rate) || 0.25;
const minReorderQty = parseInt(finConfig?.min_reorder_qty) || 1;
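// The UPDATE below applies standard inventory formulas using the configured parameters:
//   safety stock  = z * sqrt(leadTime * stddevDailySales^2 + dailySalesAvg^2 * stddevLeadTime^2)
//   reorder point = dailySalesAvg * leadTime + safety stock
//   reorder qty   = EOQ = sqrt((2 * annual demand * order cost) / (unit cost * holding rate))
// with floors and fallbacks (low stock threshold, default safety stock / reorder qty) when sales
// or lead-time history is missing.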
await connection.query(`
UPDATE product_metrics pm
SET
inventory_value = p.stock_quantity * NULLIF(p.cost_price, 0),
daily_sales_avg = COALESCE(sm.daily_sales_avg, 0),
weekly_sales_avg = COALESCE(sm.weekly_sales_avg, 0),
monthly_sales_avg = COALESCE(sm.monthly_sales_avg, 0),
total_revenue = COALESCE(sm.total_revenue, 0),
avg_margin_percent = COALESCE(sm.avg_margin_percent, 0),
first_sale_date = sm.first_sale_date,
last_sale_date = sm.last_sale_date,
avg_lead_time_days = COALESCE(lm.avg_lead_time_days, 30.0),
days_of_inventory = CASE
WHEN COALESCE(sm.daily_sales_avg, 0) > 0
THEN FLOOR(p.stock_quantity / NULLIF(sm.daily_sales_avg, 0))
ELSE NULL
END,
weeks_of_inventory = CASE
WHEN COALESCE(sm.weekly_sales_avg, 0) > 0
THEN FLOOR(p.stock_quantity / NULLIF(sm.weekly_sales_avg, 0))
ELSE NULL
END,
stock_status = CASE
WHEN p.stock_quantity <= 0 THEN 'Out of Stock'
WHEN COALESCE(sm.daily_sales_avg, 0) = 0 AND p.stock_quantity <= ${lowStockThreshold} THEN 'Low Stock'
WHEN COALESCE(sm.daily_sales_avg, 0) = 0 THEN 'In Stock'
WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) <= ${criticalDays} THEN 'Critical'
WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) <= ${reorderDays} THEN 'Reorder'
WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) > ${overstockDays} THEN 'Overstocked'
ELSE 'Healthy'
END,
safety_stock = CASE
WHEN COALESCE(sm.daily_sales_avg, 0) > 0 AND COALESCE(lm.avg_lead_time_days, 0) > 0 THEN
CEIL(
${serviceLevel} * SQRT(
GREATEST(0, COALESCE(lm.avg_lead_time_days, 0)) * POWER(COALESCE(sm.stddev_daily_sales, 0), 2) +
POWER(COALESCE(sm.daily_sales_avg, 0), 2) * POWER(COALESCE(lm.stddev_lead_time_days, 0), 2)
)
)
ELSE ${defaultSafetyStock}
END,
reorder_point = CASE
WHEN COALESCE(sm.daily_sales_avg, 0) > 0 THEN
CEIL(sm.daily_sales_avg * GREATEST(0, COALESCE(lm.avg_lead_time_days, 30.0))) +
(CASE
WHEN COALESCE(sm.daily_sales_avg, 0) > 0 AND COALESCE(lm.avg_lead_time_days, 0) > 0 THEN
CEIL(
${serviceLevel} * SQRT(
GREATEST(0, COALESCE(lm.avg_lead_time_days, 0)) * POWER(COALESCE(sm.stddev_daily_sales, 0), 2) +
POWER(COALESCE(sm.daily_sales_avg, 0), 2) * POWER(COALESCE(lm.stddev_lead_time_days, 0), 2)
)
)
ELSE ${defaultSafetyStock}
END)
ELSE ${lowStockThreshold}
END,
reorder_qty = CASE
WHEN COALESCE(sm.daily_sales_avg, 0) > 0 AND NULLIF(p.cost_price, 0) IS NOT NULL AND NULLIF(p.cost_price, 0) > 0 THEN
GREATEST(
CEIL(SQRT(
(2 * (sm.daily_sales_avg * 365) * ${orderCost}) /
NULLIF(p.cost_price * ${holdingRate}, 0)
)),
${minReorderQty}
)
ELSE ${defaultReorderQty}
END,
overstocked_amt = CASE
WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) > ${overstockDays}
THEN GREATEST(0, p.stock_quantity - CEIL(sm.daily_sales_avg * ${overstockDays}))
ELSE 0
END,
last_calculated_at = NOW()
FROM products p
LEFT JOIN temp_sales_metrics sm ON p.pid = sm.pid
LEFT JOIN temp_purchase_metrics lm ON p.pid = lm.pid
WHERE p.pid = ANY($1::BIGINT[])
AND pm.pid = p.pid
`, [batch.rows.map(row => row.pid)]);
lastPid = batch.rows[batch.rows.length - 1].pid;
processedCount += batch.rows.length;
outputProgress({
status: 'running',
operation: 'Processing base metrics batch',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
}
// Add safety check if the loop processed MAX_BATCHES
if (batchCount >= MAX_BATCHES) {
logError(new Error(`Reached maximum batch count (${MAX_BATCHES}). Process may have entered an infinite loop.`), 'Batch processing safety limit reached');
}
}
// Calculate forecast accuracy and bias in batches
let forecastPid = 0;
while (true) {
if (isCancelled) break;
const forecastBatch = await connection.query(
'SELECT pid FROM products WHERE pid > $1 ORDER BY pid LIMIT $2',
[forecastPid, BATCH_SIZE]
);
if (forecastBatch.rows.length === 0) break;
const forecastPidArray = forecastBatch.rows.map(row => row.pid);
// Inline the batch of numeric product IDs as a Postgres bigint[] array literal
await connection.query(`
WITH forecast_metrics AS (
SELECT
sf.pid,
AVG(CASE
WHEN o.quantity > 0
THEN ABS(sf.forecast_quantity - o.quantity) / o.quantity * 100
ELSE 100
END) as avg_forecast_error,
AVG(CASE
WHEN o.quantity > 0
THEN (sf.forecast_quantity - o.quantity) / o.quantity * 100
ELSE 0
END) as avg_forecast_bias,
MAX(sf.forecast_date) as last_forecast_date
FROM sales_forecasts sf
JOIN orders o ON sf.pid = o.pid
AND DATE(o.date) = sf.forecast_date
WHERE o.canceled = false
AND sf.forecast_date >= CURRENT_DATE - INTERVAL '90 days'
AND sf.pid = ANY('{${forecastPidArray.join(',')}}'::BIGINT[])
GROUP BY sf.pid
)
UPDATE product_metrics pm
SET
forecast_accuracy = GREATEST(0, 100 - LEAST(fm.avg_forecast_error, 100)),
forecast_bias = GREATEST(-100, LEAST(fm.avg_forecast_bias, 100)),
last_forecast_date = fm.last_forecast_date,
last_calculated_at = NOW()
FROM forecast_metrics fm
WHERE pm.pid = fm.pid
`);
forecastPid = forecastBatch.rows[forecastBatch.rows.length - 1].pid;
}
// Calculate product time aggregates
if (!SKIP_PRODUCT_TIME_AGGREGATES) {
outputProgress({
status: 'running',
operation: 'Starting product time aggregates calculation',
current: processedCount || 0,
total: totalProducts || 0,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0),
rate: calculateRate(startTime, processedCount || 0),
percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// Note: The time-aggregates calculation has been moved to time-aggregates.js
// This module will not duplicate that functionality
processedCount = Math.floor(totalProducts * 0.6);
outputProgress({
status: 'running',
operation: 'Product time aggregates calculation delegated to time-aggregates module',
current: processedCount || 0,
total: totalProducts || 0,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0),
rate: calculateRate(startTime, processedCount || 0),
percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
} else {
processedCount = Math.floor(totalProducts * 0.6);
outputProgress({
status: 'running',
operation: 'Skipping product time aggregates calculation',
current: processedCount || 0,
total: totalProducts || 0,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0),
rate: calculateRate(startTime, processedCount || 0),
percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
}
// Calculate ABC classification
outputProgress({
status: 'running',
operation: 'Starting ABC classification',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0, // This module doesn't process POs
success
};
const abcConfig = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1');
const abcThresholds = abcConfig.rows[0] || { a_threshold: 20, b_threshold: 50 };
// Extract values and ensure they are valid numbers
const aThreshold = parseFloat(abcThresholds.a_threshold) || 20;
const bThreshold = parseFloat(abcThresholds.b_threshold) || 50;
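// ABC classification by revenue percentile (PERCENT_RANK over total_revenue, descending):
// products in the top a_threshold percent of revenue are class 'A', those up to b_threshold
// percent are 'B', and the rest default to 'C'.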
// First, create and populate the rankings table with an index
await connection.query('DROP TABLE IF EXISTS temp_revenue_ranks');
await connection.query(`
CREATE TEMPORARY TABLE temp_revenue_ranks (
pid BIGINT NOT NULL,
total_revenue DECIMAL(10,3),
rank_num INT,
dense_rank_num INT,
percentile DECIMAL(5,2),
total_count INT,
PRIMARY KEY (pid)
)
`);
await connection.query('CREATE INDEX ON temp_revenue_ranks (rank_num)');
await connection.query('CREATE INDEX ON temp_revenue_ranks (dense_rank_num)');
await connection.query('CREATE INDEX ON temp_revenue_ranks (percentile)');
// Calculate rankings with proper tie handling
await connection.query(`
INSERT INTO temp_revenue_ranks
WITH revenue_data AS (
SELECT
pid,
total_revenue,
COUNT(*) OVER () as total_count,
PERCENT_RANK() OVER (ORDER BY total_revenue DESC) * 100 as percentile,
RANK() OVER (ORDER BY total_revenue DESC) as rank_num,
DENSE_RANK() OVER (ORDER BY total_revenue DESC) as dense_rank_num
FROM product_metrics
WHERE total_revenue > 0
)
SELECT
pid,
total_revenue,
rank_num,
dense_rank_num,
percentile,
total_count
FROM revenue_data
`);
// Get total count for percentage calculation
const rankingCount = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks');
const totalCount = parseInt(rankingCount.rows[0].total_count) || 1;
// Process updates in batches
let abcProcessedCount = 0;
const batchSize = 5000;
const maxPid = await connection.query('SELECT MAX(pid) as max_pid FROM products');
const maxProductId = parseInt(maxPid.rows[0].max_pid);
while (abcProcessedCount < maxProductId) {
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Get a batch of PIDs that need updating
const pids = await connection.query(`
SELECT pm.pid
FROM product_metrics pm
LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid
WHERE pm.pid > $1
AND (pm.abc_class IS NULL
OR pm.abc_class !=
CASE
WHEN tr.pid IS NULL THEN 'C'
WHEN tr.percentile <= ${aThreshold} THEN 'A'
WHEN tr.percentile <= ${bThreshold} THEN 'B'
ELSE 'C'
END)
ORDER BY pm.pid
LIMIT $2
`, [abcProcessedCount, batchSize]);
if (pids.rows.length === 0) break;
const pidValues = pids.rows.map(row => row.pid);
await connection.query(`
UPDATE product_metrics pm
SET abc_class =
CASE
WHEN tr.pid IS NULL THEN 'C'
WHEN tr.percentile <= ${aThreshold} THEN 'A'
WHEN tr.percentile <= ${bThreshold} THEN 'B'
ELSE 'C'
END,
last_calculated_at = NOW()
FROM (SELECT pid, percentile FROM temp_revenue_ranks) tr
WHERE pm.pid = tr.pid AND pm.pid = ANY($1::BIGINT[])
OR (pm.pid = ANY($1::BIGINT[]) AND tr.pid IS NULL)
`, [pidValues]);
// Now update turnover rate with proper handling of zero inventory periods
await connection.query(`
UPDATE product_metrics pm
SET
turnover_rate = CASE
WHEN sales.avg_nonzero_stock > 0 AND sales.active_days > 0
THEN LEAST(
(sales.total_sold / sales.avg_nonzero_stock) * (365.0 / sales.active_days),
999.99
)
ELSE 0
END,
last_calculated_at = NOW()
FROM (
SELECT
o.pid,
SUM(o.quantity) as total_sold,
COUNT(DISTINCT DATE(o.date)) as active_days,
AVG(CASE
WHEN p.stock_quantity > 0 THEN p.stock_quantity
ELSE NULL
END) as avg_nonzero_stock
FROM orders o
JOIN products p ON o.pid = p.pid
WHERE o.canceled = false
AND o.date >= CURRENT_DATE - INTERVAL '90 days'
AND o.pid = ANY($1::BIGINT[])
GROUP BY o.pid
) sales
WHERE pm.pid = sales.pid
`, [pidValues]);
abcProcessedCount = pids.rows[pids.rows.length - 1].pid;
// Calculate progress proportionally to total products
processedCount = Math.floor(totalProducts * (0.60 + (abcProcessedCount / maxProductId) * 0.2));
outputProgress({
status: 'running',
operation: 'ABC classification progress',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
}
// If we get here, everything completed successfully
success = true;
// Update calculate_status
await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
VALUES ('product_metrics', NOW())
ON CONFLICT (module_name) DO UPDATE
SET last_calculation_timestamp = NOW()
`);
return {
processedProducts: processedCount || 0,
processedOrders: processedOrders || 0,
processedPurchaseOrders: 0, // This module doesn't process POs
success
};
} catch (error) {
success = false;
logError(error, 'Error calculating product metrics');
throw error;
} finally {
// Always clean up temporary tables, even if an error occurred
if (connection) {
try {
await connection.query('DROP TABLE IF EXISTS temp_sales_metrics');
await connection.query('DROP TABLE IF EXISTS temp_purchase_metrics');
} catch (err) {
console.error('Error cleaning up temporary tables:', err);
}
// Make sure to release the connection
connection.release();
}
}
}
function calculateStockStatus(stock, config, daily_sales_avg, weekly_sales_avg, monthly_sales_avg) {
if (stock <= 0) {
return 'Out of Stock';
}
// Use the most appropriate sales average based on data quality
let sales_avg = daily_sales_avg;
if (sales_avg === 0) {
sales_avg = weekly_sales_avg / 7;
}
if (sales_avg === 0) {
sales_avg = monthly_sales_avg / 30;
}
if (sales_avg === 0) {
return stock <= config.low_stock_threshold ? 'Low Stock' : 'In Stock';
}
const days_of_stock = stock / sales_avg;
if (days_of_stock <= config.critical_days) {
return 'Critical';
} else if (days_of_stock <= config.reorder_days) {
return 'Reorder';
} else if (days_of_stock > config.overstock_days) {
return 'Overstocked';
}
return 'Healthy';
}
// Note: calculateReorderQuantities function has been removed as its logic has been incorporated
// in the main SQL query with configurable parameters
module.exports = calculateProductMetrics;
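
calculateStockStatus above is kept as a plain helper and is not exported; the batched SQL earlier in the file encodes the same thresholds. A quick illustrative call, with a hypothetical config object shaped like the stock_thresholds row used elsewhere:

// Illustrative only; the threshold values here are made up.
const exampleThresholds = {
  low_stock_threshold: 5,
  critical_days: 7,
  reorder_days: 14,
  overstock_days: 90
};
// 40 units on hand selling about 3 per day is roughly 13 days of cover => 'Reorder'
const exampleStatus = calculateStockStatus(40, exampleThresholds, 3, 21, 90);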

@@ -0,0 +1,440 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db');
async function calculateSalesForecasts(startTime, totalProducts, processedCount = 0, isCancelled = false) {
const connection = await getConnection();
let success = false;
let processedOrders = 0;
try {
if (isCancelled) {
outputProgress({
status: 'cancelled',
operation: 'Sales forecasts calculation cancelled',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: null,
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
return {
processedProducts: processedCount,
processedOrders: 0,
processedPurchaseOrders: 0,
success
};
}
// Get order count that will be processed
const orderCount = await connection.query(`
SELECT COUNT(*) as count
FROM orders o
WHERE o.canceled = false
AND o.date >= CURRENT_DATE - INTERVAL '90 days'
`);
processedOrders = parseInt(orderCount.rows[0].count);
outputProgress({
status: 'running',
operation: 'Starting sales forecasts calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// First, create a temporary table for forecast dates
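// Builds one row per calendar day for roughly the next month (n = 0..29), carrying the day of
// week and month so forecasts can later be joined to sales_seasonality by month.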
await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_forecast_dates (
forecast_date DATE,
day_of_week INT,
month INT,
PRIMARY KEY (forecast_date)
)
`);
await connection.query(`
INSERT INTO temp_forecast_dates
SELECT
CURRENT_DATE + (n || ' days')::INTERVAL as forecast_date,
EXTRACT(DOW FROM CURRENT_DATE + (n || ' days')::INTERVAL) + 1 as day_of_week,
EXTRACT(MONTH FROM CURRENT_DATE + (n || ' days')::INTERVAL) as month
FROM (
SELECT a.n + b.n * 10 as n
FROM
(SELECT 0 as n UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION
SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) a,
(SELECT 0 as n UNION SELECT 1 UNION SELECT 2) b
ORDER BY n
LIMIT 31
) numbers
`);
processedCount = Math.floor(totalProducts * 0.92);
outputProgress({
status: 'running',
operation: 'Forecast dates prepared, calculating daily sales stats',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Create temporary table for daily sales stats
await connection.query(`
CREATE TEMPORARY TABLE temp_daily_sales AS
SELECT
o.pid,
EXTRACT(DOW FROM o.date) + 1 as day_of_week,
SUM(o.quantity) as daily_quantity,
SUM(o.price * o.quantity) as daily_revenue,
COUNT(DISTINCT DATE(o.date)) as day_count
FROM orders o
WHERE o.canceled = false
AND o.date >= CURRENT_DATE - INTERVAL '90 days'
GROUP BY o.pid, EXTRACT(DOW FROM o.date) + 1
`);
processedCount = Math.floor(totalProducts * 0.94);
outputProgress({
status: 'running',
operation: 'Daily sales stats calculated, preparing product stats',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Create temporary table for product stats
await connection.query(`
CREATE TEMPORARY TABLE temp_product_stats AS
SELECT
pid,
AVG(daily_revenue) as overall_avg_revenue,
SUM(day_count) as total_days
FROM temp_daily_sales
GROUP BY pid
`);
processedCount = Math.floor(totalProducts * 0.96);
outputProgress({
status: 'running',
operation: 'Product stats prepared, calculating product-level forecasts',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Calculate product-level forecasts
await connection.query(`
INSERT INTO sales_forecasts (
pid,
forecast_date,
forecast_quantity,
confidence_level,
created_at
)
WITH daily_stats AS (
SELECT
ds.pid,
AVG(ds.daily_quantity) as avg_daily_qty,
STDDEV(ds.daily_quantity) as std_daily_qty,
COUNT(*) as data_points,
SUM(ds.day_count) as total_days,
AVG(ds.daily_revenue) as avg_daily_revenue,
STDDEV(ds.daily_revenue) as std_daily_revenue,
MIN(ds.daily_quantity) as min_daily_qty,
MAX(ds.daily_quantity) as max_daily_qty,
-- Calculate variance without using LAG
COALESCE(
STDDEV(ds.daily_quantity) / NULLIF(AVG(ds.daily_quantity), 0),
0
) as daily_variance_ratio
FROM temp_daily_sales ds
GROUP BY ds.pid
HAVING AVG(ds.daily_quantity) > 0
)
SELECT
ds.pid,
fd.forecast_date,
GREATEST(0,
ROUND(
ds.avg_daily_qty *
(1 + COALESCE(sf.seasonality_factor, 0))
)
) as forecast_quantity,
CASE
WHEN ds.total_days >= 60 AND ds.daily_variance_ratio < 0.5 THEN 90
WHEN ds.total_days >= 60 THEN 85
WHEN ds.total_days >= 30 AND ds.daily_variance_ratio < 0.5 THEN 80
WHEN ds.total_days >= 30 THEN 75
WHEN ds.total_days >= 14 AND ds.daily_variance_ratio < 0.5 THEN 70
WHEN ds.total_days >= 14 THEN 65
ELSE 60
END as confidence_level,
NOW() as created_at
FROM daily_stats ds
JOIN temp_product_stats ps ON ds.pid = ps.pid
CROSS JOIN temp_forecast_dates fd
LEFT JOIN sales_seasonality sf ON fd.month = sf.month
GROUP BY ds.pid, fd.forecast_date, ps.overall_avg_revenue, sf.seasonality_factor,
ds.avg_daily_qty, ds.std_daily_qty, ds.total_days, ds.daily_variance_ratio
ON CONFLICT (pid, forecast_date) DO UPDATE
SET
forecast_quantity = EXCLUDED.forecast_quantity,
confidence_level = EXCLUDED.confidence_level,
created_at = NOW()
`);
processedCount = Math.floor(totalProducts * 0.98);
outputProgress({
status: 'running',
operation: 'Product forecasts calculated, preparing category stats',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Create temporary table for category stats
await connection.query(`
CREATE TEMPORARY TABLE temp_category_sales AS
SELECT
pc.cat_id,
EXTRACT(DOW FROM o.date) + 1 as day_of_week,
SUM(o.quantity) as daily_quantity,
SUM(o.price * o.quantity) as daily_revenue,
COUNT(DISTINCT DATE(o.date)) as day_count
FROM orders o
JOIN product_categories pc ON o.pid = pc.pid
WHERE o.canceled = false
AND o.date >= CURRENT_DATE - INTERVAL '90 days'
GROUP BY pc.cat_id, EXTRACT(DOW FROM o.date) + 1
`);
await connection.query(`
CREATE TEMPORARY TABLE temp_category_stats AS
SELECT
cat_id,
AVG(daily_revenue) as overall_avg_revenue,
SUM(day_count) as total_days
FROM temp_category_sales
GROUP BY cat_id
`);
processedCount = Math.floor(totalProducts * 0.99);
outputProgress({
status: 'running',
operation: 'Category stats prepared, calculating category-level forecasts',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Calculate category-level forecasts
await connection.query(`
INSERT INTO category_forecasts (
category_id,
forecast_date,
forecast_units,
forecast_revenue,
confidence_level,
created_at
)
SELECT
cs.cat_id::bigint as category_id,
fd.forecast_date,
GREATEST(0,
ROUND(AVG(cs.daily_quantity) *
(1 + COALESCE(sf.seasonality_factor, 0)))
) as forecast_units,
GREATEST(0,
COALESCE(
CASE
WHEN SUM(cs.day_count) >= 4 THEN AVG(cs.daily_revenue)
ELSE ct.overall_avg_revenue
END *
(1 + COALESCE(sf.seasonality_factor, 0)),
0
)
) as forecast_revenue,
CASE
WHEN ct.total_days >= 60 THEN 90
WHEN ct.total_days >= 30 THEN 80
WHEN ct.total_days >= 14 THEN 70
ELSE 60
END as confidence_level,
NOW() as created_at
FROM temp_category_sales cs
JOIN temp_category_stats ct ON cs.cat_id = ct.cat_id
CROSS JOIN temp_forecast_dates fd
LEFT JOIN sales_seasonality sf ON fd.month = sf.month
GROUP BY
cs.cat_id,
fd.forecast_date,
ct.overall_avg_revenue,
ct.total_days,
sf.seasonality_factor,
sf.month
HAVING AVG(cs.daily_quantity) > 0
ON CONFLICT (category_id, forecast_date) DO UPDATE
SET
forecast_units = EXCLUDED.forecast_units,
forecast_revenue = EXCLUDED.forecast_revenue,
confidence_level = EXCLUDED.confidence_level,
created_at = NOW()
`);
// Clean up temporary tables
await connection.query(`
DROP TABLE IF EXISTS temp_forecast_dates;
DROP TABLE IF EXISTS temp_daily_sales;
DROP TABLE IF EXISTS temp_product_stats;
DROP TABLE IF EXISTS temp_category_sales;
DROP TABLE IF EXISTS temp_category_stats;
`);
processedCount = Math.floor(totalProducts * 1.0);
outputProgress({
status: 'running',
operation: 'Category forecasts calculated and temporary tables cleaned up',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// If we get here, everything completed successfully
success = true;
// Update calculate_status
await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
VALUES ('sales_forecasts', NOW())
ON CONFLICT (module_name) DO UPDATE
SET last_calculation_timestamp = NOW()
`);
return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
} catch (error) {
success = false;
logError(error, 'Error calculating sales forecasts');
throw error;
} finally {
if (connection) {
try {
// Ensure temporary tables are cleaned up
await connection.query(`
DROP TABLE IF EXISTS temp_forecast_dates;
DROP TABLE IF EXISTS temp_daily_sales;
DROP TABLE IF EXISTS temp_product_stats;
DROP TABLE IF EXISTS temp_category_sales;
DROP TABLE IF EXISTS temp_category_stats;
`);
} catch (err) {
console.error('Error cleaning up temporary tables:', err);
}
connection.release();
}
}
}
module.exports = calculateSalesForecasts;
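For context, a minimal runner sketch for this module is shown below; the module path, the product-count query, and the orchestration are assumptions for illustration, not code from this commit.

// Hypothetical runner (illustrative only): the require path and the product
// count query are assumptions, not part of this commit.
const calculateSalesForecasts = require('./calculateSalesForecasts');
const { getConnection, closePool } = require('./utils/db');

async function run() {
  const connection = await getConnection();
  let totalProducts;
  try {
    const { rows } = await connection.query('SELECT COUNT(*) as count FROM products');
    totalProducts = parseInt(rows[0].count, 10);
  } finally {
    connection.release();
  }
  const result = await calculateSalesForecasts(Date.now(), totalProducts);
  console.log('Sales forecasts finished:', result);
  await closePool();
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});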

View File

@@ -0,0 +1,344 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db');
async function calculateTimeAggregates(startTime, totalProducts, processedCount = 0, isCancelled = false) {
const connection = await getConnection();
let success = false;
let processedOrders = 0;
try {
if (isCancelled) {
outputProgress({
status: 'cancelled',
operation: 'Time aggregates calculation cancelled',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: null,
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
return {
processedProducts: processedCount,
processedOrders: 0,
processedPurchaseOrders: 0,
success
};
}
// Get order count that will be processed
const orderCount = await connection.query(`
SELECT COUNT(*) as count
FROM orders o
WHERE o.canceled = false
`);
processedOrders = parseInt(orderCount.rows[0].count);
outputProgress({
status: 'running',
operation: 'Starting time aggregates calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// Create a temporary table for end-of-month inventory values
await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_monthly_inventory AS
WITH months AS (
-- Generate all year/month combinations for the last 12 months
SELECT
EXTRACT(YEAR FROM month_date)::INTEGER as year,
EXTRACT(MONTH FROM month_date)::INTEGER as month,
month_date as start_date,
(month_date + INTERVAL '1 month'::interval - INTERVAL '1 day'::interval)::DATE as end_date
FROM (
SELECT generate_series(
DATE_TRUNC('month', CURRENT_DATE - INTERVAL '12 months'::interval)::DATE,
DATE_TRUNC('month', CURRENT_DATE)::DATE,
INTERVAL '1 month'::interval
) as month_date
) dates
),
monthly_inventory_calc AS (
SELECT
p.pid,
m.year,
m.month,
m.end_date,
p.stock_quantity as current_quantity,
-- Units sold after the month end (the join below already limits orders to o.date > m.end_date)
COALESCE(SUM(
CASE
WHEN o.date > m.end_date THEN o.quantity
ELSE 0
END
), 0) as sold_after_end_date,
-- Units received after the month end (the join below already limits POs to po.received_date > m.end_date)
COALESCE(SUM(
CASE
WHEN po.received_date > m.end_date THEN po.received
ELSE 0
END
), 0) as received_after_end_date,
p.cost_price
FROM
products p
CROSS JOIN
months m
LEFT JOIN
orders o ON p.pid = o.pid
AND o.canceled = false
AND o.date > m.end_date
AND o.date <= CURRENT_DATE
LEFT JOIN
purchase_orders po ON p.pid = po.pid
AND po.received_date IS NOT NULL
AND po.received_date > m.end_date
AND po.received_date <= CURRENT_DATE
GROUP BY
p.pid, m.year, m.month, m.end_date, p.stock_quantity, p.cost_price
)
SELECT
pid,
year,
month,
-- End-of-month quantity = current quantity + units sold since month end - units received since month end
GREATEST(0, current_quantity + sold_after_end_date - received_after_end_date) as end_of_month_quantity,
-- End-of-month inventory value at cost
GREATEST(0, current_quantity + sold_after_end_date - received_after_end_date) * cost_price as end_of_month_value,
cost_price
FROM
monthly_inventory_calc
`);
processedCount = Math.floor(totalProducts * 0.40);
outputProgress({
status: 'running',
operation: 'Monthly inventory values calculated, processing time aggregates',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// Initial insert of time-based aggregates
await connection.query(`
INSERT INTO product_time_aggregates (
pid,
year,
month,
total_quantity_sold,
total_revenue,
total_cost,
order_count,
stock_received,
stock_ordered,
avg_price,
profit_margin,
inventory_value,
gmroi
)
WITH monthly_sales AS (
SELECT
o.pid,
EXTRACT(YEAR FROM o.date::timestamp with time zone)::INTEGER as year,
EXTRACT(MONTH FROM o.date::timestamp with time zone)::INTEGER as month,
SUM(o.quantity) as total_quantity_sold,
SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) as total_revenue,
SUM(COALESCE(o.costeach, 0) * o.quantity) as total_cost,
COUNT(DISTINCT o.order_number) as order_count,
AVG(o.price - COALESCE(o.discount, 0)) as avg_price,
CASE
WHEN SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) > 0
THEN ((SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) - SUM(COALESCE(o.costeach, 0) * o.quantity))
/ SUM((o.price - COALESCE(o.discount, 0)) * o.quantity)) * 100
ELSE 0
END as profit_margin,
COUNT(DISTINCT DATE(o.date)) as active_days
FROM orders o
JOIN products p ON o.pid = p.pid
WHERE o.canceled = false
GROUP BY o.pid, EXTRACT(YEAR FROM o.date::timestamp with time zone), EXTRACT(MONTH FROM o.date::timestamp with time zone)
),
monthly_stock AS (
SELECT
pid,
EXTRACT(YEAR FROM date::timestamp with time zone)::INTEGER as year,
EXTRACT(MONTH FROM date::timestamp with time zone)::INTEGER as month,
SUM(received) as stock_received,
SUM(ordered) as stock_ordered
FROM purchase_orders
GROUP BY pid, EXTRACT(YEAR FROM date::timestamp with time zone), EXTRACT(MONTH FROM date::timestamp with time zone)
)
SELECT
COALESCE(s.pid, ms.pid, mi.pid) as pid,
COALESCE(s.year, ms.year, mi.year) as year,
COALESCE(s.month, ms.month, mi.month) as month,
COALESCE(s.total_quantity_sold, 0)::INTEGER as total_quantity_sold,
COALESCE(s.total_revenue, 0)::DECIMAL(10,3) as total_revenue,
COALESCE(s.total_cost, 0)::DECIMAL(10,3) as total_cost,
COALESCE(s.order_count, 0)::INTEGER as order_count,
COALESCE(ms.stock_received, 0)::INTEGER as stock_received,
COALESCE(ms.stock_ordered, 0)::INTEGER as stock_ordered,
COALESCE(s.avg_price, 0)::DECIMAL(10,3) as avg_price,
COALESCE(s.profit_margin, 0)::DECIMAL(10,3) as profit_margin,
COALESCE(mi.end_of_month_value, 0)::DECIMAL(10,3) as inventory_value,
CASE
WHEN COALESCE(mi.end_of_month_value, 0) > 0
THEN (COALESCE(s.total_revenue, 0) - COALESCE(s.total_cost, 0))
/ NULLIF(COALESCE(mi.end_of_month_value, 0), 0)
ELSE 0
END::DECIMAL(10,3) as gmroi
FROM (
SELECT * FROM monthly_sales s
UNION ALL
SELECT
pid,
year,
month,
0 as total_quantity_sold,
0 as total_revenue,
0 as total_cost,
0 as order_count,
NULL as avg_price,
0 as profit_margin,
0 as active_days
FROM monthly_stock ms
WHERE NOT EXISTS (
SELECT 1 FROM monthly_sales s2
WHERE s2.pid = ms.pid
AND s2.year = ms.year
AND s2.month = ms.month
)
UNION ALL
SELECT
pid,
year,
month,
0 as total_quantity_sold,
0 as total_revenue,
0 as total_cost,
0 as order_count,
NULL as avg_price,
0 as profit_margin,
0 as active_days
FROM temp_monthly_inventory mi
WHERE NOT EXISTS (
SELECT 1 FROM monthly_sales s3
WHERE s3.pid = mi.pid
AND s3.year = mi.year
AND s3.month = mi.month
)
AND NOT EXISTS (
SELECT 1 FROM monthly_stock ms3
WHERE ms3.pid = mi.pid
AND ms3.year = mi.year
AND ms3.month = mi.month
)
) s
LEFT JOIN monthly_stock ms
ON s.pid = ms.pid
AND s.year = ms.year
AND s.month = ms.month
LEFT JOIN temp_monthly_inventory mi
ON s.pid = mi.pid
AND s.year = mi.year
AND s.month = mi.month
ON CONFLICT (pid, year, month) DO UPDATE
SET
total_quantity_sold = EXCLUDED.total_quantity_sold,
total_revenue = EXCLUDED.total_revenue,
total_cost = EXCLUDED.total_cost,
order_count = EXCLUDED.order_count,
stock_received = EXCLUDED.stock_received,
stock_ordered = EXCLUDED.stock_ordered,
avg_price = EXCLUDED.avg_price,
profit_margin = EXCLUDED.profit_margin,
inventory_value = EXCLUDED.inventory_value,
gmroi = EXCLUDED.gmroi
`);
processedCount = Math.floor(totalProducts * 0.60);
outputProgress({
status: 'running',
operation: 'Base time aggregates calculated',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Clean up temporary tables
await connection.query('DROP TABLE IF EXISTS temp_monthly_inventory');
// If we get here, everything completed successfully
success = true;
// Update calculate_status
await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
VALUES ('time_aggregates', NOW())
ON CONFLICT (module_name) DO UPDATE
SET last_calculation_timestamp = NOW()
`);
return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
} catch (error) {
success = false;
logError(error, 'Error calculating time aggregates');
throw error;
} finally {
if (connection) {
try {
// Ensure temporary tables are cleaned up
await connection.query('DROP TABLE IF EXISTS temp_monthly_inventory');
} catch (err) {
console.error('Error cleaning up temporary tables:', err);
}
connection.release();
}
}
}
module.exports = calculateTimeAggregates;
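As a sanity check on the end-of-month reconstruction in temp_monthly_inventory above, here is a small worked example in plain JavaScript; the numbers are illustrative and not from any dataset.

// End-of-month stock is reconstructed backwards from today's stock:
// eom = current + units sold since month end - units received since month end.
function endOfMonthQuantity(currentQuantity, soldAfterEndDate, receivedAfterEndDate) {
  return Math.max(0, currentQuantity + soldAfterEndDate - receivedAfterEndDate);
}

// 40 units on hand today, 25 sold and 10 received since the month closed
// -> the month closed with 55 units.
console.log(endOfMonthQuantity(40, 25, 10)); // 55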

View File

@@ -0,0 +1,39 @@
const { Pool } = require('pg');
const path = require('path');
require('dotenv').config({ path: path.resolve(__dirname, '../../..', '.env') });
// Database configuration
const dbConfig = {
host: process.env.DB_HOST,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
port: process.env.DB_PORT || 5432,
ssl: process.env.DB_SSL === 'true',
// Add performance optimizations
max: 10, // connection pool max size
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 60000
};
// Create a single pool instance to be reused
const pool = new Pool(dbConfig);
// Add event handlers for pool
pool.on('error', (err, client) => {
console.error('Unexpected error on idle client', err);
});
async function getConnection() {
return await pool.connect();
}
async function closePool() {
await pool.end();
}
module.exports = {
dbConfig,
getConnection,
closePool
};
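A minimal usage sketch for this helper follows; the countOrders function and its query are illustrative, not part of this commit.

// Hypothetical caller: check a client out of the pool, query, and always release.
const { getConnection, closePool } = require('./db');

async function countOrders() {
  const connection = await getConnection();
  try {
    const result = await connection.query('SELECT COUNT(*) as count FROM orders');
    return parseInt(result.rows[0].count, 10);
  } finally {
    // Clients from pool.connect() must be released back to the pool.
    connection.release();
  }
}

countOrders()
  .then((count) => console.log('orders:', count))
  .catch((err) => console.error(err))
  .finally(() => closePool());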

View File

@@ -0,0 +1,158 @@
const fs = require('fs');
const path = require('path');
// Helper function to format elapsed time
function formatElapsedTime(elapsed) {
// If elapsed is a timestamp, convert to elapsed milliseconds
if (elapsed instanceof Date || elapsed > 1000000000000) {
elapsed = Date.now() - elapsed;
} else {
// If elapsed is in seconds, convert to milliseconds
elapsed = elapsed * 1000;
}
const seconds = Math.floor(elapsed / 1000);
const minutes = Math.floor(seconds / 60);
const hours = Math.floor(minutes / 60);
if (hours > 0) {
return `${hours}h ${minutes % 60}m`;
} else if (minutes > 0) {
return `${minutes}m ${seconds % 60}s`;
} else {
return `${seconds}s`;
}
}
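// Examples of the two accepted inputs (illustrative):
//   formatElapsedTime(Date.now() - 95000)  -> "1m 35s"  (a start timestamp in ms)
//   formatElapsedTime(95)                  -> "1m 35s"  (an elapsed value in seconds)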
// Helper function to estimate remaining time
function estimateRemaining(startTime, current, total) {
if (current === 0) return null;
const elapsed = Date.now() - startTime;
const rate = current / elapsed;
const remaining = (total - current) / rate;
const minutes = Math.floor(remaining / 60000);
const seconds = Math.floor((remaining % 60000) / 1000);
if (minutes > 0) {
return `${minutes}m ${seconds}s`;
} else {
return `${seconds}s`;
}
}
// Helper function to calculate rate
function calculateRate(startTime, current) {
const elapsed = (Date.now() - startTime) / 1000; // Convert to seconds
return elapsed > 0 ? Math.round(current / elapsed) : 0;
}
// Set up logging
const LOG_DIR = path.join(__dirname, '../../../logs');
const ERROR_LOG = path.join(LOG_DIR, 'import-errors.log');
const IMPORT_LOG = path.join(LOG_DIR, 'import.log');
const STATUS_FILE = path.join(LOG_DIR, 'metrics-status.json');
// Ensure log directory exists
if (!fs.existsSync(LOG_DIR)) {
fs.mkdirSync(LOG_DIR, { recursive: true });
}
// Helper function to log errors
function logError(error, context = '') {
const timestamp = new Date().toISOString();
const errorMessage = `[${timestamp}] ${context}\nError: ${error.message}\nStack: ${error.stack}\n\n`;
// Log to error file
fs.appendFileSync(ERROR_LOG, errorMessage);
// Also log to console
console.error(`\n${context}\nError: ${error.message}`);
}
// Helper function to log import progress
function logImport(message) {
const timestamp = new Date().toISOString();
const logMessage = `[${timestamp}] ${message}\n`;
fs.appendFileSync(IMPORT_LOG, logMessage);
}
// Helper function to output progress
function outputProgress(data) {
// Save progress to file for resumption
saveProgress(data);
// Format as SSE event
const event = {
progress: data
};
// Always send to stdout for frontend
process.stdout.write(JSON.stringify(event) + '\n');
// Log significant events to disk
const isSignificant =
// Operation starts
(data.operation && !data.current) ||
// Operation completions and errors
data.status === 'complete' ||
data.status === 'error' ||
// Major phase changes
data.operation?.includes('Starting ABC classification') ||
data.operation?.includes('Starting time aggregates') ||
data.operation?.includes('Starting vendor metrics');
if (isSignificant) {
logImport(`${data.operation || 'Operation'}${data.message ? ': ' + data.message : ''}${data.error ? ' Error: ' + data.error : ''}${data.status ? ' Status: ' + data.status : ''}`);
}
}
function saveProgress(progress) {
try {
fs.writeFileSync(STATUS_FILE, JSON.stringify({
...progress,
timestamp: Date.now()
}));
} catch (err) {
console.error('Failed to save progress:', err);
}
}
function clearProgress() {
try {
if (fs.existsSync(STATUS_FILE)) {
fs.unlinkSync(STATUS_FILE);
}
} catch (err) {
console.error('Failed to clear progress:', err);
}
}
function getProgress() {
try {
if (fs.existsSync(STATUS_FILE)) {
const progress = JSON.parse(fs.readFileSync(STATUS_FILE, 'utf8'));
// Check if the progress is still valid (less than 1 hour old)
if (progress.timestamp && Date.now() - progress.timestamp < 3600000) {
return progress;
} else {
// Clear old progress
clearProgress();
}
}
} catch (err) {
console.error('Failed to read progress:', err);
clearProgress();
}
return null;
}
module.exports = {
formatElapsedTime,
estimateRemaining,
calculateRate,
logError,
logImport,
outputProgress,
saveProgress,
clearProgress,
getProgress
};
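A brief sketch of how a caller might use the saved status file to resume a run; the flow and field choices here are assumptions, not part of this commit.

// Hypothetical resumption flow built on getProgress()/clearProgress().
const { getProgress, clearProgress, outputProgress } = require('./progress');

function resumeOrStart(totalProducts) {
  const saved = getProgress(); // null when missing or more than an hour old
  const current = saved && saved.current ? saved.current : 0;
  outputProgress({
    status: 'running',
    operation: saved ? 'Resuming metrics calculation' : 'Starting metrics calculation',
    current,
    total: totalProducts,
    percentage: totalProducts > 0 ? ((current / totalProducts) * 100).toFixed(1) : '0.0'
  });
  return current;
}

const startFrom = resumeOrStart(1000);
console.log('resuming from', startFrom);
// clearProgress() would be called once the whole run completes.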

View File

@@ -0,0 +1,378 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db');
async function calculateVendorMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
const connection = await getConnection();
let success = false;
let processedOrders = 0;
let processedPurchaseOrders = 0;
try {
if (isCancelled) {
outputProgress({
status: 'cancelled',
operation: 'Vendor metrics calculation cancelled',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: null,
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders,
success
};
}
// Get counts of records that will be processed
const [orderCountResult, poCountResult] = await Promise.all([
connection.query(`
SELECT COUNT(*) as count
FROM orders o
WHERE o.canceled = false
`),
connection.query(`
SELECT COUNT(*) as count
FROM purchase_orders po
WHERE po.status != 0
`)
]);
processedOrders = parseInt(orderCountResult.rows[0].count);
processedPurchaseOrders = parseInt(poCountResult.rows[0].count);
outputProgress({
status: 'running',
operation: 'Starting vendor metrics calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// First ensure all vendors exist in vendor_details
await connection.query(`
INSERT INTO vendor_details (vendor, status, created_at, updated_at)
SELECT DISTINCT
vendor,
'active' as status,
NOW() as created_at,
NOW() as updated_at
FROM products
WHERE vendor IS NOT NULL
ON CONFLICT (vendor) DO NOTHING
`);
processedCount = Math.floor(totalProducts * 0.8);
outputProgress({
status: 'running',
operation: 'Vendor details updated, calculating metrics',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders,
success
};
// Now calculate vendor metrics
await connection.query(`
INSERT INTO vendor_metrics (
vendor,
total_revenue,
total_orders,
total_late_orders,
avg_lead_time_days,
on_time_delivery_rate,
order_fill_rate,
avg_order_value,
active_products,
total_products,
total_purchase_value,
avg_margin_percent,
status,
last_calculated_at
)
WITH vendor_sales AS (
SELECT
p.vendor,
SUM(o.quantity * o.price) as total_revenue,
COUNT(DISTINCT o.id) as total_orders,
COUNT(DISTINCT p.pid) as active_products,
SUM(o.quantity * (o.price - p.cost_price)) as total_margin
FROM products p
JOIN orders o ON p.pid = o.pid
WHERE o.canceled = false
AND o.date >= CURRENT_DATE - INTERVAL '12 months'
GROUP BY p.vendor
),
vendor_po AS (
SELECT
p.vendor,
COUNT(DISTINCT CASE WHEN po.receiving_status = 40 THEN po.id END) as received_orders,
COUNT(DISTINCT po.id) as total_orders,
AVG(CASE
WHEN po.receiving_status = 40
AND po.received_date IS NOT NULL
AND po.date IS NOT NULL
THEN EXTRACT(EPOCH FROM (po.received_date::timestamp with time zone - po.date::timestamp with time zone)) / 86400.0
ELSE NULL
END) as avg_lead_time_days,
SUM(po.ordered * po.po_cost_price) as total_purchase_value
FROM products p
JOIN purchase_orders po ON p.pid = po.pid
WHERE po.date >= CURRENT_DATE - INTERVAL '12 months'
GROUP BY p.vendor
),
vendor_products AS (
SELECT
vendor,
COUNT(DISTINCT pid) as total_products
FROM products
GROUP BY vendor
)
SELECT
vs.vendor,
COALESCE(vs.total_revenue, 0) as total_revenue,
COALESCE(vp.total_orders, 0) as total_orders,
COALESCE(vp.total_orders - vp.received_orders, 0) as total_late_orders,
COALESCE(vp.avg_lead_time_days, 0) as avg_lead_time_days,
-- cast to numeric so integer division does not truncate the rate to 0 or 100
CASE
WHEN vp.total_orders > 0
THEN (vp.received_orders::numeric / vp.total_orders) * 100
ELSE 0
END as on_time_delivery_rate,
CASE
WHEN vp.total_orders > 0
THEN (vp.received_orders::numeric / vp.total_orders) * 100
ELSE 0
END as order_fill_rate,
CASE
WHEN vs.total_orders > 0
THEN vs.total_revenue / vs.total_orders
ELSE 0
END as avg_order_value,
COALESCE(vs.active_products, 0) as active_products,
COALESCE(vpr.total_products, 0) as total_products,
COALESCE(vp.total_purchase_value, 0) as total_purchase_value,
CASE
WHEN vs.total_revenue > 0
THEN (vs.total_margin / vs.total_revenue) * 100
ELSE 0
END as avg_margin_percent,
'active' as status,
NOW() as last_calculated_at
FROM vendor_sales vs
LEFT JOIN vendor_po vp ON vs.vendor = vp.vendor
LEFT JOIN vendor_products vpr ON vs.vendor = vpr.vendor
WHERE vs.vendor IS NOT NULL
ON CONFLICT (vendor) DO UPDATE
SET
total_revenue = EXCLUDED.total_revenue,
total_orders = EXCLUDED.total_orders,
total_late_orders = EXCLUDED.total_late_orders,
avg_lead_time_days = EXCLUDED.avg_lead_time_days,
on_time_delivery_rate = EXCLUDED.on_time_delivery_rate,
order_fill_rate = EXCLUDED.order_fill_rate,
avg_order_value = EXCLUDED.avg_order_value,
active_products = EXCLUDED.active_products,
total_products = EXCLUDED.total_products,
total_purchase_value = EXCLUDED.total_purchase_value,
avg_margin_percent = EXCLUDED.avg_margin_percent,
status = EXCLUDED.status,
last_calculated_at = EXCLUDED.last_calculated_at
`);
processedCount = Math.floor(totalProducts * 0.9);
outputProgress({
status: 'running',
operation: 'Vendor metrics calculated, updating time-based metrics',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders,
success
};
// Calculate time-based metrics
await connection.query(`
INSERT INTO vendor_time_metrics (
vendor,
year,
month,
total_orders,
late_orders,
avg_lead_time_days,
total_purchase_value,
total_revenue,
avg_margin_percent
)
WITH monthly_orders AS (
SELECT
p.vendor,
EXTRACT(YEAR FROM o.date::timestamp with time zone) as year,
EXTRACT(MONTH FROM o.date::timestamp with time zone) as month,
COUNT(DISTINCT o.id) as total_orders,
SUM(o.quantity * o.price) as total_revenue,
SUM(o.quantity * (o.price - p.cost_price)) as total_margin
FROM products p
JOIN orders o ON p.pid = o.pid
WHERE o.canceled = false
AND o.date >= CURRENT_DATE - INTERVAL '12 months'
AND p.vendor IS NOT NULL
GROUP BY p.vendor, EXTRACT(YEAR FROM o.date::timestamp with time zone), EXTRACT(MONTH FROM o.date::timestamp with time zone)
),
monthly_po AS (
SELECT
p.vendor,
EXTRACT(YEAR FROM po.date::timestamp with time zone) as year,
EXTRACT(MONTH FROM po.date::timestamp with time zone) as month,
COUNT(DISTINCT po.id) as total_po,
COUNT(DISTINCT CASE
WHEN po.receiving_status = 40 AND po.received_date > po.expected_date
THEN po.id
END) as late_orders,
AVG(CASE
WHEN po.receiving_status = 40
AND po.received_date IS NOT NULL
AND po.date IS NOT NULL
THEN EXTRACT(EPOCH FROM (po.received_date::timestamp with time zone - po.date::timestamp with time zone)) / 86400.0
ELSE NULL
END) as avg_lead_time_days,
SUM(po.ordered * po.po_cost_price) as total_purchase_value
FROM products p
JOIN purchase_orders po ON p.pid = po.pid
WHERE po.date >= CURRENT_DATE - INTERVAL '12 months'
AND p.vendor IS NOT NULL
GROUP BY p.vendor, EXTRACT(YEAR FROM po.date::timestamp with time zone), EXTRACT(MONTH FROM po.date::timestamp with time zone)
)
SELECT
mo.vendor,
mo.year,
mo.month,
COALESCE(mp.total_po, 0) as total_orders,
COALESCE(mp.late_orders, 0) as late_orders,
COALESCE(mp.avg_lead_time_days, 0) as avg_lead_time_days,
COALESCE(mp.total_purchase_value, 0) as total_purchase_value,
mo.total_revenue,
CASE
WHEN mo.total_revenue > 0
THEN (mo.total_margin / mo.total_revenue) * 100
ELSE 0
END as avg_margin_percent
FROM monthly_orders mo
LEFT JOIN monthly_po mp ON mo.vendor = mp.vendor
AND mo.year = mp.year
AND mo.month = mp.month
UNION
SELECT
mp.vendor,
mp.year,
mp.month,
mp.total_po as total_orders,
mp.late_orders,
mp.avg_lead_time_days,
mp.total_purchase_value,
0 as total_revenue,
0 as avg_margin_percent
FROM monthly_po mp
LEFT JOIN monthly_orders mo ON mp.vendor = mo.vendor
AND mp.year = mo.year
AND mp.month = mo.month
WHERE mo.vendor IS NULL
ON CONFLICT (vendor, year, month) DO UPDATE
SET
total_orders = EXCLUDED.total_orders,
late_orders = EXCLUDED.late_orders,
avg_lead_time_days = EXCLUDED.avg_lead_time_days,
total_purchase_value = EXCLUDED.total_purchase_value,
total_revenue = EXCLUDED.total_revenue,
avg_margin_percent = EXCLUDED.avg_margin_percent
`);
processedCount = Math.floor(totalProducts * 0.95);
outputProgress({
status: 'running',
operation: 'Time-based vendor metrics calculated',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// If we get here, everything completed successfully
success = true;
// Update calculate_status
await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
VALUES ('vendor_metrics', NOW())
ON CONFLICT (module_name) DO UPDATE
SET last_calculation_timestamp = NOW()
`);
return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders,
success
};
} catch (error) {
success = false;
logError(error, 'Error calculating vendor metrics');
throw error;
} finally {
if (connection) {
connection.release();
}
}
}
module.exports = calculateVendorMetrics;
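One note on the delivery-rate expressions above: PostgreSQL truncates integer division, which is why the order counts are cast to numeric before dividing. A quick standalone check follows; the script and its require path are illustrative, not part of the module.

// Demonstrates why received_orders is cast to numeric before dividing:
// integer / integer truncates in PostgreSQL.
const { getConnection, closePool } = require('./utils/db');

async function demoIntegerDivision() {
  const connection = await getConnection();
  try {
    const { rows } = await connection.query(
      'SELECT 7 / 10 as truncated, 7::numeric / 10 as exact'
    );
    // truncated comes back as 0; exact comes back as a numeric string near 0.7
    console.log(rows[0]);
  } finally {
    connection.release();
  }
}

demoIntegerDivision().finally(() => closePool());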