Files
inventory/inventory-server/old/populate-initial-metrics.js
2025-10-04 16:14:09 -04:00

677 lines
26 KiB
JavaScript

// Core Node modules: `path` and `fs` are used to locate and read the SQL file.
const path = require('path');
const fs = require('fs');
const os = require('os'); // For detecting CPU cores
// NOTE(review): `os` is not referenced anywhere else in the visible part of this file — confirm before removing.
// Get the base directory (the directory containing the inventory-server folder)
// NOTE(review): the `require('../scripts/...')` calls further down imply this script sits one
// level below inventory-server, in which case '../../..' resolves one level ABOVE the directory
// that contains inventory-server — verify. In the visible code baseDir is only logged for diagnostics.
const baseDir = path.resolve(__dirname, '../../..');
// Load environment variables from the inventory-server directory
// NOTE(review): by the same reasoning '../..' looks like the PARENT of inventory-server, not
// inventory-server itself — confirm where the .env file actually lives.
require('dotenv').config({ path: path.resolve(__dirname, '../..', '.env') });
// Configure statement timeout (30 minutes)
// 1800000 ms = 30 min; passed as `statement_timeout` when the main connection is requested.
const PG_STATEMENT_TIMEOUT_MS = 1800000;
// Fatal-error policy for the whole script: anything that escapes normal error
// handling is written to stderr and the process terminates with exit code 1.
function exitAfterLogging(...details) {
  console.error(...details);
  process.exit(1);
}

// Synchronous errors nobody caught.
process.on('uncaughtException', (error) => exitAfterLogging('Uncaught Exception:', error));

// Promise rejections nobody handled.
process.on('unhandledRejection', (reason, promise) =>
  exitAfterLogging('Unhandled Rejection at:', promise, 'reason:', reason)
);
// Load progress module
const progress = require('../scripts/metrics-new/utils/progress');
// Store progress functions in global scope to ensure availability
global.formatElapsedTime = progress.formatElapsedTime;
global.estimateRemaining = progress.estimateRemaining;
global.calculateRate = progress.calculateRate;
global.outputProgress = progress.outputProgress;
global.clearProgress = progress.clearProgress;
global.getProgress = progress.getProgress;
global.logError = progress.logError;
// Load database module
const { getConnection, closePool } = require('../scripts/metrics-new/utils/db');
// Cancellation state shared between the signal handlers and the main routine.
let isCancelled = false; // flipped once by the first cancellation request
let runningQueryPromise = null; // promise of the in-flight main query, if any

/**
 * Marks the calculation as cancelled and, on the first call only, asks
 * PostgreSQL to cancel this application's long-running queries.
 *
 * The database work is fire-and-forget: the function returns immediately and
 * any failure while cancelling is only logged.
 *
 * @returns {{success: boolean, message: string}} Always a success acknowledgement.
 */
function cancelCalculation() {
  const acknowledgement = {
    success: true,
    message: 'Calculation has been cancelled'
  };

  // Repeat requests are acknowledged without doing any further work.
  if (isCancelled) {
    return acknowledgement;
  }

  isCancelled = true;
  console.log('Calculation has been cancelled by user');

  // The tracked promise is only used to decide whether to log this notice.
  if (runningQueryPromise) {
    console.log('Attempting to cancel the running query...');
  }

  // Cancels backends tagged 'populate_metrics' whose query started more than
  // 5 seconds ago, then hands the connection back in either outcome.
  const cancelLongRunningQueries = async (conn) => {
    try {
      await conn.query(`
SELECT pg_cancel_backend(pid)
FROM pg_stat_activity
WHERE query_start < now() - interval '5 seconds'
AND application_name = 'populate_metrics'
AND query NOT LIKE '%pg_cancel_backend%'
`);
      conn.release();
    } catch (err) {
      console.error('Error during force cancellation:', err);
      conn.release();
    }
  };

  try {
    getConnection()
      .then(cancelLongRunningQueries)
      .catch((err) => {
        console.error('Could not get connection for cancellation:', err);
      });
  } catch (err) {
    // Only reached when getConnection() itself throws synchronously.
    console.error('Failed to terminate running queries:', err);
  }

  return acknowledgement;
}

