Import script incremental fixes

This commit is contained in:
2025-01-31 11:12:38 -05:00
parent 996d3d36af
commit a867117c3c
4 changed files with 84 additions and 40 deletions

View File

@@ -161,32 +161,32 @@ async function main() {
results.categories = await importCategories(prodConnection, localConnection); results.categories = await importCategories(prodConnection, localConnection);
if (isImportCancelled) throw new Error("Import cancelled"); if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++; completedSteps++;
if (results.categories?.recordsAdded) totalRecordsAdded += results.categories.recordsAdded; totalRecordsAdded += results.categories?.recordsAdded || 0;
if (results.categories?.recordsUpdated) totalRecordsUpdated += results.categories.recordsUpdated; totalRecordsUpdated += results.categories?.recordsUpdated || 0;
} }
if (IMPORT_PRODUCTS) { if (IMPORT_PRODUCTS) {
results.products = await importProducts(prodConnection, localConnection, INCREMENTAL_UPDATE); results.products = await importProducts(prodConnection, localConnection, INCREMENTAL_UPDATE);
if (isImportCancelled) throw new Error("Import cancelled"); if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++; completedSteps++;
if (results.products?.recordsAdded) totalRecordsAdded += results.products.recordsAdded; totalRecordsAdded += results.products?.recordsAdded || 0;
if (results.products?.recordsUpdated) totalRecordsUpdated += results.products.recordsUpdated; totalRecordsUpdated += results.products?.recordsUpdated || 0;
} }
if (IMPORT_ORDERS) { if (IMPORT_ORDERS) {
results.orders = await importOrders(prodConnection, localConnection, INCREMENTAL_UPDATE); results.orders = await importOrders(prodConnection, localConnection, INCREMENTAL_UPDATE);
if (isImportCancelled) throw new Error("Import cancelled"); if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++; completedSteps++;
if (results.orders?.recordsAdded) totalRecordsAdded += results.orders.recordsAdded; totalRecordsAdded += results.orders?.recordsAdded || 0;
if (results.orders?.recordsUpdated) totalRecordsUpdated += results.orders.recordsUpdated; totalRecordsUpdated += results.orders?.recordsUpdated || 0;
} }
if (IMPORT_PURCHASE_ORDERS) { if (IMPORT_PURCHASE_ORDERS) {
results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection, INCREMENTAL_UPDATE); results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection, INCREMENTAL_UPDATE);
if (isImportCancelled) throw new Error("Import cancelled"); if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++; completedSteps++;
if (results.purchaseOrders?.recordsAdded) totalRecordsAdded += results.purchaseOrders.recordsAdded; totalRecordsAdded += results.purchaseOrders?.recordsAdded || 0;
if (results.purchaseOrders?.recordsUpdated) totalRecordsUpdated += results.purchaseOrders.recordsUpdated; totalRecordsUpdated += results.purchaseOrders?.recordsUpdated || 0;
} }
const endTime = Date.now(); const endTime = Date.now();

View File

