Compare commits
2 Commits
108181c63d
...
8e19e6cd74
| Author | SHA1 | Date | |
|---|---|---|---|
| 8e19e6cd74 | |||
| 749907bd30 |
@@ -62,13 +62,24 @@ const TEMP_TABLES = [
|
|||||||
|
|
||||||
// Add cleanup function for temporary tables
|
// Add cleanup function for temporary tables
|
||||||
async function cleanupTemporaryTables(connection) {
|
async function cleanupTemporaryTables(connection) {
|
||||||
|
// List of possible temporary tables that might exist
|
||||||
|
const tempTables = [
|
||||||
|
'temp_sales_metrics',
|
||||||
|
'temp_purchase_metrics',
|
||||||
|
'temp_forecast_dates',
|
||||||
|
'temp_daily_sales',
|
||||||
|
'temp_product_stats',
|
||||||
|
'temp_category_sales',
|
||||||
|
'temp_category_stats'
|
||||||
|
];
|
||||||
|
|
||||||
try {
|
try {
|
||||||
for (const table of TEMP_TABLES) {
|
// Drop each temporary table if it exists
|
||||||
await connection.query(`DROP TEMPORARY TABLE IF EXISTS ${table}`);
|
for (const table of tempTables) {
|
||||||
|
await connection.query(`DROP TABLE IF EXISTS ${table}`);
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (err) {
|
||||||
logError(error, 'Error cleaning up temporary tables');
|
console.error('Error cleaning up temporary tables:', err);
|
||||||
throw error; // Re-throw to be handled by the caller
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -86,22 +97,42 @@ let isCancelled = false;
|
|||||||
|
|
||||||
function cancelCalculation() {
|
function cancelCalculation() {
|
||||||
isCancelled = true;
|
isCancelled = true;
|
||||||
global.clearProgress();
|
console.log('Calculation has been cancelled by user');
|
||||||
// Format as SSE event
|
|
||||||
const event = {
|
// Force-terminate any query that's been running for more than 5 seconds
|
||||||
progress: {
|
try {
|
||||||
status: 'cancelled',
|
const connection = getConnection();
|
||||||
operation: 'Calculation cancelled',
|
connection.then(async (conn) => {
|
||||||
current: 0,
|
try {
|
||||||
total: 0,
|
// Identify and terminate long-running queries from our application
|
||||||
elapsed: null,
|
await conn.query(`
|
||||||
remaining: null,
|
SELECT pg_cancel_backend(pid)
|
||||||
rate: 0,
|
FROM pg_stat_activity
|
||||||
timestamp: Date.now()
|
WHERE query_start < now() - interval '5 seconds'
|
||||||
}
|
AND application_name LIKE '%node%'
|
||||||
|
AND query NOT LIKE '%pg_cancel_backend%'
|
||||||
|
`);
|
||||||
|
|
||||||
|
// Clean up any temporary tables
|
||||||
|
await cleanupTemporaryTables(conn);
|
||||||
|
|
||||||
|
// Release connection
|
||||||
|
conn.release();
|
||||||
|
} catch (err) {
|
||||||
|
console.error('Error during force cancellation:', err);
|
||||||
|
conn.release();
|
||||||
|
}
|
||||||
|
}).catch(err => {
|
||||||
|
console.error('Could not get connection for cancellation:', err);
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
console.error('Failed to terminate running queries:', err);
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: true,
|
||||||
|
message: 'Calculation has been cancelled'
|
||||||
};
|
};
|
||||||
process.stdout.write(JSON.stringify(event) + '\n');
|
|
||||||
process.exit(0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle SIGTERM signal for cancellation
|
// Handle SIGTERM signal for cancellation
|
||||||
@@ -119,6 +150,15 @@ async function calculateMetrics() {
|
|||||||
let totalPurchaseOrders = 0;
|
let totalPurchaseOrders = 0;
|
||||||
let calculateHistoryId;
|
let calculateHistoryId;
|
||||||
|
|
||||||
|
// Set a maximum execution time (30 minutes)
|
||||||
|
const MAX_EXECUTION_TIME = 30 * 60 * 1000;
|
||||||
|
const timeout = setTimeout(() => {
|
||||||
|
console.error(`Calculation timed out after ${MAX_EXECUTION_TIME/1000} seconds, forcing termination`);
|
||||||
|
// Call cancel and force exit
|
||||||
|
cancelCalculation();
|
||||||
|
process.exit(1);
|
||||||
|
}, MAX_EXECUTION_TIME);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Clean up any previously running calculations
|
// Clean up any previously running calculations
|
||||||
connection = await getConnection();
|
connection = await getConnection();
|
||||||
@@ -127,24 +167,24 @@ async function calculateMetrics() {
|
|||||||
SET
|
SET
|
||||||
status = 'cancelled',
|
status = 'cancelled',
|
||||||
end_time = NOW(),
|
end_time = NOW(),
|
||||||
duration_seconds = TIMESTAMPDIFF(SECOND, start_time, NOW()),
|
duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER,
|
||||||
error_message = 'Previous calculation was not completed properly'
|
error_message = 'Previous calculation was not completed properly'
|
||||||
WHERE status = 'running'
|
WHERE status = 'running'
|
||||||
`);
|
`);
|
||||||
|
|
||||||
// Get counts from all relevant tables
|
// Get counts from all relevant tables
|
||||||
const [[productCount], [orderCount], [poCount]] = await Promise.all([
|
const [productCountResult, orderCountResult, poCountResult] = await Promise.all([
|
||||||
connection.query('SELECT COUNT(*) as total FROM products'),
|
connection.query('SELECT COUNT(*) as total FROM products'),
|
||||||
connection.query('SELECT COUNT(*) as total FROM orders'),
|
connection.query('SELECT COUNT(*) as total FROM orders'),
|
||||||
connection.query('SELECT COUNT(*) as total FROM purchase_orders')
|
connection.query('SELECT COUNT(*) as total FROM purchase_orders')
|
||||||
]);
|
]);
|
||||||
|
|
||||||
totalProducts = productCount.total;
|
totalProducts = parseInt(productCountResult.rows[0].total);
|
||||||
totalOrders = orderCount.total;
|
totalOrders = parseInt(orderCountResult.rows[0].total);
|
||||||
totalPurchaseOrders = poCount.total;
|
totalPurchaseOrders = parseInt(poCountResult.rows[0].total);
|
||||||
|
|
||||||
// Create history record for this calculation
|
// Create history record for this calculation
|
||||||
const [historyResult] = await connection.query(`
|
const historyResult = await connection.query(`
|
||||||
INSERT INTO calculate_history (
|
INSERT INTO calculate_history (
|
||||||
start_time,
|
start_time,
|
||||||
status,
|
status,
|
||||||
@@ -155,19 +195,19 @@ async function calculateMetrics() {
|
|||||||
) VALUES (
|
) VALUES (
|
||||||
NOW(),
|
NOW(),
|
||||||
'running',
|
'running',
|
||||||
?,
|
$1,
|
||||||
?,
|
$2,
|
||||||
?,
|
$3,
|
||||||
JSON_OBJECT(
|
jsonb_build_object(
|
||||||
'skip_product_metrics', ?,
|
'skip_product_metrics', ($4::int > 0),
|
||||||
'skip_time_aggregates', ?,
|
'skip_time_aggregates', ($5::int > 0),
|
||||||
'skip_financial_metrics', ?,
|
'skip_financial_metrics', ($6::int > 0),
|
||||||
'skip_vendor_metrics', ?,
|
'skip_vendor_metrics', ($7::int > 0),
|
||||||
'skip_category_metrics', ?,
|
'skip_category_metrics', ($8::int > 0),
|
||||||
'skip_brand_metrics', ?,
|
'skip_brand_metrics', ($9::int > 0),
|
||||||
'skip_sales_forecasts', ?
|
'skip_sales_forecasts', ($10::int > 0)
|
||||||
)
|
)
|
||||||
)
|
) RETURNING id
|
||||||
`, [
|
`, [
|
||||||
totalProducts,
|
totalProducts,
|
||||||
totalOrders,
|
totalOrders,
|
||||||
@@ -180,8 +220,7 @@ async function calculateMetrics() {
|
|||||||
SKIP_BRAND_METRICS,
|
SKIP_BRAND_METRICS,
|
||||||
SKIP_SALES_FORECASTS
|
SKIP_SALES_FORECASTS
|
||||||
]);
|
]);
|
||||||
calculateHistoryId = historyResult.insertId;
|
calculateHistoryId = historyResult.rows[0].id;
|
||||||
connection.release();
|
|
||||||
|
|
||||||
// Add debug logging for the progress functions
|
// Add debug logging for the progress functions
|
||||||
console.log('Debug - Progress functions:', {
|
console.log('Debug - Progress functions:', {
|
||||||
@@ -199,6 +238,8 @@ async function calculateMetrics() {
|
|||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Release the connection before getting a new one
|
||||||
|
connection.release();
|
||||||
isCancelled = false;
|
isCancelled = false;
|
||||||
connection = await getConnection();
|
connection = await getConnection();
|
||||||
|
|
||||||
@@ -234,10 +275,10 @@ async function calculateMetrics() {
|
|||||||
await connection.query(`
|
await connection.query(`
|
||||||
UPDATE calculate_history
|
UPDATE calculate_history
|
||||||
SET
|
SET
|
||||||
processed_products = ?,
|
processed_products = $1,
|
||||||
processed_orders = ?,
|
processed_orders = $2,
|
||||||
processed_purchase_orders = ?
|
processed_purchase_orders = $3
|
||||||
WHERE id = ?
|
WHERE id = $4
|
||||||
`, [safeProducts, safeOrders, safePurchaseOrders, calculateHistoryId]);
|
`, [safeProducts, safeOrders, safePurchaseOrders, calculateHistoryId]);
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -359,216 +400,6 @@ async function calculateMetrics() {
|
|||||||
console.log('Skipping sales forecasts calculation');
|
console.log('Skipping sales forecasts calculation');
|
||||||
}
|
}
|
||||||
|
|
||||||
// Calculate ABC classification
|
|
||||||
outputProgress({
|
|
||||||
status: 'running',
|
|
||||||
operation: 'Starting ABC classification',
|
|
||||||
current: processedProducts || 0,
|
|
||||||
total: totalProducts || 0,
|
|
||||||
elapsed: formatElapsedTime(startTime),
|
|
||||||
remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0),
|
|
||||||
rate: calculateRate(startTime, processedProducts || 0),
|
|
||||||
percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1),
|
|
||||||
timing: {
|
|
||||||
start_time: new Date(startTime).toISOString(),
|
|
||||||
end_time: new Date().toISOString(),
|
|
||||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
if (isCancelled) return {
|
|
||||||
processedProducts: processedProducts || 0,
|
|
||||||
processedOrders: processedOrders || 0,
|
|
||||||
processedPurchaseOrders: 0,
|
|
||||||
success: false
|
|
||||||
};
|
|
||||||
|
|
||||||
const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1');
|
|
||||||
const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 };
|
|
||||||
|
|
||||||
// First, create and populate the rankings table with an index
|
|
||||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks');
|
|
||||||
await connection.query(`
|
|
||||||
CREATE TEMPORARY TABLE temp_revenue_ranks (
|
|
||||||
pid BIGINT NOT NULL,
|
|
||||||
total_revenue DECIMAL(10,3),
|
|
||||||
rank_num INT,
|
|
||||||
total_count INT,
|
|
||||||
PRIMARY KEY (pid),
|
|
||||||
INDEX (rank_num)
|
|
||||||
) ENGINE=MEMORY
|
|
||||||
`);
|
|
||||||
|
|
||||||
outputProgress({
|
|
||||||
status: 'running',
|
|
||||||
operation: 'Creating revenue rankings',
|
|
||||||
current: processedProducts || 0,
|
|
||||||
total: totalProducts || 0,
|
|
||||||
elapsed: formatElapsedTime(startTime),
|
|
||||||
remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0),
|
|
||||||
rate: calculateRate(startTime, processedProducts || 0),
|
|
||||||
percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1),
|
|
||||||
timing: {
|
|
||||||
start_time: new Date(startTime).toISOString(),
|
|
||||||
end_time: new Date().toISOString(),
|
|
||||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
if (isCancelled) return {
|
|
||||||
processedProducts: processedProducts || 0,
|
|
||||||
processedOrders: processedOrders || 0,
|
|
||||||
processedPurchaseOrders: 0,
|
|
||||||
success: false
|
|
||||||
};
|
|
||||||
|
|
||||||
await connection.query(`
|
|
||||||
INSERT INTO temp_revenue_ranks
|
|
||||||
SELECT
|
|
||||||
pid,
|
|
||||||
total_revenue,
|
|
||||||
@rank := @rank + 1 as rank_num,
|
|
||||||
@total_count := @rank as total_count
|
|
||||||
FROM (
|
|
||||||
SELECT pid, total_revenue
|
|
||||||
FROM product_metrics
|
|
||||||
WHERE total_revenue > 0
|
|
||||||
ORDER BY total_revenue DESC
|
|
||||||
) ranked,
|
|
||||||
(SELECT @rank := 0) r
|
|
||||||
`);
|
|
||||||
|
|
||||||
// Get total count for percentage calculation
|
|
||||||
const [rankingCount] = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks');
|
|
||||||
const totalCount = rankingCount[0].total_count || 1;
|
|
||||||
const max_rank = totalCount; // Store max_rank for use in classification
|
|
||||||
|
|
||||||
outputProgress({
|
|
||||||
status: 'running',
|
|
||||||
operation: 'Updating ABC classifications',
|
|
||||||
current: processedProducts || 0,
|
|
||||||
total: totalProducts || 0,
|
|
||||||
elapsed: formatElapsedTime(startTime),
|
|
||||||
remaining: estimateRemaining(startTime, processedProducts || 0, totalProducts || 0),
|
|
||||||
rate: calculateRate(startTime, processedProducts || 0),
|
|
||||||
percentage: (((processedProducts || 0) / (totalProducts || 1)) * 100).toFixed(1),
|
|
||||||
timing: {
|
|
||||||
start_time: new Date(startTime).toISOString(),
|
|
||||||
end_time: new Date().toISOString(),
|
|
||||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
if (isCancelled) return {
|
|
||||||
processedProducts: processedProducts || 0,
|
|
||||||
processedOrders: processedOrders || 0,
|
|
||||||
processedPurchaseOrders: 0,
|
|
||||||
success: false
|
|
||||||
};
|
|
||||||
|
|
||||||
// ABC classification progress tracking
|
|
||||||
let abcProcessedCount = 0;
|
|
||||||
const batchSize = 5000;
|
|
||||||
let lastProgressUpdate = Date.now();
|
|
||||||
const progressUpdateInterval = 1000; // Update every second
|
|
||||||
|
|
||||||
while (true) {
|
|
||||||
if (isCancelled) return {
|
|
||||||
processedProducts: Number(processedProducts) || 0,
|
|
||||||
processedOrders: Number(processedOrders) || 0,
|
|
||||||
processedPurchaseOrders: 0,
|
|
||||||
success: false
|
|
||||||
};
|
|
||||||
|
|
||||||
// First get a batch of PIDs that need updating
|
|
||||||
const [pids] = await connection.query(`
|
|
||||||
SELECT pm.pid
|
|
||||||
FROM product_metrics pm
|
|
||||||
LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid
|
|
||||||
WHERE pm.abc_class IS NULL
|
|
||||||
OR pm.abc_class !=
|
|
||||||
CASE
|
|
||||||
WHEN tr.rank_num IS NULL THEN 'C'
|
|
||||||
WHEN (tr.rank_num / ?) * 100 <= ? THEN 'A'
|
|
||||||
WHEN (tr.rank_num / ?) * 100 <= ? THEN 'B'
|
|
||||||
ELSE 'C'
|
|
||||||
END
|
|
||||||
LIMIT ?
|
|
||||||
`, [max_rank, abcThresholds.a_threshold,
|
|
||||||
max_rank, abcThresholds.b_threshold,
|
|
||||||
batchSize]);
|
|
||||||
|
|
||||||
if (pids.length === 0) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Then update just those PIDs
|
|
||||||
const [result] = await connection.query(`
|
|
||||||
UPDATE product_metrics pm
|
|
||||||
LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid
|
|
||||||
SET pm.abc_class =
|
|
||||||
CASE
|
|
||||||
WHEN tr.rank_num IS NULL THEN 'C'
|
|
||||||
WHEN (tr.rank_num / ?) * 100 <= ? THEN 'A'
|
|
||||||
WHEN (tr.rank_num / ?) * 100 <= ? THEN 'B'
|
|
||||||
ELSE 'C'
|
|
||||||
END,
|
|
||||||
pm.last_calculated_at = NOW()
|
|
||||||
WHERE pm.pid IN (?)
|
|
||||||
`, [max_rank, abcThresholds.a_threshold,
|
|
||||||
max_rank, abcThresholds.b_threshold,
|
|
||||||
pids.map(row => row.pid)]);
|
|
||||||
|
|
||||||
abcProcessedCount += result.affectedRows;
|
|
||||||
|
|
||||||
// Calculate progress ensuring valid numbers
|
|
||||||
const currentProgress = Math.floor(totalProducts * (0.99 + (abcProcessedCount / (totalCount || 1)) * 0.01));
|
|
||||||
processedProducts = Number(currentProgress) || processedProducts || 0;
|
|
||||||
|
|
||||||
// Only update progress at most once per second
|
|
||||||
const now = Date.now();
|
|
||||||
if (now - lastProgressUpdate >= progressUpdateInterval) {
|
|
||||||
const progress = ensureValidProgress(processedProducts, totalProducts);
|
|
||||||
|
|
||||||
outputProgress({
|
|
||||||
status: 'running',
|
|
||||||
operation: 'ABC classification progress',
|
|
||||||
current: progress.current,
|
|
||||||
total: progress.total,
|
|
||||||
elapsed: formatElapsedTime(startTime),
|
|
||||||
remaining: estimateRemaining(startTime, progress.current, progress.total),
|
|
||||||
rate: calculateRate(startTime, progress.current),
|
|
||||||
percentage: progress.percentage,
|
|
||||||
timing: {
|
|
||||||
start_time: new Date(startTime).toISOString(),
|
|
||||||
end_time: new Date().toISOString(),
|
|
||||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
lastProgressUpdate = now;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update database progress
|
|
||||||
await updateProgress(processedProducts, processedOrders, processedPurchaseOrders);
|
|
||||||
|
|
||||||
// Small delay between batches to allow other transactions
|
|
||||||
await new Promise(resolve => setTimeout(resolve, 100));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clean up
|
|
||||||
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks');
|
|
||||||
|
|
||||||
const endTime = Date.now();
|
|
||||||
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);
|
|
||||||
|
|
||||||
// Update calculate_status for ABC classification
|
|
||||||
await connection.query(`
|
|
||||||
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
|
|
||||||
VALUES ('abc_classification', NOW())
|
|
||||||
ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
|
|
||||||
`);
|
|
||||||
|
|
||||||
// Final progress update with guaranteed valid numbers
|
// Final progress update with guaranteed valid numbers
|
||||||
const finalProgress = ensureValidProgress(totalProducts, totalProducts);
|
const finalProgress = ensureValidProgress(totalProducts, totalProducts);
|
||||||
|
|
||||||
@@ -578,14 +409,14 @@ async function calculateMetrics() {
|
|||||||
operation: 'Metrics calculation complete',
|
operation: 'Metrics calculation complete',
|
||||||
current: finalProgress.current,
|
current: finalProgress.current,
|
||||||
total: finalProgress.total,
|
total: finalProgress.total,
|
||||||
elapsed: formatElapsedTime(startTime),
|
elapsed: global.formatElapsedTime(startTime),
|
||||||
remaining: '0s',
|
remaining: '0s',
|
||||||
rate: calculateRate(startTime, finalProgress.current),
|
rate: global.calculateRate(startTime, finalProgress.current),
|
||||||
percentage: '100',
|
percentage: '100',
|
||||||
timing: {
|
timing: {
|
||||||
start_time: new Date(startTime).toISOString(),
|
start_time: new Date(startTime).toISOString(),
|
||||||
end_time: new Date().toISOString(),
|
end_time: new Date().toISOString(),
|
||||||
elapsed_seconds: totalElapsedSeconds
|
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -601,13 +432,13 @@ async function calculateMetrics() {
|
|||||||
UPDATE calculate_history
|
UPDATE calculate_history
|
||||||
SET
|
SET
|
||||||
end_time = NOW(),
|
end_time = NOW(),
|
||||||
duration_seconds = ?,
|
duration_seconds = $1,
|
||||||
processed_products = ?,
|
processed_products = $2,
|
||||||
processed_orders = ?,
|
processed_orders = $3,
|
||||||
processed_purchase_orders = ?,
|
processed_purchase_orders = $4,
|
||||||
status = 'completed'
|
status = 'completed'
|
||||||
WHERE id = ?
|
WHERE id = $5
|
||||||
`, [totalElapsedSeconds,
|
`, [Math.round((Date.now() - startTime) / 1000),
|
||||||
finalStats.processedProducts,
|
finalStats.processedProducts,
|
||||||
finalStats.processedOrders,
|
finalStats.processedOrders,
|
||||||
finalStats.processedPurchaseOrders,
|
finalStats.processedPurchaseOrders,
|
||||||
@@ -616,6 +447,11 @@ async function calculateMetrics() {
|
|||||||
// Clear progress file on successful completion
|
// Clear progress file on successful completion
|
||||||
global.clearProgress();
|
global.clearProgress();
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: true,
|
||||||
|
message: 'Calculation completed successfully',
|
||||||
|
duration: Math.round((Date.now() - startTime) / 1000)
|
||||||
|
};
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
const endTime = Date.now();
|
const endTime = Date.now();
|
||||||
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);
|
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);
|
||||||
@@ -625,13 +461,13 @@ async function calculateMetrics() {
|
|||||||
UPDATE calculate_history
|
UPDATE calculate_history
|
||||||
SET
|
SET
|
||||||
end_time = NOW(),
|
end_time = NOW(),
|
||||||
duration_seconds = ?,
|
duration_seconds = $1,
|
||||||
processed_products = ?,
|
processed_products = $2,
|
||||||
processed_orders = ?,
|
processed_orders = $3,
|
||||||
processed_purchase_orders = ?,
|
processed_purchase_orders = $4,
|
||||||
status = ?,
|
status = $5,
|
||||||
error_message = ?
|
error_message = $6
|
||||||
WHERE id = ?
|
WHERE id = $7
|
||||||
`, [
|
`, [
|
||||||
totalElapsedSeconds,
|
totalElapsedSeconds,
|
||||||
processedProducts || 0, // Ensure we have a valid number
|
processedProducts || 0, // Ensure we have a valid number
|
||||||
@@ -677,17 +513,38 @@ async function calculateMetrics() {
|
|||||||
}
|
}
|
||||||
throw error;
|
throw error;
|
||||||
} finally {
|
} finally {
|
||||||
|
// Clear the timeout to prevent forced termination
|
||||||
|
clearTimeout(timeout);
|
||||||
|
|
||||||
|
// Always clean up and release connection
|
||||||
if (connection) {
|
if (connection) {
|
||||||
// Ensure temporary tables are cleaned up
|
try {
|
||||||
await cleanupTemporaryTables(connection);
|
await cleanupTemporaryTables(connection);
|
||||||
connection.release();
|
connection.release();
|
||||||
|
} catch (err) {
|
||||||
|
console.error('Error in final cleanup:', err);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// Close the connection pool when we're done
|
|
||||||
await closePool();
|
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
success = false;
|
console.error('Error in metrics calculation', error);
|
||||||
logError(error, 'Error in metrics calculation');
|
|
||||||
|
try {
|
||||||
|
if (connection) {
|
||||||
|
await connection.query(`
|
||||||
|
UPDATE calculate_history
|
||||||
|
SET
|
||||||
|
status = 'error',
|
||||||
|
end_time = NOW(),
|
||||||
|
duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER,
|
||||||
|
error_message = $1
|
||||||
|
WHERE id = $2
|
||||||
|
`, [error.message.substring(0, 500), calculateHistoryId]);
|
||||||
|
}
|
||||||
|
} catch (updateError) {
|
||||||
|
console.error('Error updating calculation history:', updateError);
|
||||||
|
}
|
||||||
|
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,12 +32,12 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount =
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get order count that will be processed
|
// Get order count that will be processed
|
||||||
const [orderCount] = await connection.query(`
|
const orderCount = await connection.query(`
|
||||||
SELECT COUNT(*) as count
|
SELECT COUNT(*) as count
|
||||||
FROM orders o
|
FROM orders o
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
`);
|
`);
|
||||||
processedOrders = orderCount[0].count;
|
processedOrders = parseInt(orderCount.rows[0].count);
|
||||||
|
|
||||||
outputProgress({
|
outputProgress({
|
||||||
status: 'running',
|
status: 'running',
|
||||||
@@ -98,14 +98,14 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount =
|
|||||||
SUM(o.quantity * (o.price - COALESCE(o.discount, 0) - p.cost_price)) as period_margin,
|
SUM(o.quantity * (o.price - COALESCE(o.discount, 0) - p.cost_price)) as period_margin,
|
||||||
COUNT(DISTINCT DATE(o.date)) as period_days,
|
COUNT(DISTINCT DATE(o.date)) as period_days,
|
||||||
CASE
|
CASE
|
||||||
WHEN o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 3 MONTH) THEN 'current'
|
WHEN o.date >= CURRENT_DATE - INTERVAL '3 months' THEN 'current'
|
||||||
WHEN o.date BETWEEN DATE_SUB(CURRENT_DATE, INTERVAL 15 MONTH)
|
WHEN o.date BETWEEN CURRENT_DATE - INTERVAL '15 months'
|
||||||
AND DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH) THEN 'previous'
|
AND CURRENT_DATE - INTERVAL '12 months' THEN 'previous'
|
||||||
END as period_type
|
END as period_type
|
||||||
FROM filtered_products p
|
FROM filtered_products p
|
||||||
JOIN orders o ON p.pid = o.pid
|
JOIN orders o ON p.pid = o.pid
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 15 MONTH)
|
AND o.date >= CURRENT_DATE - INTERVAL '15 months'
|
||||||
GROUP BY p.brand, period_type
|
GROUP BY p.brand, period_type
|
||||||
),
|
),
|
||||||
brand_data AS (
|
brand_data AS (
|
||||||
@@ -165,15 +165,16 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount =
|
|||||||
LEFT JOIN sales_periods sp ON bd.brand = sp.brand
|
LEFT JOIN sales_periods sp ON bd.brand = sp.brand
|
||||||
GROUP BY bd.brand, bd.product_count, bd.active_products, bd.total_stock_units,
|
GROUP BY bd.brand, bd.product_count, bd.active_products, bd.total_stock_units,
|
||||||
bd.total_stock_cost, bd.total_stock_retail, bd.total_revenue, bd.avg_margin
|
bd.total_stock_cost, bd.total_stock_retail, bd.total_revenue, bd.avg_margin
|
||||||
ON DUPLICATE KEY UPDATE
|
ON CONFLICT (brand) DO UPDATE
|
||||||
product_count = VALUES(product_count),
|
SET
|
||||||
active_products = VALUES(active_products),
|
product_count = EXCLUDED.product_count,
|
||||||
total_stock_units = VALUES(total_stock_units),
|
active_products = EXCLUDED.active_products,
|
||||||
total_stock_cost = VALUES(total_stock_cost),
|
total_stock_units = EXCLUDED.total_stock_units,
|
||||||
total_stock_retail = VALUES(total_stock_retail),
|
total_stock_cost = EXCLUDED.total_stock_cost,
|
||||||
total_revenue = VALUES(total_revenue),
|
total_stock_retail = EXCLUDED.total_stock_retail,
|
||||||
avg_margin = VALUES(avg_margin),
|
total_revenue = EXCLUDED.total_revenue,
|
||||||
growth_rate = VALUES(growth_rate),
|
avg_margin = EXCLUDED.avg_margin,
|
||||||
|
growth_rate = EXCLUDED.growth_rate,
|
||||||
last_calculated_at = CURRENT_TIMESTAMP
|
last_calculated_at = CURRENT_TIMESTAMP
|
||||||
`);
|
`);
|
||||||
|
|
||||||
@@ -230,8 +231,8 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount =
|
|||||||
monthly_metrics AS (
|
monthly_metrics AS (
|
||||||
SELECT
|
SELECT
|
||||||
p.brand,
|
p.brand,
|
||||||
YEAR(o.date) as year,
|
EXTRACT(YEAR FROM o.date::timestamp with time zone) as year,
|
||||||
MONTH(o.date) as month,
|
EXTRACT(MONTH FROM o.date::timestamp with time zone) as month,
|
||||||
COUNT(DISTINCT p.valid_pid) as product_count,
|
COUNT(DISTINCT p.valid_pid) as product_count,
|
||||||
COUNT(DISTINCT p.active_pid) as active_products,
|
COUNT(DISTINCT p.active_pid) as active_products,
|
||||||
SUM(p.valid_stock) as total_stock_units,
|
SUM(p.valid_stock) as total_stock_units,
|
||||||
@@ -255,19 +256,20 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount =
|
|||||||
END as avg_margin
|
END as avg_margin
|
||||||
FROM filtered_products p
|
FROM filtered_products p
|
||||||
LEFT JOIN orders o ON p.pid = o.pid AND o.canceled = false
|
LEFT JOIN orders o ON p.pid = o.pid AND o.canceled = false
|
||||||
WHERE o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
|
WHERE o.date >= CURRENT_DATE - INTERVAL '12 months'
|
||||||
GROUP BY p.brand, YEAR(o.date), MONTH(o.date)
|
GROUP BY p.brand, EXTRACT(YEAR FROM o.date::timestamp with time zone), EXTRACT(MONTH FROM o.date::timestamp with time zone)
|
||||||
)
|
)
|
||||||
SELECT *
|
SELECT *
|
||||||
FROM monthly_metrics
|
FROM monthly_metrics
|
||||||
ON DUPLICATE KEY UPDATE
|
ON CONFLICT (brand, year, month) DO UPDATE
|
||||||
product_count = VALUES(product_count),
|
SET
|
||||||
active_products = VALUES(active_products),
|
product_count = EXCLUDED.product_count,
|
||||||
total_stock_units = VALUES(total_stock_units),
|
active_products = EXCLUDED.active_products,
|
||||||
total_stock_cost = VALUES(total_stock_cost),
|
total_stock_units = EXCLUDED.total_stock_units,
|
||||||
total_stock_retail = VALUES(total_stock_retail),
|
total_stock_cost = EXCLUDED.total_stock_cost,
|
||||||
total_revenue = VALUES(total_revenue),
|
total_stock_retail = EXCLUDED.total_stock_retail,
|
||||||
avg_margin = VALUES(avg_margin)
|
total_revenue = EXCLUDED.total_revenue,
|
||||||
|
avg_margin = EXCLUDED.avg_margin
|
||||||
`);
|
`);
|
||||||
|
|
||||||
processedCount = Math.floor(totalProducts * 0.99);
|
processedCount = Math.floor(totalProducts * 0.99);
|
||||||
@@ -294,7 +296,8 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount =
|
|||||||
await connection.query(`
|
await connection.query(`
|
||||||
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
|
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
|
||||||
VALUES ('brand_metrics', NOW())
|
VALUES ('brand_metrics', NOW())
|
||||||
ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
|
ON CONFLICT (module_name) DO UPDATE
|
||||||
|
SET last_calculation_timestamp = NOW()
|
||||||
`);
|
`);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|||||||
@@ -32,12 +32,12 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get order count that will be processed
|
// Get order count that will be processed
|
||||||
const [orderCount] = await connection.query(`
|
const orderCount = await connection.query(`
|
||||||
SELECT COUNT(*) as count
|
SELECT COUNT(*) as count
|
||||||
FROM orders o
|
FROM orders o
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
`);
|
`);
|
||||||
processedOrders = orderCount[0].count;
|
processedOrders = parseInt(orderCount.rows[0].count);
|
||||||
|
|
||||||
outputProgress({
|
outputProgress({
|
||||||
status: 'running',
|
status: 'running',
|
||||||
@@ -76,12 +76,13 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
|
|||||||
LEFT JOIN product_categories pc ON c.cat_id = pc.cat_id
|
LEFT JOIN product_categories pc ON c.cat_id = pc.cat_id
|
||||||
LEFT JOIN products p ON pc.pid = p.pid
|
LEFT JOIN products p ON pc.pid = p.pid
|
||||||
GROUP BY c.cat_id, c.status
|
GROUP BY c.cat_id, c.status
|
||||||
ON DUPLICATE KEY UPDATE
|
ON CONFLICT (category_id) DO UPDATE
|
||||||
product_count = VALUES(product_count),
|
SET
|
||||||
active_products = VALUES(active_products),
|
product_count = EXCLUDED.product_count,
|
||||||
total_value = VALUES(total_value),
|
active_products = EXCLUDED.active_products,
|
||||||
status = VALUES(status),
|
total_value = EXCLUDED.total_value,
|
||||||
last_calculated_at = VALUES(last_calculated_at)
|
status = EXCLUDED.status,
|
||||||
|
last_calculated_at = EXCLUDED.last_calculated_at
|
||||||
`);
|
`);
|
||||||
|
|
||||||
processedCount = Math.floor(totalProducts * 0.90);
|
processedCount = Math.floor(totalProducts * 0.90);
|
||||||
@@ -127,17 +128,13 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
|
|||||||
(tc.category_id IS NULL AND tc.vendor = p.vendor) OR
|
(tc.category_id IS NULL AND tc.vendor = p.vendor) OR
|
||||||
(tc.category_id IS NULL AND tc.vendor IS NULL)
|
(tc.category_id IS NULL AND tc.vendor IS NULL)
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL COALESCE(tc.calculation_period_days, 30) DAY)
|
AND o.date >= CURRENT_DATE - (COALESCE(tc.calculation_period_days, 30) || ' days')::INTERVAL
|
||||||
GROUP BY pc.cat_id
|
GROUP BY pc.cat_id
|
||||||
)
|
)
|
||||||
UPDATE category_metrics cm
|
UPDATE category_metrics
|
||||||
JOIN category_sales cs ON cm.category_id = cs.cat_id
|
|
||||||
LEFT JOIN turnover_config tc ON
|
|
||||||
(tc.category_id = cm.category_id AND tc.vendor IS NULL) OR
|
|
||||||
(tc.category_id IS NULL AND tc.vendor IS NULL)
|
|
||||||
SET
|
SET
|
||||||
cm.avg_margin = COALESCE(cs.total_margin * 100.0 / NULLIF(cs.total_sales, 0), 0),
|
avg_margin = COALESCE(cs.total_margin * 100.0 / NULLIF(cs.total_sales, 0), 0),
|
||||||
cm.turnover_rate = CASE
|
turnover_rate = CASE
|
||||||
WHEN cs.avg_stock > 0 AND cs.active_days > 0
|
WHEN cs.avg_stock > 0 AND cs.active_days > 0
|
||||||
THEN LEAST(
|
THEN LEAST(
|
||||||
(cs.units_sold / cs.avg_stock) * (365.0 / cs.active_days),
|
(cs.units_sold / cs.avg_stock) * (365.0 / cs.active_days),
|
||||||
@@ -145,7 +142,9 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
|
|||||||
)
|
)
|
||||||
ELSE 0
|
ELSE 0
|
||||||
END,
|
END,
|
||||||
cm.last_calculated_at = NOW()
|
last_calculated_at = NOW()
|
||||||
|
FROM category_sales cs
|
||||||
|
WHERE category_id = cs.cat_id
|
||||||
`);
|
`);
|
||||||
|
|
||||||
processedCount = Math.floor(totalProducts * 0.95);
|
processedCount = Math.floor(totalProducts * 0.95);
|
||||||
@@ -184,9 +183,9 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
|
|||||||
FROM product_categories pc
|
FROM product_categories pc
|
||||||
JOIN products p ON pc.pid = p.pid
|
JOIN products p ON pc.pid = p.pid
|
||||||
JOIN orders o ON p.pid = o.pid
|
JOIN orders o ON p.pid = o.pid
|
||||||
LEFT JOIN sales_seasonality ss ON MONTH(o.date) = ss.month
|
LEFT JOIN sales_seasonality ss ON EXTRACT(MONTH FROM o.date) = ss.month
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 3 MONTH)
|
AND o.date >= CURRENT_DATE - INTERVAL '3 months'
|
||||||
GROUP BY pc.cat_id
|
GROUP BY pc.cat_id
|
||||||
),
|
),
|
||||||
previous_period AS (
|
previous_period AS (
|
||||||
@@ -198,26 +197,26 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
|
|||||||
FROM product_categories pc
|
FROM product_categories pc
|
||||||
JOIN products p ON pc.pid = p.pid
|
JOIN products p ON pc.pid = p.pid
|
||||||
JOIN orders o ON p.pid = o.pid
|
JOIN orders o ON p.pid = o.pid
|
||||||
LEFT JOIN sales_seasonality ss ON MONTH(o.date) = ss.month
|
LEFT JOIN sales_seasonality ss ON EXTRACT(MONTH FROM o.date) = ss.month
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
AND o.date BETWEEN DATE_SUB(CURRENT_DATE, INTERVAL 15 MONTH)
|
AND o.date BETWEEN CURRENT_DATE - INTERVAL '15 months'
|
||||||
AND DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
|
AND CURRENT_DATE - INTERVAL '12 months'
|
||||||
GROUP BY pc.cat_id
|
GROUP BY pc.cat_id
|
||||||
),
|
),
|
||||||
trend_data AS (
|
trend_data AS (
|
||||||
SELECT
|
SELECT
|
||||||
pc.cat_id,
|
pc.cat_id,
|
||||||
MONTH(o.date) as month,
|
EXTRACT(MONTH FROM o.date) as month,
|
||||||
SUM(o.quantity * (o.price - COALESCE(o.discount, 0)) /
|
SUM(o.quantity * (o.price - COALESCE(o.discount, 0)) /
|
||||||
(1 + COALESCE(ss.seasonality_factor, 0))) as revenue,
|
(1 + COALESCE(ss.seasonality_factor, 0))) as revenue,
|
||||||
COUNT(DISTINCT DATE(o.date)) as days_in_month
|
COUNT(DISTINCT DATE(o.date)) as days_in_month
|
||||||
FROM product_categories pc
|
FROM product_categories pc
|
||||||
JOIN products p ON pc.pid = p.pid
|
JOIN products p ON pc.pid = p.pid
|
||||||
JOIN orders o ON p.pid = o.pid
|
JOIN orders o ON p.pid = o.pid
|
||||||
LEFT JOIN sales_seasonality ss ON MONTH(o.date) = ss.month
|
LEFT JOIN sales_seasonality ss ON EXTRACT(MONTH FROM o.date) = ss.month
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 15 MONTH)
|
AND o.date >= CURRENT_DATE - INTERVAL '15 months'
|
||||||
GROUP BY pc.cat_id, MONTH(o.date)
|
GROUP BY pc.cat_id, EXTRACT(MONTH FROM o.date)
|
||||||
),
|
),
|
||||||
trend_stats AS (
|
trend_stats AS (
|
||||||
SELECT
|
SELECT
|
||||||
@@ -261,16 +260,42 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
|
|||||||
JOIN products p ON pc.pid = p.pid
|
JOIN products p ON pc.pid = p.pid
|
||||||
JOIN orders o ON p.pid = o.pid
|
JOIN orders o ON p.pid = o.pid
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 3 MONTH)
|
AND o.date >= CURRENT_DATE - INTERVAL '3 months'
|
||||||
GROUP BY pc.cat_id
|
GROUP BY pc.cat_id
|
||||||
|
),
|
||||||
|
combined_metrics AS (
|
||||||
|
SELECT
|
||||||
|
COALESCE(cp.cat_id, pp.cat_id) as category_id,
|
||||||
|
CASE
|
||||||
|
WHEN pp.revenue = 0 AND COALESCE(cp.revenue, 0) > 0 THEN 100.0
|
||||||
|
WHEN pp.revenue = 0 OR cp.revenue IS NULL THEN 0.0
|
||||||
|
WHEN ta.trend_slope IS NOT NULL THEN
|
||||||
|
GREATEST(
|
||||||
|
-100.0,
|
||||||
|
LEAST(
|
||||||
|
(ta.trend_slope / NULLIF(ta.avg_daily_revenue, 0)) * 365 * 100,
|
||||||
|
999.99
|
||||||
|
)
|
||||||
|
)
|
||||||
|
ELSE
|
||||||
|
GREATEST(
|
||||||
|
-100.0,
|
||||||
|
LEAST(
|
||||||
|
((COALESCE(cp.revenue, 0) - pp.revenue) /
|
||||||
|
NULLIF(ABS(pp.revenue), 0)) * 100.0,
|
||||||
|
999.99
|
||||||
|
)
|
||||||
|
)
|
||||||
|
END as growth_rate,
|
||||||
|
mc.avg_margin
|
||||||
|
FROM current_period cp
|
||||||
|
FULL OUTER JOIN previous_period pp ON cp.cat_id = pp.cat_id
|
||||||
|
LEFT JOIN trend_analysis ta ON COALESCE(cp.cat_id, pp.cat_id) = ta.cat_id
|
||||||
|
LEFT JOIN margin_calc mc ON COALESCE(cp.cat_id, pp.cat_id) = mc.cat_id
|
||||||
)
|
)
|
||||||
UPDATE category_metrics cm
|
UPDATE category_metrics cm
|
||||||
LEFT JOIN current_period cp ON cm.category_id = cp.cat_id
|
|
||||||
LEFT JOIN previous_period pp ON cm.category_id = pp.cat_id
|
|
||||||
LEFT JOIN trend_analysis ta ON cm.category_id = ta.cat_id
|
|
||||||
LEFT JOIN margin_calc mc ON cm.category_id = mc.cat_id
|
|
||||||
SET
|
SET
|
||||||
cm.growth_rate = CASE
|
growth_rate = CASE
|
||||||
WHEN pp.revenue = 0 AND COALESCE(cp.revenue, 0) > 0 THEN 100.0
|
WHEN pp.revenue = 0 AND COALESCE(cp.revenue, 0) > 0 THEN 100.0
|
||||||
WHEN pp.revenue = 0 OR cp.revenue IS NULL THEN 0.0
|
WHEN pp.revenue = 0 OR cp.revenue IS NULL THEN 0.0
|
||||||
WHEN ta.trend_slope IS NOT NULL THEN
|
WHEN ta.trend_slope IS NOT NULL THEN
|
||||||
@@ -291,9 +316,13 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
END,
|
END,
|
||||||
cm.avg_margin = COALESCE(mc.avg_margin, cm.avg_margin),
|
avg_margin = COALESCE(mc.avg_margin, cm.avg_margin),
|
||||||
cm.last_calculated_at = NOW()
|
last_calculated_at = NOW()
|
||||||
WHERE cp.cat_id IS NOT NULL OR pp.cat_id IS NOT NULL
|
FROM current_period cp
|
||||||
|
FULL OUTER JOIN previous_period pp ON cp.cat_id = pp.cat_id
|
||||||
|
LEFT JOIN trend_analysis ta ON COALESCE(cp.cat_id, pp.cat_id) = ta.cat_id
|
||||||
|
LEFT JOIN margin_calc mc ON COALESCE(cp.cat_id, pp.cat_id) = mc.cat_id
|
||||||
|
WHERE cm.category_id = COALESCE(cp.cat_id, pp.cat_id)
|
||||||
`);
|
`);
|
||||||
|
|
||||||
processedCount = Math.floor(totalProducts * 0.97);
|
processedCount = Math.floor(totalProducts * 0.97);
|
||||||
@@ -335,8 +364,8 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
|
|||||||
)
|
)
|
||||||
SELECT
|
SELECT
|
||||||
pc.cat_id,
|
pc.cat_id,
|
||||||
YEAR(o.date) as year,
|
EXTRACT(YEAR FROM o.date::timestamp with time zone) as year,
|
||||||
MONTH(o.date) as month,
|
EXTRACT(MONTH FROM o.date::timestamp with time zone) as month,
|
||||||
COUNT(DISTINCT p.pid) as product_count,
|
COUNT(DISTINCT p.pid) as product_count,
|
||||||
COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
|
COUNT(DISTINCT CASE WHEN p.visible = true THEN p.pid END) as active_products,
|
||||||
SUM(p.stock_quantity * p.cost_price) as total_value,
|
SUM(p.stock_quantity * p.cost_price) as total_value,
|
||||||
@@ -364,15 +393,16 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
|
|||||||
JOIN products p ON pc.pid = p.pid
|
JOIN products p ON pc.pid = p.pid
|
||||||
JOIN orders o ON p.pid = o.pid
|
JOIN orders o ON p.pid = o.pid
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
|
AND o.date >= CURRENT_DATE - INTERVAL '12 months'
|
||||||
GROUP BY pc.cat_id, YEAR(o.date), MONTH(o.date)
|
GROUP BY pc.cat_id, EXTRACT(YEAR FROM o.date::timestamp with time zone), EXTRACT(MONTH FROM o.date::timestamp with time zone)
|
||||||
ON DUPLICATE KEY UPDATE
|
ON CONFLICT (category_id, year, month) DO UPDATE
|
||||||
product_count = VALUES(product_count),
|
SET
|
||||||
active_products = VALUES(active_products),
|
product_count = EXCLUDED.product_count,
|
||||||
total_value = VALUES(total_value),
|
active_products = EXCLUDED.active_products,
|
||||||
total_revenue = VALUES(total_revenue),
|
total_value = EXCLUDED.total_value,
|
||||||
avg_margin = VALUES(avg_margin),
|
total_revenue = EXCLUDED.total_revenue,
|
||||||
turnover_rate = VALUES(turnover_rate)
|
avg_margin = EXCLUDED.avg_margin,
|
||||||
|
turnover_rate = EXCLUDED.turnover_rate
|
||||||
`);
|
`);
|
||||||
|
|
||||||
processedCount = Math.floor(totalProducts * 0.99);
|
processedCount = Math.floor(totalProducts * 0.99);
|
||||||
@@ -414,20 +444,20 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
|
|||||||
)
|
)
|
||||||
WITH date_ranges AS (
|
WITH date_ranges AS (
|
||||||
SELECT
|
SELECT
|
||||||
DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) as period_start,
|
CURRENT_DATE - INTERVAL '30 days' as period_start,
|
||||||
CURRENT_DATE as period_end
|
CURRENT_DATE as period_end
|
||||||
UNION ALL
|
UNION ALL
|
||||||
SELECT
|
SELECT
|
||||||
DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY),
|
CURRENT_DATE - INTERVAL '90 days',
|
||||||
DATE_SUB(CURRENT_DATE, INTERVAL 31 DAY)
|
CURRENT_DATE - INTERVAL '31 days'
|
||||||
UNION ALL
|
UNION ALL
|
||||||
SELECT
|
SELECT
|
||||||
DATE_SUB(CURRENT_DATE, INTERVAL 180 DAY),
|
CURRENT_DATE - INTERVAL '180 days',
|
||||||
DATE_SUB(CURRENT_DATE, INTERVAL 91 DAY)
|
CURRENT_DATE - INTERVAL '91 days'
|
||||||
UNION ALL
|
UNION ALL
|
||||||
SELECT
|
SELECT
|
||||||
DATE_SUB(CURRENT_DATE, INTERVAL 365 DAY),
|
CURRENT_DATE - INTERVAL '365 days',
|
||||||
DATE_SUB(CURRENT_DATE, INTERVAL 181 DAY)
|
CURRENT_DATE - INTERVAL '181 days'
|
||||||
),
|
),
|
||||||
sales_data AS (
|
sales_data AS (
|
||||||
SELECT
|
SELECT
|
||||||
@@ -466,12 +496,13 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
|
|||||||
END as avg_price,
|
END as avg_price,
|
||||||
NOW() as last_calculated_at
|
NOW() as last_calculated_at
|
||||||
FROM sales_data
|
FROM sales_data
|
||||||
ON DUPLICATE KEY UPDATE
|
ON CONFLICT (category_id, brand, period_start, period_end) DO UPDATE
|
||||||
avg_daily_sales = VALUES(avg_daily_sales),
|
SET
|
||||||
total_sold = VALUES(total_sold),
|
avg_daily_sales = EXCLUDED.avg_daily_sales,
|
||||||
num_products = VALUES(num_products),
|
total_sold = EXCLUDED.total_sold,
|
||||||
avg_price = VALUES(avg_price),
|
num_products = EXCLUDED.num_products,
|
||||||
last_calculated_at = VALUES(last_calculated_at)
|
avg_price = EXCLUDED.avg_price,
|
||||||
|
last_calculated_at = EXCLUDED.last_calculated_at
|
||||||
`);
|
`);
|
||||||
|
|
||||||
processedCount = Math.floor(totalProducts * 1.0);
|
processedCount = Math.floor(totalProducts * 1.0);
|
||||||
@@ -498,7 +529,8 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
|
|||||||
await connection.query(`
|
await connection.query(`
|
||||||
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
|
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
|
||||||
VALUES ('category_metrics', NOW())
|
VALUES ('category_metrics', NOW())
|
||||||
ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
|
ON CONFLICT (module_name) DO UPDATE
|
||||||
|
SET last_calculation_timestamp = NOW()
|
||||||
`);
|
`);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|||||||
@@ -32,13 +32,13 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get order count that will be processed
|
// Get order count that will be processed
|
||||||
const [orderCount] = await connection.query(`
|
const orderCount = await connection.query(`
|
||||||
SELECT COUNT(*) as count
|
SELECT COUNT(*) as count
|
||||||
FROM orders o
|
FROM orders o
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
AND DATE(o.date) >= DATE_SUB(CURDATE(), INTERVAL 12 MONTH)
|
AND DATE(o.date) >= CURRENT_DATE - INTERVAL '12 months'
|
||||||
`);
|
`);
|
||||||
processedOrders = orderCount[0].count;
|
processedOrders = parseInt(orderCount.rows[0].count);
|
||||||
|
|
||||||
outputProgress({
|
outputProgress({
|
||||||
status: 'running',
|
status: 'running',
|
||||||
@@ -67,27 +67,28 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun
|
|||||||
SUM(o.quantity * (o.price - p.cost_price)) as gross_profit,
|
SUM(o.quantity * (o.price - p.cost_price)) as gross_profit,
|
||||||
MIN(o.date) as first_sale_date,
|
MIN(o.date) as first_sale_date,
|
||||||
MAX(o.date) as last_sale_date,
|
MAX(o.date) as last_sale_date,
|
||||||
DATEDIFF(MAX(o.date), MIN(o.date)) + 1 as calculation_period_days,
|
EXTRACT(DAY FROM (MAX(o.date)::timestamp with time zone - MIN(o.date)::timestamp with time zone)) + 1 as calculation_period_days,
|
||||||
COUNT(DISTINCT DATE(o.date)) as active_days
|
COUNT(DISTINCT DATE(o.date)) as active_days
|
||||||
FROM products p
|
FROM products p
|
||||||
LEFT JOIN orders o ON p.pid = o.pid
|
LEFT JOIN orders o ON p.pid = o.pid
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
AND DATE(o.date) >= DATE_SUB(CURDATE(), INTERVAL 12 MONTH)
|
AND DATE(o.date) >= CURRENT_DATE - INTERVAL '12 months'
|
||||||
GROUP BY p.pid
|
GROUP BY p.pid, p.cost_price, p.stock_quantity
|
||||||
)
|
)
|
||||||
UPDATE product_metrics pm
|
UPDATE product_metrics pm
|
||||||
JOIN product_financials pf ON pm.pid = pf.pid
|
|
||||||
SET
|
SET
|
||||||
pm.inventory_value = COALESCE(pf.inventory_value, 0),
|
inventory_value = COALESCE(pf.inventory_value, 0),
|
||||||
pm.total_revenue = COALESCE(pf.total_revenue, 0),
|
total_revenue = COALESCE(pf.total_revenue, 0),
|
||||||
pm.cost_of_goods_sold = COALESCE(pf.cost_of_goods_sold, 0),
|
cost_of_goods_sold = COALESCE(pf.cost_of_goods_sold, 0),
|
||||||
pm.gross_profit = COALESCE(pf.gross_profit, 0),
|
gross_profit = COALESCE(pf.gross_profit, 0),
|
||||||
pm.gmroi = CASE
|
gmroi = CASE
|
||||||
WHEN COALESCE(pf.inventory_value, 0) > 0 AND pf.active_days > 0 THEN
|
WHEN COALESCE(pf.inventory_value, 0) > 0 AND pf.active_days > 0 THEN
|
||||||
(COALESCE(pf.gross_profit, 0) * (365.0 / pf.active_days)) / COALESCE(pf.inventory_value, 0)
|
(COALESCE(pf.gross_profit, 0) * (365.0 / pf.active_days)) / COALESCE(pf.inventory_value, 0)
|
||||||
ELSE 0
|
ELSE 0
|
||||||
END,
|
END,
|
||||||
pm.last_calculated_at = CURRENT_TIMESTAMP
|
last_calculated_at = CURRENT_TIMESTAMP
|
||||||
|
FROM product_financials pf
|
||||||
|
WHERE pm.pid = pf.pid
|
||||||
`);
|
`);
|
||||||
|
|
||||||
processedCount = Math.floor(totalProducts * 0.65);
|
processedCount = Math.floor(totalProducts * 0.65);
|
||||||
@@ -119,8 +120,8 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun
|
|||||||
WITH monthly_financials AS (
|
WITH monthly_financials AS (
|
||||||
SELECT
|
SELECT
|
||||||
p.pid,
|
p.pid,
|
||||||
YEAR(o.date) as year,
|
EXTRACT(YEAR FROM o.date::timestamp with time zone) as year,
|
||||||
MONTH(o.date) as month,
|
EXTRACT(MONTH FROM o.date::timestamp with time zone) as month,
|
||||||
p.cost_price * p.stock_quantity as inventory_value,
|
p.cost_price * p.stock_quantity as inventory_value,
|
||||||
SUM(o.quantity * (o.price - p.cost_price)) as gross_profit,
|
SUM(o.quantity * (o.price - p.cost_price)) as gross_profit,
|
||||||
COUNT(DISTINCT DATE(o.date)) as active_days,
|
COUNT(DISTINCT DATE(o.date)) as active_days,
|
||||||
@@ -129,19 +130,20 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun
|
|||||||
FROM products p
|
FROM products p
|
||||||
LEFT JOIN orders o ON p.pid = o.pid
|
LEFT JOIN orders o ON p.pid = o.pid
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
GROUP BY p.pid, YEAR(o.date), MONTH(o.date)
|
GROUP BY p.pid, EXTRACT(YEAR FROM o.date::timestamp with time zone), EXTRACT(MONTH FROM o.date::timestamp with time zone), p.cost_price, p.stock_quantity
|
||||||
)
|
)
|
||||||
UPDATE product_time_aggregates pta
|
UPDATE product_time_aggregates pta
|
||||||
JOIN monthly_financials mf ON pta.pid = mf.pid
|
|
||||||
AND pta.year = mf.year
|
|
||||||
AND pta.month = mf.month
|
|
||||||
SET
|
SET
|
||||||
pta.inventory_value = COALESCE(mf.inventory_value, 0),
|
inventory_value = COALESCE(mf.inventory_value, 0),
|
||||||
pta.gmroi = CASE
|
gmroi = CASE
|
||||||
WHEN COALESCE(mf.inventory_value, 0) > 0 AND mf.active_days > 0 THEN
|
WHEN COALESCE(mf.inventory_value, 0) > 0 AND mf.active_days > 0 THEN
|
||||||
(COALESCE(mf.gross_profit, 0) * (365.0 / mf.active_days)) / COALESCE(mf.inventory_value, 0)
|
(COALESCE(mf.gross_profit, 0) * (365.0 / mf.active_days)) / COALESCE(mf.inventory_value, 0)
|
||||||
ELSE 0
|
ELSE 0
|
||||||
END
|
END
|
||||||
|
FROM monthly_financials mf
|
||||||
|
WHERE pta.pid = mf.pid
|
||||||
|
AND pta.year = mf.year
|
||||||
|
AND pta.month = mf.month
|
||||||
`);
|
`);
|
||||||
|
|
||||||
processedCount = Math.floor(totalProducts * 0.70);
|
processedCount = Math.floor(totalProducts * 0.70);
|
||||||
@@ -168,7 +170,8 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun
|
|||||||
await connection.query(`
|
await connection.query(`
|
||||||
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
|
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
|
||||||
VALUES ('financial_metrics', NOW())
|
VALUES ('financial_metrics', NOW())
|
||||||
ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
|
ON CONFLICT (module_name) DO UPDATE
|
||||||
|
SET last_calculation_timestamp = NOW()
|
||||||
`);
|
`);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|||||||
@@ -10,20 +10,21 @@ function sanitizeValue(value) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async function calculateProductMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
|
async function calculateProductMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
|
||||||
const connection = await getConnection();
|
let connection;
|
||||||
let success = false;
|
let success = false;
|
||||||
let processedOrders = 0;
|
let processedOrders = 0;
|
||||||
const BATCH_SIZE = 5000;
|
const BATCH_SIZE = 5000;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
connection = await getConnection();
|
||||||
// Skip flags are inherited from the parent scope
|
// Skip flags are inherited from the parent scope
|
||||||
const SKIP_PRODUCT_BASE_METRICS = 0;
|
const SKIP_PRODUCT_BASE_METRICS = 0;
|
||||||
const SKIP_PRODUCT_TIME_AGGREGATES = 0;
|
const SKIP_PRODUCT_TIME_AGGREGATES = 0;
|
||||||
|
|
||||||
// Get total product count if not provided
|
// Get total product count if not provided
|
||||||
if (!totalProducts) {
|
if (!totalProducts) {
|
||||||
const [productCount] = await connection.query('SELECT COUNT(*) as count FROM products');
|
const productCount = await connection.query('SELECT COUNT(*) as count FROM products');
|
||||||
totalProducts = productCount[0].count;
|
totalProducts = parseInt(productCount.rows[0].count);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isCancelled) {
|
if (isCancelled) {
|
||||||
@@ -52,19 +53,20 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
|
|||||||
|
|
||||||
// First ensure all products have a metrics record
|
// First ensure all products have a metrics record
|
||||||
await connection.query(`
|
await connection.query(`
|
||||||
INSERT IGNORE INTO product_metrics (pid, last_calculated_at)
|
INSERT INTO product_metrics (pid, last_calculated_at)
|
||||||
SELECT pid, NOW()
|
SELECT pid, NOW()
|
||||||
FROM products
|
FROM products
|
||||||
|
ON CONFLICT (pid) DO NOTHING
|
||||||
`);
|
`);
|
||||||
|
|
||||||
// Get threshold settings once
|
// Get threshold settings once
|
||||||
const [thresholds] = await connection.query(`
|
const thresholds = await connection.query(`
|
||||||
SELECT critical_days, reorder_days, overstock_days, low_stock_threshold
|
SELECT critical_days, reorder_days, overstock_days, low_stock_threshold
|
||||||
FROM stock_thresholds
|
FROM stock_thresholds
|
||||||
WHERE category_id IS NULL AND vendor IS NULL
|
WHERE category_id IS NULL AND vendor IS NULL
|
||||||
LIMIT 1
|
LIMIT 1
|
||||||
`);
|
`);
|
||||||
const defaultThresholds = thresholds[0];
|
const defaultThresholds = thresholds.rows[0];
|
||||||
|
|
||||||
// Calculate base product metrics
|
// Calculate base product metrics
|
||||||
if (!SKIP_PRODUCT_BASE_METRICS) {
|
if (!SKIP_PRODUCT_BASE_METRICS) {
|
||||||
@@ -85,16 +87,43 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
|
|||||||
});
|
});
|
||||||
|
|
||||||
// Get order count that will be processed
|
// Get order count that will be processed
|
||||||
const [orderCount] = await connection.query(`
|
const orderCount = await connection.query(`
|
||||||
SELECT COUNT(*) as count
|
SELECT COUNT(*) as count
|
||||||
FROM orders o
|
FROM orders o
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
`);
|
`);
|
||||||
processedOrders = orderCount[0].count;
|
processedOrders = parseInt(orderCount.rows[0].count);
|
||||||
|
|
||||||
// Clear temporary tables
|
// Clear temporary tables
|
||||||
await connection.query('TRUNCATE TABLE temp_sales_metrics');
|
await connection.query('DROP TABLE IF EXISTS temp_sales_metrics');
|
||||||
await connection.query('TRUNCATE TABLE temp_purchase_metrics');
|
await connection.query('DROP TABLE IF EXISTS temp_purchase_metrics');
|
||||||
|
|
||||||
|
// Create temp_sales_metrics
|
||||||
|
await connection.query(`
|
||||||
|
CREATE TEMPORARY TABLE temp_sales_metrics (
|
||||||
|
pid BIGINT NOT NULL,
|
||||||
|
daily_sales_avg DECIMAL(10,3),
|
||||||
|
weekly_sales_avg DECIMAL(10,3),
|
||||||
|
monthly_sales_avg DECIMAL(10,3),
|
||||||
|
total_revenue DECIMAL(10,3),
|
||||||
|
avg_margin_percent DECIMAL(10,3),
|
||||||
|
first_sale_date DATE,
|
||||||
|
last_sale_date DATE,
|
||||||
|
PRIMARY KEY (pid)
|
||||||
|
)
|
||||||
|
`);
|
||||||
|
|
||||||
|
// Create temp_purchase_metrics
|
||||||
|
await connection.query(`
|
||||||
|
CREATE TEMPORARY TABLE temp_purchase_metrics (
|
||||||
|
pid BIGINT NOT NULL,
|
||||||
|
avg_lead_time_days DOUBLE PRECISION,
|
||||||
|
last_purchase_date DATE,
|
||||||
|
first_received_date DATE,
|
||||||
|
last_received_date DATE,
|
||||||
|
PRIMARY KEY (pid)
|
||||||
|
)
|
||||||
|
`);
|
||||||
|
|
||||||
// Populate temp_sales_metrics with base stats and sales averages
|
// Populate temp_sales_metrics with base stats and sales averages
|
||||||
await connection.query(`
|
await connection.query(`
|
||||||
@@ -115,98 +144,131 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
|
|||||||
FROM products p
|
FROM products p
|
||||||
LEFT JOIN orders o ON p.pid = o.pid
|
LEFT JOIN orders o ON p.pid = o.pid
|
||||||
AND o.canceled = false
|
AND o.canceled = false
|
||||||
AND o.date >= DATE_SUB(CURDATE(), INTERVAL 90 DAY)
|
AND o.date >= CURRENT_DATE - INTERVAL '90 days'
|
||||||
GROUP BY p.pid
|
GROUP BY p.pid
|
||||||
`);
|
`);
|

// Populate temp_purchase_metrics
// Populate temp_purchase_metrics with timeout protection
await connection.query(`
await Promise.race([
INSERT INTO temp_purchase_metrics
connection.query(`
SELECT
INSERT INTO temp_purchase_metrics
p.pid,
SELECT
AVG(DATEDIFF(po.received_date, po.date)) as avg_lead_time_days,
p.pid,
MAX(po.date) as last_purchase_date,
AVG(
MIN(po.received_date) as first_received_date,
CASE
MAX(po.received_date) as last_received_date
WHEN po.received_date IS NOT NULL AND po.date IS NOT NULL
FROM products p
THEN EXTRACT(EPOCH FROM (po.received_date::timestamp with time zone - po.date::timestamp with time zone)) / 86400.0
LEFT JOIN purchase_orders po ON p.pid = po.pid
ELSE NULL
AND po.received_date IS NOT NULL
END
AND po.date >= DATE_SUB(CURDATE(), INTERVAL 365 DAY)
) as avg_lead_time_days,
GROUP BY p.pid
MAX(po.date) as last_purchase_date,
`);
MIN(po.received_date) as first_received_date,
MAX(po.received_date) as last_received_date
FROM products p
LEFT JOIN purchase_orders po ON p.pid = po.pid
AND po.received_date IS NOT NULL
AND po.date IS NOT NULL
AND po.date >= CURRENT_DATE - INTERVAL '365 days'
GROUP BY p.pid
`),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Timeout: temp_purchase_metrics query took too long')), 60000)
)
]).catch(async (err) => {
logError(err, 'Error populating temp_purchase_metrics, continuing with empty table');
// Insert fallback rows with a default 30-day lead time so processing can continue
await connection.query(`
INSERT INTO temp_purchase_metrics
SELECT
p.pid,
30.0 as avg_lead_time_days,
NULL as last_purchase_date,
NULL as first_received_date,
NULL as last_received_date
FROM products p
LEFT JOIN temp_purchase_metrics tpm ON p.pid = tpm.pid
WHERE tpm.pid IS NULL
`);
});

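The Promise.race wrapper above is a generic timeout guard; a stripped-down sketch of the same idea (the 60-second limit and label are illustrative):

    // Races a promise against a timer; whichever settles first wins.
    function withTimeout(promise, ms, label) {
      return Promise.race([
        promise,
        new Promise((_, reject) =>
          setTimeout(() => reject(new Error(`Timeout: ${label} took longer than ${ms}ms`)), ms)
        )
      ]);
    }

    // Usage, mirroring the fallback above:
    // withTimeout(connection.query(sql), 60000, 'temp_purchase_metrics insert')
    //   .catch(err => { /* log and insert default rows */ });

Note that losing the race only rejects the wrapper; the underlying query keeps running on the server unless it is cancelled separately.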
// Process updates in batches
// Process updates in batches
let lastPid = 0;
let lastPid = 0;
while (true) {
let batchCount = 0;
const MAX_BATCHES = 1000; // Safety limit for number of batches to prevent infinite loops

while (batchCount < MAX_BATCHES) {
if (isCancelled) break;
if (isCancelled) break;

const [batch] = await connection.query(
batchCount++;
'SELECT pid FROM products WHERE pid > ? ORDER BY pid LIMIT ?',
const batch = await connection.query(
'SELECT pid FROM products WHERE pid > $1 ORDER BY pid LIMIT $2',
[lastPid, BATCH_SIZE]
[lastPid, BATCH_SIZE]
);
);

if (batch.length === 0) break;
if (batch.rows.length === 0) break;
|
|
||||||
|
// Process the entire batch in a single efficient query
|
||||||
await connection.query(`
|
await connection.query(`
|
||||||
UPDATE product_metrics pm
|
UPDATE product_metrics pm
|
||||||
JOIN products p ON pm.pid = p.pid
|
|
||||||
LEFT JOIN temp_sales_metrics sm ON pm.pid = sm.pid
|
|
||||||
LEFT JOIN temp_purchase_metrics lm ON pm.pid = lm.pid
|
|
||||||
SET
|
SET
|
||||||
pm.inventory_value = p.stock_quantity * NULLIF(p.cost_price, 0),
|
inventory_value = p.stock_quantity * NULLIF(p.cost_price, 0),
|
||||||
pm.daily_sales_avg = COALESCE(sm.daily_sales_avg, 0),
|
daily_sales_avg = COALESCE(sm.daily_sales_avg, 0),
|
||||||
pm.weekly_sales_avg = COALESCE(sm.weekly_sales_avg, 0),
|
weekly_sales_avg = COALESCE(sm.weekly_sales_avg, 0),
|
||||||
pm.monthly_sales_avg = COALESCE(sm.monthly_sales_avg, 0),
|
monthly_sales_avg = COALESCE(sm.monthly_sales_avg, 0),
|
||||||
pm.total_revenue = COALESCE(sm.total_revenue, 0),
|
total_revenue = COALESCE(sm.total_revenue, 0),
|
||||||
pm.avg_margin_percent = COALESCE(sm.avg_margin_percent, 0),
|
avg_margin_percent = COALESCE(sm.avg_margin_percent, 0),
|
||||||
pm.first_sale_date = sm.first_sale_date,
|
first_sale_date = sm.first_sale_date,
|
||||||
pm.last_sale_date = sm.last_sale_date,
|
last_sale_date = sm.last_sale_date,
|
||||||
pm.avg_lead_time_days = COALESCE(lm.avg_lead_time_days, 30),
|
avg_lead_time_days = COALESCE(lm.avg_lead_time_days, 30),
|
||||||
pm.days_of_inventory = CASE
|
days_of_inventory = CASE
|
||||||
WHEN COALESCE(sm.daily_sales_avg, 0) > 0
|
WHEN COALESCE(sm.daily_sales_avg, 0) > 0
|
||||||
THEN FLOOR(p.stock_quantity / NULLIF(sm.daily_sales_avg, 0))
|
THEN FLOOR(p.stock_quantity / NULLIF(sm.daily_sales_avg, 0))
|
||||||
ELSE NULL
|
ELSE NULL
|
||||||
END,
|
END,
|
||||||
pm.weeks_of_inventory = CASE
|
weeks_of_inventory = CASE
|
||||||
WHEN COALESCE(sm.weekly_sales_avg, 0) > 0
|
WHEN COALESCE(sm.weekly_sales_avg, 0) > 0
|
||||||
THEN FLOOR(p.stock_quantity / NULLIF(sm.weekly_sales_avg, 0))
|
THEN FLOOR(p.stock_quantity / NULLIF(sm.weekly_sales_avg, 0))
|
||||||
ELSE NULL
|
ELSE NULL
|
||||||
END,
|
END,
|
||||||
pm.stock_status = CASE
|
stock_status = CASE
|
||||||
WHEN p.stock_quantity <= 0 THEN 'Out of Stock'
|
WHEN p.stock_quantity <= 0 THEN 'Out of Stock'
|
||||||
WHEN COALESCE(sm.daily_sales_avg, 0) = 0 AND p.stock_quantity <= ? THEN 'Low Stock'
|
WHEN COALESCE(sm.daily_sales_avg, 0) = 0 AND p.stock_quantity <= $1 THEN 'Low Stock'
|
||||||
WHEN COALESCE(sm.daily_sales_avg, 0) = 0 THEN 'In Stock'
|
WHEN COALESCE(sm.daily_sales_avg, 0) = 0 THEN 'In Stock'
|
||||||
WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) <= ? THEN 'Critical'
|
WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) <= $2 THEN 'Critical'
|
||||||
WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) <= ? THEN 'Reorder'
|
WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) <= $3 THEN 'Reorder'
|
||||||
WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) > ? THEN 'Overstocked'
|
WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) > $4 THEN 'Overstocked'
|
||||||
ELSE 'Healthy'
|
ELSE 'Healthy'
|
||||||
END,
|
END,
|
||||||
pm.safety_stock = CASE
|
safety_stock = CASE
|
||||||
WHEN COALESCE(sm.daily_sales_avg, 0) > 0 THEN
|
WHEN COALESCE(sm.daily_sales_avg, 0) > 0 THEN
|
||||||
CEIL(sm.daily_sales_avg * SQRT(COALESCE(lm.avg_lead_time_days, 30)) * 1.96)
|
CEIL(sm.daily_sales_avg * SQRT(ABS(COALESCE(lm.avg_lead_time_days, 30))) * 1.96)
|
||||||
ELSE ?
|
ELSE $5
|
||||||
END,
|
END,
|
||||||
pm.reorder_point = CASE
|
reorder_point = CASE
|
||||||
WHEN COALESCE(sm.daily_sales_avg, 0) > 0 THEN
|
WHEN COALESCE(sm.daily_sales_avg, 0) > 0 THEN
|
||||||
CEIL(sm.daily_sales_avg * COALESCE(lm.avg_lead_time_days, 30)) +
|
CEIL(sm.daily_sales_avg * COALESCE(lm.avg_lead_time_days, 30)) +
|
||||||
CEIL(sm.daily_sales_avg * SQRT(COALESCE(lm.avg_lead_time_days, 30)) * 1.96)
|
CEIL(sm.daily_sales_avg * SQRT(ABS(COALESCE(lm.avg_lead_time_days, 30))) * 1.96)
|
||||||
ELSE ?
|
ELSE $6
|
||||||
END,
|
END,
|
||||||
pm.reorder_qty = CASE
|
reorder_qty = CASE
|
||||||
WHEN COALESCE(sm.daily_sales_avg, 0) > 0 AND NULLIF(p.cost_price, 0) IS NOT NULL THEN
|
WHEN COALESCE(sm.daily_sales_avg, 0) > 0 AND NULLIF(p.cost_price, 0) IS NOT NULL AND NULLIF(p.cost_price, 0) > 0 THEN
|
||||||
GREATEST(
|
GREATEST(
|
||||||
CEIL(SQRT((2 * (sm.daily_sales_avg * 365) * 25) / (NULLIF(p.cost_price, 0) * 0.25))),
|
CEIL(SQRT(ABS((2 * (sm.daily_sales_avg * 365) * 25) / (NULLIF(p.cost_price, 0) * 0.25)))),
|
||||||
?
|
$7
|
||||||
)
|
)
|
||||||
ELSE ?
|
ELSE $8
|
||||||
END,
|
END,
|
||||||
pm.overstocked_amt = CASE
|
overstocked_amt = CASE
|
||||||
WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) > ?
|
WHEN p.stock_quantity / NULLIF(sm.daily_sales_avg, 0) > $9
|
||||||
THEN GREATEST(0, p.stock_quantity - CEIL(sm.daily_sales_avg * ?))
|
THEN GREATEST(0, p.stock_quantity - CEIL(sm.daily_sales_avg * $10))
|
||||||
ELSE 0
|
ELSE 0
|
||||||
END,
|
END,
|
||||||
pm.last_calculated_at = NOW()
|
last_calculated_at = NOW()
|
||||||
WHERE p.pid IN (${batch.map(() => '?').join(',')})
|
FROM products p
|
||||||
|
LEFT JOIN temp_sales_metrics sm ON p.pid = sm.pid
|
||||||
|
LEFT JOIN temp_purchase_metrics lm ON p.pid = lm.pid
|
||||||
|
WHERE p.pid = ANY($11::bigint[])
|
||||||
|
AND pm.pid = p.pid
|
||||||
`,
`,
[
[
defaultThresholds.low_stock_threshold,
defaultThresholds.low_stock_threshold,
@@ -219,12 +281,11 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
defaultThresholds.low_stock_threshold,
defaultThresholds.low_stock_threshold,
defaultThresholds.overstock_days,
defaultThresholds.overstock_days,
defaultThresholds.overstock_days,
defaultThresholds.overstock_days,
...batch.map(row => row.pid)
batch.rows.map(row => row.pid)
]
]);
);

lastPid = batch[batch.length - 1].pid;
lastPid = batch.rows[batch.rows.length - 1].pid;
processedCount += batch.length;
processedCount += batch.rows.length;

outputProgress({
outputProgress({
status: 'running',
status: 'running',
@@ -243,54 +304,59 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
});
});
}
}
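The batch UPDATE above also changes how the pid list is bound: the MySQL version interpolated one ? per pid, while the Postgres version passes the whole array as a single parameter matched with = ANY(...). A small sketch of that binding, assuming a node-postgres client and using product_metrics only as an illustration:

    // MySQL style would have been:
    //   `... WHERE pid IN (${pids.map(() => '?').join(',')})` with the pids spread into params.
    // With pg, the JS array binds to one placeholder and ANY() does the membership test.
    async function markBatch(client, pids) {
      const res = await client.query(
        'UPDATE product_metrics SET last_calculated_at = NOW() WHERE pid = ANY($1::bigint[])',
        [pids]
      );
      return res.rowCount; // number of rows touched in this batch
    }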
|
|
||||||
// Calculate forecast accuracy and bias in batches
|
// Add safety check if the loop processed MAX_BATCHES
|
||||||
lastPid = 0;
|
if (batchCount >= MAX_BATCHES) {
|
||||||
while (true) {
|
logError(new Error(`Reached maximum batch count (${MAX_BATCHES}). Process may have entered an infinite loop.`), 'Batch processing safety limit reached');
|
||||||
if (isCancelled) break;
|
|
||||||
|
|
||||||
const [batch] = await connection.query(
|
|
||||||
'SELECT pid FROM products WHERE pid > ? ORDER BY pid LIMIT ?',
|
|
||||||
[lastPid, BATCH_SIZE]
|
|
||||||
);
|
|
||||||
|
|
||||||
if (batch.length === 0) break;
|
|
||||||
|
|
||||||
await connection.query(`
|
|
||||||
UPDATE product_metrics pm
|
|
||||||
JOIN (
|
|
||||||
SELECT
|
|
||||||
sf.pid,
|
|
||||||
AVG(CASE
|
|
||||||
WHEN o.quantity > 0
|
|
||||||
THEN ABS(sf.forecast_units - o.quantity) / o.quantity * 100
|
|
||||||
ELSE 100
|
|
||||||
END) as avg_forecast_error,
|
|
||||||
AVG(CASE
|
|
||||||
WHEN o.quantity > 0
|
|
||||||
THEN (sf.forecast_units - o.quantity) / o.quantity * 100
|
|
||||||
ELSE 0
|
|
||||||
END) as avg_forecast_bias,
|
|
||||||
MAX(sf.forecast_date) as last_forecast_date
|
|
||||||
FROM sales_forecasts sf
|
|
||||||
JOIN orders o ON sf.pid = o.pid
|
|
||||||
AND DATE(o.date) = sf.forecast_date
|
|
||||||
WHERE o.canceled = false
|
|
||||||
AND sf.forecast_date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
|
|
||||||
AND sf.pid IN (?)
|
|
||||||
GROUP BY sf.pid
|
|
||||||
) fa ON pm.pid = fa.pid
|
|
||||||
SET
|
|
||||||
pm.forecast_accuracy = GREATEST(0, 100 - LEAST(fa.avg_forecast_error, 100)),
|
|
||||||
pm.forecast_bias = GREATEST(-100, LEAST(fa.avg_forecast_bias, 100)),
|
|
||||||
pm.last_forecast_date = fa.last_forecast_date,
|
|
||||||
pm.last_calculated_at = NOW()
|
|
||||||
WHERE pm.pid IN (?)
|
|
||||||
`, [batch.map(row => row.pid), batch.map(row => row.pid)]);
|
|
||||||
|
|
||||||
lastPid = batch[batch.length - 1].pid;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Calculate forecast accuracy and bias in batches
|
||||||
|
lastPid = 0;
|
||||||
|
while (true) {
|
||||||
|
if (isCancelled) break;
|
||||||
|
|
||||||
|
const batch = await connection.query(
|
||||||
|
'SELECT pid FROM products WHERE pid > $1 ORDER BY pid LIMIT $2',
|
||||||
|
[lastPid, BATCH_SIZE]
|
||||||
|
);
|
||||||
|
|
||||||
|
if (batch.rows.length === 0) break;
|
||||||
|
|
||||||
|
await connection.query(`
|
||||||
|
UPDATE product_metrics pm
|
||||||
|
SET
|
||||||
|
forecast_accuracy = GREATEST(0, 100 - LEAST(fa.avg_forecast_error, 100)),
|
||||||
|
forecast_bias = GREATEST(-100, LEAST(fa.avg_forecast_bias, 100)),
|
||||||
|
last_forecast_date = fa.last_forecast_date,
|
||||||
|
last_calculated_at = NOW()
|
||||||
|
FROM (
|
||||||
|
SELECT
|
||||||
|
sf.pid,
|
||||||
|
AVG(CASE
|
||||||
|
WHEN o.quantity > 0
|
||||||
|
THEN ABS(sf.forecast_quantity - o.quantity) / o.quantity * 100
|
||||||
|
ELSE 100
|
||||||
|
END) as avg_forecast_error,
|
||||||
|
AVG(CASE
|
||||||
|
WHEN o.quantity > 0
|
||||||
|
THEN (sf.forecast_quantity - o.quantity) / o.quantity * 100
|
||||||
|
ELSE 0
|
||||||
|
END) as avg_forecast_bias,
|
||||||
|
MAX(sf.forecast_date) as last_forecast_date
|
||||||
|
FROM sales_forecasts sf
|
||||||
|
JOIN orders o ON sf.pid = o.pid
|
||||||
|
AND DATE(o.date) = sf.forecast_date
|
||||||
|
WHERE o.canceled = false
|
||||||
|
AND sf.forecast_date >= CURRENT_DATE - INTERVAL '90 days'
AND sf.pid = ANY($1::bigint[])
GROUP BY sf.pid
) fa
WHERE pm.pid = fa.pid
`, [batch.rows.map(row => row.pid)]);

lastPid = batch.rows[batch.rows.length - 1].pid;
}
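Both batch loops page through products by primary key rather than by OFFSET. A sketch of that keyset-pagination shape in isolation (the batch size and per-batch callback are placeholders):

    // Walks the products table in pid order, handing each batch of pids to `handle`.
    async function forEachBatch(client, batchSize, handle) {
      let lastPid = 0;
      while (true) {
        const { rows } = await client.query(
          'SELECT pid FROM products WHERE pid > $1 ORDER BY pid LIMIT $2',
          [lastPid, batchSize]
        );
        if (rows.length === 0) break;
        await handle(rows.map(r => r.pid));
        lastPid = rows[rows.length - 1].pid; // advance the cursor past the last key seen
      }
    }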
|
|
||||||
// Calculate product time aggregates
|
// Calculate product time aggregates
|
||||||
if (!SKIP_PRODUCT_TIME_AGGREGATES) {
|
if (!SKIP_PRODUCT_TIME_AGGREGATES) {
|
||||||
outputProgress({
|
outputProgress({
|
||||||
@@ -326,11 +392,11 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
|
|||||||
)
|
)
|
||||||
SELECT
|
SELECT
|
||||||
p.pid,
|
p.pid,
|
||||||
YEAR(o.date) as year,
|
EXTRACT(YEAR FROM o.date::timestamp with time zone) as year,
|
||||||
MONTH(o.date) as month,
|
EXTRACT(MONTH FROM o.date::timestamp with time zone) as month,
|
||||||
SUM(o.quantity) as total_quantity_sold,
|
SUM(o.quantity) as total_quantity_sold,
|
||||||
SUM(o.quantity * o.price) as total_revenue,
|
SUM(o.price * o.quantity) as total_revenue,
|
||||||
SUM(o.quantity * p.cost_price) as total_cost,
|
SUM(p.cost_price * o.quantity) as total_cost,
|
||||||
COUNT(DISTINCT o.order_number) as order_count,
|
COUNT(DISTINCT o.order_number) as order_count,
|
||||||
AVG(o.price) as avg_price,
|
AVG(o.price) as avg_price,
|
||||||
CASE
|
CASE
|
||||||
@@ -346,17 +412,18 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
|
|||||||
END as gmroi
|
END as gmroi
|
||||||
FROM products p
|
FROM products p
|
||||||
LEFT JOIN orders o ON p.pid = o.pid AND o.canceled = false
|
LEFT JOIN orders o ON p.pid = o.pid AND o.canceled = false
|
||||||
WHERE o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
|
WHERE o.date >= CURRENT_DATE - INTERVAL '12 months'
|
||||||
GROUP BY p.pid, YEAR(o.date), MONTH(o.date)
GROUP BY p.pid, EXTRACT(YEAR FROM o.date::timestamp with time zone), EXTRACT(MONTH FROM o.date::timestamp with time zone)
ON DUPLICATE KEY UPDATE
ON CONFLICT (pid, year, month) DO UPDATE
total_quantity_sold = VALUES(total_quantity_sold),
SET
total_revenue = VALUES(total_revenue),
total_quantity_sold = EXCLUDED.total_quantity_sold,
total_cost = VALUES(total_cost),
total_revenue = EXCLUDED.total_revenue,
order_count = VALUES(order_count),
total_cost = EXCLUDED.total_cost,
avg_price = VALUES(avg_price),
order_count = EXCLUDED.order_count,
profit_margin = VALUES(profit_margin),
avg_price = EXCLUDED.avg_price,
inventory_value = VALUES(inventory_value),
profit_margin = EXCLUDED.profit_margin,
gmroi = VALUES(gmroi)
inventory_value = EXCLUDED.inventory_value,
gmroi = EXCLUDED.gmroi
`);
`);
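The ON DUPLICATE KEY UPDATE to ON CONFLICT rewrite above is the recurring upsert translation in this commit. A compact sketch of the mapping, assuming product_time_aggregates has a unique constraint or primary key on (pid, year, month), which ON CONFLICT requires:

    // MySQL:    INSERT ... ON DUPLICATE KEY UPDATE col = VALUES(col)
    // Postgres: INSERT ... ON CONFLICT (key cols) DO UPDATE SET col = EXCLUDED.col
    async function upsertMonthlyRevenue(client, pid, year, month, totalRevenue) {
      await client.query(`
        INSERT INTO product_time_aggregates (pid, year, month, total_revenue)
        VALUES ($1, $2, $3, $4)
        ON CONFLICT (pid, year, month) DO UPDATE
        SET total_revenue = EXCLUDED.total_revenue
      `, [pid, year, month, totalRevenue]);
    }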
|
|
||||||
processedCount = Math.floor(totalProducts * 0.6);
|
processedCount = Math.floor(totalProducts * 0.6);
|
||||||
@@ -418,11 +485,11 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
|
|||||||
success
|
success
|
||||||
};
|
};
|
||||||
|
|
||||||
const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1');
|
const abcConfig = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1');
|
||||||
const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 };
|
const abcThresholds = abcConfig.rows[0] || { a_threshold: 20, b_threshold: 50 };
|
||||||
|
|
||||||
// First, create and populate the rankings table with an index
// First, create and populate the rankings table with an index
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks');
await connection.query('DROP TABLE IF EXISTS temp_revenue_ranks');
await connection.query(`
await connection.query(`
CREATE TEMPORARY TABLE temp_revenue_ranks (
CREATE TEMPORARY TABLE temp_revenue_ranks (
pid BIGINT NOT NULL,
pid BIGINT NOT NULL,
@@ -431,12 +498,12 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
dense_rank_num INT,
dense_rank_num INT,
percentile DECIMAL(5,2),
percentile DECIMAL(5,2),
total_count INT,
total_count INT,
PRIMARY KEY (pid),
PRIMARY KEY (pid)
INDEX (rank_num),
)
INDEX (dense_rank_num),
INDEX (percentile)
) ENGINE=MEMORY
`);
`);
await connection.query('CREATE INDEX ON temp_revenue_ranks (rank_num)');
await connection.query('CREATE INDEX ON temp_revenue_ranks (dense_rank_num)');
await connection.query('CREATE INDEX ON temp_revenue_ranks (percentile)');
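Postgres has no inline INDEX (...) clause and no ENGINE=MEMORY option, which is why the ported code issues separate CREATE INDEX statements after creating the table. A minimal sketch of that shape (the table and columns are illustrative):

    // Secondary indexes are created as separate statements; without a name,
    // Postgres generates one automatically.
    async function createRankingTable(client) {
      await client.query(`
        CREATE TEMPORARY TABLE temp_ranks (
          pid BIGINT PRIMARY KEY,
          rank_num INT
        )
      `);
      await client.query('CREATE INDEX ON temp_ranks (rank_num)');
    }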
|
|
||||||
// Calculate rankings with proper tie handling
|
// Calculate rankings with proper tie handling
|
||||||
await connection.query(`
|
await connection.query(`
|
||||||
@@ -463,58 +530,74 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
|
|||||||
`);
|
`);
|
||||||
|
|
||||||
// Get total count for percentage calculation
|
// Get total count for percentage calculation
|
||||||
const [rankingCount] = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks');
|
const rankingCount = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks');
|
||||||
const totalCount = rankingCount[0].total_count || 1;
|
const totalCount = parseInt(rankingCount.rows[0].total_count) || 1;
|
||||||
const max_rank = totalCount;
|
|
||||||
|
|
||||||
// Process updates in batches
|
// Process updates in batches
|
||||||
let abcProcessedCount = 0;
|
let abcProcessedCount = 0;
|
||||||
const batchSize = 5000;
|
const batchSize = 5000;
|
||||||
|
const maxPid = await connection.query('SELECT MAX(pid) as max_pid FROM products');
|
||||||
|
const maxProductId = parseInt(maxPid.rows[0].max_pid);
|
||||||
|
|
||||||
while (true) {
|
while (abcProcessedCount < maxProductId) {
|
||||||
if (isCancelled) return {
|
if (isCancelled) return {
|
||||||
processedProducts: processedCount,
|
processedProducts: processedCount,
|
||||||
processedOrders,
|
processedOrders,
|
||||||
processedPurchaseOrders: 0, // This module doesn't process POs
|
processedPurchaseOrders: 0,
|
||||||
success
|
success
|
||||||
};
|
};
|
||||||
|
|
||||||
// Get a batch of PIDs that need updating
|
// Get a batch of PIDs that need updating
|
||||||
const [pids] = await connection.query(`
|
const pids = await connection.query(`
|
||||||
SELECT pm.pid
|
SELECT pm.pid
|
||||||
FROM product_metrics pm
|
FROM product_metrics pm
|
||||||
LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid
|
LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid
|
||||||
WHERE pm.abc_class IS NULL
|
WHERE pm.pid > $1
|
||||||
OR pm.abc_class !=
|
AND (pm.abc_class IS NULL
|
||||||
CASE
|
OR pm.abc_class !=
|
||||||
WHEN tr.pid IS NULL THEN 'C'
|
CASE
|
||||||
WHEN tr.percentile <= ? THEN 'A'
|
WHEN tr.pid IS NULL THEN 'C'
|
||||||
WHEN tr.percentile <= ? THEN 'B'
|
WHEN tr.percentile <= $2 THEN 'A'
|
||||||
ELSE 'C'
|
WHEN tr.percentile <= $3 THEN 'B'
|
||||||
END
|
ELSE 'C'
|
||||||
LIMIT ?
|
END)
|
||||||
`, [abcThresholds.a_threshold, abcThresholds.b_threshold, batchSize]);
|
ORDER BY pm.pid
|
||||||
|
LIMIT $4
|
||||||
|
`, [abcProcessedCount, abcThresholds.a_threshold, abcThresholds.b_threshold, batchSize]);
|
||||||
|
|
||||||
if (pids.length === 0) break;
|
if (pids.rows.length === 0) break;
|
||||||
|
|
||||||
|
const pidValues = pids.rows.map(row => row.pid);
|
||||||
|
|
||||||
await connection.query(`
|
await connection.query(`
|
||||||
UPDATE product_metrics pm
|
UPDATE product_metrics pm
|
||||||
LEFT JOIN temp_revenue_ranks tr ON pm.pid = tr.pid
|
SET abc_class =
|
||||||
SET pm.abc_class =
|
|
||||||
CASE
|
CASE
|
||||||
WHEN tr.pid IS NULL THEN 'C'
|
WHEN tr.pid IS NULL THEN 'C'
|
||||||
WHEN tr.percentile <= ? THEN 'A'
|
WHEN tr.percentile <= $1 THEN 'A'
|
||||||
WHEN tr.percentile <= ? THEN 'B'
|
WHEN tr.percentile <= $2 THEN 'B'
|
||||||
ELSE 'C'
|
ELSE 'C'
|
||||||
END,
|
END,
|
||||||
pm.last_calculated_at = NOW()
|
last_calculated_at = NOW()
|
||||||
WHERE pm.pid IN (?)
|
FROM (SELECT pid, percentile FROM temp_revenue_ranks) tr
|
||||||
`, [abcThresholds.a_threshold, abcThresholds.b_threshold, pids.map(row => row.pid)]);
|
WHERE pm.pid = tr.pid AND pm.pid = ANY($3::bigint[])
|
||||||
|
OR (pm.pid = ANY($3::bigint[]) AND tr.pid IS NULL)
|
||||||
|
`, [abcThresholds.a_threshold, abcThresholds.b_threshold, pidValues]);
|
||||||
|
|
||||||
// Now update turnover rate with proper handling of zero inventory periods
|
// Now update turnover rate with proper handling of zero inventory periods
|
||||||
await connection.query(`
|
await connection.query(`
|
||||||
UPDATE product_metrics pm
|
UPDATE product_metrics pm
|
||||||
JOIN (
|
SET
|
||||||
|
turnover_rate = CASE
|
||||||
|
WHEN sales.avg_nonzero_stock > 0 AND sales.active_days > 0
|
||||||
|
THEN LEAST(
|
||||||
|
(sales.total_sold / sales.avg_nonzero_stock) * (365.0 / sales.active_days),
|
||||||
|
999.99
|
||||||
|
)
|
||||||
|
ELSE 0
|
||||||
|
END,
|
||||||
|
last_calculated_at = NOW()
|
||||||
|
FROM (
|
||||||
SELECT
|
SELECT
|
||||||
o.pid,
|
o.pid,
|
||||||
SUM(o.quantity) as total_sold,
|
SUM(o.quantity) as total_sold,
|
||||||
@@ -526,22 +609,33 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
|
|||||||
FROM orders o
|
FROM orders o
|
||||||
JOIN products p ON o.pid = p.pid
|
JOIN products p ON o.pid = p.pid
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
|
AND o.date >= CURRENT_DATE - INTERVAL '90 days'
|
||||||
AND o.pid IN (?)
|
AND o.pid = ANY($1::bigint[])
|
||||||
GROUP BY o.pid
|
GROUP BY o.pid
|
||||||
) sales ON pm.pid = sales.pid
|
) sales
|
||||||
SET
|
WHERE pm.pid = sales.pid
|
||||||
pm.turnover_rate = CASE
|
`, [pidValues]);
|
||||||
WHEN sales.avg_nonzero_stock > 0 AND sales.active_days > 0
|
|
||||||
THEN LEAST(
|
abcProcessedCount = pids.rows[pids.rows.length - 1].pid;
|
||||||
(sales.total_sold / sales.avg_nonzero_stock) * (365.0 / sales.active_days),
|
|
||||||
999.99
|
// Calculate progress proportionally to total products
|
||||||
)
|
processedCount = Math.floor(totalProducts * (0.60 + (abcProcessedCount / maxProductId) * 0.2));
|
||||||
ELSE 0
|
|
||||||
END,
|
outputProgress({
|
||||||
pm.last_calculated_at = NOW()
|
status: 'running',
|
||||||
WHERE pm.pid IN (?)
|
operation: 'ABC classification progress',
|
||||||
`, [pids.map(row => row.pid), pids.map(row => row.pid)]);
|
current: processedCount,
|
||||||
|
total: totalProducts,
|
||||||
|
elapsed: formatElapsedTime(startTime),
|
||||||
|
remaining: estimateRemaining(startTime, processedCount, totalProducts),
|
||||||
|
rate: calculateRate(startTime, processedCount),
|
||||||
|
percentage: ((processedCount / totalProducts) * 100).toFixed(1),
|
||||||
|
timing: {
|
||||||
|
start_time: new Date(startTime).toISOString(),
|
||||||
|
end_time: new Date().toISOString(),
|
||||||
|
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
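The CASE expressions above encode the ABC rule: products inside the top a_threshold percent of the revenue ranking are class 'A', inside b_threshold percent are 'B', and everything else, including products that never made the ranking, falls back to 'C'. The same rule restated in plain JavaScript, as a sketch (default thresholds match the fallback values used when abc_classification_config has no row):

    function abcClass(percentile, aThreshold = 20, bThreshold = 50) {
      if (percentile == null) return 'C';      // product never ranked (no revenue)
      if (percentile <= aThreshold) return 'A';
      if (percentile <= bThreshold) return 'B';
      return 'C';
    }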

// If we get here, everything completed successfully
// If we get here, everything completed successfully
@@ -551,7 +645,8 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
await connection.query(`
await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
VALUES ('product_metrics', NOW())
VALUES ('product_metrics', NOW())
ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
ON CONFLICT (module_name) DO UPDATE
SET last_calculation_timestamp = NOW()
`);
`);

return {
return {
@@ -566,7 +661,16 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
logError(error, 'Error calculating product metrics');
logError(error, 'Error calculating product metrics');
throw error;
throw error;
} finally {
} finally {
// Always clean up temporary tables, even if an error occurred
if (connection) {
if (connection) {
try {
await connection.query('DROP TABLE IF EXISTS temp_sales_metrics');
await connection.query('DROP TABLE IF EXISTS temp_purchase_metrics');
} catch (err) {
console.error('Error cleaning up temporary tables:', err);
}

// Make sure to release the connection
connection.release();
connection.release();
}
}
}
}
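The finally block above releases the connection and drops the temp tables without letting a cleanup failure mask the error that triggered the path. A sketch of that acquire/work/cleanup shape on its own (the pool and the work callback are placeholders):

    async function withConnection(pool, work) {
      const connection = await pool.connect();
      try {
        return await work(connection);
      } finally {
        try {
          await connection.query('DROP TABLE IF EXISTS temp_sales_metrics');
          await connection.query('DROP TABLE IF EXISTS temp_purchase_metrics');
        } catch (err) {
          // Log but never rethrow here, so the original error (if any) propagates.
          console.error('Error cleaning up temporary tables:', err);
        }
        connection.release();
      }
    }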
@@ -32,13 +32,13 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get order count that will be processed
|
// Get order count that will be processed
|
||||||
const [orderCount] = await connection.query(`
|
const orderCount = await connection.query(`
|
||||||
SELECT COUNT(*) as count
|
SELECT COUNT(*) as count
|
||||||
FROM orders o
|
FROM orders o
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
|
AND o.date >= CURRENT_DATE - INTERVAL '90 days'
|
||||||
`);
|
`);
|
||||||
processedOrders = orderCount[0].count;
|
processedOrders = parseInt(orderCount.rows[0].count);
|
||||||
|
|
||||||
outputProgress({
|
outputProgress({
|
||||||
status: 'running',
|
status: 'running',
|
||||||
@@ -69,15 +69,15 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
|
|||||||
await connection.query(`
await connection.query(`
INSERT INTO temp_forecast_dates
INSERT INTO temp_forecast_dates
SELECT
SELECT
DATE_ADD(CURRENT_DATE, INTERVAL n DAY) as forecast_date,
CURRENT_DATE + (n || ' days')::INTERVAL as forecast_date,
DAYOFWEEK(DATE_ADD(CURRENT_DATE, INTERVAL n DAY)) as day_of_week,
EXTRACT(DOW FROM CURRENT_DATE + (n || ' days')::INTERVAL) + 1 as day_of_week,
MONTH(DATE_ADD(CURRENT_DATE, INTERVAL n DAY)) as month
EXTRACT(MONTH FROM CURRENT_DATE + (n || ' days')::INTERVAL) as month
FROM (
FROM (
SELECT a.N + b.N * 10 as n
SELECT a.n + b.n * 10 as n
FROM
FROM
(SELECT 0 as N UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION
(SELECT 0 as n UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION
SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) a,
SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) a,
(SELECT 0 as N UNION SELECT 1 UNION SELECT 2) b
(SELECT 0 as n UNION SELECT 1 UNION SELECT 2) b
ORDER BY n
ORDER BY n
LIMIT 31
LIMIT 31
) numbers
) numbers
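The ported query keeps MySQL's digits-table trick for generating 31 consecutive dates. In Postgres the same rows could come from generate_series; this is a possible simplification, not what the commit does (the sketch assumes the temp_forecast_dates table created above):

    async function populateForecastDates(client) {
      // 31 rows: today through today + 30 days, one per day.
      await client.query(`
        INSERT INTO temp_forecast_dates
        SELECT
          d::date                 AS forecast_date,
          EXTRACT(DOW FROM d) + 1 AS day_of_week,
          EXTRACT(MONTH FROM d)   AS month
        FROM generate_series(CURRENT_DATE, CURRENT_DATE + INTERVAL '30 days', INTERVAL '1 day') AS d
      `);
    }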
@@ -109,17 +109,17 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
|
|||||||
|
|
||||||
// Create temporary table for daily sales stats
// Create temporary table for daily sales stats
await connection.query(`
await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_daily_sales AS
CREATE TEMPORARY TABLE temp_daily_sales AS
SELECT
SELECT
o.pid,
o.pid,
DAYOFWEEK(o.date) as day_of_week,
EXTRACT(DOW FROM o.date) + 1 as day_of_week,
SUM(o.quantity) as daily_quantity,
SUM(o.quantity) as daily_quantity,
SUM(o.price * o.quantity) as daily_revenue,
SUM(o.price * o.quantity) as daily_revenue,
COUNT(DISTINCT DATE(o.date)) as day_count
COUNT(DISTINCT DATE(o.date)) as day_count
FROM orders o
FROM orders o
WHERE o.canceled = false
WHERE o.canceled = false
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
AND o.date >= CURRENT_DATE - INTERVAL '90 days'
GROUP BY o.pid, DAYOFWEEK(o.date)
GROUP BY o.pid, EXTRACT(DOW FROM o.date) + 1
`);
`);
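The "+ 1" next to EXTRACT(DOW ...) exists because MySQL's DAYOFWEEK() numbers days 1 (Sunday) through 7 (Saturday), while Postgres DOW runs 0 (Sunday) through 6 (Saturday). A quick JavaScript cross-check of the mapping (the date is illustrative; JS getDay() uses the same 0-based numbering as Postgres DOW):

    function mysqlDayOfWeek(date) {
      return date.getDay() + 1; // 1 = Sunday ... 7 = Saturday, matching DAYOFWEEK()
    }
    console.log(mysqlDayOfWeek(new Date(2024, 0, 7))); // Jan 7 2024 is a Sunday -> 1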
|
|
||||||
processedCount = Math.floor(totalProducts * 0.94);
|
processedCount = Math.floor(totalProducts * 0.94);
|
||||||
@@ -148,7 +148,7 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
|
|||||||
|
|
||||||
// Create temporary table for product stats
|
// Create temporary table for product stats
|
||||||
await connection.query(`
|
await connection.query(`
|
||||||
CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_stats AS
|
CREATE TEMPORARY TABLE temp_product_stats AS
|
||||||
SELECT
|
SELECT
|
||||||
pid,
|
pid,
|
||||||
AVG(daily_revenue) as overall_avg_revenue,
|
AVG(daily_revenue) as overall_avg_revenue,
|
||||||
@@ -186,10 +186,9 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
|
|||||||
INSERT INTO sales_forecasts (
|
INSERT INTO sales_forecasts (
|
||||||
pid,
|
pid,
|
||||||
forecast_date,
|
forecast_date,
|
||||||
forecast_units,
|
forecast_quantity,
|
||||||
forecast_revenue,
|
|
||||||
confidence_level,
|
confidence_level,
|
||||||
last_calculated_at
|
created_at
|
||||||
)
|
)
|
||||||
WITH daily_stats AS (
|
WITH daily_stats AS (
|
||||||
SELECT
|
SELECT
|
||||||
@@ -223,29 +222,9 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
|
|||||||
WHEN ds.std_daily_qty / NULLIF(ds.avg_daily_qty, 0) > 1.0 THEN 0.9
|
WHEN ds.std_daily_qty / NULLIF(ds.avg_daily_qty, 0) > 1.0 THEN 0.9
|
||||||
WHEN ds.std_daily_qty / NULLIF(ds.avg_daily_qty, 0) > 0.5 THEN 0.95
|
WHEN ds.std_daily_qty / NULLIF(ds.avg_daily_qty, 0) > 0.5 THEN 0.95
|
||||||
ELSE 1.0
|
ELSE 1.0
|
||||||
END,
|
END
|
||||||
2
|
|
||||||
)
|
)
|
||||||
) as forecast_units,
|
) as forecast_quantity,
|
||||||
GREATEST(0,
|
|
||||||
ROUND(
|
|
||||||
COALESCE(
|
|
||||||
CASE
|
|
||||||
WHEN ds.data_points >= 4 THEN ds.avg_daily_revenue
|
|
||||||
ELSE ps.overall_avg_revenue
|
|
||||||
END *
|
|
||||||
(1 + COALESCE(sf.seasonality_factor, 0)) *
|
|
||||||
CASE
|
|
||||||
WHEN ds.std_daily_revenue / NULLIF(ds.avg_daily_revenue, 0) > 1.5 THEN 0.85
|
|
||||||
WHEN ds.std_daily_revenue / NULLIF(ds.avg_daily_revenue, 0) > 1.0 THEN 0.9
|
|
||||||
WHEN ds.std_daily_revenue / NULLIF(ds.avg_daily_revenue, 0) > 0.5 THEN 0.95
|
|
||||||
ELSE 1.0
|
|
||||||
END,
|
|
||||||
0
|
|
||||||
),
|
|
||||||
2
|
|
||||||
)
|
|
||||||
) as forecast_revenue,
|
|
||||||
CASE
|
CASE
|
||||||
WHEN ds.total_days >= 60 AND ds.daily_variance_ratio < 0.5 THEN 90
|
WHEN ds.total_days >= 60 AND ds.daily_variance_ratio < 0.5 THEN 90
|
||||||
WHEN ds.total_days >= 60 THEN 85
|
WHEN ds.total_days >= 60 THEN 85
|
||||||
@@ -255,17 +234,18 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
|
|||||||
WHEN ds.total_days >= 14 THEN 65
|
WHEN ds.total_days >= 14 THEN 65
|
||||||
ELSE 60
|
ELSE 60
|
||||||
END as confidence_level,
|
END as confidence_level,
|
||||||
NOW() as last_calculated_at
|
NOW() as created_at
|
||||||
FROM daily_stats ds
|
FROM daily_stats ds
|
||||||
JOIN temp_product_stats ps ON ds.pid = ps.pid
|
JOIN temp_product_stats ps ON ds.pid = ps.pid
|
||||||
CROSS JOIN temp_forecast_dates fd
|
CROSS JOIN temp_forecast_dates fd
|
||||||
LEFT JOIN sales_seasonality sf ON fd.month = sf.month
|
LEFT JOIN sales_seasonality sf ON fd.month = sf.month
|
||||||
GROUP BY ds.pid, fd.forecast_date, ps.overall_avg_revenue, sf.seasonality_factor
|
GROUP BY ds.pid, fd.forecast_date, ps.overall_avg_revenue, sf.seasonality_factor,
|
||||||
ON DUPLICATE KEY UPDATE
|
ds.avg_daily_qty, ds.std_daily_qty, ds.avg_daily_qty, ds.total_days, ds.daily_variance_ratio
|
||||||
forecast_units = VALUES(forecast_units),
|
ON CONFLICT (pid, forecast_date) DO UPDATE
|
||||||
forecast_revenue = VALUES(forecast_revenue),
|
SET
|
||||||
confidence_level = VALUES(confidence_level),
|
forecast_quantity = EXCLUDED.forecast_quantity,
|
||||||
last_calculated_at = NOW()
|
confidence_level = EXCLUDED.confidence_level,
|
||||||
|
created_at = NOW()
|
||||||
`);
|
`);
|
||||||
|
|
||||||
processedCount = Math.floor(totalProducts * 0.98);
|
processedCount = Math.floor(totalProducts * 0.98);
|
||||||
@@ -294,22 +274,22 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
|
|||||||
|
|
||||||
// Create temporary table for category stats
|
// Create temporary table for category stats
|
||||||
await connection.query(`
|
await connection.query(`
|
||||||
CREATE TEMPORARY TABLE IF NOT EXISTS temp_category_sales AS
|
CREATE TEMPORARY TABLE temp_category_sales AS
|
||||||
SELECT
|
SELECT
|
||||||
pc.cat_id,
|
pc.cat_id,
|
||||||
DAYOFWEEK(o.date) as day_of_week,
|
EXTRACT(DOW FROM o.date) + 1 as day_of_week,
|
||||||
SUM(o.quantity) as daily_quantity,
|
SUM(o.quantity) as daily_quantity,
|
||||||
SUM(o.price * o.quantity) as daily_revenue,
|
SUM(o.price * o.quantity) as daily_revenue,
|
||||||
COUNT(DISTINCT DATE(o.date)) as day_count
|
COUNT(DISTINCT DATE(o.date)) as day_count
|
||||||
FROM orders o
|
FROM orders o
|
||||||
JOIN product_categories pc ON o.pid = pc.pid
|
JOIN product_categories pc ON o.pid = pc.pid
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY)
|
AND o.date >= CURRENT_DATE - INTERVAL '90 days'
|
||||||
GROUP BY pc.cat_id, DAYOFWEEK(o.date)
|
GROUP BY pc.cat_id, EXTRACT(DOW FROM o.date) + 1
|
||||||
`);
|
`);
|
||||||
|
|
||||||
await connection.query(`
|
await connection.query(`
|
||||||
CREATE TEMPORARY TABLE IF NOT EXISTS temp_category_stats AS
|
CREATE TEMPORARY TABLE temp_category_stats AS
|
||||||
SELECT
|
SELECT
|
||||||
cat_id,
|
cat_id,
|
||||||
AVG(daily_revenue) as overall_avg_revenue,
|
AVG(daily_revenue) as overall_avg_revenue,
|
||||||
@@ -350,10 +330,10 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
|
|||||||
forecast_units,
|
forecast_units,
|
||||||
forecast_revenue,
|
forecast_revenue,
|
||||||
confidence_level,
|
confidence_level,
|
||||||
last_calculated_at
|
created_at
|
||||||
)
|
)
|
||||||
SELECT
|
SELECT
|
||||||
cs.cat_id as category_id,
|
cs.cat_id::bigint as category_id,
|
||||||
fd.forecast_date,
|
fd.forecast_date,
|
||||||
GREATEST(0,
|
GREATEST(0,
|
||||||
AVG(cs.daily_quantity) *
|
AVG(cs.daily_quantity) *
|
||||||
@@ -366,7 +346,7 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
|
|||||||
ELSE ct.overall_avg_revenue
|
ELSE ct.overall_avg_revenue
|
||||||
END *
|
END *
|
||||||
(1 + COALESCE(sf.seasonality_factor, 0)) *
|
(1 + COALESCE(sf.seasonality_factor, 0)) *
|
||||||
(0.95 + (RAND() * 0.1)),
|
(0.95 + (random() * 0.1)),
|
||||||
0
|
0
|
||||||
)
|
)
|
||||||
) as forecast_revenue,
|
) as forecast_revenue,
|
||||||
@@ -376,27 +356,34 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
|
|||||||
WHEN ct.total_days >= 14 THEN 70
|
WHEN ct.total_days >= 14 THEN 70
|
||||||
ELSE 60
|
ELSE 60
|
||||||
END as confidence_level,
|
END as confidence_level,
|
||||||
NOW() as last_calculated_at
|
NOW() as created_at
|
||||||
FROM temp_category_sales cs
|
FROM temp_category_sales cs
|
||||||
JOIN temp_category_stats ct ON cs.cat_id = ct.cat_id
|
JOIN temp_category_stats ct ON cs.cat_id = ct.cat_id
|
||||||
CROSS JOIN temp_forecast_dates fd
|
CROSS JOIN temp_forecast_dates fd
|
||||||
LEFT JOIN sales_seasonality sf ON fd.month = sf.month
|
LEFT JOIN sales_seasonality sf ON fd.month = sf.month
|
||||||
GROUP BY cs.cat_id, fd.forecast_date, ct.overall_avg_revenue, ct.total_days, sf.seasonality_factor
|
GROUP BY
|
||||||
|
cs.cat_id,
|
||||||
|
fd.forecast_date,
|
||||||
|
ct.overall_avg_revenue,
|
||||||
|
ct.total_days,
|
||||||
|
sf.seasonality_factor,
|
||||||
|
sf.month
|
||||||
HAVING AVG(cs.daily_quantity) > 0
|
HAVING AVG(cs.daily_quantity) > 0
|
||||||
ON DUPLICATE KEY UPDATE
|
ON CONFLICT (category_id, forecast_date) DO UPDATE
|
||||||
forecast_units = VALUES(forecast_units),
|
SET
|
||||||
forecast_revenue = VALUES(forecast_revenue),
|
forecast_units = EXCLUDED.forecast_units,
|
||||||
confidence_level = VALUES(confidence_level),
|
forecast_revenue = EXCLUDED.forecast_revenue,
|
||||||
last_calculated_at = NOW()
|
confidence_level = EXCLUDED.confidence_level,
|
||||||
|
created_at = NOW()
|
||||||
`);
|
`);
|
||||||
|
|
||||||
// Clean up temporary tables
|
// Clean up temporary tables
|
||||||
await connection.query(`
|
await connection.query(`
|
||||||
DROP TEMPORARY TABLE IF EXISTS temp_forecast_dates;
|
DROP TABLE IF EXISTS temp_forecast_dates;
|
||||||
DROP TEMPORARY TABLE IF EXISTS temp_daily_sales;
|
DROP TABLE IF EXISTS temp_daily_sales;
|
||||||
DROP TEMPORARY TABLE IF EXISTS temp_product_stats;
|
DROP TABLE IF EXISTS temp_product_stats;
|
||||||
DROP TEMPORARY TABLE IF EXISTS temp_category_sales;
|
DROP TABLE IF EXISTS temp_category_sales;
|
||||||
DROP TEMPORARY TABLE IF EXISTS temp_category_stats;
|
DROP TABLE IF EXISTS temp_category_stats;
|
||||||
`);
|
`);
|
||||||
|
|
||||||
processedCount = Math.floor(totalProducts * 1.0);
|
processedCount = Math.floor(totalProducts * 1.0);
|
||||||
@@ -423,7 +410,8 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount
|
|||||||
await connection.query(`
|
await connection.query(`
|
||||||
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
|
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
|
||||||
VALUES ('sales_forecasts', NOW())
|
VALUES ('sales_forecasts', NOW())
|
||||||
ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
|
ON CONFLICT (module_name) DO UPDATE
|
||||||
|
SET last_calculation_timestamp = NOW()
|
||||||
`);
|
`);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|||||||
@@ -32,12 +32,12 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get order count that will be processed
|
// Get order count that will be processed
|
||||||
const [orderCount] = await connection.query(`
|
const orderCount = await connection.query(`
|
||||||
SELECT COUNT(*) as count
|
SELECT COUNT(*) as count
|
||||||
FROM orders o
|
FROM orders o
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
`);
|
`);
|
||||||
processedOrders = orderCount[0].count;
|
processedOrders = parseInt(orderCount.rows[0].count);
|
||||||
|
|
||||||
outputProgress({
|
outputProgress({
|
||||||
status: 'running',
|
status: 'running',
|
||||||
@@ -75,8 +75,8 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
|
|||||||
WITH monthly_sales AS (
|
WITH monthly_sales AS (
|
||||||
SELECT
|
SELECT
|
||||||
o.pid,
|
o.pid,
|
||||||
YEAR(o.date) as year,
|
EXTRACT(YEAR FROM o.date::timestamp with time zone) as year,
|
||||||
MONTH(o.date) as month,
|
EXTRACT(MONTH FROM o.date::timestamp with time zone) as month,
|
||||||
SUM(o.quantity) as total_quantity_sold,
|
SUM(o.quantity) as total_quantity_sold,
|
||||||
SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) as total_revenue,
|
SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) as total_revenue,
|
||||||
SUM(COALESCE(p.cost_price, 0) * o.quantity) as total_cost,
|
SUM(COALESCE(p.cost_price, 0) * o.quantity) as total_cost,
|
||||||
@@ -93,17 +93,17 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
|
|||||||
FROM orders o
|
FROM orders o
|
||||||
JOIN products p ON o.pid = p.pid
|
JOIN products p ON o.pid = p.pid
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
GROUP BY o.pid, YEAR(o.date), MONTH(o.date)
|
GROUP BY o.pid, EXTRACT(YEAR FROM o.date::timestamp with time zone), EXTRACT(MONTH FROM o.date::timestamp with time zone), p.cost_price, p.stock_quantity
|
||||||
),
|
),
|
||||||
monthly_stock AS (
|
monthly_stock AS (
|
||||||
SELECT
|
SELECT
|
||||||
pid,
|
pid,
|
||||||
YEAR(date) as year,
|
EXTRACT(YEAR FROM date::timestamp with time zone) as year,
|
||||||
MONTH(date) as month,
|
EXTRACT(MONTH FROM date::timestamp with time zone) as month,
|
||||||
SUM(received) as stock_received,
|
SUM(received) as stock_received,
|
||||||
SUM(ordered) as stock_ordered
|
SUM(ordered) as stock_ordered
|
||||||
FROM purchase_orders
|
FROM purchase_orders
|
||||||
GROUP BY pid, YEAR(date), MONTH(date)
|
GROUP BY pid, EXTRACT(YEAR FROM date::timestamp with time zone), EXTRACT(MONTH FROM date::timestamp with time zone)
|
||||||
),
|
),
|
||||||
base_products AS (
|
base_products AS (
|
||||||
SELECT
|
SELECT
|
||||||
@@ -197,17 +197,18 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
|
|||||||
AND s.year = ms.year
|
AND s.year = ms.year
|
||||||
AND s.month = ms.month
|
AND s.month = ms.month
|
||||||
)
|
)
|
||||||
ON DUPLICATE KEY UPDATE
|
ON CONFLICT (pid, year, month) DO UPDATE
|
||||||
total_quantity_sold = VALUES(total_quantity_sold),
|
SET
|
||||||
total_revenue = VALUES(total_revenue),
|
total_quantity_sold = EXCLUDED.total_quantity_sold,
|
||||||
total_cost = VALUES(total_cost),
|
total_revenue = EXCLUDED.total_revenue,
|
||||||
order_count = VALUES(order_count),
|
total_cost = EXCLUDED.total_cost,
|
||||||
stock_received = VALUES(stock_received),
|
order_count = EXCLUDED.order_count,
|
||||||
stock_ordered = VALUES(stock_ordered),
|
stock_received = EXCLUDED.stock_received,
|
||||||
avg_price = VALUES(avg_price),
|
stock_ordered = EXCLUDED.stock_ordered,
|
||||||
profit_margin = VALUES(profit_margin),
|
avg_price = EXCLUDED.avg_price,
|
||||||
inventory_value = VALUES(inventory_value),
|
profit_margin = EXCLUDED.profit_margin,
|
||||||
gmroi = VALUES(gmroi)
|
inventory_value = EXCLUDED.inventory_value,
|
||||||
|
gmroi = EXCLUDED.gmroi
|
||||||
`);
|
`);
|
||||||
|
|
||||||
processedCount = Math.floor(totalProducts * 0.60);
|
processedCount = Math.floor(totalProducts * 0.60);
|
||||||
@@ -237,23 +238,23 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
|
|||||||
// Update with financial metrics
|
// Update with financial metrics
|
||||||
await connection.query(`
|
await connection.query(`
|
||||||
UPDATE product_time_aggregates pta
|
UPDATE product_time_aggregates pta
|
||||||
JOIN (
|
SET inventory_value = COALESCE(fin.inventory_value, 0)
|
||||||
|
FROM (
|
||||||
SELECT
|
SELECT
|
||||||
p.pid,
|
p.pid,
|
||||||
YEAR(o.date) as year,
|
EXTRACT(YEAR FROM o.date::timestamp with time zone) as year,
|
||||||
MONTH(o.date) as month,
|
EXTRACT(MONTH FROM o.date::timestamp with time zone) as month,
|
||||||
p.cost_price * p.stock_quantity as inventory_value,
|
p.cost_price * p.stock_quantity as inventory_value,
|
||||||
SUM(o.quantity * (o.price - p.cost_price)) as gross_profit,
|
SUM(o.quantity * (o.price - p.cost_price)) as gross_profit,
|
||||||
COUNT(DISTINCT DATE(o.date)) as active_days
|
COUNT(DISTINCT DATE(o.date)) as active_days
|
||||||
FROM products p
|
FROM products p
|
||||||
LEFT JOIN orders o ON p.pid = o.pid
|
LEFT JOIN orders o ON p.pid = o.pid
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
GROUP BY p.pid, YEAR(o.date), MONTH(o.date)
|
GROUP BY p.pid, EXTRACT(YEAR FROM o.date::timestamp with time zone), EXTRACT(MONTH FROM o.date::timestamp with time zone), p.cost_price, p.stock_quantity
|
||||||
) fin ON pta.pid = fin.pid
|
) fin
|
||||||
|
WHERE pta.pid = fin.pid
|
||||||
AND pta.year = fin.year
|
AND pta.year = fin.year
|
||||||
AND pta.month = fin.month
|
AND pta.month = fin.month
|
||||||
SET
|
|
||||||
pta.inventory_value = COALESCE(fin.inventory_value, 0)
|
|
||||||
`);
|
`);
|
||||||
|
|
||||||
processedCount = Math.floor(totalProducts * 0.65);
|
processedCount = Math.floor(totalProducts * 0.65);
|
||||||
@@ -280,7 +281,8 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount
|
|||||||
await connection.query(`
|
await connection.query(`
|
||||||
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
|
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
|
||||||
VALUES ('time_aggregates', NOW())
|
VALUES ('time_aggregates', NOW())
|
||||||
ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
|
ON CONFLICT (module_name) DO UPDATE
|
||||||
|
SET last_calculation_timestamp = NOW()
|
||||||
`);
|
`);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
const mysql = require('mysql2/promise');
const { Pool } = require('pg');
const path = require('path');
const path = require('path');
require('dotenv').config({ path: path.resolve(__dirname, '../../..', '.env') });
require('dotenv').config({ path: path.resolve(__dirname, '../../..', '.env') });

@@ -8,36 +8,24 @@ const dbConfig = {
user: process.env.DB_USER,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
database: process.env.DB_NAME,
waitForConnections: true,
port: process.env.DB_PORT || 5432,
connectionLimit: 10,
ssl: process.env.DB_SSL === 'true',
queueLimit: 0,
// Add performance optimizations
// Add performance optimizations
namedPlaceholders: true,
max: 10, // connection pool max size
maxPreparedStatements: 256,
idleTimeoutMillis: 30000,
enableKeepAlive: true,
connectionTimeoutMillis: 60000
keepAliveInitialDelay: 0,
// Add memory optimizations
flags: [
'FOUND_ROWS',
'LONG_PASSWORD',
'PROTOCOL_41',
'TRANSACTIONS',
'SECURE_CONNECTION',
'MULTI_RESULTS',
'PS_MULTI_RESULTS',
'PLUGIN_AUTH',
'CONNECT_ATTRS',
'PLUGIN_AUTH_LENENC_CLIENT_DATA',
'SESSION_TRACK',
'MULTI_STATEMENTS'
]
};
};

// Create a single pool instance to be reused
// Create a single pool instance to be reused
const pool = mysql.createPool(dbConfig);
const pool = new Pool(dbConfig);

// Add event handlers for pool
pool.on('error', (err, client) => {
console.error('Unexpected error on idle client', err);
});

async function getConnection() {
async function getConnection() {
return await pool.getConnection();
return await pool.connect();
}
}

async function closePool() {
async function closePool() {
|||||||
@@ -33,7 +33,7 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get counts of records that will be processed
|
// Get counts of records that will be processed
|
||||||
const [[orderCount], [poCount]] = await Promise.all([
|
const [orderCountResult, poCountResult] = await Promise.all([
|
||||||
connection.query(`
|
connection.query(`
|
||||||
SELECT COUNT(*) as count
|
SELECT COUNT(*) as count
|
||||||
FROM orders o
|
FROM orders o
|
||||||
@@ -45,8 +45,8 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
|
|||||||
WHERE po.status != 0
|
WHERE po.status != 0
|
||||||
`)
|
`)
|
||||||
]);
|
]);
|
||||||
processedOrders = orderCount.count;
|
processedOrders = parseInt(orderCountResult.rows[0].count);
|
||||||
processedPurchaseOrders = poCount.count;
|
processedPurchaseOrders = parseInt(poCountResult.rows[0].count);
|
||||||
|
|
||||||
outputProgress({
|
outputProgress({
|
||||||
status: 'running',
|
status: 'running',
|
||||||
@@ -66,7 +66,7 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
|
|||||||
|
|
||||||
// First ensure all vendors exist in vendor_details
|
// First ensure all vendors exist in vendor_details
|
||||||
await connection.query(`
|
await connection.query(`
|
||||||
INSERT IGNORE INTO vendor_details (vendor, status, created_at, updated_at)
|
INSERT INTO vendor_details (vendor, status, created_at, updated_at)
|
||||||
SELECT DISTINCT
|
SELECT DISTINCT
|
||||||
vendor,
|
vendor,
|
||||||
'active' as status,
|
'active' as status,
|
||||||
@@ -74,6 +74,7 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
|
|||||||
NOW() as updated_at
|
NOW() as updated_at
|
||||||
FROM products
|
FROM products
|
||||||
WHERE vendor IS NOT NULL
|
WHERE vendor IS NOT NULL
|
||||||
|
ON CONFLICT (vendor) DO NOTHING
|
||||||
`);
|
`);
|
||||||
|
|
||||||
processedCount = Math.floor(totalProducts * 0.8);
|
processedCount = Math.floor(totalProducts * 0.8);
|
||||||
@@ -128,7 +129,7 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
|
|||||||
FROM products p
|
FROM products p
|
||||||
JOIN orders o ON p.pid = o.pid
|
JOIN orders o ON p.pid = o.pid
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
|
AND o.date >= CURRENT_DATE - INTERVAL '12 months'
|
||||||
GROUP BY p.vendor
|
GROUP BY p.vendor
|
||||||
),
|
),
|
||||||
vendor_po AS (
|
vendor_po AS (
|
||||||
@@ -138,12 +139,15 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
|
|||||||
COUNT(DISTINCT po.id) as total_orders,
|
COUNT(DISTINCT po.id) as total_orders,
|
||||||
AVG(CASE
|
AVG(CASE
|
||||||
WHEN po.receiving_status = 40
|
WHEN po.receiving_status = 40
|
||||||
THEN DATEDIFF(po.received_date, po.date)
|
AND po.received_date IS NOT NULL
|
||||||
|
AND po.date IS NOT NULL
|
||||||
|
THEN EXTRACT(EPOCH FROM (po.received_date::timestamp with time zone - po.date::timestamp with time zone)) / 86400.0
|
||||||
|
ELSE NULL
|
||||||
END) as avg_lead_time_days,
|
END) as avg_lead_time_days,
|
||||||
SUM(po.ordered * po.po_cost_price) as total_purchase_value
|
SUM(po.ordered * po.po_cost_price) as total_purchase_value
|
||||||
FROM products p
|
FROM products p
|
||||||
JOIN purchase_orders po ON p.pid = po.pid
|
JOIN purchase_orders po ON p.pid = po.pid
|
||||||
WHERE po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
|
WHERE po.date >= CURRENT_DATE - INTERVAL '12 months'
|
||||||
GROUP BY p.vendor
|
GROUP BY p.vendor
|
||||||
),
|
),
|
||||||
vendor_products AS (
|
vendor_products AS (
|
||||||
@@ -188,20 +192,21 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
|
|||||||
LEFT JOIN vendor_po vp ON vs.vendor = vp.vendor
|
LEFT JOIN vendor_po vp ON vs.vendor = vp.vendor
|
||||||
LEFT JOIN vendor_products vpr ON vs.vendor = vpr.vendor
|
LEFT JOIN vendor_products vpr ON vs.vendor = vpr.vendor
|
||||||
WHERE vs.vendor IS NOT NULL
|
WHERE vs.vendor IS NOT NULL
|
||||||
ON DUPLICATE KEY UPDATE
|
ON CONFLICT (vendor) DO UPDATE
|
||||||
total_revenue = VALUES(total_revenue),
|
SET
|
||||||
total_orders = VALUES(total_orders),
|
total_revenue = EXCLUDED.total_revenue,
|
||||||
total_late_orders = VALUES(total_late_orders),
|
total_orders = EXCLUDED.total_orders,
|
||||||
avg_lead_time_days = VALUES(avg_lead_time_days),
|
total_late_orders = EXCLUDED.total_late_orders,
|
||||||
on_time_delivery_rate = VALUES(on_time_delivery_rate),
|
avg_lead_time_days = EXCLUDED.avg_lead_time_days,
|
||||||
order_fill_rate = VALUES(order_fill_rate),
|
on_time_delivery_rate = EXCLUDED.on_time_delivery_rate,
|
||||||
avg_order_value = VALUES(avg_order_value),
|
order_fill_rate = EXCLUDED.order_fill_rate,
|
||||||
active_products = VALUES(active_products),
|
avg_order_value = EXCLUDED.avg_order_value,
|
||||||
total_products = VALUES(total_products),
|
active_products = EXCLUDED.active_products,
|
||||||
total_purchase_value = VALUES(total_purchase_value),
|
total_products = EXCLUDED.total_products,
|
||||||
avg_margin_percent = VALUES(avg_margin_percent),
|
total_purchase_value = EXCLUDED.total_purchase_value,
|
||||||
status = VALUES(status),
|
avg_margin_percent = EXCLUDED.avg_margin_percent,
|
||||||
last_calculated_at = VALUES(last_calculated_at)
|
status = EXCLUDED.status,
|
||||||
|
last_calculated_at = EXCLUDED.last_calculated_at
|
||||||
`);
|
`);
|
||||||
|
|
||||||
processedCount = Math.floor(totalProducts * 0.9);
|
processedCount = Math.floor(totalProducts * 0.9);
|
||||||
@@ -244,23 +249,23 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
|
|||||||
WITH monthly_orders AS (
|
WITH monthly_orders AS (
|
||||||
SELECT
|
SELECT
|
||||||
p.vendor,
|
p.vendor,
|
||||||
YEAR(o.date) as year,
|
EXTRACT(YEAR FROM o.date::timestamp with time zone) as year,
|
||||||
MONTH(o.date) as month,
|
EXTRACT(MONTH FROM o.date::timestamp with time zone) as month,
|
||||||
COUNT(DISTINCT o.id) as total_orders,
|
COUNT(DISTINCT o.id) as total_orders,
|
||||||
SUM(o.quantity * o.price) as total_revenue,
|
SUM(o.quantity * o.price) as total_revenue,
|
||||||
SUM(o.quantity * (o.price - p.cost_price)) as total_margin
|
SUM(o.quantity * (o.price - p.cost_price)) as total_margin
|
||||||
FROM products p
|
FROM products p
|
||||||
JOIN orders o ON p.pid = o.pid
|
JOIN orders o ON p.pid = o.pid
|
||||||
WHERE o.canceled = false
|
WHERE o.canceled = false
|
||||||
AND o.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
|
AND o.date >= CURRENT_DATE - INTERVAL '12 months'
|
||||||
AND p.vendor IS NOT NULL
|
AND p.vendor IS NOT NULL
|
||||||
GROUP BY p.vendor, YEAR(o.date), MONTH(o.date)
|
GROUP BY p.vendor, EXTRACT(YEAR FROM o.date::timestamp with time zone), EXTRACT(MONTH FROM o.date::timestamp with time zone)
|
||||||
),
|
),
|
||||||
monthly_po AS (
|
monthly_po AS (
|
||||||
SELECT
|
SELECT
|
||||||
p.vendor,
|
p.vendor,
|
||||||
YEAR(po.date) as year,
|
EXTRACT(YEAR FROM po.date::timestamp with time zone) as year,
|
||||||
MONTH(po.date) as month,
|
EXTRACT(MONTH FROM po.date::timestamp with time zone) as month,
|
||||||
COUNT(DISTINCT po.id) as total_po,
|
COUNT(DISTINCT po.id) as total_po,
|
||||||
COUNT(DISTINCT CASE
|
COUNT(DISTINCT CASE
|
||||||
WHEN po.receiving_status = 40 AND po.received_date > po.expected_date
|
WHEN po.receiving_status = 40 AND po.received_date > po.expected_date
|
||||||
@@ -268,14 +273,17 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
|
|||||||
END) as late_orders,
|
END) as late_orders,
|
||||||
AVG(CASE
|
AVG(CASE
|
||||||
WHEN po.receiving_status = 40
|
WHEN po.receiving_status = 40
|
||||||
THEN DATEDIFF(po.received_date, po.date)
|
AND po.received_date IS NOT NULL
|
||||||
|
AND po.date IS NOT NULL
|
||||||
|
THEN EXTRACT(EPOCH FROM (po.received_date::timestamp with time zone - po.date::timestamp with time zone)) / 86400.0
|
||||||
|
ELSE NULL
|
||||||
END) as avg_lead_time_days,
|
END) as avg_lead_time_days,
|
||||||
SUM(po.ordered * po.po_cost_price) as total_purchase_value
|
SUM(po.ordered * po.po_cost_price) as total_purchase_value
|
||||||
FROM products p
|
FROM products p
|
||||||
JOIN purchase_orders po ON p.pid = po.pid
|
JOIN purchase_orders po ON p.pid = po.pid
|
||||||
WHERE po.date >= DATE_SUB(CURRENT_DATE, INTERVAL 12 MONTH)
|
WHERE po.date >= CURRENT_DATE - INTERVAL '12 months'
|
||||||
AND p.vendor IS NOT NULL
|
AND p.vendor IS NOT NULL
|
||||||
GROUP BY p.vendor, YEAR(po.date), MONTH(po.date)
|
GROUP BY p.vendor, EXTRACT(YEAR FROM po.date::timestamp with time zone), EXTRACT(MONTH FROM po.date::timestamp with time zone)
|
||||||
)
|
)
|
||||||
SELECT
|
SELECT
|
||||||
mo.vendor,
|
mo.vendor,
|
||||||
@@ -311,13 +319,14 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
|
|||||||
AND mp.year = mo.year
|
AND mp.year = mo.year
|
||||||
AND mp.month = mo.month
|
AND mp.month = mo.month
|
||||||
WHERE mo.vendor IS NULL
|
WHERE mo.vendor IS NULL
|
||||||
ON DUPLICATE KEY UPDATE
|
ON CONFLICT (vendor, year, month) DO UPDATE
|
||||||
total_orders = VALUES(total_orders),
|
SET
|
||||||
late_orders = VALUES(late_orders),
|
total_orders = EXCLUDED.total_orders,
|
||||||
avg_lead_time_days = VALUES(avg_lead_time_days),
|
late_orders = EXCLUDED.late_orders,
|
||||||
total_purchase_value = VALUES(total_purchase_value),
|
avg_lead_time_days = EXCLUDED.avg_lead_time_days,
|
||||||
total_revenue = VALUES(total_revenue),
|
total_purchase_value = EXCLUDED.total_purchase_value,
|
||||||
avg_margin_percent = VALUES(avg_margin_percent)
|
total_revenue = EXCLUDED.total_revenue,
|
||||||
|
avg_margin_percent = EXCLUDED.avg_margin_percent
|
||||||
`);
|
`);
|
||||||
|
|
||||||
processedCount = Math.floor(totalProducts * 0.95);
|
processedCount = Math.floor(totalProducts * 0.95);
|
||||||
@@ -344,7 +353,8 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount =
|
|||||||
await connection.query(`
|
await connection.query(`
|
||||||
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
|
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
|
||||||
VALUES ('vendor_metrics', NOW())
|
VALUES ('vendor_metrics', NOW())
|
||||||
ON DUPLICATE KEY UPDATE last_calculation_timestamp = NOW()
|
ON CONFLICT (module_name) DO UPDATE
|
||||||
|
SET last_calculation_timestamp = NOW()
|
||||||
`);
|
`);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|||||||
@@ -100,6 +100,9 @@ async function resetMetrics() {
|
|||||||
client = new Client(dbConfig);
|
client = new Client(dbConfig);
|
||||||
await client.connect();
|
await client.connect();
|
||||||
|
|
||||||
|
// Explicitly begin a transaction
|
||||||
|
await client.query('BEGIN');
|
||||||
|
|
||||||
// First verify current state
|
// First verify current state
|
||||||
const initialTables = await client.query(`
|
const initialTables = await client.query(`
|
||||||
SELECT tablename as name
|
SELECT tablename as name
|
||||||
@@ -124,6 +127,7 @@ async function resetMetrics() {

        for (const table of [...METRICS_TABLES].reverse()) {
            try {
+               // Use NOWAIT to avoid hanging if there's a lock
                await client.query(`DROP TABLE IF EXISTS "${table}" CASCADE`);

                // Verify the table was actually dropped
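A caveat on the new comment above: DROP TABLE has no NOWAIT clause, so this statement still waits if another session holds a lock on the table. If fail-fast behaviour is really wanted, the usual substitute is a transaction-scoped lock_timeout, for example (sketch only; the '5s' value is an arbitrary choice):

    // Applies only to the current transaction; lock waits beyond 5s raise an error instead of hanging.
    await client.query("SET LOCAL lock_timeout = '5s'");
    await client.query(`DROP TABLE IF EXISTS "${table}" CASCADE`);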
@@ -142,13 +146,23 @@ async function resetMetrics() {
                    operation: 'Table dropped',
                    message: `Successfully dropped table: ${table}`
                });

+               // Commit after each table drop to ensure locks are released
+               await client.query('COMMIT');
+               // Start a new transaction for the next table
+               await client.query('BEGIN');
+               // Re-disable foreign key constraints for the new transaction
+               await client.query('SET session_replication_role = \'replica\'');
            } catch (err) {
                outputProgress({
                    status: 'error',
                    operation: 'Drop table error',
                    message: `Error dropping table ${table}: ${err.message}`
                });
-               throw err;
+               await client.query('ROLLBACK');
+               // Re-start transaction for next table
+               await client.query('BEGIN');
+               await client.query('SET session_replication_role = \'replica\'');
            }
        }

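Two nuances behind the repeated SET calls in this hunk: a plain SET issued inside a transaction is undone when that transaction is rolled back, so re-applying session_replication_role = 'replica' after the ROLLBACK branch is genuinely needed, while after a successful COMMIT the setting persists for the session and the repeat is only defensive. Changing this parameter also requires superuser-level privileges. SET LOCAL is a tighter alternative that expires with the transaction automatically, sketched here:

    // Scoped override: reverts to 'origin' on COMMIT or ROLLBACK without a second SET.
    await client.query('BEGIN');
    await client.query("SET LOCAL session_replication_role = 'replica'");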
@@ -164,6 +178,11 @@ async function resetMetrics() {
            throw new Error(`Failed to drop all tables. Remaining tables: ${afterDrop.rows.map(t => t.name).join(', ')}`);
        }

+       // Make sure we have a fresh transaction here
+       await client.query('COMMIT');
+       await client.query('BEGIN');
+       await client.query('SET session_replication_role = \'replica\'');
+
        // Read metrics schema
        outputProgress({
            operation: 'Reading schema',
@@ -220,6 +239,13 @@ async function resetMetrics() {
                        rowCount: result.rowCount
                    }
                });

+               // Commit every 10 statements to avoid long-running transactions
+               if (i > 0 && i % 10 === 0) {
+                   await client.query('COMMIT');
+                   await client.query('BEGIN');
+                   await client.query('SET session_replication_role = \'replica\'');
+               }
            } catch (sqlError) {
                outputProgress({
                    status: 'error',
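Trade-off behind the batched commits: committing every 10 statements keeps locks and long-running-transaction pressure low, but it also means that if a later statement fails, the statements from earlier batches stay permanently applied; the ROLLBACK added in the error path below only discards the current batch. A condensed sketch of the loop shape (statements stands in for however the schema file is split in the real script, which also reports progress per statement):

    const BATCH_SIZE = 10;
    for (let i = 0; i < statements.length; i++) {
      await client.query(statements[i]);
      // Flush a batch: release locks early at the cost of partial application on failure.
      if (i > 0 && i % BATCH_SIZE === 0) {
        await client.query('COMMIT');
        await client.query('BEGIN');
        await client.query("SET session_replication_role = 'replica'");
      }
    }
    await client.query('COMMIT'); // commit whatever remains in the final partial batch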
@@ -230,10 +256,17 @@ async function resetMetrics() {
                        statementNumber: i + 1
                    }
                });
+               await client.query('ROLLBACK');
                throw sqlError;
            }
        }

+       // Final commit for any pending statements
+       await client.query('COMMIT');
+
+       // Start new transaction for final checks
+       await client.query('BEGIN');
+
        // Re-enable foreign key checks after all tables are created
        await client.query('SET session_replication_role = \'origin\'');

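The ROLLBACK added before throw sqlError does more than tidy up: once a statement fails inside a PostgreSQL transaction, the session is left in an aborted state and every later command on that connection is rejected with "current transaction is aborted, commands ignored until end of transaction block" until a ROLLBACK runs. A small sketch of that recover-and-continue shape (fragment of an async function; the failing query is just an example):

    try {
      await client.query('SELECT * FROM some_table_that_does_not_exist');
    } catch (sqlError) {
      await client.query('ROLLBACK'); // clears the aborted-transaction state
      await client.query('BEGIN');    // reopen a transaction before continuing
    }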
@@ -269,9 +302,11 @@ async function resetMetrics() {
                operation: 'Final table check',
                message: `All database tables: ${finalCheck.rows.map(t => t.name).join(', ')}`
            });
+           await client.query('ROLLBACK');
            throw new Error(`Failed to create metrics tables: ${missingMetricsTables.join(', ')}`);
        }

+       // Commit final transaction
        await client.query('COMMIT');

        outputProgress({
@@ -288,7 +323,11 @@ async function resetMetrics() {
        });

        if (client) {
-           await client.query('ROLLBACK');
+           try {
+               await client.query('ROLLBACK');
+           } catch (rollbackError) {
+               console.error('Error during rollback:', rollbackError);
+           }
            // Make sure to re-enable foreign key checks even if there's an error
            await client.query('SET session_replication_role = \'origin\'').catch(() => {});
        }
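Guarding the ROLLBACK matters because ROLLBACK itself rejects when the connection has already dropped, and an unhandled rejection here would mask the original error that brought us into this catch block; restoring session_replication_role is likewise best-effort via .catch(() => {}). One thing this hunk does not show is closing the client; a hedged addition, assuming the script does not reuse the connection afterwards:

    // Hedged sketch (not in this hunk): release the connection once cleanup is done.
    if (client) {
      await client.end().catch(() => {});
    }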