Import fixes/optimizations
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');
|
||||
|
||||
async function importPurchaseOrders(prodConnection, localConnection) {
|
||||
async function importPurchaseOrders(prodConnection, localConnection, incrementalUpdate = true) {
|
||||
const startTime = Date.now();
|
||||
|
||||
try {
|
||||
@@ -11,7 +11,7 @@ async function importPurchaseOrders(prodConnection, localConnection) {
|
||||
const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01';
|
||||
|
||||
outputProgress({
|
||||
operation: "Starting purchase orders import - Initializing",
|
||||
operation: `Starting ${incrementalUpdate ? 'incremental' : 'full'} purchase orders import`,
|
||||
status: "running",
|
||||
});
|
||||
|
||||
@@ -26,7 +26,23 @@ async function importPurchaseOrders(prodConnection, localConnection) {
|
||||
.map((col) => col.COLUMN_NAME)
|
||||
.filter((name) => name !== "id");
|
||||
|
||||
// First get all relevant PO IDs with basic info - modified for incremental
|
||||
// Build incremental conditions
|
||||
const incrementalWhereClause = incrementalUpdate
|
||||
? `AND (
|
||||
p.stamp > ?
|
||||
OR p.date_modified > ?
|
||||
OR p.date_ordered > ?
|
||||
OR p.date_estin > ?
|
||||
OR r.stamp > ?
|
||||
OR rp.stamp > ?
|
||||
OR rp.received_date > ?
|
||||
)`
|
||||
: "";
|
||||
const incrementalParams = incrementalUpdate
|
||||
? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]
|
||||
: [];
|
||||
|
||||
// First get all relevant PO IDs with basic info
|
||||
const [[{ total }]] = await prodConnection.query(`
|
||||
SELECT COUNT(*) as total
|
||||
FROM (
|
||||
@@ -36,19 +52,16 @@ async function importPurchaseOrders(prodConnection, localConnection) {
|
||||
JOIN po_products pop ON p.po_id = pop.po_id
|
||||
JOIN suppliers s ON p.supplier_id = s.supplierid
|
||||
WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
|
||||
AND (p.date_ordered > ?
|
||||
OR p.stamp > ?
|
||||
OR p.date_modified > ?)
|
||||
${incrementalWhereClause}
|
||||
UNION
|
||||
SELECT DISTINCT r.receiving_id as po_id, rp.pid
|
||||
FROM receivings_products rp
|
||||
USE INDEX (received_date)
|
||||
LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id
|
||||
WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
|
||||
AND (rp.received_date > ?
|
||||
OR rp.stamp > ?)
|
||||
${incrementalWhereClause}
|
||||
) all_items
|
||||
`, [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]);
|
||||
`, [...incrementalParams, ...incrementalParams]);
|
||||
|
||||
const [poList] = await prodConnection.query(`
|
||||
SELECT DISTINCT
|
||||
@@ -294,11 +307,13 @@ async function importPurchaseOrders(prodConnection, localConnection) {
|
||||
}
|
||||
}
|
||||
|
||||
// After successful import, update sync status
|
||||
// Update sync status with proper incrementing of last_sync_id
|
||||
await localConnection.query(`
|
||||
INSERT INTO sync_status (table_name, last_sync_timestamp)
|
||||
VALUES ('purchase_orders', NOW())
|
||||
ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW()
|
||||
ON DUPLICATE KEY UPDATE
|
||||
last_sync_timestamp = NOW(),
|
||||
last_sync_id = LAST_INSERT_ID(last_sync_id)
|
||||
`);
|
||||
|
||||
return {
|
||||
@@ -306,12 +321,13 @@ async function importPurchaseOrders(prodConnection, localConnection) {
|
||||
totalImported: totalItems,
|
||||
recordsAdded,
|
||||
recordsUpdated,
|
||||
incrementalUpdate: !!syncInfo?.[0]
|
||||
incrementalUpdate,
|
||||
lastSyncTime
|
||||
};
|
||||
|
||||
} catch (error) {
|
||||
outputProgress({
|
||||
operation: "Purchase orders import failed",
|
||||
operation: `${incrementalUpdate ? 'Incremental' : 'Full'} purchase orders import failed`,
|
||||
status: "error",
|
||||
error: error.message,
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user