19 Commits

Author SHA1 Message Date
eea57528ab Try to speed up category calcs 2025-02-10 11:21:20 -05:00
3d2d1b3946 Try to speed up brand calcs 2025-02-10 10:20:32 -05:00
d936d50f83 Vendor calculate script fix 2025-02-10 09:26:24 -05:00
610e26689c Try to speed up calculate script + fixes 2025-02-10 01:29:01 -05:00
7ff757203f Calculate script fixes 2025-02-09 15:40:57 -05:00
843ce71506 Make calculations incremental 2025-02-09 13:35:44 -05:00
2a6a0d0a87 Fixed calculations for frontend (likely still wrong but they display) + related regressions to calculate script 2025-02-05 00:02:06 -05:00
ebffb8f912 Enhance calculate scripts to deal with times and counts + fix regressions 2025-02-03 22:21:39 -05:00
5676e9094d Add calculate time tracking 2025-02-02 21:22:46 -05:00
b926aba9ff Add calculate history tracking 2025-02-02 20:41:23 -05:00
e62c6ac8ee Fix issues with change tracking 2025-02-02 20:24:23 -05:00
18f4970059 Set up change tracking in core tables 2025-02-02 19:12:39 -05:00
12cab7473a Fix calculate script regressions 2025-02-02 09:27:06 -05:00
06b0f1251e Fix import script regressions 2025-02-02 01:40:05 -05:00
8a43da502a Fix (probably) discrepancies and errors in import/calculate scripts 2025-02-02 00:01:46 -05:00
bd5bcdd548 Fix calculate errors 2025-02-01 23:38:13 -05:00
0a51328da2 Add a bunch of untested calculations enhancements based on import script changes 2025-02-01 14:46:17 -05:00
b2d7744cc5 Merge branch 'Improve-data-import' 2025-02-01 14:09:34 -05:00
8124fc9add Update gitignore 2025-02-01 14:09:25 -05:00
17 changed files with 2506 additions and 6938 deletions

1
.gitignore vendored
View File