// Termination and interrupt signals both trigger cancellation.
process.on('SIGTERM', cancelCalculation);
process.on('SIGINT', cancelCalculation);
/**
 * Rebuilds `daily_product_snapshots` and `product_metrics` from scratch by
 * sending one multi-statement SQL script through `client.query`.
 *
 * Statement order inside the script:
 *   1. TRUNCATE both metrics tables.
 *   2. INSERT one snapshot row per (product, activity date), where activity
 *      dates come from orders and receivings. Stock figures use the CURRENT
 *      product stock because no historical stock data is available.
 *   3. INSERT one aggregated `product_metrics` row per product, computed from
 *      the snapshots written in step 2.
 *   4. Upsert `calculate_status` timestamps for both modules.
 *   5. Fill the three rank columns with RANK() over revenue / units / profit.
 *   6. SELECT the number of rows in `product_metrics`.
 *
 * Steps 2 and 3 are deliberately SEPARATE statements. They used to be
 * data-modifying CTEs of a single WITH statement; PostgreSQL executes all
 * sub-statements of a WITH against the same snapshot, so the metrics insert
 * could not see the freshly inserted snapshot rows and aggregated an empty
 * (just-truncated) table.
 *
 * @param {{query: Function}} client - Object exposing `query(sqlText)`;
 *   presumably a node-postgres client — verify against callers.
 * @param {Function} [onProgress] - Accepted for interface compatibility; it is
 *   never invoked by this function.
 * @returns {*} Whatever `client.query` returns for the script (with
 *   node-postgres this is presumably a promise of one result per statement,
 *   the last one carrying `product_count` — TODO confirm).
 */
