Compare commits

2 Commits: `00a02aa788 ... 87d4b9e804`

| SHA1 |
|---|
| 87d4b9e804 |
| 75da2c6772 |
```diff
@@ -120,27 +120,38 @@ async function main() {
     `);

     // Create import history record for the overall session
-    const [historyResult] = await localConnection.query(`
-      INSERT INTO import_history (
-        table_name,
-        start_time,
-        is_incremental,
-        status,
-        additional_info
-      ) VALUES (
-        'all_tables',
-        NOW(),
-        $1::boolean,
-        'running',
-        jsonb_build_object(
-          'categories_enabled', $2::boolean,
-          'products_enabled', $3::boolean,
-          'orders_enabled', $4::boolean,
-          'purchase_orders_enabled', $5::boolean
-        )
-      ) RETURNING id
-    `, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS]);
-    importHistoryId = historyResult.rows[0].id;
+    try {
+      const [historyResult] = await localConnection.query(`
+        INSERT INTO import_history (
+          table_name,
+          start_time,
+          is_incremental,
+          status,
+          additional_info
+        ) VALUES (
+          'all_tables',
+          NOW(),
+          $1::boolean,
+          'running',
+          jsonb_build_object(
+            'categories_enabled', $2::boolean,
+            'products_enabled', $3::boolean,
+            'orders_enabled', $4::boolean,
+            'purchase_orders_enabled', $5::boolean
+          )
+        ) RETURNING id
+      `, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS]);
+      importHistoryId = historyResult.rows[0].id;
+    } catch (error) {
+      console.error("Error creating import history record:", error);
+      outputProgress({
+        status: "error",
+        operation: "Import process",
+        message: "Failed to create import history record",
+        error: error.message
+      });
+      throw error;
+    }

     const results = {
       categories: null,
```
```diff
@@ -158,8 +169,8 @@ async function main() {
       if (isImportCancelled) throw new Error("Import cancelled");
       completedSteps++;
       console.log('Categories import result:', results.categories);
-      totalRecordsAdded += parseInt(results.categories?.recordsAdded || 0);
-      totalRecordsUpdated += parseInt(results.categories?.recordsUpdated || 0);
+      totalRecordsAdded += parseInt(results.categories?.recordsAdded || 0) || 0;
+      totalRecordsUpdated += parseInt(results.categories?.recordsUpdated || 0) || 0;
     }

     if (IMPORT_PRODUCTS) {
```
```diff
@@ -167,8 +178,8 @@ async function main() {
       if (isImportCancelled) throw new Error("Import cancelled");
       completedSteps++;
       console.log('Products import result:', results.products);
-      totalRecordsAdded += parseInt(results.products?.recordsAdded || 0);
-      totalRecordsUpdated += parseInt(results.products?.recordsUpdated || 0);
+      totalRecordsAdded += parseInt(results.products?.recordsAdded || 0) || 0;
+      totalRecordsUpdated += parseInt(results.products?.recordsUpdated || 0) || 0;
     }

     if (IMPORT_ORDERS) {
```
```diff
@@ -176,17 +187,34 @@ async function main() {
       if (isImportCancelled) throw new Error("Import cancelled");
       completedSteps++;
       console.log('Orders import result:', results.orders);
-      totalRecordsAdded += parseInt(results.orders?.recordsAdded || 0);
-      totalRecordsUpdated += parseInt(results.orders?.recordsUpdated || 0);
+      totalRecordsAdded += parseInt(results.orders?.recordsAdded || 0) || 0;
+      totalRecordsUpdated += parseInt(results.orders?.recordsUpdated || 0) || 0;
     }

     if (IMPORT_PURCHASE_ORDERS) {
-      results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection, INCREMENTAL_UPDATE);
-      if (isImportCancelled) throw new Error("Import cancelled");
-      completedSteps++;
-      console.log('Purchase orders import result:', results.purchaseOrders);
-      totalRecordsAdded += parseInt(results.purchaseOrders?.recordsAdded || 0);
-      totalRecordsUpdated += parseInt(results.purchaseOrders?.recordsUpdated || 0);
+      try {
+        results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection, INCREMENTAL_UPDATE);
+        if (isImportCancelled) throw new Error("Import cancelled");
+        completedSteps++;
+        console.log('Purchase orders import result:', results.purchaseOrders);
+
+        // Handle potential error status
+        if (results.purchaseOrders?.status === 'error') {
+          console.error('Purchase orders import had an error:', results.purchaseOrders.error);
+        } else {
+          totalRecordsAdded += parseInt(results.purchaseOrders?.recordsAdded || 0) || 0;
+          totalRecordsUpdated += parseInt(results.purchaseOrders?.recordsUpdated || 0) || 0;
+        }
+      } catch (error) {
+        console.error('Error during purchase orders import:', error);
+        // Continue with other imports, don't fail the whole process
+        results.purchaseOrders = {
+          status: 'error',
+          error: error.message,
+          recordsAdded: 0,
+          recordsUpdated: 0
+        };
+      }
     }

     const endTime = Date.now();
```
```diff
@@ -214,8 +242,8 @@ async function main() {
       WHERE id = $12
     `, [
       totalElapsedSeconds,
-      totalRecordsAdded,
-      totalRecordsUpdated,
+      parseInt(totalRecordsAdded) || 0,
+      parseInt(totalRecordsUpdated) || 0,
       IMPORT_CATEGORIES,
       IMPORT_PRODUCTS,
       IMPORT_ORDERS,
```
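A note on the recurring `parseInt(x || 0) || 0` change above: `parseInt` returns `NaN` for `undefined`, `null`, and non-numeric strings, and a single `NaN` silently poisons a `+=` accumulator for the rest of the run. The trailing `|| 0` coerces `NaN` back to `0`. A minimal sketch (illustrative values only):

```js
// parseInt(undefined) and parseInt('abc') both return NaN,
// and `total += NaN` turns the whole accumulator into NaN.
let total = 0;
total += parseInt(undefined || 0);   // inner || 0 covers missing values: parseInt(0) -> 0
total += parseInt('abc' || 0) || 0;  // parseInt('abc') -> NaN; the outer || 0 rescues it
console.log(total);                  // 0 - still a usable number
```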
```diff
@@ -47,42 +47,18 @@ async function importCategories(prodConnection, localConnection) {
       continue;
     }

-    console.log(`\nProcessing ${categories.length} type ${type} categories`);
-    if (type === 10) {
-      console.log("Type 10 categories:", JSON.stringify(categories, null, 2));
-    }
+    console.log(`Processing ${categories.length} type ${type} categories`);

-    // For types that can have parents (11, 21, 12, 13), verify parent existence
+    // For types that can have parents (11, 21, 12, 13), we'll proceed directly
+    // No need to check for parent existence since we process in hierarchical order
     let categoriesToInsert = categories;
-    if (![10, 20].includes(type)) {
-      // Get all parent IDs
-      const parentIds = [
-        ...new Set(
-          categories
-            .filter(c => c && c.parent_id !== null)
-            .map(c => c.parent_id)
-        ),
-      ];
-
-      console.log(`Processing ${categories.length} type ${type} categories with ${parentIds.length} unique parent IDs`);
-      console.log('Parent IDs:', parentIds);
-
-      // No need to check for parent existence - we trust they exist since they were just inserted
-      categoriesToInsert = categories;
-    }

     if (categoriesToInsert.length === 0) {
-      console.log(
-        `No valid categories of type ${type} to insert`
-      );
+      console.log(`No valid categories of type ${type} to insert`);
       await localConnection.query(`RELEASE SAVEPOINT category_type_${type}`);
       continue;
     }

-    console.log(
-      `Inserting ${categoriesToInsert.length} type ${type} categories`
-    );
-
     // PostgreSQL upsert query with parameterized values
     const values = categoriesToInsert.flatMap((cat) => [
       cat.cat_id,
```
```diff
@@ -95,14 +71,10 @@ async function importCategories(prodConnection, localConnection) {
       new Date()
     ]);

-    console.log('Attempting to insert/update with values:', JSON.stringify(values, null, 2));
-
     const placeholders = categoriesToInsert
       .map((_, i) => `($${i * 8 + 1}, $${i * 8 + 2}, $${i * 8 + 3}, $${i * 8 + 4}, $${i * 8 + 5}, $${i * 8 + 6}, $${i * 8 + 7}, $${i * 8 + 8})`)
       .join(',');

-    console.log('Using placeholders:', placeholders);
-
     // Insert categories with ON CONFLICT clause for PostgreSQL
     const query = `
       WITH inserted_categories AS (
```
```diff
@@ -129,17 +101,14 @@ async function importCategories(prodConnection, localConnection) {
         COUNT(*) FILTER (WHERE is_insert) as inserted,
         COUNT(*) FILTER (WHERE NOT is_insert) as updated
       FROM inserted_categories`;

-    console.log('Executing query:', query);
-
     const result = await localConnection.query(query, values);
-    console.log('Query result:', result);

     // Get the first result since query returns an array
     const queryResult = Array.isArray(result) ? result[0] : result;

     if (!queryResult || !queryResult.rows || !queryResult.rows[0]) {
-      console.error('Query failed to return results. Result:', queryResult);
+      console.error('Query failed to return results');
       throw new Error('Query did not return expected results');
     }

```
```diff
@@ -26,6 +26,9 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
   let cumulativeProcessedOrders = 0;

   try {
+    // Begin transaction
+    await localConnection.beginTransaction();
+
     // Get last sync info
     const [syncInfo] = await localConnection.query(
       "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'"
```
```diff
@@ -38,7 +41,6 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
     const [[{ total }]] = await prodConnection.query(`
       SELECT COUNT(*) as total
       FROM order_items oi
-      USE INDEX (PRIMARY)
       JOIN _order o ON oi.order_id = o.order_id
       WHERE o.order_status >= 15
       AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
```
```diff
@@ -78,7 +80,6 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
         COALESCE(oi.prod_price_reg - oi.prod_price, 0) as base_discount,
         oi.stamp as last_modified
       FROM order_items oi
-      USE INDEX (PRIMARY)
       JOIN _order o ON oi.order_id = o.order_id
       WHERE o.order_status >= 15
       AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
```
```diff
@@ -105,15 +106,15 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =

     console.log('Orders: Found', orderItems.length, 'order items to process');

-    // Create tables in PostgreSQL for debugging
+    // Create tables in PostgreSQL for data processing
     await localConnection.query(`
-      DROP TABLE IF EXISTS debug_order_items;
-      DROP TABLE IF EXISTS debug_order_meta;
-      DROP TABLE IF EXISTS debug_order_discounts;
-      DROP TABLE IF EXISTS debug_order_taxes;
-      DROP TABLE IF EXISTS debug_order_costs;
+      DROP TABLE IF EXISTS temp_order_items;
+      DROP TABLE IF EXISTS temp_order_meta;
+      DROP TABLE IF EXISTS temp_order_discounts;
+      DROP TABLE IF EXISTS temp_order_taxes;
+      DROP TABLE IF EXISTS temp_order_costs;

-      CREATE TABLE debug_order_items (
+      CREATE TEMP TABLE temp_order_items (
        order_id INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        SKU VARCHAR(50) NOT NULL,
```
```diff
@@ -123,7 +124,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
         PRIMARY KEY (order_id, pid)
       );

-      CREATE TABLE debug_order_meta (
+      CREATE TEMP TABLE temp_order_meta (
         order_id INTEGER NOT NULL,
         date DATE NOT NULL,
         customer VARCHAR(100) NOT NULL,
```
```diff
@@ -135,26 +136,29 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
         PRIMARY KEY (order_id)
       );

-      CREATE TABLE debug_order_discounts (
+      CREATE TEMP TABLE temp_order_discounts (
         order_id INTEGER NOT NULL,
         pid INTEGER NOT NULL,
         discount DECIMAL(10,2) NOT NULL,
         PRIMARY KEY (order_id, pid)
       );

-      CREATE TABLE debug_order_taxes (
+      CREATE TEMP TABLE temp_order_taxes (
         order_id INTEGER NOT NULL,
         pid INTEGER NOT NULL,
         tax DECIMAL(10,2) NOT NULL,
         PRIMARY KEY (order_id, pid)
       );

-      CREATE TABLE debug_order_costs (
+      CREATE TEMP TABLE temp_order_costs (
         order_id INTEGER NOT NULL,
         pid INTEGER NOT NULL,
         costeach DECIMAL(10,3) DEFAULT 0.000,
         PRIMARY KEY (order_id, pid)
       );
+
+      CREATE INDEX idx_temp_order_items_pid ON temp_order_items(pid);
+      CREATE INDEX idx_temp_order_meta_order_id ON temp_order_meta(order_id);
     `);

     // Insert order items in batches
```
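Worth noting about the switch from regular `debug_*` tables to `CREATE TEMP TABLE` above: in PostgreSQL, TEMP tables live in a session-private schema and are dropped automatically when the session ends, so concurrent runs no longer collide on shared table names. The explicit `DROP TABLE IF EXISTS` statements still matter on pooled connections, where the session outlives a single run. A minimal sketch (hypothetical table):

```sql
-- TEMP tables are visible only to the creating session and vanish with it.
CREATE TEMP TABLE temp_demo (
  id  integer PRIMARY KEY,
  qty integer
);

INSERT INTO temp_demo VALUES (1, 10)
ON CONFLICT (id) DO UPDATE SET qty = EXCLUDED.qty;

-- On a pooled connection the session is reused, so an explicit drop
-- (as the import does) avoids "relation already exists" on the next run.
DROP TABLE IF EXISTS temp_demo;
```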
```diff
@@ -168,7 +172,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       ]);

       await localConnection.query(`
-        INSERT INTO debug_order_items (order_id, pid, SKU, price, quantity, base_discount)
+        INSERT INTO temp_order_items (order_id, pid, SKU, price, quantity, base_discount)
         VALUES ${placeholders}
         ON CONFLICT (order_id, pid) DO UPDATE SET
           SKU = EXCLUDED.SKU,
```
```diff
@@ -239,7 +243,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       ]);

       await localConnection.query(`
-        INSERT INTO debug_order_meta (
+        INSERT INTO temp_order_meta (
           order_id, date, customer, customer_name, status, canceled,
           summary_discount, summary_subtotal
         )
```
```diff
@@ -281,7 +285,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       ]);

       await localConnection.query(`
-        INSERT INTO debug_order_discounts (order_id, pid, discount)
+        INSERT INTO temp_order_discounts (order_id, pid, discount)
         VALUES ${placeholders}
         ON CONFLICT (order_id, pid) DO UPDATE SET
           discount = EXCLUDED.discount
```
```diff
@@ -321,7 +325,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       ]);

       await localConnection.query(`
-        INSERT INTO debug_order_taxes (order_id, pid, tax)
+        INSERT INTO temp_order_taxes (order_id, pid, tax)
         VALUES ${placeholders}
         ON CONFLICT (order_id, pid) DO UPDATE SET
           tax = EXCLUDED.tax
```
```diff
@@ -330,14 +334,23 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
     };

     const processCostsBatch = async (batchIds) => {
+      // Modified query to ensure one row per order_id/pid by using a subquery
       const [costs] = await prodConnection.query(`
         SELECT
           oc.orderid as order_id,
           oc.pid,
           oc.costeach
         FROM order_costs oc
-        WHERE oc.orderid IN (?)
-          AND oc.pending = 0
+        INNER JOIN (
+          SELECT
+            orderid,
+            pid,
+            MAX(id) as max_id
+          FROM order_costs
+          WHERE orderid IN (?)
+            AND pending = 0
+          GROUP BY orderid, pid
+        ) latest ON oc.orderid = latest.orderid AND oc.pid = latest.pid AND oc.id = latest.max_id
       `, [batchIds]);

       if (costs.length === 0) return;
```
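The rewritten costs query above is the classic "latest row per group" pattern: the subquery picks `MAX(id)` per `(orderid, pid)` and the join keeps only that row. Deduplicating at the source matters here, presumably because feeding two rows with the same `(order_id, pid)` into the temp-table upsert that follows would make PostgreSQL abort with "ON CONFLICT DO UPDATE command cannot affect row a second time". A standalone sketch of the pattern (hypothetical data):

```sql
-- Keep exactly one row per (orderid, pid): the one with the highest id.
SELECT oc.orderid, oc.pid, oc.costeach
FROM order_costs oc
INNER JOIN (
  SELECT orderid, pid, MAX(id) AS max_id
  FROM order_costs
  WHERE pending = 0
  GROUP BY orderid, pid
) latest
  ON oc.orderid = latest.orderid
 AND oc.pid     = latest.pid
 AND oc.id      = latest.max_id;
```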
```diff
@@ -357,7 +370,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       ]);

       await localConnection.query(`
-        INSERT INTO debug_order_costs (order_id, pid, costeach)
+        INSERT INTO temp_order_costs (order_id, pid, costeach)
         VALUES ${placeholders}
         ON CONFLICT (order_id, pid) DO UPDATE SET
           costeach = EXCLUDED.costeach
```
```diff
@@ -417,9 +430,9 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
           SUM(COALESCE(od.discount, 0)) as promo_discount,
           COALESCE(ot.tax, 0) as total_tax,
           COALESCE(oi.price * 0.5, 0) as costeach
-        FROM debug_order_items oi
-        LEFT JOIN debug_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
-        LEFT JOIN debug_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
+        FROM temp_order_items oi
+        LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
+        LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
         GROUP BY oi.order_id, oi.pid, ot.tax
       )
       SELECT
```
```diff
@@ -447,11 +460,11 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       FROM (
         SELECT DISTINCT ON (order_id, pid)
           order_id, pid, SKU, price, quantity, base_discount
-        FROM debug_order_items
+        FROM temp_order_items
         WHERE order_id = ANY($1)
         ORDER BY order_id, pid
       ) oi
-      JOIN debug_order_meta om ON oi.order_id = om.order_id
+      JOIN temp_order_meta om ON oi.order_id = om.order_id
       LEFT JOIN order_totals ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
       ORDER BY oi.order_id, oi.pid
     `, [subBatchIds]);
```
```diff
@@ -529,8 +542,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       `, batchValues);

       const { inserted, updated } = result.rows[0];
-      recordsAdded += inserted;
-      recordsUpdated += updated;
+      recordsAdded += parseInt(inserted) || 0;
+      recordsUpdated += parseInt(updated) || 0;
       importedCount += subBatch.length;
     }

```
```diff
@@ -555,19 +568,39 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       ON CONFLICT (table_name) DO UPDATE SET
         last_sync_timestamp = NOW()
     `);

+    // Cleanup temporary tables
+    await localConnection.query(`
+      DROP TABLE IF EXISTS temp_order_items;
+      DROP TABLE IF EXISTS temp_order_meta;
+      DROP TABLE IF EXISTS temp_order_discounts;
+      DROP TABLE IF EXISTS temp_order_taxes;
+      DROP TABLE IF EXISTS temp_order_costs;
+    `);
+
+    // Commit transaction
+    await localConnection.commit();
+
     return {
       status: "complete",
-      totalImported: Math.floor(importedCount),
-      recordsAdded: recordsAdded || 0,
-      recordsUpdated: Math.floor(recordsUpdated),
-      totalSkipped: skippedOrders.size,
-      missingProducts: missingProducts.size,
+      totalImported: Math.floor(importedCount) || 0,
+      recordsAdded: parseInt(recordsAdded) || 0,
+      recordsUpdated: parseInt(recordsUpdated) || 0,
+      totalSkipped: skippedOrders.size || 0,
+      missingProducts: missingProducts.size || 0,
       incrementalUpdate,
       lastSyncTime
     };
   } catch (error) {
     console.error("Error during orders import:", error);
+
+    // Rollback transaction
+    try {
+      await localConnection.rollback();
+    } catch (rollbackError) {
+      console.error("Error during rollback:", rollbackError);
+    }
+
     throw error;
   }
 }
```
```diff
@@ -2,9 +2,12 @@ const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } =
 const BATCH_SIZE = 100; // Smaller batch size for better progress tracking
 const MAX_RETRIES = 3;
 const RETRY_DELAY = 5000; // 5 seconds
+const dotenv = require("dotenv");
+const path = require("path");
+dotenv.config({ path: path.join(__dirname, "../../.env") });

 // Utility functions
-const imageUrlBase = 'https://sbing.com/i/products/0000/';
+const imageUrlBase = process.env.PRODUCT_IMAGE_URL_BASE || 'https://sbing.com/i/products/0000/';
 const getImageUrls = (pid, iid = 1) => {
   const paddedPid = pid.toString().padStart(6, '0');
   // Use padded PID only for the first 3 digits
```
```diff
@@ -18,7 +21,7 @@ const getImageUrls = (pid, iid = 1) => {
   };
 };

-// Add helper function for retrying operations
+// Add helper function for retrying operations with exponential backoff
 async function withRetry(operation, errorMessage) {
   let lastError;
   for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
```
```diff
@@ -28,7 +31,8 @@ async function withRetry(operation, errorMessage) {
       lastError = error;
       console.error(`${errorMessage} (Attempt ${attempt}/${MAX_RETRIES}):`, error);
       if (attempt < MAX_RETRIES) {
-        await new Promise(resolve => setTimeout(resolve, RETRY_DELAY));
+        const backoffTime = RETRY_DELAY * Math.pow(2, attempt - 1);
+        await new Promise(resolve => setTimeout(resolve, backoffTime));
       }
     }
   }
```
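The change above turns a fixed retry delay into exponential backoff: `RETRY_DELAY * 2^(attempt-1)` yields 5 s, 10 s, 20 s with the constants defined at the top of the file. A quick sketch of the schedule:

```js
const RETRY_DELAY = 5000; // ms, from the top of the file
const MAX_RETRIES = 3;

for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
  const backoffTime = RETRY_DELAY * Math.pow(2, attempt - 1);
  console.log(`attempt ${attempt}: wait ${backoffTime} ms`); // 5000, 10000, 20000
}
```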
```diff
@@ -772,32 +776,44 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
       recordsAdded += parseInt(result.rows[0].inserted, 10) || 0;
       recordsUpdated += parseInt(result.rows[0].updated, 10) || 0;

-      // Process category relationships for each product in the batch
+      // Process category relationships in batches
+      const allCategories = [];
       for (const row of batch) {
         if (row.categories) {
           const categoryIds = row.categories.split(',').filter(id => id && id.trim());
           if (categoryIds.length > 0) {
-            const catPlaceholders = categoryIds.map((_, idx) =>
-              `($${idx * 2 + 1}, $${idx * 2 + 2})`
-            ).join(',');
-            const catValues = categoryIds.flatMap(catId => [row.pid, parseInt(catId.trim(), 10)]);
-
-            // First delete existing relationships for this product
-            await localConnection.query(
-              'DELETE FROM product_categories WHERE pid = $1',
-              [row.pid]
-            );
-
-            // Then insert the new relationships
-            await localConnection.query(`
-              INSERT INTO product_categories (pid, cat_id)
-              VALUES ${catPlaceholders}
-              ON CONFLICT (pid, cat_id) DO NOTHING
-            `, catValues);
+            categoryIds.forEach(catId => {
+              allCategories.push([row.pid, parseInt(catId.trim(), 10)]);
+            });
           }
         }
       }

+      // If we have categories to process
+      if (allCategories.length > 0) {
+        // First get all products in this batch
+        const productIds = batch.map(p => p.pid);
+
+        // Delete all existing relationships for products in this batch
+        await localConnection.query(
+          'DELETE FROM product_categories WHERE pid = ANY($1)',
+          [productIds]
+        );
+
+        // Insert all new relationships in one batch
+        const catPlaceholders = allCategories.map((_, idx) =>
+          `($${idx * 2 + 1}, $${idx * 2 + 2})`
+        ).join(',');
+
+        const catValues = allCategories.flat();
+
+        await localConnection.query(`
+          INSERT INTO product_categories (pid, cat_id)
+          VALUES ${catPlaceholders}
+          ON CONFLICT (pid, cat_id) DO NOTHING
+        `, catValues);
+      }
+
       outputProgress({
         status: "running",
         operation: "Products import",
```
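The category-relationship rewrite above batches per product-batch instead of per product: one `DELETE ... WHERE pid = ANY($1)` plus one multi-row `INSERT` replaces two round trips per product. A minimal sketch of the placeholder/value construction, with hypothetical data:

```js
// [pid, cat_id] pairs collected across the whole batch.
const allCategories = [[101, 5], [101, 9], [202, 5]];

// ($1, $2),($3, $4),($5, $6) - two parameters per pair.
const catPlaceholders = allCategories
  .map((_, idx) => `($${idx * 2 + 1}, $${idx * 2 + 2})`)
  .join(',');

// Flattened to match the placeholders: [101, 5, 101, 9, 202, 5]
const catValues = allCategories.flat();

console.log(catPlaceholders);
console.log(catValues);
```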
```diff
@@ -6,6 +6,9 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
   let recordsUpdated = 0;

   try {
+    // Begin transaction for the entire import process
+    await localConnection.beginTransaction();
+
     // Get last sync info
     const [syncInfo] = await localConnection.query(
       "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'"
```
```diff
@@ -14,12 +17,10 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental

     console.log('Purchase Orders: Using last sync time:', lastSyncTime);

-    // Create temporary tables with PostgreSQL syntax
+    // Create temp tables
     await localConnection.query(`
       DROP TABLE IF EXISTS temp_purchase_orders;
-      DROP TABLE IF EXISTS temp_po_receivings;
-
-      CREATE TEMP TABLE temp_purchase_orders (
+      CREATE TABLE temp_purchase_orders (
         po_id INTEGER NOT NULL,
         pid INTEGER NOT NULL,
         sku VARCHAR(50),
```
```diff
@@ -33,60 +34,14 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
         cost_price DECIMAL(10,3),
         PRIMARY KEY (po_id, pid)
       );
-
-      CREATE TEMP TABLE temp_po_receivings (
-        po_id INTEGER,
-        pid INTEGER NOT NULL,
-        receiving_id INTEGER NOT NULL,
-        qty_each INTEGER,
-        cost_each DECIMAL(10,3),
-        received_date TIMESTAMP WITH TIME ZONE,
-        received_by INTEGER,
-        received_by_name VARCHAR(255),
-        is_alt_po INTEGER,
-        PRIMARY KEY (receiving_id, pid)
-      );
     `);

-    outputProgress({
-      operation: `Starting ${incrementalUpdate ? 'incremental' : 'full'} purchase orders import`,
-      status: "running",
-    });
-
-    // Get column names - Keep MySQL compatible for production
-    const [columns] = await prodConnection.query(`
-      SELECT COLUMN_NAME
-      FROM INFORMATION_SCHEMA.COLUMNS
-      WHERE TABLE_NAME = 'purchase_orders'
-      AND COLUMN_NAME != 'updated' -- Exclude the updated column
-      ORDER BY ORDINAL_POSITION
-    `);
-    const columnNames = columns.map(col => col.COLUMN_NAME);
-
-    // Build incremental conditions
-    const incrementalWhereClause = incrementalUpdate
-      ? `AND (
-          p.date_updated > ?
-          OR p.date_ordered > ?
-          OR p.date_estin > ?
-          OR r.date_updated > ?
-          OR r.date_created > ?
-          OR r.date_checked > ?
-          OR rp.stamp > ?
-          OR rp.received_date > ?
-        )`
-      : "";
-    const incrementalParams = incrementalUpdate
-      ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]
-      : [];
-
     // First get all relevant PO IDs with basic info - Keep MySQL compatible for production
     const [[{ total }]] = await prodConnection.query(`
       SELECT COUNT(*) as total
       FROM (
         SELECT DISTINCT pop.po_id, pop.pid
         FROM po p
-        USE INDEX (idx_date_created)
         JOIN po_products pop ON p.po_id = pop.po_id
         JOIN suppliers s ON p.supplier_id = s.supplierid
         WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
```
```diff
@@ -97,520 +52,165 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
           OR p.date_estin > ?
         )
       ` : ''}
-        UNION
-        SELECT DISTINCT r.receiving_id as po_id, rp.pid
-        FROM receivings_products rp
-        USE INDEX (received_date)
-        LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id
-        WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
-        ${incrementalUpdate ? `
-          AND (
-            r.date_created > ?
-            OR r.date_checked > ?
-            OR rp.stamp > ?
-            OR rp.received_date > ?
-          )
-        ` : ''}
       ) all_items
-    `, incrementalUpdate ? [
-      lastSyncTime, lastSyncTime, lastSyncTime, // PO conditions
-      lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime // Receiving conditions
-    ] : []);
+    `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);

     console.log('Purchase Orders: Found changes:', total);

     // Get PO list - Keep MySQL compatible for production
-    const [poList] = await prodConnection.query(`
-      SELECT DISTINCT
-        COALESCE(p.po_id, r.receiving_id) as po_id,
-        COALESCE(
-          NULLIF(s1.companyname, ''),
-          NULLIF(s2.companyname, ''),
-          'Unknown Vendor'
-        ) as vendor,
-        CASE
-          WHEN p.po_id IS NOT NULL THEN
-            COALESCE(
-              NULLIF(p.date_ordered, '0000-00-00 00:00:00'),
-              p.date_created
-            )
-          WHEN r.receiving_id IS NOT NULL THEN
-            r.date_created
-        END as date,
-        CASE
-          WHEN p.date_estin = '0000-00-00' THEN NULL
-          WHEN p.date_estin IS NULL THEN NULL
-          WHEN p.date_estin NOT REGEXP '^[0-9]{4}-[0-9]{2}-[0-9]{2}$' THEN NULL
-          ELSE p.date_estin
-        END as expected_date,
-        COALESCE(p.status, 50) as status,
-        p.short_note as notes,
-        p.notes as long_note
-      FROM (
-        SELECT po_id FROM po
-        USE INDEX (idx_date_created)
-        WHERE date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
-        ${incrementalUpdate ? `
-          AND (
-            date_ordered > ?
-            OR date_updated > ?
-            OR date_estin > ?
-          )
-        ` : ''}
-        UNION
-        SELECT DISTINCT r.receiving_id as po_id
-        FROM receivings r
-        JOIN receivings_products rp USE INDEX (received_date) ON r.receiving_id = rp.receiving_id
-        WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
-        ${incrementalUpdate ? `
-          AND (
-            r.date_created > ?
-            OR r.date_checked > ?
-            OR rp.stamp > ?
-            OR rp.received_date > ?
-          )
-        ` : ''}
-      ) ids
-      LEFT JOIN po p ON ids.po_id = p.po_id
-      LEFT JOIN suppliers s1 ON p.supplier_id = s1.supplierid
-      LEFT JOIN receivings r ON ids.po_id = r.receiving_id
-      LEFT JOIN suppliers s2 ON r.supplier_id = s2.supplierid
-      ORDER BY po_id
-    `, incrementalUpdate ? [
-      lastSyncTime, lastSyncTime, lastSyncTime, // PO conditions
-      lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime // Receiving conditions
-    ] : []);
-
-    console.log('Sample PO dates:', poList.slice(0, 5).map(po => ({
-      po_id: po.po_id,
-      raw_date_ordered: po.raw_date_ordered,
-      raw_date_created: po.raw_date_created,
-      raw_date_estin: po.raw_date_estin,
-      computed_date: po.date,
-      expected_date: po.expected_date
-    })));
-
-    const totalItems = total;
-    let processed = 0;
-
-    const BATCH_SIZE = 5000;
-    const PROGRESS_INTERVAL = 500;
-    let lastProgressUpdate = Date.now();
-
-    outputProgress({
-      operation: `Starting purchase orders import - Processing ${totalItems} purchase order items`,
-      status: "running",
-    });
-
-    for (let i = 0; i < poList.length; i += BATCH_SIZE) {
-      const batch = poList.slice(i, Math.min(i + BATCH_SIZE, poList.length));
-      const poIds = batch.map(po => po.po_id);
-
-      // Get all products for these POs in one query - Keep MySQL compatible for production
-      const [poProducts] = await prodConnection.query(`
-        SELECT
-          pop.po_id,
-          pop.pid,
-          pr.itemnumber as sku,
-          pr.description as name,
-          pop.cost_each as cost_price,
-          pop.qty_each as ordered
-        FROM po_products pop
-        USE INDEX (PRIMARY)
-        JOIN products pr ON pop.pid = pr.pid
-        WHERE pop.po_id IN (?)
-      `, [poIds]);
-
-      // Process PO products in smaller sub-batches to avoid packet size issues
-      const SUB_BATCH_SIZE = 5000;
-      for (let j = 0; j < poProducts.length; j += SUB_BATCH_SIZE) {
-        const productBatch = poProducts.slice(j, j + SUB_BATCH_SIZE);
-        const productPids = [...new Set(productBatch.map(p => p.pid))];
-        const batchPoIds = [...new Set(productBatch.map(p => p.po_id))];
-
-        // Get receivings for this batch with employee names
-        const [receivings] = await prodConnection.query(`
-          SELECT
-            r.po_id,
-            rp.pid,
-            rp.receiving_id,
-            rp.qty_each,
-            rp.cost_each,
-            COALESCE(rp.received_date, r.date_created) as received_date,
-            rp.received_by,
-            CONCAT(e.firstname, ' ', e.lastname) as received_by_name,
-            CASE
-              WHEN r.po_id IS NULL THEN 2 -- No PO
-              WHEN r.po_id IN (?) THEN 0 -- Original PO
-              ELSE 1 -- Different PO
-            END as is_alt_po
-          FROM receivings_products rp
-          USE INDEX (received_date)
-          LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id
-          LEFT JOIN employees e ON rp.received_by = e.employeeid
-          WHERE rp.pid IN (?)
-          AND rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
-          ORDER BY r.po_id, rp.pid, rp.received_date
-        `, [batchPoIds, productPids]);
-
-        // Insert receivings into temp table
-        if (receivings.length > 0) {
-          // Process in smaller chunks to avoid parameter limits
-          const CHUNK_SIZE = 100; // Reduce chunk size to avoid parameter limits
-          for (let i = 0; i < receivings.length; i += CHUNK_SIZE) {
-            const chunk = receivings.slice(i, Math.min(i + CHUNK_SIZE, receivings.length));
-
-            const values = [];
-            const placeholders = [];
-
-            chunk.forEach((r, idx) => {
-              values.push(
-                r.po_id,
-                r.pid,
-                r.receiving_id,
-                r.qty_each,
-                r.cost_each,
-                r.received_date,
-                r.received_by,
-                r.received_by_name || null,
-                r.is_alt_po
-              );
-
-              const offset = idx * 9;
-              placeholders.push(`($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8}, $${offset + 9})`);
-            });
-
-            await localConnection.query(`
-              INSERT INTO temp_po_receivings (
-                po_id, pid, receiving_id, qty_each, cost_each, received_date,
-                received_by, received_by_name, is_alt_po
-              )
-              VALUES ${placeholders.join(',')}
-              ON CONFLICT (receiving_id, pid) DO UPDATE SET
-                po_id = EXCLUDED.po_id,
-                qty_each = EXCLUDED.qty_each,
-                cost_each = EXCLUDED.cost_each,
-                received_date = EXCLUDED.received_date,
-                received_by = EXCLUDED.received_by,
-                received_by_name = EXCLUDED.received_by_name,
-                is_alt_po = EXCLUDED.is_alt_po
-            `, values);
-          }
-        }
-
-        // Process each PO product in chunks
-        const PRODUCT_CHUNK_SIZE = 100;
-        for (let i = 0; i < productBatch.length; i += PRODUCT_CHUNK_SIZE) {
-          const chunk = productBatch.slice(i, Math.min(i + PRODUCT_CHUNK_SIZE, productBatch.length));
-          const values = [];
-          const placeholders = [];
-
-          chunk.forEach((product, idx) => {
-            const po = batch.find(p => p.po_id === product.po_id);
-            if (!po) return;
-
-            values.push(
-              product.po_id,
-              product.pid,
-              product.sku,
-              product.name,
-              po.vendor,
-              po.date,
-              po.expected_date,
-              po.status,
-              po.notes || po.long_note,
-              product.ordered,
-              product.cost_price
-            );
-
-            const offset = idx * 11; // Updated to match 11 fields
-            placeholders.push(`($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8}, $${offset + 9}, $${offset + 10}, $${offset + 11})`);
-          });
-
-          if (placeholders.length > 0) {
-            await localConnection.query(`
-              INSERT INTO temp_purchase_orders (
-                po_id, pid, sku, name, vendor, date, expected_date,
-                status, notes, ordered, cost_price
-              )
-              VALUES ${placeholders.join(',')}
-              ON CONFLICT (po_id, pid) DO UPDATE SET
-                sku = EXCLUDED.sku,
-                name = EXCLUDED.name,
-                vendor = EXCLUDED.vendor,
-                date = EXCLUDED.date,
-                expected_date = EXCLUDED.expected_date,
-                status = EXCLUDED.status,
-                notes = EXCLUDED.notes,
-                ordered = EXCLUDED.ordered,
-                cost_price = EXCLUDED.cost_price
-            `, values);
-          }
-
-          processed += chunk.length;
-
-          // Update progress based on time interval
-          const now = Date.now();
-          if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) {
-            outputProgress({
-              status: "running",
-              operation: "Purchase orders import",
-              current: processed,
-              total: totalItems,
-              elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
-              remaining: estimateRemaining(startTime, processed, totalItems),
-              rate: calculateRate(startTime, processed)
-            });
-            lastProgressUpdate = now;
-          }
-        }
-      }
-    }
-
-    // Insert final data into purchase_orders table in chunks
-    const FINAL_CHUNK_SIZE = 1000;
+    console.log('Fetching purchase orders in batches...');
+
+    const FETCH_BATCH_SIZE = 5000;
+    const INSERT_BATCH_SIZE = 200; // Process 200 records at a time for inserts
+    let offset = 0;
+    let allProcessed = false;
+
     let totalProcessed = 0;
-    const totalPosResult = await localConnection.query('SELECT COUNT(*) as total_pos FROM temp_purchase_orders');
-    const total_pos = parseInt(totalPosResult.rows?.[0]?.total_pos || '0', 10);
-
-    outputProgress({
-      status: "running",
-      operation: "Purchase orders final import",
-      message: `Processing ${total_pos} purchase orders for final import`,
-      current: 0,
-      total: total_pos
-    });
-
-    // Process in chunks using cursor-based pagination
-    let lastPoId = 0;
-    let lastPid = 0;
-    let recordsAdded = 0;
-    let recordsUpdated = 0;
-
-    while (true) {
-      console.log('Fetching next chunk with lastPoId:', lastPoId, 'lastPid:', lastPid);
-      const chunkResult = await localConnection.query(`
-        SELECT po_id, pid FROM temp_purchase_orders
-        WHERE (po_id, pid) > ($1, $2)
-        ORDER BY po_id, pid
-        LIMIT $3
-      `, [lastPoId, lastPid, FINAL_CHUNK_SIZE]);
-
-      if (!chunkResult?.rows) {
-        console.error('No rows returned from chunk query:', chunkResult);
+
+    while (!allProcessed) {
+      console.log(`Fetching batch at offset ${offset}...`);
+      const [poList] = await prodConnection.query(`
+        SELECT DISTINCT
+          COALESCE(p.po_id, 0) as po_id,
+          pop.pid,
+          COALESCE(NULLIF(pr.itemnumber, ''), 'NO-SKU') as sku,
+          COALESCE(pr.description, 'Unknown Product') as name,
+          COALESCE(NULLIF(s.companyname, ''), 'Unknown Vendor') as vendor,
+          COALESCE(p.date_ordered, p.date_created) as date,
+          p.date_estin as expected_date,
+          COALESCE(p.status, 1) as status,
+          COALESCE(p.short_note, p.notes) as notes,
+          pop.qty_each as ordered,
+          pop.cost_each as cost_price
+        FROM po p
+        JOIN po_products pop ON p.po_id = pop.po_id
+        JOIN products pr ON pop.pid = pr.pid
+        LEFT JOIN suppliers s ON p.supplier_id = s.supplierid
+        WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
+        ${incrementalUpdate ? `
+          AND (
+            p.date_updated > ?
+            OR p.date_ordered > ?
+            OR p.date_estin > ?
+          )
+        ` : ''}
+        ORDER BY p.po_id, pop.pid
+        LIMIT ${FETCH_BATCH_SIZE} OFFSET ${offset}
+      `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);
+
+      if (poList.length === 0) {
+        allProcessed = true;
         break;
       }

-      const chunk = chunkResult.rows;
-      console.log('Got chunk of size:', chunk.length);
-      if (chunk.length === 0) break;
-
-      const result = await localConnection.query(`
-        WITH inserted_pos AS (
-          INSERT INTO purchase_orders (
-            po_id, pid, sku, name, cost_price, po_cost_price,
-            vendor, date, expected_date, status, notes,
-            ordered, received, receiving_status,
-            received_date, last_received_date, received_by,
-            receiving_history
+      console.log(`Processing batch of ${poList.length} purchase order items (${offset}-${offset + poList.length})`);

+      // Process in smaller batches for inserts
+      for (let i = 0; i < poList.length; i += INSERT_BATCH_SIZE) {
+        const batch = poList.slice(i, Math.min(i + INSERT_BATCH_SIZE, poList.length));
+
+        // Create parameterized query with placeholders
+        const placeholders = batch.map((_, idx) => {
+          const base = idx * 11; // 11 columns
+          return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11})`;
+        }).join(',');
+
+        // Create flattened values array
+        const values = batch.flatMap(po => [
+          po.po_id,
+          po.pid,
+          po.sku,
+          po.name,
+          po.vendor,
+          po.date,
+          po.expected_date,
+          po.status,
+          po.notes,
+          po.ordered,
+          po.cost_price
+        ]);
+
+        // Execute batch insert
+        await localConnection.query(`
+          INSERT INTO temp_purchase_orders (
+            po_id, pid, sku, name, vendor, date, expected_date,
+            status, notes, ordered, cost_price
           )
-          SELECT
-            po.po_id,
-            po.pid,
-            po.sku,
-            po.name,
-            COALESCE(
-              (
-                SELECT cost_each
-                FROM temp_po_receivings r2
-                WHERE r2.pid = po.pid
-                AND r2.po_id = po.po_id
-                AND r2.is_alt_po = 0
-                AND r2.cost_each > 0
-                ORDER BY r2.received_date
-                LIMIT 1
-              ),
-              po.cost_price
-            ) as cost_price,
-            po.cost_price as po_cost_price,
-            po.vendor,
-            po.date,
-            po.expected_date,
-            po.status,
-            po.notes,
-            po.ordered,
-            COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0) as received,
-            CASE
-              WHEN COUNT(r.receiving_id) = 0 THEN 1 -- created
-              WHEN SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END) < po.ordered THEN 30 -- partial
-              ELSE 40 -- full
-            END as receiving_status,
-            MIN(CASE WHEN r.is_alt_po = 0 THEN r.received_date END) as received_date,
-            MAX(CASE WHEN r.is_alt_po = 0 THEN r.received_date END) as last_received_date,
-            (
-              SELECT r2.received_by_name
-              FROM temp_po_receivings r2
-              WHERE r2.pid = po.pid
-              AND r2.is_alt_po = 0
-              ORDER BY r2.received_date
-              LIMIT 1
-            ) as received_by,
-            jsonb_build_object(
-              'ordered_qty', po.ordered,
-              'total_received', COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0),
-              'remaining_unfulfilled', GREATEST(0, po.ordered - COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0)),
-              'excess_received', GREATEST(0, COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0) - po.ordered),
-              'po_cost', po.cost_price,
-              'actual_cost', COALESCE(
-                (
-                  SELECT cost_each
-                  FROM temp_po_receivings r2
-                  WHERE r2.pid = po.pid
-                  AND r2.is_alt_po = 0
-                  AND r2.cost_each > 0
-                  ORDER BY r2.received_date
-                  LIMIT 1
-                ),
-                po.cost_price
-              ),
-              'fulfillment', (
-                SELECT jsonb_agg(
-                  jsonb_build_object(
-                    'receiving_id', r2.receiving_id,
-                    'qty_applied', CASE
-                      WHEN r2.running_total <= po.ordered THEN r2.qty_each
-                      WHEN r2.running_total - r2.qty_each < po.ordered THEN po.ordered - (r2.running_total - r2.qty_each)
-                      ELSE 0
-                    END,
-                    'qty_total', r2.qty_each,
-                    'cost', r2.cost_each,
-                    'date', r2.received_date,
-                    'received_by', r2.received_by,
-                    'received_by_name', r2.received_by_name,
-                    'type', CASE r2.is_alt_po
-                      WHEN 0 THEN 'original'
-                      WHEN 1 THEN 'alternate'
-                      ELSE 'no_po'
-                    END,
-                    'remaining_qty', CASE
-                      WHEN r2.running_total <= po.ordered THEN 0
-                      WHEN r2.running_total - r2.qty_each < po.ordered THEN r2.running_total - po.ordered
-                      ELSE r2.qty_each
-                    END,
-                    'is_excess', r2.running_total > po.ordered
-                  )
-                  ORDER BY r2.received_date
-                )
-                FROM (
-                  SELECT
-                    r2.*,
-                    SUM(r2.qty_each) OVER (
-                      PARTITION BY r2.pid
-                      ORDER BY r2.received_date
-                      ROWS UNBOUNDED PRECEDING
-                    ) as running_total
-                  FROM temp_po_receivings r2
-                  WHERE r2.pid = po.pid
-                ) r2
-              ),
-              'alternate_po_receivings', (
-                SELECT jsonb_agg(
-                  jsonb_build_object(
-                    'receiving_id', r2.receiving_id,
-                    'qty', r2.qty_each,
-                    'cost', r2.cost_each,
-                    'date', r2.received_date,
-                    'received_by', r2.received_by,
-                    'received_by_name', r2.received_by_name
-                  )
-                  ORDER BY r2.received_date
-                )
-                FROM temp_po_receivings r2
-                WHERE r2.pid = po.pid AND r2.is_alt_po = 1
-              ),
-              'no_po_receivings', (
-                SELECT jsonb_agg(
-                  jsonb_build_object(
-                    'receiving_id', r2.receiving_id,
-                    'qty', r2.qty_each,
-                    'cost', r2.cost_each,
-                    'date', r2.received_date,
-                    'received_by', r2.received_by,
-                    'received_by_name', r2.received_by_name
-                  )
-                  ORDER BY r2.received_date
-                )
-                FROM temp_po_receivings r2
-                WHERE r2.pid = po.pid AND r2.is_alt_po = 2
-              )
-            ) as receiving_history
-          FROM temp_purchase_orders po
-          LEFT JOIN temp_po_receivings r ON po.pid = r.pid
-          WHERE (po.po_id, po.pid) IN (
-            SELECT po_id, pid FROM UNNEST($1::int[], $2::int[])
-          )
-          GROUP BY po.po_id, po.pid, po.sku, po.name, po.vendor, po.date,
-            po.expected_date, po.status, po.notes, po.ordered, po.cost_price
+          VALUES ${placeholders}
           ON CONFLICT (po_id, pid) DO UPDATE SET
+            sku = EXCLUDED.sku,
+            name = EXCLUDED.name,
             vendor = EXCLUDED.vendor,
             date = EXCLUDED.date,
             expected_date = EXCLUDED.expected_date,
             status = EXCLUDED.status,
             notes = EXCLUDED.notes,
             ordered = EXCLUDED.ordered,
-            received = EXCLUDED.received,
-            receiving_status = EXCLUDED.receiving_status,
-            received_date = EXCLUDED.received_date,
-            last_received_date = EXCLUDED.last_received_date,
-            received_by = EXCLUDED.received_by,
-            receiving_history = EXCLUDED.receiving_history,
-            cost_price = EXCLUDED.cost_price,
-            po_cost_price = EXCLUDED.po_cost_price
-          RETURNING xmax
-        )
-        SELECT
-          COUNT(*) FILTER (WHERE xmax = 0) as inserted,
-          COUNT(*) FILTER (WHERE xmax <> 0) as updated
-        FROM inserted_pos
-      `, [
-        chunk.map(r => r.po_id),
-        chunk.map(r => r.pid)
-      ]);
-
-      // Add debug logging
-      console.log('Insert result:', result?.rows?.[0]);
-
-      // Handle the result properly for PostgreSQL with more defensive coding
-      const resultRow = result?.rows?.[0] || {};
-      const insertCount = parseInt(resultRow.inserted || '0', 10);
-      const updateCount = parseInt(resultRow.updated || '0', 10);
-
-      recordsAdded += insertCount;
-      recordsUpdated += updateCount;
-      totalProcessed += chunk.length;
-
-      // Update progress
-      outputProgress({
-        status: "running",
-        operation: "Purchase orders final import",
-        message: `Processed ${totalProcessed} of ${total_pos} purchase orders`,
-        current: totalProcessed,
-        total: total_pos,
-        elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
-        remaining: estimateRemaining(startTime, totalProcessed, total_pos),
-        rate: calculateRate(startTime, totalProcessed)
-      });
-
-      // Update last processed IDs for next chunk with safety check
-      if (chunk.length > 0) {
-        const lastItem = chunk[chunk.length - 1];
-        if (lastItem) {
-          lastPoId = lastItem.po_id;
-          lastPid = lastItem.pid;
-        }
+            cost_price = EXCLUDED.cost_price
+        `, values);
+
+        totalProcessed += batch.length;
+
+        outputProgress({
+          status: "running",
+          operation: "Purchase orders import",
+          message: `Processed ${totalProcessed}/${total} purchase order items`,
+          current: totalProcessed,
+          total: total,
+          elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
+          remaining: estimateRemaining(startTime, totalProcessed, total),
+          rate: calculateRate(startTime, totalProcessed)
+        });
+      }
+
+      // Update offset for next batch
+      offset += poList.length;
+
+      // Check if we've received fewer records than the batch size, meaning we're done
+      if (poList.length < FETCH_BATCH_SIZE) {
+        allProcessed = true;
       }
     }

+    // Count the temp table contents
+    const [tempCount] = await localConnection.query(`SELECT COUNT(*) FROM temp_purchase_orders`);
+    const tempRowCount = parseInt(tempCount.rows[0].count);
+    console.log(`Successfully inserted ${tempRowCount} rows into temp_purchase_orders`);
+
+    // Now insert into the final table
+    const [result] = await localConnection.query(`
+      WITH inserted_pos AS (
+        INSERT INTO purchase_orders (
+          po_id, pid, sku, name, cost_price, po_cost_price,
+          vendor, date, expected_date, status, notes,
+          ordered, received, receiving_status
+        )
+        SELECT
+          po_id, pid, sku, name, cost_price, cost_price,
+          vendor, date, expected_date, status, notes,
+          ordered, 0 as received, 1 as receiving_status
+        FROM temp_purchase_orders
+        ON CONFLICT (po_id, pid) DO UPDATE SET
+          vendor = EXCLUDED.vendor,
+          date = EXCLUDED.date,
+          expected_date = EXCLUDED.expected_date,
+          status = EXCLUDED.status,
+          notes = EXCLUDED.notes,
+          ordered = EXCLUDED.ordered,
+          cost_price = EXCLUDED.cost_price,
+          po_cost_price = EXCLUDED.po_cost_price
+        RETURNING xmax = 0 as inserted
+      )
+      SELECT
+        COUNT(*) FILTER (WHERE inserted) as inserted,
+        COUNT(*) FILTER (WHERE NOT inserted) as updated
+      FROM inserted_pos
+    `);
+
+    // Parse the result
+    const { inserted, updated } = result.rows[0];
+    recordsAdded = parseInt(inserted) || 0;
+    recordsUpdated = parseInt(updated) || 0;
+
     // Update sync status
     await localConnection.query(`
       INSERT INTO sync_status (table_name, last_sync_timestamp)
```
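One detail of the new final upsert above: `RETURNING xmax = 0 AS inserted` reuses PostgreSQL's hidden `xmax` system column to split inserts from updates — `xmax` is 0 on a freshly inserted row and non-zero on a row rewritten by `ON CONFLICT ... DO UPDATE`. This is a well-known but undocumented trick. A sketch against a hypothetical minimal column set (the real table has more columns):

```sql
-- xmax = 0 marks rows the upsert inserted; updated rows carry a non-zero xmax.
WITH upserted AS (
  INSERT INTO purchase_orders (po_id, pid, ordered)
  VALUES (1, 42, 10)
  ON CONFLICT (po_id, pid) DO UPDATE SET ordered = EXCLUDED.ordered
  RETURNING xmax = 0 AS inserted
)
SELECT
  COUNT(*) FILTER (WHERE inserted)     AS inserted,
  COUNT(*) FILTER (WHERE NOT inserted) AS updated
FROM upserted;
```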
```diff
@@ -620,29 +220,34 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
     `);

     // Clean up temporary tables
-    await localConnection.query(`
-      DROP TABLE IF EXISTS temp_purchase_orders;
-      DROP TABLE IF EXISTS temp_po_receivings;
-    `);
+    await localConnection.query(`DROP TABLE IF EXISTS temp_purchase_orders;`);
+
+    // Commit transaction
+    await localConnection.commit();

     return {
       status: "complete",
-      recordsAdded,
-      recordsUpdated,
-      totalRecords: processed
+      recordsAdded: recordsAdded || 0,
+      recordsUpdated: recordsUpdated || 0,
+      totalRecords: totalProcessed
     };
   } catch (error) {
     console.error("Error during purchase orders import:", error);
-    // Attempt cleanup on error
+
+    // Rollback transaction
     try {
-      await localConnection.query(`
-        DROP TABLE IF EXISTS temp_purchase_orders;
-        DROP TABLE IF EXISTS temp_po_receivings;
-      `);
-    } catch (cleanupError) {
-      console.error('Error during cleanup:', cleanupError);
+      await localConnection.rollback();
+    } catch (rollbackError) {
+      console.error('Error during rollback:', rollbackError.message);
     }
-    throw error;
+
+    return {
+      status: "error",
+      error: error.message,
+      recordsAdded: 0,
+      recordsUpdated: 0,
+      totalRecords: 0
+    };
   }
 }

```
```diff
@@ -184,7 +184,7 @@ async function resetDatabase() {
       SELECT string_agg(tablename, ', ') as tables
       FROM pg_tables
       WHERE schemaname = 'public'
-      AND tablename NOT IN ('users', 'permissions', 'user_permissions', 'calculate_history', 'import_history');
+      AND tablename NOT IN ('users', 'permissions', 'user_permissions', 'calculate_history', 'import_history', 'ai_prompts', 'ai_validation_performance', 'templates');
     `);

     if (!tablesResult.rows[0].tables) {
```
```diff
@@ -757,8 +757,8 @@ router.get('/history/import', async (req, res) => {
         end_time,
         status,
         error_message,
-        rows_processed::integer,
-        files_processed::integer
+        records_added::integer,
+        records_updated::integer
       FROM import_history
       ORDER BY start_time DESC
       LIMIT 20
```