736 lines
33 KiB
JavaScript
736 lines
33 KiB
JavaScript
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
|
|
const { getConnection } = require('./utils/db');
|
|
|
|
// Helper function to handle NaN and undefined values
|
|
function sanitizeValue(value) {
|
|
if (value === undefined || value === null || Number.isNaN(value)) {
|
|
return null;
|
|
}
|
|
return value;
|
|
}
|
|
|
|
async function calculateProductMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
|
|
let connection;
|
|
let success = false;
|
|
let processedOrders = 0;
|
|
const BATCH_SIZE = 5000;
|
|
|
|
try {
|
|
connection = await getConnection();
|
|
// Skip flags are inherited from the parent scope
|
|
const SKIP_PRODUCT_BASE_METRICS = 0;
|
|
const SKIP_PRODUCT_TIME_AGGREGATES = 0;
|
|
|
|
// Get total product count if not provided
|
|
if (!totalProducts) {
|
|
const productCount = await connection.query('SELECT COUNT(*) as count FROM products');
|
|
totalProducts = parseInt(productCount.rows[0].count);
|
|
}
|
|
|
|
if (isCancelled) {
|
|
outputProgress({
|
|
status: 'cancelled',
|
|
operation: 'Product metrics calculation cancelled',
|
|
current: processedCount,
|
|
total: totalProducts,
|
|
elapsed: formatElapsedTime(startTime),
|
|
remaining: null,
|
|
rate: calculateRate(startTime, processedCount),
|
|
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
|
timing: {
|
|
start_time: new Date(startTime).toISOString(),
|
|
end_time: new Date().toISOString(),
|
|
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
}
|
|
});
|
|
return {
|
|
processedProducts: processedCount,
|
|
processedOrders,
|
|
processedPurchaseOrders: 0,
|
|
success
|
|
};
|
|
}
|
|
|
|
// First ensure all products have a metrics record
|
|
await connection.query(`
|
|
INSERT INTO product_metrics (pid, last_calculated_at)
|
|
SELECT pid, NOW()
|
|
FROM products
|
|
ON CONFLICT (pid) DO NOTHING
|
|
`);
|
|
|
|
// Get threshold settings once
|
|
const thresholds = await connection.query(`
|
|
SELECT critical_days, reorder_days, overstock_days, low_stock_threshold
|
|
FROM stock_thresholds
|
|
WHERE category_id IS NULL AND vendor IS NULL
|
|
LIMIT 1
|
|
`);
|
|
|
|
// Check if threshold data was returned
|
|
if (!thresholds.rows || thresholds.rows.length === 0) {
|
|
console.warn('No default thresholds found in the database. Using explicit type casting in the query.');
|
|
}
|
|
|
|
const defaultThresholds = thresholds.rows[0];
|
|
|
|
// Get financial calculation configuration parameters
|
|
const financialConfig = await connection.query(`
|
|
SELECT
|
|
order_cost,
|
|
holding_rate,
|
|
service_level_z_score,
|
|
min_reorder_qty,
|
|
default_reorder_qty,
|
|
default_safety_stock
|
|
FROM financial_calc_config
|
|
WHERE id = 1
|
|
LIMIT 1
|
|
`);
|
|
const finConfig = financialConfig.rows[0] || {
|
|
order_cost: 25.00,
|
|
holding_rate: 0.25,
|
|
service_level_z_score: 1.96,
|
|
min_reorder_qty: 1,
|
|
default_reorder_qty: 5,
|
|
default_safety_stock: 5
|
|
};
|
|
|
|
// Calculate base product metrics
|
|
if (!SKIP_PRODUCT_BASE_METRICS) {
|
|
outputProgress({
|
|
status: 'running',
|
|
operation: 'Starting base product metrics calculation',
|
|
current: processedCount,
|
|
total: totalProducts,
|
|
elapsed: formatElapsedTime(startTime),
|
|
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
|
rate: calculateRate(startTime, processedCount),
|
|
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
|
timing: {
|
|
start_time: new Date(startTime).toISOString(),
|
|
end_time: new Date().toISOString(),
|
|
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
}
|
|
});
|
|
|
|
// Get order count that will be processed
|
|
const orderCount = await connection.query(`
|
|
SELECT COUNT(*) as count
|
|
FROM orders o
|
|
WHERE o.canceled = false
|
|
`);
|
|
processedOrders = parseInt(orderCount.rows[0].count);
|
|
|
|
// Clear temporary tables
|
|
await connection.query('DROP TABLE IF EXISTS temp_sales_metrics');
|
|
await connection.query('DROP TABLE IF EXISTS temp_purchase_metrics');
|
|
|
|
// Create temp_sales_metrics
|
|
await connection.query(`
|
|
CREATE TEMPORARY TABLE temp_sales_metrics (
|
|
pid BIGINT NOT NULL,
|
|
daily_sales_avg DECIMAL(10,3),
|
|
weekly_sales_avg DECIMAL(10,3),
|
|
monthly_sales_avg DECIMAL(10,3),
|
|
total_revenue DECIMAL(10,3),
|
|
avg_margin_percent DECIMAL(10,3),
|
|
first_sale_date DATE,
|
|
last_sale_date DATE,
|
|
stddev_daily_sales DECIMAL(10,3),
|
|
PRIMARY KEY (pid)
|
|
)
|
|
`);
|
|
|
|
// Create temp_purchase_metrics
|
|
await connection.query(`
|
|
CREATE TEMPORARY TABLE temp_purchase_metrics (
|
|
pid BIGINT NOT NULL,
|
|
avg_lead_time_days DECIMAL(10,2),
|
|
last_purchase_date DATE,
|
|
first_received_date DATE,
|
|
last_received_date DATE,
|
|
stddev_lead_time_days DECIMAL(10,2),
|
|
PRIMARY KEY (pid)
|
|
)
|
|
`);
|
|
|
|
// Populate temp_sales_metrics with base stats and sales averages
|
|
await connection.query(`
|
|
INSERT INTO temp_sales_metrics
|
|
SELECT
|
|
p.pid,
|
|
COALESCE(SUM(o.quantity) / NULLIF(COUNT(DISTINCT DATE(o.date)), 0), 0) as daily_sales_avg,
|
|
COALESCE(SUM(o.quantity) / NULLIF(CEIL(COUNT(DISTINCT DATE(o.date)) / 7), 0), 0) as weekly_sales_avg,
|
|
COALESCE(SUM(o.quantity) / NULLIF(CEIL(COUNT(DISTINCT DATE(o.date)) / 30), 0), 0) as monthly_sales_avg,
|
|
COALESCE(SUM(o.quantity * o.price), 0) as total_revenue,
|
|
CASE
|
|
WHEN SUM(o.quantity * o.price) > 0
|
|
THEN ((SUM(o.quantity * o.price) - SUM(o.quantity * p.cost_price)) / SUM(o.quantity * o.price)) * 100
|
|
ELSE 0
|
|
END as avg_margin_percent,
|
|
MIN(o.date) as first_sale_date,
|
|
MAX(o.date) as last_sale_date,
|
|
COALESCE(STDDEV_SAMP(daily_qty.quantity), 0) as stddev_daily_sales
|
|
FROM products p
|
|
LEFT JOIN orders o ON p.pid = o.pid
|
|
AND o.canceled = false
|
|
AND o.date >= CURRENT_DATE - INTERVAL '90 days'
|
|
LEFT JOIN (
|
|
SELECT
|
|
pid,
|
|
DATE(date) as sale_date,
|
|
SUM(quantity) as quantity
|
|
FROM orders
|
|
WHERE canceled = false
|
|
AND date >= CURRENT_DATE - INTERVAL '90 days'
|
|
GROUP BY pid, DATE(date)
|
|
) daily_qty ON p.pid = daily_qty.pid
|
|
GROUP BY p.pid
|
|
`);
|
|
|
|
// Populate temp_purchase_metrics with timeout protection
|
|
await Promise.race([
|
|
connection.query(`
|
|
INSERT INTO temp_purchase_metrics
|
|
SELECT
|
|
p.pid,
|
|
AVG(
|
|
CASE
|
|
WHEN po.received_date IS NOT NULL AND po.date IS NOT NULL
|
|
THEN EXTRACT(EPOCH FROM (po.received_date::timestamp with time zone - po.date::timestamp with time zone)) / 86400.0
|
|
ELSE NULL
|
|
END
|
|
) as avg_lead_time_days,
|
|
MAX(po.date) as last_purchase_date,
|
|
MIN(po.received_date) as first_received_date,
|
|
MAX(po.received_date) as last_received_date,
|
|
STDDEV_SAMP(
|
|
CASE
|
|
WHEN po.received_date IS NOT NULL AND po.date IS NOT NULL
|
|
THEN EXTRACT(EPOCH FROM (po.received_date::timestamp with time zone - po.date::timestamp with time zone)) / 86400.0
|
|
ELSE NULL
|
|
END
|
|
) as stddev_lead_time_days
|
|
FROM products p
|
|
LEFT JOIN purchase_orders po ON p.pid = po.pid
|
|
AND po.received_date IS NOT NULL
|
|
AND po.date IS NOT NULL
|
|
AND po.date >= CURRENT_DATE - INTERVAL '365 days'
|
|
GROUP BY p.pid
|
|
`),
|
|
new Promise((_, reject) =>
|
|
setTimeout(() => reject(new Error('Timeout: temp_purchase_metrics query took too long')), 60000)
|
|
)
|
|
]).catch(async (err) => {
|
|
logError(err, 'Error populating temp_purchase_metrics, continuing with empty table');
|
|
// Create an empty fallback to continue processing
|
|
await connection.query(`
|
|
INSERT INTO temp_purchase_metrics
|
|
SELECT
|
|
p.pid,
|
|
30.0 as avg_lead_time_days,
|
|
NULL as last_purchase_date,
|
|
NULL as first_received_date,
|
|
NULL as last_received_date,
|
|
0.0 as stddev_lead_time_days
|
|
FROM products p
|
|
LEFT JOIN temp_purchase_metrics tpm ON p.pid = tpm.pid
|
|
WHERE tpm.pid IS NULL
|
|
`);
|
|
});
|
|
|
|
// Process updates in batches
|
|
let lastPid = 0;
|
|
let batchCount = 0;
|
|
const MAX_BATCHES = 1000; // Safety limit for number of batches to prevent infinite loops
|
|
|
|
while (batchCount < MAX_BATCHES) {
|
|
if (isCancelled) break;
|
|
|
|
batchCount++;
|
|
const batch = await connection.query(
|
|
'SELECT pid FROM products WHERE pid > $1 ORDER BY pid LIMIT $2',
|
|
[lastPid, BATCH_SIZE]
|
|
);
|
|
|
|
if (batch.rows.length === 0) break;
|
|
|
|
// Process the entire batch in a single efficient query
|
|
const lowStockThreshold = parseInt(defaultThresholds?.low_stock_threshold) || 5;
|
|
const criticalDays = parseInt(defaultThresholds?.critical_days) || 7;
|
|
const reorderDays = parseInt(defaultThresholds?.reorder_days) || 14;
|
|
const overstockDays = parseInt(defaultThresholds?.overstock_days) || 90;
|
|
const serviceLevel = parseFloat(finConfig?.service_level_z_score) || 1.96;
|
|
const defaultSafetyStock = parseInt(finConfig?.default_safety_stock) || 5;
|
|
const defaultReorderQty = parseInt(finConfig?.default_reorder_qty) || 5;
|
|
const orderCost = parseFloat(finConfig?.order_cost) || 25.00;
|
|
const holdingRate = parseFloat(finConfig?.holding_rate) || 0.25;
|
|
const minReorderQty = parseInt(finConfig?.min_reorder_qty) || 1;
|
|
|
|
await connection.query(`
|
|
UPDATE product_metrics pm
|
|
SET
|
|
inventory_value = p.stock_quantity * NULLIF(p.cost_price, 0),
|
|
daily_sales_avg = COALESCE(sm.daily_sales_avg, 0),
|
|
weekly_sales_avg = COALESCE(sm.weekly_sales_avg, 0),
|
|
monthly_sales_avg = COALESCE(sm.monthly_sales_avg, 0),
|
|
total_revenue = COALESCE(sm.total_revenue, 0),
|
|
avg_margin_percent = COALESCE(sm.avg_margin_percent, 0),
|
|
first_sale_date = sm.first_sale_date,
|
|
last_sale_date = sm.last_sale_date,
|
|
avg_lead_time_days = COALESCE(lm.avg_lead_time_days, 30.0),
|
|
days_of_inventory = CASE
|
|
WHEN COALESCE(sm.daily_sales_avg, 0) > 0
|
|
THEN FLOOR(p.stock_quantity / NULLIF(sm.daily_sales_avg, 0))
|
|
ELSE NULL
|
|
END,
|
|
weeks_of_inventory = CASE
|
|
WHEN COALESCE(sm.weekly_sales_avg, 0) > 0
|
|
THEN FLOOR(p.stock_quantity / NULLIF(sm.weekly_sales_avg, 0))
|
|
ELSE NULL
|
|
END,
|
|
stock_status = CASE
|
|
WHEN p.stock_quantity <= 0 THEN 'Out of Stock'
|
|
WHEN COALESCE(sm.daily_sales_avg, 0) = 0 AND p.stock_quantity <= ${lowStockThreshold} THEN 'Low Stock'
|
|
WHEN COALESCE(sm.daily_sales_avg, 0) = 0 THEN 'In Stock'
|
|
WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) <= ${criticalDays} THEN 'Critical'
|
|
WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) <= ${reorderDays} THEN 'Reorder'
|
|
WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) > ${overstockDays} THEN 'Overstocked'
|
|
ELSE 'Healthy'
|
|
END,
|
|
safety_stock = CASE
|
|
WHEN COALESCE(sm.daily_sales_avg, 0) > 0 AND COALESCE(lm.avg_lead_time_days, 0) > 0 THEN
|
|
CEIL(
|
|
${serviceLevel} * SQRT(
|
|
GREATEST(0, COALESCE(lm.avg_lead_time_days, 0)) * POWER(COALESCE(sm.stddev_daily_sales, 0), 2) +
|
|
POWER(COALESCE(sm.daily_sales_avg, 0), 2) * POWER(COALESCE(lm.stddev_lead_time_days, 0), 2)
|
|
)
|
|
)
|
|
ELSE ${defaultSafetyStock}
|
|
END,
|
|
reorder_point = CASE
|
|
WHEN COALESCE(sm.daily_sales_avg, 0) > 0 THEN
|
|
CEIL(sm.daily_sales_avg * GREATEST(0, COALESCE(lm.avg_lead_time_days, 30.0))) +
|
|
(CASE
|
|
WHEN COALESCE(sm.daily_sales_avg, 0) > 0 AND COALESCE(lm.avg_lead_time_days, 0) > 0 THEN
|
|
CEIL(
|
|
${serviceLevel} * SQRT(
|
|
GREATEST(0, COALESCE(lm.avg_lead_time_days, 0)) * POWER(COALESCE(sm.stddev_daily_sales, 0), 2) +
|
|
POWER(COALESCE(sm.daily_sales_avg, 0), 2) * POWER(COALESCE(lm.stddev_lead_time_days, 0), 2)
|
|
)
|
|
)
|
|
ELSE ${defaultSafetyStock}
|
|
END)
|
|
ELSE ${lowStockThreshold}
|
|
END,
|
|
reorder_qty = CASE
|
|
WHEN COALESCE(sm.daily_sales_avg, 0) > 0 AND NULLIF(p.cost_price, 0) IS NOT NULL AND NULLIF(p.cost_price, 0) > 0 THEN
|
|
GREATEST(
|
|
CEIL(SQRT(
|
|
(2 * (sm.daily_sales_avg * 365) * ${orderCost}) /
|
|
NULLIF(p.cost_price * ${holdingRate}, 0)
|
|
)),
|
|
${minReorderQty}
|
|
)
|
|
ELSE ${defaultReorderQty}
|
|
END,
|
|
overstocked_amt = CASE
|
|
WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) > ${overstockDays}
|
|
THEN GREATEST(0, p.stock_quantity - CEIL(sm.daily_sales_avg * ${overstockDays}))
|
|
ELSE 0
|
|
END,
|
|
last_calculated_at = NOW()
|
|
FROM products p
|
|
LEFT JOIN temp_sales_metrics sm ON p.pid = sm.pid
|
|
LEFT JOIN temp_purchase_metrics lm ON p.pid = lm.pid
|
|
WHERE p.pid = ANY($1::BIGINT[])
|
|
AND pm.pid = p.pid
|
|
`, [batch.rows.map(row => row.pid)]);
|
|
|
|
lastPid = batch.rows[batch.rows.length - 1].pid;
|
|
processedCount += batch.rows.length;
|
|
|
|
outputProgress({
|
|
status: 'running',
|
|
operation: 'Processing base metrics batch',
|
|
current: processedCount,
|
|
total: totalProducts,
|
|
elapsed: formatElapsedTime(startTime),
|
|
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
|
rate: calculateRate(startTime, processedCount),
|
|
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
|
timing: {
|
|
start_time: new Date(startTime).toISOString(),
|
|
end_time: new Date().toISOString(),
|
|
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
}
|
|
});
|
|
}
|
|
|
|
// Add safety check if the loop processed MAX_BATCHES
|
|
if (batchCount >= MAX_BATCHES) {
|
|
logError(new Error(`Reached maximum batch count (${MAX_BATCHES}). Process may have entered an infinite loop.`), 'Batch processing safety limit reached');
|
|
}
|
|
}
|
|
|
|
// Calculate forecast accuracy and bias in batches
|
|
let forecastPid = 0;
|
|
while (true) {
|
|
if (isCancelled) break;
|
|
|
|
const forecastBatch = await connection.query(
|
|
'SELECT pid FROM products WHERE pid > $1 ORDER BY pid LIMIT $2',
|
|
[forecastPid, BATCH_SIZE]
|
|
);
|
|
|
|
if (forecastBatch.rows.length === 0) break;
|
|
|
|
const forecastPidArray = forecastBatch.rows.map(row => row.pid);
|
|
|
|
// Use array_to_string to convert the array to a string of comma-separated values
|
|
await connection.query(`
|
|
WITH forecast_metrics AS (
|
|
SELECT
|
|
sf.pid,
|
|
AVG(CASE
|
|
WHEN o.quantity > 0
|
|
THEN ABS(sf.forecast_quantity - o.quantity) / o.quantity * 100
|
|
ELSE 100
|
|
END) as avg_forecast_error,
|
|
AVG(CASE
|
|
WHEN o.quantity > 0
|
|
THEN (sf.forecast_quantity - o.quantity) / o.quantity * 100
|
|
ELSE 0
|
|
END) as avg_forecast_bias,
|
|
MAX(sf.forecast_date) as last_forecast_date
|
|
FROM sales_forecasts sf
|
|
JOIN orders o ON sf.pid = o.pid
|
|
AND DATE(o.date) = sf.forecast_date
|
|
WHERE o.canceled = false
|
|
AND sf.forecast_date >= CURRENT_DATE - INTERVAL '90 days'
|
|
AND sf.pid = ANY('{${forecastPidArray.join(',')}}'::BIGINT[])
|
|
GROUP BY sf.pid
|
|
)
|
|
UPDATE product_metrics pm
|
|
SET
|
|
forecast_accuracy = GREATEST(0, 100 - LEAST(fm.avg_forecast_error, 100)),
|
|
forecast_bias = GREATEST(-100, LEAST(fm.avg_forecast_bias, 100)),
|
|
last_forecast_date = fm.last_forecast_date,
|
|
last_calculated_at = NOW()
|
|
FROM forecast_metrics fm
|
|
WHERE pm.pid = fm.pid
|
|
`);
|
|
|
|
forecastPid = forecastBatch.rows[forecastBatch.rows.length - 1].pid;
|
|
}
|
|
|
|
// Calculate product time aggregates
|
|
if (!SKIP_PRODUCT_TIME_AGGREGATES) {
|
|
outputProgress({
|
|
status: 'running',
|
|
operation: 'Starting product time aggregates calculation',
|
|
current: processedCount || 0,
|
|
total: totalProducts || 0,
|
|
elapsed: formatElapsedTime(startTime),
|
|
remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0),
|
|
rate: calculateRate(startTime, processedCount || 0),
|
|
percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1),
|
|
timing: {
|
|
start_time: new Date(startTime).toISOString(),
|
|
end_time: new Date().toISOString(),
|
|
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
}
|
|
});
|
|
|
|
// Note: The time-aggregates calculation has been moved to time-aggregates.js
|
|
// This module will not duplicate that functionality
|
|
processedCount = Math.floor(totalProducts * 0.6);
|
|
outputProgress({
|
|
status: 'running',
|
|
operation: 'Product time aggregates calculation delegated to time-aggregates module',
|
|
current: processedCount || 0,
|
|
total: totalProducts || 0,
|
|
elapsed: formatElapsedTime(startTime),
|
|
remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0),
|
|
rate: calculateRate(startTime, processedCount || 0),
|
|
percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1),
|
|
timing: {
|
|
start_time: new Date(startTime).toISOString(),
|
|
end_time: new Date().toISOString(),
|
|
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
}
|
|
});
|
|
} else {
|
|
processedCount = Math.floor(totalProducts * 0.6);
|
|
outputProgress({
|
|
status: 'running',
|
|
operation: 'Skipping product time aggregates calculation',
|
|
current: processedCount || 0,
|
|
total: totalProducts || 0,
|
|
elapsed: formatElapsedTime(startTime),
|
|
remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0),
|
|
rate: calculateRate(startTime, processedCount || 0),
|
|
percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1),
|
|
timing: {
|
|
start_time: new Date(startTime).toISOString(),
|
|
end_time: new Date().toISOString(),
|
|
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
}
|
|
});
|
|
}
|
|
|
|
// Calculate ABC classification
|
|
outputProgress({
|
|
status: 'running',
|
|
operation: 'Starting ABC classification',
|
|
current: processedCount,
|
|
total: totalProducts,
|
|
elapsed: formatElapsedTime(startTime),
|
|
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
|
rate: calculateRate(startTime, processedCount),
|
|
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
|
timing: {
|
|
start_time: new Date(startTime).toISOString(),
|
|
end_time: new Date().toISOString(),
|
|
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
}
|
|
});
|
|
|
|
if (isCancelled) return {
|
|
processedProducts: processedCount,
|
|
processedOrders,
|
|
processedPurchaseOrders: 0, // This module doesn't process POs
|
|
success
|
|
};
|
|
|
|
const abcConfig = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1');
|
|
const abcThresholds = abcConfig.rows[0] || { a_threshold: 20, b_threshold: 50 };
|
|
|
|
// Extract values and ensure they are valid numbers
|
|
const aThreshold = parseFloat(abcThresholds.a_threshold) || 20;
|
|
const bThreshold = parseFloat(abcThresholds.b_threshold) || 50;
|
|
|
|
// First, create and populate the rankings table with an index
|
|
await connection.query('DROP TABLE IF EXISTS temp_revenue_ranks');
|
|
await connection.query(`
|
|
CREATE TEMPORARY TABLE temp_revenue_ranks (
|
|
pid BIGINT NOT NULL,
|
|
total_revenue DECIMAL(10,3),
|
|
rank_num INT,
|
|
dense_rank_num INT,
|
|
percentile DECIMAL(5,2),
|
|
total_count INT,
|
|
PRIMARY KEY (pid)
|
|
)
|
|
`);
|
|
await connection.query('CREATE INDEX ON temp_revenue_ranks (rank_num)');
|
|
await connection.query('CREATE INDEX ON temp_revenue_ranks (dense_rank_num)');
|
|
await connection.query('CREATE INDEX ON temp_revenue_ranks (percentile)');
|
|
|
|
// Calculate rankings with proper tie handling
|
|
await connection.query(`
|
|
INSERT INTO temp_revenue_ranks
|
|
WITH revenue_data AS (
|
|
SELECT
|
|
pid,
|
|
total_revenue,
|
|
COUNT(*) OVER () as total_count,
|
|
PERCENT_RANK() OVER (ORDER BY total_revenue DESC) * 100 as percentile,
|
|
RANK() OVER (ORDER BY total_revenue DESC) as rank_num,
|
|
DENSE_RANK() OVER (ORDER BY total_revenue DESC) as dense_rank_num
|
|
FROM product_metrics
|
|
WHERE total_revenue > 0
|
|
)
|
|
SELECT
|
|
pid,
|
|
total_revenue,
|
|
rank_num,
|
|
dense_rank_num,
|
|
percentile,
|
|
total_count
|
|
FROM revenue_data
|
|
`);
|
|
|
|
// Get total count for percentage calculation
|
|
const rankingCount = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks');
|
|
const totalCount = parseInt(rankingCount.rows[0].total_count) || 1;
|
|
|
|
// Process updates in batches
|
|
let abcProcessedCount = 0;
|
|
const batchSize = 5000;
|
|
const maxPid = await connection.query('SELECT MAX(pid) as max_pid FROM products');
|
|
const maxProductId = parseInt(maxPid.rows[0].max_pid);
|
|
|
|
while (abcProcessedCount < maxProductId) {
|
|
if (isCancelled) return {
|
|
processedProducts: processedCount,
|
|
processedOrders,
|
|
processedPurchaseOrders: 0,
|
|
success
|
|
};
|
|
|
|
// Get a batch of PIDs that need updating
|
|
const pids = await connection.query(`
|
|
SELECT pm.pid
|
|
FROM product_metrics pm
|
|
LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid
|
|
WHERE pm.pid > $1
|
|
AND (pm.abc_class IS NULL
|
|
OR pm.abc_class !=
|
|
CASE
|
|
WHEN tr.pid IS NULL THEN 'C'
|
|
WHEN tr.percentile <= ${aThreshold} THEN 'A'
|
|
WHEN tr.percentile <= ${bThreshold} THEN 'B'
|
|
ELSE 'C'
|
|
END)
|
|
ORDER BY pm.pid
|
|
LIMIT $2
|
|
`, [abcProcessedCount, batchSize]);
|
|
|
|
if (pids.rows.length === 0) break;
|
|
|
|
const pidValues = pids.rows.map(row => row.pid);
|
|
|
|
await connection.query(`
|
|
UPDATE product_metrics pm
|
|
SET abc_class =
|
|
CASE
|
|
WHEN tr.pid IS NULL THEN 'C'
|
|
WHEN tr.percentile <= ${aThreshold} THEN 'A'
|
|
WHEN tr.percentile <= ${bThreshold} THEN 'B'
|
|
ELSE 'C'
|
|
END,
|
|
last_calculated_at = NOW()
|
|
FROM (SELECT pid, percentile FROM temp_revenue_ranks) tr
|
|
WHERE pm.pid = tr.pid AND pm.pid = ANY($1::BIGINT[])
|
|
OR (pm.pid = ANY($1::BIGINT[]) AND tr.pid IS NULL)
|
|
`, [pidValues]);
|
|
|
|
// Now update turnover rate with proper handling of zero inventory periods
|
|
await connection.query(`
|
|
UPDATE product_metrics pm
|
|
SET
|
|
turnover_rate = CASE
|
|
WHEN sales.avg_nonzero_stock > 0 AND sales.active_days > 0
|
|
THEN LEAST(
|
|
(sales.total_sold / sales.avg_nonzero_stock) * (365.0 / sales.active_days),
|
|
999.99
|
|
)
|
|
ELSE 0
|
|
END,
|
|
last_calculated_at = NOW()
|
|
FROM (
|
|
SELECT
|
|
o.pid,
|
|
SUM(o.quantity) as total_sold,
|
|
COUNT(DISTINCT DATE(o.date)) as active_days,
|
|
AVG(CASE
|
|
WHEN p.stock_quantity > 0 THEN p.stock_quantity
|
|
ELSE NULL
|
|
END) as avg_nonzero_stock
|
|
FROM orders o
|
|
JOIN products p ON o.pid = p.pid
|
|
WHERE o.canceled = false
|
|
AND o.date >= CURRENT_DATE - INTERVAL '90 days'
|
|
AND o.pid = ANY($1::BIGINT[])
|
|
GROUP BY o.pid
|
|
) sales
|
|
WHERE pm.pid = sales.pid
|
|
`, [pidValues]);
|
|
|
|
abcProcessedCount = pids.rows[pids.rows.length - 1].pid;
|
|
|
|
// Calculate progress proportionally to total products
|
|
processedCount = Math.floor(totalProducts * (0.60 + (abcProcessedCount / maxProductId) * 0.2));
|
|
|
|
outputProgress({
|
|
status: 'running',
|
|
operation: 'ABC classification progress',
|
|
current: processedCount,
|
|
total: totalProducts,
|
|
elapsed: formatElapsedTime(startTime),
|
|
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
|
rate: calculateRate(startTime, processedCount),
|
|
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
|
timing: {
|
|
start_time: new Date(startTime).toISOString(),
|
|
end_time: new Date().toISOString(),
|
|
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
}
|
|
});
|
|
}
|
|
|
|
// If we get here, everything completed successfully
|
|
success = true;
|
|
|
|
// Update calculate_status
|
|
await connection.query(`
|
|
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
|
|
VALUES ('product_metrics', NOW())
|
|
ON CONFLICT (module_name) DO UPDATE
|
|
SET last_calculation_timestamp = NOW()
|
|
`);
|
|
|
|
return {
|
|
processedProducts: processedCount || 0,
|
|
processedOrders: processedOrders || 0,
|
|
processedPurchaseOrders: 0, // This module doesn't process POs
|
|
success
|
|
};
|
|
|
|
} catch (error) {
|
|
success = false;
|
|
logError(error, 'Error calculating product metrics');
|
|
throw error;
|
|
} finally {
|
|
// Always clean up temporary tables, even if an error occurred
|
|
if (connection) {
|
|
try {
|
|
await connection.query('DROP TABLE IF EXISTS temp_sales_metrics');
|
|
await connection.query('DROP TABLE IF EXISTS temp_purchase_metrics');
|
|
} catch (err) {
|
|
console.error('Error cleaning up temporary tables:', err);
|
|
}
|
|
|
|
// Make sure to release the connection
|
|
connection.release();
|
|
}
|
|
}
|
|
}
|
|
|
|
function calculateStockStatus(stock, config, daily_sales_avg, weekly_sales_avg, monthly_sales_avg) {
|
|
if (stock <= 0) {
|
|
return 'Out of Stock';
|
|
}
|
|
|
|
// Use the most appropriate sales average based on data quality
|
|
let sales_avg = daily_sales_avg;
|
|
if (sales_avg === 0) {
|
|
sales_avg = weekly_sales_avg / 7;
|
|
}
|
|
if (sales_avg === 0) {
|
|
sales_avg = monthly_sales_avg / 30;
|
|
}
|
|
|
|
if (sales_avg === 0) {
|
|
return stock <= config.low_stock_threshold ? 'Low Stock' : 'In Stock';
|
|
}
|
|
|
|
const days_of_stock = stock / sales_avg;
|
|
|
|
if (days_of_stock <= config.critical_days) {
|
|
return 'Critical';
|
|
} else if (days_of_stock <= config.reorder_days) {
|
|
return 'Reorder';
|
|
} else if (days_of_stock > config.overstock_days) {
|
|
return 'Overstocked';
|
|
}
|
|
|
|
return 'Healthy';
|
|
}
|
|
|
|
// Note: calculateReorderQuantities function has been removed as its logic has been incorporated
|
|
// in the main SQL query with configurable parameters
|
|
|
|
module.exports = calculateProductMetrics;
|