const calculateInitialMetrics = (client, onProgress) => {
  return client.query(`
-- Truncate the existing metrics tables to ensure clean data
TRUNCATE TABLE public.daily_product_snapshots;
TRUNCATE TABLE public.product_metrics;
-- First let's create daily snapshots for all products with order activity
WITH SalesData AS (
SELECT
p.pid,
p.sku,
o.date::date AS order_date,
-- Count orders to ensure we only include products with real activity
COUNT(o.id) as order_count,
-- Aggregate Sales (Quantity > 0, Status not Canceled/Returned)
COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN o.quantity ELSE 0 END), 0) AS units_sold,
COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN o.price * o.quantity ELSE 0 END), 0.00) AS gross_revenue_unadjusted,
COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN o.discount ELSE 0 END), 0.00) AS discounts,
COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN COALESCE(o.costeach, p.landing_cost_price, p.cost_price) * o.quantity ELSE 0 END), 0.00) AS cogs,
COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN p.regular_price * o.quantity ELSE 0 END), 0.00) AS gross_regular_revenue,
-- Aggregate Returns (Quantity < 0 or Status = Returned)
COALESCE(SUM(CASE WHEN o.quantity < 0 OR COALESCE(o.status, 'pending') = 'returned' THEN ABS(o.quantity) ELSE 0 END), 0) AS units_returned,
COALESCE(SUM(CASE WHEN o.quantity < 0 OR COALESCE(o.status, 'pending') = 'returned' THEN o.price * ABS(o.quantity) ELSE 0 END), 0.00) AS returns_revenue
FROM public.products p
LEFT JOIN public.orders o ON p.pid = o.pid
GROUP BY p.pid, p.sku, o.date::date
HAVING COUNT(o.id) > 0 -- Only include products with actual orders
),
ReceivingData AS (
SELECT
r.pid,
r.received_date::date AS receiving_date,
-- Count receiving documents to ensure we only include products with real activity
COUNT(DISTINCT r.receiving_id) as receiving_count,
-- Calculate received quantity for this day
SUM(r.received_quantity) AS units_received,
-- Calculate received cost for this day
SUM(r.received_quantity * r.unit_cost) AS cost_received
FROM public.receivings r
GROUP BY r.pid, r.received_date::date
HAVING COUNT(DISTINCT r.receiving_id) > 0 OR SUM(r.received_quantity) > 0
),
-- Get current stock quantities
StockData AS (
SELECT
p.pid,
p.stock_quantity,
COALESCE(p.landing_cost_price, p.cost_price, 0.00) as effective_cost_price,
COALESCE(p.price, 0.00) as current_price,
COALESCE(p.regular_price, 0.00) as current_regular_price
FROM public.products p
),
-- Combine sales and receiving dates to get all activity dates
DatePidCombos AS (
SELECT DISTINCT pid, order_date AS activity_date FROM SalesData
UNION
SELECT DISTINCT pid, receiving_date FROM ReceivingData
)
-- Insert daily snapshots for all product-date combinations
-- (top-level statement so the rows are visible to the statements that follow)
INSERT INTO public.daily_product_snapshots (
snapshot_date,
pid,
sku,
eod_stock_quantity,
eod_stock_cost,
eod_stock_retail,
eod_stock_gross,
stockout_flag,
units_sold,
units_returned,
gross_revenue,
discounts,
returns_revenue,
net_revenue,
cogs,
gross_regular_revenue,
profit,
units_received,
cost_received,
calculation_timestamp
)
SELECT
d.activity_date AS snapshot_date,
d.pid,
p.sku,
-- Use current stock as approximation, since historical stock data is not available
s.stock_quantity AS eod_stock_quantity,
s.stock_quantity * s.effective_cost_price AS eod_stock_cost,
s.stock_quantity * s.current_price AS eod_stock_retail,
s.stock_quantity * s.current_regular_price AS eod_stock_gross,
(s.stock_quantity <= 0) AS stockout_flag,
-- Sales metrics
COALESCE(sd.units_sold, 0),
COALESCE(sd.units_returned, 0),
COALESCE(sd.gross_revenue_unadjusted, 0.00),
COALESCE(sd.discounts, 0.00),
COALESCE(sd.returns_revenue, 0.00),
COALESCE(sd.gross_revenue_unadjusted, 0.00) - COALESCE(sd.discounts, 0.00) AS net_revenue,
COALESCE(sd.cogs, 0.00),
COALESCE(sd.gross_regular_revenue, 0.00),
(COALESCE(sd.gross_revenue_unadjusted, 0.00) - COALESCE(sd.discounts, 0.00)) - COALESCE(sd.cogs, 0.00) AS profit,
-- Receiving metrics
COALESCE(rd.units_received, 0),
COALESCE(rd.cost_received, 0.00),
now() -- calculation timestamp
FROM DatePidCombos d
JOIN public.products p ON d.pid = p.pid
LEFT JOIN SalesData sd ON d.pid = sd.pid AND d.activity_date = sd.order_date
LEFT JOIN ReceivingData rd ON d.pid = rd.pid AND d.activity_date = rd.receiving_date
LEFT JOIN StockData s ON d.pid = s.pid;
-- Now build the aggregated product metrics from the daily snapshots
-- (separate statement: it must read the snapshot rows inserted above)
INSERT INTO public.product_metrics (
pid,
sku,
current_stock_quantity,
current_stock_cost,
current_stock_retail,
current_stock_msrp,
is_out_of_stock,
total_units_sold,
total_units_returned,
return_rate,
gross_revenue,
total_discounts,
total_returns,
net_revenue,
total_cogs,
total_gross_revenue,
total_profit,
profit_margin,
avg_daily_units,
reorder_point,
reorder_alert,
days_of_supply,
sales_velocity,
sales_velocity_score,
rank_by_revenue,
rank_by_quantity,
rank_by_profit,
total_received_quantity,
total_received_cost,
last_sold_date,
last_received_date,
days_since_last_sale,
days_since_last_received,
calculation_timestamp
)
SELECT
p.pid,
p.sku,
p.stock_quantity AS current_stock_quantity,
p.stock_quantity * COALESCE(p.landing_cost_price, p.cost_price, 0) AS current_stock_cost,
p.stock_quantity * COALESCE(p.price, 0) AS current_stock_retail,
p.stock_quantity * COALESCE(p.regular_price, 0) AS current_stock_msrp,
(p.stock_quantity <= 0) AS is_out_of_stock,
-- Aggregate metrics
COALESCE(SUM(ds.units_sold), 0) AS total_units_sold,
COALESCE(SUM(ds.units_returned), 0) AS total_units_returned,
CASE
WHEN COALESCE(SUM(ds.units_sold), 0) > 0
THEN COALESCE(SUM(ds.units_returned), 0)::float / NULLIF(COALESCE(SUM(ds.units_sold), 0), 0)
ELSE 0
END AS return_rate,
COALESCE(SUM(ds.gross_revenue), 0) AS gross_revenue,
COALESCE(SUM(ds.discounts), 0) AS total_discounts,
COALESCE(SUM(ds.returns_revenue), 0) AS total_returns,
COALESCE(SUM(ds.net_revenue), 0) AS net_revenue,
COALESCE(SUM(ds.cogs), 0) AS total_cogs,
COALESCE(SUM(ds.gross_regular_revenue), 0) AS total_gross_revenue,
COALESCE(SUM(ds.profit), 0) AS total_profit,
CASE
WHEN COALESCE(SUM(ds.net_revenue), 0) > 0
THEN COALESCE(SUM(ds.profit), 0) / NULLIF(COALESCE(SUM(ds.net_revenue), 0), 0)
ELSE 0
END AS profit_margin,
-- Calculate average daily units
COALESCE(AVG(ds.units_sold), 0) AS avg_daily_units,
-- Calculate reorder point (simplified, can be enhanced with lead time and safety stock)
CEILING(COALESCE(AVG(ds.units_sold) * 14, 0)) AS reorder_point,
(p.stock_quantity <= CEILING(COALESCE(AVG(ds.units_sold) * 14, 0))) AS reorder_alert,
-- Days of supply based on average daily sales
CASE
WHEN COALESCE(AVG(ds.units_sold), 0) > 0
THEN p.stock_quantity / NULLIF(COALESCE(AVG(ds.units_sold), 0), 0)
ELSE NULL
END AS days_of_supply,
-- Sales velocity (average units sold per day over last 30 days)
(SELECT COALESCE(AVG(recent.units_sold), 0)
FROM public.daily_product_snapshots recent
WHERE recent.pid = p.pid
AND recent.snapshot_date >= CURRENT_DATE - INTERVAL '30 days'
) AS sales_velocity,
-- Placeholder for sales velocity score (can be calculated based on velocity)
0 AS sales_velocity_score,
-- Will be updated later by ranking procedure
0 AS rank_by_revenue,
0 AS rank_by_quantity,
0 AS rank_by_profit,
-- Receiving data
COALESCE(SUM(ds.units_received), 0) AS total_received_quantity,
COALESCE(SUM(ds.cost_received), 0) AS total_received_cost,
-- Date metrics
(SELECT MAX(sd.snapshot_date)
FROM public.daily_product_snapshots sd
WHERE sd.pid = p.pid AND sd.units_sold > 0
) AS last_sold_date,
(SELECT MAX(rd.snapshot_date)
FROM public.daily_product_snapshots rd
WHERE rd.pid = p.pid AND rd.units_received > 0
) AS last_received_date,
-- Calculate days since last sale/received
CASE
WHEN (SELECT MAX(sd.snapshot_date)
FROM public.daily_product_snapshots sd
WHERE sd.pid = p.pid AND sd.units_sold > 0) IS NOT NULL
THEN (CURRENT_DATE - (SELECT MAX(sd.snapshot_date)
FROM public.daily_product_snapshots sd
WHERE sd.pid = p.pid AND sd.units_sold > 0))::integer
ELSE NULL
END AS days_since_last_sale,
CASE
WHEN (SELECT MAX(rd.snapshot_date)
FROM public.daily_product_snapshots rd
WHERE rd.pid = p.pid AND rd.units_received > 0) IS NOT NULL
THEN (CURRENT_DATE - (SELECT MAX(rd.snapshot_date)
FROM public.daily_product_snapshots rd
WHERE rd.pid = p.pid AND rd.units_received > 0))::integer
ELSE NULL
END AS days_since_last_received,
now() -- calculation timestamp
FROM public.products p
LEFT JOIN public.daily_product_snapshots ds ON p.pid = ds.pid
GROUP BY p.pid, p.sku, p.stock_quantity, p.landing_cost_price, p.cost_price, p.price, p.regular_price;
-- Update the calculate_status table
INSERT INTO public.calculate_status (module_name, last_calculation_timestamp)
VALUES
('daily_snapshots', now()),
('product_metrics', now())
ON CONFLICT (module_name) DO UPDATE
SET last_calculation_timestamp = now();
-- Finally, update the ranks for products
UPDATE public.product_metrics pm SET
rank_by_revenue = rev_ranks.rank
FROM (
SELECT pid, RANK() OVER (ORDER BY net_revenue DESC) AS rank
FROM public.product_metrics
WHERE net_revenue > 0
) rev_ranks
WHERE pm.pid = rev_ranks.pid;
UPDATE public.product_metrics pm SET
rank_by_quantity = qty_ranks.rank
FROM (
SELECT pid, RANK() OVER (ORDER BY total_units_sold DESC) AS rank
FROM public.product_metrics
WHERE total_units_sold > 0
) qty_ranks
WHERE pm.pid = qty_ranks.pid;
UPDATE public.product_metrics pm SET
rank_by_profit = profit_ranks.rank
FROM (
SELECT pid, RANK() OVER (ORDER BY total_profit DESC) AS rank
FROM public.product_metrics
WHERE total_profit > 0
) profit_ranks
WHERE pm.pid = profit_ranks.pid;
-- Return count of products with metrics
SELECT COUNT(*) AS product_count FROM public.product_metrics
`);
};
/**
 * Runs the initial product-metrics population end to end.
 *
 * Steps: acquire a connection, make sure `calculate_status` exists, mark any
 * stale 'running' history rows of this type as cancelled, create a new
 * `calculate_history` row, ANALYZE the input tables, apply session tuning,
 * then read `populate_initial_product_metrics.sql` from this script's
 * directory and execute it. Progress is reported through
 * `global.outputProgress` along the way.
 *
 * Errors raised inside the main flow do not reject: they are logged, recorded
 * in `calculate_history` (best effort), reported as an 'error' or 'cancelled'
 * progress event, and returned as a failure result. The connection is released
 * and the pool closed in every case.
 *
 * @returns {Promise<{success: true, message: string, duration: number} |
 *   {success: false, error: string, duration: number}>} Outcome with the
 *   elapsed time in whole seconds.
 */
