Optimize orders import
This commit is contained in:
@@ -1,227 +0,0 @@
|
||||
-- Description: Calculates and updates daily aggregated product data.
|
||||
-- Self-healing: automatically detects and fills gaps in snapshot history.
|
||||
-- Always reprocesses recent days to pick up new orders and data corrections.
|
||||
-- Dependencies: Core import tables (products, orders, purchase_orders), calculate_status table.
|
||||
-- Frequency: Hourly (Run ~5-10 minutes after hourly data import completes).
|
||||
|
||||
DO $$
|
||||
DECLARE
|
||||
_module_name TEXT := 'daily_snapshots';
|
||||
_start_time TIMESTAMPTZ := clock_timestamp();
|
||||
_target_date DATE;
|
||||
_total_records INT := 0;
|
||||
_days_processed INT := 0;
|
||||
_max_backfill_days INT := 90; -- Safety cap: max days to backfill per run
|
||||
_recent_recheck_days INT := 2; -- Always reprocess this many recent days (today + yesterday)
|
||||
_latest_snapshot DATE;
|
||||
_backfill_start DATE;
|
||||
BEGIN
|
||||
RAISE NOTICE 'Running % script. Start Time: %', _module_name, _start_time;
|
||||
|
||||
-- Find the latest existing snapshot date to determine where gaps begin
|
||||
SELECT MAX(snapshot_date) INTO _latest_snapshot
|
||||
FROM public.daily_product_snapshots;
|
||||
|
||||
-- Determine how far back to look for gaps, capped at _max_backfill_days
|
||||
_backfill_start := GREATEST(
|
||||
COALESCE(_latest_snapshot + 1, CURRENT_DATE - _max_backfill_days),
|
||||
CURRENT_DATE - _max_backfill_days
|
||||
);
|
||||
|
||||
IF _latest_snapshot IS NULL THEN
|
||||
RAISE NOTICE 'No existing snapshots found. Backfilling up to % days.', _max_backfill_days;
|
||||
ELSIF _backfill_start > _latest_snapshot + 1 THEN
|
||||
RAISE NOTICE 'Latest snapshot: %. Gap exceeds % day cap — backfilling from %. Use rebuild script for full history.',
|
||||
_latest_snapshot, _max_backfill_days, _backfill_start;
|
||||
ELSE
|
||||
RAISE NOTICE 'Latest snapshot: %. Checking for gaps from %.', _latest_snapshot, _backfill_start;
|
||||
END IF;
|
||||
|
||||
-- Process all dates that need snapshots:
|
||||
-- 1. Gap fill: dates with orders/receivings but no snapshots (older than recent window)
|
||||
-- 2. Recent recheck: last N days always reprocessed (picks up new orders, corrections)
|
||||
FOR _target_date IN
|
||||
SELECT d FROM (
|
||||
-- Gap fill: find dates with activity but missing snapshots
|
||||
SELECT activity_dates.d
|
||||
FROM (
|
||||
SELECT DISTINCT date::date AS d FROM public.orders
|
||||
WHERE date::date >= _backfill_start AND date::date < CURRENT_DATE - _recent_recheck_days
|
||||
UNION
|
||||
SELECT DISTINCT received_date::date AS d FROM public.receivings
|
||||
WHERE received_date::date >= _backfill_start AND received_date::date < CURRENT_DATE - _recent_recheck_days
|
||||
) activity_dates
|
||||
WHERE NOT EXISTS (
|
||||
SELECT 1 FROM public.daily_product_snapshots dps WHERE dps.snapshot_date = activity_dates.d
|
||||
)
|
||||
UNION
|
||||
-- Recent days: always reprocess
|
||||
SELECT d::date
|
||||
FROM generate_series(
|
||||
(CURRENT_DATE - _recent_recheck_days)::timestamp,
|
||||
CURRENT_DATE::timestamp,
|
||||
'1 day'::interval
|
||||
) d
|
||||
) dates_to_process
|
||||
ORDER BY d
|
||||
LOOP
|
||||
_days_processed := _days_processed + 1;
|
||||
RAISE NOTICE 'Processing date: % [%/%]', _target_date, _days_processed,
|
||||
_days_processed; -- count not known ahead of time, but shows progress
|
||||
|
||||
-- IMPORTANT: First delete any existing data for this date to prevent duplication
|
||||
DELETE FROM public.daily_product_snapshots
|
||||
WHERE snapshot_date = _target_date;
|
||||
|
||||
-- Proceed with calculating daily metrics only for products with actual activity
|
||||
WITH SalesData AS (
|
||||
SELECT
|
||||
p.pid,
|
||||
p.sku,
|
||||
-- Track number of orders to ensure we have real data
|
||||
COUNT(o.id) as order_count,
|
||||
-- Aggregate Sales (Quantity > 0, Status not Canceled/Returned)
|
||||
COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN o.quantity ELSE 0 END), 0) AS units_sold,
|
||||
COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN o.price * o.quantity ELSE 0 END), 0.00) AS gross_revenue_unadjusted, -- Before discount
|
||||
COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN o.discount ELSE 0 END), 0.00) AS discounts,
|
||||
COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN
|
||||
COALESCE(
|
||||
o.costeach, -- First use order-specific cost if available
|
||||
get_weighted_avg_cost(p.pid, o.date::date), -- Then use weighted average cost
|
||||
p.cost_price -- Final fallback to current cost
|
||||
) * o.quantity
|
||||
ELSE 0 END), 0.00) AS cogs,
|
||||
COALESCE(SUM(CASE WHEN o.quantity > 0 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned') THEN p.regular_price * o.quantity ELSE 0 END), 0.00) AS gross_regular_revenue, -- Use current regular price for simplicity here
|
||||
|
||||
-- Aggregate Returns (Quantity < 0 or Status = Returned)
|
||||
COALESCE(SUM(CASE WHEN o.quantity < 0 OR COALESCE(o.status, 'pending') = 'returned' THEN ABS(o.quantity) ELSE 0 END), 0) AS units_returned,
|
||||
COALESCE(SUM(CASE WHEN o.quantity < 0 OR COALESCE(o.status, 'pending') = 'returned' THEN o.price * ABS(o.quantity) ELSE 0 END), 0.00) AS returns_revenue
|
||||
FROM public.products p -- Start from products to include those with no orders today
|
||||
JOIN public.orders o -- Changed to INNER JOIN to only process products with orders
|
||||
ON p.pid = o.pid
|
||||
AND o.date::date = _target_date -- Cast to date to ensure compatibility regardless of original type
|
||||
GROUP BY p.pid, p.sku
|
||||
-- No HAVING clause here - we always want to include all orders
|
||||
),
|
||||
ReceivingData AS (
|
||||
SELECT
|
||||
r.pid,
|
||||
-- Track number of receiving docs to ensure we have real data
|
||||
COUNT(DISTINCT r.receiving_id) as receiving_doc_count,
|
||||
-- Sum the quantities received on this date
|
||||
SUM(r.qty_each) AS units_received,
|
||||
-- Calculate the cost received (qty * cost)
|
||||
SUM(r.qty_each * r.cost_each) AS cost_received
|
||||
FROM public.receivings r
|
||||
WHERE r.received_date::date = _target_date
|
||||
-- Optional: Filter out canceled receivings if needed
|
||||
-- AND r.status <> 'canceled'
|
||||
GROUP BY r.pid
|
||||
-- Only include products with actual receiving activity
|
||||
HAVING COUNT(DISTINCT r.receiving_id) > 0 OR SUM(r.qty_each) > 0
|
||||
),
|
||||
CurrentStock AS (
|
||||
-- Use historical stock from stock_snapshots when available,
|
||||
-- falling back to current stock from products table
|
||||
SELECT
|
||||
p.pid,
|
||||
COALESCE(ss.stock_quantity, p.stock_quantity) AS stock_quantity,
|
||||
COALESCE(ss.stock_value, p.stock_quantity * COALESCE(p.cost_price, 0.00)) AS stock_value,
|
||||
COALESCE(p.price, 0.00) AS current_price,
|
||||
COALESCE(p.regular_price, 0.00) AS current_regular_price
|
||||
FROM public.products p
|
||||
LEFT JOIN stock_snapshots ss ON p.pid = ss.pid AND ss.snapshot_date = _target_date
|
||||
),
|
||||
ProductsWithActivity AS (
|
||||
-- Quick pre-filter to only process products with activity
|
||||
SELECT DISTINCT pid
|
||||
FROM (
|
||||
SELECT pid FROM SalesData
|
||||
UNION
|
||||
SELECT pid FROM ReceivingData
|
||||
) a
|
||||
)
|
||||
-- Now insert records, but ONLY for products with actual activity
|
||||
INSERT INTO public.daily_product_snapshots (
|
||||
snapshot_date,
|
||||
pid,
|
||||
sku,
|
||||
eod_stock_quantity,
|
||||
eod_stock_cost,
|
||||
eod_stock_retail,
|
||||
eod_stock_gross,
|
||||
stockout_flag,
|
||||
units_sold,
|
||||
units_returned,
|
||||
gross_revenue,
|
||||
discounts,
|
||||
returns_revenue,
|
||||
net_revenue,
|
||||
cogs,
|
||||
gross_regular_revenue,
|
||||
profit,
|
||||
units_received,
|
||||
cost_received,
|
||||
calculation_timestamp
|
||||
)
|
||||
SELECT
|
||||
_target_date AS snapshot_date,
|
||||
COALESCE(sd.pid, rd.pid) AS pid, -- Use sales or receiving PID
|
||||
COALESCE(sd.sku, p.sku) AS sku, -- Get SKU from sales data or products table
|
||||
-- Inventory Metrics (Using CurrentStock)
|
||||
cs.stock_quantity AS eod_stock_quantity,
|
||||
cs.stock_value AS eod_stock_cost,
|
||||
cs.stock_quantity * cs.current_price AS eod_stock_retail,
|
||||
cs.stock_quantity * cs.current_regular_price AS eod_stock_gross,
|
||||
(cs.stock_quantity <= 0) AS stockout_flag,
|
||||
-- Sales Metrics (From SalesData)
|
||||
COALESCE(sd.units_sold, 0),
|
||||
COALESCE(sd.units_returned, 0),
|
||||
COALESCE(sd.gross_revenue_unadjusted, 0.00),
|
||||
COALESCE(sd.discounts, 0.00),
|
||||
COALESCE(sd.returns_revenue, 0.00),
|
||||
COALESCE(sd.gross_revenue_unadjusted, 0.00) - COALESCE(sd.discounts, 0.00) - COALESCE(sd.returns_revenue, 0.00) AS net_revenue,
|
||||
COALESCE(sd.cogs, 0.00),
|
||||
COALESCE(sd.gross_regular_revenue, 0.00),
|
||||
(COALESCE(sd.gross_revenue_unadjusted, 0.00) - COALESCE(sd.discounts, 0.00) - COALESCE(sd.returns_revenue, 0.00)) - COALESCE(sd.cogs, 0.00) AS profit,
|
||||
-- Receiving Metrics (From ReceivingData)
|
||||
COALESCE(rd.units_received, 0),
|
||||
COALESCE(rd.cost_received, 0.00),
|
||||
_start_time -- Timestamp of this calculation run
|
||||
FROM SalesData sd
|
||||
FULL OUTER JOIN ReceivingData rd ON sd.pid = rd.pid
|
||||
JOIN ProductsWithActivity pwa ON COALESCE(sd.pid, rd.pid) = pwa.pid
|
||||
LEFT JOIN public.products p ON COALESCE(sd.pid, rd.pid) = p.pid
|
||||
LEFT JOIN CurrentStock cs ON COALESCE(sd.pid, rd.pid) = cs.pid
|
||||
WHERE p.pid IS NOT NULL; -- Ensure we only insert for existing products
|
||||
|
||||
-- Get the total number of records inserted for this date
|
||||
GET DIAGNOSTICS _total_records = ROW_COUNT;
|
||||
RAISE NOTICE 'Created % daily snapshot records for % with sales/receiving activity', _total_records, _target_date;
|
||||
END LOOP;
|
||||
|
||||
IF _days_processed = 0 THEN
|
||||
RAISE NOTICE 'No days need updating — all snapshot data is current.';
|
||||
ELSE
|
||||
RAISE NOTICE 'Processed % days total.', _days_processed;
|
||||
END IF;
|
||||
|
||||
-- Update the status table with the timestamp from the START of this run
|
||||
INSERT INTO public.calculate_status (module_name, last_calculation_timestamp)
|
||||
VALUES (_module_name, _start_time)
|
||||
ON CONFLICT (module_name) DO UPDATE SET last_calculation_timestamp = _start_time;
|
||||
|
||||
RAISE NOTICE 'Finished % script. Duration: %', _module_name, clock_timestamp() - _start_time;
|
||||
|
||||
END $$;
|
||||
|
||||
-- Return the total records processed for tracking
|
||||
SELECT
|
||||
COUNT(*) as rows_processed,
|
||||
COUNT(DISTINCT snapshot_date) as days_processed,
|
||||
MIN(snapshot_date) as earliest_date,
|
||||
MAX(snapshot_date) as latest_date,
|
||||
SUM(units_sold) as total_units_sold,
|
||||
SUM(units_received) as total_units_received
|
||||
FROM public.daily_product_snapshots
|
||||
WHERE calculation_timestamp >= (NOW() - INTERVAL '5 minutes'); -- Recent updates only
|
||||
Reference in New Issue
Block a user