2 Commits

Author SHA1 Message Date
87d4b9e804 Fixes/improvements for import scripts 2025-03-24 22:27:44 -04:00
75da2c6772 Get all import scripts running again 2025-03-24 21:58:00 -04:00
7 changed files with 336 additions and 685 deletions

View File

@@ -120,27 +120,38 @@ async function main() {
`); `);
// Create import history record for the overall session // Create import history record for the overall session
const [historyResult] = await localConnection.query(` try {
INSERT INTO import_history ( const [historyResult] = await localConnection.query(`
table_name, INSERT INTO import_history (
start_time, table_name,
is_incremental, start_time,
status, is_incremental,
additional_info status,
) VALUES ( additional_info
'all_tables', ) VALUES (
NOW(), 'all_tables',
$1::boolean, NOW(),
'running', $1::boolean,
jsonb_build_object( 'running',
'categories_enabled', $2::boolean, jsonb_build_object(
'products_enabled', $3::boolean, 'categories_enabled', $2::boolean,
'orders_enabled', $4::boolean, 'products_enabled', $3::boolean,
'purchase_orders_enabled', $5::boolean 'orders_enabled', $4::boolean,
) 'purchase_orders_enabled', $5::boolean
) RETURNING id )
`, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS]); ) RETURNING id
importHistoryId = historyResult.rows[0].id; `, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS]);
importHistoryId = historyResult.rows[0].id;
} catch (error) {
console.error("Error creating import history record:", error);
outputProgress({
status: "error",
operation: "Import process",
message: "Failed to create import history record",
error: error.message
});
throw error;
}
const results = { const results = {
categories: null, categories: null,
@@ -158,8 +169,8 @@ async function main() {
if (isImportCancelled) throw new Error("Import cancelled"); if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++; completedSteps++;
console.log('Categories import result:', results.categories); console.log('Categories import result:', results.categories);
totalRecordsAdded += parseInt(results.categories?.recordsAdded || 0); totalRecordsAdded += parseInt(results.categories?.recordsAdded || 0) || 0;
totalRecordsUpdated += parseInt(results.categories?.recordsUpdated || 0); totalRecordsUpdated += parseInt(results.categories?.recordsUpdated || 0) || 0;
} }
if (IMPORT_PRODUCTS) { if (IMPORT_PRODUCTS) {
@@ -167,8 +178,8 @@ async function main() {
if (isImportCancelled) throw new Error("Import cancelled"); if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++; completedSteps++;
console.log('Products import result:', results.products); console.log('Products import result:', results.products);
totalRecordsAdded += parseInt(results.products?.recordsAdded || 0); totalRecordsAdded += parseInt(results.products?.recordsAdded || 0) || 0;
totalRecordsUpdated += parseInt(results.products?.recordsUpdated || 0); totalRecordsUpdated += parseInt(results.products?.recordsUpdated || 0) || 0;
} }
if (IMPORT_ORDERS) { if (IMPORT_ORDERS) {
@@ -176,17 +187,34 @@ async function main() {
if (isImportCancelled) throw new Error("Import cancelled"); if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++; completedSteps++;
console.log('Orders import result:', results.orders); console.log('Orders import result:', results.orders);
totalRecordsAdded += parseInt(results.orders?.recordsAdded || 0); totalRecordsAdded += parseInt(results.orders?.recordsAdded || 0) || 0;
totalRecordsUpdated += parseInt(results.orders?.recordsUpdated || 0); totalRecordsUpdated += parseInt(results.orders?.recordsUpdated || 0) || 0;
} }
if (IMPORT_PURCHASE_ORDERS) { if (IMPORT_PURCHASE_ORDERS) {
results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection, INCREMENTAL_UPDATE); try {
if (isImportCancelled) throw new Error("Import cancelled"); results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection, INCREMENTAL_UPDATE);
completedSteps++; if (isImportCancelled) throw new Error("Import cancelled");
console.log('Purchase orders import result:', results.purchaseOrders); completedSteps++;
totalRecordsAdded += parseInt(results.purchaseOrders?.recordsAdded || 0); console.log('Purchase orders import result:', results.purchaseOrders);
totalRecordsUpdated += parseInt(results.purchaseOrders?.recordsUpdated || 0);
// Handle potential error status
if (results.purchaseOrders?.status === 'error') {
console.error('Purchase orders import had an error:', results.purchaseOrders.error);
} else {
totalRecordsAdded += parseInt(results.purchaseOrders?.recordsAdded || 0) || 0;
totalRecordsUpdated += parseInt(results.purchaseOrders?.recordsUpdated || 0) || 0;
}
} catch (error) {
console.error('Error during purchase orders import:', error);
// Continue with other imports, don't fail the whole process
results.purchaseOrders = {
status: 'error',
error: error.message,
recordsAdded: 0,
recordsUpdated: 0
};
}
} }
const endTime = Date.now(); const endTime = Date.now();
@@ -214,8 +242,8 @@ async function main() {
WHERE id = $12 WHERE id = $12
`, [ `, [
totalElapsedSeconds, totalElapsedSeconds,
totalRecordsAdded, parseInt(totalRecordsAdded) || 0,
totalRecordsUpdated, parseInt(totalRecordsUpdated) || 0,
IMPORT_CATEGORIES, IMPORT_CATEGORIES,
IMPORT_PRODUCTS, IMPORT_PRODUCTS,
IMPORT_ORDERS, IMPORT_ORDERS,

View File

@@ -47,42 +47,18 @@ async function importCategories(prodConnection, localConnection) {
continue; continue;
} }
console.log(`\nProcessing ${categories.length} type ${type} categories`); console.log(`Processing ${categories.length} type ${type} categories`);
if (type === 10) {
console.log("Type 10 categories:", JSON.stringify(categories, null, 2));
}
// For types that can have parents (11, 21, 12, 13), verify parent existence // For types that can have parents (11, 21, 12, 13), we'll proceed directly
// No need to check for parent existence since we process in hierarchical order
let categoriesToInsert = categories; let categoriesToInsert = categories;
if (![10, 20].includes(type)) {
// Get all parent IDs
const parentIds = [
...new Set(
categories
.filter(c => c && c.parent_id !== null)
.map(c => c.parent_id)
),
];
console.log(`Processing ${categories.length} type ${type} categories with ${parentIds.length} unique parent IDs`);
console.log('Parent IDs:', parentIds);
// No need to check for parent existence - we trust they exist since they were just inserted
categoriesToInsert = categories;
}
if (categoriesToInsert.length === 0) { if (categoriesToInsert.length === 0) {
console.log( console.log(`No valid categories of type ${type} to insert`);
`No valid categories of type ${type} to insert`
);
await localConnection.query(`RELEASE SAVEPOINT category_type_${type}`); await localConnection.query(`RELEASE SAVEPOINT category_type_${type}`);
continue; continue;
} }
console.log(
`Inserting ${categoriesToInsert.length} type ${type} categories`
);
// PostgreSQL upsert query with parameterized values // PostgreSQL upsert query with parameterized values
const values = categoriesToInsert.flatMap((cat) => [ const values = categoriesToInsert.flatMap((cat) => [
cat.cat_id, cat.cat_id,
@@ -95,14 +71,10 @@ async function importCategories(prodConnection, localConnection) {
new Date() new Date()
]); ]);
console.log('Attempting to insert/update with values:', JSON.stringify(values, null, 2));
const placeholders = categoriesToInsert const placeholders = categoriesToInsert
.map((_, i) => `($${i * 8 + 1}, $${i * 8 + 2}, $${i * 8 + 3}, $${i * 8 + 4}, $${i * 8 + 5}, $${i * 8 + 6}, $${i * 8 + 7}, $${i * 8 + 8})`) .map((_, i) => `($${i * 8 + 1}, $${i * 8 + 2}, $${i * 8 + 3}, $${i * 8 + 4}, $${i * 8 + 5}, $${i * 8 + 6}, $${i * 8 + 7}, $${i * 8 + 8})`)
.join(','); .join(',');
console.log('Using placeholders:', placeholders);
// Insert categories with ON CONFLICT clause for PostgreSQL // Insert categories with ON CONFLICT clause for PostgreSQL
const query = ` const query = `
WITH inserted_categories AS ( WITH inserted_categories AS (
@@ -130,16 +102,13 @@ async function importCategories(prodConnection, localConnection) {
COUNT(*) FILTER (WHERE NOT is_insert) as updated COUNT(*) FILTER (WHERE NOT is_insert) as updated
FROM inserted_categories`; FROM inserted_categories`;
console.log('Executing query:', query);
const result = await localConnection.query(query, values); const result = await localConnection.query(query, values);
console.log('Query result:', result);
// Get the first result since query returns an array // Get the first result since query returns an array
const queryResult = Array.isArray(result) ? result[0] : result; const queryResult = Array.isArray(result) ? result[0] : result;
if (!queryResult || !queryResult.rows || !queryResult.rows[0]) { if (!queryResult || !queryResult.rows || !queryResult.rows[0]) {
console.error('Query failed to return results. Result:', queryResult); console.error('Query failed to return results');
throw new Error('Query did not return expected results'); throw new Error('Query did not return expected results');
} }

View File

@@ -26,6 +26,9 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
let cumulativeProcessedOrders = 0; let cumulativeProcessedOrders = 0;
try { try {
// Begin transaction
await localConnection.beginTransaction();
// Get last sync info // Get last sync info
const [syncInfo] = await localConnection.query( const [syncInfo] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'" "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'"
@@ -38,7 +41,6 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
const [[{ total }]] = await prodConnection.query(` const [[{ total }]] = await prodConnection.query(`
SELECT COUNT(*) as total SELECT COUNT(*) as total
FROM order_items oi FROM order_items oi
USE INDEX (PRIMARY)
JOIN _order o ON oi.order_id = o.order_id JOIN _order o ON oi.order_id = o.order_id
WHERE o.order_status >= 15 WHERE o.order_status >= 15
AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
@@ -78,7 +80,6 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
COALESCE(oi.prod_price_reg - oi.prod_price, 0) as base_discount, COALESCE(oi.prod_price_reg - oi.prod_price, 0) as base_discount,
oi.stamp as last_modified oi.stamp as last_modified
FROM order_items oi FROM order_items oi
USE INDEX (PRIMARY)
JOIN _order o ON oi.order_id = o.order_id JOIN _order o ON oi.order_id = o.order_id
WHERE o.order_status >= 15 WHERE o.order_status >= 15
AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
@@ -105,15 +106,15 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
console.log('Orders: Found', orderItems.length, 'order items to process'); console.log('Orders: Found', orderItems.length, 'order items to process');
// Create tables in PostgreSQL for debugging // Create tables in PostgreSQL for data processing
await localConnection.query(` await localConnection.query(`
DROP TABLE IF EXISTS debug_order_items; DROP TABLE IF EXISTS temp_order_items;
DROP TABLE IF EXISTS debug_order_meta; DROP TABLE IF EXISTS temp_order_meta;
DROP TABLE IF EXISTS debug_order_discounts; DROP TABLE IF EXISTS temp_order_discounts;
DROP TABLE IF EXISTS debug_order_taxes; DROP TABLE IF EXISTS temp_order_taxes;
DROP TABLE IF EXISTS debug_order_costs; DROP TABLE IF EXISTS temp_order_costs;
CREATE TABLE debug_order_items ( CREATE TEMP TABLE temp_order_items (
order_id INTEGER NOT NULL, order_id INTEGER NOT NULL,
pid INTEGER NOT NULL, pid INTEGER NOT NULL,
SKU VARCHAR(50) NOT NULL, SKU VARCHAR(50) NOT NULL,
@@ -123,7 +124,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
PRIMARY KEY (order_id, pid) PRIMARY KEY (order_id, pid)
); );
CREATE TABLE debug_order_meta ( CREATE TEMP TABLE temp_order_meta (
order_id INTEGER NOT NULL, order_id INTEGER NOT NULL,
date DATE NOT NULL, date DATE NOT NULL,
customer VARCHAR(100) NOT NULL, customer VARCHAR(100) NOT NULL,
@@ -135,26 +136,29 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
PRIMARY KEY (order_id) PRIMARY KEY (order_id)
); );
CREATE TABLE debug_order_discounts ( CREATE TEMP TABLE temp_order_discounts (
order_id INTEGER NOT NULL, order_id INTEGER NOT NULL,
pid INTEGER NOT NULL, pid INTEGER NOT NULL,
discount DECIMAL(10,2) NOT NULL, discount DECIMAL(10,2) NOT NULL,
PRIMARY KEY (order_id, pid) PRIMARY KEY (order_id, pid)
); );
CREATE TABLE debug_order_taxes ( CREATE TEMP TABLE temp_order_taxes (
order_id INTEGER NOT NULL, order_id INTEGER NOT NULL,
pid INTEGER NOT NULL, pid INTEGER NOT NULL,
tax DECIMAL(10,2) NOT NULL, tax DECIMAL(10,2) NOT NULL,
PRIMARY KEY (order_id, pid) PRIMARY KEY (order_id, pid)
); );
CREATE TABLE debug_order_costs ( CREATE TEMP TABLE temp_order_costs (
order_id INTEGER NOT NULL, order_id INTEGER NOT NULL,
pid INTEGER NOT NULL, pid INTEGER NOT NULL,
costeach DECIMAL(10,3) DEFAULT 0.000, costeach DECIMAL(10,3) DEFAULT 0.000,
PRIMARY KEY (order_id, pid) PRIMARY KEY (order_id, pid)
); );
CREATE INDEX idx_temp_order_items_pid ON temp_order_items(pid);
CREATE INDEX idx_temp_order_meta_order_id ON temp_order_meta(order_id);
`); `);
// Insert order items in batches // Insert order items in batches
@@ -168,7 +172,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
]); ]);
await localConnection.query(` await localConnection.query(`
INSERT INTO debug_order_items (order_id, pid, SKU, price, quantity, base_discount) INSERT INTO temp_order_items (order_id, pid, SKU, price, quantity, base_discount)
VALUES ${placeholders} VALUES ${placeholders}
ON CONFLICT (order_id, pid) DO UPDATE SET ON CONFLICT (order_id, pid) DO UPDATE SET
SKU = EXCLUDED.SKU, SKU = EXCLUDED.SKU,
@@ -239,7 +243,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
]); ]);
await localConnection.query(` await localConnection.query(`
INSERT INTO debug_order_meta ( INSERT INTO temp_order_meta (
order_id, date, customer, customer_name, status, canceled, order_id, date, customer, customer_name, status, canceled,
summary_discount, summary_subtotal summary_discount, summary_subtotal
) )
@@ -281,7 +285,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
]); ]);
await localConnection.query(` await localConnection.query(`
INSERT INTO debug_order_discounts (order_id, pid, discount) INSERT INTO temp_order_discounts (order_id, pid, discount)
VALUES ${placeholders} VALUES ${placeholders}
ON CONFLICT (order_id, pid) DO UPDATE SET ON CONFLICT (order_id, pid) DO UPDATE SET
discount = EXCLUDED.discount discount = EXCLUDED.discount
@@ -321,7 +325,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
]); ]);
await localConnection.query(` await localConnection.query(`
INSERT INTO debug_order_taxes (order_id, pid, tax) INSERT INTO temp_order_taxes (order_id, pid, tax)
VALUES ${placeholders} VALUES ${placeholders}
ON CONFLICT (order_id, pid) DO UPDATE SET ON CONFLICT (order_id, pid) DO UPDATE SET
tax = EXCLUDED.tax tax = EXCLUDED.tax
@@ -330,14 +334,23 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
}; };
const processCostsBatch = async (batchIds) => { const processCostsBatch = async (batchIds) => {
// Modified query to ensure one row per order_id/pid by using a subquery
const [costs] = await prodConnection.query(` const [costs] = await prodConnection.query(`
SELECT SELECT
oc.orderid as order_id, oc.orderid as order_id,
oc.pid, oc.pid,
oc.costeach oc.costeach
FROM order_costs oc FROM order_costs oc
WHERE oc.orderid IN (?) INNER JOIN (
AND oc.pending = 0 SELECT
orderid,
pid,
MAX(id) as max_id
FROM order_costs
WHERE orderid IN (?)
AND pending = 0
GROUP BY orderid, pid
) latest ON oc.orderid = latest.orderid AND oc.pid = latest.pid AND oc.id = latest.max_id
`, [batchIds]); `, [batchIds]);
if (costs.length === 0) return; if (costs.length === 0) return;
@@ -357,7 +370,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
]); ]);
await localConnection.query(` await localConnection.query(`
INSERT INTO debug_order_costs (order_id, pid, costeach) INSERT INTO temp_order_costs (order_id, pid, costeach)
VALUES ${placeholders} VALUES ${placeholders}
ON CONFLICT (order_id, pid) DO UPDATE SET ON CONFLICT (order_id, pid) DO UPDATE SET
costeach = EXCLUDED.costeach costeach = EXCLUDED.costeach
@@ -417,9 +430,9 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
SUM(COALESCE(od.discount, 0)) as promo_discount, SUM(COALESCE(od.discount, 0)) as promo_discount,
COALESCE(ot.tax, 0) as total_tax, COALESCE(ot.tax, 0) as total_tax,
COALESCE(oi.price * 0.5, 0) as costeach COALESCE(oi.price * 0.5, 0) as costeach
FROM debug_order_items oi FROM temp_order_items oi
LEFT JOIN debug_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
LEFT JOIN debug_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
GROUP BY oi.order_id, oi.pid, ot.tax GROUP BY oi.order_id, oi.pid, ot.tax
) )
SELECT SELECT
@@ -447,11 +460,11 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
FROM ( FROM (
SELECT DISTINCT ON (order_id, pid) SELECT DISTINCT ON (order_id, pid)
order_id, pid, SKU, price, quantity, base_discount order_id, pid, SKU, price, quantity, base_discount
FROM debug_order_items FROM temp_order_items
WHERE order_id = ANY($1) WHERE order_id = ANY($1)
ORDER BY order_id, pid ORDER BY order_id, pid
) oi ) oi
JOIN debug_order_meta om ON oi.order_id = om.order_id JOIN temp_order_meta om ON oi.order_id = om.order_id
LEFT JOIN order_totals ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid LEFT JOIN order_totals ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
ORDER BY oi.order_id, oi.pid ORDER BY oi.order_id, oi.pid
`, [subBatchIds]); `, [subBatchIds]);
@@ -529,8 +542,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
`, batchValues); `, batchValues);
const { inserted, updated } = result.rows[0]; const { inserted, updated } = result.rows[0];
recordsAdded += inserted; recordsAdded += parseInt(inserted) || 0;
recordsUpdated += updated; recordsUpdated += parseInt(updated) || 0;
importedCount += subBatch.length; importedCount += subBatch.length;
} }
@@ -556,18 +569,38 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
last_sync_timestamp = NOW() last_sync_timestamp = NOW()
`); `);
// Cleanup temporary tables
await localConnection.query(`
DROP TABLE IF EXISTS temp_order_items;
DROP TABLE IF EXISTS temp_order_meta;
DROP TABLE IF EXISTS temp_order_discounts;
DROP TABLE IF EXISTS temp_order_taxes;
DROP TABLE IF EXISTS temp_order_costs;
`);
// Commit transaction
await localConnection.commit();
return { return {
status: "complete", status: "complete",
totalImported: Math.floor(importedCount), totalImported: Math.floor(importedCount) || 0,
recordsAdded: recordsAdded || 0, recordsAdded: parseInt(recordsAdded) || 0,
recordsUpdated: Math.floor(recordsUpdated), recordsUpdated: parseInt(recordsUpdated) || 0,
totalSkipped: skippedOrders.size, totalSkipped: skippedOrders.size || 0,
missingProducts: missingProducts.size, missingProducts: missingProducts.size || 0,
incrementalUpdate, incrementalUpdate,
lastSyncTime lastSyncTime
}; };
} catch (error) { } catch (error) {
console.error("Error during orders import:", error); console.error("Error during orders import:", error);
// Rollback transaction
try {
await localConnection.rollback();
} catch (rollbackError) {
console.error("Error during rollback:", rollbackError);
}
throw error; throw error;
} }
} }

