Optimize order and product import scripts for faster processing and incremental update handling

- Refactor orders import to use temporary tables for more efficient data processing
- Improve batch processing and memory management in order import script
- Update product import to use temporary tables for inventory status
- Modify purchase orders import to use the date_updated timestamp for incremental updates
- Enhance error handling and logging for import processes
2025-01-30 21:13:53 -05:00
parent c433f1aae8
commit b506f89dd7
4 changed files with 357 additions and 274 deletions
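
The bullets above describe a staging approach for the orders import: pull rows from production, load them into local TEMPORARY tables in fixed-size batches, combine them with a single JOIN, then upsert the result into the reporting table. Condensed into a minimal sketch for orientation (table and column names follow the diffs below; the reduced column set and the INSERT ... SELECT step are illustrative only, not the actual import code):

```js
const BATCH_SIZE = 5000;

// Minimal sketch of the staging pattern introduced by this commit.
async function stageAndUpsertOrders(prodConnection, localConnection) {
  // 1. Session-scoped staging table on the local side.
  await localConnection.query(`
    CREATE TEMPORARY TABLE temp_order_items (
      order_id INT UNSIGNED,
      pid INT UNSIGNED,
      price DECIMAL(10,3),
      quantity INT,
      PRIMARY KEY (order_id, pid)
    ) ENGINE=InnoDB
  `);

  // 2. Read source rows from production once, then load them in fixed-size batches.
  const [items] = await prodConnection.query(
    "SELECT order_id, prod_pid AS pid, prod_price AS price, qty_ordered AS quantity FROM order_items"
  );
  for (let i = 0; i < items.length; i += BATCH_SIZE) {
    const batch = items.slice(i, i + BATCH_SIZE);
    const placeholders = batch.map(() => "(?, ?, ?, ?)").join(",");
    const values = batch.flatMap(r => [r.order_id, r.pid, r.price, r.quantity]);
    await localConnection.query(
      `INSERT INTO temp_order_items VALUES ${placeholders}`,
      values
    );
  }

  // 3. Combine staged rows locally and upsert into the destination table
  //    (assumes orders has a unique key covering order_number and pid).
  await localConnection.query(`
    INSERT INTO orders (order_number, pid, price, quantity)
    SELECT order_id, pid, price, quantity FROM temp_order_items
    ON DUPLICATE KEY UPDATE price = VALUES(price), quantity = VALUES(quantity)
  `);

  // 4. Drop the staging table when done.
  await localConnection.query("DROP TEMPORARY TABLE IF EXISTS temp_order_items");
}
```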

View File

@@ -10,10 +10,10 @@ const importPurchaseOrders = require('./import/purchase-orders');
 dotenv.config({ path: path.join(__dirname, "../.env") });
 // Constants to control which imports run
-const IMPORT_CATEGORIES = false;
-const IMPORT_PRODUCTS = false;
+const IMPORT_CATEGORIES = true;
+const IMPORT_PRODUCTS = true;
 const IMPORT_ORDERS = true;
-const IMPORT_PURCHASE_ORDERS = false;
+const IMPORT_PURCHASE_ORDERS = true;
 // Add flag for incremental updates
 const INCREMENTAL_UPDATE = process.env.INCREMENTAL_UPDATE === 'true';
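
The per-import constants and the INCREMENTAL_UPDATE flag suggest the runner simply gates each importer on its flag and passes the incremental switch through. The runner body is not part of this hunk, so the wiring below is a hypothetical sketch only:

```js
// Hypothetical wiring; the actual runner body is outside this hunk.
// Example run: INCREMENTAL_UPDATE=true node <entry-script>.js
async function runImports(prodConnection, localConnection) {
  // Other importers (categories, products) would be gated the same way;
  // only these two signatures are visible in the diff headers below.
  if (IMPORT_ORDERS) {
    await importOrders(prodConnection, localConnection, INCREMENTAL_UPDATE);
  }
  if (IMPORT_PURCHASE_ORDERS) {
    await importPurchaseOrders(prodConnection, localConnection, INCREMENTAL_UPDATE);
  }
}
```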

View File

@@ -34,62 +34,53 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
 );
 const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01';
-// Count the total number of orders to be imported
-const [countResults] = await prodConnection.query(`
+// Create temporary tables for staging data
+await localConnection.query(`
+  CREATE TEMPORARY TABLE temp_order_items (
+    order_id INT UNSIGNED,
+    pid INT UNSIGNED,
+    SKU VARCHAR(50),
+    price DECIMAL(10,3),
+    quantity INT,
+    base_discount DECIMAL(10,3),
+    PRIMARY KEY (order_id, pid)
+  ) ENGINE=InnoDB;
+  CREATE TEMPORARY TABLE temp_order_meta (
+    order_id INT UNSIGNED PRIMARY KEY,
+    date DATE,
+    customer INT UNSIGNED,
+    customer_name VARCHAR(100),
+    status TINYINT UNSIGNED,
+    canceled TINYINT UNSIGNED
+  ) ENGINE=InnoDB;
+  CREATE TEMPORARY TABLE temp_order_discounts (
+    order_id INT UNSIGNED,
+    pid INT UNSIGNED,
+    discount DECIMAL(10,3),
+    PRIMARY KEY (order_id, pid)
+  ) ENGINE=InnoDB;
+  CREATE TEMPORARY TABLE temp_order_taxes (
+    order_id INT UNSIGNED,
+    pid INT UNSIGNED,
+    tax DECIMAL(10,3),
+    PRIMARY KEY (order_id, pid)
+  ) ENGINE=InnoDB;
+`);
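
The four CREATE TEMPORARY TABLE statements above (and the matching DROP block at the end of the function) are sent in a single query() call. With mysql2 that only works if the connection was opened with multipleStatements enabled; the connection setup is not shown in this diff, so the following is a sketch of that assumption:

```js
// Sketch of the connection options the multi-statement call above assumes;
// the real connection setup is not part of this diff.
const mysql = require('mysql2/promise');

function createLocalConnection() {
  return mysql.createConnection({
    host: process.env.LOCAL_DB_HOST,        // hypothetical variable names
    user: process.env.LOCAL_DB_USER,
    password: process.env.LOCAL_DB_PASSWORD,
    database: process.env.LOCAL_DB_NAME,
    multipleStatements: true,               // required for batched CREATE/DROP statements
  });
}
```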
+// Get base order items first
+const [orderItems] = await prodConnection.query(`
   SELECT
-    COUNT(DISTINCT oi.order_id, oi.prod_pid) as total_all,
-    SUM(CASE
-      WHEN o.stamp > ? OR o.date_placed > ? OR o.date_shipped > ? OR oi.stamp > ?
-      THEN 1 ELSE 0
-    END) as total_incremental
-  FROM order_items oi
-  JOIN _order o ON oi.order_id = o.order_id
-  WHERE o.order_status >= 15
-  AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
-  AND o.date_placed_onlydate IS NOT NULL
-`, [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]);
-console.log('Count details:', {
-  total_all: countResults[0].total_all,
-  total_incremental: countResults[0].total_incremental,
-  lastSyncTime,
-  incrementalUpdate
-});
-const totalOrders = incrementalUpdate ? countResults[0].total_incremental : countResults[0].total_all;
-outputProgress({
-  status: "running",
-  operation: "Orders import",
-  message: `Starting ${incrementalUpdate ? 'incremental' : 'full'} import of ${totalOrders} orders`,
-  current: 0,
-  total: totalOrders
-});
-// Fetch orders in batches
-const batchSize = 5000;
-let offset = 0;
-let importedCount = 0;
-let lastProgressUpdate = Date.now();
-while (offset < totalOrders) {
-  // First get the base order data
-  const [prodOrders] = await prodConnection.query(`
-    SELECT
-      oi.order_id as order_number,
+    oi.order_id,
     oi.prod_pid as pid,
     oi.prod_itemnumber as SKU,
-      o.date_placed_onlydate as date,
     oi.prod_price as price,
     oi.qty_ordered as quantity,
-      COALESCE(oi.prod_price_reg - oi.prod_price, 0) * oi.qty_ordered as base_discount,
-      o.order_cid as customer,
-      CONCAT(COALESCE(u.firstname, ''), ' ', COALESCE(u.lastname, '')) as customer_name,
-      o.order_status as status,
-      CASE WHEN o.date_cancelled != '0000-00-00 00:00:00' THEN 1 ELSE 0 END as canceled
+    COALESCE(oi.prod_price_reg - oi.prod_price, 0) * oi.qty_ordered as base_discount
   FROM order_items oi
   JOIN _order o ON oi.order_id = o.order_id
-    LEFT JOIN users u ON o.order_cid = u.cid
   WHERE o.order_status >= 15
   AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
   AND o.date_placed_onlydate IS NOT NULL
@@ -101,131 +92,217 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
     OR oi.stamp > ?
   )
   ` : ''}
-  ORDER BY oi.order_id, oi.prod_pid
-  LIMIT ? OFFSET ?
-  `, incrementalUpdate ?
-    [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, batchSize, offset] :
-    [batchSize, offset]
-  );
-  if (prodOrders.length === 0) break;
-  // Get order numbers for this batch
-  const orderNumbers = [...new Set(prodOrders.map(o => o.order_number))];
-  const orderPids = prodOrders.map(o => o.pid);
-  // Get promotional discounts in a separate query
-  const [promoDiscounts] = await prodConnection.query(`
-    SELECT order_id, pid, amount
+`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []);
+const totalOrders = orderItems.length;
+let processed = 0;
+// Insert order items in batches
+for (let i = 0; i < orderItems.length; i += 5000) {
+  const batch = orderItems.slice(i, Math.min(i + 5000, orderItems.length));
+  const placeholders = batch.map(() => "(?, ?, ?, ?, ?, ?)").join(",");
+  const values = batch.flatMap(item => [
+    item.order_id, item.pid, item.SKU, item.price, item.quantity, item.base_discount
+  ]);
+  await localConnection.query(`
+    INSERT INTO temp_order_items VALUES ${placeholders}
+  `, values);
+  processed += batch.length;
+  outputProgress({
+    status: "running",
+    operation: "Orders import",
+    message: `Loading order items: ${processed} of ${totalOrders}`,
+    current: processed,
+    total: totalOrders
+  });
+}
+// Get unique order IDs
+const orderIds = [...new Set(orderItems.map(item => item.order_id))];
+// Get order metadata in batches
+for (let i = 0; i < orderIds.length; i += 5000) {
+  const batchIds = orderIds.slice(i, i + 5000);
+  const [orders] = await prodConnection.query(`
+    SELECT
+      o.order_id,
+      o.date_placed_onlydate as date,
+      o.order_cid as customer,
+      CONCAT(COALESCE(u.firstname, ''), ' ', COALESCE(u.lastname, '')) as customer_name,
+      o.order_status as status,
+      CASE WHEN o.date_cancelled != '0000-00-00 00:00:00' THEN 1 ELSE 0 END as canceled
+    FROM _order o
+    LEFT JOIN users u ON o.order_cid = u.cid
+    WHERE o.order_id IN (?)
+  `, [batchIds]);
+  const placeholders = orders.map(() => "(?, ?, ?, ?, ?, ?)").join(",");
+  const values = orders.flatMap(order => [
+    order.order_id, order.date, order.customer, order.customer_name, order.status, order.canceled
+  ]);
+  await localConnection.query(`
+    INSERT INTO temp_order_meta VALUES ${placeholders}
+  `, values);
+  outputProgress({
+    status: "running",
+    operation: "Orders import",
+    message: `Loading order metadata: ${i + orders.length} of ${orderIds.length}`,
+    current: i + orders.length,
+    total: orderIds.length
+  });
+}
+// Get promotional discounts in batches
+for (let i = 0; i < orderIds.length; i += 5000) {
+  const batchIds = orderIds.slice(i, i + 5000);
+  const [discounts] = await prodConnection.query(`
+    SELECT order_id, pid, SUM(amount) as discount
     FROM order_discount_items
     WHERE order_id IN (?)
-  `, [orderNumbers]);
-  // Create a map for quick discount lookups
-  const discountMap = new Map();
-  promoDiscounts.forEach(d => {
-    const key = `${d.order_id}-${d.pid}`;
-    discountMap.set(key, d.amount || 0);
-  });
-  // Get tax information in a separate query
-  const [taxInfo] = await prodConnection.query(`
-    SELECT oti.order_id, otip.pid, otip.item_taxes_to_collect
+    GROUP BY order_id, pid
+  `, [batchIds]);
+  if (discounts.length > 0) {
+    const placeholders = discounts.map(() => "(?, ?, ?)").join(",");
+    const values = discounts.flatMap(d => [d.order_id, d.pid, d.discount]);
+    await localConnection.query(`
+      INSERT INTO temp_order_discounts VALUES ${placeholders}
+    `, values);
+  }
+}
+// Get tax information in batches
+for (let i = 0; i < orderIds.length; i += 5000) {
+  const batchIds = orderIds.slice(i, i + 5000);
+  const [taxes] = await prodConnection.query(`
+    SELECT DISTINCT
+      oti.order_id,
+      otip.pid,
+      otip.item_taxes_to_collect as tax
     FROM order_tax_info oti
-    JOIN order_tax_info_products otip ON oti.taxinfo_id = otip.taxinfo_id
-    WHERE oti.order_id IN (?)
-    AND (oti.order_id, oti.stamp) IN (
-      SELECT order_id, MAX(stamp)
+    JOIN (
+      SELECT order_id, MAX(stamp) as max_stamp
      FROM order_tax_info
      WHERE order_id IN (?)
      GROUP BY order_id
-    )
-  `, [orderNumbers, orderNumbers]);
-  // Create a map for quick tax lookups
-  const taxMap = new Map();
-  taxInfo.forEach(t => {
+    ) latest ON oti.order_id = latest.order_id AND oti.stamp = latest.max_stamp
+    JOIN order_tax_info_products otip ON oti.taxinfo_id = otip.taxinfo_id
+  `, [batchIds]);
+  if (taxes.length > 0) {
+    // Remove any duplicates before inserting
+    const uniqueTaxes = new Map();
+    taxes.forEach(t => {
      const key = `${t.order_id}-${t.pid}`;
-    taxMap.set(key, t.item_taxes_to_collect || 0);
+      uniqueTaxes.set(key, t);
    });
-  // Check for missing products
+    const values = Array.from(uniqueTaxes.values()).flatMap(t => [t.order_id, t.pid, t.tax]);
+    if (values.length > 0) {
+      const placeholders = Array(uniqueTaxes.size).fill("(?, ?, ?)").join(",");
+      await localConnection.query(`
+        INSERT INTO temp_order_taxes VALUES ${placeholders}
+      `, values);
+    }
+  }
+}
+// Now combine all the data and insert into orders table
+let importedCount = 0;
+// Pre-check all products at once instead of per batch
+const allOrderPids = [...new Set(orderItems.map(item => item.pid))];
 const [existingProducts] = await localConnection.query(
   "SELECT pid FROM products WHERE pid IN (?)",
-  [orderPids]
+  [allOrderPids]
 );
 const existingPids = new Set(existingProducts.map(p => p.pid));
-  // Track missing products and filter orders
-  const validOrders = prodOrders.filter(order => {
-    if (!order.date) return false;
+// Process in larger batches
+for (let i = 0; i < orderIds.length; i += 5000) {
+  const batchIds = orderIds.slice(i, i + 5000);
+  // Get combined data for this batch
+  const [orders] = await localConnection.query(`
+    SELECT
+      oi.order_id as order_number,
+      oi.pid,
+      oi.SKU,
+      om.date,
+      oi.price,
+      oi.quantity,
+      oi.base_discount + COALESCE(od.discount, 0) as discount,
+      COALESCE(ot.tax, 0) as tax,
+      0 as tax_included,
+      0 as shipping,
+      om.customer,
+      om.customer_name,
+      om.status,
+      om.canceled
+    FROM temp_order_items oi
+    JOIN temp_order_meta om ON oi.order_id = om.order_id
+    LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
+    LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
+    WHERE oi.order_id IN (?)
+  `, [batchIds]);
+  // Filter orders and track missing products - do this in a single pass
+  const validOrders = [];
+  const values = [];
+  for (const order of orders) {
    if (!existingPids.has(order.pid)) {
      missingProducts.add(order.pid);
      skippedOrders.add(order.order_number);
-      return false;
-    }
-    return true;
-  });
+      continue;
+    }
+    validOrders.push(order);
+    values.push(...columnNames.map(col => order[col] ?? null));
+  }
-  // Prepare values for insertion
-  const orderValues = validOrders.map(order => {
-    const orderKey = `${order.order_number}-${order.pid}`;
-    const orderData = {
-      id: order.order_number,
-      order_number: order.order_number,
-      pid: order.pid,
-      SKU: order.SKU,
-      date: order.date,
-      price: order.price,
-      quantity: order.quantity,
-      discount: Number(order.base_discount || 0) + Number(discountMap.get(orderKey) || 0),
-      tax: Number(taxMap.get(orderKey) || 0),
-      tax_included: 0,
-      shipping: 0,
-      customer: order.customer,
-      customer_name: order.customer_name || '',
-      status: order.status,
-      canceled: order.canceled,
-    };
-    return columnNames.map(colName => orderData[colName] !== undefined ? orderData[colName] : null);
-  });
-  // Execute the insert
-  if (orderValues.length > 0) {
-    const placeholders = validOrders.map(() => `(${columnNames.map(() => "?").join(", ")})`).join(",");
-    const insertQuery = `
-      INSERT INTO orders (${columnNames.join(", ")})
+  if (validOrders.length > 0) {
+    // Pre-compute the placeholders string once
+    const singlePlaceholder = `(${columnNames.map(() => "?").join(",")})`;
+    const placeholders = Array(validOrders.length).fill(singlePlaceholder).join(",");
+    await localConnection.query(`
+      INSERT INTO orders (${columnNames.join(",")})
      VALUES ${placeholders}
      ON DUPLICATE KEY UPDATE
-      ${columnNames.map(col => `${col} = VALUES(${col})`).join(", ")}
-    `;
-    await localConnection.query(insertQuery, orderValues.flat());
-  }
+      ${columnNames.map(col => `${col} = VALUES(${col})`).join(",")}
+    `, values);
   importedCount += validOrders.length;
-  offset += batchSize;
-  // Update progress every second
-  const now = Date.now();
-  if (now - lastProgressUpdate >= 1000) {
+  }
  outputProgress({
    status: "running",
    operation: "Orders import",
    message: `Imported ${importedCount} of ${totalOrders} orders`,
    current: importedCount,
    total: totalOrders,
-      elapsed: formatElapsedTime((now - startTime) / 1000),
+    elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
    remaining: estimateRemaining(startTime, importedCount, totalOrders),
    rate: calculateRate(startTime, importedCount)
  });
-    lastProgressUpdate = now;
-  }
 }
+// Clean up temporary tables
+await localConnection.query(`
+  DROP TEMPORARY TABLE IF EXISTS temp_order_items;
+  DROP TEMPORARY TABLE IF EXISTS temp_order_meta;
+  DROP TEMPORARY TABLE IF EXISTS temp_order_discounts;
+  DROP TEMPORARY TABLE IF EXISTS temp_order_taxes;
+`);
 // Import missing products if any
 if (missingProducts.size > 0) {
+  try {
     await importMissingProducts(prodConnection, localConnection, Array.from(missingProducts));
     // Retry skipped orders after importing products
@@ -328,9 +405,13 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       message: `Successfully imported ${skippedProdOrders.length} previously skipped orders`,
     });
   }
+  } catch (error) {
+    console.warn('Warning: Failed to import missing products:', error.message);
+    console.warn(`Skipped ${skippedOrders.size} orders due to ${missingProducts.size} missing products`);
+  }
 }
-// Update sync status
+// Update sync status - do this even if missing products import fails
 await localConnection.query(`
   INSERT INTO sync_status (table_name, last_sync_timestamp)
   VALUES ('orders', NOW())
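
The progress reporting in this file calls outputProgress, formatElapsedTime, estimateRemaining, and calculateRate, none of which are part of the diff. For orientation, hypothetical minimal versions consistent with how they are called here (the real helpers live elsewhere in the repo and may differ):

```js
// Hypothetical sketches of the progress helpers used above.
function calculateRate(startTime, done) {
  const elapsedSec = (Date.now() - startTime) / 1000;
  return elapsedSec > 0 ? Math.round(done / elapsedSec) : 0; // rows per second
}

function formatElapsedTime(seconds) {
  const m = Math.floor(seconds / 60);
  const s = Math.round(seconds % 60);
  return `${m}m ${s}s`;
}

function estimateRemaining(startTime, done, total) {
  if (done === 0) return null;
  const perItemMs = (Date.now() - startTime) / done;
  return formatElapsedTime(((total - done) * perItemMs) / 1000);
}

function outputProgress(progress) {
  // Assumption: progress is streamed as JSON lines on stdout for a supervising process.
  process.stdout.write(JSON.stringify(progress) + "\n");
}
```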

View File

@@ -508,9 +508,8 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
 // Setup temporary tables
 await setupTemporaryTables(localConnection);
-// Materialize calculations for missing products
-await localConnection.query(`
-  INSERT INTO temp_inventory_status
+// Get inventory data from production first
+const [prodInventory] = await prodConnection.query(`
   SELECT
     p.pid,
     COALESCE(si.available_local, 0) - COALESCE(ps.pending_qty, 0) as stock_quantity,
@@ -540,6 +539,22 @@
   WHERE p.pid IN (?)
 `, [missingPids, missingPids]);
+// Insert inventory data into temp table
+if (prodInventory.length > 0) {
+  const placeholders = prodInventory.map(() => "(?, ?, ?, ?, ?)").join(",");
+  const values = prodInventory.flatMap(p => [
+    p.pid,
+    p.stock_quantity,
+    p.pending_qty,
+    p.preorder_count,
+    p.notions_inv_count
+  ]);
+  await localConnection.query(`
+    INSERT INTO temp_inventory_status VALUES ${placeholders}
+  `, values);
+}
 // First get the column names from the table structure
 const [columns] = await localConnection.query(`
   SELECT COLUMN_NAME
@@ -560,21 +575,9 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
   p.date_created,
   p.datein AS first_received,
   p.location,
-  COALESCE(si.available_local, 0) - COALESCE(
-    (SELECT SUM(oi.qty_ordered - oi.qty_placed)
-     FROM order_items oi
-     JOIN _order o ON oi.order_id = o.order_id
-     WHERE oi.prod_pid = p.pid
-     AND o.date_placed != '0000-00-00 00:00:00'
-     AND o.date_shipped = '0000-00-00 00:00:00'
-     AND oi.pick_finished = 0
-     AND oi.qty_back = 0
-     AND o.order_status != 15
-     AND o.order_status < 90
-     AND oi.qty_ordered >= oi.qty_placed
-     AND oi.qty_ordered > 0), 0) AS stock_quantity,
-  ci.onpreorder AS preorder_count,
-  pnb.inventory AS notions_inv_count,
+  tis.stock_quantity,
+  tis.preorder_count,
+  tis.notions_inv_count,
   COALESCE(pcp.price_each, 0) as price,
   COALESCE(p.sellingprice, 0) AS regular_price,
   COALESCE((SELECT ROUND(AVG(costeach), 5)
@@ -630,8 +633,7 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
   pls.date_sold as date_last_sold,
   GROUP_CONCAT(DISTINCT CASE WHEN pc.cat_id IS NOT NULL THEN pci.cat_id END) as category_ids
 FROM products p
-LEFT JOIN current_inventory ci ON p.pid = ci.pid
-LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
+LEFT JOIN temp_inventory_status tis ON p.pid = tis.pid
 LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
 LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
 LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
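
setupTemporaryTables is called at the top of this hunk but defined elsewhere. Judging by the five columns the INSERT INTO temp_inventory_status supplies and the tis.* columns read later, it presumably creates a table shaped roughly like the following (a hypothetical sketch, not the actual definition, and the column types are assumptions):

```js
// Hypothetical sketch of setupTemporaryTables, inferred from the five-column
// INSERT above; the real function is defined elsewhere and may create more tables.
async function setupTemporaryTables(localConnection) {
  await localConnection.query(`
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_inventory_status (
      pid INT UNSIGNED PRIMARY KEY,
      stock_quantity INT,
      pending_qty INT,
      preorder_count INT,
      notions_inv_count INT
    ) ENGINE=InnoDB
  `);
}
```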

View File

@@ -29,7 +29,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
 // Build incremental conditions
 const incrementalWhereClause = incrementalUpdate
   ? `AND (
-    p.stamp > ?
+    p.date_updated > ?
     OR p.date_modified > ?
     OR p.date_ordered > ?
     OR p.date_estin > ?
@@ -81,7 +81,7 @@
   USE INDEX (idx_date_created)
   WHERE date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
   AND (date_ordered > ?
-    OR stamp > ?
+    OR date_updated > ?
     OR date_modified > ?)
   UNION
   SELECT DISTINCT r.receiving_id as po_id
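
Both the orders and purchase-orders importers key their incremental mode off a per-table last_sync_timestamp: the orders function reads it into lastSyncTime near the top and writes sync_status when it finishes. A condensed sketch of that pattern; the helper names and the upsert clause here are assumptions for illustration, not code from this commit:

```js
// Sketch only: read the last successful sync time, bind it into the
// incremental WHERE clause, and advance it once the import finishes.
async function getLastSyncTime(localConnection, tableName) {
  const [rows] = await localConnection.query(
    "SELECT last_sync_timestamp FROM sync_status WHERE table_name = ?",
    [tableName]
  );
  return rows?.[0]?.last_sync_timestamp || '1970-01-01';
}

async function markSynced(localConnection, tableName) {
  // Assumes sync_status has a unique key on table_name.
  await localConnection.query(`
    INSERT INTO sync_status (table_name, last_sync_timestamp)
    VALUES (?, NOW())
    ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW()
  `, [tableName]);
}
```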