diff --git a/inventory-server/scripts/import/orders.js b/inventory-server/scripts/import/orders.js index bb80a44..2d069e5 100644 --- a/inventory-server/scripts/import/orders.js +++ b/inventory-server/scripts/import/orders.js @@ -268,17 +268,25 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = `, [batchIds]); if (discounts.length > 0) { - const placeholders = discounts.map((_, idx) => - `($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})` - ).join(","); - const values = discounts.flatMap(d => [d.order_id, d.pid, d.discount]); + const uniqueDiscounts = new Map(); + discounts.forEach(d => { + const key = `${d.order_id}-${d.pid}`; + uniqueDiscounts.set(key, d); + }); - await localConnection.query(` - INSERT INTO temp_order_discounts (order_id, pid, discount) - VALUES ${placeholders} - ON CONFLICT (order_id, pid) DO UPDATE SET - discount = EXCLUDED.discount - `, values); + const values = Array.from(uniqueDiscounts.values()).flatMap(d => [d.order_id, d.pid, d.discount || 0]); + if (values.length > 0) { + const placeholders = Array.from({length: uniqueDiscounts.size}, (_, idx) => { + const base = idx * 3; + return `($${base + 1}, $${base + 2}, $${base + 3})`; + }).join(","); + await localConnection.query(` + INSERT INTO temp_order_discounts (order_id, pid, discount) + VALUES ${placeholders} + ON CONFLICT (order_id, pid) DO UPDATE SET + discount = EXCLUDED.discount + `, values); + } } } @@ -404,7 +412,6 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = // Filter orders and track missing products const validOrders = []; - const values = []; const processedOrderItems = new Set(); const processedOrders = new Set(); @@ -425,7 +432,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`; }).join(','); - const values = validOrders.flatMap(o => [ + const batchValues = validOrders.flatMap(o => [ o.order_number, o.pid, o.SKU, @@ -471,7 +478,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate = COUNT(*) FILTER (WHERE xmax = 0) as inserted, COUNT(*) FILTER (WHERE xmax <> 0) as updated FROM inserted_orders - `, values); + `, batchValues); const { inserted, updated } = result.rows[0]; recordsAdded += inserted; diff --git a/inventory-server/scripts/import/products.js b/inventory-server/scripts/import/products.js index 4a62a33..5c475c7 100644 --- a/inventory-server/scripts/import/products.js +++ b/inventory-server/scripts/import/products.js @@ -35,17 +35,28 @@ async function withRetry(operation, errorMessage) { throw lastError; } -async function setupTemporaryTables(connection) { - await connection.query(` - DROP TABLE IF EXISTS temp_products; +// Add helper function at the top of the file +function validateDate(mysqlDate) { + if (!mysqlDate || mysqlDate === '0000-00-00' || mysqlDate === '0000-00-00 00:00:00') { + return null; + } + // Check if the date is valid + const date = new Date(mysqlDate); + return isNaN(date.getTime()) ? null : mysqlDate; +} +async function setupTemporaryTables(connection) { + // Drop the table if it exists + await connection.query('DROP TABLE IF EXISTS temp_products'); + + // Create the temporary table + await connection.query(` CREATE TEMP TABLE temp_products ( pid BIGINT NOT NULL, title VARCHAR(255), description TEXT, SKU VARCHAR(50), stock_quantity INTEGER DEFAULT 0, - pending_qty INTEGER DEFAULT 0, preorder_count INTEGER DEFAULT 0, notions_inv_count INTEGER DEFAULT 0, price DECIMAL(10,3) NOT NULL DEFAULT 0, @@ -58,7 +69,7 @@ async function setupTemporaryTables(connection) { line VARCHAR(100), subline VARCHAR(100), artist VARCHAR(100), - category_ids TEXT, + categories TEXT, created_at TIMESTAMP, first_received TIMESTAMP, landing_cost_price DECIMAL(10,3), @@ -66,9 +77,11 @@ async function setupTemporaryTables(connection) { harmonized_tariff_code VARCHAR(50), updated_at TIMESTAMP, visible BOOLEAN, + managing_stock BOOLEAN DEFAULT true, replenishable BOOLEAN, permalink VARCHAR(255), - moq DECIMAL(10,3), + moq INTEGER DEFAULT 1, + uom INTEGER DEFAULT 1, rating DECIMAL(10,2), reviews INTEGER, weight DECIMAL(10,3), @@ -81,12 +94,17 @@ async function setupTemporaryTables(connection) { baskets INTEGER, notifies INTEGER, date_last_sold TIMESTAMP, + image VARCHAR(255), + image_175 VARCHAR(255), + image_full VARCHAR(255), + options TEXT, + tags TEXT, needs_update BOOLEAN DEFAULT TRUE, PRIMARY KEY (pid) - ); + )`); - CREATE INDEX idx_temp_products_needs_update ON temp_products (needs_update); - `); + // Create the index + await connection.query('CREATE INDEX idx_temp_products_needs_update ON temp_products (needs_update)'); } async function cleanupTemporaryTables(connection) { @@ -205,64 +223,73 @@ async function importMissingProducts(prodConnection, localConnection, missingPid const batch = prodData.slice(i, i + BATCH_SIZE); const placeholders = batch.map((_, idx) => { - const base = idx * 41; // 41 columns - return `(${Array.from({ length: 41 }, (_, i) => `$${base + i + 1}`).join(', ')})`; + const base = idx * 47; // 47 columns + return `(${Array.from({ length: 47 }, (_, i) => `$${base + i + 1}`).join(', ')})`; }).join(','); - const values = batch.flatMap(row => [ - row.pid, - row.title, - row.description, - row.SKU, - row.stock_quantity, - row.pending_qty, - row.preorder_count, - row.notions_inv_count, - row.price, - row.regular_price, - row.cost_price, - row.vendor, - row.vendor_reference, - row.notions_reference, - row.brand, - row.line, - row.subline, - row.artist, - row.category_ids, - row.date_created, - row.first_received, - row.landing_cost_price, - row.barcode, - row.harmonized_tariff_code, - row.updated_at, - row.visible, - row.replenishable, - row.permalink, - row.moq, - row.rating, - row.reviews, - row.weight, - row.length, - row.width, - row.height, - row.country_of_origin, - row.location, - row.total_sold, - row.baskets, - row.notifies, - row.date_last_sold - ]); + const values = batch.flatMap(row => { + const imageUrls = getImageUrls(row.pid); + return [ + row.pid, + row.title, + row.description, + row.itemnumber || '', + row.stock_quantity > 5000 ? 0 : Math.max(0, row.stock_quantity), + row.preorder_count, + row.notions_inv_count, + row.price, + row.regular_price, + row.cost_price, + row.vendor, + row.vendor_reference, + row.notions_reference, + row.brand, + row.line, + row.subline, + row.artist, + row.category_ids, + validateDate(row.date_created), + validateDate(row.first_received), + row.landing_cost_price, + row.barcode, + row.harmonized_tariff_code, + validateDate(row.updated_at), + row.visible, + true, + row.replenishable, + row.permalink, + Math.max(1, Math.round(row.moq || 1)), + 1, + row.rating, + row.reviews, + row.weight, + row.length, + row.width, + row.height, + row.country_of_origin, + row.location, + row.total_sold, + row.baskets, + row.notifies, + validateDate(row.date_last_sold), + imageUrls.image, + imageUrls.image_175, + imageUrls.image_full, + null, + null + ]; + }); const [result] = await localConnection.query(` WITH inserted_products AS ( INSERT INTO products ( - pid, title, description, SKU, stock_quantity, pending_qty, preorder_count, - notions_inv_count, price, regular_price, cost_price, vendor, vendor_reference, - notions_reference, brand, line, subline, artist, category_ids, created_at, - first_received, landing_cost_price, barcode, harmonized_tariff_code, - updated_at, visible, replenishable, permalink, moq, rating, reviews, + pid, title, description, SKU, stock_quantity, preorder_count, notions_inv_count, + price, regular_price, cost_price, vendor, vendor_reference, notions_reference, + brand, line, subline, artist, categories, created_at, first_received, + landing_cost_price, barcode, harmonized_tariff_code, updated_at, visible, + managing_stock, replenishable, permalink, moq, uom, rating, reviews, weight, length, width, height, country_of_origin, location, total_sold, - baskets, notifies, date_last_sold + baskets, notifies, date_last_sold, image, image_175, image_full, options, tags ) VALUES ${placeholders} ON CONFLICT (pid) DO NOTHING @@ -285,7 +312,7 @@ async function importMissingProducts(prodConnection, localConnection, missingPid } } -async function materializeCalculations(prodConnection, localConnection, incrementalUpdate = true, lastSyncTime = '1970-01-01') { +async function materializeCalculations(prodConnection, localConnection, incrementalUpdate = true, lastSyncTime = '1970-01-01', startTime = Date.now()) { outputProgress({ status: "running", operation: "Products import", @@ -315,36 +342,8 @@ async function materializeCalculations(prodConnection, localConnection, incremen ) THEN 0 ELSE 1 END AS replenishable, - COALESCE(si.available_local, 0) - COALESCE( - (SELECT SUM(oi.qty_ordered - oi.qty_placed) - FROM order_items oi - JOIN _order o ON oi.order_id = o.order_id - WHERE oi.prod_pid = p.pid - AND o.date_placed != '0000-00-00 00:00:00' - AND o.date_shipped = '0000-00-00 00:00:00' - AND oi.pick_finished = 0 - AND oi.qty_back = 0 - AND o.order_status != 15 - AND o.order_status < 90 - AND oi.qty_ordered >= oi.qty_placed - AND oi.qty_ordered > 0 - ), 0 - ) as stock_quantity, - COALESCE( - (SELECT SUM(oi.qty_ordered - oi.qty_placed) - FROM order_items oi - JOIN _order o ON oi.order_id = o.order_id - WHERE oi.prod_pid = p.pid - AND o.date_placed != '0000-00-00 00:00:00' - AND o.date_shipped = '0000-00-00 00:00:00' - AND oi.pick_finished = 0 - AND oi.qty_back = 0 - AND o.order_status != 15 - AND o.order_status < 90 - AND oi.qty_ordered >= oi.qty_placed - AND oi.qty_ordered > 0 - ), 0 - ) as pending_qty, + COALESCE(si.available_local, 0) as stock_quantity, + 0 as pending_qty, COALESCE(ci.onpreorder, 0) as preorder_count, COALESCE(pnb.inventory, 0) as notions_inv_count, COALESCE(pcp.price_each, 0) as price, @@ -423,71 +422,78 @@ async function materializeCalculations(prodConnection, localConnection, incremen await withRetry(async () => { const placeholders = batch.map((_, idx) => { - const offset = idx * 41; // 41 columns - return `(${Array.from({ length: 41 }, (_, i) => `$${offset + i + 1}`).join(', ')})`; + const base = idx * 47; // 47 columns + return `(${Array.from({ length: 47 }, (_, i) => `$${base + i + 1}`).join(', ')})`; }).join(','); - const values = batch.flatMap(row => [ - row.pid, - row.title, - row.description, - row.SKU, - // Set stock quantity to 0 if it's over 5000 - row.stock_quantity > 5000 ? 0 : Math.max(0, row.stock_quantity), - row.pending_qty, - row.preorder_count, - row.notions_inv_count, - row.price, - row.regular_price, - row.cost_price, - row.vendor, - row.vendor_reference, - row.notions_reference, - row.brand, - row.line, - row.subline, - row.artist, - row.category_ids, - row.date_created, // map to created_at - row.first_received, - row.landing_cost_price, - row.barcode, - row.harmonized_tariff_code, - row.updated_at, - row.visible, - row.replenishable, - row.permalink, - row.moq, - row.rating ? Number(row.rating).toFixed(2) : null, - row.reviews, - row.weight, - row.length, - row.width, - row.height, - row.country_of_origin, - row.location, - row.total_sold, - row.baskets, - row.notifies, - row.date_last_sold - ]); + const values = batch.flatMap(row => { + const imageUrls = getImageUrls(row.pid); + return [ + row.pid, + row.title, + row.description, + row.itemnumber || '', + row.stock_quantity > 5000 ? 0 : Math.max(0, row.stock_quantity), + row.preorder_count, + row.notions_inv_count, + row.price, + row.regular_price, + row.cost_price, + row.vendor, + row.vendor_reference, + row.notions_reference, + row.brand, + row.line, + row.subline, + row.artist, + row.category_ids, + validateDate(row.date_created), + validateDate(row.first_received), + row.landing_cost_price, + row.barcode, + row.harmonized_tariff_code, + validateDate(row.updated_at), + row.visible, + true, + row.replenishable, + row.permalink, + Math.max(1, Math.round(row.moq || 1)), + 1, + row.rating, + row.reviews, + row.weight, + row.length, + row.width, + row.height, + row.country_of_origin, + row.location, + row.total_sold, + row.baskets, + row.notifies, + validateDate(row.date_last_sold), + imageUrls.image, + imageUrls.image_175, + imageUrls.image_full, + null, + null + ]; + }); await localConnection.query(` INSERT INTO temp_products ( - pid, title, description, SKU, stock_quantity, pending_qty, preorder_count, - notions_inv_count, price, regular_price, cost_price, vendor, vendor_reference, - notions_reference, brand, line, subline, artist, category_ids, created_at, - first_received, landing_cost_price, barcode, harmonized_tariff_code, - updated_at, visible, replenishable, permalink, moq, rating, reviews, + pid, title, description, SKU, stock_quantity, preorder_count, notions_inv_count, + price, regular_price, cost_price, vendor, vendor_reference, notions_reference, + brand, line, subline, artist, categories, created_at, first_received, + landing_cost_price, barcode, harmonized_tariff_code, updated_at, visible, + managing_stock, replenishable, permalink, moq, uom, rating, reviews, weight, length, width, height, country_of_origin, location, total_sold, - baskets, notifies, date_last_sold + baskets, notifies, date_last_sold, image, image_175, image_full, options, tags ) VALUES ${placeholders} ON CONFLICT (pid) DO UPDATE SET title = EXCLUDED.title, description = EXCLUDED.description, SKU = EXCLUDED.SKU, stock_quantity = EXCLUDED.stock_quantity, - pending_qty = EXCLUDED.pending_qty, preorder_count = EXCLUDED.preorder_count, notions_inv_count = EXCLUDED.notions_inv_count, price = EXCLUDED.price, @@ -500,7 +506,6 @@ async function materializeCalculations(prodConnection, localConnection, incremen line = EXCLUDED.line, subline = EXCLUDED.subline, artist = EXCLUDED.artist, - category_ids = EXCLUDED.category_ids, created_at = EXCLUDED.created_at, first_received = EXCLUDED.first_received, landing_cost_price = EXCLUDED.landing_cost_price, @@ -508,9 +513,11 @@ async function materializeCalculations(prodConnection, localConnection, incremen harmonized_tariff_code = EXCLUDED.harmonized_tariff_code, updated_at = EXCLUDED.updated_at, visible = EXCLUDED.visible, + managing_stock = EXCLUDED.managing_stock, replenishable = EXCLUDED.replenishable, permalink = EXCLUDED.permalink, moq = EXCLUDED.moq, + uom = EXCLUDED.uom, rating = EXCLUDED.rating, reviews = EXCLUDED.reviews, weight = EXCLUDED.weight, @@ -522,7 +529,12 @@ async function materializeCalculations(prodConnection, localConnection, incremen total_sold = EXCLUDED.total_sold, baskets = EXCLUDED.baskets, notifies = EXCLUDED.notifies, - date_last_sold = EXCLUDED.date_last_sold + date_last_sold = EXCLUDED.date_last_sold, + image = EXCLUDED.image, + image_175 = EXCLUDED.image_175, + image_full = EXCLUDED.image_full, + options = EXCLUDED.options, + tags = EXCLUDED.tags `, values); }, `Error inserting batch ${i} to ${i + batch.length}`); @@ -560,133 +572,151 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate } } - // Setup temporary tables - await setupTemporaryTables(localConnection); + // Start a transaction to ensure temporary tables persist + await localConnection.beginTransaction(); - // Materialize calculations into temp table - await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime); + try { + // Setup temporary tables + await setupTemporaryTables(localConnection); - // Get the list of products that need updating - const [products] = await localConnection.query(` - SELECT - t.pid, - t.title, - t.description, - t.SKU, - t.stock_quantity, - t.pending_qty, - t.preorder_count, - t.notions_inv_count, - t.price, - t.regular_price, - t.cost_price, - t.vendor, - t.vendor_reference, - t.notions_reference, - t.brand, - t.line, - t.subline, - t.artist, - t.category_ids, - t.created_at, - t.first_received, - t.landing_cost_price, - t.barcode, - t.harmonized_tariff_code, - t.updated_at, - t.visible, - t.replenishable, - t.permalink, - t.moq, - t.rating, - t.reviews, - t.weight, - t.length, - t.width, - t.height, - t.country_of_origin, - t.location, - t.total_sold, - t.baskets, - t.notifies, - t.date_last_sold - FROM temp_products t - `); + // Materialize calculations into temp table + await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime, startTime); - let recordsAdded = 0; - let recordsUpdated = 0; + // Get the list of products that need updating + const [products] = await localConnection.query(` + SELECT + t.pid, + t.title, + t.description, + t.SKU, + t.stock_quantity, + t.preorder_count, + t.notions_inv_count, + t.price, + t.regular_price, + t.cost_price, + t.vendor, + t.vendor_reference, + t.notions_reference, + t.brand, + t.line, + t.subline, + t.artist, + t.categories, + t.created_at, + t.first_received, + t.landing_cost_price, + t.barcode, + t.harmonized_tariff_code, + t.updated_at, + t.visible, + t.managing_stock, + t.replenishable, + t.permalink, + t.moq, + t.rating, + t.reviews, + t.weight, + t.length, + t.width, + t.height, + t.country_of_origin, + t.location, + t.total_sold, + t.baskets, + t.notifies, + t.date_last_sold, + t.image, + t.image_175, + t.image_full, + t.options, + t.tags + FROM temp_products t + WHERE t.needs_update = true + `); - // Process products in batches - for (let i = 0; i < products.length; i += BATCH_SIZE) { - const batch = products.slice(i, Math.min(i + BATCH_SIZE, products.length)); - - await withRetry(async () => { + // Process products in batches + let recordsAdded = 0; + let recordsUpdated = 0; + + for (let i = 0; i < products.rows.length; i += BATCH_SIZE) { + const batch = products.rows.slice(i, i + BATCH_SIZE); + const placeholders = batch.map((_, idx) => { - const offset = idx * 41; // 41 columns - return `(${Array.from({ length: 41 }, (_, i) => `$${offset + i + 1}`).join(', ')})`; + const base = idx * 47; // 47 columns + return `(${Array.from({ length: 47 }, (_, i) => `$${base + i + 1}`).join(', ')})`; }).join(','); - const values = batch.flatMap(row => [ - row.pid, - row.title, - row.description, - row.SKU, - row.stock_quantity, - row.pending_qty, - row.preorder_count, - row.notions_inv_count, - row.price, - row.regular_price, - row.cost_price, - row.vendor, - row.vendor_reference, - row.notions_reference, - row.brand, - row.line, - row.subline, - row.artist, - row.category_ids, - row.created_at, - row.first_received, - row.landing_cost_price, - row.barcode, - row.harmonized_tariff_code, - row.updated_at, - row.visible, - row.replenishable, - row.permalink, - row.moq, - row.rating, - row.reviews, - row.weight, - row.length, - row.width, - row.height, - row.country_of_origin, - row.location, - row.total_sold, - row.baskets, - row.notifies, - row.date_last_sold - ]); + const values = batch.flatMap(row => { + const imageUrls = getImageUrls(row.pid); + return [ + row.pid, + row.title, + row.description, + row.SKU || '', + row.stock_quantity > 5000 ? 0 : Math.max(0, row.stock_quantity), + row.preorder_count, + row.notions_inv_count, + row.price, + row.regular_price, + row.cost_price, + row.vendor, + row.vendor_reference, + row.notions_reference, + row.brand, + row.line, + row.subline, + row.artist, + row.categories, + validateDate(row.created_at), + validateDate(row.first_received), + row.landing_cost_price, + row.barcode, + row.harmonized_tariff_code, + validateDate(row.updated_at), + row.visible, + row.managing_stock, + row.replenishable, + row.permalink, + row.moq, + 1, + row.rating, + row.reviews, + row.weight, + row.length, + row.width, + row.height, + row.country_of_origin, + row.location, + row.total_sold, + row.baskets, + row.notifies, + validateDate(row.date_last_sold), + imageUrls.image, + imageUrls.image_175, + imageUrls.image_full, + row.options, + row.tags + ]; + }); - const result = await localConnection.query(` - WITH upserted_products AS ( + const [result] = await localConnection.query(` + WITH upserted AS ( INSERT INTO products ( - pid, title, description, SKU, stock_quantity, pending_qty, preorder_count, - notions_inv_count, price, regular_price, cost_price, vendor, vendor_reference, - notions_reference, brand, line, subline, artist, category_ids, created_at, - first_received, landing_cost_price, barcode, harmonized_tariff_code, - updated_at, visible, replenishable, permalink, moq, rating, reviews, + pid, title, description, SKU, stock_quantity, preorder_count, notions_inv_count, + price, regular_price, cost_price, vendor, vendor_reference, notions_reference, + brand, line, subline, artist, categories, created_at, first_received, + landing_cost_price, barcode, harmonized_tariff_code, updated_at, visible, + managing_stock, replenishable, permalink, moq, uom, rating, reviews, weight, length, width, height, country_of_origin, location, total_sold, - baskets, notifies, date_last_sold - ) VALUES ${placeholders} + baskets, notifies, date_last_sold, image, image_175, image_full, options, tags + ) + VALUES ${placeholders} ON CONFLICT (pid) DO UPDATE SET title = EXCLUDED.title, description = EXCLUDED.description, SKU = EXCLUDED.SKU, stock_quantity = EXCLUDED.stock_quantity, - pending_qty = EXCLUDED.pending_qty, preorder_count = EXCLUDED.preorder_count, notions_inv_count = EXCLUDED.notions_inv_count, price = EXCLUDED.price, @@ -699,7 +729,6 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate line = EXCLUDED.line, subline = EXCLUDED.subline, artist = EXCLUDED.artist, - category_ids = EXCLUDED.category_ids, created_at = EXCLUDED.created_at, first_received = EXCLUDED.first_received, landing_cost_price = EXCLUDED.landing_cost_price, @@ -707,9 +736,11 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate harmonized_tariff_code = EXCLUDED.harmonized_tariff_code, updated_at = EXCLUDED.updated_at, visible = EXCLUDED.visible, + managing_stock = EXCLUDED.managing_stock, replenishable = EXCLUDED.replenishable, permalink = EXCLUDED.permalink, moq = EXCLUDED.moq, + uom = EXCLUDED.uom, rating = EXCLUDED.rating, reviews = EXCLUDED.reviews, weight = EXCLUDED.weight, @@ -721,58 +752,82 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate total_sold = EXCLUDED.total_sold, baskets = EXCLUDED.baskets, notifies = EXCLUDED.notifies, - date_last_sold = EXCLUDED.date_last_sold + date_last_sold = EXCLUDED.date_last_sold, + image = EXCLUDED.image, + image_175 = EXCLUDED.image_175, + image_full = EXCLUDED.image_full, + options = EXCLUDED.options, + tags = EXCLUDED.tags RETURNING - CASE WHEN xmax::text::int > 0 THEN 0 ELSE 1 END AS inserted, - CASE WHEN xmax::text::int > 0 THEN 1 ELSE 0 END AS updated + xmax = 0 as inserted ) SELECT - COUNT(*) FILTER (WHERE inserted = 1) as inserted, - COUNT(*) FILTER (WHERE updated = 1) as updated - FROM upserted_products + COUNT(*) FILTER (WHERE inserted) as inserted, + COUNT(*) FILTER (WHERE NOT inserted) as updated + FROM upserted `, values); - recordsAdded += result.rows[0].inserted; - recordsUpdated += result.rows[0].updated; - }, `Error inserting batch ${i} to ${i + batch.length}`); + recordsAdded += parseInt(result.rows[0].inserted, 10) || 0; + recordsUpdated += parseInt(result.rows[0].updated, 10) || 0; - outputProgress({ - status: "running", - operation: "Products import", - message: `Imported ${i + batch.length} of ${products.length} products`, - current: i + batch.length, - total: products.length, - elapsed: formatElapsedTime((Date.now() - startTime) / 1000), - remaining: estimateRemaining(startTime, i + batch.length, products.length), - rate: calculateRate(startTime, i + batch.length) - }); + // Process category relationships for each product in the batch + for (const row of batch) { + if (row.categories) { + const categoryIds = row.categories.split(',').filter(id => id && id.trim()); + if (categoryIds.length > 0) { + const catPlaceholders = categoryIds.map((_, idx) => + `($${idx * 2 + 1}, $${idx * 2 + 2})` + ).join(','); + const catValues = categoryIds.flatMap(catId => [row.pid, parseInt(catId.trim(), 10)]); + + // First delete existing relationships for this product + await localConnection.query( + 'DELETE FROM product_categories WHERE pid = $1', + [row.pid] + ); + + // Then insert the new relationships + await localConnection.query(` + INSERT INTO product_categories (pid, cat_id) + VALUES ${catPlaceholders} + ON CONFLICT (pid, cat_id) DO NOTHING + `, catValues); + } + } + } + + outputProgress({ + status: "running", + operation: "Products import", + message: `Processing products: ${i + batch.length} of ${products.rows.length}`, + current: i + batch.length, + total: products.rows.length, + elapsed: formatElapsedTime((Date.now() - startTime) / 1000), + remaining: estimateRemaining(startTime, i + batch.length, products.rows.length), + rate: calculateRate(startTime, i + batch.length) + }); + } + + // Cleanup temporary tables + await cleanupTemporaryTables(localConnection); + + // Commit the transaction + await localConnection.commit(); + + return { + status: 'complete', + recordsAdded, + recordsUpdated, + totalRecords: products.rows.length, + duration: formatElapsedTime(Date.now() - startTime) + }; + } catch (error) { + // Rollback on error + await localConnection.rollback(); + throw error; } - - // Update sync status - await localConnection.query(` - INSERT INTO sync_status (table_name, last_sync_timestamp) - VALUES ('products', NOW()) - ON CONFLICT (table_name) DO UPDATE SET - last_sync_timestamp = NOW() - `); - - // Cleanup temporary tables - await cleanupTemporaryTables(localConnection); - - return { - status: "complete", - recordsAdded, - recordsUpdated, - totalRecords: products.length - }; } catch (error) { console.error('Error in importProducts:', error); - // Attempt cleanup on error - try { - await cleanupTemporaryTables(localConnection); - } catch (cleanupError) { - console.error('Error during cleanup:', cleanupError); - } throw error; } } diff --git a/inventory-server/scripts/import/utils.js b/inventory-server/scripts/import/utils.js index d0ea46c..b7d888d 100644 --- a/inventory-server/scripts/import/utils.js +++ b/inventory-server/scripts/import/utils.js @@ -64,25 +64,68 @@ async function setupConnections(sshConfig) { // Create a wrapper for the PostgreSQL pool to match MySQL interface const localConnection = { + _client: null, + _transactionActive: false, + query: async (text, params) => { - const client = await localPool.connect(); - try { - const result = await client.query(text, params); - return [result]; - } finally { - client.release(); + // If we're not in a transaction, use the pool directly + if (!localConnection._transactionActive) { + const client = await localPool.connect(); + try { + const result = await client.query(text, params); + return [result]; + } finally { + client.release(); + } } + + // If we're in a transaction, use the dedicated client + if (!localConnection._client) { + throw new Error('No active transaction client'); + } + const result = await localConnection._client.query(text, params); + return [result]; }, + + beginTransaction: async () => { + if (localConnection._transactionActive) { + throw new Error('Transaction already active'); + } + localConnection._client = await localPool.connect(); + await localConnection._client.query('BEGIN'); + localConnection._transactionActive = true; + }, + + commit: async () => { + if (!localConnection._transactionActive) { + throw new Error('No active transaction to commit'); + } + await localConnection._client.query('COMMIT'); + localConnection._client.release(); + localConnection._client = null; + localConnection._transactionActive = false; + }, + + rollback: async () => { + if (!localConnection._transactionActive) { + throw new Error('No active transaction to rollback'); + } + await localConnection._client.query('ROLLBACK'); + localConnection._client.release(); + localConnection._client = null; + localConnection._transactionActive = false; + }, + end: async () => { + if (localConnection._client) { + localConnection._client.release(); + localConnection._client = null; + } await localPool.end(); } }; - return { - ssh: tunnel.ssh, - prodConnection, - localConnection - }; + return { prodConnection, localConnection, tunnel }; } // Helper function to close connections