View File

@@ -2,9 +2,12 @@ const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } =
const BATCH_SIZE = 100; // Smaller batch size for better progress tracking const BATCH_SIZE = 100; // Smaller batch size for better progress tracking
const MAX_RETRIES = 3; const MAX_RETRIES = 3;
const RETRY_DELAY = 5000; // 5 seconds const RETRY_DELAY = 5000; // 5 seconds
const dotenv = require("dotenv");
const path = require("path");
dotenv.config({ path: path.join(__dirname, "../../.env") });
// Utility functions // Utility functions
const imageUrlBase = 'https://sbing.com/i/products/0000/'; const imageUrlBase = process.env.PRODUCT_IMAGE_URL_BASE || 'https://sbing.com/i/products/0000/';
const getImageUrls = (pid, iid = 1) => { const getImageUrls = (pid, iid = 1) => {
const paddedPid = pid.toString().padStart(6, '0'); const paddedPid = pid.toString().padStart(6, '0');
// Use padded PID only for the first 3 digits // Use padded PID only for the first 3 digits
@@ -18,7 +21,7 @@ const getImageUrls = (pid, iid = 1) => {
}; };
}; };
// Add helper function for retrying operations // Add helper function for retrying operations with exponential backoff
async function withRetry(operation, errorMessage) { async function withRetry(operation, errorMessage) {
let lastError; let lastError;
for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) { for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
@@ -28,7 +31,8 @@ async function withRetry(operation, errorMessage) {
lastError = error; lastError = error;
console.error(`${errorMessage} (Attempt ${attempt}/${MAX_RETRIES}):`, error); console.error(`${errorMessage} (Attempt ${attempt}/${MAX_RETRIES}):`, error);
if (attempt < MAX_RETRIES) { if (attempt < MAX_RETRIES) {
await new Promise(resolve => setTimeout(resolve, RETRY_DELAY)); const backoffTime = RETRY_DELAY * Math.pow(2, attempt - 1);
await new Promise(resolve => setTimeout(resolve, backoffTime));
} }
} }
} }
@@ -772,32 +776,44 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
recordsAdded += parseInt(result.rows[0].inserted, 10) || 0; recordsAdded += parseInt(result.rows[0].inserted, 10) || 0;
recordsUpdated += parseInt(result.rows[0].updated, 10) || 0; recordsUpdated += parseInt(result.rows[0].updated, 10) || 0;
// Process category relationships for each product in the batch // Process category relationships in batches
const allCategories = [];
for (const row of batch) { for (const row of batch) {
if (row.categories) { if (row.categories) {
const categoryIds = row.categories.split(',').filter(id => id && id.trim()); const categoryIds = row.categories.split(',').filter(id => id && id.trim());
if (categoryIds.length > 0) { if (categoryIds.length > 0) {
const catPlaceholders = categoryIds.map((_, idx) => categoryIds.forEach(catId => {
`($${idx * 2 + 1}, $${idx * 2 + 2})` allCategories.push([row.pid, parseInt(catId.trim(), 10)]);
).join(','); });
const catValues = categoryIds.flatMap(catId => [row.pid, parseInt(catId.trim(), 10)]);
// First delete existing relationships for this product
await localConnection.query(
'DELETE FROM product_categories WHERE pid = $1',
[row.pid]
);
// Then insert the new relationships
await localConnection.query(`
INSERT INTO product_categories (pid, cat_id)
VALUES ${catPlaceholders}
ON CONFLICT (pid, cat_id) DO NOTHING
`, catValues);
} }
} }
} }
// If we have categories to process
if (allCategories.length > 0) {
// First get all products in this batch
const productIds = batch.map(p => p.pid);
// Delete all existing relationships for products in this batch
await localConnection.query(
'DELETE FROM product_categories WHERE pid = ANY($1)',
[productIds]
);
// Insert all new relationships in one batch
const catPlaceholders = allCategories.map((_, idx) =>
`($${idx * 2 + 1}, $${idx * 2 + 2})`
).join(',');
const catValues = allCategories.flat();
await localConnection.query(`
INSERT INTO product_categories (pid, cat_id)
VALUES ${catPlaceholders}
ON CONFLICT (pid, cat_id) DO NOTHING
`, catValues);
}
outputProgress({ outputProgress({
status: "running", status: "running",
operation: "Products import", operation: "Products import",

View File

@@ -6,6 +6,9 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
let recordsUpdated = 0; let recordsUpdated = 0;
try { try {
// Begin transaction for the entire import process
await localConnection.beginTransaction();
// Get last sync info // Get last sync info
const [syncInfo] = await localConnection.query( const [syncInfo] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'" "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'"
@@ -14,12 +17,10 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
console.log('Purchase Orders: Using last sync time:', lastSyncTime); console.log('Purchase Orders: Using last sync time:', lastSyncTime);
// Create temporary tables with PostgreSQL syntax // Create temp tables
await localConnection.query(` await localConnection.query(`
DROP TABLE IF EXISTS temp_purchase_orders; DROP TABLE IF EXISTS temp_purchase_orders;
DROP TABLE IF EXISTS temp_po_receivings; CREATE TABLE temp_purchase_orders (
CREATE TEMP TABLE temp_purchase_orders (
po_id INTEGER NOT NULL, po_id INTEGER NOT NULL,
pid INTEGER NOT NULL, pid INTEGER NOT NULL,
sku VARCHAR(50), sku VARCHAR(50),
@@ -33,60 +34,14 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
cost_price DECIMAL(10,3), cost_price DECIMAL(10,3),
PRIMARY KEY (po_id, pid) PRIMARY KEY (po_id, pid)
); );
CREATE TEMP TABLE temp_po_receivings (
po_id INTEGER,
pid INTEGER NOT NULL,
receiving_id INTEGER NOT NULL,
qty_each INTEGER,
cost_each DECIMAL(10,3),
received_date TIMESTAMP WITH TIME ZONE,
received_by INTEGER,
received_by_name VARCHAR(255),
is_alt_po INTEGER,
PRIMARY KEY (receiving_id, pid)
);
`); `);
outputProgress({
operation: `Starting ${incrementalUpdate ? 'incremental' : 'full'} purchase orders import`,
status: "running",
});
// Get column names - Keep MySQL compatible for production
const [columns] = await prodConnection.query(`
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'purchase_orders'
AND COLUMN_NAME != 'updated' -- Exclude the updated column
ORDER BY ORDINAL_POSITION
`);
const columnNames = columns.map(col => col.COLUMN_NAME);
// Build incremental conditions
const incrementalWhereClause = incrementalUpdate
? `AND (
p.date_updated > ?
OR p.date_ordered > ?
OR p.date_estin > ?
OR r.date_updated > ?
OR r.date_created > ?
OR r.date_checked > ?
OR rp.stamp > ?
OR rp.received_date > ?
)`
: "";
const incrementalParams = incrementalUpdate
? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]
: [];
// First get all relevant PO IDs with basic info - Keep MySQL compatible for production // First get all relevant PO IDs with basic info - Keep MySQL compatible for production
const [[{ total }]] = await prodConnection.query(` const [[{ total }]] = await prodConnection.query(`
SELECT COUNT(*) as total SELECT COUNT(*) as total
FROM ( FROM (
SELECT DISTINCT pop.po_id, pop.pid SELECT DISTINCT pop.po_id, pop.pid
FROM po p FROM po p
USE INDEX (idx_date_created)
JOIN po_products pop ON p.po_id = pop.po_id JOIN po_products pop ON p.po_id = pop.po_id
JOIN suppliers s ON p.supplier_id = s.supplierid JOIN suppliers s ON p.supplier_id = s.supplierid
WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
@@ -97,520 +52,165 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
OR p.date_estin > ? OR p.date_estin > ?
) )
` : ''} ` : ''}
UNION
SELECT DISTINCT r.receiving_id as po_id, rp.pid
FROM receivings_products rp
USE INDEX (received_date)
LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id
WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
${incrementalUpdate ? `
AND (
r.date_created > ?
OR r.date_checked > ?
OR rp.stamp > ?
OR rp.received_date > ?
)
` : ''}
) all_items ) all_items
`, incrementalUpdate ? [ `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);
lastSyncTime, lastSyncTime, lastSyncTime, // PO conditions
lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime // Receiving conditions
] : []);
console.log('Purchase Orders: Found changes:', total); console.log('Purchase Orders: Found changes:', total);
// Get PO list - Keep MySQL compatible for production // Get PO list - Keep MySQL compatible for production
const [poList] = await prodConnection.query(` console.log('Fetching purchase orders in batches...');
SELECT DISTINCT
COALESCE(p.po_id, r.receiving_id) as po_id,
COALESCE(
NULLIF(s1.companyname, ''),
NULLIF(s2.companyname, ''),
'Unknown Vendor'
) as vendor,
CASE
WHEN p.po_id IS NOT NULL THEN
COALESCE(
NULLIF(p.date_ordered, '0000-00-00 00:00:00'),
p.date_created
)
WHEN r.receiving_id IS NOT NULL THEN
r.date_created
END as date,
CASE
WHEN p.date_estin = '0000-00-00' THEN NULL
WHEN p.date_estin IS NULL THEN NULL
WHEN p.date_estin NOT REGEXP '^[0-9]{4}-[0-9]{2}-[0-9]{2}$' THEN NULL
ELSE p.date_estin
END as expected_date,
COALESCE(p.status, 50) as status,
p.short_note as notes,
p.notes as long_note
FROM (
SELECT po_id FROM po
USE INDEX (idx_date_created)
WHERE date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
${incrementalUpdate ? `
AND (
date_ordered > ?
OR date_updated > ?
OR date_estin > ?
)
` : ''}
UNION
SELECT DISTINCT r.receiving_id as po_id
FROM receivings r
JOIN receivings_products rp USE INDEX (received_date) ON r.receiving_id = rp.receiving_id
WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
${incrementalUpdate ? `
AND (
r.date_created > ?
OR r.date_checked > ?
OR rp.stamp > ?
OR rp.received_date > ?
)
` : ''}
) ids
LEFT JOIN po p ON ids.po_id = p.po_id
LEFT JOIN suppliers s1 ON p.supplier_id = s1.supplierid
LEFT JOIN receivings r ON ids.po_id = r.receiving_id
LEFT JOIN suppliers s2 ON r.supplier_id = s2.supplierid
ORDER BY po_id
`, incrementalUpdate ? [
lastSyncTime, lastSyncTime, lastSyncTime, // PO conditions
lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime // Receiving conditions
] : []);
console.log('Sample PO dates:', poList.slice(0, 5).map(po => ({ const FETCH_BATCH_SIZE = 5000;
po_id: po.po_id, const INSERT_BATCH_SIZE = 200; // Process 200 records at a time for inserts
raw_date_ordered: po.raw_date_ordered, let offset = 0;
raw_date_created: po.raw_date_created, let allProcessed = false;
raw_date_estin: po.raw_date_estin,
computed_date: po.date,
expected_date: po.expected_date
})));
const totalItems = total;
let processed = 0;
const BATCH_SIZE = 5000;
const PROGRESS_INTERVAL = 500;
let lastProgressUpdate = Date.now();
outputProgress({
operation: `Starting purchase orders import - Processing ${totalItems} purchase order items`,
status: "running",
});
for (let i = 0; i < poList.length; i += BATCH_SIZE) {
const batch = poList.slice(i, Math.min(i + BATCH_SIZE, poList.length));
const poIds = batch.map(po => po.po_id);
// Get all products for these POs in one query - Keep MySQL compatible for production
const [poProducts] = await prodConnection.query(`
SELECT
pop.po_id,
pop.pid,
pr.itemnumber as sku,
pr.description as name,
pop.cost_each as cost_price,
pop.qty_each as ordered
FROM po_products pop
USE INDEX (PRIMARY)
JOIN products pr ON pop.pid = pr.pid
WHERE pop.po_id IN (?)
`, [poIds]);
// Process PO products in smaller sub-batches to avoid packet size issues
const SUB_BATCH_SIZE = 5000;
for (let j = 0; j < poProducts.length; j += SUB_BATCH_SIZE) {
const productBatch = poProducts.slice(j, j + SUB_BATCH_SIZE);
const productPids = [...new Set(productBatch.map(p => p.pid))];
const batchPoIds = [...new Set(productBatch.map(p => p.po_id))];
// Get receivings for this batch with employee names
const [receivings] = await prodConnection.query(`
SELECT
r.po_id,
rp.pid,
rp.receiving_id,
rp.qty_each,
rp.cost_each,
COALESCE(rp.received_date, r.date_created) as received_date,
rp.received_by,
CONCAT(e.firstname, ' ', e.lastname) as received_by_name,
CASE
WHEN r.po_id IS NULL THEN 2 -- No PO
WHEN r.po_id IN (?) THEN 0 -- Original PO
ELSE 1 -- Different PO
END as is_alt_po
FROM receivings_products rp
USE INDEX (received_date)
LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id
LEFT JOIN employees e ON rp.received_by = e.employeeid
WHERE rp.pid IN (?)
AND rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
ORDER BY r.po_id, rp.pid, rp.received_date
`, [batchPoIds, productPids]);
// Insert receivings into temp table
if (receivings.length > 0) {
// Process in smaller chunks to avoid parameter limits
const CHUNK_SIZE = 100; // Reduce chunk size to avoid parameter limits
for (let i = 0; i < receivings.length; i += CHUNK_SIZE) {
const chunk = receivings.slice(i, Math.min(i + CHUNK_SIZE, receivings.length));
const values = [];
const placeholders = [];
chunk.forEach((r, idx) => {
values.push(
r.po_id,
r.pid,
r.receiving_id,
r.qty_each,
r.cost_each,
r.received_date,
r.received_by,
r.received_by_name || null,
r.is_alt_po
);
const offset = idx * 9;
placeholders.push(`($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8}, $${offset + 9})`);
});
await localConnection.query(`
INSERT INTO temp_po_receivings (
po_id, pid, receiving_id, qty_each, cost_each, received_date,
received_by, received_by_name, is_alt_po
)
VALUES ${placeholders.join(',')}
ON CONFLICT (receiving_id, pid) DO UPDATE SET
po_id = EXCLUDED.po_id,
qty_each = EXCLUDED.qty_each,
cost_each = EXCLUDED.cost_each,
received_date = EXCLUDED.received_date,
received_by = EXCLUDED.received_by,
received_by_name = EXCLUDED.received_by_name,
is_alt_po = EXCLUDED.is_alt_po
`, values);
}
}
// Process each PO product in chunks
const PRODUCT_CHUNK_SIZE = 100;
for (let i = 0; i < productBatch.length; i += PRODUCT_CHUNK_SIZE) {
const chunk = productBatch.slice(i, Math.min(i + PRODUCT_CHUNK_SIZE, productBatch.length));
const values = [];
const placeholders = [];
chunk.forEach((product, idx) => {
const po = batch.find(p => p.po_id === product.po_id);
if (!po) return;
values.push(
product.po_id,
product.pid,
product.sku,
product.name,
po.vendor,
po.date,
po.expected_date,
po.status,
po.notes || po.long_note,
product.ordered,
product.cost_price
);
const offset = idx * 11; // Updated to match 11 fields
placeholders.push(`($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8}, $${offset + 9}, $${offset + 10}, $${offset + 11})`);
});
if (placeholders.length > 0) {
await localConnection.query(`
INSERT INTO temp_purchase_orders (
po_id, pid, sku, name, vendor, date, expected_date,
status, notes, ordered, cost_price
)
VALUES ${placeholders.join(',')}
ON CONFLICT (po_id, pid) DO UPDATE SET
sku = EXCLUDED.sku,
name = EXCLUDED.name,
vendor = EXCLUDED.vendor,
date = EXCLUDED.date,
expected_date = EXCLUDED.expected_date,
status = EXCLUDED.status,
notes = EXCLUDED.notes,
ordered = EXCLUDED.ordered,
cost_price = EXCLUDED.cost_price
`, values);
}
processed += chunk.length;
// Update progress based on time interval
const now = Date.now();
if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) {
outputProgress({
status: "running",
operation: "Purchase orders import",
current: processed,
total: totalItems,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, processed, totalItems),
rate: calculateRate(startTime, processed)
});
lastProgressUpdate = now;
}
}
}
}
// Insert final data into purchase_orders table in chunks
const FINAL_CHUNK_SIZE = 1000;
let totalProcessed = 0; let totalProcessed = 0;
const totalPosResult = await localConnection.query('SELECT COUNT(*) as total_pos FROM temp_purchase_orders');
const total_pos = parseInt(totalPosResult.rows?.[0]?.total_pos || '0', 10);
outputProgress({ while (!allProcessed) {
status: "running", console.log(`Fetching batch at offset ${offset}...`);
operation: "Purchase orders final import", const [poList] = await prodConnection.query(`
message: `Processing ${total_pos} purchase orders for final import`, SELECT DISTINCT
current: 0, COALESCE(p.po_id, 0) as po_id,
total: total_pos pop.pid,
}); COALESCE(NULLIF(pr.itemnumber, ''), 'NO-SKU') as sku,
COALESCE(pr.description, 'Unknown Product') as name,
COALESCE(NULLIF(s.companyname, ''), 'Unknown Vendor') as vendor,
COALESCE(p.date_ordered, p.date_created) as date,
p.date_estin as expected_date,
COALESCE(p.status, 1) as status,
COALESCE(p.short_note, p.notes) as notes,
pop.qty_each as ordered,
pop.cost_each as cost_price
FROM po p
JOIN po_products pop ON p.po_id = pop.po_id
JOIN products pr ON pop.pid = pr.pid
LEFT JOIN suppliers s ON p.supplier_id = s.supplierid
WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
${incrementalUpdate ? `
AND (
p.date_updated > ?
OR p.date_ordered > ?
OR p.date_estin > ?
)
` : ''}
ORDER BY p.po_id, pop.pid
LIMIT ${FETCH_BATCH_SIZE} OFFSET ${offset}
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);
// Process in chunks using cursor-based pagination if (poList.length === 0) {
let lastPoId = 0; allProcessed = true;
let lastPid = 0;
let recordsAdded = 0;
let recordsUpdated = 0;
while (true) {
console.log('Fetching next chunk with lastPoId:', lastPoId, 'lastPid:', lastPid);
const chunkResult = await localConnection.query(`
SELECT po_id, pid FROM temp_purchase_orders
WHERE (po_id, pid) > ($1, $2)
ORDER BY po_id, pid
LIMIT $3
`, [lastPoId, lastPid, FINAL_CHUNK_SIZE]);
if (!chunkResult?.rows) {
console.error('No rows returned from chunk query:', chunkResult);
break; break;
} }
const chunk = chunkResult.rows; console.log(`Processing batch of ${poList.length} purchase order items (${offset}-${offset + poList.length})`);
console.log('Got chunk of size:', chunk.length);
if (chunk.length === 0) break;
const result = await localConnection.query(` // Process in smaller batches for inserts
WITH inserted_pos AS ( for (let i = 0; i < poList.length; i += INSERT_BATCH_SIZE) {
INSERT INTO purchase_orders ( const batch = poList.slice(i, Math.min(i + INSERT_BATCH_SIZE, poList.length));
po_id, pid, sku, name, cost_price, po_cost_price,
vendor, date, expected_date, status, notes, // Create parameterized query with placeholders
ordered, received, receiving_status, const placeholders = batch.map((_, idx) => {
received_date, last_received_date, received_by, const base = idx * 11; // 11 columns
receiving_history return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11})`;
}).join(',');
// Create flattened values array
const values = batch.flatMap(po => [
po.po_id,
po.pid,
po.sku,
po.name,
po.vendor,
po.date,
po.expected_date,
po.status,
po.notes,
po.ordered,
po.cost_price
]);
// Execute batch insert
await localConnection.query(`
INSERT INTO temp_purchase_orders (
po_id, pid, sku, name, vendor, date, expected_date,
status, notes, ordered, cost_price
) )
SELECT VALUES ${placeholders}
po.po_id,
po.pid,
po.sku,
po.name,
COALESCE(
(
SELECT cost_each
FROM temp_po_receivings r2
WHERE r2.pid = po.pid
AND r2.po_id = po.po_id
AND r2.is_alt_po = 0
AND r2.cost_each > 0
ORDER BY r2.received_date
LIMIT 1
),
po.cost_price
) as cost_price,
po.cost_price as po_cost_price,
po.vendor,
po.date,
po.expected_date,
po.status,
po.notes,
po.ordered,
COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0) as received,
CASE
WHEN COUNT(r.receiving_id) = 0 THEN 1 -- created
WHEN SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END) < po.ordered THEN 30 -- partial
ELSE 40 -- full
END as receiving_status,
MIN(CASE WHEN r.is_alt_po = 0 THEN r.received_date END) as received_date,
MAX(CASE WHEN r.is_alt_po = 0 THEN r.received_date END) as last_received_date,
(
SELECT r2.received_by_name
FROM temp_po_receivings r2
WHERE r2.pid = po.pid
AND r2.is_alt_po = 0
ORDER BY r2.received_date
LIMIT 1
) as received_by,
jsonb_build_object(
'ordered_qty', po.ordered,
'total_received', COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0),
'remaining_unfulfilled', GREATEST(0, po.ordered - COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0)),
'excess_received', GREATEST(0, COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0) - po.ordered),
'po_cost', po.cost_price,
'actual_cost', COALESCE(
(
SELECT cost_each
FROM temp_po_receivings r2
WHERE r2.pid = po.pid
AND r2.is_alt_po = 0
AND r2.cost_each > 0
ORDER BY r2.received_date
LIMIT 1
),
po.cost_price
),
'fulfillment', (
SELECT jsonb_agg(
jsonb_build_object(
'receiving_id', r2.receiving_id,
'qty_applied', CASE
WHEN r2.running_total <= po.ordered THEN r2.qty_each
WHEN r2.running_total - r2.qty_each < po.ordered THEN po.ordered - (r2.running_total - r2.qty_each)
ELSE 0
END,
'qty_total', r2.qty_each,
'cost', r2.cost_each,
'date', r2.received_date,
'received_by', r2.received_by,
'received_by_name', r2.received_by_name,
'type', CASE r2.is_alt_po
WHEN 0 THEN 'original'
WHEN 1 THEN 'alternate'
ELSE 'no_po'
END,
'remaining_qty', CASE
WHEN r2.running_total <= po.ordered THEN 0
WHEN r2.running_total - r2.qty_each < po.ordered THEN r2.running_total - po.ordered
ELSE r2.qty_each
END,
'is_excess', r2.running_total > po.ordered
)
ORDER BY r2.received_date
)
FROM (
SELECT
r2.*,
SUM(r2.qty_each) OVER (
PARTITION BY r2.pid
ORDER BY r2.received_date
ROWS UNBOUNDED PRECEDING
) as running_total
FROM temp_po_receivings r2
WHERE r2.pid = po.pid
) r2
),
'alternate_po_receivings', (
SELECT jsonb_agg(
jsonb_build_object(
'receiving_id', r2.receiving_id,
'qty', r2.qty_each,
'cost', r2.cost_each,
'date', r2.received_date,
'received_by', r2.received_by,
'received_by_name', r2.received_by_name
)
ORDER BY r2.received_date
)
FROM temp_po_receivings r2
WHERE r2.pid = po.pid AND r2.is_alt_po = 1
),
'no_po_receivings', (
SELECT jsonb_agg(
jsonb_build_object(
'receiving_id', r2.receiving_id,
'qty', r2.qty_each,
'cost', r2.cost_each,
'date', r2.received_date,
'received_by', r2.received_by,
'received_by_name', r2.received_by_name
)
ORDER BY r2.received_date
)
FROM temp_po_receivings r2
WHERE r2.pid = po.pid AND r2.is_alt_po = 2
)
) as receiving_history
FROM temp_purchase_orders po
LEFT JOIN temp_po_receivings r ON po.pid = r.pid
WHERE (po.po_id, po.pid) IN (
SELECT po_id, pid FROM UNNEST($1::int[], $2::int[])
)
GROUP BY po.po_id, po.pid, po.sku, po.name, po.vendor, po.date,
po.expected_date, po.status, po.notes, po.ordered, po.cost_price
ON CONFLICT (po_id, pid) DO UPDATE SET ON CONFLICT (po_id, pid) DO UPDATE SET
sku = EXCLUDED.sku,
name = EXCLUDED.name,
vendor = EXCLUDED.vendor, vendor = EXCLUDED.vendor,
date = EXCLUDED.date, date = EXCLUDED.date,
expected_date = EXCLUDED.expected_date, expected_date = EXCLUDED.expected_date,
status = EXCLUDED.status, status = EXCLUDED.status,
notes = EXCLUDED.notes, notes = EXCLUDED.notes,
ordered = EXCLUDED.ordered, ordered = EXCLUDED.ordered,
received = EXCLUDED.received, cost_price = EXCLUDED.cost_price
receiving_status = EXCLUDED.receiving_status, `, values);
received_date = EXCLUDED.received_date,
last_received_date = EXCLUDED.last_received_date,
received_by = EXCLUDED.received_by,
receiving_history = EXCLUDED.receiving_history,
cost_price = EXCLUDED.cost_price,
po_cost_price = EXCLUDED.po_cost_price
RETURNING xmax
)
SELECT
COUNT(*) FILTER (WHERE xmax = 0) as inserted,
COUNT(*) FILTER (WHERE xmax <> 0) as updated
FROM inserted_pos
`, [
chunk.map(r => r.po_id),
chunk.map(r => r.pid)
]);
// Add debug logging totalProcessed += batch.length;
console.log('Insert result:', result?.rows?.[0]);
// Handle the result properly for PostgreSQL with more defensive coding outputProgress({
const resultRow = result?.rows?.[0] || {}; status: "running",
const insertCount = parseInt(resultRow.inserted || '0', 10); operation: "Purchase orders import",
const updateCount = parseInt(resultRow.updated || '0', 10); message: `Processed ${totalProcessed}/${total} purchase order items`,
current: totalProcessed,
total: total,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, totalProcessed, total),
rate: calculateRate(startTime, totalProcessed)
});
}
recordsAdded += insertCount; // Update offset for next batch
recordsUpdated += updateCount; offset += poList.length;
totalProcessed += chunk.length;
// Update progress // Check if we've received fewer records than the batch size, meaning we're done
outputProgress({ if (poList.length < FETCH_BATCH_SIZE) {
status: "running", allProcessed = true;
operation: "Purchase orders final import",
message: `Processed ${totalProcessed} of ${total_pos} purchase orders`,
current: totalProcessed,
total: total_pos,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, totalProcessed, total_pos),
rate: calculateRate(startTime, totalProcessed)
});
// Update last processed IDs for next chunk with safety check
if (chunk.length > 0) {
const lastItem = chunk[chunk.length - 1];
if (lastItem) {
lastPoId = lastItem.po_id;
lastPid = lastItem.pid;
}
} }
} }
// Count the temp table contents
const [tempCount] = await localConnection.query(`SELECT COUNT(*) FROM temp_purchase_orders`);
const tempRowCount = parseInt(tempCount.rows[0].count);
console.log(`Successfully inserted ${tempRowCount} rows into temp_purchase_orders`);
// Now insert into the final table
const [result] = await localConnection.query(`
WITH inserted_pos AS (
INSERT INTO purchase_orders (
po_id, pid, sku, name, cost_price, po_cost_price,
vendor, date, expected_date, status, notes,
ordered, received, receiving_status
)
SELECT
po_id, pid, sku, name, cost_price, cost_price,
vendor, date, expected_date, status, notes,
ordered, 0 as received, 1 as receiving_status
FROM temp_purchase_orders
ON CONFLICT (po_id, pid) DO UPDATE SET
vendor = EXCLUDED.vendor,
date = EXCLUDED.date,
expected_date = EXCLUDED.expected_date,
status = EXCLUDED.status,
notes = EXCLUDED.notes,
ordered = EXCLUDED.ordered,
cost_price = EXCLUDED.cost_price,
po_cost_price = EXCLUDED.po_cost_price
RETURNING xmax = 0 as inserted
)
SELECT
COUNT(*) FILTER (WHERE inserted) as inserted,
COUNT(*) FILTER (WHERE NOT inserted) as updated
FROM inserted_pos
`);
// Parse the result
const { inserted, updated } = result.rows[0];
recordsAdded = parseInt(inserted) || 0;
recordsUpdated = parseInt(updated) || 0;
// Update sync status // Update sync status
await localConnection.query(` await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp) INSERT INTO sync_status (table_name, last_sync_timestamp)
@@ -620,29 +220,34 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
`); `);
// Clean up temporary tables // Clean up temporary tables
await localConnection.query(` await localConnection.query(`DROP TABLE IF EXISTS temp_purchase_orders;`);
DROP TABLE IF EXISTS temp_purchase_orders;
DROP TABLE IF EXISTS temp_po_receivings; // Commit transaction
`); await localConnection.commit();
return { return {
status: "complete", status: "complete",
recordsAdded, recordsAdded: recordsAdded || 0,
recordsUpdated, recordsUpdated: recordsUpdated || 0,
totalRecords: processed totalRecords: totalProcessed
}; };
} catch (error) { } catch (error) {
console.error("Error during purchase orders import:", error); console.error("Error during purchase orders import:", error);
// Attempt cleanup on error
// Rollback transaction
try { try {
await localConnection.query(` await localConnection.rollback();
DROP TABLE IF EXISTS temp_purchase_orders; } catch (rollbackError) {
DROP TABLE IF EXISTS temp_po_receivings; console.error('Error during rollback:', rollbackError.message);
`);
} catch (cleanupError) {
console.error('Error during cleanup:', cleanupError);
} }
throw error;
return {
status: "error",
error: error.message,
recordsAdded: 0,
recordsUpdated: 0,
totalRecords: 0
};
} }
} }

View File

@@ -184,7 +184,7 @@ async function resetDatabase() {
SELECT string_agg(tablename, ', ') as tables SELECT string_agg(tablename, ', ') as tables
FROM pg_tables FROM pg_tables
WHERE schemaname = 'public' WHERE schemaname = 'public'
AND tablename NOT IN ('users', 'permissions', 'user_permissions', 'calculate_history', 'import_history'); AND tablename NOT IN ('users', 'permissions', 'user_permissions', 'calculate_history', 'import_history', 'ai_prompts', 'ai_validation_performance', 'templates');
`); `);
if (!tablesResult.rows[0].tables) { if (!tablesResult.rows[0].tables) {

View File

@@ -757,8 +757,8 @@ router.get('/history/import', async (req, res) => {
end_time, end_time,
status, status,
error_message, error_message,
rows_processed::integer, records_added::integer,
files_processed::integer records_updated::integer
FROM import_history FROM import_history
ORDER BY start_time DESC ORDER BY start_time DESC
LIMIT 20 LIMIT 20