UI tweaks for match columns step + auto hide empty columns
This commit is contained in:
@@ -1,884 +0,0 @@
|
||||
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics-new/utils/progress');
|
||||
|
||||
/**
|
||||
* Validates a date from MySQL before inserting it into PostgreSQL
|
||||
* @param {string|Date|null} mysqlDate - Date string or object from MySQL
|
||||
* @returns {string|null} Valid date string or null if invalid
|
||||
*/
|
||||
function validateDate(mysqlDate) {
|
||||
// Handle null, undefined, or empty values
|
||||
if (!mysqlDate) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Convert to string if it's not already
|
||||
const dateStr = String(mysqlDate);
|
||||
|
||||
// Handle MySQL zero dates and empty values
|
||||
if (dateStr === '0000-00-00' ||
|
||||
dateStr === '0000-00-00 00:00:00' ||
|
||||
dateStr.indexOf('0000-00-00') !== -1 ||
|
||||
dateStr === '') {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Check if the date is valid
|
||||
const date = new Date(mysqlDate);
|
||||
|
||||
// If the date is invalid or suspiciously old (pre-1970), return null
|
||||
if (isNaN(date.getTime()) || date.getFullYear() < 1970) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return mysqlDate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Imports purchase orders and receivings from a production MySQL database to a local PostgreSQL database.
|
||||
* Handles these as separate data streams without complex FIFO allocation.
|
||||
*
|
||||
* @param {object} prodConnection - A MySQL connection to production DB
|
||||
* @param {object} localConnection - A PostgreSQL connection to local DB
|
||||
* @param {boolean} incrementalUpdate - Set to false for a full sync; true for incremental
|
||||
* @returns {object} Information about the sync operation
|
||||
*/
|
||||
async function importPurchaseOrders(prodConnection, localConnection, incrementalUpdate = true) {
|
||||
const startTime = Date.now();
|
||||
let poRecordsAdded = 0;
|
||||
let poRecordsUpdated = 0;
|
||||
let poRecordsDeleted = 0;
|
||||
let receivingRecordsAdded = 0;
|
||||
let receivingRecordsUpdated = 0;
|
||||
let receivingRecordsDeleted = 0;
|
||||
let totalProcessed = 0;
|
||||
|
||||
// Batch size constants
|
||||
const PO_BATCH_SIZE = 500;
|
||||
const INSERT_BATCH_SIZE = 100;
|
||||
|
||||
try {
|
||||
// Begin transaction for the entire import process
|
||||
await localConnection.beginTransaction();
|
||||
|
||||
// Get last sync info
|
||||
const [syncInfo] = await localConnection.query(
|
||||
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'"
|
||||
);
|
||||
const lastSyncTime = syncInfo?.rows?.[0]?.last_sync_timestamp || '1970-01-01';
|
||||
|
||||
console.log('Purchase Orders: Using last sync time:', lastSyncTime);
|
||||
|
||||
// Create temp tables for processing
|
||||
await localConnection.query(`
|
||||
DROP TABLE IF EXISTS temp_purchase_orders;
|
||||
DROP TABLE IF EXISTS temp_receivings;
|
||||
DROP TABLE IF EXISTS employee_names;
|
||||
DROP TABLE IF EXISTS temp_supplier_names;
|
||||
|
||||
-- Temporary table for purchase orders
|
||||
CREATE TEMP TABLE temp_purchase_orders (
|
||||
po_id TEXT NOT NULL,
|
||||
pid BIGINT NOT NULL,
|
||||
sku TEXT,
|
||||
name TEXT,
|
||||
vendor TEXT,
|
||||
date TIMESTAMP WITH TIME ZONE,
|
||||
expected_date DATE,
|
||||
status TEXT,
|
||||
notes TEXT,
|
||||
long_note TEXT,
|
||||
ordered INTEGER,
|
||||
po_cost_price NUMERIC(14, 4),
|
||||
supplier_id INTEGER,
|
||||
date_created TIMESTAMP WITH TIME ZONE,
|
||||
date_ordered TIMESTAMP WITH TIME ZONE,
|
||||
PRIMARY KEY (po_id, pid)
|
||||
);
|
||||
|
||||
-- Temporary table for receivings
|
||||
CREATE TEMP TABLE temp_receivings (
|
||||
receiving_id TEXT NOT NULL,
|
||||
pid BIGINT NOT NULL,
|
||||
sku TEXT,
|
||||
name TEXT,
|
||||
vendor TEXT,
|
||||
qty_each INTEGER,
|
||||
qty_each_orig INTEGER,
|
||||
cost_each NUMERIC(14, 5),
|
||||
cost_each_orig NUMERIC(14, 5),
|
||||
received_by INTEGER,
|
||||
received_by_name TEXT,
|
||||
received_date TIMESTAMP WITH TIME ZONE,
|
||||
receiving_created_date TIMESTAMP WITH TIME ZONE,
|
||||
supplier_id INTEGER,
|
||||
status TEXT,
|
||||
PRIMARY KEY (receiving_id, pid)
|
||||
);
|
||||
|
||||
-- Temporary table for employee names
|
||||
CREATE TEMP TABLE employee_names (
|
||||
employeeid INTEGER PRIMARY KEY,
|
||||
firstname TEXT,
|
||||
lastname TEXT
|
||||
);
|
||||
|
||||
-- Create indexes for efficient joins
|
||||
CREATE INDEX idx_temp_po_pid ON temp_purchase_orders(pid);
|
||||
CREATE INDEX idx_temp_receiving_pid ON temp_receivings(pid);
|
||||
`);
|
||||
|
||||
// Map status codes to text values
|
||||
const poStatusMap = {
|
||||
0: 'canceled',
|
||||
1: 'created',
|
||||
10: 'electronically_ready_send',
|
||||
11: 'ordered',
|
||||
12: 'preordered',
|
||||
13: 'electronically_sent',
|
||||
15: 'receiving_started',
|
||||
50: 'done'
|
||||
};
|
||||
|
||||
const receivingStatusMap = {
|
||||
0: 'canceled',
|
||||
1: 'created',
|
||||
30: 'partial_received',
|
||||
40: 'full_received',
|
||||
50: 'paid'
|
||||
};
|
||||
|
||||
// Get time window for data retrieval
|
||||
const yearInterval = incrementalUpdate ? 1 : 5;
|
||||
|
||||
// Fetch employee data from production
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Purchase orders import",
|
||||
message: "Fetching employee data"
|
||||
});
|
||||
|
||||
const [employees] = await prodConnection.query(`
|
||||
SELECT
|
||||
employeeid,
|
||||
firstname,
|
||||
lastname
|
||||
FROM employees
|
||||
`);
|
||||
|
||||
// Insert employee data into temp table
|
||||
if (employees.length > 0) {
|
||||
const employeeValues = employees.map(emp => [
|
||||
emp.employeeid,
|
||||
emp.firstname || '',
|
||||
emp.lastname || ''
|
||||
]).flat();
|
||||
|
||||
const placeholders = employees.map((_, idx) => {
|
||||
const base = idx * 3;
|
||||
return `($${base + 1}, $${base + 2}, $${base + 3})`;
|
||||
}).join(',');
|
||||
|
||||
await localConnection.query(`
|
||||
INSERT INTO employee_names (employeeid, firstname, lastname)
|
||||
VALUES ${placeholders}
|
||||
ON CONFLICT (employeeid) DO UPDATE SET
|
||||
firstname = EXCLUDED.firstname,
|
||||
lastname = EXCLUDED.lastname
|
||||
`, employeeValues);
|
||||
}
|
||||
|
||||
// Add this section before the PO import to create a supplier names mapping
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Purchase orders import",
|
||||
message: "Fetching supplier data for vendor mapping"
|
||||
});
|
||||
|
||||
// Fetch supplier data from production and store in a temp table
|
||||
const [suppliers] = await prodConnection.query(`
|
||||
SELECT
|
||||
supplierid,
|
||||
companyname
|
||||
FROM suppliers
|
||||
WHERE companyname IS NOT NULL AND companyname != ''
|
||||
`);
|
||||
|
||||
if (suppliers.length > 0) {
|
||||
// Create temp table for supplier names
|
||||
await localConnection.query(`
|
||||
DROP TABLE IF EXISTS temp_supplier_names;
|
||||
CREATE TEMP TABLE temp_supplier_names (
|
||||
supplier_id INTEGER PRIMARY KEY,
|
||||
company_name TEXT NOT NULL
|
||||
);
|
||||
`);
|
||||
|
||||
// Insert supplier data in batches
|
||||
for (let i = 0; i < suppliers.length; i += INSERT_BATCH_SIZE) {
|
||||
const batch = suppliers.slice(i, i + INSERT_BATCH_SIZE);
|
||||
|
||||
const placeholders = batch.map((_, idx) => {
|
||||
const base = idx * 2;
|
||||
return `($${base + 1}, $${base + 2})`;
|
||||
}).join(',');
|
||||
|
||||
const values = batch.flatMap(s => [
|
||||
s.supplierid,
|
||||
s.companyname || 'Unnamed Supplier'
|
||||
]);
|
||||
|
||||
await localConnection.query(`
|
||||
INSERT INTO temp_supplier_names (supplier_id, company_name)
|
||||
VALUES ${placeholders}
|
||||
ON CONFLICT (supplier_id) DO UPDATE SET
|
||||
company_name = EXCLUDED.company_name
|
||||
`, values);
|
||||
}
|
||||
}
|
||||
|
||||
// 1. Fetch and process purchase orders
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Purchase orders import",
|
||||
message: "Fetching purchase orders"
|
||||
});
|
||||
|
||||
const [poCount] = await prodConnection.query(`
|
||||
SELECT COUNT(*) as total
|
||||
FROM po p
|
||||
WHERE p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR)
|
||||
${incrementalUpdate ? `
|
||||
AND (
|
||||
p.date_updated > ?
|
||||
OR p.date_ordered > ?
|
||||
OR p.date_estin > ?
|
||||
)
|
||||
` : ''}
|
||||
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);
|
||||
|
||||
const totalPOs = poCount[0].total;
|
||||
console.log(`Found ${totalPOs} relevant purchase orders`);
|
||||
|
||||
// Skip processing if no POs to process
|
||||
if (totalPOs === 0) {
|
||||
console.log('No purchase orders to process, skipping PO import step');
|
||||
} else {
|
||||
// Fetch and process POs in batches
|
||||
let offset = 0;
|
||||
let allPOsProcessed = false;
|
||||
|
||||
while (!allPOsProcessed) {
|
||||
const [poList] = await prodConnection.query(`
|
||||
SELECT
|
||||
p.po_id,
|
||||
p.supplier_id,
|
||||
s.companyname AS vendor,
|
||||
p.status,
|
||||
p.notes AS long_note,
|
||||
p.short_note AS notes,
|
||||
p.date_created,
|
||||
p.date_ordered,
|
||||
p.date_estin
|
||||
FROM po p
|
||||
LEFT JOIN suppliers s ON p.supplier_id = s.supplierid
|
||||
WHERE p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR)
|
||||
${incrementalUpdate ? `
|
||||
AND (
|
||||
p.date_updated > ?
|
||||
OR p.date_ordered > ?
|
||||
OR p.date_estin > ?
|
||||
)
|
||||
` : ''}
|
||||
ORDER BY p.po_id
|
||||
LIMIT ${PO_BATCH_SIZE} OFFSET ${offset}
|
||||
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);
|
||||
|
||||
if (poList.length === 0) {
|
||||
allPOsProcessed = true;
|
||||
break;
|
||||
}
|
||||
|
||||
// Get products for these POs
|
||||
const poIds = poList.map(po => po.po_id);
|
||||
|
||||
const [poProducts] = await prodConnection.query(`
|
||||
SELECT
|
||||
pp.po_id,
|
||||
pp.pid,
|
||||
pp.qty_each,
|
||||
pp.cost_each,
|
||||
COALESCE(p.itemnumber, 'NO-SKU') AS sku,
|
||||
COALESCE(p.description, 'Unknown Product') AS name
|
||||
FROM po_products pp
|
||||
LEFT JOIN products p ON pp.pid = p.pid
|
||||
WHERE pp.po_id IN (?)
|
||||
`, [poIds]);
|
||||
|
||||
// Build complete PO records
|
||||
const completePOs = [];
|
||||
for (const product of poProducts) {
|
||||
const po = poList.find(p => p.po_id == product.po_id);
|
||||
if (!po) continue;
|
||||
|
||||
completePOs.push({
|
||||
po_id: po.po_id.toString(),
|
||||
pid: product.pid,
|
||||
sku: product.sku,
|
||||
name: product.name,
|
||||
vendor: po.vendor || 'Unknown Vendor',
|
||||
date: validateDate(po.date_ordered) || validateDate(po.date_created),
|
||||
expected_date: validateDate(po.date_estin),
|
||||
status: poStatusMap[po.status] || 'created',
|
||||
notes: po.notes || '',
|
||||
long_note: po.long_note || '',
|
||||
ordered: product.qty_each,
|
||||
po_cost_price: product.cost_each,
|
||||
supplier_id: po.supplier_id,
|
||||
date_created: validateDate(po.date_created),
|
||||
date_ordered: validateDate(po.date_ordered)
|
||||
});
|
||||
}
|
||||
|
||||
// Insert PO data in batches
|
||||
for (let i = 0; i < completePOs.length; i += INSERT_BATCH_SIZE) {
|
||||
const batch = completePOs.slice(i, i + INSERT_BATCH_SIZE);
|
||||
|
||||
const placeholders = batch.map((_, idx) => {
|
||||
const base = idx * 15;
|
||||
return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`;
|
||||
}).join(',');
|
||||
|
||||
const values = batch.flatMap(po => [
|
||||
po.po_id,
|
||||
po.pid,
|
||||
po.sku,
|
||||
po.name,
|
||||
po.vendor,
|
||||
po.date,
|
||||
po.expected_date,
|
||||
po.status,
|
||||
po.notes,
|
||||
po.long_note,
|
||||
po.ordered,
|
||||
po.po_cost_price,
|
||||
po.supplier_id,
|
||||
po.date_created,
|
||||
po.date_ordered
|
||||
]);
|
||||
|
||||
await localConnection.query(`
|
||||
INSERT INTO temp_purchase_orders (
|
||||
po_id, pid, sku, name, vendor, date, expected_date, status, notes, long_note,
|
||||
ordered, po_cost_price, supplier_id, date_created, date_ordered
|
||||
)
|
||||
VALUES ${placeholders}
|
||||
ON CONFLICT (po_id, pid) DO UPDATE SET
|
||||
sku = EXCLUDED.sku,
|
||||
name = EXCLUDED.name,
|
||||
vendor = EXCLUDED.vendor,
|
||||
date = EXCLUDED.date,
|
||||
expected_date = EXCLUDED.expected_date,
|
||||
status = EXCLUDED.status,
|
||||
notes = EXCLUDED.notes,
|
||||
long_note = EXCLUDED.long_note,
|
||||
ordered = EXCLUDED.ordered,
|
||||
po_cost_price = EXCLUDED.po_cost_price,
|
||||
supplier_id = EXCLUDED.supplier_id,
|
||||
date_created = EXCLUDED.date_created,
|
||||
date_ordered = EXCLUDED.date_ordered
|
||||
`, values);
|
||||
}
|
||||
|
||||
offset += poList.length;
|
||||
totalProcessed += completePOs.length;
|
||||
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Purchase orders import",
|
||||
message: `Processed ${offset} of ${totalPOs} purchase orders (${totalProcessed} line items)`,
|
||||
current: offset,
|
||||
total: totalPOs,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, offset, totalPOs),
|
||||
rate: calculateRate(startTime, offset)
|
||||
});
|
||||
|
||||
if (poList.length < PO_BATCH_SIZE) {
|
||||
allPOsProcessed = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Next, fetch all relevant receivings
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Purchase orders import",
|
||||
message: "Fetching receivings data"
|
||||
});
|
||||
|
||||
const [receivingCount] = await prodConnection.query(`
|
||||
SELECT COUNT(*) as total
|
||||
FROM receivings r
|
||||
WHERE r.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR)
|
||||
${incrementalUpdate ? `
|
||||
AND (
|
||||
r.date_updated > ?
|
||||
OR r.date_created > ?
|
||||
)
|
||||
` : ''}
|
||||
`, incrementalUpdate ? [lastSyncTime, lastSyncTime] : []);
|
||||
|
||||
const totalReceivings = receivingCount[0].total;
|
||||
console.log(`Found ${totalReceivings} relevant receivings`);
|
||||
|
||||
// Skip processing if no receivings to process
|
||||
if (totalReceivings === 0) {
|
||||
console.log('No receivings to process, skipping receivings import step');
|
||||
} else {
|
||||
// Fetch and process receivings in batches
|
||||
offset = 0; // Reset offset for receivings
|
||||
let allReceivingsProcessed = false;
|
||||
|
||||
while (!allReceivingsProcessed) {
|
||||
const [receivingList] = await prodConnection.query(`
|
||||
SELECT
|
||||
r.receiving_id,
|
||||
r.supplier_id,
|
||||
r.status,
|
||||
r.notes,
|
||||
r.shipping,
|
||||
r.total_amount,
|
||||
r.hold,
|
||||
r.for_storefront,
|
||||
r.date_created,
|
||||
r.date_paid,
|
||||
r.date_checked
|
||||
FROM receivings r
|
||||
WHERE r.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR)
|
||||
${incrementalUpdate ? `
|
||||
AND (
|
||||
r.date_updated > ?
|
||||
OR r.date_created > ?
|
||||
)
|
||||
` : ''}
|
||||
ORDER BY r.receiving_id
|
||||
LIMIT ${PO_BATCH_SIZE} OFFSET ${offset}
|
||||
`, incrementalUpdate ? [lastSyncTime, lastSyncTime] : []);
|
||||
|
||||
if (receivingList.length === 0) {
|
||||
allReceivingsProcessed = true;
|
||||
break;
|
||||
}
|
||||
|
||||
// Get products for these receivings
|
||||
const receivingIds = receivingList.map(r => r.receiving_id);
|
||||
|
||||
const [receivingProducts] = await prodConnection.query(`
|
||||
SELECT
|
||||
rp.receiving_id,
|
||||
rp.pid,
|
||||
rp.qty_each,
|
||||
rp.qty_each_orig,
|
||||
rp.cost_each,
|
||||
rp.cost_each_orig,
|
||||
rp.received_by,
|
||||
rp.received_date,
|
||||
r.date_created as receiving_created_date,
|
||||
COALESCE(p.itemnumber, 'NO-SKU') AS sku,
|
||||
COALESCE(p.description, 'Unknown Product') AS name
|
||||
FROM receivings_products rp
|
||||
JOIN receivings r ON rp.receiving_id = r.receiving_id
|
||||
LEFT JOIN products p ON rp.pid = p.pid
|
||||
WHERE rp.receiving_id IN (?)
|
||||
`, [receivingIds]);
|
||||
|
||||
// Build complete receiving records
|
||||
const completeReceivings = [];
|
||||
for (const product of receivingProducts) {
|
||||
const receiving = receivingList.find(r => r.receiving_id == product.receiving_id);
|
||||
if (!receiving) continue;
|
||||
|
||||
// Get employee name if available
|
||||
let receivedByName = null;
|
||||
if (product.received_by) {
|
||||
const [employeeResult] = await localConnection.query(`
|
||||
SELECT CONCAT(firstname, ' ', lastname) as full_name
|
||||
FROM employee_names
|
||||
WHERE employeeid = $1
|
||||
`, [product.received_by]);
|
||||
|
||||
if (employeeResult.rows.length > 0) {
|
||||
receivedByName = employeeResult.rows[0].full_name;
|
||||
}
|
||||
}
|
||||
|
||||
// Get vendor name if available
|
||||
let vendorName = 'Unknown Vendor';
|
||||
if (receiving.supplier_id) {
|
||||
const [vendorResult] = await localConnection.query(`
|
||||
SELECT company_name
|
||||
FROM temp_supplier_names
|
||||
WHERE supplier_id = $1
|
||||
`, [receiving.supplier_id]);
|
||||
|
||||
if (vendorResult.rows.length > 0) {
|
||||
vendorName = vendorResult.rows[0].company_name;
|
||||
}
|
||||
}
|
||||
|
||||
completeReceivings.push({
|
||||
receiving_id: receiving.receiving_id.toString(),
|
||||
pid: product.pid,
|
||||
sku: product.sku,
|
||||
name: product.name,
|
||||
vendor: vendorName,
|
||||
qty_each: product.qty_each,
|
||||
qty_each_orig: product.qty_each_orig,
|
||||
cost_each: product.cost_each,
|
||||
cost_each_orig: product.cost_each_orig,
|
||||
received_by: product.received_by,
|
||||
received_by_name: receivedByName,
|
||||
received_date: validateDate(product.received_date) || validateDate(product.receiving_created_date),
|
||||
receiving_created_date: validateDate(product.receiving_created_date),
|
||||
supplier_id: receiving.supplier_id,
|
||||
status: receivingStatusMap[receiving.status] || 'created'
|
||||
});
|
||||
}
|
||||
|
||||
// Insert receiving data in batches
|
||||
for (let i = 0; i < completeReceivings.length; i += INSERT_BATCH_SIZE) {
|
||||
const batch = completeReceivings.slice(i, i + INSERT_BATCH_SIZE);
|
||||
|
||||
const placeholders = batch.map((_, idx) => {
|
||||
const base = idx * 15;
|
||||
return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`;
|
||||
}).join(',');
|
||||
|
||||
const values = batch.flatMap(r => [
|
||||
r.receiving_id,
|
||||
r.pid,
|
||||
r.sku,
|
||||
r.name,
|
||||
r.vendor,
|
||||
r.qty_each,
|
||||
r.qty_each_orig,
|
||||
r.cost_each,
|
||||
r.cost_each_orig,
|
||||
r.received_by,
|
||||
r.received_by_name,
|
||||
r.received_date,
|
||||
r.receiving_created_date,
|
||||
r.supplier_id,
|
||||
r.status
|
||||
]);
|
||||
|
||||
await localConnection.query(`
|
||||
INSERT INTO temp_receivings (
|
||||
receiving_id, pid, sku, name, vendor, qty_each, qty_each_orig,
|
||||
cost_each, cost_each_orig, received_by, received_by_name,
|
||||
received_date, receiving_created_date, supplier_id, status
|
||||
)
|
||||
VALUES ${placeholders}
|
||||
ON CONFLICT (receiving_id, pid) DO UPDATE SET
|
||||
sku = EXCLUDED.sku,
|
||||
name = EXCLUDED.name,
|
||||
vendor = EXCLUDED.vendor,
|
||||
qty_each = EXCLUDED.qty_each,
|
||||
qty_each_orig = EXCLUDED.qty_each_orig,
|
||||
cost_each = EXCLUDED.cost_each,
|
||||
cost_each_orig = EXCLUDED.cost_each_orig,
|
||||
received_by = EXCLUDED.received_by,
|
||||
received_by_name = EXCLUDED.received_by_name,
|
||||
received_date = EXCLUDED.received_date,
|
||||
receiving_created_date = EXCLUDED.receiving_created_date,
|
||||
supplier_id = EXCLUDED.supplier_id,
|
||||
status = EXCLUDED.status
|
||||
`, values);
|
||||
}
|
||||
|
||||
offset += receivingList.length;
|
||||
totalProcessed += completeReceivings.length;
|
||||
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Purchase orders import",
|
||||
message: `Processed ${offset} of ${totalReceivings} receivings (${totalProcessed} line items total)`,
|
||||
current: offset,
|
||||
total: totalReceivings,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, offset, totalReceivings),
|
||||
rate: calculateRate(startTime, offset)
|
||||
});
|
||||
|
||||
if (receivingList.length < PO_BATCH_SIZE) {
|
||||
allReceivingsProcessed = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add this section to filter out invalid PIDs before final import
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Purchase orders import",
|
||||
message: "Validating product IDs before final import"
|
||||
});
|
||||
|
||||
await localConnection.query(`
|
||||
-- Create temp table to store invalid PIDs
|
||||
DROP TABLE IF EXISTS temp_invalid_pids;
|
||||
CREATE TEMP TABLE temp_invalid_pids AS (
|
||||
-- Get all unique PIDs from our temp tables
|
||||
WITH all_pids AS (
|
||||
SELECT DISTINCT pid FROM temp_purchase_orders
|
||||
UNION
|
||||
SELECT DISTINCT pid FROM temp_receivings
|
||||
)
|
||||
-- Filter to only those that don't exist in products table
|
||||
SELECT p.pid
|
||||
FROM all_pids p
|
||||
WHERE NOT EXISTS (
|
||||
SELECT 1 FROM products WHERE pid = p.pid
|
||||
)
|
||||
);
|
||||
|
||||
-- Remove purchase orders with invalid PIDs
|
||||
DELETE FROM temp_purchase_orders
|
||||
WHERE pid IN (SELECT pid FROM temp_invalid_pids);
|
||||
|
||||
-- Remove receivings with invalid PIDs
|
||||
DELETE FROM temp_receivings
|
||||
WHERE pid IN (SELECT pid FROM temp_invalid_pids);
|
||||
`);
|
||||
|
||||
// Get count of filtered items for reporting
|
||||
const [filteredResult] = await localConnection.query(`
|
||||
SELECT COUNT(*) as count FROM temp_invalid_pids
|
||||
`);
|
||||
const filteredCount = filteredResult.rows[0].count;
|
||||
|
||||
if (filteredCount > 0) {
|
||||
console.log(`Filtered out ${filteredCount} items with invalid product IDs`);
|
||||
}
|
||||
|
||||
// 3. Insert final purchase order records to the actual table
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Purchase orders import",
|
||||
message: "Inserting final purchase order records"
|
||||
});
|
||||
|
||||
// Create a temp table to track PO IDs being processed
|
||||
await localConnection.query(`
|
||||
DROP TABLE IF EXISTS processed_po_ids;
|
||||
CREATE TEMP TABLE processed_po_ids AS (
|
||||
SELECT DISTINCT po_id FROM temp_purchase_orders
|
||||
);
|
||||
`);
|
||||
|
||||
// Delete products that were removed from POs and count them
|
||||
const [poDeletedResult] = await localConnection.query(`
|
||||
WITH deleted AS (
|
||||
DELETE FROM purchase_orders
|
||||
WHERE po_id IN (SELECT po_id FROM processed_po_ids)
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM temp_purchase_orders tp
|
||||
WHERE purchase_orders.po_id = tp.po_id AND purchase_orders.pid = tp.pid
|
||||
)
|
||||
RETURNING po_id, pid
|
||||
)
|
||||
SELECT COUNT(*) as count FROM deleted
|
||||
`);
|
||||
|
||||
poRecordsDeleted = poDeletedResult.rows[0]?.count || 0;
|
||||
console.log(`Deleted ${poRecordsDeleted} products that were removed from purchase orders`);
|
||||
|
||||
const [poResult] = await localConnection.query(`
|
||||
INSERT INTO purchase_orders (
|
||||
po_id, vendor, date, expected_date, pid, sku, name,
|
||||
po_cost_price, status, notes, long_note,
|
||||
ordered, supplier_id, date_created, date_ordered
|
||||
)
|
||||
SELECT
|
||||
po_id,
|
||||
vendor,
|
||||
COALESCE(date, date_created, now()) as date,
|
||||
expected_date,
|
||||
pid,
|
||||
sku,
|
||||
name,
|
||||
po_cost_price,
|
||||
status,
|
||||
notes,
|
||||
long_note,
|
||||
ordered,
|
||||
supplier_id,
|
||||
date_created,
|
||||
date_ordered
|
||||
FROM temp_purchase_orders
|
||||
ON CONFLICT (po_id, pid) DO UPDATE SET
|
||||
vendor = EXCLUDED.vendor,
|
||||
date = EXCLUDED.date,
|
||||
expected_date = EXCLUDED.expected_date,
|
||||
sku = EXCLUDED.sku,
|
||||
name = EXCLUDED.name,
|
||||
po_cost_price = EXCLUDED.po_cost_price,
|
||||
status = EXCLUDED.status,
|
||||
notes = EXCLUDED.notes,
|
||||
long_note = EXCLUDED.long_note,
|
||||
ordered = EXCLUDED.ordered,
|
||||
supplier_id = EXCLUDED.supplier_id,
|
||||
date_created = EXCLUDED.date_created,
|
||||
date_ordered = EXCLUDED.date_ordered,
|
||||
updated = CURRENT_TIMESTAMP
|
||||
WHERE -- Only update if at least one key field has changed
|
||||
purchase_orders.ordered IS DISTINCT FROM EXCLUDED.ordered OR
|
||||
purchase_orders.po_cost_price IS DISTINCT FROM EXCLUDED.po_cost_price OR
|
||||
purchase_orders.status IS DISTINCT FROM EXCLUDED.status OR
|
||||
purchase_orders.expected_date IS DISTINCT FROM EXCLUDED.expected_date OR
|
||||
purchase_orders.date IS DISTINCT FROM EXCLUDED.date OR
|
||||
purchase_orders.vendor IS DISTINCT FROM EXCLUDED.vendor
|
||||
RETURNING (xmax = 0) as inserted
|
||||
`);
|
||||
|
||||
poRecordsAdded = poResult.rows.filter(r => r.inserted).length;
|
||||
poRecordsUpdated = poResult.rows.filter(r => !r.inserted).length;
|
||||
|
||||
// 4. Insert final receiving records to the actual table
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Purchase orders import",
|
||||
message: "Inserting final receiving records"
|
||||
});
|
||||
|
||||
// Create a temp table to track receiving IDs being processed
|
||||
await localConnection.query(`
|
||||
DROP TABLE IF EXISTS processed_receiving_ids;
|
||||
CREATE TEMP TABLE processed_receiving_ids AS (
|
||||
SELECT DISTINCT receiving_id FROM temp_receivings
|
||||
);
|
||||
`);
|
||||
|
||||
// Delete products that were removed from receivings and count them
|
||||
const [receivingDeletedResult] = await localConnection.query(`
|
||||
WITH deleted AS (
|
||||
DELETE FROM receivings
|
||||
WHERE receiving_id IN (SELECT receiving_id FROM processed_receiving_ids)
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM temp_receivings tr
|
||||
WHERE receivings.receiving_id = tr.receiving_id AND receivings.pid = tr.pid
|
||||
)
|
||||
RETURNING receiving_id, pid
|
||||
)
|
||||
SELECT COUNT(*) as count FROM deleted
|
||||
`);
|
||||
|
||||
receivingRecordsDeleted = receivingDeletedResult.rows[0]?.count || 0;
|
||||
console.log(`Deleted ${receivingRecordsDeleted} products that were removed from receivings`);
|
||||
|
||||
const [receivingsResult] = await localConnection.query(`
|
||||
INSERT INTO receivings (
|
||||
receiving_id, pid, sku, name, vendor, qty_each, qty_each_orig,
|
||||
cost_each, cost_each_orig, received_by, received_by_name,
|
||||
received_date, receiving_created_date, supplier_id, status
|
||||
)
|
||||
SELECT
|
||||
receiving_id,
|
||||
pid,
|
||||
sku,
|
||||
name,
|
||||
vendor,
|
||||
qty_each,
|
||||
qty_each_orig,
|
||||
cost_each,
|
||||
cost_each_orig,
|
||||
received_by,
|
||||
received_by_name,
|
||||
COALESCE(received_date, receiving_created_date, now()) as received_date,
|
||||
receiving_created_date,
|
||||
supplier_id,
|
||||
status
|
||||
FROM temp_receivings
|
||||
ON CONFLICT (receiving_id, pid) DO UPDATE SET
|
||||
sku = EXCLUDED.sku,
|
||||
name = EXCLUDED.name,
|
||||
vendor = EXCLUDED.vendor,
|
||||
qty_each = EXCLUDED.qty_each,
|
||||
qty_each_orig = EXCLUDED.qty_each_orig,
|
||||
cost_each = EXCLUDED.cost_each,
|
||||
cost_each_orig = EXCLUDED.cost_each_orig,
|
||||
received_by = EXCLUDED.received_by,
|
||||
received_by_name = EXCLUDED.received_by_name,
|
||||
received_date = EXCLUDED.received_date,
|
||||
receiving_created_date = EXCLUDED.receiving_created_date,
|
||||
supplier_id = EXCLUDED.supplier_id,
|
||||
status = EXCLUDED.status,
|
||||
updated = CURRENT_TIMESTAMP
|
||||
WHERE -- Only update if at least one key field has changed
|
||||
receivings.qty_each IS DISTINCT FROM EXCLUDED.qty_each OR
|
||||
receivings.cost_each IS DISTINCT FROM EXCLUDED.cost_each OR
|
||||
receivings.status IS DISTINCT FROM EXCLUDED.status OR
|
||||
receivings.received_date IS DISTINCT FROM EXCLUDED.received_date OR
|
||||
receivings.received_by IS DISTINCT FROM EXCLUDED.received_by
|
||||
RETURNING (xmax = 0) as inserted
|
||||
`);
|
||||
|
||||
receivingRecordsAdded = receivingsResult.rows.filter(r => r.inserted).length;
|
||||
receivingRecordsUpdated = receivingsResult.rows.filter(r => !r.inserted).length;
|
||||
|
||||
// Update sync status
|
||||
await localConnection.query(`
|
||||
INSERT INTO sync_status (table_name, last_sync_timestamp)
|
||||
VALUES ('purchase_orders', NOW())
|
||||
ON CONFLICT (table_name) DO UPDATE SET
|
||||
last_sync_timestamp = NOW()
|
||||
`);
|
||||
|
||||
// Clean up temporary tables
|
||||
await localConnection.query(`
|
||||
DROP TABLE IF EXISTS temp_purchase_orders;
|
||||
DROP TABLE IF EXISTS temp_receivings;
|
||||
DROP TABLE IF EXISTS employee_names;
|
||||
DROP TABLE IF EXISTS temp_supplier_names;
|
||||
DROP TABLE IF EXISTS temp_invalid_pids;
|
||||
DROP TABLE IF EXISTS processed_po_ids;
|
||||
DROP TABLE IF EXISTS processed_receiving_ids;
|
||||
`);
|
||||
|
||||
// Commit transaction
|
||||
await localConnection.commit();
|
||||
|
||||
return {
|
||||
status: "complete",
|
||||
recordsAdded: poRecordsAdded + receivingRecordsAdded,
|
||||
recordsUpdated: poRecordsUpdated + receivingRecordsUpdated,
|
||||
recordsDeleted: poRecordsDeleted + receivingRecordsDeleted,
|
||||
poRecordsAdded,
|
||||
poRecordsUpdated,
|
||||
poRecordsDeleted,
|
||||
receivingRecordsAdded,
|
||||
receivingRecordsUpdated,
|
||||
receivingRecordsDeleted,
|
||||
totalRecords: totalProcessed
|
||||
};
|
||||
} catch (error) {
|
||||
console.error("Error during purchase orders import:", error);
|
||||
|
||||
// Rollback transaction
|
||||
try {
|
||||
await localConnection.rollback();
|
||||
} catch (rollbackError) {
|
||||
console.error('Error during rollback:', rollbackError.message);
|
||||
}
|
||||
|
||||
return {
|
||||
status: "error",
|
||||
error: error.message,
|
||||
recordsAdded: 0,
|
||||
recordsUpdated: 0,
|
||||
recordsDeleted: 0,
|
||||
totalRecords: 0
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = importPurchaseOrders;
|
||||
Reference in New Issue
Block a user