From 5dd779cb4a8da72cb69876dbe9004d33ad93db87 Mon Sep 17 00:00:00 2001
From: Matt
Date: Tue, 25 Mar 2025 19:12:41 -0400
Subject: [PATCH] Fix purchase orders import

---
 docs/import-from-prod-data-mapping.md         | 342 +++++++
 inventory-server/db/schema.sql                |  11 +-
 inventory-server/scripts/import/categories.js |   8 +
 inventory-server/scripts/import/products.js   |  12 +
 .../scripts/import/purchase-orders.js         | 878 +++++++++++++++---
 5 files changed, 1136 insertions(+), 115 deletions(-)
 create mode 100644 docs/import-from-prod-data-mapping.md

diff --git a/docs/import-from-prod-data-mapping.md b/docs/import-from-prod-data-mapping.md
new file mode 100644
index 0000000..53d6cb4
--- /dev/null
+++ b/docs/import-from-prod-data-mapping.md
@@ -0,0 +1,342 @@
# MySQL to PostgreSQL Import Process Documentation

This document outlines the data import process from the production MySQL database to the local PostgreSQL database, focusing on column mappings, data transformations, and the overall import architecture.

## Table of Contents
1. [Overview](#overview)
2. [Import Architecture](#import-architecture)
3. [Column Mappings](#column-mappings)
   - [Categories](#categories)
   - [Products](#products)
   - [Product Categories (Relationship)](#product-categories-relationship)
   - [Orders](#orders)
   - [Purchase Orders](#purchase-orders)
   - [Metadata Tables](#metadata-tables)
4. [Special Calculations](#special-calculations)
5. [Implementation Notes](#implementation-notes)

## Overview

The import process extracts data from a MySQL 5.7 production database and imports it into a PostgreSQL database. It can operate in two modes:

- **Full Import**: Imports all data regardless of last sync time
- **Incremental Import**: Only imports data that has changed since the last import

The process handles four main data types:
- Categories (product categorization hierarchy)
- Products (inventory items)
- Orders (sales records)
- Purchase Orders (vendor orders)

## Import Architecture

The import process follows these steps:

1. **Establish Connection**: Creates an SSH tunnel to the production server and establishes database connections
2. **Setup Import History**: Creates a record of the current import operation
3. **Import Categories**: Processes product categories in hierarchical order
4. **Import Products**: Processes products with their attributes and category relationships
5. **Import Orders**: Processes customer orders with line items, taxes, and discounts
6. **Import Purchase Orders**: Processes vendor purchase orders with line items
7. **Record Results**: Updates the import history with results
8. **Close Connections**: Cleans up connections and resources

Each import step uses temporary tables for processing and wraps operations in transactions to ensure data consistency; a minimal sketch of the overall flow follows.
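The sketch below is illustrative only, not the project's actual entry point: the four `import*` functions match the scripts in `inventory-server/scripts/import/`, while `openTunnel`, `openLocalDb`, and `recordImportHistory` are hypothetical helpers standing in for the real connection and history plumbing.

```javascript
// Minimal orchestration sketch; helper names are assumptions, not the real code.
const importCategories = require('./import/categories');
const importProducts = require('./import/products');
const importOrders = require('./import/orders');
const importPurchaseOrders = require('./import/purchase-orders');

async function runImport({ openTunnel, openLocalDb, recordImportHistory, incremental = true }) {
  // 1. Establish connection: SSH tunnel plus both database handles
  const { prodConnection, closeTunnel } = await openTunnel();
  const localConnection = await openLocalDb();

  // 2. Record that an import has started
  const historyId = await recordImportHistory({ status: 'running', incremental });

  try {
    // 3-6. Run each importer in dependency order (categories before products,
    // since products reference category rows); each returns record counts
    const results = {};
    results.categories = await importCategories(prodConnection, localConnection);
    results.products = await importProducts(prodConnection, localConnection, incremental);
    results.orders = await importOrders(prodConnection, localConnection, incremental);
    results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection, incremental);

    // 7. Record results
    await recordImportHistory({ id: historyId, status: 'completed', results });
    return results;
  } catch (error) {
    await recordImportHistory({ id: historyId, status: 'failed', error: error.message });
    throw error;
  } finally {
    // 8. Close connections and tear down the tunnel
    await localConnection.end();
    await prodConnection.end();
    await closeTunnel();
  }
}
```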
+ +## Column Mappings + +### Categories +| PostgreSQL Column | MySQL Source | Transformation | +|-------------------|---------------------------------|----------------------------------------------| +| cat_id | product_categories.cat_id | Direct mapping | +| name | product_categories.name | Direct mapping | +| type | product_categories.type | Direct mapping | +| parent_id | product_categories.master_cat_id| NULL for top-level categories (types 10, 20) | +| description | product_categories.combined_name| Direct mapping | +| status | N/A | Hard-coded 'active' | +| created_at | N/A | Current timestamp | +| updated_at | N/A | Current timestamp | + +**Notes:** +- Categories are processed in hierarchical order by type: [10, 20, 11, 21, 12, 13] +- Type 10/20 are top-level categories with no parent +- Types 11/21/12/13 are child categories that reference parent categories + +### Products +| PostgreSQL Column | MySQL Source | Transformation | +|----------------------|----------------------------------|---------------------------------------------------------------| +| pid | products.pid | Direct mapping | +| title | products.description | Direct mapping | +| description | products.notes | Direct mapping | +| sku | products.itemnumber | Fallback to 'NO-SKU' if empty | +| stock_quantity | shop_inventory.available_local | Capped at 5000, minimum 0 | +| preorder_count | current_inventory.onpreorder | Default 0 | +| notions_inv_count | product_notions_b2b.inventory | Default 0 | +| price | product_current_prices.price_each| Default 0, filtered on active=1 | +| regular_price | products.sellingprice | Default 0 | +| cost_price | product_inventory | Weighted average: SUM(costeach * count) / SUM(count) when count > 0, or latest costeach | +| vendor | suppliers.companyname | Via supplier_item_data.supplier_id | +| vendor_reference | supplier_item_data | supplier_itemnumber or notions_itemnumber based on vendor | +| notions_reference | supplier_item_data.notions_itemnumber | Direct mapping | +| brand | product_categories.name | Linked via products.company | +| line | product_categories.name | Linked via products.line | +| subline | product_categories.name | Linked via products.subline | +| artist | product_categories.name | Linked via products.artist | +| categories | product_category_index | Comma-separated list of category IDs | +| created_at | products.date_created | Validated date, NULL if invalid | +| first_received | products.datein | Validated date, NULL if invalid | +| landing_cost_price | NULL | Not set | +| barcode | products.upc | Direct mapping | +| harmonized_tariff_code| products.harmonized_tariff_code | Direct mapping | +| updated_at | products.stamp | Validated date, NULL if invalid | +| visible | shop_inventory | Calculated from show + buyable > 0 | +| managing_stock | N/A | Hard-coded true | +| replenishable | Multiple fields | Complex calculation based on reorder, dates, etc. 
| permalink | N/A | Constructed URL with product ID |
| moq | supplier_item_data | notions_qty_per_unit or supplier_qty_per_unit, minimum 1 |
| uom | N/A | Hard-coded 1 |
| rating | products.rating | Direct mapping |
| reviews | products.rating_votes | Direct mapping |
| weight | products.weight | Direct mapping |
| length | products.length | Direct mapping |
| width | products.width | Direct mapping |
| height | products.height | Direct mapping |
| country_of_origin | products.country_of_origin | Direct mapping |
| location | products.location | Direct mapping |
| total_sold | order_items | SUM(qty_ordered) for all order_items where prod_pid = pid |
| baskets | mybasket | COUNT of records where mb.item = pid and qty > 0 |
| notifies | product_notify | COUNT of records where pn.pid = pid |
| date_last_sold | product_last_sold.date_sold | Validated date, NULL if invalid |
| image | N/A | Constructed from pid and image URL pattern |
| image_175 | N/A | Constructed from pid and image URL pattern |
| image_full | N/A | Constructed from pid and image URL pattern |
| options | NULL | Not set |
| tags | NULL | Not set |

**Notes:**
- Replenishable calculation (as implemented in `products.js`):

  ```sql
  CASE
    WHEN p.reorder < 0 THEN 0
    WHEN p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 1 YEAR) THEN 1
    WHEN COALESCE(pnb.inventory, 0) > 0 THEN 1
    WHEN (
      (COALESCE(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR))
      AND (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 5 YEAR))
      AND (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 5 YEAR))
    ) THEN 0
    ELSE 1
  END
  ```

  In business terms, a product created within the past year, or with notions B2B inventory on hand, is always considered replenishable. Beyond that, a product is considered NOT replenishable only if:
  - It was manually flagged as not replenishable (negative reorder value)
  - OR it shows no activity across ALL metrics (no sales AND no receipts AND no refills in the past 5 years)
- Image URLs are constructed using this pattern:

  ```javascript
  const paddedPid = pid.toString().padStart(6, '0');
  const prefix = paddedPid.slice(0, 3);
  const basePath = `${imageUrlBase}${prefix}/${pid}`;
  return {
    image: `${basePath}-t-${iid}.jpg`,
    image_175: `${basePath}-175x175-${iid}.jpg`,
    image_full: `${basePath}-o-${iid}.jpg`
  };
  ```
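  For example, with a hypothetical `imageUrlBase` of `https://img.example.com/`, `pid = 4321`, and `iid = 2`: the pid pads to `004321`, the prefix is `004`, so `image_175` becomes `https://img.example.com/004/4321-175x175-2.jpg`. Note that the prefix comes from the zero-padded pid while the path segment uses the raw pid.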
### Product Categories (Relationship)

| PostgreSQL Column | MySQL Source | Transformation |
|-------------------|-----------------------------------|---------------------------------------------------------------|
| pid | products.pid | Direct mapping |
| cat_id | product_category_index.cat_id | Direct mapping, filtered by category types |

**Notes:**
- Only categories of types 10, 20, 11, 21, 12, 13 are imported
- Categories 16 and 17 are explicitly excluded

### Orders

| PostgreSQL Column | MySQL Source | Transformation |
|-------------------|-----------------------------------|---------------------------------------------------------------|
| order_number | order_items.order_id | Direct mapping |
| pid | order_items.prod_pid | Direct mapping |
| sku | order_items.prod_itemnumber | Fallback to 'NO-SKU' if empty |
| date | _order.date_placed_onlydate | Via join to _order table |
| price | order_items.prod_price | Direct mapping |
| quantity | order_items.qty_ordered | Direct mapping |
| discount | Multiple sources | Complex calculation (see notes) |
| tax | order_tax_info_products.item_taxes_to_collect | Via latest order_tax_info record |
| tax_included | N/A | Hard-coded false |
| shipping | N/A | Hard-coded 0 |
| customer | _order.order_cid | Direct mapping |
| customer_name | users | CONCAT(users.firstname, ' ', users.lastname) |
| status | _order.order_status | Direct mapping |
| canceled | _order.date_cancelled | Boolean: true if date_cancelled is not '0000-00-00 00:00:00' |
| costeach | order_costs | From latest record or fallback to price * 0.5 |

**Notes:**
- Only orders with order_status >= 15 and with a valid date_placed are processed
- For incremental imports, only orders modified since last sync are processed
- Discount calculation combines three sources:
  1. Base discount: order_items.prod_price_reg - order_items.prod_price
  2. Promo discount: SUM of order_discount_items.amount
  3. Proportional order discount: Calculation based on order subtotal proportion

  ```sql
  (oi.base_discount +
   COALESCE(ot.promo_discount, 0) +
   CASE
     WHEN om.summary_discount > 0 AND om.summary_subtotal > 0 THEN
       ROUND((om.summary_discount * (oi.price * oi.quantity)) / NULLIF(om.summary_subtotal, 0), 2)
     ELSE 0
   END)::DECIMAL(10,2)
  ```
- Taxes are taken from the latest tax record for an order
- Cost data is taken from the latest non-pending cost record

### Purchase Orders

| PostgreSQL Column | MySQL Source | Transformation |
|-------------------|-----------------------------------|---------------------------------------------------------------|
| po_id | po.po_id | Direct mapping; standalone receivings get a virtual ID of 'R' + receiving_id |
| pid | po_products.pid | Direct mapping |
| sku | products.itemnumber | Fallback to 'NO-SKU' if empty |
| name | products.description | Fallback to 'Unknown Product' |
| cost_price | receivings_products.cost_each | Weighted average cost of allocated receivings; falls back to po_products.cost_each |
| po_cost_price | po_products.cost_each | Original PO line cost |
| vendor | suppliers.companyname | Fallback to 'Unknown Vendor' if empty |
| date | po.date_ordered | Fallback to po.date_created, then to the first receiving date for virtual POs |
| expected_date | po.date_estin | Validated date, NULL if invalid |
| status | po.status | Default 1 if NULL |
| notes | po.short_note | Direct mapping |
| long_note | po.notes | Direct mapping |
| ordered | po_products.qty_each | Default 0 if NULL (virtual receiving-only POs) |
| received | receivings_products.qty_each | Sum of receiving quantities allocated to the PO line |
| receiving_status | Derived | 1 = not received, 30 = partially received, 40 = fully received |
| received_date | receivings_products.received_date | Date of the first allocated receiving |
| last_received_date | receivings_products.received_date | Date of the latest allocated receiving |
| received_by | employees | Comma-separated names of receiving employees |
| receiving_history | receivings_products | JSONB log of each allocated receiving (qty, date, cost, employee) |

**Notes:**
- Only POs created within the last year (incremental) or last 5 years (full) are processed
- For incremental imports, only POs modified since last sync are processed
- Receivings are matched to their PO line directly by po_id and pid where possible; receivings without a matching PO line are imported as standalone "virtual" POs, and any remaining receiving quantities are allocated to open PO lines oldest-first (FIFO)

### Metadata Tables

#### import_history

| PostgreSQL Column | Source | Notes |
|-------------------|-----------------------------------|---------------------------------------------------------------|
| id | Auto-increment | Primary key |
| table_name | Code | 'all_tables' for overall import |
| start_time | NOW() | Import start time |
| end_time | NOW() | Import completion time |
| duration_seconds | Calculation | Elapsed seconds |
| is_incremental | INCREMENTAL_UPDATE | Flag from config |
| records_added | Calculation | Sum from all imports |
| records_updated | Calculation | Sum from all imports |
| status | Code | 'running', 'completed', 'failed', or 'cancelled' |
| error_message | Exception | Error message if failed |
| additional_info | JSON | Configuration and results |

#### sync_status

| PostgreSQL Column | Source | Notes |
|----------------------|--------------------------------|---------------------------------------------------------------|
| table_name | Code | Name of imported table |
| last_sync_timestamp | NOW() | Timestamp of successful sync |
| last_sync_id | NULL | Not used currently |

## Special Calculations

### Date Validation

MySQL dates are validated before insertion into PostgreSQL:

```javascript 
+function validateDate(mysqlDate) { + if (!mysqlDate || mysqlDate === '0000-00-00' || mysqlDate === '0000-00-00 00:00:00') { + return null; + } + // Check if the date is valid + const date = new Date(mysqlDate); + return isNaN(date.getTime()) ? null : mysqlDate; +} +``` + +### Retry Mechanism + +Operations that might fail temporarily are retried with exponential backoff: + +```javascript +async function withRetry(operation, errorMessage) { + let lastError; + for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) { + try { + return await operation(); + } catch (error) { + lastError = error; + console.error(`${errorMessage} (Attempt ${attempt}/${MAX_RETRIES}):`, error); + if (attempt < MAX_RETRIES) { + const backoffTime = RETRY_DELAY * Math.pow(2, attempt - 1); + await new Promise(resolve => setTimeout(resolve, backoffTime)); + } + } + } + throw lastError; +} +``` + +### Progress Tracking + +Progress is tracked with estimated time remaining: + +```javascript +function estimateRemaining(startTime, current, total) { + if (current === 0) return "Calculating..."; + const elapsedSeconds = (Date.now() - startTime) / 1000; + const itemsPerSecond = current / elapsedSeconds; + const remainingItems = total - current; + const remainingSeconds = remainingItems / itemsPerSecond; + return formatElapsedTime(remainingSeconds); +} +``` + +## Implementation Notes + +### Transaction Management + +All imports use transactions to ensure data consistency: + +- **Categories**: Uses savepoints for each category type +- **Products**: Uses a single transaction for the entire import +- **Orders**: Uses a single transaction with temporary tables +- **Purchase Orders**: Uses a single transaction with temporary tables + +### Memory Usage Optimization + +To minimize memory usage when processing large datasets: + +1. Data is processed in batches (100-5000 records per batch) +2. Temporary tables are used for intermediate data +3. Some queries use cursors to avoid loading all results at once + +### MySQL vs PostgreSQL Compatibility + +The scripts handle differences between MySQL and PostgreSQL: + +1. MySQL-specific syntax like `USE INDEX` is removed for PostgreSQL +2. `GROUP_CONCAT` in MySQL becomes string operations in PostgreSQL +3. Transaction syntax differences are abstracted in the connection wrapper +4. 
PostgreSQL's `ON CONFLICT` replaces MySQL's `ON DUPLICATE KEY UPDATE`

### SSH Tunnel

Database connections go through an SSH tunnel for security:

```javascript
ssh.forwardOut(
  "127.0.0.1",
  0,
  sshConfig.prodDbConfig.host,
  sshConfig.prodDbConfig.port,
  async (err, stream) => {
    if (err) return reject(err); // return so resolve is never reached on error
    resolve({ ssh, stream });
  }
);
```
\ No newline at end of file
diff --git a/inventory-server/db/schema.sql b/inventory-server/db/schema.sql
index a01b13f..65b7c24 100644
--- a/inventory-server/db/schema.sql
+++ b/inventory-server/db/schema.sql
@@ -4,7 +4,12 @@ SET session_replication_role = 'replica'; -- Disable foreign key checks tempora
 -- Create function for updating timestamps
 CREATE OR REPLACE FUNCTION update_updated_column() RETURNS TRIGGER AS $func$ BEGIN
-  NEW.updated = CURRENT_TIMESTAMP;
+  -- Check which table is being updated and use the appropriate column
+  IF TG_TABLE_NAME = 'categories' THEN
+    NEW.updated_at = CURRENT_TIMESTAMP;
+  ELSE
+    NEW.updated = CURRENT_TIMESTAMP;
+  END IF;
   RETURN NEW;
 END;
 $func$ language plpgsql;
@@ -160,7 +165,7 @@ CREATE TABLE purchase_orders (
   expected_date DATE,
   pid BIGINT NOT NULL,
   sku VARCHAR(50) NOT NULL,
-  name VARCHAR(100) NOT NULL,
+  name VARCHAR(255) NOT NULL,
   cost_price DECIMAL(10, 3) NOT NULL,
   po_cost_price DECIMAL(10, 3) NOT NULL,
   status SMALLINT DEFAULT 1,
@@ -171,7 +176,7 @@ CREATE TABLE purchase_orders (
   received INTEGER DEFAULT 0,
   received_date DATE,
   last_received_date DATE,
-  received_by VARCHAR(100),
+  received_by VARCHAR,
   receiving_history JSONB,
   updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
   FOREIGN KEY (pid) REFERENCES products(pid),
diff --git a/inventory-server/scripts/import/categories.js b/inventory-server/scripts/import/categories.js
index 41df041..9c1741d 100644
--- a/inventory-server/scripts/import/categories.js
+++ b/inventory-server/scripts/import/categories.js
@@ -142,6 +142,14 @@ async function importCategories(prodConnection, localConnection) {
   // Commit the entire transaction - we'll do this even if we have skipped categories
   await localConnection.query('COMMIT');
 
+  // Update sync status
+  await localConnection.query(`
+    INSERT INTO sync_status (table_name, last_sync_timestamp)
+    VALUES ('categories', NOW())
+    ON CONFLICT (table_name) DO UPDATE SET
+      last_sync_timestamp = NOW()
+  `);
+
   outputProgress({
     status: "complete",
     operation: "Categories import completed",
diff --git a/inventory-server/scripts/import/products.js b/inventory-server/scripts/import/products.js
index 60762ef..9267764 100644
--- a/inventory-server/scripts/import/products.js
+++ b/inventory-server/scripts/import/products.js
@@ -144,6 +144,8 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
       CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
       CASE
         WHEN p.reorder < 0 THEN 0
+        WHEN p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 1 YEAR) THEN 1
+        WHEN COALESCE(pnb.inventory, 0) > 0 THEN 1
         WHEN (
           (COALESCE(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR))
           AND (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 5 YEAR))
@@ -343,6 +345,8 @@ async function materializeCalculations(prodConnection, localConnection, incremen
       CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
       CASE
         WHEN p.reorder < 0 THEN 0
+        WHEN p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 1 YEAR) THEN 1
+        WHEN COALESCE(pnb.inventory, 0) > 0 THEN 1
         WHEN (
           (COALESCE(pls.date_sold, '0000-00-00') = 
'0000-00-00' OR pls.date_sold <= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)) AND (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 5 YEAR)) @@ -840,6 +844,14 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate // Commit the transaction await localConnection.commit(); + // Update sync status + await localConnection.query(` + INSERT INTO sync_status (table_name, last_sync_timestamp) + VALUES ('products', NOW()) + ON CONFLICT (table_name) DO UPDATE SET + last_sync_timestamp = NOW() + `); + return { status: 'complete', recordsAdded, diff --git a/inventory-server/scripts/import/purchase-orders.js b/inventory-server/scripts/import/purchase-orders.js index 4457ed5..a4943c7 100644 --- a/inventory-server/scripts/import/purchase-orders.js +++ b/inventory-server/scripts/import/purchase-orders.js @@ -1,9 +1,56 @@ const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress'); +/** + * Validates a date from MySQL before inserting it into PostgreSQL + * @param {string|Date|null} mysqlDate - Date string or object from MySQL + * @returns {string|null} Valid date string or null if invalid + */ +function validateDate(mysqlDate) { + // Handle null, undefined, or empty values + if (!mysqlDate) { + return null; + } + + // Convert to string if it's not already + const dateStr = String(mysqlDate); + + // Handle MySQL zero dates and empty values + if (dateStr === '0000-00-00' || + dateStr === '0000-00-00 00:00:00' || + dateStr.indexOf('0000-00-00') !== -1 || + dateStr === '') { + return null; + } + + // Check if the date is valid + const date = new Date(mysqlDate); + + // If the date is invalid or suspiciously old (pre-1970), return null + if (isNaN(date.getTime()) || date.getFullYear() < 1970) { + return null; + } + + return mysqlDate; +} + +/** + * Imports purchase orders and receivings from a production MySQL database to a local PostgreSQL database. + * Implements FIFO allocation of receivings to purchase orders. 
+ * + * @param {object} prodConnection - A MySQL connection to production DB + * @param {object} localConnection - A PostgreSQL connection to local DB + * @param {boolean} incrementalUpdate - Set to false for a full sync; true for incremental + * @returns {object} Information about the sync operation + */ async function importPurchaseOrders(prodConnection, localConnection, incrementalUpdate = true) { const startTime = Date.now(); let recordsAdded = 0; let recordsUpdated = 0; + let totalProcessed = 0; + + // Batch size constants + const PO_BATCH_SIZE = 500; + const INSERT_BATCH_SIZE = 100; try { // Begin transaction for the entire import process @@ -17,104 +64,244 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental console.log('Purchase Orders: Using last sync time:', lastSyncTime); - // Create temp tables + // Create temp tables for processing await localConnection.query(` DROP TABLE IF EXISTS temp_purchase_orders; - CREATE TABLE temp_purchase_orders ( - po_id INTEGER NOT NULL, - pid INTEGER NOT NULL, + DROP TABLE IF EXISTS temp_receivings; + DROP TABLE IF EXISTS temp_receiving_allocations; + DROP TABLE IF EXISTS employee_names; + + -- Temporary table for purchase orders + CREATE TEMP TABLE temp_purchase_orders ( + po_id VARCHAR(50) NOT NULL, + pid BIGINT NOT NULL, sku VARCHAR(50), name VARCHAR(255), vendor VARCHAR(255), date TIMESTAMP WITH TIME ZONE, - expected_date TIMESTAMP WITH TIME ZONE, + expected_date DATE, status INTEGER, + status_text VARCHAR(50), notes TEXT, + long_note TEXT, ordered INTEGER, - cost_price DECIMAL(10,3), + po_cost_price DECIMAL(10,3), + supplier_id INTEGER, + date_created TIMESTAMP WITH TIME ZONE, + date_ordered TIMESTAMP WITH TIME ZONE, PRIMARY KEY (po_id, pid) ); + + -- Temporary table for receivings + CREATE TEMP TABLE temp_receivings ( + receiving_id VARCHAR(50) NOT NULL, + po_id VARCHAR(50), + pid BIGINT NOT NULL, + qty_each INTEGER, + cost_each DECIMAL(10,5), + received_by INTEGER, + received_date TIMESTAMP WITH TIME ZONE, + receiving_created_date TIMESTAMP WITH TIME ZONE, + supplier_id INTEGER, + status INTEGER, + status_text VARCHAR(50), + PRIMARY KEY (receiving_id, pid) + ); + + -- Temporary table for tracking FIFO allocations + CREATE TEMP TABLE temp_receiving_allocations ( + po_id VARCHAR(50) NOT NULL, + pid BIGINT NOT NULL, + receiving_id VARCHAR(50) NOT NULL, + allocated_qty INTEGER NOT NULL, + cost_each DECIMAL(10,5) NOT NULL, + received_date TIMESTAMP WITH TIME ZONE NOT NULL, + received_by INTEGER, + PRIMARY KEY (po_id, pid, receiving_id) + ); + + -- Temporary table for employee names + CREATE TEMP TABLE employee_names ( + employeeid INTEGER PRIMARY KEY, + firstname VARCHAR(100), + lastname VARCHAR(100) + ); + + -- Create indexes for efficient joins + CREATE INDEX idx_temp_po_pid ON temp_purchase_orders(pid); + CREATE INDEX idx_temp_receiving_pid ON temp_receivings(pid); + CREATE INDEX idx_temp_receiving_po_id ON temp_receivings(po_id); `); - // First get all relevant PO IDs with basic info - Keep MySQL compatible for production - const [[{ total }]] = await prodConnection.query(` + // Map status codes to text values + const poStatusMap = { + 0: 'Canceled', + 1: 'Created', + 10: 'Ready ESend', + 11: 'Ordered', + 12: 'Preordered', + 13: 'Electronically Sent', + 15: 'Receiving Started', + 50: 'Done' + }; + + const receivingStatusMap = { + 0: 'Canceled', + 1: 'Created', + 30: 'Partial Received', + 40: 'Full Received', + 50: 'Paid' + }; + + // Get time window for data retrieval + const yearInterval = incrementalUpdate ? 
1 : 5; + + // Fetch employee data from production + outputProgress({ + status: "running", + operation: "Purchase orders import", + message: "Fetching employee data" + }); + + const [employees] = await prodConnection.query(` + SELECT + employeeid, + firstname, + lastname + FROM employees + `); + + // Insert employee data into temp table + if (employees.length > 0) { + const employeeValues = employees.map(emp => [ + emp.employeeid, + emp.firstname || '', + emp.lastname || '' + ]).flat(); + + const placeholders = employees.map((_, idx) => { + const base = idx * 3; + return `($${base + 1}, $${base + 2}, $${base + 3})`; + }).join(','); + + await localConnection.query(` + INSERT INTO employee_names (employeeid, firstname, lastname) + VALUES ${placeholders} + ON CONFLICT (employeeid) DO UPDATE SET + firstname = EXCLUDED.firstname, + lastname = EXCLUDED.lastname + `, employeeValues); + } + + // 1. First, fetch all relevant POs + outputProgress({ + status: "running", + operation: "Purchase orders import", + message: "Fetching purchase orders" + }); + + const [poCount] = await prodConnection.query(` SELECT COUNT(*) as total - FROM ( - SELECT DISTINCT pop.po_id, pop.pid - FROM po p - JOIN po_products pop ON p.po_id = pop.po_id - JOIN suppliers s ON p.supplier_id = s.supplierid - WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) - ${incrementalUpdate ? ` - AND ( - p.date_updated > ? - OR p.date_ordered > ? - OR p.date_estin > ? - ) - ` : ''} - ) all_items + FROM po p + WHERE p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR) + ${incrementalUpdate ? ` + AND ( + p.date_updated > ? + OR p.date_ordered > ? + OR p.date_estin > ? + ) + ` : ''} `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []); - console.log('Purchase Orders: Found changes:', total); + const totalPOs = poCount[0].total; + console.log(`Found ${totalPOs} relevant purchase orders`); - // Get PO list - Keep MySQL compatible for production - console.log('Fetching purchase orders in batches...'); - - const FETCH_BATCH_SIZE = 5000; - const INSERT_BATCH_SIZE = 200; // Process 200 records at a time for inserts + // Fetch and process POs in batches let offset = 0; - let allProcessed = false; - let totalProcessed = 0; + let allPOsProcessed = false; - while (!allProcessed) { - console.log(`Fetching batch at offset ${offset}...`); + while (!allPOsProcessed) { const [poList] = await prodConnection.query(` - SELECT DISTINCT - COALESCE(p.po_id, 0) as po_id, - pop.pid, - COALESCE(NULLIF(pr.itemnumber, ''), 'NO-SKU') as sku, - COALESCE(pr.description, 'Unknown Product') as name, - COALESCE(NULLIF(s.companyname, ''), 'Unknown Vendor') as vendor, - COALESCE(p.date_ordered, p.date_created) as date, - p.date_estin as expected_date, - COALESCE(p.status, 1) as status, - COALESCE(p.short_note, p.notes) as notes, - pop.qty_each as ordered, - pop.cost_each as cost_price + SELECT + p.po_id, + p.supplier_id, + s.companyname AS vendor, + p.status, + p.notes AS long_note, + p.short_note AS notes, + p.date_created, + p.date_ordered, + p.date_estin FROM po p - JOIN po_products pop ON p.po_id = pop.po_id - JOIN products pr ON pop.pid = pr.pid LEFT JOIN suppliers s ON p.supplier_id = s.supplierid - WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) + WHERE p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR) ${incrementalUpdate ? ` AND ( p.date_updated > ? - OR p.date_ordered > ? + OR p.date_ordered > ? OR p.date_estin > ? 
) ` : ''} - ORDER BY p.po_id, pop.pid - LIMIT ${FETCH_BATCH_SIZE} OFFSET ${offset} + ORDER BY p.po_id + LIMIT ${PO_BATCH_SIZE} OFFSET ${offset} `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []); - + if (poList.length === 0) { - allProcessed = true; + allPOsProcessed = true; break; } - - console.log(`Processing batch of ${poList.length} purchase order items (${offset}-${offset + poList.length})`); - // Process in smaller batches for inserts - for (let i = 0; i < poList.length; i += INSERT_BATCH_SIZE) { - const batch = poList.slice(i, Math.min(i + INSERT_BATCH_SIZE, poList.length)); + // Get products for these POs + const poIds = poList.map(po => po.po_id); + + const [poProducts] = await prodConnection.query(` + SELECT + pp.po_id, + pp.pid, + pp.qty_each, + pp.cost_each, + COALESCE(p.itemnumber, 'NO-SKU') AS sku, + COALESCE(p.description, 'Unknown Product') AS name + FROM po_products pp + LEFT JOIN products p ON pp.pid = p.pid + WHERE pp.po_id IN (?) + `, [poIds]); + + // Build complete PO records + const completePOs = []; + for (const product of poProducts) { + const po = poList.find(p => p.po_id == product.po_id); + if (!po) continue; + + completePOs.push({ + po_id: po.po_id.toString(), + pid: product.pid, + sku: product.sku, + name: product.name, + vendor: po.vendor || 'Unknown Vendor', + date: validateDate(po.date_ordered) || validateDate(po.date_created), + expected_date: validateDate(po.date_estin), + status: po.status, + status_text: poStatusMap[po.status] || '', + notes: po.notes || '', + long_note: po.long_note || '', + ordered: product.qty_each, + po_cost_price: product.cost_each, + supplier_id: po.supplier_id, + date_created: validateDate(po.date_created), + date_ordered: validateDate(po.date_ordered) + }); + } + + // Insert PO data in batches + for (let i = 0; i < completePOs.length; i += INSERT_BATCH_SIZE) { + const batch = completePOs.slice(i, i + INSERT_BATCH_SIZE); - // Create parameterized query with placeholders const placeholders = batch.map((_, idx) => { - const base = idx * 11; // 11 columns - return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11})`; + const base = idx * 16; + return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15}, $${base + 16})`; }).join(','); - // Create flattened values array const values = batch.flatMap(po => [ po.po_id, po.pid, @@ -124,16 +311,20 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental po.date, po.expected_date, po.status, + po.status_text, po.notes, + po.long_note, po.ordered, - po.cost_price + po.po_cost_price, + po.supplier_id, + po.date_created, + po.date_ordered ]); - // Execute batch insert await localConnection.query(` INSERT INTO temp_purchase_orders ( - po_id, pid, sku, name, vendor, date, expected_date, - status, notes, ordered, cost_price + po_id, pid, sku, name, vendor, date, expected_date, status, status_text, + notes, long_note, ordered, po_cost_price, supplier_id, date_created, date_ordered ) VALUES ${placeholders} ON CONFLICT (po_id, pid) DO UPDATE SET @@ -143,73 +334,531 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental date = EXCLUDED.date, expected_date = EXCLUDED.expected_date, status = EXCLUDED.status, + status_text = EXCLUDED.status_text, notes = 
EXCLUDED.notes, + long_note = EXCLUDED.long_note, ordered = EXCLUDED.ordered, - cost_price = EXCLUDED.cost_price + po_cost_price = EXCLUDED.po_cost_price, + supplier_id = EXCLUDED.supplier_id, + date_created = EXCLUDED.date_created, + date_ordered = EXCLUDED.date_ordered `, values); + } + + offset += poList.length; + totalProcessed += completePOs.length; + + outputProgress({ + status: "running", + operation: "Purchase orders import", + message: `Processed ${offset} of ${totalPOs} purchase orders (${totalProcessed} line items)`, + current: offset, + total: totalPOs, + elapsed: formatElapsedTime((Date.now() - startTime) / 1000), + remaining: estimateRemaining(startTime, offset, totalPOs), + rate: calculateRate(startTime, offset) + }); + + if (poList.length < PO_BATCH_SIZE) { + allPOsProcessed = true; + } + } + + // 2. Next, fetch all relevant receivings + outputProgress({ + status: "running", + operation: "Purchase orders import", + message: "Fetching receivings data" + }); + + const [receivingCount] = await prodConnection.query(` + SELECT COUNT(*) as total + FROM receivings r + WHERE r.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR) + ${incrementalUpdate ? ` + AND ( + r.date_updated > ? + OR r.date_created > ? + ) + ` : ''} + `, incrementalUpdate ? [lastSyncTime, lastSyncTime] : []); + + const totalReceivings = receivingCount[0].total; + console.log(`Found ${totalReceivings} relevant receivings`); + + // Fetch and process receivings in batches + offset = 0; // Reset offset for receivings + let allReceivingsProcessed = false; + + while (!allReceivingsProcessed) { + const [receivingList] = await prodConnection.query(` + SELECT + r.receiving_id, + r.po_id, + r.supplier_id, + r.status, + r.date_created + FROM receivings r + WHERE r.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR) + ${incrementalUpdate ? ` + AND ( + r.date_updated > ? + OR r.date_created > ? + ) + ` : ''} + ORDER BY r.receiving_id + LIMIT ${PO_BATCH_SIZE} OFFSET ${offset} + `, incrementalUpdate ? [lastSyncTime, lastSyncTime] : []); + + if (receivingList.length === 0) { + allReceivingsProcessed = true; + break; + } + + // Get products for these receivings + const receivingIds = receivingList.map(r => r.receiving_id); + + const [receivingProducts] = await prodConnection.query(` + SELECT + rp.receiving_id, + rp.pid, + rp.qty_each, + rp.cost_each, + rp.received_by, + rp.received_date, + r.date_created as receiving_created_date + FROM receivings_products rp + JOIN receivings r ON rp.receiving_id = r.receiving_id + WHERE rp.receiving_id IN (?) + `, [receivingIds]); + + // Build complete receiving records + const completeReceivings = []; + for (const product of receivingProducts) { + const receiving = receivingList.find(r => r.receiving_id == product.receiving_id); + if (!receiving) continue; - totalProcessed += batch.length; - - outputProgress({ - status: "running", - operation: "Purchase orders import", - message: `Processed ${totalProcessed}/${total} purchase order items`, - current: totalProcessed, - total: total, - elapsed: formatElapsedTime((Date.now() - startTime) / 1000), - remaining: estimateRemaining(startTime, totalProcessed, total), - rate: calculateRate(startTime, totalProcessed) + completeReceivings.push({ + receiving_id: receiving.receiving_id.toString(), + po_id: receiving.po_id ? 
receiving.po_id.toString() : null,
+          pid: product.pid,
+          qty_each: product.qty_each,
+          cost_each: product.cost_each,
+          received_by: product.received_by,
+          received_date: validateDate(product.received_date) || validateDate(product.receiving_created_date),
+          receiving_created_date: validateDate(product.receiving_created_date),
+          supplier_id: receiving.supplier_id,
+          status: receiving.status,
+          status_text: receivingStatusMap[receiving.status] || ''
+        });
+      }

-      // Update offset for next batch
-      offset += poList.length;
+      // Insert receiving data in batches
+      for (let i = 0; i < completeReceivings.length; i += INSERT_BATCH_SIZE) {
+        const batch = completeReceivings.slice(i, i + INSERT_BATCH_SIZE);
+
+        const placeholders = batch.map((_, idx) => {
+          const base = idx * 11;
+          return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11})`;
+        }).join(',');
+
+        const values = batch.flatMap(r => [
+          r.receiving_id,
+          r.po_id,
+          r.pid,
+          r.qty_each,
+          r.cost_each,
+          r.received_by,
+          r.received_date,
+          r.receiving_created_date,
+          r.supplier_id,
+          r.status,
+          r.status_text
+        ]);
+
+        await localConnection.query(`
+          INSERT INTO temp_receivings (
+            receiving_id, po_id, pid, qty_each, cost_each, received_by,
+            received_date, receiving_created_date, supplier_id, status, status_text
+          )
+          VALUES ${placeholders}
+          ON CONFLICT (receiving_id, pid) DO UPDATE SET
+            po_id = EXCLUDED.po_id,
+            qty_each = EXCLUDED.qty_each,
+            cost_each = EXCLUDED.cost_each,
+            received_by = EXCLUDED.received_by,
+            received_date = EXCLUDED.received_date,
+            receiving_created_date = EXCLUDED.receiving_created_date,
+            supplier_id = EXCLUDED.supplier_id,
+            status = EXCLUDED.status,
+            status_text = EXCLUDED.status_text
+        `, values);
+      }

-      // Check if we've received fewer records than the batch size, meaning we're done
-      if (poList.length < FETCH_BATCH_SIZE) {
-        allProcessed = true;
+      offset += receivingList.length;
+      totalProcessed += completeReceivings.length;
+
+      outputProgress({
+        status: "running",
+        operation: "Purchase orders import",
+        message: `Processed ${offset} of ${totalReceivings} receivings (${totalProcessed} line items total)`,
+        current: offset,
+        total: totalReceivings,
+        elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
+        remaining: estimateRemaining(startTime, offset, totalReceivings),
+        rate: calculateRate(startTime, offset)
+      });
+
+      if (receivingList.length < PO_BATCH_SIZE) {
+        allReceivingsProcessed = true;
       }
     }

-    // Count the temp table contents
-    const [tempCount] = await localConnection.query(`SELECT COUNT(*) FROM temp_purchase_orders`);
-    const tempRowCount = parseInt(tempCount.rows[0].count);
-    console.log(`Successfully inserted ${tempRowCount} rows into temp_purchase_orders`);
-
-    // Now insert into the final table
-    const [result] = await localConnection.query(`
-      WITH inserted_pos AS (
-        INSERT INTO purchase_orders (
-          po_id, pid, sku, name, cost_price, po_cost_price,
-          vendor, date, expected_date, status, notes,
-          ordered, received, receiving_status
-        )
+    // 3. 
Implement FIFO allocation of receivings to purchase orders + outputProgress({ + status: "running", + operation: "Purchase orders import", + message: "Allocating receivings to purchase orders using FIFO" + }); + + // Step 1: Handle receivings with matching PO IDs (direct allocation) + await localConnection.query(` + INSERT INTO temp_receiving_allocations ( + po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by + ) + SELECT + r.po_id, + r.pid, + r.receiving_id, + LEAST(r.qty_each, po.ordered) as allocated_qty, + r.cost_each, + COALESCE(r.received_date, NOW()) as received_date, + r.received_by + FROM temp_receivings r + JOIN temp_purchase_orders po ON r.po_id = po.po_id AND r.pid = po.pid + WHERE r.po_id IS NOT NULL + `); + + // Step 2: Handle receivings without a matching PO (standalone receivings) + // Create a PO entry for each standalone receiving + await localConnection.query(` + INSERT INTO temp_purchase_orders ( + po_id, pid, sku, name, vendor, date, status, status_text, + ordered, po_cost_price, supplier_id, date_created, date_ordered + ) + SELECT + 'R' || r.receiving_id as po_id, + r.pid, + COALESCE(p.sku, 'NO-SKU') as sku, + COALESCE(p.name, 'Unknown Product') as name, + COALESCE( + (SELECT vendor FROM temp_purchase_orders + WHERE supplier_id = r.supplier_id LIMIT 1), + 'Unknown Vendor' + ) as vendor, + COALESCE(r.received_date, r.receiving_created_date) as date, + NULL as status, + NULL as status_text, + NULL as ordered, + r.cost_each as po_cost_price, + r.supplier_id, + COALESCE(r.receiving_created_date, r.received_date) as date_created, + NULL as date_ordered + FROM temp_receivings r + LEFT JOIN ( + SELECT DISTINCT pid, sku, name FROM temp_purchase_orders + ) p ON r.pid = p.pid + WHERE r.po_id IS NULL + OR NOT EXISTS ( + SELECT 1 FROM temp_purchase_orders po + WHERE po.po_id = r.po_id AND po.pid = r.pid + ) + ON CONFLICT (po_id, pid) DO NOTHING + `); + + // Now allocate these standalone receivings to their "virtual" POs + await localConnection.query(` + INSERT INTO temp_receiving_allocations ( + po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by + ) + SELECT + 'R' || r.receiving_id as po_id, + r.pid, + r.receiving_id, + r.qty_each as allocated_qty, + r.cost_each, + COALESCE(r.received_date, NOW()) as received_date, + r.received_by + FROM temp_receivings r + WHERE r.po_id IS NULL + OR NOT EXISTS ( + SELECT 1 FROM temp_purchase_orders po + WHERE po.po_id = r.po_id AND po.pid = r.pid + ) + `); + + // Step 3: Handle unallocated receivings vs. 
unfulfilled orders
+    // This is the complex FIFO allocation logic. Running sums per pid give
+    // every PO line and every receiving line a half-open interval of units;
+    // intersecting the intervals assigns each received unit to exactly one
+    // PO line. (A plain rank join here would let one receiving satisfy
+    // several POs in full, overstating received totals.)
+    await localConnection.query(`
+      WITH
+      -- Calculate remaining quantities after direct allocations
+      remaining_po_quantities AS (
+        SELECT
+          po.po_id,
+          po.pid,
+          po.ordered,
+          COALESCE(SUM(ra.allocated_qty), 0) as already_allocated,
+          po.ordered - COALESCE(SUM(ra.allocated_qty), 0) as remaining_qty,
+          po.date_ordered,
+          po.date_created
+        FROM temp_purchase_orders po
+        LEFT JOIN temp_receiving_allocations ra ON po.po_id = ra.po_id AND po.pid = ra.pid
+        WHERE po.ordered IS NOT NULL
+        GROUP BY po.po_id, po.pid, po.ordered, po.date_ordered, po.date_created
+        HAVING po.ordered > COALESCE(SUM(ra.allocated_qty), 0)
+      ),
+      remaining_receiving_quantities AS (
+        SELECT
+          r.receiving_id,
+          r.pid,
+          r.qty_each,
+          COALESCE(SUM(ra.allocated_qty), 0) as already_allocated,
+          r.qty_each - COALESCE(SUM(ra.allocated_qty), 0) as remaining_qty,
+          r.received_date,
+          r.cost_each,
+          r.received_by
+        FROM temp_receivings r
+        LEFT JOIN temp_receiving_allocations ra ON r.receiving_id = ra.receiving_id AND r.pid = ra.pid
+        GROUP BY r.receiving_id, r.pid, r.qty_each, r.received_date, r.cost_each, r.received_by
+        HAVING r.qty_each > COALESCE(SUM(ra.allocated_qty), 0)
+      ),
+      -- Order POs by age group (recent POs first, stale POs older than 1 year
+      -- last) and assign each PO line its cumulative demand interval
+      ranked_pos AS (
+        SELECT
+          po.po_id,
+          po.pid,
+          po.remaining_qty,
+          CASE
+            WHEN po.date_ordered IS NULL OR po.date_ordered < NOW() - INTERVAL '1 year' THEN 2
+            ELSE 1
+          END as age_group,
+          SUM(po.remaining_qty) OVER (
+            PARTITION BY po.pid
+            ORDER BY
+              CASE WHEN po.date_ordered IS NULL OR po.date_ordered < NOW() - INTERVAL '1 year' THEN 2 ELSE 1 END,
+              COALESCE(po.date_ordered, po.date_created, NOW()),
+              po.po_id
+          ) - po.remaining_qty as range_start,
+          SUM(po.remaining_qty) OVER (
+            PARTITION BY po.pid
+            ORDER BY
+              CASE WHEN po.date_ordered IS NULL OR po.date_ordered < NOW() - INTERVAL '1 year' THEN 2 ELSE 1 END,
+              COALESCE(po.date_ordered, po.date_created, NOW()),
+              po.po_id
+          ) as range_end
+        FROM remaining_po_quantities po
+      ),
+      -- Order receivings by date and assign each one its supply interval
+      ranked_receivings AS (
+        SELECT
+          r.receiving_id,
+          r.pid,
+          r.remaining_qty,
+          r.received_date,
+          r.cost_each,
+          r.received_by,
+          SUM(r.remaining_qty) OVER (
+            PARTITION BY r.pid
+            ORDER BY COALESCE(r.received_date, NOW()), r.receiving_id
+          ) - r.remaining_qty as range_start,
+          SUM(r.remaining_qty) OVER (
+            PARTITION BY r.pid
+            ORDER BY COALESCE(r.received_date, NOW()), r.receiving_id
+          ) as range_end
+        FROM remaining_receiving_quantities r
+      ),
+      -- A PO gets the part of a receiving whose interval overlaps its own,
+      -- so each received unit is allocated exactly once
+      combined_allocations AS (
+        SELECT
+          po.po_id,
+          po.pid,
+          r.receiving_id,
+          LEAST(po.range_end, r.range_end) - GREATEST(po.range_start, r.range_start) as allocated_qty,
+          r.cost_each,
+          COALESCE(r.received_date, NOW()) as received_date,
+          r.received_by
+        FROM ranked_pos po
+        JOIN ranked_receivings r
+          ON po.pid = r.pid
+          AND po.range_start < r.range_end
+          AND r.range_start < po.range_end
+      )
+      -- Insert into allocations table
+      INSERT INTO temp_receiving_allocations (
+        po_id, pid, receiving_id, allocated_qty, cost_each, received_date, received_by
+      )
+      SELECT
+        po_id, pid, receiving_id, allocated_qty, cost_each,
+        COALESCE(received_date, NOW()) as received_date,
+        received_by
+      FROM combined_allocations
+      WHERE allocated_qty > 0
+    `);
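+
+    // Worked example of the interval intersection above (added for
+    // illustration; the numbers are hypothetical, per pid):
+    //   PO A: 10 units remaining, ordered January  -> demand interval [0, 10)
+    //   PO B:  5 units remaining, ordered February -> demand interval [10, 15)
+    //   R1:    8 units received March              -> supply interval [0, 8)
+    //   R2:    7 units received April              -> supply interval [8, 15)
+    // Overlaps allocate A x R1 = 8, A x R2 = 2, and B x R2 = 5; every
+    // received unit lands on exactly one PO line, oldest open demand first.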
last_received_date, + CASE + WHEN rs.received_by_list IS NULL THEN NULL + ELSE rs.received_by_names + END as received_by, + rs.receiving_history + FROM temp_purchase_orders po + LEFT JOIN receiving_summaries rs ON po.po_id = rs.po_id AND po.pid = rs.pid + LEFT JOIN cost_averaging ca ON po.po_id = ca.po_id AND po.pid = ca.pid ON CONFLICT (po_id, pid) DO UPDATE SET vendor = EXCLUDED.vendor, date = EXCLUDED.date, expected_date = EXCLUDED.expected_date, + sku = EXCLUDED.sku, + name = EXCLUDED.name, + cost_price = EXCLUDED.cost_price, + po_cost_price = EXCLUDED.po_cost_price, status = EXCLUDED.status, + receiving_status = EXCLUDED.receiving_status, notes = EXCLUDED.notes, + long_note = EXCLUDED.long_note, ordered = EXCLUDED.ordered, - cost_price = EXCLUDED.cost_price, - po_cost_price = EXCLUDED.po_cost_price - RETURNING xmax = 0 as inserted - ) - SELECT - COUNT(*) FILTER (WHERE inserted) as inserted, - COUNT(*) FILTER (WHERE NOT inserted) as updated - FROM inserted_pos + received = EXCLUDED.received, + received_date = EXCLUDED.received_date, + last_received_date = EXCLUDED.last_received_date, + received_by = EXCLUDED.received_by, + receiving_history = EXCLUDED.receiving_history, + updated = CURRENT_TIMESTAMP + RETURNING (xmax = 0) as inserted `); - // Parse the result - const { inserted, updated } = result.rows[0]; - recordsAdded = parseInt(inserted) || 0; - recordsUpdated = parseInt(updated) || 0; + recordsAdded = finalResult.rows.filter(r => r.inserted).length; + recordsUpdated = finalResult.rows.filter(r => !r.inserted).length; // Update sync status await localConnection.query(` @@ -220,7 +869,12 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental `); // Clean up temporary tables - await localConnection.query(`DROP TABLE IF EXISTS temp_purchase_orders;`); + await localConnection.query(` + DROP TABLE IF EXISTS temp_purchase_orders; + DROP TABLE IF EXISTS temp_receivings; + DROP TABLE IF EXISTS temp_receiving_allocations; + DROP TABLE IF EXISTS employee_names; + `); // Commit transaction await localConnection.commit();
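+
+    // Note (added comment): the RETURNING (xmax = 0) trick above relies on a
+    // PostgreSQL implementation detail of INSERT ... ON CONFLICT: freshly
+    // inserted rows have xmax = 0, while rows taken over by the DO UPDATE
+    // branch carry a nonzero xmax. That is how recordsAdded and
+    // recordsUpdated are split without issuing a second query.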