-- Description: Calculates and updates daily aggregated product data for recent days.
-- Idempotency: each day that needs a refresh is rebuilt with a DELETE followed by an
--              INSERT (not INSERT ... ON CONFLICT), so re-running does not duplicate rows.
-- Dependencies: Core import tables (products, orders, receivings), the calculate_status
--               and daily_product_snapshots tables, and the get_weighted_avg_cost() function.
-- Frequency: Hourly (Run ~5-10 minutes after hourly data import completes).

-- Rebuilds public.daily_product_snapshots for any of the last _process_days days
-- (today included) whose snapshot rows look missing or incomplete compared with
-- public.orders, then records this run's start time in public.calculate_status.
--
-- A day is rebuilt by deleting its snapshot rows and re-inserting one row per
-- product that has order or receiving activity on that day.
--
-- NOTE(review): the eod_* stock columns are read from public.products as it
-- stands when the script runs, so a rebuilt past day receives the stock level at
-- run time, not the level at the end of that day -- confirm this is acceptable.
-- NOTE(review): the final join assumes products.pid is unique -- confirm.
DO $$
DECLARE
    _module_name   TEXT        := 'daily_snapshots';
    _start_time    TIMESTAMPTZ := clock_timestamp();  -- Stamped on every row written by this run
    _process_days  INT         := 5;                  -- Today plus the previous 4 days
    _day_offset    INT;
    _target_date   DATE;
    _needs_refresh BOOLEAN;
    _stale_dates   DATE[]      := ARRAY[]::DATE[];     -- Dates whose snapshots must be rebuilt
    _day_records   INT         := 0;                  -- Rows inserted for the date being processed
