7 Commits

Author SHA1 Message Date
9e1989ac66 Cleanup 2025-02-01 14:08:27 -05:00
5bfd6f6d04 Fix import script order count issues 2025-02-01 12:54:33 -05:00
1003ff3cf2 More incremental import fixes 2025-02-01 11:42:51 -05:00
2d0089dc52 Incremental import order fixes 2025-02-01 11:03:42 -05:00
50b86d6d8a Fix/add data to PO script 2025-02-01 10:51:47 -05:00
07f14c0017 Fix/add data to orders script and fix other import errors 2025-02-01 01:06:45 -05:00
e77b488cd4 Fix/add data to products script 2025-01-31 18:44:11 -05:00
5 changed files with 701 additions and 691 deletions

1
.gitignore vendored
View File

@@ -57,3 +57,4 @@ csv/**/*
**/csv/**/* **/csv/**/*
!csv/.gitkeep !csv/.gitkeep
inventory/tsconfig.tsbuildinfo inventory/tsconfig.tsbuildinfo
inventory-server/scripts/.fuse_hidden00000fa20000000a

View File

@@ -39,7 +39,7 @@ CREATE TABLE products (
tags TEXT, tags TEXT,
moq INT DEFAULT 1, moq INT DEFAULT 1,
uom INT DEFAULT 1, uom INT DEFAULT 1,
rating TINYINT UNSIGNED DEFAULT 0, rating DECIMAL(10,2) DEFAULT 0.00,
reviews INT UNSIGNED DEFAULT 0, reviews INT UNSIGNED DEFAULT 0,
weight DECIMAL(10,3), weight DECIMAL(10,3),
length DECIMAL(10,3), length DECIMAL(10,3),
@@ -113,6 +113,7 @@ CREATE TABLE IF NOT EXISTS orders (
tax DECIMAL(10,3) DEFAULT 0.000, tax DECIMAL(10,3) DEFAULT 0.000,
tax_included TINYINT(1) DEFAULT 0, tax_included TINYINT(1) DEFAULT 0,
shipping DECIMAL(10,3) DEFAULT 0.000, shipping DECIMAL(10,3) DEFAULT 0.000,
costeach DECIMAL(10,3) DEFAULT 0.000,
customer VARCHAR(50) NOT NULL, customer VARCHAR(50) NOT NULL,
customer_name VARCHAR(100), customer_name VARCHAR(100),
status VARCHAR(20) DEFAULT 'pending', status VARCHAR(20) DEFAULT 'pending',
@@ -136,7 +137,9 @@ CREATE TABLE purchase_orders (
expected_date DATE, expected_date DATE,
pid BIGINT NOT NULL, pid BIGINT NOT NULL,
sku VARCHAR(50) NOT NULL, sku VARCHAR(50) NOT NULL,
name VARCHAR(100) NOT NULL COMMENT 'Product name from products.description',
cost_price DECIMAL(10, 3) NOT NULL, cost_price DECIMAL(10, 3) NOT NULL,
po_cost_price DECIMAL(10, 3) NOT NULL COMMENT 'Original cost from PO, before receiving adjustments',
status TINYINT UNSIGNED DEFAULT 1 COMMENT '0=canceled,1=created,10=electronically_ready_send,11=ordered,12=preordered,13=electronically_sent,15=receiving_started,50=done', status TINYINT UNSIGNED DEFAULT 1 COMMENT '0=canceled,1=created,10=electronically_ready_send,11=ordered,12=preordered,13=electronically_sent,15=receiving_started,50=done',
receiving_status TINYINT UNSIGNED DEFAULT 1 COMMENT '0=canceled,1=created,30=partial_received,40=full_received,50=paid', receiving_status TINYINT UNSIGNED DEFAULT 1 COMMENT '0=canceled,1=created,30=partial_received,40=full_received,50=paid',
notes TEXT, notes TEXT,

View File

@@ -19,6 +19,13 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
const missingProducts = new Set(); const missingProducts = new Set();
let recordsAdded = 0; let recordsAdded = 0;
let recordsUpdated = 0; let recordsUpdated = 0;
let processedCount = 0;
let importedCount = 0;
let totalOrderItems = 0;
let totalUniqueOrders = 0;
// Add a cumulative counter for processed orders before the loop
let cumulativeProcessedOrders = 0;
try { try {
// Insert temporary table creation queries // Insert temporary table creation queries
@@ -60,6 +67,14 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
PRIMARY KEY (order_id, pid) PRIMARY KEY (order_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8; ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`); `);
await localConnection.query(`
CREATE TABLE IF NOT EXISTS temp_order_costs (
order_id INT UNSIGNED NOT NULL,
pid INT UNSIGNED NOT NULL,
costeach DECIMAL(10,3) DEFAULT 0.000,
PRIMARY KEY (order_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`);
// Get column names from the local table // Get column names from the local table
const [columns] = await localConnection.query(` const [columns] = await localConnection.query(`
@@ -78,7 +93,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
console.log('Orders: Using last sync time:', lastSyncTime); console.log('Orders: Using last sync time:', lastSyncTime);
// First get all relevant order items with basic info // First get count of order items
const [[{ total }]] = await prodConnection.query(` const [[{ total }]] = await prodConnection.query(`
SELECT COUNT(*) as total SELECT COUNT(*) as total
FROM order_items oi FROM order_items oi
@@ -107,7 +122,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
` : ''} ` : ''}
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []); `, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);
console.log('Orders: Found changes:', total); totalOrderItems = total;
console.log('Orders: Found changes:', totalOrderItems);
// Get order items in batches // Get order items in batches
const [orderItems] = await prodConnection.query(` const [orderItems] = await prodConnection.query(`
@@ -117,7 +133,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
oi.prod_itemnumber as SKU, oi.prod_itemnumber as SKU,
oi.prod_price as price, oi.prod_price as price,
oi.qty_ordered as quantity, oi.qty_ordered as quantity,
COALESCE(oi.prod_price_reg - oi.prod_price, 0) * oi.qty_ordered as base_discount, COALESCE(oi.prod_price_reg - oi.prod_price, 0) as base_discount,
oi.stamp as last_modified oi.stamp as last_modified
FROM order_items oi FROM order_items oi
USE INDEX (PRIMARY) USE INDEX (PRIMARY)
@@ -147,9 +163,6 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
console.log('Orders: Processing', orderItems.length, 'order items'); console.log('Orders: Processing', orderItems.length, 'order items');
const totalOrders = orderItems.length;
let processed = 0;
// Insert order items in batches // Insert order items in batches
for (let i = 0; i < orderItems.length; i += 5000) { for (let i = 0; i < orderItems.length; i += 5000) {
const batch = orderItems.slice(i, Math.min(i + 5000, orderItems.length)); const batch = orderItems.slice(i, Math.min(i + 5000, orderItems.length));
@@ -168,22 +181,30 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
base_discount = VALUES(base_discount) base_discount = VALUES(base_discount)
`, values); `, values);
processed += batch.length; processedCount = i + batch.length;
outputProgress({ outputProgress({
status: "running", status: "running",
operation: "Orders import", operation: "Orders import",
message: `Loading order items: ${processed} of ${totalOrders}`, message: `Loading order items: ${processedCount} of ${totalOrderItems}`,
current: processed, current: processedCount,
total: totalOrders total: totalOrderItems
}); });
} }
// Get unique order IDs // Get unique order IDs
const orderIds = [...new Set(orderItems.map(item => item.order_id))]; const orderIds = [...new Set(orderItems.map(item => item.order_id))];
totalUniqueOrders = orderIds.length;
console.log('Total unique order IDs:', totalUniqueOrders);
// Reset processed count for order processing phase
processedCount = 0;
// Get order metadata in batches // Get order metadata in batches
for (let i = 0; i < orderIds.length; i += 5000) { for (let i = 0; i < orderIds.length; i += 5000) {
const batchIds = orderIds.slice(i, i + 5000); const batchIds = orderIds.slice(i, i + 5000);
console.log(`Processing batch ${i/5000 + 1}, size: ${batchIds.length}`);
console.log('Sample of batch IDs:', batchIds.slice(0, 5));
const [orders] = await prodConnection.query(` const [orders] = await prodConnection.query(`
SELECT SELECT
o.order_id, o.order_id,
@@ -197,6 +218,14 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
WHERE o.order_id IN (?) WHERE o.order_id IN (?)
`, [batchIds]); `, [batchIds]);
console.log(`Retrieved ${orders.length} orders for ${batchIds.length} IDs`);
const duplicates = orders.filter((order, index, self) =>
self.findIndex(o => o.order_id === order.order_id) !== index
);
if (duplicates.length > 0) {
console.log('Found duplicates:', duplicates);
}
const placeholders = orders.map(() => "(?, ?, ?, ?, ?, ?)").join(","); const placeholders = orders.map(() => "(?, ?, ?, ?, ?, ?)").join(",");
const values = orders.flatMap(order => [ const values = orders.flatMap(order => [
order.order_id, order.date, order.customer, order.customer_name, order.status, order.canceled order.order_id, order.date, order.customer, order.customer_name, order.status, order.canceled
@@ -204,17 +233,27 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
await localConnection.query(` await localConnection.query(`
INSERT INTO temp_order_meta VALUES ${placeholders} INSERT INTO temp_order_meta VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
date = VALUES(date),
customer = VALUES(customer),
customer_name = VALUES(customer_name),
status = VALUES(status),
canceled = VALUES(canceled)
`, values); `, values);
processedCount = i + orders.length;
outputProgress({ outputProgress({
status: "running", status: "running",
operation: "Orders import", operation: "Orders import",
message: `Loading order metadata: ${i + orders.length} of ${orderIds.length}`, message: `Loading order metadata: ${processedCount} of ${totalUniqueOrders}`,
current: i + orders.length, current: processedCount,
total: orderIds.length total: totalUniqueOrders
}); });
} }
// Reset processed count for final phase
processedCount = 0;
// Get promotional discounts in batches // Get promotional discounts in batches
for (let i = 0; i < orderIds.length; i += 5000) { for (let i = 0; i < orderIds.length; i += 5000) {
const batchIds = orderIds.slice(i, i + 5000); const batchIds = orderIds.slice(i, i + 5000);
@@ -231,6 +270,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
await localConnection.query(` await localConnection.query(`
INSERT INTO temp_order_discounts VALUES ${placeholders} INSERT INTO temp_order_discounts VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
discount = VALUES(discount)
`, values); `, values);
} }
} }
@@ -266,14 +307,33 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
const placeholders = Array(uniqueTaxes.size).fill("(?, ?, ?)").join(","); const placeholders = Array(uniqueTaxes.size).fill("(?, ?, ?)").join(",");
await localConnection.query(` await localConnection.query(`
INSERT INTO temp_order_taxes VALUES ${placeholders} INSERT INTO temp_order_taxes VALUES ${placeholders}
ON DUPLICATE KEY UPDATE tax = VALUES(tax)
`, values); `, values);
} }
} }
} }
// Now combine all the data and insert into orders table // Get costeach values in batches
let importedCount = 0; for (let i = 0; i < orderIds.length; i += 5000) {
const batchIds = orderIds.slice(i, i + 5000);
const [costs] = await prodConnection.query(`
SELECT orderid as order_id, pid, costeach
FROM order_costs
WHERE orderid IN (?)
`, [batchIds]);
if (costs.length > 0) {
const placeholders = costs.map(() => '(?, ?, ?)').join(",");
const values = costs.flatMap(c => [c.order_id, c.pid, c.costeach]);
await localConnection.query(`
INSERT INTO temp_order_costs (order_id, pid, costeach)
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE costeach = VALUES(costeach)
`, values);
}
}
// Now combine all the data and insert into orders table
// Pre-check all products at once instead of per batch // Pre-check all products at once instead of per batch
const allOrderPids = [...new Set(orderItems.map(item => item.pid))]; const allOrderPids = [...new Set(orderItems.map(item => item.pid))];
const [existingProducts] = allOrderPids.length > 0 ? await localConnection.query( const [existingProducts] = allOrderPids.length > 0 ? await localConnection.query(
@@ -302,17 +362,21 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
om.customer, om.customer,
om.customer_name, om.customer_name,
om.status, om.status,
om.canceled om.canceled,
COALESCE(tc.costeach, 0) as costeach
FROM temp_order_items oi FROM temp_order_items oi
JOIN temp_order_meta om ON oi.order_id = om.order_id JOIN temp_order_meta om ON oi.order_id = om.order_id
LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
LEFT JOIN temp_order_costs tc ON oi.order_id = tc.order_id AND oi.pid = tc.pid
WHERE oi.order_id IN (?) WHERE oi.order_id IN (?)
`, [batchIds]); `, [batchIds]);
// Filter orders and track missing products - do this in a single pass // Filter orders and track missing products - do this in a single pass
const validOrders = []; const validOrders = [];
const values = []; const values = [];
const processedOrderItems = new Set(); // Track unique order items
const processedOrders = new Set(); // Track unique orders
for (const order of orders) { for (const order of orders) {
if (!existingPids.has(order.pid)) { if (!existingPids.has(order.pid)) {
@@ -322,6 +386,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
} }
validOrders.push(order); validOrders.push(order);
values.push(...columnNames.map(col => order[col] ?? null)); values.push(...columnNames.map(col => order[col] ?? null));
processedOrderItems.add(`${order.order_number}-${order.pid}`);
processedOrders.add(order.order_number);
} }
if (validOrders.length > 0) { if (validOrders.length > 0) {
@@ -329,65 +395,102 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
const singlePlaceholder = `(${columnNames.map(() => "?").join(",")})`; const singlePlaceholder = `(${columnNames.map(() => "?").join(",")})`;
const placeholders = Array(validOrders.length).fill(singlePlaceholder).join(","); const placeholders = Array(validOrders.length).fill(singlePlaceholder).join(",");
// First check which orders exist and get their current values const result = await localConnection.query(`
const [existingOrders] = await localConnection.query( INSERT INTO orders (${columnNames.join(",")})
`SELECT ${columnNames.join(',')} FROM orders WHERE (order_number, pid) IN (${validOrders.map(() => "(?,?)").join(",")})`, VALUES ${placeholders}
validOrders.flatMap(o => [o.order_number, o.pid]) ON DUPLICATE KEY UPDATE
); SKU = VALUES(SKU),
const existingOrderMap = new Map( date = VALUES(date),
existingOrders.map(o => [`${o.order_number}-${o.pid}`, o]) price = VALUES(price),
); quantity = VALUES(quantity),
discount = VALUES(discount),
tax = VALUES(tax),
tax_included = VALUES(tax_included),
shipping = VALUES(shipping),
customer = VALUES(customer),
customer_name = VALUES(customer_name),
status = VALUES(status),
canceled = VALUES(canceled),
costeach = VALUES(costeach)
`, validOrders.map(o => columnNames.map(col => o[col] ?? null)).flat());
// Split into inserts and updates const affectedRows = result[0].affectedRows;
const insertsAndUpdates = validOrders.reduce((acc, order) => { const updates = Math.floor(affectedRows / 2);
const key = `${order.order_number}-${order.pid}`; const inserts = affectedRows - (updates * 2);
if (existingOrderMap.has(key)) {
const existing = existingOrderMap.get(key);
// Check if any values are different
const hasChanges = columnNames.some(col => {
const newVal = order[col] ?? null;
const oldVal = existing[col] ?? null;
// Special handling for numbers to avoid type coercion issues
if (typeof newVal === 'number' && typeof oldVal === 'number') {
return Math.abs(newVal - oldVal) > 0.00001; // Allow for tiny floating point differences
}
return newVal !== oldVal;
});
if (hasChanges) { recordsAdded += inserts;
acc.updates.push({ recordsUpdated += updates;
order_number: order.order_number, importedCount += processedOrderItems.size; // Count unique order items processed
pid: order.pid, }
values: columnNames.map(col => order[col] ?? null)
});
} else {
acc.inserts.push({
order_number: order.order_number,
pid: order.pid,
values: columnNames.map(col => order[col] ?? null)
});
}
return acc;
// Handle inserts // Update progress based on unique orders processed
if (insertsAndUpdates.inserts.length > 0) { cumulativeProcessedOrders += processedOrders.size;
const insertPlaceholders = Array(insertsAndUpdates.inserts.length).fill(singlePlaceholder).join(","); outputProgress({
status: "running",
operation: "Orders import",
message: `Imported ${importedCount} order items (${cumulativeProcessedOrders} of ${totalUniqueOrders} orders processed)`,
current: cumulativeProcessedOrders,
total: totalUniqueOrders,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, cumulativeProcessedOrders, totalUniqueOrders),
rate: calculateRate(startTime, cumulativeProcessedOrders)
});
}
const insertResult = await localConnection.query(` // Now try to import any orders that were skipped due to missing products
INSERT INTO orders (${columnNames.join(",")}) if (skippedOrders.size > 0) {
VALUES ${insertPlaceholders} try {
`, insertsAndUpdates.inserts.map(i => i.values).flat()); outputProgress({
status: "running",
operation: "Orders import",
message: `Retrying import of ${skippedOrders.size} orders with previously missing products`,
});
recordsAdded += insertResult[0].affectedRows; // Get the orders that were skipped
} const [skippedProdOrders] = await localConnection.query(`
SELECT DISTINCT
oi.order_id as order_number,
oi.pid,
oi.SKU,
om.date,
oi.price,
oi.quantity,
oi.base_discount + COALESCE(od.discount, 0) as discount,
COALESCE(ot.tax, 0) as tax,
0 as tax_included,
0 as shipping,
om.customer,
om.customer_name,
om.status,
om.canceled,
COALESCE(tc.costeach, 0) as costeach
FROM temp_order_items oi
JOIN temp_order_meta om ON oi.order_id = om.order_id
LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
LEFT JOIN temp_order_costs tc ON oi.order_id = tc.order_id AND oi.pid = tc.pid
WHERE oi.order_id IN (?)
`, [Array.from(skippedOrders)]);
// Handle updates - now we know these actually have changes // Check which products exist now
if (insertsAndUpdates.updates.length > 0) { const skippedPids = [...new Set(skippedProdOrders.map(o => o.pid))];
const updatePlaceholders = Array(insertsAndUpdates.updates.length).fill(singlePlaceholder).join(","); const [existingProducts] = skippedPids.length > 0 ? await localConnection.query(
"SELECT pid FROM products WHERE pid IN (?)",
[skippedPids]
) : [[]];
const existingPids = new Set(existingProducts.map(p => p.pid));
const updateResult = await localConnection.query(` // Filter orders that can now be imported
INSERT INTO orders (${columnNames.join(",")}) const validOrders = skippedProdOrders.filter(order => existingPids.has(order.pid));
VALUES ${updatePlaceholders} const retryOrderItems = new Set(); // Track unique order items in retry
if (validOrders.length > 0) {
const placeholders = validOrders.map(() => `(${columnNames.map(() => "?").join(", ")})`).join(",");
const values = validOrders.map(o => columnNames.map(col => o[col] ?? null)).flat();
const result = await localConnection.query(`
INSERT INTO orders (${columnNames.join(", ")})
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE ON DUPLICATE KEY UPDATE
SKU = VALUES(SKU), SKU = VALUES(SKU),
date = VALUES(date), date = VALUES(date),
@@ -400,148 +503,45 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
customer = VALUES(customer), customer = VALUES(customer),
customer_name = VALUES(customer_name), customer_name = VALUES(customer_name),
status = VALUES(status), status = VALUES(status),
canceled = VALUES(canceled) canceled = VALUES(canceled),
`, insertsAndUpdates.updates.map(u => u.values).flat()); costeach = VALUES(costeach)
`, values);
recordsUpdated += updateResult[0].affectedRows / 2; // Each update counts as 2 in affectedRows const affectedRows = result[0].affectedRows;
const updates = Math.floor(affectedRows / 2);
const inserts = affectedRows - (updates * 2);
// Track unique order items
validOrders.forEach(order => {
retryOrderItems.add(`${order.order_number}-${order.pid}`);
});
outputProgress({
status: "running",
operation: "Orders import",
message: `Successfully imported ${retryOrderItems.size} previously skipped order items`,
});
// Update the main counters
recordsAdded += inserts;
recordsUpdated += updates;
importedCount += retryOrderItems.size;
} }
} catch (error) {
importedCount += validOrders.length; console.warn('Warning: Failed to retry skipped orders:', error.message);
console.warn(`Skipped ${skippedOrders.size} orders due to ${missingProducts.size} missing products`);
} }
outputProgress({
status: "running",
operation: "Orders import",
message: `Imported ${importedCount} of ${totalOrders} orders`,
current: importedCount,
total: totalOrders,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, importedCount, totalOrders),
rate: calculateRate(startTime, importedCount)
});
} }
// Clean up temporary tables // Clean up temporary tables after ALL processing is complete
await localConnection.query(` await localConnection.query(`
DROP TEMPORARY TABLE IF EXISTS temp_order_items; DROP TEMPORARY TABLE IF EXISTS temp_order_items;
DROP TEMPORARY TABLE IF EXISTS temp_order_meta; DROP TEMPORARY TABLE IF EXISTS temp_order_meta;
DROP TEMPORARY TABLE IF EXISTS temp_order_discounts; DROP TEMPORARY TABLE IF EXISTS temp_order_discounts;
DROP TEMPORARY TABLE IF EXISTS temp_order_taxes; DROP TEMPORARY TABLE IF EXISTS temp_order_taxes;
DROP TEMPORARY TABLE IF EXISTS temp_order_costs;
`); `);
// Import missing products if any
if (missingProducts.size > 0) {
try {
// Import missing products directly without materialization
await importMissingProducts(prodConnection, localConnection, Array.from(missingProducts));
// Retry skipped orders after importing products
if (skippedOrders.size > 0) {
outputProgress({
status: "running",
operation: "Orders import",
message: `Retrying import of ${skippedOrders.size} orders with previously missing products`
});
const skippedOrdersArray = Array.from(skippedOrders);
const [skippedProdOrders] = skippedOrdersArray.length > 0 ? await prodConnection.query(`
SELECT
o.order_id,
CASE
WHEN o.date_placed = '0000-00-00 00:00:00' OR o.date_placed IS NULL THEN o.stamp
ELSE o.date_placed
END as date,
o.order_cid,
o.bill_firstname,
o.bill_lastname,
o.order_email,
o.order_status,
o.date_shipped,
o.date_cancelled,
oi.prod_pid,
oi.prod_itemnumber,
oi.prod_price,
oi.qty_ordered,
oi.qty_back,
oi.qty_placed,
oi.qty_placed_2,
oi.discounted,
oi.summary_cogs,
oi.summary_profit,
oi.summary_orderdate,
oi.summary_paiddate,
oi.date_added,
oi.stamp
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE o.order_id IN (?)
`, [skippedOrdersArray]) : [[]];
// Prepare values for insertion
const skippedOrderValues = skippedProdOrders.flatMap(order => {
if (!order.date) {
console.log(`Warning: Skipped order ${order.order_id} has null date:`, JSON.stringify(order, null, 2));
return [];
}
const canceled = order.date_cancelled !== '0000-00-00 00:00:00' ? 1 : 0;
const customerName = `${order.bill_firstname} ${order.bill_lastname}`;
// Create an object with keys based on column names
const orderData = {
id: order.order_id,
order_number: order.order_id,
pid: order.prod_pid,
SKU: order.prod_itemnumber,
date: order.date ? (
order.date instanceof Date ?
order.date.toJSON()?.slice(0,10) || null :
(typeof order.date === 'string' ? order.date.split(' ')[0] : null)
) : null,
price: order.prod_price,
quantity: order.qty_ordered,
discount: order.discounted,
tax: 0, // Placeholder, will be calculated later
tax_included: 0, // Placeholder, will be calculated later
shipping: 0, // Placeholder, will be calculated later
customer: order.order_email,
customer_name: customerName,
status: order.order_status,
canceled: canceled,
};
// Map column names to values, handling missing columns
return [columnNames.map(colName => orderData[colName] !== undefined ? orderData[colName] : null)];
});
// Construct the insert query dynamically
const skippedPlaceholders = skippedProdOrders.map(() => `(${columnNames.map(() => "?").join(", ")})`).join(",");
const skippedInsertQuery = `
INSERT INTO orders (${columnNames.join(", ")})
VALUES ${skippedPlaceholders}
ON DUPLICATE KEY UPDATE
${columnNames.map(col => `${col} = VALUES(${col})`).join(", ")}
`;
// Execute the insert query
if (skippedOrderValues.length > 0) {
await localConnection.query(skippedInsertQuery, skippedOrderValues.flat());
}
importedCount += skippedProdOrders.length;
outputProgress({
status: "running",
operation: "Orders import",
message: `Successfully imported ${skippedProdOrders.length} previously skipped orders`,
});
}
} catch (error) {
console.warn('Warning: Failed to import missing products:', error.message);
console.warn(`Skipped ${skippedOrders.size} orders due to ${missingProducts.size} missing products`);
}
}
// Only update sync status if we get here (no errors thrown) // Only update sync status if we get here (no errors thrown)
await localConnection.query(` await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp) INSERT INTO sync_status (table_name, last_sync_timestamp)
@@ -551,9 +551,9 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
return { return {
status: "complete", status: "complete",
totalImported: importedCount, totalImported: Math.floor(importedCount),
recordsAdded: recordsAdded || 0, recordsAdded: recordsAdded || 0,
recordsUpdated: recordsUpdated || 0, recordsUpdated: Math.floor(recordsUpdated),
totalSkipped: skippedOrders.size, totalSkipped: skippedOrders.size,
missingProducts: missingProducts.size, missingProducts: missingProducts.size,
incrementalUpdate, incrementalUpdate,

View File

@@ -2,158 +2,312 @@ const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } =
// Utility functions // Utility functions
const imageUrlBase = 'https://sbing.com/i/products/0000/'; const imageUrlBase = 'https://sbing.com/i/products/0000/';
const getImageUrls = (pid) => { const getImageUrls = (pid, iid = 1) => {
const paddedPid = pid.toString().padStart(6, '0'); const paddedPid = pid.toString().padStart(6, '0');
const basePath = `${imageUrlBase}${paddedPid.slice(0, 3)}/${pid}`; // Use padded PID only for the first 3 digits
const prefix = paddedPid.slice(0, 3);
// Use the actual pid for the rest of the URL
const basePath = `${imageUrlBase}${prefix}/${pid}`;
return { return {
image: `${basePath}-t-`, image: `${basePath}-t-${iid}.jpg`,
image_175: `${basePath}-175x175-`, image_175: `${basePath}-175x175-${iid}.jpg`,
image_full: `${basePath}-o-` image_full: `${basePath}-o-${iid}.jpg`
}; };
}; };
async function setupTemporaryTables(connection) { async function setupAndCleanupTempTables(connection, operation = 'setup') {
await connection.query(`CREATE TEMPORARY TABLE IF NOT EXISTS temp_categories ( cat_id INT PRIMARY KEY, name VARCHAR(255) ) ENGINE=InnoDB;`); if (operation === 'setup') {
await connection.query(`CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_images ( pid INT, iid INT, image_type ENUM('thumbnail', '175', 'full'), url VARCHAR(255), PRIMARY KEY (pid, image_type) ) ENGINE=InnoDB;`); await connection.query(`
await connection.query(`CREATE TEMPORARY TABLE IF NOT EXISTS temp_inventory_status ( pid INT PRIMARY KEY, stock_quantity INT, pending_qty INT, preorder_count INT, notions_inv_count INT, needs_update BOOLEAN ) ENGINE=InnoDB;`); CREATE TEMPORARY TABLE IF NOT EXISTS temp_products (
await connection.query(`CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_prices ( pid INT PRIMARY KEY, price DECIMAL(10,2), regular_price DECIMAL(10,2), cost_price DECIMAL(10,5), needs_update BOOLEAN ) ENGINE=InnoDB;`); pid BIGINT NOT NULL,
await connection.query(`INSERT INTO temp_categories SELECT cat_id, name FROM categories;`); title VARCHAR(255),
await connection.query(`CREATE INDEX idx_temp_cat_id ON temp_categories(cat_id);`); description TEXT,
SKU VARCHAR(50),
stock_quantity INT DEFAULT 0,
pending_qty INT DEFAULT 0,
preorder_count INT DEFAULT 0,
notions_inv_count INT DEFAULT 0,
price DECIMAL(10,3) NOT NULL DEFAULT 0,
regular_price DECIMAL(10,3) NOT NULL DEFAULT 0,
cost_price DECIMAL(10,3),
vendor VARCHAR(100),
vendor_reference VARCHAR(100),
notions_reference VARCHAR(100),
brand VARCHAR(100),
line VARCHAR(100),
subline VARCHAR(100),
artist VARCHAR(100),
category_ids TEXT,
created_at DATETIME,
first_received DATETIME,
landing_cost_price DECIMAL(10,3),
barcode VARCHAR(50),
harmonized_tariff_code VARCHAR(50),
updated_at DATETIME,
visible BOOLEAN,
replenishable BOOLEAN,
permalink VARCHAR(255),
moq DECIMAL(10,3),
rating DECIMAL(10,2),
reviews INT,
weight DECIMAL(10,3),
length DECIMAL(10,3),
width DECIMAL(10,3),
height DECIMAL(10,3),
country_of_origin VARCHAR(100),
location VARCHAR(100),
total_sold INT,
baskets INT,
notifies INT,
date_last_sold DATETIME,
needs_update BOOLEAN DEFAULT TRUE,
PRIMARY KEY (pid),
INDEX idx_needs_update (needs_update)
) ENGINE=InnoDB;
`);
} else {
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_products;');
}
} }
async function cleanupTemporaryTables(connection) { async function materializeCalculations(prodConnection, localConnection, incrementalUpdate = true, lastSyncTime = '1970-01-01') {
await connection.query(`
DROP TEMPORARY TABLE IF EXISTS temp_categories;
DROP TEMPORARY TABLE IF EXISTS temp_product_images;
DROP TEMPORARY TABLE IF EXISTS temp_inventory_status;
DROP TEMPORARY TABLE IF EXISTS temp_product_prices;
`);
}
async function materializeCalculations(prodConnection, localConnection) {
outputProgress({ outputProgress({
status: "running", status: "running",
operation: "Products import", operation: "Products import",
message: "Fetching inventory and order data from production" message: "Fetching product data from production"
}); });
// Get all inventory and order data from production in one query // Get all product data in a single optimized query
const [prodInventory] = await prodConnection.query(` const [prodData] = await prodConnection.query(`
SELECT SELECT
p.pid, p.pid,
COALESCE(si.available_local, 0) as stock_quantity, p.description AS title,
p.notes AS description,
p.itemnumber AS SKU,
p.date_created,
p.datein AS first_received,
p.location,
p.upc AS barcode,
p.harmonized_tariff_code,
p.stamp AS updated_at,
CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
CASE
WHEN p.reorder < 0 THEN 0
WHEN (
(IFNULL(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURDATE(), INTERVAL 5 YEAR))
OR (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(NOW(), INTERVAL 5 YEAR))
OR (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(NOW(), INTERVAL 5 YEAR))
) THEN 0
ELSE 1
END AS replenishable,
COALESCE(si.available_local, 0) - COALESCE(
(SELECT SUM(oi.qty_ordered - oi.qty_placed)
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE oi.prod_pid = p.pid
AND o.date_placed != '0000-00-00 00:00:00'
AND o.date_shipped = '0000-00-00 00:00:00'
AND oi.pick_finished = 0
AND oi.qty_back = 0
AND o.order_status != 15
AND o.order_status < 90
AND oi.qty_ordered >= oi.qty_placed
AND oi.qty_ordered > 0
), 0
) as stock_quantity,
COALESCE(
(SELECT SUM(oi.qty_ordered - oi.qty_placed)
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE oi.prod_pid = p.pid
AND o.date_placed != '0000-00-00 00:00:00'
AND o.date_shipped = '0000-00-00 00:00:00'
AND oi.pick_finished = 0
AND oi.qty_back = 0
AND o.order_status != 15
AND o.order_status < 90
AND oi.qty_ordered >= oi.qty_placed
AND oi.qty_ordered > 0
), 0
) as pending_qty,
COALESCE(ci.onpreorder, 0) as preorder_count, COALESCE(ci.onpreorder, 0) as preorder_count,
COALESCE(pnb.inventory, 0) as notions_inv_count, COALESCE(pnb.inventory, 0) as notions_inv_count,
COALESCE( COALESCE(pcp.price_each, 0) as price,
( COALESCE(p.sellingprice, 0) AS regular_price,
SELECT SUM(oi.qty_ordered - oi.qty_placed) CASE
FROM order_items oi WHEN EXISTS (SELECT 1 FROM product_inventory WHERE pid = p.pid AND count > 0)
JOIN _order o ON oi.order_id = o.order_id THEN (SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND count > 0)
WHERE oi.prod_pid = p.pid ELSE (SELECT costeach FROM product_inventory WHERE pid = p.pid ORDER BY daterec DESC LIMIT 1)
AND o.date_placed != '0000-00-00 00:00:00' END AS cost_price,
AND o.date_shipped = '0000-00-00 00:00:00' NULL as landing_cost_price,
AND oi.pick_finished = 0 s.companyname AS vendor,
AND oi.qty_back = 0 CASE
AND o.order_status != 15 WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber
AND o.order_status < 90 ELSE sid.supplier_itemnumber
AND oi.qty_ordered >= oi.qty_placed END AS vendor_reference,
AND oi.qty_ordered > 0 sid.notions_itemnumber AS notions_reference,
), 0 CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink,
) as pending_qty pc1.name AS brand,
pc2.name AS line,
pc3.name AS subline,
pc4.name AS artist,
COALESCE(CASE
WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit
ELSE sid.supplier_qty_per_unit
END, sid.notions_qty_per_unit) AS moq,
p.rating,
p.rating_votes AS reviews,
p.weight,
p.length,
p.width,
p.height,
p.country_of_origin,
(SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets,
(SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies,
p.totalsold AS total_sold,
pls.date_sold as date_last_sold,
GROUP_CONCAT(DISTINCT CASE
WHEN pc.cat_id IS NOT NULL
AND pc.type IN (10, 20, 11, 21, 12, 13)
AND pci.cat_id NOT IN (16, 17)
THEN pci.cat_id
END) as category_ids
FROM products p FROM products p
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
LEFT JOIN current_inventory ci ON p.pid = ci.pid LEFT JOIN current_inventory ci ON p.pid = ci.pid
LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
`); LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1
LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
LEFT JOIN product_category_index pci ON p.pid = pci.pid
LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id
LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id
LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id
LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id
LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
WHERE ${incrementalUpdate ? `
p.stamp > ? OR
ci.stamp > ? OR
pcp.date_deactive > ? OR
pcp.date_active > ? OR
pnb.date_updated > ?
` : 'TRUE'}
GROUP BY p.pid
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []);
outputProgress({ outputProgress({
status: "running", status: "running",
operation: "Products import", operation: "Products import",
message: `Processing ${prodInventory.length} inventory records` message: `Processing ${prodData.length} product records`
}); });
// Insert inventory data into local temp table in batches // Insert all product data into temp table in batches
for (let i = 0; i < prodInventory.length; i += 1000) { for (let i = 0; i < prodData.length; i += 1000) {
const batch = prodInventory.slice(i, i + 1000); const batch = prodData.slice(i, i + 1000);
const values = batch.map(row => [ const values = batch.map(row => [
row.pid, row.pid,
Math.max(0, row.stock_quantity - row.pending_qty), // Calculate final stock quantity row.title,
row.description,
row.SKU,
// Set stock quantity to 0 if it's over 5000
row.stock_quantity > 5000 ? 0 : Math.max(0, row.stock_quantity),
row.pending_qty, row.pending_qty,
row.preorder_count, row.preorder_count,
row.notions_inv_count, row.notions_inv_count,
row.price,
row.regular_price,
row.cost_price,
row.vendor,
row.vendor_reference,
row.notions_reference,
row.brand,
row.line,
row.subline,
row.artist,
row.category_ids,
row.date_created, // map to created_at
row.first_received,
row.landing_cost_price,
row.barcode,
row.harmonized_tariff_code,
row.updated_at,
row.visible,
row.replenishable,
row.permalink,
row.moq,
row.rating ? Number(row.rating).toFixed(2) : null,
row.reviews,
row.weight,
row.length,
row.width,
row.height,
row.country_of_origin,
row.location,
row.total_sold,
row.baskets,
row.notifies,
row.date_last_sold,
true // Mark as needing update true // Mark as needing update
]); ]);
if (values.length > 0) { if (values.length > 0) {
await localConnection.query(` await localConnection.query(`
INSERT INTO temp_inventory_status (pid, stock_quantity, pending_qty, preorder_count, notions_inv_count, needs_update) INSERT INTO temp_products (
pid, title, description, SKU,
stock_quantity, pending_qty, preorder_count, notions_inv_count,
price, regular_price, cost_price,
vendor, vendor_reference, notions_reference,
brand, line, subline, artist,
category_ids, created_at, first_received,
landing_cost_price, barcode, harmonized_tariff_code,
updated_at, visible, replenishable, permalink,
moq, rating, reviews, weight, length, width,
height, country_of_origin, location, total_sold,
baskets, notifies, date_last_sold, needs_update
)
VALUES ? VALUES ?
ON DUPLICATE KEY UPDATE ON DUPLICATE KEY UPDATE
title = VALUES(title),
description = VALUES(description),
SKU = VALUES(SKU),
stock_quantity = VALUES(stock_quantity), stock_quantity = VALUES(stock_quantity),
pending_qty = VALUES(pending_qty), pending_qty = VALUES(pending_qty),
preorder_count = VALUES(preorder_count), preorder_count = VALUES(preorder_count),
notions_inv_count = VALUES(notions_inv_count), notions_inv_count = VALUES(notions_inv_count),
needs_update = TRUE
`, [values]);
}
outputProgress({
status: "running",
operation: "Products import",
message: `Processed ${Math.min(i + 1000, prodInventory.length)} of ${prodInventory.length} inventory records`,
current: i + batch.length,
total: prodInventory.length
});
}
outputProgress({
status: "running",
operation: "Products import",
message: "Fetching pricing data from production"
});
// Get prices from production
const [prodPrices] = await prodConnection.query(`
SELECT
p.pid,
COALESCE(pcp.price_each, 0) as price,
COALESCE(p.sellingprice, 0) AS regular_price,
COALESCE(
(SELECT ROUND(AVG(costeach), 5)
FROM product_inventory
WHERE pid = p.pid
AND COUNT > 0), 0
) AS cost_price
FROM products p
LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid
WHERE pcp.active = 1
`);
outputProgress({
status: "running",
operation: "Products import",
message: `Processing ${prodPrices.length} price records`
});
// Insert prices into local temp table in batches
for (let i = 0; i < prodPrices.length; i += 1000) {
const batch = prodPrices.slice(i, i + 1000);
const values = batch.map(row => [
row.pid,
row.price,
row.regular_price,
row.cost_price,
true // Mark as needing update
]);
if (values.length > 0) {
await localConnection.query(`
INSERT INTO temp_product_prices (pid, price, regular_price, cost_price, needs_update)
VALUES ?
ON DUPLICATE KEY UPDATE
price = VALUES(price), price = VALUES(price),
regular_price = VALUES(regular_price), regular_price = VALUES(regular_price),
cost_price = VALUES(cost_price), cost_price = VALUES(cost_price),
vendor = VALUES(vendor),
vendor_reference = VALUES(vendor_reference),
notions_reference = VALUES(notions_reference),
brand = VALUES(brand),
line = VALUES(line),
subline = VALUES(subline),
artist = VALUES(artist),
category_ids = VALUES(category_ids),
created_at = VALUES(created_at),
first_received = VALUES(first_received),
landing_cost_price = VALUES(landing_cost_price),
barcode = VALUES(barcode),
harmonized_tariff_code = VALUES(harmonized_tariff_code),
updated_at = VALUES(updated_at),
visible = VALUES(visible),
replenishable = VALUES(replenishable),
permalink = VALUES(permalink),
moq = VALUES(moq),
rating = VALUES(rating),
reviews = VALUES(reviews),
weight = VALUES(weight),
length = VALUES(length),
width = VALUES(width),
height = VALUES(height),
country_of_origin = VALUES(country_of_origin),
location = VALUES(location),
total_sold = VALUES(total_sold),
baskets = VALUES(baskets),
notifies = VALUES(notifies),
date_last_sold = VALUES(date_last_sold),
needs_update = TRUE needs_update = TRUE
`, [values]); `, [values]);
} }
@@ -161,9 +315,9 @@ async function materializeCalculations(prodConnection, localConnection) {
outputProgress({ outputProgress({
status: "running", status: "running",
operation: "Products import", operation: "Products import",
message: `Processed ${Math.min(i + 1000, prodPrices.length)} of ${prodPrices.length} price records`, message: `Processed ${Math.min(i + 1000, prodData.length)} of ${prodData.length} product records`,
current: i + batch.length, current: i + batch.length,
total: prodPrices.length total: prodData.length
}); });
} }
@@ -198,234 +352,32 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
console.log('Products: Using last sync time:', lastSyncTime); console.log('Products: Using last sync time:', lastSyncTime);
// Setup temporary tables // Setup temporary tables
await setupTemporaryTables(localConnection); await setupAndCleanupTempTables(localConnection, 'setup');
// Materialize calculations // Materialize calculations - this will populate temp_products
await materializeCalculations(prodConnection, localConnection); await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime);
// Optimized count query for changes since last sync
const [countResult] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM products p
LEFT JOIN current_inventory ci ON p.pid = ci.pid
LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1
LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
WHERE ${incrementalUpdate ? `
p.stamp > ? OR
ci.stamp > ? OR
pcp.date_deactive > ? OR
pcp.date_active > ? OR
sid.stamp > ? OR
pnb.date_updated > ? OR
pls.date_sold > ?
` : 'TRUE'}
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []);
console.log('Products: Found changes:', countResult[0].total);
const totalProducts = countResult[0].total;
// Main product query using materialized data - modified for incremental
outputProgress({
status: "running",
operation: "Products import",
message: `Fetching ${incrementalUpdate ? 'updated' : 'all'} product data from production`
});
// Create temporary table for production data
await localConnection.query(`
CREATE TEMPORARY TABLE temp_prod_data (
pid BIGINT NOT NULL,
title VARCHAR(255),
description TEXT,
SKU VARCHAR(50),
date_created TIMESTAMP NULL,
first_received TIMESTAMP NULL,
location VARCHAR(50),
barcode VARCHAR(50),
harmonized_tariff_code VARCHAR(20),
updated_at TIMESTAMP,
visible BOOLEAN,
replenishable BOOLEAN,
vendor VARCHAR(100),
vendor_reference VARCHAR(100),
notions_reference VARCHAR(100),
brand VARCHAR(100),
line VARCHAR(100),
subline VARCHAR(100),
artist VARCHAR(100),
moq INT,
rating TINYINT UNSIGNED,
reviews INT UNSIGNED,
weight DECIMAL(10,3),
length DECIMAL(10,3),
width DECIMAL(10,3),
height DECIMAL(10,3),
total_sold INT UNSIGNED,
country_of_origin VARCHAR(5),
date_last_sold DATE,
category_ids TEXT,
needs_update BOOLEAN DEFAULT TRUE,
PRIMARY KEY (pid)
) ENGINE=InnoDB
`);
// Get data from production and insert into temp table
const [prodData] = await prodConnection.query(`
SELECT
p.pid,
p.description AS title,
p.notes AS description,
p.itemnumber AS SKU,
p.date_created,
p.datein AS first_received,
p.location,
p.upc AS barcode,
p.harmonized_tariff_code,
p.stamp AS updated_at,
CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
CASE WHEN p.reorder >= 0 THEN 1 ELSE 0 END AS replenishable,
s.companyname AS vendor,
CASE WHEN s.companyname = 'Notions'
THEN sid.notions_itemnumber
ELSE sid.supplier_itemnumber
END AS vendor_reference,
sid.notions_itemnumber AS notions_reference,
pc1.name AS brand,
pc2.name AS line,
pc3.name AS subline,
pc4.name AS artist,
COALESCE(CASE
WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit
ELSE sid.supplier_qty_per_unit
END, sid.notions_qty_per_unit) AS moq,
p.rating,
p.rating_votes AS reviews,
p.weight,
p.length,
p.width,
p.height,
p.totalsold AS total_sold,
p.country_of_origin,
pls.date_sold as date_last_sold,
GROUP_CONCAT(DISTINCT pci.cat_id) as category_ids
FROM products p
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
LEFT JOIN product_category_index pci ON p.pid = pci.pid
LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id
LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id
LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id
LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
LEFT JOIN current_inventory ci ON p.pid = ci.pid
LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1
LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
WHERE ${incrementalUpdate ? `
p.stamp > ? OR
ci.stamp > ? OR
pcp.date_deactive > ? OR
pcp.date_active > ? OR
sid.stamp > ? OR
pnb.date_updated > ? OR
pls.date_sold > ?
` : 'TRUE'}
GROUP BY p.pid
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []);
// Insert production data in batches, but only for products that need updates
for (let i = 0; i < prodData.length; i += 1000) {
const batch = prodData.slice(i, i + 1000);
const placeholders = batch.map(() => `(${Array(31).fill("?").join(",")})`).join(",");
// Map each row to exactly match our temp table columns
const values = batch.flatMap(row => [
row.pid,
row.title,
row.description,
row.SKU,
row.date_created,
row.first_received,
row.location,
row.barcode,
row.harmonized_tariff_code,
row.updated_at,
row.visible,
row.replenishable,
row.vendor,
row.vendor_reference,
row.notions_reference,
row.brand,
row.line,
row.subline,
row.artist,
row.moq,
row.rating,
row.reviews,
row.weight,
row.length,
row.width,
row.height,
row.total_sold,
row.country_of_origin,
row.date_last_sold,
row.category_ids,
true // needs_update
]);
await localConnection.query(`
INSERT INTO temp_prod_data VALUES ${placeholders}
`, values);
outputProgress({
status: "running",
operation: "Products import",
message: `Loaded ${Math.min(i + 1000, prodData.length)} of ${prodData.length} products from production`,
current: i + batch.length,
total: prodData.length
});
}
// Now join with local temp tables and process in batches, but only for products that need updates
const BATCH_SIZE = 2500;
let processed = 0;
let recordsAdded = 0;
let recordsUpdated = 0;
// Get actual count from temp table - only count products that need updates // Get actual count from temp table - only count products that need updates
const [[{ actualTotal }]] = await localConnection.query(` const [[{ actualTotal }]] = await localConnection.query(`
SELECT COUNT(DISTINCT p.pid) as actualTotal SELECT COUNT(DISTINCT pid) as actualTotal
FROM temp_prod_data p FROM temp_products
LEFT JOIN temp_inventory_status tis ON p.pid = tis.pid WHERE needs_update = 1
LEFT JOIN temp_product_prices tpp ON p.pid = tpp.pid
WHERE p.needs_update = 1
OR tis.needs_update = 1
OR tpp.needs_update = 1
`); `);
console.log('Products: Found changes:', actualTotal);
// Process in batches
const BATCH_SIZE = 5000;
let processed = 0;
while (processed < actualTotal) { while (processed < actualTotal) {
const [batch] = await localConnection.query(` const [batch] = await localConnection.query(`
SELECT SELECT * FROM temp_products
p.*, WHERE needs_update = 1
COALESCE(tis.stock_quantity, 0) as stock_quantity,
COALESCE(tis.preorder_count, 0) as preorder_count,
COALESCE(tis.notions_inv_count, 0) as notions_inv_count,
COALESCE(tpp.price, 0) as price,
COALESCE(tpp.regular_price, 0) as regular_price,
COALESCE(tpp.cost_price, 0) as cost_price
FROM temp_prod_data p
LEFT JOIN temp_inventory_status tis ON p.pid = tis.pid
LEFT JOIN temp_product_prices tpp ON p.pid = tpp.pid
WHERE p.needs_update = 1
OR tis.needs_update = 1
OR tpp.needs_update = 1
LIMIT ? OFFSET ? LIMIT ? OFFSET ?
`, [BATCH_SIZE, processed]); `, [BATCH_SIZE, processed]);
if (!batch || batch.length === 0) break; // Exit if no more records if (!batch || batch.length === 0) break;
// Add image URLs // Add image URLs
batch.forEach(row => { batch.forEach(row => {
@@ -436,25 +388,14 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
}); });
if (batch.length > 0) { if (batch.length > 0) {
// MySQL 8.0 optimized insert with proper placeholders // Get existing products in one query
const placeholderGroup = `(${Array(columnNames.length).fill("?").join(",")})`;
// First check which products already exist and get their current values
const [existingProducts] = await localConnection.query( const [existingProducts] = await localConnection.query(
`SELECT ${columnNames.join(',')} FROM products WHERE pid IN (?)`, `SELECT ${columnNames.join(',')} FROM products WHERE pid IN (?)`,
[batch.map(p => p.pid)] [batch.map(p => p.pid)]
); );
const existingPidsMap = new Map(existingProducts.map(p => [p.pid, p])); const existingPidsMap = new Map(existingProducts.map(p => [p.pid, p]));
// Helper function to map values consistently // Split into inserts and updates
const mapValues = (product) => columnNames.map(col => {
const val = product[col] ?? null;
if (col === "managing_stock") return 1;
if (typeof val === "number") return val || 0;
return val;
});
// Split into inserts and updates, comparing values for updates
const insertsAndUpdates = batch.reduce((acc, product) => { const insertsAndUpdates = batch.reduce((acc, product) => {
if (existingPidsMap.has(product.pid)) { if (existingPidsMap.has(product.pid)) {
const existing = existingPidsMap.get(product.pid); const existing = existingPidsMap.get(product.pid);
@@ -462,119 +403,114 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
const hasChanges = columnNames.some(col => { const hasChanges = columnNames.some(col => {
const newVal = product[col] ?? null; const newVal = product[col] ?? null;
const oldVal = existing[col] ?? null; const oldVal = existing[col] ?? null;
// Special handling for numbers to avoid type coercion issues if (col === "managing_stock") return false; // Skip this as it's always 1
if (typeof newVal === 'number' && typeof oldVal === 'number') { if (typeof newVal === 'number' && typeof oldVal === 'number') {
// Handle NaN and Infinity
if (isNaN(newVal) || isNaN(oldVal)) return isNaN(newVal) !== isNaN(oldVal);
if (!isFinite(newVal) || !isFinite(oldVal)) return !isFinite(newVal) !== !isFinite(oldVal);
// Allow for tiny floating point differences
return Math.abs(newVal - oldVal) > 0.00001; return Math.abs(newVal - oldVal) > 0.00001;
} }
if (col === 'managing_stock') return false; // Skip this as it's always 1
return newVal !== oldVal; return newVal !== oldVal;
}); });
if (hasChanges) { if (hasChanges) {
acc.updates.push({ acc.updates.push(product);
pid: product.pid,
values: mapValues(product)
});
} }
} else { } else {
acc.inserts.push({ acc.inserts.push(product);
pid: product.pid,
values: mapValues(product)
});
} }
return acc; return acc;
}, { inserts: [], updates: [] }); }, { inserts: [], updates: [] });
// Log summary for this batch // Process inserts
if (insertsAndUpdates.inserts.length > 0 || insertsAndUpdates.updates.length > 0) {
console.log(`Batch summary: ${insertsAndUpdates.inserts.length} new products, ${insertsAndUpdates.updates.length} updates`);
}
// Handle inserts
if (insertsAndUpdates.inserts.length > 0) { if (insertsAndUpdates.inserts.length > 0) {
const insertPlaceholders = Array(insertsAndUpdates.inserts.length).fill(placeholderGroup).join(","); const insertValues = insertsAndUpdates.inserts.map(product =>
columnNames.map(col => {
const val = product[col] ?? null;
if (col === "managing_stock") return 1;
return val;
})
);
const insertPlaceholders = insertsAndUpdates.inserts
.map(() => `(${Array(columnNames.length).fill('?').join(',')})`)
.join(',');
const insertResult = await localConnection.query(` const insertResult = await localConnection.query(`
INSERT INTO products (${columnNames.join(",")}) INSERT INTO products (${columnNames.join(',')})
VALUES ${insertPlaceholders} VALUES ${insertPlaceholders}
`, insertsAndUpdates.inserts.map(i => i.values).flat()); `, insertValues.flat());
recordsAdded += insertResult[0].affectedRows; recordsAdded += insertResult[0].affectedRows;
} }
// Handle updates - now we know these actually have changes // Process updates
if (insertsAndUpdates.updates.length > 0) { if (insertsAndUpdates.updates.length > 0) {
const updatePlaceholders = Array(insertsAndUpdates.updates.length).fill(placeholderGroup).join(","); const updateValues = insertsAndUpdates.updates.map(product =>
columnNames.map(col => {
const val = product[col] ?? null;
if (col === "managing_stock") return 1;
return val;
})
);
const updatePlaceholders = insertsAndUpdates.updates
.map(() => `(${Array(columnNames.length).fill('?').join(',')})`)
.join(',');
const updateResult = await localConnection.query(` const updateResult = await localConnection.query(`
INSERT INTO products (${columnNames.join(",")}) INSERT INTO products (${columnNames.join(',')})
VALUES ${updatePlaceholders} VALUES ${updatePlaceholders}
ON DUPLICATE KEY UPDATE ON DUPLICATE KEY UPDATE
${columnNames ${columnNames
.filter(col => col !== "pid") .filter(col => col !== 'pid')
.map(col => `${col} = VALUES(${col})`) .map(col => `${col} = VALUES(${col})`)
.join(",")}; .join(',')};
`, insertsAndUpdates.updates.map(u => u.values).flat()); `, updateValues.flat());
recordsUpdated += insertsAndUpdates.updates.length; recordsUpdated += insertsAndUpdates.updates.length;
} }
}
// Insert category relationships // Process category relationships
const categoryRelationships = []; if (batch.some(p => p.category_ids)) {
batch.forEach(row => { const categoryRelationships = batch
if (row.category_ids) { .filter(p => p.category_ids)
const catIds = row.category_ids .flatMap(product =>
.split(",") product.category_ids
.map(id => id.trim()) .split(',')
.filter(id => id) .map(id => id.trim())
.map(Number); .filter(id => id)
.map(Number)
.filter(id => !isNaN(id))
.map(catId => [catId, product.pid])
);
catIds.forEach(catId => { if (categoryRelationships.length > 0) {
if (catId) categoryRelationships.push([row.pid, catId]); // Verify categories exist before inserting relationships
}); const uniqueCatIds = [...new Set(categoryRelationships.map(([catId]) => catId))];
} const [existingCats] = await localConnection.query(
}); "SELECT cat_id FROM categories WHERE cat_id IN (?)",
[uniqueCatIds]
);
const existingCatIds = new Set(existingCats.map(c => c.cat_id));
if (categoryRelationships.length > 0) { // Filter relationships to only include existing categories
// First verify categories exist const validRelationships = categoryRelationships.filter(([catId]) =>
const uniqueCatIds = [...new Set(categoryRelationships.map(([_, catId]) => catId))]; existingCatIds.has(catId)
const [existingCats] = await localConnection.query( );
"SELECT cat_id FROM categories WHERE cat_id IN (?)",
[uniqueCatIds]
);
const existingCatIds = new Set(existingCats.map(c => c.cat_id));
// Filter relationships to only include existing categories if (validRelationships.length > 0) {
const validRelationships = categoryRelationships.filter(([_, catId]) => const catPlaceholders = validRelationships
existingCatIds.has(catId) .map(() => "(?, ?)")
); .join(",");
await localConnection.query(
if (validRelationships.length > 0) { `INSERT IGNORE INTO product_categories (cat_id, pid)
// Delete existing relationships for these products first VALUES ${catPlaceholders}`,
await localConnection.query( validRelationships.flat()
"DELETE FROM product_categories WHERE pid IN (?)", );
[batch.map(p => p.pid)] }
); }
// Insert new relationships using INSERT IGNORE
const catPlaceholders = validRelationships
.map(() => "(?, ?)")
.join(",");
await localConnection.query(
`INSERT IGNORE INTO product_categories (pid, cat_id)
VALUES ${catPlaceholders}`,
validRelationships.flat()
);
} }
} }
processed += batch.length; // Only increment by actual records processed processed += batch.length;
outputProgress({ outputProgress({
status: "running", status: "running",
@@ -586,15 +522,10 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
remaining: estimateRemaining(startTime, processed, actualTotal), remaining: estimateRemaining(startTime, processed, actualTotal),
rate: calculateRate(startTime, processed) rate: calculateRate(startTime, processed)
}); });
// Force garbage collection between batches
if (global.gc) {
global.gc();
}
} }
// Drop temporary tables // Drop temporary tables
await cleanupTemporaryTables(localConnection); await setupAndCleanupTempTables(localConnection, 'cleanup');
// Only update sync status if we get here (no errors thrown) // Only update sync status if we get here (no errors thrown)
await localConnection.query(` await localConnection.query(`
@@ -637,7 +568,21 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
p.date_created, p.date_created,
p.datein AS first_received, p.datein AS first_received,
p.location, p.location,
COALESCE(si.available_local, 0) - COALESCE( p.upc AS barcode,
p.harmonized_tariff_code,
p.stamp AS updated_at,
CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
CASE
WHEN p.reorder < 0 THEN 0
WHEN (
(IFNULL(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURDATE(), INTERVAL 5 YEAR))
OR (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(NOW(), INTERVAL 5 YEAR))
OR (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(NOW(), INTERVAL 5 YEAR))
) THEN 0
ELSE 1
END AS replenishable,
COALESCE(si.available_local, 0) as stock_quantity,
COALESCE(
(SELECT SUM(oi.qty_ordered - oi.qty_placed) (SELECT SUM(oi.qty_ordered - oi.qty_placed)
FROM order_items oi FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id JOIN _order o ON oi.order_id = o.order_id
@@ -649,37 +594,19 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
AND o.order_status != 15 AND o.order_status != 15
AND o.order_status < 90 AND o.order_status < 90
AND oi.qty_ordered >= oi.qty_placed AND oi.qty_ordered >= oi.qty_placed
AND oi.qty_ordered > 0), 0 AND oi.qty_ordered > 0
) as stock_quantity, ), 0
) as pending_qty,
COALESCE(ci.onpreorder, 0) as preorder_count, COALESCE(ci.onpreorder, 0) as preorder_count,
COALESCE(pnb.inventory, 0) as notions_inv_count, COALESCE(pnb.inventory, 0) as notions_inv_count,
COALESCE(pcp.price_each, 0) as price, COALESCE(pcp.price_each, 0) as price,
COALESCE(p.sellingprice, 0) AS regular_price, COALESCE(p.sellingprice, 0) AS regular_price,
COALESCE((SELECT ROUND(AVG(costeach), 5) CASE
FROM product_inventory WHEN EXISTS (SELECT 1 FROM product_inventory WHERE pid = p.pid AND count > 0)
WHERE pid = p.pid THEN (SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND count > 0)
AND COUNT > 0), 0) AS cost_price, ELSE (SELECT costeach FROM product_inventory WHERE pid = p.pid ORDER BY daterec DESC LIMIT 1)
END AS cost_price,
NULL AS landing_cost_price, NULL AS landing_cost_price,
p.upc AS barcode,
p.harmonized_tariff_code,
p.stamp AS updated_at,
CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
CASE WHEN p.reorder >= 0 THEN 1 ELSE 0 END AS replenishable,
s.companyname AS vendor,
CASE WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber ELSE sid.supplier_itemnumber END AS vendor_reference,
sid.notions_itemnumber AS notions_reference,
CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink,
pc1.name AS brand,
pc2.name AS line,
pc3.name AS subline,
pc4.name AS artist,
NULL AS options,
NULL AS tags,
COALESCE(CASE
WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit
ELSE sid.supplier_qty_per_unit
END, sid.notions_qty_per_unit) AS moq,
NULL AS uom,
p.rating, p.rating,
p.rating_votes AS reviews, p.rating_votes AS reviews,
p.weight, p.weight,
@@ -746,7 +673,7 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
ON DUPLICATE KEY UPDATE ${columnNames ON DUPLICATE KEY UPDATE ${columnNames
.filter((col) => col !== "pid") .filter((col) => col !== "pid")
.map((col) => `${col} = VALUES(${col})`) .map((col) => `${col} = VALUES(${col})`)
.join(",")} .join(",")};
`; `;
const result = await localConnection.query(query, productValues); const result = await localConnection.query(query, productValues);

View File

@@ -108,31 +108,67 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
NULLIF(s2.companyname, ''), NULLIF(s2.companyname, ''),
'Unknown Vendor' 'Unknown Vendor'
) as vendor, ) as vendor,
CASE WHEN p.po_id IS NOT NULL THEN DATE(p.date_ordered) END as date, CASE
CASE WHEN p.po_id IS NOT NULL THEN DATE(p.date_estin) END as expected_date, WHEN p.po_id IS NOT NULL THEN
DATE(COALESCE(
NULLIF(p.date_ordered, '0000-00-00 00:00:00'),
p.date_created
))
WHEN r.receiving_id IS NOT NULL THEN
DATE(r.date_created)
END as date,
CASE
WHEN p.date_estin = '0000-00-00' THEN NULL
WHEN p.date_estin IS NULL THEN NULL
WHEN p.date_estin NOT REGEXP '^[0-9]{4}-[0-9]{2}-[0-9]{2}$' THEN NULL
ELSE p.date_estin
END as expected_date,
COALESCE(p.status, 50) as status, COALESCE(p.status, 50) as status,
COALESCE(p.short_note, '') as notes, p.short_note as notes,
COALESCE(p.notes, '') as long_note p.notes as long_note
FROM ( FROM (
SELECT po_id FROM po SELECT po_id FROM po
USE INDEX (idx_date_created) USE INDEX (idx_date_created)
WHERE date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) WHERE date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
AND (date_ordered > ? ${incrementalUpdate ? `
OR date_updated > ?) AND (
date_ordered > ?
OR date_updated > ?
OR date_estin > ?
)
` : ''}
UNION UNION
SELECT DISTINCT r.receiving_id as po_id SELECT DISTINCT r.receiving_id as po_id
FROM receivings r FROM receivings r
JOIN receivings_products rp USE INDEX (received_date) ON r.receiving_id = rp.receiving_id JOIN receivings_products rp USE INDEX (received_date) ON r.receiving_id = rp.receiving_id
WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR) WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
AND (rp.received_date > ? ${incrementalUpdate ? `
OR rp.stamp > ?) AND (
r.date_created > ?
OR r.date_checked > ?
OR rp.stamp > ?
OR rp.received_date > ?
)
` : ''}
) ids ) ids
LEFT JOIN po p ON ids.po_id = p.po_id LEFT JOIN po p ON ids.po_id = p.po_id
LEFT JOIN suppliers s1 ON p.supplier_id = s1.supplierid LEFT JOIN suppliers s1 ON p.supplier_id = s1.supplierid
LEFT JOIN receivings r ON ids.po_id = r.receiving_id LEFT JOIN receivings r ON ids.po_id = r.receiving_id
LEFT JOIN suppliers s2 ON r.supplier_id = s2.supplierid LEFT JOIN suppliers s2 ON r.supplier_id = s2.supplierid
ORDER BY po_id ORDER BY po_id
`, [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]); `, incrementalUpdate ? [
lastSyncTime, lastSyncTime, lastSyncTime, // PO conditions
lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime // Receiving conditions
] : []);
console.log('Sample PO dates:', poList.slice(0, 5).map(po => ({
po_id: po.po_id,
raw_date_ordered: po.raw_date_ordered,
raw_date_created: po.raw_date_created,
raw_date_estin: po.raw_date_estin,
computed_date: po.date,
expected_date: po.expected_date
})));
const totalItems = total; const totalItems = total;
let processed = 0; let processed = 0;
@@ -156,7 +192,8 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
pop.po_id, pop.po_id,
pop.pid, pop.pid,
pr.itemnumber as sku, pr.itemnumber as sku,
pop.cost_each as cost_price, pr.description as name,
pop.cost_each,
pop.qty_each as ordered pop.qty_each as ordered
FROM po_products pop FROM po_products pop
USE INDEX (PRIMARY) USE INDEX (PRIMARY)
@@ -171,7 +208,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
const productPids = [...new Set(productBatch.map(p => p.pid))]; const productPids = [...new Set(productBatch.map(p => p.pid))];
const batchPoIds = [...new Set(productBatch.map(p => p.po_id))]; const batchPoIds = [...new Set(productBatch.map(p => p.po_id))];
// Get receivings for this batch // Get receivings for this batch with employee names
const [receivings] = await prodConnection.query(` const [receivings] = await prodConnection.query(`
SELECT SELECT
r.po_id, r.po_id,
@@ -179,8 +216,9 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
rp.receiving_id, rp.receiving_id,
rp.qty_each, rp.qty_each,
rp.cost_each, rp.cost_each,
DATE(NULLIF(rp.received_date, '0000-00-00 00:00:00')) as received_date, COALESCE(rp.received_date, r.date_created) as received_date,
rp.received_by, rp.received_by,
CONCAT(e.firstname, ' ', e.lastname) as received_by_name,
CASE CASE
WHEN r.po_id IS NULL THEN 2 -- No PO WHEN r.po_id IS NULL THEN 2 -- No PO
WHEN r.po_id IN (?) THEN 0 -- Original PO WHEN r.po_id IN (?) THEN 0 -- Original PO
@@ -189,8 +227,9 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
FROM receivings_products rp FROM receivings_products rp
USE INDEX (received_date) USE INDEX (received_date)
LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id
LEFT JOIN employees e ON rp.received_by = e.employeeid
WHERE rp.pid IN (?) WHERE rp.pid IN (?)
AND rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) AND rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
ORDER BY r.po_id, rp.pid, rp.received_date ORDER BY r.po_id, rp.pid, rp.received_date
`, [batchPoIds, productPids]); `, [batchPoIds, productPids]);
@@ -235,10 +274,6 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
); );
const validPids = new Set(existingPids.map(p => p.pid)); const validPids = new Set(existingPids.map(p => p.pid));
// Prepare values for this sub-batch
const values = [];
let batchProcessed = 0;
// First check which PO lines already exist and get their current values // First check which PO lines already exist and get their current values
const poLines = Array.from(poProductMap.values()) const poLines = Array.from(poProductMap.values())
.filter(p => validPids.has(p.pid)) .filter(p => validPids.has(p.pid))
@@ -254,6 +289,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
// Split into inserts and updates // Split into inserts and updates
const insertsAndUpdates = { inserts: [], updates: [] }; const insertsAndUpdates = { inserts: [], updates: [] };
let batchProcessed = 0;
for (const po of batch) { for (const po of batch) {
const poProducts = Array.from(poProductMap.values()) const poProducts = Array.from(poProductMap.values())
@@ -270,16 +306,29 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
...receivingHistory.map(r => ({ ...r, type: 'original' })), ...receivingHistory.map(r => ({ ...r, type: 'original' })),
...altReceivingHistory.map(r => ({ ...r, type: 'alternate' })), ...altReceivingHistory.map(r => ({ ...r, type: 'alternate' })),
...noPOReceivingHistory.map(r => ({ ...r, type: 'no_po' })) ...noPOReceivingHistory.map(r => ({ ...r, type: 'no_po' }))
].sort((a, b) => new Date(a.received_date) - new Date(b.received_date)); ].sort((a, b) => new Date(a.received_date || '9999-12-31') - new Date(b.received_date || '9999-12-31'));
// Split receivings into original PO and others
const originalPOReceivings = allReceivings.filter(r => r.type === 'original');
const otherReceivings = allReceivings.filter(r => r.type !== 'original');
// Track FIFO fulfillment // Track FIFO fulfillment
let remainingToFulfill = product.ordered; let remainingToFulfill = product.ordered;
const fulfillmentTracking = []; const fulfillmentTracking = [];
let totalReceived = 0; let totalReceived = 0;
let actualCost = null; // Will store the cost of the first receiving that fulfills this PO
let firstFulfillmentReceiving = null;
let lastFulfillmentReceiving = null;
for (const receiving of allReceivings) { for (const receiving of allReceivings) {
const qtyToApply = Math.min(remainingToFulfill, receiving.qty_each); const qtyToApply = Math.min(remainingToFulfill, receiving.qty_each);
if (qtyToApply > 0) { if (qtyToApply > 0) {
// If this is the first receiving being applied, use its cost
if (actualCost === null) {
actualCost = receiving.cost_each;
firstFulfillmentReceiving = receiving;
}
lastFulfillmentReceiving = receiving;
fulfillmentTracking.push({ fulfillmentTracking.push({
receiving_id: receiving.receiving_id, receiving_id: receiving.receiving_id,
qty_applied: qtyToApply, qty_applied: qtyToApply,
@@ -287,6 +336,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
cost: receiving.cost_each, cost: receiving.cost_each,
date: receiving.received_date, date: receiving.received_date,
received_by: receiving.received_by, received_by: receiving.received_by,
received_by_name: receiving.received_by_name || 'Unknown',
type: receiving.type, type: receiving.type,
remaining_qty: receiving.qty_each - qtyToApply remaining_qty: receiving.qty_each - qtyToApply
}); });
@@ -300,6 +350,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
cost: receiving.cost_each, cost: receiving.cost_each,
date: receiving.received_date, date: receiving.received_date,
received_by: receiving.received_by, received_by: receiving.received_by,
received_by_name: receiving.received_by_name || 'Unknown',
type: receiving.type, type: receiving.type,
is_excess: true is_excess: true
}); });
@@ -311,18 +362,31 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
remainingToFulfill > 0 ? 30 : // partial remainingToFulfill > 0 ? 30 : // partial
40; // full 40; // full
const firstReceiving = allReceivings[0] || {}; function formatDate(dateStr) {
const lastReceiving = allReceivings[allReceivings.length - 1] || {}; if (!dateStr) return null;
if (dateStr === '0000-00-00' || dateStr === '0000-00-00 00:00:00') return null;
if (typeof dateStr === 'string' && !dateStr.match(/^\d{4}-\d{2}-\d{2}/)) return null;
try {
const date = new Date(dateStr);
if (isNaN(date.getTime())) return null;
if (date.getFullYear() < 1900 || date.getFullYear() > 2100) return null;
return date.toISOString().split('T')[0];
} catch (e) {
return null;
}
}
const rowValues = columnNames.map(col => { const rowValues = columnNames.map(col => {
switch (col) { switch (col) {
case 'po_id': return po.po_id; case 'po_id': return po.po_id;
case 'vendor': return po.vendor; case 'vendor': return po.vendor;
case 'date': return po.date; case 'date': return formatDate(po.date);
case 'expected_date': return po.expected_date; case 'expected_date': return formatDate(po.expected_date);
case 'pid': return product.pid; case 'pid': return product.pid;
case 'sku': return product.sku; case 'sku': return product.sku;
case 'cost_price': return product.cost_price; case 'name': return product.name;
case 'cost_price': return actualCost || product.cost_each;
case 'po_cost_price': return product.cost_each;
case 'status': return po.status; case 'status': return po.status;
case 'notes': return po.notes; case 'notes': return po.notes;
case 'long_note': return po.long_note; case 'long_note': return po.long_note;
@@ -330,16 +394,18 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
case 'received': return totalReceived; case 'received': return totalReceived;
case 'unfulfilled': return remainingToFulfill; case 'unfulfilled': return remainingToFulfill;
case 'excess_received': return Math.max(0, totalReceived - product.ordered); case 'excess_received': return Math.max(0, totalReceived - product.ordered);
case 'received_date': return firstReceiving.received_date || null; case 'received_date': return formatDate(firstFulfillmentReceiving?.received_date);
case 'last_received_date': return lastReceiving.received_date || null; case 'last_received_date': return formatDate(lastFulfillmentReceiving?.received_date);
case 'received_by': return firstReceiving.received_by || null; case 'received_by': return firstFulfillmentReceiving?.received_by_name || null;
case 'receiving_status': return receiving_status; case 'receiving_status': return receiving_status;
case 'receiving_history': return JSON.stringify({ case 'receiving_history': return JSON.stringify({
fulfillment: fulfillmentTracking, fulfillment: fulfillmentTracking,
ordered_qty: product.ordered, ordered_qty: product.ordered,
total_received: totalReceived, total_received: totalReceived,
remaining_unfulfilled: remainingToFulfill, remaining_unfulfilled: remainingToFulfill,
excess_received: Math.max(0, totalReceived - product.ordered) excess_received: Math.max(0, totalReceived - product.ordered),
po_cost: product.cost_each,
actual_cost: actualCost || product.cost_each
}); });
default: return null; default: return null;
} }
@@ -393,7 +459,15 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
VALUES ${insertPlaceholders} VALUES ${insertPlaceholders}
`, insertsAndUpdates.inserts.map(i => i.values).flat()); `, insertsAndUpdates.inserts.map(i => i.values).flat());
recordsAdded += insertResult[0].affectedRows; const affectedRows = insertResult[0].affectedRows;
// For an upsert, MySQL counts rows twice for updates
// So if affectedRows is odd, we have (updates * 2 + inserts)
const updates = Math.floor(affectedRows / 2);
const inserts = affectedRows - (updates * 2);
recordsAdded += inserts;
recordsUpdated += Math.floor(updates); // Ensure we never have fractional updates
processed += batchProcessed;
} }
// Handle updates - now we know these actually have changes // Handle updates - now we know these actually have changes
@@ -411,10 +485,15 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
.join(",")}; .join(",")};
`, insertsAndUpdates.updates.map(u => u.values).flat()); `, insertsAndUpdates.updates.map(u => u.values).flat());
recordsUpdated += updateResult[0].affectedRows / 2; // Each update counts as 2 in affectedRows const affectedRows = updateResult[0].affectedRows;
} // For an upsert, MySQL counts rows twice for updates
// So if affectedRows is odd, we have (updates * 2 + inserts)
const updates = Math.floor(affectedRows / 2);
const inserts = affectedRows - (updates * 2);
processed += batchProcessed; recordsUpdated += Math.floor(updates); // Ensure we never have fractional updates
processed += batchProcessed;
}
// Update progress based on time interval // Update progress based on time interval
const now = Date.now(); const now = Date.now();