async function populateInitialMetrics() {
  let connection;
  const startTime = Date.now();
  let calculateHistoryId;

  // Whole seconds elapsed since this run started.
  const secondsSinceStart = () => Math.round((Date.now() - startTime) / 1000);

  // Emits one progress event. All events share the same shape (total of 100,
  // rate 0, timing block, history id); only the fields passed in vary.
  const reportProgress = (fields) => {
    const payload = { status: fields.status, operation: fields.operation };
    if (fields.message !== undefined) {
      payload.message = fields.message;
    }
    Object.assign(payload, {
      current: fields.current,
      total: 100,
      elapsed: fields.elapsed !== undefined ? fields.elapsed : global.formatElapsedTime(startTime),
      remaining: fields.remaining,
      rate: 0,
      percentage: String(fields.current),
      timing: {
        start_time: new Date(startTime).toISOString(),
        end_time: new Date().toISOString(),
        elapsed_seconds: fields.elapsedSeconds !== undefined ? fields.elapsedSeconds : secondsSinceStart()
      },
      historyId: calculateHistoryId
    });
    global.outputProgress(payload);
  };

  try {
    // The application_name tags this session so cancelCalculation can find its queries.
    connection = await getConnection({
      // Add performance-related settings
      application_name: 'populate_metrics',
      statement_timeout: PG_STATEMENT_TIMEOUT_MS, // 30 min timeout per statement
    });

    // Ensure the calculate_status table exists and has the correct structure
    await connection.query(`
CREATE TABLE IF NOT EXISTS calculate_status (
module_name TEXT PRIMARY KEY,
last_calculation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
)
`);

    // Clean up any previously running calculations of this type
    await connection.query(`
UPDATE calculate_history
SET
status = 'cancelled',
end_time = NOW(),
duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER,
error_message = 'Previous calculation was not completed properly'
WHERE status = 'running' AND additional_info->>'type' = 'populate_initial_metrics'
`);

    // Create history record for this calculation
    const historyResult = await connection.query(`
INSERT INTO calculate_history (
start_time,
status,
additional_info
) VALUES (
NOW(),
'running',
jsonb_build_object(
'type', 'populate_initial_metrics',
'sql_file', 'populate_initial_product_metrics.sql'
)
) RETURNING id
`);
    calculateHistoryId = historyResult.rows[0].id;

    // Initialize progress
    reportProgress({
      status: 'running',
      operation: 'Starting initial product metrics population',
      current: 0,
      elapsed: '0s',
      remaining: 'Calculating... (this may take a while)'
    });

    // Prepare the database - analyze tables
    reportProgress({
      status: 'running',
      operation: 'Analyzing database tables for better query performance',
      current: 2,
      remaining: 'Analyzing...'
    });

    // Refresh planner statistics. Kept in its own query so that a failure here
    // (reported as non-fatal) cannot prevent the tuning settings below.
    await connection.query(`
-- Analyze tables for better query planning
ANALYZE public.products;
ANALYZE public.purchase_orders;
ANALYZE public.daily_product_snapshots;
ANALYZE public.orders;
`).catch(err => {
      // Non-fatal if analyze fails
      console.warn('Failed to analyze tables (non-fatal):', err.message);
    });

    // Session-level SET (not SET LOCAL): the main metrics query is sent as a
    // later, separate query, and SET LOCAL only lasts until the end of the
    // transaction in which it is issued. The pool is closed when this run
    // ends, so the settings do not outlive the script.
    await connection.query(`
-- Enable parallel operations
SET enable_parallel_append = on;
SET enable_parallel_hash = on;
SET max_parallel_workers_per_gather = 4;
-- Larger work memory for complex sorts/joins
SET work_mem = '128MB';
`).catch(err => {
      console.warn('Failed to apply session tuning settings (non-fatal):', err.message);
    });

    // Execute the SQL query
    reportProgress({
      status: 'running',
      operation: 'Executing initial metrics SQL query',
      current: 5,
      remaining: 'Calculating... (this could take several hours with 150M+ records)'
    });

    // Read the SQL file
    const sqlFilePath = path.resolve(__dirname, 'populate_initial_product_metrics.sql');
    console.log('Base directory:', baseDir);
    console.log('Script directory:', __dirname);
    console.log('SQL file path:', sqlFilePath);
    console.log('Current working directory:', process.cwd());
    if (!fs.existsSync(sqlFilePath)) {
      throw new Error(`SQL file not found at ${sqlFilePath}`);
    }

    // Read and clean up the SQL (Slightly more robust cleaning)
    const sqlQuery = fs.readFileSync(sqlFilePath, 'utf8')
      .replace(/\r\n/g, '\n') // Handle Windows endings
      .replace(/\r/g, '\n') // Handle old Mac endings
      .trim(); // Remove leading/trailing whitespace VERY IMPORTANT

    // Log details again AFTER cleaning
    console.log('SQL Query length (cleaned):', sqlQuery.length);
    console.log('SQL Query structure validation:');
    console.log('- Contains DO block:', sqlQuery.includes('DO $$') || sqlQuery.includes('DO $')); // Check both types of tag start
    console.log('- Contains BEGIN:', sqlQuery.includes('BEGIN'));
    console.log('- Contains END:', sqlQuery.includes('END $$;') || sqlQuery.includes('END $')); // Check both types of tag end
    console.log('- First 50 chars:', JSON.stringify(sqlQuery.slice(0, 50)));
    console.log('- Last 100 chars (cleaned):', JSON.stringify(sqlQuery.slice(-100)));

    // Final check to ensure clean SQL ending (warning only; execution proceeds)
    if (!sqlQuery.endsWith('END $$;')) {
      console.warn('WARNING: SQL does not end with "END $$;". This might cause issues.');
      console.log('Exact ending:', JSON.stringify(sqlQuery.slice(-20)));
    }

    // Execute the script
    console.log('Starting initial product metrics population...');
    // Track the query promise for potential cancellation
    runningQueryPromise = connection.query({
      text: sqlQuery,
      rowMode: 'array'
    });
    await runningQueryPromise;
    runningQueryPromise = null;

    // Update progress to 100%
    reportProgress({
      status: 'complete',
      operation: 'Initial product metrics population complete',
      current: 100,
      remaining: '0s'
    });

    // Update history with completion
    await connection.query(`
UPDATE calculate_history
SET
end_time = NOW(),
duration_seconds = $1,
status = 'completed'
WHERE id = $2
`, [secondsSinceStart(), calculateHistoryId]);

    // Clear progress file on successful completion
    global.clearProgress();

    return {
      success: true,
      message: 'Initial product metrics population completed successfully',
      duration: secondsSinceStart()
    };
  } catch (error) {
    const totalElapsedSeconds = secondsSinceStart();

    // Enhanced error logging
    console.error('Error details:', {
      message: error.message,
      code: error.code,
      hint: error.hint,
      position: error.position,
      detail: error.detail,
      where: error.where ? error.where.substring(0, 500) + '...' : undefined, // Truncate to avoid huge logs
      severity: error.severity,
      file: error.file,
      line: error.line,
      routine: error.routine
    });

    // Update history with error. Best effort: if this write fails (for example
    // because the connection itself is broken) the original error must still
    // be reported and returned, so the secondary failure is only logged.
    if (connection && calculateHistoryId) {
      try {
        await connection.query(`
UPDATE calculate_history
SET
end_time = NOW(),
duration_seconds = $1,
status = $2,
error_message = $3
WHERE id = $4
`, [
          totalElapsedSeconds,
          isCancelled ? 'cancelled' : 'failed',
          error.message,
          calculateHistoryId
        ]);
      } catch (historyError) {
        console.error('Failed to record the failure in calculate_history:', historyError);
      }
    }

    if (isCancelled) {
      reportProgress({
        status: 'cancelled',
        operation: 'Calculation cancelled',
        current: 50,
        remaining: null,
        elapsedSeconds: totalElapsedSeconds
      });
    } else {
      reportProgress({
        status: 'error',
        operation: 'Error during initial product metrics population',
        message: error.message,
        current: 0,
        remaining: null,
        elapsedSeconds: totalElapsedSeconds
      });
    }

    console.error('Error during initial product metrics population:', error);
    return {
      success: false,
      error: error.message,
      duration: totalElapsedSeconds
    };
  } finally {
    // Never leave a settled query promise behind for the cancel handler.
    runningQueryPromise = null;
    if (connection) {
      connection.release();
    }
    await closePool();
  }
}
// Entry point: run the population once and translate its outcome into the
// process exit code (0 on success, 1 on a reported failure or unexpected error).
(async () => {
  try {
    const result = await populateInitialMetrics();
    if (!result.success) {
      console.error(`Initial product metrics population failed: ${result.error}`);
      process.exit(1);
    } else {
      console.log(`Initial product metrics population completed successfully in ${result.duration} seconds`);
      process.exit(0);
    }
  } catch (err) {
    console.error('Unexpected error:', err);
    process.exit(1);
  }
})();