Revert batching
This commit is contained in:
@@ -77,46 +77,8 @@ async function materializeCalculations(prodConnection, localConnection, incremen
|
||||
message: "Fetching product data from production"
|
||||
});
|
||||
|
||||
// First get total count
|
||||
const [[{ total }]] = await prodConnection.query(`
|
||||
SELECT COUNT(DISTINCT p.pid) as total
|
||||
FROM products p
|
||||
FORCE INDEX (PRIMARY)
|
||||
LEFT JOIN current_inventory ci FORCE INDEX (PRIMARY) ON p.pid = ci.pid
|
||||
LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1
|
||||
LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
|
||||
WHERE ${incrementalUpdate ? `
|
||||
p.stamp > ? OR
|
||||
ci.stamp > ? OR
|
||||
pcp.date_deactive > ? OR
|
||||
pcp.date_active > ? OR
|
||||
pnb.date_updated > ?
|
||||
` : 'TRUE'}
|
||||
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []);
|
||||
|
||||
if (total === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Products import",
|
||||
message: `Found ${total} products to process`
|
||||
});
|
||||
|
||||
// Process in batches
|
||||
const BATCH_SIZE = 5000;
|
||||
const results = [];
|
||||
|
||||
for (let offset = 0; offset < total; offset += BATCH_SIZE) {
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Products import",
|
||||
message: `Fetching products ${offset + 1} to ${Math.min(offset + BATCH_SIZE, total)} of ${total}`
|
||||
});
|
||||
|
||||
// Get batch of products
|
||||
const [batchData] = await prodConnection.query(`
|
||||
// Get all product data in a single optimized query
|
||||
const [prodData] = await prodConnection.query(`
|
||||
SELECT
|
||||
p.pid,
|
||||
p.description AS title,
|
||||
@@ -141,8 +103,7 @@ async function materializeCalculations(prodConnection, localConnection, incremen
|
||||
COALESCE(si.available_local, 0) - COALESCE(
|
||||
(SELECT SUM(oi.qty_ordered - oi.qty_placed)
|
||||
FROM order_items oi
|
||||
FORCE INDEX (prod_pid)
|
||||
JOIN _order o FORCE INDEX (PRIMARY) ON oi.order_id = o.order_id
|
||||
JOIN _order o ON oi.order_id = o.order_id
|
||||
WHERE oi.prod_pid = p.pid
|
||||
AND o.date_placed != '0000-00-00 00:00:00'
|
||||
AND o.date_shipped = '0000-00-00 00:00:00'
|
||||
@@ -154,29 +115,29 @@ async function materializeCalculations(prodConnection, localConnection, incremen
|
||||
AND oi.qty_ordered > 0
|
||||
), 0
|
||||
) as stock_quantity,
|
||||
COALESCE(
|
||||
(SELECT SUM(oi.qty_ordered - oi.qty_placed)
|
||||
FROM order_items oi
|
||||
JOIN _order o ON oi.order_id = o.order_id
|
||||
WHERE oi.prod_pid = p.pid
|
||||
AND o.date_placed != '0000-00-00 00:00:00'
|
||||
AND o.date_shipped = '0000-00-00 00:00:00'
|
||||
AND oi.pick_finished = 0
|
||||
AND oi.qty_back = 0
|
||||
AND o.order_status != 15
|
||||
AND o.order_status < 90
|
||||
AND oi.qty_ordered >= oi.qty_placed
|
||||
AND oi.qty_ordered > 0
|
||||
), 0
|
||||
) as pending_qty,
|
||||
COALESCE(ci.onpreorder, 0) as preorder_count,
|
||||
COALESCE(pnb.inventory, 0) as notions_inv_count,
|
||||
COALESCE(pcp.price_each, 0) as price,
|
||||
COALESCE(p.sellingprice, 0) AS regular_price,
|
||||
CASE
|
||||
WHEN EXISTS (
|
||||
SELECT 1 FROM product_inventory pi
|
||||
FORCE INDEX (idx_pid)
|
||||
WHERE pi.pid = p.pid AND pi.count > 0 LIMIT 1
|
||||
)
|
||||
THEN (
|
||||
SELECT ROUND(AVG(costeach), 5)
|
||||
FROM product_inventory pi
|
||||
FORCE INDEX (idx_pid)
|
||||
WHERE pi.pid = p.pid AND pi.count > 0
|
||||
)
|
||||
ELSE (
|
||||
SELECT costeach
|
||||
FROM product_inventory pi
|
||||
FORCE INDEX (idx_pid, daterec)
|
||||
WHERE pi.pid = p.pid
|
||||
ORDER BY daterec DESC LIMIT 1
|
||||
)
|
||||
WHEN EXISTS (SELECT 1 FROM product_inventory WHERE pid = p.pid AND count > 0)
|
||||
THEN (SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND count > 0)
|
||||
ELSE (SELECT costeach FROM product_inventory WHERE pid = p.pid ORDER BY daterec DESC LIMIT 1)
|
||||
END AS cost_price,
|
||||
NULL as landing_cost_price,
|
||||
s.companyname AS vendor,
|
||||
@@ -201,7 +162,7 @@ async function materializeCalculations(prodConnection, localConnection, incremen
|
||||
p.width,
|
||||
p.height,
|
||||
p.country_of_origin,
|
||||
(SELECT COUNT(*) FROM mybasket mb FORCE INDEX (idx_item) WHERE mb.item = p.pid AND mb.qty > 0) AS baskets,
|
||||
(SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets,
|
||||
(SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies,
|
||||
p.totalsold AS total_sold,
|
||||
pls.date_sold as date_last_sold,
|
||||
@@ -212,14 +173,13 @@ async function materializeCalculations(prodConnection, localConnection, incremen
|
||||
THEN pci.cat_id
|
||||
END) as category_ids
|
||||
FROM products p
|
||||
FORCE INDEX (PRIMARY)
|
||||
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
|
||||
LEFT JOIN current_inventory ci FORCE INDEX (PRIMARY) ON p.pid = ci.pid
|
||||
LEFT JOIN current_inventory ci ON p.pid = ci.pid
|
||||
LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
|
||||
LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1
|
||||
LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
|
||||
LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
|
||||
LEFT JOIN product_category_index pci FORCE INDEX (pid) ON p.pid = pci.pid
|
||||
LEFT JOIN product_category_index pci ON p.pid = pci.pid
|
||||
LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id
|
||||
LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id
|
||||
LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id
|
||||
@@ -234,19 +194,25 @@ async function materializeCalculations(prodConnection, localConnection, incremen
|
||||
pnb.date_updated > ?
|
||||
` : 'TRUE'}
|
||||
GROUP BY p.pid
|
||||
LIMIT ? OFFSET ?
|
||||
`, [...(incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []), BATCH_SIZE, offset]);
|
||||
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []);
|
||||
|
||||
if (batchData.length === 0) break;
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Products import",
|
||||
message: `Processing ${prodData.length} product records`
|
||||
});
|
||||
|
||||
// Insert batch into temp table
|
||||
const values = batchData.map(row => [
|
||||
// Insert all product data into temp table in batches
|
||||
for (let i = 0; i < prodData.length; i += 1000) {
|
||||
const batch = prodData.slice(i, i + 1000);
|
||||
const values = batch.map(row => [
|
||||
row.pid,
|
||||
row.title,
|
||||
row.description,
|
||||
row.SKU,
|
||||
// Set stock quantity to 0 if it's over 5000
|
||||
row.stock_quantity > 5000 ? 0 : Math.max(0, row.stock_quantity),
|
||||
row.pending_qty || 0,
|
||||
row.pending_qty,
|
||||
row.preorder_count,
|
||||
row.notions_inv_count,
|
||||
row.price,
|
||||
@@ -260,7 +226,7 @@ async function materializeCalculations(prodConnection, localConnection, incremen
|
||||
row.subline,
|
||||
row.artist,
|
||||
row.category_ids,
|
||||
row.date_created,
|
||||
row.date_created, // map to created_at
|
||||
row.first_received,
|
||||
row.landing_cost_price,
|
||||
row.barcode,
|
||||
@@ -285,6 +251,7 @@ async function materializeCalculations(prodConnection, localConnection, incremen
|
||||
true // Mark as needing update
|
||||
]);
|
||||
|
||||
if (values.length > 0) {
|
||||
await localConnection.query(`
|
||||
INSERT INTO temp_products (
|
||||
pid, title, description, SKU,
|
||||
@@ -343,8 +310,15 @@ async function materializeCalculations(prodConnection, localConnection, incremen
|
||||
date_last_sold = VALUES(date_last_sold),
|
||||
needs_update = TRUE
|
||||
`, [values]);
|
||||
}
|
||||
|
||||
results.push(...batchData);
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Products import",
|
||||
message: `Processed ${Math.min(i + 1000, prodData.length)} of ${prodData.length} product records`,
|
||||
current: i + batch.length,
|
||||
total: prodData.length
|
||||
});
|
||||
}
|
||||
|
||||
outputProgress({
|
||||
@@ -352,8 +326,6 @@ async function materializeCalculations(prodConnection, localConnection, incremen
|
||||
operation: "Products import",
|
||||
message: "Finished materializing calculations"
|
||||
});
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
async function importProducts(prodConnection, localConnection, incrementalUpdate = true) {
|
||||
@@ -379,45 +351,11 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
|
||||
|
||||
console.log('Products: Using last sync time:', lastSyncTime);
|
||||
|
||||
// Cache valid categories and their hierarchy at the start
|
||||
const [categories] = await localConnection.query(`
|
||||
WITH RECURSIVE category_hierarchy AS (
|
||||
SELECT
|
||||
cat_id,
|
||||
parent_id,
|
||||
type,
|
||||
1 as level,
|
||||
CAST(cat_id AS CHAR(200)) as path
|
||||
FROM categories
|
||||
UNION ALL
|
||||
SELECT
|
||||
c.cat_id,
|
||||
c.parent_id,
|
||||
c.type,
|
||||
ch.level + 1,
|
||||
CONCAT(ch.path, ',', c.cat_id)
|
||||
FROM categories c
|
||||
JOIN category_hierarchy ch ON c.parent_id = ch.cat_id
|
||||
WHERE ch.level < 10 -- Prevent infinite recursion
|
||||
)
|
||||
SELECT DISTINCT
|
||||
cat_id,
|
||||
parent_id,
|
||||
type,
|
||||
path,
|
||||
level
|
||||
FROM category_hierarchy
|
||||
ORDER BY level DESC
|
||||
`);
|
||||
|
||||
const validCategories = new Map(categories.map(c => [c.cat_id, c]));
|
||||
const validCategoryIds = new Set(categories.map(c => c.cat_id));
|
||||
|
||||
// Setup temporary tables
|
||||
await setupAndCleanupTempTables(localConnection, 'setup');
|
||||
|
||||
// Materialize calculations - this will populate temp_products
|
||||
const results = await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime);
|
||||
await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime);
|
||||
|
||||
// Get actual count from temp table - only count products that need updates
|
||||
const [[{ actualTotal }]] = await localConnection.query(`
|
||||
@@ -431,7 +369,6 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
|
||||
// Process in batches
|
||||
const BATCH_SIZE = 5000;
|
||||
let processed = 0;
|
||||
let categoryBuffer = [];
|
||||
|
||||
while (processed < actualTotal) {
|
||||
const [batch] = await localConnection.query(`
|
||||
@@ -531,7 +468,70 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
|
||||
recordsUpdated += insertsAndUpdates.updates.length;
|
||||
}
|
||||
|
||||
if (insertsAndUpdates.updates.length > 0 || insertsAndUpdates.inserts.length > 0) {
|
||||
const affectedPids = [
|
||||
...insertsAndUpdates.updates.map(p => p.pid),
|
||||
...insertsAndUpdates.inserts.map(p => p.pid)
|
||||
];
|
||||
}
|
||||
|
||||
// Process category relationships
|
||||
if (batch.some(p => p.category_ids)) {
|
||||
// First get all valid categories
|
||||
const allCategoryIds = [...new Set(
|
||||
batch
|
||||
.filter(p => p.category_ids)
|
||||
.flatMap(product =>
|
||||
product.category_ids
|
||||
.split(',')
|
||||
.map(id => id.trim())
|
||||
.filter(id => id)
|
||||
.map(Number)
|
||||
.filter(id => !isNaN(id))
|
||||
)
|
||||
)];
|
||||
|
||||
// Verify categories exist and get their hierarchy
|
||||
const [categories] = await localConnection.query(`
|
||||
WITH RECURSIVE category_hierarchy AS (
|
||||
SELECT
|
||||
cat_id,
|
||||
parent_id,
|
||||
type,
|
||||
1 as level,
|
||||
CAST(cat_id AS CHAR(200)) as path
|
||||
FROM categories
|
||||
WHERE cat_id IN (?)
|
||||
UNION ALL
|
||||
SELECT
|
||||
c.cat_id,
|
||||
c.parent_id,
|
||||
c.type,
|
||||
ch.level + 1,
|
||||
CONCAT(ch.path, ',', c.cat_id)
|
||||
FROM categories c
|
||||
JOIN category_hierarchy ch ON c.parent_id = ch.cat_id
|
||||
WHERE ch.level < 10 -- Prevent infinite recursion
|
||||
)
|
||||
SELECT
|
||||
h.cat_id,
|
||||
h.parent_id,
|
||||
h.type,
|
||||
h.path,
|
||||
h.level
|
||||
FROM (
|
||||
SELECT DISTINCT cat_id, parent_id, type, path, level
|
||||
FROM category_hierarchy
|
||||
WHERE cat_id IN (?)
|
||||
) h
|
||||
ORDER BY h.level DESC
|
||||
`, [allCategoryIds, allCategoryIds]);
|
||||
|
||||
const validCategories = new Map(categories.map(c => [c.cat_id, c]));
|
||||
const validCategoryIds = new Set(categories.map(c => c.cat_id));
|
||||
|
||||
// Build category relationships ensuring proper hierarchy
|
||||
const categoryRelationships = [];
|
||||
batch
|
||||
.filter(p => p.category_ids)
|
||||
.forEach(product => {
|
||||
@@ -550,25 +550,30 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
|
||||
if (category.path.split(',').every(parentId =>
|
||||
validCategoryIds.has(Number(parentId))
|
||||
)) {
|
||||
categoryBuffer.push([category.cat_id, product.pid]);
|
||||
categoryRelationships.push([category.cat_id, product.pid]);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// Process category buffer if it gets too large
|
||||
if (categoryBuffer.length >= 10000) {
|
||||
if (categoryBuffer.length > 0) {
|
||||
const placeholders = categoryBuffer
|
||||
if (categoryRelationships.length > 0) {
|
||||
// First remove any existing relationships that will be replaced
|
||||
await localConnection.query(`
|
||||
DELETE FROM product_categories
|
||||
WHERE pid IN (?) AND cat_id IN (?)
|
||||
`, [
|
||||
[...new Set(categoryRelationships.map(([_, pid]) => pid))],
|
||||
[...new Set(categoryRelationships.map(([catId, _]) => catId))]
|
||||
]);
|
||||
|
||||
// Then insert the new relationships
|
||||
const placeholders = categoryRelationships
|
||||
.map(() => "(?, ?)")
|
||||
.join(",");
|
||||
|
||||
await localConnection.query(`
|
||||
INSERT IGNORE INTO product_categories (cat_id, pid)
|
||||
INSERT INTO product_categories (cat_id, pid)
|
||||
VALUES ${placeholders}
|
||||
ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id)
|
||||
`, categoryBuffer.flat());
|
||||
|
||||
categoryBuffer = [];
|
||||
`, categoryRelationships.flat());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -587,19 +592,6 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
|
||||
});
|
||||
}
|
||||
|
||||
// Process any remaining category relationships
|
||||
if (categoryBuffer.length > 0) {
|
||||
const placeholders = categoryBuffer
|
||||
.map(() => "(?, ?)")
|
||||
.join(",");
|
||||
|
||||
await localConnection.query(`
|
||||
INSERT IGNORE INTO product_categories (cat_id, pid)
|
||||
VALUES ${placeholders}
|
||||
ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id)
|
||||
`, categoryBuffer.flat());
|
||||
}
|
||||
|
||||
// Drop temporary tables
|
||||
await setupAndCleanupTempTables(localConnection, 'cleanup');
|
||||
|
||||
|
||||
Reference in New Issue
Block a user