Improve import scripts with enhanced incremental update tracking and performance

- Track and report counts of added and updated records in the import scripts
- Modify products import to use a dynamic 'needs_update' flag for selective updates
- Enhance order import with more comprehensive timestamp checks
- Update import-from-prod.js to mark imports left in the 'running' state by a previous, uncompleted run as cancelled
- Improve error handling and connection management in import processes
This commit is contained in:
2025-01-31 01:39:48 -05:00
parent 1be97d6610
commit 5e4d1c3bd8
4 changed files with 72 additions and 25 deletions

View File

@@ -16,7 +16,7 @@ const IMPORT_ORDERS = true;
const IMPORT_PURCHASE_ORDERS = true; const IMPORT_PURCHASE_ORDERS = true;
// Add flag for incremental updates // Add flag for incremental updates
const INCREMENTAL_UPDATE = process.env.INCREMENTAL_UPDATE === 'true'; const INCREMENTAL_UPDATE = process.env.INCREMENTAL_UPDATE !== 'false'; // Default to true unless explicitly set to false
// SSH configuration // SSH configuration
// In import-from-prod.js // In import-from-prod.js
@@ -103,6 +103,17 @@ async function main() {
if (isImportCancelled) throw new Error("Import cancelled"); if (isImportCancelled) throw new Error("Import cancelled");
// Clean up any previously running imports that weren't completed
await localConnection.query(`
UPDATE import_history
SET
status = 'cancelled',
end_time = NOW(),
duration_seconds = TIMESTAMPDIFF(SECOND, start_time, NOW()),
error_message = 'Previous import was not completed properly'
WHERE status = 'running'
`);
// Initialize sync_status table if it doesn't exist // Initialize sync_status table if it doesn't exist
await localConnection.query(` await localConnection.query(`
CREATE TABLE IF NOT EXISTS sync_status ( CREATE TABLE IF NOT EXISTS sync_status (
@@ -240,8 +251,8 @@ async function main() {
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);
// Update import history with error // Update import history with error
if (importHistoryId) { if (importHistoryId && connections?.localConnection) {
await connections?.localConnection?.query(` await connections.localConnection.query(`
UPDATE import_history UPDATE import_history
SET SET
end_time = NOW(), end_time = NOW(),

View File

@@ -17,6 +17,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
const startTime = Date.now(); const startTime = Date.now();
const skippedOrders = new Set(); const skippedOrders = new Set();
const missingProducts = new Set(); const missingProducts = new Set();
let recordsAdded = 0;
let recordsUpdated = 0;
try { try {
// Get column names from the local table // Get column names from the local table
@@ -88,12 +90,14 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
${incrementalUpdate ? ` ${incrementalUpdate ? `
AND ( AND (
o.stamp > ? o.stamp > ?
OR oi.stamp > ?
OR o.date_placed > ? OR o.date_placed > ?
OR o.date_shipped > ? OR o.date_shipped > ?
OR oi.stamp > ? OR o.date_cancelled > ?
OR o.date_updated > ?
) )
` : ''} ` : ''}
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []); `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []);
const totalOrders = orderItems.length; const totalOrders = orderItems.length;
let processed = 0; let processed = 0;
@@ -271,12 +275,16 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
const singlePlaceholder = `(${columnNames.map(() => "?").join(",")})`; const singlePlaceholder = `(${columnNames.map(() => "?").join(",")})`;
const placeholders = Array(validOrders.length).fill(singlePlaceholder).join(","); const placeholders = Array(validOrders.length).fill(singlePlaceholder).join(",");
await localConnection.query(` const query = `
INSERT INTO orders (${columnNames.join(",")}) INSERT INTO orders (${columnNames.join(",")})
VALUES ${placeholders} VALUES ${placeholders}
ON DUPLICATE KEY UPDATE ON DUPLICATE KEY UPDATE
${columnNames.map(col => `${col} = VALUES(${col})`).join(",")} ${columnNames.map(col => `${col} = VALUES(${col})`).join(",")}
`, values); `;
const result = await localConnection.query(query, values.flat());
recordsAdded += result.affectedRows - result.changedRows;
recordsUpdated += result.changedRows;
importedCount += validOrders.length; importedCount += validOrders.length;
} }
@@ -422,6 +430,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
return { return {
status: "complete", status: "complete",
totalImported: importedCount, totalImported: importedCount,
recordsAdded,
recordsUpdated,
totalSkipped: skippedOrders.size, totalSkipped: skippedOrders.size,
missingProducts: missingProducts.size, missingProducts: missingProducts.size,
incrementalUpdate, incrementalUpdate,

View File

@@ -279,6 +279,7 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
country_of_origin VARCHAR(5), country_of_origin VARCHAR(5),
date_last_sold DATE, date_last_sold DATE,
category_ids TEXT, category_ids TEXT,
needs_update BOOLEAN DEFAULT FALSE,
PRIMARY KEY (pid) PRIMARY KEY (pid)
) ENGINE=InnoDB ) ENGINE=InnoDB
`); `);
@@ -321,7 +322,19 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
p.totalsold AS total_sold, p.totalsold AS total_sold,
p.country_of_origin, p.country_of_origin,
pls.date_sold as date_last_sold, pls.date_sold as date_last_sold,
GROUP_CONCAT(DISTINCT pci.cat_id) as category_ids GROUP_CONCAT(DISTINCT pci.cat_id) as category_ids,
CASE WHEN
${incrementalUpdate ? `
p.stamp > ? OR
ci.stamp > ? OR
pcp.date_deactive > ? OR
pcp.date_active > ? OR
sid.stamp > ? OR
pnb.date_updated > ? OR
pls.date_sold > ? OR
si.stamp > ?
` : 'TRUE'}
THEN 1 ELSE 0 END as needs_update
FROM products p FROM products p
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
LEFT JOIN supplier_item_data sid ON p.pid = sid.pid LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
@@ -332,16 +345,13 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id
LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id
LEFT JOIN product_last_sold pls ON p.pid = pls.pid LEFT JOIN product_last_sold pls ON p.pid = pls.pid
${incrementalUpdate ? ` LEFT JOIN current_inventory ci ON p.pid = ci.pid
WHERE p.stamp > ? LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1
OR pls.date_sold > ? LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
OR p.date_created > ?
OR p.datein > ?
` : ''}
GROUP BY p.pid GROUP BY p.pid
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []); `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []);
// Insert production data in batches // Insert production data in batches, but only for products that need updates
for (let i = 0; i < prodData.length; i += 1000) { for (let i = 0; i < prodData.length; i += 1000) {
const batch = prodData.slice(i, i + 1000); const batch = prodData.slice(i, i + 1000);
const placeholders = batch.map(() => "(?)").join(","); const placeholders = batch.map(() => "(?)").join(",");
@@ -359,9 +369,11 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
}); });
} }
// Now join with local temp tables and process in batches // Now join with local temp tables and process in batches, but only for products that need updates
const BATCH_SIZE = 2500; const BATCH_SIZE = 2500;
let processed = 0; let processed = 0;
let recordsAdded = 0;
let recordsUpdated = 0;
while (processed < totalProducts) { while (processed < totalProducts) {
const [batch] = await localConnection.query(` const [batch] = await localConnection.query(`
@@ -376,6 +388,7 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
FROM temp_prod_data p FROM temp_prod_data p
LEFT JOIN temp_inventory_status tis ON p.pid = tis.pid LEFT JOIN temp_inventory_status tis ON p.pid = tis.pid
LEFT JOIN temp_product_prices tpp ON p.pid = tpp.pid LEFT JOIN temp_product_prices tpp ON p.pid = tpp.pid
WHERE p.needs_update = 1
LIMIT ? OFFSET ? LIMIT ? OFFSET ?
`, [BATCH_SIZE, processed]); `, [BATCH_SIZE, processed]);
@@ -412,7 +425,9 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
.join(",")}; .join(",")};
`; `;
await localConnection.query(insertQuery, productValues); const result = await localConnection.query(insertQuery, productValues);
recordsAdded += result.affectedRows - result.changedRows;
recordsUpdated += result.changedRows;
// Insert category relationships // Insert category relationships
const categoryRelationships = []; const categoryRelationships = [];
@@ -495,6 +510,8 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
return { return {
status: "complete", status: "complete",
totalImported: totalProducts, totalImported: totalProducts,
recordsAdded,
recordsUpdated,
incrementalUpdate: true, incrementalUpdate: true,
lastSyncTime lastSyncTime
}; };
@@ -682,7 +699,9 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
.join(",")} .join(",")}
`; `;
await localConnection.query(query, productValues); const result = await localConnection.query(query, productValues);
recordsAdded += result.affectedRows - result.changedRows;
recordsUpdated += result.changedRows;
// Verify products were inserted before proceeding with categories // Verify products were inserted before proceeding with categories
const [insertedProducts] = await localConnection.query( const [insertedProducts] = await localConnection.query(
@@ -738,7 +757,11 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
return { return {
status: "complete", status: "complete",
totalImported: products.length totalImported: products.length,
recordsAdded,
recordsUpdated,
incrementalUpdate: true,
lastSyncTime
}; };
} catch (error) { } catch (error) {
throw error; throw error;

View File

@@ -2,6 +2,8 @@ const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } =
async function importPurchaseOrders(prodConnection, localConnection, incrementalUpdate = true) { async function importPurchaseOrders(prodConnection, localConnection, incrementalUpdate = true) {
const startTime = Date.now(); const startTime = Date.now();
let recordsAdded = 0;
let recordsUpdated = 0;
try { try {
// Get last sync info // Get last sync info
@@ -29,16 +31,19 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
// Build incremental conditions // Build incremental conditions
const incrementalWhereClause = incrementalUpdate const incrementalWhereClause = incrementalUpdate
? `AND ( ? `AND (
p.date_updated > ? p.stamp > ?
OR p.date_updated > ?
OR p.date_ordered > ? OR p.date_ordered > ?
OR p.date_estin > ? OR p.date_estin > ?
OR r.stamp > ? OR r.date_updated > ?
OR r.date_created > ?
OR r.date_checked > ?
OR rp.stamp > ? OR rp.stamp > ?
OR rp.received_date > ? OR rp.received_date > ?
)` )`
: ""; : "";
const incrementalParams = incrementalUpdate const incrementalParams = incrementalUpdate
? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]
: []; : [];
// First get all relevant PO IDs with basic info // First get all relevant PO IDs with basic info
@@ -98,8 +103,6 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
const totalItems = total; const totalItems = total;
let processed = 0; let processed = 0;
let recordsAdded = 0;
let recordsUpdated = 0;
const BATCH_SIZE = 5000; const BATCH_SIZE = 5000;
const PROGRESS_INTERVAL = 500; const PROGRESS_INTERVAL = 500;