Split up calculate script into useable pieces

This commit is contained in:
2025-01-21 20:00:43 -05:00
parent 94ac097f42
commit fa8e2fa33b
10 changed files with 1795 additions and 2006 deletions

View File

@@ -0,0 +1,51 @@
const mysql = require('mysql2/promise');
const path = require('path');
require('dotenv').config({ path: path.resolve(__dirname, '../../..', '.env') });
// Database configuration
const dbConfig = {
host: process.env.DB_HOST,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0,
// Add performance optimizations
namedPlaceholders: true,
maxPreparedStatements: 256,
enableKeepAlive: true,
keepAliveInitialDelay: 0,
// Add memory optimizations
flags: [
'FOUND_ROWS',
'LONG_PASSWORD',
'PROTOCOL_41',
'TRANSACTIONS',
'SECURE_CONNECTION',
'MULTI_RESULTS',
'PS_MULTI_RESULTS',
'PLUGIN_AUTH',
'CONNECT_ATTRS',
'PLUGIN_AUTH_LENENC_CLIENT_DATA',
'SESSION_TRACK',
'MULTI_STATEMENTS'
]
};
// Create a single pool instance to be reused
const pool = mysql.createPool(dbConfig);
async function getConnection() {
return await pool.getConnection();
}
async function closePool() {
await pool.end();
}
module.exports = {
dbConfig,
getConnection,
closePool
};

View File

@@ -0,0 +1,151 @@
const fs = require('fs');
const path = require('path');
// Helper function to format elapsed time
function formatElapsedTime(startTime) {
const elapsed = Date.now() - startTime;
const seconds = Math.floor(elapsed / 1000);
const minutes = Math.floor(seconds / 60);
const hours = Math.floor(minutes / 60);
if (hours > 0) {
return `${hours}h ${minutes % 60}m`;
} else if (minutes > 0) {
return `${minutes}m ${seconds % 60}s`;
} else {
return `${seconds}s`;
}
}
// Helper function to estimate remaining time
function estimateRemaining(startTime, current, total) {
if (current === 0) return null;
const elapsed = Date.now() - startTime;
const rate = current / elapsed;
const remaining = (total - current) / rate;
const minutes = Math.floor(remaining / 60000);
const seconds = Math.floor((remaining % 60000) / 1000);
if (minutes > 0) {
return `${minutes}m ${seconds}s`;
} else {
return `${seconds}s`;
}
}
// Helper function to calculate rate
function calculateRate(startTime, current) {
const elapsed = (Date.now() - startTime) / 1000; // Convert to seconds
return elapsed > 0 ? Math.round(current / elapsed) : 0;
}
// Set up logging
const LOG_DIR = path.join(__dirname, '../../../logs');
const ERROR_LOG = path.join(LOG_DIR, 'import-errors.log');
const IMPORT_LOG = path.join(LOG_DIR, 'import.log');
const STATUS_FILE = path.join(LOG_DIR, 'metrics-status.json');
// Ensure log directory exists
if (!fs.existsSync(LOG_DIR)) {
fs.mkdirSync(LOG_DIR, { recursive: true });
}
// Helper function to log errors
function logError(error, context = '') {
const timestamp = new Date().toISOString();
const errorMessage = `[${timestamp}] ${context}\nError: ${error.message}\nStack: ${error.stack}\n\n`;
// Log to error file
fs.appendFileSync(ERROR_LOG, errorMessage);
// Also log to console
console.error(`\n${context}\nError: ${error.message}`);
}
// Helper function to log import progress
function logImport(message) {
const timestamp = new Date().toISOString();
const logMessage = `[${timestamp}] ${message}\n`;
fs.appendFileSync(IMPORT_LOG, logMessage);
}
// Helper function to output progress
function outputProgress(data) {
// Save progress to file for resumption
saveProgress(data);
// Format as SSE event
const event = {
progress: data
};
// Always send to stdout for frontend
process.stdout.write(JSON.stringify(event) + '\n');
// Log significant events to disk
const isSignificant =
// Operation starts
(data.operation && !data.current) ||
// Operation completions and errors
data.status === 'complete' ||
data.status === 'error' ||
// Major phase changes
data.operation?.includes('Starting ABC classification') ||
data.operation?.includes('Starting time-based aggregates') ||
data.operation?.includes('Starting vendor metrics');
if (isSignificant) {
logImport(`${data.operation || 'Operation'}${data.message ? ': ' + data.message : ''}${data.error ? ' Error: ' + data.error : ''}${data.status ? ' Status: ' + data.status : ''}`);
}
}
function saveProgress(progress) {
try {
fs.writeFileSync(STATUS_FILE, JSON.stringify({
...progress,
timestamp: Date.now()
}));
} catch (err) {
console.error('Failed to save progress:', err);
}
}
function clearProgress() {
try {
if (fs.existsSync(STATUS_FILE)) {
fs.unlinkSync(STATUS_FILE);
}
} catch (err) {
console.error('Failed to clear progress:', err);
}
}
function getProgress() {
try {
if (fs.existsSync(STATUS_FILE)) {
const progress = JSON.parse(fs.readFileSync(STATUS_FILE, 'utf8'));
// Check if the progress is still valid (less than 1 hour old)
if (progress.timestamp && Date.now() - progress.timestamp < 3600000) {
return progress;
} else {
// Clear old progress
clearProgress();
}
}
} catch (err) {
console.error('Failed to read progress:', err);
clearProgress();
}
return null;
}
module.exports = {
formatElapsedTime,
estimateRemaining,
calculateRate,
logError,
logImport,
outputProgress,
saveProgress,
clearProgress,
getProgress
};

