diff --git a/inventory-server/scripts/import/products.js b/inventory-server/scripts/import/products.js index 4386f44..1aaae73 100644 --- a/inventory-server/scripts/import/products.js +++ b/inventory-server/scripts/import/products.js @@ -77,14 +77,115 @@ async function materializeCalculations(prodConnection, localConnection, incremen message: "Fetching product data from production" }); - // First get total count - const [[{ total }]] = await prodConnection.query(` - SELECT COUNT(DISTINCT p.pid) as total + // Get all product data in a single optimized query + const [prodData] = await prodConnection.query(` + SELECT + p.pid, + p.description AS title, + p.notes AS description, + p.itemnumber AS SKU, + p.date_created, + p.datein AS first_received, + p.location, + p.upc AS barcode, + p.harmonized_tariff_code, + p.stamp AS updated_at, + CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible, + CASE + WHEN p.reorder < 0 THEN 0 + WHEN ( + (IFNULL(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURDATE(), INTERVAL 5 YEAR)) + OR (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(NOW(), INTERVAL 5 YEAR)) + OR (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(NOW(), INTERVAL 5 YEAR)) + ) THEN 0 + ELSE 1 + END AS replenishable, + COALESCE(si.available_local, 0) - COALESCE( + (SELECT SUM(oi.qty_ordered - oi.qty_placed) + FROM order_items oi + JOIN _order o ON oi.order_id = o.order_id + WHERE oi.prod_pid = p.pid + AND o.date_placed != '0000-00-00 00:00:00' + AND o.date_shipped = '0000-00-00 00:00:00' + AND oi.pick_finished = 0 + AND oi.qty_back = 0 + AND o.order_status != 15 + AND o.order_status < 90 + AND oi.qty_ordered >= oi.qty_placed + AND oi.qty_ordered > 0 + ), 0 + ) as stock_quantity, + COALESCE( + (SELECT SUM(oi.qty_ordered - oi.qty_placed) + FROM order_items oi + JOIN _order o ON oi.order_id = o.order_id + WHERE oi.prod_pid = p.pid + AND o.date_placed != '0000-00-00 00:00:00' + AND o.date_shipped = '0000-00-00 00:00:00' + AND oi.pick_finished = 0 + AND oi.qty_back = 0 + AND o.order_status != 15 + AND o.order_status < 90 + AND oi.qty_ordered >= oi.qty_placed + AND oi.qty_ordered > 0 + ), 0 + ) as pending_qty, + COALESCE(ci.onpreorder, 0) as preorder_count, + COALESCE(pnb.inventory, 0) as notions_inv_count, + COALESCE(pcp.price_each, 0) as price, + COALESCE(p.sellingprice, 0) AS regular_price, + CASE + WHEN EXISTS (SELECT 1 FROM product_inventory WHERE pid = p.pid AND count > 0) + THEN (SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND count > 0) + ELSE (SELECT costeach FROM product_inventory WHERE pid = p.pid ORDER BY daterec DESC LIMIT 1) + END AS cost_price, + NULL as landing_cost_price, + s.companyname AS vendor, + CASE + WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber + ELSE sid.supplier_itemnumber + END AS vendor_reference, + sid.notions_itemnumber AS notions_reference, + CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink, + pc1.name AS brand, + pc2.name AS line, + pc3.name AS subline, + pc4.name AS artist, + COALESCE(CASE + WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit + ELSE sid.supplier_qty_per_unit + END, sid.notions_qty_per_unit) AS moq, + p.rating, + p.rating_votes AS reviews, + p.weight, + p.length, + p.width, + p.height, + p.country_of_origin, + (SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets, + (SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies, + p.totalsold AS total_sold, + pls.date_sold as date_last_sold, + GROUP_CONCAT(DISTINCT CASE + WHEN pc.cat_id IS NOT NULL + AND pc.type IN (10, 20, 11, 21, 12, 13) + AND pci.cat_id NOT IN (16, 17) + THEN pci.cat_id + END) as category_ids FROM products p - FORCE INDEX (PRIMARY) - LEFT JOIN current_inventory ci FORCE INDEX (PRIMARY) ON p.pid = ci.pid - LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1 + LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 + LEFT JOIN current_inventory ci ON p.pid = ci.pid LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid + LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1 + LEFT JOIN supplier_item_data sid ON p.pid = sid.pid + LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid + LEFT JOIN product_category_index pci ON p.pid = pci.pid + LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id + LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id + LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id + LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id + LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id + LEFT JOIN product_last_sold pls ON p.pid = pls.pid WHERE ${incrementalUpdate ? ` p.stamp > ? OR ci.stamp > ? OR @@ -92,161 +193,26 @@ async function materializeCalculations(prodConnection, localConnection, incremen pcp.date_active > ? OR pnb.date_updated > ? ` : 'TRUE'} + GROUP BY p.pid `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []); - if (total === 0) { - return []; - } - outputProgress({ status: "running", operation: "Products import", - message: `Found ${total} products to process` + message: `Processing ${prodData.length} product records` }); - // Process in batches - const BATCH_SIZE = 5000; - const results = []; - - for (let offset = 0; offset < total; offset += BATCH_SIZE) { - outputProgress({ - status: "running", - operation: "Products import", - message: `Fetching products ${offset + 1} to ${Math.min(offset + BATCH_SIZE, total)} of ${total}` - }); - - // Get batch of products - const [batchData] = await prodConnection.query(` - SELECT - p.pid, - p.description AS title, - p.notes AS description, - p.itemnumber AS SKU, - p.date_created, - p.datein AS first_received, - p.location, - p.upc AS barcode, - p.harmonized_tariff_code, - p.stamp AS updated_at, - CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible, - CASE - WHEN p.reorder < 0 THEN 0 - WHEN ( - (IFNULL(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURDATE(), INTERVAL 5 YEAR)) - OR (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(NOW(), INTERVAL 5 YEAR)) - OR (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(NOW(), INTERVAL 5 YEAR)) - ) THEN 0 - ELSE 1 - END AS replenishable, - COALESCE(si.available_local, 0) - COALESCE( - (SELECT SUM(oi.qty_ordered - oi.qty_placed) - FROM order_items oi - FORCE INDEX (prod_pid) - JOIN _order o FORCE INDEX (PRIMARY) ON oi.order_id = o.order_id - WHERE oi.prod_pid = p.pid - AND o.date_placed != '0000-00-00 00:00:00' - AND o.date_shipped = '0000-00-00 00:00:00' - AND oi.pick_finished = 0 - AND oi.qty_back = 0 - AND o.order_status != 15 - AND o.order_status < 90 - AND oi.qty_ordered >= oi.qty_placed - AND oi.qty_ordered > 0 - ), 0 - ) as stock_quantity, - COALESCE(ci.onpreorder, 0) as preorder_count, - COALESCE(pnb.inventory, 0) as notions_inv_count, - COALESCE(pcp.price_each, 0) as price, - COALESCE(p.sellingprice, 0) AS regular_price, - CASE - WHEN EXISTS ( - SELECT 1 FROM product_inventory pi - FORCE INDEX (idx_pid) - WHERE pi.pid = p.pid AND pi.count > 0 LIMIT 1 - ) - THEN ( - SELECT ROUND(AVG(costeach), 5) - FROM product_inventory pi - FORCE INDEX (idx_pid) - WHERE pi.pid = p.pid AND pi.count > 0 - ) - ELSE ( - SELECT costeach - FROM product_inventory pi - FORCE INDEX (idx_pid, daterec) - WHERE pi.pid = p.pid - ORDER BY daterec DESC LIMIT 1 - ) - END AS cost_price, - NULL as landing_cost_price, - s.companyname AS vendor, - CASE - WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber - ELSE sid.supplier_itemnumber - END AS vendor_reference, - sid.notions_itemnumber AS notions_reference, - CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink, - pc1.name AS brand, - pc2.name AS line, - pc3.name AS subline, - pc4.name AS artist, - COALESCE(CASE - WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit - ELSE sid.supplier_qty_per_unit - END, sid.notions_qty_per_unit) AS moq, - p.rating, - p.rating_votes AS reviews, - p.weight, - p.length, - p.width, - p.height, - p.country_of_origin, - (SELECT COUNT(*) FROM mybasket mb FORCE INDEX (idx_item) WHERE mb.item = p.pid AND mb.qty > 0) AS baskets, - (SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies, - p.totalsold AS total_sold, - pls.date_sold as date_last_sold, - GROUP_CONCAT(DISTINCT CASE - WHEN pc.cat_id IS NOT NULL - AND pc.type IN (10, 20, 11, 21, 12, 13) - AND pci.cat_id NOT IN (16, 17) - THEN pci.cat_id - END) as category_ids - FROM products p - FORCE INDEX (PRIMARY) - LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 - LEFT JOIN current_inventory ci FORCE INDEX (PRIMARY) ON p.pid = ci.pid - LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid - LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1 - LEFT JOIN supplier_item_data sid ON p.pid = sid.pid - LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid - LEFT JOIN product_category_index pci FORCE INDEX (pid) ON p.pid = pci.pid - LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id - LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id - LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id - LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id - LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id - LEFT JOIN product_last_sold pls ON p.pid = pls.pid - WHERE ${incrementalUpdate ? ` - p.stamp > ? OR - ci.stamp > ? OR - pcp.date_deactive > ? OR - pcp.date_active > ? OR - pnb.date_updated > ? - ` : 'TRUE'} - GROUP BY p.pid - LIMIT ? OFFSET ? - `, [...(incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []), BATCH_SIZE, offset]); - - if (batchData.length === 0) break; - - // Insert batch into temp table - const values = batchData.map(row => [ + // Insert all product data into temp table in batches + for (let i = 0; i < prodData.length; i += 1000) { + const batch = prodData.slice(i, i + 1000); + const values = batch.map(row => [ row.pid, row.title, row.description, row.SKU, + // Set stock quantity to 0 if it's over 5000 row.stock_quantity > 5000 ? 0 : Math.max(0, row.stock_quantity), - row.pending_qty || 0, + row.pending_qty, row.preorder_count, row.notions_inv_count, row.price, @@ -260,7 +226,7 @@ async function materializeCalculations(prodConnection, localConnection, incremen row.subline, row.artist, row.category_ids, - row.date_created, + row.date_created, // map to created_at row.first_received, row.landing_cost_price, row.barcode, @@ -285,66 +251,74 @@ async function materializeCalculations(prodConnection, localConnection, incremen true // Mark as needing update ]); - await localConnection.query(` - INSERT INTO temp_products ( - pid, title, description, SKU, - stock_quantity, pending_qty, preorder_count, notions_inv_count, - price, regular_price, cost_price, - vendor, vendor_reference, notions_reference, - brand, line, subline, artist, - category_ids, created_at, first_received, - landing_cost_price, barcode, harmonized_tariff_code, - updated_at, visible, replenishable, permalink, - moq, rating, reviews, weight, length, width, - height, country_of_origin, location, total_sold, - baskets, notifies, date_last_sold, needs_update - ) - VALUES ? - ON DUPLICATE KEY UPDATE - title = VALUES(title), - description = VALUES(description), - SKU = VALUES(SKU), - stock_quantity = VALUES(stock_quantity), - pending_qty = VALUES(pending_qty), - preorder_count = VALUES(preorder_count), - notions_inv_count = VALUES(notions_inv_count), - price = VALUES(price), - regular_price = VALUES(regular_price), - cost_price = VALUES(cost_price), - vendor = VALUES(vendor), - vendor_reference = VALUES(vendor_reference), - notions_reference = VALUES(notions_reference), - brand = VALUES(brand), - line = VALUES(line), - subline = VALUES(subline), - artist = VALUES(artist), - category_ids = VALUES(category_ids), - created_at = VALUES(created_at), - first_received = VALUES(first_received), - landing_cost_price = VALUES(landing_cost_price), - barcode = VALUES(barcode), - harmonized_tariff_code = VALUES(harmonized_tariff_code), - updated_at = VALUES(updated_at), - visible = VALUES(visible), - replenishable = VALUES(replenishable), - permalink = VALUES(permalink), - moq = VALUES(moq), - rating = VALUES(rating), - reviews = VALUES(reviews), - weight = VALUES(weight), - length = VALUES(length), - width = VALUES(width), - height = VALUES(height), - country_of_origin = VALUES(country_of_origin), - location = VALUES(location), - total_sold = VALUES(total_sold), - baskets = VALUES(baskets), - notifies = VALUES(notifies), - date_last_sold = VALUES(date_last_sold), - needs_update = TRUE - `, [values]); + if (values.length > 0) { + await localConnection.query(` + INSERT INTO temp_products ( + pid, title, description, SKU, + stock_quantity, pending_qty, preorder_count, notions_inv_count, + price, regular_price, cost_price, + vendor, vendor_reference, notions_reference, + brand, line, subline, artist, + category_ids, created_at, first_received, + landing_cost_price, barcode, harmonized_tariff_code, + updated_at, visible, replenishable, permalink, + moq, rating, reviews, weight, length, width, + height, country_of_origin, location, total_sold, + baskets, notifies, date_last_sold, needs_update + ) + VALUES ? + ON DUPLICATE KEY UPDATE + title = VALUES(title), + description = VALUES(description), + SKU = VALUES(SKU), + stock_quantity = VALUES(stock_quantity), + pending_qty = VALUES(pending_qty), + preorder_count = VALUES(preorder_count), + notions_inv_count = VALUES(notions_inv_count), + price = VALUES(price), + regular_price = VALUES(regular_price), + cost_price = VALUES(cost_price), + vendor = VALUES(vendor), + vendor_reference = VALUES(vendor_reference), + notions_reference = VALUES(notions_reference), + brand = VALUES(brand), + line = VALUES(line), + subline = VALUES(subline), + artist = VALUES(artist), + category_ids = VALUES(category_ids), + created_at = VALUES(created_at), + first_received = VALUES(first_received), + landing_cost_price = VALUES(landing_cost_price), + barcode = VALUES(barcode), + harmonized_tariff_code = VALUES(harmonized_tariff_code), + updated_at = VALUES(updated_at), + visible = VALUES(visible), + replenishable = VALUES(replenishable), + permalink = VALUES(permalink), + moq = VALUES(moq), + rating = VALUES(rating), + reviews = VALUES(reviews), + weight = VALUES(weight), + length = VALUES(length), + width = VALUES(width), + height = VALUES(height), + country_of_origin = VALUES(country_of_origin), + location = VALUES(location), + total_sold = VALUES(total_sold), + baskets = VALUES(baskets), + notifies = VALUES(notifies), + date_last_sold = VALUES(date_last_sold), + needs_update = TRUE + `, [values]); + } - results.push(...batchData); + outputProgress({ + status: "running", + operation: "Products import", + message: `Processed ${Math.min(i + 1000, prodData.length)} of ${prodData.length} product records`, + current: i + batch.length, + total: prodData.length + }); } outputProgress({ @@ -352,8 +326,6 @@ async function materializeCalculations(prodConnection, localConnection, incremen operation: "Products import", message: "Finished materializing calculations" }); - - return results; } async function importProducts(prodConnection, localConnection, incrementalUpdate = true) { @@ -379,45 +351,11 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate console.log('Products: Using last sync time:', lastSyncTime); - // Cache valid categories and their hierarchy at the start - const [categories] = await localConnection.query(` - WITH RECURSIVE category_hierarchy AS ( - SELECT - cat_id, - parent_id, - type, - 1 as level, - CAST(cat_id AS CHAR(200)) as path - FROM categories - UNION ALL - SELECT - c.cat_id, - c.parent_id, - c.type, - ch.level + 1, - CONCAT(ch.path, ',', c.cat_id) - FROM categories c - JOIN category_hierarchy ch ON c.parent_id = ch.cat_id - WHERE ch.level < 10 -- Prevent infinite recursion - ) - SELECT DISTINCT - cat_id, - parent_id, - type, - path, - level - FROM category_hierarchy - ORDER BY level DESC - `); - - const validCategories = new Map(categories.map(c => [c.cat_id, c])); - const validCategoryIds = new Set(categories.map(c => c.cat_id)); - // Setup temporary tables await setupAndCleanupTempTables(localConnection, 'setup'); // Materialize calculations - this will populate temp_products - const results = await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime); + await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime); // Get actual count from temp table - only count products that need updates const [[{ actualTotal }]] = await localConnection.query(` @@ -431,7 +369,6 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate // Process in batches const BATCH_SIZE = 5000; let processed = 0; - let categoryBuffer = []; while (processed < actualTotal) { const [batch] = await localConnection.query(` @@ -531,45 +468,113 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate recordsUpdated += insertsAndUpdates.updates.length; } + if (insertsAndUpdates.updates.length > 0 || insertsAndUpdates.inserts.length > 0) { + const affectedPids = [ + ...insertsAndUpdates.updates.map(p => p.pid), + ...insertsAndUpdates.inserts.map(p => p.pid) + ]; + } + // Process category relationships - batch - .filter(p => p.category_ids) - .forEach(product => { - const productCategories = product.category_ids - .split(',') - .map(id => id.trim()) - .filter(id => id) - .map(Number) - .filter(id => !isNaN(id)) - .filter(id => validCategoryIds.has(id)) - .map(id => validCategories.get(id)) - .sort((a, b) => a.type - b.type); // Sort by type to ensure proper hierarchy + if (batch.some(p => p.category_ids)) { + // First get all valid categories + const allCategoryIds = [...new Set( + batch + .filter(p => p.category_ids) + .flatMap(product => + product.category_ids + .split(',') + .map(id => id.trim()) + .filter(id => id) + .map(Number) + .filter(id => !isNaN(id)) + ) + )]; - // Only add relationships that maintain proper hierarchy - productCategories.forEach(category => { - if (category.path.split(',').every(parentId => - validCategoryIds.has(Number(parentId)) - )) { - categoryBuffer.push([category.cat_id, product.pid]); - } - }); - }); + // Verify categories exist and get their hierarchy + const [categories] = await localConnection.query(` + WITH RECURSIVE category_hierarchy AS ( + SELECT + cat_id, + parent_id, + type, + 1 as level, + CAST(cat_id AS CHAR(200)) as path + FROM categories + WHERE cat_id IN (?) + UNION ALL + SELECT + c.cat_id, + c.parent_id, + c.type, + ch.level + 1, + CONCAT(ch.path, ',', c.cat_id) + FROM categories c + JOIN category_hierarchy ch ON c.parent_id = ch.cat_id + WHERE ch.level < 10 -- Prevent infinite recursion + ) + SELECT + h.cat_id, + h.parent_id, + h.type, + h.path, + h.level + FROM ( + SELECT DISTINCT cat_id, parent_id, type, path, level + FROM category_hierarchy + WHERE cat_id IN (?) + ) h + ORDER BY h.level DESC + `, [allCategoryIds, allCategoryIds]); - // Process category buffer if it gets too large - if (categoryBuffer.length >= 10000) { - if (categoryBuffer.length > 0) { - const placeholders = categoryBuffer - .map(() => "(?, ?)") - .join(","); + const validCategories = new Map(categories.map(c => [c.cat_id, c])); + const validCategoryIds = new Set(categories.map(c => c.cat_id)); - await localConnection.query(` - INSERT IGNORE INTO product_categories (cat_id, pid) - VALUES ${placeholders} - ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id) - `, categoryBuffer.flat()); + // Build category relationships ensuring proper hierarchy + const categoryRelationships = []; + batch + .filter(p => p.category_ids) + .forEach(product => { + const productCategories = product.category_ids + .split(',') + .map(id => id.trim()) + .filter(id => id) + .map(Number) + .filter(id => !isNaN(id)) + .filter(id => validCategoryIds.has(id)) + .map(id => validCategories.get(id)) + .sort((a, b) => a.type - b.type); // Sort by type to ensure proper hierarchy - categoryBuffer = []; - } + // Only add relationships that maintain proper hierarchy + productCategories.forEach(category => { + if (category.path.split(',').every(parentId => + validCategoryIds.has(Number(parentId)) + )) { + categoryRelationships.push([category.cat_id, product.pid]); + } + }); + }); + + if (categoryRelationships.length > 0) { + // First remove any existing relationships that will be replaced + await localConnection.query(` + DELETE FROM product_categories + WHERE pid IN (?) AND cat_id IN (?) + `, [ + [...new Set(categoryRelationships.map(([_, pid]) => pid))], + [...new Set(categoryRelationships.map(([catId, _]) => catId))] + ]); + + // Then insert the new relationships + const placeholders = categoryRelationships + .map(() => "(?, ?)") + .join(","); + + await localConnection.query(` + INSERT INTO product_categories (cat_id, pid) + VALUES ${placeholders} + `, categoryRelationships.flat()); + } } } @@ -587,19 +592,6 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate }); } - // Process any remaining category relationships - if (categoryBuffer.length > 0) { - const placeholders = categoryBuffer - .map(() => "(?, ?)") - .join(","); - - await localConnection.query(` - INSERT IGNORE INTO product_categories (cat_id, pid) - VALUES ${placeholders} - ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id) - `, categoryBuffer.flat()); - } - // Drop temporary tables await setupAndCleanupTempTables(localConnection, 'cleanup');