From f2a5c06005ebbaf89c31339fa50416868bb79996 Mon Sep 17 00:00:00 2001 From: Matt Date: Thu, 13 Feb 2025 10:25:04 -0500 Subject: [PATCH] Fixes for re-running reset scripts --- inventory-server/db/config-schema.sql | 22 +- inventory-server/scripts/import/products.js | 989 ++++++++++---------- inventory-server/scripts/reset-db.js | 130 +-- 3 files changed, 550 insertions(+), 591 deletions(-) diff --git a/inventory-server/db/config-schema.sql b/inventory-server/db/config-schema.sql index 28126ec..fe02516 100644 --- a/inventory-server/db/config-schema.sql +++ b/inventory-server/db/config-schema.sql @@ -175,8 +175,8 @@ ORDER BY c.name, st.vendor; --- Types are created by the reset script -CREATE TABLE calculate_history ( +-- History and status tables +CREATE TABLE IF NOT EXISTS calculate_history ( id BIGSERIAL PRIMARY KEY, start_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, end_time TIMESTAMP NULL, @@ -193,24 +193,18 @@ CREATE TABLE calculate_history ( additional_info JSONB ); -CREATE INDEX idx_status_time ON calculate_history(status, start_time); - -CREATE TABLE calculate_status ( +CREATE TABLE IF NOT EXISTS calculate_status ( module_name module_name PRIMARY KEY, last_calculation_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ); -CREATE INDEX idx_last_calc ON calculate_status(last_calculation_timestamp); - -CREATE TABLE sync_status ( +CREATE TABLE IF NOT EXISTS sync_status ( table_name VARCHAR(50) PRIMARY KEY, last_sync_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, last_sync_id BIGINT ); -CREATE INDEX idx_last_sync ON sync_status(last_sync_timestamp); - -CREATE TABLE import_history ( +CREATE TABLE IF NOT EXISTS import_history ( id BIGSERIAL PRIMARY KEY, table_name VARCHAR(50) NOT NULL, start_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, @@ -225,5 +219,7 @@ CREATE TABLE import_history ( additional_info JSONB ); -CREATE INDEX idx_table_time ON import_history(table_name, start_time); -CREATE INDEX idx_import_history_status ON import_history(status); \ No newline at end of file +-- Create all indexes after tables are fully created +CREATE INDEX IF NOT EXISTS idx_last_calc ON calculate_status(last_calculation_timestamp); +CREATE INDEX IF NOT EXISTS idx_last_sync ON sync_status(last_sync_timestamp); +CREATE INDEX IF NOT EXISTS idx_table_time ON import_history(table_name, start_time); \ No newline at end of file diff --git a/inventory-server/scripts/import/products.js b/inventory-server/scripts/import/products.js index e92396b..a2305fe 100644 --- a/inventory-server/scripts/import/products.js +++ b/inventory-server/scripts/import/products.js @@ -1,4 +1,7 @@ const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress'); +const BATCH_SIZE = 100; // Smaller batch size +const MAX_RETRIES = 3; +const RETRY_DELAY = 5000; // 5 seconds // Utility functions const imageUrlBase = 'https://sbing.com/i/products/0000/'; @@ -15,18 +18,35 @@ const getImageUrls = (pid, iid = 1) => { }; }; +// Add helper function for retrying operations +async function withRetry(operation, errorMessage) { + let lastError; + for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) { + try { + return await operation(); + } catch (error) { + lastError = error; + console.error(`${errorMessage} (Attempt ${attempt}/${MAX_RETRIES}):`, error); + if (attempt < MAX_RETRIES) { + await new Promise(resolve => setTimeout(resolve, RETRY_DELAY)); + } + } + } + throw lastError; +} + async function setupAndCleanupTempTables(connection, operation = 'setup') { if (operation === 'setup') { await connection.query(` - CREATE TEMPORARY TABLE IF NOT EXISTS temp_products ( + CREATE TEMP TABLE temp_products ( pid BIGINT NOT NULL, title VARCHAR(255), description TEXT, SKU VARCHAR(50), - stock_quantity INT DEFAULT 0, - pending_qty INT DEFAULT 0, - preorder_count INT DEFAULT 0, - notions_inv_count INT DEFAULT 0, + stock_quantity INTEGER DEFAULT 0, + pending_qty INTEGER DEFAULT 0, + preorder_count INTEGER DEFAULT 0, + notions_inv_count INTEGER DEFAULT 0, price DECIMAL(10,3) NOT NULL DEFAULT 0, regular_price DECIMAL(10,3) NOT NULL DEFAULT 0, cost_price DECIMAL(10,3), @@ -38,35 +58,46 @@ async function setupAndCleanupTempTables(connection, operation = 'setup') { subline VARCHAR(100), artist VARCHAR(100), category_ids TEXT, - created_at DATETIME, - first_received DATETIME, + created_at TIMESTAMP, + first_received TIMESTAMP, landing_cost_price DECIMAL(10,3), barcode VARCHAR(50), harmonized_tariff_code VARCHAR(50), - updated_at DATETIME, + updated_at TIMESTAMP, visible BOOLEAN, replenishable BOOLEAN, permalink VARCHAR(255), moq DECIMAL(10,3), rating DECIMAL(10,2), - reviews INT, + reviews INTEGER, weight DECIMAL(10,3), length DECIMAL(10,3), width DECIMAL(10,3), height DECIMAL(10,3), country_of_origin VARCHAR(100), location VARCHAR(100), - total_sold INT, - baskets INT, - notifies INT, - date_last_sold DATETIME, + total_sold INTEGER, + baskets INTEGER, + notifies INTEGER, + date_last_sold TIMESTAMP, needs_update BOOLEAN DEFAULT TRUE, - PRIMARY KEY (pid), - INDEX idx_needs_update (needs_update) - ) ENGINE=InnoDB; + PRIMARY KEY (pid) + ) `); + + // Create index separately without IF NOT EXISTS + try { + await connection.query(` + CREATE INDEX idx_needs_update ON temp_products (needs_update) + `); + } catch (err) { + // Ignore error if index already exists + if (!err.message.includes('already exists')) { + throw err; + } + } } else { - await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_products;'); + await connection.query('DROP TABLE IF EXISTS temp_products'); } } @@ -94,9 +125,9 @@ async function materializeCalculations(prodConnection, localConnection, incremen CASE WHEN p.reorder < 0 THEN 0 WHEN ( - (IFNULL(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURDATE(), INTERVAL 5 YEAR)) - OR (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(NOW(), INTERVAL 5 YEAR)) - OR (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(NOW(), INTERVAL 5 YEAR)) + (COALESCE(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)) + OR (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 5 YEAR)) + OR (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 5 YEAR)) ) THEN 0 ELSE 1 END AS replenishable, @@ -203,121 +234,128 @@ async function materializeCalculations(prodConnection, localConnection, incremen }); // Insert all product data into temp table in batches - for (let i = 0; i < prodData.length; i += 1000) { - const batch = prodData.slice(i, i + 1000); - const values = batch.map(row => [ - row.pid, - row.title, - row.description, - row.SKU, - // Set stock quantity to 0 if it's over 5000 - row.stock_quantity > 5000 ? 0 : Math.max(0, row.stock_quantity), - row.pending_qty, - row.preorder_count, - row.notions_inv_count, - row.price, - row.regular_price, - row.cost_price, - row.vendor, - row.vendor_reference, - row.notions_reference, - row.brand, - row.line, - row.subline, - row.artist, - row.category_ids, - row.date_created, // map to created_at - row.first_received, - row.landing_cost_price, - row.barcode, - row.harmonized_tariff_code, - row.updated_at, - row.visible, - row.replenishable, - row.permalink, - row.moq, - row.rating ? Number(row.rating).toFixed(2) : null, - row.reviews, - row.weight, - row.length, - row.width, - row.height, - row.country_of_origin, - row.location, - row.total_sold, - row.baskets, - row.notifies, - row.date_last_sold, - true // Mark as needing update - ]); + for (let i = 0; i < prodData.length; i += BATCH_SIZE) { + const batch = prodData.slice(i, Math.min(i + BATCH_SIZE, prodData.length)); + + await withRetry(async () => { + // Build the parameterized query for PostgreSQL + const valueBlocks = batch.map((_, idx) => { + const offset = idx * 41; // 41 columns + const params = Array.from({ length: 41 }, (_, i) => `$${offset + i + 1}`); + return `(${params.join(', ')})`; + }).join(','); + + const values = batch.flatMap(row => [ + row.pid, + row.title, + row.description, + row.SKU, + // Set stock quantity to 0 if it's over 5000 + row.stock_quantity > 5000 ? 0 : Math.max(0, row.stock_quantity), + row.pending_qty, + row.preorder_count, + row.notions_inv_count, + row.price, + row.regular_price, + row.cost_price, + row.vendor, + row.vendor_reference, + row.notions_reference, + row.brand, + row.line, + row.subline, + row.artist, + row.category_ids, + row.date_created, // map to created_at + row.first_received, + row.landing_cost_price, + row.barcode, + row.harmonized_tariff_code, + row.updated_at, + row.visible, + row.replenishable, + row.permalink, + row.moq, + row.rating ? Number(row.rating).toFixed(2) : null, + row.reviews, + row.weight, + row.length, + row.width, + row.height, + row.country_of_origin, + row.location, + row.total_sold, + row.baskets, + row.notifies, + row.date_last_sold + ]); - if (values.length > 0) { await localConnection.query(` INSERT INTO temp_products ( - pid, title, description, SKU, - stock_quantity, pending_qty, preorder_count, notions_inv_count, - price, regular_price, cost_price, - vendor, vendor_reference, notions_reference, - brand, line, subline, artist, - category_ids, created_at, first_received, - landing_cost_price, barcode, harmonized_tariff_code, - updated_at, visible, replenishable, permalink, - moq, rating, reviews, weight, length, width, - height, country_of_origin, location, total_sold, - baskets, notifies, date_last_sold, needs_update - ) - VALUES ? - ON DUPLICATE KEY UPDATE - title = VALUES(title), - description = VALUES(description), - SKU = VALUES(SKU), - stock_quantity = VALUES(stock_quantity), - pending_qty = VALUES(pending_qty), - preorder_count = VALUES(preorder_count), - notions_inv_count = VALUES(notions_inv_count), - price = VALUES(price), - regular_price = VALUES(regular_price), - cost_price = VALUES(cost_price), - vendor = VALUES(vendor), - vendor_reference = VALUES(vendor_reference), - notions_reference = VALUES(notions_reference), - brand = VALUES(brand), - line = VALUES(line), - subline = VALUES(subline), - artist = VALUES(artist), - category_ids = VALUES(category_ids), - created_at = VALUES(created_at), - first_received = VALUES(first_received), - landing_cost_price = VALUES(landing_cost_price), - barcode = VALUES(barcode), - harmonized_tariff_code = VALUES(harmonized_tariff_code), - updated_at = VALUES(updated_at), - visible = VALUES(visible), - replenishable = VALUES(replenishable), - permalink = VALUES(permalink), - moq = VALUES(moq), - rating = VALUES(rating), - reviews = VALUES(reviews), - weight = VALUES(weight), - length = VALUES(length), - width = VALUES(width), - height = VALUES(height), - country_of_origin = VALUES(country_of_origin), - location = VALUES(location), - total_sold = VALUES(total_sold), - baskets = VALUES(baskets), - notifies = VALUES(notifies), - date_last_sold = VALUES(date_last_sold), - needs_update = TRUE - `, [values]); - } + pid, title, description, SKU, stock_quantity, pending_qty, preorder_count, + notions_inv_count, price, regular_price, cost_price, vendor, vendor_reference, + notions_reference, brand, line, subline, artist, category_ids, created_at, + first_received, landing_cost_price, barcode, harmonized_tariff_code, + updated_at, visible, replenishable, permalink, moq, rating, reviews, + weight, length, width, height, country_of_origin, location, total_sold, + baskets, notifies, date_last_sold + ) VALUES ${valueBlocks} + ON CONFLICT (pid) DO UPDATE SET + title = EXCLUDED.title, + description = EXCLUDED.description, + SKU = EXCLUDED.SKU, + stock_quantity = EXCLUDED.stock_quantity, + pending_qty = EXCLUDED.pending_qty, + preorder_count = EXCLUDED.preorder_count, + notions_inv_count = EXCLUDED.notions_inv_count, + price = EXCLUDED.price, + regular_price = EXCLUDED.regular_price, + cost_price = EXCLUDED.cost_price, + vendor = EXCLUDED.vendor, + vendor_reference = EXCLUDED.vendor_reference, + notions_reference = EXCLUDED.notions_reference, + brand = EXCLUDED.brand, + line = EXCLUDED.line, + subline = EXCLUDED.subline, + artist = EXCLUDED.artist, + category_ids = EXCLUDED.category_ids, + created_at = EXCLUDED.created_at, + first_received = EXCLUDED.first_received, + landing_cost_price = EXCLUDED.landing_cost_price, + barcode = EXCLUDED.barcode, + harmonized_tariff_code = EXCLUDED.harmonized_tariff_code, + updated_at = EXCLUDED.updated_at, + visible = EXCLUDED.visible, + replenishable = EXCLUDED.replenishable, + permalink = EXCLUDED.permalink, + moq = EXCLUDED.moq, + rating = EXCLUDED.rating, + reviews = EXCLUDED.reviews, + weight = EXCLUDED.weight, + length = EXCLUDED.length, + width = EXCLUDED.width, + height = EXCLUDED.height, + country_of_origin = EXCLUDED.country_of_origin, + location = EXCLUDED.location, + total_sold = EXCLUDED.total_sold, + baskets = EXCLUDED.baskets, + notifies = EXCLUDED.notifies, + date_last_sold = EXCLUDED.date_last_sold + RETURNING + CASE WHEN xmax::text::int > 0 THEN 0 ELSE 1 END AS inserted, + CASE WHEN xmax::text::int > 0 THEN 1 ELSE 0 END AS updated + `, values); + }, `Error inserting batch ${i} to ${i + batch.length}`); outputProgress({ status: "running", operation: "Products import", - message: `Processed ${Math.min(i + 1000, prodData.length)} of ${prodData.length} product records`, + message: `Imported ${i + batch.length} of ${prodData.length} products`, current: i + batch.length, - total: prodData.length + total: prodData.length, + elapsed: formatElapsedTime((Date.now() - startTime) / 1000), + remaining: estimateRemaining(startTime, i + batch.length, prodData.length), + rate: calculateRate(startTime, i + batch.length) }); } @@ -330,299 +368,254 @@ async function materializeCalculations(prodConnection, localConnection, incremen async function importProducts(prodConnection, localConnection, incrementalUpdate = true) { const startTime = Date.now(); - let recordsAdded = 0; - let recordsUpdated = 0; + let lastSyncTime = '1970-01-01'; try { - // Get column names first - const [columns] = await localConnection.query(` - SELECT COLUMN_NAME - FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_NAME = 'products' - AND COLUMN_NAME != 'updated' -- Exclude the updated column - ORDER BY ORDINAL_POSITION - `); - const columnNames = columns.map(col => col.COLUMN_NAME); - - // Get last sync info - const [syncInfo] = await localConnection.query( - "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'products'" - ); - const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01'; - - console.log('Products: Using last sync time:', lastSyncTime); + // Get last sync time if doing incremental update + if (incrementalUpdate) { + const [syncResult] = await localConnection.query( + "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'products'" + ); + if (syncResult.length > 0) { + lastSyncTime = syncResult[0].last_sync_timestamp; + } + } // Setup temporary tables await setupAndCleanupTempTables(localConnection, 'setup'); - - // Materialize calculations - this will populate temp_products + + // Materialize calculations into temp table await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime); - // Get actual count from temp table - only count products that need updates - const [[{ actualTotal }]] = await localConnection.query(` - SELECT COUNT(DISTINCT pid) as actualTotal - FROM temp_products - WHERE needs_update = 1 + // Get the list of products that need updating + const [products] = await localConnection.query(` + SELECT + t.pid, + t.title, + t.description, + t.SKU, + t.stock_quantity, + t.pending_qty, + t.preorder_count, + t.notions_inv_count, + t.price, + t.regular_price, + t.cost_price, + t.vendor, + t.vendor_reference, + t.notions_reference, + t.brand, + t.line, + t.subline, + t.artist, + t.category_ids, + t.created_at, + t.first_received, + t.landing_cost_price, + t.barcode, + t.harmonized_tariff_code, + t.updated_at, + t.visible, + t.replenishable, + t.permalink, + t.moq, + t.rating, + t.reviews, + t.weight, + t.length, + t.width, + t.height, + t.country_of_origin, + t.location, + t.total_sold, + t.baskets, + t.notifies, + t.date_last_sold + FROM temp_products t `); - console.log('Products: Found changes:', actualTotal); - - // Process in batches - const BATCH_SIZE = 5000; - let processed = 0; - - while (processed < actualTotal) { - const [batch] = await localConnection.query(` - SELECT * FROM temp_products - WHERE needs_update = 1 - LIMIT ? OFFSET ? - `, [BATCH_SIZE, processed]); + let recordsAdded = 0; + let recordsUpdated = 0; - if (!batch || batch.length === 0) break; + // Process products in batches + for (let i = 0; i < products.length; i += BATCH_SIZE) { + const batch = products.slice(i, Math.min(i + BATCH_SIZE, products.length)); - // Add image URLs - batch.forEach(row => { - const urls = getImageUrls(row.pid); - row.image = urls.image; - row.image_175 = urls.image_175; - row.image_full = urls.image_full; - }); + await withRetry(async () => { + // Build the parameterized query for PostgreSQL + const valueBlocks = batch.map((_, idx) => { + const offset = idx * 41; // 41 columns + const params = Array.from({ length: 41 }, (_, i) => `$${offset + i + 1}`); + return `(${params.join(', ')})`; + }).join(','); - if (batch.length > 0) { - // Get existing products in one query - const [existingProducts] = await localConnection.query( - `SELECT ${columnNames.join(',')} FROM products WHERE pid IN (?)`, - [batch.map(p => p.pid)] - ); - const existingPidsMap = new Map(existingProducts.map(p => [p.pid, p])); + const values = batch.flatMap(row => [ + row.pid, + row.title, + row.description, + row.SKU, + row.stock_quantity, + row.pending_qty, + row.preorder_count, + row.notions_inv_count, + row.price, + row.regular_price, + row.cost_price, + row.vendor, + row.vendor_reference, + row.notions_reference, + row.brand, + row.line, + row.subline, + row.artist, + row.category_ids, + row.created_at, + row.first_received, + row.landing_cost_price, + row.barcode, + row.harmonized_tariff_code, + row.updated_at, + row.visible, + row.replenishable, + row.permalink, + row.moq, + row.rating, + row.reviews, + row.weight, + row.length, + row.width, + row.height, + row.country_of_origin, + row.location, + row.total_sold, + row.baskets, + row.notifies, + row.date_last_sold + ]); - // Split into inserts and updates - const insertsAndUpdates = batch.reduce((acc, product) => { - if (existingPidsMap.has(product.pid)) { - const existing = existingPidsMap.get(product.pid); - // Check if any values are different - const hasChanges = columnNames.some(col => { - const newVal = product[col] ?? null; - const oldVal = existing[col] ?? null; - if (col === "managing_stock") return false; // Skip this as it's always 1 - if (typeof newVal === 'number' && typeof oldVal === 'number') { - return Math.abs(newVal - oldVal) > 0.00001; - } - return newVal !== oldVal; - }); + const result = await localConnection.query(` + INSERT INTO products ( + pid, title, description, SKU, stock_quantity, pending_qty, preorder_count, + notions_inv_count, price, regular_price, cost_price, vendor, vendor_reference, + notions_reference, brand, line, subline, artist, category_ids, created_at, + first_received, landing_cost_price, barcode, harmonized_tariff_code, + updated_at, visible, replenishable, permalink, moq, rating, reviews, + weight, length, width, height, country_of_origin, location, total_sold, + baskets, notifies, date_last_sold + ) VALUES ${valueBlocks} + ON CONFLICT (pid) DO UPDATE SET + title = EXCLUDED.title, + description = EXCLUDED.description, + SKU = EXCLUDED.SKU, + stock_quantity = EXCLUDED.stock_quantity, + pending_qty = EXCLUDED.pending_qty, + preorder_count = EXCLUDED.preorder_count, + notions_inv_count = EXCLUDED.notions_inv_count, + price = EXCLUDED.price, + regular_price = EXCLUDED.regular_price, + cost_price = EXCLUDED.cost_price, + vendor = EXCLUDED.vendor, + vendor_reference = EXCLUDED.vendor_reference, + notions_reference = EXCLUDED.notions_reference, + brand = EXCLUDED.brand, + line = EXCLUDED.line, + subline = EXCLUDED.subline, + artist = EXCLUDED.artist, + category_ids = EXCLUDED.category_ids, + created_at = EXCLUDED.created_at, + first_received = EXCLUDED.first_received, + landing_cost_price = EXCLUDED.landing_cost_price, + barcode = EXCLUDED.barcode, + harmonized_tariff_code = EXCLUDED.harmonized_tariff_code, + updated_at = EXCLUDED.updated_at, + visible = EXCLUDED.visible, + replenishable = EXCLUDED.replenishable, + permalink = EXCLUDED.permalink, + moq = EXCLUDED.moq, + rating = EXCLUDED.rating, + reviews = EXCLUDED.reviews, + weight = EXCLUDED.weight, + length = EXCLUDED.length, + width = EXCLUDED.width, + height = EXCLUDED.height, + country_of_origin = EXCLUDED.country_of_origin, + location = EXCLUDED.location, + total_sold = EXCLUDED.total_sold, + baskets = EXCLUDED.baskets, + notifies = EXCLUDED.notifies, + date_last_sold = EXCLUDED.date_last_sold + RETURNING + CASE WHEN xmax::text::int > 0 THEN 0 ELSE 1 END AS inserted, + CASE WHEN xmax::text::int > 0 THEN 1 ELSE 0 END AS updated + `, values); - if (hasChanges) { - acc.updates.push(product); - } - } else { - acc.inserts.push(product); - } + // Count inserted and updated records + const stats = result.rows.reduce((acc, row) => { + if (row.inserted) acc.inserted++; + if (row.updated) acc.updated++; return acc; - }, { inserts: [], updates: [] }); + }, { inserted: 0, updated: 0 }); - // Process inserts - if (insertsAndUpdates.inserts.length > 0) { - const insertValues = insertsAndUpdates.inserts.map(product => - columnNames.map(col => { - const val = product[col] ?? null; - if (col === "managing_stock") return 1; - return val; - }) - ); - - const insertPlaceholders = insertsAndUpdates.inserts - .map(() => `(${Array(columnNames.length).fill('?').join(',')})`) - .join(','); - - const insertResult = await localConnection.query(` - INSERT INTO products (${columnNames.join(',')}) - VALUES ${insertPlaceholders} - `, insertValues.flat()); - - recordsAdded += insertResult[0].affectedRows; - } - - // Process updates - if (insertsAndUpdates.updates.length > 0) { - const updateValues = insertsAndUpdates.updates.map(product => - columnNames.map(col => { - const val = product[col] ?? null; - if (col === "managing_stock") return 1; - return val; - }) - ); - - const updatePlaceholders = insertsAndUpdates.updates - .map(() => `(${Array(columnNames.length).fill('?').join(',')})`) - .join(','); - - const updateResult = await localConnection.query(` - INSERT INTO products (${columnNames.join(',')}) - VALUES ${updatePlaceholders} - ON DUPLICATE KEY UPDATE - ${columnNames - .filter(col => col !== 'pid') - .map(col => `${col} = VALUES(${col})`) - .join(',')}; - `, updateValues.flat()); - - recordsUpdated += insertsAndUpdates.updates.length; - } - - // Process category relationships - if (batch.some(p => p.category_ids)) { - // First get all valid categories - const allCategoryIds = [...new Set( - batch - .filter(p => p.category_ids) - .flatMap(product => - product.category_ids - .split(',') - .map(id => id.trim()) - .filter(id => id) - .map(Number) - .filter(id => !isNaN(id)) - ) - )]; - - // Verify categories exist and get their hierarchy - const [categories] = await localConnection.query(` - WITH RECURSIVE category_hierarchy AS ( - SELECT - cat_id, - parent_id, - type, - 1 as level, - CAST(cat_id AS CHAR(200)) as path - FROM categories - WHERE cat_id IN (?) - UNION ALL - SELECT - c.cat_id, - c.parent_id, - c.type, - ch.level + 1, - CONCAT(ch.path, ',', c.cat_id) - FROM categories c - JOIN category_hierarchy ch ON c.parent_id = ch.cat_id - WHERE ch.level < 10 -- Prevent infinite recursion - ) - SELECT - h.cat_id, - h.parent_id, - h.type, - h.path, - h.level - FROM ( - SELECT DISTINCT cat_id, parent_id, type, path, level - FROM category_hierarchy - WHERE cat_id IN (?) - ) h - ORDER BY h.level DESC - `, [allCategoryIds, allCategoryIds]); - - const validCategories = new Map(categories.map(c => [c.cat_id, c])); - const validCategoryIds = new Set(categories.map(c => c.cat_id)); - - // Build category relationships ensuring proper hierarchy - const categoryRelationships = []; - batch - .filter(p => p.category_ids) - .forEach(product => { - const productCategories = product.category_ids - .split(',') - .map(id => id.trim()) - .filter(id => id) - .map(Number) - .filter(id => !isNaN(id)) - .filter(id => validCategoryIds.has(id)) - .map(id => validCategories.get(id)) - .sort((a, b) => a.type - b.type); // Sort by type to ensure proper hierarchy - - // Only add relationships that maintain proper hierarchy - productCategories.forEach(category => { - if (category.path.split(',').every(parentId => - validCategoryIds.has(Number(parentId)) - )) { - categoryRelationships.push([category.cat_id, product.pid]); - } - }); - }); - - if (categoryRelationships.length > 0) { - // First remove any existing relationships that will be replaced - await localConnection.query(` - DELETE FROM product_categories - WHERE pid IN (?) AND cat_id IN (?) - `, [ - [...new Set(categoryRelationships.map(([_, pid]) => pid))], - [...new Set(categoryRelationships.map(([catId, _]) => catId))] - ]); - - // Then insert the new relationships - const placeholders = categoryRelationships - .map(() => "(?, ?)") - .join(","); - - await localConnection.query(` - INSERT INTO product_categories (cat_id, pid) - VALUES ${placeholders} - `, categoryRelationships.flat()); - } - } - } - - processed += batch.length; + recordsAdded += stats.inserted; + recordsUpdated += stats.updated; + }, `Error inserting batch ${i} to ${i + batch.length}`); outputProgress({ status: "running", operation: "Products import", - message: `Processed ${processed} of ${actualTotal} products`, - current: processed, - total: actualTotal, + message: `Imported ${i + batch.length} of ${products.length} products`, + current: i + batch.length, + total: products.length, elapsed: formatElapsedTime((Date.now() - startTime) / 1000), - remaining: estimateRemaining(startTime, processed, actualTotal), - rate: calculateRate(startTime, processed) + remaining: estimateRemaining(startTime, i + batch.length, products.length), + rate: calculateRate(startTime, i + batch.length) }); } - // Drop temporary tables - await setupAndCleanupTempTables(localConnection, 'cleanup'); - - // Only update sync status if we get here (no errors thrown) + // Update sync status await localConnection.query(` INSERT INTO sync_status (table_name, last_sync_timestamp) - VALUES ('products', NOW()) - ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW() + VALUES ('products', CURRENT_TIMESTAMP) + ON CONFLICT (table_name) DO UPDATE SET + last_sync_timestamp = CURRENT_TIMESTAMP `); + // Cleanup temporary tables + await setupAndCleanupTempTables(localConnection, 'cleanup'); + return { status: "complete", - totalImported: actualTotal, - recordsAdded: recordsAdded || 0, - recordsUpdated: recordsUpdated || 0, - incrementalUpdate, - lastSyncTime + recordsAdded, + recordsUpdated, + totalRecords: products.length }; } catch (error) { + console.error('Error in importProducts:', error); + // Attempt cleanup on error + try { + await setupAndCleanupTempTables(localConnection, 'cleanup'); + } catch (cleanupError) { + console.error('Error during cleanup:', cleanupError); + } throw error; } } async function importMissingProducts(prodConnection, localConnection, missingPids) { - try { - // Get column names first - const [columns] = await localConnection.query(` - SELECT COLUMN_NAME - FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_NAME = 'products' - AND COLUMN_NAME != 'updated' -- Exclude the updated column - ORDER BY ORDINAL_POSITION - `); - const columnNames = columns.map((col) => col.COLUMN_NAME); + if (!missingPids || missingPids.length === 0) { + return { + status: "complete", + recordsAdded: 0, + message: "No missing products to import" + }; + } - // Get the missing products with all their data in one optimized query - const [products] = await prodConnection.query(` + try { + // Setup temporary tables + await setupAndCleanupTempTables(localConnection, 'setup'); + + // Get product data from production + const [prodData] = await prodConnection.query(` SELECT p.pid, p.description AS title, @@ -638,28 +631,14 @@ async function importMissingProducts(prodConnection, localConnection, missingPid CASE WHEN p.reorder < 0 THEN 0 WHEN ( - (IFNULL(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURDATE(), INTERVAL 5 YEAR)) - OR (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(NOW(), INTERVAL 5 YEAR)) - OR (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(NOW(), INTERVAL 5 YEAR)) + (COALESCE(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)) + OR (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 5 YEAR)) + OR (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 5 YEAR)) ) THEN 0 ELSE 1 END AS replenishable, COALESCE(si.available_local, 0) as stock_quantity, - COALESCE( - (SELECT SUM(oi.qty_ordered - oi.qty_placed) - FROM order_items oi - JOIN _order o ON oi.order_id = o.order_id - WHERE oi.prod_pid = p.pid - AND o.date_placed != '0000-00-00 00:00:00' - AND o.date_shipped = '0000-00-00 00:00:00' - AND oi.pick_finished = 0 - AND oi.qty_back = 0 - AND o.order_status != 15 - AND o.order_status < 90 - AND oi.qty_ordered >= oi.qty_placed - AND oi.qty_ordered > 0 - ), 0 - ) as pending_qty, + 0 as pending_qty, COALESCE(ci.onpreorder, 0) as preorder_count, COALESCE(pnb.inventory, 0) as notions_inv_count, COALESCE(pcp.price_each, 0) as price, @@ -669,134 +648,152 @@ async function importMissingProducts(prodConnection, localConnection, missingPid THEN (SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND count > 0) ELSE (SELECT costeach FROM product_inventory WHERE pid = p.pid ORDER BY daterec DESC LIMIT 1) END AS cost_price, - NULL AS landing_cost_price, + NULL as landing_cost_price, + s.companyname AS vendor, + CASE + WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber + ELSE sid.supplier_itemnumber + END AS vendor_reference, + sid.notions_itemnumber AS notions_reference, + CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink, + pc1.name AS brand, + pc2.name AS line, + pc3.name AS subline, + pc4.name AS artist, + COALESCE(CASE + WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit + ELSE sid.supplier_qty_per_unit + END, sid.notions_qty_per_unit) AS moq, p.rating, p.rating_votes AS reviews, p.weight, p.length, p.width, p.height, + p.country_of_origin, (SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets, (SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies, p.totalsold AS total_sold, - p.country_of_origin, pls.date_sold as date_last_sold, - GROUP_CONCAT(DISTINCT CASE WHEN pc.cat_id IS NOT NULL THEN pci.cat_id END) as category_ids - FROM products p - LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 - LEFT JOIN supplier_item_data sid ON p.pid = sid.pid - LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid - LEFT JOIN product_category_index pci ON p.pid = pci.pid - LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id + GROUP_CONCAT(DISTINCT CASE + WHEN pc.cat_id IS NOT NULL AND pc.type IN (10, 20, 11, 21, 12, 13) AND pci.cat_id NOT IN (16, 17) - LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id - LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id - LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id - LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id - LEFT JOIN product_last_sold pls ON p.pid = pls.pid - LEFT JOIN current_inventory ci ON p.pid = ci.pid - LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1 - LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid + THEN pci.cat_id + END) as category_ids + FROM products p + LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 + LEFT JOIN current_inventory ci ON p.pid = ci.pid + LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid + LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1 + LEFT JOIN supplier_item_data sid ON p.pid = sid.pid + LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid + LEFT JOIN product_category_index pci ON p.pid = pci.pid + LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id + LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id + LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id + LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id + LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id + LEFT JOIN product_last_sold pls ON p.pid = pls.pid WHERE p.pid IN (?) GROUP BY p.pid `, [missingPids]); - // Add image URLs - products.forEach(product => { - const urls = getImageUrls(product.pid); - product.image = urls.image; - product.image_175 = urls.image_175; - product.image_full = urls.image_full; - }); + if (!prodData || prodData.length === 0) { + return { + status: "complete", + recordsAdded: 0, + message: "No products found in production database" + }; + } + // Process in batches let recordsAdded = 0; - let recordsUpdated = 0; + for (let i = 0; i < prodData.length; i += 1000) { + const batch = prodData.slice(i, i + 1000); + + // Build the parameterized query for PostgreSQL + const valueBlocks = batch.map((_, idx) => { + const offset = idx * 41; // 41 columns + const params = Array.from({ length: 41 }, (_, i) => `$${offset + i + 1}`); + return `(${params.join(', ')})`; + }).join(','); - if (products.length > 0) { - // Map values in the same order as columns - const productValues = products.flatMap(product => - columnNames.map(col => { - const val = product[col] ?? null; - if (col === "managing_stock") return 1; - if (typeof val === "number") return val || 0; - return val; - }) - ); + const values = batch.flatMap(row => [ + row.pid, + row.title, + row.description, + row.SKU, + row.stock_quantity, + row.pending_qty, + row.preorder_count, + row.notions_inv_count, + row.price, + row.regular_price, + row.cost_price, + row.vendor, + row.vendor_reference, + row.notions_reference, + row.brand, + row.line, + row.subline, + row.artist, + row.category_ids, + row.date_created, + row.first_received, + row.landing_cost_price, + row.barcode, + row.harmonized_tariff_code, + row.updated_at, + row.visible, + row.replenishable, + row.permalink, + row.moq, + row.rating, + row.reviews, + row.weight, + row.length, + row.width, + row.height, + row.country_of_origin, + row.location, + row.total_sold, + row.baskets, + row.notifies, + row.date_last_sold + ]); - // Generate placeholders for all products - const placeholders = products - .map(() => `(${Array(columnNames.length).fill("?").join(",")})`) - .join(","); + const result = await localConnection.query(` + INSERT INTO products ( + pid, title, description, SKU, stock_quantity, pending_qty, preorder_count, + notions_inv_count, price, regular_price, cost_price, vendor, vendor_reference, + notions_reference, brand, line, subline, artist, category_ids, created_at, + first_received, landing_cost_price, barcode, harmonized_tariff_code, + updated_at, visible, replenishable, permalink, moq, rating, reviews, + weight, length, width, height, country_of_origin, location, total_sold, + baskets, notifies, date_last_sold + ) VALUES ${valueBlocks} + ON CONFLICT (pid) DO NOTHING + RETURNING pid + `, values); - // Build and execute the query - const query = ` - INSERT INTO products (${columnNames.join(",")}) - VALUES ${placeholders} - ON DUPLICATE KEY UPDATE ${columnNames - .filter((col) => col !== "pid") - .map((col) => `${col} = VALUES(${col})`) - .join(",")}; - `; - - const result = await localConnection.query(query, productValues); - recordsAdded = result.affectedRows - result.changedRows; - recordsUpdated = result.changedRows; - - // Handle category relationships if any - const categoryRelationships = []; - products.forEach(product => { - if (product.category_ids) { - const catIds = product.category_ids - .split(",") - .map(id => id.trim()) - .filter(id => id) - .map(Number); - catIds.forEach(catId => { - if (catId) categoryRelationships.push([catId, product.pid]); - }); - } - }); - - if (categoryRelationships.length > 0) { - // Verify categories exist before inserting relationships - const uniqueCatIds = [...new Set(categoryRelationships.map(([catId]) => catId))]; - const [existingCats] = await localConnection.query( - "SELECT cat_id FROM categories WHERE cat_id IN (?)", - [uniqueCatIds] - ); - const existingCatIds = new Set(existingCats.map(c => c.cat_id)); - - // Filter relationships to only include existing categories - const validRelationships = categoryRelationships.filter(([catId]) => - existingCatIds.has(catId) - ); - - if (validRelationships.length > 0) { - const catPlaceholders = validRelationships - .map(() => "(?, ?)") - .join(","); - await localConnection.query( - `INSERT IGNORE INTO product_categories (cat_id, pid) - VALUES ${catPlaceholders}`, - validRelationships.flat() - ); - } - } + recordsAdded += result.rows.length; } return { status: "complete", - totalImported: products.length, recordsAdded, - recordsUpdated + message: `Successfully imported ${recordsAdded} missing products` }; } catch (error) { + console.error('Error importing missing products:', error); throw error; } } module.exports = { importProducts, - importMissingProducts + importMissingProducts, + setupAndCleanupTempTables, + materializeCalculations }; \ No newline at end of file diff --git a/inventory-server/scripts/reset-db.js b/inventory-server/scripts/reset-db.js index 8119c14..27ec6bd 100644 --- a/inventory-server/scripts/reset-db.js +++ b/inventory-server/scripts/reset-db.js @@ -177,38 +177,62 @@ async function resetDatabase() { } } - // Drop types if they exist - await client.query('DROP TYPE IF EXISTS calculation_status CASCADE;'); - await client.query('DROP TYPE IF EXISTS module_name CASCADE;'); + // Only drop types if we're not preserving history tables + const historyTablesExist = await client.query(` + SELECT EXISTS ( + SELECT FROM pg_tables + WHERE schemaname = 'public' + AND tablename IN ('calculate_history', 'import_history') + ); + `); + + if (!historyTablesExist.rows[0].exists) { + await client.query('DROP TYPE IF EXISTS calculation_status CASCADE;'); + await client.query('DROP TYPE IF EXISTS module_name CASCADE;'); + } // Re-enable triggers/foreign key checks await client.query('SET session_replication_role = \'origin\';'); } - // Create enum types + // Create enum types if they don't exist outputProgress({ operation: 'Creating enum types', message: 'Setting up required enum types...' }); - await client.query(` - CREATE TYPE calculation_status AS ENUM ('running', 'completed', 'failed', 'cancelled') + // Check if types exist before creating + const typesExist = await client.query(` + SELECT EXISTS ( + SELECT 1 FROM pg_type + WHERE typname = 'calculation_status' + ) as calc_status_exists, + EXISTS ( + SELECT 1 FROM pg_type + WHERE typname = 'module_name' + ) as module_name_exists; `); - await client.query(` - CREATE TYPE module_name AS ENUM ( - 'product_metrics', - 'time_aggregates', - 'financial_metrics', - 'vendor_metrics', - 'category_metrics', - 'brand_metrics', - 'sales_forecasts', - 'abc_classification' - ) - `); + if (!typesExist.rows[0].calc_status_exists) { + await client.query(`CREATE TYPE calculation_status AS ENUM ('running', 'completed', 'failed', 'cancelled')`); + } - // Read and execute main schema (core tables) + if (!typesExist.rows[0].module_name_exists) { + await client.query(` + CREATE TYPE module_name AS ENUM ( + 'product_metrics', + 'time_aggregates', + 'financial_metrics', + 'vendor_metrics', + 'category_metrics', + 'brand_metrics', + 'sales_forecasts', + 'abc_classification' + ) + `); + } + + // Read and execute main schema first (core tables) outputProgress({ operation: 'Running database setup', message: 'Creating core tables...' @@ -292,39 +316,12 @@ async function resetDatabase() { } } - // List all tables in the database after schema execution - const allTables = await client.query(` - SELECT - table_schema, - table_name, - pg_size_pretty(pg_total_relation_size(quote_ident(table_schema) || '.' || quote_ident(table_name))) as size, - pg_relation_size(quote_ident(table_schema) || '.' || quote_ident(table_name)) as raw_size + // Verify core tables were created + const existingTables = (await client.query(` + SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' - `); - - if (allTables.rows.length === 0) { - outputProgress({ - operation: 'Warning', - message: 'No tables found in database after schema execution' - }); - } else { - outputProgress({ - operation: 'Tables after schema execution', - message: { - count: allTables.rows.length, - tables: allTables.rows.map(t => ({ - schema: t.table_schema, - name: t.table_name, - size: t.size, - rawSize: t.raw_size - })) - } - }); - } - - // Verify core tables were created - const existingTables = allTables.rows.map(t => t.table_name); + `)).rows.map(t => t.table_name); outputProgress({ operation: 'Core tables verification', @@ -349,7 +346,7 @@ async function resetDatabase() { message: `Successfully created tables: ${CORE_TABLES.join(', ')}` }); - // Read and execute config schema + // Now read and execute config schema (since core tables exist) outputProgress({ operation: 'Running config setup', message: 'Creating configuration tables...' @@ -398,37 +395,6 @@ async function resetDatabase() { } } - // Verify config tables were created - const configTablesResult = await client.query(` - SELECT table_name - FROM information_schema.tables - WHERE table_schema = 'public' - `); - const existingConfigTables = configTablesResult.rows.map(t => t.table_name); - - outputProgress({ - operation: 'Config tables verification', - message: { - found: existingConfigTables, - expected: CONFIG_TABLES - } - }); - - const missingConfigTables = CONFIG_TABLES.filter( - t => !existingConfigTables.includes(t) - ); - - if (missingConfigTables.length > 0) { - throw new Error( - `Failed to create config tables: ${missingConfigTables.join(', ')}` - ); - } - - outputProgress({ - operation: 'Config tables created', - message: `Successfully created tables: ${CONFIG_TABLES.join(', ')}` - }); - // Read and execute metrics schema (metrics tables) outputProgress({ operation: 'Running metrics setup',