From 87d4b9e8041c1356d45fd192e141aa3514b7f744 Mon Sep 17 00:00:00 2001
From: Matt
Date: Mon, 24 Mar 2025 22:27:44 -0400
Subject: [PATCH] Fixes/improvements for import scripts

---
 inventory-server/scripts/import-from-prod.js  |  59 ++++----
 inventory-server/scripts/import/categories.js |  41 +-----
 inventory-server/scripts/import/orders.js     |  70 ++++++----
 inventory-server/scripts/import/products.js   |  58 +++++---
 .../scripts/import/purchase-orders.js         | 128 +++++++++----
 5 files changed, 190 insertions(+), 166 deletions(-)

diff --git a/inventory-server/scripts/import-from-prod.js b/inventory-server/scripts/import-from-prod.js
index ff1e27c..f3392cb 100644
--- a/inventory-server/scripts/import-from-prod.js
+++ b/inventory-server/scripts/import-from-prod.js
@@ -10,9 +10,9 @@ const importPurchaseOrders = require('./import/purchase-orders');
 dotenv.config({ path: path.join(__dirname, "../.env") });
 
 // Constants to control which imports run
-const IMPORT_CATEGORIES = false;
-const IMPORT_PRODUCTS = false;
-const IMPORT_ORDERS = false;
+const IMPORT_CATEGORIES = true;
+const IMPORT_PRODUCTS = true;
+const IMPORT_ORDERS = true;
 const IMPORT_PURCHASE_ORDERS = true;
 
 // Add flag for incremental updates
@@ -120,27 +120,38 @@ async function main() {
     `);
 
     // Create import history record for the overall session
-    const [historyResult] = await localConnection.query(`
-      INSERT INTO import_history (
-        table_name,
-        start_time,
-        is_incremental,
-        status,
-        additional_info
-      ) VALUES (
-        'all_tables',
-        NOW(),
-        $1::boolean,
-        'running',
-        jsonb_build_object(
-          'categories_enabled', $2::boolean,
-          'products_enabled', $3::boolean,
-          'orders_enabled', $4::boolean,
-          'purchase_orders_enabled', $5::boolean
-        )
-      ) RETURNING id
-    `, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS]);
-    importHistoryId = historyResult.rows[0].id;
+    try {
+      const [historyResult] = await localConnection.query(`
+        INSERT INTO import_history (
+          table_name,
+          start_time,
+          is_incremental,
+          status,
+          additional_info
+        ) VALUES (
+          'all_tables',
+          NOW(),
+          $1::boolean,
+          'running',
+          jsonb_build_object(
+            'categories_enabled', $2::boolean,
+            'products_enabled', $3::boolean,
+            'orders_enabled', $4::boolean,
+            'purchase_orders_enabled', $5::boolean
+          )
+        ) RETURNING id
+      `, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS]);
+      importHistoryId = historyResult.rows[0].id;
+    } catch (error) {
+      console.error("Error creating import history record:", error);
+      outputProgress({
+        status: "error",
+        operation: "Import process",
+        message: "Failed to create import history record",
+        error: error.message
+      });
+      throw error;
+    }
 
     const results = {
       categories: null,
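NOTE: the `const [historyResult] = await localConnection.query(...)` destructuring above, combined with the `historyResult.rows[0].id` access, only works if the local connection wrapper returns the node-postgres result object inside a one-element array (mysql2-style). That wrapper is not part of this patch, so the following is only a sketch of the shape these call sites assume, with illustrative names:

    const { Client } = require('pg');

    // Wraps a single pg Client so query() resolves to [result], matching the
    // `const [result] = await localConnection.query(...)` call sites in these
    // scripts. A dedicated Client (rather than a Pool) keeps beginTransaction/
    // commit/rollback on one session, which the transaction handling below needs.
    async function createLocalConnection(config) {
      const client = new Client(config);
      await client.connect();
      return {
        query: async (text, params) => [await client.query(text, params)],
        beginTransaction: () => client.query('BEGIN'),
        commit: () => client.query('COMMIT'),
        rollback: () => client.query('ROLLBACK'),
        end: () => client.end(),
      };
    }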
diff --git a/inventory-server/scripts/import/categories.js b/inventory-server/scripts/import/categories.js
index 2634348..41df041 100644
--- a/inventory-server/scripts/import/categories.js
+++ b/inventory-server/scripts/import/categories.js
@@ -47,42 +47,18 @@ async function importCategories(prodConnection, localConnection) {
       continue;
     }
 
-    console.log(`\nProcessing ${categories.length} type ${type} categories`);
-    if (type === 10) {
-      console.log("Type 10 categories:", JSON.stringify(categories, null, 2));
-    }
+    console.log(`Processing ${categories.length} type ${type} categories`);
 
-    // For types that can have parents (11, 21, 12, 13), verify parent existence
+    // For types that can have parents (11, 21, 12, 13), we'll proceed directly
+    // No need to check for parent existence since we process in hierarchical order
     let categoriesToInsert = categories;
-    if (![10, 20].includes(type)) {
-      // Get all parent IDs
-      const parentIds = [
-        ...new Set(
-          categories
-            .filter(c => c && c.parent_id !== null)
-            .map(c => c.parent_id)
-        ),
-      ];
-
-      console.log(`Processing ${categories.length} type ${type} categories with ${parentIds.length} unique parent IDs`);
-      console.log('Parent IDs:', parentIds);
-
-      // No need to check for parent existence - we trust they exist since they were just inserted
-      categoriesToInsert = categories;
-    }
 
     if (categoriesToInsert.length === 0) {
-      console.log(
-        `No valid categories of type ${type} to insert`
-      );
+      console.log(`No valid categories of type ${type} to insert`);
       await localConnection.query(`RELEASE SAVEPOINT category_type_${type}`);
       continue;
     }
 
-    console.log(
-      `Inserting ${categoriesToInsert.length} type ${type} categories`
-    );
-
     // PostgreSQL upsert query with parameterized values
     const values = categoriesToInsert.flatMap((cat) => [
       cat.cat_id,
@@ -95,14 +71,10 @@
       new Date()
     ]);
 
-    console.log('Attempting to insert/update with values:', JSON.stringify(values, null, 2));
-
     const placeholders = categoriesToInsert
       .map((_, i) => `($${i * 8 + 1}, $${i * 8 + 2}, $${i * 8 + 3}, $${i * 8 + 4}, $${i * 8 + 5}, $${i * 8 + 6}, $${i * 8 + 7}, $${i * 8 + 8})`)
       .join(',');
 
-    console.log('Using placeholders:', placeholders);
-
     // Insert categories with ON CONFLICT clause for PostgreSQL
     const query = `
       WITH inserted_categories AS (
@@ -129,17 +101,14 @@
       COUNT(*) FILTER (WHERE is_insert) as inserted,
       COUNT(*) FILTER (WHERE NOT is_insert) as updated
     FROM inserted_categories`;
-
-    console.log('Executing query:', query);
 
     const result = await localConnection.query(query, values);
-    console.log('Query result:', result);
 
     // Get the first result since query returns an array
    const queryResult = Array.isArray(result) ? result[0] : result;
 
     if (!queryResult || !queryResult.rows || !queryResult.rows[0]) {
-      console.error('Query failed to return results. Result:', queryResult);
+      console.error('Query failed to return results');
       throw new Error('Query did not return expected results');
     }
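NOTE: the `RETURNING (xmax = 0) as is_insert` clause in the upsert above is a PostgreSQL idiom for telling inserts apart from updates: `xmax` is a system column that is 0 on a freshly inserted row and nonzero on a row rewritten by `ON CONFLICT ... DO UPDATE`. A minimal standalone illustration, inside an async context (table and column names here are made up, not from this codebase):

    const [result] = await localConnection.query(`
      WITH upserted AS (
        INSERT INTO categories (cat_id, name)
        VALUES ($1, $2)
        ON CONFLICT (cat_id) DO UPDATE SET name = EXCLUDED.name
        RETURNING (xmax = 0) AS is_insert
      )
      SELECT
        COUNT(*) FILTER (WHERE is_insert)     AS inserted,
        COUNT(*) FILTER (WHERE NOT is_insert) AS updated
      FROM upserted
    `, [42, 'Example category']);
    // result.rows[0] is e.g. { inserted: '1', updated: '0' }
    // (counts arrive as strings because COUNT() returns bigint)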
diff --git a/inventory-server/scripts/import/orders.js b/inventory-server/scripts/import/orders.js
index c4f9cdd..bbe9931 100644
--- a/inventory-server/scripts/import/orders.js
+++ b/inventory-server/scripts/import/orders.js
@@ -26,6 +26,9 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
   let cumulativeProcessedOrders = 0;
 
   try {
+    // Begin transaction
+    await localConnection.beginTransaction();
+
     // Get last sync info
     const [syncInfo] = await localConnection.query(
       "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'"
@@ -38,7 +41,6 @@
     const [[{ total }]] = await prodConnection.query(`
       SELECT COUNT(*) as total
       FROM order_items oi
-      USE INDEX (PRIMARY)
       JOIN _order o ON oi.order_id = o.order_id
      WHERE o.order_status >= 15
       AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
@@ -78,7 +80,6 @@
         COALESCE(oi.prod_price_reg - oi.prod_price, 0) as base_discount,
         oi.stamp as last_modified
       FROM order_items oi
-      USE INDEX (PRIMARY)
       JOIN _order o ON oi.order_id = o.order_id
       WHERE o.order_status >= 15
       AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
@@ -105,15 +106,15 @@
     console.log('Orders: Found', orderItems.length, 'order items to process');
 
-    // Create tables in PostgreSQL for debugging
+    // Create tables in PostgreSQL for data processing
     await localConnection.query(`
-      DROP TABLE IF EXISTS debug_order_items;
-      DROP TABLE IF EXISTS debug_order_meta;
-      DROP TABLE IF EXISTS debug_order_discounts;
-      DROP TABLE IF EXISTS debug_order_taxes;
-      DROP TABLE IF EXISTS debug_order_costs;
+      DROP TABLE IF EXISTS temp_order_items;
+      DROP TABLE IF EXISTS temp_order_meta;
+      DROP TABLE IF EXISTS temp_order_discounts;
+      DROP TABLE IF EXISTS temp_order_taxes;
+      DROP TABLE IF EXISTS temp_order_costs;
 
-      CREATE TABLE debug_order_items (
+      CREATE TEMP TABLE temp_order_items (
        order_id INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        SKU VARCHAR(50) NOT NULL,
        price DECIMAL(10,2) NOT NULL,
        quantity INTEGER NOT NULL,
        base_discount DECIMAL(10,2) DEFAULT 0,
        PRIMARY KEY (order_id, pid)
      );
 
@@ -123,7 +124,7 @@
-      CREATE TABLE debug_order_meta (
+      CREATE TEMP TABLE temp_order_meta (
        order_id INTEGER NOT NULL,
        date DATE NOT NULL,
        customer VARCHAR(100) NOT NULL,
        customer_name VARCHAR(255),
        status INTEGER NOT NULL,
        canceled BOOLEAN DEFAULT FALSE,
        summary_discount DECIMAL(10,2) DEFAULT 0,
        summary_subtotal DECIMAL(10,2) DEFAULT 0,
        PRIMARY KEY (order_id)
      );
 
@@ -135,26 +136,29 @@
-      CREATE TABLE debug_order_discounts (
+      CREATE TEMP TABLE temp_order_discounts (
        order_id INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        discount DECIMAL(10,2) NOT NULL,
        PRIMARY KEY (order_id, pid)
      );
 
-      CREATE TABLE debug_order_taxes (
+      CREATE TEMP TABLE temp_order_taxes (
        order_id INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        tax DECIMAL(10,2) NOT NULL,
        PRIMARY KEY (order_id, pid)
      );
 
-      CREATE TABLE debug_order_costs (
+      CREATE TEMP TABLE temp_order_costs (
        order_id INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        costeach DECIMAL(10,3) DEFAULT 0.000,
        PRIMARY KEY (order_id, pid)
      );
+
+      CREATE INDEX idx_temp_order_items_pid ON temp_order_items(pid);
+      CREATE INDEX idx_temp_order_meta_order_id ON temp_order_meta(order_id);
     `);
 
     // Insert order items in batches
@@ -168,7 +172,7 @@
       ]);
 
       await localConnection.query(`
-        INSERT INTO debug_order_items (order_id, pid, SKU, price, quantity, base_discount)
+        INSERT INTO temp_order_items (order_id, pid, SKU, price, quantity, base_discount)
         VALUES ${placeholders}
         ON CONFLICT (order_id, pid) DO UPDATE SET
           SKU = EXCLUDED.SKU,
@@ -239,7 +243,7 @@
       ]);
 
       await localConnection.query(`
-        INSERT INTO debug_order_meta (
+        INSERT INTO temp_order_meta (
          order_id, date, customer, customer_name, status,
          canceled, summary_discount, summary_subtotal
        )
@@ -281,7 +285,7 @@
       ]);
 
       await localConnection.query(`
-        INSERT INTO debug_order_discounts (order_id, pid, discount)
+        INSERT INTO temp_order_discounts (order_id, pid, discount)
         VALUES ${placeholders}
         ON CONFLICT (order_id, pid) DO UPDATE SET
           discount = EXCLUDED.discount
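NOTE: each of the batched inserts above builds its `${placeholders}` string with the same arithmetic ($1..$n numbered across a flattened values array). A small helper expressing the pattern once — hypothetical, not part of the patch:

    // buildPlaceholders(2, 3) === '($1, $2, $3), ($4, $5, $6)'
    function buildPlaceholders(rowCount, width) {
      const rows = [];
      for (let r = 0; r < rowCount; r++) {
        const cols = [];
        // Column numbers continue across rows so they line up with a
        // flatMap()-flattened values array.
        for (let c = 1; c <= width; c++) cols.push(`$${r * width + c}`);
        rows.push(`(${cols.join(', ')})`);
      }
      return rows.join(', ');
    }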
@@ -321,7 +325,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       ]);
 
       await localConnection.query(`
-        INSERT INTO debug_order_taxes (order_id, pid, tax)
+        INSERT INTO temp_order_taxes (order_id, pid, tax)
         VALUES ${placeholders}
         ON CONFLICT (order_id, pid) DO UPDATE SET
           tax = EXCLUDED.tax
@@ -366,7 +370,7 @@
       ]);
 
       await localConnection.query(`
-        INSERT INTO debug_order_costs (order_id, pid, costeach)
+        INSERT INTO temp_order_costs (order_id, pid, costeach)
         VALUES ${placeholders}
         ON CONFLICT (order_id, pid) DO UPDATE SET
           costeach = EXCLUDED.costeach
@@ -426,9 +430,9 @@
             SUM(COALESCE(od.discount, 0)) as promo_discount,
             COALESCE(ot.tax, 0) as total_tax,
             COALESCE(oi.price * 0.5, 0) as costeach
-          FROM debug_order_items oi
-          LEFT JOIN debug_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
-          LEFT JOIN debug_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
+          FROM temp_order_items oi
+          LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
+          LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
           GROUP BY oi.order_id, oi.pid, ot.tax
         )
         SELECT
@@ -456,11 +460,11 @@
         FROM (
           SELECT DISTINCT ON (order_id, pid)
             order_id, pid, SKU, price, quantity, base_discount
-          FROM debug_order_items
+          FROM temp_order_items
           WHERE order_id = ANY($1)
           ORDER BY order_id, pid
         ) oi
-        JOIN debug_order_meta om ON oi.order_id = om.order_id
+        JOIN temp_order_meta om ON oi.order_id = om.order_id
         LEFT JOIN order_totals ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
         ORDER BY oi.order_id, oi.pid
       `, [subBatchIds]);
@@ -564,6 +568,18 @@
       ON CONFLICT (table_name) DO UPDATE SET
         last_sync_timestamp = NOW()
     `);
+
+    // Cleanup temporary tables
+    await localConnection.query(`
+      DROP TABLE IF EXISTS temp_order_items;
+      DROP TABLE IF EXISTS temp_order_meta;
+      DROP TABLE IF EXISTS temp_order_discounts;
+      DROP TABLE IF EXISTS temp_order_taxes;
+      DROP TABLE IF EXISTS temp_order_costs;
+    `);
+
+    // Commit transaction
+    await localConnection.commit();
 
     return {
       status: "complete",
@@ -577,6 +593,14 @@
     };
   } catch (error) {
     console.error("Error during orders import:", error);
+
+    // Rollback transaction
+    try {
+      await localConnection.rollback();
+    } catch (rollbackError) {
+      console.error("Error during rollback:", rollbackError);
+    }
+
     throw error;
   }
 }
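NOTE: because the staging tables are now created with CREATE TEMP TABLE inside the transaction, the ROLLBACK in the catch block removes them automatically, and the explicit DROPs before COMMIT are defensive. PostgreSQL can also tie the table's lifetime to the transaction itself — a sketch of that alternative, not what this patch does:

    await localConnection.query(`
      CREATE TEMP TABLE temp_order_items (
        order_id INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        PRIMARY KEY (order_id, pid)
      ) ON COMMIT DROP;  -- dropped automatically when the transaction ends
    `);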
diff --git a/inventory-server/scripts/import/products.js b/inventory-server/scripts/import/products.js
index 8e0dc8f..f91619d 100644
--- a/inventory-server/scripts/import/products.js
+++ b/inventory-server/scripts/import/products.js
@@ -2,9 +2,12 @@ const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } =
 const BATCH_SIZE = 100; // Smaller batch size for better progress tracking
 const MAX_RETRIES = 3;
 const RETRY_DELAY = 5000; // 5 seconds
+const dotenv = require("dotenv");
+const path = require("path");
+dotenv.config({ path: path.join(__dirname, "../../.env") });
 
 // Utility functions
-const imageUrlBase = 'https://sbing.com/i/products/0000/';
+const imageUrlBase = process.env.PRODUCT_IMAGE_URL_BASE || 'https://sbing.com/i/products/0000/';
 const getImageUrls = (pid, iid = 1) => {
   const paddedPid = pid.toString().padStart(6, '0');
   // Use padded PID only for the first 3 digits
@@ -18,7 +21,7 @@
   };
 };
 
-// Add helper function for retrying operations
+// Add helper function for retrying operations with exponential backoff
 async function withRetry(operation, errorMessage) {
   let lastError;
   for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
@@ -28,7 +31,8 @@
       lastError = error;
       console.error(`${errorMessage} (Attempt ${attempt}/${MAX_RETRIES}):`, error);
       if (attempt < MAX_RETRIES) {
-        await new Promise(resolve => setTimeout(resolve, RETRY_DELAY));
+        const backoffTime = RETRY_DELAY * Math.pow(2, attempt - 1);
+        await new Promise(resolve => setTimeout(resolve, backoffTime));
       }
     }
   }
@@ -772,32 +776,44 @@
       recordsAdded += parseInt(result.rows[0].inserted, 10) || 0;
       recordsUpdated += parseInt(result.rows[0].updated, 10) || 0;
 
-      // Process category relationships for each product in the batch
+      // Process category relationships in batches
+      const allCategories = [];
       for (const row of batch) {
         if (row.categories) {
           const categoryIds = row.categories.split(',').filter(id => id && id.trim());
           if (categoryIds.length > 0) {
-            const catPlaceholders = categoryIds.map((_, idx) =>
-              `($${idx * 2 + 1}, $${idx * 2 + 2})`
-            ).join(',');
-            const catValues = categoryIds.flatMap(catId => [row.pid, parseInt(catId.trim(), 10)]);
-
-            // First delete existing relationships for this product
-            await localConnection.query(
-              'DELETE FROM product_categories WHERE pid = $1',
-              [row.pid]
-            );
-
-            // Then insert the new relationships
-            await localConnection.query(`
-              INSERT INTO product_categories (pid, cat_id)
-              VALUES ${catPlaceholders}
-              ON CONFLICT (pid, cat_id) DO NOTHING
-            `, catValues);
+            categoryIds.forEach(catId => {
+              allCategories.push([row.pid, parseInt(catId.trim(), 10)]);
+            });
           }
         }
       }
 
+      // If we have categories to process
+      if (allCategories.length > 0) {
+        // First get all products in this batch
+        const productIds = batch.map(p => p.pid);
+
+        // Delete all existing relationships for products in this batch
+        await localConnection.query(
+          'DELETE FROM product_categories WHERE pid = ANY($1)',
+          [productIds]
+        );
+
+        // Insert all new relationships in one batch
+        const catPlaceholders = allCategories.map((_, idx) =>
+          `($${idx * 2 + 1}, $${idx * 2 + 2})`
+        ).join(',');
+
+        const catValues = allCategories.flat();
+
+        await localConnection.query(`
+          INSERT INTO product_categories (pid, cat_id)
+          VALUES ${catPlaceholders}
+          ON CONFLICT (pid, cat_id) DO NOTHING
+        `, catValues);
+      }
+
       outputProgress({
         status: "running",
         operation: "Products import",
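NOTE: with RETRY_DELAY = 5000 and MAX_RETRIES = 3, the backoff above sleeps 5s after the first failure and 10s after the second (5000 * 2^0, then 5000 * 2^1); there is no sleep after the final attempt, which rethrows. A common refinement is jitter, so concurrent workers do not retry in lockstep — optional, and not part of this patch:

    // Full jitter: sleep a random duration up to the exponential cap.
    const backoffWithJitter = (attempt) =>
      Math.random() * RETRY_DELAY * Math.pow(2, attempt - 1);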
diff --git a/inventory-server/scripts/import/purchase-orders.js b/inventory-server/scripts/import/purchase-orders.js
index bac0208..4457ed5 100644
--- a/inventory-server/scripts/import/purchase-orders.js
+++ b/inventory-server/scripts/import/purchase-orders.js
@@ -6,6 +6,9 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
   let recordsUpdated = 0;
 
   try {
+    // Begin transaction for the entire import process
+    await localConnection.beginTransaction();
+
     // Get last sync info
     const [syncInfo] = await localConnection.query(
       "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'"
@@ -39,7 +42,6 @@
       FROM (
         SELECT DISTINCT pop.po_id, pop.pid
         FROM po p
-        USE INDEX (idx_date_created)
         JOIN po_products pop ON p.po_id = pop.po_id
         JOIN suppliers s ON p.supplier_id = s.supplierid
         WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
@@ -59,6 +61,7 @@
     console.log('Fetching purchase orders in batches...');
     const FETCH_BATCH_SIZE = 5000;
+    const INSERT_BATCH_SIZE = 200; // Process 200 records at a time for inserts
     let offset = 0;
     let allProcessed = false;
     let totalProcessed = 0;
@@ -101,64 +104,62 @@
       console.log(`Processing batch of ${poList.length} purchase order items (${offset}-${offset + poList.length})`);
 
-      let processed = 0;
-
-      // Process each PO in a separate insert to avoid parameter issues
-      for (let i = 0; i < poList.length; i++) {
-        const po = poList[i];
+      // Process in smaller batches for inserts
+      for (let i = 0; i < poList.length; i += INSERT_BATCH_SIZE) {
+        const batch = poList.slice(i, Math.min(i + INSERT_BATCH_SIZE, poList.length));
 
-        try {
-          // Single row insert
-          await localConnection.query(`
-            INSERT INTO temp_purchase_orders (
-              po_id, pid, sku, name, vendor, date, expected_date,
-              status, notes, ordered, cost_price
-            )
-            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
-            ON CONFLICT (po_id, pid) DO UPDATE SET
-              sku = EXCLUDED.sku,
-              name = EXCLUDED.name,
-              vendor = EXCLUDED.vendor,
-              date = EXCLUDED.date,
-              expected_date = EXCLUDED.expected_date,
-              status = EXCLUDED.status,
-              notes = EXCLUDED.notes,
-              ordered = EXCLUDED.ordered,
-              cost_price = EXCLUDED.cost_price
-          `, [
-            po.po_id,
-            po.pid,
-            po.sku,
-            po.name,
-            po.vendor,
-            po.date,
-            po.expected_date,
-            po.status,
-            po.notes,
-            po.ordered,
-            po.cost_price
-          ]);
-
-          processed++;
-          totalProcessed++;
-
-          // Only log occasionally
-          if (processed % 500 === 0 || processed === 1 || processed === poList.length) {
-            outputProgress({
-              status: "running",
-              operation: "Purchase orders import",
-              message: `Batch ${Math.floor(offset/FETCH_BATCH_SIZE) + 1}: ${processed}/${poList.length} (Total: ${totalProcessed}/${total})`,
-              current: totalProcessed,
-              total: total,
-              elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
-              remaining: estimateRemaining(startTime, totalProcessed, total),
-              rate: calculateRate(startTime, totalProcessed)
-            });
-          }
-        } catch (error) {
-          console.error(`Error inserting PO #${po.po_id} product #${po.pid}:`, error.message);
-          console.log('PO data:', po);
-        }
+        // Create parameterized query with placeholders
+        const placeholders = batch.map((_, idx) => {
+          const base = idx * 11; // 11 columns
+          return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11})`;
+        }).join(',');
+
+        // Create flattened values array
+        const values = batch.flatMap(po => [
+          po.po_id,
+          po.pid,
+          po.sku,
+          po.name,
+          po.vendor,
+          po.date,
+          po.expected_date,
+          po.status,
+          po.notes,
+          po.ordered,
+          po.cost_price
+        ]);
+
+        // Execute batch insert
+        await localConnection.query(`
+          INSERT INTO temp_purchase_orders (
+            po_id, pid, sku, name, vendor, date, expected_date,
+            status, notes, ordered, cost_price
+          )
+          VALUES ${placeholders}
+          ON CONFLICT (po_id, pid) DO UPDATE SET
+            sku = EXCLUDED.sku,
+            name = EXCLUDED.name,
+            vendor = EXCLUDED.vendor,
+            date = EXCLUDED.date,
+            expected_date = EXCLUDED.expected_date,
+            status = EXCLUDED.status,
+            notes = EXCLUDED.notes,
+            ordered = EXCLUDED.ordered,
+            cost_price = EXCLUDED.cost_price
+        `, values);
+
+        totalProcessed += batch.length;
+
+        outputProgress({
+          status: "running",
+          operation: "Purchase orders import",
+          message: `Processed ${totalProcessed}/${total} purchase order items`,
+          current: totalProcessed,
+          total: total,
+          elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
+          remaining: estimateRemaining(startTime, totalProcessed, total),
+          rate: calculateRate(startTime, totalProcessed)
+        });
       }
 
       // Update offset for next batch
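NOTE: the batch size interacts with PostgreSQL's limit of 65,535 bind parameters per statement. At 11 columns per row, INSERT_BATCH_SIZE = 200 sends 2,200 parameters per insert, far below the ceiling; the hard upper bound would be:

    const PG_MAX_PARAMS = 65535;                        // protocol limit on bind parameters
    const COLUMNS_PER_ROW = 11;
    const MAX_SAFE_BATCH = Math.floor(PG_MAX_PARAMS / COLUMNS_PER_ROW); // 5957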
@@ -220,6 +221,9 @@
     // Clean up temporary tables
     await localConnection.query(`DROP TABLE IF EXISTS temp_purchase_orders;`);
+
+    // Commit transaction
+    await localConnection.commit();
 
     return {
       status: "complete",
@@ -230,11 +234,11 @@
   } catch (error) {
     console.error("Error during purchase orders import:", error);
 
-    // Attempt cleanup on error
+    // Rollback transaction
     try {
-      await localConnection.query(`DROP TABLE IF EXISTS temp_purchase_orders;`);
-    } catch (cleanupError) {
-      console.error('Error during cleanup:', cleanupError.message);
+      await localConnection.rollback();
+    } catch (rollbackError) {
+      console.error('Error during rollback:', rollbackError.message);
     }
 
     return {