From 07f14c0017e41aea4c39c9ec383058eb05fe386c Mon Sep 17 00:00:00 2001 From: Matt Date: Sat, 1 Feb 2025 01:06:45 -0500 Subject: [PATCH] Fix/add data to orders script and fix other import errors --- inventory-server/db/schema.sql | 3 +- inventory-server/scripts/import/orders.js | 50 +- inventory-server/scripts/import/products.js | 855 +++++++++----------- 3 files changed, 409 insertions(+), 499 deletions(-) diff --git a/inventory-server/db/schema.sql b/inventory-server/db/schema.sql index 372dfb6..38e2531 100644 --- a/inventory-server/db/schema.sql +++ b/inventory-server/db/schema.sql @@ -39,7 +39,7 @@ CREATE TABLE products ( tags TEXT, moq INT DEFAULT 1, uom INT DEFAULT 1, - rating TINYINT UNSIGNED DEFAULT 0, + rating DECIMAL(10,2) DEFAULT 0.00, reviews INT UNSIGNED DEFAULT 0, weight DECIMAL(10,3), length DECIMAL(10,3), @@ -113,6 +113,7 @@ CREATE TABLE IF NOT EXISTS orders ( tax DECIMAL(10,3) DEFAULT 0.000, tax_included TINYINT(1) DEFAULT 0, shipping DECIMAL(10,3) DEFAULT 0.000, + costeach DECIMAL(10,3) DEFAULT 0.000, customer VARCHAR(50) NOT NULL, customer_name VARCHAR(100), status VARCHAR(20) DEFAULT 'pending', diff --git a/inventory-server/scripts/import/orders.js b/inventory-server/scripts/import/orders.js index 1ba7d93..442d107 100644 --- a/inventory-server/scripts/import/orders.js +++ b/inventory-server/scripts/import/orders.js @@ -60,6 +60,14 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = PRIMARY KEY (order_id, pid) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; `); + await localConnection.query(` + CREATE TABLE IF NOT EXISTS temp_order_costs ( + order_id INT UNSIGNED NOT NULL, + pid INT UNSIGNED NOT NULL, + costeach DECIMAL(10,3) DEFAULT 0.000, + PRIMARY KEY (order_id, pid) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8; + `); // Get column names from the local table const [columns] = await localConnection.query(` @@ -117,7 +125,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = oi.prod_itemnumber as SKU, oi.prod_price as price, oi.qty_ordered as quantity, - COALESCE(oi.prod_price_reg - oi.prod_price, 0) * oi.qty_ordered as base_discount, + COALESCE(oi.prod_price_reg - oi.prod_price, 0) as base_discount, oi.stamp as last_modified FROM order_items oi USE INDEX (PRIMARY) @@ -271,6 +279,26 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = } } + // Get costeach values in batches + for (let i = 0; i < orderIds.length; i += 5000) { + const batchIds = orderIds.slice(i, i + 5000); + const [costs] = await prodConnection.query(` + SELECT orderid as order_id, pid, costeach + FROM order_costs + WHERE orderid IN (?) + `, [batchIds]); + + if (costs.length > 0) { + const placeholders = costs.map(() => '(?, ?, ?)').join(","); + const values = costs.flatMap(c => [c.order_id, c.pid, c.costeach]); + await localConnection.query(` + INSERT INTO temp_order_costs (order_id, pid, costeach) + VALUES ${placeholders} + ON DUPLICATE KEY UPDATE costeach = VALUES(costeach) + `, values); + } + } + // Now combine all the data and insert into orders table let importedCount = 0; @@ -302,18 +330,19 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = om.customer, om.customer_name, om.status, - om.canceled + om.canceled, + COALESCE(tc.costeach, 0) as costeach FROM temp_order_items oi JOIN temp_order_meta om ON oi.order_id = om.order_id LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid + LEFT JOIN temp_order_costs tc ON oi.order_id = tc.order_id AND oi.pid = tc.pid WHERE oi.order_id IN (?) `, [batchIds]); // Filter orders and track missing products - do this in a single pass const validOrders = []; const values = []; - for (const order of orders) { if (!existingPids.has(order.pid)) { missingProducts.add(order.pid); @@ -331,7 +360,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = // First check which orders exist and get their current values const [existingOrders] = await localConnection.query( - `SELECT ${columnNames.join(',')} FROM orders WHERE (order_number, pid) IN (${validOrders.map(() => "(?,?)").join(",")})`, + `SELECT ${columnNames.join(",")} FROM orders WHERE (order_number, pid) IN (${validOrders.map(() => "(?,?)").join(",")})`, validOrders.flatMap(o => [o.order_number, o.pid]) ); const existingOrderMap = new Map( @@ -347,13 +376,11 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = const hasChanges = columnNames.some(col => { const newVal = order[col] ?? null; const oldVal = existing[col] ?? null; - // Special handling for numbers to avoid type coercion issues if (typeof newVal === 'number' && typeof oldVal === 'number') { return Math.abs(newVal - oldVal) > 0.00001; // Allow for tiny floating point differences } return newVal !== oldVal; }); - if (hasChanges) { acc.updates.push({ order_number: order.order_number, @@ -367,7 +394,15 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = values: columnNames.map(col => order[col] ?? null) }); } - return acc; + } else { + acc.inserts.push({ + order_number: order.order_number, + pid: order.pid, + values: columnNames.map(col => order[col] ?? null) + }); + } + return acc; + }, { inserts: [], updates: [] }); // Handle inserts if (insertsAndUpdates.inserts.length > 0) { @@ -427,6 +462,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = DROP TEMPORARY TABLE IF EXISTS temp_order_meta; DROP TEMPORARY TABLE IF EXISTS temp_order_discounts; DROP TEMPORARY TABLE IF EXISTS temp_order_taxes; + DROP TEMPORARY TABLE IF EXISTS temp_order_costs; `); // Import missing products if any diff --git a/inventory-server/scripts/import/products.js b/inventory-server/scripts/import/products.js index 70fe5e2..5986858 100644 --- a/inventory-server/scripts/import/products.js +++ b/inventory-server/scripts/import/products.js @@ -15,147 +15,299 @@ const getImageUrls = (pid, iid = 1) => { }; }; -async function setupTemporaryTables(connection) { - await connection.query(`CREATE TEMPORARY TABLE IF NOT EXISTS temp_categories ( cat_id INT PRIMARY KEY, name VARCHAR(255) ) ENGINE=InnoDB;`); - await connection.query(`CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_images ( pid INT, iid INT, image_type ENUM('thumbnail', '175', 'full'), url VARCHAR(255), PRIMARY KEY (pid, image_type) ) ENGINE=InnoDB;`); - await connection.query(`CREATE TEMPORARY TABLE IF NOT EXISTS temp_inventory_status ( pid INT PRIMARY KEY, stock_quantity INT, pending_qty INT, preorder_count INT, notions_inv_count INT, needs_update BOOLEAN ) ENGINE=InnoDB;`); - await connection.query(`CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_prices ( pid INT PRIMARY KEY, price DECIMAL(10,2), regular_price DECIMAL(10,2), cost_price DECIMAL(10,5), needs_update BOOLEAN ) ENGINE=InnoDB;`); - await connection.query(`INSERT INTO temp_categories SELECT cat_id, name FROM categories;`); - await connection.query(`CREATE INDEX idx_temp_cat_id ON temp_categories(cat_id);`); +async function setupAndCleanupTempTables(connection, operation = 'setup') { + if (operation === 'setup') { + await connection.query(` + CREATE TEMPORARY TABLE IF NOT EXISTS temp_products ( + pid BIGINT NOT NULL, + title VARCHAR(255), + description TEXT, + SKU VARCHAR(50), + stock_quantity INT DEFAULT 0, + pending_qty INT DEFAULT 0, + preorder_count INT DEFAULT 0, + notions_inv_count INT DEFAULT 0, + price DECIMAL(10,3) NOT NULL DEFAULT 0, + regular_price DECIMAL(10,3) NOT NULL DEFAULT 0, + cost_price DECIMAL(10,3), + vendor VARCHAR(100), + vendor_reference VARCHAR(100), + notions_reference VARCHAR(100), + brand VARCHAR(100), + line VARCHAR(100), + subline VARCHAR(100), + artist VARCHAR(100), + category_ids TEXT, + created_at DATETIME, + first_received DATETIME, + landing_cost_price DECIMAL(10,3), + barcode VARCHAR(50), + harmonized_tariff_code VARCHAR(50), + updated_at DATETIME, + visible BOOLEAN, + replenishable BOOLEAN, + permalink VARCHAR(255), + moq DECIMAL(10,3), + rating DECIMAL(10,2), + reviews INT, + weight DECIMAL(10,3), + length DECIMAL(10,3), + width DECIMAL(10,3), + height DECIMAL(10,3), + country_of_origin VARCHAR(100), + location VARCHAR(100), + total_sold INT, + baskets INT, + notifies INT, + date_last_sold DATETIME, + needs_update BOOLEAN DEFAULT TRUE, + PRIMARY KEY (pid), + INDEX idx_needs_update (needs_update) + ) ENGINE=InnoDB; + `); + } else { + await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_products;'); + } } -async function cleanupTemporaryTables(connection) { - await connection.query(` - DROP TEMPORARY TABLE IF EXISTS temp_categories; - DROP TEMPORARY TABLE IF EXISTS temp_product_images; - DROP TEMPORARY TABLE IF EXISTS temp_inventory_status; - DROP TEMPORARY TABLE IF EXISTS temp_product_prices; - `); -} - -async function materializeCalculations(prodConnection, localConnection) { +async function materializeCalculations(prodConnection, localConnection, incrementalUpdate = true, lastSyncTime = '1970-01-01') { outputProgress({ status: "running", operation: "Products import", - message: "Fetching inventory and order data from production" + message: "Fetching product data from production" }); - // Get all inventory and order data from production in one query - const [prodInventory] = await prodConnection.query(` + // Get all product data in a single optimized query + const [prodData] = await prodConnection.query(` SELECT p.pid, - COALESCE(si.available_local, 0) as stock_quantity, + p.description AS title, + p.notes AS description, + p.itemnumber AS SKU, + p.date_created, + p.datein AS first_received, + p.location, + p.upc AS barcode, + p.harmonized_tariff_code, + p.stamp AS updated_at, + CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible, + CASE + WHEN p.reorder < 0 THEN 0 + WHEN ( + (IFNULL(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURDATE(), INTERVAL 5 YEAR)) + OR (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(NOW(), INTERVAL 5 YEAR)) + OR (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(NOW(), INTERVAL 5 YEAR)) + ) THEN 0 + ELSE 1 + END AS replenishable, + COALESCE(si.available_local, 0) - COALESCE( + (SELECT SUM(oi.qty_ordered - oi.qty_placed) + FROM order_items oi + JOIN _order o ON oi.order_id = o.order_id + WHERE oi.prod_pid = p.pid + AND o.date_placed != '0000-00-00 00:00:00' + AND o.date_shipped = '0000-00-00 00:00:00' + AND oi.pick_finished = 0 + AND oi.qty_back = 0 + AND o.order_status != 15 + AND o.order_status < 90 + AND oi.qty_ordered >= oi.qty_placed + AND oi.qty_ordered > 0 + ), 0 + ) as stock_quantity, + COALESCE( + (SELECT SUM(oi.qty_ordered - oi.qty_placed) + FROM order_items oi + JOIN _order o ON oi.order_id = o.order_id + WHERE oi.prod_pid = p.pid + AND o.date_placed != '0000-00-00 00:00:00' + AND o.date_shipped = '0000-00-00 00:00:00' + AND oi.pick_finished = 0 + AND oi.qty_back = 0 + AND o.order_status != 15 + AND o.order_status < 90 + AND oi.qty_ordered >= oi.qty_placed + AND oi.qty_ordered > 0 + ), 0 + ) as pending_qty, COALESCE(ci.onpreorder, 0) as preorder_count, COALESCE(pnb.inventory, 0) as notions_inv_count, - COALESCE( - ( - SELECT SUM(oi.qty_ordered - oi.qty_placed) - FROM order_items oi - JOIN _order o ON oi.order_id = o.order_id - WHERE oi.prod_pid = p.pid - AND o.date_placed != '0000-00-00 00:00:00' - AND o.date_shipped = '0000-00-00 00:00:00' - AND oi.pick_finished = 0 - AND oi.qty_back = 0 - AND o.order_status != 15 - AND o.order_status < 90 - AND oi.qty_ordered >= oi.qty_placed - AND oi.qty_ordered > 0 - ), 0 - ) as pending_qty - FROM products p - LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 - LEFT JOIN current_inventory ci ON p.pid = ci.pid - LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid - `); - - outputProgress({ - status: "running", - operation: "Products import", - message: `Processing ${prodInventory.length} inventory records` - }); - - // Insert inventory data into local temp table in batches - for (let i = 0; i < prodInventory.length; i += 1000) { - const batch = prodInventory.slice(i, i + 1000); - const values = batch.map(row => [ - row.pid, - Math.max(0, row.stock_quantity - row.pending_qty), // Calculate final stock quantity - row.pending_qty, - row.preorder_count, - row.notions_inv_count, - true // Mark as needing update - ]); - - if (values.length > 0) { - await localConnection.query(` - INSERT INTO temp_inventory_status (pid, stock_quantity, pending_qty, preorder_count, notions_inv_count, needs_update) - VALUES ? - ON DUPLICATE KEY UPDATE - stock_quantity = VALUES(stock_quantity), - pending_qty = VALUES(pending_qty), - preorder_count = VALUES(preorder_count), - notions_inv_count = VALUES(notions_inv_count), - needs_update = TRUE - `, [values]); - } - - outputProgress({ - status: "running", - operation: "Products import", - message: `Processed ${Math.min(i + 1000, prodInventory.length)} of ${prodInventory.length} inventory records`, - current: i + batch.length, - total: prodInventory.length - }); - } - - outputProgress({ - status: "running", - operation: "Products import", - message: "Fetching pricing data from production" - }); - - // Get prices from production - const [prodPrices] = await prodConnection.query(` - SELECT - p.pid, COALESCE(pcp.price_each, 0) as price, COALESCE(p.sellingprice, 0) AS regular_price, CASE WHEN EXISTS (SELECT 1 FROM product_inventory WHERE pid = p.pid AND count > 0) THEN (SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND count > 0) ELSE (SELECT costeach FROM product_inventory WHERE pid = p.pid ORDER BY daterec DESC LIMIT 1) - END AS cost_price + END AS cost_price, + NULL as landing_cost_price, + s.companyname AS vendor, + CASE + WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber + ELSE sid.supplier_itemnumber + END AS vendor_reference, + sid.notions_itemnumber AS notions_reference, + CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink, + pc1.name AS brand, + pc2.name AS line, + pc3.name AS subline, + pc4.name AS artist, + COALESCE(CASE + WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit + ELSE sid.supplier_qty_per_unit + END, sid.notions_qty_per_unit) AS moq, + p.rating, + p.rating_votes AS reviews, + p.weight, + p.length, + p.width, + p.height, + p.country_of_origin, + (SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets, + (SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies, + p.totalsold AS total_sold, + pls.date_sold as date_last_sold, + GROUP_CONCAT(DISTINCT CASE + WHEN pc.cat_id IS NOT NULL + AND pc.type IN (10, 20, 11, 21, 12, 13) + AND pci.cat_id NOT IN (16, 17) + THEN pci.cat_id + END) as category_ids FROM products p - LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid - WHERE pcp.active = 1 - `); + LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 + LEFT JOIN current_inventory ci ON p.pid = ci.pid + LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid + LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1 + LEFT JOIN supplier_item_data sid ON p.pid = sid.pid + LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid + LEFT JOIN product_category_index pci ON p.pid = pci.pid + LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id + LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id + LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id + LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id + LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id + LEFT JOIN product_last_sold pls ON p.pid = pls.pid + WHERE ${incrementalUpdate ? ` + p.stamp > ? OR + ci.stamp > ? OR + pcp.date_deactive > ? OR + pcp.date_active > ? OR + pnb.date_updated > ? + ` : 'TRUE'} + GROUP BY p.pid + `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []); outputProgress({ status: "running", operation: "Products import", - message: `Processing ${prodPrices.length} price records` + message: `Processing ${prodData.length} product records` }); - // Insert prices into local temp table in batches - for (let i = 0; i < prodPrices.length; i += 1000) { - const batch = prodPrices.slice(i, i + 1000); + // Insert all product data into temp table in batches + for (let i = 0; i < prodData.length; i += 1000) { + const batch = prodData.slice(i, i + 1000); const values = batch.map(row => [ row.pid, + row.title, + row.description, + row.SKU, + // Set stock quantity to 0 if it's over 5000 + row.stock_quantity > 5000 ? 0 : Math.max(0, row.stock_quantity), + row.pending_qty, + row.preorder_count, + row.notions_inv_count, row.price, row.regular_price, row.cost_price, + row.vendor, + row.vendor_reference, + row.notions_reference, + row.brand, + row.line, + row.subline, + row.artist, + row.category_ids, + row.date_created, // map to created_at + row.first_received, + row.landing_cost_price, + row.barcode, + row.harmonized_tariff_code, + row.updated_at, + row.visible, + row.replenishable, + row.permalink, + row.moq, + row.rating ? Number(row.rating).toFixed(2) : null, + row.reviews, + row.weight, + row.length, + row.width, + row.height, + row.country_of_origin, + row.location, + row.total_sold, + row.baskets, + row.notifies, + row.date_last_sold, true // Mark as needing update ]); if (values.length > 0) { await localConnection.query(` - INSERT INTO temp_product_prices (pid, price, regular_price, cost_price, needs_update) + INSERT INTO temp_products ( + pid, title, description, SKU, + stock_quantity, pending_qty, preorder_count, notions_inv_count, + price, regular_price, cost_price, + vendor, vendor_reference, notions_reference, + brand, line, subline, artist, + category_ids, created_at, first_received, + landing_cost_price, barcode, harmonized_tariff_code, + updated_at, visible, replenishable, permalink, + moq, rating, reviews, weight, length, width, + height, country_of_origin, location, total_sold, + baskets, notifies, date_last_sold, needs_update + ) VALUES ? ON DUPLICATE KEY UPDATE + title = VALUES(title), + description = VALUES(description), + SKU = VALUES(SKU), + stock_quantity = VALUES(stock_quantity), + pending_qty = VALUES(pending_qty), + preorder_count = VALUES(preorder_count), + notions_inv_count = VALUES(notions_inv_count), price = VALUES(price), regular_price = VALUES(regular_price), cost_price = VALUES(cost_price), + vendor = VALUES(vendor), + vendor_reference = VALUES(vendor_reference), + notions_reference = VALUES(notions_reference), + brand = VALUES(brand), + line = VALUES(line), + subline = VALUES(subline), + artist = VALUES(artist), + category_ids = VALUES(category_ids), + created_at = VALUES(created_at), + first_received = VALUES(first_received), + landing_cost_price = VALUES(landing_cost_price), + barcode = VALUES(barcode), + harmonized_tariff_code = VALUES(harmonized_tariff_code), + updated_at = VALUES(updated_at), + visible = VALUES(visible), + replenishable = VALUES(replenishable), + permalink = VALUES(permalink), + moq = VALUES(moq), + rating = VALUES(rating), + reviews = VALUES(reviews), + weight = VALUES(weight), + length = VALUES(length), + width = VALUES(width), + height = VALUES(height), + country_of_origin = VALUES(country_of_origin), + location = VALUES(location), + total_sold = VALUES(total_sold), + baskets = VALUES(baskets), + notifies = VALUES(notifies), + date_last_sold = VALUES(date_last_sold), needs_update = TRUE `, [values]); } @@ -163,9 +315,9 @@ async function materializeCalculations(prodConnection, localConnection) { outputProgress({ status: "running", operation: "Products import", - message: `Processed ${Math.min(i + 1000, prodPrices.length)} of ${prodPrices.length} price records`, + message: `Processed ${Math.min(i + 1000, prodData.length)} of ${prodData.length} product records`, current: i + batch.length, - total: prodPrices.length + total: prodData.length }); } @@ -200,263 +352,32 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate console.log('Products: Using last sync time:', lastSyncTime); // Setup temporary tables - await setupTemporaryTables(localConnection); + await setupAndCleanupTempTables(localConnection, 'setup'); - // Materialize calculations - await materializeCalculations(prodConnection, localConnection); + // Materialize calculations - this will populate temp_products + await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime); - // Optimized count query for changes since last sync - const [countResult] = await prodConnection.query(` - SELECT COUNT(*) as total - FROM products p - LEFT JOIN current_inventory ci ON p.pid = ci.pid - LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1 - LEFT JOIN supplier_item_data sid ON p.pid = sid.pid - LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid - LEFT JOIN product_last_sold pls ON p.pid = pls.pid - WHERE ${incrementalUpdate ? ` - p.stamp > ? OR - ci.stamp > ? OR - pcp.date_deactive > ? OR - pcp.date_active > ? OR - sid.stamp > ? OR - pnb.date_updated > ? OR - pls.date_sold > ? - ` : 'TRUE'} - `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []); - - console.log('Products: Found changes:', countResult[0].total); - - const totalProducts = countResult[0].total; - - // Main product query using materialized data - modified for incremental - outputProgress({ - status: "running", - operation: "Products import", - message: `Fetching ${incrementalUpdate ? 'updated' : 'all'} product data from production` - }); - - // Create temporary table for production data - await localConnection.query(` - CREATE TEMPORARY TABLE temp_prod_data ( - pid BIGINT NOT NULL, - title VARCHAR(255), - description TEXT, - SKU VARCHAR(50), - date_created TIMESTAMP NULL, - first_received TIMESTAMP NULL, - location VARCHAR(50), - barcode VARCHAR(50), - harmonized_tariff_code VARCHAR(20), - updated_at TIMESTAMP, - visible BOOLEAN, - replenishable BOOLEAN, - vendor VARCHAR(100), - vendor_reference VARCHAR(100), - notions_reference VARCHAR(100), - brand VARCHAR(100), - line VARCHAR(100), - subline VARCHAR(100), - artist VARCHAR(100), - landing_cost_price DECIMAL(10,2) DEFAULT NULL, - permalink VARCHAR(255) DEFAULT NULL, - options TEXT DEFAULT NULL, - tags TEXT DEFAULT NULL, - uom VARCHAR(50) DEFAULT NULL, - baskets INT DEFAULT 0, - notifies INT DEFAULT 0, - moq INT, - rating TINYINT UNSIGNED, - reviews INT UNSIGNED, - weight DECIMAL(10,3), - length DECIMAL(10,3), - width DECIMAL(10,3), - height DECIMAL(10,3), - total_sold INT UNSIGNED, - country_of_origin VARCHAR(5), - date_last_sold DATE, - category_ids TEXT, - needs_update BOOLEAN DEFAULT TRUE, - PRIMARY KEY (pid) - ) ENGINE=InnoDB - `); - - // Get data from production and insert into temp table - const [prodData] = await prodConnection.query(` - SELECT - p.pid, - p.description AS title, - p.notes AS description, - p.itemnumber AS SKU, - p.date_created, - p.datein AS first_received, - p.location, - p.upc AS barcode, - p.harmonized_tariff_code, - p.stamp AS updated_at, - CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible, - CASE - WHEN p.reorder < 0 THEN 0 - WHEN ( - ((p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(NOW(), INTERVAL 5 YEAR)) - AND (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(NOW(), INTERVAL 5 YEAR))) - ) THEN 0 - ELSE 1 - END AS replenishable, - s.companyname AS vendor, - CASE WHEN s.companyname = 'Notions' - THEN sid.notions_itemnumber - ELSE sid.supplier_itemnumber - END AS vendor_reference, - sid.notions_itemnumber AS notions_reference, - pc1.name AS brand, - pc2.name AS line, - pc3.name AS subline, - pc4.name AS artist, - NULL AS landing_cost_price, - CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink, - NULL AS options, - NULL AS tags, - NULL AS uom, - (SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets, - (SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies, - COALESCE(CASE - WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit - ELSE sid.supplier_qty_per_unit - END, sid.notions_qty_per_unit) AS moq, - p.rating, - p.rating_votes AS reviews, - p.weight, - p.length, - p.width, - p.height, - p.totalsold AS total_sold, - p.country_of_origin, - pls.date_sold as date_last_sold, - GROUP_CONCAT(DISTINCT pci.cat_id) as category_ids, - true // needs_update - FROM products p - LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 - LEFT JOIN supplier_item_data sid ON p.pid = sid.pid - LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid - LEFT JOIN product_category_index pci ON p.pid = pci.pid - LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id - LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id - LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id - LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id - LEFT JOIN product_last_sold pls ON p.pid = pls.pid - LEFT JOIN current_inventory ci ON p.pid = ci.pid - LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1 - LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid - WHERE ${incrementalUpdate ? ` - p.stamp > ? OR - ci.stamp > ? OR - pcp.date_deactive > ? OR - pcp.date_active > ? OR - sid.stamp > ? OR - pnb.date_updated > ? OR - pls.date_sold > ? - ` : 'TRUE'} - GROUP BY p.pid - `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []); - - // Insert production data in batches, but only for products that need updates - for (let i = 0; i < prodData.length; i += 1000) { - const batch = prodData.slice(i, i + 1000); - const placeholders = batch.map(() => `(${Array(31).fill("?").join(",")})`).join(","); - - // Map each row to exactly match our temp table columns - const values = batch.flatMap(row => [ - row.pid, - row.title, - row.description, - row.SKU, - row.date_created, - row.first_received, - row.location, - row.barcode, - row.harmonized_tariff_code, - row.updated_at, - row.visible, - row.replenishable, - row.vendor, - row.vendor_reference, - row.notions_reference, - row.brand, - row.line, - row.subline, - row.artist, - row.landing_cost_price, - row.permalink, - row.options, - row.tags, - row.uom, - row.baskets, - row.notifies, - row.moq, - row.rating, - row.reviews, - row.weight, - row.length, - row.width, - row.height, - row.total_sold, - row.country_of_origin, - row.date_last_sold, - row.category_ids, - true // needs_update - ]); - - await localConnection.query(` - INSERT INTO temp_prod_data VALUES ${placeholders} - `, values); - - outputProgress({ - status: "running", - operation: "Products import", - message: `Loaded ${Math.min(i + 1000, prodData.length)} of ${prodData.length} products from production`, - current: i + batch.length, - total: prodData.length - }); - } - - // Now join with local temp tables and process in batches, but only for products that need updates - const BATCH_SIZE = 2500; - let processed = 0; - let recordsAdded = 0; - let recordsUpdated = 0; - // Get actual count from temp table - only count products that need updates const [[{ actualTotal }]] = await localConnection.query(` - SELECT COUNT(DISTINCT p.pid) as actualTotal - FROM temp_prod_data p - LEFT JOIN temp_inventory_status tis ON p.pid = tis.pid - LEFT JOIN temp_product_prices tpp ON p.pid = tpp.pid - WHERE p.needs_update = 1 - OR tis.needs_update = 1 - OR tpp.needs_update = 1 + SELECT COUNT(DISTINCT pid) as actualTotal + FROM temp_products + WHERE needs_update = 1 `); + + console.log('Products: Found changes:', actualTotal); + + // Process in batches + const BATCH_SIZE = 5000; + let processed = 0; while (processed < actualTotal) { const [batch] = await localConnection.query(` - SELECT - p.*, - COALESCE(tis.stock_quantity, 0) as stock_quantity, - COALESCE(tis.preorder_count, 0) as preorder_count, - COALESCE(tis.notions_inv_count, 0) as notions_inv_count, - COALESCE(tpp.price, 0) as price, - COALESCE(tpp.regular_price, 0) as regular_price, - COALESCE(tpp.cost_price, 0) as cost_price - FROM temp_prod_data p - LEFT JOIN temp_inventory_status tis ON p.pid = tis.pid - LEFT JOIN temp_product_prices tpp ON p.pid = tpp.pid - WHERE p.needs_update = 1 - OR tis.needs_update = 1 - OR tpp.needs_update = 1 + SELECT * FROM temp_products + WHERE needs_update = 1 LIMIT ? OFFSET ? `, [BATCH_SIZE, processed]); - if (!batch || batch.length === 0) break; // Exit if no more records + if (!batch || batch.length === 0) break; // Add image URLs batch.forEach(row => { @@ -467,25 +388,14 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate }); if (batch.length > 0) { - // MySQL 8.0 optimized insert with proper placeholders - const placeholderGroup = `(${Array(columnNames.length).fill("?").join(",")})`; - - // First check which products already exist and get their current values + // Get existing products in one query const [existingProducts] = await localConnection.query( `SELECT ${columnNames.join(',')} FROM products WHERE pid IN (?)`, [batch.map(p => p.pid)] ); const existingPidsMap = new Map(existingProducts.map(p => [p.pid, p])); - // Helper function to map values consistently - const mapValues = (product) => columnNames.map(col => { - const val = product[col] ?? null; - if (col === "managing_stock") return 1; - if (typeof val === "number") return val || 0; - return val; - }); - - // Split into inserts and updates, comparing values for updates + // Split into inserts and updates const insertsAndUpdates = batch.reduce((acc, product) => { if (existingPidsMap.has(product.pid)) { const existing = existingPidsMap.get(product.pid); @@ -493,119 +403,114 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate const hasChanges = columnNames.some(col => { const newVal = product[col] ?? null; const oldVal = existing[col] ?? null; - // Special handling for numbers to avoid type coercion issues + if (col === "managing_stock") return false; // Skip this as it's always 1 if (typeof newVal === 'number' && typeof oldVal === 'number') { - // Handle NaN and Infinity - if (isNaN(newVal) || isNaN(oldVal)) return isNaN(newVal) !== isNaN(oldVal); - if (!isFinite(newVal) || !isFinite(oldVal)) return !isFinite(newVal) !== !isFinite(oldVal); - // Allow for tiny floating point differences return Math.abs(newVal - oldVal) > 0.00001; } - if (col === 'managing_stock') return false; // Skip this as it's always 1 return newVal !== oldVal; }); if (hasChanges) { - acc.updates.push({ - pid: product.pid, - values: mapValues(product) - }); + acc.updates.push(product); } } else { - acc.inserts.push({ - pid: product.pid, - values: mapValues(product) - }); + acc.inserts.push(product); } return acc; }, { inserts: [], updates: [] }); - // Log summary for this batch - if (insertsAndUpdates.inserts.length > 0 || insertsAndUpdates.updates.length > 0) { - console.log(`Batch summary: ${insertsAndUpdates.inserts.length} new products, ${insertsAndUpdates.updates.length} updates`); - } - - // Handle inserts + // Process inserts if (insertsAndUpdates.inserts.length > 0) { - const insertPlaceholders = Array(insertsAndUpdates.inserts.length).fill(placeholderGroup).join(","); - + const insertValues = insertsAndUpdates.inserts.map(product => + columnNames.map(col => { + const val = product[col] ?? null; + if (col === "managing_stock") return 1; + return val; + }) + ); + + const insertPlaceholders = insertsAndUpdates.inserts + .map(() => `(${Array(columnNames.length).fill('?').join(',')})`) + .join(','); + const insertResult = await localConnection.query(` - INSERT INTO products (${columnNames.join(",")}) + INSERT INTO products (${columnNames.join(',')}) VALUES ${insertPlaceholders} - `, insertsAndUpdates.inserts.map(i => i.values).flat()); - + `, insertValues.flat()); + recordsAdded += insertResult[0].affectedRows; } - // Handle updates - now we know these actually have changes + // Process updates if (insertsAndUpdates.updates.length > 0) { - const updatePlaceholders = Array(insertsAndUpdates.updates.length).fill(placeholderGroup).join(","); - + const updateValues = insertsAndUpdates.updates.map(product => + columnNames.map(col => { + const val = product[col] ?? null; + if (col === "managing_stock") return 1; + return val; + }) + ); + + const updatePlaceholders = insertsAndUpdates.updates + .map(() => `(${Array(columnNames.length).fill('?').join(',')})`) + .join(','); + const updateResult = await localConnection.query(` - INSERT INTO products (${columnNames.join(",")}) + INSERT INTO products (${columnNames.join(',')}) VALUES ${updatePlaceholders} ON DUPLICATE KEY UPDATE - ${columnNames - .filter(col => col !== "pid") - .map(col => `${col} = VALUES(${col})`) - .join(",")}; - `, insertsAndUpdates.updates.map(u => u.values).flat()); - + ${columnNames + .filter(col => col !== 'pid') + .map(col => `${col} = VALUES(${col})`) + .join(',')}; + `, updateValues.flat()); + recordsUpdated += insertsAndUpdates.updates.length; } - } - // Insert category relationships - const categoryRelationships = []; - batch.forEach(row => { - if (row.category_ids) { - const catIds = row.category_ids - .split(",") - .map(id => id.trim()) - .filter(id => id) - .map(Number); - - catIds.forEach(catId => { - if (catId) categoryRelationships.push([row.pid, catId]); - }); - } - }); + // Process category relationships + if (batch.some(p => p.category_ids)) { + const categoryRelationships = batch + .filter(p => p.category_ids) + .flatMap(product => + product.category_ids + .split(',') + .map(id => id.trim()) + .filter(id => id) + .map(Number) + .filter(id => !isNaN(id)) + .map(catId => [catId, product.pid]) + ); - if (categoryRelationships.length > 0) { - // First verify categories exist - const uniqueCatIds = [...new Set(categoryRelationships.map(([_, catId]) => catId))]; - const [existingCats] = await localConnection.query( - "SELECT cat_id FROM categories WHERE cat_id IN (?)", - [uniqueCatIds] - ); - const existingCatIds = new Set(existingCats.map(c => c.cat_id)); + if (categoryRelationships.length > 0) { + // Verify categories exist before inserting relationships + const uniqueCatIds = [...new Set(categoryRelationships.map(([catId]) => catId))]; + const [existingCats] = await localConnection.query( + "SELECT cat_id FROM categories WHERE cat_id IN (?)", + [uniqueCatIds] + ); + const existingCatIds = new Set(existingCats.map(c => c.cat_id)); - // Filter relationships to only include existing categories - const validRelationships = categoryRelationships.filter(([_, catId]) => - existingCatIds.has(catId) - ); + // Filter relationships to only include existing categories + const validRelationships = categoryRelationships.filter(([catId]) => + existingCatIds.has(catId) + ); - if (validRelationships.length > 0) { - // Delete existing relationships for these products first - await localConnection.query( - "DELETE FROM product_categories WHERE pid IN (?)", - [batch.map(p => p.pid)] - ); - - // Insert new relationships using INSERT IGNORE - const catPlaceholders = validRelationships - .map(() => "(?, ?)") - .join(","); - - await localConnection.query( - `INSERT IGNORE INTO product_categories (pid, cat_id) - VALUES ${catPlaceholders}`, - validRelationships.flat() - ); + if (validRelationships.length > 0) { + const catPlaceholders = validRelationships + .map(() => "(?, ?)") + .join(","); + await localConnection.query( + `INSERT IGNORE INTO product_categories (cat_id, pid) + VALUES ${catPlaceholders}`, + validRelationships.flat() + ); + } + } } } - processed += batch.length; // Only increment by actual records processed + processed += batch.length; outputProgress({ status: "running", @@ -617,15 +522,10 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate remaining: estimateRemaining(startTime, processed, actualTotal), rate: calculateRate(startTime, processed) }); - - // Force garbage collection between batches - if (global.gc) { - global.gc(); - } } // Drop temporary tables - await cleanupTemporaryTables(localConnection); + await setupAndCleanupTempTables(localConnection, 'cleanup'); // Only update sync status if we get here (no errors thrown) await localConnection.query(` @@ -668,30 +568,6 @@ async function importMissingProducts(prodConnection, localConnection, missingPid p.date_created, p.datein AS first_received, p.location, - COALESCE(si.available_local, 0) - COALESCE( - (SELECT SUM(oi.qty_ordered - oi.qty_placed) - FROM order_items oi - JOIN _order o ON oi.order_id = o.order_id - WHERE oi.prod_pid = p.pid - AND o.date_placed != '0000-00-00 00:00:00' - AND o.date_shipped = '0000-00-00 00:00:00' - AND oi.pick_finished = 0 - AND oi.qty_back = 0 - AND o.order_status != 15 - AND o.order_status < 90 - AND oi.qty_ordered >= oi.qty_placed - AND oi.qty_ordered > 0), 0 - ) as stock_quantity, - COALESCE(ci.onpreorder, 0) as preorder_count, - COALESCE(pnb.inventory, 0) as notions_inv_count, - COALESCE(pcp.price_each, 0) as price, - COALESCE(p.sellingprice, 0) AS regular_price, - CASE - WHEN EXISTS (SELECT 1 FROM product_inventory WHERE pid = p.pid AND count > 0) - THEN (SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND count > 0) - ELSE (SELECT costeach FROM product_inventory WHERE pid = p.pid ORDER BY daterec DESC LIMIT 1) - END AS cost_price, - NULL AS landing_cost_price, p.upc AS barcode, p.harmonized_tariff_code, p.stamp AS updated_at, @@ -705,21 +581,18 @@ async function importMissingProducts(prodConnection, localConnection, missingPid ) THEN 0 ELSE 1 END AS replenishable, - s.companyname AS vendor, - CASE WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber ELSE sid.supplier_itemnumber END AS vendor_reference, - sid.notions_itemnumber AS notions_reference, - CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink, - pc1.name AS brand, - pc2.name AS line, - pc3.name AS subline, - pc4.name AS artist, - NULL AS options, - NULL AS tags, - COALESCE(CASE - WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit - ELSE sid.supplier_qty_per_unit - END, sid.notions_qty_per_unit) AS moq, - NULL AS uom, + COALESCE(si.available_local, 0) as stock_quantity, + COALESCE(pq.qty, 0) as pending_qty, + COALESCE(ci.onpreorder, 0) as preorder_count, + COALESCE(pnb.inventory, 0) as notions_inv_count, + COALESCE(pcp.price_each, 0) as price, + COALESCE(p.sellingprice, 0) AS regular_price, + CASE + WHEN EXISTS (SELECT 1 FROM product_inventory WHERE pid = p.pid AND count > 0) + THEN (SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND count > 0) + ELSE (SELECT costeach FROM product_inventory WHERE pid = p.pid ORDER BY daterec DESC LIMIT 1) + END AS cost_price, + NULL AS landing_cost_price, p.rating, p.rating_votes AS reviews, p.weight, @@ -786,7 +659,7 @@ async function importMissingProducts(prodConnection, localConnection, missingPid ON DUPLICATE KEY UPDATE ${columnNames .filter((col) => col !== "pid") .map((col) => `${col} = VALUES(${col})`) - .join(",")} + .join(",")}; `; const result = await localConnection.query(query, productValues); @@ -849,4 +722,4 @@ async function importMissingProducts(prodConnection, localConnection, missingPid module.exports = { importProducts, importMissingProducts -}; \ No newline at end of file +}; \ No newline at end of file