BEGIN
    RAISE NOTICE 'Running % script. Start Time: %', _module_name, _start_time;

    -- Pass 1: decide which of the recent days need their snapshots rebuilt.
    FOR _day_offset IN 0..(_process_days - 1) LOOP
        _target_date := CURRENT_DATE - _day_offset;  -- date - integer yields a date

        -- A day needs a refresh when it has orders AND any of the following holds:
        --   * it has no snapshot rows at all;
        --   * its snapshots total zero units sold although at least one order line
        --     counts as a sale (without the second half, a day whose orders are all
        --     canceled/returned would be deleted and rebuilt on every run);
        --   * it has fewer snapshot rows than 80% of the distinct products ordered.
        SELECT
            EXISTS (
                SELECT 1
                FROM public.orders
                WHERE date::date = _target_date
            )
            AND (
                NOT EXISTS (
                    SELECT 1
                    FROM public.daily_product_snapshots
                    WHERE snapshot_date = _target_date
                )
                OR (
                    (SELECT COALESCE(SUM(units_sold), 0)
                     FROM public.daily_product_snapshots
                     WHERE snapshot_date = _target_date) = 0
                    AND EXISTS (
                        SELECT 1
                        FROM public.orders o
                        WHERE o.date::date = _target_date
                          AND o.quantity > 0
                          AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned')
                    )
                )
                OR (SELECT COUNT(*)
                    FROM public.daily_product_snapshots
                    WHERE snapshot_date = _target_date)
                   < (SELECT COUNT(DISTINCT pid)
                      FROM public.orders
                      WHERE date::date = _target_date) * 0.8
            )
        INTO _needs_refresh;

        IF _needs_refresh THEN
            _stale_dates := array_append(_stale_dates, _target_date);
            RAISE NOTICE 'Day % needs updating (incomplete or missing data)', _target_date;
        END IF;
    END LOOP;

    -- Nothing to rebuild: record the run and stop.
    IF array_length(_stale_dates, 1) IS NULL THEN
        RAISE NOTICE 'No days need updating - all snapshot data appears complete';

        UPDATE public.calculate_status
        SET last_calculation_timestamp = _start_time
        WHERE module_name = _module_name;

        IF NOT FOUND THEN
            RAISE WARNING 'No calculate_status row for module %; run time was not recorded', _module_name;
        END IF;

        RETURN;
    END IF;

    RAISE NOTICE 'Need to update % days with missing or incomplete data', array_length(_stale_dates, 1);

    -- Pass 2: rebuild each stale day.
    FOREACH _target_date IN ARRAY _stale_dates LOOP
        RAISE NOTICE 'Processing date: %', _target_date;

        -- Remove the day's existing rows first so the INSERT below cannot duplicate them.
        DELETE FROM public.daily_product_snapshots
        WHERE snapshot_date = _target_date;

        WITH order_lines AS (
            -- The day's order lines, each classified once as a sale and/or a return.
            SELECT
                o.pid,
                o.quantity,
                o.price,
                o.discount,
                o.costeach,
                o.date::date AS order_date,
                -- Sale: positive quantity and status not canceled/returned (NULL status counts as pending)
                (o.quantity > 0
                 AND COALESCE(o.status, 'pending') NOT IN ('canceled', 'returned')) AS is_sale,
                -- Return: negative quantity or status 'returned'
                (o.quantity < 0
                 OR COALESCE(o.status, 'pending') = 'returned') AS is_return
            FROM public.orders o
            WHERE o.date::date = _target_date
        ),
        sales AS (
            -- Per-product sales and returns totals; only products that exist in
            -- public.products and have at least one order line on the day appear here.
            SELECT
                p.pid,
                COALESCE(SUM(CASE WHEN ol.is_sale THEN ol.quantity ELSE 0 END), 0) AS units_sold,
                COALESCE(SUM(CASE WHEN ol.is_sale THEN ol.price * ol.quantity ELSE 0 END), 0.00) AS gross_revenue_unadjusted, -- Before discount
                COALESCE(SUM(CASE WHEN ol.is_sale THEN ol.discount ELSE 0 END), 0.00) AS discounts,
                COALESCE(SUM(CASE WHEN ol.is_sale THEN
                    COALESCE(
                        ol.costeach,                                  -- Order-specific cost when present
                        get_weighted_avg_cost(p.pid, ol.order_date),  -- Then the weighted average cost
                        p.landing_cost_price,                         -- Then the landing cost
                        p.cost_price                                  -- Finally the current cost
                    ) * ol.quantity
                ELSE 0 END), 0.00) AS cogs,
                -- Uses the product's current regular price, not the price at order time
                COALESCE(SUM(CASE WHEN ol.is_sale THEN p.regular_price * ol.quantity ELSE 0 END), 0.00) AS gross_regular_revenue,
                COALESCE(SUM(CASE WHEN ol.is_return THEN ABS(ol.quantity) ELSE 0 END), 0) AS units_returned,
                COALESCE(SUM(CASE WHEN ol.is_return THEN ol.price * ABS(ol.quantity) ELSE 0 END), 0.00) AS returns_revenue
            FROM order_lines ol
            INNER JOIN public.products p
                ON p.pid = ol.pid
            GROUP BY p.pid
        ),
        receiving AS (
            -- Per-product quantity and cost received on the day.
            SELECT
                r.pid,
                SUM(r.qty_each) AS units_received,
                SUM(r.qty_each * r.cost_each) AS cost_received
            FROM public.receivings r
            WHERE r.received_date::date = _target_date
            GROUP BY r.pid
            -- Keep only groups with an identified receiving document or a positive quantity
            HAVING COUNT(DISTINCT r.receiving_id) > 0 OR SUM(r.qty_each) > 0
        )
        INSERT INTO public.daily_product_snapshots (
            snapshot_date,
            pid,
            sku,
            eod_stock_quantity,
            eod_stock_cost,
            eod_stock_retail,
            eod_stock_gross,
            stockout_flag,
            units_sold,
            units_returned,
            gross_revenue,
            discounts,
            returns_revenue,
            net_revenue,
            cogs,
            gross_regular_revenue,
            profit,
            units_received,
            cost_received,
            calculation_timestamp
        )
        SELECT
            _target_date,
            p.pid,
            p.sku,
            -- Inventory metrics, taken from the products table as of this run
            p.stock_quantity,
            p.stock_quantity * COALESCE(p.landing_cost_price, p.cost_price, 0.00),
            p.stock_quantity * COALESCE(p.price, 0.00),
            p.stock_quantity * COALESCE(p.regular_price, 0.00),
            (p.stock_quantity <= 0),
            -- Sales metrics (zero when the product only had receiving activity)
            COALESCE(sd.units_sold, 0),
            COALESCE(sd.units_returned, 0),
            COALESCE(sd.gross_revenue_unadjusted, 0.00),
            COALESCE(sd.discounts, 0.00),
            COALESCE(sd.returns_revenue, 0.00),
            -- Net revenue = gross revenue - discounts
            COALESCE(sd.gross_revenue_unadjusted, 0.00) - COALESCE(sd.discounts, 0.00),
            COALESCE(sd.cogs, 0.00),
            COALESCE(sd.gross_regular_revenue, 0.00),
            -- Basic profit = net revenue - COGS
            (COALESCE(sd.gross_revenue_unadjusted, 0.00) - COALESCE(sd.discounts, 0.00)) - COALESCE(sd.cogs, 0.00),
            -- Receiving metrics (zero when the product only had order activity)
            COALESCE(rd.units_received, 0),
            COALESCE(rd.cost_received, 0.00),
            _start_time
        FROM sales sd
        FULL OUTER JOIN receiving rd
            ON rd.pid = sd.pid
        -- Inner join: rows whose pid is not in products are not inserted
        INNER JOIN public.products p
            ON p.pid = COALESCE(sd.pid, rd.pid);

        GET DIAGNOSTICS _day_records = ROW_COUNT;
        RAISE NOTICE 'Created % daily snapshot records for % with sales/receiving activity', _day_records, _target_date;
    END LOOP;

    -- Record the time this run STARTED as the module's last calculation time.
    UPDATE public.calculate_status
    SET last_calculation_timestamp = _start_time
    WHERE module_name = _module_name;

    IF NOT FOUND THEN
        RAISE WARNING 'No calculate_status row for module %; run time was not recorded', _module_name;
    END IF;

    RAISE NOTICE 'Finished % processing for multiple dates. Duration: %', _module_name, clock_timestamp() - _start_time;
END $$;

-- Summary of the rows written by the most recent daily_snapshots run, for tracking.
-- Snapshot rows are stamped with the run's START time, which is also what the run
-- stores in calculate_status.last_calculation_timestamp, so that value is used as
-- the lower bound. Filtering on NOW() - 5 minutes alone would miss every row
-- whenever the run took longer than five minutes; it is kept only as a fallback
-- for when no status row exists for the module.
-- SUM columns are NULL (not 0) when no rows match.
SELECT
    COUNT(*) AS rows_processed,
    COUNT(DISTINCT snapshot_date) AS days_processed,
    MIN(snapshot_date) AS earliest_date,
    MAX(snapshot_date) AS latest_date,
    SUM(units_sold) AS total_units_sold,
    SUM(units_received) AS total_units_received
FROM public.daily_product_snapshots
WHERE calculation_timestamp >= COALESCE(
    (
        -- MAX() guarantees exactly one row: NULL when the module has no status row
        SELECT MAX(cs.last_calculation_timestamp)
        FROM public.calculate_status cs
        WHERE cs.module_name = 'daily_snapshots'
    ),
    NOW() - INTERVAL '5 minutes'
);