Fix purchase orders import

2025-03-25 19:12:41 -04:00
parent 7b0e792d03
commit 5dd779cb4a
5 changed files with 1136 additions and 115 deletions

View File

@@ -0,0 +1,342 @@
# MySQL to PostgreSQL Import Process Documentation
This document outlines the data import process from the production MySQL database to the local PostgreSQL database, focusing on column mappings, data transformations, and the overall import architecture.
## Table of Contents
1. [Overview](#overview)
2. [Import Architecture](#import-architecture)
3. [Column Mappings](#column-mappings)
- [Categories](#categories)
- [Products](#products)
- [Product Categories (Relationship)](#product-categories-relationship)
- [Orders](#orders)
- [Purchase Orders](#purchase-orders)
- [Metadata Tables](#metadata-tables)
4. [Special Calculations](#special-calculations)
5. [Implementation Notes](#implementation-notes)
## Overview
The import process extracts data from a MySQL 5.7 production database and imports it into a PostgreSQL database. It can operate in two modes:
- **Full Import**: Imports all data regardless of last sync time
- **Incremental Import**: Only imports data that has changed since the last import
The process handles four main data types:
- Categories (product categorization hierarchy)
- Products (inventory items)
- Orders (sales records)
- Purchase Orders (vendor orders)
## Import Architecture
The import process follows these steps:
1. **Establish Connection**: Creates an SSH tunnel to the production server and establishes database connections
2. **Setup Import History**: Creates a record of the current import operation
3. **Import Categories**: Processes product categories in hierarchical order
4. **Import Products**: Processes products with their attributes and category relationships
5. **Import Orders**: Processes customer orders with line items, taxes, and discounts
6. **Import Purchase Orders**: Processes vendor purchase orders with line items
7. **Record Results**: Updates the import history with results
8. **Close Connections**: Cleans up connections and resources
Each import step uses temporary tables for processing and wraps operations in transactions to ensure data consistency.
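In sketch form, each step follows the pattern below (illustrative names only; `runImportStep`, `temp_import`, and `target_table` are not the importer's actual identifiers):
```javascript
// Minimal sketch of the per-step pattern: stage rows in a temp table, then
// upsert inside a single transaction so a failure leaves the target untouched.
async function runImportStep(localConnection, stageAndUpsert) {
  await localConnection.query('BEGIN');
  try {
    // Hypothetical staging table mirroring the target's structure
    await localConnection.query(`
      CREATE TEMP TABLE temp_import (LIKE target_table INCLUDING ALL)
    `);
    await stageAndUpsert(); // bulk-load temp_import, then INSERT ... ON CONFLICT
    await localConnection.query('COMMIT');
  } catch (err) {
    await localConnection.query('ROLLBACK'); // discard partial work
    throw err;
  } finally {
    await localConnection.query('DROP TABLE IF EXISTS temp_import');
  }
}
```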
## Column Mappings
### Categories
| PostgreSQL Column | MySQL Source | Transformation |
|-------------------|---------------------------------|----------------------------------------------|
| cat_id | product_categories.cat_id | Direct mapping |
| name | product_categories.name | Direct mapping |
| type | product_categories.type | Direct mapping |
| parent_id | product_categories.master_cat_id| NULL for top-level categories (types 10, 20) |
| description | product_categories.combined_name| Direct mapping |
| status | N/A | Hard-coded 'active' |
| created_at | N/A | Current timestamp |
| updated_at | N/A | Current timestamp |
**Notes:**
- Categories are processed in hierarchical order by type: [10, 20, 11, 21, 12, 13]
- Type 10/20 are top-level categories with no parent
- Types 11/21/12/13 are child categories that reference parent categories
### Products
| PostgreSQL Column | MySQL Source | Transformation |
|----------------------|----------------------------------|---------------------------------------------------------------|
| pid | products.pid | Direct mapping |
| title | products.description | Direct mapping |
| description | products.notes | Direct mapping |
| sku | products.itemnumber | Fallback to 'NO-SKU' if empty |
| stock_quantity | shop_inventory.available_local | Capped at 5000, minimum 0 |
| preorder_count | current_inventory.onpreorder | Default 0 |
| notions_inv_count | product_notions_b2b.inventory | Default 0 |
| price | product_current_prices.price_each| Default 0, filtered on active=1 |
| regular_price | products.sellingprice | Default 0 |
| cost_price | product_inventory | Weighted average: SUM(costeach * count) / SUM(count) when count > 0, or latest costeach |
| vendor | suppliers.companyname | Via supplier_item_data.supplier_id |
| vendor_reference | supplier_item_data | supplier_itemnumber or notions_itemnumber based on vendor |
| notions_reference | supplier_item_data.notions_itemnumber | Direct mapping |
| brand | product_categories.name | Linked via products.company |
| line | product_categories.name | Linked via products.line |
| subline | product_categories.name | Linked via products.subline |
| artist | product_categories.name | Linked via products.artist |
| categories | product_category_index | Comma-separated list of category IDs |
| created_at | products.date_created | Validated date, NULL if invalid |
| first_received | products.datein | Validated date, NULL if invalid |
| landing_cost_price | NULL | Not set |
| barcode | products.upc | Direct mapping |
| harmonized_tariff_code| products.harmonized_tariff_code | Direct mapping |
| updated_at | products.stamp | Validated date, NULL if invalid |
| visible | shop_inventory | Calculated from show + buyable > 0 |
| managing_stock | N/A | Hard-coded true |
| replenishable | Multiple fields | Complex calculation based on reorder, dates, etc. |
| permalink | N/A | Constructed URL with product ID |
| moq | supplier_item_data | notions_qty_per_unit or supplier_qty_per_unit, minimum 1 |
| uom | N/A | Hard-coded 1 |
| rating | products.rating | Direct mapping |
| reviews | products.rating_votes | Direct mapping |
| weight | products.weight | Direct mapping |
| length | products.length | Direct mapping |
| width | products.width | Direct mapping |
| height | products.height | Direct mapping |
| country_of_origin | products.country_of_origin | Direct mapping |
| location | products.location | Direct mapping |
| total_sold | order_items | SUM(qty_ordered) for all order_items where prod_pid = pid |
| baskets | mybasket | COUNT of records where mb.item = pid and qty > 0 |
| notifies | product_notify | COUNT of records where pn.pid = pid |
| date_last_sold | product_last_sold.date_sold | Validated date, NULL if invalid |
| image | N/A | Constructed from pid and image URL pattern |
| image_175 | N/A | Constructed from pid and image URL pattern |
| image_full | N/A | Constructed from pid and image URL pattern |
| options | NULL | Not set |
| tags | NULL | Not set |
**Notes:**
- Replenishable calculation:
```sql
CASE
  WHEN p.reorder < 0 THEN 0
  WHEN p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 1 YEAR) THEN 1
  WHEN COALESCE(pnb.inventory, 0) > 0 THEN 1
  WHEN (
    (COALESCE(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR))
    AND (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 5 YEAR))
    AND (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 5 YEAR))
  ) THEN 0
  ELSE 1
END
```
In business terms, a product is always treated as replenishable if it was created within the last year or has notions (B2B) inventory on hand. Otherwise, it is considered NOT replenishable only if:
- It was manually flagged as not replenishable (negative reorder value)
- OR it shows no activity across ALL metrics (no sales AND no receipts AND no refills in the past 5 years)
- Image URLs are constructed using this pattern:
```javascript
const paddedPid = pid.toString().padStart(6, '0');
const prefix = paddedPid.slice(0, 3);
const basePath = `${imageUrlBase}${prefix}/${pid}`;
return {
image: `${basePath}-t-${iid}.jpg`,
image_175: `${basePath}-175x175-${iid}.jpg`,
image_full: `${basePath}-o-${iid}.jpg`
};
```
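The `cost_price` weighted average in the mapping table can be read as the following query shape (a sketch against the `product_inventory` columns named above; the importer's actual query may differ, particularly in how it selects the latest `costeach`):
```javascript
// Sketch only: weighted-average cost per product from on-hand inventory.
const costPriceSql = `
  SELECT
    pid,
    CASE
      WHEN SUM(count) > 0 THEN SUM(costeach * count) / SUM(count)
      ELSE MAX(costeach)  -- simplification; the importer falls back to the latest costeach
    END AS cost_price
  FROM product_inventory
  GROUP BY pid
`;
```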
### Product Categories (Relationship)
| PostgreSQL Column | MySQL Source | Transformation |
|-------------------|-----------------------------------|---------------------------------------------------------------|
| pid | products.pid | Direct mapping |
| cat_id | product_category_index.cat_id | Direct mapping, filtered by category types |
**Notes:**
- Only categories of types 10, 20, 11, 21, 12, 13 are imported
- Categories 16 and 17 are explicitly excluded
### Orders
| PostgreSQL Column | MySQL Source | Transformation |
|-------------------|-----------------------------------|---------------------------------------------------------------|
| order_number | order_items.order_id | Direct mapping |
| pid | order_items.prod_pid | Direct mapping |
| sku | order_items.prod_itemnumber | Fallback to 'NO-SKU' if empty |
| date | _order.date_placed_onlydate | Via join to _order table |
| price | order_items.prod_price | Direct mapping |
| quantity | order_items.qty_ordered | Direct mapping |
| discount | Multiple sources | Complex calculation (see notes) |
| tax | order_tax_info_products.item_taxes_to_collect | Via latest order_tax_info record |
| tax_included | N/A | Hard-coded false |
| shipping | N/A | Hard-coded 0 |
| customer | _order.order_cid | Direct mapping |
| customer_name | users | CONCAT(users.firstname, ' ', users.lastname) |
| status | _order.order_status | Direct mapping |
| canceled | _order.date_cancelled | Boolean: true if date_cancelled is not '0000-00-00 00:00:00' |
| costeach | order_costs | From latest record or fallback to price * 0.5 |
**Notes:**
- Only orders with order_status >= 15 and with a valid date_placed are processed
- For incremental imports, only orders modified since last sync are processed
- Discount calculation combines three sources:
1. Base discount: order_items.prod_price_reg - order_items.prod_price
2. Promo discount: SUM of order_discount_items.amount
3. Proportional order discount: Calculation based on order subtotal proportion (worked example below)
```sql
(oi.base_discount +
COALESCE(ot.promo_discount, 0) +
CASE
WHEN om.summary_discount > 0 AND om.summary_subtotal > 0 THEN
ROUND((om.summary_discount * (oi.price * oi.quantity)) / NULLIF(om.summary_subtotal, 0), 2)
ELSE 0
END)::DECIMAL(10,2)
```
- Taxes are taken from the latest tax record for an order
- Cost data is taken from the latest non-pending cost record
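As a worked example of the proportional order discount: a $10.00 line item with quantity 1 on an order with a $100.00 subtotal and a $5.00 order-level summary discount receives ROUND(5.00 * 10.00 / 100.00, 2) = $0.50 of proportional discount, on top of its base and promo discounts.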
### Purchase Orders
| PostgreSQL Column | MySQL Source | Transformation |
|-------------------|-----------------------------------|---------------------------------------------------------------|
| po_id             | po.po_id                          | Direct mapping; standalone receivings get a synthetic 'R' + receiving_id |
| pid | po_products.pid | Direct mapping |
| sku | products.itemnumber | Fallback to 'NO-SKU' if empty |
| name | products.description | Fallback to 'Unknown Product' |
| cost_price        | receivings_products.cost_each     | Weighted average of allocated receivings; falls back to po_products.cost_each |
| po_cost_price     | po_products.cost_each             | Original PO line cost                                          |
| vendor | suppliers.companyname | Fallback to 'Unknown Vendor' if empty |
| date | po.date_ordered | Fallback to po.date_created if NULL |
| expected_date | po.date_estin | Direct mapping |
| status | po.status | Default 1 if NULL |
| notes             | po.short_note                     | Direct mapping; po.notes is imported separately as long_note  |
| ordered | po_products.qty_each | Direct mapping |
| received          | receivings_products               | Sum of receiving quantities allocated to the PO line via FIFO (0 if none) |
| receiving_status  | Calculated                        | 1 = none received, 30 = partially received, 40 = fully received |
**Notes:**
- Only POs created within the last year (incremental) or the last 5 years (full) are processed
- For incremental imports, only POs modified since last sync are processed
### Metadata Tables
#### import_history
| PostgreSQL Column | Source | Notes |
|-------------------|-----------------------------------|---------------------------------------------------------------|
| id | Auto-increment | Primary key |
| table_name | Code | 'all_tables' for overall import |
| start_time | NOW() | Import start time |
| end_time | NOW() | Import completion time |
| duration_seconds | Calculation | Elapsed seconds |
| is_incremental | INCREMENTAL_UPDATE | Flag from config |
| records_added | Calculation | Sum from all imports |
| records_updated | Calculation | Sum from all imports |
| status | Code | 'running', 'completed', 'failed', or 'cancelled' |
| error_message | Exception | Error message if failed |
| additional_info | JSON | Configuration and results |
#### sync_status
| PostgreSQL Column | Source | Notes |
|----------------------|--------------------------------|---------------------------------------------------------------|
| table_name | Code | Name of imported table |
| last_sync_timestamp | NOW() | Timestamp of successful sync |
| last_sync_id | NULL | Not used currently |
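The importers added in this commit record a successful run with an upsert of this shape (shown here for the categories importer):
```javascript
await localConnection.query(`
  INSERT INTO sync_status (table_name, last_sync_timestamp)
  VALUES ('categories', NOW())
  ON CONFLICT (table_name) DO UPDATE SET
    last_sync_timestamp = NOW()
`);
```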
## Special Calculations
### Date Validation
MySQL dates are validated before insertion into PostgreSQL:
```javascript
function validateDate(mysqlDate) {
if (!mysqlDate || mysqlDate === '0000-00-00' || mysqlDate === '0000-00-00 00:00:00') {
return null;
}
// Check if the date is valid
const date = new Date(mysqlDate);
return isNaN(date.getTime()) ? null : mysqlDate;
}
```
### Retry Mechanism
Operations that might fail temporarily are retried with exponential backoff:
```javascript
async function withRetry(operation, errorMessage) {
let lastError;
for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
try {
return await operation();
} catch (error) {
lastError = error;
console.error(`${errorMessage} (Attempt ${attempt}/${MAX_RETRIES}):`, error);
if (attempt < MAX_RETRIES) {
const backoffTime = RETRY_DELAY * Math.pow(2, attempt - 1);
await new Promise(resolve => setTimeout(resolve, backoffTime));
}
}
}
throw lastError;
}
```
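A call site passes the risky operation as a closure; `MAX_RETRIES` and `RETRY_DELAY` are module-level constants in the importer. For example:
```javascript
// Retries the production query up to MAX_RETRIES times with backoff.
const [rows] = await withRetry(
  () => prodConnection.query('SELECT 1'),
  'Production connectivity check failed'
);
```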
### Progress Tracking
Progress is tracked with estimated time remaining:
```javascript
function estimateRemaining(startTime, current, total) {
if (current === 0) return "Calculating...";
const elapsedSeconds = (Date.now() - startTime) / 1000;
const itemsPerSecond = current / elapsedSeconds;
const remainingItems = total - current;
const remainingSeconds = remainingItems / itemsPerSecond;
return formatElapsedTime(remainingSeconds);
}
```
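The importers feed this into `outputProgress` after each batch; this call is taken from the purchase orders importer in this commit:
```javascript
outputProgress({
  status: "running",
  operation: "Purchase orders import",
  message: `Processed ${offset} of ${totalPOs} purchase orders`,
  current: offset,
  total: totalPOs,
  elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
  remaining: estimateRemaining(startTime, offset, totalPOs),
  rate: calculateRate(startTime, offset)
});
```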
## Implementation Notes
### Transaction Management
All imports use transactions to ensure data consistency:
- **Categories**: Uses savepoints for each category type (sketched after this list)
- **Products**: Uses a single transaction for the entire import
- **Orders**: Uses a single transaction with temporary tables
- **Purchase Orders**: Uses a single transaction with temporary tables
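A minimal sketch of the categories pattern (illustrative only; `importCategoryType` is a hypothetical helper, and the type order comes from the Categories section above):
```javascript
// One transaction with a savepoint per category type, so a failing type can
// be rolled back without losing the types already processed.
await localConnection.query('BEGIN');
for (const type of [10, 20, 11, 21, 12, 13]) {
  await localConnection.query('SAVEPOINT cat_type');
  try {
    await importCategoryType(localConnection, type); // hypothetical helper
    await localConnection.query('RELEASE SAVEPOINT cat_type');
  } catch (err) {
    await localConnection.query('ROLLBACK TO SAVEPOINT cat_type');
    console.error(`Skipped category type ${type}:`, err.message);
  }
}
await localConnection.query('COMMIT');
```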
### Memory Usage Optimization
To minimize memory usage when processing large datasets:
1. Data is processed in batches (100-5000 records per batch)
2. Temporary tables are used for intermediate data
3. Some queries use cursors to avoid loading all results at once (see the sketch below)
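A sketch of the cursor pattern, assuming the `pg-cursor` package (the importer's actual cursor mechanism may differ):
```javascript
const Cursor = require('pg-cursor');

// Reads a large result set in fixed-size chunks instead of all at once.
async function streamRows(client, sql, onBatch) {
  const cursor = client.query(new Cursor(sql));
  let rows;
  while ((rows = await cursor.read(1000)).length > 0) {
    await onBatch(rows); // handle 1000 rows at a time
  }
  await cursor.close();
}
```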
### MySQL vs PostgreSQL Compatibility
The scripts handle differences between MySQL and PostgreSQL:
1. MySQL-specific syntax like `USE INDEX` is removed for PostgreSQL
2. `GROUP_CONCAT` in MySQL becomes string operations such as `STRING_AGG` in PostgreSQL (see the example below)
3. Transaction syntax differences are abstracted in the connection wrapper
4. PostgreSQL's `ON CONFLICT` replaces MySQL's `ON DUPLICATE KEY UPDATE`
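For example, items 2 and 4 translate roughly as follows (illustrative queries, not lifted from the importer; the table `t` is hypothetical):
```javascript
// MySQL:
//   SELECT pid, GROUP_CONCAT(cat_id) FROM product_category_index GROUP BY pid;
//   INSERT INTO t (id, n) VALUES (1, 'x') ON DUPLICATE KEY UPDATE n = VALUES(n);
// PostgreSQL equivalents:
const categoriesSql = `
  SELECT pid, STRING_AGG(cat_id::text, ',') AS categories
  FROM product_category_index
  GROUP BY pid
`;
const upsertSql = `
  INSERT INTO t (id, n) VALUES (1, 'x')
  ON CONFLICT (id) DO UPDATE SET n = EXCLUDED.n
`;
```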
### SSH Tunnel
Database connections go through an SSH tunnel for security:
```javascript
return new Promise((resolve, reject) => {
  ssh.forwardOut(
    "127.0.0.1",
    0,
    sshConfig.prodDbConfig.host,
    sshConfig.prodDbConfig.port,
    (err, stream) => {
      if (err) return reject(err); // don't fall through to resolve on error
      resolve({ ssh, stream });
    }
  );
});
```

View File

@@ -4,7 +4,12 @@ SET session_replication_role = 'replica'; -- Disable foreign key checks tempora
 -- Create function for updating timestamps
 CREATE OR REPLACE FUNCTION update_updated_column() RETURNS TRIGGER AS $func$
 BEGIN
-    NEW.updated = CURRENT_TIMESTAMP;
+    -- Check which table is being updated and use the appropriate column
+    IF TG_TABLE_NAME = 'categories' THEN
+        NEW.updated_at = CURRENT_TIMESTAMP;
+    ELSE
+        NEW.updated = CURRENT_TIMESTAMP;
+    END IF;
     RETURN NEW;
 END;
 $func$ language plpgsql;
@@ -160,7 +165,7 @@ CREATE TABLE purchase_orders (
     expected_date DATE,
     pid BIGINT NOT NULL,
     sku VARCHAR(50) NOT NULL,
-    name VARCHAR(100) NOT NULL,
+    name VARCHAR(255) NOT NULL,
     cost_price DECIMAL(10, 3) NOT NULL,
     po_cost_price DECIMAL(10, 3) NOT NULL,
     status SMALLINT DEFAULT 1,
@@ -171,7 +176,7 @@ CREATE TABLE purchase_orders (
     received INTEGER DEFAULT 0,
     received_date DATE,
     last_received_date DATE,
-    received_by VARCHAR(100),
+    received_by VARCHAR,
     receiving_history JSONB,
     updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
     FOREIGN KEY (pid) REFERENCES products(pid),

View File

@@ -142,6 +142,14 @@ async function importCategories(prodConnection, localConnection) {
     // Commit the entire transaction - we'll do this even if we have skipped categories
     await localConnection.query('COMMIT');

+    // Update sync status
+    await localConnection.query(`
+      INSERT INTO sync_status (table_name, last_sync_timestamp)
+      VALUES ('categories', NOW())
+      ON CONFLICT (table_name) DO UPDATE SET
+        last_sync_timestamp = NOW()
+    `);
+
     outputProgress({
       status: "complete",
       operation: "Categories import completed",

View File

@@ -144,6 +144,8 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
         CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
         CASE
           WHEN p.reorder < 0 THEN 0
+          WHEN p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 1 YEAR) THEN 1
+          WHEN COALESCE(pnb.inventory, 0) > 0 THEN 1
           WHEN (
             (COALESCE(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR))
             AND (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 5 YEAR))
@@ -343,6 +345,8 @@ async function materializeCalculations(prodConnection, localConnection, incremen
         CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
         CASE
           WHEN p.reorder < 0 THEN 0
+          WHEN p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 1 YEAR) THEN 1
+          WHEN COALESCE(pnb.inventory, 0) > 0 THEN 1
           WHEN (
             (COALESCE(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR))
             AND (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 5 YEAR))
@@ -840,6 +844,14 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
     // Commit the transaction
     await localConnection.commit();

+    // Update sync status
+    await localConnection.query(`
+      INSERT INTO sync_status (table_name, last_sync_timestamp)
+      VALUES ('products', NOW())
+      ON CONFLICT (table_name) DO UPDATE SET
+        last_sync_timestamp = NOW()
+    `);
+
     return {
       status: 'complete',
       recordsAdded,

View File

@@ -1,9 +1,56 @@
 const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');

+/**
+ * Validates a date from MySQL before inserting it into PostgreSQL
+ * @param {string|Date|null} mysqlDate - Date string or object from MySQL
+ * @returns {string|null} Valid date string or null if invalid
+ */
+function validateDate(mysqlDate) {
+  // Handle null, undefined, or empty values
+  if (!mysqlDate) {
+    return null;
+  }
+
+  // Convert to string if it's not already
+  const dateStr = String(mysqlDate);
+
+  // Handle MySQL zero dates and empty values
+  if (dateStr === '0000-00-00' ||
+      dateStr === '0000-00-00 00:00:00' ||
+      dateStr.indexOf('0000-00-00') !== -1 ||
+      dateStr === '') {
+    return null;
+  }
+
+  // Check if the date is valid
+  const date = new Date(mysqlDate);
+
+  // If the date is invalid or suspiciously old (pre-1970), return null
+  if (isNaN(date.getTime()) || date.getFullYear() < 1970) {
+    return null;
+  }
+
+  return mysqlDate;
+}
+
+/**
+ * Imports purchase orders and receivings from a production MySQL database to a local PostgreSQL database.
+ * Implements FIFO allocation of receivings to purchase orders.
+ *
+ * @param {object} prodConnection - A MySQL connection to production DB
+ * @param {object} localConnection - A PostgreSQL connection to local DB
+ * @param {boolean} incrementalUpdate - Set to false for a full sync; true for incremental
+ * @returns {object} Information about the sync operation
+ */
 async function importPurchaseOrders(prodConnection, localConnection, incrementalUpdate = true) {
   const startTime = Date.now();
   let recordsAdded = 0;
   let recordsUpdated = 0;
+  let totalProcessed = 0;
+
+  // Batch size constants
+  const PO_BATCH_SIZE = 500;
+  const INSERT_BATCH_SIZE = 100;

   try {
     // Begin transaction for the entire import process
@@ -17,104 +64,244 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
     console.log('Purchase Orders: Using last sync time:', lastSyncTime);

-    // Create temp tables
+    // Create temp tables for processing
     await localConnection.query(`
       DROP TABLE IF EXISTS temp_purchase_orders;
-      CREATE TABLE temp_purchase_orders (
-        po_id INTEGER NOT NULL,
-        pid INTEGER NOT NULL,
+      DROP TABLE IF EXISTS temp_receivings;
+      DROP TABLE IF EXISTS temp_receiving_allocations;
+      DROP TABLE IF EXISTS employee_names;
+
+      -- Temporary table for purchase orders
+      CREATE TEMP TABLE temp_purchase_orders (
+        po_id VARCHAR(50) NOT NULL,
+        pid BIGINT NOT NULL,
         sku VARCHAR(50),
         name VARCHAR(255),
         vendor VARCHAR(255),
         date TIMESTAMP WITH TIME ZONE,
-        expected_date TIMESTAMP WITH TIME ZONE,
+        expected_date DATE,
         status INTEGER,
+        status_text VARCHAR(50),
         notes TEXT,
+        long_note TEXT,
         ordered INTEGER,
-        cost_price DECIMAL(10,3),
+        po_cost_price DECIMAL(10,3),
+        supplier_id INTEGER,
+        date_created TIMESTAMP WITH TIME ZONE,
+        date_ordered TIMESTAMP WITH TIME ZONE,
         PRIMARY KEY (po_id, pid)
       );
+
+      -- Temporary table for receivings
+      CREATE TEMP TABLE temp_receivings (
+        receiving_id VARCHAR(50) NOT NULL,
+        po_id VARCHAR(50),
+        pid BIGINT NOT NULL,
+        qty_each INTEGER,
+        cost_each DECIMAL(10,5),
+        received_by INTEGER,
+        received_date TIMESTAMP WITH TIME ZONE,
+        receiving_created_date TIMESTAMP WITH TIME ZONE,
+        supplier_id INTEGER,
+        status INTEGER,
+        status_text VARCHAR(50),
+        PRIMARY KEY (receiving_id, pid)
+      );
+
+      -- Temporary table for tracking FIFO allocations
+      CREATE TEMP TABLE temp_receiving_allocations (
+        po_id VARCHAR(50) NOT NULL,
+        pid BIGINT NOT NULL,
+        receiving_id VARCHAR(50) NOT NULL,
+        allocated_qty INTEGER NOT NULL,
+        cost_each DECIMAL(10,5) NOT NULL,
+        received_date TIMESTAMP WITH TIME ZONE NOT NULL,
+        received_by INTEGER,
+        PRIMARY KEY (po_id, pid, receiving_id)
+      );
+
+      -- Temporary table for employee names
+      CREATE TEMP TABLE employee_names (
+        employeeid INTEGER PRIMARY KEY,
+        firstname VARCHAR(100),
+        lastname VARCHAR(100)
+      );
+
+      -- Create indexes for efficient joins
+      CREATE INDEX idx_temp_po_pid ON temp_purchase_orders(pid);
+      CREATE INDEX idx_temp_receiving_pid ON temp_receivings(pid);
+      CREATE INDEX idx_temp_receiving_po_id ON temp_receivings(po_id);
     `);

-    // First get all relevant PO IDs with basic info - Keep MySQL compatible for production
-    const [[{ total }]] = await prodConnection.query(`
-      SELECT COUNT(*) as total
-      FROM (
-        SELECT DISTINCT pop.po_id, pop.pid
-        FROM po p
-        JOIN po_products pop ON p.po_id = pop.po_id
-        JOIN suppliers s ON p.supplier_id = s.supplierid
-        WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
-        ${incrementalUpdate ? `
-          AND (
-            p.date_updated > ?
-            OR p.date_ordered > ?
-            OR p.date_estin > ?
-          )
-        ` : ''}
-      ) all_items
-    `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);
-
-    console.log('Purchase Orders: Found changes:', total);
+    // Map status codes to text values
+    const poStatusMap = {
+      0: 'Canceled',
+      1: 'Created',
+      10: 'Ready ESend',
+      11: 'Ordered',
+      12: 'Preordered',
+      13: 'Electronically Sent',
+      15: 'Receiving Started',
+      50: 'Done'
+    };
+
+    const receivingStatusMap = {
+      0: 'Canceled',
+      1: 'Created',
+      30: 'Partial Received',
+      40: 'Full Received',
+      50: 'Paid'
+    };
+
+    // Get time window for data retrieval
+    const yearInterval = incrementalUpdate ? 1 : 5;
+
+    // Fetch employee data from production
+    outputProgress({
+      status: "running",
+      operation: "Purchase orders import",
+      message: "Fetching employee data"
+    });
+
+    const [employees] = await prodConnection.query(`
+      SELECT
+        employeeid,
+        firstname,
+        lastname
+      FROM employees
+    `);
+
+    // Insert employee data into temp table
+    if (employees.length > 0) {
+      const employeeValues = employees.map(emp => [
+        emp.employeeid,
+        emp.firstname || '',
+        emp.lastname || ''
+      ]).flat();
+
+      const placeholders = employees.map((_, idx) => {
+        const base = idx * 3;
+        return `($${base + 1}, $${base + 2}, $${base + 3})`;
+      }).join(',');
+
+      await localConnection.query(`
+        INSERT INTO employee_names (employeeid, firstname, lastname)
+        VALUES ${placeholders}
+        ON CONFLICT (employeeid) DO UPDATE SET
+          firstname = EXCLUDED.firstname,
+          lastname = EXCLUDED.lastname
+      `, employeeValues);
+    }
+
+    // 1. First, fetch all relevant POs
+    outputProgress({
+      status: "running",
+      operation: "Purchase orders import",
+      message: "Fetching purchase orders"
+    });
+
+    const [poCount] = await prodConnection.query(`
+      SELECT COUNT(*) as total
+      FROM po p
+      WHERE p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR)
+      ${incrementalUpdate ? `
+        AND (
+          p.date_updated > ?
+          OR p.date_ordered > ?
+          OR p.date_estin > ?
+        )
+      ` : ''}
+    `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);
+
+    const totalPOs = poCount[0].total;
+    console.log(`Found ${totalPOs} relevant purchase orders`);

-    // Get PO list - Keep MySQL compatible for production
-    console.log('Fetching purchase orders in batches...');
-    const FETCH_BATCH_SIZE = 5000;
-    const INSERT_BATCH_SIZE = 200; // Process 200 records at a time for inserts
+    // Fetch and process POs in batches
     let offset = 0;
-    let allProcessed = false;
-    let totalProcessed = 0;
+    let allPOsProcessed = false;

-    while (!allProcessed) {
-      console.log(`Fetching batch at offset ${offset}...`);
+    while (!allPOsProcessed) {
       const [poList] = await prodConnection.query(`
-        SELECT DISTINCT
-          COALESCE(p.po_id, 0) as po_id,
-          pop.pid,
-          COALESCE(NULLIF(pr.itemnumber, ''), 'NO-SKU') as sku,
-          COALESCE(pr.description, 'Unknown Product') as name,
-          COALESCE(NULLIF(s.companyname, ''), 'Unknown Vendor') as vendor,
-          COALESCE(p.date_ordered, p.date_created) as date,
-          p.date_estin as expected_date,
-          COALESCE(p.status, 1) as status,
-          COALESCE(p.short_note, p.notes) as notes,
-          pop.qty_each as ordered,
-          pop.cost_each as cost_price
+        SELECT
+          p.po_id,
+          p.supplier_id,
+          s.companyname AS vendor,
+          p.status,
+          p.notes AS long_note,
+          p.short_note AS notes,
+          p.date_created,
+          p.date_ordered,
+          p.date_estin
         FROM po p
-        JOIN po_products pop ON p.po_id = pop.po_id
-        JOIN products pr ON pop.pid = pr.pid
         LEFT JOIN suppliers s ON p.supplier_id = s.supplierid
-        WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
+        WHERE p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR)
         ${incrementalUpdate ? `
           AND (
             p.date_updated > ?
             OR p.date_ordered > ?
             OR p.date_estin > ?
           )
         ` : ''}
-        ORDER BY p.po_id, pop.pid
-        LIMIT ${FETCH_BATCH_SIZE} OFFSET ${offset}
+        ORDER BY p.po_id
+        LIMIT ${PO_BATCH_SIZE} OFFSET ${offset}
       `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);

       if (poList.length === 0) {
-        allProcessed = true;
+        allPOsProcessed = true;
         break;
       }

-      console.log(`Processing batch of ${poList.length} purchase order items (${offset}-${offset + poList.length})`);
-
-      // Process in smaller batches for inserts
-      for (let i = 0; i < poList.length; i += INSERT_BATCH_SIZE) {
-        const batch = poList.slice(i, Math.min(i + INSERT_BATCH_SIZE, poList.length));
+      // Get products for these POs
+      const poIds = poList.map(po => po.po_id);
+
+      const [poProducts] = await prodConnection.query(`
+        SELECT
+          pp.po_id,
+          pp.pid,
+          pp.qty_each,
+          pp.cost_each,
+          COALESCE(p.itemnumber, 'NO-SKU') AS sku,
+          COALESCE(p.description, 'Unknown Product') AS name
+        FROM po_products pp
+        LEFT JOIN products p ON pp.pid = p.pid
+        WHERE pp.po_id IN (?)
+      `, [poIds]);
+
+      // Build complete PO records
+      const completePOs = [];
+      for (const product of poProducts) {
+        const po = poList.find(p => p.po_id == product.po_id);
+        if (!po) continue;
+
+        completePOs.push({
+          po_id: po.po_id.toString(),
+          pid: product.pid,
+          sku: product.sku,
+          name: product.name,
+          vendor: po.vendor || 'Unknown Vendor',
+          date: validateDate(po.date_ordered) || validateDate(po.date_created),
+          expected_date: validateDate(po.date_estin),
+          status: po.status,
+          status_text: poStatusMap[po.status] || '',
+          notes: po.notes || '',
+          long_note: po.long_note || '',
+          ordered: product.qty_each,
+          po_cost_price: product.cost_each,
+          supplier_id: po.supplier_id,
+          date_created: validateDate(po.date_created),
+          date_ordered: validateDate(po.date_ordered)
+        });
+      }
+
+      // Insert PO data in batches
+      for (let i = 0; i < completePOs.length; i += INSERT_BATCH_SIZE) {
+        const batch = completePOs.slice(i, i + INSERT_BATCH_SIZE);

-        // Create parameterized query with placeholders
         const placeholders = batch.map((_, idx) => {
-          const base = idx * 11; // 11 columns
-          return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11})`;
+          const base = idx * 16;
+          return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15}, $${base + 16})`;
         }).join(',');

-        // Create flattened values array
         const values = batch.flatMap(po => [
           po.po_id,
           po.pid,
@@ -124,16 +311,20 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
           po.date,
           po.expected_date,
           po.status,
+          po.status_text,
           po.notes,
+          po.long_note,
           po.ordered,
-          po.cost_price
+          po.po_cost_price,
+          po.supplier_id,
+          po.date_created,
+          po.date_ordered
         ]);

-        // Execute batch insert
         await localConnection.query(`
           INSERT INTO temp_purchase_orders (
-            po_id, pid, sku, name, vendor, date, expected_date,
-            status, notes, ordered, cost_price
+            po_id, pid, sku, name, vendor, date, expected_date, status, status_text,
+            notes, long_note, ordered, po_cost_price, supplier_id, date_created, date_ordered
           )
           VALUES ${placeholders}
           ON CONFLICT (po_id, pid) DO UPDATE SET
@@ -143,73 +334,531 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
             date = EXCLUDED.date,
             expected_date = EXCLUDED.expected_date,
             status = EXCLUDED.status,
+            status_text = EXCLUDED.status_text,
             notes = EXCLUDED.notes,
+            long_note = EXCLUDED.long_note,
             ordered = EXCLUDED.ordered,
-            cost_price = EXCLUDED.cost_price
+            po_cost_price = EXCLUDED.po_cost_price,
+            supplier_id = EXCLUDED.supplier_id,
+            date_created = EXCLUDED.date_created,
+            date_ordered = EXCLUDED.date_ordered
         `, values);
+      }

-        totalProcessed += batch.length;
-
-        outputProgress({
-          status: "running",
-          operation: "Purchase orders import",
-          message: `Processed ${totalProcessed}/${total} purchase order items`,
-          current: totalProcessed,
-          total: total,
-          elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
-          remaining: estimateRemaining(startTime, totalProcessed, total),
-          rate: calculateRate(startTime, totalProcessed)
-        });
-      }
+      offset += poList.length;
+      totalProcessed += completePOs.length;

-      // Update offset for next batch
-      offset += poList.length;
-
-      // Check if we've received fewer records than the batch size, meaning we're done
-      if (poList.length < FETCH_BATCH_SIZE) {
-        allProcessed = true;
-      }
-    }
+      outputProgress({
+        status: "running",
+        operation: "Purchase orders import",
+        message: `Processed ${offset} of ${totalPOs} purchase orders (${totalProcessed} line items)`,
+        current: offset,
+        total: totalPOs,
+        elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
+        remaining: estimateRemaining(startTime, offset, totalPOs),
+        rate: calculateRate(startTime, offset)
+      });
+
+      if (poList.length < PO_BATCH_SIZE) {
+        allPOsProcessed = true;
+      }
+    }
+
+    // 2. Next, fetch all relevant receivings
+    outputProgress({
+      status: "running",
+      operation: "Purchase orders import",
+      message: "Fetching receivings data"
+    });
+
+    const [receivingCount] = await prodConnection.query(`
+      SELECT COUNT(*) as total
+      FROM receivings r
+      WHERE r.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR)
+      ${incrementalUpdate ? `
+        AND (
+          r.date_updated > ?
+          OR r.date_created > ?
+        )
+      ` : ''}
+    `, incrementalUpdate ? [lastSyncTime, lastSyncTime] : []);
+
+    const totalReceivings = receivingCount[0].total;
+    console.log(`Found ${totalReceivings} relevant receivings`);
+
+    // Fetch and process receivings in batches
+    offset = 0; // Reset offset for receivings
+    let allReceivingsProcessed = false;
+
+    while (!allReceivingsProcessed) {
+      const [receivingList] = await prodConnection.query(`
+        SELECT
+          r.receiving_id,
+          r.po_id,
+          r.supplier_id,
+          r.status,
+          r.date_created
+        FROM receivings r
+        WHERE r.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR)
+        ${incrementalUpdate ? `
+          AND (
+            r.date_updated > ?
+            OR r.date_created > ?
+          )
+        ` : ''}
+        ORDER BY r.receiving_id
+        LIMIT ${PO_BATCH_SIZE} OFFSET ${offset}
+      `, incrementalUpdate ? [lastSyncTime, lastSyncTime] : []);
+
+      if (receivingList.length === 0) {
+        allReceivingsProcessed = true;
+        break;
+      }
+
+      // Get products for these receivings
+      const receivingIds = receivingList.map(r => r.receiving_id);
+
+      const [receivingProducts] = await prodConnection.query(`
+        SELECT
+          rp.receiving_id,
+          rp.pid,
+          rp.qty_each,
+          rp.cost_each,
+          rp.received_by,
+          rp.received_date,
+          r.date_created as receiving_created_date
+        FROM receivings_products rp
+        JOIN receivings r ON rp.receiving_id = r.receiving_id
+        WHERE rp.receiving_id IN (?)
+      `, [receivingIds]);
+
+      // Build complete receiving records
+      const completeReceivings = [];
+      for (const product of receivingProducts) {
+        const receiving = receivingList.find(r => r.receiving_id == product.receiving_id);
+        if (!receiving) continue;
+
+        completeReceivings.push({
+          receiving_id: receiving.receiving_id.toString(),
+          po_id: receiving.po_id ? receiving.po_id.toString() : null,
+          pid: product.pid,
+          qty_each: product.qty_each,
+          cost_each: product.cost_each,
+          received_by: product.received_by,
+          received_date: validateDate(product.received_date) || validateDate(product.receiving_created_date),
+          receiving_created_date: validateDate(product.receiving_created_date),
+          supplier_id: receiving.supplier_id,
+          status: receiving.status,
+          status_text: receivingStatusMap[receiving.status] || ''
+        });
+      }
+
+      // Insert receiving data in batches
+      for (let i = 0; i < completeReceivings.length; i += INSERT_BATCH_SIZE) {
+        const batch = completeReceivings.slice(i, i + INSERT_BATCH_SIZE);
+
+        const placeholders = batch.map((_, idx) => {
+          const base = idx * 11;
+          return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11})`;
+        }).join(',');
+
+        const values = batch.flatMap(r => [
+          r.receiving_id,
+          r.po_id,
+          r.pid,
+          r.qty_each,
+          r.cost_each,
+          r.received_by,
+          r.received_date,
+          r.receiving_created_date,
+          r.supplier_id,
+          r.status,
+          r.status_text
+        ]);
+
+        await localConnection.query(`
+          INSERT INTO temp_receivings (
+            receiving_id, po_id, pid, qty_each, cost_each, received_by,
+            received_date, receiving_created_date, supplier_id, status, status_text
+          )
+          VALUES ${placeholders}
+          ON CONFLICT (receiving_id, pid) DO UPDATE SET
+            po_id = EXCLUDED.po_id,
+            qty_each = EXCLUDED.qty_each,
+            cost_each = EXCLUDED.cost_each,
+            received_by = EXCLUDED.received_by,
+            received_date = EXCLUDED.received_date,
+            receiving_created_date = EXCLUDED.receiving_created_date,
+            supplier_id = EXCLUDED.supplier_id,
+            status = EXCLUDED.status,
+            status_text = EXCLUDED.status_text
+        `, values);
+      }
+
+      offset += receivingList.length;
+      totalProcessed += completeReceivings.length;
+
+      outputProgress({
+        status: "running",
+        operation: "Purchase orders import",
+        message: `Processed ${offset} of ${totalReceivings} receivings (${totalProcessed} line items total)`,
+        current: offset,
+        total: totalReceivings,
+        elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
+        remaining: estimateRemaining(startTime, offset, totalReceivings),
+        rate: calculateRate(startTime, offset)
+      });
+
+      if (receivingList.length < PO_BATCH_SIZE) {
+        allReceivingsProcessed = true;
+      }
+    }

-    // Count the temp table contents
-    const [tempCount] = await localConnection.query(`SELECT COUNT(*) FROM temp_purchase_orders`);
-    const tempRowCount = parseInt(tempCount.rows[0].count);
-    console.log(`Successfully inserted ${tempRowCount} rows into temp_purchase_orders`);
-
-    // Now insert into the final table
-    const [result] = await localConnection.query(`
-      WITH inserted_pos AS (
-        INSERT INTO purchase_orders (
-          po_id, pid, sku, name, cost_price, po_cost_price,
-          vendor, date, expected_date, status, notes,
-          ordered, received, receiving_status
-        )
-        SELECT
-          po_id, pid, sku, name, cost_price, cost_price,
-          vendor, date, expected_date, status, notes,
-          ordered, 0 as received, 1 as receiving_status
-        FROM temp_purchase_orders
+    // 3. Implement FIFO allocation of receivings to purchase orders
+    outputProgress({
+      status: "running",
+      operation: "Purchase orders import",
+      message: "Allocating receivings to purchase orders using FIFO"
+    });
+
+    // Step 1: Handle receivings with matching PO IDs (direct allocation)
+    await localConnection.query(`
+      INSERT INTO temp_receiving_allocations (
+        po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by
+      )
+      SELECT
+        r.po_id,
+        r.pid,
+        r.receiving_id,
+        LEAST(r.qty_each, po.ordered) as allocated_qty,
+        r.cost_each,
+        COALESCE(r.received_date, NOW()) as received_date,
+        r.received_by
+      FROM temp_receivings r
+      JOIN temp_purchase_orders po ON r.po_id = po.po_id AND r.pid = po.pid
+      WHERE r.po_id IS NOT NULL
+    `);
+
+    // Step 2: Handle receivings without a matching PO (standalone receivings)
+    // Create a PO entry for each standalone receiving
+    await localConnection.query(`
+      INSERT INTO temp_purchase_orders (
+        po_id, pid, sku, name, vendor, date, status, status_text,
+        ordered, po_cost_price, supplier_id, date_created, date_ordered
+      )
+      SELECT
+        'R' || r.receiving_id as po_id,
+        r.pid,
+        COALESCE(p.sku, 'NO-SKU') as sku,
+        COALESCE(p.name, 'Unknown Product') as name,
+        COALESCE(
+          (SELECT vendor FROM temp_purchase_orders
+           WHERE supplier_id = r.supplier_id LIMIT 1),
+          'Unknown Vendor'
+        ) as vendor,
+        COALESCE(r.received_date, r.receiving_created_date) as date,
+        NULL as status,
+        NULL as status_text,
+        NULL as ordered,
+        r.cost_each as po_cost_price,
+        r.supplier_id,
+        COALESCE(r.receiving_created_date, r.received_date) as date_created,
+        NULL as date_ordered
+      FROM temp_receivings r
+      LEFT JOIN (
+        SELECT DISTINCT pid, sku, name FROM temp_purchase_orders
+      ) p ON r.pid = p.pid
+      WHERE r.po_id IS NULL
+        OR NOT EXISTS (
+          SELECT 1 FROM temp_purchase_orders po
+          WHERE po.po_id = r.po_id AND po.pid = r.pid
+        )
+      ON CONFLICT (po_id, pid) DO NOTHING
+    `);
+
+    // Now allocate these standalone receivings to their "virtual" POs
+    await localConnection.query(`
+      INSERT INTO temp_receiving_allocations (
+        po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by
+      )
+      SELECT
+        'R' || r.receiving_id as po_id,
+        r.pid,
+        r.receiving_id,
+        r.qty_each as allocated_qty,
+        r.cost_each,
+        COALESCE(r.received_date, NOW()) as received_date,
+        r.received_by
+      FROM temp_receivings r
+      WHERE r.po_id IS NULL
+        OR NOT EXISTS (
+          SELECT 1 FROM temp_purchase_orders po
+          WHERE po.po_id = r.po_id AND po.pid = r.pid
+        )
+    `);
+
+    // Step 3: Handle unallocated receivings vs. unfulfilled orders
+    // This is the complex FIFO allocation logic
+    await localConnection.query(`
+      WITH
+      -- Calculate remaining quantities after direct allocations
+      remaining_po_quantities AS (
+        SELECT
+          po.po_id,
+          po.pid,
+          po.ordered,
+          COALESCE(SUM(ra.allocated_qty), 0) as already_allocated,
+          po.ordered - COALESCE(SUM(ra.allocated_qty), 0) as remaining_qty,
+          po.date_ordered,
+          po.date_created
+        FROM temp_purchase_orders po
+        LEFT JOIN temp_receiving_allocations ra ON po.po_id = ra.po_id AND po.pid = ra.pid
+        WHERE po.ordered IS NOT NULL
+        GROUP BY po.po_id, po.pid, po.ordered, po.date_ordered, po.date_created
+        HAVING po.ordered > COALESCE(SUM(ra.allocated_qty), 0)
+      ),
+      remaining_receiving_quantities AS (
+        SELECT
+          r.receiving_id,
+          r.pid,
+          r.qty_each,
+          COALESCE(SUM(ra.allocated_qty), 0) as already_allocated,
+          r.qty_each - COALESCE(SUM(ra.allocated_qty), 0) as remaining_qty,
+          r.received_date,
+          r.cost_each,
+          r.received_by
+        FROM temp_receivings r
+        LEFT JOIN temp_receiving_allocations ra ON r.receiving_id = ra.receiving_id AND r.pid = ra.pid
+        GROUP BY r.receiving_id, r.pid, r.qty_each, r.received_date, r.cost_each, r.received_by
+        HAVING r.qty_each > COALESCE(SUM(ra.allocated_qty), 0)
+      ),
+      -- Rank POs by age, with a cutoff for very old POs (1 year)
+      ranked_pos AS (
+        SELECT
+          po.po_id,
+          po.pid,
+          po.remaining_qty,
+          CASE
+            WHEN po.date_ordered IS NULL OR po.date_ordered < NOW() - INTERVAL '1 year' THEN 2
+            ELSE 1
+          END as age_group,
+          ROW_NUMBER() OVER (
+            PARTITION BY po.pid, (CASE WHEN po.date_ordered IS NULL OR po.date_ordered < NOW() - INTERVAL '1 year' THEN 2 ELSE 1 END)
+            ORDER BY COALESCE(po.date_ordered, po.date_created, NOW())
+          ) as rank_in_group
+        FROM remaining_po_quantities po
+      ),
+      -- Rank receivings by date
+      ranked_receivings AS (
+        SELECT
+          r.receiving_id,
+          r.pid,
+          r.remaining_qty,
+          r.received_date,
+          r.cost_each,
+          r.received_by,
+          ROW_NUMBER() OVER (PARTITION BY r.pid ORDER BY COALESCE(r.received_date, NOW())) as rank
+        FROM remaining_receiving_quantities r
+      ),
+      -- First allocate to recent POs
+      allocations_recent AS (
+        SELECT
+          po.po_id,
+          po.pid,
+          r.receiving_id,
+          LEAST(po.remaining_qty, r.remaining_qty) as allocated_qty,
+          r.cost_each,
+          COALESCE(r.received_date, NOW()) as received_date,
+          r.received_by,
+          po.age_group,
+          po.rank_in_group,
+          r.rank,
+          'recent' as allocation_type
+        FROM ranked_pos po
+        JOIN ranked_receivings r ON po.pid = r.pid
+        WHERE po.age_group = 1
+        ORDER BY po.pid, po.rank_in_group, r.rank
+      ),
+      -- Then allocate to older POs
+      remaining_after_recent AS (
+        SELECT
+          r.receiving_id,
+          r.pid,
+          r.remaining_qty - COALESCE(SUM(a.allocated_qty), 0) as remaining_qty,
+          r.received_date,
+          r.cost_each,
+          r.received_by,
+          r.rank
+        FROM ranked_receivings r
+        LEFT JOIN allocations_recent a ON r.receiving_id = a.receiving_id AND r.pid = a.pid
+        GROUP BY r.receiving_id, r.pid, r.remaining_qty, r.received_date, r.cost_each, r.received_by, r.rank
+        HAVING r.remaining_qty > COALESCE(SUM(a.allocated_qty), 0)
+      ),
+      allocations_old AS (
+        SELECT
+          po.po_id,
+          po.pid,
+          r.receiving_id,
+          LEAST(po.remaining_qty, r.remaining_qty) as allocated_qty,
+          r.cost_each,
+          COALESCE(r.received_date, NOW()) as received_date,
+          r.received_by,
+          po.age_group,
+          po.rank_in_group,
+          r.rank,
+          'old' as allocation_type
+        FROM ranked_pos po
+        JOIN remaining_after_recent r ON po.pid = r.pid
+        WHERE po.age_group = 2
+        ORDER BY po.pid, po.rank_in_group, r.rank
+      ),
+      -- Combine allocations
+      combined_allocations AS (
+        SELECT * FROM allocations_recent
+        UNION ALL
+        SELECT * FROM allocations_old
+      )
+      -- Insert into allocations table
+      INSERT INTO temp_receiving_allocations (
+        po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by
+      )
+      SELECT
+        po_id, pid, receiving_id, allocated_qty, cost_each,
+        COALESCE(received_date, NOW()) as received_date,
+        received_by
+      FROM combined_allocations
+      WHERE allocated_qty > 0
+    `);
+
+    // 4. Generate final purchase order records with receiving data
+    outputProgress({
+      status: "running",
+      operation: "Purchase orders import",
+      message: "Generating final purchase order records"
+    });
+
+    const [finalResult] = await localConnection.query(`
+      WITH
+      receiving_summaries AS (
+        SELECT
+          po_id,
+          pid,
+          SUM(allocated_qty) as total_received,
+          JSONB_AGG(
+            JSONB_BUILD_OBJECT(
+              'receiving_id', receiving_id,
+              'qty', allocated_qty,
+              'date', COALESCE(received_date, NOW()),
+              'cost', cost_each,
+              'received_by', received_by,
+              'received_by_name', CASE
+                WHEN received_by IS NOT NULL AND received_by > 0 THEN
+                  (SELECT CONCAT(firstname, ' ', lastname)
+                   FROM employee_names
+                   WHERE employeeid = received_by)
+                ELSE NULL
+              END
+            ) ORDER BY COALESCE(received_date, NOW())
+          ) as receiving_history,
+          MIN(COALESCE(received_date, NOW())) as first_received_date,
+          MAX(COALESCE(received_date, NOW())) as last_received_date,
+          STRING_AGG(
+            DISTINCT CASE WHEN received_by IS NOT NULL AND received_by > 0
+              THEN CAST(received_by AS TEXT)
+              ELSE NULL
+            END,
+            ','
+          ) as received_by_list,
+          STRING_AGG(
+            DISTINCT CASE
+              WHEN ra.received_by IS NOT NULL AND ra.received_by > 0 THEN
+                (SELECT CONCAT(firstname, ' ', lastname)
+                 FROM employee_names
+                 WHERE employeeid = ra.received_by)
+              ELSE NULL
+            END,
+            ', '
+          ) as received_by_names
+        FROM temp_receiving_allocations ra
+        GROUP BY po_id, pid
+      ),
+      cost_averaging AS (
+        SELECT
+          ra.po_id,
+          ra.pid,
+          SUM(ra.allocated_qty * ra.cost_each) / NULLIF(SUM(ra.allocated_qty), 0) as avg_cost
+        FROM temp_receiving_allocations ra
+        GROUP BY ra.po_id, ra.pid
+      )
+      INSERT INTO purchase_orders (
+        po_id, vendor, date, expected_date, pid, sku, name,
+        cost_price, po_cost_price, status, receiving_status, notes, long_note,
+        ordered, received, received_date, last_received_date, received_by,
+        receiving_history
+      )
+      SELECT
+        po.po_id,
+        po.vendor,
+        CASE
+          WHEN po.date IS NOT NULL THEN po.date
+          -- For standalone receivings, try to use the receiving date from history
+          WHEN po.po_id LIKE 'R%' AND rs.first_received_date IS NOT NULL THEN rs.first_received_date
+          -- As a last resort for data integrity, use Unix epoch (Jan 1, 1970)
+          ELSE to_timestamp(0)
+        END as date,
+        NULLIF(po.expected_date::text, '0000-00-00')::date as expected_date,
+        po.pid,
+        po.sku,
+        po.name,
+        COALESCE(ca.avg_cost, po.po_cost_price) as cost_price,
+        po.po_cost_price,
+        CASE WHEN po.status IS NULL THEN 1 ELSE po.status END as status,
+        CASE
+          WHEN rs.total_received IS NULL THEN 1
+          WHEN rs.total_received = 0 THEN 1
+          WHEN rs.total_received < po.ordered THEN 30
+          WHEN rs.total_received >= po.ordered THEN 40
+          ELSE 1
+        END as receiving_status,
+        po.notes,
+        po.long_note,
+        COALESCE(po.ordered, 0),
+        COALESCE(rs.total_received, 0),
+        NULLIF(rs.first_received_date::text, '0000-00-00 00:00:00')::timestamp with time zone as received_date,
+        NULLIF(rs.last_received_date::text, '0000-00-00 00:00:00')::timestamp with time zone as last_received_date,
+        CASE
+          WHEN rs.received_by_list IS NULL THEN NULL
+          ELSE rs.received_by_names
+        END as received_by,
+        rs.receiving_history
+      FROM temp_purchase_orders po
+      LEFT JOIN receiving_summaries rs ON po.po_id = rs.po_id AND po.pid = rs.pid
+      LEFT JOIN cost_averaging ca ON po.po_id = ca.po_id AND po.pid = ca.pid
       ON CONFLICT (po_id, pid) DO UPDATE SET
         vendor = EXCLUDED.vendor,
         date = EXCLUDED.date,
         expected_date = EXCLUDED.expected_date,
+        sku = EXCLUDED.sku,
+        name = EXCLUDED.name,
+        cost_price = EXCLUDED.cost_price,
+        po_cost_price = EXCLUDED.po_cost_price,
         status = EXCLUDED.status,
+        receiving_status = EXCLUDED.receiving_status,
         notes = EXCLUDED.notes,
+        long_note = EXCLUDED.long_note,
         ordered = EXCLUDED.ordered,
-        cost_price = EXCLUDED.cost_price,
-        po_cost_price = EXCLUDED.po_cost_price
-      RETURNING xmax = 0 as inserted
-      )
-      SELECT
-        COUNT(*) FILTER (WHERE inserted) as inserted,
-        COUNT(*) FILTER (WHERE NOT inserted) as updated
-      FROM inserted_pos
+        received = EXCLUDED.received,
+        received_date = EXCLUDED.received_date,
+        last_received_date = EXCLUDED.last_received_date,
+        received_by = EXCLUDED.received_by,
+        receiving_history = EXCLUDED.receiving_history,
+        updated = CURRENT_TIMESTAMP
+      RETURNING (xmax = 0) as inserted
     `);

-    // Parse the result
-    const { inserted, updated } = result.rows[0];
-    recordsAdded = parseInt(inserted) || 0;
-    recordsUpdated = parseInt(updated) || 0;
+    recordsAdded = finalResult.rows.filter(r => r.inserted).length;
+    recordsUpdated = finalResult.rows.filter(r => !r.inserted).length;

     // Update sync status
     await localConnection.query(`
@@ -220,7 +869,12 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
     `);

     // Clean up temporary tables
-    await localConnection.query(`DROP TABLE IF EXISTS temp_purchase_orders;`);
+    await localConnection.query(`
+      DROP TABLE IF EXISTS temp_purchase_orders;
+      DROP TABLE IF EXISTS temp_receivings;
+      DROP TABLE IF EXISTS temp_receiving_allocations;
+      DROP TABLE IF EXISTS employee_names;
+    `);

     // Commit transaction
     await localConnection.commit();