Add receivings table, split PO import
This commit is contained in:
@@ -11,10 +11,10 @@ const importHistoricalData = require('./import/historical-data');
|
||||
dotenv.config({ path: path.join(__dirname, "../.env") });
|
||||
|
||||
// Constants to control which imports run
|
||||
const IMPORT_CATEGORIES = true;
|
||||
const IMPORT_CATEGORIES = false;
|
||||
const IMPORT_PRODUCTS = true;
|
||||
const IMPORT_ORDERS = true;
|
||||
const IMPORT_PURCHASE_ORDERS = true;
|
||||
const IMPORT_ORDERS = false;
|
||||
const IMPORT_PURCHASE_ORDERS = false;
|
||||
const IMPORT_HISTORICAL_DATA = false;
|
||||
|
||||
// Add flag for incremental updates
|
||||
|
||||
@@ -406,7 +406,12 @@ async function materializeCalculations(prodConnection, localConnection, incremen
|
||||
WHERE oi.prod_pid = p.pid AND o.order_status >= 20) AS total_sold,
|
||||
pls.date_sold as date_last_sold,
|
||||
(SELECT iid FROM product_images WHERE pid = p.pid AND \`order\` = 255 LIMIT 1) AS primary_iid,
|
||||
NULL as category_ids
|
||||
GROUP_CONCAT(DISTINCT CASE
|
||||
WHEN pc.cat_id IS NOT NULL
|
||||
AND pc.type IN (10, 20, 11, 21, 12, 13)
|
||||
AND pci.cat_id NOT IN (16, 17)
|
||||
THEN pci.cat_id
|
||||
END) as category_ids
|
||||
FROM products p
|
||||
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
|
||||
LEFT JOIN current_inventory ci ON p.pid = ci.pid
|
||||
|
||||
@@ -35,7 +35,7 @@ function validateDate(mysqlDate) {
|
||||
|
||||
/**
|
||||
* Imports purchase orders and receivings from a production MySQL database to a local PostgreSQL database.
|
||||
* Implements FIFO allocation of receivings to purchase orders.
|
||||
* Handles these as separate data streams without complex FIFO allocation.
|
||||
*
|
||||
* @param {object} prodConnection - A MySQL connection to production DB
|
||||
* @param {object} localConnection - A PostgreSQL connection to local DB
|
||||
@@ -44,8 +44,10 @@ function validateDate(mysqlDate) {
|
||||
*/
|
||||
async function importPurchaseOrders(prodConnection, localConnection, incrementalUpdate = true) {
|
||||
const startTime = Date.now();
|
||||
let recordsAdded = 0;
|
||||
let recordsUpdated = 0;
|
||||
let poRecordsAdded = 0;
|
||||
let poRecordsUpdated = 0;
|
||||
let receivingRecordsAdded = 0;
|
||||
let receivingRecordsUpdated = 0;
|
||||
let totalProcessed = 0;
|
||||
|
||||
// Batch size constants
|
||||
@@ -68,7 +70,6 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
await localConnection.query(`
|
||||
DROP TABLE IF EXISTS temp_purchase_orders;
|
||||
DROP TABLE IF EXISTS temp_receivings;
|
||||
DROP TABLE IF EXISTS temp_receiving_allocations;
|
||||
DROP TABLE IF EXISTS employee_names;
|
||||
DROP TABLE IF EXISTS temp_supplier_names;
|
||||
|
||||
@@ -95,32 +96,23 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
-- Temporary table for receivings
|
||||
CREATE TEMP TABLE temp_receivings (
|
||||
receiving_id TEXT NOT NULL,
|
||||
po_id TEXT,
|
||||
pid BIGINT NOT NULL,
|
||||
sku TEXT,
|
||||
name TEXT,
|
||||
vendor TEXT,
|
||||
qty_each INTEGER,
|
||||
cost_each NUMERIC(14, 4),
|
||||
qty_each_orig INTEGER,
|
||||
cost_each NUMERIC(14, 5),
|
||||
cost_each_orig NUMERIC(14, 5),
|
||||
received_by INTEGER,
|
||||
received_by_name TEXT,
|
||||
received_date TIMESTAMP WITH TIME ZONE,
|
||||
receiving_created_date TIMESTAMP WITH TIME ZONE,
|
||||
supplier_id INTEGER,
|
||||
status TEXT,
|
||||
sku TEXT,
|
||||
name TEXT,
|
||||
PRIMARY KEY (receiving_id, pid)
|
||||
);
|
||||
|
||||
-- Temporary table for tracking FIFO allocations
|
||||
CREATE TEMP TABLE temp_receiving_allocations (
|
||||
po_id TEXT NOT NULL,
|
||||
pid BIGINT NOT NULL,
|
||||
receiving_id TEXT NOT NULL,
|
||||
allocated_qty INTEGER NOT NULL,
|
||||
cost_each NUMERIC(14, 4) NOT NULL,
|
||||
received_date TIMESTAMP WITH TIME ZONE NOT NULL,
|
||||
received_by INTEGER,
|
||||
PRIMARY KEY (po_id, pid, receiving_id)
|
||||
);
|
||||
|
||||
-- Temporary table for employee names
|
||||
CREATE TEMP TABLE employee_names (
|
||||
employeeid INTEGER PRIMARY KEY,
|
||||
@@ -131,7 +123,6 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
-- Create indexes for efficient joins
|
||||
CREATE INDEX idx_temp_po_pid ON temp_purchase_orders(pid);
|
||||
CREATE INDEX idx_temp_receiving_pid ON temp_receivings(pid);
|
||||
CREATE INDEX idx_temp_receiving_po_id ON temp_receivings(po_id);
|
||||
`);
|
||||
|
||||
// Map status codes to text values
|
||||
@@ -194,7 +185,56 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
`, employeeValues);
|
||||
}
|
||||
|
||||
// 1. First, fetch all relevant POs
|
||||
// Add this section before the PO import to create a supplier names mapping
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Purchase orders import",
|
||||
message: "Fetching supplier data for vendor mapping"
|
||||
});
|
||||
|
||||
// Fetch supplier data from production and store in a temp table
|
||||
const [suppliers] = await prodConnection.query(`
|
||||
SELECT
|
||||
supplierid,
|
||||
companyname
|
||||
FROM suppliers
|
||||
WHERE companyname IS NOT NULL AND companyname != ''
|
||||
`);
|
||||
|
||||
if (suppliers.length > 0) {
|
||||
// Create temp table for supplier names
|
||||
await localConnection.query(`
|
||||
DROP TABLE IF EXISTS temp_supplier_names;
|
||||
CREATE TEMP TABLE temp_supplier_names (
|
||||
supplier_id INTEGER PRIMARY KEY,
|
||||
company_name TEXT NOT NULL
|
||||
);
|
||||
`);
|
||||
|
||||
// Insert supplier data in batches
|
||||
for (let i = 0; i < suppliers.length; i += INSERT_BATCH_SIZE) {
|
||||
const batch = suppliers.slice(i, i + INSERT_BATCH_SIZE);
|
||||
|
||||
const placeholders = batch.map((_, idx) => {
|
||||
const base = idx * 2;
|
||||
return `($${base + 1}, $${base + 2})`;
|
||||
}).join(',');
|
||||
|
||||
const values = batch.flatMap(s => [
|
||||
s.supplierid,
|
||||
s.companyname || 'Unnamed Supplier'
|
||||
]);
|
||||
|
||||
await localConnection.query(`
|
||||
INSERT INTO temp_supplier_names (supplier_id, company_name)
|
||||
VALUES ${placeholders}
|
||||
ON CONFLICT (supplier_id) DO UPDATE SET
|
||||
company_name = EXCLUDED.company_name
|
||||
`, values);
|
||||
}
|
||||
}
|
||||
|
||||
// 1. Fetch and process purchase orders
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Purchase orders import",
|
||||
@@ -392,10 +432,16 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
const [receivingList] = await prodConnection.query(`
|
||||
SELECT
|
||||
r.receiving_id,
|
||||
r.po_id,
|
||||
r.supplier_id,
|
||||
r.status,
|
||||
r.date_created
|
||||
r.notes,
|
||||
r.shipping,
|
||||
r.total_amount,
|
||||
r.hold,
|
||||
r.for_storefront,
|
||||
r.date_created,
|
||||
r.date_paid,
|
||||
r.date_checked
|
||||
FROM receivings r
|
||||
WHERE r.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR)
|
||||
${incrementalUpdate ? `
|
||||
@@ -421,7 +467,9 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
rp.receiving_id,
|
||||
rp.pid,
|
||||
rp.qty_each,
|
||||
rp.qty_each_orig,
|
||||
rp.cost_each,
|
||||
rp.cost_each_orig,
|
||||
rp.received_by,
|
||||
rp.received_date,
|
||||
r.date_created as receiving_created_date,
|
||||
@@ -439,19 +487,50 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
const receiving = receivingList.find(r => r.receiving_id == product.receiving_id);
|
||||
if (!receiving) continue;
|
||||
|
||||
// Get employee name if available
|
||||
let receivedByName = null;
|
||||
if (product.received_by) {
|
||||
const [employeeResult] = await localConnection.query(`
|
||||
SELECT CONCAT(firstname, ' ', lastname) as full_name
|
||||
FROM employee_names
|
||||
WHERE employeeid = $1
|
||||
`, [product.received_by]);
|
||||
|
||||
if (employeeResult.rows.length > 0) {
|
||||
receivedByName = employeeResult.rows[0].full_name;
|
||||
}
|
||||
}
|
||||
|
||||
// Get vendor name if available
|
||||
let vendorName = 'Unknown Vendor';
|
||||
if (receiving.supplier_id) {
|
||||
const [vendorResult] = await localConnection.query(`
|
||||
SELECT company_name
|
||||
FROM temp_supplier_names
|
||||
WHERE supplier_id = $1
|
||||
`, [receiving.supplier_id]);
|
||||
|
||||
if (vendorResult.rows.length > 0) {
|
||||
vendorName = vendorResult.rows[0].company_name;
|
||||
}
|
||||
}
|
||||
|
||||
completeReceivings.push({
|
||||
receiving_id: receiving.receiving_id.toString(),
|
||||
po_id: receiving.po_id ? receiving.po_id.toString() : null,
|
||||
pid: product.pid,
|
||||
sku: product.sku,
|
||||
name: product.name,
|
||||
vendor: vendorName,
|
||||
qty_each: product.qty_each,
|
||||
qty_each_orig: product.qty_each_orig,
|
||||
cost_each: product.cost_each,
|
||||
cost_each_orig: product.cost_each_orig,
|
||||
received_by: product.received_by,
|
||||
received_by_name: receivedByName,
|
||||
received_date: validateDate(product.received_date) || validateDate(product.receiving_created_date),
|
||||
receiving_created_date: validateDate(product.receiving_created_date),
|
||||
supplier_id: receiving.supplier_id,
|
||||
status: receivingStatusMap[receiving.status] || 'created',
|
||||
sku: product.sku,
|
||||
name: product.name
|
||||
status: receivingStatusMap[receiving.status] || 'created'
|
||||
});
|
||||
}
|
||||
|
||||
@@ -460,43 +539,49 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
const batch = completeReceivings.slice(i, i + INSERT_BATCH_SIZE);
|
||||
|
||||
const placeholders = batch.map((_, idx) => {
|
||||
const base = idx * 12;
|
||||
return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12})`;
|
||||
const base = idx * 15;
|
||||
return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`;
|
||||
}).join(',');
|
||||
|
||||
const values = batch.flatMap(r => [
|
||||
r.receiving_id,
|
||||
r.po_id,
|
||||
r.pid,
|
||||
r.sku,
|
||||
r.name,
|
||||
r.vendor,
|
||||
r.qty_each,
|
||||
r.qty_each_orig,
|
||||
r.cost_each,
|
||||
r.cost_each_orig,
|
||||
r.received_by,
|
||||
r.received_by_name,
|
||||
r.received_date,
|
||||
r.receiving_created_date,
|
||||
r.supplier_id,
|
||||
r.status,
|
||||
r.sku,
|
||||
r.name
|
||||
r.status
|
||||
]);
|
||||
|
||||
await localConnection.query(`
|
||||
INSERT INTO temp_receivings (
|
||||
receiving_id, po_id, pid, qty_each, cost_each, received_by,
|
||||
received_date, receiving_created_date, supplier_id, status,
|
||||
sku, name
|
||||
receiving_id, pid, sku, name, vendor, qty_each, qty_each_orig,
|
||||
cost_each, cost_each_orig, received_by, received_by_name,
|
||||
received_date, receiving_created_date, supplier_id, status
|
||||
)
|
||||
VALUES ${placeholders}
|
||||
ON CONFLICT (receiving_id, pid) DO UPDATE SET
|
||||
po_id = EXCLUDED.po_id,
|
||||
sku = EXCLUDED.sku,
|
||||
name = EXCLUDED.name,
|
||||
vendor = EXCLUDED.vendor,
|
||||
qty_each = EXCLUDED.qty_each,
|
||||
qty_each_orig = EXCLUDED.qty_each_orig,
|
||||
cost_each = EXCLUDED.cost_each,
|
||||
cost_each_orig = EXCLUDED.cost_each_orig,
|
||||
received_by = EXCLUDED.received_by,
|
||||
received_by_name = EXCLUDED.received_by_name,
|
||||
received_date = EXCLUDED.received_date,
|
||||
receiving_created_date = EXCLUDED.receiving_created_date,
|
||||
supplier_id = EXCLUDED.supplier_id,
|
||||
status = EXCLUDED.status,
|
||||
sku = EXCLUDED.sku,
|
||||
name = EXCLUDED.name
|
||||
status = EXCLUDED.status
|
||||
`, values);
|
||||
}
|
||||
|
||||
@@ -519,64 +604,13 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
}
|
||||
}
|
||||
|
||||
// Add this section before the FIFO steps to create a supplier names mapping
|
||||
// Add this section to filter out invalid PIDs before final import
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Purchase orders import",
|
||||
message: "Fetching supplier data for vendor mapping"
|
||||
message: "Validating product IDs before final import"
|
||||
});
|
||||
|
||||
// Fetch supplier data from production and store in a temp table
|
||||
const [suppliers] = await prodConnection.query(`
|
||||
SELECT
|
||||
supplierid,
|
||||
companyname
|
||||
FROM suppliers
|
||||
WHERE companyname IS NOT NULL AND companyname != ''
|
||||
`);
|
||||
|
||||
if (suppliers.length > 0) {
|
||||
// Create temp table for supplier names
|
||||
await localConnection.query(`
|
||||
DROP TABLE IF EXISTS temp_supplier_names;
|
||||
CREATE TEMP TABLE temp_supplier_names (
|
||||
supplier_id INTEGER PRIMARY KEY,
|
||||
company_name TEXT NOT NULL
|
||||
);
|
||||
`);
|
||||
|
||||
// Insert supplier data in batches
|
||||
for (let i = 0; i < suppliers.length; i += INSERT_BATCH_SIZE) {
|
||||
const batch = suppliers.slice(i, i + INSERT_BATCH_SIZE);
|
||||
|
||||
const placeholders = batch.map((_, idx) => {
|
||||
const base = idx * 2;
|
||||
return `($${base + 1}, $${base + 2})`;
|
||||
}).join(',');
|
||||
|
||||
const values = batch.flatMap(s => [
|
||||
s.supplierid,
|
||||
s.companyname || 'Unnamed Supplier'
|
||||
]);
|
||||
|
||||
await localConnection.query(`
|
||||
INSERT INTO temp_supplier_names (supplier_id, company_name)
|
||||
VALUES ${placeholders}
|
||||
ON CONFLICT (supplier_id) DO UPDATE SET
|
||||
company_name = EXCLUDED.company_name
|
||||
`, values);
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Implement FIFO allocation of receivings to purchase orders
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Purchase orders import",
|
||||
message: "Validating product IDs before allocation"
|
||||
});
|
||||
|
||||
// Add this section to filter out invalid PIDs before allocation
|
||||
// This will check all PIDs in our temp tables against the products table
|
||||
await localConnection.query(`
|
||||
-- Create temp table to store invalid PIDs
|
||||
DROP TABLE IF EXISTS temp_invalid_pids;
|
||||
@@ -614,367 +648,107 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
console.log(`Filtered out ${filteredCount} items with invalid product IDs`);
|
||||
}
|
||||
|
||||
// Break FIFO allocation into steps with progress tracking
|
||||
const fifoSteps = [
|
||||
{
|
||||
name: "Direct allocations",
|
||||
query: `
|
||||
INSERT INTO temp_receiving_allocations (
|
||||
po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by
|
||||
)
|
||||
SELECT
|
||||
r.po_id,
|
||||
r.pid,
|
||||
r.receiving_id,
|
||||
LEAST(r.qty_each, po.ordered) as allocated_qty,
|
||||
r.cost_each,
|
||||
COALESCE(r.received_date, NOW()) as received_date,
|
||||
r.received_by
|
||||
FROM temp_receivings r
|
||||
JOIN temp_purchase_orders po ON r.po_id = po.po_id AND r.pid = po.pid
|
||||
WHERE r.po_id IS NOT NULL
|
||||
`
|
||||
},
|
||||
{
|
||||
name: "Handling standalone receivings",
|
||||
query: `
|
||||
INSERT INTO temp_purchase_orders (
|
||||
po_id, pid, sku, name, vendor, date, status,
|
||||
ordered, po_cost_price, supplier_id, date_created, date_ordered
|
||||
)
|
||||
SELECT
|
||||
r.receiving_id::text as po_id,
|
||||
r.pid,
|
||||
r.sku,
|
||||
r.name,
|
||||
COALESCE(
|
||||
-- First, check if we already have a vendor name from the temp_purchase_orders table
|
||||
(SELECT vendor FROM temp_purchase_orders
|
||||
WHERE supplier_id = r.supplier_id LIMIT 1),
|
||||
-- Next, check the supplier_names mapping table we created
|
||||
(SELECT company_name FROM temp_supplier_names
|
||||
WHERE supplier_id = r.supplier_id),
|
||||
-- If both fail, use a generic name with the supplier ID
|
||||
CASE
|
||||
WHEN r.supplier_id IS NOT NULL THEN 'Supplier #' || r.supplier_id::text
|
||||
ELSE 'Unknown Supplier'
|
||||
END
|
||||
) as vendor,
|
||||
COALESCE(r.received_date, r.receiving_created_date) as date,
|
||||
'created' as status,
|
||||
NULL as ordered,
|
||||
r.cost_each as po_cost_price,
|
||||
r.supplier_id,
|
||||
COALESCE(r.receiving_created_date, r.received_date) as date_created,
|
||||
NULL as date_ordered
|
||||
FROM temp_receivings r
|
||||
WHERE r.po_id IS NULL
|
||||
OR NOT EXISTS (
|
||||
SELECT 1 FROM temp_purchase_orders po
|
||||
WHERE po.po_id = r.po_id AND po.pid = r.pid
|
||||
)
|
||||
ON CONFLICT (po_id, pid) DO NOTHING
|
||||
`
|
||||
},
|
||||
{
|
||||
name: "Allocating standalone receivings",
|
||||
query: `
|
||||
INSERT INTO temp_receiving_allocations (
|
||||
po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by
|
||||
)
|
||||
SELECT
|
||||
r.receiving_id::text as po_id,
|
||||
r.pid,
|
||||
r.receiving_id,
|
||||
r.qty_each as allocated_qty,
|
||||
r.cost_each,
|
||||
COALESCE(r.received_date, NOW()) as received_date,
|
||||
r.received_by
|
||||
FROM temp_receivings r
|
||||
WHERE r.po_id IS NULL
|
||||
OR NOT EXISTS (
|
||||
SELECT 1 FROM temp_purchase_orders po
|
||||
WHERE po.po_id = r.po_id AND po.pid = r.pid
|
||||
)
|
||||
`
|
||||
},
|
||||
{
|
||||
name: "FIFO allocation logic",
|
||||
query: `
|
||||
WITH
|
||||
-- Calculate remaining quantities after direct allocations
|
||||
remaining_po_quantities AS (
|
||||
SELECT
|
||||
po.po_id,
|
||||
po.pid,
|
||||
po.ordered,
|
||||
COALESCE(SUM(ra.allocated_qty), 0) as already_allocated,
|
||||
po.ordered - COALESCE(SUM(ra.allocated_qty), 0) as remaining_qty,
|
||||
po.date_ordered,
|
||||
po.date_created
|
||||
FROM temp_purchase_orders po
|
||||
LEFT JOIN temp_receiving_allocations ra ON po.po_id = ra.po_id AND po.pid = ra.pid
|
||||
WHERE po.ordered IS NOT NULL
|
||||
GROUP BY po.po_id, po.pid, po.ordered, po.date_ordered, po.date_created
|
||||
HAVING po.ordered > COALESCE(SUM(ra.allocated_qty), 0)
|
||||
),
|
||||
remaining_receiving_quantities AS (
|
||||
SELECT
|
||||
r.receiving_id,
|
||||
r.pid,
|
||||
r.qty_each,
|
||||
COALESCE(SUM(ra.allocated_qty), 0) as already_allocated,
|
||||
r.qty_each - COALESCE(SUM(ra.allocated_qty), 0) as remaining_qty,
|
||||
r.received_date,
|
||||
r.cost_each,
|
||||
r.received_by
|
||||
FROM temp_receivings r
|
||||
LEFT JOIN temp_receiving_allocations ra ON r.receiving_id = ra.receiving_id AND r.pid = ra.pid
|
||||
GROUP BY r.receiving_id, r.pid, r.qty_each, r.received_date, r.cost_each, r.received_by
|
||||
HAVING r.qty_each > COALESCE(SUM(ra.allocated_qty), 0)
|
||||
),
|
||||
-- Rank POs by age, with a cutoff for very old POs (1 year)
|
||||
ranked_pos AS (
|
||||
SELECT
|
||||
po.po_id,
|
||||
po.pid,
|
||||
po.remaining_qty,
|
||||
CASE
|
||||
WHEN po.date_ordered IS NULL OR po.date_ordered < NOW() - INTERVAL '1 year' THEN 2
|
||||
ELSE 1
|
||||
END as age_group,
|
||||
ROW_NUMBER() OVER (
|
||||
PARTITION BY po.pid, (CASE WHEN po.date_ordered IS NULL OR po.date_ordered < NOW() - INTERVAL '1 year' THEN 2 ELSE 1 END)
|
||||
ORDER BY COALESCE(po.date_ordered, po.date_created, NOW())
|
||||
) as rank_in_group
|
||||
FROM remaining_po_quantities po
|
||||
),
|
||||
-- Rank receivings by date
|
||||
ranked_receivings AS (
|
||||
SELECT
|
||||
r.receiving_id,
|
||||
r.pid,
|
||||
r.remaining_qty,
|
||||
r.received_date,
|
||||
r.cost_each,
|
||||
r.received_by,
|
||||
ROW_NUMBER() OVER (PARTITION BY r.pid ORDER BY COALESCE(r.received_date, NOW())) as rank
|
||||
FROM remaining_receiving_quantities r
|
||||
),
|
||||
-- First allocate to recent POs
|
||||
allocations_recent AS (
|
||||
SELECT
|
||||
po.po_id,
|
||||
po.pid,
|
||||
r.receiving_id,
|
||||
LEAST(po.remaining_qty, r.remaining_qty) as allocated_qty,
|
||||
r.cost_each,
|
||||
COALESCE(r.received_date, NOW()) as received_date,
|
||||
r.received_by,
|
||||
po.age_group,
|
||||
po.rank_in_group,
|
||||
r.rank,
|
||||
'recent' as allocation_type
|
||||
FROM ranked_pos po
|
||||
JOIN ranked_receivings r ON po.pid = r.pid
|
||||
WHERE po.age_group = 1
|
||||
ORDER BY po.pid, po.rank_in_group, r.rank
|
||||
),
|
||||
-- Then allocate to older POs
|
||||
remaining_after_recent AS (
|
||||
SELECT
|
||||
r.receiving_id,
|
||||
r.pid,
|
||||
r.remaining_qty - COALESCE(SUM(a.allocated_qty), 0) as remaining_qty,
|
||||
r.received_date,
|
||||
r.cost_each,
|
||||
r.received_by,
|
||||
r.rank
|
||||
FROM ranked_receivings r
|
||||
LEFT JOIN allocations_recent a ON r.receiving_id = a.receiving_id AND r.pid = a.pid
|
||||
GROUP BY r.receiving_id, r.pid, r.remaining_qty, r.received_date, r.cost_each, r.received_by, r.rank
|
||||
HAVING r.remaining_qty > COALESCE(SUM(a.allocated_qty), 0)
|
||||
),
|
||||
allocations_old AS (
|
||||
SELECT
|
||||
po.po_id,
|
||||
po.pid,
|
||||
r.receiving_id,
|
||||
LEAST(po.remaining_qty, r.remaining_qty) as allocated_qty,
|
||||
r.cost_each,
|
||||
COALESCE(r.received_date, NOW()) as received_date,
|
||||
r.received_by,
|
||||
po.age_group,
|
||||
po.rank_in_group,
|
||||
r.rank,
|
||||
'old' as allocation_type
|
||||
FROM ranked_pos po
|
||||
JOIN remaining_after_recent r ON po.pid = r.pid
|
||||
WHERE po.age_group = 2
|
||||
ORDER BY po.pid, po.rank_in_group, r.rank
|
||||
),
|
||||
-- Combine allocations
|
||||
combined_allocations AS (
|
||||
SELECT * FROM allocations_recent
|
||||
UNION ALL
|
||||
SELECT * FROM allocations_old
|
||||
)
|
||||
-- Insert into allocations table
|
||||
INSERT INTO temp_receiving_allocations (
|
||||
po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by
|
||||
)
|
||||
SELECT
|
||||
po_id, pid, receiving_id, allocated_qty, cost_each,
|
||||
COALESCE(received_date, NOW()) as received_date,
|
||||
received_by
|
||||
FROM combined_allocations
|
||||
WHERE allocated_qty > 0
|
||||
`
|
||||
}
|
||||
];
|
||||
|
||||
// Execute FIFO steps with progress tracking
|
||||
for (let i = 0; i < fifoSteps.length; i++) {
|
||||
const step = fifoSteps[i];
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Purchase orders import",
|
||||
message: `FIFO allocation step ${i+1}/${fifoSteps.length}: ${step.name}`,
|
||||
current: i,
|
||||
total: fifoSteps.length
|
||||
});
|
||||
|
||||
await localConnection.query(step.query);
|
||||
}
|
||||
|
||||
// 4. Generate final purchase order records with receiving data
|
||||
// 3. Insert final purchase order records to the actual table
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Purchase orders import",
|
||||
message: "Generating final purchase order records"
|
||||
message: "Inserting final purchase order records"
|
||||
});
|
||||
|
||||
const [finalResult] = await localConnection.query(`
|
||||
WITH
|
||||
receiving_summaries AS (
|
||||
SELECT
|
||||
po_id,
|
||||
pid,
|
||||
SUM(allocated_qty) as total_received,
|
||||
JSONB_AGG(
|
||||
JSONB_BUILD_OBJECT(
|
||||
'receiving_id', receiving_id,
|
||||
'qty', allocated_qty,
|
||||
'date', COALESCE(received_date, NOW()),
|
||||
'cost', cost_each,
|
||||
'received_by', received_by,
|
||||
'received_by_name', CASE
|
||||
WHEN received_by IS NOT NULL AND received_by > 0 THEN
|
||||
(SELECT CONCAT(firstname, ' ', lastname)
|
||||
FROM employee_names
|
||||
WHERE employeeid = received_by)
|
||||
ELSE NULL
|
||||
END
|
||||
) ORDER BY COALESCE(received_date, NOW())
|
||||
) as receiving_history,
|
||||
MIN(COALESCE(received_date, NOW())) as first_received_date,
|
||||
MAX(COALESCE(received_date, NOW())) as last_received_date,
|
||||
STRING_AGG(
|
||||
DISTINCT CASE WHEN received_by IS NOT NULL AND received_by > 0
|
||||
THEN CAST(received_by AS TEXT)
|
||||
ELSE NULL
|
||||
END,
|
||||
','
|
||||
) as received_by_list,
|
||||
STRING_AGG(
|
||||
DISTINCT CASE
|
||||
WHEN ra.received_by IS NOT NULL AND ra.received_by > 0 THEN
|
||||
(SELECT CONCAT(firstname, ' ', lastname)
|
||||
FROM employee_names
|
||||
WHERE employeeid = ra.received_by)
|
||||
ELSE NULL
|
||||
END,
|
||||
', '
|
||||
) as received_by_names
|
||||
FROM temp_receiving_allocations ra
|
||||
GROUP BY po_id, pid
|
||||
),
|
||||
cost_averaging AS (
|
||||
SELECT
|
||||
ra.po_id,
|
||||
ra.pid,
|
||||
SUM(ra.allocated_qty * ra.cost_each) / NULLIF(SUM(ra.allocated_qty), 0) as avg_cost
|
||||
FROM temp_receiving_allocations ra
|
||||
GROUP BY ra.po_id, ra.pid
|
||||
)
|
||||
const [poResult] = await localConnection.query(`
|
||||
INSERT INTO purchase_orders (
|
||||
po_id, vendor, date, expected_date, pid, sku, name,
|
||||
cost_price, po_cost_price, status, receiving_status, notes, long_note,
|
||||
ordered, received, received_date, last_received_date, received_by,
|
||||
receiving_history
|
||||
po_cost_price, status, notes, long_note,
|
||||
ordered, supplier_id, date_created, date_ordered
|
||||
)
|
||||
SELECT
|
||||
po.po_id,
|
||||
po.vendor,
|
||||
CASE
|
||||
WHEN po.date IS NOT NULL THEN po.date
|
||||
-- For standalone receivings, try to use the receiving date from history
|
||||
WHEN po.po_id LIKE 'R%' AND rs.first_received_date IS NOT NULL THEN rs.first_received_date
|
||||
-- As a last resort for data integrity, use Unix epoch (Jan 1, 1970)
|
||||
ELSE to_timestamp(0)
|
||||
END as date,
|
||||
NULLIF(po.expected_date::text, '0000-00-00')::date as expected_date,
|
||||
po.pid,
|
||||
po.sku,
|
||||
po.name,
|
||||
COALESCE(ca.avg_cost, po.po_cost_price) as cost_price,
|
||||
po.po_cost_price,
|
||||
COALESCE(po.status, 'created'),
|
||||
CASE
|
||||
WHEN rs.total_received IS NULL THEN 'created'
|
||||
WHEN rs.total_received = 0 THEN 'created'
|
||||
WHEN rs.total_received < po.ordered THEN 'partial_received'
|
||||
WHEN rs.total_received >= po.ordered THEN 'full_received'
|
||||
ELSE 'created'
|
||||
END as receiving_status,
|
||||
po.notes,
|
||||
po.long_note,
|
||||
COALESCE(po.ordered, 0),
|
||||
COALESCE(rs.total_received, 0),
|
||||
NULLIF(rs.first_received_date::text, '0000-00-00 00:00:00')::timestamp with time zone as received_date,
|
||||
NULLIF(rs.last_received_date::text, '0000-00-00 00:00:00')::timestamp with time zone as last_received_date,
|
||||
CASE
|
||||
WHEN rs.received_by_list IS NULL THEN NULL
|
||||
ELSE rs.received_by_names
|
||||
END as received_by,
|
||||
rs.receiving_history
|
||||
FROM temp_purchase_orders po
|
||||
LEFT JOIN receiving_summaries rs ON po.po_id = rs.po_id AND po.pid = rs.pid
|
||||
LEFT JOIN cost_averaging ca ON po.po_id = ca.po_id AND po.pid = ca.pid
|
||||
ON CONFLICT (po_id, pid) DO UPDATE SET
|
||||
vendor = EXCLUDED.vendor,
|
||||
date = EXCLUDED.date,
|
||||
expected_date = EXCLUDED.expected_date,
|
||||
po_id,
|
||||
vendor,
|
||||
COALESCE(date, date_created, now()) as date,
|
||||
expected_date,
|
||||
pid,
|
||||
sku,
|
||||
name,
|
||||
po_cost_price,
|
||||
status,
|
||||
notes,
|
||||
long_note,
|
||||
ordered,
|
||||
supplier_id,
|
||||
date_created,
|
||||
date_ordered
|
||||
FROM temp_purchase_orders
|
||||
ON CONFLICT (po_id, pid) DO UPDATE SET
|
||||
vendor = EXCLUDED.vendor,
|
||||
date = EXCLUDED.date,
|
||||
expected_date = EXCLUDED.expected_date,
|
||||
sku = EXCLUDED.sku,
|
||||
name = EXCLUDED.name,
|
||||
cost_price = EXCLUDED.cost_price,
|
||||
po_cost_price = EXCLUDED.po_cost_price,
|
||||
status = EXCLUDED.status,
|
||||
receiving_status = EXCLUDED.receiving_status,
|
||||
notes = EXCLUDED.notes,
|
||||
status = EXCLUDED.status,
|
||||
notes = EXCLUDED.notes,
|
||||
long_note = EXCLUDED.long_note,
|
||||
ordered = EXCLUDED.ordered,
|
||||
received = EXCLUDED.received,
|
||||
received_date = EXCLUDED.received_date,
|
||||
last_received_date = EXCLUDED.last_received_date,
|
||||
received_by = EXCLUDED.received_by,
|
||||
receiving_history = EXCLUDED.receiving_history,
|
||||
ordered = EXCLUDED.ordered,
|
||||
supplier_id = EXCLUDED.supplier_id,
|
||||
date_created = EXCLUDED.date_created,
|
||||
date_ordered = EXCLUDED.date_ordered,
|
||||
updated = CURRENT_TIMESTAMP
|
||||
RETURNING (xmax = 0) as inserted
|
||||
`);
|
||||
|
||||
recordsAdded = finalResult.rows.filter(r => r.inserted).length;
|
||||
recordsUpdated = finalResult.rows.filter(r => !r.inserted).length;
|
||||
poRecordsAdded = poResult.rows.filter(r => r.inserted).length;
|
||||
poRecordsUpdated = poResult.rows.filter(r => !r.inserted).length;
|
||||
|
||||
// 4. Insert final receiving records to the actual table
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Purchase orders import",
|
||||
message: "Inserting final receiving records"
|
||||
});
|
||||
|
||||
const [receivingsResult] = await localConnection.query(`
|
||||
INSERT INTO receivings (
|
||||
receiving_id, pid, sku, name, vendor, qty_each, qty_each_orig,
|
||||
cost_each, cost_each_orig, received_by, received_by_name,
|
||||
received_date, receiving_created_date, supplier_id, status
|
||||
)
|
||||
SELECT
|
||||
receiving_id,
|
||||
pid,
|
||||
sku,
|
||||
name,
|
||||
vendor,
|
||||
qty_each,
|
||||
qty_each_orig,
|
||||
cost_each,
|
||||
cost_each_orig,
|
||||
received_by,
|
||||
received_by_name,
|
||||
COALESCE(received_date, receiving_created_date, now()) as received_date,
|
||||
receiving_created_date,
|
||||
supplier_id,
|
||||
status
|
||||
FROM temp_receivings
|
||||
ON CONFLICT (receiving_id, pid) DO UPDATE SET
|
||||
sku = EXCLUDED.sku,
|
||||
name = EXCLUDED.name,
|
||||
vendor = EXCLUDED.vendor,
|
||||
qty_each = EXCLUDED.qty_each,
|
||||
qty_each_orig = EXCLUDED.qty_each_orig,
|
||||
cost_each = EXCLUDED.cost_each,
|
||||
cost_each_orig = EXCLUDED.cost_each_orig,
|
||||
received_by = EXCLUDED.received_by,
|
||||
received_by_name = EXCLUDED.received_by_name,
|
||||
received_date = EXCLUDED.received_date,
|
||||
receiving_created_date = EXCLUDED.receiving_created_date,
|
||||
supplier_id = EXCLUDED.supplier_id,
|
||||
status = EXCLUDED.status,
|
||||
updated = CURRENT_TIMESTAMP
|
||||
RETURNING (xmax = 0) as inserted
|
||||
`);
|
||||
|
||||
receivingRecordsAdded = receivingsResult.rows.filter(r => r.inserted).length;
|
||||
receivingRecordsUpdated = receivingsResult.rows.filter(r => !r.inserted).length;
|
||||
|
||||
// Update sync status
|
||||
await localConnection.query(`
|
||||
@@ -988,9 +762,9 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
await localConnection.query(`
|
||||
DROP TABLE IF EXISTS temp_purchase_orders;
|
||||
DROP TABLE IF EXISTS temp_receivings;
|
||||
DROP TABLE IF EXISTS temp_receiving_allocations;
|
||||
DROP TABLE IF EXISTS employee_names;
|
||||
DROP TABLE IF EXISTS temp_supplier_names;
|
||||
DROP TABLE IF EXISTS temp_invalid_pids;
|
||||
`);
|
||||
|
||||
// Commit transaction
|
||||
@@ -998,8 +772,12 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
|
||||
return {
|
||||
status: "complete",
|
||||
recordsAdded: recordsAdded || 0,
|
||||
recordsUpdated: recordsUpdated || 0,
|
||||
recordsAdded: poRecordsAdded + receivingRecordsAdded,
|
||||
recordsUpdated: poRecordsUpdated + receivingRecordsUpdated,
|
||||
poRecordsAdded,
|
||||
poRecordsUpdated,
|
||||
receivingRecordsAdded,
|
||||
receivingRecordsUpdated,
|
||||
totalRecords: totalProcessed
|
||||
};
|
||||
} catch (error) {
|
||||
|
||||
Reference in New Issue
Block a user