Add batching to initial product import query
This commit is contained in:
@@ -77,115 +77,14 @@ async function materializeCalculations(prodConnection, localConnection, incremen
|
|||||||
message: "Fetching product data from production"
|
message: "Fetching product data from production"
|
||||||
});
|
});
|
||||||
|
|
||||||
// Get all product data in a single optimized query
|
// First get total count
|
||||||
const [prodData] = await prodConnection.query(`
|
const [[{ total }]] = await prodConnection.query(`
|
||||||
SELECT
|
SELECT COUNT(DISTINCT p.pid) as total
|
||||||
p.pid,
|
|
||||||
p.description AS title,
|
|
||||||
p.notes AS description,
|
|
||||||
p.itemnumber AS SKU,
|
|
||||||
p.date_created,
|
|
||||||
p.datein AS first_received,
|
|
||||||
p.location,
|
|
||||||
p.upc AS barcode,
|
|
||||||
p.harmonized_tariff_code,
|
|
||||||
p.stamp AS updated_at,
|
|
||||||
CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
|
|
||||||
CASE
|
|
||||||
WHEN p.reorder < 0 THEN 0
|
|
||||||
WHEN (
|
|
||||||
(IFNULL(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURDATE(), INTERVAL 5 YEAR))
|
|
||||||
OR (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(NOW(), INTERVAL 5 YEAR))
|
|
||||||
OR (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(NOW(), INTERVAL 5 YEAR))
|
|
||||||
) THEN 0
|
|
||||||
ELSE 1
|
|
||||||
END AS replenishable,
|
|
||||||
COALESCE(si.available_local, 0) - COALESCE(
|
|
||||||
(SELECT SUM(oi.qty_ordered - oi.qty_placed)
|
|
||||||
FROM order_items oi
|
|
||||||
JOIN _order o ON oi.order_id = o.order_id
|
|
||||||
WHERE oi.prod_pid = p.pid
|
|
||||||
AND o.date_placed != '0000-00-00 00:00:00'
|
|
||||||
AND o.date_shipped = '0000-00-00 00:00:00'
|
|
||||||
AND oi.pick_finished = 0
|
|
||||||
AND oi.qty_back = 0
|
|
||||||
AND o.order_status != 15
|
|
||||||
AND o.order_status < 90
|
|
||||||
AND oi.qty_ordered >= oi.qty_placed
|
|
||||||
AND oi.qty_ordered > 0
|
|
||||||
), 0
|
|
||||||
) as stock_quantity,
|
|
||||||
COALESCE(
|
|
||||||
(SELECT SUM(oi.qty_ordered - oi.qty_placed)
|
|
||||||
FROM order_items oi
|
|
||||||
JOIN _order o ON oi.order_id = o.order_id
|
|
||||||
WHERE oi.prod_pid = p.pid
|
|
||||||
AND o.date_placed != '0000-00-00 00:00:00'
|
|
||||||
AND o.date_shipped = '0000-00-00 00:00:00'
|
|
||||||
AND oi.pick_finished = 0
|
|
||||||
AND oi.qty_back = 0
|
|
||||||
AND o.order_status != 15
|
|
||||||
AND o.order_status < 90
|
|
||||||
AND oi.qty_ordered >= oi.qty_placed
|
|
||||||
AND oi.qty_ordered > 0
|
|
||||||
), 0
|
|
||||||
) as pending_qty,
|
|
||||||
COALESCE(ci.onpreorder, 0) as preorder_count,
|
|
||||||
COALESCE(pnb.inventory, 0) as notions_inv_count,
|
|
||||||
COALESCE(pcp.price_each, 0) as price,
|
|
||||||
COALESCE(p.sellingprice, 0) AS regular_price,
|
|
||||||
CASE
|
|
||||||
WHEN EXISTS (SELECT 1 FROM product_inventory WHERE pid = p.pid AND count > 0)
|
|
||||||
THEN (SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND count > 0)
|
|
||||||
ELSE (SELECT costeach FROM product_inventory WHERE pid = p.pid ORDER BY daterec DESC LIMIT 1)
|
|
||||||
END AS cost_price,
|
|
||||||
NULL as landing_cost_price,
|
|
||||||
s.companyname AS vendor,
|
|
||||||
CASE
|
|
||||||
WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber
|
|
||||||
ELSE sid.supplier_itemnumber
|
|
||||||
END AS vendor_reference,
|
|
||||||
sid.notions_itemnumber AS notions_reference,
|
|
||||||
CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink,
|
|
||||||
pc1.name AS brand,
|
|
||||||
pc2.name AS line,
|
|
||||||
pc3.name AS subline,
|
|
||||||
pc4.name AS artist,
|
|
||||||
COALESCE(CASE
|
|
||||||
WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit
|
|
||||||
ELSE sid.supplier_qty_per_unit
|
|
||||||
END, sid.notions_qty_per_unit) AS moq,
|
|
||||||
p.rating,
|
|
||||||
p.rating_votes AS reviews,
|
|
||||||
p.weight,
|
|
||||||
p.length,
|
|
||||||
p.width,
|
|
||||||
p.height,
|
|
||||||
p.country_of_origin,
|
|
||||||
(SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets,
|
|
||||||
(SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies,
|
|
||||||
p.totalsold AS total_sold,
|
|
||||||
pls.date_sold as date_last_sold,
|
|
||||||
GROUP_CONCAT(DISTINCT CASE
|
|
||||||
WHEN pc.cat_id IS NOT NULL
|
|
||||||
AND pc.type IN (10, 20, 11, 21, 12, 13)
|
|
||||||
AND pci.cat_id NOT IN (16, 17)
|
|
||||||
THEN pci.cat_id
|
|
||||||
END) as category_ids
|
|
||||||
FROM products p
|
FROM products p
|
||||||
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
|
FORCE INDEX (PRIMARY)
|
||||||
LEFT JOIN current_inventory ci ON p.pid = ci.pid
|
LEFT JOIN current_inventory ci FORCE INDEX (PRIMARY) ON p.pid = ci.pid
|
||||||
LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
|
|
||||||
LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1
|
LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1
|
||||||
LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
|
LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
|
||||||
LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
|
|
||||||
LEFT JOIN product_category_index pci ON p.pid = pci.pid
|
|
||||||
LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id
|
|
||||||
LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id
|
|
||||||
LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id
|
|
||||||
LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id
|
|
||||||
LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id
|
|
||||||
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
|
|
||||||
WHERE ${incrementalUpdate ? `
|
WHERE ${incrementalUpdate ? `
|
||||||
p.stamp > ? OR
|
p.stamp > ? OR
|
||||||
ci.stamp > ? OR
|
ci.stamp > ? OR
|
||||||
@@ -193,26 +92,161 @@ async function materializeCalculations(prodConnection, localConnection, incremen
|
|||||||
pcp.date_active > ? OR
|
pcp.date_active > ? OR
|
||||||
pnb.date_updated > ?
|
pnb.date_updated > ?
|
||||||
` : 'TRUE'}
|
` : 'TRUE'}
|
||||||
GROUP BY p.pid
|
|
||||||
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []);
|
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []);
|
||||||
|
|
||||||
|
if (total === 0) {
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
outputProgress({
|
outputProgress({
|
||||||
status: "running",
|
status: "running",
|
||||||
operation: "Products import",
|
operation: "Products import",
|
||||||
message: `Processing ${prodData.length} product records`
|
message: `Found ${total} products to process`
|
||||||
});
|
});
|
||||||
|
|
||||||
// Insert all product data into temp table in batches
|
// Process in batches
|
||||||
for (let i = 0; i < prodData.length; i += 5000) {
|
const BATCH_SIZE = 5000;
|
||||||
const batch = prodData.slice(i, i + 5000);
|
const results = [];
|
||||||
const values = batch.map(row => [
|
|
||||||
|
for (let offset = 0; offset < total; offset += BATCH_SIZE) {
|
||||||
|
outputProgress({
|
||||||
|
status: "running",
|
||||||
|
operation: "Products import",
|
||||||
|
message: `Fetching products ${offset + 1} to ${Math.min(offset + BATCH_SIZE, total)} of ${total}`
|
||||||
|
});
|
||||||
|
|
||||||
|
// Get batch of products
|
||||||
|
const [batchData] = await prodConnection.query(`
|
||||||
|
SELECT
|
||||||
|
p.pid,
|
||||||
|
p.description AS title,
|
||||||
|
p.notes AS description,
|
||||||
|
p.itemnumber AS SKU,
|
||||||
|
p.date_created,
|
||||||
|
p.datein AS first_received,
|
||||||
|
p.location,
|
||||||
|
p.upc AS barcode,
|
||||||
|
p.harmonized_tariff_code,
|
||||||
|
p.stamp AS updated_at,
|
||||||
|
CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
|
||||||
|
CASE
|
||||||
|
WHEN p.reorder < 0 THEN 0
|
||||||
|
WHEN (
|
||||||
|
(IFNULL(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURDATE(), INTERVAL 5 YEAR))
|
||||||
|
OR (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(NOW(), INTERVAL 5 YEAR))
|
||||||
|
OR (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(NOW(), INTERVAL 5 YEAR))
|
||||||
|
) THEN 0
|
||||||
|
ELSE 1
|
||||||
|
END AS replenishable,
|
||||||
|
COALESCE(si.available_local, 0) - COALESCE(
|
||||||
|
(SELECT SUM(oi.qty_ordered - oi.qty_placed)
|
||||||
|
FROM order_items oi
|
||||||
|
FORCE INDEX (prod_pid)
|
||||||
|
JOIN _order o FORCE INDEX (PRIMARY) ON oi.order_id = o.order_id
|
||||||
|
WHERE oi.prod_pid = p.pid
|
||||||
|
AND o.date_placed != '0000-00-00 00:00:00'
|
||||||
|
AND o.date_shipped = '0000-00-00 00:00:00'
|
||||||
|
AND oi.pick_finished = 0
|
||||||
|
AND oi.qty_back = 0
|
||||||
|
AND o.order_status != 15
|
||||||
|
AND o.order_status < 90
|
||||||
|
AND oi.qty_ordered >= oi.qty_placed
|
||||||
|
AND oi.qty_ordered > 0
|
||||||
|
), 0
|
||||||
|
) as stock_quantity,
|
||||||
|
COALESCE(ci.onpreorder, 0) as preorder_count,
|
||||||
|
COALESCE(pnb.inventory, 0) as notions_inv_count,
|
||||||
|
COALESCE(pcp.price_each, 0) as price,
|
||||||
|
COALESCE(p.sellingprice, 0) AS regular_price,
|
||||||
|
CASE
|
||||||
|
WHEN EXISTS (
|
||||||
|
SELECT 1 FROM product_inventory pi
|
||||||
|
FORCE INDEX (idx_pid)
|
||||||
|
WHERE pi.pid = p.pid AND pi.count > 0 LIMIT 1
|
||||||
|
)
|
||||||
|
THEN (
|
||||||
|
SELECT ROUND(AVG(costeach), 5)
|
||||||
|
FROM product_inventory pi
|
||||||
|
FORCE INDEX (idx_pid)
|
||||||
|
WHERE pi.pid = p.pid AND pi.count > 0
|
||||||
|
)
|
||||||
|
ELSE (
|
||||||
|
SELECT costeach
|
||||||
|
FROM product_inventory pi
|
||||||
|
FORCE INDEX (idx_pid, daterec)
|
||||||
|
WHERE pi.pid = p.pid
|
||||||
|
ORDER BY daterec DESC LIMIT 1
|
||||||
|
)
|
||||||
|
END AS cost_price,
|
||||||
|
NULL as landing_cost_price,
|
||||||
|
s.companyname AS vendor,
|
||||||
|
CASE
|
||||||
|
WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber
|
||||||
|
ELSE sid.supplier_itemnumber
|
||||||
|
END AS vendor_reference,
|
||||||
|
sid.notions_itemnumber AS notions_reference,
|
||||||
|
CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink,
|
||||||
|
pc1.name AS brand,
|
||||||
|
pc2.name AS line,
|
||||||
|
pc3.name AS subline,
|
||||||
|
pc4.name AS artist,
|
||||||
|
COALESCE(CASE
|
||||||
|
WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit
|
||||||
|
ELSE sid.supplier_qty_per_unit
|
||||||
|
END, sid.notions_qty_per_unit) AS moq,
|
||||||
|
p.rating,
|
||||||
|
p.rating_votes AS reviews,
|
||||||
|
p.weight,
|
||||||
|
p.length,
|
||||||
|
p.width,
|
||||||
|
p.height,
|
||||||
|
p.country_of_origin,
|
||||||
|
(SELECT COUNT(*) FROM mybasket mb FORCE INDEX (idx_item) WHERE mb.item = p.pid AND mb.qty > 0) AS baskets,
|
||||||
|
(SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies,
|
||||||
|
p.totalsold AS total_sold,
|
||||||
|
pls.date_sold as date_last_sold,
|
||||||
|
GROUP_CONCAT(DISTINCT CASE
|
||||||
|
WHEN pc.cat_id IS NOT NULL
|
||||||
|
AND pc.type IN (10, 20, 11, 21, 12, 13)
|
||||||
|
AND pci.cat_id NOT IN (16, 17)
|
||||||
|
THEN pci.cat_id
|
||||||
|
END) as category_ids
|
||||||
|
FROM products p
|
||||||
|
FORCE INDEX (PRIMARY)
|
||||||
|
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
|
||||||
|
LEFT JOIN current_inventory ci FORCE INDEX (PRIMARY) ON p.pid = ci.pid
|
||||||
|
LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
|
||||||
|
LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1
|
||||||
|
LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
|
||||||
|
LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
|
||||||
|
LEFT JOIN product_category_index pci FORCE INDEX (pid) ON p.pid = pci.pid
|
||||||
|
LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id
|
||||||
|
LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id
|
||||||
|
LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id
|
||||||
|
LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id
|
||||||
|
LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id
|
||||||
|
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
|
||||||
|
WHERE ${incrementalUpdate ? `
|
||||||
|
p.stamp > ? OR
|
||||||
|
ci.stamp > ? OR
|
||||||
|
pcp.date_deactive > ? OR
|
||||||
|
pcp.date_active > ? OR
|
||||||
|
pnb.date_updated > ?
|
||||||
|
` : 'TRUE'}
|
||||||
|
GROUP BY p.pid
|
||||||
|
LIMIT ? OFFSET ?
|
||||||
|
`, [...(incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []), BATCH_SIZE, offset]);
|
||||||
|
|
||||||
|
if (batchData.length === 0) break;
|
||||||
|
|
||||||
|
// Insert batch into temp table
|
||||||
|
const values = batchData.map(row => [
|
||||||
row.pid,
|
row.pid,
|
||||||
row.title,
|
row.title,
|
||||||
row.description,
|
row.description,
|
||||||
row.SKU,
|
row.SKU,
|
||||||
// Set stock quantity to 0 if it's over 5000
|
|
||||||
row.stock_quantity > 5000 ? 0 : Math.max(0, row.stock_quantity),
|
row.stock_quantity > 5000 ? 0 : Math.max(0, row.stock_quantity),
|
||||||
row.pending_qty,
|
row.pending_qty || 0,
|
||||||
row.preorder_count,
|
row.preorder_count,
|
||||||
row.notions_inv_count,
|
row.notions_inv_count,
|
||||||
row.price,
|
row.price,
|
||||||
@@ -226,7 +260,7 @@ async function materializeCalculations(prodConnection, localConnection, incremen
|
|||||||
row.subline,
|
row.subline,
|
||||||
row.artist,
|
row.artist,
|
||||||
row.category_ids,
|
row.category_ids,
|
||||||
row.date_created, // map to created_at
|
row.date_created,
|
||||||
row.first_received,
|
row.first_received,
|
||||||
row.landing_cost_price,
|
row.landing_cost_price,
|
||||||
row.barcode,
|
row.barcode,
|
||||||
@@ -251,74 +285,66 @@ async function materializeCalculations(prodConnection, localConnection, incremen
|
|||||||
true // Mark as needing update
|
true // Mark as needing update
|
||||||
]);
|
]);
|
||||||
|
|
||||||
if (values.length > 0) {
|
await localConnection.query(`
|
||||||
await localConnection.query(`
|
INSERT INTO temp_products (
|
||||||
INSERT INTO temp_products (
|
pid, title, description, SKU,
|
||||||
pid, title, description, SKU,
|
stock_quantity, pending_qty, preorder_count, notions_inv_count,
|
||||||
stock_quantity, pending_qty, preorder_count, notions_inv_count,
|
price, regular_price, cost_price,
|
||||||
price, regular_price, cost_price,
|
vendor, vendor_reference, notions_reference,
|
||||||
vendor, vendor_reference, notions_reference,
|
brand, line, subline, artist,
|
||||||
brand, line, subline, artist,
|
category_ids, created_at, first_received,
|
||||||
category_ids, created_at, first_received,
|
landing_cost_price, barcode, harmonized_tariff_code,
|
||||||
landing_cost_price, barcode, harmonized_tariff_code,
|
updated_at, visible, replenishable, permalink,
|
||||||
updated_at, visible, replenishable, permalink,
|
moq, rating, reviews, weight, length, width,
|
||||||
moq, rating, reviews, weight, length, width,
|
height, country_of_origin, location, total_sold,
|
||||||
height, country_of_origin, location, total_sold,
|
baskets, notifies, date_last_sold, needs_update
|
||||||
baskets, notifies, date_last_sold, needs_update
|
)
|
||||||
)
|
VALUES ?
|
||||||
VALUES ?
|
ON DUPLICATE KEY UPDATE
|
||||||
ON DUPLICATE KEY UPDATE
|
title = VALUES(title),
|
||||||
title = VALUES(title),
|
description = VALUES(description),
|
||||||
description = VALUES(description),
|
SKU = VALUES(SKU),
|
||||||
SKU = VALUES(SKU),
|
stock_quantity = VALUES(stock_quantity),
|
||||||
stock_quantity = VALUES(stock_quantity),
|
pending_qty = VALUES(pending_qty),
|
||||||
pending_qty = VALUES(pending_qty),
|
preorder_count = VALUES(preorder_count),
|
||||||
preorder_count = VALUES(preorder_count),
|
notions_inv_count = VALUES(notions_inv_count),
|
||||||
notions_inv_count = VALUES(notions_inv_count),
|
price = VALUES(price),
|
||||||
price = VALUES(price),
|
regular_price = VALUES(regular_price),
|
||||||
regular_price = VALUES(regular_price),
|
cost_price = VALUES(cost_price),
|
||||||
cost_price = VALUES(cost_price),
|
vendor = VALUES(vendor),
|
||||||
vendor = VALUES(vendor),
|
vendor_reference = VALUES(vendor_reference),
|
||||||
vendor_reference = VALUES(vendor_reference),
|
notions_reference = VALUES(notions_reference),
|
||||||
notions_reference = VALUES(notions_reference),
|
brand = VALUES(brand),
|
||||||
brand = VALUES(brand),
|
line = VALUES(line),
|
||||||
line = VALUES(line),
|
subline = VALUES(subline),
|
||||||
subline = VALUES(subline),
|
artist = VALUES(artist),
|
||||||
artist = VALUES(artist),
|
category_ids = VALUES(category_ids),
|
||||||
category_ids = VALUES(category_ids),
|
created_at = VALUES(created_at),
|
||||||
created_at = VALUES(created_at),
|
first_received = VALUES(first_received),
|
||||||
first_received = VALUES(first_received),
|
landing_cost_price = VALUES(landing_cost_price),
|
||||||
landing_cost_price = VALUES(landing_cost_price),
|
barcode = VALUES(barcode),
|
||||||
barcode = VALUES(barcode),
|
harmonized_tariff_code = VALUES(harmonized_tariff_code),
|
||||||
harmonized_tariff_code = VALUES(harmonized_tariff_code),
|
updated_at = VALUES(updated_at),
|
||||||
updated_at = VALUES(updated_at),
|
visible = VALUES(visible),
|
||||||
visible = VALUES(visible),
|
replenishable = VALUES(replenishable),
|
||||||
replenishable = VALUES(replenishable),
|
permalink = VALUES(permalink),
|
||||||
permalink = VALUES(permalink),
|
moq = VALUES(moq),
|
||||||
moq = VALUES(moq),
|
rating = VALUES(rating),
|
||||||
rating = VALUES(rating),
|
reviews = VALUES(reviews),
|
||||||
reviews = VALUES(reviews),
|
weight = VALUES(weight),
|
||||||
weight = VALUES(weight),
|
length = VALUES(length),
|
||||||
length = VALUES(length),
|
width = VALUES(width),
|
||||||
width = VALUES(width),
|
height = VALUES(height),
|
||||||
height = VALUES(height),
|
country_of_origin = VALUES(country_of_origin),
|
||||||
country_of_origin = VALUES(country_of_origin),
|
location = VALUES(location),
|
||||||
location = VALUES(location),
|
total_sold = VALUES(total_sold),
|
||||||
total_sold = VALUES(total_sold),
|
baskets = VALUES(baskets),
|
||||||
baskets = VALUES(baskets),
|
notifies = VALUES(notifies),
|
||||||
notifies = VALUES(notifies),
|
date_last_sold = VALUES(date_last_sold),
|
||||||
date_last_sold = VALUES(date_last_sold),
|
needs_update = TRUE
|
||||||
needs_update = TRUE
|
`, [values]);
|
||||||
`, [values]);
|
|
||||||
}
|
|
||||||
|
|
||||||
outputProgress({
|
results.push(...batchData);
|
||||||
status: "running",
|
|
||||||
operation: "Products import",
|
|
||||||
message: `Processed ${Math.min(i + 5000, prodData.length)} of ${prodData.length} product records`,
|
|
||||||
current: i + batch.length,
|
|
||||||
total: prodData.length
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
outputProgress({
|
outputProgress({
|
||||||
@@ -326,6 +352,8 @@ async function materializeCalculations(prodConnection, localConnection, incremen
|
|||||||
operation: "Products import",
|
operation: "Products import",
|
||||||
message: "Finished materializing calculations"
|
message: "Finished materializing calculations"
|
||||||
});
|
});
|
||||||
|
|
||||||
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
async function importProducts(prodConnection, localConnection, incrementalUpdate = true) {
|
async function importProducts(prodConnection, localConnection, incrementalUpdate = true) {
|
||||||
@@ -351,11 +379,45 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
|
|||||||
|
|
||||||
console.log('Products: Using last sync time:', lastSyncTime);
|
console.log('Products: Using last sync time:', lastSyncTime);
|
||||||
|
|
||||||
|
// Cache valid categories and their hierarchy at the start
|
||||||
|
const [categories] = await localConnection.query(`
|
||||||
|
WITH RECURSIVE category_hierarchy AS (
|
||||||
|
SELECT
|
||||||
|
cat_id,
|
||||||
|
parent_id,
|
||||||
|
type,
|
||||||
|
1 as level,
|
||||||
|
CAST(cat_id AS CHAR(200)) as path
|
||||||
|
FROM categories
|
||||||
|
UNION ALL
|
||||||
|
SELECT
|
||||||
|
c.cat_id,
|
||||||
|
c.parent_id,
|
||||||
|
c.type,
|
||||||
|
ch.level + 1,
|
||||||
|
CONCAT(ch.path, ',', c.cat_id)
|
||||||
|
FROM categories c
|
||||||
|
JOIN category_hierarchy ch ON c.parent_id = ch.cat_id
|
||||||
|
WHERE ch.level < 10 -- Prevent infinite recursion
|
||||||
|
)
|
||||||
|
SELECT DISTINCT
|
||||||
|
cat_id,
|
||||||
|
parent_id,
|
||||||
|
type,
|
||||||
|
path,
|
||||||
|
level
|
||||||
|
FROM category_hierarchy
|
||||||
|
ORDER BY level DESC
|
||||||
|
`);
|
||||||
|
|
||||||
|
const validCategories = new Map(categories.map(c => [c.cat_id, c]));
|
||||||
|
const validCategoryIds = new Set(categories.map(c => c.cat_id));
|
||||||
|
|
||||||
// Setup temporary tables
|
// Setup temporary tables
|
||||||
await setupAndCleanupTempTables(localConnection, 'setup');
|
await setupAndCleanupTempTables(localConnection, 'setup');
|
||||||
|
|
||||||
// Materialize calculations - this will populate temp_products
|
// Materialize calculations - this will populate temp_products
|
||||||
await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime);
|
const results = await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime);
|
||||||
|
|
||||||
// Get actual count from temp table - only count products that need updates
|
// Get actual count from temp table - only count products that need updates
|
||||||
const [[{ actualTotal }]] = await localConnection.query(`
|
const [[{ actualTotal }]] = await localConnection.query(`
|
||||||
@@ -369,6 +431,7 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
|
|||||||
// Process in batches
|
// Process in batches
|
||||||
const BATCH_SIZE = 5000;
|
const BATCH_SIZE = 5000;
|
||||||
let processed = 0;
|
let processed = 0;
|
||||||
|
let categoryBuffer = [];
|
||||||
|
|
||||||
while (processed < actualTotal) {
|
while (processed < actualTotal) {
|
||||||
const [batch] = await localConnection.query(`
|
const [batch] = await localConnection.query(`
|
||||||
@@ -468,113 +531,45 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
|
|||||||
recordsUpdated += insertsAndUpdates.updates.length;
|
recordsUpdated += insertsAndUpdates.updates.length;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (insertsAndUpdates.updates.length > 0 || insertsAndUpdates.inserts.length > 0) {
|
|
||||||
const affectedPids = [
|
|
||||||
...insertsAndUpdates.updates.map(p => p.pid),
|
|
||||||
...insertsAndUpdates.inserts.map(p => p.pid)
|
|
||||||
];
|
|
||||||
}
|
|
||||||
|
|
||||||
// Process category relationships
|
// Process category relationships
|
||||||
if (batch.some(p => p.category_ids)) {
|
batch
|
||||||
// First get all valid categories
|
.filter(p => p.category_ids)
|
||||||
const allCategoryIds = [...new Set(
|
.forEach(product => {
|
||||||
batch
|
const productCategories = product.category_ids
|
||||||
.filter(p => p.category_ids)
|
.split(',')
|
||||||
.flatMap(product =>
|
.map(id => id.trim())
|
||||||
product.category_ids
|
.filter(id => id)
|
||||||
.split(',')
|
.map(Number)
|
||||||
.map(id => id.trim())
|
.filter(id => !isNaN(id))
|
||||||
.filter(id => id)
|
.filter(id => validCategoryIds.has(id))
|
||||||
.map(Number)
|
.map(id => validCategories.get(id))
|
||||||
.filter(id => !isNaN(id))
|
.sort((a, b) => a.type - b.type); // Sort by type to ensure proper hierarchy
|
||||||
)
|
|
||||||
)];
|
|
||||||
|
|
||||||
// Verify categories exist and get their hierarchy
|
// Only add relationships that maintain proper hierarchy
|
||||||
const [categories] = await localConnection.query(`
|
productCategories.forEach(category => {
|
||||||
WITH RECURSIVE category_hierarchy AS (
|
if (category.path.split(',').every(parentId =>
|
||||||
SELECT
|
validCategoryIds.has(Number(parentId))
|
||||||
cat_id,
|
)) {
|
||||||
parent_id,
|
categoryBuffer.push([category.cat_id, product.pid]);
|
||||||
type,
|
}
|
||||||
1 as level,
|
});
|
||||||
CAST(cat_id AS CHAR(200)) as path
|
});
|
||||||
FROM categories
|
|
||||||
WHERE cat_id IN (?)
|
|
||||||
UNION ALL
|
|
||||||
SELECT
|
|
||||||
c.cat_id,
|
|
||||||
c.parent_id,
|
|
||||||
c.type,
|
|
||||||
ch.level + 1,
|
|
||||||
CONCAT(ch.path, ',', c.cat_id)
|
|
||||||
FROM categories c
|
|
||||||
JOIN category_hierarchy ch ON c.parent_id = ch.cat_id
|
|
||||||
WHERE ch.level < 10 -- Prevent infinite recursion
|
|
||||||
)
|
|
||||||
SELECT
|
|
||||||
h.cat_id,
|
|
||||||
h.parent_id,
|
|
||||||
h.type,
|
|
||||||
h.path,
|
|
||||||
h.level
|
|
||||||
FROM (
|
|
||||||
SELECT DISTINCT cat_id, parent_id, type, path, level
|
|
||||||
FROM category_hierarchy
|
|
||||||
WHERE cat_id IN (?)
|
|
||||||
) h
|
|
||||||
ORDER BY h.level DESC
|
|
||||||
`, [allCategoryIds, allCategoryIds]);
|
|
||||||
|
|
||||||
const validCategories = new Map(categories.map(c => [c.cat_id, c]));
|
// Process category buffer if it gets too large
|
||||||
const validCategoryIds = new Set(categories.map(c => c.cat_id));
|
if (categoryBuffer.length >= 10000) {
|
||||||
|
if (categoryBuffer.length > 0) {
|
||||||
|
const placeholders = categoryBuffer
|
||||||
|
.map(() => "(?, ?)")
|
||||||
|
.join(",");
|
||||||
|
|
||||||
// Build category relationships ensuring proper hierarchy
|
await localConnection.query(`
|
||||||
const categoryRelationships = [];
|
INSERT IGNORE INTO product_categories (cat_id, pid)
|
||||||
batch
|
VALUES ${placeholders}
|
||||||
.filter(p => p.category_ids)
|
ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id)
|
||||||
.forEach(product => {
|
`, categoryBuffer.flat());
|
||||||
const productCategories = product.category_ids
|
|
||||||
.split(',')
|
|
||||||
.map(id => id.trim())
|
|
||||||
.filter(id => id)
|
|
||||||
.map(Number)
|
|
||||||
.filter(id => !isNaN(id))
|
|
||||||
.filter(id => validCategoryIds.has(id))
|
|
||||||
.map(id => validCategories.get(id))
|
|
||||||
.sort((a, b) => a.type - b.type); // Sort by type to ensure proper hierarchy
|
|
||||||
|
|
||||||
// Only add relationships that maintain proper hierarchy
|
categoryBuffer = [];
|
||||||
productCategories.forEach(category => {
|
}
|
||||||
if (category.path.split(',').every(parentId =>
|
|
||||||
validCategoryIds.has(Number(parentId))
|
|
||||||
)) {
|
|
||||||
categoryRelationships.push([category.cat_id, product.pid]);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
if (categoryRelationships.length > 0) {
|
|
||||||
// First remove any existing relationships that will be replaced
|
|
||||||
await localConnection.query(`
|
|
||||||
DELETE FROM product_categories
|
|
||||||
WHERE pid IN (?) AND cat_id IN (?)
|
|
||||||
`, [
|
|
||||||
[...new Set(categoryRelationships.map(([_, pid]) => pid))],
|
|
||||||
[...new Set(categoryRelationships.map(([catId, _]) => catId))]
|
|
||||||
]);
|
|
||||||
|
|
||||||
// Then insert the new relationships
|
|
||||||
const placeholders = categoryRelationships
|
|
||||||
.map(() => "(?, ?)")
|
|
||||||
.join(",");
|
|
||||||
|
|
||||||
await localConnection.query(`
|
|
||||||
INSERT INTO product_categories (cat_id, pid)
|
|
||||||
VALUES ${placeholders}
|
|
||||||
`, categoryRelationships.flat());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -592,6 +587,19 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Process any remaining category relationships
|
||||||
|
if (categoryBuffer.length > 0) {
|
||||||
|
const placeholders = categoryBuffer
|
||||||
|
.map(() => "(?, ?)")
|
||||||
|
.join(",");
|
||||||
|
|
||||||
|
await localConnection.query(`
|
||||||
|
INSERT IGNORE INTO product_categories (cat_id, pid)
|
||||||
|
VALUES ${placeholders}
|
||||||
|
ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id)
|
||||||
|
`, categoryBuffer.flat());
|
||||||
|
}
|
||||||
|
|
||||||
// Drop temporary tables
|
// Drop temporary tables
|
||||||
await setupAndCleanupTempTables(localConnection, 'cleanup');
|
await setupAndCleanupTempTables(localConnection, 'cleanup');
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user