Import/calculations improvements
This commit is contained in:
@@ -13,10 +13,14 @@ async function importCategories(prodConnection, localConnection) {
|
||||
let skippedCategories = [];
|
||||
|
||||
try {
|
||||
// Start a single transaction for the entire import
|
||||
await localConnection.query('BEGIN');
|
||||
|
||||
// Temporarily disable the trigger that's causing problems
|
||||
// Start a single transaction for the entire import.
|
||||
// Must use the wrapper's beginTransaction() (dedicated client) — query('BEGIN')
|
||||
// checks out a client per call, so BEGIN/work/COMMIT would not be guaranteed
|
||||
// to share a connection.
|
||||
await localConnection.beginTransaction();
|
||||
|
||||
// Temporarily disable the trigger that's causing problems.
|
||||
// ALTER TABLE ... DISABLE TRIGGER is transactional: a rollback restores it.
|
||||
await localConnection.query('ALTER TABLE categories DISABLE TRIGGER update_categories_updated_at');
|
||||
|
||||
// Process each type in order with its own savepoint
|
||||
@@ -148,8 +152,11 @@ async function importCategories(prodConnection, localConnection) {
|
||||
}
|
||||
}
|
||||
|
||||
// Re-enable the trigger INSIDE the transaction so disable/enable are atomic
|
||||
await localConnection.query('ALTER TABLE categories ENABLE TRIGGER update_categories_updated_at');
|
||||
|
||||
// Commit the entire transaction - we'll do this even if we have skipped categories
|
||||
await localConnection.query('COMMIT');
|
||||
await localConnection.commit();
|
||||
|
||||
// Update sync status
|
||||
await localConnection.query(`
|
||||
@@ -158,9 +165,6 @@ async function importCategories(prodConnection, localConnection) {
|
||||
ON CONFLICT (table_name) DO UPDATE SET
|
||||
last_sync_timestamp = NOW()
|
||||
`);
|
||||
|
||||
// Re-enable the trigger
|
||||
await localConnection.query('ALTER TABLE categories ENABLE TRIGGER update_categories_updated_at');
|
||||
|
||||
outputProgress({
|
||||
status: "complete",
|
||||
@@ -187,12 +191,10 @@ async function importCategories(prodConnection, localConnection) {
|
||||
} catch (error) {
|
||||
console.error("Error importing categories:", error);
|
||||
|
||||
// Only rollback if we haven't committed yet
|
||||
// Only rollback if we haven't committed yet. The rollback also restores the
|
||||
// trigger state (DISABLE TRIGGER was inside the transaction).
|
||||
try {
|
||||
await localConnection.query('ROLLBACK');
|
||||
|
||||
// Make sure we re-enable the trigger even if there was an error
|
||||
await localConnection.query('ALTER TABLE categories ENABLE TRIGGER update_categories_updated_at');
|
||||
await localConnection.rollback();
|
||||
} catch (rollbackError) {
|
||||
console.error("Error during rollback:", rollbackError);
|
||||
}
|
||||
|
||||
@@ -24,7 +24,8 @@ async function importDailyDeals(prodConnection, localConnection) {
|
||||
const startTime = Date.now();
|
||||
|
||||
try {
|
||||
await localConnection.query('BEGIN');
|
||||
// Wrapper's beginTransaction() pins a dedicated client; query('BEGIN') would not.
|
||||
await localConnection.beginTransaction();
|
||||
|
||||
// Fetch recent daily deals from production (MySQL 5.7, no CTEs)
|
||||
// Join product_current_prices to get the actual deal price
|
||||
@@ -127,7 +128,7 @@ async function importDailyDeals(prodConnection, localConnection) {
|
||||
last_sync_timestamp = NOW()
|
||||
`);
|
||||
|
||||
await localConnection.query('COMMIT');
|
||||
await localConnection.commit();
|
||||
|
||||
outputProgress({
|
||||
status: "complete",
|
||||
@@ -149,7 +150,7 @@ async function importDailyDeals(prodConnection, localConnection) {
|
||||
console.error("Error importing daily deals:", error);
|
||||
|
||||
try {
|
||||
await localConnection.query('ROLLBACK');
|
||||
await localConnection.rollback();
|
||||
} catch (rollbackError) {
|
||||
console.error("Error during rollback:", rollbackError);
|
||||
}
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics-new/utils/progress');
|
||||
const { importMissingProducts, setupTemporaryTables, cleanupTemporaryTables, materializeCalculations } = require('./products');
|
||||
|
||||
/**
|
||||
* Imports orders from a production MySQL database to a local PostgreSQL database.
|
||||
@@ -28,6 +27,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
|
||||
22: 'placed_incomplete',
|
||||
30: 'canceled',
|
||||
40: 'awaiting_payment',
|
||||
45: 'payment_pending',
|
||||
50: 'awaiting_products',
|
||||
55: 'shipping_later',
|
||||
56: 'shipping_together',
|
||||
@@ -35,6 +35,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
|
||||
61: 'flagged',
|
||||
62: 'fix_before_pick',
|
||||
65: 'manual_picking',
|
||||
67: 'remote_send',
|
||||
70: 'in_pt',
|
||||
80: 'picked',
|
||||
90: 'awaiting_shipment',
|
||||
@@ -65,6 +66,12 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
|
||||
|
||||
console.log('Orders: Using last sync time:', lastSyncTime, '(adjusted:', mysqlSyncTime, ')');
|
||||
|
||||
// Capture the next watermark from MySQL's own clock BEFORE querying any data.
|
||||
// Rows modified while the import runs stay above this watermark for the next
|
||||
// incremental run (overlap re-imports are harmless upserts); writing NOW()
|
||||
// after the import finishes would permanently skip them.
|
||||
const [[{ source_now: sourceNow }]] = await prodConnection.query('SELECT NOW() as source_now');
|
||||
|
||||
// First get count of order items - Keep MySQL compatible for production
|
||||
const [[{ total }]] = await prodConnection.query(`
|
||||
SELECT COUNT(*) as total
|
||||
@@ -100,7 +107,6 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
|
||||
COALESCE(NULLIF(TRIM(oi.prod_itemnumber), ''), 'NO-SKU') as SKU,
|
||||
oi.prod_price as price,
|
||||
oi.qty_ordered as quantity,
|
||||
COALESCE(oi.prod_price_reg - oi.prod_price, 0) as base_discount,
|
||||
oi.stamp as last_modified
|
||||
FROM order_items oi
|
||||
JOIN _order o ON oi.order_id = o.order_id
|
||||
@@ -131,10 +137,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
|
||||
await localConnection.query(`
|
||||
DROP TABLE IF EXISTS temp_order_items;
|
||||
DROP TABLE IF EXISTS temp_order_meta;
|
||||
DROP TABLE IF EXISTS temp_order_discounts;
|
||||
DROP TABLE IF EXISTS temp_order_taxes;
|
||||
DROP TABLE IF EXISTS temp_order_costs;
|
||||
DROP TABLE IF EXISTS temp_main_discounts;
|
||||
DROP TABLE IF EXISTS temp_item_discounts;
|
||||
|
||||
CREATE TEMP TABLE temp_order_items (
|
||||
@@ -143,7 +147,6 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
|
||||
sku TEXT NOT NULL,
|
||||
price NUMERIC(14, 4) NOT NULL,
|
||||
quantity INTEGER NOT NULL,
|
||||
base_discount NUMERIC(14, 4) DEFAULT 0,
|
||||
PRIMARY KEY (order_id, pid)
|
||||
);
|
||||
|
||||
@@ -160,20 +163,6 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
|
||||
PRIMARY KEY (order_id)
|
||||
);
|
||||
|
||||
CREATE TEMP TABLE temp_order_discounts (
|
||||
order_id INTEGER NOT NULL,
|
||||
pid INTEGER NOT NULL,
|
||||
discount NUMERIC(14, 4) NOT NULL,
|
||||
PRIMARY KEY (order_id, pid)
|
||||
);
|
||||
|
||||
CREATE TEMP TABLE temp_main_discounts (
|
||||
order_id INTEGER NOT NULL,
|
||||
discount_id INTEGER NOT NULL,
|
||||
discount_amount_subtotal NUMERIC(14, 4) DEFAULT 0.0000,
|
||||
PRIMARY KEY (order_id, discount_id)
|
||||
);
|
||||
|
||||
CREATE TEMP TABLE temp_item_discounts (
|
||||
order_id INTEGER NOT NULL,
|
||||
pid INTEGER NOT NULL,
|
||||
@@ -198,10 +187,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
|
||||
|
||||
CREATE INDEX idx_temp_order_items_pid ON temp_order_items(pid);
|
||||
CREATE INDEX idx_temp_order_meta_order_id ON temp_order_meta(order_id);
|
||||
CREATE INDEX idx_temp_order_discounts_order_pid ON temp_order_discounts(order_id, pid);
|
||||
CREATE INDEX idx_temp_order_taxes_order_pid ON temp_order_taxes(order_id, pid);
|
||||
CREATE INDEX idx_temp_order_costs_order_pid ON temp_order_costs(order_id, pid);
|
||||
CREATE INDEX idx_temp_main_discounts_discount_id ON temp_main_discounts(discount_id);
|
||||
CREATE INDEX idx_temp_item_discounts_order_pid ON temp_item_discounts(order_id, pid);
|
||||
CREATE INDEX idx_temp_item_discounts_discount_id ON temp_item_discounts(discount_id);
|
||||
`);
|
||||
@@ -216,21 +203,20 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
|
||||
await localConnection.beginTransaction();
|
||||
try {
|
||||
const batch = orderItems.slice(i, Math.min(i + 5000, orderItems.length));
|
||||
const placeholders = batch.map((_, idx) =>
|
||||
`($${idx * 6 + 1}, $${idx * 6 + 2}, $${idx * 6 + 3}, $${idx * 6 + 4}, $${idx * 6 + 5}, $${idx * 6 + 6})`
|
||||
const placeholders = batch.map((_, idx) =>
|
||||
`($${idx * 5 + 1}, $${idx * 5 + 2}, $${idx * 5 + 3}, $${idx * 5 + 4}, $${idx * 5 + 5})`
|
||||
).join(",");
|
||||
const values = batch.flatMap(item => [
|
||||
item.order_id, item.prod_pid, item.SKU, item.price, item.quantity, item.base_discount
|
||||
item.order_id, item.prod_pid, item.SKU, item.price, item.quantity
|
||||
]);
|
||||
|
||||
await localConnection.query(`
|
||||
INSERT INTO temp_order_items (order_id, pid, sku, price, quantity, base_discount)
|
||||
INSERT INTO temp_order_items (order_id, pid, sku, price, quantity)
|
||||
VALUES ${placeholders}
|
||||
ON CONFLICT (order_id, pid) DO UPDATE SET
|
||||
sku = EXCLUDED.sku,
|
||||
price = EXCLUDED.price,
|
||||
quantity = EXCLUDED.quantity,
|
||||
base_discount = EXCLUDED.base_discount
|
||||
quantity = EXCLUDED.quantity
|
||||
`, values);
|
||||
|
||||
await localConnection.commit();
|
||||
@@ -337,49 +323,15 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
|
||||
};
|
||||
|
||||
const processDiscountsBatch = async (batchIds) => {
|
||||
// First, load main discount records
|
||||
const [mainDiscounts] = await prodConnection.query(`
|
||||
SELECT order_id, discount_id, discount_amount_subtotal
|
||||
FROM order_discounts
|
||||
WHERE order_id IN (?)
|
||||
`, [batchIds]);
|
||||
|
||||
if (mainDiscounts.length > 0) {
|
||||
await localConnection.beginTransaction();
|
||||
try {
|
||||
for (let j = 0; j < mainDiscounts.length; j += PG_BATCH_SIZE) {
|
||||
const subBatch = mainDiscounts.slice(j, j + PG_BATCH_SIZE);
|
||||
if (subBatch.length === 0) continue;
|
||||
|
||||
const placeholders = subBatch.map((_, idx) =>
|
||||
`($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})`
|
||||
).join(",");
|
||||
|
||||
const values = subBatch.flatMap(d => [
|
||||
d.order_id,
|
||||
d.discount_id,
|
||||
d.discount_amount_subtotal || 0
|
||||
]);
|
||||
|
||||
await localConnection.query(`
|
||||
INSERT INTO temp_main_discounts (order_id, discount_id, discount_amount_subtotal)
|
||||
VALUES ${placeholders}
|
||||
ON CONFLICT (order_id, discount_id) DO UPDATE SET
|
||||
discount_amount_subtotal = EXCLUDED.discount_amount_subtotal
|
||||
`, values);
|
||||
}
|
||||
await localConnection.commit();
|
||||
} catch (error) {
|
||||
await localConnection.rollback();
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// Then, load item discount records
|
||||
// Load item-level discount records. Only which = 2 rows are real per-item
|
||||
// discount amounts; which = 1 rows store the price of free promo-added
|
||||
// items and which = 3 rows are usage records (neither is a discount).
|
||||
// These amounts are NOT included in summary_discount_subtotal, so they
|
||||
// must be added on top of the prorated subtotal discount unconditionally.
|
||||
const [discounts] = await prodConnection.query(`
|
||||
SELECT order_id, pid, discount_id, amount
|
||||
FROM order_discount_items
|
||||
WHERE order_id IN (?)
|
||||
WHERE order_id IN (?) AND which = 2
|
||||
`, [batchIds]);
|
||||
|
||||
if (discounts.length === 0) return;
|
||||
@@ -418,16 +370,6 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
|
||||
`, values);
|
||||
}
|
||||
|
||||
// Create aggregated view with a simpler, safer query that avoids duplicates
|
||||
await localConnection.query(`
|
||||
TRUNCATE temp_order_discounts;
|
||||
|
||||
INSERT INTO temp_order_discounts (order_id, pid, discount)
|
||||
SELECT order_id, pid, SUM(amount) as discount
|
||||
FROM temp_item_discounts
|
||||
GROUP BY order_id, pid
|
||||
`);
|
||||
|
||||
await localConnection.commit();
|
||||
} catch (error) {
|
||||
await localConnection.rollback();
|
||||
@@ -603,42 +545,54 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
|
||||
try {
|
||||
const [orders] = await localConnection.query(`
|
||||
WITH order_totals AS (
|
||||
SELECT
|
||||
SELECT
|
||||
oi.order_id,
|
||||
oi.pid,
|
||||
-- Instead of using ARRAY_AGG which can cause duplicate issues, use SUM with a CASE
|
||||
SUM(CASE
|
||||
WHEN COALESCE(md.discount_amount_subtotal, 0) > 0 THEN id.amount
|
||||
ELSE 0
|
||||
END) as promo_discount_sum,
|
||||
-- Item-level promo discounts (which = 2 rows). These live outside
|
||||
-- summary_discount_subtotal, so they are summed unconditionally.
|
||||
SUM(COALESCE(id.amount, 0)) as promo_discount_sum,
|
||||
COALESCE(ot.tax, 0) as total_tax,
|
||||
COALESCE(oc.costeach, pc.cost_price, oi.price * 0.5) as costeach
|
||||
FROM temp_order_items oi
|
||||
LEFT JOIN temp_item_discounts id ON oi.order_id = id.order_id AND oi.pid = id.pid
|
||||
LEFT JOIN temp_main_discounts md ON id.order_id = md.order_id AND id.discount_id = md.discount_id
|
||||
LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
|
||||
LEFT JOIN temp_order_costs oc ON oi.order_id = oc.order_id AND oi.pid = oc.pid
|
||||
LEFT JOIN temp_product_costs pc ON oi.pid = pc.pid
|
||||
WHERE oi.order_id = ANY($1)
|
||||
GROUP BY oi.order_id, oi.pid, ot.tax, oc.costeach, pc.cost_price
|
||||
)
|
||||
SELECT
|
||||
SELECT
|
||||
oi.order_id as order_number,
|
||||
oi.pid::bigint as pid,
|
||||
oi.sku,
|
||||
om.date,
|
||||
oi.price,
|
||||
oi.quantity,
|
||||
-- Discount = prorated order-level subtotal discount + item-level promo
|
||||
-- discounts, clamped so a sale line can never be discounted below free.
|
||||
(
|
||||
-- Prorated Points Discount (e.g. loyalty points applied at order level)
|
||||
CASE
|
||||
WHEN om.summary_discount_subtotal > 0 AND om.summary_subtotal > 0 THEN
|
||||
COALESCE(ROUND((om.summary_discount_subtotal * (oi.price * oi.quantity)) / NULLIF(om.summary_subtotal, 0), 4), 0)
|
||||
ELSE 0
|
||||
CASE WHEN oi.quantity > 0 THEN
|
||||
LEAST(
|
||||
(
|
||||
CASE
|
||||
WHEN om.summary_discount_subtotal > 0 AND om.summary_subtotal > 0 THEN
|
||||
COALESCE(ROUND((om.summary_discount_subtotal * (oi.price * oi.quantity)) / NULLIF(om.summary_subtotal, 0), 4), 0)
|
||||
ELSE 0
|
||||
END
|
||||
+ COALESCE(ot.promo_discount_sum, 0)
|
||||
),
|
||||
oi.price * oi.quantity
|
||||
)
|
||||
ELSE
|
||||
(
|
||||
CASE
|
||||
WHEN om.summary_discount_subtotal > 0 AND om.summary_subtotal > 0 THEN
|
||||
COALESCE(ROUND((om.summary_discount_subtotal * (oi.price * oi.quantity)) / NULLIF(om.summary_subtotal, 0), 4), 0)
|
||||
ELSE 0
|
||||
END
|
||||
+ COALESCE(ot.promo_discount_sum, 0)
|
||||
)
|
||||
END
|
||||
+
|
||||
-- Specific Item-Level Promo Discount (coupon codes, etc.)
|
||||
COALESCE(ot.promo_discount_sum, 0)
|
||||
)::NUMERIC(14, 4) as discount,
|
||||
COALESCE(ot.total_tax, 0)::NUMERIC(14, 4) as tax,
|
||||
false as tax_included,
|
||||
@@ -765,34 +719,83 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
|
||||
}
|
||||
}
|
||||
|
||||
// Start a transaction for updating sync status and dropping temp tables
|
||||
// Reconciliation 2 prep: fetch canceled (15) / combined (16) orders from MySQL
|
||||
// WITHOUT a date_placed filter — combine_orders zeroes date_placed on the source
|
||||
// orders, so the main item query can never re-fetch them. Done before opening
|
||||
// the PG transaction so we don't hold it across a MySQL round-trip.
|
||||
const [statusSweepRows] = await prodConnection.query(`
|
||||
SELECT order_id, order_status
|
||||
FROM _order
|
||||
WHERE order_status IN (15, 16)
|
||||
${incrementalUpdate ? 'AND stamp > ?' : ''}
|
||||
`, incrementalUpdate ? [mysqlSyncTime] : []);
|
||||
|
||||
let staleItemsDeleted = 0;
|
||||
let sweepUpdated = 0;
|
||||
|
||||
// Final transaction: reconcile deletions, sweep statuses, update sync status, drop temps
|
||||
await localConnection.beginTransaction();
|
||||
try {
|
||||
// Update sync status
|
||||
// Reconciliation 1: delete PG item rows that no longer exist in MySQL for the
|
||||
// orders fetched this run. temp_order_items holds the complete current item
|
||||
// set of every fetched order (staff edits and unpicked promo items DELETE
|
||||
// order_items rows in MySQL, which an upsert-only import never removes).
|
||||
const [reconcileResult] = await localConnection.query(`
|
||||
DELETE FROM orders o
|
||||
USING (SELECT DISTINCT order_id FROM temp_order_items) fetched
|
||||
WHERE o.order_number = fetched.order_id::text -- orders.order_number is TEXT
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM temp_order_items t
|
||||
WHERE t.order_id = fetched.order_id AND t.pid = o.pid
|
||||
)
|
||||
`);
|
||||
staleItemsDeleted = reconcileResult.rowCount || 0;
|
||||
|
||||
// Reconciliation 2: mark canceled/combined orders. 'combined' source orders were
|
||||
// merged into a new order that carries the same items — counting both would
|
||||
// double-count, so they also get canceled = true (routes filter on canceled).
|
||||
for (const [code, statusText] of [[15, 'canceled'], [16, 'combined']]) {
|
||||
const ids = statusSweepRows.filter(r => r.order_status === code).map(r => r.order_id);
|
||||
for (let i = 0; i < ids.length; i += 5000) {
|
||||
const chunk = ids.slice(i, i + 5000);
|
||||
const [sweepResult] = await localConnection.query(`
|
||||
UPDATE orders
|
||||
SET status = $1, canceled = true
|
||||
WHERE order_number = ANY($2::text[])
|
||||
AND (status IS DISTINCT FROM $1 OR canceled IS DISTINCT FROM true)
|
||||
`, [statusText, chunk.map(String)]);
|
||||
sweepUpdated += sweepResult.rowCount || 0;
|
||||
}
|
||||
}
|
||||
|
||||
// Update sync status with the watermark captured from MySQL BEFORE the
|
||||
// source queries ran (see sourceNow above).
|
||||
await localConnection.query(`
|
||||
INSERT INTO sync_status (table_name, last_sync_timestamp)
|
||||
VALUES ('orders', NOW())
|
||||
VALUES ('orders', $1)
|
||||
ON CONFLICT (table_name) DO UPDATE SET
|
||||
last_sync_timestamp = NOW()
|
||||
`);
|
||||
|
||||
last_sync_timestamp = $1
|
||||
`, [sourceNow]);
|
||||
|
||||
// Cleanup temporary tables
|
||||
await localConnection.query(`
|
||||
DROP TABLE IF EXISTS temp_order_items;
|
||||
DROP TABLE IF EXISTS temp_order_meta;
|
||||
DROP TABLE IF EXISTS temp_order_discounts;
|
||||
DROP TABLE IF EXISTS temp_order_taxes;
|
||||
DROP TABLE IF EXISTS temp_order_costs;
|
||||
DROP TABLE IF EXISTS temp_main_discounts;
|
||||
DROP TABLE IF EXISTS temp_item_discounts;
|
||||
DROP TABLE IF EXISTS temp_product_costs;
|
||||
`);
|
||||
|
||||
|
||||
// Commit final transaction
|
||||
await localConnection.commit();
|
||||
} catch (error) {
|
||||
await localConnection.rollback();
|
||||
throw error;
|
||||
throw error;
|
||||
}
|
||||
|
||||
if (staleItemsDeleted > 0 || sweepUpdated > 0) {
|
||||
console.log(`Orders: reconciliation removed ${staleItemsDeleted} stale item rows, swept ${sweepUpdated} canceled/combined rows`);
|
||||
}
|
||||
|
||||
return {
|
||||
@@ -800,6 +803,8 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
|
||||
totalImported: Math.floor(importedCount) || 0,
|
||||
recordsAdded: parseInt(recordsAdded) || 0,
|
||||
recordsUpdated: parseInt(recordsUpdated) || 0,
|
||||
recordsDeleted: staleItemsDeleted,
|
||||
statusSweepUpdated: sweepUpdated,
|
||||
totalSkipped: skippedOrders.size || 0,
|
||||
missingProducts: missingProducts.size || 0,
|
||||
totalProcessed: orderItems.length, // Total order items in source
|
||||
|
||||
@@ -622,6 +622,7 @@ async function materializeCalculations(prodConnection, localConnection, incremen
|
||||
AND t.total_sold IS NOT DISTINCT FROM p.total_sold
|
||||
AND t.date_online IS NOT DISTINCT FROM p.date_online
|
||||
AND t.shop_score IS NOT DISTINCT FROM p.shop_score
|
||||
AND t.categories IS NOT DISTINCT FROM p.categories
|
||||
`);
|
||||
|
||||
// Get count of products that need updating
|
||||
@@ -662,6 +663,11 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
|
||||
}
|
||||
}
|
||||
|
||||
// Capture the next watermark from MySQL's own clock BEFORE querying any data.
|
||||
// Rows modified while the import runs stay above this watermark for the next
|
||||
// incremental run (overlap re-imports are harmless upserts).
|
||||
const [[{ source_now: sourceNow }]] = await prodConnection.query('SELECT NOW() as source_now');
|
||||
|
||||
// Start a transaction to ensure temporary tables persist
|
||||
await localConnection.beginTransaction();
|
||||
|
||||
@@ -927,16 +933,22 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
|
||||
// legacy PHP backend will stamp onto the PO line item.
|
||||
await syncSupplierCosts(prodConnection, localConnection);
|
||||
|
||||
// Sync category assignments for ALL products. product_category_index has no
|
||||
// stamp column, so category-only changes never bump any of the incremental
|
||||
// WHERE timestamps — without this pass PG categories go permanently stale.
|
||||
await syncProductCategories(prodConnection, localConnection);
|
||||
|
||||
// Commit the transaction
|
||||
await localConnection.commit();
|
||||
|
||||
// Update sync status
|
||||
// Update sync status with the watermark captured from MySQL BEFORE the
|
||||
// source queries ran (see sourceNow above).
|
||||
await localConnection.query(`
|
||||
INSERT INTO sync_status (table_name, last_sync_timestamp)
|
||||
VALUES ('products', NOW())
|
||||
VALUES ('products', $1)
|
||||
ON CONFLICT (table_name) DO UPDATE SET
|
||||
last_sync_timestamp = NOW()
|
||||
`);
|
||||
last_sync_timestamp = $1
|
||||
`, [sourceNow]);
|
||||
|
||||
return {
|
||||
status: 'complete',
|
||||
@@ -1028,11 +1040,126 @@ async function syncSupplierCosts(prodConnection, localConnection) {
|
||||
return { updated };
|
||||
}
|
||||
|
||||
// Full category-assignment sweep. The incremental product import keys on
|
||||
// p.stamp / ci.stamp / price / b2b dates — none of which change when a product
|
||||
// is recategorized in product_category_index (the table has no stamp column).
|
||||
// This pass compares the canonical GROUP_CONCAT representation against
|
||||
// products.categories and rewrites product_categories only for changed pids.
|
||||
// Must run inside the caller's transaction (uses ON COMMIT DROP temp table).
|
||||
async function syncProductCategories(prodConnection, localConnection) {
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Products import",
|
||||
message: "Syncing category assignments"
|
||||
});
|
||||
|
||||
// Same expression as the main import query so representations compare equal
|
||||
// (GROUP_CONCAT(DISTINCT int) returns values numerically sorted).
|
||||
const [rows] = await prodConnection.query(`
|
||||
SELECT
|
||||
p.pid,
|
||||
GROUP_CONCAT(DISTINCT CASE
|
||||
WHEN pc.cat_id IS NOT NULL
|
||||
AND pc.type IN (10, 20, 11, 21, 12, 13)
|
||||
AND pci.cat_id NOT IN (16, 17)
|
||||
THEN pci.cat_id
|
||||
END) as category_ids
|
||||
FROM products p
|
||||
LEFT JOIN product_category_index pci ON p.pid = pci.pid
|
||||
LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id
|
||||
GROUP BY p.pid
|
||||
`);
|
||||
|
||||
if (!rows || rows.length === 0) {
|
||||
return { updated: 0 };
|
||||
}
|
||||
|
||||
await localConnection.query(`
|
||||
CREATE TEMP TABLE temp_category_sync (
|
||||
pid BIGINT PRIMARY KEY,
|
||||
categories TEXT
|
||||
) ON COMMIT DROP
|
||||
`);
|
||||
|
||||
const CHUNK = 5000;
|
||||
for (let i = 0; i < rows.length; i += CHUNK) {
|
||||
const batch = rows.slice(i, i + CHUNK);
|
||||
const pids = batch.map(r => r.pid);
|
||||
const cats = batch.map(r => r.category_ids);
|
||||
await localConnection.query(
|
||||
`INSERT INTO temp_category_sync (pid, categories)
|
||||
SELECT * FROM UNNEST($1::bigint[], $2::text[])
|
||||
ON CONFLICT (pid) DO NOTHING`,
|
||||
[pids, cats]
|
||||
);
|
||||
}
|
||||
|
||||
// Which existing products actually changed?
|
||||
const [changed] = await localConnection.query(`
|
||||
SELECT t.pid, t.categories
|
||||
FROM temp_category_sync t
|
||||
JOIN products p ON p.pid = t.pid
|
||||
WHERE t.categories IS DISTINCT FROM p.categories
|
||||
`);
|
||||
|
||||
if (changed.rows.length === 0) {
|
||||
return { updated: 0 };
|
||||
}
|
||||
|
||||
await localConnection.query(`
|
||||
UPDATE products p
|
||||
SET categories = t.categories
|
||||
FROM temp_category_sync t
|
||||
WHERE p.pid = t.pid
|
||||
AND t.categories IS DISTINCT FROM p.categories
|
||||
`);
|
||||
|
||||
// Rewrite the relationship rows for changed products only
|
||||
const REL_CHUNK = 1000;
|
||||
for (let i = 0; i < changed.rows.length; i += REL_CHUNK) {
|
||||
const batch = changed.rows.slice(i, i + REL_CHUNK);
|
||||
const pids = batch.map(r => r.pid);
|
||||
|
||||
await localConnection.query(
|
||||
'DELETE FROM product_categories WHERE pid = ANY($1)',
|
||||
[pids]
|
||||
);
|
||||
|
||||
const relPids = [];
|
||||
const relCats = [];
|
||||
for (const row of batch) {
|
||||
if (!row.categories) continue;
|
||||
for (const catId of row.categories.split(',')) {
|
||||
if (catId && catId.trim()) {
|
||||
relPids.push(row.pid);
|
||||
relCats.push(parseInt(catId.trim(), 10));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (relPids.length > 0) {
|
||||
await localConnection.query(`
|
||||
INSERT INTO product_categories (pid, cat_id)
|
||||
SELECT * FROM UNNEST($1::bigint[], $2::int[])
|
||||
ON CONFLICT (pid, cat_id) DO NOTHING
|
||||
`, [relPids, relCats]);
|
||||
}
|
||||
}
|
||||
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Products import",
|
||||
message: `Category assignments updated for ${changed.rows.length} products`
|
||||
});
|
||||
|
||||
return { updated: changed.rows.length };
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
importProducts,
|
||||
importMissingProducts,
|
||||
setupTemporaryTables,
|
||||
cleanupTemporaryTables,
|
||||
materializeCalculations,
|
||||
syncSupplierCosts
|
||||
syncSupplierCosts,
|
||||
syncProductCategories
|
||||
};
|
||||
@@ -72,6 +72,11 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
|
||||
console.log('Purchase Orders: Using last sync time:', lastSyncTime, '(adjusted:', mysqlSyncTime, ')');
|
||||
|
||||
// Capture the next watermark from MySQL's own clock BEFORE querying any data.
|
||||
// Rows modified while the import runs stay above this watermark for the next
|
||||
// incremental run (overlap re-imports are harmless upserts).
|
||||
const [[{ source_now: sourceNow }]] = await prodConnection.query('SELECT NOW() as source_now');
|
||||
|
||||
// Create temp tables for processing
|
||||
await localConnection.query(`
|
||||
DROP TABLE IF EXISTS temp_purchase_orders;
|
||||
@@ -267,13 +272,16 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
if (totalPOs === 0) {
|
||||
console.log('No purchase orders to process, skipping PO import step');
|
||||
} else {
|
||||
// Fetch and process POs in batches
|
||||
let offset = 0;
|
||||
// Fetch and process POs in batches using keyset pagination on po_id.
|
||||
// LIMIT/OFFSET over a date_updated predicate silently skips rows when
|
||||
// concurrent updates shift rows between pages.
|
||||
let processedPOCount = 0;
|
||||
let lastPoId = 0;
|
||||
let allPOsProcessed = false;
|
||||
|
||||
|
||||
while (!allPOsProcessed) {
|
||||
const [poList] = await prodConnection.query(`
|
||||
SELECT
|
||||
SELECT
|
||||
p.po_id,
|
||||
p.supplier_id,
|
||||
s.companyname AS vendor,
|
||||
@@ -286,21 +294,23 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
FROM po p
|
||||
LEFT JOIN suppliers s ON p.supplier_id = s.supplierid
|
||||
WHERE p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR)
|
||||
AND p.po_id > ?
|
||||
${incrementalUpdate ? `
|
||||
AND (
|
||||
p.date_updated > ?
|
||||
OR p.date_ordered > ?
|
||||
p.date_updated > ?
|
||||
OR p.date_ordered > ?
|
||||
OR p.date_estin > ?
|
||||
)
|
||||
` : ''}
|
||||
ORDER BY p.po_id
|
||||
LIMIT ${PO_BATCH_SIZE} OFFSET ${offset}
|
||||
`, incrementalUpdate ? [mysqlSyncTime, mysqlSyncTime, mysqlSyncTime] : []);
|
||||
|
||||
LIMIT ${PO_BATCH_SIZE}
|
||||
`, incrementalUpdate ? [lastPoId, mysqlSyncTime, mysqlSyncTime, mysqlSyncTime] : [lastPoId]);
|
||||
|
||||
if (poList.length === 0) {
|
||||
allPOsProcessed = true;
|
||||
break;
|
||||
}
|
||||
lastPoId = poList[poList.length - 1].po_id;
|
||||
|
||||
// Get products for these POs
|
||||
const poIds = poList.map(po => po.po_id);
|
||||
@@ -332,7 +342,11 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
vendor: po.vendor || 'Unknown Vendor',
|
||||
date: validateDate(po.date_ordered) || validateDate(po.date_created),
|
||||
expected_date: validateDate(po.date_estin),
|
||||
status: poStatusMap[po.status] || 'created',
|
||||
// Unknown codes get a sentinel rather than 'created': defaulting an
|
||||
// unknown cancel-like code to an OPEN status would inflate on-order
|
||||
// FIFO (the metrics CTEs whitelist known-open statuses, so a sentinel
|
||||
// is simply ignored there).
|
||||
status: poStatusMap[po.status] || `unknown_${po.status}`,
|
||||
notes: po.notes || '',
|
||||
long_note: po.long_note || '',
|
||||
ordered: product.qty_each,
|
||||
@@ -393,20 +407,20 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
`, values);
|
||||
}
|
||||
|
||||
offset += poList.length;
|
||||
processedPOCount += poList.length;
|
||||
totalProcessed += completePOs.length;
|
||||
|
||||
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Purchase orders import",
|
||||
message: `Processed ${offset} of ${totalPOs} purchase orders (${totalProcessed} line items)`,
|
||||
current: offset,
|
||||
message: `Processed ${processedPOCount} of ${totalPOs} purchase orders (${totalProcessed} line items)`,
|
||||
current: processedPOCount,
|
||||
total: totalPOs,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, offset, totalPOs),
|
||||
rate: calculateRate(startTime, offset)
|
||||
remaining: estimateRemaining(startTime, processedPOCount, totalPOs),
|
||||
rate: calculateRate(startTime, processedPOCount)
|
||||
});
|
||||
|
||||
|
||||
if (poList.length < PO_BATCH_SIZE) {
|
||||
allPOsProcessed = true;
|
||||
}
|
||||
@@ -439,13 +453,14 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
if (totalReceivings === 0) {
|
||||
console.log('No receivings to process, skipping receivings import step');
|
||||
} else {
|
||||
// Fetch and process receivings in batches
|
||||
offset = 0; // Reset offset for receivings
|
||||
// Fetch and process receivings in batches (keyset pagination, see POs above)
|
||||
let processedReceivingCount = 0;
|
||||
let lastReceivingId = 0;
|
||||
let allReceivingsProcessed = false;
|
||||
|
||||
|
||||
while (!allReceivingsProcessed) {
|
||||
const [receivingList] = await prodConnection.query(`
|
||||
SELECT
|
||||
SELECT
|
||||
r.receiving_id,
|
||||
r.supplier_id,
|
||||
r.status,
|
||||
@@ -459,6 +474,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
r.date_checked
|
||||
FROM receivings r
|
||||
WHERE r.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL ${yearInterval} YEAR)
|
||||
AND r.receiving_id > ?
|
||||
${incrementalUpdate ? `
|
||||
AND (
|
||||
r.date_updated > ?
|
||||
@@ -466,13 +482,14 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
)
|
||||
` : ''}
|
||||
ORDER BY r.receiving_id
|
||||
LIMIT ${PO_BATCH_SIZE} OFFSET ${offset}
|
||||
`, incrementalUpdate ? [mysqlSyncTime, mysqlSyncTime] : []);
|
||||
|
||||
LIMIT ${PO_BATCH_SIZE}
|
||||
`, incrementalUpdate ? [lastReceivingId, mysqlSyncTime, mysqlSyncTime] : [lastReceivingId]);
|
||||
|
||||
if (receivingList.length === 0) {
|
||||
allReceivingsProcessed = true;
|
||||
break;
|
||||
}
|
||||
lastReceivingId = receivingList[receivingList.length - 1].receiving_id;
|
||||
|
||||
// Get products for these receivings
|
||||
const receivingIds = receivingList.map(r => r.receiving_id);
|
||||
@@ -545,7 +562,8 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
received_date: validateDate(product.received_date) || validateDate(product.receiving_created_date),
|
||||
receiving_created_date: validateDate(product.receiving_created_date),
|
||||
supplier_id: receiving.supplier_id,
|
||||
status: receivingStatusMap[receiving.status] || 'created'
|
||||
// Sentinel for unknown codes — see PO status mapping note above
|
||||
status: receivingStatusMap[receiving.status] || `unknown_${receiving.status}`
|
||||
});
|
||||
}
|
||||
|
||||
@@ -600,18 +618,18 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
`, values);
|
||||
}
|
||||
|
||||
offset += receivingList.length;
|
||||
processedReceivingCount += receivingList.length;
|
||||
totalProcessed += completeReceivings.length;
|
||||
|
||||
|
||||
outputProgress({
|
||||
status: "running",
|
||||
operation: "Purchase orders import",
|
||||
message: `Processed ${offset} of ${totalReceivings} receivings (${totalProcessed} line items total)`,
|
||||
current: offset,
|
||||
message: `Processed ${processedReceivingCount} of ${totalReceivings} receivings (${totalProcessed} line items total)`,
|
||||
current: processedReceivingCount,
|
||||
total: totalReceivings,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
remaining: estimateRemaining(startTime, offset, totalReceivings),
|
||||
rate: calculateRate(startTime, offset)
|
||||
remaining: estimateRemaining(startTime, processedReceivingCount, totalReceivings),
|
||||
rate: calculateRate(startTime, processedReceivingCount)
|
||||
});
|
||||
|
||||
if (receivingList.length < PO_BATCH_SIZE) {
|
||||
@@ -829,13 +847,14 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
|
||||
receivingRecordsAdded = receivingsResult.rows.filter(r => r.inserted).length;
|
||||
receivingRecordsUpdated = receivingsResult.rows.filter(r => !r.inserted).length;
|
||||
|
||||
// Update sync status
|
||||
// Update sync status with the watermark captured from MySQL BEFORE the
|
||||
// source queries ran (see sourceNow above).
|
||||
await localConnection.query(`
|
||||
INSERT INTO sync_status (table_name, last_sync_timestamp)
|
||||
VALUES ('purchase_orders', NOW())
|
||||
VALUES ('purchase_orders', $1)
|
||||
ON CONFLICT (table_name) DO UPDATE SET
|
||||
last_sync_timestamp = NOW()
|
||||
`);
|
||||
last_sync_timestamp = $1
|
||||
`, [sourceNow]);
|
||||
|
||||
// Clean up temporary tables
|
||||
await localConnection.query(`
|
||||
|
||||
@@ -151,7 +151,10 @@ async function importStockSnapshots(prodConnection, localConnection, incremental
|
||||
|
||||
recordsAdded += batch.length;
|
||||
} catch (err) {
|
||||
// Fail the step: the next incremental starts at MAX(snapshot_date), so a
|
||||
// swallowed batch error would leave a permanent hole that is never revisited.
|
||||
console.error(`Error inserting batch at offset ${i} (date range ending ${currentDate}):`, err.message);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -165,7 +168,7 @@ async function importStockSnapshots(prodConnection, localConnection, incremental
|
||||
current: processedRows,
|
||||
total: totalRows,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
rate: calculateRate(processedRows, startTime)
|
||||
rate: calculateRate(startTime, processedRows)
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user