10 Commits

Author SHA1 Message Date
90379386d6 Apply gemini suggested calculate improvements 2025-02-10 15:20:22 -05:00
09f7103472 Fix TopReplenishProducts component 2025-02-10 14:35:24 -05:00
d8fd64cf62 More try to speed up time-aggregate calcs 2025-02-10 14:30:52 -05:00
619409847d Try to speed up time-aggregates 2025-02-10 14:14:52 -05:00
eea57528ab Try to speed up category calcs 2025-02-10 11:21:20 -05:00
3d2d1b3946 Try to speed up brand calcs 2025-02-10 10:20:32 -05:00
d936d50f83 Vendor calculate script fix 2025-02-10 09:26:24 -05:00
610e26689c Try to speed up calculate script + fixes 2025-02-10 01:29:01 -05:00
7ff757203f Calculate script fixes 2025-02-09 15:40:57 -05:00
843ce71506 Make calculations incremental 2025-02-09 13:35:44 -05:00
10 changed files with 1492 additions and 1833 deletions

View File

@@ -126,13 +126,13 @@ CREATE TABLE IF NOT EXISTS vendor_metrics (
   order_fill_rate DECIMAL(5,2),
   total_orders INT DEFAULT 0,
   total_late_orders INT DEFAULT 0,
-  total_purchase_value DECIMAL(10,3) DEFAULT 0,
-  avg_order_value DECIMAL(10,3),
+  total_purchase_value DECIMAL(15,3) DEFAULT 0,
+  avg_order_value DECIMAL(15,3),
   -- Product metrics
   active_products INT DEFAULT 0,
   total_products INT DEFAULT 0,
   -- Financial metrics
-  total_revenue DECIMAL(10,3) DEFAULT 0,
+  total_revenue DECIMAL(15,3) DEFAULT 0,
   avg_margin_percent DECIMAL(5,2),
   -- Status
   status VARCHAR(20) DEFAULT 'active',
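Widening the aggregate money columns from DECIMAL(10,3) to DECIMAL(15,3) raises the maximum storable value from 9,999,999.999 to 999,999,999,999.999, so large purchase and revenue totals no longer overflow. For a table that already exists in a deployed database, a one-off migration would be needed alongside the schema change; the sketch below is an assumption, not part of this commit, and only uses the column names visible in the hunk.

```js
// Hypothetical migration sketch (not in the diff): widen the money columns in place.
// Assumes a mysql2/promise-style connection like the one used elsewhere in this repo.
async function widenVendorMetricColumns(connection) {
  // MODIFY keeps existing values and only changes precision/scale.
  await connection.query(`
    ALTER TABLE vendor_metrics
      MODIFY total_purchase_value DECIMAL(15,3) DEFAULT 0,
      MODIFY avg_order_value      DECIMAL(15,3),
      MODIFY total_revenue        DECIMAL(15,3) DEFAULT 0
  `);
}
```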

View File

@@ -83,6 +83,7 @@ process.on('SIGTERM', cancelCalculation);
 async function calculateMetrics() {
   let connection;
   const startTime = Date.now();
+  // Initialize all counts to 0
   let processedProducts = 0;
   let processedOrders = 0;
   let processedPurchaseOrders = 0;
@@ -90,7 +91,7 @@ async function calculateMetrics() {
   let totalOrders = 0;
   let totalPurchaseOrders = 0;
   let calculateHistoryId;
   try {
     // Clean up any previously running calculations
     connection = await getConnection();
@@ -104,18 +105,57 @@ async function calculateMetrics() {
       WHERE status = 'running'
     `);
-    // Get counts from all relevant tables
+    // Get counts of records that need updating based on last calculation time
     const [[productCount], [orderCount], [poCount]] = await Promise.all([
-      connection.query('SELECT COUNT(*) as total FROM products'),
-      connection.query('SELECT COUNT(*) as total FROM orders'),
-      connection.query('SELECT COUNT(*) as total FROM purchase_orders')
+      connection.query(`
+        SELECT COUNT(DISTINCT p.pid) as total
+        FROM products p
+        FORCE INDEX (PRIMARY)
+        LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics'
+        LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
+          AND o.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
+          AND o.canceled = false
+        LEFT JOIN purchase_orders po FORCE INDEX (idx_purchase_orders_metrics) ON p.pid = po.pid
+          AND po.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
+        WHERE p.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
+          OR o.pid IS NOT NULL
+          OR po.pid IS NOT NULL
+      `),
+      connection.query(`
+        SELECT COUNT(DISTINCT o.id) as total
+        FROM orders o
+        FORCE INDEX (idx_orders_metrics)
+        LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics'
+        WHERE o.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
+          AND o.canceled = false
+      `),
+      connection.query(`
+        SELECT COUNT(DISTINCT po.id) as total
+        FROM purchase_orders po
+        FORCE INDEX (idx_purchase_orders_metrics)
+        LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics'
+        WHERE po.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
+      `)
     ]);
     totalProducts = productCount.total;
     totalOrders = orderCount.total;
     totalPurchaseOrders = poCount.total;
+    connection.release();
+    // If nothing needs updating, we can exit early
+    if (totalProducts === 0 && totalOrders === 0 && totalPurchaseOrders === 0) {
+      console.log('No records need updating');
+      return {
+        processedProducts: 0,
+        processedOrders: 0,
+        processedPurchaseOrders: 0,
+        success: true
+      };
+    }
     // Create history record for this calculation
+    connection = await getConnection(); // Re-establish connection
     const [historyResult] = await connection.query(`
       INSERT INTO calculate_history (
         start_time,
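This is the core of the "make calculations incremental" change: each count query only considers rows whose `updated` timestamp is newer than the module's `last_calculation_timestamp` in `calculate_status`, and when all three counts come back zero the run exits before even creating a history record. A minimal sketch of that watermark pattern, assuming a `calculate_status` table keyed by `module_name` and a mysql2/promise-style connection (the helper names are illustrative, not from the diff):

```js
// Illustrative sketch of the incremental gate (names are assumptions).
async function countChangedOrders(connection, moduleName) {
  // Read the watermark for this module; fall back to the epoch on the first run.
  const [rows] = await connection.query(
    `SELECT last_calculation_timestamp FROM calculate_status WHERE module_name = ?`,
    [moduleName]
  );
  const since = rows[0]?.last_calculation_timestamp || '1970-01-01';

  // Count only work that appeared after the watermark.
  const [[{ total }]] = await connection.query(
    `SELECT COUNT(*) AS total FROM orders WHERE updated > ? AND canceled = false`,
    [since]
  );
  return total;
}

// After a successful run the module advances its watermark, so the next
// invocation starts from "now" instead of re-scanning every row.
async function advanceWatermark(connection, moduleName) {
  await connection.query(
    `INSERT INTO calculate_status (module_name, last_calculation_timestamp)
     VALUES (?, NOW())
     ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()`,
    [moduleName]
  );
}
```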
@@ -146,7 +186,7 @@ async function calculateMetrics() {
       totalPurchaseOrders,
       SKIP_PRODUCT_METRICS,
       SKIP_TIME_AGGREGATES,
       SKIP_FINANCIAL_METRICS,
       SKIP_VENDOR_METRICS,
       SKIP_CATEGORY_METRICS,
       SKIP_BRAND_METRICS,
@@ -172,7 +212,7 @@ async function calculateMetrics() {
     }
     isCancelled = false;
-    connection = await getConnection();
+    connection = await getConnection(); // Get a new connection for the main processing
     try {
       global.outputProgress({
@@ -191,14 +231,12 @@ async function calculateMetrics() {
         }
       });
-      // Update progress periodically
+      // Update progress periodically - REFACTORED
       const updateProgress = async (products = null, orders = null, purchaseOrders = null) => {
-        // Ensure all values are valid numbers or default to previous value
         if (products !== null) processedProducts = Number(products) || processedProducts || 0;
         if (orders !== null) processedOrders = Number(orders) || processedOrders || 0;
         if (purchaseOrders !== null) processedPurchaseOrders = Number(purchaseOrders) || processedPurchaseOrders || 0;
-        // Ensure we never send NaN to the database
        const safeProducts = Number(processedProducts) || 0;
        const safeOrders = Number(processedOrders) || 0;
        const safePurchaseOrders = Number(processedPurchaseOrders) || 0;
@@ -213,14 +251,14 @@ async function calculateMetrics() {
       `, [safeProducts, safeOrders, safePurchaseOrders, calculateHistoryId]);
       };
-      // Helper function to ensure valid progress numbers
+      // Helper function to ensure valid progress numbers - this is fine
       const ensureValidProgress = (current, total) => ({
         current: Number(current) || 0,
         total: Number(total) || 1, // Default to 1 to avoid division by zero
         percentage: (((Number(current) || 0) / (Number(total) || 1)) * 100).toFixed(1)
       });
-      // Initial progress
+      // Initial progress - this is fine
       const initialProgress = ensureValidProgress(0, totalProducts);
       global.outputProgress({
         status: 'running',
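Both helpers coerce every count with `Number(x) || fallback`, so an undefined or NaN value can never reach the SQL update or produce a NaN percentage. A tiny standalone check of that behaviour (the inputs are invented):

```js
// Same helper as above, shown in isolation; example values are invented.
const ensureValidProgress = (current, total) => ({
  current: Number(current) || 0,
  total: Number(total) || 1, // avoid division by zero
  percentage: (((Number(current) || 0) / (Number(total) || 1)) * 100).toFixed(1)
});

console.log(ensureValidProgress(250, 1000));    // { current: 250, total: 1000, percentage: '25.0' }
console.log(ensureValidProgress(undefined, 0)); // { current: 0, total: 1, percentage: '0.0' }
console.log(ensureValidProgress(NaN, 'oops'));  // { current: 0, total: 1, percentage: '0.0' }
```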
@@ -238,37 +276,28 @@ async function calculateMetrics() {
         }
       });
+      // --- Call each module, passing totals and accumulating processed counts ---
       if (!SKIP_PRODUCT_METRICS) {
-        const result = await calculateProductMetrics(startTime, totalProducts);
-        await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
+        const result = await calculateProductMetrics(startTime, totalProducts, processedProducts, isCancelled); // Pass totals
+        processedProducts += result.processedProducts; // Accumulate
+        processedOrders += result.processedOrders;
+        processedPurchaseOrders += result.processedPurchaseOrders;
+        await updateProgress(processedProducts, processedOrders, processedPurchaseOrders); // Update with accumulated values
        if (!result.success) {
          throw new Error('Product metrics calculation failed');
        }
      } else {
        console.log('Skipping product metrics calculation...');
-        processedProducts = Math.floor(totalProducts * 0.6);
-        await updateProgress(processedProducts);
-        global.outputProgress({
-          status: 'running',
-          operation: 'Skipping product metrics calculation',
-          current: processedProducts,
-          total: totalProducts,
-          elapsed: global.formatElapsedTime(startTime),
-          remaining: global.estimateRemaining(startTime, processedProducts, totalProducts),
-          rate: global.calculateRate(startTime, processedProducts),
-          percentage: '60',
-          timing: {
-            start_time: new Date(startTime).toISOString(),
-            end_time: new Date().toISOString(),
-            elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
-          }
-        });
+        // Don't artificially inflate processedProducts if skipping
      }
-      // Calculate time-based aggregates
      if (!SKIP_TIME_AGGREGATES) {
-        const result = await calculateTimeAggregates(startTime, totalProducts, processedProducts);
-        await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
+        const result = await calculateTimeAggregates(startTime, totalProducts, processedProducts, isCancelled); // Pass totals
+        processedProducts += result.processedProducts; // Accumulate
+        processedOrders += result.processedOrders;
+        processedPurchaseOrders += result.processedPurchaseOrders;
+        await updateProgress(processedProducts, processedOrders, processedPurchaseOrders);
        if (!result.success) {
          throw new Error('Time aggregates calculation failed');
        }
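Each module now returns its own processed counts and the orchestrator accumulates them before reporting, instead of letting a module's return value overwrite the running totals. A condensed sketch of that calling convention; the wrapper itself is illustrative and not in the diff, only the result shape and module names come from it:

```js
// Illustrative wrapper for the accumulate-then-report pattern (assumed helper).
// Each module resolves to { processedProducts, processedOrders, processedPurchaseOrders, success }.
async function runModule(name, moduleFn, state, updateProgress) {
  const result = await moduleFn();
  state.processedProducts += result.processedProducts;
  state.processedOrders += result.processedOrders;
  state.processedPurchaseOrders += result.processedPurchaseOrders;
  await updateProgress(state.processedProducts, state.processedOrders, state.processedPurchaseOrders);
  if (!result.success) throw new Error(`${name} calculation failed`);
}

// Example call, mirroring the product-metrics block above:
// await runModule('Product metrics',
//   () => calculateProductMetrics(startTime, totalProducts, state.processedProducts, isCancelled),
//   state, updateProgress);
```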
@@ -276,21 +305,25 @@ async function calculateMetrics() {
        console.log('Skipping time aggregates calculation');
      }
-      // Calculate financial metrics
      if (!SKIP_FINANCIAL_METRICS) {
-        const result = await calculateFinancialMetrics(startTime, totalProducts, processedProducts);
-        await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
+        const result = await calculateFinancialMetrics(startTime, totalProducts, processedProducts, isCancelled); // Pass totals
+        processedProducts += result.processedProducts; // Accumulate
+        processedOrders += result.processedOrders;
+        processedPurchaseOrders += result.processedPurchaseOrders;
+        await updateProgress(processedProducts, processedOrders, processedPurchaseOrders);
        if (!result.success) {
          throw new Error('Financial metrics calculation failed');
        }
      } else {
        console.log('Skipping financial metrics calculation');
      }
-      // Calculate vendor metrics
      if (!SKIP_VENDOR_METRICS) {
-        const result = await calculateVendorMetrics(startTime, totalProducts, processedProducts);
-        await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
+        const result = await calculateVendorMetrics(startTime, totalProducts, processedProducts, isCancelled); // Pass totals
+        processedProducts += result.processedProducts; // Accumulate
+        processedOrders += result.processedOrders;
+        processedPurchaseOrders += result.processedPurchaseOrders;
+        await updateProgress(processedProducts, processedOrders, processedPurchaseOrders);
        if (!result.success) {
          throw new Error('Vendor metrics calculation failed');
        }
@@ -298,10 +331,12 @@ async function calculateMetrics() {
        console.log('Skipping vendor metrics calculation');
      }
-      // Calculate category metrics
      if (!SKIP_CATEGORY_METRICS) {
-        const result = await calculateCategoryMetrics(startTime, totalProducts, processedProducts);
-        await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
+        const result = await calculateCategoryMetrics(startTime, totalProducts, processedProducts, isCancelled); // Pass totals
+        processedProducts += result.processedProducts; // Accumulate
+        processedOrders += result.processedOrders;
+        processedPurchaseOrders += result.processedPurchaseOrders;
+        await updateProgress(processedProducts, processedOrders, processedPurchaseOrders);
        if (!result.success) {
          throw new Error('Category metrics calculation failed');
        }
@@ -309,10 +344,12 @@ async function calculateMetrics() {
        console.log('Skipping category metrics calculation');
      }
-      // Calculate brand metrics
      if (!SKIP_BRAND_METRICS) {
-        const result = await calculateBrandMetrics(startTime, totalProducts, processedProducts);
-        await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
+        const result = await calculateBrandMetrics(startTime, totalProducts, processedProducts, isCancelled); // Pass totals
+        processedProducts += result.processedProducts; // Accumulate
+        processedOrders += result.processedOrders;
+        processedPurchaseOrders += result.processedPurchaseOrders;
+        await updateProgress(processedProducts, processedOrders, processedPurchaseOrders);
        if (!result.success) {
          throw new Error('Brand metrics calculation failed');
        }
@@ -320,10 +357,12 @@ async function calculateMetrics() {
        console.log('Skipping brand metrics calculation');
      }
-      // Calculate sales forecasts
      if (!SKIP_SALES_FORECASTS) {
-        const result = await calculateSalesForecasts(startTime, totalProducts, processedProducts);
-        await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
+        const result = await calculateSalesForecasts(startTime, totalProducts, processedProducts, isCancelled); // Pass totals
+        processedProducts += result.processedProducts; // Accumulate
+        processedOrders += result.processedOrders;
+        processedPurchaseOrders += result.processedPurchaseOrders;
+        await updateProgress(processedProducts, processedOrders, processedPurchaseOrders);
        if (!result.success) {
          throw new Error('Sales forecasts calculation failed');
        }
@@ -331,23 +370,7 @@ async function calculateMetrics() {
        console.log('Skipping sales forecasts calculation');
      }
-      // Calculate ABC classification
-      outputProgress({
-        status: 'running',
-        operation: 'Starting ABC classification',
-        current: processedProducts || 0,
-        total: totalProducts || 0,
-        elapsed: formatElapsedTime(startTime),
-        remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0),
-        rate: calculateRate(startTime, processedProducts || 0),
-        percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1),
-        timing: {
-          start_time: new Date(startTime).toISOString(),
-          end_time: new Date().toISOString(),
-          elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
-        }
-      });
+      // --- ABC Classification (Refactored) ---
      if (isCancelled) return {
        processedProducts: processedProducts || 0,
        processedOrders: processedOrders || 0,
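Cancellation throughout this script is cooperative: SIGINT/SIGTERM only flip the `isCancelled` flag (see `process.on('SIGTERM', cancelCalculation)` at the top of this file), and each stage checks the flag and returns a partial result with `success: false` rather than being killed mid-query. A stripped-down sketch of that shape, with the stage runner invented for illustration:

```js
// Minimal sketch of the cooperative-cancellation pattern used here (runner is assumed).
let isCancelled = false;
const cancelCalculation = () => { isCancelled = true; };
process.on('SIGINT', cancelCalculation);
process.on('SIGTERM', cancelCalculation);

async function runStages(stages) {
  let processed = 0;
  for (const stage of stages) {
    if (isCancelled) {
      // Return what was completed so the caller can record a partial run.
      return { processed, success: false };
    }
    processed += await stage();
  }
  return { processed, success: true };
}
```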
@@ -365,21 +388,26 @@ async function calculateMetrics() {
          pid BIGINT NOT NULL,
          total_revenue DECIMAL(10,3),
          rank_num INT,
+          dense_rank_num INT,
+          percentile DECIMAL(5,2),
          total_count INT,
          PRIMARY KEY (pid),
-          INDEX (rank_num)
+          INDEX (rank_num),
+          INDEX (dense_rank_num),
+          INDEX (percentile)
        ) ENGINE=MEMORY
      `);
+      let processedCount = processedProducts;
      outputProgress({
        status: 'running',
        operation: 'Creating revenue rankings',
-        current: processedProducts || 0,
-        total: totalProducts || 0,
+        current: processedCount,
+        total: totalProducts,
        elapsed: formatElapsedTime(startTime),
-        remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0),
-        rate: calculateRate(startTime, processedProducts || 0),
-        percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1),
+        remaining: estimateRemaining(startTime, processedCount, totalProducts),
+        rate: calculateRate(startTime, processedCount),
+        percentage: ((processedCount / totalProducts) * 100).toFixed(1),
        timing: {
          start_time: new Date(startTime).toISOString(),
          end_time: new Date().toISOString(),
@@ -394,49 +422,35 @@ async function calculateMetrics() {
        success: false
      };
+      // Calculate rankings with proper tie handling and get total count in one go.
      await connection.query(`
        INSERT INTO temp_revenue_ranks
+        WITH revenue_data AS (
+          SELECT
+            pid,
+            total_revenue,
+            COUNT(*) OVER () as total_count,
+            PERCENT_RANK() OVER (ORDER BY total_revenue DESC) * 100 as percentile,
+            RANK() OVER (ORDER BY total_revenue DESC) as rank_num,
+            DENSE_RANK() OVER (ORDER BY total_revenue DESC) as dense_rank_num
+          FROM product_metrics
+          WHERE total_revenue > 0
+        )
        SELECT
          pid,
          total_revenue,
-          @rank := @rank + 1 as rank_num,
-          @total_count := @rank as total_count
-        FROM (
-          SELECT pid, total_revenue
-          FROM product_metrics
-          WHERE total_revenue > 0
-          ORDER BY total_revenue DESC
-        ) ranked,
-        (SELECT @rank := 0) r
+          rank_num,
+          dense_rank_num,
+          percentile,
+          total_count
+        FROM revenue_data
      `);
-      // Get total count for percentage calculation
-      const [rankingCount] = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks');
-      const totalCount = rankingCount[0].total_count || 1;
-      const max_rank = totalCount; // Store max_rank for use in classification
-      outputProgress({
-        status: 'running',
-        operation: 'Updating ABC classifications',
-        current: processedProducts || 0,
-        total: totalProducts || 0,
-        elapsed: formatElapsedTime(startTime),
-        remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0),
-        rate: calculateRate(startTime, processedProducts || 0),
-        percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1),
-        timing: {
-          start_time: new Date(startTime).toISOString(),
-          end_time: new Date().toISOString(),
-          elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
-        }
-      });
-      if (isCancelled) return {
-        processedProducts: processedProducts || 0,
-        processedOrders: processedOrders || 0,
-        processedPurchaseOrders: 0,
-        success: false
-      };
+      // Get total count for percentage calculation (already done in the above query)
+      // No need for this separate query:
+      // const [rankingCount] = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks');
+      // const totalCount = rankingCount[0].total_count || 1;
+      // const max_rank = totalCount; // Store max_rank for use in classification
      // ABC classification progress tracking
      let abcProcessedCount = 0;
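The rewritten ranking query computes rank, dense rank, percentile, and the total row count in a single pass with window functions (MySQL 8.0+), replacing the old user-variable counter and the follow-up `MAX(rank_num)` query; `PERCENT_RANK()` is multiplied by 100 so it can be compared directly against the A/B thresholds later. A standalone sketch for inspecting that output outside the temp-table flow (the helper is an assumption; the table and expressions come from the diff):

```js
// Hedged sketch: run the window-function ranking on its own to preview the results.
// Requires MySQL 8.0+; assumes a mysql2/promise-style connection.
async function previewRevenueRanks(connection, limit = 10) {
  const [rows] = await connection.query(`
    SELECT
      pid,
      total_revenue,
      COUNT(*) OVER ()                                        AS total_count,
      PERCENT_RANK() OVER (ORDER BY total_revenue DESC) * 100 AS percentile,
      RANK() OVER (ORDER BY total_revenue DESC)               AS rank_num,
      DENSE_RANK() OVER (ORDER BY total_revenue DESC)         AS dense_rank_num
    FROM product_metrics
    WHERE total_revenue > 0
    ORDER BY rank_num
    LIMIT ?
  `, [limit]);
  return rows; // the top earner has rank_num = 1 and percentile = 0
}
```

Note that `PERCENT_RANK()` of the first row is 0, so the highest-revenue product always satisfies `percentile <= a_threshold` and lands in class A.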
@@ -452,7 +466,7 @@ async function calculateMetrics() {
        success: false
      };
-      // First get a batch of PIDs that need updating
+      // Get a batch of PIDs that need updating - REFACTORED to use percentile
      const [pids] = await connection.query(`
        SELECT pm.pid
        FROM product_metrics pm
@@ -460,41 +474,38 @@ async function calculateMetrics() {
        WHERE pm.abc_class IS NULL
          OR pm.abc_class !=
            CASE
-              WHEN tr.rank_num IS NULL THEN 'C'
-              WHEN (tr.rank_num / ?) * 100 <= ? THEN 'A'
-              WHEN (tr.rank_num / ?) * 100 <= ? THEN 'B'
+              WHEN tr.pid IS NULL THEN 'C'
+              WHEN tr.percentile <= ? THEN 'A'
+              WHEN tr.percentile <= ? THEN 'B'
              ELSE 'C'
            END
        LIMIT ?
-      `, [max_rank, abcThresholds.a_threshold,
-          max_rank, abcThresholds.b_threshold,
-          batchSize]);
+      `, [abcThresholds.a_threshold, abcThresholds.b_threshold, batchSize]);
      if (pids.length === 0) {
        break;
      }
-      // Then update just those PIDs
-      const [result] = await connection.query(`
+      // Update just those PIDs - REFACTORED to use percentile
+      await connection.query(`
        UPDATE product_metrics pm
        LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid
        SET pm.abc_class =
          CASE
-            WHEN tr.rank_num IS NULL THEN 'C'
-            WHEN (tr.rank_num / ?) * 100 <= ? THEN 'A'
-            WHEN (tr.rank_num / ?) * 100 <= ? THEN 'B'
+            WHEN tr.pid IS NULL THEN 'C'
+            WHEN tr.percentile <= ? THEN 'A'
+            WHEN tr.percentile <= ? THEN 'B'
            ELSE 'C'
          END,
          pm.last_calculated_at = NOW()
        WHERE pm.pid IN (?)
-      `, [max_rank, abcThresholds.a_threshold,
-          max_rank, abcThresholds.b_threshold,
-          pids.map(row => row.pid)]);
-      abcProcessedCount += result.affectedRows;
+      `, [abcThresholds.a_threshold, abcThresholds.b_threshold, pids.map(row => row.pid)]);
+      abcProcessedCount += pids.length; // Use pids.length, more accurate
+      processedProducts += pids.length; // Add to the main processedProducts
      // Calculate progress ensuring valid numbers
-      const currentProgress = Math.floor(totalProducts * (0.99 + (abcProcessedCount / (totalCount || 1)) * 0.01));
+      const currentProgress = Math.floor(totalProducts * (0.99 + (abcProcessedCount / (totalProducts || 1)) * 0.01));
      processedProducts = Number(currentProgress) || processedProducts || 0;
      // Only update progress at most once per second
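With percentiles precomputed in the temp table, the classification itself reduces to two threshold comparisons, and products missing from the ranking (no revenue) fall through to 'C'. The same CASE expression rendered as plain JavaScript, which can be handy for unit-testing threshold changes; the function name and default thresholds are illustrative, not taken from the repo:

```js
// Illustrative mirror of the SQL CASE above; thresholds are percentiles (0-100).
function abcClass(percentile, { a_threshold = 20, b_threshold = 50 } = {}) {
  if (percentile === null || percentile === undefined) return 'C'; // not ranked => no revenue
  if (percentile <= a_threshold) return 'A';
  if (percentile <= b_threshold) return 'B';
  return 'C';
}

// abcClass(0)    -> 'A'  (top seller: PERCENT_RANK() of the first row is 0)
// abcClass(35)   -> 'B'
// abcClass(null) -> 'C'
```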

View File

@@ -4,19 +4,51 @@ const { getConnection } = require('./utils/db');
 async function calculateBrandMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
   const connection = await getConnection();
   let success = false;
-  let processedOrders = 0;
+  const BATCH_SIZE = 5000;
+  let myProcessedProducts = 0; // Not *directly* processing products, tracking brands
   try {
+    // Get last calculation timestamp
+    const [lastCalc] = await connection.query(`
+      SELECT last_calculation_timestamp
+      FROM calculate_status
+      WHERE module_name = 'brand_metrics'
+    `);
+    const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
+    // Get total count of brands needing updates
+    const [brandCount] = await connection.query(`
+      SELECT COUNT(DISTINCT p.brand) as count
+      FROM products p
+      LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
+      WHERE p.brand IS NOT NULL
+        AND (
+          p.updated > ?
+          OR o.id IS NOT NULL
+        )
+    `, [lastCalculationTime, lastCalculationTime]);
+    const totalBrands = brandCount[0].count; // Track total *brands*
+    if (totalBrands === 0) {
+      console.log('No brands need metric updates');
+      return {
+        processedProducts: 0, // Not directly processing products
+        processedOrders: 0,
+        processedPurchaseOrders: 0,
+        success: true
+      };
+    }
    if (isCancelled) {
      outputProgress({
        status: 'cancelled',
        operation: 'Brand metrics calculation cancelled',
-        current: processedCount,
-        total: totalProducts,
+        current: processedCount, // Use passed-in value
+        total: totalBrands, // Report total *brands*
        elapsed: formatElapsedTime(startTime),
        remaining: null,
        rate: calculateRate(startTime, processedCount),
-        percentage: ((processedCount / totalProducts) * 100).toFixed(1),
+        percentage: ((processedCount / totalBrands) * 100).toFixed(1), // Base on brands
        timing: {
          start_time: new Date(startTime).toISOString(),
          end_time: new Date().toISOString(),
@@ -24,30 +56,22 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount =
        }
      });
      return {
-        processedProducts: processedCount,
+        processedProducts: 0, // Not directly processing products
        processedOrders: 0,
        processedPurchaseOrders: 0,
        success
      };
    }
-    // Get order count that will be processed
-    const [orderCount] = await connection.query(`
-      SELECT COUNT(*) as count
-      FROM orders o
-      WHERE o.canceled = false
-    `);
-    processedOrders = orderCount[0].count;
    outputProgress({
      status: 'running',
      operation: 'Starting brand metrics calculation',
-      current: processedCount,
-      total: totalProducts,
+      current: processedCount, // Use passed-in value
+      total: totalBrands, // Report total *brands*
      elapsed: formatElapsedTime(startTime),
-      remaining: estimateRemaining(startTime, processedCount, totalProducts),
+      remaining: estimateRemaining(startTime, processedCount, totalBrands),
      rate: calculateRate(startTime, processedCount),
-      percentage: ((processedCount / totalProducts) * 100).toFixed(1),
+      percentage: ((processedCount / totalBrands) * 100).toFixed(1), // Base on brands
      timing: {
        start_time: new Date(startTime).toISOString(),
        end_time: new Date().toISOString(),
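The batched rewrite that follows walks brands with keyset pagination: remember the last brand processed and ask for `brand > lastBrand ORDER BY brand LIMIT n`, which stays fast however deep into the list it gets, unlike an OFFSET scan. A minimal sketch of that loop shape; the generator is an assumed helper and it omits the diff's extra `OR EXISTS (... orders ...)` condition for brevity:

```js
// Minimal keyset-pagination sketch (assumed helper, simplified from the batch query below).
async function* brandBatches(connection, lastCalculationTime, batchSize = 5000) {
  let lastBrand = '';
  while (true) {
    const [batch] = await connection.query(`
      SELECT DISTINCT p.brand
      FROM products p
      WHERE p.brand IS NOT NULL
        AND p.brand > ?
        AND p.updated > ?
      ORDER BY p.brand
      LIMIT ?
    `, [lastBrand, lastCalculationTime, batchSize]);
    if (batch.length === 0) return;
    yield batch.map(row => row.brand);
    lastBrand = batch[batch.length - 1].brand; // keyset cursor: resume after the last key seen
  }
}

// Usage: for await (const brands of brandBatches(conn, since)) { /* process brands */ }
```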
@@ -55,241 +79,194 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount =
} }
}); });
// Calculate brand metrics with optimized queries // Process in batches
await connection.query(` let lastBrand = '';
INSERT INTO brand_metrics ( let processedBrands = 0; // Track processed brands
brand, while (true) {
product_count, if (isCancelled) break;
active_products,
total_stock_units, const [batch] = await connection.query(`
total_stock_cost, SELECT DISTINCT p.brand
total_stock_retail,
total_revenue,
avg_margin,
growth_rate
)
WITH filtered_products AS (
SELECT
p.*,
CASE
WHEN p.stock_quantity <= 5000 AND p.stock_quantity >= 0
THEN p.pid
END as valid_pid,
CASE
WHEN p.visible = true
AND p.stock_quantity <= 5000
AND p.stock_quantity >= 0
THEN p.pid
END as active_pid,
CASE
WHEN p.stock_quantity IS NULL
OR p.stock_quantity < 0
OR p.stock_quantity > 5000
THEN 0
ELSE p.stock_quantity
END as valid_stock
FROM products p FROM products p
FORCE INDEX (idx_brand)
LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid AND o.updated > ?
WHERE p.brand IS NOT NULL WHERE p.brand IS NOT NULL
), AND p.brand > ?
sales_periods AS ( AND (
p.updated > ?
OR o.id IS NOT NULL
)
ORDER BY p.brand
LIMIT ?
`, [lastCalculationTime, lastBrand, lastCalculationTime, BATCH_SIZE]);
if (batch.length === 0) break;
// Create temporary tables for better performance
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
await connection.query(`
CREATE TEMPORARY TABLE temp_product_stats (
brand VARCHAR(100) NOT NULL,
product_count INT,
active_products INT,
total_stock_units INT,
total_stock_cost DECIMAL(15,2),
total_stock_retail DECIMAL(15,2),
total_revenue DECIMAL(15,2),
avg_margin DECIMAL(5,2),
PRIMARY KEY (brand),
INDEX (total_revenue),
INDEX (product_count)
) ENGINE=MEMORY
`);
await connection.query(`
CREATE TEMPORARY TABLE temp_sales_stats (
brand VARCHAR(100) NOT NULL,
current_period_sales DECIMAL(15,2),
previous_period_sales DECIMAL(15,2),
PRIMARY KEY (brand),
INDEX (current_period_sales),
INDEX (previous_period_sales)
) ENGINE=MEMORY
`);
// Populate product stats with optimized index usage
await connection.query(`
INSERT INTO temp_product_stats
SELECT SELECT
p.brand, p.brand,
SUM(o.quantity * (o.price - COALESCE(o.discount, 0))) as period_revenue, COUNT(DISTINCT p.pid) as product_count,
SUM(o.quantity * (o.price - COALESCE(o.discount, 0) - p.cost_price)) as period_margin, COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
COUNT(DISTINCT DATE(o.date)) as period_days, COALESCE(SUM(p.stock_quantity), 0) as total_stock_units,
CASE COALESCE(SUM(p.stock_quantity * p.cost_price), 0) as total_stock_cost,
WHEN o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 3 MONTH) THEN 'current' COALESCE(SUM(p.stock_quantity * p.price), 0) as total_stock_retail,
WHEN o.date BETWEEN DATE_SUB(CURRENT_DATE, INTERVAL 15 MONTH) COALESCE(SUM(pm.total_revenue), 0) as total_revenue,
AND DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH) THEN 'previous' COALESCE(AVG(NULLIF(pm.avg_margin_percent, 0)), 0) as avg_margin
END as period_type FROM products p
FROM filtered_products p FORCE INDEX (idx_brand)
JOIN orders o ON p.pid = o.pid LEFT JOIN product_metrics pm FORCE INDEX (PRIMARY) ON p.pid = pm.pid
WHERE o.canceled = false WHERE p.brand IN (?)
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 15 MONTH) AND (
GROUP BY p.brand, period_type p.updated > ?
), OR EXISTS (
brand_data AS ( SELECT 1 FROM orders o FORCE INDEX (idx_orders_metrics)
SELECT WHERE o.pid = p.pid
p.brand, AND o.updated > ?
COUNT(DISTINCT p.valid_pid) as product_count,
COUNT(DISTINCT p.active_pid) as active_products,
SUM(p.valid_stock) as total_stock_units,
SUM(p.valid_stock * p.cost_price) as total_stock_cost,
SUM(p.valid_stock * p.price) as total_stock_retail,
COALESCE(SUM(o.quantity * (o.price - COALESCE(o.discount, 0))), 0) as total_revenue,
CASE
WHEN SUM(o.quantity * o.price) > 0
THEN GREATEST(
-100.0,
LEAST(
100.0,
(
SUM(o.quantity * o.price) - -- Use gross revenue (before discounts)
SUM(o.quantity * COALESCE(p.cost_price, 0)) -- Total costs
) * 100.0 /
NULLIF(SUM(o.quantity * o.price), 0) -- Divide by gross revenue
)
)
ELSE 0
END as avg_margin
FROM filtered_products p
LEFT JOIN orders o ON p.pid = o.pid AND o.canceled = false
GROUP BY p.brand
)
SELECT
bd.brand,
bd.product_count,
bd.active_products,
bd.total_stock_units,
bd.total_stock_cost,
bd.total_stock_retail,
bd.total_revenue,
bd.avg_margin,
CASE
WHEN MAX(CASE WHEN sp.period_type = 'previous' THEN sp.period_revenue END) = 0
AND MAX(CASE WHEN sp.period_type = 'current' THEN sp.period_revenue END) > 0
THEN 100.0
WHEN MAX(CASE WHEN sp.period_type = 'previous' THEN sp.period_revenue END) = 0
THEN 0.0
ELSE GREATEST(
-100.0,
LEAST(
((MAX(CASE WHEN sp.period_type = 'current' THEN sp.period_revenue END) -
MAX(CASE WHEN sp.period_type = 'previous' THEN sp.period_revenue END)) /
NULLIF(ABS(MAX(CASE WHEN sp.period_type = 'previous' THEN sp.period_revenue END)), 0)) * 100.0,
999.99
) )
) )
END as growth_rate GROUP BY p.brand
FROM brand_data bd `, [batch.map(row => row.brand), lastCalculationTime, lastCalculationTime]);
LEFT JOIN sales_periods sp ON bd.brand = sp.brand
GROUP BY bd.brand, bd.product_count, bd.active_products, bd.total_stock_units,
bd.total_stock_cost, bd.total_stock_retail, bd.total_revenue, bd.avg_margin
ON DUPLICATE KEY UPDATE
product_count = VALUES(product_count),
active_products = VALUES(active_products),
total_stock_units = VALUES(total_stock_units),
total_stock_cost = VALUES(total_stock_cost),
total_stock_retail = VALUES(total_stock_retail),
total_revenue = VALUES(total_revenue),
avg_margin = VALUES(avg_margin),
growth_rate = VALUES(growth_rate),
last_calculated_at = CURRENT_TIMESTAMP
`);
processedCount = Math.floor(totalProducts * 0.97); // Populate sales stats with optimized date handling
outputProgress({ await connection.query(`
status: 'running', INSERT INTO temp_sales_stats
operation: 'Brand metrics calculated, starting time-based metrics', WITH date_ranges AS (
current: processedCount, SELECT
total: totalProducts, DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) as current_start,
elapsed: formatElapsedTime(startTime), CURRENT_DATE as current_end,
remaining: estimateRemaining(startTime, processedCount, totalProducts), DATE_SUB(CURRENT_DATE, INTERVAL 60 DAY) as previous_start,
rate: calculateRate(startTime, processedCount), DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) as previous_end
percentage: ((processedCount / totalProducts) * 100).toFixed(1), )
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Calculate brand time-based metrics with optimized query
await connection.query(`
INSERT INTO brand_time_metrics (
brand,
year,
month,
product_count,
active_products,
total_stock_units,
total_stock_cost,
total_stock_retail,
total_revenue,
avg_margin
)
WITH filtered_products AS (
SELECT
p.*,
CASE WHEN p.stock_quantity <= 5000 THEN p.pid END as valid_pid,
CASE WHEN p.visible = true AND p.stock_quantity <= 5000 THEN p.pid END as active_pid,
CASE
WHEN p.stock_quantity IS NULL OR p.stock_quantity < 0 OR p.stock_quantity > 5000 THEN 0
ELSE p.stock_quantity
END as valid_stock
FROM products p
WHERE p.brand IS NOT NULL
),
monthly_metrics AS (
SELECT SELECT
p.brand, p.brand,
YEAR(o.date) as year, COALESCE(SUM(
MONTH(o.date) as month, CASE WHEN o.date >= dr.current_start
COUNT(DISTINCT p.valid_pid) as product_count, THEN o.quantity * o.price
COUNT(DISTINCT p.active_pid) as active_products,
SUM(p.valid_stock) as total_stock_units,
SUM(p.valid_stock * p.cost_price) as total_stock_cost,
SUM(p.valid_stock * p.price) as total_stock_retail,
SUM(o.quantity * o.price) as total_revenue,
CASE
WHEN SUM(o.quantity * o.price) > 0
THEN GREATEST(
-100.0,
LEAST(
100.0,
(
SUM(o.quantity * o.price) - -- Use gross revenue (before discounts)
SUM(o.quantity * COALESCE(p.cost_price, 0)) -- Total costs
) * 100.0 /
NULLIF(SUM(o.quantity * o.price), 0) -- Divide by gross revenue
)
)
ELSE 0 ELSE 0
END as avg_margin END
FROM filtered_products p ), 0) as current_period_sales,
LEFT JOIN orders o ON p.pid = o.pid AND o.canceled = false COALESCE(SUM(
WHERE o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH) CASE WHEN o.date >= dr.previous_start AND o.date < dr.current_start
GROUP BY p.brand, YEAR(o.date), MONTH(o.date) THEN o.quantity * o.price
) ELSE 0
SELECT * END
FROM monthly_metrics ), 0) as previous_period_sales
ON DUPLICATE KEY UPDATE FROM products p
product_count = VALUES(product_count), FORCE INDEX (idx_brand)
active_products = VALUES(active_products), INNER JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
total_stock_units = VALUES(total_stock_units), CROSS JOIN date_ranges dr
total_stock_cost = VALUES(total_stock_cost), WHERE p.brand IN (?)
total_stock_retail = VALUES(total_stock_retail), AND o.canceled = false
total_revenue = VALUES(total_revenue), AND o.date >= dr.previous_start
avg_margin = VALUES(avg_margin) AND o.updated > ?
`); GROUP BY p.brand
`, [batch.map(row => row.brand), lastCalculationTime]);
processedCount = Math.floor(totalProducts * 0.99); // Update metrics using temp tables with optimized calculations
outputProgress({ await connection.query(`
status: 'running', INSERT INTO brand_metrics (
operation: 'Brand time-based metrics calculated', brand,
current: processedCount, product_count,
total: totalProducts, active_products,
elapsed: formatElapsedTime(startTime), total_stock_units,
remaining: estimateRemaining(startTime, processedCount, totalProducts), total_stock_cost,
rate: calculateRate(startTime, processedCount), total_stock_retail,
percentage: ((processedCount / totalProducts) * 100).toFixed(1), total_revenue,
timing: { avg_margin,
start_time: new Date(startTime).toISOString(), growth_rate,
end_time: new Date().toISOString(), last_calculated_at
elapsed_seconds: Math.round((Date.now() - startTime) / 1000) )
} SELECT
}); ps.brand,
ps.product_count,
ps.active_products,
ps.total_stock_units,
ps.total_stock_cost,
ps.total_stock_retail,
ps.total_revenue,
ps.avg_margin,
CASE
WHEN COALESCE(ss.previous_period_sales, 0) = 0 AND COALESCE(ss.current_period_sales, 0) > 0 THEN 100
WHEN COALESCE(ss.previous_period_sales, 0) = 0 THEN 0
ELSE ROUND(LEAST(999.99, GREATEST(-100,
((ss.current_period_sales / NULLIF(ss.previous_period_sales, 0)) - 1) * 100
)), 2)
END as growth_rate,
NOW() as last_calculated_at
FROM temp_product_stats ps
LEFT JOIN temp_sales_stats ss ON ps.brand = ss.brand
ON DUPLICATE KEY UPDATE
product_count = VALUES(product_count),
active_products = VALUES(active_products),
total_stock_units = VALUES(total_stock_units),
total_stock_cost = VALUES(total_stock_cost),
total_stock_retail = VALUES(total_stock_retail),
total_revenue = VALUES(total_revenue),
avg_margin = VALUES(avg_margin),
growth_rate = VALUES(growth_rate),
last_calculated_at = NOW()
`);
// Clean up temp tables
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
lastBrand = batch[batch.length - 1].brand;
processedBrands += batch.length; // Increment processed *brands*
outputProgress({
status: 'running',
operation: 'Processing brand metrics batch',
current: processedCount + processedBrands, // Use cumulative brand count
total: totalBrands, // Report total *brands*
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount + processedBrands, totalBrands),
rate: calculateRate(startTime, processedCount + processedBrands),
percentage: (((processedCount + processedBrands) / totalBrands) * 100).toFixed(1), // Base on brands
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
}
// If we get here, everything completed successfully // If we get here, everything completed successfully
success = true; success = true;
// Update calculate_status // Update calculate_status
await connection.query(` await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp) INSERT INTO calculate_status (module_name, last_calculation_timestamp)
@@ -298,8 +275,8 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount =
    `);
    return {
-      processedProducts: processedCount,
-      processedOrders,
+      processedProducts: 0, // Not directly processing products
+      processedOrders: 0,
      processedPurchaseOrders: 0,
      success
    };
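The growth-rate CASE used earlier in this file treats a zero previous period specially (100% when there are current sales, 0% otherwise) and clamps everything else to the range [-100, 999.99], so a tiny prior-period denominator cannot blow up the stored value. The same logic as a plain function for reference; the function name is illustrative:

```js
// Illustrative JS equivalent of the SQL growth-rate CASE, with the same clamping.
function growthRate(currentSales, previousSales) {
  const prev = Number(previousSales) || 0;
  const curr = Number(currentSales) || 0;
  if (prev === 0 && curr > 0) return 100; // new activity from a zero base
  if (prev === 0) return 0;               // still nothing happening
  const pct = ((curr / prev) - 1) * 100;
  return Math.round(Math.min(999.99, Math.max(-100, pct)) * 100) / 100; // clamp, 2 decimals
}

// growthRate(1500, 1000) -> 50
// growthRate(10, 0)      -> 100
// growthRate(0, 500)     -> -100
```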

View File

@@ -4,19 +4,53 @@ const { getConnection } = require('./utils/db');
 async function calculateCategoryMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
   const connection = await getConnection();
   let success = false;
-  let processedOrders = 0;
+  const BATCH_SIZE = 5000;
+  let myProcessedProducts = 0; // Not *directly* processing products, but tracking categories
   try {
+    // Get last calculation timestamp
+    const [lastCalc] = await connection.query(`
+      SELECT last_calculation_timestamp
+      FROM calculate_status
+      WHERE module_name = 'category_metrics'
+    `);
+    const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
+    // Get total count of categories needing updates
+    const [categoryCount] = await connection.query(`
+      SELECT COUNT(DISTINCT c.cat_id) as count
+      FROM categories c
+      JOIN product_categories pc ON c.cat_id = pc.cat_id
+      LEFT JOIN products p ON pc.pid = p.pid AND p.updated > ?
+      LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
+      WHERE c.status = 'active'
+        AND (
+          p.pid IS NOT NULL
+          OR o.id IS NOT NULL
+        )
+    `, [lastCalculationTime, lastCalculationTime]);
+    const totalCategories = categoryCount[0].count; // Track total *categories*
+    if (totalCategories === 0) {
+      console.log('No categories need metric updates');
+      return {
+        processedProducts: 0, // Not directly processing products
+        processedOrders: 0,
+        processedPurchaseOrders: 0,
+        success: true
+      };
+    }
    if (isCancelled) {
      outputProgress({
        status: 'cancelled',
        operation: 'Category metrics calculation cancelled',
-        current: processedCount,
-        total: totalProducts,
+        current: processedCount, // Use passed-in value
+        total: totalCategories, // Report total *categories*
        elapsed: formatElapsedTime(startTime),
        remaining: null,
        rate: calculateRate(startTime, processedCount),
-        percentage: ((processedCount / totalProducts) * 100).toFixed(1),
+        percentage: ((processedCount / totalCategories) * 100).toFixed(1), // Base on categories
        timing: {
          start_time: new Date(startTime).toISOString(),
          end_time: new Date().toISOString(),
@@ -24,76 +58,22 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
        }
      });
      return {
-        processedProducts: processedCount,
+        processedProducts: 0, // Not directly processing products
        processedOrders: 0,
        processedPurchaseOrders: 0,
        success
      };
    }
-    // Get order count that will be processed
-    const [orderCount] = await connection.query(`
-      SELECT COUNT(*) as count
-      FROM orders o
-      WHERE o.canceled = false
-    `);
-    processedOrders = orderCount[0].count;
    outputProgress({
      status: 'running',
      operation: 'Starting category metrics calculation',
-      current: processedCount,
-      total: totalProducts,
+      current: processedCount, // Use passed-in value
+      total: totalCategories, // Report total *categories*
      elapsed: formatElapsedTime(startTime),
-      remaining: estimateRemaining(startTime, processedCount, totalProducts),
+      remaining: estimateRemaining(startTime, processedCount, totalCategories),
      rate: calculateRate(startTime, processedCount),
-      percentage: ((processedCount / totalProducts) * 100).toFixed(1),
-      timing: {
-        start_time: new Date(startTime).toISOString(),
-        end_time: new Date().toISOString(),
-        elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
-      }
-    });
-    // First, calculate base category metrics
-    await connection.query(`
-      INSERT INTO category_metrics (
-        category_id,
-        product_count,
-        active_products,
-        total_value,
-        status,
-        last_calculated_at
-      )
-      SELECT
-        c.cat_id,
-        COUNT(DISTINCT p.pid) as product_count,
-        COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
-        COALESCE(SUM(p.stock_quantity * p.cost_price), 0) as total_value,
-        c.status,
-        NOW() as last_calculated_at
-      FROM categories c
-      LEFT JOIN product_categories pc ON c.cat_id = pc.cat_id
-      LEFT JOIN products p ON pc.pid = p.pid
-      GROUP BY c.cat_id, c.status
-      ON DUPLICATE KEY UPDATE
-        product_count = VALUES(product_count),
-        active_products = VALUES(active_products),
-        total_value = VALUES(total_value),
-        status = VALUES(status),
-        last_calculated_at = VALUES(last_calculated_at)
-    `);
-    processedCount = Math.floor(totalProducts * 0.90);
-    outputProgress({
-      status: 'running',
-      operation: 'Base category metrics calculated, updating with margin data',
-      current: processedCount,
-      total: totalProducts,
-      elapsed: formatElapsedTime(startTime),
-      remaining: estimateRemaining(startTime, processedCount, totalProducts),
-      rate: calculateRate(startTime, processedCount),
-      percentage: ((processedCount / totalProducts) * 100).toFixed(1),
+      percentage: ((processedCount / totalCategories) * 100).toFixed(1), // Base on categories
      timing: {
        start_time: new Date(startTime).toISOString(),
        end_time: new Date().toISOString(),
@@ -101,399 +81,196 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
} }
}); });
if (isCancelled) return { // Process in batches
processedProducts: processedCount, let lastCatId = 0;
processedOrders, let processedCategories = 0; // Track processed categories
processedPurchaseOrders: 0, while (true) {
success if (isCancelled) break;
};
// Then update with margin and turnover data const [batch] = await connection.query(`
await connection.query(` SELECT DISTINCT c.cat_id
WITH category_sales AS ( FROM categories c
FORCE INDEX (PRIMARY)
JOIN product_categories pc FORCE INDEX (idx_category) ON c.cat_id = pc.cat_id
LEFT JOIN products p FORCE INDEX (PRIMARY) ON pc.pid = p.pid AND p.updated > ?
LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid AND o.updated > ?
WHERE c.status = 'active'
AND c.cat_id > ?
AND (
p.pid IS NOT NULL
OR o.id IS NOT NULL
)
ORDER BY c.cat_id
LIMIT ?
`, [lastCalculationTime, lastCalculationTime, lastCatId, BATCH_SIZE]);
if (batch.length === 0) break;
// Create temporary tables for better performance
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
await connection.query(`
CREATE TEMPORARY TABLE temp_product_stats (
cat_id BIGINT NOT NULL,
product_count INT,
active_products INT,
total_value DECIMAL(15,2),
avg_margin DECIMAL(5,2),
turnover_rate DECIMAL(10,2),
PRIMARY KEY (cat_id),
INDEX (product_count),
INDEX (total_value)
) ENGINE=MEMORY
`);
await connection.query(`
CREATE TEMPORARY TABLE temp_sales_stats (
cat_id BIGINT NOT NULL,
recent_revenue DECIMAL(15,2),
previous_revenue DECIMAL(15,2),
PRIMARY KEY (cat_id),
INDEX (recent_revenue),
INDEX (previous_revenue)
) ENGINE=MEMORY
`);
// Populate product stats with optimized index usage
await connection.query(`
INSERT INTO temp_product_stats
SELECT SELECT
pc.cat_id, c.cat_id,
SUM(o.quantity * o.price) as total_sales, COUNT(DISTINCT p.pid) as product_count,
SUM(o.quantity * (o.price - p.cost_price)) as total_margin, COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
SUM(o.quantity) as units_sold, COALESCE(SUM(p.stock_quantity * p.cost_price), 0) as total_value,
AVG(GREATEST(p.stock_quantity, 0)) as avg_stock, COALESCE(AVG(NULLIF(pm.avg_margin_percent, 0)), 0) as avg_margin,
COUNT(DISTINCT DATE(o.date)) as active_days COALESCE(AVG(NULLIF(pm.turnover_rate, 0)), 0) as turnover_rate
FROM product_categories pc FROM categories c
JOIN products p ON pc.pid = p.pid FORCE INDEX (PRIMARY)
JOIN orders o ON p.pid = o.pid INNER JOIN product_categories pc FORCE INDEX (idx_category) ON c.cat_id = pc.cat_id
LEFT JOIN turnover_config tc ON LEFT JOIN products p FORCE INDEX (PRIMARY) ON pc.pid = p.pid
(tc.category_id = pc.cat_id AND tc.vendor = p.vendor) OR LEFT JOIN product_metrics pm FORCE INDEX (PRIMARY) ON p.pid = pm.pid
(tc.category_id = pc.cat_id AND tc.vendor IS NULL) OR WHERE c.cat_id IN (?)
(tc.category_id IS NULL AND tc.vendor = p.vendor) OR AND (
(tc.category_id IS NULL AND tc.vendor IS NULL) p.updated > ?
WHERE o.canceled = false OR EXISTS (
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL COALESCE(tc.calculation_period_days, 30) DAY) SELECT 1 FROM orders o FORCE INDEX (idx_orders_metrics)
GROUP BY pc.cat_id WHERE o.pid = p.pid
) AND o.updated > ?
UPDATE category_metrics cm )
JOIN category_sales cs ON cm.category_id = cs.cat_id
LEFT JOIN turnover_config tc ON
(tc.category_id = cm.category_id AND tc.vendor IS NULL) OR
(tc.category_id IS NULL AND tc.vendor IS NULL)
SET
cm.avg_margin = COALESCE(cs.total_margin * 100.0 / NULLIF(cs.total_sales, 0), 0),
cm.turnover_rate = CASE
WHEN cs.avg_stock > 0 AND cs.active_days > 0
THEN LEAST(
(cs.units_sold / cs.avg_stock) * (365.0 / cs.active_days),
999.99
) )
ELSE 0 GROUP BY c.cat_id
END, `, [batch.map(row => row.cat_id), lastCalculationTime, lastCalculationTime]);
cm.last_calculated_at = NOW()
`);
processedCount = Math.floor(totalProducts * 0.95); // Populate sales stats with optimized date handling
outputProgress({ await connection.query(`
status: 'running', INSERT INTO temp_sales_stats
operation: 'Margin data updated, calculating growth rates', WITH date_ranges AS (
current: processedCount, SELECT
total: totalProducts, DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) as current_start,
elapsed: formatElapsedTime(startTime), CURRENT_DATE as current_end,
remaining: estimateRemaining(startTime, processedCount, totalProducts), DATE_SUB(CURRENT_DATE, INTERVAL 60 DAY) as previous_start,
rate: calculateRate(startTime, processedCount), DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) as previous_end
percentage: ((processedCount / totalProducts) * 100).toFixed(1), )
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Finally update growth rates
await connection.query(`
WITH current_period AS (
SELECT SELECT
pc.cat_id, c.cat_id,
SUM(o.quantity * (o.price - COALESCE(o.discount, 0)) / COALESCE(SUM(
(1 + COALESCE(ss.seasonality_factor, 0))) as revenue, CASE WHEN o.date >= dr.current_start
SUM(o.quantity * (o.price - COALESCE(o.discount, 0) - p.cost_price)) as gross_profit, THEN o.quantity * o.price
COUNT(DISTINCT DATE(o.date)) as days ELSE 0
FROM product_categories pc END
JOIN products p ON pc.pid = p.pid ), 0) as recent_revenue,
JOIN orders o ON p.pid = o.pid COALESCE(SUM(
LEFT JOIN sales_seasonality ss ON MONTH(o.date) = ss.month CASE WHEN o.date >= dr.previous_start AND o.date < dr.current_start
WHERE o.canceled = false THEN o.quantity * o.price
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 3 MONTH) ELSE 0
GROUP BY pc.cat_id END
), ), 0) as previous_revenue
previous_period AS ( FROM categories c
SELECT FORCE INDEX (PRIMARY)
pc.cat_id, INNER JOIN product_categories pc FORCE INDEX (idx_category) ON c.cat_id = pc.cat_id
SUM(o.quantity * (o.price - COALESCE(o.discount, 0)) / INNER JOIN products p FORCE INDEX (PRIMARY) ON pc.pid = p.pid
(1 + COALESCE(ss.seasonality_factor, 0))) as revenue, INNER JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
COUNT(DISTINCT DATE(o.date)) as days
FROM product_categories pc
JOIN products p ON pc.pid = p.pid
JOIN orders o ON p.pid = o.pid
LEFT JOIN sales_seasonality ss ON MONTH(o.date) = ss.month
WHERE o.canceled = false
AND o.date BETWEEN DATE_SUB(CURRENT_DATE, INTERVAL 15 MONTH)
AND DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
GROUP BY pc.cat_id
),
trend_data AS (
SELECT
pc.cat_id,
MONTH(o.date) as month,
SUM(o.quantity * (o.price - COALESCE(o.discount, 0)) /
(1 + COALESCE(ss.seasonality_factor, 0))) as revenue,
COUNT(DISTINCT DATE(o.date)) as days_in_month
FROM product_categories pc
JOIN products p ON pc.pid = p.pid
JOIN orders o ON p.pid = o.pid
LEFT JOIN sales_seasonality ss ON MONTH(o.date) = ss.month
WHERE o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 15 MONTH)
GROUP BY pc.cat_id, MONTH(o.date)
),
trend_stats AS (
SELECT
cat_id,
COUNT(*) as n,
AVG(month) as avg_x,
AVG(revenue / NULLIF(days_in_month, 0)) as avg_y,
SUM(month * (revenue / NULLIF(days_in_month, 0))) as sum_xy,
SUM(month * month) as sum_xx
FROM trend_data
GROUP BY cat_id
HAVING COUNT(*) >= 6
),
trend_analysis AS (
SELECT
cat_id,
((n * sum_xy) - (avg_x * n * avg_y)) /
NULLIF((n * sum_xx) - (n * avg_x * avg_x), 0) as trend_slope,
avg_y as avg_daily_revenue
FROM trend_stats
),
margin_calc AS (
SELECT
pc.cat_id,
CASE
WHEN SUM(o.quantity * o.price) > 0 THEN
GREATEST(
-100.0,
LEAST(
100.0,
(
SUM(o.quantity * o.price) - -- Use gross revenue (before discounts)
SUM(o.quantity * COALESCE(p.cost_price, 0)) -- Total costs
) * 100.0 /
NULLIF(SUM(o.quantity * o.price), 0) -- Divide by gross revenue
)
)
ELSE NULL
END as avg_margin
FROM product_categories pc
JOIN products p ON pc.pid = p.pid
JOIN orders o ON p.pid = o.pid
WHERE o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 3 MONTH)
GROUP BY pc.cat_id
)
UPDATE category_metrics cm
LEFT JOIN current_period cp ON cm.category_id = cp.cat_id
LEFT JOIN previous_period pp ON cm.category_id = pp.cat_id
LEFT JOIN trend_analysis ta ON cm.category_id = ta.cat_id
LEFT JOIN margin_calc mc ON cm.category_id = mc.cat_id
SET
cm.growth_rate = CASE
WHEN pp.revenue = 0 AND COALESCE(cp.revenue, 0) > 0 THEN 100.0
WHEN pp.revenue = 0 OR cp.revenue IS NULL THEN 0.0
WHEN ta.trend_slope IS NOT NULL THEN
GREATEST(
-100.0,
LEAST(
(ta.trend_slope / NULLIF(ta.avg_daily_revenue, 0)) * 365 * 100,
999.99
)
)
ELSE
GREATEST(
-100.0,
LEAST(
((COALESCE(cp.revenue, 0) - pp.revenue) /
NULLIF(ABS(pp.revenue), 0)) * 100.0,
999.99
)
)
END,
cm.avg_margin = COALESCE(mc.avg_margin, cm.avg_margin),
cm.last_calculated_at = NOW()
WHERE cp.cat_id IS NOT NULL OR pp.cat_id IS NOT NULL
`);
processedCount = Math.floor(totalProducts * 0.97);
outputProgress({
status: 'running',
operation: 'Growth rates calculated, updating time-based metrics',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Calculate time-based metrics
await connection.query(`
INSERT INTO category_time_metrics (
category_id,
year,
month,
product_count,
active_products,
total_value,
total_revenue,
avg_margin,
turnover_rate
)
SELECT
pc.cat_id,
YEAR(o.date) as year,
MONTH(o.date) as month,
COUNT(DISTINCT p.pid) as product_count,
COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
SUM(p.stock_quantity * p.cost_price) as total_value,
SUM(o.quantity * o.price) as total_revenue,
CASE
WHEN SUM(o.quantity * o.price) > 0 THEN
LEAST(
GREATEST(
SUM(o.quantity * (o.price - GREATEST(p.cost_price, 0))) * 100.0 /
SUM(o.quantity * o.price),
-100
),
100
)
ELSE 0
END as avg_margin,
COALESCE(
LEAST(
SUM(o.quantity) / NULLIF(AVG(GREATEST(p.stock_quantity, 0)), 0),
999.99
),
0
) as turnover_rate
FROM product_categories pc
JOIN products p ON pc.pid = p.pid
JOIN orders o ON p.pid = o.pid
WHERE o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
GROUP BY pc.cat_id, YEAR(o.date), MONTH(o.date)
ON DUPLICATE KEY UPDATE
product_count = VALUES(product_count),
active_products = VALUES(active_products),
total_value = VALUES(total_value),
total_revenue = VALUES(total_revenue),
avg_margin = VALUES(avg_margin),
turnover_rate = VALUES(turnover_rate)
`);
processedCount = Math.floor(totalProducts * 0.99);
outputProgress({
status: 'running',
operation: 'Time-based metrics calculated, updating category-sales metrics',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Calculate category-sales metrics
await connection.query(`
INSERT INTO category_sales_metrics (
category_id,
brand,
period_start,
period_end,
avg_daily_sales,
total_sold,
num_products,
avg_price,
last_calculated_at
)
WITH date_ranges AS (
SELECT
DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) as period_start,
CURRENT_DATE as period_end
UNION ALL
SELECT
DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY),
DATE_SUB(CURRENT_DATE, INTERVAL 31 DAY)
UNION ALL
SELECT
DATE_SUB(CURRENT_DATE, INTERVAL 180 DAY),
DATE_SUB(CURRENT_DATE, INTERVAL 91 DAY)
UNION ALL
SELECT
DATE_SUB(CURRENT_DATE, INTERVAL 365 DAY),
DATE_SUB(CURRENT_DATE, INTERVAL 181 DAY)
),
sales_data AS (
SELECT
pc.cat_id,
COALESCE(p.brand, 'Unknown') as brand,
dr.period_start,
dr.period_end,
COUNT(DISTINCT p.pid) as num_products,
SUM(o.quantity) as total_sold,
SUM(o.quantity * o.price) as total_revenue,
COUNT(DISTINCT DATE(o.date)) as num_days
FROM products p
JOIN product_categories pc ON p.pid = pc.pid
JOIN orders o ON p.pid = o.pid
                    CROSS JOIN date_ranges dr
                    WHERE o.canceled = false
                    AND o.date BETWEEN dr.period_start AND dr.period_end
                    GROUP BY pc.cat_id, p.brand, dr.period_start, dr.period_end
                )
                SELECT
                    cat_id as category_id,
                    brand,
                    period_start,
                    period_end,
                    CASE
                        WHEN num_days > 0
                        THEN total_sold / num_days
                        ELSE 0
                    END as avg_daily_sales,
                    total_sold,
                    num_products,
                    CASE
                        WHEN total_sold > 0
                        THEN total_revenue / total_sold
                        ELSE 0
                    END as avg_price,
                    NOW() as last_calculated_at
                FROM sales_data
                ON DUPLICATE KEY UPDATE
                    avg_daily_sales = VALUES(avg_daily_sales),
                    total_sold = VALUES(total_sold),
                    num_products = VALUES(num_products),
                    avg_price = VALUES(avg_price),
                    last_calculated_at = VALUES(last_calculated_at)
        `);
        processedCount = Math.floor(totalProducts * 1.0);
        outputProgress({
            status: 'running',
            operation: 'Category-sales metrics calculated',
            current: processedCount,
            total: totalProducts,
            elapsed: formatElapsedTime(startTime),
            remaining: estimateRemaining(startTime, processedCount, totalProducts),
            rate: calculateRate(startTime, processedCount),
            percentage: ((processedCount / totalProducts) * 100).toFixed(1),
            timing: {
                start_time: new Date(startTime).toISOString(),
                end_time: new Date().toISOString(),
                elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
            }
        });
                    CROSS JOIN date_ranges dr
                    WHERE c.cat_id IN (?)
                    AND o.canceled = false
                    AND o.date >= dr.previous_start
                    AND o.updated > ?
                    GROUP BY c.cat_id
            `, [batch.map(row => row.cat_id), lastCalculationTime]);
            // Update metrics using temp tables with optimized calculations
            await connection.query(`
                INSERT INTO category_metrics (
                    category_id,
                    product_count,
                    active_products,
                    total_value,
                    avg_margin,
                    turnover_rate,
                    growth_rate,
                    status,
                    last_calculated_at
                )
                SELECT
                    c.cat_id,
                    ps.product_count,
ps.active_products,
ps.total_value,
ps.avg_margin,
ps.turnover_rate,
CASE
WHEN COALESCE(ss.previous_revenue, 0) = 0 AND COALESCE(ss.recent_revenue, 0) > 0 THEN 100
WHEN COALESCE(ss.previous_revenue, 0) = 0 THEN 0
ELSE ROUND(LEAST(999.99, GREATEST(-100,
((ss.recent_revenue / NULLIF(ss.previous_revenue, 0)) - 1) * 100
)), 2)
END as growth_rate,
c.status,
NOW() as last_calculated_at
FROM categories c
FORCE INDEX (PRIMARY)
LEFT JOIN temp_product_stats ps ON c.cat_id = ps.cat_id
LEFT JOIN temp_sales_stats ss ON c.cat_id = ss.cat_id
WHERE c.cat_id IN (?)
ON DUPLICATE KEY UPDATE
product_count = VALUES(product_count),
active_products = VALUES(active_products),
total_value = VALUES(total_value),
avg_margin = VALUES(avg_margin),
turnover_rate = VALUES(turnover_rate),
growth_rate = VALUES(growth_rate),
status = VALUES(status),
last_calculated_at = NOW()
`, [batch.map(row => row.cat_id)]);
// Clean up temp tables
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
lastCatId = batch[batch.length - 1].cat_id;
processedCategories += batch.length; // Increment processed *categories*
outputProgress({
status: 'running',
operation: 'Processing category metrics batch',
current: processedCount + processedCategories, // Use cumulative category count
total: totalCategories, // Report total *categories*
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount + processedCategories, totalCategories),
rate: calculateRate(startTime, processedCount + processedCategories),
percentage: (((processedCount + processedCategories) / totalCategories) * 100).toFixed(1), // Base on categories
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
}
        // If we get here, everything completed successfully
        success = true;
        // Update calculate_status
        await connection.query(`
            INSERT INTO calculate_status (module_name, last_calculation_timestamp)
@@ -502,8 +279,8 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
        `);
        return {
            processedProducts: processedCount,
            processedProducts: 0, // Not directly processing products
            processedOrders,
            processedOrders: 0,
            processedPurchaseOrders: 0,
            success
        };


@@ -4,9 +4,40 @@ const { getConnection } = require('./utils/db');
async function calculateFinancialMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
    const connection = await getConnection();
    let success = false;
    let processedOrders = 0;
    const BATCH_SIZE = 5000;
    let myProcessedProducts = 0; // Track products processed *within this module*
    try {
// Get last calculation timestamp
const [lastCalc] = await connection.query(`
SELECT last_calculation_timestamp
FROM calculate_status
WHERE module_name = 'financial_metrics'
`);
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
// Get total count of products needing updates
if (!totalProducts) {
const [productCount] = await connection.query(`
SELECT COUNT(DISTINCT p.pid) as count
FROM products p
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
WHERE p.updated > ?
OR o.pid IS NOT NULL
`, [lastCalculationTime, lastCalculationTime]);
totalProducts = productCount[0].count;
}
if (totalProducts === 0) {
console.log('No products need financial metric updates');
return {
processedProducts: 0,
processedOrders: 0,
processedPurchaseOrders: 0,
success: true
};
}
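        // Incremental pattern: as in the other modules, only products whose own row (or one of
        // their orders) carries an `updated` stamp newer than the timestamp recorded in
        // calculate_status are revisited, so a run against an unchanged catalog is close to a no-op.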
        if (isCancelled) {
            outputProgress({
                status: 'cancelled',
@@ -24,22 +55,13 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun
                }
            });
            return {
                processedProducts: processedCount,
                processedProducts: myProcessedProducts,
                processedOrders: 0,
                processedPurchaseOrders: 0,
                success
            };
        }
// Get order count that will be processed
const [orderCount] = await connection.query(`
SELECT COUNT(*) as count
FROM orders o
WHERE o.canceled = false
AND DATE(o.date) >= DATE_SUB(CURDATE(), INTERVAL 12 MONTH)
`);
processedOrders = orderCount[0].count;
        outputProgress({
            status: 'running',
            operation: 'Starting financial metrics calculation',
@@ -56,110 +78,80 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun
            }
        });
        // Calculate financial metrics with optimized query
        await connection.query(`
            WITH product_financials AS (
                SELECT
                    p.pid,
                    p.cost_price * p.stock_quantity as inventory_value,
                    SUM(o.quantity * o.price) as total_revenue,
                    SUM(o.quantity * p.cost_price) as cost_of_goods_sold,
                    SUM(o.quantity * (o.price - p.cost_price)) as gross_profit,
                    MIN(o.date) as first_sale_date,
                    MAX(o.date) as last_sale_date,
                    DATEDIFF(MAX(o.date), MIN(o.date)) + 1 as calculation_period_days,
                    COUNT(DISTINCT DATE(o.date)) as active_days
                FROM products p
                LEFT JOIN orders o ON p.pid = o.pid
                WHERE o.canceled = false
                AND DATE(o.date) >= DATE_SUB(CURDATE(), INTERVAL 12 MONTH)
                GROUP BY p.pid
            )
            UPDATE product_metrics pm
            JOIN product_financials pf ON pm.pid = pf.pid
            SET
                pm.inventory_value = COALESCE(pf.inventory_value, 0),
                pm.total_revenue = COALESCE(pf.total_revenue, 0),
                pm.cost_of_goods_sold = COALESCE(pf.cost_of_goods_sold, 0),
                pm.gross_profit = COALESCE(pf.gross_profit, 0),
                pm.gmroi = CASE
                    WHEN COALESCE(pf.inventory_value, 0) > 0 AND pf.active_days > 0 THEN
                        (COALESCE(pf.gross_profit, 0) * (365.0 / pf.active_days)) / COALESCE(pf.inventory_value, 0)
                    ELSE 0
                END,
                pm.last_calculated_at = CURRENT_TIMESTAMP
        `);
        processedCount = Math.floor(totalProducts * 0.65);
        outputProgress({
            status: 'running',
            operation: 'Base financial metrics calculated, updating time aggregates',
            current: processedCount,
            total: totalProducts,
            elapsed: formatElapsedTime(startTime),
            remaining: estimateRemaining(startTime, processedCount, totalProducts),
            rate: calculateRate(startTime, processedCount),
            percentage: ((processedCount / totalProducts) * 100).toFixed(1),
            timing: {
                start_time: new Date(startTime).toISOString(),
                end_time: new Date().toISOString(),
                elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
            }
        });
        if (isCancelled) return {
            processedProducts: processedCount,
            processedOrders,
            processedPurchaseOrders: 0,
            success
        };
        // Process in batches
        let lastPid = 0;
        while (true) {
            if (isCancelled) break;
            const [batch] = await connection.query(`
                SELECT DISTINCT p.pid
                FROM products p
                LEFT JOIN orders o ON p.pid = o.pid
                WHERE p.pid > ?
                AND (
                    p.updated > ?
                    OR EXISTS (
                        SELECT 1 FROM orders o2
                        WHERE o2.pid = p.pid
                        AND o2.updated > ?
                    )
                )
                ORDER BY p.pid
                LIMIT ?
            `, [lastPid, lastCalculationTime, lastCalculationTime, BATCH_SIZE]);
            if (batch.length === 0) break;
            // Update financial metrics for this batch
            await connection.query(`
                UPDATE product_metrics pm
                JOIN (
                    SELECT
                        p.pid,
p.cost_price * p.stock_quantity as inventory_value,
SUM(o.quantity * o.price) as total_revenue,
SUM(o.quantity * p.cost_price) as cost_of_goods_sold,
SUM(o.quantity * (o.price - p.cost_price)) as gross_profit,
COUNT(DISTINCT DATE(o.date)) as active_days
FROM products p
LEFT JOIN orders o ON p.pid = o.pid
AND o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
WHERE p.pid IN (?)
GROUP BY p.pid
) fin ON pm.pid = fin.pid
SET
pm.inventory_value = COALESCE(fin.inventory_value, 0),
pm.total_revenue = COALESCE(fin.total_revenue, 0),
pm.cost_of_goods_sold = COALESCE(fin.cost_of_goods_sold, 0),
pm.gross_profit = COALESCE(fin.gross_profit, 0),
pm.gmroi = CASE
WHEN COALESCE(fin.inventory_value, 0) > 0 AND fin.active_days > 0
THEN (COALESCE(fin.gross_profit, 0) * (365.0 / fin.active_days)) / COALESCE(fin.inventory_value, 0)
ELSE 0
END,
pm.last_calculated_at = NOW()
`, [batch.map(row => row.pid)]);
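            // GMROI sketch (illustrative numbers, not from the data): $2,000 gross profit over 45
            // active selling days against $8,000 of inventory at cost annualizes to
            // (2000 * 365 / 45) / 8000 ≈ 2.0, i.e. about $2 of gross profit per year for every $1
            // tied up in stock.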
            lastPid = batch[batch.length - 1].pid;
            myProcessedProducts += batch.length;
        // Update time-based aggregates with optimized query
        await connection.query(`
WITH monthly_financials AS (
SELECT
p.pid,
YEAR(o.date) as year,
MONTH(o.date) as month,
p.cost_price * p.stock_quantity as inventory_value,
SUM(o.quantity * (o.price - p.cost_price)) as gross_profit,
COUNT(DISTINCT DATE(o.date)) as active_days,
MIN(o.date) as period_start,
MAX(o.date) as period_end
FROM products p
LEFT JOIN orders o ON p.pid = o.pid
WHERE o.canceled = false
GROUP BY p.pid, YEAR(o.date), MONTH(o.date)
)
UPDATE product_time_aggregates pta
JOIN monthly_financials mf ON pta.pid = mf.pid
AND pta.year = mf.year
AND pta.month = mf.month
SET
pta.inventory_value = COALESCE(mf.inventory_value, 0),
pta.gmroi = CASE
WHEN COALESCE(mf.inventory_value, 0) > 0 AND mf.active_days > 0 THEN
(COALESCE(mf.gross_profit, 0) * (365.0 / mf.active_days)) / COALESCE(mf.inventory_value, 0)
ELSE 0
END
`);
        processedCount = Math.floor(totalProducts * 0.70);
        outputProgress({
            status: 'running',
            operation: 'Time-based aggregates updated',
            current: processedCount,
            total: totalProducts,
            elapsed: formatElapsedTime(startTime),
            remaining: estimateRemaining(startTime, processedCount, totalProducts),
            rate: calculateRate(startTime, processedCount),
            percentage: ((processedCount / totalProducts) * 100).toFixed(1),
            timing: {
                start_time: new Date(startTime).toISOString(),
                end_time: new Date().toISOString(),
                elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
            }
        });
            outputProgress({
                status: 'running',
                operation: 'Processing financial metrics batch',
                current: processedCount + myProcessedProducts,
                total: totalProducts,
                elapsed: formatElapsedTime(startTime),
                remaining: estimateRemaining(startTime, processedCount + myProcessedProducts, totalProducts),
                rate: calculateRate(startTime, processedCount + myProcessedProducts),
                percentage: (((processedCount + myProcessedProducts) / totalProducts) * 100).toFixed(1),
                timing: {
                    start_time: new Date(startTime).toISOString(),
                    end_time: new Date().toISOString(),
                    elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
                }
            });
        }
        // If we get here, everything completed successfully
        success = true;
@@ -172,8 +164,8 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun
        `);
        return {
            processedProducts: processedCount,
            processedProducts: myProcessedProducts,
            processedOrders,
            processedOrders: 0,
            processedPurchaseOrders: 0,
            success
        };


@@ -13,19 +13,32 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
    const connection = await getConnection();
    let success = false;
    let processedOrders = 0;
    let myProcessedProducts = 0; // Track products processed *within this module*
    const BATCH_SIZE = 5000;
    try {
// Get last calculation timestamp
const [lastCalc] = await connection.query(`
SELECT last_calculation_timestamp
FROM calculate_status
WHERE module_name = 'product_metrics'
`);
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
if (totalProducts === 0) {
console.log('No products need updating');
return {
processedProducts: myProcessedProducts,
processedOrders: 0,
processedPurchaseOrders: 0,
success: true
};
}
        // Skip flags are inherited from the parent scope
        const SKIP_PRODUCT_BASE_METRICS = 0;
        const SKIP_PRODUCT_TIME_AGGREGATES = 0;
// Get total product count if not provided
if (!totalProducts) {
const [productCount] = await connection.query('SELECT COUNT(*) as count FROM products');
totalProducts = productCount[0].count;
}
        if (isCancelled) {
            outputProgress({
                status: 'cancelled',
@@ -43,7 +56,7 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
                }
            });
            return {
                processedProducts: processedCount,
                processedProducts: myProcessedProducts,
                processedOrders,
                processedPurchaseOrders: 0,
                success
@@ -93,10 +106,39 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
        processedOrders = orderCount[0].count;
        // Clear temporary tables
        await connection.query('TRUNCATE TABLE temp_sales_metrics');
        await connection.query('TRUNCATE TABLE temp_purchase_metrics');
        await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_metrics');
        await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_purchase_metrics');
        // Populate temp_sales_metrics with base stats and sales averages
        // Create optimized temporary tables with indexes
await connection.query(`
CREATE TEMPORARY TABLE temp_sales_metrics (
pid BIGINT NOT NULL,
daily_sales_avg DECIMAL(10,3),
weekly_sales_avg DECIMAL(10,3),
monthly_sales_avg DECIMAL(10,3),
total_revenue DECIMAL(10,2),
avg_margin_percent DECIMAL(5,2),
first_sale_date DATE,
last_sale_date DATE,
PRIMARY KEY (pid),
INDEX (daily_sales_avg),
INDEX (total_revenue)
) ENGINE=MEMORY
`);
await connection.query(`
CREATE TEMPORARY TABLE temp_purchase_metrics (
pid BIGINT NOT NULL,
avg_lead_time_days DECIMAL(5,1),
last_purchase_date DATE,
first_received_date DATE,
last_received_date DATE,
PRIMARY KEY (pid),
INDEX (avg_lead_time_days)
) ENGINE=MEMORY
`);
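        // Note on ENGINE=MEMORY: these scratch tables live entirely in RAM, but MySQL caps them at
        // max_heap_table_size and the engine does not support TEXT/BLOB columns, which is presumably
        // why the definitions above stick to fixed-width numeric and date types.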
// Populate temp_sales_metrics with base stats and sales averages using FORCE INDEX
await connection.query(` await connection.query(`
INSERT INTO temp_sales_metrics INSERT INTO temp_sales_metrics
SELECT SELECT
@@ -113,13 +155,21 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
MIN(o.date) as first_sale_date, MIN(o.date) as first_sale_date,
MAX(o.date) as last_sale_date MAX(o.date) as last_sale_date
FROM products p FROM products p
LEFT JOIN orders o ON p.pid = o.pid FORCE INDEX (PRIMARY)
AND o.canceled = false LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
AND o.date >= DATE_SUB(CURDATE(), INTERVAL 90 DAY) AND o.canceled = false
AND o.date >= DATE_SUB(CURDATE(), INTERVAL 90 DAY)
WHERE p.updated > ?
OR EXISTS (
SELECT 1 FROM orders o2 FORCE INDEX (idx_orders_metrics)
WHERE o2.pid = p.pid
AND o2.canceled = false
AND o2.updated > ?
)
GROUP BY p.pid GROUP BY p.pid
`); `, [lastCalculationTime, lastCalculationTime]);
// Populate temp_purchase_metrics // Populate temp_purchase_metrics with optimized index usage
await connection.query(` await connection.query(`
INSERT INTO temp_purchase_metrics INSERT INTO temp_purchase_metrics
SELECT SELECT
@@ -129,21 +179,38 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
MIN(po.received_date) as first_received_date, MIN(po.received_date) as first_received_date,
MAX(po.received_date) as last_received_date MAX(po.received_date) as last_received_date
FROM products p FROM products p
LEFT JOIN purchase_orders po ON p.pid = po.pid FORCE INDEX (PRIMARY)
AND po.received_date IS NOT NULL LEFT JOIN purchase_orders po FORCE INDEX (idx_po_metrics) ON p.pid = po.pid
AND po.date >= DATE_SUB(CURDATE(), INTERVAL 365 DAY) AND po.received_date IS NOT NULL
AND po.date >= DATE_SUB(CURDATE(), INTERVAL 365 DAY)
WHERE p.updated > ?
OR EXISTS (
SELECT 1 FROM purchase_orders po2 FORCE INDEX (idx_po_metrics)
WHERE po2.pid = p.pid
AND po2.updated > ?
)
GROUP BY p.pid GROUP BY p.pid
`); `, [lastCalculationTime, lastCalculationTime]);
        // Process updates in batches, but only for affected products
        let lastPid = 0;
        while (true) {
            if (isCancelled) break;
            const [batch] = await connection.query(
                'SELECT pid FROM products WHERE pid > ? ORDER BY pid LIMIT ?',
                [lastPid, BATCH_SIZE]
            );
            const [batch] = await connection.query(`
                SELECT DISTINCT p.pid
                FROM products p
                LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
LEFT JOIN purchase_orders po ON p.pid = po.pid AND po.updated > ?
WHERE p.pid > ?
AND (
p.updated > ?
OR o.pid IS NOT NULL
OR po.pid IS NOT NULL
)
ORDER BY p.pid
LIMIT ?
`, [lastCalculationTime, lastCalculationTime, lastPid, lastCalculationTime, BATCH_SIZE]);
if (batch.length === 0) break; if (batch.length === 0) break;
@@ -210,6 +277,7 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
            lastPid = batch[batch.length - 1].pid;
            processedCount += batch.length;
            myProcessedProducts += batch.length; // Increment the *module's* count
            outputProgress({
                status: 'running',
@@ -281,12 +349,12 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Starting product time aggregates calculation', operation: 'Starting product time aggregates calculation',
current: processedCount || 0, current: processedCount,
total: totalProducts || 0, total: totalProducts,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0), remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount || 0), rate: calculateRate(startTime, processedCount),
percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1), percentage: (((processedCount) / (totalProducts || 1)) * 100).toFixed(1),
timing: { timing: {
start_time: new Date(startTime).toISOString(), start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(), end_time: new Date().toISOString(),
@@ -348,12 +416,12 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Product time aggregates calculated', operation: 'Product time aggregates calculated',
current: processedCount || 0, current: processedCount,
total: totalProducts || 0, total: totalProducts,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0), remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount || 0), rate: calculateRate(startTime, processedCount),
percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1), percentage: (((processedCount) / (totalProducts || 1)) * 100).toFixed(1),
timing: { timing: {
start_time: new Date(startTime).toISOString(), start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(), end_time: new Date().toISOString(),
@@ -365,12 +433,12 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Skipping product time aggregates calculation', operation: 'Skipping product time aggregates calculation',
current: processedCount || 0, current: processedCount,
total: totalProducts || 0, total: totalProducts,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0), remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount || 0), rate: calculateRate(startTime, processedCount),
percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1), percentage: (((processedCount) / (totalProducts || 1)) * 100).toFixed(1),
timing: { timing: {
start_time: new Date(startTime).toISOString(), start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(), end_time: new Date().toISOString(),
@@ -399,7 +467,7 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
        if (isCancelled) return {
            processedProducts: processedCount,
            processedOrders,
            processedPurchaseOrders: 0, // This module doesn't process POs
            success
        };
@@ -460,7 +528,7 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
        if (isCancelled) return {
            processedProducts: processedCount,
            processedOrders,
            processedPurchaseOrders: 0, // This module doesn't process POs
            success
        };
@@ -532,7 +600,7 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
        // If we get here, everything completed successfully
        success = true;
        // Update calculate_status with current timestamp
        await connection.query(`
            INSERT INTO calculate_status (module_name, last_calculation_timestamp)
            VALUES ('product_metrics', NOW())
@@ -540,9 +608,9 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
        `);
        return {
            processedProducts: processedCount || 0,
            processedProducts: processedCount,
            processedOrders: processedOrders || 0,
            processedPurchaseOrders: 0, // This module doesn't process POs
            success
        };


@@ -4,19 +4,51 @@ const { getConnection } = require('./utils/db');
async function calculateSalesForecasts(startTime, totalProducts, processedCount = 0, isCancelled = false) {
    const connection = await getConnection();
    let success = false;
    let processedOrders = 0;
    let myProcessedProducts = 0; // Track products processed *within this module*
    const BATCH_SIZE = 5000;
    try {
// Get last calculation timestamp
const [lastCalc] = await connection.query(`
SELECT last_calculation_timestamp
FROM calculate_status
WHERE module_name = 'sales_forecasts'
`);
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
// Get total count of products needing updates
const [productCount] = await connection.query(`
SELECT COUNT(DISTINCT p.pid) as count
FROM products p
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
WHERE p.visible = true
AND (
p.updated > ?
OR o.id IS NOT NULL
)
`, [lastCalculationTime, lastCalculationTime]);
const totalProductsToUpdate = productCount[0].count;
if (totalProductsToUpdate === 0) {
console.log('No products need forecast updates');
return {
processedProducts: 0,
processedOrders: 0,
processedPurchaseOrders: 0,
success: true
};
}
        if (isCancelled) {
            outputProgress({
                status: 'cancelled',
                operation: 'Sales forecasts calculation cancelled',
                operation: 'Sales forecast calculation cancelled',
                current: processedCount,
                total: totalProducts,
                elapsed: formatElapsedTime(startTime),
                remaining: null,
                rate: calculateRate(startTime, processedCount),
                percentage: ((processedCount / totalProducts) * 100).toFixed(1),
                percentage: ((processedCount / totalProductsToUpdate) * 100).toFixed(1),
                timing: {
                    start_time: new Date(startTime).toISOString(),
                    end_time: new Date().toISOString(),
@@ -24,31 +56,22 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
                }
            });
            return {
                processedProducts: processedCount,
                processedProducts: myProcessedProducts,
                processedOrders: 0,
                processedPurchaseOrders: 0,
                success
            };
        }
// Get order count that will be processed
const [orderCount] = await connection.query(`
SELECT COUNT(*) as count
FROM orders o
WHERE o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
`);
processedOrders = orderCount[0].count;
        outputProgress({
            status: 'running',
            operation: 'Starting sales forecasts calculation',
            operation: 'Starting sales forecast calculation',
            current: processedCount,
            total: totalProducts,
            elapsed: formatElapsedTime(startTime),
            remaining: estimateRemaining(startTime, processedCount, totalProducts),
            rate: calculateRate(startTime, processedCount),
            percentage: ((processedCount / totalProducts) * 100).toFixed(1),
            percentage: ((processedCount / totalProductsToUpdate) * 100).toFixed(1),
            timing: {
                start_time: new Date(startTime).toISOString(),
                end_time: new Date().toISOString(),
@@ -56,365 +79,201 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
            }
        });
// First, create a temporary table for forecast dates // Process in batches
await connection.query(` let lastPid = '';
CREATE TEMPORARY TABLE IF NOT EXISTS temp_forecast_dates ( while (true) {
forecast_date DATE, if (isCancelled) break;
day_of_week INT,
month INT,
PRIMARY KEY (forecast_date)
)
`);
await connection.query(` const [batch] = await connection.query(`
INSERT INTO temp_forecast_dates SELECT DISTINCT p.pid
SELECT FROM products p
DATE_ADD(CURRENT_DATE, INTERVAL n DAY) as forecast_date, FORCE INDEX (PRIMARY)
DAYOFWEEK(DATE_ADD(CURRENT_DATE, INTERVAL n DAY)) as day_of_week, LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid AND o.updated > ?
MONTH(DATE_ADD(CURRENT_DATE, INTERVAL n DAY)) as month WHERE p.visible = true
FROM ( AND p.pid > ?
SELECT a.N + b.N * 10 as n AND (
FROM p.updated > ?
(SELECT 0 as N UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION OR o.id IS NOT NULL
SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) a, )
(SELECT 0 as N UNION SELECT 1 UNION SELECT 2) b ORDER BY p.pid
ORDER BY n LIMIT ?
LIMIT 31 `, [lastCalculationTime, lastPid, lastCalculationTime, BATCH_SIZE]);
) numbers
`);
processedCount = Math.floor(totalProducts * 0.92); if (batch.length === 0) break;
outputProgress({
status: 'running',
operation: 'Forecast dates prepared, calculating daily sales stats',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return { // Create temporary tables for better performance
processedProducts: processedCount, await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_historical_sales');
processedOrders, await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
processedPurchaseOrders: 0, await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_recent_trend');
success await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_confidence_calc');
};
// Create temporary table for daily sales stats // Create optimized temporary tables with indexes
await connection.query(` await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_daily_sales AS CREATE TEMPORARY TABLE temp_historical_sales (
SELECT pid BIGINT NOT NULL,
o.pid, sale_date DATE NOT NULL,
DAYOFWEEK(o.date) as day_of_week, daily_quantity INT,
SUM(o.quantity) as daily_quantity, daily_revenue DECIMAL(15,2),
SUM(o.price * o.quantity) as daily_revenue, PRIMARY KEY (pid, sale_date),
COUNT(DISTINCT DATE(o.date)) as day_count INDEX (sale_date)
FROM orders o ) ENGINE=MEMORY
WHERE o.canceled = false `);
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
GROUP BY o.pid, DAYOFWEEK(o.date)
`);
processedCount = Math.floor(totalProducts * 0.94); await connection.query(`
outputProgress({ CREATE TEMPORARY TABLE temp_sales_stats (
status: 'running', pid BIGINT NOT NULL,
operation: 'Daily sales stats calculated, preparing product stats', avg_daily_units DECIMAL(10,2),
current: processedCount, avg_daily_revenue DECIMAL(15,2),
total: totalProducts, std_daily_units DECIMAL(10,2),
elapsed: formatElapsedTime(startTime), days_with_sales INT,
remaining: estimateRemaining(startTime, processedCount, totalProducts), first_sale DATE,
rate: calculateRate(startTime, processedCount), last_sale DATE,
percentage: ((processedCount / totalProducts) * 100).toFixed(1), PRIMARY KEY (pid),
timing: { INDEX (days_with_sales),
start_time: new Date(startTime).toISOString(), INDEX (last_sale)
end_time: new Date().toISOString(), ) ENGINE=MEMORY
elapsed_seconds: Math.round((Date.now() - startTime) / 1000) `);
}
});
if (isCancelled) return { await connection.query(`
processedProducts: processedCount, CREATE TEMPORARY TABLE temp_recent_trend (
processedOrders, pid BIGINT NOT NULL,
processedPurchaseOrders: 0, recent_avg_units DECIMAL(10,2),
success recent_avg_revenue DECIMAL(15,2),
}; PRIMARY KEY (pid)
) ENGINE=MEMORY
`);
// Create temporary table for product stats await connection.query(`
await connection.query(` CREATE TEMPORARY TABLE temp_confidence_calc (
CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_stats AS pid BIGINT NOT NULL,
SELECT confidence_level TINYINT,
pid, PRIMARY KEY (pid)
AVG(daily_revenue) as overall_avg_revenue, ) ENGINE=MEMORY
SUM(day_count) as total_days `);
FROM temp_daily_sales
GROUP BY pid
`);
processedCount = Math.floor(totalProducts * 0.96); // Populate historical sales with optimized index usage
outputProgress({ await connection.query(`
status: 'running', INSERT INTO temp_historical_sales
operation: 'Product stats prepared, calculating product-level forecasts',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Calculate product-level forecasts
await connection.query(`
INSERT INTO sales_forecasts (
pid,
forecast_date,
forecast_units,
forecast_revenue,
confidence_level,
last_calculated_at
)
WITH daily_stats AS (
SELECT SELECT
ds.pid, o.pid,
AVG(ds.daily_quantity) as avg_daily_qty, DATE(o.date) as sale_date,
STDDEV(ds.daily_quantity) as std_daily_qty, SUM(o.quantity) as daily_quantity,
COUNT(DISTINCT ds.day_count) as data_points, SUM(o.quantity * o.price) as daily_revenue
SUM(ds.day_count) as total_days, FROM orders o
AVG(ds.daily_revenue) as avg_daily_revenue, FORCE INDEX (idx_orders_metrics)
STDDEV(ds.daily_revenue) as std_daily_revenue, WHERE o.canceled = false
MIN(ds.daily_quantity) as min_daily_qty, AND o.pid IN (?)
MAX(ds.daily_quantity) as max_daily_qty, AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 180 DAY)
-- Calculate variance without using LAG GROUP BY o.pid, DATE(o.date)
COALESCE( `, [batch.map(row => row.pid)]);
STDDEV(ds.daily_quantity) / NULLIF(AVG(ds.daily_quantity), 0),
0
) as daily_variance_ratio
FROM temp_daily_sales ds
GROUP BY ds.pid
HAVING AVG(ds.daily_quantity) > 0
)
SELECT
ds.pid,
fd.forecast_date,
GREATEST(0,
ROUND(
ds.avg_daily_qty *
(1 + COALESCE(sf.seasonality_factor, 0)) *
CASE
WHEN ds.std_daily_qty / NULLIF(ds.avg_daily_qty, 0) > 1.5 THEN 0.85
WHEN ds.std_daily_qty / NULLIF(ds.avg_daily_qty, 0) > 1.0 THEN 0.9
WHEN ds.std_daily_qty / NULLIF(ds.avg_daily_qty, 0) > 0.5 THEN 0.95
ELSE 1.0
END,
2
)
) as forecast_units,
GREATEST(0,
ROUND(
COALESCE(
CASE
WHEN ds.data_points >= 4 THEN ds.avg_daily_revenue
ELSE ps.overall_avg_revenue
END *
(1 + COALESCE(sf.seasonality_factor, 0)) *
CASE
WHEN ds.std_daily_revenue / NULLIF(ds.avg_daily_revenue, 0) > 1.5 THEN 0.85
WHEN ds.std_daily_revenue / NULLIF(ds.avg_daily_revenue, 0) > 1.0 THEN 0.9
WHEN ds.std_daily_revenue / NULLIF(ds.avg_daily_revenue, 0) > 0.5 THEN 0.95
ELSE 1.0
END,
0
),
2
)
) as forecast_revenue,
CASE
WHEN ds.total_days >= 60 AND ds.daily_variance_ratio < 0.5 THEN 90
WHEN ds.total_days >= 60 THEN 85
WHEN ds.total_days >= 30 AND ds.daily_variance_ratio < 0.5 THEN 80
WHEN ds.total_days >= 30 THEN 75
WHEN ds.total_days >= 14 AND ds.daily_variance_ratio < 0.5 THEN 70
WHEN ds.total_days >= 14 THEN 65
ELSE 60
END as confidence_level,
NOW() as last_calculated_at
FROM daily_stats ds
JOIN temp_product_stats ps ON ds.pid = ps.pid
CROSS JOIN temp_forecast_dates fd
LEFT JOIN sales_seasonality sf ON fd.month = sf.month
GROUP BY ds.pid, fd.forecast_date, ps.overall_avg_revenue, sf.seasonality_factor
ON DUPLICATE KEY UPDATE
forecast_units = VALUES(forecast_units),
forecast_revenue = VALUES(forecast_revenue),
confidence_level = VALUES(confidence_level),
last_calculated_at = NOW()
`);
processedCount = Math.floor(totalProducts * 0.98); // Populate sales stats
outputProgress({ await connection.query(`
status: 'running', INSERT INTO temp_sales_stats
operation: 'Product forecasts calculated, preparing category stats', SELECT
current: processedCount, pid,
total: totalProducts, AVG(daily_quantity) as avg_daily_units,
elapsed: formatElapsedTime(startTime), AVG(daily_revenue) as avg_daily_revenue,
remaining: estimateRemaining(startTime, processedCount, totalProducts), STDDEV(daily_quantity) as std_daily_units,
rate: calculateRate(startTime, processedCount), COUNT(*) as days_with_sales,
percentage: ((processedCount / totalProducts) * 100).toFixed(1), MIN(sale_date) as first_sale,
timing: { MAX(sale_date) as last_sale
start_time: new Date(startTime).toISOString(), FROM temp_historical_sales
end_time: new Date().toISOString(), GROUP BY pid
elapsed_seconds: Math.round((Date.now() - startTime) / 1000) `);
}
});
if (isCancelled) return { // Populate recent trend
processedProducts: processedCount, await connection.query(`
processedOrders, INSERT INTO temp_recent_trend
processedPurchaseOrders: 0, SELECT
success h.pid,
}; AVG(h.daily_quantity) as recent_avg_units,
AVG(h.daily_revenue) as recent_avg_revenue
FROM temp_historical_sales h
WHERE h.sale_date >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
GROUP BY h.pid
`);
// Create temporary table for category stats // Calculate confidence levels
await connection.query(` await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_category_sales AS INSERT INTO temp_confidence_calc
SELECT SELECT
pc.cat_id, s.pid,
DAYOFWEEK(o.date) as day_of_week, LEAST(100, GREATEST(0, ROUND(
SUM(o.quantity) as daily_quantity, (s.days_with_sales / 180.0 * 50) + -- Up to 50 points for history length
SUM(o.price * o.quantity) as daily_revenue, (CASE
COUNT(DISTINCT DATE(o.date)) as day_count WHEN s.std_daily_units = 0 OR s.avg_daily_units = 0 THEN 0
FROM orders o WHEN (s.std_daily_units / s.avg_daily_units) <= 0.5 THEN 30
JOIN product_categories pc ON o.pid = pc.pid WHEN (s.std_daily_units / s.avg_daily_units) <= 1.0 THEN 20
WHERE o.canceled = false WHEN (s.std_daily_units / s.avg_daily_units) <= 2.0 THEN 10
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY) ELSE 0
GROUP BY pc.cat_id, DAYOFWEEK(o.date) END) + -- Up to 30 points for consistency
`); (CASE
WHEN DATEDIFF(CURRENT_DATE, s.last_sale) <= 7 THEN 20
WHEN DATEDIFF(CURRENT_DATE, s.last_sale) <= 30 THEN 10
ELSE 0
END) -- Up to 20 points for recency
))) as confidence_level
FROM temp_sales_stats s
`);
await connection.query(` // Generate forecasts using temp tables
CREATE TEMPORARY TABLE IF NOT EXISTS temp_category_stats AS await connection.query(`
SELECT REPLACE INTO sales_forecasts
cat_id, (pid, forecast_date, forecast_units, forecast_revenue, confidence_level, last_calculated_at)
AVG(daily_revenue) as overall_avg_revenue, SELECT
SUM(day_count) as total_days s.pid,
FROM temp_category_sales DATE_ADD(CURRENT_DATE, INTERVAL n.days DAY),
GROUP BY cat_id GREATEST(0, ROUND(
`);
processedCount = Math.floor(totalProducts * 0.99);
outputProgress({
status: 'running',
operation: 'Category stats prepared, calculating category-level forecasts',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Calculate category-level forecasts
await connection.query(`
INSERT INTO category_forecasts (
category_id,
forecast_date,
forecast_units,
forecast_revenue,
confidence_level,
last_calculated_at
)
SELECT
cs.cat_id as category_id,
fd.forecast_date,
GREATEST(0,
AVG(cs.daily_quantity) *
(1 + COALESCE(sf.seasonality_factor, 0))
) as forecast_units,
GREATEST(0,
COALESCE(
CASE CASE
WHEN SUM(cs.day_count) >= 4 THEN AVG(cs.daily_revenue) WHEN s.days_with_sales >= n.days THEN COALESCE(t.recent_avg_units, s.avg_daily_units)
ELSE ct.overall_avg_revenue ELSE s.avg_daily_units * (s.days_with_sales / n.days)
END * END
(1 + COALESCE(sf.seasonality_factor, 0)) * )),
(0.95 + (RAND() * 0.1)), GREATEST(0, ROUND(
0 CASE
) WHEN s.days_with_sales >= n.days THEN COALESCE(t.recent_avg_revenue, s.avg_daily_revenue)
) as forecast_revenue, ELSE s.avg_daily_revenue * (s.days_with_sales / n.days)
CASE END,
WHEN ct.total_days >= 60 THEN 90 2
WHEN ct.total_days >= 30 THEN 80 )),
WHEN ct.total_days >= 14 THEN 70 c.confidence_level,
ELSE 60 NOW()
END as confidence_level, FROM temp_sales_stats s
NOW() as last_calculated_at CROSS JOIN (
FROM temp_category_sales cs SELECT 30 as days
JOIN temp_category_stats ct ON cs.cat_id = ct.cat_id UNION SELECT 60
CROSS JOIN temp_forecast_dates fd UNION SELECT 90
LEFT JOIN sales_seasonality sf ON fd.month = sf.month ) n
GROUP BY cs.cat_id, fd.forecast_date, ct.overall_avg_revenue, ct.total_days, sf.seasonality_factor LEFT JOIN temp_recent_trend t ON s.pid = t.pid
HAVING AVG(cs.daily_quantity) > 0 LEFT JOIN temp_confidence_calc c ON s.pid = c.pid;
ON DUPLICATE KEY UPDATE `);
forecast_units = VALUES(forecast_units),
forecast_revenue = VALUES(forecast_revenue),
confidence_level = VALUES(confidence_level),
last_calculated_at = NOW()
`);
// Clean up temporary tables // Clean up temp tables
await connection.query(` await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_historical_sales');
DROP TEMPORARY TABLE IF EXISTS temp_forecast_dates; await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
DROP TEMPORARY TABLE IF EXISTS temp_daily_sales; await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_recent_trend');
DROP TEMPORARY TABLE IF EXISTS temp_product_stats; await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_confidence_calc');
DROP TEMPORARY TABLE IF EXISTS temp_category_sales;
DROP TEMPORARY TABLE IF EXISTS temp_category_stats;
`);
processedCount = Math.floor(totalProducts * 1.0); lastPid = batch[batch.length - 1].pid;
outputProgress({ myProcessedProducts += batch.length;
status: 'running',
operation: 'Category forecasts calculated and temporary tables cleaned up', outputProgress({
current: processedCount, status: 'running',
total: totalProducts, operation: 'Processing sales forecast batch',
elapsed: formatElapsedTime(startTime), current: processedCount,
remaining: estimateRemaining(startTime, processedCount, totalProducts), total: totalProducts,
rate: calculateRate(startTime, processedCount), elapsed: formatElapsedTime(startTime),
percentage: ((processedCount / totalProducts) * 100).toFixed(1), remaining: estimateRemaining(startTime, processedCount, totalProducts),
timing: { rate: calculateRate(startTime, processedCount),
start_time: new Date(startTime).toISOString(), percentage: ((processedCount / totalProductsToUpdate) * 100).toFixed(1),
end_time: new Date().toISOString(), timing: {
elapsed_seconds: Math.round((Date.now() - startTime) / 1000) start_time: new Date(startTime).toISOString(),
} end_time: new Date().toISOString(),
}); elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
}
        // If we get here, everything completed successfully
        success = true;
@@ -427,8 +286,8 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
        `);
        return {
            processedProducts: processedCount,
            processedProducts: myProcessedProducts,
            processedOrders,
            processedOrders: 0,
            processedPurchaseOrders: 0,
            success
        };


@@ -4,14 +4,35 @@ const { getConnection } = require('./utils/db');
async function calculateTimeAggregates(startTime, totalProducts, processedCount = 0, isCancelled = false) {
    const connection = await getConnection();
    let success = false;
    let processedOrders = 0;
    const BATCH_SIZE = 5000;
    let myProcessedProducts = 0; // Track products processed *within this module*
    try {
// Get last calculation timestamp
const [lastCalc] = await connection.query(`
SELECT last_calculation_timestamp
FROM calculate_status
WHERE module_name = 'time_aggregates'
`);
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
// We now receive totalProducts as an argument, so we don't need to query for it here.
if (totalProducts === 0) {
console.log('No products need time aggregate updates');
return {
processedProducts: 0,
processedOrders: 0,
processedPurchaseOrders: 0,
success: true
};
}
        if (isCancelled) {
            outputProgress({
                status: 'cancelled',
                operation: 'Time aggregates calculation cancelled',
                current: processedCount, // Use passed-in value
                total: totalProducts,
                elapsed: formatElapsedTime(startTime),
                remaining: null,
@@ -24,25 +45,17 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
                }
            });
            return {
                processedProducts: processedCount,
                processedProducts: myProcessedProducts, // Return only what *this* module processed
                processedOrders: 0,
                processedPurchaseOrders: 0,
                success
            };
        }
// Get order count that will be processed
const [orderCount] = await connection.query(`
SELECT COUNT(*) as count
FROM orders o
WHERE o.canceled = false
`);
processedOrders = orderCount[0].count;
        outputProgress({
            status: 'running',
            operation: 'Starting time aggregates calculation',
            current: processedCount, // Use passed-in value
            total: totalProducts,
            elapsed: formatElapsedTime(startTime),
            remaining: estimateRemaining(startTime, processedCount, totalProducts),
@@ -55,184 +68,204 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
            }
        });
        // Process in batches
        let lastPid = 0;
        while (true) {
            if (isCancelled) break;
        // Initial insert of time-based aggregates
        await connection.query(`
            INSERT INTO product_time_aggregates (
                pid,
year,
month,
total_quantity_sold,
total_revenue,
total_cost,
order_count,
stock_received,
stock_ordered,
avg_price,
profit_margin,
inventory_value,
gmroi
)
WITH monthly_sales AS (
SELECT
o.pid,
YEAR(o.date) as year,
MONTH(o.date) as month,
SUM(o.quantity) as total_quantity_sold,
SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) as total_revenue,
SUM(COALESCE(p.cost_price, 0) * o.quantity) as total_cost,
COUNT(DISTINCT o.order_number) as order_count,
AVG(o.price - COALESCE(o.discount, 0)) as avg_price,
CASE
WHEN SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) > 0
THEN ((SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) - SUM(COALESCE(p.cost_price, 0) * o.quantity))
/ SUM((o.price - COALESCE(o.discount, 0)) * o.quantity)) * 100
ELSE 0
END as profit_margin,
p.cost_price * p.stock_quantity as inventory_value,
COUNT(DISTINCT DATE(o.date)) as active_days
FROM orders o
JOIN products p ON o.pid = p.pid
WHERE o.canceled = false
GROUP BY o.pid, YEAR(o.date), MONTH(o.date)
),
monthly_stock AS (
SELECT
pid,
YEAR(date) as year,
MONTH(date) as month,
SUM(received) as stock_received,
SUM(ordered) as stock_ordered
FROM purchase_orders
GROUP BY pid, YEAR(date), MONTH(date)
)
SELECT
s.pid,
s.year,
s.month,
s.total_quantity_sold,
s.total_revenue,
s.total_cost,
s.order_count,
COALESCE(ms.stock_received, 0) as stock_received,
COALESCE(ms.stock_ordered, 0) as stock_ordered,
s.avg_price,
s.profit_margin,
s.inventory_value,
CASE
WHEN s.inventory_value > 0 THEN
(s.total_revenue - s.total_cost) / s.inventory_value
ELSE 0
END as gmroi
FROM monthly_sales s
LEFT JOIN monthly_stock ms
ON s.pid = ms.pid
AND s.year = ms.year
AND s.month = ms.month
UNION
SELECT
p.pid,
p.year,
p.month,
0 as total_quantity_sold,
0 as total_revenue,
0 as total_cost,
0 as order_count,
p.stock_received,
p.stock_ordered,
0 as avg_price,
0 as profit_margin,
(SELECT cost_price * stock_quantity FROM products WHERE pid = p.pid) as inventory_value,
0 as gmroi
FROM monthly_stock p
LEFT JOIN monthly_sales s
ON p.pid = s.pid
AND p.year = s.year
AND p.month = s.month
WHERE s.pid IS NULL
ON DUPLICATE KEY UPDATE
total_quantity_sold = VALUES(total_quantity_sold),
total_revenue = VALUES(total_revenue),
total_cost = VALUES(total_cost),
order_count = VALUES(order_count),
stock_received = VALUES(stock_received),
stock_ordered = VALUES(stock_ordered),
avg_price = VALUES(avg_price),
profit_margin = VALUES(profit_margin),
inventory_value = VALUES(inventory_value),
gmroi = VALUES(gmroi)
`);
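        // Reading the statement above: the first branch covers months that had sales (joined to any
        // stock movement), while the UNION branch backfills months with only purchase-order activity,
        // zero-filling the sales columns so those periods still get a row.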
processedCount = Math.floor(totalProducts * 0.60); const [batch] = await connection.query(`
outputProgress({ SELECT DISTINCT p.pid
status: 'running', FROM products p
operation: 'Base time aggregates calculated, updating financial metrics', FORCE INDEX (PRIMARY)
current: processedCount, LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
total: totalProducts, WHERE p.pid > ?
elapsed: formatElapsedTime(startTime), AND (
remaining: estimateRemaining(startTime, processedCount, totalProducts), p.updated > ?
rate: calculateRate(startTime, processedCount), OR EXISTS (
percentage: ((processedCount / totalProducts) * 100).toFixed(1), SELECT 1
timing: { FROM orders o2 FORCE INDEX (idx_orders_metrics)
start_time: new Date(startTime).toISOString(), WHERE o2.pid = p.pid
end_time: new Date().toISOString(), AND o2.updated > ?
elapsed_seconds: Math.round((Date.now() - startTime) / 1000) )
} )
}); ORDER BY p.pid
LIMIT ?
`, [lastPid, lastCalculationTime, lastCalculationTime, BATCH_SIZE]);
if (isCancelled) return { if (batch.length === 0) break;
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
// Update with financial metrics // Create temporary tables for better performance
await connection.query(` await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_order_stats');
UPDATE product_time_aggregates pta await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_purchase_stats');
JOIN ( await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_time_aggregates');
// Create optimized temporary tables
await connection.query(`
CREATE TEMPORARY TABLE temp_order_stats (
pid BIGINT NOT NULL,
year INT NOT NULL,
month INT NOT NULL,
total_quantity_sold INT DEFAULT 0,
total_revenue DECIMAL(10,3) DEFAULT 0,
total_cost DECIMAL(10,3) DEFAULT 0,
order_count INT DEFAULT 0,
avg_price DECIMAL(10,3),
PRIMARY KEY (pid, year, month),
INDEX (pid)
) ENGINE=MEMORY
`);
await connection.query(`
CREATE TEMPORARY TABLE temp_purchase_stats (
pid BIGINT NOT NULL,
year INT NOT NULL,
month INT NOT NULL,
stock_received INT DEFAULT 0,
stock_ordered INT DEFAULT 0,
PRIMARY KEY (pid, year, month),
INDEX (pid)
) ENGINE=MEMORY
`);
await connection.query(`
CREATE TEMPORARY TABLE temp_time_aggregates (
pid BIGINT NOT NULL,
year INT NOT NULL,
month INT NOT NULL,
total_quantity_sold INT DEFAULT 0,
total_revenue DECIMAL(10,3) DEFAULT 0,
total_cost DECIMAL(10,3) DEFAULT 0,
order_count INT DEFAULT 0,
stock_received INT DEFAULT 0,
stock_ordered INT DEFAULT 0,
avg_price DECIMAL(10,3),
profit_margin DECIMAL(10,3),
inventory_value DECIMAL(10,3),
gmroi DECIMAL(10,3),
PRIMARY KEY (pid, year, month),
INDEX (pid)
) ENGINE=MEMORY
`);
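            // The three scratch tables above follow the same batched pattern used elsewhere in this
            // change: stage per-batch order and purchase rollups in memory, combine them once, then
            // upsert the result into product_time_aggregates in a single statement.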
// Populate order stats
await connection.query(`
INSERT INTO temp_order_stats
SELECT SELECT
p.pid, p.pid,
YEAR(o.date) as year, YEAR(o.date) as year,
MONTH(o.date) as month, MONTH(o.date) as month,
p.cost_price * p.stock_quantity as inventory_value, SUM(o.quantity) as total_quantity_sold,
SUM(o.quantity * (o.price - p.cost_price)) as gross_profit, SUM(o.quantity * o.price) as total_revenue,
COUNT(DISTINCT DATE(o.date)) as days_in_period SUM(o.quantity * p.cost_price) as total_cost,
COUNT(DISTINCT o.order_number) as order_count,
AVG(o.price) as avg_price
FROM products p FROM products p
LEFT JOIN orders o ON p.pid = o.pid FORCE INDEX (PRIMARY)
WHERE o.canceled = false INNER JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
WHERE p.pid IN (?)
AND o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
GROUP BY p.pid, YEAR(o.date), MONTH(o.date) GROUP BY p.pid, YEAR(o.date), MONTH(o.date)
) fin ON pta.pid = fin.pid `, [batch.map(row => row.pid)]);
AND pta.year = fin.year
AND pta.month = fin.month
SET
pta.inventory_value = COALESCE(fin.inventory_value, 0),
pta.gmroi = CASE
WHEN COALESCE(fin.inventory_value, 0) > 0 AND fin.days_in_period > 0 THEN
(COALESCE(fin.gross_profit, 0) * (365.0 / fin.days_in_period)) / COALESCE(fin.inventory_value, 0)
ELSE 0
END
`);
processedCount = Math.floor(totalProducts * 0.65); // Populate purchase stats
outputProgress({ await connection.query(`
status: 'running', INSERT INTO temp_purchase_stats
operation: 'Financial metrics updated', SELECT
current: processedCount, p.pid,
total: totalProducts, YEAR(po.date) as year,
elapsed: formatElapsedTime(startTime), MONTH(po.date) as month,
remaining: estimateRemaining(startTime, processedCount, totalProducts), COALESCE(SUM(CASE WHEN po.received_date IS NOT NULL THEN po.received ELSE 0 END), 0) as stock_received,
rate: calculateRate(startTime, processedCount), COALESCE(SUM(po.ordered), 0) as stock_ordered
percentage: ((processedCount / totalProducts) * 100).toFixed(1), FROM products p
timing: { FORCE INDEX (PRIMARY)
start_time: new Date(startTime).toISOString(), INNER JOIN purchase_orders po FORCE INDEX (idx_po_metrics) ON p.pid = po.pid
end_time: new Date().toISOString(), WHERE p.pid IN (?)
elapsed_seconds: Math.round((Date.now() - startTime) / 1000) AND po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
} GROUP BY p.pid, YEAR(po.date), MONTH(po.date)
}); `, [batch.map(row => row.pid)]);
// Combine stats and calculate metrics
await connection.query(`
INSERT INTO temp_time_aggregates
SELECT
o.pid,
o.year,
o.month,
o.total_quantity_sold,
o.total_revenue,
o.total_cost,
o.order_count,
COALESCE(ps.stock_received, 0) as stock_received,
COALESCE(ps.stock_ordered, 0) as stock_ordered,
o.avg_price,
CASE
WHEN o.total_revenue > 0
THEN ((o.total_revenue - o.total_cost) / o.total_revenue) * 100
ELSE 0
END as profit_margin,
p.cost_price * p.stock_quantity as inventory_value,
CASE
WHEN (p.cost_price * p.stock_quantity) > 0
THEN (o.total_revenue - o.total_cost) / (p.cost_price * p.stock_quantity)
ELSE 0
END as gmroi
FROM temp_order_stats o
LEFT JOIN temp_purchase_stats ps ON o.pid = ps.pid AND o.year = ps.year AND o.month = ps.month
JOIN products p FORCE INDEX (PRIMARY) ON o.pid = p.pid
`);
// Update final table with optimized batch update
await connection.query(`
INSERT INTO product_time_aggregates (
pid, year, month,
total_quantity_sold, total_revenue, total_cost,
order_count, stock_received, stock_ordered,
avg_price, profit_margin, inventory_value, gmroi
)
SELECT *
FROM temp_time_aggregates
ON DUPLICATE KEY UPDATE
total_quantity_sold = VALUES(total_quantity_sold),
total_revenue = VALUES(total_revenue),
total_cost = VALUES(total_cost),
order_count = VALUES(order_count),
stock_received = VALUES(stock_received),
stock_ordered = VALUES(stock_ordered),
avg_price = VALUES(avg_price),
profit_margin = VALUES(profit_margin),
inventory_value = VALUES(inventory_value),
gmroi = VALUES(gmroi)
`);
// Clean up temp tables
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_order_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_purchase_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_time_aggregates');
lastPid = batch[batch.length - 1].pid;
myProcessedProducts += batch.length; // Increment *this module's* count
outputProgress({
status: 'running',
operation: 'Processing time aggregates batch',
current: processedCount + myProcessedProducts, // Show cumulative progress
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount + myProcessedProducts, totalProducts),
rate: calculateRate(startTime, processedCount + myProcessedProducts),
percentage: (((processedCount + myProcessedProducts) / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
}
// If we get here, everything completed successfully
success = true;
// Update calculate_status
await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
@@ -241,8 +274,8 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
`);
return {
-processedProducts: processedCount,
-processedOrders,
+processedProducts: myProcessedProducts, // Return only what *this* module processed
+processedOrders: 0,
processedPurchaseOrders: 0,
success
};
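For reference, the per-row arithmetic behind the temp_time_aggregates INSERT above is small enough to state directly. A minimal JavaScript sketch of the same derivation (not part of the change set; field names are chosen here to mirror the SQL columns, and the zero-denominator guards mirror the CASE expressions):

// Minimal sketch of the per-month derivation used in temp_time_aggregates.
// Mirrors the SQL CASE guards: margin and GMROI fall back to 0 when their
// denominators are zero; inventory value is cost price times on-hand stock.
function deriveMonthlyMetrics({ totalRevenue, totalCost, costPrice, stockQuantity }) {
  const inventoryValue = costPrice * stockQuantity;
  const profitMargin = totalRevenue > 0
    ? ((totalRevenue - totalCost) / totalRevenue) * 100
    : 0;
  const gmroi = inventoryValue > 0
    ? (totalRevenue - totalCost) / inventoryValue
    : 0;
  return { inventoryValue, profitMargin, gmroi };
}

// Example: 1200 revenue against 700 cost with 500 of inventory on hand
// gives roughly a 41.7% margin and a GMROI of 1.0.
console.log(deriveMonthlyMetrics({ totalRevenue: 1200, totalCost: 700, costPrice: 5, stockQuantity: 100 }));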

View File
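All of these modules share the same incremental gating: read the module's row from calculate_status, restrict the work to rows whose updated timestamp is newer, and advance the timestamp once the run succeeds. A rough sketch of that shape, assuming the mysql2/promise-style connection the scripts already use; the upsert form at the end is an assumption, since the tail of the calculate_status INSERT falls outside the hunks shown here:

// Rough sketch of the calculate_status watermark pattern (moduleName is an example).
// Assumes a mysql2/promise-style connection where query() resolves to [rows].
async function runIncrementally(connection, moduleName, work) {
  // Last successful run for this module, defaulting to the epoch.
  const [rows] = await connection.query(
    'SELECT last_calculation_timestamp FROM calculate_status WHERE module_name = ?',
    [moduleName]
  );
  const since = rows[0]?.last_calculation_timestamp || '1970-01-01';

  // Only rows updated after `since` need recalculating.
  await work(since);

  // Advance the watermark so the next run skips unchanged rows.
  // (Assumed upsert form; the actual statement is elided in the diff.)
  await connection.query(
    `INSERT INTO calculate_status (module_name, last_calculation_timestamp)
     VALUES (?, NOW())
     ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()`,
    [moduleName]
  );
}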

@@ -4,20 +4,58 @@ const { getConnection } = require('./utils/db');
async function calculateVendorMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
const connection = await getConnection();
let success = false;
-let processedOrders = 0;
-let processedPurchaseOrders = 0;
+const BATCH_SIZE = 5000;
+let myProcessedProducts = 0; // Not directly processing products, but we'll track vendors
try {
// Get last calculation timestamp
const [lastCalc] = await connection.query(`
SELECT last_calculation_timestamp
FROM calculate_status
WHERE module_name = 'vendor_metrics'
`);
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
// Get total count of vendors needing updates using EXISTS for better performance
const [vendorCount] = await connection.query(`
SELECT COUNT(DISTINCT v.vendor) as count
FROM vendor_details v
WHERE v.status = 'active'
AND (
EXISTS (
SELECT 1 FROM products p
WHERE p.vendor = v.vendor
AND p.updated > ?
)
OR EXISTS (
SELECT 1 FROM purchase_orders po
WHERE po.vendor = v.vendor
AND po.updated > ?
)
)
`, [lastCalculationTime, lastCalculationTime]);
const totalVendors = vendorCount[0].count; // Track total *vendors*
if (totalVendors === 0) {
console.log('No vendors need metric updates');
return {
processedProducts: 0, // No products directly processed
processedOrders: 0,
processedPurchaseOrders: 0,
success: true
};
}
if (isCancelled) {
outputProgress({
status: 'cancelled',
operation: 'Vendor metrics calculation cancelled',
-current: processedCount,
-total: totalProducts,
+current: processedCount, // Use passed-in value (for consistency)
+total: totalVendors, // Report total *vendors*
elapsed: formatElapsedTime(startTime),
remaining: null,
rate: calculateRate(startTime, processedCount),
-percentage: ((processedCount / totalProducts) * 100).toFixed(1),
+percentage: ((processedCount / totalVendors) * 100).toFixed(1), // Base on vendors
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
@@ -25,38 +63,22 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
}
});
return {
-processedProducts: processedCount,
-processedOrders,
-processedPurchaseOrders,
+processedProducts: 0, // No products directly processed
+processedOrders: 0,
+processedPurchaseOrders: 0,
success
};
}
// Get counts of records that will be processed
const [[orderCount], [poCount]] = await Promise.all([
connection.query(`
SELECT COUNT(*) as count
FROM orders o
WHERE o.canceled = false
`),
connection.query(`
SELECT COUNT(*) as count
FROM purchase_orders po
WHERE po.status != 0
`)
]);
processedOrders = orderCount.count;
processedPurchaseOrders = poCount.count;
outputProgress({
status: 'running',
operation: 'Starting vendor metrics calculation',
-current: processedCount,
-total: totalProducts,
+current: processedCount, // Use passed-in value
+total: totalVendors, // Report total *vendors*
elapsed: formatElapsedTime(startTime),
-remaining: estimateRemaining(startTime, processedCount, totalProducts),
+remaining: estimateRemaining(startTime, processedCount, totalVendors),
rate: calculateRate(startTime, processedCount),
-percentage: ((processedCount / totalProducts) * 100).toFixed(1),
+percentage: ((processedCount / totalVendors) * 100).toFixed(1), // Base on vendors
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
@@ -64,282 +86,197 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
}
});
+// Process in batches
+let lastVendor = '';
+let processedVendors = 0; // Track processed vendors
+while (true) {
+if (isCancelled) break;
-// First ensure all vendors exist in vendor_details
-await connection.query(`
-INSERT IGNORE INTO vendor_details (vendor, status, created_at, updated_at)
-SELECT DISTINCT
-vendor,
'active' as status,
NOW() as created_at,
NOW() as updated_at
FROM products
WHERE vendor IS NOT NULL
`);
-processedCount = Math.floor(totalProducts * 0.8);
-outputProgress({
-status: 'running',
-operation: 'Vendor details updated, calculating metrics',
-current: processedCount,
-total: totalProducts,
-elapsed: formatElapsedTime(startTime),
-remaining: estimateRemaining(startTime, processedCount, totalProducts),
-rate: calculateRate(startTime, processedCount),
-percentage: ((processedCount / totalProducts) * 100).toFixed(1),
-timing: {
-start_time: new Date(startTime).toISOString(),
-end_time: new Date().toISOString(),
-elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
-}
-});
+// Get batch of vendors using EXISTS for better performance
+const [batch] = await connection.query(`
+SELECT DISTINCT v.vendor
+FROM vendor_details v
+WHERE v.status = 'active'
+AND v.vendor > ?
+AND (
+EXISTS (
+SELECT 1
+FROM products p
+WHERE p.vendor = v.vendor
+AND p.updated > ?
+)
+OR EXISTS (
+SELECT 1
+FROM purchase_orders po
WHERE po.vendor = v.vendor
AND po.updated > ?
)
)
ORDER BY v.vendor
LIMIT ?
`, [lastVendor, lastCalculationTime, lastCalculationTime, BATCH_SIZE]);
+if (batch.length === 0) break;
-if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders,
success
};
-// Now calculate vendor metrics
-await connection.query(`
-INSERT INTO vendor_metrics (
-vendor,
-total_revenue,
-total_orders,
-total_late_orders,
-avg_lead_time_days,
-on_time_delivery_rate,
-order_fill_rate,
-avg_order_value,
-active_products,
-total_products,
-total_purchase_value,
-avg_margin_percent,
-status,
-last_calculated_at
-)
-WITH vendor_sales AS (
+// Create temporary tables with optimized structure and indexes
+await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_purchase_stats');
+await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
+await connection.query(`
+CREATE TEMPORARY TABLE temp_purchase_stats (
+vendor VARCHAR(100) NOT NULL,
+avg_lead_time_days DECIMAL(10,2),
+total_orders INT,
+total_late_orders INT,
+total_purchase_value DECIMAL(15,2),
+avg_order_value DECIMAL(15,2),
+on_time_delivery_rate DECIMAL(5,2),
+order_fill_rate DECIMAL(5,2),
+PRIMARY KEY (vendor),
+INDEX (total_orders),
+INDEX (total_purchase_value)
+) ENGINE=MEMORY
+`);
await connection.query(`
CREATE TEMPORARY TABLE temp_product_stats (
vendor VARCHAR(100) NOT NULL,
total_products INT,
active_products INT,
avg_margin_percent DECIMAL(5,2),
total_revenue DECIMAL(15,2),
PRIMARY KEY (vendor),
INDEX (total_products),
INDEX (total_revenue)
) ENGINE=MEMORY
`);
// Populate purchase_stats temp table with optimized index usage
await connection.query(`
INSERT INTO temp_purchase_stats
SELECT
po.vendor,
AVG(DATEDIFF(po.received_date, po.date)) as avg_lead_time_days,
COUNT(DISTINCT po.po_id) as total_orders,
COUNT(CASE WHEN DATEDIFF(po.received_date, po.date) > 30 THEN 1 END) as total_late_orders,
SUM(po.ordered * po.po_cost_price) as total_purchase_value,
AVG(po.ordered * po.po_cost_price) as avg_order_value,
(COUNT(CASE WHEN DATEDIFF(po.received_date, po.date) <= 30 THEN 1 END) / COUNT(*)) * 100 as on_time_delivery_rate,
(SUM(LEAST(po.received, po.ordered)) / NULLIF(SUM(po.ordered), 0)) * 100 as order_fill_rate
FROM purchase_orders po
FORCE INDEX (idx_vendor)
WHERE po.vendor IN (?)
AND po.received_date IS NOT NULL
AND po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 365 DAY)
AND po.updated > ?
GROUP BY po.vendor
`, [batch.map(row => row.vendor), lastCalculationTime]);
// Populate product stats with optimized index usage
await connection.query(`
INSERT INTO temp_product_stats
+SELECT
+p.vendor,
+COUNT(DISTINCT p.pid) as product_count,
+COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
+AVG(pm.avg_margin_percent) as avg_margin,
+SUM(pm.total_revenue) as total_revenue
+FROM products p
+FORCE INDEX (idx_vendor)
+LEFT JOIN product_metrics pm FORCE INDEX (PRIMARY) ON p.pid = pm.pid
+WHERE p.vendor IN (?)
+AND (
+p.updated > ?
+OR EXISTS (
+SELECT 1 FROM orders o FORCE INDEX (idx_orders_metrics)
+WHERE o.pid = p.pid
+AND o.updated > ?
+)
+)
+GROUP BY p.vendor
+`, [batch.map(row => row.vendor), lastCalculationTime, lastCalculationTime]);
-SELECT
-p.vendor,
-SUM(o.quantity * o.price) as total_revenue,
-COUNT(DISTINCT o.id) as total_orders,
-COUNT(DISTINCT p.pid) as active_products,
-SUM(o.quantity * (o.price - p.cost_price)) as total_margin
-FROM products p
-JOIN orders o ON p.pid = o.pid
-WHERE o.canceled = false
-AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
-GROUP BY p.vendor
-),
vendor_po AS (
+// Update metrics using temp tables with optimized join order
+await connection.query(`
+INSERT INTO vendor_metrics (
-SELECT
-p.vendor,
-COUNT(DISTINCT CASE WHEN po.receiving_status = 40 THEN po.id END) as received_orders,
COUNT(DISTINCT po.id) as total_orders,
AVG(CASE
WHEN po.receiving_status = 40
THEN DATEDIFF(po.received_date, po.date)
END) as avg_lead_time_days,
SUM(po.ordered * po.po_cost_price) as total_purchase_value
FROM products p
JOIN purchase_orders po ON p.pid = po.pid
WHERE po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
GROUP BY p.vendor
),
vendor_products AS (
SELECT
+vendor,
+avg_lead_time_days,
+on_time_delivery_rate,
+order_fill_rate,
+total_orders,
+total_late_orders,
+total_purchase_value,
+avg_order_value,
+active_products,
+total_products,
+total_revenue,
+avg_margin_percent,
+status,
+last_calculated_at
+)
-vendor,
-COUNT(DISTINCT pid) as total_products
-FROM products
-GROUP BY vendor
-)
-SELECT
-vs.vendor,
-COALESCE(vs.total_revenue, 0) as total_revenue,
-COALESCE(vp.total_orders, 0) as total_orders,
-COALESCE(vp.total_orders - vp.received_orders, 0) as total_late_orders,
-COALESCE(vp.avg_lead_time_days, 0) as avg_lead_time_days,
-CASE
-WHEN vp.total_orders > 0
-THEN (vp.received_orders / vp.total_orders) * 100
-ELSE 0
END as on_time_delivery_rate,
CASE
WHEN vp.total_orders > 0
THEN (vp.received_orders / vp.total_orders) * 100
ELSE 0
END as order_fill_rate,
CASE
WHEN vs.total_orders > 0
THEN vs.total_revenue / vs.total_orders
ELSE 0
END as avg_order_value,
COALESCE(vs.active_products, 0) as active_products,
COALESCE(vpr.total_products, 0) as total_products,
COALESCE(vp.total_purchase_value, 0) as total_purchase_value,
CASE
WHEN vs.total_revenue > 0
THEN (vs.total_margin / vs.total_revenue) * 100
ELSE 0
END as avg_margin_percent,
'active' as status,
NOW() as last_calculated_at
FROM vendor_sales vs
LEFT JOIN vendor_po vp ON vs.vendor = vp.vendor
LEFT JOIN vendor_products vpr ON vs.vendor = vpr.vendor
WHERE vs.vendor IS NOT NULL
ON DUPLICATE KEY UPDATE
total_revenue = VALUES(total_revenue),
total_orders = VALUES(total_orders),
total_late_orders = VALUES(total_late_orders),
avg_lead_time_days = VALUES(avg_lead_time_days),
on_time_delivery_rate = VALUES(on_time_delivery_rate),
order_fill_rate = VALUES(order_fill_rate),
avg_order_value = VALUES(avg_order_value),
active_products = VALUES(active_products),
total_products = VALUES(total_products),
total_purchase_value = VALUES(total_purchase_value),
avg_margin_percent = VALUES(avg_margin_percent),
status = VALUES(status),
last_calculated_at = VALUES(last_calculated_at)
`);
processedCount = Math.floor(totalProducts * 0.9);
outputProgress({
status: 'running',
operation: 'Vendor metrics calculated, updating time-based metrics',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders,
success
};
// Calculate time-based metrics
await connection.query(`
INSERT INTO vendor_time_metrics (
vendor,
year,
month,
total_orders,
late_orders,
avg_lead_time_days,
total_purchase_value,
total_revenue,
avg_margin_percent
)
WITH monthly_orders AS (
-SELECT
-p.vendor,
-YEAR(o.date) as year,
-MONTH(o.date) as month,
-COUNT(DISTINCT o.id) as total_orders,
-SUM(o.quantity * o.price) as total_revenue,
-SUM(o.quantity * (o.price - p.cost_price)) as total_margin
-FROM products p
-JOIN orders o ON p.pid = o.pid
-WHERE o.canceled = false
-AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
-AND p.vendor IS NOT NULL
-GROUP BY p.vendor, YEAR(o.date), MONTH(o.date)
-),
-monthly_po AS (
-SELECT
-p.vendor,
-YEAR(po.date) as year,
-MONTH(po.date) as month,
-COUNT(DISTINCT po.id) as total_po,
-COUNT(DISTINCT CASE
-WHEN po.receiving_status = 40 AND po.received_date > po.expected_date
-THEN po.id
-END) as late_orders,
-AVG(CASE
-WHEN po.receiving_status = 40
-THEN DATEDIFF(po.received_date, po.date)
-END) as avg_lead_time_days,
-SUM(po.ordered * po.po_cost_price) as total_purchase_value
-FROM products p
-JOIN purchase_orders po ON p.pid = po.pid
-WHERE po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
-AND p.vendor IS NOT NULL
-GROUP BY p.vendor, YEAR(po.date), MONTH(po.date)
-)
+SELECT
+v.vendor,
+COALESCE(ps.avg_lead_time_days, 0) as avg_lead_time_days,
+COALESCE(ps.on_time_delivery_rate, 0) as on_time_delivery_rate,
+COALESCE(ps.order_fill_rate, 0) as order_fill_rate,
+COALESCE(ps.total_orders, 0) as total_orders,
+COALESCE(ps.total_late_orders, 0) as total_late_orders,
+COALESCE(ps.total_purchase_value, 0) as total_purchase_value,
+COALESCE(ps.avg_order_value, 0) as avg_order_value,
+COALESCE(prs.active_products, 0) as active_products,
+COALESCE(prs.total_products, 0) as total_products,
+COALESCE(prs.total_revenue, 0) as total_revenue,
+COALESCE(prs.avg_margin_percent, 0) as avg_margin_percent,
+v.status,
+NOW() as last_calculated_at
+FROM vendor_details v
+FORCE INDEX (PRIMARY)
+LEFT JOIN temp_purchase_stats ps ON v.vendor = ps.vendor
+LEFT JOIN temp_product_stats prs ON v.vendor = prs.vendor
+WHERE v.vendor IN (?)
+ON DUPLICATE KEY UPDATE
+avg_lead_time_days = VALUES(avg_lead_time_days),
+on_time_delivery_rate = VALUES(on_time_delivery_rate),
+order_fill_rate = VALUES(order_fill_rate),
+total_orders = VALUES(total_orders),
+total_late_orders = VALUES(total_late_orders),
+total_purchase_value = VALUES(total_purchase_value),
+avg_order_value = VALUES(avg_order_value),
+active_products = VALUES(active_products),
+total_products = VALUES(total_products),
+total_revenue = VALUES(total_revenue),
+avg_margin_percent = VALUES(avg_margin_percent),
+status = VALUES(status),
+last_calculated_at = NOW()
+`, [batch.map(row => row.vendor)]);
SELECT
mo.vendor,
mo.year,
mo.month,
COALESCE(mp.total_po, 0) as total_orders,
COALESCE(mp.late_orders, 0) as late_orders,
COALESCE(mp.avg_lead_time_days, 0) as avg_lead_time_days,
COALESCE(mp.total_purchase_value, 0) as total_purchase_value,
mo.total_revenue,
CASE
WHEN mo.total_revenue > 0
THEN (mo.total_margin / mo.total_revenue) * 100
ELSE 0
END as avg_margin_percent
FROM monthly_orders mo
LEFT JOIN monthly_po mp ON mo.vendor = mp.vendor
AND mo.year = mp.year
AND mo.month = mp.month
UNION
SELECT
mp.vendor,
mp.year,
mp.month,
mp.total_po as total_orders,
mp.late_orders,
mp.avg_lead_time_days,
mp.total_purchase_value,
0 as total_revenue,
0 as avg_margin_percent
FROM monthly_po mp
LEFT JOIN monthly_orders mo ON mp.vendor = mo.vendor
AND mp.year = mo.year
AND mp.month = mo.month
WHERE mo.vendor IS NULL
ON DUPLICATE KEY UPDATE
total_orders = VALUES(total_orders),
late_orders = VALUES(late_orders),
avg_lead_time_days = VALUES(avg_lead_time_days),
total_purchase_value = VALUES(total_purchase_value),
total_revenue = VALUES(total_revenue),
avg_margin_percent = VALUES(avg_margin_percent)
`);
-processedCount = Math.floor(totalProducts * 0.95);
-outputProgress({
-status: 'running',
-operation: 'Time-based vendor metrics calculated',
-current: processedCount,
-total: totalProducts,
-elapsed: formatElapsedTime(startTime),
-remaining: estimateRemaining(startTime, processedCount, totalProducts),
-rate: calculateRate(startTime, processedCount),
-percentage: ((processedCount / totalProducts) * 100).toFixed(1),
-timing: {
-start_time: new Date(startTime).toISOString(),
-end_time: new Date().toISOString(),
-elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
-}
-});
+// Clean up temp tables
+await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_purchase_stats');
+await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
+lastVendor = batch[batch.length - 1].vendor;
+processedVendors += batch.length; // Increment processed *vendors*
+outputProgress({
+status: 'running',
+operation: 'Processing vendor metrics batch',
+current: processedCount + processedVendors, // Use cumulative vendor count
+total: totalVendors, // Report total *vendors*
+elapsed: formatElapsedTime(startTime),
+remaining: estimateRemaining(startTime, processedCount + processedVendors, totalVendors),
+rate: calculateRate(startTime, processedCount + processedVendors),
+percentage: (((processedCount + processedVendors) / totalVendors) * 100).toFixed(1), // Base on vendors
+timing: {
+start_time: new Date(startTime).toISOString(),
+end_time: new Date().toISOString(),
+elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
+}
+});
+}
// If we get here, everything completed successfully
success = true;
// Update calculate_status
await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
@@ -348,9 +285,9 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
`);
return {
-processedProducts: processedCount,
-processedOrders,
-processedPurchaseOrders,
+processedProducts: 0, // No products directly processed
+processedOrders: 0,
+processedPurchaseOrders: 0,
success
};
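Both reworked modules page through their work with keyset pagination rather than OFFSET: each batch selects keys greater than the last one seen, ordered by that key, with a LIMIT. A generic sketch of the loop shape (fetchPage and handleBatch are stand-ins for the vendor/product batch queries above; the key is the vendor name in this module and the product id in the time-aggregates module):

// Generic keyset-pagination loop matching the batch loops above: each page is
// fetched with `key > lastKey ORDER BY key LIMIT pageSize`, so no OFFSET scans.
async function forEachBatch(fetchPage, handleBatch, pageSize = 5000) {
  let lastKey = '';
  while (true) {
    const batch = await fetchPage(lastKey, pageSize); // rows must expose a `key`
    if (batch.length === 0) break;                    // nothing left to process
    await handleBatch(batch);
    lastKey = batch[batch.length - 1].key;            // advance the cursor
  }
}

Because each page is ordered by an indexed key and starts strictly after the previous page's last key, an iteration only touches the next slice of rows instead of rescanning everything already processed.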

View File
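For context on the component change in the hunk below, here is a hypothetical /dashboard/replenishment/metrics payload matching the ReplenishmentMetricsData shape it introduces; the field names follow the interface, the values are illustrative only:

// Illustrative payload only -- names follow ReplenishmentMetricsData, values are made up.
const exampleReplenishmentMetrics = {
  productsToReplenish: 42,
  unitsToReplenish: 1280,
  replenishmentCost: 6400.5,
  replenishmentRetail: 12950.0,
  topVariants: [
    {
      id: 12345,
      title: "Example variant",
      currentStock: 3,
      replenishQty: 24,
      replenishCost: 96.0,
      replenishRetail: 192.0,
      status: "pending", // example status string
    },
  ],
};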

@@ -4,23 +4,29 @@ import { ScrollArea } from "@/components/ui/scroll-area"
import { Table, TableBody, TableCell, TableHead, TableHeader, TableRow } from "@/components/ui/table"
import config from "@/config"
-interface Product {
-pid: number;
-sku: string;
-title: string;
-stock_quantity: number;
-daily_sales_avg: string;
-reorder_qty: number;
-last_purchase_date: string | null;
+interface ReplenishmentMetricsData {
+productsToReplenish: number;
+unitsToReplenish: number;
+replenishmentCost: number;
+replenishmentRetail: number;
+topVariants: {
+id: number;
+title: string;
+currentStock: number;
+replenishQty: number;
+replenishCost: number;
+replenishRetail: number;
+status: string;
+}[];
}
export function TopReplenishProducts() {
-const { data } = useQuery<Product[]>({
-queryKey: ["top-replenish-products"],
+const { data } = useQuery<ReplenishmentMetricsData>({
+queryKey: ["replenishment-metrics"],
queryFn: async () => {
-const response = await fetch(`${config.apiUrl}/dashboard/replenish/products?limit=50`)
+const response = await fetch(`${config.apiUrl}/dashboard/replenishment/metrics`)
if (!response.ok) {
-throw new Error("Failed to fetch products to replenish")
+throw new Error("Failed to fetch replenishment metrics")
}
return response.json()
},
@@ -38,29 +44,28 @@ export function TopReplenishProducts() {
<TableRow>
<TableHead>Product</TableHead>
<TableHead className="text-right">Stock</TableHead>
-<TableHead className="text-right">Daily Sales</TableHead>
<TableHead className="text-right">Reorder Qty</TableHead>
-<TableHead>Last Purchase</TableHead>
+<TableHead className="text-right">Cost</TableHead>
+<TableHead>Status</TableHead>
</TableRow>
</TableHeader>
<TableBody>
-{data?.map((product) => (
-<TableRow key={product.pid}>
+{data?.topVariants?.map((product) => (
+<TableRow key={product.id}>
<TableCell>
<a
-href={`https://backend.acherryontop.com/product/${product.pid}`}
+href={`https://backend.acherryontop.com/product/${product.id}`}
target="_blank"
rel="noopener noreferrer"
className="hover:underline"
>
{product.title}
</a>
-<div className="text-sm text-muted-foreground">{product.sku}</div>
</TableCell>
-<TableCell className="text-right">{product.stock_quantity}</TableCell>
-<TableCell className="text-right">{Number(product.daily_sales_avg).toFixed(1)}</TableCell>
-<TableCell className="text-right">{product.reorder_qty}</TableCell>
-<TableCell>{product.last_purchase_date ? product.last_purchase_date : '-'}</TableCell>
+<TableCell className="text-right">{product.currentStock}</TableCell>
+<TableCell className="text-right">{product.replenishQty}</TableCell>
+<TableCell className="text-right">${product.replenishCost.toFixed(2)}</TableCell>
+<TableCell>{product.status}</TableCell>
</TableRow>
))}
</TableBody>