Enhance import scripts with incremental update support and improved error handling

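- Update import-from-prod.js to pass the INCREMENTAL_UPDATE flag to the products, orders, and purchase-orders imports; only the orders import is left enabled (IMPORT_CATEGORIES, IMPORT_PRODUCTS, and IMPORT_PURCHASE_ORDERS are set to false)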
- Modify orders.js to retrieve order data in separate batched queries (base order rows, promotional discounts, and tax info) for better performance and error tracking
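- Add an incrementalUpdate parameter (default true) to the products.js import function; when it is false, the last-sync timestamp filter is omitted from the product query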
- Improve logging and progress tracking for import processes
This commit is contained in:
2025-01-30 15:49:47 -05:00
parent 31d4011902
commit c433f1aae8
3 changed files with 287 additions and 268 deletions

View File

@@ -10,10 +10,10 @@ const importPurchaseOrders = require('./import/purchase-orders');
dotenv.config({ path: path.join(__dirname, "../.env") });
// Constants to control which imports run
const IMPORT_CATEGORIES = true;
const IMPORT_PRODUCTS = true;
const IMPORT_CATEGORIES = false;
const IMPORT_PRODUCTS = false;
const IMPORT_ORDERS = true;
const IMPORT_PURCHASE_ORDERS = true;
const IMPORT_PURCHASE_ORDERS = false;
// Add flag for incremental updates
const INCREMENTAL_UPDATE = process.env.INCREMENTAL_UPDATE === 'true';
@@ -156,7 +156,7 @@ async function main() {
}
if (IMPORT_PRODUCTS) {
results.products = await importProducts(prodConnection, localConnection);
results.products = await importProducts(prodConnection, localConnection, INCREMENTAL_UPDATE);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
if (results.products.recordsAdded) totalRecordsAdded += results.products.recordsAdded;
@@ -164,7 +164,7 @@ async function main() {
}
if (IMPORT_ORDERS) {
results.orders = await importOrders(prodConnection, localConnection);
results.orders = await importOrders(prodConnection, localConnection, INCREMENTAL_UPDATE);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
if (results.orders.recordsAdded) totalRecordsAdded += results.orders.recordsAdded;
@@ -172,7 +172,7 @@ async function main() {
}
if (IMPORT_PURCHASE_ORDERS) {
results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection);
results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection, INCREMENTAL_UPDATE);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
if (results.purchaseOrders.recordsAdded) totalRecordsAdded += results.purchaseOrders.recordsAdded;

View File

@@ -19,317 +19,334 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
const missingProducts = new Set();
try {
// Get the last sync time
const [syncInfo] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'"
);
const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01';
// Retrieve column names for the 'orders' table, skip 'id' since it's auto-increment
// Get column names from the local table
const [columns] = await localConnection.query(`
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'orders'
ORDER BY ORDINAL_POSITION
`);
const columnNames = columns
.map(col => col.COLUMN_NAME)
.filter(name => name !== "id");
const columnNames = columns.map(col => col.COLUMN_NAME);
// Build query clauses for incremental vs. full update
const incrementalWhereClause = incrementalUpdate
? `AND (
o.stamp > ?
OR o.date_modified > ?
OR o.date_placed > ?
OR o.date_shipped > ?
OR oi.stamp > ?
)`
: "";
const incrementalParams = incrementalUpdate
? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]
: [];
// Count how many orders we need to process
const [countResult] = await prodConnection.query(
`
SELECT COUNT(*) AS total
FROM order_items oi USE INDEX (PRIMARY)
JOIN _order o USE INDEX (PRIMARY)
ON oi.order_id = o.order_id
WHERE o.order_status >= 15
AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
${incrementalWhereClause}
`,
incrementalParams
// Get last sync info
const [syncInfo] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'"
);
const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01';
const total = countResult[0].total;
outputProgress({
operation: `Starting ${incrementalUpdate ? 'incremental' : 'full'} orders import - Fetching ${total} orders`,
status: "running",
// Count the total number of orders to be imported
const [countResults] = await prodConnection.query(`
SELECT
COUNT(DISTINCT oi.order_id, oi.prod_pid) as total_all,
SUM(CASE
WHEN o.stamp > ? OR o.date_placed > ? OR o.date_shipped > ? OR oi.stamp > ?
THEN 1 ELSE 0
END) as total_incremental
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE o.order_status >= 15
AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
AND o.date_placed_onlydate IS NOT NULL
`, [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]);
console.log('Count details:', {
total_all: countResults[0].total_all,
total_incremental: countResults[0].total_incremental,
lastSyncTime,
incrementalUpdate
});
let processed = 0;
// Increase or decrease this if you find a more optimal size
const batchSize = 20000;
let offset = 0;
const totalOrders = incrementalUpdate ? countResults[0].total_incremental : countResults[0].total_all;
// Process in batches for memory efficiency
while (offset < total) {
// Fetch orders (initially with tax set to 0, to be updated later)
const [orders] = await prodConnection.query(
`
SELECT
oi.order_id AS order_number,
oi.prod_pid AS pid,
oi.prod_itemnumber AS SKU,
o.date_placed_onlydate AS date,
oi.prod_price_reg AS price,
oi.qty_ordered AS quantity,
(oi.prod_price_reg - oi.prod_price) AS discount,
0 AS tax,
0 AS tax_included,
ROUND(
(
(o.summary_shipping - COALESCE(o.summary_discount_shipping, 0))
* (oi.prod_price * oi.qty_ordered)
) / NULLIF(o.summary_subtotal, 0),
2
) AS shipping,
o.order_cid AS customer,
CONCAT(o.bill_firstname, ' ', o.bill_lastname) AS customer_name,
'pending' AS status,
CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END AS canceled
FROM order_items oi
FORCE INDEX (PRIMARY)
JOIN _order o
ON oi.order_id = o.order_id
WHERE o.order_status >= 15
AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
${incrementalWhereClause}
LIMIT ? OFFSET ?
`,
[...incrementalParams, batchSize, offset]
outputProgress({
status: "running",
operation: "Orders import",
message: `Starting ${incrementalUpdate ? 'incremental' : 'full'} import of ${totalOrders} orders`,
current: 0,
total: totalOrders
});
// Fetch orders in batches
const batchSize = 5000;
let offset = 0;
let importedCount = 0;
let lastProgressUpdate = Date.now();
while (offset < totalOrders) {
// First get the base order data
const [prodOrders] = await prodConnection.query(`
SELECT
oi.order_id as order_number,
oi.prod_pid as pid,
oi.prod_itemnumber as SKU,
o.date_placed_onlydate as date,
oi.prod_price as price,
oi.qty_ordered as quantity,
COALESCE(oi.prod_price_reg - oi.prod_price, 0) * oi.qty_ordered as base_discount,
o.order_cid as customer,
CONCAT(COALESCE(u.firstname, ''), ' ', COALESCE(u.lastname, '')) as customer_name,
o.order_status as status,
CASE WHEN o.date_cancelled != '0000-00-00 00:00:00' THEN 1 ELSE 0 END as canceled
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
LEFT JOIN users u ON o.order_cid = u.cid
WHERE o.order_status >= 15
AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
AND o.date_placed_onlydate IS NOT NULL
${incrementalUpdate ? `
AND (
o.stamp > ?
OR o.date_placed > ?
OR o.date_shipped > ?
OR oi.stamp > ?
)
` : ''}
ORDER BY oi.order_id, oi.prod_pid
LIMIT ? OFFSET ?
`, incrementalUpdate ?
[lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, batchSize, offset] :
[batchSize, offset]
);
// Fetch the latest tax info for these orders
if (orders.length > 0) {
const orderIds = [...new Set(orders.map(o => o.order_number))];
const [taxInfo] = await prodConnection.query(`
SELECT oti.order_id, otp.pid, otp.item_taxes_to_collect
FROM (
SELECT order_id, MAX(stamp) AS latest_stamp
if (prodOrders.length === 0) break;
// Get order numbers for this batch
const orderNumbers = [...new Set(prodOrders.map(o => o.order_number))];
const orderPids = prodOrders.map(o => o.pid);
// Get promotional discounts in a separate query
const [promoDiscounts] = await prodConnection.query(`
SELECT order_id, pid, amount
FROM order_discount_items
WHERE order_id IN (?)
`, [orderNumbers]);
// Create a map for quick discount lookups
const discountMap = new Map();
promoDiscounts.forEach(d => {
const key = `${d.order_id}-${d.pid}`;
discountMap.set(key, d.amount || 0);
});
// Get tax information in a separate query
const [taxInfo] = await prodConnection.query(`
SELECT oti.order_id, otip.pid, otip.item_taxes_to_collect
FROM order_tax_info oti
JOIN order_tax_info_products otip ON oti.taxinfo_id = otip.taxinfo_id
WHERE oti.order_id IN (?)
AND (oti.order_id, oti.stamp) IN (
SELECT order_id, MAX(stamp)
FROM order_tax_info
WHERE order_id IN (?)
GROUP BY order_id
) latest
JOIN order_tax_info oti
ON oti.order_id = latest.order_id
AND oti.stamp = latest.latest_stamp
JOIN order_tax_info_products otp
ON oti.taxinfo_id = otp.taxinfo_id
`, [orderIds]);
)
`, [orderNumbers, orderNumbers]);
// Map (order_id-pid) -> tax amount
const taxMap = new Map();
taxInfo.forEach(t => {
taxMap.set(`${t.order_id}-${t.pid}`, t.item_taxes_to_collect);
});
// Create a map for quick tax lookups
const taxMap = new Map();
taxInfo.forEach(t => {
const key = `${t.order_id}-${t.pid}`;
taxMap.set(key, t.item_taxes_to_collect || 0);
});
// Merge tax into the orders array
orders.forEach(order => {
const key = `${order.order_number}-${order.pid}`;
if (taxMap.has(key)) {
order.tax = taxMap.get(key) || 0;
}
});
}
// Check local DB for existing products to ensure we don't insert orders for missing products
const orderProductPids = [...new Set(orders.map(o => o.pid))];
// Check for missing products
const [existingProducts] = await localConnection.query(
"SELECT pid FROM products WHERE pid IN (?)",
[orderProductPids]
[orderPids]
);
const existingPids = new Set(existingProducts.map(p => p.pid));
// Separate valid orders from those referencing missing products
const validOrders = [];
for (const order of orders) {
// Track missing products and filter orders
const validOrders = prodOrders.filter(order => {
if (!order.date) return false;
if (!existingPids.has(order.pid)) {
missingProducts.add(order.pid);
skippedOrders.add(order.order_number);
} else {
validOrders.push(order);
return false;
}
}
return true;
});
// Bulk insert valid orders
if (validOrders.length > 0) {
const placeholders = validOrders
.map(() => `(${Array(columnNames.length).fill("?").join(",")})`)
.join(",");
const updateClauses = columnNames
.filter(col => col !== "order_number") // don't overwrite primary key
.map(col => `${col} = VALUES(${col})`)
.join(",");
// Prepare values for insertion
const orderValues = validOrders.map(order => {
const orderKey = `${order.order_number}-${order.pid}`;
const orderData = {
id: order.order_number,
order_number: order.order_number,
pid: order.pid,
SKU: order.SKU,
date: order.date,
price: order.price,
quantity: order.quantity,
discount: Number(order.base_discount || 0) + Number(discountMap.get(orderKey) || 0),
tax: Number(taxMap.get(orderKey) || 0),
tax_included: 0,
shipping: 0,
customer: order.customer,
customer_name: order.customer_name || '',
status: order.status,
canceled: order.canceled,
};
const upsertQuery = `
INSERT INTO orders (${columnNames.join(",")})
return columnNames.map(colName => orderData[colName] !== undefined ? orderData[colName] : null);
});
// Execute the insert
if (orderValues.length > 0) {
const placeholders = validOrders.map(() => `(${columnNames.map(() => "?").join(", ")})`).join(",");
const insertQuery = `
INSERT INTO orders (${columnNames.join(", ")})
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE ${updateClauses}
ON DUPLICATE KEY UPDATE
${columnNames.map(col => `${col} = VALUES(${col})`).join(", ")}
`;
await localConnection.query(
upsertQuery,
validOrders.flatMap(order => columnNames.map(col => order[col]))
);
await localConnection.query(insertQuery, orderValues.flat());
}
processed += orders.length;
importedCount += validOrders.length;
offset += batchSize;
outputProgress({
status: "running",
operation: "Orders import",
current: processed,
total,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, processed, total),
rate: calculateRate(startTime, processed)
});
}
// If we found missing products, import them and retry the skipped orders
if (missingProducts.size > 0) {
outputProgress({
operation: `Found ${missingProducts.size} missing products, importing them now`,
status: "running",
});
// Import missing products
await importMissingProducts(prodConnection, localConnection, [...missingProducts]);
// Retry orders that were skipped due to missing products
if (skippedOrders.size > 0) {
// Update progress every second
const now = Date.now();
if (now - lastProgressUpdate >= 1000) {
outputProgress({
operation: `Retrying ${skippedOrders.size} skipped orders`,
status: "running",
operation: "Orders import",
message: `Imported ${importedCount} of ${totalOrders} orders`,
current: importedCount,
total: totalOrders,
elapsed: formatElapsedTime((now - startTime) / 1000),
remaining: estimateRemaining(startTime, importedCount, totalOrders),
rate: calculateRate(startTime, importedCount)
});
const [retryOrders] = await prodConnection.query(`
SELECT
oi.order_id AS order_number,
oi.prod_pid AS pid,
oi.prod_itemnumber AS SKU,
o.date_placed_onlydate AS date,
oi.prod_price_reg AS price,
oi.qty_ordered AS quantity,
(oi.prod_price_reg - oi.prod_price) AS discount,
0 AS tax,
0 AS tax_included,
ROUND(
(
(o.summary_shipping - COALESCE(o.summary_discount_shipping, 0))
* (oi.prod_price * oi.qty_ordered)
) / NULLIF(o.summary_subtotal, 0),
2
) AS shipping,
o.order_cid AS customer,
CONCAT(o.bill_firstname, ' ', o.bill_lastname) AS customer_name,
'pending' AS status,
CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END AS canceled
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE oi.order_id IN (?)
`, [[...skippedOrders]]);
if (retryOrders.length > 0) {
// Fetch tax data for these specific retry orders
const retryOrderIds = [...new Set(retryOrders.map(o => o.order_number))];
const [retryTaxInfo] = await prodConnection.query(`
SELECT oti.order_id, otp.pid, otp.item_taxes_to_collect
FROM (
SELECT order_id, MAX(stamp) AS latest_stamp
FROM order_tax_info
WHERE order_id IN (?)
GROUP BY order_id
) latest
JOIN order_tax_info oti
ON oti.order_id = latest.order_id
AND oti.stamp = latest.latest_stamp
JOIN order_tax_info_products otp
ON oti.taxinfo_id = otp.taxinfo_id
`, [retryOrderIds]);
const taxMap = new Map();
retryTaxInfo.forEach(t => {
taxMap.set(`${t.order_id}-${t.pid}`, t.item_taxes_to_collect);
});
retryOrders.forEach(order => {
const key = `${order.order_number}-${order.pid}`;
if (taxMap.has(key)) {
order.tax = taxMap.get(key) || 0;
}
});
const placeholders = retryOrders
.map(() => `(${Array(columnNames.length).fill("?").join(",")})`)
.join(",");
const updateClauses = columnNames
.filter(col => col !== "order_number")
.map(col => `${col} = VALUES(${col})`)
.join(",");
const upsertQuery = `
INSERT INTO orders (${columnNames.join(",")})
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE ${updateClauses}
`;
await localConnection.query(
upsertQuery,
retryOrders.flatMap(order => columnNames.map(col => order[col]))
);
}
lastProgressUpdate = now;
}
}
// Update the sync timestamp
// Import missing products if any
if (missingProducts.size > 0) {
await importMissingProducts(prodConnection, localConnection, Array.from(missingProducts));
// Retry skipped orders after importing products
if (skippedOrders.size > 0) {
outputProgress({
status: "running",
operation: "Orders import",
message: `Retrying import of ${skippedOrders.size} orders with previously missing products`
});
const [skippedProdOrders] = await prodConnection.query(`
SELECT
o.order_id,
CASE
WHEN o.date_placed = '0000-00-00 00:00:00' OR o.date_placed IS NULL THEN o.stamp
ELSE o.date_placed
END as date,
o.order_cid,
o.bill_firstname,
o.bill_lastname,
o.order_email,
o.order_status,
o.date_shipped,
o.date_cancelled,
oi.prod_pid,
oi.prod_itemnumber,
oi.prod_price,
oi.qty_ordered,
oi.qty_back,
oi.qty_placed,
oi.qty_placed_2,
oi.discounted,
oi.summary_cogs,
oi.summary_profit,
oi.summary_orderdate,
oi.summary_paiddate,
oi.date_added,
oi.stamp
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE o.order_id IN (?)
`, [Array.from(skippedOrders)]);
// Prepare values for insertion
const skippedOrderValues = skippedProdOrders.flatMap(order => {
if (!order.date) {
console.log(`Warning: Skipped order ${order.order_id} has null date:`, JSON.stringify(order, null, 2));
return [];
}
const canceled = order.date_cancelled !== '0000-00-00 00:00:00' ? 1 : 0;
const customerName = `${order.bill_firstname} ${order.bill_lastname}`;
// Create an object with keys based on column names
const orderData = {
id: order.order_id,
order_number: order.order_id,
pid: order.prod_pid,
SKU: order.prod_itemnumber,
date: order.date ? (
order.date instanceof Date ?
order.date.toJSON()?.slice(0,10) || null :
(typeof order.date === 'string' ? order.date.split(' ')[0] : null)
) : null,
price: order.prod_price,
quantity: order.qty_ordered,
discount: order.discounted,
tax: 0, // Placeholder, will be calculated later
tax_included: 0, // Placeholder, will be calculated later
shipping: 0, // Placeholder, will be calculated later
customer: order.order_email,
customer_name: customerName,
status: order.order_status,
canceled: canceled,
};
// Map column names to values, handling missing columns
return [columnNames.map(colName => orderData[colName] !== undefined ? orderData[colName] : null)];
});
// Construct the insert query dynamically
const skippedPlaceholders = skippedProdOrders.map(() => `(${columnNames.map(() => "?").join(", ")})`).join(",");
const skippedInsertQuery = `
INSERT INTO orders (${columnNames.join(", ")})
VALUES ${skippedPlaceholders}
ON DUPLICATE KEY UPDATE
${columnNames.map(col => `${col} = VALUES(${col})`).join(", ")}
`;
// Execute the insert query
if (skippedOrderValues.length > 0) {
await localConnection.query(skippedInsertQuery, skippedOrderValues.flat());
}
importedCount += skippedProdOrders.length;
outputProgress({
status: "running",
operation: "Orders import",
message: `Successfully imported ${skippedProdOrders.length} previously skipped orders`,
});
}
}
// Update sync status
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('orders', NOW())
ON DUPLICATE KEY UPDATE
last_sync_timestamp = NOW(),
last_sync_id = LAST_INSERT_ID(last_sync_id)
ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW()
`);
const endTime = Date.now();
outputProgress({
status: "complete",
operation: `${incrementalUpdate ? 'Incremental' : 'Full'} orders import completed`,
current: total,
total,
duration: formatElapsedTime((endTime - startTime) / 1000),
});
return {
status: "complete",
totalImported: total,
totalImported: importedCount,
totalSkipped: skippedOrders.size,
missingProducts: missingProducts.size,
retriedOrders: skippedOrders.size,
incrementalUpdate,
lastSyncTime
};
} catch (error) {
outputProgress({
operation: `${incrementalUpdate ? 'Incremental' : 'Full'} orders import failed`,
status: "error",
error: error.message,
});
console.error("Error during orders import:", error);
throw error;
}
}

View File

@@ -198,7 +198,7 @@ async function materializeCalculations(prodConnection, localConnection) {
});
}
async function importProducts(prodConnection, localConnection) {
async function importProducts(prodConnection, localConnection, incrementalUpdate = true) {
const startTime = Date.now();
try {
@@ -332,12 +332,14 @@ async function importProducts(prodConnection, localConnection) {
LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id
LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
WHERE p.stamp > ?
OR pls.date_sold > ?
OR p.date_created > ?
OR p.datein > ?
${incrementalUpdate ? `
WHERE p.stamp > ?
OR pls.date_sold > ?
OR p.date_created > ?
OR p.datein > ?
` : ''}
GROUP BY p.pid
`, [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]);
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []);
// Insert production data in batches
for (let i = 0; i < prodData.length; i += 1000) {