View File

@@ -0,0 +1,133 @@
const { getConnection } = require('./db');
async function calculateTimeAggregates(startTime, totalProducts, processedCount) {
const connection = await getConnection();
try {
// Initial insert of time-based aggregates
await connection.query(`
INSERT INTO product_time_aggregates (
product_id,
year,
month,
total_quantity_sold,
total_revenue,
total_cost,
order_count,
stock_received,
stock_ordered,
avg_price,
profit_margin
)
WITH sales_data AS (
SELECT
o.product_id,
YEAR(o.date) as year,
MONTH(o.date) as month,
SUM(o.quantity) as total_quantity_sold,
SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) as total_revenue,
SUM(COALESCE(p.cost_price, 0) * o.quantity) as total_cost,
COUNT(DISTINCT o.order_number) as order_count,
AVG(o.price - COALESCE(o.discount, 0)) as avg_price,
CASE
WHEN SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) = 0 THEN 0
ELSE ((SUM((o.price - COALESCE(o.discount, 0)) * o.quantity) -
SUM(COALESCE(p.cost_price, 0) * o.quantity)) /
SUM((o.price - COALESCE(o.discount, 0)) * o.quantity)) * 100
END as profit_margin
FROM orders o
JOIN products p ON o.product_id = p.product_id
WHERE o.canceled = 0
GROUP BY o.product_id, YEAR(o.date), MONTH(o.date)
),
purchase_data AS (
SELECT
product_id,
YEAR(date) as year,
MONTH(date) as month,
SUM(received) as stock_received,
SUM(ordered) as stock_ordered
FROM purchase_orders
WHERE status = 'closed'
GROUP BY product_id, YEAR(date), MONTH(date)
)
SELECT
s.product_id,
s.year,
s.month,
s.total_quantity_sold,
s.total_revenue,
s.total_cost,
s.order_count,
COALESCE(p.stock_received, 0) as stock_received,
COALESCE(p.stock_ordered, 0) as stock_ordered,
s.avg_price,
s.profit_margin
FROM sales_data s
LEFT JOIN purchase_data p
ON s.product_id = p.product_id
AND s.year = p.year
AND s.month = p.month
UNION
SELECT
p.product_id,
p.year,
p.month,
0 as total_quantity_sold,
0 as total_revenue,
0 as total_cost,
0 as order_count,
p.stock_received,
p.stock_ordered,
0 as avg_price,
0 as profit_margin
FROM purchase_data p
LEFT JOIN sales_data s
ON p.product_id = s.product_id
AND p.year = s.year
AND p.month = s.month
WHERE s.product_id IS NULL
ON DUPLICATE KEY UPDATE
total_quantity_sold = VALUES(total_quantity_sold),
total_revenue = VALUES(total_revenue),
total_cost = VALUES(total_cost),
order_count = VALUES(order_count),
stock_received = VALUES(stock_received),
stock_ordered = VALUES(stock_ordered),
avg_price = VALUES(avg_price),
profit_margin = VALUES(profit_margin)
`);
// Update with financial metrics
await connection.query(`
UPDATE product_time_aggregates pta
JOIN (
SELECT
p.product_id,
YEAR(o.date) as year,
MONTH(o.date) as month,
p.cost_price * p.stock_quantity as inventory_value,
SUM(o.quantity * (o.price - p.cost_price)) as gross_profit,
COUNT(DISTINCT DATE(o.date)) as days_in_period
FROM products p
LEFT JOIN orders o ON p.product_id = o.product_id
WHERE o.canceled = false
GROUP BY p.product_id, YEAR(o.date), MONTH(o.date)
) fin ON pta.product_id = fin.product_id
AND pta.year = fin.year
AND pta.month = fin.month
SET
pta.inventory_value = COALESCE(fin.inventory_value, 0),
pta.gmroi = CASE
WHEN COALESCE(fin.inventory_value, 0) > 0 AND fin.days_in_period > 0 THEN
(COALESCE(fin.gross_profit, 0) * (365.0 / fin.days_in_period)) / COALESCE(fin.inventory_value, 0)
ELSE 0
END
`);
return Math.floor(totalProducts * 0.65);
} finally {
connection.release();
}
}
module.exports = calculateTimeAggregates;