Add in forecasting, lifecycle phases, associated component and script changes
This commit is contained in:
@@ -11,6 +11,7 @@ const RUN_PERIODIC_METRICS = true;
|
||||
const RUN_BRAND_METRICS = true;
|
||||
const RUN_VENDOR_METRICS = true;
|
||||
const RUN_CATEGORY_METRICS = true;
|
||||
const RUN_LIFECYCLE_FORECASTS = true;
|
||||
|
||||
// Maximum execution time for the entire sequence (e.g., 90 minutes)
|
||||
const MAX_EXECUTION_TIME_TOTAL = 90 * 60 * 1000;
|
||||
@@ -592,6 +593,13 @@ async function runAllCalculations() {
|
||||
historyType: 'product_metrics',
|
||||
statusModule: 'product_metrics'
|
||||
},
|
||||
{
|
||||
run: RUN_LIFECYCLE_FORECASTS,
|
||||
name: 'Lifecycle Forecast Update',
|
||||
sqlFile: 'metrics-new/update_lifecycle_forecasts.sql',
|
||||
historyType: 'lifecycle_forecasts',
|
||||
statusModule: 'lifecycle_forecasts'
|
||||
},
|
||||
{
|
||||
run: RUN_PERIODIC_METRICS,
|
||||
name: 'Periodic Metrics Update',
|
||||
|
||||
Binary file not shown.
1612
inventory-server/scripts/forecast/forecast_engine.py
Normal file
1612
inventory-server/scripts/forecast/forecast_engine.py
Normal file
File diff suppressed because it is too large
Load Diff
5
inventory-server/scripts/forecast/requirements.txt
Normal file
5
inventory-server/scripts/forecast/requirements.txt
Normal file
@@ -0,0 +1,5 @@
|
||||
numpy>=1.24
|
||||
scipy>=1.10
|
||||
pandas>=2.0
|
||||
psycopg2-binary>=2.9
|
||||
statsmodels>=0.14
|
||||
128
inventory-server/scripts/forecast/run_forecast.js
Normal file
128
inventory-server/scripts/forecast/run_forecast.js
Normal file
@@ -0,0 +1,128 @@
|
||||
#!/usr/bin/env node
|
||||
/**
|
||||
* Forecast Pipeline Orchestrator
|
||||
*
|
||||
* Spawns the Python forecast engine with database credentials from the
|
||||
* environment. Can be run manually, via cron, or integrated into the
|
||||
* existing metrics pipeline.
|
||||
*
|
||||
* Usage:
|
||||
* node run_forecast.js
|
||||
*
|
||||
* Environment:
|
||||
* Reads DB_HOST, DB_USER, DB_PASSWORD, DB_NAME, DB_PORT from
|
||||
* /var/www/html/inventory/.env (or current process env).
|
||||
*/
|
||||
|
||||
const { spawn } = require('child_process');
|
||||
const path = require('path');
|
||||
const fs = require('fs');
|
||||
|
||||
// Load .env file if it exists (production path)
|
||||
const envPaths = [
|
||||
'/var/www/html/inventory/.env',
|
||||
path.join(__dirname, '../../.env'),
|
||||
];
|
||||
|
||||
/**
 * Parses a single line of a .env file.
 *
 * Blank lines, `#` comment lines, lines without `=` and lines with an empty
 * key are ignored. An optional leading `export ` is dropped, whitespace around
 * the key and the value is trimmed, and one pair of matching surrounding
 * quotes (single or double) is removed from the value so that
 * `DB_PASSWORD="secret"` yields `secret` rather than `"secret"`.
 *
 * @param {string} line - raw line from the file
 * @returns {[string, string] | null} `[key, value]`, or null when the line
 *   carries no assignment
 */
function parseEnvLine(line) {
  const trimmed = line.trim();
  if (!trimmed || trimmed.startsWith('#')) return null;

  const eqIndex = trimmed.indexOf('=');
  if (eqIndex === -1) return null;

  const key = trimmed.slice(0, eqIndex).replace(/^export\s+/, '').trim();
  if (!key) return null;

  let value = trimmed.slice(eqIndex + 1).trim();
  const quote = value[0];
  const isQuoted =
    value.length >= 2 && (quote === '"' || quote === "'") && value.endsWith(quote);
  if (isQuoted) {
    value = value.slice(1, -1);
  }

  return [key, value];
}

// Load the first .env file that exists; later candidates are not consulted.
for (const envPath of envPaths) {
  if (!fs.existsSync(envPath)) continue;

  const envContent = fs.readFileSync(envPath, 'utf-8');
  for (const line of envContent.split('\n')) {
    const entry = parseEnvLine(line);
    if (entry === null) continue;

    const [key, value] = entry;
    // Variables already set (and non-empty) in the process environment win over the file.
    if (!process.env[key]) {
      process.env[key] = value;
    }
  }

  console.log(`Loaded env from ${envPath}`);
  break;
}
|
||||
|
||||
// Abort before doing any work when a mandatory database setting is unset or empty.
const required = ['DB_HOST', 'DB_USER', 'DB_PASSWORD', 'DB_NAME'];
const missing = required.filter((name) => {
  const configured = Boolean(process.env[name]);
  return !configured;
});

if (missing.length !== 0) {
  console.error(`Missing required environment variables: ${missing.join(', ')}`);
  process.exit(1);
}
|
||||
|
||||
// Everything the orchestrator needs lives next to this script.
const SCRIPT_DIR = __dirname;
const [PYTHON_SCRIPT, VENV_DIR, REQUIREMENTS] = [
  'forecast_engine.py',
  'venv',
  'requirements.txt',
].map((entry) => path.join(SCRIPT_DIR, entry));
|
||||
|
||||
/**
 * Picks the Python interpreter used to run the forecast engine.
 *
 * @returns {string} the interpreter inside VENV_DIR when that file exists,
 *   otherwise the bare command name `python3`
 */
function getPythonBin() {
  const candidate = path.join(VENV_DIR, 'bin', 'python');
  return fs.existsSync(candidate) ? candidate : 'python3';
}
|
||||
|
||||
/**
 * Makes sure the virtual environment exists and the packages listed in
 * REQUIREMENTS are installed into it.
 *
 * The venv is created with `python3 -m venv` only when its interpreter is
 * missing; `pip install -r` is run on every call.
 *
 * @returns {Promise<void>} rejects when either command fails
 */
async function ensureDependencies() {
  const venvPython = path.join(VENV_DIR, 'bin', 'python');
  const venvExists = fs.existsSync(venvPython);

  if (!venvExists) {
    console.log('Creating virtual environment...');
    await runCommand('python3', ['-m', 'venv', VENV_DIR]);
  }

  // pip runs unconditionally; it is expected to be a quick no-op when nothing changed.
  console.log('Checking dependencies...');
  const pipArgs = ['-m', 'pip', 'install', '--quiet', '-r', REQUIREMENTS];
  await runCommand(venvPython, pipArgs);
}
|
||||
|
||||
/**
 * Runs a command as a child process and waits for it to finish.
 *
 * The child shares this process's stdio unless `options` overrides it.
 *
 * @param {string} cmd - executable to run
 * @param {string[]} args - arguments passed to the executable
 * @param {object} [options] - extra `child_process.spawn` options; they take
 *   precedence over the default `stdio: 'inherit'`
 * @returns {Promise<void>} resolves on exit code 0; rejects with an Error when
 *   the process exits non-zero, is terminated by a signal, or cannot be spawned
 */
function runCommand(cmd, args, options = {}) {
  return new Promise((resolve, reject) => {
    const proc = spawn(cmd, args, {
      stdio: 'inherit',
      ...options,
    });

    // Spawn failures (e.g. ENOENT) arrive here; the original error is passed through.
    proc.on('error', reject);

    proc.on('close', (code, signal) => {
      if (code === 0) {
        resolve();
        return;
      }
      // A null exit code means the child was killed by a signal; name it
      // instead of reporting the unhelpful "exited with code null".
      const reason =
        code === null
          ? `was terminated by signal ${signal}`
          : `exited with code ${code}`;
      reject(new Error(`${cmd} ${reason}`));
    });
  });
}
|
||||
|
||||
/**
 * Entry point: prepares the Python environment, runs the forecast engine and
 * reports the elapsed time.
 *
 * Any failure is logged and ends the process with exit status 1.
 *
 * @returns {Promise<void>}
 */
async function main() {
  const startTime = Date.now();
  const elapsedSeconds = () => ((Date.now() - startTime) / 1000).toFixed(1);
  const rule = '='.repeat(60);

  console.log(rule);
  console.log(`Forecast Pipeline - ${new Date().toISOString()}`);
  console.log(rule);

  try {
    await ensureDependencies();

    const pythonBin = getPythonBin();
    console.log(`Using Python: ${pythonBin}`);
    console.log(`Running: ${PYTHON_SCRIPT}`);
    console.log('');

    // PYTHONUNBUFFERED makes the engine's output appear in real time.
    const childEnv = { ...process.env, PYTHONUNBUFFERED: '1' };
    await runCommand(pythonBin, [PYTHON_SCRIPT], { env: childEnv });

    const duration = elapsedSeconds();
    console.log('');
    console.log(rule);
    console.log(`Forecast pipeline completed in ${duration}s`);
    console.log(rule);
  } catch (err) {
    const duration = elapsedSeconds();
    console.error(`Forecast pipeline FAILED after ${duration}s:`, err.message);
    process.exit(1);
  }
}
|
||||
|
||||
main();
|
||||
51
inventory-server/scripts/forecast/sql/create_tables.sql
Normal file
51
inventory-server/scripts/forecast/sql/create_tables.sql
Normal file
@@ -0,0 +1,51 @@
|
||||
-- Forecasting Pipeline Tables
|
||||
-- Run once to create the schema. Safe to re-run (IF NOT EXISTS).
|
||||
|
||||
-- Reference decay curves, one per brand or per brand x category (any hierarchy level).
-- Curve model: sales(t) = amplitude * exp(-decay_rate * t) + baseline
--
-- NOTE(review): PostgreSQL treats NULLs as distinct in a plain UNIQUE constraint,
-- so UNIQUE (brand, cat_id) does not stop several brand-only rows (cat_id IS NULL)
-- from existing for the same brand. Confirm the writer never inserts duplicates.
CREATE TABLE IF NOT EXISTS brand_lifecycle_curves (
    id                      SERIAL PRIMARY KEY,
    brand                   TEXT NOT NULL,
    -- NULL = brand-level fallback curve, otherwise the category name
    root_category           TEXT,
    -- NULL = brand-only; otherwise category_hierarchy.cat_id for precise matching
    cat_id                  BIGINT,
    -- NULL = brand-only; 0-3 = hierarchy depth
    category_level          SMALLINT,
    -- A in the curve model
    amplitude               NUMERIC(10,4),
    -- lambda in the curve model (higher = faster decay)
    decay_rate              NUMERIC(10,6),
    -- C in the curve model (long-tail steady-state daily sales)
    baseline                NUMERIC(10,4),
    -- goodness of fit
    r_squared               NUMERIC(6,4),
    -- number of products that informed this curve
    sample_size             INTEGER,
    -- used for scaling new launches
    median_first_week_sales NUMERIC(10,2),
    -- used for scaling pre-order products
    median_preorder_sales   NUMERIC(10,2),
    -- median pre-order accumulation window, in days
    median_preorder_days    NUMERIC(10,2),
    computed_at             TIMESTAMP DEFAULT now(),
    UNIQUE (brand, cat_id)
);
|
||||
|
||||
-- Daily forecast rows, one per product per forecast date
-- (described upstream as covering the next 90 days and regenerated each run).
CREATE TABLE IF NOT EXISTS product_forecasts (
    pid              BIGINT NOT NULL,
    forecast_date    DATE NOT NULL,
    forecast_units   NUMERIC(10,2),
    forecast_revenue NUMERIC(14,4),
    -- one of: preorder, launch, decay, mature, slow_mover, dormant
    lifecycle_phase  TEXT,
    -- one of: lifecycle_curve, exp_smoothing, velocity, zero
    forecast_method  TEXT,
    confidence_lower NUMERIC(10,2),
    confidence_upper NUMERIC(10,2),
    generated_at     TIMESTAMP DEFAULT now(),
    PRIMARY KEY (pid, forecast_date)
);

-- Secondary lookups by date and by lifecycle phase.
CREATE INDEX IF NOT EXISTS idx_pf_date  ON product_forecasts (forecast_date);
CREATE INDEX IF NOT EXISTS idx_pf_phase ON product_forecasts (lifecycle_phase);
|
||||
|
||||
-- History of forecast runs, kept for monitoring.
CREATE TABLE IF NOT EXISTS forecast_runs (
    id                SERIAL PRIMARY KEY,
    started_at        TIMESTAMP NOT NULL,
    finished_at       TIMESTAMP,
    -- one of: running, completed, failed
    status            TEXT DEFAULT 'running',
    products_forecast INTEGER,
    -- per-phase product counts, e.g. {"launch": 50, "decay": 200, ...}
    phase_counts      JSONB,
    -- number of brand curves computed
    curve_count       INTEGER,
    error_message     TEXT,
    duration_seconds  NUMERIC(10,2)
);
|
||||
@@ -40,7 +40,7 @@ const sshConfig = {
|
||||
password: process.env.PROD_DB_PASSWORD,
|
||||
database: process.env.PROD_DB_NAME,
|
||||
port: process.env.PROD_DB_PORT || 3306,
|
||||
timezone: '-05:00', // Production DB always stores times in EST (UTC-5) regardless of DST
|
||||
timezone: '-05:00', // mysql2 driver timezone — corrected at runtime via adjustDateForMySQL() in utils.js
|
||||
},
|
||||
localDbConfig: {
|
||||
// PostgreSQL config for local
|
||||
|
||||
@@ -58,8 +58,12 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
|
||||
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'"
|
||||
);
|
||||
const lastSyncTime = syncInfo?.rows?.[0]?.last_sync_timestamp || '1970-01-01';
|
||||
// Adjust for mysql2 driver timezone vs MySQL server timezone mismatch
|
||||
const mysqlSyncTime = prodConnection.adjustDateForMySQL
|
||||
? prodConnection.adjustDateForMySQL(lastSyncTime)
|
||||
: lastSyncTime;
|
||||
|
||||
console.log('Orders: Using last sync time:', lastSyncTime);
|
||||
console.log('Orders: Using last sync time:', lastSyncTime, '(adjusted:', mysqlSyncTime, ')');
|
||||
|
||||
// First get count of order items - Keep MySQL compatible for production
|
||||
const [[{ total }]] = await prodConnection.query(`
|
||||
@@ -82,7 +86,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
|
||||
)
|
||||
)
|
||||
` : ''}
|
||||
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);
|
||||
`, incrementalUpdate ? [mysqlSyncTime, mysqlSyncTime, mysqlSyncTime] : []);
|
||||
|
||||
totalOrderItems = total;
|
||||
console.log('Orders: Found changes:', totalOrderItems);
|
||||
@@ -116,7 +120,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
|
||||
)
|
||||
)
|
||||
` : ''}
|
||||
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);
|
||||
`, incrementalUpdate ? [mysqlSyncTime, mysqlSyncTime, mysqlSyncTime] : []);
|
||||
|
||||
console.log('Orders: Found', orderItems.length, 'order items to process');
|
||||
|
||||
|
||||
@@ -669,8 +669,13 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
|
||||
// Setup temporary tables
|
||||
await setupTemporaryTables(localConnection);
|
||||
|
||||
// Adjust sync time for mysql2 driver timezone vs MySQL server timezone mismatch
|
||||
const mysqlSyncTime = prodConnection.adjustDateForMySQL
|
||||
? prodConnection.adjustDateForMySQL(lastSyncTime)
|
||||
: lastSyncTime;
|
||||
|
||||
// Materialize calculations into temp table
|
||||
const materializeResult = await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime, startTime);
|
||||
const materializeResult = await materializeCalculations(prodConnection, localConnection, incrementalUpdate, mysqlSyncTime, startTime);
|
||||
|
||||
// Get the list of products that need updating
|
||||
const [products] = await localConnection.query(`
|
||||
|
||||
@@ -65,8 +65,12 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'"
|
||||
);
|
||||
const lastSyncTime = syncInfo?.rows?.[0]?.last_sync_timestamp || '1970-01-01';
|
||||
// Adjust for mysql2 driver timezone vs MySQL server timezone mismatch
|
||||
const mysqlSyncTime = prodConnection.adjustDateForMySQL
|
||||
? prodConnection.adjustDateForMySQL(lastSyncTime)
|
||||
: lastSyncTime;
|
||||
|
||||
console.log('Purchase Orders: Using last sync time:', lastSyncTime);
|
||||
console.log('Purchase Orders: Using last sync time:', lastSyncTime, '(adjusted:', mysqlSyncTime, ')');
|
||||
|
||||
// Create temp tables for processing
|
||||
await localConnection.query(`
|
||||
@@ -254,7 +258,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
OR p.date_estin > ?
|
||||
)
|
||||
` : ''}
|
||||
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);
|
||||
`, incrementalUpdate ? [mysqlSyncTime, mysqlSyncTime, mysqlSyncTime] : []);
|
||||
|
||||
const totalPOs = poCount[0].total;
|
||||
console.log(`Found ${totalPOs} relevant purchase orders`);
|
||||
@@ -291,7 +295,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
` : ''}
|
||||
ORDER BY p.po_id
|
||||
LIMIT ${PO_BATCH_SIZE} OFFSET ${offset}
|
||||
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);
|
||||
`, incrementalUpdate ? [mysqlSyncTime, mysqlSyncTime, mysqlSyncTime] : []);
|
||||
|
||||
if (poList.length === 0) {
|
||||
allPOsProcessed = true;
|
||||
@@ -426,7 +430,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
OR r.date_created > ?
|
||||
)
|
||||
` : ''}
|
||||
`, incrementalUpdate ? [lastSyncTime, lastSyncTime] : []);
|
||||
`, incrementalUpdate ? [mysqlSyncTime, mysqlSyncTime] : []);
|
||||
|
||||
const totalReceivings = receivingCount[0].total;
|
||||
console.log(`Found ${totalReceivings} relevant receivings`);
|
||||
@@ -463,7 +467,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
` : ''}
|
||||
ORDER BY r.receiving_id
|
||||
LIMIT ${PO_BATCH_SIZE} OFFSET ${offset}
|
||||
`, incrementalUpdate ? [lastSyncTime, lastSyncTime] : []);
|
||||
`, incrementalUpdate ? [mysqlSyncTime, mysqlSyncTime] : []);
|
||||
|
||||
if (receivingList.length === 0) {
|
||||
allReceivingsProcessed = true;
|
||||
|
||||
@@ -48,6 +48,37 @@ async function setupConnections(sshConfig) {
|
||||
stream: tunnel.stream,
|
||||
});
|
||||
|
||||
// Detect MySQL server timezone and calculate correction for the driver timezone mismatch.
|
||||
// The mysql2 driver is configured with timezone: '-05:00' (EST), but the MySQL server
|
||||
// may be in a different timezone (e.g., America/Chicago = CST/CDT). When the driver
|
||||
// formats a JS Date as EST and MySQL interprets it in its own timezone, DATETIME
|
||||
// comparisons can be off. This correction adjusts Date objects before they're passed
|
||||
// to MySQL queries so the formatted string matches the server's local time.
|
||||
const [[{ utcDiffSec }]] = await prodConnection.query(
|
||||
"SELECT TIMESTAMPDIFF(SECOND, NOW(), UTC_TIMESTAMP()) as utcDiffSec"
|
||||
);
|
||||
const mysqlOffsetMs = -utcDiffSec * 1000; // MySQL UTC offset in ms (e.g., -21600000 for CST)
|
||||
const driverOffsetMs = -5 * 3600 * 1000; // Driver's -05:00 in ms (-18000000)
|
||||
const tzCorrectionMs = driverOffsetMs - mysqlOffsetMs;
|
||||
// CST (winter): -18000000 - (-21600000) = +3600000 (1 hour correction needed)
|
||||
// CDT (summer): -18000000 - (-18000000) = 0 (no correction needed)
|
||||
|
||||
if (tzCorrectionMs !== 0) {
|
||||
console.log(`MySQL timezone correction: ${tzCorrectionMs / 1000}s (server offset: ${utcDiffSec}s from UTC)`);
|
||||
}
|
||||
|
||||
/**
 * Adjusts a Date/timestamp for the mysql2 driver timezone mismatch before it
 * is passed as a query parameter to MySQL, so that the string the driver
 * generates lines up with the timezone the DATETIME values are stored in.
 *
 * @param {Date|string|number} date - value to adjust; non-Date input is
 *   converted with `new Date(date)`
 * @returns {Date|string|number} the input unchanged when it is falsy, when
 *   `tzCorrectionMs` is 0, or when it cannot be parsed as a date; otherwise a
 *   new Date shifted back by `tzCorrectionMs` milliseconds
 */
function adjustDateForMySQL(date) {
  if (!date || tzCorrectionMs === 0) return date;

  const parsed = date instanceof Date ? date : new Date(date);

  // An unparseable value would otherwise turn into an Invalid Date; return the
  // caller's original value so it reaches the driver exactly as it would when
  // no correction is applied.
  if (Number.isNaN(parsed.getTime())) return date;

  return new Date(parsed.getTime() - tzCorrectionMs);
}
|
||||
prodConnection.adjustDateForMySQL = adjustDateForMySQL;
|
||||
|
||||
// Setup PostgreSQL connection pool for local
|
||||
const localPool = new Pool(sshConfig.localDbConfig);
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
-- Description: Calculates and updates daily aggregated product data.
|
||||
-- Self-healing: automatically detects and fills gaps in snapshot history.
|
||||
-- Always reprocesses recent days to pick up new orders and data corrections.
|
||||
-- Self-healing: detects gaps (missing snapshots), stale data (snapshot
|
||||
-- aggregates that don't match source tables after backfills), and always
|
||||
-- reprocesses recent days to pick up new orders and data corrections.
|
||||
-- Dependencies: Core import tables (products, orders, purchase_orders), calculate_status table.
|
||||
-- Frequency: Hourly (Run ~5-10 minutes after hourly data import completes).
|
||||
|
||||
@@ -18,28 +19,26 @@ DECLARE
|
||||
BEGIN
|
||||
RAISE NOTICE 'Running % script. Start Time: %', _module_name, _start_time;
|
||||
|
||||
-- Find the latest existing snapshot date to determine where gaps begin
|
||||
-- Find the latest existing snapshot date (for logging only)
|
||||
SELECT MAX(snapshot_date) INTO _latest_snapshot
|
||||
FROM public.daily_product_snapshots;
|
||||
|
||||
-- Determine how far back to look for gaps, capped at _max_backfill_days
|
||||
_backfill_start := GREATEST(
|
||||
COALESCE(_latest_snapshot + 1, CURRENT_DATE - _max_backfill_days),
|
||||
CURRENT_DATE - _max_backfill_days
|
||||
);
|
||||
-- Always scan the full backfill window to catch holes in the middle,
|
||||
-- not just gaps at the end. The gap fill and stale detection queries
|
||||
-- need to see the entire range to find missing or outdated snapshots.
|
||||
_backfill_start := CURRENT_DATE - _max_backfill_days;
|
||||
|
||||
IF _latest_snapshot IS NULL THEN
|
||||
RAISE NOTICE 'No existing snapshots found. Backfilling up to % days.', _max_backfill_days;
|
||||
ELSIF _backfill_start > _latest_snapshot + 1 THEN
|
||||
RAISE NOTICE 'Latest snapshot: %. Gap exceeds % day cap — backfilling from %. Use rebuild script for full history.',
|
||||
_latest_snapshot, _max_backfill_days, _backfill_start;
|
||||
ELSE
|
||||
RAISE NOTICE 'Latest snapshot: %. Checking for gaps from %.', _latest_snapshot, _backfill_start;
|
||||
RAISE NOTICE 'Latest snapshot: %. Scanning from % for gaps and stale data.', _latest_snapshot, _backfill_start;
|
||||
END IF;
|
||||
|
||||
-- Process all dates that need snapshots:
|
||||
-- 1. Gap fill: dates with orders/receivings but no snapshots (older than recent window)
|
||||
-- 2. Recent recheck: last N days always reprocessed (picks up new orders, corrections)
|
||||
-- 2. Stale detection: existing snapshots where aggregates don't match source data
|
||||
-- (catches backfilled imports that arrived after snapshot was calculated)
|
||||
-- 3. Recent recheck: last N days always reprocessed (picks up new orders, corrections)
|
||||
FOR _target_date IN
|
||||
SELECT d FROM (
|
||||
-- Gap fill: find dates with activity but missing snapshots
|
||||
@@ -55,6 +54,36 @@ BEGIN
|
||||
SELECT 1 FROM public.daily_product_snapshots dps WHERE dps.snapshot_date = activity_dates.d
|
||||
)
|
||||
UNION
|
||||
-- Stale detection: compare snapshot aggregates against source tables
|
||||
SELECT snap_agg.snapshot_date AS d
|
||||
FROM (
|
||||
SELECT snapshot_date,
|
||||
COALESCE(SUM(units_received), 0)::bigint AS snap_received,
|
||||
COALESCE(SUM(units_sold), 0)::bigint AS snap_sold
|
||||
FROM public.daily_product_snapshots
|
||||
WHERE snapshot_date >= _backfill_start
|
||||
AND snapshot_date < CURRENT_DATE - _recent_recheck_days
|
||||
GROUP BY snapshot_date
|
||||
) snap_agg
|
||||
LEFT JOIN (
|
||||
SELECT received_date::date AS d, SUM(qty_each)::bigint AS actual_received
|
||||
FROM public.receivings
|
||||
WHERE received_date::date >= _backfill_start
|
||||
AND received_date::date < CURRENT_DATE - _recent_recheck_days
|
||||
GROUP BY received_date::date
|
||||
) recv_agg ON snap_agg.snapshot_date = recv_agg.d
|
||||
LEFT JOIN (
|
||||
SELECT date::date AS d,
|
||||
SUM(CASE WHEN quantity > 0 AND COALESCE(status, 'pending') NOT IN ('canceled', 'returned')
|
||||
THEN quantity ELSE 0 END)::bigint AS actual_sold
|
||||
FROM public.orders
|
||||
WHERE date::date >= _backfill_start
|
||||
AND date::date < CURRENT_DATE - _recent_recheck_days
|
||||
GROUP BY date::date
|
||||
) orders_agg ON snap_agg.snapshot_date = orders_agg.d
|
||||
WHERE snap_agg.snap_received != COALESCE(recv_agg.actual_received, 0)
|
||||
OR snap_agg.snap_sold != COALESCE(orders_agg.actual_sold, 0)
|
||||
UNION
|
||||
-- Recent days: always reprocess
|
||||
SELECT d::date
|
||||
FROM generate_series(
|
||||
@@ -66,11 +95,18 @@ BEGIN
|
||||
ORDER BY d
|
||||
LOOP
|
||||
_days_processed := _days_processed + 1;
|
||||
RAISE NOTICE 'Processing date: % [%/%]', _target_date, _days_processed,
|
||||
_days_processed; -- count not known ahead of time, but shows progress
|
||||
|
||||
|
||||
-- Classify why this date is being processed (for logging)
|
||||
IF _target_date >= CURRENT_DATE - _recent_recheck_days THEN
|
||||
RAISE NOTICE 'Processing date: % [recent recheck]', _target_date;
|
||||
ELSIF NOT EXISTS (SELECT 1 FROM public.daily_product_snapshots WHERE snapshot_date = _target_date) THEN
|
||||
RAISE NOTICE 'Processing date: % [gap fill — no existing snapshot]', _target_date;
|
||||
ELSE
|
||||
RAISE NOTICE 'Processing date: % [stale data — snapshot aggregates mismatch source]', _target_date;
|
||||
END IF;
|
||||
|
||||
-- IMPORTANT: First delete any existing data for this date to prevent duplication
|
||||
DELETE FROM public.daily_product_snapshots
|
||||
DELETE FROM public.daily_product_snapshots
|
||||
WHERE snapshot_date = _target_date;
|
||||
|
||||
-- Proceed with calculating daily metrics only for products with actual activity
|
||||
|
||||
@@ -0,0 +1,131 @@
|
||||
-- Description: Populates lifecycle forecast columns on product_metrics from product_forecasts.
|
||||
-- Runs AFTER update_product_metrics.sql so that lead time / days of stock settings are available.
|
||||
-- Dependencies: product_metrics (fully populated), product_forecasts, settings tables.
|
||||
-- Frequency: After each metrics run and/or after forecast engine runs.
|
||||
|
||||
DO $$
DECLARE
    _module_name TEXT        := 'lifecycle_forecasts';
    _start_time  TIMESTAMPTZ := clock_timestamp();
    _affected    INT;
BEGIN
    RAISE NOTICE 'Running % module. Start Time: %', _module_name, _start_time;

    ------------------------------------------------------------------
    -- Step 1: copy one lifecycle_phase per product from product_forecasts.
    -- DISTINCT ON keeps the row with the earliest forecast_date for each pid.
    ------------------------------------------------------------------
    UPDATE product_metrics AS pm
       SET lifecycle_phase = first_phase.lifecycle_phase
      FROM (
            SELECT DISTINCT ON (pf.pid)
                   pf.pid,
                   pf.lifecycle_phase
              FROM product_forecasts AS pf
             ORDER BY pf.pid, pf.forecast_date
           ) AS first_phase
     WHERE first_phase.pid = pm.pid
       AND pm.lifecycle_phase IS DISTINCT FROM first_phase.lifecycle_phase;

    GET DIAGNOSTICS _affected = ROW_COUNT;
    RAISE NOTICE 'Updated lifecycle_phase for % products', _affected;

    ------------------------------------------------------------------
    -- Step 2: sum forecast units over the lead-time window and over the
    -- lead-time + days-of-stock window, using each product's effective
    -- settings (product override, then vendor default, then global
    -- setting, then the literal fallbacks 14 / 30).
    -- NOTE(review): both window bounds are inclusive (>= CURRENT_DATE and
    -- <= CURRENT_DATE + N), i.e. up to N + 1 calendar days when a forecast
    -- row exists for today — confirm this is the intended horizon.
    ------------------------------------------------------------------
    WITH horizon_totals AS (
        SELECT pf.pid,
               SUM(pf.forecast_units) FILTER (
                   WHERE pf.forecast_date <= CURRENT_DATE + cfg.effective_lead_time
               ) AS lt_forecast,
               SUM(pf.forecast_units) FILTER (
                   WHERE pf.forecast_date <= CURRENT_DATE + cfg.effective_lead_time + cfg.effective_days_of_stock
               ) AS pp_forecast
          FROM product_forecasts AS pf
          JOIN (
                SELECT p.pid,
                       COALESCE(
                           sp.lead_time_days,
                           sv.default_lead_time_days,
                           (SELECT setting_value::int FROM settings_global WHERE setting_key = 'default_lead_time_days'),
                           14
                       ) AS effective_lead_time,
                       COALESCE(
                           sp.days_of_stock,
                           sv.default_days_of_stock,
                           (SELECT setting_value::int FROM settings_global WHERE setting_key = 'default_days_of_stock'),
                           30
                       ) AS effective_days_of_stock
                  FROM products AS p
                  LEFT JOIN settings_product AS sp ON sp.pid = p.pid
                  LEFT JOIN settings_vendor  AS sv ON sv.vendor = p.vendor
               ) AS cfg ON cfg.pid = pf.pid
         WHERE pf.forecast_date >= CURRENT_DATE
         GROUP BY pf.pid
    )
    UPDATE product_metrics AS pm
       SET lifecycle_lead_time_forecast       = COALESCE(ht.lt_forecast, 0),
           lifecycle_planning_period_forecast = COALESCE(ht.pp_forecast, 0)
      FROM horizon_totals AS ht
     WHERE ht.pid = pm.pid
       -- skip rows where both stored values already match
       AND NOT (
               pm.lifecycle_lead_time_forecast IS NOT DISTINCT FROM COALESCE(ht.lt_forecast, 0)
           AND pm.lifecycle_planning_period_forecast IS NOT DISTINCT FROM COALESCE(ht.pp_forecast, 0)
       );

    GET DIAGNOSTICS _affected = ROW_COUNT;
    RAISE NOTICE 'Updated lifecycle forecasts for % products', _affected;

    ------------------------------------------------------------------
    -- Step 3: reclassify demand_pattern for launch/decay products from a
    -- de-trended ("residual") coefficient of variation.
    -- Raw CV is inflated by the expected lifecycle decay, so the brand
    -- curve's expected value is subtracted first and the spread of what
    -- remains is measured. Small residual CV => tracks the curve closely;
    -- larger residual CV => erratic deviations from it.
    -- NOTE(review): if more than one brand-only curve row exists for a
    -- brand, the curve join below multiplies that product's snapshot rows.
    ------------------------------------------------------------------
    WITH curve_params AS (
        -- Brand-only curve (root_category IS NULL) and first-received date per product
        SELECT pm.pid,
               pm.lifecycle_phase,
               pm.date_first_received,
               blc.amplitude,
               blc.decay_rate,
               blc.baseline
          FROM product_metrics AS pm
          JOIN products AS p
            ON p.pid = pm.pid
          JOIN brand_lifecycle_curves AS blc
            ON blc.brand = pm.brand
           AND blc.root_category IS NULL
         WHERE pm.lifecycle_phase IN ('launch', 'decay')
           AND pm.date_first_received IS NOT NULL
           AND blc.amplitude IS NOT NULL
    ),
    residuals AS (
        -- residual = actual - expected for every snapshot day in the window.
        -- Curve parameters are in WEEKLY units: age is converted to weeks and
        -- the curve value is divided by 7 to get a daily expectation.
        SELECT scored.pid,
               scored.units_sold,
               scored.expected,
               scored.units_sold - scored.expected AS residual
          FROM (
                SELECT dps.pid,
                       dps.units_sold,
                       (cp.amplitude * EXP(-cp.decay_rate * (dps.snapshot_date - cp.date_first_received)::numeric / 7.0) + cp.baseline) / 7.0 AS expected
                  FROM daily_product_snapshots AS dps
                  JOIN curve_params AS cp
                    ON cp.pid = dps.pid
                 WHERE dps.snapshot_date BETWEEN CURRENT_DATE - INTERVAL '29 days' AND CURRENT_DATE
               ) AS scored
    ),
    residual_stats AS (
        -- res_cv stays NULL unless there are at least 7 snapshot rows and a
        -- non-negligible mean expected value.
        SELECT pid,
               AVG(units_sold) AS avg_sales,
               CASE
                   WHEN COUNT(*) >= 7 AND AVG(ABS(expected)) > 0.01
                   THEN STDDEV_POP(residual) / GREATEST(AVG(ABS(expected)), 0.1)
               END AS res_cv
          FROM residuals
         GROUP BY pid
    )
    UPDATE product_metrics AS pm
       SET demand_pattern = classify_demand_pattern(rs.avg_sales, rs.res_cv)
      FROM residual_stats AS rs
     WHERE rs.pid = pm.pid
       AND rs.res_cv IS NOT NULL
       AND pm.demand_pattern IS DISTINCT FROM classify_demand_pattern(rs.avg_sales, rs.res_cv);

    GET DIAGNOSTICS _affected = ROW_COUNT;
    RAISE NOTICE 'Reclassified demand_pattern for % launch/decay products', _affected;

    ------------------------------------------------------------------
    -- Record when this module last completed.
    ------------------------------------------------------------------
    INSERT INTO public.calculate_status (module_name, last_calculation_timestamp)
    VALUES (_module_name, clock_timestamp())
    ON CONFLICT (module_name) DO UPDATE
        SET last_calculation_timestamp = EXCLUDED.last_calculation_timestamp;

    RAISE NOTICE '% module complete. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;
|
||||
Reference in New Issue
Block a user