Update import scripts through products

This commit is contained in:
2025-02-14 21:46:50 -05:00
parent 9623681a15
commit 9b8577f258
3 changed files with 431 additions and 326 deletions

View File

@@ -268,17 +268,25 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
`, [batchIds]);
if (discounts.length > 0) {
const placeholders = discounts.map((_, idx) =>
`($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})`
).join(",");
const values = discounts.flatMap(d => [d.order_id, d.pid, d.discount]);
const uniqueDiscounts = new Map();
discounts.forEach(d => {
const key = `${d.order_id}-${d.pid}`;
uniqueDiscounts.set(key, d);
});
await localConnection.query(`
INSERT INTO temp_order_discounts (order_id, pid, discount)
VALUES ${placeholders}
ON CONFLICT (order_id, pid) DO UPDATE SET
discount = EXCLUDED.discount
`, values);
const values = Array.from(uniqueDiscounts.values()).flatMap(d => [d.order_id, d.pid, d.discount || 0]);
if (values.length > 0) {
const placeholders = Array.from({length: uniqueDiscounts.size}, (_, idx) => {
const base = idx * 3;
return `($${base + 1}, $${base + 2}, $${base + 3})`;
}).join(",");
await localConnection.query(`
INSERT INTO temp_order_discounts (order_id, pid, discount)
VALUES ${placeholders}
ON CONFLICT (order_id, pid) DO UPDATE SET
discount = EXCLUDED.discount
`, values);
}
}
}
@@ -404,7 +412,6 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
// Filter orders and track missing products
const validOrders = [];
const values = [];
const processedOrderItems = new Set();
const processedOrders = new Set();
@@ -425,7 +432,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`;
}).join(',');
const values = validOrders.flatMap(o => [
const batchValues = validOrders.flatMap(o => [
o.order_number,
o.pid,
o.SKU,
@@ -471,7 +478,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
COUNT(*) FILTER (WHERE xmax = 0) as inserted,
COUNT(*) FILTER (WHERE xmax <> 0) as updated
FROM inserted_orders
`, values);
`, batchValues);
const { inserted, updated } = result.rows[0];
recordsAdded += inserted;

View File

@@ -35,17 +35,28 @@ async function withRetry(operation, errorMessage) {
throw lastError;
}
async function setupTemporaryTables(connection) {
await connection.query(`
DROP TABLE IF EXISTS temp_products;
// Add helper function at the top of the file
function validateDate(mysqlDate) {
if (!mysqlDate || mysqlDate === '0000-00-00' || mysqlDate === '0000-00-00 00:00:00') {
return null;
}
// Check if the date is valid
const date = new Date(mysqlDate);
return isNaN(date.getTime()) ? null : mysqlDate;
}
async function setupTemporaryTables(connection) {
// Drop the table if it exists
await connection.query('DROP TABLE IF EXISTS temp_products');
// Create the temporary table
await connection.query(`
CREATE TEMP TABLE temp_products (
pid BIGINT NOT NULL,
title VARCHAR(255),
description TEXT,
SKU VARCHAR(50),
stock_quantity INTEGER DEFAULT 0,
pending_qty INTEGER DEFAULT 0,
preorder_count INTEGER DEFAULT 0,
notions_inv_count INTEGER DEFAULT 0,
price DECIMAL(10,3) NOT NULL DEFAULT 0,
@@ -58,7 +69,7 @@ async function setupTemporaryTables(connection) {
line VARCHAR(100),
subline VARCHAR(100),
artist VARCHAR(100),
category_ids TEXT,
categories TEXT,
created_at TIMESTAMP,
first_received TIMESTAMP,
landing_cost_price DECIMAL(10,3),
@@ -66,9 +77,11 @@ async function setupTemporaryTables(connection) {
harmonized_tariff_code VARCHAR(50),
updated_at TIMESTAMP,
visible BOOLEAN,
managing_stock BOOLEAN DEFAULT true,
replenishable BOOLEAN,
permalink VARCHAR(255),
moq DECIMAL(10,3),
moq INTEGER DEFAULT 1,
uom INTEGER DEFAULT 1,
rating DECIMAL(10,2),
reviews INTEGER,
weight DECIMAL(10,3),
@@ -81,12 +94,17 @@ async function setupTemporaryTables(connection) {
baskets INTEGER,
notifies INTEGER,
date_last_sold TIMESTAMP,
image VARCHAR(255),
image_175 VARCHAR(255),
image_full VARCHAR(255),
options TEXT,
tags TEXT,
needs_update BOOLEAN DEFAULT TRUE,
PRIMARY KEY (pid)
);
)`);
CREATE INDEX idx_temp_products_needs_update ON temp_products (needs_update);
`);
// Create the index
await connection.query('CREATE INDEX idx_temp_products_needs_update ON temp_products (needs_update)');
}
async function cleanupTemporaryTables(connection) {
@@ -205,64 +223,73 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
const batch = prodData.slice(i, i + BATCH_SIZE);
const placeholders = batch.map((_, idx) => {
const base = idx * 41; // 41 columns
return `(${Array.from({ length: 41 }, (_, i) => `$${base + i + 1}`).join(', ')})`;
const base = idx * 47; // 47 columns
return `(${Array.from({ length: 47 }, (_, i) => `$${base + i + 1}`).join(', ')})`;
}).join(',');
const values = batch.flatMap(row => [
row.pid,
row.title,
row.description,
row.SKU,
row.stock_quantity,
row.pending_qty,
row.preorder_count,
row.notions_inv_count,
row.price,
row.regular_price,
row.cost_price,
row.vendor,
row.vendor_reference,
row.notions_reference,
row.brand,
row.line,
row.subline,
row.artist,
row.category_ids,
row.date_created,
row.first_received,
row.landing_cost_price,
row.barcode,
row.harmonized_tariff_code,
row.updated_at,
row.visible,
row.replenishable,
row.permalink,
row.moq,
row.rating,
row.reviews,
row.weight,
row.length,
row.width,
row.height,
row.country_of_origin,
row.location,
row.total_sold,
row.baskets,
row.notifies,
row.date_last_sold
]);
const values = batch.flatMap(row => {
const imageUrls = getImageUrls(row.pid);
return [
row.pid,
row.title,
row.description,
row.itemnumber || '',
row.stock_quantity > 5000 ? 0 : Math.max(0, row.stock_quantity),
row.preorder_count,
row.notions_inv_count,
row.price,
row.regular_price,
row.cost_price,
row.vendor,
row.vendor_reference,
row.notions_reference,
row.brand,
row.line,
row.subline,
row.artist,
row.category_ids,
validateDate(row.date_created),
validateDate(row.first_received),
row.landing_cost_price,
row.barcode,
row.harmonized_tariff_code,
validateDate(row.updated_at),
row.visible,
true,
row.replenishable,
row.permalink,
Math.max(1, Math.round(row.moq || 1)),
1,
row.rating,
row.reviews,
row.weight,
row.length,
row.width,
row.height,
row.country_of_origin,
row.location,
row.total_sold,
row.baskets,
row.notifies,
validateDate(row.date_last_sold),
imageUrls.image,
imageUrls.image_175,
imageUrls.image_full,
null,
null
];
});
const [result] = await localConnection.query(`
WITH inserted_products AS (
INSERT INTO products (
pid, title, description, SKU, stock_quantity, pending_qty, preorder_count,
notions_inv_count, price, regular_price, cost_price, vendor, vendor_reference,
notions_reference, brand, line, subline, artist, category_ids, created_at,
first_received, landing_cost_price, barcode, harmonized_tariff_code,
updated_at, visible, replenishable, permalink, moq, rating, reviews,
pid, title, description, SKU, stock_quantity, preorder_count, notions_inv_count,
price, regular_price, cost_price, vendor, vendor_reference, notions_reference,
brand, line, subline, artist, categories, created_at, first_received,
landing_cost_price, barcode, harmonized_tariff_code, updated_at, visible,
managing_stock, replenishable, permalink, moq, uom, rating, reviews,
weight, length, width, height, country_of_origin, location, total_sold,
baskets, notifies, date_last_sold
baskets, notifies, date_last_sold, image, image_175, image_full, options, tags
)
VALUES ${placeholders}
ON CONFLICT (pid) DO NOTHING
@@ -285,7 +312,7 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
}
}
async function materializeCalculations(prodConnection, localConnection, incrementalUpdate = true, lastSyncTime = '1970-01-01') {
async function materializeCalculations(prodConnection, localConnection, incrementalUpdate = true, lastSyncTime = '1970-01-01', startTime = Date.now()) {
outputProgress({
status: "running",
operation: "Products import",
@@ -315,36 +342,8 @@ async function materializeCalculations(prodConnection, localConnection, incremen
) THEN 0
ELSE 1
END AS replenishable,
COALESCE(si.available_local, 0) - COALESCE(
(SELECT SUM(oi.qty_ordered - oi.qty_placed)
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE oi.prod_pid = p.pid
AND o.date_placed != '0000-00-00 00:00:00'
AND o.date_shipped = '0000-00-00 00:00:00'
AND oi.pick_finished = 0
AND oi.qty_back = 0
AND o.order_status != 15
AND o.order_status < 90
AND oi.qty_ordered >= oi.qty_placed
AND oi.qty_ordered > 0
), 0
) as stock_quantity,
COALESCE(
(SELECT SUM(oi.qty_ordered - oi.qty_placed)
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE oi.prod_pid = p.pid
AND o.date_placed != '0000-00-00 00:00:00'
AND o.date_shipped = '0000-00-00 00:00:00'
AND oi.pick_finished = 0
AND oi.qty_back = 0
AND o.order_status != 15
AND o.order_status < 90
AND oi.qty_ordered >= oi.qty_placed
AND oi.qty_ordered > 0
), 0
) as pending_qty,
COALESCE(si.available_local, 0) as stock_quantity,
0 as pending_qty,
COALESCE(ci.onpreorder, 0) as preorder_count,
COALESCE(pnb.inventory, 0) as notions_inv_count,
COALESCE(pcp.price_each, 0) as price,
@@ -423,71 +422,78 @@ async function materializeCalculations(prodConnection, localConnection, incremen
await withRetry(async () => {
const placeholders = batch.map((_, idx) => {
const offset = idx * 41; // 41 columns
return `(${Array.from({ length: 41 }, (_, i) => `$${offset + i + 1}`).join(', ')})`;
const base = idx * 47; // 47 columns
return `(${Array.from({ length: 47 }, (_, i) => `$${base + i + 1}`).join(', ')})`;
}).join(',');
const values = batch.flatMap(row => [
row.pid,
row.title,
row.description,
row.SKU,
// Set stock quantity to 0 if it's over 5000
row.stock_quantity > 5000 ? 0 : Math.max(0, row.stock_quantity),
row.pending_qty,
row.preorder_count,
row.notions_inv_count,
row.price,
row.regular_price,
row.cost_price,
row.vendor,
row.vendor_reference,
row.notions_reference,
row.brand,
row.line,
row.subline,
row.artist,
row.category_ids,
row.date_created, // map to created_at
row.first_received,
row.landing_cost_price,
row.barcode,
row.harmonized_tariff_code,
row.updated_at,
row.visible,
row.replenishable,
row.permalink,
row.moq,
row.rating ? Number(row.rating).toFixed(2) : null,
row.reviews,
row.weight,
row.length,
row.width,
row.height,
row.country_of_origin,
row.location,
row.total_sold,
row.baskets,
row.notifies,
row.date_last_sold
]);
const values = batch.flatMap(row => {
const imageUrls = getImageUrls(row.pid);
return [
row.pid,
row.title,
row.description,
row.itemnumber || '',
row.stock_quantity > 5000 ? 0 : Math.max(0, row.stock_quantity),
row.preorder_count,
row.notions_inv_count,
row.price,
row.regular_price,
row.cost_price,
row.vendor,
row.vendor_reference,
row.notions_reference,
row.brand,
row.line,
row.subline,
row.artist,
row.category_ids,
validateDate(row.date_created),
validateDate(row.first_received),
row.landing_cost_price,
row.barcode,
row.harmonized_tariff_code,
validateDate(row.updated_at),
row.visible,
true,
row.replenishable,
row.permalink,
Math.max(1, Math.round(row.moq || 1)),
1,
row.rating,
row.reviews,
row.weight,
row.length,
row.width,
row.height,
row.country_of_origin,
row.location,
row.total_sold,
row.baskets,
row.notifies,
validateDate(row.date_last_sold),
imageUrls.image,
imageUrls.image_175,
imageUrls.image_full,
null,
null
];
});
await localConnection.query(`
INSERT INTO temp_products (
pid, title, description, SKU, stock_quantity, pending_qty, preorder_count,
notions_inv_count, price, regular_price, cost_price, vendor, vendor_reference,
notions_reference, brand, line, subline, artist, category_ids, created_at,
first_received, landing_cost_price, barcode, harmonized_tariff_code,
updated_at, visible, replenishable, permalink, moq, rating, reviews,
pid, title, description, SKU, stock_quantity, preorder_count, notions_inv_count,
price, regular_price, cost_price, vendor, vendor_reference, notions_reference,
brand, line, subline, artist, categories, created_at, first_received,
landing_cost_price, barcode, harmonized_tariff_code, updated_at, visible,
managing_stock, replenishable, permalink, moq, uom, rating, reviews,
weight, length, width, height, country_of_origin, location, total_sold,
baskets, notifies, date_last_sold
baskets, notifies, date_last_sold, image, image_175, image_full, options, tags
) VALUES ${placeholders}
ON CONFLICT (pid) DO UPDATE SET
title = EXCLUDED.title,
description = EXCLUDED.description,
SKU = EXCLUDED.SKU,
stock_quantity = EXCLUDED.stock_quantity,
pending_qty = EXCLUDED.pending_qty,
preorder_count = EXCLUDED.preorder_count,
notions_inv_count = EXCLUDED.notions_inv_count,
price = EXCLUDED.price,
@@ -500,7 +506,6 @@ async function materializeCalculations(prodConnection, localConnection, incremen
line = EXCLUDED.line,
subline = EXCLUDED.subline,
artist = EXCLUDED.artist,
category_ids = EXCLUDED.category_ids,
created_at = EXCLUDED.created_at,
first_received = EXCLUDED.first_received,
landing_cost_price = EXCLUDED.landing_cost_price,
@@ -508,9 +513,11 @@ async function materializeCalculations(prodConnection, localConnection, incremen
harmonized_tariff_code = EXCLUDED.harmonized_tariff_code,
updated_at = EXCLUDED.updated_at,
visible = EXCLUDED.visible,
managing_stock = EXCLUDED.managing_stock,
replenishable = EXCLUDED.replenishable,
permalink = EXCLUDED.permalink,
moq = EXCLUDED.moq,
uom = EXCLUDED.uom,
rating = EXCLUDED.rating,
reviews = EXCLUDED.reviews,
weight = EXCLUDED.weight,
@@ -522,7 +529,12 @@ async function materializeCalculations(prodConnection, localConnection, incremen
total_sold = EXCLUDED.total_sold,
baskets = EXCLUDED.baskets,
notifies = EXCLUDED.notifies,
date_last_sold = EXCLUDED.date_last_sold
date_last_sold = EXCLUDED.date_last_sold,
image = EXCLUDED.image,
image_175 = EXCLUDED.image_175,
image_full = EXCLUDED.image_full,
options = EXCLUDED.options,
tags = EXCLUDED.tags
`, values);
}, `Error inserting batch ${i} to ${i + batch.length}`);
@@ -560,133 +572,151 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
}
}
// Setup temporary tables
await setupTemporaryTables(localConnection);
// Start a transaction to ensure temporary tables persist
await localConnection.beginTransaction();
// Materialize calculations into temp table
await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime);
try {
// Setup temporary tables
await setupTemporaryTables(localConnection);
// Get the list of products that need updating
const [products] = await localConnection.query(`
SELECT
t.pid,
t.title,
t.description,
t.SKU,
t.stock_quantity,
t.pending_qty,
t.preorder_count,
t.notions_inv_count,
t.price,
t.regular_price,
t.cost_price,
t.vendor,
t.vendor_reference,
t.notions_reference,
t.brand,
t.line,
t.subline,
t.artist,
t.category_ids,
t.created_at,
t.first_received,
t.landing_cost_price,
t.barcode,
t.harmonized_tariff_code,
t.updated_at,
t.visible,
t.replenishable,
t.permalink,
t.moq,
t.rating,
t.reviews,
t.weight,
t.length,
t.width,
t.height,
t.country_of_origin,
t.location,
t.total_sold,
t.baskets,
t.notifies,
t.date_last_sold
FROM temp_products t
`);
// Materialize calculations into temp table
await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime, startTime);
let recordsAdded = 0;
let recordsUpdated = 0;
// Get the list of products that need updating
const [products] = await localConnection.query(`
SELECT
t.pid,
t.title,
t.description,
t.SKU,
t.stock_quantity,
t.preorder_count,
t.notions_inv_count,
t.price,
t.regular_price,
t.cost_price,
t.vendor,
t.vendor_reference,
t.notions_reference,
t.brand,
t.line,
t.subline,
t.artist,
t.categories,
t.created_at,
t.first_received,
t.landing_cost_price,
t.barcode,
t.harmonized_tariff_code,
t.updated_at,
t.visible,
t.managing_stock,
t.replenishable,
t.permalink,
t.moq,
t.rating,
t.reviews,
t.weight,
t.length,
t.width,
t.height,
t.country_of_origin,
t.location,
t.total_sold,
t.baskets,
t.notifies,
t.date_last_sold,
t.image,
t.image_175,
t.image_full,
t.options,
t.tags
FROM temp_products t
WHERE t.needs_update = true
`);
// Process products in batches
for (let i = 0; i < products.length; i += BATCH_SIZE) {
const batch = products.slice(i, Math.min(i + BATCH_SIZE, products.length));
await withRetry(async () => {
// Process products in batches
let recordsAdded = 0;
let recordsUpdated = 0;
for (let i = 0; i < products.rows.length; i += BATCH_SIZE) {
const batch = products.rows.slice(i, i + BATCH_SIZE);
const placeholders = batch.map((_, idx) => {
const offset = idx * 41; // 41 columns
return `(${Array.from({ length: 41 }, (_, i) => `$${offset + i + 1}`).join(', ')})`;
const base = idx * 47; // 47 columns
return `(${Array.from({ length: 47 }, (_, i) => `$${base + i + 1}`).join(', ')})`;
}).join(',');
const values = batch.flatMap(row => [
row.pid,
row.title,
row.description,
row.SKU,
row.stock_quantity,
row.pending_qty,
row.preorder_count,
row.notions_inv_count,
row.price,
row.regular_price,
row.cost_price,
row.vendor,
row.vendor_reference,
row.notions_reference,
row.brand,
row.line,
row.subline,
row.artist,
row.category_ids,
row.created_at,
row.first_received,
row.landing_cost_price,
row.barcode,
row.harmonized_tariff_code,
row.updated_at,
row.visible,
row.replenishable,
row.permalink,
row.moq,
row.rating,
row.reviews,
row.weight,
row.length,
row.width,
row.height,
row.country_of_origin,
row.location,
row.total_sold,
row.baskets,
row.notifies,
row.date_last_sold
]);
const values = batch.flatMap(row => {
const imageUrls = getImageUrls(row.pid);
return [
row.pid,
row.title,
row.description,
row.SKU || '',
row.stock_quantity > 5000 ? 0 : Math.max(0, row.stock_quantity),
row.preorder_count,
row.notions_inv_count,
row.price,
row.regular_price,
row.cost_price,
row.vendor,
row.vendor_reference,
row.notions_reference,
row.brand,
row.line,
row.subline,
row.artist,
row.categories,
validateDate(row.created_at),
validateDate(row.first_received),
row.landing_cost_price,
row.barcode,
row.harmonized_tariff_code,
validateDate(row.updated_at),
row.visible,
row.managing_stock,
row.replenishable,
row.permalink,
row.moq,
1,
row.rating,
row.reviews,
row.weight,
row.length,
row.width,
row.height,
row.country_of_origin,
row.location,
row.total_sold,
row.baskets,
row.notifies,
validateDate(row.date_last_sold),
imageUrls.image,
imageUrls.image_175,
imageUrls.image_full,
row.options,
row.tags
];
});
const result = await localConnection.query(`
WITH upserted_products AS (
const [result] = await localConnection.query(`
WITH upserted AS (
INSERT INTO products (
pid, title, description, SKU, stock_quantity, pending_qty, preorder_count,
notions_inv_count, price, regular_price, cost_price, vendor, vendor_reference,
notions_reference, brand, line, subline, artist, category_ids, created_at,
first_received, landing_cost_price, barcode, harmonized_tariff_code,
updated_at, visible, replenishable, permalink, moq, rating, reviews,
pid, title, description, SKU, stock_quantity, preorder_count, notions_inv_count,
price, regular_price, cost_price, vendor, vendor_reference, notions_reference,
brand, line, subline, artist, categories, created_at, first_received,
landing_cost_price, barcode, harmonized_tariff_code, updated_at, visible,
managing_stock, replenishable, permalink, moq, uom, rating, reviews,
weight, length, width, height, country_of_origin, location, total_sold,
baskets, notifies, date_last_sold
) VALUES ${placeholders}
baskets, notifies, date_last_sold, image, image_175, image_full, options, tags
)
VALUES ${placeholders}
ON CONFLICT (pid) DO UPDATE SET
title = EXCLUDED.title,
description = EXCLUDED.description,
SKU = EXCLUDED.SKU,
stock_quantity = EXCLUDED.stock_quantity,
pending_qty = EXCLUDED.pending_qty,
preorder_count = EXCLUDED.preorder_count,
notions_inv_count = EXCLUDED.notions_inv_count,
price = EXCLUDED.price,
@@ -699,7 +729,6 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
line = EXCLUDED.line,
subline = EXCLUDED.subline,
artist = EXCLUDED.artist,
category_ids = EXCLUDED.category_ids,
created_at = EXCLUDED.created_at,
first_received = EXCLUDED.first_received,
landing_cost_price = EXCLUDED.landing_cost_price,
@@ -707,9 +736,11 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
harmonized_tariff_code = EXCLUDED.harmonized_tariff_code,
updated_at = EXCLUDED.updated_at,
visible = EXCLUDED.visible,
managing_stock = EXCLUDED.managing_stock,
replenishable = EXCLUDED.replenishable,
permalink = EXCLUDED.permalink,
moq = EXCLUDED.moq,
uom = EXCLUDED.uom,
rating = EXCLUDED.rating,
reviews = EXCLUDED.reviews,
weight = EXCLUDED.weight,
@@ -721,58 +752,82 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
total_sold = EXCLUDED.total_sold,
baskets = EXCLUDED.baskets,
notifies = EXCLUDED.notifies,
date_last_sold = EXCLUDED.date_last_sold
date_last_sold = EXCLUDED.date_last_sold,
image = EXCLUDED.image,
image_175 = EXCLUDED.image_175,
image_full = EXCLUDED.image_full,
options = EXCLUDED.options,
tags = EXCLUDED.tags
RETURNING
CASE WHEN xmax::text::int > 0 THEN 0 ELSE 1 END AS inserted,
CASE WHEN xmax::text::int > 0 THEN 1 ELSE 0 END AS updated
xmax = 0 as inserted
)
SELECT
COUNT(*) FILTER (WHERE inserted = 1) as inserted,
COUNT(*) FILTER (WHERE updated = 1) as updated
FROM upserted_products
COUNT(*) FILTER (WHERE inserted) as inserted,
COUNT(*) FILTER (WHERE NOT inserted) as updated
FROM upserted
`, values);
recordsAdded += result.rows[0].inserted;
recordsUpdated += result.rows[0].updated;
}, `Error inserting batch ${i} to ${i + batch.length}`);
recordsAdded += parseInt(result.rows[0].inserted, 10) || 0;
recordsUpdated += parseInt(result.rows[0].updated, 10) || 0;
outputProgress({
status: "running",
operation: "Products import",
message: `Imported ${i + batch.length} of ${products.length} products`,
current: i + batch.length,
total: products.length,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, i + batch.length, products.length),
rate: calculateRate(startTime, i + batch.length)
});
// Process category relationships for each product in the batch
for (const row of batch) {
if (row.categories) {
const categoryIds = row.categories.split(',').filter(id => id && id.trim());
if (categoryIds.length > 0) {
const catPlaceholders = categoryIds.map((_, idx) =>
`($${idx * 2 + 1}, $${idx * 2 + 2})`
).join(',');
const catValues = categoryIds.flatMap(catId => [row.pid, parseInt(catId.trim(), 10)]);
// First delete existing relationships for this product
await localConnection.query(
'DELETE FROM product_categories WHERE pid = $1',
[row.pid]
);
// Then insert the new relationships
await localConnection.query(`
INSERT INTO product_categories (pid, cat_id)
VALUES ${catPlaceholders}
ON CONFLICT (pid, cat_id) DO NOTHING
`, catValues);
}
}
}
outputProgress({
status: "running",
operation: "Products import",
message: `Processing products: ${i + batch.length} of ${products.rows.length}`,
current: i + batch.length,
total: products.rows.length,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, i + batch.length, products.rows.length),
rate: calculateRate(startTime, i + batch.length)
});
}
// Cleanup temporary tables
await cleanupTemporaryTables(localConnection);
// Commit the transaction
await localConnection.commit();
return {
status: 'complete',
recordsAdded,
recordsUpdated,
totalRecords: products.rows.length,
duration: formatElapsedTime(Date.now() - startTime)
};
} catch (error) {
// Rollback on error
await localConnection.rollback();
throw error;
}
// Update sync status
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('products', NOW())
ON CONFLICT (table_name) DO UPDATE SET
last_sync_timestamp = NOW()
`);
// Cleanup temporary tables
await cleanupTemporaryTables(localConnection);
return {
status: "complete",
recordsAdded,
recordsUpdated,
totalRecords: products.length
};
} catch (error) {
console.error('Error in importProducts:', error);
// Attempt cleanup on error
try {
await cleanupTemporaryTables(localConnection);
} catch (cleanupError) {
console.error('Error during cleanup:', cleanupError);
}
throw error;
}
}

View File

@@ -64,25 +64,68 @@ async function setupConnections(sshConfig) {
// Create a wrapper for the PostgreSQL pool to match MySQL interface
const localConnection = {
_client: null,
_transactionActive: false,
query: async (text, params) => {
const client = await localPool.connect();
try {
const result = await client.query(text, params);
return [result];
} finally {
client.release();
// If we're not in a transaction, use the pool directly
if (!localConnection._transactionActive) {
const client = await localPool.connect();
try {
const result = await client.query(text, params);
return [result];
} finally {
client.release();
}
}
// If we're in a transaction, use the dedicated client
if (!localConnection._client) {
throw new Error('No active transaction client');
}
const result = await localConnection._client.query(text, params);
return [result];
},
beginTransaction: async () => {
if (localConnection._transactionActive) {
throw new Error('Transaction already active');
}
localConnection._client = await localPool.connect();
await localConnection._client.query('BEGIN');
localConnection._transactionActive = true;
},
commit: async () => {
if (!localConnection._transactionActive) {
throw new Error('No active transaction to commit');
}
await localConnection._client.query('COMMIT');
localConnection._client.release();
localConnection._client = null;
localConnection._transactionActive = false;
},
rollback: async () => {
if (!localConnection._transactionActive) {
throw new Error('No active transaction to rollback');
}
await localConnection._client.query('ROLLBACK');
localConnection._client.release();
localConnection._client = null;
localConnection._transactionActive = false;
},
end: async () => {
if (localConnection._client) {
localConnection._client.release();
localConnection._client = null;
}
await localPool.end();
}
};
return {
ssh: tunnel.ssh,
prodConnection,
localConnection
};
return { prodConnection, localConnection, tunnel };
}
// Helper function to close connections