Fixes/improvements for import scripts

This commit is contained in:
2025-03-24 22:27:44 -04:00
parent 75da2c6772
commit 87d4b9e804
5 changed files with 190 additions and 166 deletions

View File

@@ -10,9 +10,9 @@ const importPurchaseOrders = require('./import/purchase-orders');
dotenv.config({ path: path.join(__dirname, "../.env") });
// Constants to control which imports run
const IMPORT_CATEGORIES = false;
const IMPORT_PRODUCTS = false;
const IMPORT_ORDERS = false;
const IMPORT_CATEGORIES = true;
const IMPORT_PRODUCTS = true;
const IMPORT_ORDERS = true;
const IMPORT_PURCHASE_ORDERS = true;
// Add flag for incremental updates
@@ -120,27 +120,38 @@ async function main() {
`);
// Create import history record for the overall session
const [historyResult] = await localConnection.query(`
INSERT INTO import_history (
table_name,
start_time,
is_incremental,
status,
additional_info
) VALUES (
'all_tables',
NOW(),
$1::boolean,
'running',
jsonb_build_object(
'categories_enabled', $2::boolean,
'products_enabled', $3::boolean,
'orders_enabled', $4::boolean,
'purchase_orders_enabled', $5::boolean
)
) RETURNING id
`, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS]);
importHistoryId = historyResult.rows[0].id;
try {
const [historyResult] = await localConnection.query(`
INSERT INTO import_history (
table_name,
start_time,
is_incremental,
status,
additional_info
) VALUES (
'all_tables',
NOW(),
$1::boolean,
'running',
jsonb_build_object(
'categories_enabled', $2::boolean,
'products_enabled', $3::boolean,
'orders_enabled', $4::boolean,
'purchase_orders_enabled', $5::boolean
)
) RETURNING id
`, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS]);
importHistoryId = historyResult.rows[0].id;
} catch (error) {
console.error("Error creating import history record:", error);
outputProgress({
status: "error",
operation: "Import process",
message: "Failed to create import history record",
error: error.message
});
throw error;
}
const results = {
categories: null,

View File

@@ -47,42 +47,18 @@ async function importCategories(prodConnection, localConnection) {
continue;
}
console.log(`\nProcessing ${categories.length} type ${type} categories`);
if (type === 10) {
console.log("Type 10 categories:", JSON.stringify(categories, null, 2));
}
console.log(`Processing ${categories.length} type ${type} categories`);
// For types that can have parents (11, 21, 12, 13), verify parent existence
// For types that can have parents (11, 21, 12, 13), we'll proceed directly
// No need to check for parent existence since we process in hierarchical order
let categoriesToInsert = categories;
if (![10, 20].includes(type)) {
// Get all parent IDs
const parentIds = [
...new Set(
categories
.filter(c => c && c.parent_id !== null)
.map(c => c.parent_id)
),
];
console.log(`Processing ${categories.length} type ${type} categories with ${parentIds.length} unique parent IDs`);
console.log('Parent IDs:', parentIds);
// No need to check for parent existence - we trust they exist since they were just inserted
categoriesToInsert = categories;
}
if (categoriesToInsert.length === 0) {
console.log(
`No valid categories of type ${type} to insert`
);
console.log(`No valid categories of type ${type} to insert`);
await localConnection.query(`RELEASE SAVEPOINT category_type_${type}`);
continue;
}
console.log(
`Inserting ${categoriesToInsert.length} type ${type} categories`
);
// PostgreSQL upsert query with parameterized values
const values = categoriesToInsert.flatMap((cat) => [
cat.cat_id,
@@ -95,14 +71,10 @@ async function importCategories(prodConnection, localConnection) {
new Date()
]);
console.log('Attempting to insert/update with values:', JSON.stringify(values, null, 2));
const placeholders = categoriesToInsert
.map((_, i) => `($${i * 8 + 1}, $${i * 8 + 2}, $${i * 8 + 3}, $${i * 8 + 4}, $${i * 8 + 5}, $${i * 8 + 6}, $${i * 8 + 7}, $${i * 8 + 8})`)
.join(',');
console.log('Using placeholders:', placeholders);
// Insert categories with ON CONFLICT clause for PostgreSQL
const query = `
WITH inserted_categories AS (
@@ -129,17 +101,14 @@ async function importCategories(prodConnection, localConnection) {
COUNT(*) FILTER (WHERE is_insert) as inserted,
COUNT(*) FILTER (WHERE NOT is_insert) as updated
FROM inserted_categories`;
console.log('Executing query:', query);
const result = await localConnection.query(query, values);
console.log('Query result:', result);
// Get the first result since query returns an array
const queryResult = Array.isArray(result) ? result[0] : result;
if (!queryResult || !queryResult.rows || !queryResult.rows[0]) {
console.error('Query failed to return results. Result:', queryResult);
console.error('Query failed to return results');
throw new Error('Query did not return expected results');
}

View File

@@ -26,6 +26,9 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
let cumulativeProcessedOrders = 0;
try {
// Begin transaction
await localConnection.beginTransaction();
// Get last sync info
const [syncInfo] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'"
@@ -38,7 +41,6 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
const [[{ total }]] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM order_items oi
USE INDEX (PRIMARY)
JOIN _order o ON oi.order_id = o.order_id
WHERE o.order_status >= 15
AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
@@ -78,7 +80,6 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
COALESCE(oi.prod_price_reg - oi.prod_price, 0) as base_discount,
oi.stamp as last_modified
FROM order_items oi
USE INDEX (PRIMARY)
JOIN _order o ON oi.order_id = o.order_id
WHERE o.order_status >= 15
AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
@@ -105,15 +106,15 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
console.log('Orders: Found', orderItems.length, 'order items to process');
// Create tables in PostgreSQL for debugging
// Create tables in PostgreSQL for data processing
await localConnection.query(`
DROP TABLE IF EXISTS debug_order_items;
DROP TABLE IF EXISTS debug_order_meta;
DROP TABLE IF EXISTS debug_order_discounts;
DROP TABLE IF EXISTS debug_order_taxes;
DROP TABLE IF EXISTS debug_order_costs;
DROP TABLE IF EXISTS temp_order_items;
DROP TABLE IF EXISTS temp_order_meta;
DROP TABLE IF EXISTS temp_order_discounts;
DROP TABLE IF EXISTS temp_order_taxes;
DROP TABLE IF EXISTS temp_order_costs;
CREATE TABLE debug_order_items (
CREATE TEMP TABLE temp_order_items (
order_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
SKU VARCHAR(50) NOT NULL,
@@ -123,7 +124,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
PRIMARY KEY (order_id, pid)
);
CREATE TABLE debug_order_meta (
CREATE TEMP TABLE temp_order_meta (
order_id INTEGER NOT NULL,
date DATE NOT NULL,
customer VARCHAR(100) NOT NULL,
@@ -135,26 +136,29 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
PRIMARY KEY (order_id)
);
CREATE TABLE debug_order_discounts (
CREATE TEMP TABLE temp_order_discounts (
order_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
discount DECIMAL(10,2) NOT NULL,
PRIMARY KEY (order_id, pid)
);
CREATE TABLE debug_order_taxes (
CREATE TEMP TABLE temp_order_taxes (
order_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
tax DECIMAL(10,2) NOT NULL,
PRIMARY KEY (order_id, pid)
);
CREATE TABLE debug_order_costs (
CREATE TEMP TABLE temp_order_costs (
order_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
costeach DECIMAL(10,3) DEFAULT 0.000,
PRIMARY KEY (order_id, pid)
);
CREATE INDEX idx_temp_order_items_pid ON temp_order_items(pid);
CREATE INDEX idx_temp_order_meta_order_id ON temp_order_meta(order_id);
`);
// Insert order items in batches
@@ -168,7 +172,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
]);
await localConnection.query(`
INSERT INTO debug_order_items (order_id, pid, SKU, price, quantity, base_discount)
INSERT INTO temp_order_items (order_id, pid, SKU, price, quantity, base_discount)
VALUES ${placeholders}
ON CONFLICT (order_id, pid) DO UPDATE SET
SKU = EXCLUDED.SKU,
@@ -239,7 +243,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
]);
await localConnection.query(`
INSERT INTO debug_order_meta (
INSERT INTO temp_order_meta (
order_id, date, customer, customer_name, status, canceled,
summary_discount, summary_subtotal
)
@@ -281,7 +285,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
]);
await localConnection.query(`
INSERT INTO debug_order_discounts (order_id, pid, discount)
INSERT INTO temp_order_discounts (order_id, pid, discount)
VALUES ${placeholders}
ON CONFLICT (order_id, pid) DO UPDATE SET
discount = EXCLUDED.discount
@@ -321,7 +325,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
]);
await localConnection.query(`
INSERT INTO debug_order_taxes (order_id, pid, tax)
INSERT INTO temp_order_taxes (order_id, pid, tax)
VALUES ${placeholders}
ON CONFLICT (order_id, pid) DO UPDATE SET
tax = EXCLUDED.tax
@@ -366,7 +370,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
]);
await localConnection.query(`
INSERT INTO debug_order_costs (order_id, pid, costeach)
INSERT INTO temp_order_costs (order_id, pid, costeach)
VALUES ${placeholders}
ON CONFLICT (order_id, pid) DO UPDATE SET
costeach = EXCLUDED.costeach
@@ -426,9 +430,9 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
SUM(COALESCE(od.discount, 0)) as promo_discount,
COALESCE(ot.tax, 0) as total_tax,
COALESCE(oi.price * 0.5, 0) as costeach
FROM debug_order_items oi
LEFT JOIN debug_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
LEFT JOIN debug_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
FROM temp_order_items oi
LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
GROUP BY oi.order_id, oi.pid, ot.tax
)
SELECT
@@ -456,11 +460,11 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
FROM (
SELECT DISTINCT ON (order_id, pid)
order_id, pid, SKU, price, quantity, base_discount
FROM debug_order_items
FROM temp_order_items
WHERE order_id = ANY($1)
ORDER BY order_id, pid
) oi
JOIN debug_order_meta om ON oi.order_id = om.order_id
JOIN temp_order_meta om ON oi.order_id = om.order_id
LEFT JOIN order_totals ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
ORDER BY oi.order_id, oi.pid
`, [subBatchIds]);
@@ -564,6 +568,18 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
ON CONFLICT (table_name) DO UPDATE SET
last_sync_timestamp = NOW()
`);
// Cleanup temporary tables
await localConnection.query(`
DROP TABLE IF EXISTS temp_order_items;
DROP TABLE IF EXISTS temp_order_meta;
DROP TABLE IF EXISTS temp_order_discounts;
DROP TABLE IF EXISTS temp_order_taxes;
DROP TABLE IF EXISTS temp_order_costs;
`);
// Commit transaction
await localConnection.commit();
return {
status: "complete",
@@ -577,6 +593,14 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
};
} catch (error) {
console.error("Error during orders import:", error);
// Rollback transaction
try {
await localConnection.rollback();
} catch (rollbackError) {
console.error("Error during rollback:", rollbackError);
}
throw error;
}
}

View File

@@ -2,9 +2,12 @@ const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } =
const BATCH_SIZE = 100; // Smaller batch size for better progress tracking
const MAX_RETRIES = 3;
const RETRY_DELAY = 5000; // 5 seconds
const dotenv = require("dotenv");
const path = require("path");
dotenv.config({ path: path.join(__dirname, "../../.env") });
// Utility functions
const imageUrlBase = 'https://sbing.com/i/products/0000/';
const imageUrlBase = process.env.PRODUCT_IMAGE_URL_BASE || 'https://sbing.com/i/products/0000/';
const getImageUrls = (pid, iid = 1) => {
const paddedPid = pid.toString().padStart(6, '0');
// Use padded PID only for the first 3 digits
@@ -18,7 +21,7 @@ const getImageUrls = (pid, iid = 1) => {
};
};
// Add helper function for retrying operations
// Add helper function for retrying operations with exponential backoff
async function withRetry(operation, errorMessage) {
let lastError;
for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
@@ -28,7 +31,8 @@ async function withRetry(operation, errorMessage) {
lastError = error;
console.error(`${errorMessage} (Attempt ${attempt}/${MAX_RETRIES}):`, error);
if (attempt < MAX_RETRIES) {
await new Promise(resolve => setTimeout(resolve, RETRY_DELAY));
const backoffTime = RETRY_DELAY * Math.pow(2, attempt - 1);
await new Promise(resolve => setTimeout(resolve, backoffTime));
}
}
}
@@ -772,32 +776,44 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
recordsAdded += parseInt(result.rows[0].inserted, 10) || 0;
recordsUpdated += parseInt(result.rows[0].updated, 10) || 0;
// Process category relationships for each product in the batch
// Process category relationships in batches
const allCategories = [];
for (const row of batch) {
if (row.categories) {
const categoryIds = row.categories.split(',').filter(id => id && id.trim());
if (categoryIds.length > 0) {
const catPlaceholders = categoryIds.map((_, idx) =>
`($${idx * 2 + 1}, $${idx * 2 + 2})`
).join(',');
const catValues = categoryIds.flatMap(catId => [row.pid, parseInt(catId.trim(), 10)]);
// First delete existing relationships for this product
await localConnection.query(
'DELETE FROM product_categories WHERE pid = $1',
[row.pid]
);
// Then insert the new relationships
await localConnection.query(`
INSERT INTO product_categories (pid, cat_id)
VALUES ${catPlaceholders}
ON CONFLICT (pid, cat_id) DO NOTHING
`, catValues);
categoryIds.forEach(catId => {
allCategories.push([row.pid, parseInt(catId.trim(), 10)]);
});
}
}
}
// If we have categories to process
if (allCategories.length > 0) {
// First get all products in this batch
const productIds = batch.map(p => p.pid);
// Delete all existing relationships for products in this batch
await localConnection.query(
'DELETE FROM product_categories WHERE pid = ANY($1)',
[productIds]
);
// Insert all new relationships in one batch
const catPlaceholders = allCategories.map((_, idx) =>
`($${idx * 2 + 1}, $${idx * 2 + 2})`
).join(',');
const catValues = allCategories.flat();
await localConnection.query(`
INSERT INTO product_categories (pid, cat_id)
VALUES ${catPlaceholders}
ON CONFLICT (pid, cat_id) DO NOTHING
`, catValues);
}
outputProgress({
status: "running",
operation: "Products import",

View File

@@ -6,6 +6,9 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
let recordsUpdated = 0;
try {
// Begin transaction for the entire import process
await localConnection.beginTransaction();
// Get last sync info
const [syncInfo] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'"
@@ -39,7 +42,6 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
FROM (
SELECT DISTINCT pop.po_id, pop.pid
FROM po p
USE INDEX (idx_date_created)
JOIN po_products pop ON p.po_id = pop.po_id
JOIN suppliers s ON p.supplier_id = s.supplierid
WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
@@ -59,6 +61,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
console.log('Fetching purchase orders in batches...');
const FETCH_BATCH_SIZE = 5000;
const INSERT_BATCH_SIZE = 200; // Process 200 records at a time for inserts
let offset = 0;
let allProcessed = false;
let totalProcessed = 0;
@@ -101,64 +104,62 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
console.log(`Processing batch of ${poList.length} purchase order items (${offset}-${offset + poList.length})`);
let processed = 0;
// Process each PO in a separate insert to avoid parameter issues
for (let i = 0; i < poList.length; i++) {
const po = poList[i];
// Process in smaller batches for inserts
for (let i = 0; i < poList.length; i += INSERT_BATCH_SIZE) {
const batch = poList.slice(i, Math.min(i + INSERT_BATCH_SIZE, poList.length));
try {
// Single row insert
await localConnection.query(`
INSERT INTO temp_purchase_orders (
po_id, pid, sku, name, vendor, date, expected_date,
status, notes, ordered, cost_price
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (po_id, pid) DO UPDATE SET
sku = EXCLUDED.sku,
name = EXCLUDED.name,
vendor = EXCLUDED.vendor,
date = EXCLUDED.date,
expected_date = EXCLUDED.expected_date,
status = EXCLUDED.status,
notes = EXCLUDED.notes,
ordered = EXCLUDED.ordered,
cost_price = EXCLUDED.cost_price
`, [
po.po_id,
po.pid,
po.sku,
po.name,
po.vendor,
po.date,
po.expected_date,
po.status,
po.notes,
po.ordered,
po.cost_price
]);
processed++;
totalProcessed++;
// Only log occasionally
if (processed % 500 === 0 || processed === 1 || processed === poList.length) {
outputProgress({
status: "running",
operation: "Purchase orders import",
message: `Batch ${Math.floor(offset/FETCH_BATCH_SIZE) + 1}: ${processed}/${poList.length} (Total: ${totalProcessed}/${total})`,
current: totalProcessed,
total: total,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, totalProcessed, total),
rate: calculateRate(startTime, totalProcessed)
});
}
} catch (error) {
console.error(`Error inserting PO #${po.po_id} product #${po.pid}:`, error.message);
console.log('PO data:', po);
}
// Create parameterized query with placeholders
const placeholders = batch.map((_, idx) => {
const base = idx * 11; // 11 columns
return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11})`;
}).join(',');
// Create flattened values array
const values = batch.flatMap(po => [
po.po_id,
po.pid,
po.sku,
po.name,
po.vendor,
po.date,
po.expected_date,
po.status,
po.notes,
po.ordered,
po.cost_price
]);
// Execute batch insert
await localConnection.query(`
INSERT INTO temp_purchase_orders (
po_id, pid, sku, name, vendor, date, expected_date,
status, notes, ordered, cost_price
)
VALUES ${placeholders}
ON CONFLICT (po_id, pid) DO UPDATE SET
sku = EXCLUDED.sku,
name = EXCLUDED.name,
vendor = EXCLUDED.vendor,
date = EXCLUDED.date,
expected_date = EXCLUDED.expected_date,
status = EXCLUDED.status,
notes = EXCLUDED.notes,
ordered = EXCLUDED.ordered,
cost_price = EXCLUDED.cost_price
`, values);
totalProcessed += batch.length;
outputProgress({
status: "running",
operation: "Purchase orders import",
message: `Processed ${totalProcessed}/${total} purchase order items`,
current: totalProcessed,
total: total,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, totalProcessed, total),
rate: calculateRate(startTime, totalProcessed)
});
}
// Update offset for next batch
@@ -220,6 +221,9 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
// Clean up temporary tables
await localConnection.query(`DROP TABLE IF EXISTS temp_purchase_orders;`);
// Commit transaction
await localConnection.commit();
return {
status: "complete",
@@ -230,11 +234,11 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
} catch (error) {
console.error("Error during purchase orders import:", error);
// Attempt cleanup on error
// Rollback transaction
try {
await localConnection.query(`DROP TABLE IF EXISTS temp_purchase_orders;`);
} catch (cleanupError) {
console.error('Error during cleanup:', cleanupError.message);
await localConnection.rollback();
} catch (rollbackError) {
console.error('Error during rollback:', rollbackError.message);
}
return {