11 Commits

8 changed files with 1093 additions and 766 deletions

.gitignore vendored

@@ -57,3 +57,4 @@ csv/**/*
 **/csv/**/*
 !csv/.gitkeep
 inventory/tsconfig.tsbuildinfo
+inventory-server/scripts/.fuse_hidden00000fa20000000a


@@ -184,6 +184,7 @@ CREATE TABLE IF NOT EXISTS import_history (
   start_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
   end_time TIMESTAMP NULL,
   duration_seconds INT,
+  duration_minutes DECIMAL(10,2) GENERATED ALWAYS AS (duration_seconds / 60.0) STORED,
   records_added INT DEFAULT 0,
   records_updated INT DEFAULT 0,
   is_incremental BOOLEAN DEFAULT FALSE,
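A note on the new column: duration_minutes is a STORED generated column, so MySQL derives it from duration_seconds on every write and the import code never assigns it (writing to it directly would error). A minimal sketch of the behavior, assuming a mysql2/promise connection `db` and that the table's remaining columns have defaults:

    // Write only duration_seconds; MySQL fills duration_minutes itself.
    async function demoDurationMinutes(db) {
      await db.query(
        "INSERT INTO import_history (duration_seconds) VALUES (90)"
      );
      // Reads back duration_minutes = 1.50 (90 / 60.0 as DECIMAL(10,2)).
      const [rows] = await db.query(
        "SELECT duration_seconds, duration_minutes FROM import_history ORDER BY start_time DESC LIMIT 1"
      );
      return rows[0];
    }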


@@ -39,7 +39,7 @@ CREATE TABLE products (
   tags TEXT,
   moq INT DEFAULT 1,
   uom INT DEFAULT 1,
-  rating TINYINT UNSIGNED DEFAULT 0,
+  rating DECIMAL(10,2) DEFAULT 0.00,
   reviews INT UNSIGNED DEFAULT 0,
   weight DECIMAL(10,3),
   length DECIMAL(10,3),
@@ -52,7 +52,7 @@ CREATE TABLE products (
   notifies INT UNSIGNED DEFAULT 0,
   date_last_sold DATE,
   PRIMARY KEY (pid),
-  UNIQUE KEY unique_sku (SKU),
+  INDEX idx_sku (SKU),
   INDEX idx_vendor (vendor),
   INDEX idx_brand (brand),
   INDEX idx_location (location),
@@ -113,11 +113,13 @@ CREATE TABLE IF NOT EXISTS orders (
   tax DECIMAL(10,3) DEFAULT 0.000,
   tax_included TINYINT(1) DEFAULT 0,
   shipping DECIMAL(10,3) DEFAULT 0.000,
+  costeach DECIMAL(10,3) DEFAULT 0.000,
   customer VARCHAR(50) NOT NULL,
   customer_name VARCHAR(100),
   status VARCHAR(20) DEFAULT 'pending',
   canceled TINYINT(1) DEFAULT 0,
   PRIMARY KEY (id),
+  UNIQUE KEY unique_order_line (order_number, pid),
   KEY order_number (order_number),
   KEY pid (pid),
   KEY customer (customer),
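The new unique_order_line key is what makes the importer's INSERT ... ON DUPLICATE KEY UPDATE statements idempotent per order line: re-importing the same (order_number, pid) pair updates the existing row instead of accumulating duplicate line items. A minimal sketch of the pattern, assuming a mysql2/promise connection `db` and an `id` supplied the way the import script does:

    // Running this twice for the same (order_number, pid) leaves one row;
    // the second call takes the update branch via unique_order_line.
    async function upsertOrderLine(db, line) {
      await db.query(
        `INSERT INTO orders (id, order_number, pid, SKU, date, price, quantity, customer)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?)
         ON DUPLICATE KEY UPDATE
           SKU = VALUES(SKU),
           price = VALUES(price),
           quantity = VALUES(quantity)`,
        [line.id, line.order_number, line.pid, line.SKU, line.date,
         line.price, line.quantity, line.customer]
      );
    }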
@@ -135,7 +137,9 @@ CREATE TABLE purchase_orders (
   expected_date DATE,
   pid BIGINT NOT NULL,
   sku VARCHAR(50) NOT NULL,
+  name VARCHAR(100) NOT NULL COMMENT 'Product name from products.description',
   cost_price DECIMAL(10, 3) NOT NULL,
+  po_cost_price DECIMAL(10, 3) NOT NULL COMMENT 'Original cost from PO, before receiving adjustments',
   status TINYINT UNSIGNED DEFAULT 1 COMMENT '0=canceled,1=created,10=electronically_ready_send,11=ordered,12=preordered,13=electronically_sent,15=receiving_started,50=done',
   receiving_status TINYINT UNSIGNED DEFAULT 1 COMMENT '0=canceled,1=created,30=partial_received,40=full_received,50=paid',
   notes TEXT,
@@ -147,7 +151,6 @@ CREATE TABLE purchase_orders (
   received_by INT,
   receiving_history JSON COMMENT 'Array of receiving records with qty, date, cost, receiving_id, and alt_po flag',
   FOREIGN KEY (pid) REFERENCES products(pid),
-  FOREIGN KEY (sku) REFERENCES products(SKU),
   INDEX idx_po_id (po_id),
   INDEX idx_vendor (vendor),
   INDEX idx_status (status),


@@ -10,8 +10,8 @@ const importPurchaseOrders = require('./import/purchase-orders');
 dotenv.config({ path: path.join(__dirname, "../.env") });

 // Constants to control which imports run
-const IMPORT_CATEGORIES = false;
-const IMPORT_PRODUCTS = false;
+const IMPORT_CATEGORIES = true;
+const IMPORT_PRODUCTS = true;
 const IMPORT_ORDERS = true;
 const IMPORT_PURCHASE_ORDERS = true;
@@ -48,7 +48,6 @@ const sshConfig = {
   connectionLimit: 10,
   queueLimit: 0,
   namedPlaceholders: true,
-  maxAllowedPacket: 64 * 1024 * 1024, // 64MB
   connectTimeout: 60000,
   enableKeepAlive: true,
   keepAliveInitialDelay: 10000,
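Dropping maxAllowedPacket looks correct rather than cosmetic: max_allowed_packet is a MySQL server variable, not a mysql2 pool option, so the client-side setting was most likely a no-op. If the large batched INSERTs ever hit packet limits, the effective limit has to be read (and raised) on the server; a small sketch, assuming a mysql2/promise connection `db`:

    // max_allowed_packet is enforced by the server; clients cannot raise it
    // through pool options. This just reads the effective limit.
    async function checkPacketLimit(db) {
      const [[row]] = await db.query(
        "SELECT @@max_allowed_packet AS maxAllowedPacket"
      );
      return row.maxAllowedPacket; // bytes, e.g. 67108864 for 64MB
    }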
@@ -162,32 +161,36 @@ async function main() {
       results.categories = await importCategories(prodConnection, localConnection);
       if (isImportCancelled) throw new Error("Import cancelled");
       completedSteps++;
-      if (results.categories.recordsAdded) totalRecordsAdded += results.categories.recordsAdded;
-      if (results.categories.recordsUpdated) totalRecordsUpdated += results.categories.recordsUpdated;
+      console.log('Categories import result:', results.categories);
+      totalRecordsAdded += results.categories?.recordsAdded || 0;
+      totalRecordsUpdated += results.categories?.recordsUpdated || 0;
     }

     if (IMPORT_PRODUCTS) {
       results.products = await importProducts(prodConnection, localConnection, INCREMENTAL_UPDATE);
       if (isImportCancelled) throw new Error("Import cancelled");
       completedSteps++;
-      if (results.products.recordsAdded) totalRecordsAdded += results.products.recordsAdded;
-      if (results.products.recordsUpdated) totalRecordsUpdated += results.products.recordsUpdated;
+      console.log('Products import result:', results.products);
+      totalRecordsAdded += results.products?.recordsAdded || 0;
+      totalRecordsUpdated += results.products?.recordsUpdated || 0;
     }

     if (IMPORT_ORDERS) {
       results.orders = await importOrders(prodConnection, localConnection, INCREMENTAL_UPDATE);
       if (isImportCancelled) throw new Error("Import cancelled");
       completedSteps++;
-      if (results.orders.recordsAdded) totalRecordsAdded += results.orders.recordsAdded;
-      if (results.orders.recordsUpdated) totalRecordsUpdated += results.orders.recordsUpdated;
+      console.log('Orders import result:', results.orders);
+      totalRecordsAdded += results.orders?.recordsAdded || 0;
+      totalRecordsUpdated += results.orders?.recordsUpdated || 0;
     }

     if (IMPORT_PURCHASE_ORDERS) {
       results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection, INCREMENTAL_UPDATE);
       if (isImportCancelled) throw new Error("Import cancelled");
       completedSteps++;
-      if (results.purchaseOrders.recordsAdded) totalRecordsAdded += results.purchaseOrders.recordsAdded;
-      if (results.purchaseOrders.recordsUpdated) totalRecordsUpdated += results.purchaseOrders.recordsUpdated;
+      console.log('Purchase orders import result:', results.purchaseOrders);
+      totalRecordsAdded += results.purchaseOrders?.recordsAdded || 0;
+      totalRecordsUpdated += results.purchaseOrders?.recordsUpdated || 0;
     }

     const endTime = Date.now();
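The tallying rewrite is a robustness fix: with the old truthiness guards, an importer that returned undefined would throw on `results.x.recordsAdded`, while optional chaining plus `|| 0` keeps the totals numeric in every case. A small illustration with hypothetical results:

    // Hypothetical importer results.
    const ok = { recordsAdded: 12, recordsUpdated: 3 };
    const failed = undefined; // importer returned nothing

    let totalRecordsAdded = 0;
    // Old style: `failed.recordsAdded` throws a TypeError.
    // New style: optional chaining yields undefined, then || 0.
    totalRecordsAdded += ok?.recordsAdded || 0;     // +12
    totalRecordsAdded += failed?.recordsAdded || 0; // +0
    console.log(totalRecordsAdded); // 12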


@@ -1,5 +1,5 @@
 const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');
-const { importMissingProducts } = require('./products');
+const { importMissingProducts, setupTemporaryTables, cleanupTemporaryTables, materializeCalculations } = require('./products');

 /**
  * Imports orders from a production MySQL database to a local MySQL database.
@@ -19,8 +19,63 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
   const missingProducts = new Set();
   let recordsAdded = 0;
   let recordsUpdated = 0;
+  let processedCount = 0;
+  let importedCount = 0;
+  let totalOrderItems = 0;
+  let totalUniqueOrders = 0;
+  // Add a cumulative counter for processed orders before the loop
+  let cumulativeProcessedOrders = 0;

   try {
+    // Insert temporary table creation queries
+    await localConnection.query(`
+      CREATE TABLE IF NOT EXISTS temp_order_items (
+        order_id INT UNSIGNED NOT NULL,
+        pid INT UNSIGNED NOT NULL,
+        SKU VARCHAR(50) NOT NULL,
+        price DECIMAL(10,2) NOT NULL,
+        quantity INT NOT NULL,
+        base_discount DECIMAL(10,2) DEFAULT 0,
+        PRIMARY KEY (order_id, pid)
+      ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+    `);
+
+    await localConnection.query(`
+      CREATE TABLE IF NOT EXISTS temp_order_meta (
+        order_id INT UNSIGNED NOT NULL,
+        date DATE NOT NULL,
+        customer VARCHAR(100) NOT NULL,
+        customer_name VARCHAR(150) NOT NULL,
+        status INT,
+        canceled TINYINT(1),
+        PRIMARY KEY (order_id)
+      ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+    `);
+
+    await localConnection.query(`
+      CREATE TABLE IF NOT EXISTS temp_order_discounts (
+        order_id INT UNSIGNED NOT NULL,
+        pid INT UNSIGNED NOT NULL,
+        discount DECIMAL(10,2) NOT NULL,
+        PRIMARY KEY (order_id, pid)
+      ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+    `);
+
+    await localConnection.query(`
+      CREATE TABLE IF NOT EXISTS temp_order_taxes (
+        order_id INT UNSIGNED NOT NULL,
+        pid INT UNSIGNED NOT NULL,
+        tax DECIMAL(10,2) NOT NULL,
+        PRIMARY KEY (order_id, pid)
+      ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+    `);
+
+    await localConnection.query(`
+      CREATE TABLE IF NOT EXISTS temp_order_costs (
+        order_id INT UNSIGNED NOT NULL,
+        pid INT UNSIGNED NOT NULL,
+        costeach DECIMAL(10,3) DEFAULT 0.000,
+        PRIMARY KEY (order_id, pid)
+      ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+    `);
     // Get column names from the local table
     const [columns] = await localConnection.query(`
       SELECT COLUMN_NAME
@@ -36,51 +91,11 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
     );
     const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01';
-    // Create temporary tables for staging data
-    await localConnection.query(`
-      CREATE TEMPORARY TABLE temp_order_items (
-        order_id INT UNSIGNED,
-        pid INT UNSIGNED,
-        SKU VARCHAR(50),
-        price DECIMAL(10,3),
-        quantity INT,
-        base_discount DECIMAL(10,3),
-        PRIMARY KEY (order_id, pid)
-      ) ENGINE=InnoDB;
-
-      CREATE TEMPORARY TABLE temp_order_meta (
-        order_id INT UNSIGNED PRIMARY KEY,
-        date DATE,
-        customer INT UNSIGNED,
-        customer_name VARCHAR(100),
-        status TINYINT UNSIGNED,
-        canceled TINYINT UNSIGNED
-      ) ENGINE=InnoDB;
-
-      CREATE TEMPORARY TABLE temp_order_discounts (
-        order_id INT UNSIGNED,
-        pid INT UNSIGNED,
-        discount DECIMAL(10,3),
-        PRIMARY KEY (order_id, pid)
-      ) ENGINE=InnoDB;
-
-      CREATE TEMPORARY TABLE temp_order_taxes (
-        order_id INT UNSIGNED,
-        pid INT UNSIGNED,
-        tax DECIMAL(10,3),
-        PRIMARY KEY (order_id, pid)
-      ) ENGINE=InnoDB;
-    `);
-
-    // Get base order items first
-    const [orderItems] = await prodConnection.query(`
-      SELECT
-        oi.order_id,
-        oi.prod_pid as pid,
-        oi.prod_itemnumber as SKU,
-        oi.prod_price as price,
-        oi.qty_ordered as quantity,
-        COALESCE(oi.prod_price_reg - oi.prod_price, 0) * oi.qty_ordered as base_discount
+    console.log('Orders: Using last sync time:', lastSyncTime);
+
+    // First get count of order items
+    const [[{ total }]] = await prodConnection.query(`
+      SELECT COUNT(*) as total
       FROM order_items oi
       USE INDEX (PRIMARY)
       JOIN _order o ON oi.order_id = o.order_id
@@ -91,16 +106,62 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
         AND (
           o.stamp > ?
           OR oi.stamp > ?
-          OR o.date_placed > ?
-          OR o.date_shipped > ?
-          OR o.date_cancelled > ?
-          OR o.date_updated > ?
+          OR EXISTS (
+            SELECT 1 FROM order_discount_items odi
+            WHERE odi.order_id = o.order_id
+            AND odi.pid = oi.prod_pid
+          )
+          OR EXISTS (
+            SELECT 1 FROM order_tax_info oti
+            JOIN order_tax_info_products otip ON oti.taxinfo_id = otip.taxinfo_id
+            WHERE oti.order_id = o.order_id
+            AND otip.pid = oi.prod_pid
+            AND oti.stamp > ?
+          )
         )
       ` : ''}
-    `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []);
+    `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);
-    const totalOrders = orderItems.length;
-    let processed = 0;
+    totalOrderItems = total;
+    console.log('Orders: Found changes:', totalOrderItems);
+
+    // Get order items in batches
+    const [orderItems] = await prodConnection.query(`
+      SELECT
+        oi.order_id,
+        oi.prod_pid as pid,
+        oi.prod_itemnumber as SKU,
+        oi.prod_price as price,
+        oi.qty_ordered as quantity,
+        COALESCE(oi.prod_price_reg - oi.prod_price, 0) as base_discount,
+        oi.stamp as last_modified
+      FROM order_items oi
+      USE INDEX (PRIMARY)
+      JOIN _order o ON oi.order_id = o.order_id
+      WHERE o.order_status >= 15
+        AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
+        AND o.date_placed_onlydate IS NOT NULL
+        ${incrementalUpdate ? `
+          AND (
+            o.stamp > ?
+            OR oi.stamp > ?
+            OR EXISTS (
+              SELECT 1 FROM order_discount_items odi
+              WHERE odi.order_id = o.order_id
+              AND odi.pid = oi.prod_pid
+            )
+            OR EXISTS (
+              SELECT 1 FROM order_tax_info oti
+              JOIN order_tax_info_products otip ON oti.taxinfo_id = otip.taxinfo_id
+              WHERE oti.order_id = o.order_id
+              AND otip.pid = oi.prod_pid
+              AND oti.stamp > ?
+            )
+          )
+        ` : ''}
+    `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);
+
+    console.log('Orders: Processing', orderItems.length, 'order items');

     // Insert order items in batches
     for (let i = 0; i < orderItems.length; i += 5000) {
@@ -111,25 +172,39 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       ]);

       await localConnection.query(`
-        INSERT INTO temp_order_items VALUES ${placeholders}
+        INSERT INTO temp_order_items (order_id, pid, SKU, price, quantity, base_discount)
+        VALUES ${placeholders}
+        ON DUPLICATE KEY UPDATE
+          SKU = VALUES(SKU),
+          price = VALUES(price),
+          quantity = VALUES(quantity),
+          base_discount = VALUES(base_discount)
       `, values);

-      processed += batch.length;
+      processedCount = i + batch.length;
       outputProgress({
         status: "running",
         operation: "Orders import",
-        message: `Loading order items: ${processed} of ${totalOrders}`,
-        current: processed,
-        total: totalOrders
+        message: `Loading order items: ${processedCount} of ${totalOrderItems}`,
+        current: processedCount,
+        total: totalOrderItems
       });
     }
     // Get unique order IDs
     const orderIds = [...new Set(orderItems.map(item => item.order_id))];
+    totalUniqueOrders = orderIds.length;
+    console.log('Total unique order IDs:', totalUniqueOrders);
+
+    // Reset processed count for order processing phase
+    processedCount = 0;

     // Get order metadata in batches
     for (let i = 0; i < orderIds.length; i += 5000) {
       const batchIds = orderIds.slice(i, i + 5000);
+      console.log(`Processing batch ${i/5000 + 1}, size: ${batchIds.length}`);
+      console.log('Sample of batch IDs:', batchIds.slice(0, 5));
       const [orders] = await prodConnection.query(`
         SELECT
           o.order_id,
@@ -143,6 +218,14 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
         WHERE o.order_id IN (?)
       `, [batchIds]);

+      console.log(`Retrieved ${orders.length} orders for ${batchIds.length} IDs`);
+      const duplicates = orders.filter((order, index, self) =>
+        self.findIndex(o => o.order_id === order.order_id) !== index
+      );
+      if (duplicates.length > 0) {
+        console.log('Found duplicates:', duplicates);
+      }
+
       const placeholders = orders.map(() => "(?, ?, ?, ?, ?, ?)").join(",");
       const values = orders.flatMap(order => [
         order.order_id, order.date, order.customer, order.customer_name, order.status, order.canceled
@@ -150,17 +233,27 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
       await localConnection.query(`
         INSERT INTO temp_order_meta VALUES ${placeholders}
+        ON DUPLICATE KEY UPDATE
+          date = VALUES(date),
+          customer = VALUES(customer),
+          customer_name = VALUES(customer_name),
+          status = VALUES(status),
+          canceled = VALUES(canceled)
       `, values);

+      processedCount = i + orders.length;
       outputProgress({
         status: "running",
         operation: "Orders import",
-        message: `Loading order metadata: ${i + orders.length} of ${orderIds.length}`,
-        current: i + orders.length,
-        total: orderIds.length
+        message: `Loading order metadata: ${processedCount} of ${totalUniqueOrders}`,
+        current: processedCount,
+        total: totalUniqueOrders
       });
     }

+    // Reset processed count for final phase
+    processedCount = 0;
+
     // Get promotional discounts in batches
     for (let i = 0; i < orderIds.length; i += 5000) {
       const batchIds = orderIds.slice(i, i + 5000);
@@ -177,6 +270,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
         await localConnection.query(`
           INSERT INTO temp_order_discounts VALUES ${placeholders}
+          ON DUPLICATE KEY UPDATE
+            discount = VALUES(discount)
         `, values);
       }
     }
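All of these staging writes share one pattern: one `(?, ?, ?)` placeholder group per row, joined into a single multi-row INSERT, with the row values flattened into one parameter array and ON DUPLICATE KEY UPDATE absorbing re-staged rows. A generic sketch of the pattern, assuming a mysql2/promise connection `db`:

    // One placeholder group per row, one flattened value array, one round trip.
    async function stageDiscounts(db, rows) {
      if (rows.length === 0) return; // avoid "VALUES" with no groups
      const placeholders = rows.map(() => "(?, ?, ?)").join(",");
      const values = rows.flatMap(r => [r.order_id, r.pid, r.discount]);
      await db.query(
        `INSERT INTO temp_order_discounts (order_id, pid, discount)
         VALUES ${placeholders}
         ON DUPLICATE KEY UPDATE discount = VALUES(discount)`,
        values
      );
    }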
@@ -212,14 +307,33 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
           const placeholders = Array(uniqueTaxes.size).fill("(?, ?, ?)").join(",");
           await localConnection.query(`
             INSERT INTO temp_order_taxes VALUES ${placeholders}
+            ON DUPLICATE KEY UPDATE tax = VALUES(tax)
           `, values);
         }
       }
     }

-    // Now combine all the data and insert into orders table
-    let importedCount = 0;
+    // Get costeach values in batches
+    for (let i = 0; i < orderIds.length; i += 5000) {
+      const batchIds = orderIds.slice(i, i + 5000);
+      const [costs] = await prodConnection.query(`
+        SELECT orderid as order_id, pid, costeach
+        FROM order_costs
+        WHERE orderid IN (?)
+      `, [batchIds]);
+
+      if (costs.length > 0) {
+        const placeholders = costs.map(() => '(?, ?, ?)').join(",");
+        const values = costs.flatMap(c => [c.order_id, c.pid, c.costeach]);
+        await localConnection.query(`
+          INSERT INTO temp_order_costs (order_id, pid, costeach)
+          VALUES ${placeholders}
+          ON DUPLICATE KEY UPDATE costeach = VALUES(costeach)
+        `, values);
+      }
+    }
+
+    // Now combine all the data and insert into orders table
     // Pre-check all products at once instead of per batch
     const allOrderPids = [...new Set(orderItems.map(item => item.pid))];
     const [existingProducts] = allOrderPids.length > 0 ? await localConnection.query(
@@ -248,17 +362,21 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
           om.customer,
           om.customer_name,
           om.status,
-          om.canceled
+          om.canceled,
+          COALESCE(tc.costeach, 0) as costeach
         FROM temp_order_items oi
         JOIN temp_order_meta om ON oi.order_id = om.order_id
         LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
         LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
+        LEFT JOIN temp_order_costs tc ON oi.order_id = tc.order_id AND oi.pid = tc.pid
         WHERE oi.order_id IN (?)
       `, [batchIds]);

       // Filter orders and track missing products - do this in a single pass
       const validOrders = [];
       const values = [];
+      const processedOrderItems = new Set(); // Track unique order items
+      const processedOrders = new Set(); // Track unique orders

       for (const order of orders) {
         if (!existingPids.has(order.pid)) {
@@ -268,6 +386,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
         }

         validOrders.push(order);
         values.push(...columnNames.map(col => order[col] ?? null));
+        processedOrderItems.add(`${order.order_number}-${order.pid}`);
+        processedOrders.add(order.order_number);
       }

       if (validOrders.length > 0) {
@@ -275,159 +395,153 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
         const singlePlaceholder = `(${columnNames.map(() => "?").join(",")})`;
         const placeholders = Array(validOrders.length).fill(singlePlaceholder).join(",");

-        const query = `
+        const result = await localConnection.query(`
           INSERT INTO orders (${columnNames.join(",")})
           VALUES ${placeholders}
           ON DUPLICATE KEY UPDATE
-            ${columnNames.map(col => `${col} = VALUES(${col})`).join(",")}
-        `;
-
-        const result = await localConnection.query(query, values.flat());
-        recordsAdded += result.affectedRows - result.changedRows;
-        recordsUpdated += result.changedRows;
-        importedCount += validOrders.length;
+            SKU = VALUES(SKU),
+            date = VALUES(date),
+            price = VALUES(price),
+            quantity = VALUES(quantity),
+            discount = VALUES(discount),
+            tax = VALUES(tax),
+            tax_included = VALUES(tax_included),
+            shipping = VALUES(shipping),
+            customer = VALUES(customer),
+            customer_name = VALUES(customer_name),
+            status = VALUES(status),
+            canceled = VALUES(canceled),
+            costeach = VALUES(costeach)
+        `, validOrders.map(o => columnNames.map(col => o[col] ?? null)).flat());
+
+        const affectedRows = result[0].affectedRows;
+        const updates = Math.floor(affectedRows / 2);
+        const inserts = affectedRows - (updates * 2);
+        recordsAdded += inserts;
+        recordsUpdated += updates;
+        importedCount += processedOrderItems.size; // Count unique order items processed
       }

+      // Update progress based on unique orders processed
+      cumulativeProcessedOrders += processedOrders.size;
+      outputProgress({
+        status: "running",
+        operation: "Orders import",
+        message: `Imported ${importedCount} order items (${cumulativeProcessedOrders} of ${totalUniqueOrders} orders processed)`,
+        current: cumulativeProcessedOrders,
+        total: totalUniqueOrders,
+        elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
+        remaining: estimateRemaining(startTime, cumulativeProcessedOrders, totalUniqueOrders),
+        rate: calculateRate(startTime, cumulativeProcessedOrders)
+      });
+    }
+
+    // Now try to import any orders that were skipped due to missing products
+    if (skippedOrders.size > 0) {
+      try {
+        outputProgress({
+          status: "running",
+          operation: "Orders import",
+          message: `Retrying import of ${skippedOrders.size} orders with previously missing products`,
+        });
+
+        // Get the orders that were skipped
+        const [skippedProdOrders] = await localConnection.query(`
+          SELECT DISTINCT
+            oi.order_id as order_number,
+            oi.pid,
+            oi.SKU,
+            om.date,
+            oi.price,
+            oi.quantity,
+            oi.base_discount + COALESCE(od.discount, 0) as discount,
+            COALESCE(ot.tax, 0) as tax,
+            0 as tax_included,
+            0 as shipping,
+            om.customer,
+            om.customer_name,
+            om.status,
+            om.canceled,
+            COALESCE(tc.costeach, 0) as costeach
+          FROM temp_order_items oi
+          JOIN temp_order_meta om ON oi.order_id = om.order_id
+          LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
+          LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
+          LEFT JOIN temp_order_costs tc ON oi.order_id = tc.order_id AND oi.pid = tc.pid
+          WHERE oi.order_id IN (?)
+        `, [Array.from(skippedOrders)]);
+
+        // Check which products exist now
+        const skippedPids = [...new Set(skippedProdOrders.map(o => o.pid))];
+        const [existingProducts] = skippedPids.length > 0 ? await localConnection.query(
+          "SELECT pid FROM products WHERE pid IN (?)",
+          [skippedPids]
+        ) : [[]];
+        const existingPids = new Set(existingProducts.map(p => p.pid));
+
+        // Filter orders that can now be imported
+        const validOrders = skippedProdOrders.filter(order => existingPids.has(order.pid));
+        const retryOrderItems = new Set(); // Track unique order items in retry
+
+        if (validOrders.length > 0) {
+          const placeholders = validOrders.map(() => `(${columnNames.map(() => "?").join(", ")})`).join(",");
+          const values = validOrders.map(o => columnNames.map(col => o[col] ?? null)).flat();
+
+          const result = await localConnection.query(`
+            INSERT INTO orders (${columnNames.join(", ")})
+            VALUES ${placeholders}
+            ON DUPLICATE KEY UPDATE
+              SKU = VALUES(SKU),
+              date = VALUES(date),
+              price = VALUES(price),
+              quantity = VALUES(quantity),
+              discount = VALUES(discount),
+              tax = VALUES(tax),
+              tax_included = VALUES(tax_included),
+              shipping = VALUES(shipping),
+              customer = VALUES(customer),
+              customer_name = VALUES(customer_name),
+              status = VALUES(status),
+              canceled = VALUES(canceled),
+              costeach = VALUES(costeach)
+          `, values);
+
+          const affectedRows = result[0].affectedRows;
+          const updates = Math.floor(affectedRows / 2);
+          const inserts = affectedRows - (updates * 2);
+
+          // Track unique order items
+          validOrders.forEach(order => {
+            retryOrderItems.add(`${order.order_number}-${order.pid}`);
+          });
+
           outputProgress({
             status: "running",
             operation: "Orders import",
-            message: `Imported ${importedCount} of ${totalOrders} orders`,
-            current: importedCount,
-            total: totalOrders,
-            elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
-            remaining: estimateRemaining(startTime, importedCount, totalOrders),
-            rate: calculateRate(startTime, importedCount)
+            message: `Successfully imported ${retryOrderItems.size} previously skipped order items`,
           });

+          // Update the main counters
+          recordsAdded += inserts;
+          recordsUpdated += updates;
+          importedCount += retryOrderItems.size;
+        }
+      } catch (error) {
+        console.warn('Warning: Failed to retry skipped orders:', error.message);
+        console.warn(`Skipped ${skippedOrders.size} orders due to ${missingProducts.size} missing products`);
+      }
     }
-    // Clean up temporary tables
+    // Clean up temporary tables after ALL processing is complete
     await localConnection.query(`
       DROP TEMPORARY TABLE IF EXISTS temp_order_items;
       DROP TEMPORARY TABLE IF EXISTS temp_order_meta;
       DROP TEMPORARY TABLE IF EXISTS temp_order_discounts;
       DROP TEMPORARY TABLE IF EXISTS temp_order_taxes;
+      DROP TEMPORARY TABLE IF EXISTS temp_order_costs;
     `);
+
+    // Import missing products if any
+    if (missingProducts.size > 0) {
+      try {
+        // Setup temporary tables again since they were dropped
+        await setupTemporaryTables(localConnection);
+        await materializeCalculations(prodConnection, localConnection);
+        await importMissingProducts(prodConnection, localConnection, Array.from(missingProducts));
+        // Clean up temporary tables after missing products import
+        await cleanupTemporaryTables(localConnection);
+
+        // Retry skipped orders after importing products
+        if (skippedOrders.size > 0) {
+          outputProgress({
+            status: "running",
+            operation: "Orders import",
+            message: `Retrying import of ${skippedOrders.size} orders with previously missing products`
+          });
+
+          const skippedOrdersArray = Array.from(skippedOrders);
+          const [skippedProdOrders] = skippedOrdersArray.length > 0 ? await prodConnection.query(`
+            SELECT
+              o.order_id,
+              CASE
+                WHEN o.date_placed = '0000-00-00 00:00:00' OR o.date_placed IS NULL THEN o.stamp
+                ELSE o.date_placed
+              END as date,
+              o.order_cid,
+              o.bill_firstname,
+              o.bill_lastname,
+              o.order_email,
+              o.order_status,
+              o.date_shipped,
+              o.date_cancelled,
+              oi.prod_pid,
+              oi.prod_itemnumber,
+              oi.prod_price,
+              oi.qty_ordered,
+              oi.qty_back,
+              oi.qty_placed,
+              oi.qty_placed_2,
+              oi.discounted,
+              oi.summary_cogs,
+              oi.summary_profit,
+              oi.summary_orderdate,
+              oi.summary_paiddate,
+              oi.date_added,
+              oi.stamp
+            FROM order_items oi
+            JOIN _order o ON oi.order_id = o.order_id
+            WHERE o.order_id IN (?)
+          `, [skippedOrdersArray]) : [[]];
+
+          // Prepare values for insertion
+          const skippedOrderValues = skippedProdOrders.flatMap(order => {
+            if (!order.date) {
+              console.log(`Warning: Skipped order ${order.order_id} has null date:`, JSON.stringify(order, null, 2));
+              return [];
+            }
+            const canceled = order.date_cancelled !== '0000-00-00 00:00:00' ? 1 : 0;
+            const customerName = `${order.bill_firstname} ${order.bill_lastname}`;
+
+            // Create an object with keys based on column names
+            const orderData = {
+              id: order.order_id,
+              order_number: order.order_id,
+              pid: order.prod_pid,
+              SKU: order.prod_itemnumber,
+              date: order.date ? (
+                order.date instanceof Date ?
+                  order.date.toJSON()?.slice(0,10) || null :
+                  (typeof order.date === 'string' ? order.date.split(' ')[0] : null)
+              ) : null,
+              price: order.prod_price,
+              quantity: order.qty_ordered,
+              discount: order.discounted,
+              tax: 0, // Placeholder, will be calculated later
+              tax_included: 0, // Placeholder, will be calculated later
+              shipping: 0, // Placeholder, will be calculated later
+              customer: order.order_email,
+              customer_name: customerName,
+              status: order.order_status,
+              canceled: canceled,
+            };
+
+            // Map column names to values, handling missing columns
+            return [columnNames.map(colName => orderData[colName] !== undefined ? orderData[colName] : null)];
+          });
+
+          // Construct the insert query dynamically
+          const skippedPlaceholders = skippedProdOrders.map(() => `(${columnNames.map(() => "?").join(", ")})`).join(",");
+          const skippedInsertQuery = `
+            INSERT INTO orders (${columnNames.join(", ")})
+            VALUES ${skippedPlaceholders}
+            ON DUPLICATE KEY UPDATE
+              ${columnNames.map(col => `${col} = VALUES(${col})`).join(", ")}
+          `;
+
+          // Execute the insert query
+          if (skippedOrderValues.length > 0) {
+            await localConnection.query(skippedInsertQuery, skippedOrderValues.flat());
+          }
+
+          importedCount += skippedProdOrders.length;
+          outputProgress({
+            status: "running",
+            operation: "Orders import",
+            message: `Successfully imported ${skippedProdOrders.length} previously skipped orders`,
+          });
+        }
+      } catch (error) {
+        console.warn('Warning: Failed to import missing products:', error.message);
+        console.warn(`Skipped ${skippedOrders.size} orders due to ${missingProducts.size} missing products`);
+      }
+    }
     // Only update sync status if we get here (no errors thrown)
     await localConnection.query(`
       INSERT INTO sync_status (table_name, last_sync_timestamp)
@@ -437,9 +551,9 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
     return {
       status: "complete",
-      totalImported: importedCount,
-      recordsAdded,
-      recordsUpdated,
+      totalImported: Math.floor(importedCount),
+      recordsAdded: recordsAdded || 0,
+      recordsUpdated: Math.floor(recordsUpdated),
       totalSkipped: skippedOrders.size,
       missingProducts: missingProducts.size,
       incrementalUpdate,
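A note on the affectedRows arithmetic used throughout these importers: for INSERT ... ON DUPLICATE KEY UPDATE, MySQL reports 1 per inserted row, 2 per updated row, and 0 per duplicate left unchanged, so affectedRows = inserts + 2 * updates. The `Math.floor(affectedRows / 2)` split above is therefore only a rough estimate (an all-insert batch gets misread as half updates). Since the batch size is already known, a tighter split is possible under the assumption that unchanged duplicates are rare; a sketch:

    // affectedRows = inserts + 2 * updates   (MySQL upsert accounting)
    // rowsSent     = inserts + updates       (assuming no unchanged duplicates)
    // =>  updates = affectedRows - rowsSent
    //     inserts = rowsSent - updates
    function splitUpsertCounts(rowsSent, affectedRows) {
      const updates = Math.max(0, affectedRows - rowsSent);
      const inserts = Math.max(0, rowsSent - updates);
      return { inserts, updates };
    }

    // splitUpsertCounts(100, 100) => { inserts: 100, updates: 0 }
    // splitUpsertCounts(100, 150) => { inserts: 50, updates: 50 }
    // splitUpsertCounts(100, 200) => { inserts: 0, updates: 100 }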

(File diff suppressed because it is too large.)


@@ -12,6 +12,22 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
   );
   const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01';
+  console.log('Purchase Orders: Using last sync time:', lastSyncTime);
+
+  // Insert temporary table creation query for purchase orders
+  await localConnection.query(`
+    CREATE TABLE IF NOT EXISTS temp_purchase_orders (
+      po_id INT UNSIGNED NOT NULL,
+      pid INT UNSIGNED NOT NULL,
+      vendor VARCHAR(255),
+      date DATE,
+      expected_date DATE,
+      status INT,
+      notes TEXT,
+      PRIMARY KEY (po_id, pid)
+    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+  `);
   outputProgress({
     operation: `Starting ${incrementalUpdate ? 'incremental' : 'full'} purchase orders import`,
     status: "running",
@@ -82,6 +98,8 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
     lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime // Receiving conditions
   ] : []);

+  console.log('Purchase Orders: Found changes:', total);
+
   const [poList] = await prodConnection.query(`
     SELECT DISTINCT
       COALESCE(p.po_id, r.receiving_id) as po_id,
@@ -90,31 +108,67 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
         NULLIF(s2.companyname, ''),
         'Unknown Vendor'
       ) as vendor,
-      CASE WHEN p.po_id IS NOT NULL THEN DATE(p.date_ordered) END as date,
-      CASE WHEN p.po_id IS NOT NULL THEN DATE(p.date_estin) END as expected_date,
+      CASE
+        WHEN p.po_id IS NOT NULL THEN
+          DATE(COALESCE(
+            NULLIF(p.date_ordered, '0000-00-00 00:00:00'),
+            p.date_created
+          ))
+        WHEN r.receiving_id IS NOT NULL THEN
+          DATE(r.date_created)
+      END as date,
+      CASE
+        WHEN p.date_estin = '0000-00-00' THEN NULL
+        WHEN p.date_estin IS NULL THEN NULL
+        WHEN p.date_estin NOT REGEXP '^[0-9]{4}-[0-9]{2}-[0-9]{2}$' THEN NULL
+        ELSE p.date_estin
+      END as expected_date,
       COALESCE(p.status, 50) as status,
-      COALESCE(p.short_note, '') as notes,
-      COALESCE(p.notes, '') as long_note
+      p.short_note as notes,
+      p.notes as long_note
     FROM (
       SELECT po_id FROM po
       USE INDEX (idx_date_created)
       WHERE date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
-        AND (date_ordered > ?
-          OR date_updated > ?)
+        ${incrementalUpdate ? `
+          AND (
+            date_ordered > ?
+            OR date_updated > ?
+            OR date_estin > ?
+          )
+        ` : ''}
       UNION
       SELECT DISTINCT r.receiving_id as po_id
       FROM receivings r
       JOIN receivings_products rp USE INDEX (received_date) ON r.receiving_id = rp.receiving_id
       WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
-        AND (rp.received_date > ?
-          OR rp.stamp > ?)
+        ${incrementalUpdate ? `
+          AND (
+            r.date_created > ?
+            OR r.date_checked > ?
+            OR rp.stamp > ?
+            OR rp.received_date > ?
+          )
+        ` : ''}
     ) ids
     LEFT JOIN po p ON ids.po_id = p.po_id
     LEFT JOIN suppliers s1 ON p.supplier_id = s1.supplierid
     LEFT JOIN receivings r ON ids.po_id = r.receiving_id
     LEFT JOIN suppliers s2 ON r.supplier_id = s2.supplierid
     ORDER BY po_id
-  `, [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]);
+  `, incrementalUpdate ? [
+    lastSyncTime, lastSyncTime, lastSyncTime, // PO conditions
+    lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime // Receiving conditions
+  ] : []);
+
+  console.log('Sample PO dates:', poList.slice(0, 5).map(po => ({
+    po_id: po.po_id,
+    raw_date_ordered: po.raw_date_ordered,
+    raw_date_created: po.raw_date_created,
+    raw_date_estin: po.raw_date_estin,
+    computed_date: po.date,
+    expected_date: po.expected_date
+  })));
   const totalItems = total;
   let processed = 0;
@@ -138,7 +192,8 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
           pop.po_id,
           pop.pid,
           pr.itemnumber as sku,
-          pop.cost_each as cost_price,
+          pr.description as name,
+          pop.cost_each,
           pop.qty_each as ordered
         FROM po_products pop
         USE INDEX (PRIMARY)
@@ -153,7 +208,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
       const productPids = [...new Set(productBatch.map(p => p.pid))];
       const batchPoIds = [...new Set(productBatch.map(p => p.po_id))];

-      // Get receivings for this batch
+      // Get receivings for this batch with employee names
       const [receivings] = await prodConnection.query(`
         SELECT
           r.po_id,
@@ -161,8 +216,9 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
           rp.receiving_id,
           rp.qty_each,
           rp.cost_each,
-          DATE(NULLIF(rp.received_date, '0000-00-00 00:00:00')) as received_date,
+          COALESCE(rp.received_date, r.date_created) as received_date,
           rp.received_by,
+          CONCAT(e.firstname, ' ', e.lastname) as received_by_name,
           CASE
             WHEN r.po_id IS NULL THEN 2 -- No PO
             WHEN r.po_id IN (?) THEN 0 -- Original PO
@@ -171,8 +227,9 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
         FROM receivings_products rp
         USE INDEX (received_date)
         LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id
+        LEFT JOIN employees e ON rp.received_by = e.employeeid
         WHERE rp.pid IN (?)
-          AND rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
+          AND rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
         ORDER BY r.po_id, rp.pid, rp.received_date
       `, [batchPoIds, productPids]);
@@ -217,8 +274,21 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
       );
       const validPids = new Set(existingPids.map(p => p.pid));

-      // Prepare values for this sub-batch
-      const values = [];
+      // First check which PO lines already exist and get their current values
+      const poLines = Array.from(poProductMap.values())
+        .filter(p => validPids.has(p.pid))
+        .map(p => [p.po_id, p.pid]);
+
+      const [existingPOs] = await localConnection.query(
+        `SELECT ${columnNames.join(',')} FROM purchase_orders WHERE (po_id, pid) IN (${poLines.map(() => "(?,?)").join(",")})`,
+        poLines.flat()
+      );
+
+      const existingPOMap = new Map(
+        existingPOs.map(po => [`${po.po_id}-${po.pid}`, po])
+      );
+
+      // Split into inserts and updates
+      const insertsAndUpdates = { inserts: [], updates: [] };
       let batchProcessed = 0;

       for (const po of batch) {
@@ -236,16 +306,29 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
             ...receivingHistory.map(r => ({ ...r, type: 'original' })),
             ...altReceivingHistory.map(r => ({ ...r, type: 'alternate' })),
             ...noPOReceivingHistory.map(r => ({ ...r, type: 'no_po' }))
-          ].sort((a, b) => new Date(a.received_date) - new Date(b.received_date));
+          ].sort((a, b) => new Date(a.received_date || '9999-12-31') - new Date(b.received_date || '9999-12-31'));
+
+          // Split receivings into original PO and others
+          const originalPOReceivings = allReceivings.filter(r => r.type === 'original');
+          const otherReceivings = allReceivings.filter(r => r.type !== 'original');

           // Track FIFO fulfillment
           let remainingToFulfill = product.ordered;
           const fulfillmentTracking = [];
           let totalReceived = 0;
+          let actualCost = null; // Will store the cost of the first receiving that fulfills this PO
+          let firstFulfillmentReceiving = null;
+          let lastFulfillmentReceiving = null;

           for (const receiving of allReceivings) {
             const qtyToApply = Math.min(remainingToFulfill, receiving.qty_each);
             if (qtyToApply > 0) {
+              // If this is the first receiving being applied, use its cost
+              if (actualCost === null) {
+                actualCost = receiving.cost_each;
+                firstFulfillmentReceiving = receiving;
+              }
+              lastFulfillmentReceiving = receiving;
+
               fulfillmentTracking.push({
                 receiving_id: receiving.receiving_id,
                 qty_applied: qtyToApply,
@@ -253,6 +336,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
                 cost: receiving.cost_each,
                 date: receiving.received_date,
                 received_by: receiving.received_by,
+                received_by_name: receiving.received_by_name || 'Unknown',
                 type: receiving.type,
                 remaining_qty: receiving.qty_each - qtyToApply
               });
@@ -266,6 +350,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
                 cost: receiving.cost_each,
                 date: receiving.received_date,
                 received_by: receiving.received_by,
+                received_by_name: receiving.received_by_name || 'Unknown',
                 type: receiving.type,
                 is_excess: true
               });
@@ -277,18 +362,31 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
             remainingToFulfill > 0 ? 30 : // partial
             40; // full

-          const firstReceiving = allReceivings[0] || {};
-          const lastReceiving = allReceivings[allReceivings.length - 1] || {};
+          function formatDate(dateStr) {
+            if (!dateStr) return null;
+            if (dateStr === '0000-00-00' || dateStr === '0000-00-00 00:00:00') return null;
+            if (typeof dateStr === 'string' && !dateStr.match(/^\d{4}-\d{2}-\d{2}/)) return null;
+            try {
+              const date = new Date(dateStr);
+              if (isNaN(date.getTime())) return null;
+              if (date.getFullYear() < 1900 || date.getFullYear() > 2100) return null;
+              return date.toISOString().split('T')[0];
+            } catch (e) {
+              return null;
+            }
+          }

-          values.push(columnNames.map(col => {
+          const rowValues = columnNames.map(col => {
             switch (col) {
               case 'po_id': return po.po_id;
               case 'vendor': return po.vendor;
-              case 'date': return po.date;
-              case 'expected_date': return po.expected_date;
+              case 'date': return formatDate(po.date);
+              case 'expected_date': return formatDate(po.expected_date);
               case 'pid': return product.pid;
               case 'sku': return product.sku;
-              case 'cost_price': return product.cost_price;
+              case 'name': return product.name;
+              case 'cost_price': return actualCost || product.cost_each;
+              case 'po_cost_price': return product.cost_each;
               case 'status': return po.status;
               case 'notes': return po.notes;
               case 'long_note': return po.long_note;
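The formatDate helper above exists because legacy production rows carry MySQL zero dates and other malformed values that would otherwise flow straight into the local INSERT. A quick sanity sketch of how its branches map inputs, with one timezone caveat:

    formatDate(null);                   // null
    formatDate('0000-00-00');           // null (MySQL zero date)
    formatDate('0000-00-00 00:00:00');  // null
    formatDate('not-a-date');           // null (fails the YYYY-MM-DD prefix check)
    formatDate('2024-03-05 14:22:00');  // '2024-03-05' in most timezones
    // Caveat: toISOString() converts to UTC, so values parsed in local time
    // can shift by one calendar day for offsets far from UTC.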
@@ -296,44 +394,106 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
               case 'received': return totalReceived;
               case 'unfulfilled': return remainingToFulfill;
               case 'excess_received': return Math.max(0, totalReceived - product.ordered);
-              case 'received_date': return firstReceiving.received_date || null;
-              case 'last_received_date': return lastReceiving.received_date || null;
-              case 'received_by': return firstReceiving.received_by || null;
+              case 'received_date': return formatDate(firstFulfillmentReceiving?.received_date);
+              case 'last_received_date': return formatDate(lastFulfillmentReceiving?.received_date);
+              case 'received_by': return firstFulfillmentReceiving?.received_by_name || null;
               case 'receiving_status': return receiving_status;
               case 'receiving_history': return JSON.stringify({
                 fulfillment: fulfillmentTracking,
                 ordered_qty: product.ordered,
                 total_received: totalReceived,
                 remaining_unfulfilled: remainingToFulfill,
-                excess_received: Math.max(0, totalReceived - product.ordered)
+                excess_received: Math.max(0, totalReceived - product.ordered),
+                po_cost: product.cost_each,
+                actual_cost: actualCost || product.cost_each
               });
               default: return null;
             }
-          }));
+          });
+
+          if (existingPOMap.has(key)) {
+            const existing = existingPOMap.get(key);
+            // Check if any values are different
+            const hasChanges = columnNames.some(col => {
+              const newVal = rowValues[columnNames.indexOf(col)];
+              const oldVal = existing[col] ?? null;
+              // Special handling for numbers to avoid type coercion issues
+              if (typeof newVal === 'number' && typeof oldVal === 'number') {
+                return Math.abs(newVal - oldVal) > 0.00001; // Allow for tiny floating point differences
+              }
+              // Special handling for receiving_history - parse and compare
+              if (col === 'receiving_history') {
+                const newHistory = JSON.parse(newVal || '{}');
+                const oldHistory = JSON.parse(oldVal || '{}');
+                return JSON.stringify(newHistory) !== JSON.stringify(oldHistory);
+              }
+              return newVal !== oldVal;
+            });
+
+            if (hasChanges) {
+              insertsAndUpdates.updates.push({
+                po_id: po.po_id,
+                pid: product.pid,
+                values: rowValues
+              });
+            }
+          } else {
+            insertsAndUpdates.inserts.push({
+              po_id: po.po_id,
+              pid: product.pid,
+              values: rowValues
+            });
+          }
+
           batchProcessed++;
         }
       }

-      if (values.length > 0) {
-        const placeholders = values.map(() =>
-          `(${Array(columnNames.length).fill("?").join(",")})`
-        ).join(",");
-
-        const query = `
+      // Handle inserts
+      if (insertsAndUpdates.inserts.length > 0) {
+        const insertPlaceholders = insertsAndUpdates.inserts
+          .map(() => `(${Array(columnNames.length).fill("?").join(",")})`)
+          .join(",");
+
+        const insertResult = await localConnection.query(`
           INSERT INTO purchase_orders (${columnNames.join(",")})
-          VALUES ${placeholders}
+          VALUES ${insertPlaceholders}
+        `, insertsAndUpdates.inserts.map(i => i.values).flat());
+
+        const affectedRows = insertResult[0].affectedRows;
+        // For an upsert, MySQL counts rows twice for updates
+        // So if affectedRows is odd, we have (updates * 2 + inserts)
+        const updates = Math.floor(affectedRows / 2);
+        const inserts = affectedRows - (updates * 2);
+        recordsAdded += inserts;
+        recordsUpdated += Math.floor(updates); // Ensure we never have fractional updates
+        processed += batchProcessed;
+      }
+
+      // Handle updates - now we know these actually have changes
+      if (insertsAndUpdates.updates.length > 0) {
+        const updatePlaceholders = insertsAndUpdates.updates
+          .map(() => `(${Array(columnNames.length).fill("?").join(",")})`)
+          .join(",");
+
+        const updateResult = await localConnection.query(`
+          INSERT INTO purchase_orders (${columnNames.join(",")})
+          VALUES ${updatePlaceholders}
           ON DUPLICATE KEY UPDATE ${columnNames
             .filter((col) => col !== "po_id" && col !== "pid")
             .map((col) => `${col} = VALUES(${col})`)
             .join(",")};
-        `;
-
-        const result = await localConnection.query(query, values.flat());
-        recordsAdded += result.affectedRows - result.changedRows;
-        recordsUpdated += result.changedRows;
-      }
+        `, insertsAndUpdates.updates.map(u => u.values).flat());
+
+        const affectedRows = updateResult[0].affectedRows;
+        // For an upsert, MySQL counts rows twice for updates
+        // So if affectedRows is odd, we have (updates * 2 + inserts)
+        const updates = Math.floor(affectedRows / 2);
+        const inserts = affectedRows - (updates * 2);
+        recordsUpdated += Math.floor(updates); // Ensure we never have fractional updates
         processed += batchProcessed;
+      }

       // Update progress based on time interval
       const now = Date.now();
@@ -364,8 +524,8 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
     return {
       status: "complete",
       totalImported: totalItems,
-      recordsAdded,
-      recordsUpdated,
+      recordsAdded: recordsAdded || 0,
+      recordsUpdated: recordsUpdated || 0,
       incrementalUpdate,
       lastSyncTime
     };
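For reference, the receiving logic in this file applies receivings to a PO line in FIFO order: each receiving fulfills as much of the remaining ordered quantity as it can, the first applied receiving's cost becomes the actual cost_price, and anything beyond the ordered quantity is tracked as excess. A stripped-down sketch of that allocation, using hypothetical plain objects:

    // FIFO allocation: receivings (sorted by date) fulfill `ordered` in sequence.
    function allocateFifo(ordered, receivings) {
      let remaining = ordered;
      let actualCost = null; // cost of the first receiving applied
      const applied = [];
      for (const r of receivings) {
        const qty = Math.min(remaining, r.qty_each);
        if (qty > 0) {
          if (actualCost === null) actualCost = r.cost_each;
          applied.push({ receiving_id: r.receiving_id, qty_applied: qty });
          remaining -= qty;
        }
      }
      const totalReceived = receivings.reduce((s, r) => s + r.qty_each, 0);
      return {
        applied,
        actualCost,
        unfulfilled: remaining,
        excess: Math.max(0, totalReceived - ordered),
      };
    }

    // allocateFifo(10, [{ receiving_id: 1, qty_each: 6, cost_each: 2.5 },
    //                   { receiving_id: 2, qty_each: 8, cost_each: 2.7 }])
    // => applies 6 then 4, actualCost 2.5, unfulfilled 0, excess 4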


@@ -0,0 +1,82 @@
+// Split into inserts and updates
+const insertsAndUpdates = batch.reduce((acc, po) => {
+  const key = `${po.po_id}-${po.pid}`;
+  if (existingPOMap.has(key)) {
+    const existing = existingPOMap.get(key);
+    // Check if any values are different
+    const hasChanges = columnNames.some(col => {
+      const newVal = po[col] ?? null;
+      const oldVal = existing[col] ?? null;
+      // Special handling for numbers to avoid type coercion issues
+      if (typeof newVal === 'number' && typeof oldVal === 'number') {
+        return Math.abs(newVal - oldVal) > 0.00001; // Allow for tiny floating point differences
+      }
+      // Special handling for receiving_history JSON
+      if (col === 'receiving_history') {
+        return JSON.stringify(newVal) !== JSON.stringify(oldVal);
+      }
+      return newVal !== oldVal;
+    });
+
+    if (hasChanges) {
+      console.log(`PO line changed: ${key}`, {
+        po_id: po.po_id,
+        pid: po.pid,
+        changes: columnNames.filter(col => {
+          const newVal = po[col] ?? null;
+          const oldVal = existing[col] ?? null;
+          if (typeof newVal === 'number' && typeof oldVal === 'number') {
+            return Math.abs(newVal - oldVal) > 0.00001;
+          }
+          if (col === 'receiving_history') {
+            return JSON.stringify(newVal) !== JSON.stringify(oldVal);
+          }
+          return newVal !== oldVal;
+        })
+      });
+
+      acc.updates.push({
+        po_id: po.po_id,
+        pid: po.pid,
+        values: columnNames.map(col => po[col] ?? null)
+      });
+    }
+  } else {
+    console.log(`New PO line: ${key}`);
+    acc.inserts.push({
+      po_id: po.po_id,
+      pid: po.pid,
+      values: columnNames.map(col => po[col] ?? null)
+    });
+  }
+  return acc;
+}, { inserts: [], updates: [] });
+
+// Handle inserts
+if (insertsAndUpdates.inserts.length > 0) {
+  const insertPlaceholders = Array(insertsAndUpdates.inserts.length).fill(placeholderGroup).join(",");
+  const insertResult = await localConnection.query(`
+    INSERT INTO purchase_orders (${columnNames.join(",")})
+    VALUES ${insertPlaceholders}
+  `, insertsAndUpdates.inserts.map(i => i.values).flat());
+  recordsAdded += insertResult[0].affectedRows;
+}
+
+// Handle updates
+if (insertsAndUpdates.updates.length > 0) {
+  const updatePlaceholders = Array(insertsAndUpdates.updates.length).fill(placeholderGroup).join(",");
+  const updateResult = await localConnection.query(`
+    INSERT INTO purchase_orders (${columnNames.join(",")})
+    VALUES ${updatePlaceholders}
+    ON DUPLICATE KEY UPDATE
+      ${columnNames
+        .filter(col => col !== "po_id" && col !== "pid")
+        .map(col => `${col} = VALUES(${col})`)
+        .join(",")};
+  `, insertsAndUpdates.updates.map(u => u.values).flat());
+  // Each update affects 2 rows in affectedRows, so we divide by 2 to get actual count
+  recordsUpdated += insertsAndUpdates.updates.length;
+}
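One caveat on the hasChanges comparisons in this snippet: values read back from MySQL rarely compare === against freshly computed JS values, since DECIMAL columns arrive as strings, DATE columns as Date objects, and JSON columns may arrive pre-parsed, so strict inequality can flag every row as changed and push whole batches into the update path. A hedged sketch of a normalizing comparison, assuming mysql2's default type mapping:

    // Normalize DB-sourced values before comparing, so "2.500" (DECIMAL as a
    // string), Date objects, and parsed JSON don't all register as changes.
    function normalize(value) {
      if (value === null || value === undefined) return null;
      if (value === '') return '';
      if (value instanceof Date) return value.toISOString().split('T')[0];
      if (typeof value === 'object') return JSON.stringify(value);
      // DECIMAL columns come back as strings; compare numerically when possible.
      const n = Number(value);
      return Number.isNaN(n) ? String(value) : String(n);
    }

    function rowChanged(columnNames, newRow, oldRow) {
      return columnNames.some(col => normalize(newRow[col]) !== normalize(oldRow[col]));
    }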