Add cost data to the orders import script and fix other import errors

2025-02-01 01:06:45 -05:00
parent e77b488cd4
commit 07f14c0017
3 changed files with 409 additions and 499 deletions

View File

@@ -39,7 +39,7 @@ CREATE TABLE products (
tags TEXT,
moq INT DEFAULT 1,
uom INT DEFAULT 1,
rating TINYINT UNSIGNED DEFAULT 0,
rating DECIMAL(10,2) DEFAULT 0.00,
reviews INT UNSIGNED DEFAULT 0,
weight DECIMAL(10,3),
length DECIMAL(10,3),
@@ -113,6 +113,7 @@ CREATE TABLE IF NOT EXISTS orders (
tax DECIMAL(10,3) DEFAULT 0.000,
tax_included TINYINT(1) DEFAULT 0,
shipping DECIMAL(10,3) DEFAULT 0.000,
costeach DECIMAL(10,3) DEFAULT 0.000,
customer VARCHAR(50) NOT NULL,
customer_name VARCHAR(100),
status VARCHAR(20) DEFAULT 'pending',
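
For a local database that already exists with the previous schema, the two column changes above (rating widened to DECIMAL(10,2), costeach added to orders) would need a one-off migration rather than a re-create. A minimal sketch, assuming a mysql2/promise connection named localConnection and the table names from this schema file; it is illustrative only, not part of the diff:

// Hypothetical one-off migration for an existing local database; illustrative only.
async function migrateOrdersSchema(localConnection) {
  // Widen products.rating from TINYINT UNSIGNED to DECIMAL(10,2).
  await localConnection.query(
    "ALTER TABLE products MODIFY rating DECIMAL(10,2) DEFAULT 0.00"
  );
  // Add orders.costeach only if it is not already present.
  const [cols] = await localConnection.query(
    "SHOW COLUMNS FROM orders LIKE 'costeach'"
  );
  if (cols.length === 0) {
    await localConnection.query(
      "ALTER TABLE orders ADD COLUMN costeach DECIMAL(10,3) DEFAULT 0.000 AFTER shipping"
    );
  }
}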

View File

@@ -60,6 +60,14 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
PRIMARY KEY (order_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`);
await localConnection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_order_costs (
order_id INT UNSIGNED NOT NULL,
pid INT UNSIGNED NOT NULL,
costeach DECIMAL(10,3) DEFAULT 0.000,
PRIMARY KEY (order_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`);
// Get column names from the local table
const [columns] = await localConnection.query(`
@@ -117,7 +125,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
oi.prod_itemnumber as SKU,
oi.prod_price as price,
oi.qty_ordered as quantity,
COALESCE(oi.prod_price_reg - oi.prod_price, 0) * oi.qty_ordered as base_discount,
COALESCE(oi.prod_price_reg - oi.prod_price, 0) as base_discount,
oi.stamp as last_modified
FROM order_items oi
USE INDEX (PRIMARY)
@@ -271,6 +279,26 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
}
}
// Get costeach values in batches
for (let i = 0; i < orderIds.length; i += 5000) {
const batchIds = orderIds.slice(i, i + 5000);
const [costs] = await prodConnection.query(`
SELECT orderid as order_id, pid, costeach
FROM order_costs
WHERE orderid IN (?)
`, [batchIds]);
if (costs.length > 0) {
const placeholders = costs.map(() => '(?, ?, ?)').join(",");
const values = costs.flatMap(c => [c.order_id, c.pid, c.costeach]);
await localConnection.query(`
INSERT INTO temp_order_costs (order_id, pid, costeach)
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE costeach = VALUES(costeach)
`, values);
}
}
// Now combine all the data and insert into orders table
let importedCount = 0;
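
A note on the IN (?) binding used in the costeach loop above: the mysql/mysql2 drivers expand an array bound to a single placeholder into a comma-separated list, so each batch needs no hand-built placeholder string. A minimal sketch of just that fetch step (the function name is illustrative, not part of the script):

// Illustrative only: fetch one batch of cost rows; the array bound to "?" expands
// to IN (id1, id2, ...) under the mysql/mysql2 escaping rules.
async function fetchCostBatch(prodConnection, batchIds) {
  const [rows] = await prodConnection.query(
    "SELECT orderid AS order_id, pid, costeach FROM order_costs WHERE orderid IN (?)",
    [batchIds]
  );
  return rows;
}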
@@ -302,18 +330,19 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
om.customer,
om.customer_name,
om.status,
om.canceled
om.canceled,
COALESCE(tc.costeach, 0) as costeach
FROM temp_order_items oi
JOIN temp_order_meta om ON oi.order_id = om.order_id
LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
LEFT JOIN temp_order_costs tc ON oi.order_id = tc.order_id AND oi.pid = tc.pid
WHERE oi.order_id IN (?)
`, [batchIds]);
// Filter orders and track missing products - do this in a single pass
const validOrders = [];
const values = [];
for (const order of orders) {
if (!existingPids.has(order.pid)) {
missingProducts.add(order.pid);
@@ -331,7 +360,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
// First check which orders exist and get their current values
const [existingOrders] = await localConnection.query(
`SELECT ${columnNames.join(',')} FROM orders WHERE (order_number, pid) IN (${validOrders.map(() => "(?,?)").join(",")})`,
`SELECT ${columnNames.join(",")} FROM orders WHERE (order_number, pid) IN (${validOrders.map(() => "(?,?)").join(",")})`,
validOrders.flatMap(o => [o.order_number, o.pid])
);
const existingOrderMap = new Map(
@@ -347,13 +376,11 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
const hasChanges = columnNames.some(col => {
const newVal = order[col] ?? null;
const oldVal = existing[col] ?? null;
// Special handling for numbers to avoid type coercion issues
if (typeof newVal === 'number' && typeof oldVal === 'number') {
return Math.abs(newVal - oldVal) > 0.00001; // Allow for tiny floating point differences
}
return newVal !== oldVal;
});
if (hasChanges) {
acc.updates.push({
order_number: order.order_number,
@@ -367,7 +394,15 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
values: columnNames.map(col => order[col] ?? null)
});
}
} else {
acc.inserts.push({
order_number: order.order_number,
pid: order.pid,
values: columnNames.map(col => order[col] ?? null)
});
}
return acc;
}, { inserts: [], updates: [] });
// Handle inserts
if (insertsAndUpdates.inserts.length > 0) {
@@ -427,6 +462,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
DROP TEMPORARY TABLE IF EXISTS temp_order_meta;
DROP TEMPORARY TABLE IF EXISTS temp_order_discounts;
DROP TEMPORARY TABLE IF EXISTS temp_order_taxes;
DROP TEMPORARY TABLE IF EXISTS temp_order_costs;
`);
// Import missing products if any
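
Both this script and the products import compare each incoming row to the existing row column by column, treating near-equal numbers as unchanged so floating-point noise does not trigger rewrites. A standalone sketch of that comparison (helper name and epsilon default are illustrative; the scripts inline this logic rather than calling a helper):

// Illustrative helper mirroring the inline hasChanges checks above.
function rowHasChanges(columnNames, newRow, oldRow, epsilon = 0.00001) {
  return columnNames.some(col => {
    const newVal = newRow[col] ?? null;
    const oldVal = oldRow[col] ?? null;
    if (typeof newVal === "number" && typeof oldVal === "number") {
      // Allow for tiny floating point differences instead of flagging them as changes.
      return Math.abs(newVal - oldVal) > epsilon;
    }
    return newVal !== oldVal;
  });
}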

View File

@@ -15,41 +15,93 @@ const getImageUrls = (pid, iid = 1) => {
};
};
async function setupTemporaryTables(connection) {
await connection.query(`CREATE TEMPORARY TABLE IF NOT EXISTS temp_categories ( cat_id INT PRIMARY KEY, name VARCHAR(255) ) ENGINE=InnoDB;`);
await connection.query(`CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_images ( pid INT, iid INT, image_type ENUM('thumbnail', '175', 'full'), url VARCHAR(255), PRIMARY KEY (pid, image_type) ) ENGINE=InnoDB;`);
await connection.query(`CREATE TEMPORARY TABLE IF NOT EXISTS temp_inventory_status ( pid INT PRIMARY KEY, stock_quantity INT, pending_qty INT, preorder_count INT, notions_inv_count INT, needs_update BOOLEAN ) ENGINE=InnoDB;`);
await connection.query(`CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_prices ( pid INT PRIMARY KEY, price DECIMAL(10,2), regular_price DECIMAL(10,2), cost_price DECIMAL(10,5), needs_update BOOLEAN ) ENGINE=InnoDB;`);
await connection.query(`INSERT INTO temp_categories SELECT cat_id, name FROM categories;`);
await connection.query(`CREATE INDEX idx_temp_cat_id ON temp_categories(cat_id);`);
}
async function cleanupTemporaryTables(connection) {
async function setupAndCleanupTempTables(connection, operation = 'setup') {
if (operation === 'setup') {
await connection.query(`
DROP TEMPORARY TABLE IF EXISTS temp_categories;
DROP TEMPORARY TABLE IF EXISTS temp_product_images;
DROP TEMPORARY TABLE IF EXISTS temp_inventory_status;
DROP TEMPORARY TABLE IF EXISTS temp_product_prices;
CREATE TEMPORARY TABLE IF NOT EXISTS temp_products (
pid BIGINT NOT NULL,
title VARCHAR(255),
description TEXT,
SKU VARCHAR(50),
stock_quantity INT DEFAULT 0,
pending_qty INT DEFAULT 0,
preorder_count INT DEFAULT 0,
notions_inv_count INT DEFAULT 0,
price DECIMAL(10,3) NOT NULL DEFAULT 0,
regular_price DECIMAL(10,3) NOT NULL DEFAULT 0,
cost_price DECIMAL(10,3),
vendor VARCHAR(100),
vendor_reference VARCHAR(100),
notions_reference VARCHAR(100),
brand VARCHAR(100),
line VARCHAR(100),
subline VARCHAR(100),
artist VARCHAR(100),
category_ids TEXT,
created_at DATETIME,
first_received DATETIME,
landing_cost_price DECIMAL(10,3),
barcode VARCHAR(50),
harmonized_tariff_code VARCHAR(50),
updated_at DATETIME,
visible BOOLEAN,
replenishable BOOLEAN,
permalink VARCHAR(255),
moq DECIMAL(10,3),
rating DECIMAL(10,2),
reviews INT,
weight DECIMAL(10,3),
length DECIMAL(10,3),
width DECIMAL(10,3),
height DECIMAL(10,3),
country_of_origin VARCHAR(100),
location VARCHAR(100),
total_sold INT,
baskets INT,
notifies INT,
date_last_sold DATETIME,
needs_update BOOLEAN DEFAULT TRUE,
PRIMARY KEY (pid),
INDEX idx_needs_update (needs_update)
) ENGINE=InnoDB;
`);
} else {
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_products;');
}
}
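
Since temp_products only needs to live for one import run, the 'setup' and 'cleanup' branches are meant to be paired. A usage sketch that guarantees the cleanup branch runs even when the import throws (the wrapper name is illustrative and not part of the script, which calls the two branches separately):

// Illustrative wrapper: pair the 'setup' and 'cleanup' branches around a unit of work.
async function withTempProducts(connection, work) {
  await setupAndCleanupTempTables(connection, 'setup');
  try {
    return await work();
  } finally {
    await setupAndCleanupTempTables(connection, 'cleanup');
  }
}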
async function materializeCalculations(prodConnection, localConnection) {
async function materializeCalculations(prodConnection, localConnection, incrementalUpdate = true, lastSyncTime = '1970-01-01') {
outputProgress({
status: "running",
operation: "Products import",
message: "Fetching inventory and order data from production"
message: "Fetching product data from production"
});
// Get all inventory and order data from production in one query
const [prodInventory] = await prodConnection.query(`
// Get all product data in a single optimized query
const [prodData] = await prodConnection.query(`
SELECT
p.pid,
COALESCE(si.available_local, 0) as stock_quantity,
COALESCE(ci.onpreorder, 0) as preorder_count,
COALESCE(pnb.inventory, 0) as notions_inv_count,
COALESCE(
(
SELECT SUM(oi.qty_ordered - oi.qty_placed)
p.description AS title,
p.notes AS description,
p.itemnumber AS SKU,
p.date_created,
p.datein AS first_received,
p.location,
p.upc AS barcode,
p.harmonized_tariff_code,
p.stamp AS updated_at,
CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
CASE
WHEN p.reorder < 0 THEN 0
WHEN (
(IFNULL(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURDATE(), INTERVAL 5 YEAR))
OR (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(NOW(), INTERVAL 5 YEAR))
OR (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(NOW(), INTERVAL 5 YEAR))
) THEN 0
ELSE 1
END AS replenishable,
COALESCE(si.available_local, 0) - COALESCE(
(SELECT SUM(oi.qty_ordered - oi.qty_placed)
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE oi.prod_pid = p.pid
@@ -62,100 +114,200 @@ async function materializeCalculations(prodConnection, localConnection) {
AND oi.qty_ordered >= oi.qty_placed
AND oi.qty_ordered > 0
), 0
) as pending_qty
FROM products p
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
LEFT JOIN current_inventory ci ON p.pid = ci.pid
LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
`);
outputProgress({
status: "running",
operation: "Products import",
message: `Processing ${prodInventory.length} inventory records`
});
// Insert inventory data into local temp table in batches
for (let i = 0; i < prodInventory.length; i += 1000) {
const batch = prodInventory.slice(i, i + 1000);
const values = batch.map(row => [
row.pid,
Math.max(0, row.stock_quantity - row.pending_qty), // Calculate final stock quantity
row.pending_qty,
row.preorder_count,
row.notions_inv_count,
true // Mark as needing update
]);
if (values.length > 0) {
await localConnection.query(`
INSERT INTO temp_inventory_status (pid, stock_quantity, pending_qty, preorder_count, notions_inv_count, needs_update)
VALUES ?
ON DUPLICATE KEY UPDATE
stock_quantity = VALUES(stock_quantity),
pending_qty = VALUES(pending_qty),
preorder_count = VALUES(preorder_count),
notions_inv_count = VALUES(notions_inv_count),
needs_update = TRUE
`, [values]);
}
outputProgress({
status: "running",
operation: "Products import",
message: `Processed ${Math.min(i + 1000, prodInventory.length)} of ${prodInventory.length} inventory records`,
current: i + batch.length,
total: prodInventory.length
});
}
outputProgress({
status: "running",
operation: "Products import",
message: "Fetching pricing data from production"
});
// Get prices from production
const [prodPrices] = await prodConnection.query(`
SELECT
p.pid,
) as stock_quantity,
COALESCE(
(SELECT SUM(oi.qty_ordered - oi.qty_placed)
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE oi.prod_pid = p.pid
AND o.date_placed != '0000-00-00 00:00:00'
AND o.date_shipped = '0000-00-00 00:00:00'
AND oi.pick_finished = 0
AND oi.qty_back = 0
AND o.order_status != 15
AND o.order_status < 90
AND oi.qty_ordered >= oi.qty_placed
AND oi.qty_ordered > 0
), 0
) as pending_qty,
COALESCE(ci.onpreorder, 0) as preorder_count,
COALESCE(pnb.inventory, 0) as notions_inv_count,
COALESCE(pcp.price_each, 0) as price,
COALESCE(p.sellingprice, 0) AS regular_price,
CASE
WHEN EXISTS (SELECT 1 FROM product_inventory WHERE pid = p.pid AND count > 0)
THEN (SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND count > 0)
ELSE (SELECT costeach FROM product_inventory WHERE pid = p.pid ORDER BY daterec DESC LIMIT 1)
END AS cost_price
END AS cost_price,
NULL as landing_cost_price,
s.companyname AS vendor,
CASE
WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber
ELSE sid.supplier_itemnumber
END AS vendor_reference,
sid.notions_itemnumber AS notions_reference,
CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink,
pc1.name AS brand,
pc2.name AS line,
pc3.name AS subline,
pc4.name AS artist,
COALESCE(CASE
WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit
ELSE sid.supplier_qty_per_unit
END, sid.notions_qty_per_unit) AS moq,
p.rating,
p.rating_votes AS reviews,
p.weight,
p.length,
p.width,
p.height,
p.country_of_origin,
(SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets,
(SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies,
p.totalsold AS total_sold,
pls.date_sold as date_last_sold,
GROUP_CONCAT(DISTINCT CASE
WHEN pc.cat_id IS NOT NULL
AND pc.type IN (10, 20, 11, 21, 12, 13)
AND pci.cat_id NOT IN (16, 17)
THEN pci.cat_id
END) as category_ids
FROM products p
LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid
WHERE pcp.active = 1
`);
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
LEFT JOIN current_inventory ci ON p.pid = ci.pid
LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1
LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
LEFT JOIN product_category_index pci ON p.pid = pci.pid
LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id
LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id
LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id
LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id
LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
WHERE ${incrementalUpdate ? `
p.stamp > ? OR
ci.stamp > ? OR
pcp.date_deactive > ? OR
pcp.date_active > ? OR
pnb.date_updated > ?
` : 'TRUE'}
GROUP BY p.pid
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []);
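
The incremental branch of the WHERE clause above binds one lastSyncTime per stamp column, so the placeholder count and the parameter array have to be kept in sync by hand. A small sketch of building the two together (the helper name is illustrative; the query keeps them inline):

// Illustrative helper: derive the incremental WHERE fragment and its parameters
// from one list of stamp columns so the counts cannot drift apart.
function buildIncrementalFilter(incrementalUpdate, lastSyncTime, stampColumns) {
  if (!incrementalUpdate) return { where: "TRUE", params: [] };
  return {
    where: stampColumns.map(col => `${col} > ?`).join(" OR "),
    params: stampColumns.map(() => lastSyncTime),
  };
}
// e.g. buildIncrementalFilter(true, lastSyncTime,
//   ["p.stamp", "ci.stamp", "pcp.date_deactive", "pcp.date_active", "pnb.date_updated"]);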
outputProgress({
status: "running",
operation: "Products import",
message: `Processing ${prodPrices.length} price records`
message: `Processing ${prodData.length} product records`
});
// Insert prices into local temp table in batches
for (let i = 0; i < prodPrices.length; i += 1000) {
const batch = prodPrices.slice(i, i + 1000);
// Insert all product data into temp table in batches
for (let i = 0; i < prodData.length; i += 1000) {
const batch = prodData.slice(i, i + 1000);
const values = batch.map(row => [
row.pid,
row.title,
row.description,
row.SKU,
// Set stock quantity to 0 if it's over 5000
row.stock_quantity > 5000 ? 0 : Math.max(0, row.stock_quantity),
row.pending_qty,
row.preorder_count,
row.notions_inv_count,
row.price,
row.regular_price,
row.cost_price,
row.vendor,
row.vendor_reference,
row.notions_reference,
row.brand,
row.line,
row.subline,
row.artist,
row.category_ids,
row.date_created, // map to created_at
row.first_received,
row.landing_cost_price,
row.barcode,
row.harmonized_tariff_code,
row.updated_at,
row.visible,
row.replenishable,
row.permalink,
row.moq,
row.rating ? Number(row.rating).toFixed(2) : null,
row.reviews,
row.weight,
row.length,
row.width,
row.height,
row.country_of_origin,
row.location,
row.total_sold,
row.baskets,
row.notifies,
row.date_last_sold,
true // Mark as needing update
]);
if (values.length > 0) {
await localConnection.query(`
INSERT INTO temp_product_prices (pid, price, regular_price, cost_price, needs_update)
INSERT INTO temp_products (
pid, title, description, SKU,
stock_quantity, pending_qty, preorder_count, notions_inv_count,
price, regular_price, cost_price,
vendor, vendor_reference, notions_reference,
brand, line, subline, artist,
category_ids, created_at, first_received,
landing_cost_price, barcode, harmonized_tariff_code,
updated_at, visible, replenishable, permalink,
moq, rating, reviews, weight, length, width,
height, country_of_origin, location, total_sold,
baskets, notifies, date_last_sold, needs_update
)
VALUES ?
ON DUPLICATE KEY UPDATE
title = VALUES(title),
description = VALUES(description),
SKU = VALUES(SKU),
stock_quantity = VALUES(stock_quantity),
pending_qty = VALUES(pending_qty),
preorder_count = VALUES(preorder_count),
notions_inv_count = VALUES(notions_inv_count),
price = VALUES(price),
regular_price = VALUES(regular_price),
cost_price = VALUES(cost_price),
vendor = VALUES(vendor),
vendor_reference = VALUES(vendor_reference),
notions_reference = VALUES(notions_reference),
brand = VALUES(brand),
line = VALUES(line),
subline = VALUES(subline),
artist = VALUES(artist),
category_ids = VALUES(category_ids),
created_at = VALUES(created_at),
first_received = VALUES(first_received),
landing_cost_price = VALUES(landing_cost_price),
barcode = VALUES(barcode),
harmonized_tariff_code = VALUES(harmonized_tariff_code),
updated_at = VALUES(updated_at),
visible = VALUES(visible),
replenishable = VALUES(replenishable),
permalink = VALUES(permalink),
moq = VALUES(moq),
rating = VALUES(rating),
reviews = VALUES(reviews),
weight = VALUES(weight),
length = VALUES(length),
width = VALUES(width),
height = VALUES(height),
country_of_origin = VALUES(country_of_origin),
location = VALUES(location),
total_sold = VALUES(total_sold),
baskets = VALUES(baskets),
notifies = VALUES(notifies),
date_last_sold = VALUES(date_last_sold),
needs_update = TRUE
`, [values]);
}
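
The VALUES ? form above relies on another mysql/mysql2 escaping rule: a single placeholder bound to an array of row arrays expands into one parenthesized tuple per row. A minimal sketch with an illustrative column subset (the real insert lists all forty-odd columns):

// Illustrative only: nested-array bulk upsert into temp_products with a reduced column list.
async function upsertSampleRows(localConnection) {
  const rows = [
    [101, "Sample product A", 9.99],
    [102, "Sample product B", 4.5],
  ];
  await localConnection.query(
    `INSERT INTO temp_products (pid, title, price)
     VALUES ?
     ON DUPLICATE KEY UPDATE title = VALUES(title), price = VALUES(price)`,
    [rows] // note the extra wrapping array around the row tuples
  );
}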
@@ -163,9 +315,9 @@ async function materializeCalculations(prodConnection, localConnection) {
outputProgress({
status: "running",
operation: "Products import",
message: `Processed ${Math.min(i + 1000, prodPrices.length)} of ${prodPrices.length} price records`,
message: `Processed ${Math.min(i + 1000, prodData.length)} of ${prodData.length} product records`,
current: i + batch.length,
total: prodPrices.length
total: prodData.length
});
}
@@ -200,263 +352,32 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
console.log('Products: Using last sync time:', lastSyncTime);
// Setup temporary tables
await setupTemporaryTables(localConnection);
await setupAndCleanupTempTables(localConnection, 'setup');
// Materialize calculations
await materializeCalculations(prodConnection, localConnection);
// Optimized count query for changes since last sync
const [countResult] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM products p
LEFT JOIN current_inventory ci ON p.pid = ci.pid
LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1
LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
WHERE ${incrementalUpdate ? `
p.stamp > ? OR
ci.stamp > ? OR
pcp.date_deactive > ? OR
pcp.date_active > ? OR
sid.stamp > ? OR
pnb.date_updated > ? OR
pls.date_sold > ?
` : 'TRUE'}
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []);
console.log('Products: Found changes:', countResult[0].total);
const totalProducts = countResult[0].total;
// Main product query using materialized data - modified for incremental
outputProgress({
status: "running",
operation: "Products import",
message: `Fetching ${incrementalUpdate ? 'updated' : 'all'} product data from production`
});
// Create temporary table for production data
await localConnection.query(`
CREATE TEMPORARY TABLE temp_prod_data (
pid BIGINT NOT NULL,
title VARCHAR(255),
description TEXT,
SKU VARCHAR(50),
date_created TIMESTAMP NULL,
first_received TIMESTAMP NULL,
location VARCHAR(50),
barcode VARCHAR(50),
harmonized_tariff_code VARCHAR(20),
updated_at TIMESTAMP,
visible BOOLEAN,
replenishable BOOLEAN,
vendor VARCHAR(100),
vendor_reference VARCHAR(100),
notions_reference VARCHAR(100),
brand VARCHAR(100),
line VARCHAR(100),
subline VARCHAR(100),
artist VARCHAR(100),
landing_cost_price DECIMAL(10,2) DEFAULT NULL,
permalink VARCHAR(255) DEFAULT NULL,
options TEXT DEFAULT NULL,
tags TEXT DEFAULT NULL,
uom VARCHAR(50) DEFAULT NULL,
baskets INT DEFAULT 0,
notifies INT DEFAULT 0,
moq INT,
rating TINYINT UNSIGNED,
reviews INT UNSIGNED,
weight DECIMAL(10,3),
length DECIMAL(10,3),
width DECIMAL(10,3),
height DECIMAL(10,3),
total_sold INT UNSIGNED,
country_of_origin VARCHAR(5),
date_last_sold DATE,
category_ids TEXT,
needs_update BOOLEAN DEFAULT TRUE,
PRIMARY KEY (pid)
) ENGINE=InnoDB
`);
// Get data from production and insert into temp table
const [prodData] = await prodConnection.query(`
SELECT
p.pid,
p.description AS title,
p.notes AS description,
p.itemnumber AS SKU,
p.date_created,
p.datein AS first_received,
p.location,
p.upc AS barcode,
p.harmonized_tariff_code,
p.stamp AS updated_at,
CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
CASE
WHEN p.reorder < 0 THEN 0
WHEN (
((p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(NOW(), INTERVAL 5 YEAR))
AND (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(NOW(), INTERVAL 5 YEAR)))
) THEN 0
ELSE 1
END AS replenishable,
s.companyname AS vendor,
CASE WHEN s.companyname = 'Notions'
THEN sid.notions_itemnumber
ELSE sid.supplier_itemnumber
END AS vendor_reference,
sid.notions_itemnumber AS notions_reference,
pc1.name AS brand,
pc2.name AS line,
pc3.name AS subline,
pc4.name AS artist,
NULL AS landing_cost_price,
CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink,
NULL AS options,
NULL AS tags,
NULL AS uom,
(SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets,
(SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies,
COALESCE(CASE
WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit
ELSE sid.supplier_qty_per_unit
END, sid.notions_qty_per_unit) AS moq,
p.rating,
p.rating_votes AS reviews,
p.weight,
p.length,
p.width,
p.height,
p.totalsold AS total_sold,
p.country_of_origin,
pls.date_sold as date_last_sold,
GROUP_CONCAT(DISTINCT pci.cat_id) as category_ids,
true // needs_update
FROM products p
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
LEFT JOIN product_category_index pci ON p.pid = pci.pid
LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id
LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id
LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id
LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
LEFT JOIN current_inventory ci ON p.pid = ci.pid
LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1
LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
WHERE ${incrementalUpdate ? `
p.stamp > ? OR
ci.stamp > ? OR
pcp.date_deactive > ? OR
pcp.date_active > ? OR
sid.stamp > ? OR
pnb.date_updated > ? OR
pls.date_sold > ?
` : 'TRUE'}
GROUP BY p.pid
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []);
// Insert production data in batches, but only for products that need updates
for (let i = 0; i < prodData.length; i += 1000) {
const batch = prodData.slice(i, i + 1000);
const placeholders = batch.map(() => `(${Array(31).fill("?").join(",")})`).join(",");
// Map each row to exactly match our temp table columns
const values = batch.flatMap(row => [
row.pid,
row.title,
row.description,
row.SKU,
row.date_created,
row.first_received,
row.location,
row.barcode,
row.harmonized_tariff_code,
row.updated_at,
row.visible,
row.replenishable,
row.vendor,
row.vendor_reference,
row.notions_reference,
row.brand,
row.line,
row.subline,
row.artist,
row.landing_cost_price,
row.permalink,
row.options,
row.tags,
row.uom,
row.baskets,
row.notifies,
row.moq,
row.rating,
row.reviews,
row.weight,
row.length,
row.width,
row.height,
row.total_sold,
row.country_of_origin,
row.date_last_sold,
row.category_ids,
true // needs_update
]);
await localConnection.query(`
INSERT INTO temp_prod_data VALUES ${placeholders}
`, values);
outputProgress({
status: "running",
operation: "Products import",
message: `Loaded ${Math.min(i + 1000, prodData.length)} of ${prodData.length} products from production`,
current: i + batch.length,
total: prodData.length
});
}
// Now join with local temp tables and process in batches, but only for products that need updates
const BATCH_SIZE = 2500;
let processed = 0;
let recordsAdded = 0;
let recordsUpdated = 0;
// Materialize calculations - this will populate temp_products
await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime);
// Get actual count from temp table - only count products that need updates
const [[{ actualTotal }]] = await localConnection.query(`
SELECT COUNT(DISTINCT p.pid) as actualTotal
FROM temp_prod_data p
LEFT JOIN temp_inventory_status tis ON p.pid = tis.pid
LEFT JOIN temp_product_prices tpp ON p.pid = tpp.pid
WHERE p.needs_update = 1
OR tis.needs_update = 1
OR tpp.needs_update = 1
SELECT COUNT(DISTINCT pid) as actualTotal
FROM temp_products
WHERE needs_update = 1
`);
console.log('Products: Found changes:', actualTotal);
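
The double destructuring above unpacks the driver's [rows, fields] tuple and then the single COUNT row in one step; a more explicit equivalent for reference:

// Equivalent, spelled-out form of the [[{ actualTotal }]] destructuring above.
async function countProductsNeedingUpdate(localConnection) {
  const [rows] = await localConnection.query(
    "SELECT COUNT(DISTINCT pid) AS actualTotal FROM temp_products WHERE needs_update = 1"
  );
  return rows[0].actualTotal;
}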
// Process in batches
const BATCH_SIZE = 5000;
let processed = 0;
while (processed < actualTotal) {
const [batch] = await localConnection.query(`
SELECT
p.*,
COALESCE(tis.stock_quantity, 0) as stock_quantity,
COALESCE(tis.preorder_count, 0) as preorder_count,
COALESCE(tis.notions_inv_count, 0) as notions_inv_count,
COALESCE(tpp.price, 0) as price,
COALESCE(tpp.regular_price, 0) as regular_price,
COALESCE(tpp.cost_price, 0) as cost_price
FROM temp_prod_data p
LEFT JOIN temp_inventory_status tis ON p.pid = tis.pid
LEFT JOIN temp_product_prices tpp ON p.pid = tpp.pid
WHERE p.needs_update = 1
OR tis.needs_update = 1
OR tpp.needs_update = 1
SELECT * FROM temp_products
WHERE needs_update = 1
LIMIT ? OFFSET ?
`, [BATCH_SIZE, processed]);
if (!batch || batch.length === 0) break; // Exit if no more records
if (!batch || batch.length === 0) break;
// Add image URLs
batch.forEach(row => {
@@ -467,25 +388,14 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
});
if (batch.length > 0) {
// MySQL 8.0 optimized insert with proper placeholders
const placeholderGroup = `(${Array(columnNames.length).fill("?").join(",")})`;
// First check which products already exist and get their current values
// Get existing products in one query
const [existingProducts] = await localConnection.query(
`SELECT ${columnNames.join(',')} FROM products WHERE pid IN (?)`,
[batch.map(p => p.pid)]
);
const existingPidsMap = new Map(existingProducts.map(p => [p.pid, p]));
// Helper function to map values consistently
const mapValues = (product) => columnNames.map(col => {
const val = product[col] ?? null;
if (col === "managing_stock") return 1;
if (typeof val === "number") return val || 0;
return val;
});
// Split into inserts and updates, comparing values for updates
// Split into inserts and updates
const insertsAndUpdates = batch.reduce((acc, product) => {
if (existingPidsMap.has(product.pid)) {
const existing = existingPidsMap.get(product.pid);
@@ -493,87 +403,88 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
const hasChanges = columnNames.some(col => {
const newVal = product[col] ?? null;
const oldVal = existing[col] ?? null;
// Special handling for numbers to avoid type coercion issues
if (col === "managing_stock") return false; // Skip this as it's always 1
if (typeof newVal === 'number' && typeof oldVal === 'number') {
// Handle NaN and Infinity
if (isNaN(newVal) || isNaN(oldVal)) return isNaN(newVal) !== isNaN(oldVal);
if (!isFinite(newVal) || !isFinite(oldVal)) return !isFinite(newVal) !== !isFinite(oldVal);
// Allow for tiny floating point differences
return Math.abs(newVal - oldVal) > 0.00001;
}
if (col === 'managing_stock') return false; // Skip this as it's always 1
return newVal !== oldVal;
});
if (hasChanges) {
acc.updates.push({
pid: product.pid,
values: mapValues(product)
});
acc.updates.push(product);
}
} else {
acc.inserts.push({
pid: product.pid,
values: mapValues(product)
});
acc.inserts.push(product);
}
return acc;
}, { inserts: [], updates: [] });
// Log summary for this batch
if (insertsAndUpdates.inserts.length > 0 || insertsAndUpdates.updates.length > 0) {
console.log(`Batch summary: ${insertsAndUpdates.inserts.length} new products, ${insertsAndUpdates.updates.length} updates`);
}
// Handle inserts
// Process inserts
if (insertsAndUpdates.inserts.length > 0) {
const insertPlaceholders = Array(insertsAndUpdates.inserts.length).fill(placeholderGroup).join(",");
const insertValues = insertsAndUpdates.inserts.map(product =>
columnNames.map(col => {
const val = product[col] ?? null;
if (col === "managing_stock") return 1;
return val;
})
);
const insertPlaceholders = insertsAndUpdates.inserts
.map(() => `(${Array(columnNames.length).fill('?').join(',')})`)
.join(',');
const insertResult = await localConnection.query(`
INSERT INTO products (${columnNames.join(",")})
INSERT INTO products (${columnNames.join(',')})
VALUES ${insertPlaceholders}
`, insertsAndUpdates.inserts.map(i => i.values).flat());
`, insertValues.flat());
recordsAdded += insertResult[0].affectedRows;
}
// Handle updates - now we know these actually have changes
// Process updates
if (insertsAndUpdates.updates.length > 0) {
const updatePlaceholders = Array(insertsAndUpdates.updates.length).fill(placeholderGroup).join(",");
const updateValues = insertsAndUpdates.updates.map(product =>
columnNames.map(col => {
const val = product[col] ?? null;
if (col === "managing_stock") return 1;
return val;
})
);
const updatePlaceholders = insertsAndUpdates.updates
.map(() => `(${Array(columnNames.length).fill('?').join(',')})`)
.join(',');
const updateResult = await localConnection.query(`
INSERT INTO products (${columnNames.join(",")})
INSERT INTO products (${columnNames.join(',')})
VALUES ${updatePlaceholders}
ON DUPLICATE KEY UPDATE
${columnNames
.filter(col => col !== "pid")
.filter(col => col !== 'pid')
.map(col => `${col} = VALUES(${col})`)
.join(",")};
`, insertsAndUpdates.updates.map(u => u.values).flat());
.join(',')};
`, updateValues.flat());
recordsUpdated += insertsAndUpdates.updates.length;
}
}
// Insert category relationships
const categoryRelationships = [];
batch.forEach(row => {
if (row.category_ids) {
const catIds = row.category_ids
.split(",")
// Process category relationships
if (batch.some(p => p.category_ids)) {
const categoryRelationships = batch
.filter(p => p.category_ids)
.flatMap(product =>
product.category_ids
.split(',')
.map(id => id.trim())
.filter(id => id)
.map(Number);
catIds.forEach(catId => {
if (catId) categoryRelationships.push([row.pid, catId]);
});
}
});
.map(Number)
.filter(id => !isNaN(id))
.map(catId => [catId, product.pid])
);
if (categoryRelationships.length > 0) {
// First verify categories exist
const uniqueCatIds = [...new Set(categoryRelationships.map(([_, catId]) => catId))];
// Verify categories exist before inserting relationships
const uniqueCatIds = [...new Set(categoryRelationships.map(([catId]) => catId))];
const [existingCats] = await localConnection.query(
"SELECT cat_id FROM categories WHERE cat_id IN (?)",
[uniqueCatIds]
@@ -581,31 +492,25 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
const existingCatIds = new Set(existingCats.map(c => c.cat_id));
// Filter relationships to only include existing categories
const validRelationships = categoryRelationships.filter(([_, catId]) =>
const validRelationships = categoryRelationships.filter(([catId]) =>
existingCatIds.has(catId)
);
if (validRelationships.length > 0) {
// Delete existing relationships for these products first
await localConnection.query(
"DELETE FROM product_categories WHERE pid IN (?)",
[batch.map(p => p.pid)]
);
// Insert new relationships using INSERT IGNORE
const catPlaceholders = validRelationships
.map(() => "(?, ?)")
.join(",");
await localConnection.query(
`INSERT IGNORE INTO product_categories (pid, cat_id)
`INSERT IGNORE INTO product_categories (cat_id, pid)
VALUES ${catPlaceholders}`,
validRelationships.flat()
);
}
}
}
}
processed += batch.length; // Only increment by actual records processed
processed += batch.length;
outputProgress({
status: "running",
@@ -617,15 +522,10 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
remaining: estimateRemaining(startTime, processed, actualTotal),
rate: calculateRate(startTime, processed)
});
// Force garbage collection between batches
if (global.gc) {
global.gc();
}
}
// Drop temporary tables
await cleanupTemporaryTables(localConnection);
await setupAndCleanupTempTables(localConnection, 'cleanup');
// Only update sync status if we get here (no errors thrown)
await localConnection.query(`
@@ -668,30 +568,6 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
p.date_created,
p.datein AS first_received,
p.location,
COALESCE(si.available_local, 0) - COALESCE(
(SELECT SUM(oi.qty_ordered - oi.qty_placed)
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE oi.prod_pid = p.pid
AND o.date_placed != '0000-00-00 00:00:00'
AND o.date_shipped = '0000-00-00 00:00:00'
AND oi.pick_finished = 0
AND oi.qty_back = 0
AND o.order_status != 15
AND o.order_status < 90
AND oi.qty_ordered >= oi.qty_placed
AND oi.qty_ordered > 0), 0
) as stock_quantity,
COALESCE(ci.onpreorder, 0) as preorder_count,
COALESCE(pnb.inventory, 0) as notions_inv_count,
COALESCE(pcp.price_each, 0) as price,
COALESCE(p.sellingprice, 0) AS regular_price,
CASE
WHEN EXISTS (SELECT 1 FROM product_inventory WHERE pid = p.pid AND count > 0)
THEN (SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND count > 0)
ELSE (SELECT costeach FROM product_inventory WHERE pid = p.pid ORDER BY daterec DESC LIMIT 1)
END AS cost_price,
NULL AS landing_cost_price,
p.upc AS barcode,
p.harmonized_tariff_code,
p.stamp AS updated_at,
@@ -705,21 +581,18 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
) THEN 0
ELSE 1
END AS replenishable,
s.companyname AS vendor,
CASE WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber ELSE sid.supplier_itemnumber END AS vendor_reference,
sid.notions_itemnumber AS notions_reference,
CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink,
pc1.name AS brand,
pc2.name AS line,
pc3.name AS subline,
pc4.name AS artist,
NULL AS options,
NULL AS tags,
COALESCE(CASE
WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit
ELSE sid.supplier_qty_per_unit
END, sid.notions_qty_per_unit) AS moq,
NULL AS uom,
COALESCE(si.available_local, 0) as stock_quantity,
COALESCE(pq.qty, 0) as pending_qty,
COALESCE(ci.onpreorder, 0) as preorder_count,
COALESCE(pnb.inventory, 0) as notions_inv_count,
COALESCE(pcp.price_each, 0) as price,
COALESCE(p.sellingprice, 0) AS regular_price,
CASE
WHEN EXISTS (SELECT 1 FROM product_inventory WHERE pid = p.pid AND count > 0)
THEN (SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND count > 0)
ELSE (SELECT costeach FROM product_inventory WHERE pid = p.pid ORDER BY daterec DESC LIMIT 1)
END AS cost_price,
NULL AS landing_cost_price,
p.rating,
p.rating_votes AS reviews,
p.weight,
@@ -786,7 +659,7 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
ON DUPLICATE KEY UPDATE ${columnNames
.filter((col) => col !== "pid")
.map((col) => `${col} = VALUES(${col})`)
.join(",")}
.join(",")};
`;
const result = await localConnection.query(query, productValues);
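
One caveat when reading the result of this final upsert: with ON DUPLICATE KEY UPDATE, MySQL reports affected rows as 1 per newly inserted row and 2 per existing row actually changed by the update clause (0 for untouched duplicates), so the figure is weighted rather than a count of pids. An illustrative continuation, assuming the mysql2 result shape used elsewhere in the script:

// Illustrative only: the weighted affected-rows value from the upsert above.
const { affectedRows: weightedAffected } = result[0];
console.log("Missing products upsert, weighted affectedRows:", weightedAffected);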