Clean up old historical data calcs/scripts, optimize calculations to not update every row every time
This commit is contained in:
@@ -1,677 +0,0 @@
|
||||
const path = require('path');
|
||||
const fs = require('fs');
|
||||
const os = require('os'); // For detecting CPU cores
|
||||
|
||||
// Get the base directory (the directory containing the inventory-server folder)
|
||||
const baseDir = path.resolve(__dirname, '../../..');
|
||||
|
||||
// Load environment variables from the inventory-server directory
|
||||
require('dotenv').config({ path: path.resolve(__dirname, '../..', '.env') });
|
||||
|
||||
// Configure statement timeout (30 minutes)
|
||||
const PG_STATEMENT_TIMEOUT_MS = 1800000;
|
||||
|
||||
// Add error handler for uncaught exceptions
|
||||
process.on('uncaughtException', (error) => {
|
||||
console.error('Uncaught Exception:', error);
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
// Add error handler for unhandled promise rejections
|
||||
process.on('unhandledRejection', (reason, promise) => {
|
||||
console.error('Unhandled Rejection at:', promise, 'reason:', reason);
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
// Load progress module
|
||||
const progress = require('../utils/progress');
|
||||
|
||||
// Store progress functions in global scope to ensure availability
|
||||
global.formatElapsedTime = progress.formatElapsedTime;
|
||||
global.estimateRemaining = progress.estimateRemaining;
|
||||
global.calculateRate = progress.calculateRate;
|
||||
global.outputProgress = progress.outputProgress;
|
||||
global.clearProgress = progress.clearProgress;
|
||||
global.getProgress = progress.getProgress;
|
||||
global.logError = progress.logError;
|
||||
|
||||
// Load database module
|
||||
const { getConnection, closePool } = require('../utils/db');
|
||||
|
||||
// Add cancel handler
|
||||
let isCancelled = false;
|
||||
let runningQueryPromise = null;
|
||||
|
||||
function cancelCalculation() {
|
||||
if (!isCancelled) {
|
||||
isCancelled = true;
|
||||
console.log('Calculation has been cancelled by user');
|
||||
|
||||
// Store the query promise to potentially cancel it
|
||||
const queryToCancel = runningQueryPromise;
|
||||
if (queryToCancel) {
|
||||
console.log('Attempting to cancel the running query...');
|
||||
}
|
||||
|
||||
// Force-terminate any query that's been running for more than 5 seconds
|
||||
try {
|
||||
const connection = getConnection();
|
||||
connection.then(async (conn) => {
|
||||
try {
|
||||
// Identify and terminate long-running queries from our application
|
||||
await conn.query(`
|
||||
SELECT pg_cancel_backend(pid)
|
||||
FROM pg_stat_activity
|
||||
WHERE query_start < now() - interval '5 seconds'
|
||||
AND application_name = 'populate_metrics'
|
||||
AND query NOT LIKE '%pg_cancel_backend%'
|
||||
`);
|
||||
|
||||
// Release connection
|
||||
conn.release();
|
||||
} catch (err) {
|
||||
console.error('Error during force cancellation:', err);
|
||||
conn.release();
|
||||
}
|
||||
}).catch(err => {
|
||||
console.error('Could not get connection for cancellation:', err);
|
||||
});
|
||||
} catch (err) {
|
||||
console.error('Failed to terminate running queries:', err);
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
message: 'Calculation has been cancelled'
|
||||
};
|
||||
}
|
||||
|
||||
// Handle SIGTERM signal for cancellation
|
||||
process.on('SIGTERM', cancelCalculation);
|
||||
process.on('SIGINT', cancelCalculation);
|
||||
|
||||
const calculateInitialMetrics = (client, onProgress) => {
|
||||
return client.query(`
|
||||
-- Truncate the existing metrics tables to ensure clean data
|
||||
TRUNCATE TABLE public.daily_product_snapshots;
|
||||
TRUNCATE TABLE public.product_metrics;
|
||||
|
||||
-- First let's create daily snapshots for all products with order activity
|
||||
WITH SalesData AS (
|
||||
SELECT
|
||||
p.pid,
|
||||
p.sku,
|
||||
o.date::date AS order_date,
|
||||
-- Count orders to ensure we only include products with real activity
|
||||
COUNT(o.id) as order_count,
|
||||
-- Aggregate Sales (Quantity > 0, Status not Canceled/Returned)
|
||||
COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN o.quantity ELSE 0 END), 0) AS units_sold,
|
||||
COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN o.price * o.quantity ELSE 0 END), 0.00) AS gross_revenue_unadjusted,
|
||||
COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN o.discount ELSE 0 END), 0.00) AS discounts,
|
||||
COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN COALESCE(o.costeach, p.landing_cost_price, p.cost_price) * o.quantity ELSE 0 END), 0.00) AS cogs,
|
||||
COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN p.regular_price * o.quantity ELSE 0 END), 0.00) AS gross_regular_revenue,
|
||||
|
||||
-- Aggregate Returns (Quantity < 0 or Status = Returned)
|
||||
COALESCE(SUM(CASE WHEN o.quantity < 0 OR COALESCE(o.status, 'pending') = 'returned' THEN ABS(o.quantity) ELSE 0 END), 0) AS units_returned,
|
||||
COALESCE(SUM(CASE WHEN o.quantity < 0 OR COALESCE(o.status, 'pending') = 'returned' THEN o.price * ABS(o.quantity) ELSE 0 END), 0.00) AS returns_revenue
|
||||
FROM public.products p
|
||||
LEFT JOIN public.orders o ON p.pid = o.pid
|
||||
GROUP BY p.pid, p.sku, o.date::date
|
||||
HAVING COUNT(o.id) > 0 -- Only include products with actual orders
|
||||
),
|
||||
ReceivingData AS (
|
||||
SELECT
|
||||
r.pid,
|
||||
r.received_date::date AS receiving_date,
|
||||
-- Count receiving documents to ensure we only include products with real activity
|
||||
COUNT(DISTINCT r.receiving_id) as receiving_count,
|
||||
-- Calculate received quantity for this day
|
||||
SUM(r.received_quantity) AS units_received,
|
||||
-- Calculate received cost for this day
|
||||
SUM(r.received_quantity * r.unit_cost) AS cost_received
|
||||
FROM public.receivings r
|
||||
GROUP BY r.pid, r.received_date::date
|
||||
HAVING COUNT(DISTINCT r.receiving_id) > 0 OR SUM(r.received_quantity) > 0
|
||||
),
|
||||
-- Get current stock quantities
|
||||
StockData AS (
|
||||
SELECT
|
||||
p.pid,
|
||||
p.stock_quantity,
|
||||
COALESCE(p.landing_cost_price, p.cost_price, 0.00) as effective_cost_price,
|
||||
COALESCE(p.price, 0.00) as current_price,
|
||||
COALESCE(p.regular_price, 0.00) as current_regular_price
|
||||
FROM public.products p
|
||||
),
|
||||
-- Combine sales and receiving dates to get all activity dates
|
||||
DatePidCombos AS (
|
||||
SELECT DISTINCT pid, order_date AS activity_date FROM SalesData
|
||||
UNION
|
||||
SELECT DISTINCT pid, receiving_date FROM ReceivingData
|
||||
),
|
||||
-- Insert daily snapshots for all product-date combinations
|
||||
SnapshotInsert AS (
|
||||
INSERT INTO public.daily_product_snapshots (
|
||||
snapshot_date,
|
||||
pid,
|
||||
sku,
|
||||
eod_stock_quantity,
|
||||
eod_stock_cost,
|
||||
eod_stock_retail,
|
||||
eod_stock_gross,
|
||||
stockout_flag,
|
||||
units_sold,
|
||||
units_returned,
|
||||
gross_revenue,
|
||||
discounts,
|
||||
returns_revenue,
|
||||
net_revenue,
|
||||
cogs,
|
||||
gross_regular_revenue,
|
||||
profit,
|
||||
units_received,
|
||||
cost_received,
|
||||
calculation_timestamp
|
||||
)
|
||||
SELECT
|
||||
d.activity_date AS snapshot_date,
|
||||
d.pid,
|
||||
p.sku,
|
||||
-- Use current stock as approximation, since historical stock data is not available
|
||||
s.stock_quantity AS eod_stock_quantity,
|
||||
s.stock_quantity * s.effective_cost_price AS eod_stock_cost,
|
||||
s.stock_quantity * s.current_price AS eod_stock_retail,
|
||||
s.stock_quantity * s.current_regular_price AS eod_stock_gross,
|
||||
(s.stock_quantity <= 0) AS stockout_flag,
|
||||
-- Sales metrics
|
||||
COALESCE(sd.units_sold, 0),
|
||||
COALESCE(sd.units_returned, 0),
|
||||
COALESCE(sd.gross_revenue_unadjusted, 0.00),
|
||||
COALESCE(sd.discounts, 0.00),
|
||||
COALESCE(sd.returns_revenue, 0.00),
|
||||
COALESCE(sd.gross_revenue_unadjusted, 0.00) - COALESCE(sd.discounts, 0.00) AS net_revenue,
|
||||
COALESCE(sd.cogs, 0.00),
|
||||
COALESCE(sd.gross_regular_revenue, 0.00),
|
||||
(COALESCE(sd.gross_revenue_unadjusted, 0.00) - COALESCE(sd.discounts, 0.00)) - COALESCE(sd.cogs, 0.00) AS profit,
|
||||
-- Receiving metrics
|
||||
COALESCE(rd.units_received, 0),
|
||||
COALESCE(rd.cost_received, 0.00),
|
||||
now() -- calculation timestamp
|
||||
FROM DatePidCombos d
|
||||
JOIN public.products p ON d.pid = p.pid
|
||||
LEFT JOIN SalesData sd ON d.pid = sd.pid AND d.activity_date = sd.order_date
|
||||
LEFT JOIN ReceivingData rd ON d.pid = rd.pid AND d.activity_date = rd.receiving_date
|
||||
LEFT JOIN StockData s ON d.pid = s.pid
|
||||
RETURNING pid, snapshot_date
|
||||
),
|
||||
-- Now build the aggregated product metrics from the daily snapshots
|
||||
MetricsInsert AS (
|
||||
INSERT INTO public.product_metrics (
|
||||
pid,
|
||||
sku,
|
||||
current_stock_quantity,
|
||||
current_stock_cost,
|
||||
current_stock_retail,
|
||||
current_stock_msrp,
|
||||
is_out_of_stock,
|
||||
total_units_sold,
|
||||
total_units_returned,
|
||||
return_rate,
|
||||
gross_revenue,
|
||||
total_discounts,
|
||||
total_returns,
|
||||
net_revenue,
|
||||
total_cogs,
|
||||
total_gross_revenue,
|
||||
total_profit,
|
||||
profit_margin,
|
||||
avg_daily_units,
|
||||
reorder_point,
|
||||
reorder_alert,
|
||||
days_of_supply,
|
||||
sales_velocity,
|
||||
sales_velocity_score,
|
||||
rank_by_revenue,
|
||||
rank_by_quantity,
|
||||
rank_by_profit,
|
||||
total_received_quantity,
|
||||
total_received_cost,
|
||||
last_sold_date,
|
||||
last_received_date,
|
||||
days_since_last_sale,
|
||||
days_since_last_received,
|
||||
calculation_timestamp
|
||||
)
|
||||
SELECT
|
||||
p.pid,
|
||||
p.sku,
|
||||
p.stock_quantity AS current_stock_quantity,
|
||||
p.stock_quantity * COALESCE(p.landing_cost_price, p.cost_price, 0) AS current_stock_cost,
|
||||
p.stock_quantity * COALESCE(p.price, 0) AS current_stock_retail,
|
||||
p.stock_quantity * COALESCE(p.regular_price, 0) AS current_stock_msrp,
|
||||
(p.stock_quantity <= 0) AS is_out_of_stock,
|
||||
-- Aggregate metrics
|
||||
COALESCE(SUM(ds.units_sold), 0) AS total_units_sold,
|
||||
COALESCE(SUM(ds.units_returned), 0) AS total_units_returned,
|
||||
CASE
|
||||
WHEN COALESCE(SUM(ds.units_sold), 0) > 0
|
||||
THEN COALESCE(SUM(ds.units_returned), 0)::float / NULLIF(COALESCE(SUM(ds.units_sold), 0), 0)
|
||||
ELSE 0
|
||||
END AS return_rate,
|
||||
COALESCE(SUM(ds.gross_revenue), 0) AS gross_revenue,
|
||||
COALESCE(SUM(ds.discounts), 0) AS total_discounts,
|
||||
COALESCE(SUM(ds.returns_revenue), 0) AS total_returns,
|
||||
COALESCE(SUM(ds.net_revenue), 0) AS net_revenue,
|
||||
COALESCE(SUM(ds.cogs), 0) AS total_cogs,
|
||||
COALESCE(SUM(ds.gross_regular_revenue), 0) AS total_gross_revenue,
|
||||
COALESCE(SUM(ds.profit), 0) AS total_profit,
|
||||
CASE
|
||||
WHEN COALESCE(SUM(ds.net_revenue), 0) > 0
|
||||
THEN COALESCE(SUM(ds.profit), 0) / NULLIF(COALESCE(SUM(ds.net_revenue), 0), 0)
|
||||
ELSE 0
|
||||
END AS profit_margin,
|
||||
-- Calculate average daily units
|
||||
COALESCE(AVG(ds.units_sold), 0) AS avg_daily_units,
|
||||
-- Calculate reorder point (simplified, can be enhanced with lead time and safety stock)
|
||||
CEILING(COALESCE(AVG(ds.units_sold) * 14, 0)) AS reorder_point,
|
||||
(p.stock_quantity <= CEILING(COALESCE(AVG(ds.units_sold) * 14, 0))) AS reorder_alert,
|
||||
-- Days of supply based on average daily sales
|
||||
CASE
|
||||
WHEN COALESCE(AVG(ds.units_sold), 0) > 0
|
||||
THEN p.stock_quantity / NULLIF(COALESCE(AVG(ds.units_sold), 0), 0)
|
||||
ELSE NULL
|
||||
END AS days_of_supply,
|
||||
-- Sales velocity (average units sold per day over last 30 days)
|
||||
(SELECT COALESCE(AVG(recent.units_sold), 0)
|
||||
FROM public.daily_product_snapshots recent
|
||||
WHERE recent.pid = p.pid
|
||||
AND recent.snapshot_date >= CURRENT_DATE - INTERVAL '30 days'
|
||||
) AS sales_velocity,
|
||||
-- Placeholder for sales velocity score (can be calculated based on velocity)
|
||||
0 AS sales_velocity_score,
|
||||
-- Will be updated later by ranking procedure
|
||||
0 AS rank_by_revenue,
|
||||
0 AS rank_by_quantity,
|
||||
0 AS rank_by_profit,
|
||||
-- Receiving data
|
||||
COALESCE(SUM(ds.units_received), 0) AS total_received_quantity,
|
||||
COALESCE(SUM(ds.cost_received), 0) AS total_received_cost,
|
||||
-- Date metrics
|
||||
(SELECT MAX(sd.snapshot_date)
|
||||
FROM public.daily_product_snapshots sd
|
||||
WHERE sd.pid = p.pid AND sd.units_sold > 0
|
||||
) AS last_sold_date,
|
||||
(SELECT MAX(rd.snapshot_date)
|
||||
FROM public.daily_product_snapshots rd
|
||||
WHERE rd.pid = p.pid AND rd.units_received > 0
|
||||
) AS last_received_date,
|
||||
-- Calculate days since last sale/received
|
||||
CASE
|
||||
WHEN (SELECT MAX(sd.snapshot_date)
|
||||
FROM public.daily_product_snapshots sd
|
||||
WHERE sd.pid = p.pid AND sd.units_sold > 0) IS NOT NULL
|
||||
THEN (CURRENT_DATE - (SELECT MAX(sd.snapshot_date)
|
||||
FROM public.daily_product_snapshots sd
|
||||
WHERE sd.pid = p.pid AND sd.units_sold > 0))::integer
|
||||
ELSE NULL
|
||||
END AS days_since_last_sale,
|
||||
CASE
|
||||
WHEN (SELECT MAX(rd.snapshot_date)
|
||||
FROM public.daily_product_snapshots rd
|
||||
WHERE rd.pid = p.pid AND rd.units_received > 0) IS NOT NULL
|
||||
THEN (CURRENT_DATE - (SELECT MAX(rd.snapshot_date)
|
||||
FROM public.daily_product_snapshots rd
|
||||
WHERE rd.pid = p.pid AND rd.units_received > 0))::integer
|
||||
ELSE NULL
|
||||
END AS days_since_last_received,
|
||||
now() -- calculation timestamp
|
||||
FROM public.products p
|
||||
LEFT JOIN public.daily_product_snapshots ds ON p.pid = ds.pid
|
||||
GROUP BY p.pid, p.sku, p.stock_quantity, p.landing_cost_price, p.cost_price, p.price, p.regular_price
|
||||
)
|
||||
|
||||
-- Update the calculate_status table
|
||||
INSERT INTO public.calculate_status (module_name, last_calculation_timestamp)
|
||||
VALUES
|
||||
('daily_snapshots', now()),
|
||||
('product_metrics', now())
|
||||
ON CONFLICT (module_name) DO UPDATE
|
||||
SET last_calculation_timestamp = now();
|
||||
|
||||
-- Finally, update the ranks for products
|
||||
UPDATE public.product_metrics pm SET
|
||||
rank_by_revenue = rev_ranks.rank
|
||||
FROM (
|
||||
SELECT pid, RANK() OVER (ORDER BY net_revenue DESC) AS rank
|
||||
FROM public.product_metrics
|
||||
WHERE net_revenue > 0
|
||||
) rev_ranks
|
||||
WHERE pm.pid = rev_ranks.pid;
|
||||
|
||||
UPDATE public.product_metrics pm SET
|
||||
rank_by_quantity = qty_ranks.rank
|
||||
FROM (
|
||||
SELECT pid, RANK() OVER (ORDER BY total_units_sold DESC) AS rank
|
||||
FROM public.product_metrics
|
||||
WHERE total_units_sold > 0
|
||||
) qty_ranks
|
||||
WHERE pm.pid = qty_ranks.pid;
|
||||
|
||||
UPDATE public.product_metrics pm SET
|
||||
rank_by_profit = profit_ranks.rank
|
||||
FROM (
|
||||
SELECT pid, RANK() OVER (ORDER BY total_profit DESC) AS rank
|
||||
FROM public.product_metrics
|
||||
WHERE total_profit > 0
|
||||
) profit_ranks
|
||||
WHERE pm.pid = profit_ranks.pid;
|
||||
|
||||
-- Return count of products with metrics
|
||||
SELECT COUNT(*) AS product_count FROM public.product_metrics
|
||||
`);
|
||||
};
|
||||
|
||||
async function populateInitialMetrics() {
|
||||
let connection;
|
||||
const startTime = Date.now();
|
||||
let calculateHistoryId;
|
||||
|
||||
try {
|
||||
// Clean up any previously running calculations
|
||||
connection = await getConnection({
|
||||
// Add performance-related settings
|
||||
application_name: 'populate_metrics',
|
||||
statement_timeout: PG_STATEMENT_TIMEOUT_MS, // 30 min timeout per statement
|
||||
});
|
||||
|
||||
// Ensure the calculate_status table exists and has the correct structure
|
||||
await connection.query(`
|
||||
CREATE TABLE IF NOT EXISTS calculate_status (
|
||||
module_name TEXT PRIMARY KEY,
|
||||
last_calculation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
`);
|
||||
|
||||
await connection.query(`
|
||||
UPDATE calculate_history
|
||||
SET
|
||||
status = 'cancelled',
|
||||
end_time = NOW(),
|
||||
duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER,
|
||||
error_message = 'Previous calculation was not completed properly'
|
||||
WHERE status = 'running' AND additional_info->>'type' = 'populate_initial_metrics'
|
||||
`);
|
||||
|
||||
// Create history record for this calculation
|
||||
const historyResult = await connection.query(`
|
||||
INSERT INTO calculate_history (
|
||||
start_time,
|
||||
status,
|
||||
additional_info
|
||||
) VALUES (
|
||||
NOW(),
|
||||
'running',
|
||||
jsonb_build_object(
|
||||
'type', 'populate_initial_metrics',
|
||||
'sql_file', 'populate_initial_product_metrics.sql'
|
||||
)
|
||||
) RETURNING id
|
||||
`);
|
||||
calculateHistoryId = historyResult.rows[0].id;
|
||||
|
||||
// Initialize progress
|
||||
global.outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Starting initial product metrics population',
|
||||
current: 0,
|
||||
total: 100,
|
||||
elapsed: '0s',
|
||||
remaining: 'Calculating... (this may take a while)',
|
||||
rate: 0,
|
||||
percentage: '0',
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
},
|
||||
historyId: calculateHistoryId
|
||||
});
|
||||
|
||||
// Prepare the database - analyze tables
|
||||
global.outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Analyzing database tables for better query performance',
|
||||
current: 2,
|
||||
total: 100,
|
||||
elapsed: global.formatElapsedTime(startTime),
|
||||
remaining: 'Analyzing...',
|
||||
rate: 0,
|
||||
percentage: '2',
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
},
|
||||
historyId: calculateHistoryId
|
||||
});
|
||||
|
||||
// Enable better query planning and parallel operations
|
||||
await connection.query(`
|
||||
-- Analyze tables for better query planning
|
||||
ANALYZE public.products;
|
||||
ANALYZE public.purchase_orders;
|
||||
ANALYZE public.daily_product_snapshots;
|
||||
ANALYZE public.orders;
|
||||
|
||||
-- Enable parallel operations
|
||||
SET LOCAL enable_parallel_append = on;
|
||||
SET LOCAL enable_parallel_hash = on;
|
||||
SET LOCAL max_parallel_workers_per_gather = 4;
|
||||
|
||||
-- Larger work memory for complex sorts/joins
|
||||
SET LOCAL work_mem = '128MB';
|
||||
`).catch(err => {
|
||||
// Non-fatal if analyze fails
|
||||
console.warn('Failed to analyze tables (non-fatal):', err.message);
|
||||
});
|
||||
|
||||
// Execute the SQL query
|
||||
global.outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Executing initial metrics SQL query',
|
||||
current: 5,
|
||||
total: 100,
|
||||
elapsed: global.formatElapsedTime(startTime),
|
||||
remaining: 'Calculating... (this could take several hours with 150M+ records)',
|
||||
rate: 0,
|
||||
percentage: '5',
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
},
|
||||
historyId: calculateHistoryId
|
||||
});
|
||||
|
||||
// Read the SQL file
|
||||
const sqlFilePath = path.resolve(__dirname, 'populate_initial_product_metrics.sql');
|
||||
console.log('Base directory:', baseDir);
|
||||
console.log('Script directory:', __dirname);
|
||||
console.log('SQL file path:', sqlFilePath);
|
||||
console.log('Current working directory:', process.cwd());
|
||||
|
||||
if (!fs.existsSync(sqlFilePath)) {
|
||||
throw new Error(`SQL file not found at ${sqlFilePath}`);
|
||||
}
|
||||
|
||||
// Read and clean up the SQL (Slightly more robust cleaning)
|
||||
const sqlQuery = fs.readFileSync(sqlFilePath, 'utf8')
|
||||
.replace(/\r\n/g, '\n') // Handle Windows endings
|
||||
.replace(/\r/g, '\n') // Handle old Mac endings
|
||||
.trim(); // Remove leading/trailing whitespace VERY IMPORTANT
|
||||
|
||||
// Log details again AFTER cleaning
|
||||
console.log('SQL Query length (cleaned):', sqlQuery.length);
|
||||
console.log('SQL Query structure validation:');
|
||||
console.log('- Contains DO block:', sqlQuery.includes('DO $$') || sqlQuery.includes('DO $')); // Check both types of tag start
|
||||
console.log('- Contains BEGIN:', sqlQuery.includes('BEGIN'));
|
||||
console.log('- Contains END:', sqlQuery.includes('END $$;') || sqlQuery.includes('END $')); // Check both types of tag end
|
||||
console.log('- First 50 chars:', JSON.stringify(sqlQuery.slice(0, 50)));
|
||||
console.log('- Last 100 chars (cleaned):', JSON.stringify(sqlQuery.slice(-100)));
|
||||
|
||||
// Final check to ensure clean SQL ending
|
||||
if (!sqlQuery.endsWith('END $$;')) {
|
||||
console.warn('WARNING: SQL does not end with "END $$;". This might cause issues.');
|
||||
console.log('Exact ending:', JSON.stringify(sqlQuery.slice(-20)));
|
||||
}
|
||||
|
||||
// Execute the script
|
||||
console.log('Starting initial product metrics population...');
|
||||
|
||||
// Track the query promise for potential cancellation
|
||||
runningQueryPromise = connection.query({
|
||||
text: sqlQuery,
|
||||
rowMode: 'array'
|
||||
});
|
||||
await runningQueryPromise;
|
||||
runningQueryPromise = null;
|
||||
|
||||
// Update progress to 100%
|
||||
global.outputProgress({
|
||||
status: 'complete',
|
||||
operation: 'Initial product metrics population complete',
|
||||
current: 100,
|
||||
total: 100,
|
||||
elapsed: global.formatElapsedTime(startTime),
|
||||
remaining: '0s',
|
||||
rate: 0,
|
||||
percentage: '100',
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
|
||||
},
|
||||
historyId: calculateHistoryId
|
||||
});
|
||||
|
||||
// Update history with completion
|
||||
await connection.query(`
|
||||
UPDATE calculate_history
|
||||
SET
|
||||
end_time = NOW(),
|
||||
duration_seconds = $1,
|
||||
status = 'completed'
|
||||
WHERE id = $2
|
||||
`, [Math.round((Date.now() - startTime) / 1000), calculateHistoryId]);
|
||||
|
||||
// Clear progress file on successful completion
|
||||
global.clearProgress();
|
||||
|
||||
return {
|
||||
success: true,
|
||||
message: 'Initial product metrics population completed successfully',
|
||||
duration: Math.round((Date.now() - startTime) / 1000)
|
||||
};
|
||||
} catch (error) {
|
||||
const endTime = Date.now();
|
||||
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);
|
||||
|
||||
// Enhanced error logging
|
||||
console.error('Error details:', {
|
||||
message: error.message,
|
||||
code: error.code,
|
||||
hint: error.hint,
|
||||
position: error.position,
|
||||
detail: error.detail,
|
||||
where: error.where ? error.where.substring(0, 500) + '...' : undefined, // Truncate to avoid huge logs
|
||||
severity: error.severity,
|
||||
file: error.file,
|
||||
line: error.line,
|
||||
routine: error.routine
|
||||
});
|
||||
|
||||
// Update history with error
|
||||
if (connection && calculateHistoryId) {
|
||||
await connection.query(`
|
||||
UPDATE calculate_history
|
||||
SET
|
||||
end_time = NOW(),
|
||||
duration_seconds = $1,
|
||||
status = $2,
|
||||
error_message = $3
|
||||
WHERE id = $4
|
||||
`, [
|
||||
totalElapsedSeconds,
|
||||
isCancelled ? 'cancelled' : 'failed',
|
||||
error.message,
|
||||
calculateHistoryId
|
||||
]);
|
||||
}
|
||||
|
||||
if (isCancelled) {
|
||||
global.outputProgress({
|
||||
status: 'cancelled',
|
||||
operation: 'Calculation cancelled',
|
||||
current: 50,
|
||||
total: 100,
|
||||
elapsed: global.formatElapsedTime(startTime),
|
||||
remaining: null,
|
||||
rate: 0,
|
||||
percentage: '50',
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: totalElapsedSeconds
|
||||
},
|
||||
historyId: calculateHistoryId
|
||||
});
|
||||
} else {
|
||||
global.outputProgress({
|
||||
status: 'error',
|
||||
operation: 'Error during initial product metrics population',
|
||||
message: error.message,
|
||||
current: 0,
|
||||
total: 100,
|
||||
elapsed: global.formatElapsedTime(startTime),
|
||||
remaining: null,
|
||||
rate: 0,
|
||||
percentage: '0',
|
||||
timing: {
|
||||
start_time: new Date(startTime).toISOString(),
|
||||
end_time: new Date().toISOString(),
|
||||
elapsed_seconds: totalElapsedSeconds
|
||||
},
|
||||
historyId: calculateHistoryId
|
||||
});
|
||||
}
|
||||
|
||||
console.error('Error during initial product metrics population:', error);
|
||||
return {
|
||||
success: false,
|
||||
error: error.message,
|
||||
duration: totalElapsedSeconds
|
||||
};
|
||||
} finally {
|
||||
if (connection) {
|
||||
connection.release();
|
||||
}
|
||||
await closePool();
|
||||
}
|
||||
}
|
||||
|
||||
// Start population process
|
||||
populateInitialMetrics()
|
||||
.then(result => {
|
||||
if (result.success) {
|
||||
console.log(`Initial product metrics population completed successfully in ${result.duration} seconds`);
|
||||
process.exit(0);
|
||||
} else {
|
||||
console.error(`Initial product metrics population failed: ${result.error}`);
|
||||
process.exit(1);
|
||||
}
|
||||
})
|
||||
.catch(err => {
|
||||
console.error('Unexpected error:', err);
|
||||
process.exit(1);
|
||||
});
|
||||
Reference in New Issue
Block a user