@@ -26,6 +26,7 @@ dist-ssr
dashboard/build/** dashboard/build/**
dashboard-server/frontend/build/** dashboard-server/frontend/build/**
**/build/** **/build/**
.fuse_hidden**
._* ._*
# Build directories # Build directories

View File

@@ -171,6 +171,39 @@ ORDER BY
c.name, c.name,
st.vendor; st.vendor;
-- One row per run of the metrics calculation script. A row is inserted with
-- status 'running' at start and finalized (end_time, duration, counts, status)
-- when the run completes, fails, or is cancelled. Stale 'running' rows are
-- swept to 'cancelled' at the start of the next run.
CREATE TABLE IF NOT EXISTS calculate_history (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
start_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
-- NULL while the run is still in progress
end_time TIMESTAMP NULL,
duration_seconds INT,
-- derived from duration_seconds; STORED so it can be indexed/queried cheaply
duration_minutes DECIMAL(10,2) GENERATED ALWAYS AS (duration_seconds / 60.0) STORED,
-- totals: how many records were eligible for (re)calculation this run
total_products INT DEFAULT 0,
total_orders INT DEFAULT 0,
total_purchase_orders INT DEFAULT 0,
-- processed: how many of the eligible records were actually handled
processed_products INT DEFAULT 0,
processed_orders INT DEFAULT 0,
processed_purchase_orders INT DEFAULT 0,
status ENUM('running', 'completed', 'failed', 'cancelled') DEFAULT 'running',
-- populated only for 'failed'/'cancelled' rows
error_message TEXT,
-- free-form run metadata (e.g. which SKIP_* flags were set) as JSON
additional_info JSON,
-- supports "find currently-running / recent runs" lookups
INDEX idx_status_time (status, start_time)
);
-- Per-module watermark table for incremental calculation: each row records
-- when a calculation module last ran, so the script can limit work to rows
-- whose `updated` timestamp is newer than the module's watermark.
-- NOTE(review): ENUM keys mean adding a module requires an ALTER TABLE —
-- presumably intentional to keep the module set closed; confirm.
CREATE TABLE IF NOT EXISTS calculate_status (
module_name ENUM(
'product_metrics',
'time_aggregates',
'financial_metrics',
'vendor_metrics',
'category_metrics',
'brand_metrics',
'sales_forecasts',
'abc_classification'
) PRIMARY KEY,
last_calculation_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
-- supports "which modules are stale / ran least recently" queries
INDEX idx_last_calc (last_calculation_timestamp)
);
CREATE TABLE IF NOT EXISTS sync_status ( CREATE TABLE IF NOT EXISTS sync_status (
table_name VARCHAR(50) PRIMARY KEY, table_name VARCHAR(50) PRIMARY KEY,
last_sync_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, last_sync_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,

View File

@@ -126,13 +126,13 @@ CREATE TABLE IF NOT EXISTS vendor_metrics (
order_fill_rate DECIMAL(5,2), order_fill_rate DECIMAL(5,2),
total_orders INT DEFAULT 0, total_orders INT DEFAULT 0,
total_late_orders INT DEFAULT 0, total_late_orders INT DEFAULT 0,
total_purchase_value DECIMAL(10,3) DEFAULT 0, total_purchase_value DECIMAL(15,3) DEFAULT 0,
avg_order_value DECIMAL(10,3), avg_order_value DECIMAL(15,3),
-- Product metrics -- Product metrics
active_products INT DEFAULT 0, active_products INT DEFAULT 0,
total_products INT DEFAULT 0, total_products INT DEFAULT 0,
-- Financial metrics -- Financial metrics
total_revenue DECIMAL(10,3) DEFAULT 0, total_revenue DECIMAL(15,3) DEFAULT 0,
avg_margin_percent DECIMAL(5,2), avg_margin_percent DECIMAL(5,2),
-- Status -- Status
status VARCHAR(20) DEFAULT 'active', status VARCHAR(20) DEFAULT 'active',

View File

@@ -51,13 +51,15 @@ CREATE TABLE products (
baskets INT UNSIGNED DEFAULT 0, baskets INT UNSIGNED DEFAULT 0,
notifies INT UNSIGNED DEFAULT 0, notifies INT UNSIGNED DEFAULT 0,
date_last_sold DATE, date_last_sold DATE,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (pid), PRIMARY KEY (pid),
INDEX idx_sku (SKU), INDEX idx_sku (SKU),
INDEX idx_vendor (vendor), INDEX idx_vendor (vendor),
INDEX idx_brand (brand), INDEX idx_brand (brand),
INDEX idx_location (location), INDEX idx_location (location),
INDEX idx_total_sold (total_sold), INDEX idx_total_sold (total_sold),
INDEX idx_date_last_sold (date_last_sold) INDEX idx_date_last_sold (date_last_sold),
INDEX idx_updated (updated)
) ENGINE=InnoDB; ) ENGINE=InnoDB;
-- Create categories table with hierarchy support -- Create categories table with hierarchy support
@@ -118,6 +120,7 @@ CREATE TABLE IF NOT EXISTS orders (
customer_name VARCHAR(100), customer_name VARCHAR(100),
status VARCHAR(20) DEFAULT 'pending', status VARCHAR(20) DEFAULT 'pending',
canceled TINYINT(1) DEFAULT 0, canceled TINYINT(1) DEFAULT 0,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id), PRIMARY KEY (id),
UNIQUE KEY unique_order_line (order_number, pid), UNIQUE KEY unique_order_line (order_number, pid),
KEY order_number (order_number), KEY order_number (order_number),
@@ -125,7 +128,8 @@ CREATE TABLE IF NOT EXISTS orders (
KEY customer (customer), KEY customer (customer),
KEY date (date), KEY date (date),
KEY status (status), KEY status (status),
INDEX idx_orders_metrics (pid, date, canceled) INDEX idx_orders_metrics (pid, date, canceled),
INDEX idx_updated (updated)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Create purchase_orders table with its indexes -- Create purchase_orders table with its indexes
@@ -148,8 +152,9 @@ CREATE TABLE purchase_orders (
received INT DEFAULT 0, received INT DEFAULT 0,
received_date DATE COMMENT 'Date of first receiving', received_date DATE COMMENT 'Date of first receiving',
last_received_date DATE COMMENT 'Date of most recent receiving', last_received_date DATE COMMENT 'Date of most recent receiving',
received_by INT, received_by VARCHAR(100) COMMENT 'Name of person who first received this PO line',
receiving_history JSON COMMENT 'Array of receiving records with qty, date, cost, receiving_id, and alt_po flag', receiving_history JSON COMMENT 'Array of receiving records with qty, date, cost, receiving_id, and alt_po flag',
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (pid) REFERENCES products(pid), FOREIGN KEY (pid) REFERENCES products(pid),
INDEX idx_po_id (po_id), INDEX idx_po_id (po_id),
INDEX idx_vendor (vendor), INDEX idx_vendor (vendor),
@@ -159,6 +164,7 @@ CREATE TABLE purchase_orders (
INDEX idx_po_metrics (pid, date, receiving_status, received_date), INDEX idx_po_metrics (pid, date, receiving_status, received_date),
INDEX idx_po_product_date (pid, date), INDEX idx_po_product_date (pid, date),
INDEX idx_po_product_status (pid, status), INDEX idx_po_product_status (pid, status),
INDEX idx_updated (updated),
UNIQUE KEY unique_po_product (po_id, pid) UNIQUE KEY unique_po_product (po_id, pid)
) ENGINE=InnoDB; ) ENGINE=InnoDB;

File diff suppressed because it is too large Load Diff

View File

@@ -7,13 +7,13 @@ require('dotenv').config({ path: path.resolve(__dirname, '..', '.env') });
// Configuration flags for controlling which metrics to calculate // Configuration flags for controlling which metrics to calculate
// Set to 1 to skip the corresponding calculation, 0 to run it // Set to 1 to skip the corresponding calculation, 0 to run it
const SKIP_PRODUCT_METRICS = 1; // Skip all product metrics const SKIP_PRODUCT_METRICS = 0;
const SKIP_TIME_AGGREGATES = 1; // Skip time aggregates const SKIP_TIME_AGGREGATES = 0;
const SKIP_FINANCIAL_METRICS = 1; // Skip financial metrics const SKIP_FINANCIAL_METRICS = 0;
const SKIP_VENDOR_METRICS = 1; // Skip vendor metrics const SKIP_VENDOR_METRICS = 0;
const SKIP_CATEGORY_METRICS = 1; // Skip category metrics const SKIP_CATEGORY_METRICS = 0;
const SKIP_BRAND_METRICS = 1; // Skip brand metrics const SKIP_BRAND_METRICS = 0;
const SKIP_SALES_FORECASTS = 1; // Skip sales forecasts const SKIP_SALES_FORECASTS = 0;
// Add error handler for uncaught exceptions // Add error handler for uncaught exceptions
process.on('uncaughtException', (error) => { process.on('uncaughtException', (error) => {
@@ -83,10 +83,115 @@ process.on('SIGTERM', cancelCalculation);
async function calculateMetrics() { async function calculateMetrics() {
let connection; let connection;
const startTime = Date.now(); const startTime = Date.now();
let processedCount = 0; let processedProducts = 0;
let processedOrders = 0;
let processedPurchaseOrders = 0;
let totalProducts = 0; let totalProducts = 0;
let totalOrders = 0;
let totalPurchaseOrders = 0;
let calculateHistoryId;
try { try {
// Clean up any previously running calculations
connection = await getConnection();
await connection.query(`
UPDATE calculate_history
SET
status = 'cancelled',
end_time = NOW(),
duration_seconds = TIMESTAMPDIFF(SECOND, start_time, NOW()),
error_message = 'Previous calculation was not completed properly'
WHERE status = 'running'
`);
// Get counts of records that need updating based on last calculation time
const [[productCount], [orderCount], [poCount]] = await Promise.all([
connection.query(`
SELECT COUNT(DISTINCT p.pid) as total
FROM products p
FORCE INDEX (PRIMARY)
LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics'
LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
AND o.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
AND o.canceled = false
LEFT JOIN purchase_orders po FORCE INDEX (idx_purchase_orders_metrics) ON p.pid = po.pid
AND po.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
WHERE p.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
OR o.pid IS NOT NULL
OR po.pid IS NOT NULL
`),
connection.query(`
SELECT COUNT(DISTINCT o.id) as total
FROM orders o
FORCE INDEX (idx_orders_metrics)
LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics'
WHERE o.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
AND o.canceled = false
`),
connection.query(`
SELECT COUNT(DISTINCT po.id) as total
FROM purchase_orders po
FORCE INDEX (idx_purchase_orders_metrics)
LEFT JOIN calculate_status cs ON cs.module_name = 'product_metrics'
WHERE po.updated > COALESCE(cs.last_calculation_timestamp, '1970-01-01')
`)
]);
totalProducts = productCount.total;
totalOrders = orderCount.total;
totalPurchaseOrders = poCount.total;
// If nothing needs updating, we can exit early
if (totalProducts === 0 && totalOrders === 0 && totalPurchaseOrders === 0) {
console.log('No records need updating');
return {
processedProducts: 0,
processedOrders: 0,
processedPurchaseOrders: 0,
success: true
};
}
// Create history record for this calculation
const [historyResult] = await connection.query(`
INSERT INTO calculate_history (
start_time,
status,
total_products,
total_orders,
total_purchase_orders,
additional_info
) VALUES (
NOW(),
'running',
?,
?,
?,
JSON_OBJECT(
'skip_product_metrics', ?,
'skip_time_aggregates', ?,
'skip_financial_metrics', ?,
'skip_vendor_metrics', ?,
'skip_category_metrics', ?,
'skip_brand_metrics', ?,
'skip_sales_forecasts', ?
)
)
`, [
totalProducts,
totalOrders,
totalPurchaseOrders,
SKIP_PRODUCT_METRICS,
SKIP_TIME_AGGREGATES,
SKIP_FINANCIAL_METRICS,
SKIP_VENDOR_METRICS,
SKIP_CATEGORY_METRICS,
SKIP_BRAND_METRICS,
SKIP_SALES_FORECASTS
]);
calculateHistoryId = historyResult.insertId;
connection.release();
// Add debug logging for the progress functions // Add debug logging for the progress functions
console.log('Debug - Progress functions:', { console.log('Debug - Progress functions:', {
formatElapsedTime: typeof global.formatElapsedTime, formatElapsedTime: typeof global.formatElapsedTime,
@@ -115,72 +220,150 @@ async function calculateMetrics() {
elapsed: '0s', elapsed: '0s',
remaining: 'Calculating...', remaining: 'Calculating...',
rate: 0, rate: 0,
percentage: '0' percentage: '0',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
// Get total number of products // Update progress periodically
const [countResult] = await connection.query('SELECT COUNT(*) as total FROM products') const updateProgress = async (products = null, orders = null, purchaseOrders = null) => {
.catch(err => { // Ensure all values are valid numbers or default to previous value
global.logError(err, 'Failed to count products'); if (products !== null) processedProducts = Number(products) || processedProducts || 0;
throw err; if (orders !== null) processedOrders = Number(orders) || processedOrders || 0;
}); if (purchaseOrders !== null) processedPurchaseOrders = Number(purchaseOrders) || processedPurchaseOrders || 0;
totalProducts = countResult[0].total;
// Ensure we never send NaN to the database
const safeProducts = Number(processedProducts) || 0;
const safeOrders = Number(processedOrders) || 0;
const safePurchaseOrders = Number(processedPurchaseOrders) || 0;
await connection.query(`
UPDATE calculate_history
SET
processed_products = ?,
processed_orders = ?,
processed_purchase_orders = ?
WHERE id = ?
`, [safeProducts, safeOrders, safePurchaseOrders, calculateHistoryId]);
};
// Helper function to ensure valid progress numbers
const ensureValidProgress = (current, total) => ({
current: Number(current) || 0,
total: Number(total) || 1, // Default to 1 to avoid division by zero
percentage: (((Number(current) || 0) / (Number(total) || 1)) * 100).toFixed(1)
});
// Initial progress
const initialProgress = ensureValidProgress(0, totalProducts);
global.outputProgress({
status: 'running',
operation: 'Starting metrics calculation',
current: initialProgress.current,
total: initialProgress.total,
elapsed: '0s',
remaining: 'Calculating...',
rate: 0,
percentage: initialProgress.percentage,
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (!SKIP_PRODUCT_METRICS) { if (!SKIP_PRODUCT_METRICS) {
processedCount = await calculateProductMetrics(startTime, totalProducts); const result = await calculateProductMetrics(startTime, totalProducts, processedProducts, isCancelled);
await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
if (!result.success) {
throw new Error('Product metrics calculation failed');
}
} else { } else {
console.log('Skipping product metrics calculation...'); console.log('Skipping product metrics calculation...');
processedCount = Math.floor(totalProducts * 0.6); processedProducts = Math.floor(totalProducts * 0.6);
await updateProgress(processedProducts);
global.outputProgress({ global.outputProgress({
status: 'running', status: 'running',
operation: 'Skipping product metrics calculation', operation: 'Skipping product metrics calculation',
current: processedCount, current: processedProducts,
total: totalProducts, total: totalProducts,
elapsed: global.formatElapsedTime(startTime), elapsed: global.formatElapsedTime(startTime),
remaining: global.estimateRemaining(startTime, processedCount, totalProducts), remaining: global.estimateRemaining(startTime, processedProducts, totalProducts),
rate: global.calculateRate(startTime, processedCount), rate: global.calculateRate(startTime, processedProducts),
percentage: '60' percentage: '60',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
} }
// Calculate time-based aggregates // Calculate time-based aggregates
if (!SKIP_TIME_AGGREGATES) { if (!SKIP_TIME_AGGREGATES) {
processedCount = await calculateTimeAggregates(startTime, totalProducts, processedCount); const result = await calculateTimeAggregates(startTime, totalProducts, processedProducts);
await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
if (!result.success) {
throw new Error('Time aggregates calculation failed');
}
} else { } else {
console.log('Skipping time aggregates calculation'); console.log('Skipping time aggregates calculation');
} }
// Calculate financial metrics // Calculate financial metrics
if (!SKIP_FINANCIAL_METRICS) { if (!SKIP_FINANCIAL_METRICS) {
processedCount = await calculateFinancialMetrics(startTime, totalProducts, processedCount); const result = await calculateFinancialMetrics(startTime, totalProducts, processedProducts);
await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
if (!result.success) {
throw new Error('Financial metrics calculation failed');
}
} else { } else {
console.log('Skipping financial metrics calculation'); console.log('Skipping financial metrics calculation');
} }
// Calculate vendor metrics // Calculate vendor metrics
if (!SKIP_VENDOR_METRICS) { if (!SKIP_VENDOR_METRICS) {
processedCount = await calculateVendorMetrics(startTime, totalProducts, processedCount); const result = await calculateVendorMetrics(startTime, totalProducts, processedProducts);
await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
if (!result.success) {
throw new Error('Vendor metrics calculation failed');
}
} else { } else {
console.log('Skipping vendor metrics calculation'); console.log('Skipping vendor metrics calculation');
} }
// Calculate category metrics // Calculate category metrics
if (!SKIP_CATEGORY_METRICS) { if (!SKIP_CATEGORY_METRICS) {
processedCount = await calculateCategoryMetrics(startTime, totalProducts, processedCount); const result = await calculateCategoryMetrics(startTime, totalProducts, processedProducts);
await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
if (!result.success) {
throw new Error('Category metrics calculation failed');
}
} else { } else {
console.log('Skipping category metrics calculation'); console.log('Skipping category metrics calculation');
} }
// Calculate brand metrics // Calculate brand metrics
if (!SKIP_BRAND_METRICS) { if (!SKIP_BRAND_METRICS) {
processedCount = await calculateBrandMetrics(startTime, totalProducts, processedCount); const result = await calculateBrandMetrics(startTime, totalProducts, processedProducts);
await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
if (!result.success) {
throw new Error('Brand metrics calculation failed');
}
} else { } else {
console.log('Skipping brand metrics calculation'); console.log('Skipping brand metrics calculation');
} }
// Calculate sales forecasts // Calculate sales forecasts
if (!SKIP_SALES_FORECASTS) { if (!SKIP_SALES_FORECASTS) {
processedCount = await calculateSalesForecasts(startTime, totalProducts, processedCount); const result = await calculateSalesForecasts(startTime, totalProducts, processedProducts);
await updateProgress(result.processedProducts, result.processedOrders, result.processedPurchaseOrders);
if (!result.success) {
throw new Error('Sales forecasts calculation failed');
}
} else { } else {
console.log('Skipping sales forecasts calculation'); console.log('Skipping sales forecasts calculation');
} }
@@ -189,15 +372,25 @@ async function calculateMetrics() {
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Starting ABC classification', operation: 'Starting ABC classification',
current: processedCount, current: processedProducts || 0,
total: totalProducts, total: totalProducts || 0,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts), remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0),
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedProducts || 0),
percentage: ((processedCount / totalProducts) * 100).toFixed(1) percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
if (isCancelled) return processedCount; if (isCancelled) return {
processedProducts: processedProducts || 0,
processedOrders: processedOrders || 0,
processedPurchaseOrders: 0,
success: false
};
const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1'); const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1');
const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 }; const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 };
@@ -218,15 +411,25 @@ async function calculateMetrics() {
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Creating revenue rankings', operation: 'Creating revenue rankings',
current: processedCount, current: processedProducts || 0,
total: totalProducts, total: totalProducts || 0,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts), remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0),
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedProducts || 0),
percentage: ((processedCount / totalProducts) * 100).toFixed(1) percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
if (isCancelled) return processedCount; if (isCancelled) return {
processedProducts: processedProducts || 0,
processedOrders: processedOrders || 0,
processedPurchaseOrders: 0,
success: false
};
await connection.query(` await connection.query(`
INSERT INTO temp_revenue_ranks INSERT INTO temp_revenue_ranks
@@ -247,26 +450,44 @@ async function calculateMetrics() {
// Get total count for percentage calculation // Get total count for percentage calculation
const [rankingCount] = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks'); const [rankingCount] = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks');
const totalCount = rankingCount[0].total_count || 1; const totalCount = rankingCount[0].total_count || 1;
const max_rank = totalCount; // Store max_rank for use in classification
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Updating ABC classifications', operation: 'Updating ABC classifications',
current: processedCount, current: processedProducts || 0,
total: totalProducts, total: totalProducts || 0,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts), remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0),
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedProducts || 0),
percentage: ((processedCount / totalProducts) * 100).toFixed(1) percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
if (isCancelled) return processedCount; if (isCancelled) return {
processedProducts: processedProducts || 0,
processedOrders: processedOrders || 0,
processedPurchaseOrders: 0,
success: false
};
// Process updates in batches // ABC classification progress tracking
let abcProcessedCount = 0; let abcProcessedCount = 0;
const batchSize = 5000; const batchSize = 5000;
let lastProgressUpdate = Date.now();
const progressUpdateInterval = 1000; // Update every second
while (true) { while (true) {
if (isCancelled) return processedCount; if (isCancelled) return {
processedProducts: Number(processedProducts) || 0,
processedOrders: Number(processedOrders) || 0,
processedPurchaseOrders: 0,
success: false
};
// First get a batch of PIDs that need updating // First get a batch of PIDs that need updating
const [pids] = await connection.query(` const [pids] = await connection.query(`
@@ -282,8 +503,8 @@ async function calculateMetrics() {
ELSE 'C' ELSE 'C'
END END
LIMIT ? LIMIT ?
`, [totalCount, abcThresholds.a_threshold, `, [max_rank, abcThresholds.a_threshold,
totalCount, abcThresholds.b_threshold, max_rank, abcThresholds.b_threshold,
batchSize]); batchSize]);
if (pids.length === 0) { if (pids.length === 0) {
@@ -303,23 +524,42 @@ async function calculateMetrics() {
END, END,
pm.last_calculated_at = NOW() pm.last_calculated_at = NOW()
WHERE pm.pid IN (?) WHERE pm.pid IN (?)
`, [totalCount, abcThresholds.a_threshold, `, [max_rank, abcThresholds.a_threshold,
totalCount, abcThresholds.b_threshold, max_rank, abcThresholds.b_threshold,
pids.map(row => row.pid)]); pids.map(row => row.pid)]);
abcProcessedCount += result.affectedRows; abcProcessedCount += result.affectedRows;
processedCount = Math.floor(totalProducts * (0.99 + (abcProcessedCount / totalCount) * 0.01));
outputProgress({ // Calculate progress ensuring valid numbers
status: 'running', const currentProgress = Math.floor(totalProducts * (0.99 + (abcProcessedCount / (totalCount || 1)) * 0.01));
operation: 'ABC classification progress', processedProducts = Number(currentProgress) || processedProducts || 0;
current: processedCount,
total: totalProducts, // Only update progress at most once per second
elapsed: formatElapsedTime(startTime), const now = Date.now();
remaining: estimateRemaining(startTime, processedCount, totalProducts), if (now - lastProgressUpdate >= progressUpdateInterval) {
rate: calculateRate(startTime, processedCount), const progress = ensureValidProgress(processedProducts, totalProducts);
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
}); outputProgress({
status: 'running',
operation: 'ABC classification progress',
current: progress.current,
total: progress.total,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, progress.current, progress.total),
rate: calculateRate(startTime, progress.current),
percentage: progress.percentage,
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
lastProgressUpdate = now;
}
// Update database progress
await updateProgress(processedProducts, processedOrders, processedPurchaseOrders);
// Small delay between batches to allow other transactions // Small delay between batches to allow other transactions
await new Promise(resolve => setTimeout(resolve, 100)); await new Promise(resolve => setTimeout(resolve, 100));
@@ -328,49 +568,126 @@ async function calculateMetrics() {
// Clean up // Clean up
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks'); await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks');
const endTime = Date.now();
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);
// Update calculate_status for ABC classification
await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
VALUES ('abc_classification', NOW())
ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
`);
// Final progress update with guaranteed valid numbers
const finalProgress = ensureValidProgress(totalProducts, totalProducts);
// Final success message // Final success message
outputProgress({ outputProgress({
status: 'complete', status: 'complete',
operation: 'Metrics calculation complete', operation: 'Metrics calculation complete',
current: totalProducts, current: finalProgress.current,
total: totalProducts, total: finalProgress.total,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: '0s', remaining: '0s',
rate: calculateRate(startTime, totalProducts), rate: calculateRate(startTime, finalProgress.current),
percentage: '100' percentage: '100',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: totalElapsedSeconds
}
}); });
// Ensure all values are valid numbers before final update
const finalStats = {
processedProducts: Number(processedProducts) || 0,
processedOrders: Number(processedOrders) || 0,
processedPurchaseOrders: Number(processedPurchaseOrders) || 0
};
// Update history with completion
await connection.query(`
UPDATE calculate_history
SET
end_time = NOW(),
duration_seconds = ?,
processed_products = ?,
processed_orders = ?,
processed_purchase_orders = ?,
status = 'completed'
WHERE id = ?
`, [totalElapsedSeconds,
finalStats.processedProducts,
finalStats.processedOrders,
finalStats.processedPurchaseOrders,
calculateHistoryId]);
// Clear progress file on successful completion // Clear progress file on successful completion
global.clearProgress(); global.clearProgress();
} catch (error) { } catch (error) {
const endTime = Date.now();
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);
// Update history with error
await connection.query(`
UPDATE calculate_history
SET
end_time = NOW(),
duration_seconds = ?,
processed_products = ?,
processed_orders = ?,
processed_purchase_orders = ?,
status = ?,
error_message = ?
WHERE id = ?
`, [
totalElapsedSeconds,
processedProducts || 0, // Ensure we have a valid number
processedOrders || 0, // Ensure we have a valid number
processedPurchaseOrders || 0, // Ensure we have a valid number
isCancelled ? 'cancelled' : 'failed',
error.message,
calculateHistoryId
]);
if (isCancelled) { if (isCancelled) {
global.outputProgress({ global.outputProgress({
status: 'cancelled', status: 'cancelled',
operation: 'Calculation cancelled', operation: 'Calculation cancelled',
current: processedCount, current: processedProducts,
total: totalProducts || 0, total: totalProducts || 0,
elapsed: global.formatElapsedTime(startTime), elapsed: global.formatElapsedTime(startTime),
remaining: null, remaining: null,
rate: global.calculateRate(startTime, processedCount), rate: global.calculateRate(startTime, processedProducts),
percentage: ((processedCount / (totalProducts || 1)) * 100).toFixed(1) percentage: ((processedProducts / (totalProducts || 1)) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
} else { } else {
global.outputProgress({ global.outputProgress({
status: 'error', status: 'error',
operation: 'Error: ' + error.message, operation: 'Error: ' + error.message,
current: processedCount, current: processedProducts,
total: totalProducts || 0, total: totalProducts || 0,
elapsed: global.formatElapsedTime(startTime), elapsed: global.formatElapsedTime(startTime),
remaining: null, remaining: null,
rate: global.calculateRate(startTime, processedCount), rate: global.calculateRate(startTime, processedProducts),
percentage: ((processedCount / (totalProducts || 1)) * 100).toFixed(1) percentage: ((processedProducts / (totalProducts || 1)) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
} }
throw error; throw error;
} finally { } finally {
if (connection) { if (connection) {
connection.release(); connection.release();
} }
} }
} finally { } finally {

View File

@@ -28,9 +28,18 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
let cumulativeProcessedOrders = 0; let cumulativeProcessedOrders = 0;
try { try {
// Insert temporary table creation queries // Clean up any existing temp tables first
await localConnection.query(` await localConnection.query(`
CREATE TABLE IF NOT EXISTS temp_order_items ( DROP TEMPORARY TABLE IF EXISTS temp_order_items;
DROP TEMPORARY TABLE IF EXISTS temp_order_meta;
DROP TEMPORARY TABLE IF EXISTS temp_order_discounts;
DROP TEMPORARY TABLE IF EXISTS temp_order_taxes;
DROP TEMPORARY TABLE IF EXISTS temp_order_costs;
`);
// Create all temp tables with correct schema
await localConnection.query(`
CREATE TEMPORARY TABLE temp_order_items (
order_id INT UNSIGNED NOT NULL, order_id INT UNSIGNED NOT NULL,
pid INT UNSIGNED NOT NULL, pid INT UNSIGNED NOT NULL,
SKU VARCHAR(50) NOT NULL, SKU VARCHAR(50) NOT NULL,
@@ -40,35 +49,41 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
PRIMARY KEY (order_id, pid) PRIMARY KEY (order_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8; ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`); `);
await localConnection.query(` await localConnection.query(`
CREATE TABLE IF NOT EXISTS temp_order_meta ( CREATE TEMPORARY TABLE temp_order_meta (
order_id INT UNSIGNED NOT NULL, order_id INT UNSIGNED NOT NULL,
date DATE NOT NULL, date DATE NOT NULL,
customer VARCHAR(100) NOT NULL, customer VARCHAR(100) NOT NULL,
customer_name VARCHAR(150) NOT NULL, customer_name VARCHAR(150) NOT NULL,
status INT, status INT,
canceled TINYINT(1), canceled TINYINT(1),
summary_discount DECIMAL(10,2) DEFAULT 0.00,
summary_subtotal DECIMAL(10,2) DEFAULT 0.00,
PRIMARY KEY (order_id) PRIMARY KEY (order_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8; ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`); `);
await localConnection.query(` await localConnection.query(`
CREATE TABLE IF NOT EXISTS temp_order_discounts ( CREATE TEMPORARY TABLE temp_order_discounts (
order_id INT UNSIGNED NOT NULL, order_id INT UNSIGNED NOT NULL,
pid INT UNSIGNED NOT NULL, pid INT UNSIGNED NOT NULL,
discount DECIMAL(10,2) NOT NULL, discount DECIMAL(10,2) NOT NULL,
PRIMARY KEY (order_id, pid) PRIMARY KEY (order_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8; ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`); `);
await localConnection.query(` await localConnection.query(`
CREATE TABLE IF NOT EXISTS temp_order_taxes ( CREATE TEMPORARY TABLE temp_order_taxes (
order_id INT UNSIGNED NOT NULL, order_id INT UNSIGNED NOT NULL,
pid INT UNSIGNED NOT NULL, pid INT UNSIGNED NOT NULL,
tax DECIMAL(10,2) NOT NULL, tax DECIMAL(10,2) NOT NULL,
PRIMARY KEY (order_id, pid) PRIMARY KEY (order_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8; ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`); `);
await localConnection.query(` await localConnection.query(`
CREATE TABLE IF NOT EXISTS temp_order_costs ( CREATE TEMPORARY TABLE temp_order_costs (
order_id INT UNSIGNED NOT NULL, order_id INT UNSIGNED NOT NULL,
pid INT UNSIGNED NOT NULL, pid INT UNSIGNED NOT NULL,
costeach DECIMAL(10,3) DEFAULT 0.000, costeach DECIMAL(10,3) DEFAULT 0.000,
@@ -81,6 +96,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
SELECT COLUMN_NAME SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'orders' WHERE TABLE_NAME = 'orders'
AND COLUMN_NAME != 'updated' -- Exclude the updated column
ORDER BY ORDINAL_POSITION ORDER BY ORDINAL_POSITION
`); `);
const columnNames = columns.map(col => col.COLUMN_NAME); const columnNames = columns.map(col => col.COLUMN_NAME);
@@ -212,7 +228,9 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
o.order_cid as customer, o.order_cid as customer,
CONCAT(COALESCE(u.firstname, ''), ' ', COALESCE(u.lastname, '')) as customer_name, CONCAT(COALESCE(u.firstname, ''), ' ', COALESCE(u.lastname, '')) as customer_name,
o.order_status as status, o.order_status as status,
CASE WHEN o.date_cancelled != '0000-00-00 00:00:00' THEN 1 ELSE 0 END as canceled CASE WHEN o.date_cancelled != '0000-00-00 00:00:00' THEN 1 ELSE 0 END as canceled,
o.summary_discount,
o.summary_subtotal
FROM _order o FROM _order o
LEFT JOIN users u ON o.order_cid = u.cid LEFT JOIN users u ON o.order_cid = u.cid
WHERE o.order_id IN (?) WHERE o.order_id IN (?)
@@ -226,19 +244,37 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
console.log('Found duplicates:', duplicates); console.log('Found duplicates:', duplicates);
} }
const placeholders = orders.map(() => "(?, ?, ?, ?, ?, ?)").join(","); const placeholders = orders.map(() => "(?, ?, ?, ?, ?, ?, ?, ?)").join(",");
const values = orders.flatMap(order => [ const values = orders.flatMap(order => [
order.order_id, order.date, order.customer, order.customer_name, order.status, order.canceled order.order_id,
order.date,
order.customer,
order.customer_name,
order.status,
order.canceled,
order.summary_discount,
order.summary_subtotal
]); ]);
await localConnection.query(` await localConnection.query(`
INSERT INTO temp_order_meta VALUES ${placeholders} INSERT INTO temp_order_meta (
order_id,
date,
customer,
customer_name,
status,
canceled,
summary_discount,
summary_subtotal
) VALUES ${placeholders}
ON DUPLICATE KEY UPDATE ON DUPLICATE KEY UPDATE
date = VALUES(date), date = VALUES(date),
customer = VALUES(customer), customer = VALUES(customer),
customer_name = VALUES(customer_name), customer_name = VALUES(customer_name),
status = VALUES(status), status = VALUES(status),
canceled = VALUES(canceled) canceled = VALUES(canceled),
summary_discount = VALUES(summary_discount),
summary_subtotal = VALUES(summary_subtotal)
`, values); `, values);
processedCount = i + orders.length; processedCount = i + orders.length;
@@ -317,14 +353,25 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
for (let i = 0; i < orderIds.length; i += 5000) { for (let i = 0; i < orderIds.length; i += 5000) {
const batchIds = orderIds.slice(i, i + 5000); const batchIds = orderIds.slice(i, i + 5000);
const [costs] = await prodConnection.query(` const [costs] = await prodConnection.query(`
SELECT orderid as order_id, pid, costeach SELECT
FROM order_costs oc.orderid as order_id,
WHERE orderid IN (?) oc.pid,
COALESCE(
oc.costeach,
(SELECT pi.costeach
FROM product_inventory pi
WHERE pi.pid = oc.pid
AND pi.daterec <= o.date_placed
ORDER BY pi.daterec DESC LIMIT 1)
) as costeach
FROM order_costs oc
JOIN _order o ON oc.orderid = o.order_id
WHERE oc.orderid IN (?)
`, [batchIds]); `, [batchIds]);
if (costs.length > 0) { if (costs.length > 0) {
const placeholders = costs.map(() => '(?, ?, ?)').join(","); const placeholders = costs.map(() => '(?, ?, ?)').join(",");
const values = costs.flatMap(c => [c.order_id, c.pid, c.costeach]); const values = costs.flatMap(c => [c.order_id, c.pid, c.costeach || 0]);
await localConnection.query(` await localConnection.query(`
INSERT INTO temp_order_costs (order_id, pid, costeach) INSERT INTO temp_order_costs (order_id, pid, costeach)
VALUES ${placeholders} VALUES ${placeholders}
@@ -355,7 +402,13 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
om.date, om.date,
oi.price, oi.price,
oi.quantity, oi.quantity,
oi.base_discount + COALESCE(od.discount, 0) as discount, oi.base_discount + COALESCE(od.discount, 0) +
CASE
WHEN om.summary_discount > 0 THEN
ROUND((om.summary_discount * (oi.price * oi.quantity)) /
NULLIF(om.summary_subtotal, 0), 2)
ELSE 0
END as discount,
COALESCE(ot.tax, 0) as tax, COALESCE(ot.tax, 0) as tax,
0 as tax_included, 0 as tax_included,
0 as shipping, 0 as shipping,
@@ -455,7 +508,13 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
om.date, om.date,
oi.price, oi.price,
oi.quantity, oi.quantity,
oi.base_discount + COALESCE(od.discount, 0) as discount, oi.base_discount + COALESCE(od.discount, 0) +
CASE
WHEN o.summary_discount > 0 THEN
ROUND((o.summary_discount * (oi.price * oi.quantity)) /
NULLIF(o.summary_subtotal, 0), 2)
ELSE 0
END as discount,
COALESCE(ot.tax, 0) as tax, COALESCE(ot.tax, 0) as tax,
0 as tax_included, 0 as tax_included,
0 as shipping, 0 as shipping,
@@ -466,6 +525,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
COALESCE(tc.costeach, 0) as costeach COALESCE(tc.costeach, 0) as costeach
FROM temp_order_items oi FROM temp_order_items oi
JOIN temp_order_meta om ON oi.order_id = om.order_id JOIN temp_order_meta om ON oi.order_id = om.order_id
LEFT JOIN _order o ON oi.order_id = o.order_id
LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
LEFT JOIN temp_order_costs tc ON oi.order_id = tc.order_id AND oi.pid = tc.pid LEFT JOIN temp_order_costs tc ON oi.order_id = tc.order_id AND oi.pid = tc.pid

View File

@@ -339,6 +339,7 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
SELECT COLUMN_NAME SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'products' WHERE TABLE_NAME = 'products'
AND COLUMN_NAME != 'updated' -- Exclude the updated column
ORDER BY ORDINAL_POSITION ORDER BY ORDINAL_POSITION
`); `);
const columnNames = columns.map(col => col.COLUMN_NAME); const columnNames = columns.map(col => col.COLUMN_NAME);
@@ -470,43 +471,104 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
// Process category relationships // Process category relationships
if (batch.some(p => p.category_ids)) { if (batch.some(p => p.category_ids)) {
const categoryRelationships = batch // First get all valid categories
.filter(p => p.category_ids) const allCategoryIds = [...new Set(
.flatMap(product => batch
product.category_ids .filter(p => p.category_ids)
.split(',') .flatMap(product =>
.map(id => id.trim()) product.category_ids
.filter(id => id) .split(',')
.map(Number) .map(id => id.trim())
.filter(id => !isNaN(id)) .filter(id => id)
.map(catId => [catId, product.pid]) .map(Number)
); .filter(id => !isNaN(id))
)
)];
if (categoryRelationships.length > 0) { // Verify categories exist and get their hierarchy
// Verify categories exist before inserting relationships const [categories] = await localConnection.query(`
const uniqueCatIds = [...new Set(categoryRelationships.map(([catId]) => catId))]; WITH RECURSIVE category_hierarchy AS (
const [existingCats] = await localConnection.query( SELECT
"SELECT cat_id FROM categories WHERE cat_id IN (?)", cat_id,
[uniqueCatIds] parent_id,
); type,
const existingCatIds = new Set(existingCats.map(c => c.cat_id)); 1 as level,
CAST(cat_id AS CHAR(200)) as path
FROM categories
WHERE cat_id IN (?)
UNION ALL
SELECT
c.cat_id,
c.parent_id,
c.type,
ch.level + 1,
CONCAT(ch.path, ',', c.cat_id)
FROM categories c
JOIN category_hierarchy ch ON c.parent_id = ch.cat_id
WHERE ch.level < 10 -- Prevent infinite recursion
)
SELECT
h.cat_id,
h.parent_id,
h.type,
h.path,
h.level
FROM (
SELECT DISTINCT cat_id, parent_id, type, path, level
FROM category_hierarchy
WHERE cat_id IN (?)
) h
ORDER BY h.level DESC
`, [allCategoryIds, allCategoryIds]);
// Filter relationships to only include existing categories const validCategories = new Map(categories.map(c => [c.cat_id, c]));
const validRelationships = categoryRelationships.filter(([catId]) => const validCategoryIds = new Set(categories.map(c => c.cat_id));
existingCatIds.has(catId)
);
if (validRelationships.length > 0) { // Build category relationships ensuring proper hierarchy
const catPlaceholders = validRelationships const categoryRelationships = [];
.map(() => "(?, ?)") batch
.join(","); .filter(p => p.category_ids)
await localConnection.query( .forEach(product => {
`INSERT IGNORE INTO product_categories (cat_id, pid) const productCategories = product.category_ids
VALUES ${catPlaceholders}`, .split(',')
validRelationships.flat() .map(id => id.trim())
); .filter(id => id)
.map(Number)
.filter(id => !isNaN(id))
.filter(id => validCategoryIds.has(id))
.map(id => validCategories.get(id))
.sort((a, b) => a.type - b.type); // Sort by type to ensure proper hierarchy
// Only add relationships that maintain proper hierarchy
productCategories.forEach(category => {
if (category.path.split(',').every(parentId =>
validCategoryIds.has(Number(parentId))
)) {
categoryRelationships.push([category.cat_id, product.pid]);
}
});
});
if (categoryRelationships.length > 0) {
// First remove any existing relationships that will be replaced
await localConnection.query(`
DELETE FROM product_categories
WHERE pid IN (?) AND cat_id IN (?)
`, [
[...new Set(categoryRelationships.map(([_, pid]) => pid))],
[...new Set(categoryRelationships.map(([catId, _]) => catId))]
]);
// Then insert the new relationships
const placeholders = categoryRelationships
.map(() => "(?, ?)")
.join(",");
await localConnection.query(`
INSERT INTO product_categories (cat_id, pid)
VALUES ${placeholders}
`, categoryRelationships.flat());
} }
}
} }
} }
@@ -554,6 +616,7 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
SELECT COLUMN_NAME SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'products' WHERE TABLE_NAME = 'products'
AND COLUMN_NAME != 'updated' -- Exclude the updated column
ORDER BY ORDINAL_POSITION ORDER BY ORDINAL_POSITION
`); `);
const columnNames = columns.map((col) => col.COLUMN_NAME); const columnNames = columns.map((col) => col.COLUMN_NAME);

View File

@@ -33,16 +33,15 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
status: "running", status: "running",
}); });
// Get column names for the insert // Get column names first
const [columns] = await localConnection.query(` const [columns] = await localConnection.query(`
SELECT COLUMN_NAME SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'purchase_orders' WHERE TABLE_NAME = 'purchase_orders'
AND COLUMN_NAME != 'updated' -- Exclude the updated column
ORDER BY ORDINAL_POSITION ORDER BY ORDINAL_POSITION
`); `);
const columnNames = columns const columnNames = columns.map(col => col.COLUMN_NAME);
.map((col) => col.COLUMN_NAME)
.filter((name) => name !== "id");
// Build incremental conditions // Build incremental conditions
const incrementalWhereClause = incrementalUpdate const incrementalWhereClause = incrementalUpdate
@@ -321,41 +320,47 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
let lastFulfillmentReceiving = null; let lastFulfillmentReceiving = null;
for (const receiving of allReceivings) { for (const receiving of allReceivings) {
const qtyToApply = Math.min(remainingToFulfill, receiving.qty_each); // Convert quantities to base units using supplier data
if (qtyToApply > 0) { const baseQtyReceived = receiving.qty_each * (
// If this is the first receiving being applied, use its cost receiving.type === 'original' ? 1 :
if (actualCost === null) { Math.max(1, product.supplier_qty_per_unit || 1)
actualCost = receiving.cost_each; );
firstFulfillmentReceiving = receiving; const qtyToApply = Math.min(remainingToFulfill, baseQtyReceived);
if (qtyToApply > 0) {
// If this is the first receiving being applied, use its cost
if (actualCost === null && receiving.cost_each > 0) {
actualCost = receiving.cost_each;
firstFulfillmentReceiving = receiving;
}
lastFulfillmentReceiving = receiving;
fulfillmentTracking.push({
receiving_id: receiving.receiving_id,
qty_applied: qtyToApply,
qty_total: baseQtyReceived,
cost: receiving.cost_each || actualCost || product.cost_each,
date: receiving.received_date,
received_by: receiving.received_by,
received_by_name: receiving.received_by_name || 'Unknown',
type: receiving.type,
remaining_qty: baseQtyReceived - qtyToApply
});
remainingToFulfill -= qtyToApply;
} else {
// Track excess receivings
fulfillmentTracking.push({
receiving_id: receiving.receiving_id,
qty_applied: 0,
qty_total: baseQtyReceived,
cost: receiving.cost_each || actualCost || product.cost_each,
date: receiving.received_date,
received_by: receiving.received_by,
received_by_name: receiving.received_by_name || 'Unknown',
type: receiving.type,
is_excess: true
});
} }
lastFulfillmentReceiving = receiving; totalReceived += baseQtyReceived;
fulfillmentTracking.push({
receiving_id: receiving.receiving_id,
qty_applied: qtyToApply,
qty_total: receiving.qty_each,
cost: receiving.cost_each,
date: receiving.received_date,
received_by: receiving.received_by,
received_by_name: receiving.received_by_name || 'Unknown',
type: receiving.type,
remaining_qty: receiving.qty_each - qtyToApply
});
remainingToFulfill -= qtyToApply;
} else {
// Track excess receivings
fulfillmentTracking.push({
receiving_id: receiving.receiving_id,
qty_applied: 0,
qty_total: receiving.qty_each,
cost: receiving.cost_each,
date: receiving.received_date,
received_by: receiving.received_by,
received_by_name: receiving.received_by_name || 'Unknown',
type: receiving.type,
is_excess: true
});
}
totalReceived += receiving.qty_each;
} }
const receiving_status = !totalReceived ? 1 : // created const receiving_status = !totalReceived ? 1 : // created

View File

@@ -1,82 +0,0 @@
// Split into inserts and updates
const insertsAndUpdates = batch.reduce((acc, po) => {
const key = `${po.po_id}-${po.pid}`;
if (existingPOMap.has(key)) {
const existing = existingPOMap.get(key);
// Check if any values are different
const hasChanges = columnNames.some(col => {
const newVal = po[col] ?? null;
const oldVal = existing[col] ?? null;
// Special handling for numbers to avoid type coercion issues
if (typeof newVal === 'number' && typeof oldVal === 'number') {
return Math.abs(newVal - oldVal) > 0.00001; // Allow for tiny floating point differences
}
// Special handling for receiving_history JSON
if (col === 'receiving_history') {
return JSON.stringify(newVal) !== JSON.stringify(oldVal);
}
return newVal !== oldVal;
});
if (hasChanges) {
console.log(`PO line changed: ${key}`, {
po_id: po.po_id,
pid: po.pid,
changes: columnNames.filter(col => {
const newVal = po[col] ?? null;
const oldVal = existing[col] ?? null;
if (typeof newVal === 'number' && typeof oldVal === 'number') {
return Math.abs(newVal - oldVal) > 0.00001;
}
if (col === 'receiving_history') {
return JSON.stringify(newVal) !== JSON.stringify(oldVal);
}
return newVal !== oldVal;
})
});
acc.updates.push({
po_id: po.po_id,
pid: po.pid,
values: columnNames.map(col => po[col] ?? null)
});
}
} else {
console.log(`New PO line: ${key}`);
acc.inserts.push({
po_id: po.po_id,
pid: po.pid,
values: columnNames.map(col => po[col] ?? null)
});
}
return acc;
}, { inserts: [], updates: [] });
// Handle inserts
if (insertsAndUpdates.inserts.length > 0) {
const insertPlaceholders = Array(insertsAndUpdates.inserts.length).fill(placeholderGroup).join(",");
const insertResult = await localConnection.query(`
INSERT INTO purchase_orders (${columnNames.join(",")})
VALUES ${insertPlaceholders}
`, insertsAndUpdates.inserts.map(i => i.values).flat());
recordsAdded += insertResult[0].affectedRows;
}
// Handle updates
if (insertsAndUpdates.updates.length > 0) {
const updatePlaceholders = Array(insertsAndUpdates.updates.length).fill(placeholderGroup).join(",");
const updateResult = await localConnection.query(`
INSERT INTO purchase_orders (${columnNames.join(",")})
VALUES ${updatePlaceholders}
ON DUPLICATE KEY UPDATE
${columnNames
.filter(col => col !== "po_id" && col !== "pid")
.map(col => `${col} = VALUES(${col})`)
.join(",")};
`, insertsAndUpdates.updates.map(u => u.values).flat());
// Each update affects 2 rows in affectedRows, so we divide by 2 to get actual count
recordsUpdated += insertsAndUpdates.updates.length;
}

View File

@@ -1,217 +1,286 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress'); const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db'); const { getConnection } = require('./utils/db');
async function calculateBrandMetrics(startTime, totalProducts, processedCount, isCancelled = false) { async function calculateBrandMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
const connection = await getConnection(); const connection = await getConnection();
let success = false;
const BATCH_SIZE = 5000;
try { try {
// Get last calculation timestamp
const [lastCalc] = await connection.query(`
SELECT last_calculation_timestamp
FROM calculate_status
WHERE module_name = 'brand_metrics'
`);
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
// Get total count of brands needing updates
const [brandCount] = await connection.query(`
SELECT COUNT(DISTINCT p.brand) as count
FROM products p
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
WHERE p.brand IS NOT NULL
AND (
p.updated > ?
OR o.id IS NOT NULL
)
`, [lastCalculationTime, lastCalculationTime]);
const totalBrands = brandCount[0].count;
if (totalBrands === 0) {
console.log('No brands need metric updates');
return {
processedProducts: 0,
processedOrders: 0,
processedPurchaseOrders: 0,
success: true
};
}
if (isCancelled) { if (isCancelled) {
outputProgress({ outputProgress({
status: 'cancelled', status: 'cancelled',
operation: 'Brand metrics calculation cancelled', operation: 'Brand metrics calculation cancelled',
current: processedCount, current: processedCount,
total: totalProducts, total: totalBrands,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: null, remaining: null,
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1) percentage: ((processedCount / totalBrands) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
return processedCount; return {
processedProducts: processedCount,
processedOrders: 0,
processedPurchaseOrders: 0,
success
};
} }
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Starting brand metrics calculation', operation: 'Starting brand metrics calculation',
current: processedCount, current: processedCount,
total: totalProducts, total: totalBrands,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts), remaining: estimateRemaining(startTime, processedCount, totalBrands),
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1) percentage: ((processedCount / totalBrands) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
// Calculate brand metrics with optimized queries // Process in batches
await connection.query(` let lastBrand = '';
INSERT INTO brand_metrics ( while (true) {
brand, if (isCancelled) break;
product_count,
active_products, const [batch] = await connection.query(`
total_stock_units, SELECT DISTINCT p.brand
total_stock_cost,
total_stock_retail,
total_revenue,
avg_margin,
growth_rate
)
WITH filtered_products AS (
SELECT
p.*,
CASE WHEN p.stock_quantity <= 5000 THEN p.pid END as valid_pid,
CASE WHEN p.visible = true AND p.stock_quantity <= 5000 THEN p.pid END as active_pid,
CASE
WHEN p.stock_quantity IS NULL OR p.stock_quantity < 0 OR p.stock_quantity > 5000 THEN 0
ELSE p.stock_quantity
END as valid_stock
FROM products p FROM products p
FORCE INDEX (idx_brand)
LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid AND o.updated > ?
WHERE p.brand IS NOT NULL WHERE p.brand IS NOT NULL
), AND p.brand > ?
sales_periods AS ( AND (
p.updated > ?
OR o.id IS NOT NULL
)
ORDER BY p.brand
LIMIT ?
`, [lastCalculationTime, lastBrand, lastCalculationTime, BATCH_SIZE]);
if (batch.length === 0) break;
// Create temporary tables for better performance
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
await connection.query(`
CREATE TEMPORARY TABLE temp_product_stats (
brand VARCHAR(100) NOT NULL,
product_count INT,
active_products INT,
total_stock_units INT,
total_stock_cost DECIMAL(15,2),
total_stock_retail DECIMAL(15,2),
total_revenue DECIMAL(15,2),
avg_margin DECIMAL(5,2),
PRIMARY KEY (brand),
INDEX (total_revenue),
INDEX (product_count)
) ENGINE=MEMORY
`);
await connection.query(`
CREATE TEMPORARY TABLE temp_sales_stats (
brand VARCHAR(100) NOT NULL,
current_period_sales DECIMAL(15,2),
previous_period_sales DECIMAL(15,2),
PRIMARY KEY (brand),
INDEX (current_period_sales),
INDEX (previous_period_sales)
) ENGINE=MEMORY
`);
// Populate product stats with optimized index usage
await connection.query(`
INSERT INTO temp_product_stats
SELECT SELECT
p.brand, p.brand,
SUM(o.quantity * o.price) as period_revenue, COUNT(DISTINCT p.pid) as product_count,
CASE COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
WHEN o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 3 MONTH) THEN 'current' COALESCE(SUM(p.stock_quantity), 0) as total_stock_units,
WHEN o.date BETWEEN DATE_SUB(CURRENT_DATE, INTERVAL 15 MONTH) AND DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH) THEN 'previous' COALESCE(SUM(p.stock_quantity * p.cost_price), 0) as total_stock_cost,
END as period_type COALESCE(SUM(p.stock_quantity * p.price), 0) as total_stock_retail,
FROM filtered_products p COALESCE(SUM(pm.total_revenue), 0) as total_revenue,
JOIN orders o ON p.pid = o.pid COALESCE(AVG(NULLIF(pm.avg_margin_percent, 0)), 0) as avg_margin
WHERE o.canceled = false FROM products p
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 15 MONTH) FORCE INDEX (idx_brand)
GROUP BY p.brand, period_type LEFT JOIN product_metrics pm FORCE INDEX (PRIMARY) ON p.pid = pm.pid
), WHERE p.brand IN (?)
brand_data AS ( AND (
SELECT p.updated > ?
p.brand, OR EXISTS (
COUNT(DISTINCT p.valid_pid) as product_count, SELECT 1 FROM orders o FORCE INDEX (idx_orders_metrics)
COUNT(DISTINCT p.active_pid) as active_products, WHERE o.pid = p.pid
SUM(p.valid_stock) as total_stock_units, AND o.updated > ?
SUM(p.valid_stock * p.cost_price) as total_stock_cost, )
SUM(p.valid_stock * p.price) as total_stock_retail,
COALESCE(SUM(o.quantity * o.price), 0) as total_revenue,
CASE
WHEN SUM(o.quantity * o.price) > 0 THEN
(SUM((o.price - p.cost_price) * o.quantity) * 100.0) / SUM(o.price * o.quantity)
ELSE 0
END as avg_margin
FROM filtered_products p
LEFT JOIN orders o ON p.pid = o.pid AND o.canceled = false
GROUP BY p.brand
)
SELECT
bd.brand,
bd.product_count,
bd.active_products,
bd.total_stock_units,
bd.total_stock_cost,
bd.total_stock_retail,
bd.total_revenue,
bd.avg_margin,
CASE
WHEN MAX(CASE WHEN sp.period_type = 'previous' THEN sp.period_revenue END) = 0
AND MAX(CASE WHEN sp.period_type = 'current' THEN sp.period_revenue END) > 0 THEN 100.0
WHEN MAX(CASE WHEN sp.period_type = 'previous' THEN sp.period_revenue END) = 0 THEN 0.0
ELSE LEAST(
GREATEST(
((MAX(CASE WHEN sp.period_type = 'current' THEN sp.period_revenue END) -
MAX(CASE WHEN sp.period_type = 'previous' THEN sp.period_revenue END)) /
NULLIF(MAX(CASE WHEN sp.period_type = 'previous' THEN sp.period_revenue END), 0)) * 100.0,
-100.0
),
999.99
) )
END as growth_rate GROUP BY p.brand
FROM brand_data bd `, [batch.map(row => row.brand), lastCalculationTime, lastCalculationTime]);
LEFT JOIN sales_periods sp ON bd.brand = sp.brand
GROUP BY bd.brand, bd.product_count, bd.active_products, bd.total_stock_units,
bd.total_stock_cost, bd.total_stock_retail, bd.total_revenue, bd.avg_margin
ON DUPLICATE KEY UPDATE
product_count = VALUES(product_count),
active_products = VALUES(active_products),
total_stock_units = VALUES(total_stock_units),
total_stock_cost = VALUES(total_stock_cost),
total_stock_retail = VALUES(total_stock_retail),
total_revenue = VALUES(total_revenue),
avg_margin = VALUES(avg_margin),
growth_rate = VALUES(growth_rate),
last_calculated_at = CURRENT_TIMESTAMP
`);
processedCount = Math.floor(totalProducts * 0.97); // Populate sales stats with optimized date handling
outputProgress({ await connection.query(`
status: 'running', INSERT INTO temp_sales_stats
operation: 'Brand metrics calculated, starting time-based metrics', WITH date_ranges AS (
current: processedCount, SELECT
total: totalProducts, DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) as current_start,
elapsed: formatElapsedTime(startTime), CURRENT_DATE as current_end,
remaining: estimateRemaining(startTime, processedCount, totalProducts), DATE_SUB(CURRENT_DATE, INTERVAL 60 DAY) as previous_start,
rate: calculateRate(startTime, processedCount), DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) as previous_end
percentage: ((processedCount / totalProducts) * 100).toFixed(1) )
});
if (isCancelled) return processedCount;
// Calculate brand time-based metrics with optimized query
await connection.query(`
INSERT INTO brand_time_metrics (
brand,
year,
month,
product_count,
active_products,
total_stock_units,
total_stock_cost,
total_stock_retail,
total_revenue,
avg_margin
)
WITH filtered_products AS (
SELECT
p.*,
CASE WHEN p.stock_quantity <= 5000 THEN p.pid END as valid_pid,
CASE WHEN p.visible = true AND p.stock_quantity <= 5000 THEN p.pid END as active_pid,
CASE
WHEN p.stock_quantity IS NULL OR p.stock_quantity < 0 OR p.stock_quantity > 5000 THEN 0
ELSE p.stock_quantity
END as valid_stock
FROM products p
WHERE p.brand IS NOT NULL
),
monthly_metrics AS (
SELECT SELECT
p.brand, p.brand,
YEAR(o.date) as year, COALESCE(SUM(
MONTH(o.date) as month, CASE WHEN o.date >= dr.current_start
COUNT(DISTINCT p.valid_pid) as product_count, THEN o.quantity * o.price
COUNT(DISTINCT p.active_pid) as active_products,
SUM(p.valid_stock) as total_stock_units,
SUM(p.valid_stock * p.cost_price) as total_stock_cost,
SUM(p.valid_stock * p.price) as total_stock_retail,
SUM(o.quantity * o.price) as total_revenue,
CASE
WHEN SUM(o.quantity * o.price) > 0 THEN
(SUM((o.price - p.cost_price) * o.quantity) * 100.0) / SUM(o.price * o.quantity)
ELSE 0 ELSE 0
END as avg_margin END
FROM filtered_products p ), 0) as current_period_sales,
LEFT JOIN orders o ON p.pid = o.pid AND o.canceled = false COALESCE(SUM(
WHERE o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH) CASE WHEN o.date >= dr.previous_start AND o.date < dr.current_start
GROUP BY p.brand, YEAR(o.date), MONTH(o.date) THEN o.quantity * o.price
) ELSE 0
SELECT * END
FROM monthly_metrics ), 0) as previous_period_sales
ON DUPLICATE KEY UPDATE FROM products p
product_count = VALUES(product_count), FORCE INDEX (idx_brand)
active_products = VALUES(active_products), INNER JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
total_stock_units = VALUES(total_stock_units), CROSS JOIN date_ranges dr
total_stock_cost = VALUES(total_stock_cost), WHERE p.brand IN (?)
total_stock_retail = VALUES(total_stock_retail), AND o.canceled = false
total_revenue = VALUES(total_revenue), AND o.date >= dr.previous_start
avg_margin = VALUES(avg_margin) AND o.updated > ?
GROUP BY p.brand
`, [batch.map(row => row.brand), lastCalculationTime]);
// Update metrics using temp tables with optimized calculations
await connection.query(`
INSERT INTO brand_metrics (
brand,
product_count,
active_products,
total_stock_units,
total_stock_cost,
total_stock_retail,
total_revenue,
avg_margin,
growth_rate,
last_calculated_at
)
SELECT
ps.brand,
ps.product_count,
ps.active_products,
ps.total_stock_units,
ps.total_stock_cost,
ps.total_stock_retail,
ps.total_revenue,
ps.avg_margin,
CASE
WHEN COALESCE(ss.previous_period_sales, 0) = 0 AND COALESCE(ss.current_period_sales, 0) > 0 THEN 100
WHEN COALESCE(ss.previous_period_sales, 0) = 0 THEN 0
ELSE ROUND(LEAST(999.99, GREATEST(-100,
((ss.current_period_sales / NULLIF(ss.previous_period_sales, 0)) - 1) * 100
)), 2)
END as growth_rate,
NOW() as last_calculated_at
FROM temp_product_stats ps
LEFT JOIN temp_sales_stats ss ON ps.brand = ss.brand
ON DUPLICATE KEY UPDATE
product_count = VALUES(product_count),
active_products = VALUES(active_products),
total_stock_units = VALUES(total_stock_units),
total_stock_cost = VALUES(total_stock_cost),
total_stock_retail = VALUES(total_stock_retail),
total_revenue = VALUES(total_revenue),
avg_margin = VALUES(avg_margin),
growth_rate = VALUES(growth_rate),
last_calculated_at = NOW()
`);
// Clean up temp tables
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
lastBrand = batch[batch.length - 1].brand;
processedCount += batch.length;
outputProgress({
status: 'running',
operation: 'Processing brand metrics batch',
current: processedCount,
total: totalBrands,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalBrands),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalBrands) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
}
// If we get here, everything completed successfully
success = true;
// Update calculate_status
await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
VALUES ('brand_metrics', NOW())
ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
`); `);
processedCount = Math.floor(totalProducts * 0.99); return {
outputProgress({ processedProducts: processedCount,
status: 'running', processedOrders: 0,
operation: 'Brand time-based metrics calculated', processedPurchaseOrders: 0,
current: processedCount, success
total: totalProducts, };
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
} catch (error) { } catch (error) {
success = false;
logError(error, 'Error calculating brand metrics'); logError(error, 'Error calculating brand metrics');
throw error; throw error;
} finally { } finally {

View File

@@ -1,232 +1,290 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress'); const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db'); const { getConnection } = require('./utils/db');
async function calculateCategoryMetrics(startTime, totalProducts, processedCount, isCancelled = false) { async function calculateCategoryMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
const connection = await getConnection(); const connection = await getConnection();
let success = false;
const BATCH_SIZE = 5000;
try { try {
// Get last calculation timestamp
const [lastCalc] = await connection.query(`
SELECT last_calculation_timestamp
FROM calculate_status
WHERE module_name = 'category_metrics'
`);
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
// Get total count of categories needing updates
const [categoryCount] = await connection.query(`
SELECT COUNT(DISTINCT c.cat_id) as count
FROM categories c
JOIN product_categories pc ON c.cat_id = pc.cat_id
LEFT JOIN products p ON pc.pid = p.pid AND p.updated > ?
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
WHERE c.status = 'active'
AND (
p.pid IS NOT NULL
OR o.id IS NOT NULL
)
`, [lastCalculationTime, lastCalculationTime]);
const totalCategories = categoryCount[0].count;
if (totalCategories === 0) {
console.log('No categories need metric updates');
return {
processedProducts: 0,
processedOrders: 0,
processedPurchaseOrders: 0,
success: true
};
}
if (isCancelled) { if (isCancelled) {
outputProgress({ outputProgress({
status: 'cancelled', status: 'cancelled',
operation: 'Category metrics calculation cancelled', operation: 'Category metrics calculation cancelled',
current: processedCount, current: processedCount,
total: totalProducts, total: totalCategories,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: null, remaining: null,
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1) percentage: ((processedCount / totalCategories) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
return processedCount; return {
processedProducts: processedCount,
processedOrders: 0,
processedPurchaseOrders: 0,
success
};
} }
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Starting category metrics calculation', operation: 'Starting category metrics calculation',
current: processedCount, current: processedCount,
total: totalProducts, total: totalCategories,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts), remaining: estimateRemaining(startTime, processedCount, totalCategories),
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1) percentage: ((processedCount / totalCategories) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
// First, calculate base category metrics // Process in batches
await connection.query(` let lastCatId = 0;
INSERT INTO category_metrics ( while (true) {
category_id, if (isCancelled) break;
product_count,
active_products,
total_value,
status,
last_calculated_at
)
SELECT
c.cat_id,
COUNT(DISTINCT p.pid) as product_count,
COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
COALESCE(SUM(p.stock_quantity * p.cost_price), 0) as total_value,
c.status,
NOW() as last_calculated_at
FROM categories c
LEFT JOIN product_categories pc ON c.cat_id = pc.cat_id
LEFT JOIN products p ON pc.pid = p.pid
GROUP BY c.cat_id, c.status
ON DUPLICATE KEY UPDATE
product_count = VALUES(product_count),
active_products = VALUES(active_products),
total_value = VALUES(total_value),
status = VALUES(status),
last_calculated_at = VALUES(last_calculated_at)
`);
processedCount = Math.floor(totalProducts * 0.90); const [batch] = await connection.query(`
outputProgress({ SELECT DISTINCT c.cat_id
status: 'running', FROM categories c
operation: 'Base category metrics calculated, updating with margin data', FORCE INDEX (PRIMARY)
current: processedCount, JOIN product_categories pc FORCE INDEX (idx_category) ON c.cat_id = pc.cat_id
total: totalProducts, LEFT JOIN products p FORCE INDEX (PRIMARY) ON pc.pid = p.pid AND p.updated > ?
elapsed: formatElapsedTime(startTime), LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid AND o.updated > ?
remaining: estimateRemaining(startTime, processedCount, totalProducts), WHERE c.status = 'active'
rate: calculateRate(startTime, processedCount), AND c.cat_id > ?
percentage: ((processedCount / totalProducts) * 100).toFixed(1) AND (
}); p.pid IS NOT NULL
OR o.id IS NOT NULL
)
ORDER BY c.cat_id
LIMIT ?
`, [lastCalculationTime, lastCalculationTime, lastCatId, BATCH_SIZE]);
if (isCancelled) return processedCount; if (batch.length === 0) break;
// Then update with margin and turnover data // Create temporary tables for better performance
await connection.query(` await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
WITH category_sales AS ( await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
await connection.query(`
CREATE TEMPORARY TABLE temp_product_stats (
cat_id BIGINT NOT NULL,
product_count INT,
active_products INT,
total_value DECIMAL(15,2),
avg_margin DECIMAL(5,2),
turnover_rate DECIMAL(10,2),
PRIMARY KEY (cat_id),
INDEX (product_count),
INDEX (total_value)
) ENGINE=MEMORY
`);
await connection.query(`
CREATE TEMPORARY TABLE temp_sales_stats (
cat_id BIGINT NOT NULL,
recent_revenue DECIMAL(15,2),
previous_revenue DECIMAL(15,2),
PRIMARY KEY (cat_id),
INDEX (recent_revenue),
INDEX (previous_revenue)
) ENGINE=MEMORY
`);
// Populate product stats with optimized index usage
await connection.query(`
INSERT INTO temp_product_stats
SELECT SELECT
pc.cat_id, c.cat_id,
SUM(o.quantity * o.price) as total_sales, COUNT(DISTINCT p.pid) as product_count,
SUM(o.quantity * (o.price - p.cost_price)) as total_margin, COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
SUM(o.quantity) as units_sold, COALESCE(SUM(p.stock_quantity * p.cost_price), 0) as total_value,
AVG(GREATEST(p.stock_quantity, 0)) as avg_stock COALESCE(AVG(NULLIF(pm.avg_margin_percent, 0)), 0) as avg_margin,
FROM product_categories pc COALESCE(AVG(NULLIF(pm.turnover_rate, 0)), 0) as turnover_rate
JOIN products p ON pc.pid = p.pid FROM categories c
JOIN orders o ON p.pid = o.pid FORCE INDEX (PRIMARY)
WHERE o.canceled = false INNER JOIN product_categories pc FORCE INDEX (idx_category) ON c.cat_id = pc.cat_id
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 1 YEAR) LEFT JOIN products p FORCE INDEX (PRIMARY) ON pc.pid = p.pid
GROUP BY pc.cat_id LEFT JOIN product_metrics pm FORCE INDEX (PRIMARY) ON p.pid = pm.pid
) WHERE c.cat_id IN (?)
UPDATE category_metrics cm AND (
JOIN category_sales cs ON cm.category_id = cs.cat_id p.updated > ?
SET OR EXISTS (
cm.avg_margin = COALESCE(cs.total_margin * 100.0 / NULLIF(cs.total_sales, 0), 0), SELECT 1 FROM orders o FORCE INDEX (idx_orders_metrics)
cm.turnover_rate = LEAST(COALESCE(cs.units_sold / NULLIF(cs.avg_stock, 0), 0), 999.99), WHERE o.pid = p.pid
cm.last_calculated_at = NOW() AND o.updated > ?
`); )
processedCount = Math.floor(totalProducts * 0.95);
outputProgress({
status: 'running',
operation: 'Margin data updated, calculating growth rates',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Finally update growth rates
await connection.query(`
WITH current_period AS (
SELECT
pc.cat_id,
SUM(o.quantity * o.price) as revenue
FROM product_categories pc
JOIN products p ON pc.pid = p.pid
JOIN orders o ON p.pid = o.pid
WHERE o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 3 MONTH)
GROUP BY pc.cat_id
),
previous_period AS (
SELECT
pc.cat_id,
SUM(o.quantity * o.price) as revenue
FROM product_categories pc
JOIN products p ON pc.pid = p.pid
JOIN orders o ON p.pid = o.pid
WHERE o.canceled = false
AND o.date BETWEEN DATE_SUB(CURRENT_DATE, INTERVAL 15 MONTH)
AND DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
GROUP BY pc.cat_id
)
UPDATE category_metrics cm
LEFT JOIN current_period cp ON cm.category_id = cp.cat_id
LEFT JOIN previous_period pp ON cm.category_id = pp.cat_id
SET
cm.growth_rate = CASE
WHEN pp.revenue = 0 AND COALESCE(cp.revenue, 0) > 0 THEN 100.0
WHEN pp.revenue = 0 THEN 0.0
ELSE LEAST(
GREATEST(
((COALESCE(cp.revenue, 0) - pp.revenue) / pp.revenue) * 100.0,
-100.0
),
999.99
) )
END, GROUP BY c.cat_id
cm.last_calculated_at = NOW() `, [batch.map(row => row.cat_id), lastCalculationTime, lastCalculationTime]);
WHERE cp.cat_id IS NOT NULL OR pp.cat_id IS NOT NULL
`);
processedCount = Math.floor(totalProducts * 0.97); // Populate sales stats with optimized date handling
outputProgress({ await connection.query(`
status: 'running', INSERT INTO temp_sales_stats
operation: 'Growth rates calculated, updating time-based metrics', WITH date_ranges AS (
current: processedCount, SELECT
total: totalProducts, DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) as current_start,
elapsed: formatElapsedTime(startTime), CURRENT_DATE as current_end,
remaining: estimateRemaining(startTime, processedCount, totalProducts), DATE_SUB(CURRENT_DATE, INTERVAL 60 DAY) as previous_start,
rate: calculateRate(startTime, processedCount), DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) as previous_end
percentage: ((processedCount / totalProducts) * 100).toFixed(1) )
}); SELECT
c.cat_id,
COALESCE(SUM(
CASE WHEN o.date >= dr.current_start
THEN o.quantity * o.price
ELSE 0
END
), 0) as recent_revenue,
COALESCE(SUM(
CASE WHEN o.date >= dr.previous_start AND o.date < dr.current_start
THEN o.quantity * o.price
ELSE 0
END
), 0) as previous_revenue
FROM categories c
FORCE INDEX (PRIMARY)
INNER JOIN product_categories pc FORCE INDEX (idx_category) ON c.cat_id = pc.cat_id
INNER JOIN products p FORCE INDEX (PRIMARY) ON pc.pid = p.pid
INNER JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
CROSS JOIN date_ranges dr
WHERE c.cat_id IN (?)
AND o.canceled = false
AND o.date >= dr.previous_start
AND o.updated > ?
GROUP BY c.cat_id
`, [batch.map(row => row.cat_id), lastCalculationTime]);
if (isCancelled) return processedCount; // Update metrics using temp tables with optimized calculations
await connection.query(`
INSERT INTO category_metrics (
category_id,
product_count,
active_products,
total_value,
avg_margin,
turnover_rate,
growth_rate,
status,
last_calculated_at
)
SELECT
c.cat_id,
ps.product_count,
ps.active_products,
ps.total_value,
ps.avg_margin,
ps.turnover_rate,
CASE
WHEN COALESCE(ss.previous_revenue, 0) = 0 AND COALESCE(ss.recent_revenue, 0) > 0 THEN 100
WHEN COALESCE(ss.previous_revenue, 0) = 0 THEN 0
ELSE ROUND(LEAST(999.99, GREATEST(-100,
((ss.recent_revenue / NULLIF(ss.previous_revenue, 0)) - 1) * 100
)), 2)
END as growth_rate,
c.status,
NOW() as last_calculated_at
FROM categories c
FORCE INDEX (PRIMARY)
LEFT JOIN temp_product_stats ps ON c.cat_id = ps.cat_id
LEFT JOIN temp_sales_stats ss ON c.cat_id = ss.cat_id
WHERE c.cat_id IN (?)
ON DUPLICATE KEY UPDATE
product_count = VALUES(product_count),
active_products = VALUES(active_products),
total_value = VALUES(total_value),
avg_margin = VALUES(avg_margin),
turnover_rate = VALUES(turnover_rate),
growth_rate = VALUES(growth_rate),
status = VALUES(status),
last_calculated_at = NOW()
`, [batch.map(row => row.cat_id)]);
// Calculate time-based metrics // Clean up temp tables
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
lastCatId = batch[batch.length - 1].cat_id;
processedCount += batch.length;
outputProgress({
status: 'running',
operation: 'Processing category metrics batch',
current: processedCount,
total: totalCategories,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalCategories),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalCategories) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
}
// If we get here, everything completed successfully
success = true;
// Update calculate_status
await connection.query(` await connection.query(`
INSERT INTO category_time_metrics ( INSERT INTO calculate_status (module_name, last_calculation_timestamp)
category_id, VALUES ('category_metrics', NOW())
year, ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
month,
product_count,
active_products,
total_value,
total_revenue,
avg_margin,
turnover_rate
)
SELECT
pc.cat_id,
YEAR(o.date) as year,
MONTH(o.date) as month,
COUNT(DISTINCT p.pid) as product_count,
COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
SUM(p.stock_quantity * p.cost_price) as total_value,
SUM(o.quantity * o.price) as total_revenue,
COALESCE(
SUM(o.quantity * (o.price - p.cost_price)) * 100.0 /
NULLIF(SUM(o.quantity * o.price), 0),
0
) as avg_margin,
COALESCE(
SUM(o.quantity) / NULLIF(AVG(GREATEST(p.stock_quantity, 0)), 0),
0
) as turnover_rate
FROM product_categories pc
JOIN products p ON pc.pid = p.pid
JOIN orders o ON p.pid = o.pid
WHERE o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
GROUP BY pc.cat_id, YEAR(o.date), MONTH(o.date)
ON DUPLICATE KEY UPDATE
product_count = VALUES(product_count),
active_products = VALUES(active_products),
total_value = VALUES(total_value),
total_revenue = VALUES(total_revenue),
avg_margin = VALUES(avg_margin),
turnover_rate = VALUES(turnover_rate)
`); `);
processedCount = Math.floor(totalProducts * 0.99); return {
outputProgress({ processedProducts: processedCount,
status: 'running', processedOrders: 0,
operation: 'Time-based metrics calculated', processedPurchaseOrders: 0,
current: processedCount, success
total: totalProducts, };
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
} catch (error) { } catch (error) {
success = false;
logError(error, 'Error calculating category metrics'); logError(error, 'Error calculating category metrics');
throw error; throw error;
} finally { } finally {

View File

@@ -1,9 +1,42 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress'); const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db'); const { getConnection } = require('./utils/db');
async function calculateFinancialMetrics(startTime, totalProducts, processedCount, isCancelled = false) { async function calculateFinancialMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
const connection = await getConnection(); const connection = await getConnection();
let success = false;
const BATCH_SIZE = 5000;
try { try {
// Get last calculation timestamp
const [lastCalc] = await connection.query(`
SELECT last_calculation_timestamp
FROM calculate_status
WHERE module_name = 'financial_metrics'
`);
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
// Get total count of products needing updates
if (!totalProducts) {
const [productCount] = await connection.query(`
SELECT COUNT(DISTINCT p.pid) as count
FROM products p
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
WHERE p.updated > ?
OR o.pid IS NOT NULL
`, [lastCalculationTime, lastCalculationTime]);
totalProducts = productCount[0].count;
}
if (totalProducts === 0) {
console.log('No products need financial metric updates');
return {
processedProducts: 0,
processedOrders: 0,
processedPurchaseOrders: 0,
success: true
};
}
if (isCancelled) { if (isCancelled) {
outputProgress({ outputProgress({
status: 'cancelled', status: 'cancelled',
@@ -13,9 +46,19 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: null, remaining: null,
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1) percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
return processedCount; return {
processedProducts: processedCount,
processedOrders: 0,
processedPurchaseOrders: 0,
success
};
} }
outputProgress({ outputProgress({
@@ -26,100 +69,108 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts), remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1) percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
// Calculate financial metrics with optimized query // Process in batches
await connection.query(` let lastPid = 0;
WITH product_financials AS ( while (true) {
SELECT if (isCancelled) break;
p.pid,
p.cost_price * p.stock_quantity as inventory_value, const [batch] = await connection.query(`
SUM(o.quantity * o.price) as total_revenue, SELECT DISTINCT p.pid
SUM(o.quantity * p.cost_price) as cost_of_goods_sold,
SUM(o.quantity * (o.price - p.cost_price)) as gross_profit,
MIN(o.date) as first_sale_date,
MAX(o.date) as last_sale_date,
DATEDIFF(MAX(o.date), MIN(o.date)) + 1 as calculation_period_days,
COUNT(DISTINCT DATE(o.date)) as active_days
FROM products p FROM products p
LEFT JOIN orders o ON p.pid = o.pid LEFT JOIN orders o ON p.pid = o.pid
WHERE o.canceled = false WHERE p.pid > ?
AND DATE(o.date) >= DATE_SUB(CURDATE(), INTERVAL 12 MONTH) AND (
GROUP BY p.pid p.updated > ?
) OR EXISTS (
UPDATE product_metrics pm SELECT 1 FROM orders o2
JOIN product_financials pf ON pm.pid = pf.pid WHERE o2.pid = p.pid
SET AND o2.updated > ?
pm.inventory_value = COALESCE(pf.inventory_value, 0), )
pm.total_revenue = COALESCE(pf.total_revenue, 0), )
pm.cost_of_goods_sold = COALESCE(pf.cost_of_goods_sold, 0), ORDER BY p.pid
pm.gross_profit = COALESCE(pf.gross_profit, 0), LIMIT ?
pm.gmroi = CASE `, [lastPid, lastCalculationTime, lastCalculationTime, BATCH_SIZE]);
WHEN COALESCE(pf.inventory_value, 0) > 0 AND pf.active_days > 0 THEN
(COALESCE(pf.gross_profit, 0) * (365.0 / pf.active_days)) / COALESCE(pf.inventory_value, 0)
ELSE 0
END
`);
processedCount = Math.floor(totalProducts * 0.65); if (batch.length === 0) break;
outputProgress({
status: 'running',
operation: 'Base financial metrics calculated, updating time aggregates',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount; // Update financial metrics for this batch
await connection.query(`
UPDATE product_metrics pm
JOIN (
SELECT
p.pid,
p.cost_price * p.stock_quantity as inventory_value,
SUM(o.quantity * o.price) as total_revenue,
SUM(o.quantity * p.cost_price) as cost_of_goods_sold,
SUM(o.quantity * (o.price - p.cost_price)) as gross_profit,
COUNT(DISTINCT DATE(o.date)) as active_days
FROM products p
LEFT JOIN orders o ON p.pid = o.pid
AND o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
WHERE p.pid IN (?)
GROUP BY p.pid
) fin ON pm.pid = fin.pid
SET
pm.inventory_value = COALESCE(fin.inventory_value, 0),
pm.total_revenue = COALESCE(fin.total_revenue, 0),
pm.cost_of_goods_sold = COALESCE(fin.cost_of_goods_sold, 0),
pm.gross_profit = COALESCE(fin.gross_profit, 0),
pm.gmroi = CASE
WHEN COALESCE(fin.inventory_value, 0) > 0 AND fin.active_days > 0
THEN (COALESCE(fin.gross_profit, 0) * (365.0 / fin.active_days)) / COALESCE(fin.inventory_value, 0)
ELSE 0
END,
pm.last_calculated_at = NOW()
`, [batch.map(row => row.pid)]);
// Update time-based aggregates with optimized query lastPid = batch[batch.length - 1].pid;
processedCount += batch.length;
outputProgress({
status: 'running',
operation: 'Processing financial metrics batch',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
}
// If we get here, everything completed successfully
success = true;
// Update calculate_status
await connection.query(` await connection.query(`
WITH monthly_financials AS ( INSERT INTO calculate_status (module_name, last_calculation_timestamp)
SELECT VALUES ('financial_metrics', NOW())
p.pid, ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
YEAR(o.date) as year,
MONTH(o.date) as month,
p.cost_price * p.stock_quantity as inventory_value,
SUM(o.quantity * (o.price - p.cost_price)) as gross_profit,
COUNT(DISTINCT DATE(o.date)) as active_days,
MIN(o.date) as period_start,
MAX(o.date) as period_end
FROM products p
LEFT JOIN orders o ON p.pid = o.pid
WHERE o.canceled = false
GROUP BY p.pid, YEAR(o.date), MONTH(o.date)
)
UPDATE product_time_aggregates pta
JOIN monthly_financials mf ON pta.pid = mf.pid
AND pta.year = mf.year
AND pta.month = mf.month
SET
pta.inventory_value = COALESCE(mf.inventory_value, 0),
pta.gmroi = CASE
WHEN COALESCE(mf.inventory_value, 0) > 0 AND mf.active_days > 0 THEN
(COALESCE(mf.gross_profit, 0) * (365.0 / mf.active_days)) / COALESCE(mf.inventory_value, 0)
ELSE 0
END
`); `);
processedCount = Math.floor(totalProducts * 0.70); return {
outputProgress({ processedProducts: processedCount,
status: 'running', processedOrders: 0,
operation: 'Time-based aggregates updated', processedPurchaseOrders: 0,
current: processedCount, success
total: totalProducts, };
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
} catch (error) { } catch (error) {
success = false;
logError(error, 'Error calculating financial metrics'); logError(error, 'Error calculating financial metrics');
throw error; throw error;
} finally { } finally {

View File

@@ -11,7 +11,43 @@ function sanitizeValue(value) {
async function calculateProductMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) { async function calculateProductMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
const connection = await getConnection(); const connection = await getConnection();
let success = false;
let processedOrders = 0;
const BATCH_SIZE = 5000;
try { try {
// Get last calculation timestamp
const [lastCalc] = await connection.query(`
SELECT last_calculation_timestamp
FROM calculate_status
WHERE module_name = 'product_metrics'
`);
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
// Get total product count if not provided
if (!totalProducts) {
const [productCount] = await connection.query(`
SELECT COUNT(DISTINCT p.pid) as count
FROM products p
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
LEFT JOIN purchase_orders po ON p.pid = po.pid AND po.updated > ?
WHERE p.updated > ?
OR o.pid IS NOT NULL
OR po.pid IS NOT NULL
`, [lastCalculationTime, lastCalculationTime, lastCalculationTime]);
totalProducts = productCount[0].count;
}
if (totalProducts === 0) {
console.log('No products need updating');
return {
processedProducts: 0,
processedOrders: 0,
processedPurchaseOrders: 0,
success: true
};
}
// Skip flags are inherited from the parent scope // Skip flags are inherited from the parent scope
const SKIP_PRODUCT_BASE_METRICS = 0; const SKIP_PRODUCT_BASE_METRICS = 0;
const SKIP_PRODUCT_TIME_AGGREGATES = 0; const SKIP_PRODUCT_TIME_AGGREGATES = 0;
@@ -25,11 +61,37 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: null, remaining: null,
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1) percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
return processedCount; return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0,
success
};
} }
// First ensure all products have a metrics record
await connection.query(`
INSERT IGNORE INTO product_metrics (pid, last_calculated_at)
SELECT pid, NOW()
FROM products
`);
// Get threshold settings once
const [thresholds] = await connection.query(`
SELECT critical_days, reorder_days, overstock_days, low_stock_threshold
FROM stock_thresholds
WHERE category_id IS NULL AND vendor IS NULL
LIMIT 1
`);
const defaultThresholds = thresholds[0];
// Calculate base product metrics // Calculate base product metrics
if (!SKIP_PRODUCT_BASE_METRICS) { if (!SKIP_PRODUCT_BASE_METRICS) {
outputProgress({ outputProgress({
@@ -40,89 +102,276 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts), remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1) percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
// Calculate base metrics // Get order count that will be processed
const [orderCount] = await connection.query(`
SELECT COUNT(*) as count
FROM orders o
WHERE o.canceled = false
`);
processedOrders = orderCount[0].count;
// Clear temporary tables
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_metrics');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_purchase_metrics');
// Create optimized temporary tables with indexes
await connection.query(` await connection.query(`
UPDATE product_metrics pm CREATE TEMPORARY TABLE temp_sales_metrics (
JOIN ( pid BIGINT NOT NULL,
SELECT daily_sales_avg DECIMAL(10,3),
p.pid, weekly_sales_avg DECIMAL(10,3),
p.cost_price * p.stock_quantity as inventory_value, monthly_sales_avg DECIMAL(10,3),
SUM(o.quantity) as total_quantity, total_revenue DECIMAL(10,2),
COUNT(DISTINCT o.order_number) as number_of_orders, avg_margin_percent DECIMAL(5,2),
SUM(o.quantity * o.price) as total_revenue, first_sale_date DATE,
SUM(o.quantity * p.cost_price) as cost_of_goods_sold, last_sale_date DATE,
AVG(o.price) as avg_price, PRIMARY KEY (pid),
STDDEV(o.price) as price_std, INDEX (daily_sales_avg),
MIN(o.date) as first_sale_date, INDEX (total_revenue)
MAX(o.date) as last_sale_date, ) ENGINE=MEMORY
COUNT(DISTINCT DATE(o.date)) as active_days
FROM products p
LEFT JOIN orders o ON p.pid = o.pid AND o.canceled = false
GROUP BY p.pid
) stats ON pm.pid = stats.pid
SET
pm.inventory_value = COALESCE(stats.inventory_value, 0),
pm.avg_quantity_per_order = COALESCE(stats.total_quantity / NULLIF(stats.number_of_orders, 0), 0),
pm.number_of_orders = COALESCE(stats.number_of_orders, 0),
pm.total_revenue = COALESCE(stats.total_revenue, 0),
pm.cost_of_goods_sold = COALESCE(stats.cost_of_goods_sold, 0),
pm.gross_profit = COALESCE(stats.total_revenue - stats.cost_of_goods_sold, 0),
pm.avg_margin_percent = CASE
WHEN COALESCE(stats.total_revenue, 0) > 0
THEN ((stats.total_revenue - stats.cost_of_goods_sold) / stats.total_revenue) * 100
ELSE 0
END,
pm.first_sale_date = stats.first_sale_date,
pm.last_sale_date = stats.last_sale_date,
pm.gmroi = CASE
WHEN COALESCE(stats.inventory_value, 0) > 0
THEN (stats.total_revenue - stats.cost_of_goods_sold) / stats.inventory_value
ELSE 0
END,
pm.last_calculated_at = NOW()
`); `);
processedCount = Math.floor(totalProducts * 0.4); await connection.query(`
outputProgress({ CREATE TEMPORARY TABLE temp_purchase_metrics (
status: 'running', pid BIGINT NOT NULL,
operation: 'Base product metrics calculated', avg_lead_time_days DECIMAL(5,1),
current: processedCount, last_purchase_date DATE,
total: totalProducts, first_received_date DATE,
elapsed: formatElapsedTime(startTime), last_received_date DATE,
remaining: estimateRemaining(startTime, processedCount, totalProducts), PRIMARY KEY (pid),
rate: calculateRate(startTime, processedCount), INDEX (avg_lead_time_days)
percentage: ((processedCount / totalProducts) * 100).toFixed(1) ) ENGINE=MEMORY
}); `);
} else {
processedCount = Math.floor(totalProducts * 0.4);
outputProgress({
status: 'running',
operation: 'Skipping base product metrics calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
}
if (isCancelled) return processedCount; // Populate temp_sales_metrics with base stats and sales averages using FORCE INDEX
await connection.query(`
INSERT INTO temp_sales_metrics
SELECT
p.pid,
COALESCE(SUM(o.quantity) / NULLIF(COUNT(DISTINCT DATE(o.date)), 0), 0) as daily_sales_avg,
COALESCE(SUM(o.quantity) / NULLIF(CEIL(COUNT(DISTINCT DATE(o.date)) / 7), 0), 0) as weekly_sales_avg,
COALESCE(SUM(o.quantity) / NULLIF(CEIL(COUNT(DISTINCT DATE(o.date)) / 30), 0), 0) as monthly_sales_avg,
COALESCE(SUM(o.quantity * o.price), 0) as total_revenue,
CASE
WHEN SUM(o.quantity * o.price) > 0
THEN ((SUM(o.quantity * o.price) - SUM(o.quantity * p.cost_price)) / SUM(o.quantity * o.price)) * 100
ELSE 0
END as avg_margin_percent,
MIN(o.date) as first_sale_date,
MAX(o.date) as last_sale_date
FROM products p
FORCE INDEX (PRIMARY)
LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
AND o.canceled = false
AND o.date >= DATE_SUB(CURDATE(), INTERVAL 90 DAY)
WHERE p.updated > ?
OR EXISTS (
SELECT 1 FROM orders o2 FORCE INDEX (idx_orders_metrics)
WHERE o2.pid = p.pid
AND o2.canceled = false
AND o2.updated > ?
)
GROUP BY p.pid
`, [lastCalculationTime, lastCalculationTime]);
// Populate temp_purchase_metrics with optimized index usage
await connection.query(`
INSERT INTO temp_purchase_metrics
SELECT
p.pid,
AVG(DATEDIFF(po.received_date, po.date)) as avg_lead_time_days,
MAX(po.date) as last_purchase_date,
MIN(po.received_date) as first_received_date,
MAX(po.received_date) as last_received_date
FROM products p
FORCE INDEX (PRIMARY)
LEFT JOIN purchase_orders po FORCE INDEX (idx_po_metrics) ON p.pid = po.pid
AND po.received_date IS NOT NULL
AND po.date >= DATE_SUB(CURDATE(), INTERVAL 365 DAY)
WHERE p.updated > ?
OR EXISTS (
SELECT 1 FROM purchase_orders po2 FORCE INDEX (idx_po_metrics)
WHERE po2.pid = p.pid
AND po2.updated > ?
)
GROUP BY p.pid
`, [lastCalculationTime, lastCalculationTime]);
// Process updates in batches, but only for affected products
let lastPid = 0;
while (true) {
if (isCancelled) break;
const [batch] = await connection.query(`
SELECT DISTINCT p.pid
FROM products p
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
LEFT JOIN purchase_orders po ON p.pid = po.pid AND po.updated > ?
WHERE p.pid > ?
AND (
p.updated > ?
OR o.pid IS NOT NULL
OR po.pid IS NOT NULL
)
ORDER BY p.pid
LIMIT ?
`, [lastCalculationTime, lastCalculationTime, lastPid, lastCalculationTime, BATCH_SIZE]);
if (batch.length === 0) break;
await connection.query(`
UPDATE product_metrics pm
JOIN products p ON pm.pid = p.pid
LEFT JOIN temp_sales_metrics sm ON pm.pid = sm.pid
LEFT JOIN temp_purchase_metrics lm ON pm.pid = lm.pid
SET
pm.inventory_value = p.stock_quantity * p.cost_price,
pm.daily_sales_avg = COALESCE(sm.daily_sales_avg, 0),
pm.weekly_sales_avg = COALESCE(sm.weekly_sales_avg, 0),
pm.monthly_sales_avg = COALESCE(sm.monthly_sales_avg, 0),
pm.total_revenue = COALESCE(sm.total_revenue, 0),
pm.avg_margin_percent = COALESCE(sm.avg_margin_percent, 0),
pm.first_sale_date = sm.first_sale_date,
pm.last_sale_date = sm.last_sale_date,
pm.avg_lead_time_days = COALESCE(lm.avg_lead_time_days, 30),
pm.days_of_inventory = CASE
WHEN COALESCE(sm.daily_sales_avg, 0) > 0
THEN FLOOR(p.stock_quantity / sm.daily_sales_avg)
ELSE NULL
END,
pm.weeks_of_inventory = CASE
WHEN COALESCE(sm.weekly_sales_avg, 0) > 0
THEN FLOOR(p.stock_quantity / sm.weekly_sales_avg)
ELSE NULL
END,
pm.stock_status = CASE
WHEN p.stock_quantity <= 0 THEN 'Out of Stock'
WHEN COALESCE(sm.daily_sales_avg, 0) = 0 AND p.stock_quantity <= ? THEN 'Low Stock'
WHEN COALESCE(sm.daily_sales_avg, 0) = 0 THEN 'In Stock'
WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) <= ? THEN 'Critical'
WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) <= ? THEN 'Reorder'
WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) > ? THEN 'Overstocked'
ELSE 'Healthy'
END,
pm.reorder_qty = CASE
WHEN COALESCE(sm.daily_sales_avg, 0) > 0 THEN
GREATEST(
CEIL(sm.daily_sales_avg * COALESCE(lm.avg_lead_time_days, 30) * 1.96),
?
)
ELSE ?
END,
pm.overstocked_amt = CASE
WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) > ?
THEN GREATEST(0, p.stock_quantity - CEIL(sm.daily_sales_avg * ?))
ELSE 0
END,
pm.last_calculated_at = NOW()
WHERE p.pid IN (?)
`, [
defaultThresholds.low_stock_threshold,
defaultThresholds.critical_days,
defaultThresholds.reorder_days,
defaultThresholds.overstock_days,
defaultThresholds.low_stock_threshold,
defaultThresholds.low_stock_threshold,
defaultThresholds.overstock_days,
defaultThresholds.overstock_days,
batch.map(row => row.pid)
]);
lastPid = batch[batch.length - 1].pid;
processedCount += batch.length;
outputProgress({
status: 'running',
operation: 'Processing base metrics batch',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
}
// Calculate forecast accuracy and bias in batches
lastPid = 0;
while (true) {
if (isCancelled) break;
const [batch] = await connection.query(
'SELECT pid FROM products WHERE pid > ? ORDER BY pid LIMIT ?',
[lastPid, BATCH_SIZE]
);
if (batch.length === 0) break;
await connection.query(`
UPDATE product_metrics pm
JOIN (
SELECT
sf.pid,
AVG(CASE
WHEN o.quantity > 0
THEN ABS(sf.forecast_units - o.quantity) / o.quantity * 100
ELSE 100
END) as avg_forecast_error,
AVG(CASE
WHEN o.quantity > 0
THEN (sf.forecast_units - o.quantity) / o.quantity * 100
ELSE 0
END) as avg_forecast_bias,
MAX(sf.forecast_date) as last_forecast_date
FROM sales_forecasts sf
JOIN orders o ON sf.pid = o.pid
AND DATE(o.date) = sf.forecast_date
WHERE o.canceled = false
AND sf.forecast_date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
AND sf.pid IN (?)
GROUP BY sf.pid
) fa ON pm.pid = fa.pid
SET
pm.forecast_accuracy = GREATEST(0, 100 - LEAST(fa.avg_forecast_error, 100)),
pm.forecast_bias = GREATEST(-100, LEAST(fa.avg_forecast_bias, 100)),
pm.last_forecast_date = fa.last_forecast_date,
pm.last_calculated_at = NOW()
WHERE pm.pid IN (?)
`, [batch.map(row => row.pid), batch.map(row => row.pid)]);
lastPid = batch[batch.length - 1].pid;
}
}
// Calculate product time aggregates // Calculate product time aggregates
if (!SKIP_PRODUCT_TIME_AGGREGATES) { if (!SKIP_PRODUCT_TIME_AGGREGATES) {
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Starting product time aggregates calculation', operation: 'Starting product time aggregates calculation',
current: processedCount, current: processedCount || 0,
total: totalProducts, total: totalProducts || 0,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts), remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0),
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount || 0),
percentage: ((processedCount / totalProducts) * 100).toFixed(1) percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
// Calculate time-based aggregates // Calculate time-based aggregates
@@ -179,29 +428,206 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Product time aggregates calculated', operation: 'Product time aggregates calculated',
current: processedCount, current: processedCount || 0,
total: totalProducts, total: totalProducts || 0,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts), remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0),
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount || 0),
percentage: ((processedCount / totalProducts) * 100).toFixed(1) percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
} else { } else {
processedCount = Math.floor(totalProducts * 0.6); processedCount = Math.floor(totalProducts * 0.6);
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Skipping product time aggregates calculation', operation: 'Skipping product time aggregates calculation',
current: processedCount, current: processedCount || 0,
total: totalProducts, total: totalProducts || 0,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts), remaining: estimateRemaining(startTime, processedCount || 0, totalProducts || 0),
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount || 0),
percentage: ((processedCount / totalProducts) * 100).toFixed(1) percentage: (((processedCount || 0) / (totalProducts || 1)) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
} }
return processedCount; // Calculate ABC classification
outputProgress({
status: 'running',
operation: 'Starting ABC classification',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0, // This module doesn't process POs
success
};
const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1');
const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 };
// First, create and populate the rankings table with an index
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks');
await connection.query(`
CREATE TEMPORARY TABLE temp_revenue_ranks (
pid BIGINT NOT NULL,
total_revenue DECIMAL(10,3),
rank_num INT,
dense_rank_num INT,
percentile DECIMAL(5,2),
total_count INT,
PRIMARY KEY (pid),
INDEX (rank_num),
INDEX (dense_rank_num),
INDEX (percentile)
) ENGINE=MEMORY
`);
// Calculate rankings with proper tie handling
await connection.query(`
INSERT INTO temp_revenue_ranks
WITH revenue_data AS (
SELECT
pid,
total_revenue,
COUNT(*) OVER () as total_count,
PERCENT_RANK() OVER (ORDER BY total_revenue DESC) * 100 as percentile,
RANK() OVER (ORDER BY total_revenue DESC) as rank_num,
DENSE_RANK() OVER (ORDER BY total_revenue DESC) as dense_rank_num
FROM product_metrics
WHERE total_revenue > 0
)
SELECT
pid,
total_revenue,
rank_num,
dense_rank_num,
percentile,
total_count
FROM revenue_data
`);
// Get total count for percentage calculation
const [rankingCount] = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks');
const totalCount = rankingCount[0].total_count || 1;
const max_rank = totalCount;
// Process updates in batches
let abcProcessedCount = 0;
const batchSize = 5000;
while (true) {
if (isCancelled) return {
processedProducts: processedCount,
processedOrders,
processedPurchaseOrders: 0, // This module doesn't process POs
success
};
// Get a batch of PIDs that need updating
const [pids] = await connection.query(`
SELECT pm.pid
FROM product_metrics pm
LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid
WHERE pm.abc_class IS NULL
OR pm.abc_class !=
CASE
WHEN tr.pid IS NULL THEN 'C'
WHEN tr.percentile <= ? THEN 'A'
WHEN tr.percentile <= ? THEN 'B'
ELSE 'C'
END
LIMIT ?
`, [abcThresholds.a_threshold, abcThresholds.b_threshold, batchSize]);
if (pids.length === 0) break;
await connection.query(`
UPDATE product_metrics pm
LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid
SET pm.abc_class =
CASE
WHEN tr.pid IS NULL THEN 'C'
WHEN tr.percentile <= ? THEN 'A'
WHEN tr.percentile <= ? THEN 'B'
ELSE 'C'
END,
pm.last_calculated_at = NOW()
WHERE pm.pid IN (?)
`, [abcThresholds.a_threshold, abcThresholds.b_threshold, pids.map(row => row.pid)]);
// Now update turnover rate with proper handling of zero inventory periods
await connection.query(`
UPDATE product_metrics pm
JOIN (
SELECT
o.pid,
SUM(o.quantity) as total_sold,
COUNT(DISTINCT DATE(o.date)) as active_days,
AVG(CASE
WHEN p.stock_quantity > 0 THEN p.stock_quantity
ELSE NULL
END) as avg_nonzero_stock
FROM orders o
JOIN products p ON o.pid = p.pid
WHERE o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
AND o.pid IN (?)
GROUP BY o.pid
) sales ON pm.pid = sales.pid
SET
pm.turnover_rate = CASE
WHEN sales.avg_nonzero_stock > 0 AND sales.active_days > 0
THEN LEAST(
(sales.total_sold / sales.avg_nonzero_stock) * (365.0 / sales.active_days),
999.99
)
ELSE 0
END,
pm.last_calculated_at = NOW()
WHERE pm.pid IN (?)
`, [pids.map(row => row.pid), pids.map(row => row.pid)]);
}
// If we get here, everything completed successfully
success = true;
// Update calculate_status with current timestamp
await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
VALUES ('product_metrics', NOW())
ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
`);
return {
processedProducts: processedCount || 0,
processedOrders: processedOrders || 0,
processedPurchaseOrders: 0, // This module doesn't process POs
success
};
} catch (error) { } catch (error) {
success = false;
logError(error, 'Error calculating product metrics'); logError(error, 'Error calculating product metrics');
throw error; throw error;
} finally { } finally {

View File

@@ -1,302 +1,298 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress'); const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db'); const { getConnection } = require('./utils/db');
async function calculateSalesForecasts(startTime, totalProducts, processedCount, isCancelled = false) { async function calculateSalesForecasts(startTime, totalProducts, processedCount = 0, isCancelled = false) {
const connection = await getConnection(); const connection = await getConnection();
let success = false;
const BATCH_SIZE = 5000;
try { try {
// Get last calculation timestamp
const [lastCalc] = await connection.query(`
SELECT last_calculation_timestamp
FROM calculate_status
WHERE module_name = 'sales_forecasts'
`);
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
// Get total count of products needing updates
const [productCount] = await connection.query(`
SELECT COUNT(DISTINCT p.pid) as count
FROM products p
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
WHERE p.visible = true
AND (
p.updated > ?
OR o.id IS NOT NULL
)
`, [lastCalculationTime, lastCalculationTime]);
const totalProductsToUpdate = productCount[0].count;
if (totalProductsToUpdate === 0) {
console.log('No products need forecast updates');
return {
processedProducts: 0,
processedOrders: 0,
processedPurchaseOrders: 0,
success: true
};
}
if (isCancelled) { if (isCancelled) {
outputProgress({ outputProgress({
status: 'cancelled', status: 'cancelled',
operation: 'Sales forecasts calculation cancelled', operation: 'Sales forecast calculation cancelled',
current: processedCount, current: processedCount,
total: totalProducts, total: totalProductsToUpdate,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: null, remaining: null,
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1) percentage: ((processedCount / totalProductsToUpdate) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
return processedCount; return {
processedProducts: processedCount,
processedOrders: 0,
processedPurchaseOrders: 0,
success
};
} }
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Starting sales forecasts calculation', operation: 'Starting sales forecast calculation',
current: processedCount, current: processedCount,
total: totalProducts, total: totalProductsToUpdate,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts), remaining: estimateRemaining(startTime, processedCount, totalProductsToUpdate),
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1) percentage: ((processedCount / totalProductsToUpdate) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
// First, create a temporary table for forecast dates // Process in batches
await connection.query(` let lastPid = '';
CREATE TEMPORARY TABLE IF NOT EXISTS temp_forecast_dates ( while (true) {
forecast_date DATE, if (isCancelled) break;
day_of_week INT,
month INT,
PRIMARY KEY (forecast_date)
)
`);
await connection.query(` const [batch] = await connection.query(`
INSERT INTO temp_forecast_dates SELECT DISTINCT p.pid
SELECT FROM products p
DATE_ADD(CURRENT_DATE, INTERVAL n DAY) as forecast_date, FORCE INDEX (PRIMARY)
DAYOFWEEK(DATE_ADD(CURRENT_DATE, INTERVAL n DAY)) as day_of_week, LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid AND o.updated > ?
MONTH(DATE_ADD(CURRENT_DATE, INTERVAL n DAY)) as month WHERE p.visible = true
FROM ( AND p.pid > ?
SELECT a.N + b.N * 10 as n AND (
FROM p.updated > ?
(SELECT 0 as N UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION OR o.id IS NOT NULL
SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) a, )
(SELECT 0 as N UNION SELECT 1 UNION SELECT 2) b ORDER BY p.pid
ORDER BY n LIMIT ?
LIMIT 31 `, [lastCalculationTime, lastPid, lastCalculationTime, BATCH_SIZE]);
) numbers
`);
processedCount = Math.floor(totalProducts * 0.92); if (batch.length === 0) break;
outputProgress({
status: 'running',
operation: 'Forecast dates prepared, calculating daily sales stats',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount; // Create temporary tables for better performance
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_historical_sales');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_recent_trend');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_confidence_calc');
// Create temporary table for daily sales stats // Create optimized temporary tables with indexes
await connection.query(` await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_daily_sales AS CREATE TEMPORARY TABLE temp_historical_sales (
SELECT pid BIGINT NOT NULL,
o.pid, sale_date DATE NOT NULL,
DAYOFWEEK(o.date) as day_of_week, daily_quantity INT,
SUM(o.quantity) as daily_quantity, daily_revenue DECIMAL(15,2),
SUM(o.price * o.quantity) as daily_revenue, PRIMARY KEY (pid, sale_date),
COUNT(DISTINCT DATE(o.date)) as day_count INDEX (sale_date)
FROM orders o ) ENGINE=MEMORY
WHERE o.canceled = false `);
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
GROUP BY o.pid, DAYOFWEEK(o.date)
`);
processedCount = Math.floor(totalProducts * 0.94); await connection.query(`
outputProgress({ CREATE TEMPORARY TABLE temp_sales_stats (
status: 'running', pid BIGINT NOT NULL,
operation: 'Daily sales stats calculated, preparing product stats', avg_daily_units DECIMAL(10,2),
current: processedCount, avg_daily_revenue DECIMAL(15,2),
total: totalProducts, std_daily_units DECIMAL(10,2),
elapsed: formatElapsedTime(startTime), days_with_sales INT,
remaining: estimateRemaining(startTime, processedCount, totalProducts), first_sale DATE,
rate: calculateRate(startTime, processedCount), last_sale DATE,
percentage: ((processedCount / totalProducts) * 100).toFixed(1) PRIMARY KEY (pid),
}); INDEX (days_with_sales),
INDEX (last_sale)
) ENGINE=MEMORY
`);
if (isCancelled) return processedCount; await connection.query(`
CREATE TEMPORARY TABLE temp_recent_trend (
pid BIGINT NOT NULL,
recent_avg_units DECIMAL(10,2),
recent_avg_revenue DECIMAL(15,2),
PRIMARY KEY (pid)
) ENGINE=MEMORY
`);
// Create temporary table for product stats await connection.query(`
await connection.query(` CREATE TEMPORARY TABLE temp_confidence_calc (
CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_stats AS pid BIGINT NOT NULL,
SELECT confidence_level TINYINT,
pid, PRIMARY KEY (pid)
AVG(daily_revenue) as overall_avg_revenue, ) ENGINE=MEMORY
SUM(day_count) as total_days `);
FROM temp_daily_sales
GROUP BY pid
`);
processedCount = Math.floor(totalProducts * 0.96); // Populate historical sales with optimized index usage
outputProgress({ await connection.query(`
status: 'running', INSERT INTO temp_historical_sales
operation: 'Product stats prepared, calculating product-level forecasts', SELECT
current: processedCount, o.pid,
total: totalProducts, DATE(o.date) as sale_date,
elapsed: formatElapsedTime(startTime), SUM(o.quantity) as daily_quantity,
remaining: estimateRemaining(startTime, processedCount, totalProducts), SUM(o.quantity * o.price) as daily_revenue
rate: calculateRate(startTime, processedCount), FROM orders o
percentage: ((processedCount / totalProducts) * 100).toFixed(1) FORCE INDEX (idx_orders_metrics)
}); WHERE o.canceled = false
AND o.pid IN (?)
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 180 DAY)
GROUP BY o.pid, DATE(o.date)
`, [batch.map(row => row.pid)]);
if (isCancelled) return processedCount; // Populate sales stats
await connection.query(`
INSERT INTO temp_sales_stats
SELECT
pid,
AVG(daily_quantity) as avg_daily_units,
AVG(daily_revenue) as avg_daily_revenue,
STDDEV(daily_quantity) as std_daily_units,
COUNT(*) as days_with_sales,
MIN(sale_date) as first_sale,
MAX(sale_date) as last_sale
FROM temp_historical_sales
GROUP BY pid
`);
// Calculate product-level forecasts // Populate recent trend
await connection.query(` await connection.query(`
INSERT INTO sales_forecasts ( INSERT INTO temp_recent_trend
pid, SELECT
forecast_date, h.pid,
forecast_units, AVG(h.daily_quantity) as recent_avg_units,
forecast_revenue, AVG(h.daily_revenue) as recent_avg_revenue
confidence_level, FROM temp_historical_sales h
last_calculated_at WHERE h.sale_date >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
) GROUP BY h.pid
SELECT `);
ds.pid,
fd.forecast_date, // Calculate confidence levels
GREATEST(0, await connection.query(`
AVG(ds.daily_quantity) * INSERT INTO temp_confidence_calc
(1 + COALESCE(sf.seasonality_factor, 0)) SELECT
) as forecast_units, s.pid,
GREATEST(0, LEAST(100, GREATEST(0, ROUND(
COALESCE( (s.days_with_sales / 180.0 * 50) + -- Up to 50 points for history length
(CASE
WHEN s.std_daily_units = 0 OR s.avg_daily_units = 0 THEN 0
WHEN (s.std_daily_units / s.avg_daily_units) <= 0.5 THEN 30
WHEN (s.std_daily_units / s.avg_daily_units) <= 1.0 THEN 20
WHEN (s.std_daily_units / s.avg_daily_units) <= 2.0 THEN 10
ELSE 0
END) + -- Up to 30 points for consistency
(CASE
WHEN DATEDIFF(CURRENT_DATE, s.last_sale) <= 7 THEN 20
WHEN DATEDIFF(CURRENT_DATE, s.last_sale) <= 30 THEN 10
ELSE 0
END) -- Up to 20 points for recency
))) as confidence_level
FROM temp_sales_stats s
`);
// Generate forecasts using temp tables
await connection.query(`
REPLACE INTO sales_forecasts
(pid, forecast_date, forecast_units, forecast_revenue, confidence_level, last_calculated_at)
SELECT
s.pid,
DATE_ADD(CURRENT_DATE, INTERVAL n.days DAY),
GREATEST(0, ROUND(
CASE CASE
WHEN SUM(ds.day_count) >= 4 THEN AVG(ds.daily_revenue) WHEN s.days_with_sales >= n.days THEN COALESCE(t.recent_avg_units, s.avg_daily_units)
ELSE ps.overall_avg_revenue ELSE s.avg_daily_units * (s.days_with_sales / n.days)
END * END
(1 + COALESCE(sf.seasonality_factor, 0)) * )),
(0.95 + (RAND() * 0.1)), GREATEST(0, ROUND(
0
)
) as forecast_revenue,
CASE
WHEN ps.total_days >= 60 THEN 90
WHEN ps.total_days >= 30 THEN 80
WHEN ps.total_days >= 14 THEN 70
ELSE 60
END as confidence_level,
NOW() as last_calculated_at
FROM temp_daily_sales ds
JOIN temp_product_stats ps ON ds.pid = ps.pid
CROSS JOIN temp_forecast_dates fd
LEFT JOIN sales_seasonality sf ON fd.month = sf.month
GROUP BY ds.pid, fd.forecast_date, ps.overall_avg_revenue, ps.total_days, sf.seasonality_factor
HAVING AVG(ds.daily_quantity) > 0
ON DUPLICATE KEY UPDATE
forecast_units = VALUES(forecast_units),
forecast_revenue = VALUES(forecast_revenue),
confidence_level = VALUES(confidence_level),
last_calculated_at = NOW()
`);
processedCount = Math.floor(totalProducts * 0.98);
outputProgress({
status: 'running',
operation: 'Product forecasts calculated, preparing category stats',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Create temporary table for category stats
await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_category_sales AS
SELECT
pc.cat_id,
DAYOFWEEK(o.date) as day_of_week,
SUM(o.quantity) as daily_quantity,
SUM(o.price * o.quantity) as daily_revenue,
COUNT(DISTINCT DATE(o.date)) as day_count
FROM orders o
JOIN product_categories pc ON o.pid = pc.pid
WHERE o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
GROUP BY pc.cat_id, DAYOFWEEK(o.date)
`);
await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_category_stats AS
SELECT
cat_id,
AVG(daily_revenue) as overall_avg_revenue,
SUM(day_count) as total_days
FROM temp_category_sales
GROUP BY cat_id
`);
processedCount = Math.floor(totalProducts * 0.99);
outputProgress({
status: 'running',
operation: 'Category stats prepared, calculating category-level forecasts',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Calculate category-level forecasts
await connection.query(`
INSERT INTO category_forecasts (
category_id,
forecast_date,
forecast_units,
forecast_revenue,
confidence_level,
last_calculated_at
)
SELECT
cs.cat_id as category_id,
fd.forecast_date,
GREATEST(0,
AVG(cs.daily_quantity) *
(1 + COALESCE(sf.seasonality_factor, 0))
) as forecast_units,
GREATEST(0,
COALESCE(
CASE CASE
WHEN SUM(cs.day_count) >= 4 THEN AVG(cs.daily_revenue) WHEN s.days_with_sales >= n.days THEN COALESCE(t.recent_avg_revenue, s.avg_daily_revenue)
ELSE ct.overall_avg_revenue ELSE s.avg_daily_revenue * (s.days_with_sales / n.days)
END * END,
(1 + COALESCE(sf.seasonality_factor, 0)) * 2
(0.95 + (RAND() * 0.1)), )),
0 c.confidence_level,
) NOW()
) as forecast_revenue, FROM temp_sales_stats s
CASE CROSS JOIN (
WHEN ct.total_days >= 60 THEN 90 SELECT 30 as days
WHEN ct.total_days >= 30 THEN 80 UNION SELECT 60
WHEN ct.total_days >= 14 THEN 70 UNION SELECT 90
ELSE 60 ) n
END as confidence_level, LEFT JOIN temp_recent_trend t ON s.pid = t.pid
NOW() as last_calculated_at LEFT JOIN temp_confidence_calc c ON s.pid = c.pid;
FROM temp_category_sales cs `);
JOIN temp_category_stats ct ON cs.cat_id = ct.cat_id
CROSS JOIN temp_forecast_dates fd
LEFT JOIN sales_seasonality sf ON fd.month = sf.month
GROUP BY cs.cat_id, fd.forecast_date, ct.overall_avg_revenue, ct.total_days, sf.seasonality_factor
HAVING AVG(cs.daily_quantity) > 0
ON DUPLICATE KEY UPDATE
forecast_units = VALUES(forecast_units),
forecast_revenue = VALUES(forecast_revenue),
confidence_level = VALUES(confidence_level),
last_calculated_at = NOW()
`);
// Clean up temporary tables // Clean up temp tables
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_historical_sales');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_sales_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_recent_trend');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_confidence_calc');
lastPid = batch[batch.length - 1].pid;
processedCount += batch.length;
outputProgress({
status: 'running',
operation: 'Processing sales forecast batch',
current: processedCount,
total: totalProductsToUpdate,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProductsToUpdate),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProductsToUpdate) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
}
// If we get here, everything completed successfully
success = true;
// Update calculate_status
await connection.query(` await connection.query(`
DROP TEMPORARY TABLE IF EXISTS temp_forecast_dates; INSERT INTO calculate_status (module_name, last_calculation_timestamp)
DROP TEMPORARY TABLE IF EXISTS temp_daily_sales; VALUES ('sales_forecasts', NOW())
DROP TEMPORARY TABLE IF EXISTS temp_product_stats; ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
DROP TEMPORARY TABLE IF EXISTS temp_category_sales;
DROP TEMPORARY TABLE IF EXISTS temp_category_stats;
`); `);
processedCount = Math.floor(totalProducts * 1.0); return {
outputProgress({ processedProducts: processedCount,
status: 'running', processedOrders: 0,
operation: 'Category forecasts calculated and temporary tables cleaned up', processedPurchaseOrders: 0,
current: processedCount, success
total: totalProducts, };
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
} catch (error) { } catch (error) {
success = false;
logError(error, 'Error calculating sales forecasts'); logError(error, 'Error calculating sales forecasts');
throw error; throw error;
} finally { } finally {

View File

@@ -1,9 +1,42 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress'); const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db'); const { getConnection } = require('./utils/db');
async function calculateTimeAggregates(startTime, totalProducts, processedCount, isCancelled = false) { async function calculateTimeAggregates(startTime, totalProducts, processedCount = 0, isCancelled = false) {
const connection = await getConnection(); const connection = await getConnection();
let success = false;
const BATCH_SIZE = 5000;
try { try {
// Get last calculation timestamp
const [lastCalc] = await connection.query(`
SELECT last_calculation_timestamp
FROM calculate_status
WHERE module_name = 'time_aggregates'
`);
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
// Get total count of products needing updates
if (!totalProducts) {
const [productCount] = await connection.query(`
SELECT COUNT(DISTINCT p.pid) as count
FROM products p
LEFT JOIN orders o ON p.pid = o.pid AND o.updated > ?
WHERE p.updated > ?
OR o.pid IS NOT NULL
`, [lastCalculationTime, lastCalculationTime]);
totalProducts = productCount[0].count;
}
if (totalProducts === 0) {
console.log('No products need time aggregate updates');
return {
processedProducts: 0,
processedOrders: 0,
processedPurchaseOrders: 0,
success: true
};
}
if (isCancelled) { if (isCancelled) {
outputProgress({ outputProgress({
status: 'cancelled', status: 'cancelled',
@@ -13,9 +46,19 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: null, remaining: null,
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1) percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
return processedCount; return {
processedProducts: processedCount,
processedOrders: 0,
processedPurchaseOrders: 0,
success
};
} }
outputProgress({ outputProgress({
@@ -26,158 +69,168 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts), remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1) percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
// Initial insert of time-based aggregates // Process in batches
await connection.query(` let lastPid = 0;
INSERT INTO product_time_aggregates ( while (true) {
pid, if (isCancelled) break;
year,
month,
total_quantity_sold,
total_revenue,
total_cost,
order_count,
stock_received,
stock_ordered,
avg_price,
profit_margin
)
WITH sales_data AS (
SELECT
o.pid,
YEAR(o.date) as year,
MONTH(o.date) as month,
SUM(o.quantity) as total_quantity_sold,
SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) as total_revenue,
SUM(COALESCE(p.cost_price, 0) * o.quantity) as total_cost,
COUNT(DISTINCT o.order_number) as order_count,
AVG(o.price - COALESCE(o.discount, 0)) as avg_price,
CASE
WHEN SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) = 0 THEN 0
ELSE ((SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) -
SUM(COALESCE(p.cost_price, 0) * o.quantity)) /
SUM((o.price - COALESCE(o.discount, 0)) * o.quantity)) * 100
END as profit_margin
FROM orders o
JOIN products p ON o.pid = p.pid
WHERE o.canceled = 0
GROUP BY o.pid, YEAR(o.date), MONTH(o.date)
),
purchase_data AS (
SELECT
pid,
YEAR(date) as year,
MONTH(date) as month,
SUM(received) as stock_received,
SUM(ordered) as stock_ordered
FROM purchase_orders
WHERE status = 50
GROUP BY pid, YEAR(date), MONTH(date)
)
SELECT
s.pid,
s.year,
s.month,
s.total_quantity_sold,
s.total_revenue,
s.total_cost,
s.order_count,
COALESCE(p.stock_received, 0) as stock_received,
COALESCE(p.stock_ordered, 0) as stock_ordered,
s.avg_price,
s.profit_margin
FROM sales_data s
LEFT JOIN purchase_data p
ON s.pid = p.pid
AND s.year = p.year
AND s.month = p.month
UNION
SELECT
p.pid,
p.year,
p.month,
0 as total_quantity_sold,
0 as total_revenue,
0 as total_cost,
0 as order_count,
p.stock_received,
p.stock_ordered,
0 as avg_price,
0 as profit_margin
FROM purchase_data p
LEFT JOIN sales_data s
ON p.pid = s.pid
AND p.year = s.year
AND p.month = s.month
WHERE s.pid IS NULL
ON DUPLICATE KEY UPDATE
total_quantity_sold = VALUES(total_quantity_sold),
total_revenue = VALUES(total_revenue),
total_cost = VALUES(total_cost),
order_count = VALUES(order_count),
stock_received = VALUES(stock_received),
stock_ordered = VALUES(stock_ordered),
avg_price = VALUES(avg_price),
profit_margin = VALUES(profit_margin)
`);
processedCount = Math.floor(totalProducts * 0.60); const [batch] = await connection.query(`
outputProgress({ SELECT DISTINCT p.pid
status: 'running', FROM products p
operation: 'Base time aggregates calculated, updating financial metrics', FORCE INDEX (PRIMARY)
current: processedCount, LEFT JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
total: totalProducts, WHERE p.pid > ?
elapsed: formatElapsedTime(startTime), AND (
remaining: estimateRemaining(startTime, processedCount, totalProducts), p.updated > ?
rate: calculateRate(startTime, processedCount), OR EXISTS (
percentage: ((processedCount / totalProducts) * 100).toFixed(1) SELECT 1
}); FROM orders o2 FORCE INDEX (idx_orders_metrics)
WHERE o2.pid = p.pid
AND o2.updated > ?
)
)
ORDER BY p.pid
LIMIT ?
`, [lastPid, lastCalculationTime, lastCalculationTime, BATCH_SIZE]);
if (isCancelled) return processedCount; if (batch.length === 0) break;
// Update with financial metrics // Calculate and update time aggregates for this batch using temporary table
await connection.query(` await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_time_aggregates');
UPDATE product_time_aggregates pta await connection.query(`
JOIN ( CREATE TEMPORARY TABLE temp_time_aggregates (
pid BIGINT NOT NULL,
year INT NOT NULL,
month INT NOT NULL,
total_quantity_sold INT DEFAULT 0,
total_revenue DECIMAL(10,3) DEFAULT 0,
total_cost DECIMAL(10,3) DEFAULT 0,
order_count INT DEFAULT 0,
stock_received INT DEFAULT 0,
stock_ordered INT DEFAULT 0,
avg_price DECIMAL(10,3),
profit_margin DECIMAL(10,3),
inventory_value DECIMAL(10,3),
gmroi DECIMAL(10,3),
PRIMARY KEY (pid, year, month),
INDEX (pid),
INDEX (year, month)
) ENGINE=MEMORY
`);
// Populate temporary table
await connection.query(`
INSERT INTO temp_time_aggregates
SELECT SELECT
p.pid, p.pid,
YEAR(o.date) as year, YEAR(o.date) as year,
MONTH(o.date) as month, MONTH(o.date) as month,
SUM(o.quantity) as total_quantity_sold,
SUM(o.quantity * o.price) as total_revenue,
SUM(o.quantity * p.cost_price) as total_cost,
COUNT(DISTINCT o.order_number) as order_count,
COALESCE(SUM(CASE WHEN po.received_date IS NOT NULL THEN po.received ELSE 0 END), 0) as stock_received,
COALESCE(SUM(po.ordered), 0) as stock_ordered,
AVG(o.price) as avg_price,
CASE
WHEN SUM(o.quantity * o.price) > 0
THEN ((SUM(o.quantity * o.price) - SUM(o.quantity * p.cost_price)) / SUM(o.quantity * o.price)) * 100
ELSE 0
END as profit_margin,
p.cost_price * p.stock_quantity as inventory_value, p.cost_price * p.stock_quantity as inventory_value,
SUM(o.quantity * (o.price - p.cost_price)) as gross_profit, CASE
COUNT(DISTINCT DATE(o.date)) as days_in_period WHEN p.cost_price * p.stock_quantity > 0
THEN (SUM(o.quantity * (o.price - p.cost_price))) / (p.cost_price * p.stock_quantity)
ELSE 0
END as gmroi
FROM products p FROM products p
LEFT JOIN orders o ON p.pid = o.pid FORCE INDEX (PRIMARY)
WHERE o.canceled = false INNER JOIN orders o FORCE INDEX (idx_orders_metrics) ON p.pid = o.pid
AND o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
LEFT JOIN purchase_orders po FORCE INDEX (idx_po_metrics) ON p.pid = po.pid
AND po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
WHERE p.pid IN (?)
GROUP BY p.pid, YEAR(o.date), MONTH(o.date) GROUP BY p.pid, YEAR(o.date), MONTH(o.date)
) fin ON pta.pid = fin.pid HAVING year IS NOT NULL AND month IS NOT NULL
AND pta.year = fin.year `, [batch.map(row => row.pid)]);
AND pta.month = fin.month
SET // Update from temporary table
pta.inventory_value = COALESCE(fin.inventory_value, 0), await connection.query(`
pta.gmroi = CASE INSERT INTO product_time_aggregates (
WHEN COALESCE(fin.inventory_value, 0) > 0 AND fin.days_in_period > 0 THEN pid, year, month,
(COALESCE(fin.gross_profit, 0) * (365.0 / fin.days_in_period)) / COALESCE(fin.inventory_value, 0) total_quantity_sold, total_revenue, total_cost,
ELSE 0 order_count, stock_received, stock_ordered,
END avg_price, profit_margin, inventory_value, gmroi
)
SELECT
pid, year, month,
total_quantity_sold, total_revenue, total_cost,
order_count, stock_received, stock_ordered,
avg_price, profit_margin, inventory_value, gmroi
FROM temp_time_aggregates
ON DUPLICATE KEY UPDATE
total_quantity_sold = VALUES(total_quantity_sold),
total_revenue = VALUES(total_revenue),
total_cost = VALUES(total_cost),
order_count = VALUES(order_count),
stock_received = VALUES(stock_received),
stock_ordered = VALUES(stock_ordered),
avg_price = VALUES(avg_price),
profit_margin = VALUES(profit_margin),
inventory_value = VALUES(inventory_value),
gmroi = VALUES(gmroi)
`);
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_time_aggregates');
lastPid = batch[batch.length - 1].pid;
processedCount += batch.length;
outputProgress({
status: 'running',
operation: 'Processing time aggregates batch',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
}
// If we get here, everything completed successfully
success = true;
// Update calculate_status
await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
VALUES ('time_aggregates', NOW())
ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
`); `);
processedCount = Math.floor(totalProducts * 0.65); return {
outputProgress({ processedProducts: processedCount,
status: 'running', processedOrders: 0,
operation: 'Financial metrics updated', processedPurchaseOrders: 0,
current: processedCount, success
total: totalProducts, };
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
} catch (error) { } catch (error) {
success = false;
logError(error, 'Error calculating time aggregates'); logError(error, 'Error calculating time aggregates');
throw error; throw error;
} finally { } finally {

View File

@@ -1,166 +1,296 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress'); const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db'); const { getConnection } = require('./utils/db');
async function calculateVendorMetrics(startTime, totalProducts, processedCount, isCancelled = false) { async function calculateVendorMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
const connection = await getConnection(); const connection = await getConnection();
let success = false;
const BATCH_SIZE = 5000;
try { try {
// Get last calculation timestamp
const [lastCalc] = await connection.query(`
SELECT last_calculation_timestamp
FROM calculate_status
WHERE module_name = 'vendor_metrics'
`);
const lastCalculationTime = lastCalc[0]?.last_calculation_timestamp || '1970-01-01';
// Get total count of vendors needing updates using EXISTS for better performance
const [vendorCount] = await connection.query(`
SELECT COUNT(DISTINCT v.vendor) as count
FROM vendor_details v
WHERE v.status = 'active'
AND (
EXISTS (
SELECT 1 FROM products p
WHERE p.vendor = v.vendor
AND p.updated > ?
)
OR EXISTS (
SELECT 1 FROM purchase_orders po
WHERE po.vendor = v.vendor
AND po.updated > ?
)
)
`, [lastCalculationTime, lastCalculationTime]);
const totalVendors = vendorCount[0].count;
if (totalVendors === 0) {
console.log('No vendors need metric updates');
return {
processedProducts: 0,
processedOrders: 0,
processedPurchaseOrders: 0,
success: true
};
}
if (isCancelled) { if (isCancelled) {
outputProgress({ outputProgress({
status: 'cancelled', status: 'cancelled',
operation: 'Vendor metrics calculation cancelled', operation: 'Vendor metrics calculation cancelled',
current: processedCount, current: processedCount,
total: totalProducts, total: totalVendors,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: null, remaining: null,
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1) percentage: ((processedCount / totalVendors) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
return processedCount; return {
processedProducts: processedCount,
processedOrders: 0,
processedPurchaseOrders: 0,
success
};
} }
outputProgress({ outputProgress({
status: 'running', status: 'running',
operation: 'Starting vendor metrics calculation', operation: 'Starting vendor metrics calculation',
current: processedCount, current: processedCount,
total: totalProducts, total: totalVendors,
elapsed: formatElapsedTime(startTime), elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts), remaining: estimateRemaining(startTime, processedCount, totalVendors),
rate: calculateRate(startTime, processedCount), rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1) percentage: ((processedCount / totalVendors) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
}); });
// First ensure all vendors exist in vendor_details // Process in batches
await connection.query(` let lastVendor = '';
INSERT IGNORE INTO vendor_details (vendor, status, created_at, updated_at) while (true) {
SELECT DISTINCT if (isCancelled) break;
vendor,
'active' as status,
NOW() as created_at,
NOW() as updated_at
FROM products
WHERE vendor IS NOT NULL
`);
processedCount = Math.floor(totalProducts * 0.8); // Get batch of vendors using EXISTS for better performance
outputProgress({ const [batch] = await connection.query(`
status: 'running', SELECT DISTINCT v.vendor
operation: 'Vendor details updated, calculating metrics', FROM vendor_details v
current: processedCount, WHERE v.status = 'active'
total: totalProducts, AND v.vendor > ?
elapsed: formatElapsedTime(startTime), AND (
remaining: estimateRemaining(startTime, processedCount, totalProducts), EXISTS (
rate: calculateRate(startTime, processedCount), SELECT 1
percentage: ((processedCount / totalProducts) * 100).toFixed(1) FROM products p
}); WHERE p.vendor = v.vendor
AND p.updated > ?
)
OR EXISTS (
SELECT 1
FROM purchase_orders po
WHERE po.vendor = v.vendor
AND po.updated > ?
)
)
ORDER BY v.vendor
LIMIT ?
`, [lastVendor, lastCalculationTime, lastCalculationTime, BATCH_SIZE]);
if (isCancelled) return processedCount; if (batch.length === 0) break;
// Now calculate vendor metrics // Create temporary tables with optimized structure and indexes
await connection.query(` await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_purchase_stats');
INSERT INTO vendor_metrics ( await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
vendor,
total_revenue, await connection.query(`
total_orders, CREATE TEMPORARY TABLE temp_purchase_stats (
total_late_orders, vendor VARCHAR(100) NOT NULL,
avg_lead_time_days, avg_lead_time_days DECIMAL(10,2),
on_time_delivery_rate, total_orders INT,
order_fill_rate, total_late_orders INT,
avg_order_value, total_purchase_value DECIMAL(15,2),
active_products, avg_order_value DECIMAL(15,2),
total_products, on_time_delivery_rate DECIMAL(5,2),
status, order_fill_rate DECIMAL(5,2),
last_calculated_at PRIMARY KEY (vendor),
) INDEX (total_orders),
WITH vendor_sales AS ( INDEX (total_purchase_value)
) ENGINE=MEMORY
`);
await connection.query(`
CREATE TEMPORARY TABLE temp_product_stats (
vendor VARCHAR(100) NOT NULL,
total_products INT,
active_products INT,
avg_margin_percent DECIMAL(5,2),
total_revenue DECIMAL(15,2),
PRIMARY KEY (vendor),
INDEX (total_products),
INDEX (total_revenue)
) ENGINE=MEMORY
`);
// Populate purchase_stats temp table with optimized index usage
await connection.query(`
INSERT INTO temp_purchase_stats
SELECT
po.vendor,
AVG(DATEDIFF(po.received_date, po.date)) as avg_lead_time_days,
COUNT(DISTINCT po.po_id) as total_orders,
COUNT(CASE WHEN DATEDIFF(po.received_date, po.date) > 30 THEN 1 END) as total_late_orders,
SUM(po.ordered * po.po_cost_price) as total_purchase_value,
AVG(po.ordered * po.po_cost_price) as avg_order_value,
(COUNT(CASE WHEN DATEDIFF(po.received_date, po.date) <= 30 THEN 1 END) / COUNT(*)) * 100 as on_time_delivery_rate,
(SUM(LEAST(po.received, po.ordered)) / NULLIF(SUM(po.ordered), 0)) * 100 as order_fill_rate
FROM purchase_orders po
FORCE INDEX (idx_vendor)
WHERE po.vendor IN (?)
AND po.received_date IS NOT NULL
AND po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 365 DAY)
AND po.updated > ?
GROUP BY po.vendor
`, [batch.map(row => row.vendor), lastCalculationTime]);
// Populate product stats with optimized index usage
await connection.query(`
INSERT INTO temp_product_stats
SELECT SELECT
p.vendor, p.vendor,
SUM(o.quantity * o.price) as total_revenue, COUNT(DISTINCT p.pid) as product_count,
COUNT(DISTINCT o.id) as total_orders, COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
COUNT(DISTINCT p.pid) as active_products AVG(pm.avg_margin_percent) as avg_margin,
SUM(pm.total_revenue) as total_revenue
FROM products p FROM products p
JOIN orders o ON p.pid = o.pid FORCE INDEX (idx_vendor)
WHERE o.canceled = false LEFT JOIN product_metrics pm FORCE INDEX (PRIMARY) ON p.pid = pm.pid
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH) WHERE p.vendor IN (?)
AND (
p.updated > ?
OR EXISTS (
SELECT 1 FROM orders o FORCE INDEX (idx_orders_metrics)
WHERE o.pid = p.pid
AND o.updated > ?
)
)
GROUP BY p.vendor GROUP BY p.vendor
), `, [batch.map(row => row.vendor), lastCalculationTime, lastCalculationTime]);
vendor_po AS (
SELECT // Update metrics using temp tables with optimized join order
p.vendor, await connection.query(`
COUNT(DISTINCT CASE WHEN po.receiving_status = 40 THEN po.id END) as received_orders, INSERT INTO vendor_metrics (
COUNT(DISTINCT po.id) as total_orders,
AVG(CASE
WHEN po.receiving_status = 40
THEN DATEDIFF(po.received_date, po.date)
END) as avg_lead_time_days
FROM products p
JOIN purchase_orders po ON p.pid = po.pid
WHERE po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
GROUP BY p.vendor
),
vendor_products AS (
SELECT
vendor, vendor,
COUNT(DISTINCT pid) as total_products avg_lead_time_days,
FROM products on_time_delivery_rate,
GROUP BY vendor order_fill_rate,
) total_orders,
SELECT total_late_orders,
vs.vendor, total_purchase_value,
COALESCE(vs.total_revenue, 0) as total_revenue, avg_order_value,
COALESCE(vp.total_orders, 0) as total_orders, active_products,
COALESCE(vp.total_orders - vp.received_orders, 0) as total_late_orders, total_products,
COALESCE(vp.avg_lead_time_days, 0) as avg_lead_time_days, total_revenue,
CASE avg_margin_percent,
WHEN vp.total_orders > 0 status,
THEN (vp.received_orders / vp.total_orders) * 100 last_calculated_at
ELSE 0 )
END as on_time_delivery_rate, SELECT
CASE v.vendor,
WHEN vp.total_orders > 0 COALESCE(ps.avg_lead_time_days, 0) as avg_lead_time_days,
THEN (vp.received_orders / vp.total_orders) * 100 COALESCE(ps.on_time_delivery_rate, 0) as on_time_delivery_rate,
ELSE 0 COALESCE(ps.order_fill_rate, 0) as order_fill_rate,
END as order_fill_rate, COALESCE(ps.total_orders, 0) as total_orders,
CASE COALESCE(ps.total_late_orders, 0) as total_late_orders,
WHEN vs.total_orders > 0 COALESCE(ps.total_purchase_value, 0) as total_purchase_value,
THEN vs.total_revenue / vs.total_orders COALESCE(ps.avg_order_value, 0) as avg_order_value,
ELSE 0 COALESCE(prs.active_products, 0) as active_products,
END as avg_order_value, COALESCE(prs.total_products, 0) as total_products,
COALESCE(vs.active_products, 0) as active_products, COALESCE(prs.total_revenue, 0) as total_revenue,
COALESCE(vpr.total_products, 0) as total_products, COALESCE(prs.avg_margin_percent, 0) as avg_margin_percent,
'active' as status, v.status,
NOW() as last_calculated_at NOW() as last_calculated_at
FROM vendor_sales vs FROM vendor_details v
LEFT JOIN vendor_po vp ON vs.vendor = vp.vendor FORCE INDEX (PRIMARY)
LEFT JOIN vendor_products vpr ON vs.vendor = vpr.vendor LEFT JOIN temp_purchase_stats ps ON v.vendor = ps.vendor
WHERE vs.vendor IS NOT NULL LEFT JOIN temp_product_stats prs ON v.vendor = prs.vendor
ON DUPLICATE KEY UPDATE WHERE v.vendor IN (?)
total_revenue = VALUES(total_revenue), ON DUPLICATE KEY UPDATE
total_orders = VALUES(total_orders), avg_lead_time_days = VALUES(avg_lead_time_days),
total_late_orders = VALUES(total_late_orders), on_time_delivery_rate = VALUES(on_time_delivery_rate),
avg_lead_time_days = VALUES(avg_lead_time_days), order_fill_rate = VALUES(order_fill_rate),
on_time_delivery_rate = VALUES(on_time_delivery_rate), total_orders = VALUES(total_orders),
order_fill_rate = VALUES(order_fill_rate), total_late_orders = VALUES(total_late_orders),
avg_order_value = VALUES(avg_order_value), total_purchase_value = VALUES(total_purchase_value),
active_products = VALUES(active_products), avg_order_value = VALUES(avg_order_value),
total_products = VALUES(total_products), active_products = VALUES(active_products),
status = VALUES(status), total_products = VALUES(total_products),
last_calculated_at = VALUES(last_calculated_at) total_revenue = VALUES(total_revenue),
avg_margin_percent = VALUES(avg_margin_percent),
status = VALUES(status),
last_calculated_at = NOW()
`, [batch.map(row => row.vendor)]);
// Clean up temp tables
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_purchase_stats');
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_product_stats');
lastVendor = batch[batch.length - 1].vendor;
processedCount += batch.length;
outputProgress({
status: 'running',
operation: 'Processing vendor metrics batch',
current: processedCount,
total: totalVendors,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalVendors),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalVendors) * 100).toFixed(1),
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
}
// If we get here, everything completed successfully
success = true;
// Update calculate_status
await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
VALUES ('vendor_metrics', NOW())
ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
`); `);
processedCount = Math.floor(totalProducts * 0.9); return {
outputProgress({ processedProducts: processedCount,
status: 'running', processedOrders: 0,
operation: 'Vendor metrics calculated', processedPurchaseOrders: 0,
current: processedCount, success
total: totalProducts, };
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
} catch (error) { } catch (error) {
success = false;
logError(error, 'Error calculating vendor metrics'); logError(error, 'Error calculating vendor metrics');
throw error; throw error;
} finally { } finally {