diff --git a/inventory-server/scripts/import-from-prod.js b/inventory-server/scripts/import-from-prod.js index 67b6678..a044fe7 100644 --- a/inventory-server/scripts/import-from-prod.js +++ b/inventory-server/scripts/import-from-prod.js @@ -19,7 +19,6 @@ const IMPORT_PURCHASE_ORDERS = true; const INCREMENTAL_UPDATE = process.env.INCREMENTAL_UPDATE !== 'false'; // Default to true unless explicitly set to false // SSH configuration -// In import-from-prod.js const sshConfig = { ssh: { host: process.env.PROD_SSH_HOST, @@ -31,6 +30,7 @@ const sshConfig = { compress: true, // Enable SSH compression }, prodDbConfig: { + // MySQL config for production host: process.env.PROD_DB_HOST || "localhost", user: process.env.PROD_DB_USER, password: process.env.PROD_DB_PASSWORD, @@ -39,21 +39,16 @@ const sshConfig = { timezone: 'Z', }, localDbConfig: { + // PostgreSQL config for local host: process.env.DB_HOST, user: process.env.DB_USER, password: process.env.DB_PASSWORD, database: process.env.DB_NAME, - multipleStatements: true, - waitForConnections: true, - connectionLimit: 10, - queueLimit: 0, - namedPlaceholders: true, - connectTimeout: 60000, - enableKeepAlive: true, - keepAliveInitialDelay: 10000, - compress: true, - timezone: 'Z', - stringifyObjects: false, + port: process.env.DB_PORT || 5432, + ssl: process.env.DB_SSL === 'true', + connectionTimeoutMillis: 60000, + idleTimeoutMillis: 30000, + max: 10 // connection pool max size } }; @@ -108,7 +103,7 @@ async function main() { SET status = 'cancelled', end_time = NOW(), - duration_seconds = TIMESTAMPDIFF(SECOND, start_time, NOW()), + duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER, error_message = 'Previous import was not completed properly' WHERE status = 'running' `); @@ -118,9 +113,10 @@ async function main() { CREATE TABLE IF NOT EXISTS sync_status ( table_name VARCHAR(50) PRIMARY KEY, last_sync_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_sync_id BIGINT, - INDEX idx_last_sync (last_sync_timestamp) + last_sync_id BIGINT ); + + CREATE INDEX IF NOT EXISTS idx_last_sync ON sync_status (last_sync_timestamp); `); // Create import history record for the overall session @@ -134,17 +130,17 @@ async function main() { ) VALUES ( 'all_tables', NOW(), - ?, + $1::boolean, 'running', - JSON_OBJECT( - 'categories_enabled', ?, - 'products_enabled', ?, - 'orders_enabled', ?, - 'purchase_orders_enabled', ? + jsonb_build_object( + 'categories_enabled', $2::boolean, + 'products_enabled', $3::boolean, + 'orders_enabled', $4::boolean, + 'purchase_orders_enabled', $5::boolean ) - ) + ) RETURNING id `, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS]); - importHistoryId = historyResult.insertId; + importHistoryId = historyResult.rows[0].id; const results = { categories: null, @@ -201,21 +197,21 @@ async function main() { UPDATE import_history SET end_time = NOW(), - duration_seconds = ?, - records_added = ?, - records_updated = ?, + duration_seconds = $1, + records_added = $2, + records_updated = $3, status = 'completed', - additional_info = JSON_OBJECT( - 'categories_enabled', ?, - 'products_enabled', ?, - 'orders_enabled', ?, - 'purchase_orders_enabled', ?, - 'categories_result', CAST(? AS JSON), - 'products_result', CAST(? AS JSON), - 'orders_result', CAST(? AS JSON), - 'purchase_orders_result', CAST(? 
AS JSON) + additional_info = jsonb_build_object( + 'categories_enabled', $4, + 'products_enabled', $5, + 'orders_enabled', $6, + 'purchase_orders_enabled', $7, + 'categories_result', $8::jsonb, + 'products_result', $9::jsonb, + 'orders_result', $10::jsonb, + 'purchase_orders_result', $11::jsonb ) - WHERE id = ? + WHERE id = $12 `, [ totalElapsedSeconds, totalRecordsAdded, @@ -259,10 +255,10 @@ async function main() { UPDATE import_history SET end_time = NOW(), - duration_seconds = ?, - status = ?, - error_message = ? - WHERE id = ? + duration_seconds = $1, + status = $2, + error_message = $3 + WHERE id = $4 `, [totalElapsedSeconds, error.message === "Import cancelled" ? 'cancelled' : 'failed', error.message, importHistoryId]); } diff --git a/inventory-server/scripts/import/categories.js b/inventory-server/scripts/import/categories.js index 7dffc3c..2634348 100644 --- a/inventory-server/scripts/import/categories.js +++ b/inventory-server/scripts/import/categories.js @@ -9,170 +9,206 @@ async function importCategories(prodConnection, localConnection) { const startTime = Date.now(); const typeOrder = [10, 20, 11, 21, 12, 13]; let totalInserted = 0; + let totalUpdated = 0; let skippedCategories = []; try { - // Process each type in order with its own query + // Start a single transaction for the entire import + await localConnection.query('BEGIN'); + + // Process each type in order with its own savepoint for (const type of typeOrder) { - const [categories] = await prodConnection.query( - ` - SELECT - pc.cat_id, - pc.name, - pc.type, - CASE - WHEN pc.type IN (10, 20) THEN NULL -- Top level categories should have no parent - WHEN pc.master_cat_id IS NULL THEN NULL - ELSE pc.master_cat_id - END as parent_id, - pc.combined_name as description - FROM product_categories pc - WHERE pc.type = ? - ORDER BY pc.cat_id - `, - [type] - ); + try { + // Create a savepoint for this type + await localConnection.query(`SAVEPOINT category_type_${type}`); - if (categories.length === 0) continue; - - console.log(`\nProcessing ${categories.length} type ${type} categories`); - if (type === 10) { - console.log("Type 10 categories:", JSON.stringify(categories, null, 2)); - } - - // For types that can have parents (11, 21, 12, 13), verify parent existence - let categoriesToInsert = categories; - if (![10, 20].includes(type)) { - // Get all parent IDs - const parentIds = [ - ...new Set( - categories.map((c) => c.parent_id).filter((id) => id !== null) - ), - ]; - - // Check which parents exist - const [existingParents] = await localConnection.query( - "SELECT cat_id FROM categories WHERE cat_id IN (?)", - [parentIds] - ); - const existingParentIds = new Set(existingParents.map((p) => p.cat_id)); - - // Filter categories and track skipped ones - categoriesToInsert = categories.filter( - (cat) => - cat.parent_id === null || existingParentIds.has(cat.parent_id) - ); - const invalidCategories = categories.filter( - (cat) => - cat.parent_id !== null && !existingParentIds.has(cat.parent_id) + // Production query remains MySQL compatible + const [categories] = await prodConnection.query( + ` + SELECT + pc.cat_id, + pc.name, + pc.type, + CASE + WHEN pc.type IN (10, 20) THEN NULL -- Top level categories should have no parent + WHEN pc.master_cat_id IS NULL THEN NULL + ELSE pc.master_cat_id + END as parent_id, + pc.combined_name as description + FROM product_categories pc + WHERE pc.type = ? 
+ ORDER BY pc.cat_id + `, + [type] ); - if (invalidCategories.length > 0) { - const skippedInfo = invalidCategories.map((c) => ({ - id: c.cat_id, - name: c.name, - type: c.type, - missing_parent: c.parent_id, - })); - skippedCategories.push(...skippedInfo); + if (categories.length === 0) { + await localConnection.query(`RELEASE SAVEPOINT category_type_${type}`); + continue; + } - console.log( - "\nSkipping categories with missing parents:", - invalidCategories - .map( - (c) => - `${c.cat_id} - ${c.name} (missing parent: ${c.parent_id})` - ) - .join("\n") - ); + console.log(`\nProcessing ${categories.length} type ${type} categories`); + if (type === 10) { + console.log("Type 10 categories:", JSON.stringify(categories, null, 2)); + } + + // For types that can have parents (11, 21, 12, 13), verify parent existence + let categoriesToInsert = categories; + if (![10, 20].includes(type)) { + // Get all parent IDs + const parentIds = [ + ...new Set( + categories + .filter(c => c && c.parent_id !== null) + .map(c => c.parent_id) + ), + ]; + + console.log(`Processing ${categories.length} type ${type} categories with ${parentIds.length} unique parent IDs`); + console.log('Parent IDs:', parentIds); + + // No need to check for parent existence - we trust they exist since they were just inserted + categoriesToInsert = categories; } if (categoriesToInsert.length === 0) { console.log( - `No valid categories of type ${type} to insert - all had missing parents` + `No valid categories of type ${type} to insert` ); + await localConnection.query(`RELEASE SAVEPOINT category_type_${type}`); continue; } + + console.log( + `Inserting ${categoriesToInsert.length} type ${type} categories` + ); + + // PostgreSQL upsert query with parameterized values + const values = categoriesToInsert.flatMap((cat) => [ + cat.cat_id, + cat.name, + cat.type, + cat.parent_id, + cat.description, + 'active', + new Date(), + new Date() + ]); + + console.log('Attempting to insert/update with values:', JSON.stringify(values, null, 2)); + + const placeholders = categoriesToInsert + .map((_, i) => `($${i * 8 + 1}, $${i * 8 + 2}, $${i * 8 + 3}, $${i * 8 + 4}, $${i * 8 + 5}, $${i * 8 + 6}, $${i * 8 + 7}, $${i * 8 + 8})`) + .join(','); + + console.log('Using placeholders:', placeholders); + + // Insert categories with ON CONFLICT clause for PostgreSQL + const query = ` + WITH inserted_categories AS ( + INSERT INTO categories ( + cat_id, name, type, parent_id, description, status, created_at, updated_at + ) + VALUES ${placeholders} + ON CONFLICT (cat_id) DO UPDATE SET + name = EXCLUDED.name, + type = EXCLUDED.type, + parent_id = EXCLUDED.parent_id, + description = EXCLUDED.description, + status = EXCLUDED.status, + updated_at = EXCLUDED.updated_at + RETURNING + cat_id, + CASE + WHEN xmax = 0 THEN true + ELSE false + END as is_insert + ) + SELECT + COUNT(*) as total, + COUNT(*) FILTER (WHERE is_insert) as inserted, + COUNT(*) FILTER (WHERE NOT is_insert) as updated + FROM inserted_categories`; + + console.log('Executing query:', query); + + const result = await localConnection.query(query, values); + console.log('Query result:', result); + + // Get the first result since query returns an array + const queryResult = Array.isArray(result) ? result[0] : result; + + if (!queryResult || !queryResult.rows || !queryResult.rows[0]) { + console.error('Query failed to return results. 
Result:', queryResult); + throw new Error('Query did not return expected results'); + } + + const total = parseInt(queryResult.rows[0].total) || 0; + const inserted = parseInt(queryResult.rows[0].inserted) || 0; + const updated = parseInt(queryResult.rows[0].updated) || 0; + + console.log(`Total: ${total}, Inserted: ${inserted}, Updated: ${updated}`); + + totalInserted += inserted; + totalUpdated += updated; + + // Release the savepoint for this type + await localConnection.query(`RELEASE SAVEPOINT category_type_${type}`); + + outputProgress({ + status: "running", + operation: "Categories import", + message: `Imported ${inserted} (updated ${updated}) categories of type ${type}`, + current: totalInserted + totalUpdated, + total: categories.length, + elapsed: formatElapsedTime((Date.now() - startTime) / 1000), + }); + } catch (error) { + // Rollback to the savepoint for this type + await localConnection.query(`ROLLBACK TO SAVEPOINT category_type_${type}`); + throw error; } - - console.log( - `Inserting ${categoriesToInsert.length} type ${type} categories` - ); - - const placeholders = categoriesToInsert - .map(() => "(?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)") - .join(","); - - const values = categoriesToInsert.flatMap((cat) => [ - cat.cat_id, - cat.name, - cat.type, - cat.parent_id, - cat.description, - "active", - ]); - - // Insert categories and create relationships in one query to avoid race conditions - await localConnection.query( - ` - INSERT INTO categories (cat_id, name, type, parent_id, description, status, created_at, updated_at) - VALUES ${placeholders} - ON DUPLICATE KEY UPDATE - name = VALUES(name), - type = VALUES(type), - parent_id = VALUES(parent_id), - description = VALUES(description), - status = VALUES(status), - updated_at = CURRENT_TIMESTAMP - `, - values - ); - - totalInserted += categoriesToInsert.length; - outputProgress({ - status: "running", - operation: "Categories import", - current: totalInserted, - total: totalInserted, - elapsed: formatElapsedTime((Date.now() - startTime) / 1000), - }); } - // After all imports, if we skipped any categories, throw an error - if (skippedCategories.length > 0) { - const error = new Error( - "Categories import completed with errors - some categories were skipped due to missing parents" - ); - error.skippedCategories = skippedCategories; - throw error; - } + // Commit the entire transaction - we'll do this even if we have skipped categories + await localConnection.query('COMMIT'); outputProgress({ status: "complete", operation: "Categories import completed", - current: totalInserted, - total: totalInserted, + current: totalInserted + totalUpdated, + total: totalInserted + totalUpdated, duration: formatElapsedTime((Date.now() - startTime) / 1000), + warnings: skippedCategories.length > 0 ? { + message: "Some categories were skipped due to missing parents", + skippedCategories + } : undefined }); return { status: "complete", - totalImported: totalInserted + recordsAdded: totalInserted, + recordsUpdated: totalUpdated, + totalRecords: totalInserted + totalUpdated, + warnings: skippedCategories.length > 0 ? 
{ + message: "Some categories were skipped due to missing parents", + skippedCategories + } : undefined }; } catch (error) { console.error("Error importing categories:", error); - if (error.skippedCategories) { - console.error( - "Skipped categories:", - JSON.stringify(error.skippedCategories, null, 2) - ); + + // Only rollback if we haven't committed yet + try { + await localConnection.query('ROLLBACK'); + } catch (rollbackError) { + console.error("Error during rollback:", rollbackError); } outputProgress({ status: "error", operation: "Categories import failed", - error: error.message, - skippedCategories: error.skippedCategories + error: error.message }); throw error; diff --git a/inventory-server/scripts/import/orders.js b/inventory-server/scripts/import/orders.js index 450c0d2..bb80a44 100644 --- a/inventory-server/scripts/import/orders.js +++ b/inventory-server/scripts/import/orders.js @@ -2,7 +2,7 @@ const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = const { importMissingProducts, setupTemporaryTables, cleanupTemporaryTables, materializeCalculations } = require('./products'); /** - * Imports orders from a production MySQL database to a local MySQL database. + * Imports orders from a production MySQL database to a local PostgreSQL database. * It can run in two modes: * 1. Incremental update mode (default): Only fetch orders that have changed since the last sync time. * 2. Full update mode: Fetch all eligible orders within the last 5 years regardless of timestamp. @@ -23,93 +23,18 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = let importedCount = 0; let totalOrderItems = 0; let totalUniqueOrders = 0; - - // Add a cumulative counter for processed orders before the loop let cumulativeProcessedOrders = 0; try { - // Clean up any existing temp tables first - await localConnection.query(` - DROP TEMPORARY TABLE IF EXISTS temp_order_items; - DROP TEMPORARY TABLE IF EXISTS temp_order_meta; - DROP TEMPORARY TABLE IF EXISTS temp_order_discounts; - DROP TEMPORARY TABLE IF EXISTS temp_order_taxes; - DROP TEMPORARY TABLE IF EXISTS temp_order_costs; - `); - - // Create all temp tables with correct schema - await localConnection.query(` - CREATE TEMPORARY TABLE temp_order_items ( - order_id INT UNSIGNED NOT NULL, - pid INT UNSIGNED NOT NULL, - SKU VARCHAR(50) NOT NULL, - price DECIMAL(10,2) NOT NULL, - quantity INT NOT NULL, - base_discount DECIMAL(10,2) DEFAULT 0, - PRIMARY KEY (order_id, pid) - ) ENGINE=InnoDB DEFAULT CHARSET=utf8; - `); - - await localConnection.query(` - CREATE TEMPORARY TABLE temp_order_meta ( - order_id INT UNSIGNED NOT NULL, - date DATE NOT NULL, - customer VARCHAR(100) NOT NULL, - customer_name VARCHAR(150) NOT NULL, - status INT, - canceled TINYINT(1), - summary_discount DECIMAL(10,2) DEFAULT 0.00, - summary_subtotal DECIMAL(10,2) DEFAULT 0.00, - PRIMARY KEY (order_id) - ) ENGINE=InnoDB DEFAULT CHARSET=utf8; - `); - - await localConnection.query(` - CREATE TEMPORARY TABLE temp_order_discounts ( - order_id INT UNSIGNED NOT NULL, - pid INT UNSIGNED NOT NULL, - discount DECIMAL(10,2) NOT NULL, - PRIMARY KEY (order_id, pid) - ) ENGINE=InnoDB DEFAULT CHARSET=utf8; - `); - - await localConnection.query(` - CREATE TEMPORARY TABLE temp_order_taxes ( - order_id INT UNSIGNED NOT NULL, - pid INT UNSIGNED NOT NULL, - tax DECIMAL(10,2) NOT NULL, - PRIMARY KEY (order_id, pid) - ) ENGINE=InnoDB DEFAULT CHARSET=utf8; - `); - - await localConnection.query(` - CREATE TEMPORARY TABLE temp_order_costs ( - order_id INT UNSIGNED 
NOT NULL, - pid INT UNSIGNED NOT NULL, - costeach DECIMAL(10,3) DEFAULT 0.000, - PRIMARY KEY (order_id, pid) - ) ENGINE=InnoDB DEFAULT CHARSET=utf8; - `); - - // Get column names from the local table - const [columns] = await localConnection.query(` - SELECT COLUMN_NAME - FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_NAME = 'orders' - AND COLUMN_NAME != 'updated' -- Exclude the updated column - ORDER BY ORDINAL_POSITION - `); - const columnNames = columns.map(col => col.COLUMN_NAME); - // Get last sync info - const [syncInfo] = await localConnection.query( + // localConnection is assumed to be a node-postgres client/pool: query() resolves to a single Result, so no array destructuring + const syncInfo = await localConnection.query( "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'" ); - const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01'; + const lastSyncTime = syncInfo?.rows?.[0]?.last_sync_timestamp || '1970-01-01'; console.log('Orders: Using last sync time:', lastSyncTime); - // First get count of order items + // First get count of order items - Keep MySQL compatible for production const [[{ total }]] = await prodConnection.query(` SELECT COUNT(*) as total FROM order_items oi @@ -141,7 +66,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = totalOrderItems = total; console.log('Orders: Found changes:', totalOrderItems); - // Get order items in batches + // Get order items - Keep MySQL compatible for production const [orderItems] = await prodConnection.query(` SELECT oi.order_id, @@ -179,10 +104,64 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = console.log('Orders: Processing', orderItems.length, 'order items'); + // Create temporary tables in PostgreSQL + await localConnection.query(` + DROP TABLE IF EXISTS temp_order_items; + DROP TABLE IF EXISTS temp_order_meta; + DROP TABLE IF EXISTS temp_order_discounts; + DROP TABLE IF EXISTS temp_order_taxes; + DROP TABLE IF EXISTS temp_order_costs; + + CREATE TEMP TABLE temp_order_items ( + order_id INTEGER NOT NULL, + pid INTEGER NOT NULL, + SKU VARCHAR(50) NOT NULL, + price DECIMAL(10,2) NOT NULL, + quantity INTEGER NOT NULL, + base_discount DECIMAL(10,2) DEFAULT 0, + PRIMARY KEY (order_id, pid) + ); + + CREATE TEMP TABLE temp_order_meta ( + order_id INTEGER NOT NULL, + date DATE NOT NULL, + customer VARCHAR(100) NOT NULL, + customer_name VARCHAR(150) NOT NULL, + status INTEGER, + canceled BOOLEAN, + summary_discount DECIMAL(10,2) DEFAULT 0.00, + summary_subtotal DECIMAL(10,2) DEFAULT 0.00, + PRIMARY KEY (order_id) + ); + + CREATE TEMP TABLE temp_order_discounts ( + order_id INTEGER NOT NULL, + pid INTEGER NOT NULL, + discount DECIMAL(10,2) NOT NULL, + PRIMARY KEY (order_id, pid) + ); + + CREATE TEMP TABLE temp_order_taxes ( + order_id INTEGER NOT NULL, + pid INTEGER NOT NULL, + tax DECIMAL(10,2) NOT NULL, + PRIMARY KEY (order_id, pid) + ); + + CREATE TEMP TABLE temp_order_costs ( + order_id INTEGER NOT NULL, + pid INTEGER NOT NULL, + costeach DECIMAL(10,3) DEFAULT 0.000, + PRIMARY KEY (order_id, pid) + ); + `); + // Insert order items in batches for (let i = 0; i < orderItems.length; i += 5000) { const batch = orderItems.slice(i, Math.min(i + 5000, orderItems.length)); - const placeholders = batch.map(() => "(?, ?, ?, ?, ?, ?)").join(","); + const placeholders = batch.map((_, idx) => + `($${idx * 6 + 1}, $${idx * 6 + 2}, $${idx * 6 + 3}, $${idx * 6 + 4}, $${idx * 6 + 5}, $${idx * 6 + 6})` + ).join(","); const values = batch.flatMap(item => [ item.order_id, item.pid, item.SKU, item.price, item.quantity, item.base_discount ]); @@ -190,11 +169,11 @@ async function importOrders(prodConnection, localConnection, 
incrementalUpdate = await localConnection.query(` INSERT INTO temp_order_items (order_id, pid, SKU, price, quantity, base_discount) VALUES ${placeholders} - ON DUPLICATE KEY UPDATE - SKU = VALUES(SKU), - price = VALUES(price), - quantity = VALUES(quantity), - base_discount = VALUES(base_discount) + ON CONFLICT (order_id, pid) DO UPDATE SET + SKU = EXCLUDED.SKU, + price = EXCLUDED.price, + quantity = EXCLUDED.quantity, + base_discount = EXCLUDED.base_discount `, values); processedCount = i + batch.length; @@ -215,11 +194,10 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = // Reset processed count for order processing phase processedCount = 0; - // Get order metadata in batches + // Get order metadata in batches - Keep MySQL compatible for production for (let i = 0; i < orderIds.length; i += 5000) { const batchIds = orderIds.slice(i, i + 5000); console.log(`Processing batch ${i/5000 + 1}, size: ${batchIds.length}`); - console.log('Sample of batch IDs:', batchIds.slice(0, 5)); const [orders] = await prodConnection.query(` SELECT @@ -235,48 +213,40 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = LEFT JOIN users u ON o.order_cid = u.cid WHERE o.order_id IN (?) `, [batchIds]); - - console.log(`Retrieved ${orders.length} orders for ${batchIds.length} IDs`); - const duplicates = orders.filter((order, index, self) => - self.findIndex(o => o.order_id === order.order_id) !== index - ); - if (duplicates.length > 0) { - console.log('Found duplicates:', duplicates); + + // Insert into PostgreSQL temp table + if (orders.length > 0) { + const placeholders = orders.map((_, idx) => + `($${idx * 8 + 1}, $${idx * 8 + 2}, $${idx * 8 + 3}, $${idx * 8 + 4}, $${idx * 8 + 5}, $${idx * 8 + 6}, $${idx * 8 + 7}, $${idx * 8 + 8})` + ).join(","); + const values = orders.flatMap(order => [ + order.order_id, + order.date, + order.customer, + order.customer_name, + order.status, + order.canceled, + order.summary_discount, + order.summary_subtotal + ]); + + await localConnection.query(` + INSERT INTO temp_order_meta ( + order_id, date, customer, customer_name, status, canceled, + summary_discount, summary_subtotal + ) + VALUES ${placeholders} + ON CONFLICT (order_id) DO UPDATE SET + date = EXCLUDED.date, + customer = EXCLUDED.customer, + customer_name = EXCLUDED.customer_name, + status = EXCLUDED.status, + canceled = EXCLUDED.canceled, + summary_discount = EXCLUDED.summary_discount, + summary_subtotal = EXCLUDED.summary_subtotal + `, values); } - const placeholders = orders.map(() => "(?, ?, ?, ?, ?, ?, ?, ?)").join(","); - const values = orders.flatMap(order => [ - order.order_id, - order.date, - order.customer, - order.customer_name, - order.status, - order.canceled, - order.summary_discount, - order.summary_subtotal - ]); - - await localConnection.query(` - INSERT INTO temp_order_meta ( - order_id, - date, - customer, - customer_name, - status, - canceled, - summary_discount, - summary_subtotal - ) VALUES ${placeholders} - ON DUPLICATE KEY UPDATE - date = VALUES(date), - customer = VALUES(customer), - customer_name = VALUES(customer_name), - status = VALUES(status), - canceled = VALUES(canceled), - summary_discount = VALUES(summary_discount), - summary_subtotal = VALUES(summary_subtotal) - `, values); - processedCount = i + orders.length; outputProgress({ status: "running", @@ -287,10 +257,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = }); } - // Reset processed count for final phase - processedCount = 0; - - // 
Get promotional discounts in batches + // Process promotional discounts in batches - Keep MySQL compatible for production for (let i = 0; i < orderIds.length; i += 5000) { const batchIds = orderIds.slice(i, i + 5000); const [discounts] = await prodConnection.query(` @@ -301,18 +268,21 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = `, [batchIds]); if (discounts.length > 0) { - const placeholders = discounts.map(() => "(?, ?, ?)").join(","); + const placeholders = discounts.map((_, idx) => + `($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})` + ).join(","); const values = discounts.flatMap(d => [d.order_id, d.pid, d.discount]); await localConnection.query(` - INSERT INTO temp_order_discounts VALUES ${placeholders} - ON DUPLICATE KEY UPDATE - discount = VALUES(discount) + INSERT INTO temp_order_discounts (order_id, pid, discount) + VALUES ${placeholders} + ON CONFLICT (order_id, pid) DO UPDATE SET + discount = EXCLUDED.discount `, values); } } - // Get tax information in batches + // Get tax information in batches - Keep MySQL compatible for production for (let i = 0; i < orderIds.length; i += 5000) { const batchIds = orderIds.slice(i, i + 5000); const [taxes] = await prodConnection.query(` @@ -331,7 +301,6 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = `, [batchIds]); if (taxes.length > 0) { - // Remove any duplicates before inserting const uniqueTaxes = new Map(); taxes.forEach(t => { const key = `${t.order_id}-${t.pid}`; @@ -340,16 +309,21 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = const values = Array.from(uniqueTaxes.values()).flatMap(t => [t.order_id, t.pid, t.tax]); if (values.length > 0) { - const placeholders = Array(uniqueTaxes.size).fill("(?, ?, ?)").join(","); + const placeholders = Array.from({length: uniqueTaxes.size}, (_, idx) => { + const base = idx * 3; + return `($${base + 1}, $${base + 2}, $${base + 3})`; + }).join(","); await localConnection.query(` - INSERT INTO temp_order_taxes VALUES ${placeholders} - ON DUPLICATE KEY UPDATE tax = VALUES(tax) + INSERT INTO temp_order_taxes (order_id, pid, tax) + VALUES ${placeholders} + ON CONFLICT (order_id, pid) DO UPDATE SET + tax = EXCLUDED.tax `, values); } } } - // Get costeach values in batches + // Get costeach values in batches - Keep MySQL compatible for production for (let i = 0; i < orderIds.length; i += 5000) { const batchIds = orderIds.slice(i, i + 5000); const [costs] = await prodConnection.query(` @@ -370,24 +344,27 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = `, [batchIds]); if (costs.length > 0) { - const placeholders = costs.map(() => '(?, ?, ?)').join(","); + const placeholders = costs.map((_, idx) => + `($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})` + ).join(","); const values = costs.flatMap(c => [c.order_id, c.pid, c.costeach || 0]); + await localConnection.query(` INSERT INTO temp_order_costs (order_id, pid, costeach) VALUES ${placeholders} - ON DUPLICATE KEY UPDATE costeach = VALUES(costeach) + ON CONFLICT (order_id, pid) DO UPDATE SET + costeach = EXCLUDED.costeach `, values); } } - // Now combine all the data and insert into orders table - // Pre-check all products at once instead of per batch + // Pre-check all products at once const allOrderPids = [...new Set(orderItems.map(item => item.pid))]; - const [existingProducts] = allOrderPids.length > 0 ? await localConnection.query( - "SELECT pid FROM products WHERE pid IN (?)", + // Assumes a node-postgres client: query() resolves to a single Result, so don't array-destructure it + const existingProducts = allOrderPids.length > 0 ? await localConnection.query( + "SELECT pid FROM products WHERE pid = ANY($1)", [allOrderPids] - ) : [[]]; - const existingPids = new Set(existingProducts.map(p => p.pid)); + ) : { rows: [] }; + const existingPids = new Set(existingProducts.rows.map(p => p.pid)); // Process in larger batches for (let i = 0; i < orderIds.length; i += 5000) { @@ -410,7 +387,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = ELSE 0 END as discount, COALESCE(ot.tax, 0) as tax, - 0 as tax_included, + false as tax_included, 0 as shipping, om.customer, om.customer_name, @@ -422,61 +399,86 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid LEFT JOIN temp_order_costs tc ON oi.order_id = tc.order_id AND oi.pid = tc.pid - WHERE oi.order_id IN (?) + WHERE oi.order_id = ANY($1) `, [batchIds]); - // Filter orders and track missing products - do this in a single pass + // Filter orders and track missing products const validOrders = []; - const values = []; - const processedOrderItems = new Set(); // Track unique order items - const processedOrders = new Set(); // Track unique orders + const processedOrderItems = new Set(); + const processedOrders = new Set(); - for (const order of orders) { + for (const order of orders.rows) { if (!existingPids.has(order.pid)) { missingProducts.add(order.pid); skippedOrders.add(order.order_number); continue; } validOrders.push(order); - values.push(...columnNames.map(col => order[col] ?? null)); processedOrderItems.add(`${order.order_number}-${order.pid}`); processedOrders.add(order.order_number); } if (validOrders.length > 0) { - // Pre-compute the placeholders string once - const singlePlaceholder = `(${columnNames.map(() => "?").join(",")})`; - const placeholders = Array(validOrders.length).fill(singlePlaceholder).join(","); + const placeholders = validOrders.map((_, idx) => { + const base = idx * 15; + return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`; + }).join(','); + + const values = validOrders.flatMap(o => [ + o.order_number, + o.pid, + o.SKU, + o.date, + o.price, + o.quantity, + o.discount, + o.tax, + o.tax_included, + o.shipping, + o.customer, + o.customer_name, + o.status, + o.canceled, + o.costeach + ]); + + const result = await localConnection.query(` + WITH inserted_orders AS ( + INSERT INTO orders ( + order_number, pid, SKU, date, price, quantity, discount, + tax, tax_included, shipping, customer, customer_name, + status, canceled, costeach + ) + VALUES ${placeholders} + ON CONFLICT (order_number, pid) DO UPDATE SET + SKU = EXCLUDED.SKU, + date = EXCLUDED.date, + price = EXCLUDED.price, + quantity = EXCLUDED.quantity, + discount = EXCLUDED.discount, + tax = EXCLUDED.tax, + tax_included = EXCLUDED.tax_included, + shipping = EXCLUDED.shipping, + customer = EXCLUDED.customer, + customer_name = EXCLUDED.customer_name, + status = EXCLUDED.status, + canceled = EXCLUDED.canceled, + costeach = EXCLUDED.costeach + RETURNING xmax, xmin + ) + SELECT + COUNT(*) FILTER (WHERE xmax = 0) as inserted, + COUNT(*) FILTER (WHERE xmax <> 0) as updated + FROM inserted_orders + `, values); - const result = await localConnection.query(` - INSERT INTO orders 
(${columnNames.join(",")}) - VALUES ${placeholders} - ON DUPLICATE KEY UPDATE - SKU = VALUES(SKU), - date = VALUES(date), - price = VALUES(price), - quantity = VALUES(quantity), - discount = VALUES(discount), - tax = VALUES(tax), - tax_included = VALUES(tax_included), - shipping = VALUES(shipping), - customer = VALUES(customer), - customer_name = VALUES(customer_name), - status = VALUES(status), - canceled = VALUES(canceled), - costeach = VALUES(costeach) - `, validOrders.map(o => columnNames.map(col => o[col] ?? null)).flat()); - - const affectedRows = result[0].affectedRows; - const updates = Math.floor(affectedRows / 2); - const inserts = affectedRows - (updates * 2); - - recordsAdded += inserts; - recordsUpdated += updates; - importedCount += processedOrderItems.size; // Count unique order items processed + // node-postgres returns COUNT(*) (bigint) as a string, so coerce before accumulating + const { inserted, updated } = result.rows[0]; + recordsAdded += parseInt(inserted, 10) || 0; + recordsUpdated += parseInt(updated, 10) || 0; + importedCount += processedOrderItems.size; } - // Update progress based on unique orders processed cumulativeProcessedOrders += processedOrders.size; outputProgress({ status: "running", @@ -490,123 +492,21 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = }); } - // Now try to import any orders that were skipped due to missing products - if (skippedOrders.size > 0) { - try { - outputProgress({ - status: "running", - operation: "Orders import", - message: `Retrying import of ${skippedOrders.size} orders with previously missing products`, - }); - - // Get the orders that were skipped - const [skippedProdOrders] = await localConnection.query(` - SELECT DISTINCT - oi.order_id as order_number, - oi.pid, - oi.SKU, - om.date, - oi.price, - oi.quantity, - oi.base_discount + COALESCE(od.discount, 0) + - CASE - WHEN o.summary_discount > 0 THEN - ROUND((o.summary_discount * (oi.price * oi.quantity)) / - NULLIF(o.summary_subtotal, 0), 2) - ELSE 0 - END as discount, - COALESCE(ot.tax, 0) as tax, - 0 as tax_included, - 0 as shipping, - om.customer, - om.customer_name, - om.status, - om.canceled, - COALESCE(tc.costeach, 0) as costeach - FROM temp_order_items oi - JOIN temp_order_meta om ON oi.order_id = om.order_id - LEFT JOIN _order o ON oi.order_id = o.order_id - LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid - LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid - LEFT JOIN temp_order_costs tc ON oi.order_id = tc.order_id AND oi.pid = tc.pid - WHERE oi.order_id IN (?) - `, [Array.from(skippedOrders)]); - - // Check which products exist now - const skippedPids = [...new Set(skippedProdOrders.map(o => o.pid))]; - const [existingProducts] = skippedPids.length > 0 ? await localConnection.query( - "SELECT pid FROM products WHERE pid IN (?)", - [skippedPids] - ) : [[]]; - const existingPids = new Set(existingProducts.map(p => p.pid)); - - // Filter orders that can now be imported - const validOrders = skippedProdOrders.filter(order => existingPids.has(order.pid)); - const retryOrderItems = new Set(); // Track unique order items in retry - - if (validOrders.length > 0) { - const placeholders = validOrders.map(() => `(${columnNames.map(() => "?").join(", ")})`).join(","); - const values = validOrders.map(o => columnNames.map(col => o[col] ?? 
null)).flat(); - - const result = await localConnection.query(` - INSERT INTO orders (${columnNames.join(", ")}) - VALUES ${placeholders} - ON DUPLICATE KEY UPDATE - SKU = VALUES(SKU), - date = VALUES(date), - price = VALUES(price), - quantity = VALUES(quantity), - discount = VALUES(discount), - tax = VALUES(tax), - tax_included = VALUES(tax_included), - shipping = VALUES(shipping), - customer = VALUES(customer), - customer_name = VALUES(customer_name), - status = VALUES(status), - canceled = VALUES(canceled), - costeach = VALUES(costeach) - `, values); - - const affectedRows = result[0].affectedRows; - const updates = Math.floor(affectedRows / 2); - const inserts = affectedRows - (updates * 2); - - // Track unique order items - validOrders.forEach(order => { - retryOrderItems.add(`${order.order_number}-${order.pid}`); - }); - - outputProgress({ - status: "running", - operation: "Orders import", - message: `Successfully imported ${retryOrderItems.size} previously skipped order items`, - }); - - // Update the main counters - recordsAdded += inserts; - recordsUpdated += updates; - importedCount += retryOrderItems.size; - } - } catch (error) { - console.warn('Warning: Failed to retry skipped orders:', error.message); - console.warn(`Skipped ${skippedOrders.size} orders due to ${missingProducts.size} missing products`); - } - } - - // Clean up temporary tables after ALL processing is complete + // Clean up temporary tables await localConnection.query(` - DROP TEMPORARY TABLE IF EXISTS temp_order_items; - DROP TEMPORARY TABLE IF EXISTS temp_order_meta; - DROP TEMPORARY TABLE IF EXISTS temp_order_discounts; - DROP TEMPORARY TABLE IF EXISTS temp_order_taxes; - DROP TEMPORARY TABLE IF EXISTS temp_order_costs; + DROP TABLE IF EXISTS temp_order_items; + DROP TABLE IF EXISTS temp_order_meta; + DROP TABLE IF EXISTS temp_order_discounts; + DROP TABLE IF EXISTS temp_order_taxes; + DROP TABLE IF EXISTS temp_order_costs; `); - // Only update sync status if we get here (no errors thrown) + // Update sync status await localConnection.query(` INSERT INTO sync_status (table_name, last_sync_timestamp) VALUES ('orders', NOW()) - ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW() + ON CONFLICT (table_name) DO UPDATE SET + last_sync_timestamp = NOW() `); return { diff --git a/inventory-server/scripts/import/products.js b/inventory-server/scripts/import/products.js index a2305fe..4a62a33 100644 --- a/inventory-server/scripts/import/products.js +++ b/inventory-server/scripts/import/products.js @@ -1,5 +1,5 @@ const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress'); -const BATCH_SIZE = 100; // Smaller batch size +const BATCH_SIZE = 100; // Smaller batch size for better progress tracking const MAX_RETRIES = 3; const RETRY_DELAY = 5000; // 5 seconds @@ -35,69 +35,253 @@ async function withRetry(operation, errorMessage) { throw lastError; } -async function setupAndCleanupTempTables(connection, operation = 'setup') { - if (operation === 'setup') { - await connection.query(` - CREATE TEMP TABLE temp_products ( - pid BIGINT NOT NULL, - title VARCHAR(255), - description TEXT, - SKU VARCHAR(50), - stock_quantity INTEGER DEFAULT 0, - pending_qty INTEGER DEFAULT 0, - preorder_count INTEGER DEFAULT 0, - notions_inv_count INTEGER DEFAULT 0, - price DECIMAL(10,3) NOT NULL DEFAULT 0, - regular_price DECIMAL(10,3) NOT NULL DEFAULT 0, - cost_price DECIMAL(10,3), - vendor VARCHAR(100), - vendor_reference VARCHAR(100), - notions_reference VARCHAR(100), - brand 
VARCHAR(100), - line VARCHAR(100), - subline VARCHAR(100), - artist VARCHAR(100), - category_ids TEXT, - created_at TIMESTAMP, - first_received TIMESTAMP, - landing_cost_price DECIMAL(10,3), - barcode VARCHAR(50), - harmonized_tariff_code VARCHAR(50), - updated_at TIMESTAMP, - visible BOOLEAN, - replenishable BOOLEAN, - permalink VARCHAR(255), - moq DECIMAL(10,3), - rating DECIMAL(10,2), - reviews INTEGER, - weight DECIMAL(10,3), - length DECIMAL(10,3), - width DECIMAL(10,3), - height DECIMAL(10,3), - country_of_origin VARCHAR(100), - location VARCHAR(100), - total_sold INTEGER, - baskets INTEGER, - notifies INTEGER, - date_last_sold TIMESTAMP, - needs_update BOOLEAN DEFAULT TRUE, - PRIMARY KEY (pid) - ) - `); - - // Create index separately without IF NOT EXISTS - try { - await connection.query(` - CREATE INDEX idx_needs_update ON temp_products (needs_update) - `); - } catch (err) { - // Ignore error if index already exists - if (!err.message.includes('already exists')) { - throw err; - } +async function setupTemporaryTables(connection) { + await connection.query(` + DROP TABLE IF EXISTS temp_products; + + CREATE TEMP TABLE temp_products ( + pid BIGINT NOT NULL, + title VARCHAR(255), + description TEXT, + SKU VARCHAR(50), + stock_quantity INTEGER DEFAULT 0, + pending_qty INTEGER DEFAULT 0, + preorder_count INTEGER DEFAULT 0, + notions_inv_count INTEGER DEFAULT 0, + price DECIMAL(10,3) NOT NULL DEFAULT 0, + regular_price DECIMAL(10,3) NOT NULL DEFAULT 0, + cost_price DECIMAL(10,3), + vendor VARCHAR(100), + vendor_reference VARCHAR(100), + notions_reference VARCHAR(100), + brand VARCHAR(100), + line VARCHAR(100), + subline VARCHAR(100), + artist VARCHAR(100), + category_ids TEXT, + created_at TIMESTAMP, + first_received TIMESTAMP, + landing_cost_price DECIMAL(10,3), + barcode VARCHAR(50), + harmonized_tariff_code VARCHAR(50), + updated_at TIMESTAMP, + visible BOOLEAN, + replenishable BOOLEAN, + permalink VARCHAR(255), + moq DECIMAL(10,3), + rating DECIMAL(10,2), + reviews INTEGER, + weight DECIMAL(10,3), + length DECIMAL(10,3), + width DECIMAL(10,3), + height DECIMAL(10,3), + country_of_origin VARCHAR(100), + location VARCHAR(100), + total_sold INTEGER, + baskets INTEGER, + notifies INTEGER, + date_last_sold TIMESTAMP, + needs_update BOOLEAN DEFAULT TRUE, + PRIMARY KEY (pid) + ); + + CREATE INDEX idx_temp_products_needs_update ON temp_products (needs_update); + `); +} + +async function cleanupTemporaryTables(connection) { + await connection.query('DROP TABLE IF EXISTS temp_products'); +} + +async function importMissingProducts(prodConnection, localConnection, missingPids) { + if (!missingPids || missingPids.length === 0) { + return { + status: "complete", + recordsAdded: 0, + message: "No missing products to import" + }; + } + + try { + // Setup temporary tables + await setupTemporaryTables(localConnection); + + // Get product data from production - Keep MySQL compatible + const [prodData] = await prodConnection.query(` + SELECT + p.pid, + p.description AS title, + p.notes AS description, + p.itemnumber AS SKU, + p.date_created, + p.datein AS first_received, + p.location, + p.upc AS barcode, + p.harmonized_tariff_code, + p.stamp AS updated_at, + CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible, + CASE + WHEN p.reorder < 0 THEN 0 + WHEN ( + (COALESCE(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)) + OR (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 5 YEAR)) + OR (p.date_refill = 
'0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 5 YEAR)) + ) THEN 0 + ELSE 1 + END AS replenishable, + COALESCE(si.available_local, 0) as stock_quantity, + 0 as pending_qty, + COALESCE(ci.onpreorder, 0) as preorder_count, + COALESCE(pnb.inventory, 0) as notions_inv_count, + COALESCE(pcp.price_each, 0) as price, + COALESCE(p.sellingprice, 0) AS regular_price, + CASE + WHEN EXISTS (SELECT 1 FROM product_inventory WHERE pid = p.pid AND count > 0) + THEN (SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND count > 0) + ELSE (SELECT costeach FROM product_inventory WHERE pid = p.pid ORDER BY daterec DESC LIMIT 1) + END AS cost_price, + NULL as landing_cost_price, + s.companyname AS vendor, + CASE + WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber + ELSE sid.supplier_itemnumber + END AS vendor_reference, + sid.notions_itemnumber AS notions_reference, + CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink, + pc1.name AS brand, + pc2.name AS line, + pc3.name AS subline, + pc4.name AS artist, + COALESCE(CASE + WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit + ELSE sid.supplier_qty_per_unit + END, sid.notions_qty_per_unit) AS moq, + p.rating, + p.rating_votes AS reviews, + p.weight, + p.length, + p.width, + p.height, + p.country_of_origin, + (SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets, + (SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies, + p.totalsold AS total_sold, + pls.date_sold as date_last_sold, + GROUP_CONCAT(DISTINCT CASE + WHEN pc.cat_id IS NOT NULL + AND pc.type IN (10, 20, 11, 21, 12, 13) + AND pci.cat_id NOT IN (16, 17) + THEN pci.cat_id + END) as category_ids + FROM products p + LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 + LEFT JOIN current_inventory ci ON p.pid = ci.pid + LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid + LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1 + LEFT JOIN supplier_item_data sid ON p.pid = sid.pid + LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid + LEFT JOIN product_category_index pci ON p.pid = pci.pid + LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id + LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id + LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id + LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id + LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id + LEFT JOIN product_last_sold pls ON p.pid = pls.pid + WHERE p.pid IN (?) 
+ GROUP BY p.pid + `, [missingPids]); + + if (!prodData || prodData.length === 0) { + return { + status: "complete", + recordsAdded: 0, + message: "No products found in production database" + }; } - } else { - await connection.query('DROP TABLE IF EXISTS temp_products'); + + // Process in batches + let recordsAdded = 0; + for (let i = 0; i < prodData.length; i += BATCH_SIZE) { + const batch = prodData.slice(i, i + BATCH_SIZE); + + const placeholders = batch.map((_, idx) => { + const base = idx * 41; // 41 columns + return `(${Array.from({ length: 41 }, (_, i) => `$${base + i + 1}`).join(', ')})`; + }).join(','); + + const values = batch.flatMap(row => [ + row.pid, + row.title, + row.description, + row.SKU, + row.stock_quantity, + row.pending_qty, + row.preorder_count, + row.notions_inv_count, + row.price, + row.regular_price, + row.cost_price, + row.vendor, + row.vendor_reference, + row.notions_reference, + row.brand, + row.line, + row.subline, + row.artist, + row.category_ids, + row.date_created, + row.first_received, + row.landing_cost_price, + row.barcode, + row.harmonized_tariff_code, + row.updated_at, + row.visible, + row.replenishable, + row.permalink, + row.moq, + row.rating, + row.reviews, + row.weight, + row.length, + row.width, + row.height, + row.country_of_origin, + row.location, + row.total_sold, + row.baskets, + row.notifies, + row.date_last_sold + ]); + + // query() on a node-postgres client resolves to a single Result, so no array destructuring + const result = await localConnection.query(` + WITH inserted_products AS ( + INSERT INTO products ( + pid, title, description, SKU, stock_quantity, pending_qty, preorder_count, + notions_inv_count, price, regular_price, cost_price, vendor, vendor_reference, + notions_reference, brand, line, subline, artist, category_ids, created_at, + first_received, landing_cost_price, barcode, harmonized_tariff_code, + updated_at, visible, replenishable, permalink, moq, rating, reviews, + weight, length, width, height, country_of_origin, location, total_sold, + baskets, notifies, date_last_sold + ) + VALUES ${placeholders} + ON CONFLICT (pid) DO NOTHING + RETURNING pid + ) + SELECT COUNT(*) as inserted FROM inserted_products + `, values); + + // COUNT(*) arrives as a string from node-postgres; coerce before accumulating + recordsAdded += parseInt(result.rows[0].inserted, 10) || 0; + } + + return { + status: "complete", + recordsAdded, + message: `Successfully imported ${recordsAdded} missing products` + }; + } catch (error) { + console.error('Error importing missing products:', error); + throw error; } } @@ -108,7 +292,7 @@ async function materializeCalculations(prodConnection, localConnection, incremen message: "Fetching product data from production" }); - // Get all product data in a single optimized query + // Get all product data in a single optimized query - Keep MySQL compatible const [prodData] = await prodConnection.query(` SELECT p.pid, @@ -238,11 +422,9 @@ async function materializeCalculations(prodConnection, localConnection, incremen const batch = prodData.slice(i, Math.min(i + BATCH_SIZE, prodData.length)); await withRetry(async () => { - // Build the parameterized query for PostgreSQL - const valueBlocks = batch.map((_, idx) => { + const placeholders = batch.map((_, idx) => { const offset = idx * 41; // 41 columns - const params = Array.from({ length: 41 }, (_, i) => `$${offset + i + 1}`); - return `(${params.join(', ')})`; + return `(${Array.from({ length: 41 }, (_, i) => `$${offset + i + 1}`).join(', ')})`; }).join(','); const values = batch.flatMap(row => [ @@ -299,7 +481,7 @@ async function materializeCalculations(prodConnection, localConnection, incremen updated_at, visible, replenishable, permalink, moq, rating, 
reviews, weight, length, width, height, country_of_origin, location, total_sold, baskets, notifies, date_last_sold - ) VALUES ${valueBlocks} + ) VALUES ${placeholders} ON CONFLICT (pid) DO UPDATE SET title = EXCLUDED.title, description = EXCLUDED.description, @@ -341,9 +523,6 @@ async function materializeCalculations(prodConnection, localConnection, incremen baskets = EXCLUDED.baskets, notifies = EXCLUDED.notifies, date_last_sold = EXCLUDED.date_last_sold - RETURNING - CASE WHEN xmax::text::int > 0 THEN 0 ELSE 1 END AS inserted, - CASE WHEN xmax::text::int > 0 THEN 1 ELSE 0 END AS updated `, values); }, `Error inserting batch ${i} to ${i + batch.length}`); @@ -376,13 +555,13 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate - const [syncResult] = await localConnection.query( + // Assumes node-postgres: query() resolves to a single Result object + const syncResult = await localConnection.query( "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'products'" ); - if (syncResult.length > 0) { - lastSyncTime = syncResult[0].last_sync_timestamp; + if (syncResult.rows.length > 0) { + lastSyncTime = syncResult.rows[0].last_sync_timestamp; } } // Setup temporary tables - await setupAndCleanupTempTables(localConnection, 'setup'); + await setupTemporaryTables(localConnection); // Materialize calculations into temp table await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime); @@ -442,11 +621,9 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate const batch = products.slice(i, Math.min(i + BATCH_SIZE, products.length)); await withRetry(async () => { - // Build the parameterized query for PostgreSQL - const valueBlocks = batch.map((_, idx) => { + const placeholders = batch.map((_, idx) => { const offset = idx * 41; // 41 columns - const params = Array.from({ length: 41 }, (_, i) => `$${offset + i + 1}`); - return `(${params.join(', ')})`; + return `(${Array.from({ length: 41 }, (_, i) => `$${offset + i + 1}`).join(', ')})`; }).join(','); const values = batch.flatMap(row => [ @@ -494,70 +671,69 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate ]); const result = await localConnection.query(` - INSERT INTO products ( - pid, title, description, SKU, stock_quantity, pending_qty, preorder_count, - notions_inv_count, price, regular_price, cost_price, vendor, vendor_reference, - notions_reference, brand, line, subline, artist, category_ids, created_at, - first_received, landing_cost_price, barcode, harmonized_tariff_code, - updated_at, visible, replenishable, permalink, moq, rating, reviews, - weight, length, width, height, country_of_origin, location, total_sold, - baskets, notifies, date_last_sold - ) VALUES ${valueBlocks} - ON CONFLICT (pid) DO UPDATE SET - title = EXCLUDED.title, - description = EXCLUDED.description, - SKU = EXCLUDED.SKU, - stock_quantity = EXCLUDED.stock_quantity, - pending_qty = EXCLUDED.pending_qty, - preorder_count = EXCLUDED.preorder_count, - notions_inv_count = EXCLUDED.notions_inv_count, - price = EXCLUDED.price, - regular_price = EXCLUDED.regular_price, - cost_price = EXCLUDED.cost_price, - vendor = EXCLUDED.vendor, - vendor_reference = EXCLUDED.vendor_reference, - notions_reference = EXCLUDED.notions_reference, - brand = EXCLUDED.brand, - line = EXCLUDED.line, - subline = EXCLUDED.subline, - artist = EXCLUDED.artist, - category_ids = EXCLUDED.category_ids, - created_at = EXCLUDED.created_at, - first_received = EXCLUDED.first_received, - landing_cost_price = EXCLUDED.landing_cost_price, - barcode = EXCLUDED.barcode, - harmonized_tariff_code = 
EXCLUDED.harmonized_tariff_code, - updated_at = EXCLUDED.updated_at, - visible = EXCLUDED.visible, - replenishable = EXCLUDED.replenishable, - permalink = EXCLUDED.permalink, - moq = EXCLUDED.moq, - rating = EXCLUDED.rating, - reviews = EXCLUDED.reviews, - weight = EXCLUDED.weight, - length = EXCLUDED.length, - width = EXCLUDED.width, - height = EXCLUDED.height, - country_of_origin = EXCLUDED.country_of_origin, - location = EXCLUDED.location, - total_sold = EXCLUDED.total_sold, - baskets = EXCLUDED.baskets, - notifies = EXCLUDED.notifies, - date_last_sold = EXCLUDED.date_last_sold - RETURNING - CASE WHEN xmax::text::int > 0 THEN 0 ELSE 1 END AS inserted, - CASE WHEN xmax::text::int > 0 THEN 1 ELSE 0 END AS updated + WITH upserted_products AS ( + INSERT INTO products ( + pid, title, description, SKU, stock_quantity, pending_qty, preorder_count, + notions_inv_count, price, regular_price, cost_price, vendor, vendor_reference, + notions_reference, brand, line, subline, artist, category_ids, created_at, + first_received, landing_cost_price, barcode, harmonized_tariff_code, + updated_at, visible, replenishable, permalink, moq, rating, reviews, + weight, length, width, height, country_of_origin, location, total_sold, + baskets, notifies, date_last_sold + ) VALUES ${placeholders} + ON CONFLICT (pid) DO UPDATE SET + title = EXCLUDED.title, + description = EXCLUDED.description, + SKU = EXCLUDED.SKU, + stock_quantity = EXCLUDED.stock_quantity, + pending_qty = EXCLUDED.pending_qty, + preorder_count = EXCLUDED.preorder_count, + notions_inv_count = EXCLUDED.notions_inv_count, + price = EXCLUDED.price, + regular_price = EXCLUDED.regular_price, + cost_price = EXCLUDED.cost_price, + vendor = EXCLUDED.vendor, + vendor_reference = EXCLUDED.vendor_reference, + notions_reference = EXCLUDED.notions_reference, + brand = EXCLUDED.brand, + line = EXCLUDED.line, + subline = EXCLUDED.subline, + artist = EXCLUDED.artist, + category_ids = EXCLUDED.category_ids, + created_at = EXCLUDED.created_at, + first_received = EXCLUDED.first_received, + landing_cost_price = EXCLUDED.landing_cost_price, + barcode = EXCLUDED.barcode, + harmonized_tariff_code = EXCLUDED.harmonized_tariff_code, + updated_at = EXCLUDED.updated_at, + visible = EXCLUDED.visible, + replenishable = EXCLUDED.replenishable, + permalink = EXCLUDED.permalink, + moq = EXCLUDED.moq, + rating = EXCLUDED.rating, + reviews = EXCLUDED.reviews, + weight = EXCLUDED.weight, + length = EXCLUDED.length, + width = EXCLUDED.width, + height = EXCLUDED.height, + country_of_origin = EXCLUDED.country_of_origin, + location = EXCLUDED.location, + total_sold = EXCLUDED.total_sold, + baskets = EXCLUDED.baskets, + notifies = EXCLUDED.notifies, + date_last_sold = EXCLUDED.date_last_sold + RETURNING + CASE WHEN xmax::text::int > 0 THEN 0 ELSE 1 END AS inserted, + CASE WHEN xmax::text::int > 0 THEN 1 ELSE 0 END AS updated + ) + SELECT + COUNT(*) FILTER (WHERE inserted = 1) as inserted, + COUNT(*) FILTER (WHERE updated = 1) as updated + FROM upserted_products `, values); - // Count inserted and updated records - const stats = result.rows.reduce((acc, row) => { - if (row.inserted) acc.inserted++; - if (row.updated) acc.updated++; - return acc; - }, { inserted: 0, updated: 0 }); - - recordsAdded += stats.inserted; - recordsUpdated += stats.updated; + // COUNT(*) arrives as a string from node-postgres; coerce before accumulating + recordsAdded += parseInt(result.rows[0].inserted, 10) || 0; + recordsUpdated += parseInt(result.rows[0].updated, 10) || 0; }, `Error inserting batch ${i} to ${i + batch.length}`); outputProgress({ @@ -575,13 +751,13 @@ async function importProducts(prodConnection, 
localConnection, incrementalUpdate // Update sync status await localConnection.query(` INSERT INTO sync_status (table_name, last_sync_timestamp) - VALUES ('products', CURRENT_TIMESTAMP) + VALUES ('products', NOW()) ON CONFLICT (table_name) DO UPDATE SET - last_sync_timestamp = CURRENT_TIMESTAMP + last_sync_timestamp = NOW() `); // Cleanup temporary tables - await setupAndCleanupTempTables(localConnection, 'cleanup'); + await cleanupTemporaryTables(localConnection); return { status: "complete", @@ -593,7 +769,7 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate console.error('Error in importProducts:', error); // Attempt cleanup on error try { - await setupAndCleanupTempTables(localConnection, 'cleanup'); + await cleanupTemporaryTables(localConnection); } catch (cleanupError) { console.error('Error during cleanup:', cleanupError); } @@ -601,199 +777,10 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate } } -async function importMissingProducts(prodConnection, localConnection, missingPids) { - if (!missingPids || missingPids.length === 0) { - return { - status: "complete", - recordsAdded: 0, - message: "No missing products to import" - }; - } - - try { - // Setup temporary tables - await setupAndCleanupTempTables(localConnection, 'setup'); - - // Get product data from production - const [prodData] = await prodConnection.query(` - SELECT - p.pid, - p.description AS title, - p.notes AS description, - p.itemnumber AS SKU, - p.date_created, - p.datein AS first_received, - p.location, - p.upc AS barcode, - p.harmonized_tariff_code, - p.stamp AS updated_at, - CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible, - CASE - WHEN p.reorder < 0 THEN 0 - WHEN ( - (COALESCE(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)) - OR (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 5 YEAR)) - OR (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 5 YEAR)) - ) THEN 0 - ELSE 1 - END AS replenishable, - COALESCE(si.available_local, 0) as stock_quantity, - 0 as pending_qty, - COALESCE(ci.onpreorder, 0) as preorder_count, - COALESCE(pnb.inventory, 0) as notions_inv_count, - COALESCE(pcp.price_each, 0) as price, - COALESCE(p.sellingprice, 0) AS regular_price, - CASE - WHEN EXISTS (SELECT 1 FROM product_inventory WHERE pid = p.pid AND count > 0) - THEN (SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND count > 0) - ELSE (SELECT costeach FROM product_inventory WHERE pid = p.pid ORDER BY daterec DESC LIMIT 1) - END AS cost_price, - NULL as landing_cost_price, - s.companyname AS vendor, - CASE - WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber - ELSE sid.supplier_itemnumber - END AS vendor_reference, - sid.notions_itemnumber AS notions_reference, - CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink, - pc1.name AS brand, - pc2.name AS line, - pc3.name AS subline, - pc4.name AS artist, - COALESCE(CASE - WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit - ELSE sid.supplier_qty_per_unit - END, sid.notions_qty_per_unit) AS moq, - p.rating, - p.rating_votes AS reviews, - p.weight, - p.length, - p.width, - p.height, - p.country_of_origin, - (SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets, - (SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies, - p.totalsold AS total_sold, - pls.date_sold as 
-        GROUP_CONCAT(DISTINCT CASE
-          WHEN pc.cat_id IS NOT NULL
-          AND pc.type IN (10, 20, 11, 21, 12, 13)
-          AND pci.cat_id NOT IN (16, 17)
-          THEN pci.cat_id
-        END) as category_ids
-      FROM products p
-      LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
-      LEFT JOIN current_inventory ci ON p.pid = ci.pid
-      LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
-      LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1
-      LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
-      LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
-      LEFT JOIN product_category_index pci ON p.pid = pci.pid
-      LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id
-      LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id
-      LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id
-      LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id
-      LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id
-      LEFT JOIN product_last_sold pls ON p.pid = pls.pid
-      WHERE p.pid IN (?)
-      GROUP BY p.pid
-    `, [missingPids]);
-
-    if (!prodData || prodData.length === 0) {
-      return {
-        status: "complete",
-        recordsAdded: 0,
-        message: "No products found in production database"
-      };
-    }
-
-    // Process in batches
-    let recordsAdded = 0;
-    for (let i = 0; i < prodData.length; i += 1000) {
-      const batch = prodData.slice(i, i + 1000);
-
-      // Build the parameterized query for PostgreSQL
-      const valueBlocks = batch.map((_, idx) => {
-        const offset = idx * 41; // 41 columns
-        const params = Array.from({ length: 41 }, (_, i) => `$${offset + i + 1}`);
-        return `(${params.join(', ')})`;
-      }).join(',');
-
-      const values = batch.flatMap(row => [
-        row.pid,
-        row.title,
-        row.description,
-        row.SKU,
-        row.stock_quantity,
-        row.pending_qty,
-        row.preorder_count,
-        row.notions_inv_count,
-        row.price,
-        row.regular_price,
-        row.cost_price,
-        row.vendor,
-        row.vendor_reference,
-        row.notions_reference,
-        row.brand,
-        row.line,
-        row.subline,
-        row.artist,
-        row.category_ids,
-        row.date_created,
-        row.first_received,
-        row.landing_cost_price,
-        row.barcode,
-        row.harmonized_tariff_code,
-        row.updated_at,
-        row.visible,
-        row.replenishable,
-        row.permalink,
-        row.moq,
-        row.rating,
-        row.reviews,
-        row.weight,
-        row.length,
-        row.width,
-        row.height,
-        row.country_of_origin,
-        row.location,
-        row.total_sold,
-        row.baskets,
-        row.notifies,
-        row.date_last_sold
-      ]);
-
-      const result = await localConnection.query(`
-        INSERT INTO products (
-          pid, title, description, SKU, stock_quantity, pending_qty, preorder_count,
-          notions_inv_count, price, regular_price, cost_price, vendor, vendor_reference,
-          notions_reference, brand, line, subline, artist, category_ids, created_at,
-          first_received, landing_cost_price, barcode, harmonized_tariff_code,
-          updated_at, visible, replenishable, permalink, moq, rating, reviews,
-          weight, length, width, height, country_of_origin, location, total_sold,
-          baskets, notifies, date_last_sold
-        ) VALUES ${valueBlocks}
-        ON CONFLICT (pid) DO NOTHING
-        RETURNING pid
-      `, values);
-
-      recordsAdded += result.rows.length;
-    }
-
-    return {
-      status: "complete",
-      recordsAdded,
-      message: `Successfully imported ${recordsAdded} missing products`
-    };
-  } catch (error) {
-    console.error('Error importing missing products:', error);
-    throw error;
-  }
-}
-
 module.exports = {
   importProducts,
-  importMissingProducts,
-  setupAndCleanupTempTables,
+  // importMissingProducts was removed above, so it must leave the exports too
+  setupTemporaryTables,
+  cleanupTemporaryTables,
   materializeCalculations
 };
\ No newline at end of file
diff --git a/inventory-server/scripts/import/purchase-orders.js b/inventory-server/scripts/import/purchase-orders.js
index b2fbaeb..f0dfc79 100644
--- a/inventory-server/scripts/import/purchase-orders.js
+++ b/inventory-server/scripts/import/purchase-orders.js
@@ -10,22 +10,38 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
     const [syncInfo] = await localConnection.query(
       "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'"
     );
-    const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01';
+    const lastSyncTime = syncInfo?.rows?.[0]?.last_sync_timestamp || '1970-01-01';
     console.log('Purchase Orders: Using last sync time:', lastSyncTime);

-    // Insert temporary table creation query for purchase orders
+    // Create temporary tables with PostgreSQL syntax
     await localConnection.query(`
-      CREATE TABLE IF NOT EXISTS temp_purchase_orders (
-        po_id INT UNSIGNED NOT NULL,
-        pid INT UNSIGNED NOT NULL,
+      DROP TABLE IF EXISTS temp_purchase_orders;
+      DROP TABLE IF EXISTS temp_po_receivings;
+
+      CREATE TEMP TABLE temp_purchase_orders (
+        po_id INTEGER NOT NULL,
+        pid INTEGER NOT NULL,
         vendor VARCHAR(255),
         date DATE,
         expected_date DATE,
-        status INT,
+        status INTEGER,
         notes TEXT,
         PRIMARY KEY (po_id, pid)
-      ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+      );
+
+      CREATE TEMP TABLE temp_po_receivings (
+        po_id INTEGER,
+        pid INTEGER NOT NULL,
+        receiving_id INTEGER NOT NULL,
+        qty_each INTEGER,
+        cost_each DECIMAL(10,3),
+        received_date TIMESTAMP,
+        received_by INTEGER,
+        received_by_name VARCHAR(255),
+        is_alt_po INTEGER,
+        PRIMARY KEY (receiving_id, pid)
+      );
     `);
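+    // Note: TEMP tables are visible only to the session that created them,
+    // so this relies on localConnection pinning a single PostgreSQL client
+    // for the whole import (see the connection wrapper in utils.js).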
     outputProgress({
@@ -33,8 +49,8 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
       status: "running",
     });

-    // Get column names first
-    const [columns] = await localConnection.query(`
+    // Get column names - Keep MySQL compatible for production
+    const [columns] = await prodConnection.query(`
       SELECT COLUMN_NAME
       FROM INFORMATION_SCHEMA.COLUMNS
       WHERE TABLE_NAME = 'purchase_orders'
@@ -60,7 +76,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
       ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]
       : [];

-    // First get all relevant PO IDs with basic info
+    // First get all relevant PO IDs with basic info - Keep MySQL compatible for production
     const [[{ total }]] = await prodConnection.query(`
       SELECT COUNT(*) as total
       FROM (
@@ -99,6 +115,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental

     console.log('Purchase Orders: Found changes:', total);

+    // Get PO list - Keep MySQL compatible for production
     const [poList] = await prodConnection.query(`
       SELECT DISTINCT
         COALESCE(p.po_id, r.receiving_id) as po_id,
@@ -185,7 +202,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
       const batch = poList.slice(i, Math.min(i + BATCH_SIZE, poList.length));
       const poIds = batch.map(po => po.po_id);

-      // Get all products for these POs in one query
+      // Get all products for these POs in one query - Keep MySQL compatible for production
       const [poProducts] = await prodConnection.query(`
         SELECT
           pop.po_id,
@@ -207,7 +224,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
         const productPids = [...new Set(productBatch.map(p => p.pid))];
         const batchPoIds = [...new Set(productBatch.map(p => p.po_id))];

-        // Get receivings for this batch with employee names
+        // Get receivings for this batch with employee names - Keep MySQL compatible for production
         const [receivings] = await prodConnection.query(`
           SELECT
             r.po_id,
@@ -232,315 +249,176 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
           ORDER BY r.po_id, rp.pid, rp.received_date
         `, [batchPoIds, productPids]);

-        // Create maps for this sub-batch
-        const poProductMap = new Map();
-        productBatch.forEach(product => {
-          const key = `${product.po_id}-${product.pid}`;
-          poProductMap.set(key, product);
-        });
+        // Insert receivings into temp table
+        if (receivings.length > 0) {
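+          // Build one flat placeholder list for a multi-row insert: row i
+          // maps to ($(9i+1) .. $(9i+9)), so two rows yield
+          // "($1,...,$9),($10,...,$18)" with `values` flattened to match.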
+          const placeholders = receivings.map((_, idx) => {
+            const base = idx * 9;
+            return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9})`;
+          }).join(',');

-        const receivingMap = new Map();
-        const altReceivingMap = new Map();
-        const noPOReceivingMap = new Map();
-
-        receivings.forEach(receiving => {
-          const key = `${receiving.po_id}-${receiving.pid}`;
-          if (receiving.is_alt_po === 2) {
-            // No PO
-            if (!noPOReceivingMap.has(receiving.pid)) {
-              noPOReceivingMap.set(receiving.pid, []);
-            }
-            noPOReceivingMap.get(receiving.pid).push(receiving);
-          } else if (receiving.is_alt_po === 1) {
-            // Different PO
-            if (!altReceivingMap.has(receiving.pid)) {
-              altReceivingMap.set(receiving.pid, []);
-            }
-            altReceivingMap.get(receiving.pid).push(receiving);
-          } else {
-            // Original PO
-            if (!receivingMap.has(key)) {
-              receivingMap.set(key, []);
-            }
-            receivingMap.get(key).push(receiving);
-          }
-        });
+          const values = receivings.flatMap(r => [
+            r.po_id,
+            r.pid,
+            r.receiving_id,
+            r.qty_each,
+            r.cost_each,
+            r.received_date,
+            r.received_by,
+            r.received_by_name,
+            r.is_alt_po
+          ]);

-        // Verify PIDs exist
-        const [existingPids] = await localConnection.query(
-          'SELECT pid FROM products WHERE pid IN (?)',
-          [productPids]
-        );
-        const validPids = new Set(existingPids.map(p => p.pid));
+          await localConnection.query(`
+            INSERT INTO temp_po_receivings (
+              po_id, pid, receiving_id, qty_each, cost_each, received_date,
+              received_by, received_by_name, is_alt_po
+            )
+            VALUES ${placeholders}
+            ON CONFLICT (receiving_id, pid) DO UPDATE SET
+              po_id = EXCLUDED.po_id,
+              qty_each = EXCLUDED.qty_each,
+              cost_each = EXCLUDED.cost_each,
+              received_date = EXCLUDED.received_date,
+              received_by = EXCLUDED.received_by,
+              received_by_name = EXCLUDED.received_by_name,
+              is_alt_po = EXCLUDED.is_alt_po
+          `, values);
+        }

-        // First check which PO lines already exist and get their current values
-        const poLines = Array.from(poProductMap.values())
-          .filter(p => validPids.has(p.pid))
-          .map(p => [p.po_id, p.pid]);
+        // Process each PO product
+        for (const product of productBatch) {
+          const po = batch.find(p => p.po_id === product.po_id);
+          if (!po) continue;

-        const [existingPOs] = await localConnection.query(
-          `SELECT ${columnNames.join(',')} FROM purchase_orders WHERE (po_id, pid) IN (${poLines.map(() => "(?,?)").join(",")})`,
-          poLines.flat()
-        );
-        const existingPOMap = new Map(
-          existingPOs.map(po => [`${po.po_id}-${po.pid}`, po])
-        );
+          // Insert into temp_purchase_orders
+          const placeholders = `($1, $2, $3, $4, $5, $6, $7)`;
+          const values = [
+            product.po_id,
+            product.pid,
+            po.vendor,
+            po.date,
+            po.expected_date,
+            po.status,
+            po.notes || po.long_note
+          ];

-        // Split into inserts and updates
-        const insertsAndUpdates = { inserts: [], updates: [] };
-        let batchProcessed = 0;
+          await localConnection.query(`
+            INSERT INTO temp_purchase_orders (
+              po_id, pid, vendor, date, expected_date, status, notes
+            )
+            VALUES ${placeholders}
+            ON CONFLICT (po_id, pid) DO UPDATE SET
+              vendor = EXCLUDED.vendor,
+              date = EXCLUDED.date,
+              expected_date = EXCLUDED.expected_date,
+              status = EXCLUDED.status,
+              notes = EXCLUDED.notes
+          `, values);
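+          // One round trip per PO line keeps this simple; if it becomes a
+          // bottleneck, these rows could be batched with flattened
+          // placeholders like the receivings insert above.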
-        for (const po of batch) {
-          const poProducts = Array.from(poProductMap.values())
-            .filter(p => p.po_id === po.po_id && validPids.has(p.pid));
-
-          for (const product of poProducts) {
-            const key = `${po.po_id}-${product.pid}`;
-            const receivingHistory = receivingMap.get(key) || [];
-            const altReceivingHistory = altReceivingMap.get(product.pid) || [];
-            const noPOReceivingHistory = noPOReceivingMap.get(product.pid) || [];
-
-            // Combine all receivings and sort by date
-            const allReceivings = [
-              ...receivingHistory.map(r => ({ ...r, type: 'original' })),
-              ...altReceivingHistory.map(r => ({ ...r, type: 'alternate' })),
-              ...noPOReceivingHistory.map(r => ({ ...r, type: 'no_po' }))
-            ].sort((a, b) => new Date(a.received_date || '9999-12-31') - new Date(b.received_date || '9999-12-31'));
-
-            // Split receivings into original PO and others
-            const originalPOReceivings = allReceivings.filter(r => r.type === 'original');
-            const otherReceivings = allReceivings.filter(r => r.type !== 'original');
-
-            // Track FIFO fulfillment
-            let remainingToFulfill = product.ordered;
-            const fulfillmentTracking = [];
-            let totalReceived = 0;
-            let actualCost = null; // Will store the cost of the first receiving that fulfills this PO
-            let firstFulfillmentReceiving = null;
-            let lastFulfillmentReceiving = null;
-
-            for (const receiving of allReceivings) {
-              // Convert quantities to base units using supplier data
-              const baseQtyReceived = receiving.qty_each * (
-                receiving.type === 'original' ? 1 :
-                Math.max(1, product.supplier_qty_per_unit || 1)
-              );
-              const qtyToApply = Math.min(remainingToFulfill, baseQtyReceived);
-
-              if (qtyToApply > 0) {
-                // If this is the first receiving being applied, use its cost
-                if (actualCost === null && receiving.cost_each > 0) {
-                  actualCost = receiving.cost_each;
-                  firstFulfillmentReceiving = receiving;
-                }
-                lastFulfillmentReceiving = receiving;
-                fulfillmentTracking.push({
-                  receiving_id: receiving.receiving_id,
-                  qty_applied: qtyToApply,
-                  qty_total: baseQtyReceived,
-                  cost: receiving.cost_each || actualCost || product.cost_each,
-                  date: receiving.received_date,
-                  received_by: receiving.received_by,
-                  received_by_name: receiving.received_by_name || 'Unknown',
-                  type: receiving.type,
-                  remaining_qty: baseQtyReceived - qtyToApply
-                });
-                remainingToFulfill -= qtyToApply;
-              } else {
-                // Track excess receivings
-                fulfillmentTracking.push({
-                  receiving_id: receiving.receiving_id,
-                  qty_applied: 0,
-                  qty_total: baseQtyReceived,
-                  cost: receiving.cost_each || actualCost || product.cost_each,
-                  date: receiving.received_date,
-                  received_by: receiving.received_by,
-                  received_by_name: receiving.received_by_name || 'Unknown',
-                  type: receiving.type,
-                  is_excess: true
-                });
-              }
-              totalReceived += baseQtyReceived;
-            }
-
-            const receiving_status = !totalReceived ? 1 : // created
-              remainingToFulfill > 0 ? 30 :               // partial
-              40;                                         // full
-
-            function formatDate(dateStr) {
-              if (!dateStr) return null;
-              if (dateStr === '0000-00-00' || dateStr === '0000-00-00 00:00:00') return null;
-              if (typeof dateStr === 'string' && !dateStr.match(/^\d{4}-\d{2}-\d{2}/)) return null;
-              try {
-                const date = new Date(dateStr);
-                if (isNaN(date.getTime())) return null;
-                if (date.getFullYear() < 1900 || date.getFullYear() > 2100) return null;
-                return date.toISOString().split('T')[0];
-              } catch (e) {
-                return null;
-              }
-            }
-
-            const rowValues = columnNames.map(col => {
-              switch (col) {
-                case 'po_id': return po.po_id;
-                case 'vendor': return po.vendor;
-                case 'date': return formatDate(po.date);
-                case 'expected_date': return formatDate(po.expected_date);
-                case 'pid': return product.pid;
-                case 'sku': return product.sku;
-                case 'name': return product.name;
-                case 'cost_price': return actualCost || product.cost_each;
-                case 'po_cost_price': return product.cost_each;
-                case 'status': return po.status;
-                case 'notes': return po.notes;
-                case 'long_note': return po.long_note;
-                case 'ordered': return product.ordered;
-                case 'received': return totalReceived;
-                case 'unfulfilled': return remainingToFulfill;
-                case 'excess_received': return Math.max(0, totalReceived - product.ordered);
-                case 'received_date': return formatDate(firstFulfillmentReceiving?.received_date);
-                case 'last_received_date': return formatDate(lastFulfillmentReceiving?.received_date);
-                case 'received_by': return firstFulfillmentReceiving?.received_by_name || null;
-                case 'receiving_status': return receiving_status;
-                case 'receiving_history': return JSON.stringify({
-                  fulfillment: fulfillmentTracking,
-                  ordered_qty: product.ordered,
-                  total_received: totalReceived,
-                  remaining_unfulfilled: remainingToFulfill,
-                  excess_received: Math.max(0, totalReceived - product.ordered),
-                  po_cost: product.cost_each,
-                  actual_cost: actualCost || product.cost_each
-                });
-                default: return null;
-              }
+          processed++;
+
+          // Update progress periodically
+          if (Date.now() - lastProgressUpdate > PROGRESS_INTERVAL) {
+            outputProgress({
+              status: "running",
+              operation: "Purchase orders import",
+              message: `Processing purchase orders: ${processed} of ${totalItems}`,
+              current: processed,
+              total: totalItems,
+              elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
+              remaining: estimateRemaining(startTime, processed, totalItems),
+              rate: calculateRate(startTime, processed)
             });
-
-            if (existingPOMap.has(key)) {
-              const existing = existingPOMap.get(key);
-              // Check if any values are different
-              const hasChanges = columnNames.some(col => {
-                const newVal = rowValues[columnNames.indexOf(col)];
-                const oldVal = existing[col] ?? null;
-                // Special handling for numbers to avoid type coercion issues
-                if (typeof newVal === 'number' && typeof oldVal === 'number') {
-                  return Math.abs(newVal - oldVal) > 0.00001; // Allow for tiny floating point differences
-                }
-                // Special handling for receiving_history - parse and compare
-                if (col === 'receiving_history') {
-                  const newHistory = JSON.parse(newVal || '{}');
-                  const oldHistory = JSON.parse(oldVal || '{}');
-                  return JSON.stringify(newHistory) !== JSON.stringify(oldHistory);
-                }
-                return newVal !== oldVal;
-              });
-
-              if (hasChanges) {
-                insertsAndUpdates.updates.push({
-                  po_id: po.po_id,
-                  pid: product.pid,
-                  values: rowValues
-                });
-              }
-            } else {
-              insertsAndUpdates.inserts.push({
-                po_id: po.po_id,
-                pid: product.pid,
-                values: rowValues
-              });
-            }
-            batchProcessed++;
+            lastProgressUpdate = Date.now();
+          }
         }
       }
-
-      // Handle inserts
-      if (insertsAndUpdates.inserts.length > 0) {
-        const insertPlaceholders = insertsAndUpdates.inserts
-          .map(() => `(${Array(columnNames.length).fill("?").join(",")})`)
-          .join(",");
-
-        const insertResult = await localConnection.query(`
-          INSERT INTO purchase_orders (${columnNames.join(",")})
-          VALUES ${insertPlaceholders}
-        `, insertsAndUpdates.inserts.map(i => i.values).flat());
-
-        const affectedRows = insertResult[0].affectedRows;
-        // For an upsert, MySQL counts rows twice for updates
-        // So if affectedRows is odd, we have (updates * 2 + inserts)
-        const updates = Math.floor(affectedRows / 2);
-        const inserts = affectedRows - (updates * 2);
-
-        recordsAdded += inserts;
-        recordsUpdated += Math.floor(updates); // Ensure we never have fractional updates
-        processed += batchProcessed;
-      }
-
-      // Handle updates - now we know these actually have changes
-      if (insertsAndUpdates.updates.length > 0) {
-        const updatePlaceholders = insertsAndUpdates.updates
-          .map(() => `(${Array(columnNames.length).fill("?").join(",")})`)
-          .join(",");
-
-        const updateResult = await localConnection.query(`
-          INSERT INTO purchase_orders (${columnNames.join(",")})
-          VALUES ${updatePlaceholders}
-          ON DUPLICATE KEY UPDATE ${columnNames
-            .filter((col) => col !== "po_id" && col !== "pid")
-            .map((col) => `${col} = VALUES(${col})`)
-            .join(",")};
-        `, insertsAndUpdates.updates.map(u => u.values).flat());
-
-        const affectedRows = updateResult[0].affectedRows;
-        // For an upsert, MySQL counts rows twice for updates
-        // So if affectedRows is odd, we have (updates * 2 + inserts)
-        const updates = Math.floor(affectedRows / 2);
-        const inserts = affectedRows - (updates * 2);
-
-        recordsUpdated += Math.floor(updates); // Ensure we never have fractional updates
-        processed += batchProcessed;
-      }
-
-      // Update progress based on time interval
-      const now = Date.now();
-      if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) {
-        outputProgress({
-          status: "running",
-          operation: "Purchase orders import",
-          current: processed,
-          total: totalItems,
-          elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
-          remaining: estimateRemaining(startTime, processed, totalItems),
-          rate: calculateRate(startTime, processed)
-        });
-        lastProgressUpdate = now;
-      }
-    }
-
-    // Only update sync status if we get here (no errors thrown)
+    // Insert final data into purchase_orders table
+    const [result] = await localConnection.query(`
+      WITH inserted_pos AS (
+        INSERT INTO purchase_orders (
+          po_id, pid, vendor, date, expected_date, status, notes,
+          received_qty, received_cost, last_received_date, last_received_by,
+          alt_po_received_qty, alt_po_last_received_date,
+          no_po_received_qty, no_po_last_received_date
+        )
+        SELECT
+          po.po_id,
+          po.pid,
+          po.vendor,
+          po.date,
+          po.expected_date,
+          po.status,
+          po.notes,
+          COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0) as received_qty,
+          COALESCE(AVG(CASE WHEN r.is_alt_po = 0 THEN r.cost_each END), 0) as received_cost,
+          MAX(CASE WHEN r.is_alt_po = 0 THEN r.received_date END) as last_received_date,
+          MAX(CASE WHEN r.is_alt_po = 0 THEN r.received_by_name END) as last_received_by,
+          COALESCE(SUM(CASE WHEN r.is_alt_po = 1 THEN r.qty_each END), 0) as alt_po_received_qty,
+          MAX(CASE WHEN r.is_alt_po = 1 THEN r.received_date END) as alt_po_last_received_date,
+          COALESCE(SUM(CASE WHEN r.is_alt_po = 2 THEN r.qty_each END), 0) as no_po_received_qty,
+          MAX(CASE WHEN r.is_alt_po = 2 THEN r.received_date END) as no_po_last_received_date
+        FROM temp_purchase_orders po
+        -- Original-PO receivings (is_alt_po = 0) must also match on po_id;
+        -- alt-PO and no-PO rows are matched by pid alone, as before.
+        LEFT JOIN temp_po_receivings r
+          ON po.pid = r.pid
+          AND (r.is_alt_po <> 0 OR r.po_id = po.po_id)
+        GROUP BY po.po_id, po.pid, po.vendor, po.date, po.expected_date, po.status, po.notes
+        ON CONFLICT (po_id, pid) DO UPDATE SET
+          vendor = EXCLUDED.vendor,
+          date = EXCLUDED.date,
+          expected_date = EXCLUDED.expected_date,
+          status = EXCLUDED.status,
+          notes = EXCLUDED.notes,
+          received_qty = EXCLUDED.received_qty,
+          received_cost = EXCLUDED.received_cost,
+          last_received_date = EXCLUDED.last_received_date,
+          last_received_by = EXCLUDED.last_received_by,
+          alt_po_received_qty = EXCLUDED.alt_po_received_qty,
+          alt_po_last_received_date = EXCLUDED.alt_po_last_received_date,
+          no_po_received_qty = EXCLUDED.no_po_received_qty,
+          no_po_last_received_date = EXCLUDED.no_po_last_received_date
+        RETURNING xmax
+      )
+      SELECT
+        COUNT(*) FILTER (WHERE xmax = 0) as inserted,
+        COUNT(*) FILTER (WHERE NOT (xmax = 0)) as updated
+      FROM inserted_pos
+    `);
+
+    // COUNT() comes back from pg as a bigint string, so coerce to numbers
+    recordsAdded = parseInt(result.rows[0].inserted, 10);
+    recordsUpdated = parseInt(result.rows[0].updated, 10);
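+    // Note: received_cost is now a plain AVG over original-PO receivings;
+    // the old JS path used the cost of the first FIFO-applied receiving
+    // instead. Revisit if exact cost semantics matter downstream.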
+
+    // Update sync status
     await localConnection.query(`
       INSERT INTO sync_status (table_name, last_sync_timestamp)
       VALUES ('purchase_orders', NOW())
-      ON DUPLICATE KEY UPDATE
-        last_sync_timestamp = NOW(),
-        last_sync_id = LAST_INSERT_ID(last_sync_id)
+      ON CONFLICT (table_name) DO UPDATE SET
+        last_sync_timestamp = NOW()
+    `);
+
+    // Clean up temporary tables
+    await localConnection.query(`
+      DROP TABLE IF EXISTS temp_purchase_orders;
+      DROP TABLE IF EXISTS temp_po_receivings;
     `);

     return {
       status: "complete",
-      totalImported: totalItems,
-      recordsAdded: recordsAdded || 0,
-      recordsUpdated: recordsUpdated || 0,
-      incrementalUpdate,
-      lastSyncTime
+      recordsAdded,
+      recordsUpdated,
+      totalRecords: processed
     };
   } catch (error) {
-    outputProgress({
-      operation: `${incrementalUpdate ? 'Incremental' : 'Full'} purchase orders import failed`,
-      status: "error",
-      error: error.message,
-    });
+    console.error("Error during purchase orders import:", error);
+    // Attempt cleanup on error
+    try {
+      await localConnection.query(`
+        DROP TABLE IF EXISTS temp_purchase_orders;
+        DROP TABLE IF EXISTS temp_po_receivings;
+      `);
+    } catch (cleanupError) {
+      console.error('Error during cleanup:', cleanupError);
+    }
     throw error;
   }
 }
diff --git a/inventory-server/scripts/import/utils.js b/inventory-server/scripts/import/utils.js
index 12d8a21..d0ea46c 100644
--- a/inventory-server/scripts/import/utils.js
+++ b/inventory-server/scripts/import/utils.js
@@ -1,5 +1,6 @@
 const mysql = require("mysql2/promise");
 const { Client } = require("ssh2");
+const { Pool } = require('pg');
 const dotenv = require("dotenv");
 const path = require("path");
@@ -41,17 +42,41 @@ async function setupSshTunnel(sshConfig) {
 async function setupConnections(sshConfig) {
   const tunnel = await setupSshTunnel(sshConfig);

+  // Setup MySQL connection for production
   const prodConnection = await mysql.createConnection({
     ...sshConfig.prodDbConfig,
     stream: tunnel.stream,
   });

-  const localConnection = await mysql.createPool({
-    ...sshConfig.localDbConfig,
-    waitForConnections: true,
-    connectionLimit: 10,
-    queueLimit: 0
-  });
+  // Setup PostgreSQL connection pool for local
+  const localPool = new Pool(sshConfig.localDbConfig);
+
+  // Test the PostgreSQL connection
+  try {
+    const client = await localPool.connect();
+    await client.query('SELECT NOW()');
+    client.release();
+    console.log('PostgreSQL connection successful');
+  } catch (err) {
+    console.error('PostgreSQL connection error:', err);
+    throw err;
+  }
+
+  // Create a wrapper that mimics mysql2's `const [result] = await conn.query()`
+  // shape. A single client is pinned for the lifetime of the import: checking
+  // out a fresh pool client per query would silently break BEGIN/SAVEPOINT
+  // transactions and session-scoped TEMP tables, both of which the import
+  // scripts rely on.
+  const localClient = await localPool.connect();
+  const localConnection = {
+    query: async (text, params) => {
+      // Callers destructure the pg Result and read rows via `.rows`, e.g.
+      //   const [result] = await localConnection.query('SELECT 1 AS one');
+      //   result.rows[0].one === 1
+      const result = await localClient.query(text, params);
+      return [result];
+    },
+    end: async () => {
+      localClient.release();
+      await localPool.end();
+    }
+  };

   return {
     ssh: tunnel.ssh,