@@ -432,8 +432,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
return { return {
status: "complete", status: "complete",
totalImported: importedCount, totalImported: importedCount,
recordsAdded, recordsAdded: recordsAdded || 0,
recordsUpdated, recordsUpdated: recordsUpdated || 0,
totalSkipped: skippedOrders.size, totalSkipped: skippedOrders.size,
missingProducts: missingProducts.size, missingProducts: missingProducts.size,
incrementalUpdate, incrementalUpdate,

View File

@@ -357,11 +357,46 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
// Insert production data in batches, but only for products that need updates // Insert production data in batches, but only for products that need updates
for (let i = 0; i < prodData.length; i += 1000) { for (let i = 0; i < prodData.length; i += 1000) {
const batch = prodData.slice(i, i + 1000); const batch = prodData.slice(i, i + 1000);
const placeholders = batch.map(() => "(?)").join(","); const placeholders = batch.map(() => `(${Array(31).fill("?").join(",")})`).join(",");
// Map each row to exactly match our temp table columns
const values = batch.flatMap(row => [
row.pid,
row.title,
row.description,
row.SKU,
row.date_created,
row.first_received,
row.location,
row.barcode,
row.harmonized_tariff_code,
row.updated_at,
row.visible,
row.replenishable,
row.vendor,
row.vendor_reference,
row.notions_reference,
row.brand,
row.line,
row.subline,
row.artist,
row.moq,
row.rating,
row.reviews,
row.weight,
row.length,
row.width,
row.height,
row.total_sold,
row.country_of_origin,
row.date_last_sold,
row.category_ids,
true // needs_update
]);
await localConnection.query(` await localConnection.query(`
INSERT INTO temp_prod_data VALUES ${placeholders} INSERT INTO temp_prod_data VALUES ${placeholders}
`, batch.map(row => Object.values(row))); `, values);
outputProgress({ outputProgress({
status: "running", status: "running",
@@ -378,7 +413,12 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
let recordsAdded = 0; let recordsAdded = 0;
let recordsUpdated = 0; let recordsUpdated = 0;
while (processed < totalProducts) { // Get actual count from temp table
const [[{ actualTotal }]] = await localConnection.query(
"SELECT COUNT(*) as actualTotal FROM temp_prod_data WHERE needs_update = 1"
);
while (processed < actualTotal) {
const [batch] = await localConnection.query(` const [batch] = await localConnection.query(`
SELECT SELECT
p.*, p.*,
@@ -394,6 +434,8 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
WHERE p.needs_update = 1 WHERE p.needs_update = 1
LIMIT ? OFFSET ? LIMIT ? OFFSET ?
`, [BATCH_SIZE, processed]); `, [BATCH_SIZE, processed]);
if (!batch || batch.length === 0) break; // Exit if no more records
// Add image URLs // Add image URLs
batch.forEach(row => { batch.forEach(row => {
@@ -413,24 +455,25 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
}) })
); );
// MySQL 8.0 optimized insert if (productValues.length > 0) {
const placeholderGroup = `(${Array(columnNames.length).fill("?").join(",")})`; // MySQL 8.0 optimized insert with proper placeholders
const productPlaceholders = Array(batch.length).fill(placeholderGroup).join(","); const placeholderGroup = `(${Array(columnNames.length).fill("?").join(",")})`;
const productPlaceholders = Array(batch.length).fill(placeholderGroup).join(",");
const insertQuery = `
INSERT INTO products (${columnNames.join(",")}) const insertQuery = `
VALUES ${productPlaceholders} INSERT INTO products (${columnNames.join(",")})
AS new_products VALUES ${productPlaceholders}
ON DUPLICATE KEY UPDATE ON DUPLICATE KEY UPDATE
${columnNames ${columnNames
.filter(col => col !== "pid") .filter(col => col !== "pid")
.map(col => `${col} = new_products.${col}`) .map(col => `${col} = VALUES(${col})`)
.join(",")}; .join(",")};
`; `;
const result = await localConnection.query(insertQuery, productValues); const result = await localConnection.query(insertQuery, productValues);
recordsAdded += result.affectedRows - result.changedRows; recordsAdded += result.affectedRows - result.changedRows;
recordsUpdated += result.changedRows; recordsUpdated += result.changedRows;
}
// Insert category relationships // Insert category relationships
const categoryRelationships = []; const categoryRelationships = [];
@@ -482,15 +525,16 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
} }
} }
processed += batch.length; processed += batch.length; // Only increment by actual records processed
outputProgress({ outputProgress({
status: "running", status: "running",
operation: "Products import", operation: "Products import",
message: `Processed ${processed} of ${totalProducts} products`, message: `Processed ${processed} of ${actualTotal} products`,
current: processed, current: processed,
total: totalProducts, total: actualTotal,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000), elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, processed, totalProducts), remaining: estimateRemaining(startTime, processed, actualTotal),
rate: calculateRate(startTime, processed) rate: calculateRate(startTime, processed)
}); });
@@ -512,10 +556,10 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
return { return {
status: "complete", status: "complete",
totalImported: totalProducts, totalImported: actualTotal,
recordsAdded, recordsAdded: recordsAdded || 0,
recordsUpdated, recordsUpdated: recordsUpdated || 0,
incrementalUpdate: true, incrementalUpdate,
lastSyncTime lastSyncTime
}; };
} catch (error) { } catch (error) {

View File

@@ -364,8 +364,8 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
return { return {
status: "complete", status: "complete",
totalImported: totalItems, totalImported: totalItems,
recordsAdded, recordsAdded: recordsAdded || 0,
recordsUpdated, recordsUpdated: recordsUpdated || 0,
incrementalUpdate, incrementalUpdate,
lastSyncTime lastSyncTime
}; };