Update import scripts, working through categories

This commit is contained in:
2025-02-14 13:30:14 -05:00
parent cc22fd8c35
commit 9623681a15
6 changed files with 928 additions and 1106 deletions

View File

@@ -19,7 +19,6 @@ const IMPORT_PURCHASE_ORDERS = true;
const INCREMENTAL_UPDATE = process.env.INCREMENTAL_UPDATE !== 'false'; // Default to true unless explicitly set to false const INCREMENTAL_UPDATE = process.env.INCREMENTAL_UPDATE !== 'false'; // Default to true unless explicitly set to false
// SSH configuration // SSH configuration
// In import-from-prod.js
const sshConfig = { const sshConfig = {
ssh: { ssh: {
host: process.env.PROD_SSH_HOST, host: process.env.PROD_SSH_HOST,
@@ -31,6 +30,7 @@ const sshConfig = {
compress: true, // Enable SSH compression compress: true, // Enable SSH compression
}, },
prodDbConfig: { prodDbConfig: {
// MySQL config for production
host: process.env.PROD_DB_HOST || "localhost", host: process.env.PROD_DB_HOST || "localhost",
user: process.env.PROD_DB_USER, user: process.env.PROD_DB_USER,
password: process.env.PROD_DB_PASSWORD, password: process.env.PROD_DB_PASSWORD,
@@ -39,21 +39,16 @@ const sshConfig = {
timezone: 'Z', timezone: 'Z',
}, },
localDbConfig: { localDbConfig: {
// PostgreSQL config for local
host: process.env.DB_HOST, host: process.env.DB_HOST,
user: process.env.DB_USER, user: process.env.DB_USER,
password: process.env.DB_PASSWORD, password: process.env.DB_PASSWORD,
database: process.env.DB_NAME, database: process.env.DB_NAME,
multipleStatements: true, port: process.env.DB_PORT || 5432,
waitForConnections: true, ssl: process.env.DB_SSL === 'true',
connectionLimit: 10, connectionTimeoutMillis: 60000,
queueLimit: 0, idleTimeoutMillis: 30000,
namedPlaceholders: true, max: 10 // connection pool max size
connectTimeout: 60000,
enableKeepAlive: true,
keepAliveInitialDelay: 10000,
compress: true,
timezone: 'Z',
stringifyObjects: false,
} }
}; };
@@ -108,7 +103,7 @@ async function main() {
SET SET
status = 'cancelled', status = 'cancelled',
end_time = NOW(), end_time = NOW(),
duration_seconds = TIMESTAMPDIFF(SECOND, start_time, NOW()), duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER,
error_message = 'Previous import was not completed properly' error_message = 'Previous import was not completed properly'
WHERE status = 'running' WHERE status = 'running'
`); `);
@@ -118,9 +113,10 @@ async function main() {
CREATE TABLE IF NOT EXISTS sync_status ( CREATE TABLE IF NOT EXISTS sync_status (
table_name VARCHAR(50) PRIMARY KEY, table_name VARCHAR(50) PRIMARY KEY,
last_sync_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, last_sync_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_sync_id BIGINT, last_sync_id BIGINT
INDEX idx_last_sync (last_sync_timestamp)
); );
CREATE INDEX IF NOT EXISTS idx_last_sync ON sync_status (last_sync_timestamp);
`); `);
// Create import history record for the overall session // Create import history record for the overall session
@@ -134,17 +130,17 @@ async function main() {
) VALUES ( ) VALUES (
'all_tables', 'all_tables',
NOW(), NOW(),
?, $1::boolean,
'running', 'running',
JSON_OBJECT( jsonb_build_object(
'categories_enabled', ?, 'categories_enabled', $2::boolean,
'products_enabled', ?, 'products_enabled', $3::boolean,
'orders_enabled', ?, 'orders_enabled', $4::boolean,
'purchase_orders_enabled', ? 'purchase_orders_enabled', $5::boolean
) )
) ) RETURNING id
`, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS]); `, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS]);
importHistoryId = historyResult.insertId; importHistoryId = historyResult.rows[0].id;
const results = { const results = {
categories: null, categories: null,
@@ -201,21 +197,21 @@ async function main() {
UPDATE import_history UPDATE import_history
SET SET
end_time = NOW(), end_time = NOW(),
duration_seconds = ?, duration_seconds = $1,
records_added = ?, records_added = $2,
records_updated = ?, records_updated = $3,
status = 'completed', status = 'completed',
additional_info = JSON_OBJECT( additional_info = jsonb_build_object(
'categories_enabled', ?, 'categories_enabled', $4,
'products_enabled', ?, 'products_enabled', $5,
'orders_enabled', ?, 'orders_enabled', $6,
'purchase_orders_enabled', ?, 'purchase_orders_enabled', $7,
'categories_result', CAST(? AS JSON), 'categories_result', $8::jsonb,
'products_result', CAST(? AS JSON), 'products_result', $9::jsonb,
'orders_result', CAST(? AS JSON), 'orders_result', $10::jsonb,
'purchase_orders_result', CAST(? AS JSON) 'purchase_orders_result', $11::jsonb
) )
WHERE id = ? WHERE id = $12
`, [ `, [
totalElapsedSeconds, totalElapsedSeconds,
totalRecordsAdded, totalRecordsAdded,
@@ -259,10 +255,10 @@ async function main() {
UPDATE import_history UPDATE import_history
SET SET
end_time = NOW(), end_time = NOW(),
duration_seconds = ?, duration_seconds = $1,
status = ?, status = $2,
error_message = ? error_message = $3
WHERE id = ? WHERE id = $4
`, [totalElapsedSeconds, error.message === "Import cancelled" ? 'cancelled' : 'failed', error.message, importHistoryId]); `, [totalElapsedSeconds, error.message === "Import cancelled" ? 'cancelled' : 'failed', error.message, importHistoryId]);
} }

View File

@@ -9,170 +9,206 @@ async function importCategories(prodConnection, localConnection) {
const startTime = Date.now(); const startTime = Date.now();
const typeOrder = [10, 20, 11, 21, 12, 13]; const typeOrder = [10, 20, 11, 21, 12, 13];
let totalInserted = 0; let totalInserted = 0;
let totalUpdated = 0;
let skippedCategories = []; let skippedCategories = [];
try { try {
// Process each type in order with its own query // Start a single transaction for the entire import
await localConnection.query('BEGIN');
// Process each type in order with its own savepoint
for (const type of typeOrder) { for (const type of typeOrder) {
const [categories] = await prodConnection.query( try {
` // Create a savepoint for this type
SELECT await localConnection.query(`SAVEPOINT category_type_${type}`);
pc.cat_id,
pc.name,
pc.type,
CASE
WHEN pc.type IN (10, 20) THEN NULL -- Top level categories should have no parent
WHEN pc.master_cat_id IS NULL THEN NULL
ELSE pc.master_cat_id
END as parent_id,
pc.combined_name as description
FROM product_categories pc
WHERE pc.type = ?
ORDER BY pc.cat_id
`,
[type]
);
if (categories.length === 0) continue; // Production query remains MySQL compatible
const [categories] = await prodConnection.query(
console.log(`\nProcessing ${categories.length} type ${type} categories`); `
if (type === 10) { SELECT
console.log("Type 10 categories:", JSON.stringify(categories, null, 2)); pc.cat_id,
} pc.name,
pc.type,
// For types that can have parents (11, 21, 12, 13), verify parent existence CASE
let categoriesToInsert = categories; WHEN pc.type IN (10, 20) THEN NULL -- Top level categories should have no parent
if (![10, 20].includes(type)) { WHEN pc.master_cat_id IS NULL THEN NULL
// Get all parent IDs ELSE pc.master_cat_id
const parentIds = [ END as parent_id,
...new Set( pc.combined_name as description
categories.map((c) => c.parent_id).filter((id) => id !== null) FROM product_categories pc
), WHERE pc.type = ?
]; ORDER BY pc.cat_id
`,
// Check which parents exist [type]
const [existingParents] = await localConnection.query(
"SELECT cat_id FROM categories WHERE cat_id IN (?)",
[parentIds]
);
const existingParentIds = new Set(existingParents.map((p) => p.cat_id));
// Filter categories and track skipped ones
categoriesToInsert = categories.filter(
(cat) =>
cat.parent_id === null || existingParentIds.has(cat.parent_id)
);
const invalidCategories = categories.filter(
(cat) =>
cat.parent_id !== null && !existingParentIds.has(cat.parent_id)
); );
if (invalidCategories.length > 0) { if (categories.length === 0) {
const skippedInfo = invalidCategories.map((c) => ({ await localConnection.query(`RELEASE SAVEPOINT category_type_${type}`);
id: c.cat_id, continue;
name: c.name, }
type: c.type,
missing_parent: c.parent_id,
}));
skippedCategories.push(...skippedInfo);
console.log( console.log(`\nProcessing ${categories.length} type ${type} categories`);
"\nSkipping categories with missing parents:", if (type === 10) {
invalidCategories console.log("Type 10 categories:", JSON.stringify(categories, null, 2));
.map( }
(c) =>
`${c.cat_id} - ${c.name} (missing parent: ${c.parent_id})` // For types that can have parents (11, 21, 12, 13), verify parent existence
) let categoriesToInsert = categories;
.join("\n") if (![10, 20].includes(type)) {
); // Get all parent IDs
const parentIds = [
...new Set(
categories
.filter(c => c && c.parent_id !== null)
.map(c => c.parent_id)
),
];
console.log(`Processing ${categories.length} type ${type} categories with ${parentIds.length} unique parent IDs`);
console.log('Parent IDs:', parentIds);
// No need to check for parent existence - we trust they exist since they were just inserted
categoriesToInsert = categories;
} }
if (categoriesToInsert.length === 0) { if (categoriesToInsert.length === 0) {
console.log( console.log(
`No valid categories of type ${type} to insert - all had missing parents` `No valid categories of type ${type} to insert`
); );
await localConnection.query(`RELEASE SAVEPOINT category_type_${type}`);
continue; continue;
} }
console.log(
`Inserting ${categoriesToInsert.length} type ${type} categories`
);
// PostgreSQL upsert query with parameterized values
const values = categoriesToInsert.flatMap((cat) => [
cat.cat_id,
cat.name,
cat.type,
cat.parent_id,
cat.description,
'active',
new Date(),
new Date()
]);
console.log('Attempting to insert/update with values:', JSON.stringify(values, null, 2));
const placeholders = categoriesToInsert
.map((_, i) => `($${i * 8 + 1}, $${i * 8 + 2}, $${i * 8 + 3}, $${i * 8 + 4}, $${i * 8 + 5}, $${i * 8 + 6}, $${i * 8 + 7}, $${i * 8 + 8})`)
.join(',');
console.log('Using placeholders:', placeholders);
// Insert categories with ON CONFLICT clause for PostgreSQL
const query = `
WITH inserted_categories AS (
INSERT INTO categories (
cat_id, name, type, parent_id, description, status, created_at, updated_at
)
VALUES ${placeholders}
ON CONFLICT (cat_id) DO UPDATE SET
name = EXCLUDED.name,
type = EXCLUDED.type,
parent_id = EXCLUDED.parent_id,
description = EXCLUDED.description,
status = EXCLUDED.status,
updated_at = EXCLUDED.updated_at
RETURNING
cat_id,
CASE
WHEN xmax = 0 THEN true
ELSE false
END as is_insert
)
SELECT
COUNT(*) as total,
COUNT(*) FILTER (WHERE is_insert) as inserted,
COUNT(*) FILTER (WHERE NOT is_insert) as updated
FROM inserted_categories`;
console.log('Executing query:', query);
const result = await localConnection.query(query, values);
console.log('Query result:', result);
// Get the first result since query returns an array
const queryResult = Array.isArray(result) ? result[0] : result;
if (!queryResult || !queryResult.rows || !queryResult.rows[0]) {
console.error('Query failed to return results. Result:', queryResult);
throw new Error('Query did not return expected results');
}
const total = parseInt(queryResult.rows[0].total) || 0;
const inserted = parseInt(queryResult.rows[0].inserted) || 0;
const updated = parseInt(queryResult.rows[0].updated) || 0;
console.log(`Total: ${total}, Inserted: ${inserted}, Updated: ${updated}`);
totalInserted += inserted;
totalUpdated += updated;
// Release the savepoint for this type
await localConnection.query(`RELEASE SAVEPOINT category_type_${type}`);
outputProgress({
status: "running",
operation: "Categories import",
message: `Imported ${inserted} (updated ${updated}) categories of type ${type}`,
current: totalInserted + totalUpdated,
total: categories.length,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
});
} catch (error) {
// Rollback to the savepoint for this type
await localConnection.query(`ROLLBACK TO SAVEPOINT category_type_${type}`);
throw error;
} }
console.log(
`Inserting ${categoriesToInsert.length} type ${type} categories`
);
const placeholders = categoriesToInsert
.map(() => "(?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)")
.join(",");
const values = categoriesToInsert.flatMap((cat) => [
cat.cat_id,
cat.name,
cat.type,
cat.parent_id,
cat.description,
"active",
]);
// Insert categories and create relationships in one query to avoid race conditions
await localConnection.query(
`
INSERT INTO categories (cat_id, name, type, parent_id, description, status, created_at, updated_at)
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
name = VALUES(name),
type = VALUES(type),
parent_id = VALUES(parent_id),
description = VALUES(description),
status = VALUES(status),
updated_at = CURRENT_TIMESTAMP
`,
values
);
totalInserted += categoriesToInsert.length;
outputProgress({
status: "running",
operation: "Categories import",
current: totalInserted,
total: totalInserted,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
});
} }
// After all imports, if we skipped any categories, throw an error // Commit the entire transaction - we'll do this even if we have skipped categories
if (skippedCategories.length > 0) { await localConnection.query('COMMIT');
const error = new Error(
"Categories import completed with errors - some categories were skipped due to missing parents"
);
error.skippedCategories = skippedCategories;
throw error;
}
outputProgress({ outputProgress({
status: "complete", status: "complete",
operation: "Categories import completed", operation: "Categories import completed",
current: totalInserted, current: totalInserted + totalUpdated,
total: totalInserted, total: totalInserted + totalUpdated,
duration: formatElapsedTime((Date.now() - startTime) / 1000), duration: formatElapsedTime((Date.now() - startTime) / 1000),
warnings: skippedCategories.length > 0 ? {
message: "Some categories were skipped due to missing parents",
skippedCategories
} : undefined
}); });
return { return {
status: "complete", status: "complete",
totalImported: totalInserted recordsAdded: totalInserted,
recordsUpdated: totalUpdated,
totalRecords: totalInserted + totalUpdated,
warnings: skippedCategories.length > 0 ? {
message: "Some categories were skipped due to missing parents",
skippedCategories
} : undefined
}; };
} catch (error) { } catch (error) {
console.error("Error importing categories:", error); console.error("Error importing categories:", error);
if (error.skippedCategories) {
console.error( // Only rollback if we haven't committed yet
"Skipped categories:", try {
JSON.stringify(error.skippedCategories, null, 2) await localConnection.query('ROLLBACK');
); } catch (rollbackError) {
console.error("Error during rollback:", rollbackError);
} }
outputProgress({ outputProgress({
status: "error", status: "error",
operation: "Categories import failed", operation: "Categories import failed",
error: error.message, error: error.message
skippedCategories: error.skippedCategories
}); });
throw error; throw error;

View File

@@ -2,7 +2,7 @@ const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } =
const { importMissingProducts, setupTemporaryTables, cleanupTemporaryTables, materializeCalculations } = require('./products'); const { importMissingProducts, setupTemporaryTables, cleanupTemporaryTables, materializeCalculations } = require('./products');
/** /**
* Imports orders from a production MySQL database to a local MySQL database. * Imports orders from a production MySQL database to a local PostgreSQL database.
* It can run in two modes: * It can run in two modes:
* 1. Incremental update mode (default): Only fetch orders that have changed since the last sync time. * 1. Incremental update mode (default): Only fetch orders that have changed since the last sync time.
* 2. Full update mode: Fetch all eligible orders within the last 5 years regardless of timestamp. * 2. Full update mode: Fetch all eligible orders within the last 5 years regardless of timestamp.
@@ -23,93 +23,18 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
let importedCount = 0; let importedCount = 0;
let totalOrderItems = 0; let totalOrderItems = 0;
let totalUniqueOrders = 0; let totalUniqueOrders = 0;
// Add a cumulative counter for processed orders before the loop
let cumulativeProcessedOrders = 0; let cumulativeProcessedOrders = 0;
try { try {
// Clean up any existing temp tables first
await localConnection.query(`
DROP TEMPORARY TABLE IF EXISTS temp_order_items;
DROP TEMPORARY TABLE IF EXISTS temp_order_meta;
DROP TEMPORARY TABLE IF EXISTS temp_order_discounts;
DROP TEMPORARY TABLE IF EXISTS temp_order_taxes;
DROP TEMPORARY TABLE IF EXISTS temp_order_costs;
`);
// Create all temp tables with correct schema
await localConnection.query(`
CREATE TEMPORARY TABLE temp_order_items (
order_id INT UNSIGNED NOT NULL,
pid INT UNSIGNED NOT NULL,
SKU VARCHAR(50) NOT NULL,
price DECIMAL(10,2) NOT NULL,
quantity INT NOT NULL,
base_discount DECIMAL(10,2) DEFAULT 0,
PRIMARY KEY (order_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`);
await localConnection.query(`
CREATE TEMPORARY TABLE temp_order_meta (
order_id INT UNSIGNED NOT NULL,
date DATE NOT NULL,
customer VARCHAR(100) NOT NULL,
customer_name VARCHAR(150) NOT NULL,
status INT,
canceled TINYINT(1),
summary_discount DECIMAL(10,2) DEFAULT 0.00,
summary_subtotal DECIMAL(10,2) DEFAULT 0.00,
PRIMARY KEY (order_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`);
await localConnection.query(`
CREATE TEMPORARY TABLE temp_order_discounts (
order_id INT UNSIGNED NOT NULL,
pid INT UNSIGNED NOT NULL,
discount DECIMAL(10,2) NOT NULL,
PRIMARY KEY (order_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`);
await localConnection.query(`
CREATE TEMPORARY TABLE temp_order_taxes (
order_id INT UNSIGNED NOT NULL,
pid INT UNSIGNED NOT NULL,
tax DECIMAL(10,2) NOT NULL,
PRIMARY KEY (order_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`);
await localConnection.query(`
CREATE TEMPORARY TABLE temp_order_costs (
order_id INT UNSIGNED NOT NULL,
pid INT UNSIGNED NOT NULL,
costeach DECIMAL(10,3) DEFAULT 0.000,
PRIMARY KEY (order_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`);
// Get column names from the local table
const [columns] = await localConnection.query(`
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'orders'
AND COLUMN_NAME != 'updated' -- Exclude the updated column
ORDER BY ORDINAL_POSITION
`);
const columnNames = columns.map(col => col.COLUMN_NAME);
// Get last sync info // Get last sync info
const [syncInfo] = await localConnection.query( const [syncInfo] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'" "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'"
); );
const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01'; const lastSyncTime = syncInfo?.rows?.[0]?.last_sync_timestamp || '1970-01-01';
console.log('Orders: Using last sync time:', lastSyncTime); console.log('Orders: Using last sync time:', lastSyncTime);
// First get count of order items // First get count of order items - Keep MySQL compatible for production
const [[{ total }]] = await prodConnection.query(` const [[{ total }]] = await prodConnection.query(`
SELECT COUNT(*) as total SELECT COUNT(*) as total
FROM order_items oi FROM order_items oi
@@ -141,7 +66,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
totalOrderItems = total; totalOrderItems = total;
console.log('Orders: Found changes:', totalOrderItems); console.log('Orders: Found changes:', totalOrderItems);
// Get order items in batches // Get order items - Keep MySQL compatible for production
const [orderItems] = await prodConnection.query(` const [orderItems] = await prodConnection.query(`
SELECT SELECT
oi.order_id, oi.order_id,
@@ -179,10 +104,64 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
console.log('Orders: Processing', orderItems.length, 'order items'); console.log('Orders: Processing', orderItems.length, 'order items');
// Create temporary tables in PostgreSQL
await localConnection.query(`
DROP TABLE IF EXISTS temp_order_items;
DROP TABLE IF EXISTS temp_order_meta;
DROP TABLE IF EXISTS temp_order_discounts;
DROP TABLE IF EXISTS temp_order_taxes;
DROP TABLE IF EXISTS temp_order_costs;
CREATE TEMP TABLE temp_order_items (
order_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
SKU VARCHAR(50) NOT NULL,
price DECIMAL(10,2) NOT NULL,
quantity INTEGER NOT NULL,
base_discount DECIMAL(10,2) DEFAULT 0,
PRIMARY KEY (order_id, pid)
);
CREATE TEMP TABLE temp_order_meta (
order_id INTEGER NOT NULL,
date DATE NOT NULL,
customer VARCHAR(100) NOT NULL,
customer_name VARCHAR(150) NOT NULL,
status INTEGER,
canceled BOOLEAN,
summary_discount DECIMAL(10,2) DEFAULT 0.00,
summary_subtotal DECIMAL(10,2) DEFAULT 0.00,
PRIMARY KEY (order_id)
);
CREATE TEMP TABLE temp_order_discounts (
order_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
discount DECIMAL(10,2) NOT NULL,
PRIMARY KEY (order_id, pid)
);
CREATE TEMP TABLE temp_order_taxes (
order_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
tax DECIMAL(10,2) NOT NULL,
PRIMARY KEY (order_id, pid)
);
CREATE TEMP TABLE temp_order_costs (
order_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
costeach DECIMAL(10,3) DEFAULT 0.000,
PRIMARY KEY (order_id, pid)
);
`);
// Insert order items in batches // Insert order items in batches
for (let i = 0; i < orderItems.length; i += 5000) { for (let i = 0; i < orderItems.length; i += 5000) {
const batch = orderItems.slice(i, Math.min(i + 5000, orderItems.length)); const batch = orderItems.slice(i, Math.min(i + 5000, orderItems.length));
const placeholders = batch.map(() => "(?, ?, ?, ?, ?, ?)").join(","); const placeholders = batch.map((_, idx) =>
`($${idx * 6 + 1}, $${idx * 6 + 2}, $${idx * 6 + 3}, $${idx * 6 + 4}, $${idx * 6 + 5}, $${idx * 6 + 6})`
).join(",");
const values = batch.flatMap(item => [ const values = batch.flatMap(item => [
item.order_id, item.pid, item.SKU, item.price, item.quantity, item.base_discount item.order_id, item.pid, item.SKU, item.price, item.quantity, item.base_discount
]); ]);
@@ -190,11 +169,11 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
await localConnection.query(` await localConnection.query(`
INSERT INTO temp_order_items (order_id, pid, SKU, price, quantity, base_discount) INSERT INTO temp_order_items (order_id, pid, SKU, price, quantity, base_discount)
VALUES ${placeholders} VALUES ${placeholders}
ON DUPLICATE KEY UPDATE ON CONFLICT (order_id, pid) DO UPDATE SET
SKU = VALUES(SKU), SKU = EXCLUDED.SKU,
price = VALUES(price), price = EXCLUDED.price,
quantity = VALUES(quantity), quantity = EXCLUDED.quantity,
base_discount = VALUES(base_discount) base_discount = EXCLUDED.base_discount
`, values); `, values);
processedCount = i + batch.length; processedCount = i + batch.length;
@@ -215,11 +194,10 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
// Reset processed count for order processing phase // Reset processed count for order processing phase
processedCount = 0; processedCount = 0;
// Get order metadata in batches // Get order metadata in batches - Keep MySQL compatible for production
for (let i = 0; i < orderIds.length; i += 5000) { for (let i = 0; i < orderIds.length; i += 5000) {
const batchIds = orderIds.slice(i, i + 5000); const batchIds = orderIds.slice(i, i + 5000);
console.log(`Processing batch ${i/5000 + 1}, size: ${batchIds.length}`); console.log(`Processing batch ${i/5000 + 1}, size: ${batchIds.length}`);
console.log('Sample of batch IDs:', batchIds.slice(0, 5));
const [orders] = await prodConnection.query(` const [orders] = await prodConnection.query(`
SELECT SELECT
@@ -235,48 +213,40 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
LEFT JOIN users u ON o.order_cid = u.cid LEFT JOIN users u ON o.order_cid = u.cid
WHERE o.order_id IN (?) WHERE o.order_id IN (?)
`, [batchIds]); `, [batchIds]);
console.log(`Retrieved ${orders.length} orders for ${batchIds.length} IDs`); // Insert into PostgreSQL temp table
const duplicates = orders.filter((order, index, self) => if (orders.length > 0) {
self.findIndex(o => o.order_id === order.order_id) !== index const placeholders = orders.map((_, idx) =>
); `($${idx * 8 + 1}, $${idx * 8 + 2}, $${idx * 8 + 3}, $${idx * 8 + 4}, $${idx * 8 + 5}, $${idx * 8 + 6}, $${idx * 8 + 7}, $${idx * 8 + 8})`
if (duplicates.length > 0) { ).join(",");
console.log('Found duplicates:', duplicates); const values = orders.flatMap(order => [
order.order_id,
order.date,
order.customer,
order.customer_name,
order.status,
order.canceled,
order.summary_discount,
order.summary_subtotal
]);
await localConnection.query(`
INSERT INTO temp_order_meta (
order_id, date, customer, customer_name, status, canceled,
summary_discount, summary_subtotal
)
VALUES ${placeholders}
ON CONFLICT (order_id) DO UPDATE SET
date = EXCLUDED.date,
customer = EXCLUDED.customer,
customer_name = EXCLUDED.customer_name,
status = EXCLUDED.status,
canceled = EXCLUDED.canceled,
summary_discount = EXCLUDED.summary_discount,
summary_subtotal = EXCLUDED.summary_subtotal
`, values);
} }
const placeholders = orders.map(() => "(?, ?, ?, ?, ?, ?, ?, ?)").join(",");
const values = orders.flatMap(order => [
order.order_id,
order.date,
order.customer,
order.customer_name,
order.status,
order.canceled,
order.summary_discount,
order.summary_subtotal
]);
await localConnection.query(`
INSERT INTO temp_order_meta (
order_id,
date,
customer,
customer_name,
status,
canceled,
summary_discount,
summary_subtotal
) VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
date = VALUES(date),
customer = VALUES(customer),
customer_name = VALUES(customer_name),
status = VALUES(status),
canceled = VALUES(canceled),
summary_discount = VALUES(summary_discount),
summary_subtotal = VALUES(summary_subtotal)
`, values);
processedCount = i + orders.length; processedCount = i + orders.length;
outputProgress({ outputProgress({
status: "running", status: "running",
@@ -287,10 +257,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
}); });
} }
// Reset processed count for final phase // Process promotional discounts in batches - Keep MySQL compatible for production
processedCount = 0;
// Get promotional discounts in batches
for (let i = 0; i < orderIds.length; i += 5000) { for (let i = 0; i < orderIds.length; i += 5000) {
const batchIds = orderIds.slice(i, i + 5000); const batchIds = orderIds.slice(i, i + 5000);
const [discounts] = await prodConnection.query(` const [discounts] = await prodConnection.query(`
@@ -301,18 +268,21 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
`, [batchIds]); `, [batchIds]);
if (discounts.length > 0) { if (discounts.length > 0) {
const placeholders = discounts.map(() => "(?, ?, ?)").join(","); const placeholders = discounts.map((_, idx) =>
`($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})`
).join(",");
const values = discounts.flatMap(d => [d.order_id, d.pid, d.discount]); const values = discounts.flatMap(d => [d.order_id, d.pid, d.discount]);
await localConnection.query(` await localConnection.query(`
INSERT INTO temp_order_discounts VALUES ${placeholders} INSERT INTO temp_order_discounts (order_id, pid, discount)
ON DUPLICATE KEY UPDATE VALUES ${placeholders}
discount = VALUES(discount) ON CONFLICT (order_id, pid) DO UPDATE SET
discount = EXCLUDED.discount
`, values); `, values);
} }
} }
// Get tax information in batches // Get tax information in batches - Keep MySQL compatible for production
for (let i = 0; i < orderIds.length; i += 5000) { for (let i = 0; i < orderIds.length; i += 5000) {
const batchIds = orderIds.slice(i, i + 5000); const batchIds = orderIds.slice(i, i + 5000);
const [taxes] = await prodConnection.query(` const [taxes] = await prodConnection.query(`
@@ -331,7 +301,6 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
`, [batchIds]); `, [batchIds]);
if (taxes.length > 0) { if (taxes.length > 0) {
// Remove any duplicates before inserting
const uniqueTaxes = new Map(); const uniqueTaxes = new Map();
taxes.forEach(t => { taxes.forEach(t => {
const key = `${t.order_id}-${t.pid}`; const key = `${t.order_id}-${t.pid}`;
@@ -340,16 +309,21 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
const values = Array.from(uniqueTaxes.values()).flatMap(t => [t.order_id, t.pid, t.tax]); const values = Array.from(uniqueTaxes.values()).flatMap(t => [t.order_id, t.pid, t.tax]);
if (values.length > 0) { if (values.length > 0) {
const placeholders = Array(uniqueTaxes.size).fill("(?, ?, ?)").join(","); const placeholders = Array.from({length: uniqueTaxes.size}, (_, idx) => {
const base = idx * 3;
return `($${base + 1}, $${base + 2}, $${base + 3})`;
}).join(",");
await localConnection.query(` await localConnection.query(`
INSERT INTO temp_order_taxes VALUES ${placeholders} INSERT INTO temp_order_taxes (order_id, pid, tax)
ON DUPLICATE KEY UPDATE tax = VALUES(tax) VALUES ${placeholders}
ON CONFLICT (order_id, pid) DO UPDATE SET
tax = EXCLUDED.tax
`, values); `, values);
} }
} }
} }
// Get costeach values in batches // Get costeach values in batches - Keep MySQL compatible for production
for (let i = 0; i < orderIds.length; i += 5000) { for (let i = 0; i < orderIds.length; i += 5000) {
const batchIds = orderIds.slice(i, i + 5000); const batchIds = orderIds.slice(i, i + 5000);
const [costs] = await prodConnection.query(` const [costs] = await prodConnection.query(`
@@ -370,24 +344,27 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
`, [batchIds]); `, [batchIds]);
if (costs.length > 0) { if (costs.length > 0) {
const placeholders = costs.map(() => '(?, ?, ?)').join(","); const placeholders = costs.map((_, idx) =>
`($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})`
).join(",");
const values = costs.flatMap(c => [c.order_id, c.pid, c.costeach || 0]); const values = costs.flatMap(c => [c.order_id, c.pid, c.costeach || 0]);
await localConnection.query(` await localConnection.query(`
INSERT INTO temp_order_costs (order_id, pid, costeach) INSERT INTO temp_order_costs (order_id, pid, costeach)
VALUES ${placeholders} VALUES ${placeholders}
ON DUPLICATE KEY UPDATE costeach = VALUES(costeach) ON CONFLICT (order_id, pid) DO UPDATE SET
costeach = EXCLUDED.costeach
`, values); `, values);
} }
} }
// Now combine all the data and insert into orders table // Pre-check all products at once
// Pre-check all products at once instead of per batch
const allOrderPids = [...new Set(orderItems.map(item => item.pid))]; const allOrderPids = [...new Set(orderItems.map(item => item.pid))];
const [existingProducts] = allOrderPids.length > 0 ? await localConnection.query( const [existingProducts] = allOrderPids.length > 0 ? await localConnection.query(
"SELECT pid FROM products WHERE pid IN (?)", "SELECT pid FROM products WHERE pid = ANY($1)",
[allOrderPids] [allOrderPids]
) : [[]]; ) : [[]];
const existingPids = new Set(existingProducts.map(p => p.pid)); const existingPids = new Set(existingProducts.rows.map(p => p.pid));
// Process in larger batches // Process in larger batches
for (let i = 0; i < orderIds.length; i += 5000) { for (let i = 0; i < orderIds.length; i += 5000) {
@@ -410,7 +387,7 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
ELSE 0 ELSE 0
END as discount, END as discount,
COALESCE(ot.tax, 0) as tax, COALESCE(ot.tax, 0) as tax,
0 as tax_included, false as tax_included,
0 as shipping, 0 as shipping,
om.customer, om.customer,
om.customer_name, om.customer_name,
@@ -422,61 +399,86 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
LEFT JOIN temp_order_costs tc ON oi.order_id = tc.order_id AND oi.pid = tc.pid LEFT JOIN temp_order_costs tc ON oi.order_id = tc.order_id AND oi.pid = tc.pid
WHERE oi.order_id IN (?) WHERE oi.order_id = ANY($1)
`, [batchIds]); `, [batchIds]);
// Filter orders and track missing products - do this in a single pass // Filter orders and track missing products
const validOrders = []; const validOrders = [];
const values = []; const values = [];
const processedOrderItems = new Set(); // Track unique order items const processedOrderItems = new Set();
const processedOrders = new Set(); // Track unique orders const processedOrders = new Set();
for (const order of orders) { for (const order of orders.rows) {
if (!existingPids.has(order.pid)) { if (!existingPids.has(order.pid)) {
missingProducts.add(order.pid); missingProducts.add(order.pid);
skippedOrders.add(order.order_number); skippedOrders.add(order.order_number);
continue; continue;
} }
validOrders.push(order); validOrders.push(order);
values.push(...columnNames.map(col => order[col] ?? null));
processedOrderItems.add(`${order.order_number}-${order.pid}`); processedOrderItems.add(`${order.order_number}-${order.pid}`);
processedOrders.add(order.order_number); processedOrders.add(order.order_number);
} }
if (validOrders.length > 0) { if (validOrders.length > 0) {
// Pre-compute the placeholders string once const placeholders = validOrders.map((_, idx) => {
const singlePlaceholder = `(${columnNames.map(() => "?").join(",")})`; const base = idx * 15;
const placeholders = Array(validOrders.length).fill(singlePlaceholder).join(","); return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`;
}).join(',');
const values = validOrders.flatMap(o => [
o.order_number,
o.pid,
o.SKU,
o.date,
o.price,
o.quantity,
o.discount,
o.tax,
o.tax_included,
o.shipping,
o.customer,
o.customer_name,
o.status,
o.canceled,
o.costeach
]);
const [result] = await localConnection.query(`
WITH inserted_orders AS (
INSERT INTO orders (
order_number, pid, SKU, date, price, quantity, discount,
tax, tax_included, shipping, customer, customer_name,
status, canceled, costeach
)
VALUES ${placeholders}
ON CONFLICT (order_number, pid) DO UPDATE SET
SKU = EXCLUDED.SKU,
date = EXCLUDED.date,
price = EXCLUDED.price,
quantity = EXCLUDED.quantity,
discount = EXCLUDED.discount,
tax = EXCLUDED.tax,
tax_included = EXCLUDED.tax_included,
shipping = EXCLUDED.shipping,
customer = EXCLUDED.customer,
customer_name = EXCLUDED.customer_name,
status = EXCLUDED.status,
canceled = EXCLUDED.canceled,
costeach = EXCLUDED.costeach
RETURNING xmax, xmin
)
SELECT
COUNT(*) FILTER (WHERE xmax = 0) as inserted,
COUNT(*) FILTER (WHERE xmax <> 0) as updated
FROM inserted_orders
`, values);
const result = await localConnection.query(` const { inserted, updated } = result.rows[0];
INSERT INTO orders (${columnNames.join(",")}) recordsAdded += inserted;
VALUES ${placeholders} recordsUpdated += updated;
ON DUPLICATE KEY UPDATE importedCount += processedOrderItems.size;
SKU = VALUES(SKU),
date = VALUES(date),
price = VALUES(price),
quantity = VALUES(quantity),
discount = VALUES(discount),
tax = VALUES(tax),
tax_included = VALUES(tax_included),
shipping = VALUES(shipping),
customer = VALUES(customer),
customer_name = VALUES(customer_name),
status = VALUES(status),
canceled = VALUES(canceled),
costeach = VALUES(costeach)
`, validOrders.map(o => columnNames.map(col => o[col] ?? null)).flat());
const affectedRows = result[0].affectedRows;
const updates = Math.floor(affectedRows / 2);
const inserts = affectedRows - (updates * 2);
recordsAdded += inserts;
recordsUpdated += updates;
importedCount += processedOrderItems.size; // Count unique order items processed
} }
// Update progress based on unique orders processed
cumulativeProcessedOrders += processedOrders.size; cumulativeProcessedOrders += processedOrders.size;
outputProgress({ outputProgress({
status: "running", status: "running",
@@ -490,123 +492,21 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
}); });
} }
// Now try to import any orders that were skipped due to missing products // Clean up temporary tables
if (skippedOrders.size > 0) {
try {
outputProgress({
status: "running",
operation: "Orders import",
message: `Retrying import of ${skippedOrders.size} orders with previously missing products`,
});
// Get the orders that were skipped
const [skippedProdOrders] = await localConnection.query(`
SELECT DISTINCT
oi.order_id as order_number,
oi.pid,
oi.SKU,
om.date,
oi.price,
oi.quantity,
oi.base_discount + COALESCE(od.discount, 0) +
CASE
WHEN o.summary_discount > 0 THEN
ROUND((o.summary_discount * (oi.price * oi.quantity)) /
NULLIF(o.summary_subtotal, 0), 2)
ELSE 0
END as discount,
COALESCE(ot.tax, 0) as tax,
0 as tax_included,
0 as shipping,
om.customer,
om.customer_name,
om.status,
om.canceled,
COALESCE(tc.costeach, 0) as costeach
FROM temp_order_items oi
JOIN temp_order_meta om ON oi.order_id = om.order_id
LEFT JOIN _order o ON oi.order_id = o.order_id
LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
LEFT JOIN temp_order_costs tc ON oi.order_id = tc.order_id AND oi.pid = tc.pid
WHERE oi.order_id IN (?)
`, [Array.from(skippedOrders)]);
// Check which products exist now
const skippedPids = [...new Set(skippedProdOrders.map(o => o.pid))];
const [existingProducts] = skippedPids.length > 0 ? await localConnection.query(
"SELECT pid FROM products WHERE pid IN (?)",
[skippedPids]
) : [[]];
const existingPids = new Set(existingProducts.map(p => p.pid));
// Filter orders that can now be imported
const validOrders = skippedProdOrders.filter(order => existingPids.has(order.pid));
const retryOrderItems = new Set(); // Track unique order items in retry
if (validOrders.length > 0) {
const placeholders = validOrders.map(() => `(${columnNames.map(() => "?").join(", ")})`).join(",");
const values = validOrders.map(o => columnNames.map(col => o[col] ?? null)).flat();
const result = await localConnection.query(`
INSERT INTO orders (${columnNames.join(", ")})
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
SKU = VALUES(SKU),
date = VALUES(date),
price = VALUES(price),
quantity = VALUES(quantity),
discount = VALUES(discount),
tax = VALUES(tax),
tax_included = VALUES(tax_included),
shipping = VALUES(shipping),
customer = VALUES(customer),
customer_name = VALUES(customer_name),
status = VALUES(status),
canceled = VALUES(canceled),
costeach = VALUES(costeach)
`, values);
const affectedRows = result[0].affectedRows;
const updates = Math.floor(affectedRows / 2);
const inserts = affectedRows - (updates * 2);
// Track unique order items
validOrders.forEach(order => {
retryOrderItems.add(`${order.order_number}-${order.pid}`);
});
outputProgress({
status: "running",
operation: "Orders import",
message: `Successfully imported ${retryOrderItems.size} previously skipped order items`,
});
// Update the main counters
recordsAdded += inserts;
recordsUpdated += updates;
importedCount += retryOrderItems.size;
}
} catch (error) {
console.warn('Warning: Failed to retry skipped orders:', error.message);
console.warn(`Skipped ${skippedOrders.size} orders due to ${missingProducts.size} missing products`);
}
}
// Clean up temporary tables after ALL processing is complete
await localConnection.query(` await localConnection.query(`
DROP TEMPORARY TABLE IF EXISTS temp_order_items; DROP TABLE IF EXISTS temp_order_items;
DROP TEMPORARY TABLE IF EXISTS temp_order_meta; DROP TABLE IF EXISTS temp_order_meta;
DROP TEMPORARY TABLE IF EXISTS temp_order_discounts; DROP TABLE IF EXISTS temp_order_discounts;
DROP TEMPORARY TABLE IF EXISTS temp_order_taxes; DROP TABLE IF EXISTS temp_order_taxes;
DROP TEMPORARY TABLE IF EXISTS temp_order_costs; DROP TABLE IF EXISTS temp_order_costs;
`); `);
// Only update sync status if we get here (no errors thrown) // Update sync status
await localConnection.query(` await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp) INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('orders', NOW()) VALUES ('orders', NOW())
ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW() ON CONFLICT (table_name) DO UPDATE SET
last_sync_timestamp = NOW()
`); `);
return { return {

View File

@@ -1,5 +1,5 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress'); const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');
const BATCH_SIZE = 100; // Smaller batch size const BATCH_SIZE = 100; // Smaller batch size for better progress tracking
const MAX_RETRIES = 3; const MAX_RETRIES = 3;
const RETRY_DELAY = 5000; // 5 seconds const RETRY_DELAY = 5000; // 5 seconds
@@ -35,69 +35,253 @@ async function withRetry(operation, errorMessage) {
throw lastError; throw lastError;
} }
async function setupAndCleanupTempTables(connection, operation = 'setup') { async function setupTemporaryTables(connection) {
if (operation === 'setup') { await connection.query(`
await connection.query(` DROP TABLE IF EXISTS temp_products;
CREATE TEMP TABLE temp_products (
pid BIGINT NOT NULL, CREATE TEMP TABLE temp_products (
title VARCHAR(255), pid BIGINT NOT NULL,
description TEXT, title VARCHAR(255),
SKU VARCHAR(50), description TEXT,
stock_quantity INTEGER DEFAULT 0, SKU VARCHAR(50),
pending_qty INTEGER DEFAULT 0, stock_quantity INTEGER DEFAULT 0,
preorder_count INTEGER DEFAULT 0, pending_qty INTEGER DEFAULT 0,
notions_inv_count INTEGER DEFAULT 0, preorder_count INTEGER DEFAULT 0,
price DECIMAL(10,3) NOT NULL DEFAULT 0, notions_inv_count INTEGER DEFAULT 0,
regular_price DECIMAL(10,3) NOT NULL DEFAULT 0, price DECIMAL(10,3) NOT NULL DEFAULT 0,
cost_price DECIMAL(10,3), regular_price DECIMAL(10,3) NOT NULL DEFAULT 0,
vendor VARCHAR(100), cost_price DECIMAL(10,3),
vendor_reference VARCHAR(100), vendor VARCHAR(100),
notions_reference VARCHAR(100), vendor_reference VARCHAR(100),
brand VARCHAR(100), notions_reference VARCHAR(100),
line VARCHAR(100), brand VARCHAR(100),
subline VARCHAR(100), line VARCHAR(100),
artist VARCHAR(100), subline VARCHAR(100),
category_ids TEXT, artist VARCHAR(100),
created_at TIMESTAMP, category_ids TEXT,
first_received TIMESTAMP, created_at TIMESTAMP,
landing_cost_price DECIMAL(10,3), first_received TIMESTAMP,
barcode VARCHAR(50), landing_cost_price DECIMAL(10,3),
harmonized_tariff_code VARCHAR(50), barcode VARCHAR(50),
updated_at TIMESTAMP, harmonized_tariff_code VARCHAR(50),
visible BOOLEAN, updated_at TIMESTAMP,
replenishable BOOLEAN, visible BOOLEAN,
permalink VARCHAR(255), replenishable BOOLEAN,
moq DECIMAL(10,3), permalink VARCHAR(255),
rating DECIMAL(10,2), moq DECIMAL(10,3),
reviews INTEGER, rating DECIMAL(10,2),
weight DECIMAL(10,3), reviews INTEGER,
length DECIMAL(10,3), weight DECIMAL(10,3),
width DECIMAL(10,3), length DECIMAL(10,3),
height DECIMAL(10,3), width DECIMAL(10,3),
country_of_origin VARCHAR(100), height DECIMAL(10,3),
location VARCHAR(100), country_of_origin VARCHAR(100),
total_sold INTEGER, location VARCHAR(100),
baskets INTEGER, total_sold INTEGER,
notifies INTEGER, baskets INTEGER,
date_last_sold TIMESTAMP, notifies INTEGER,
needs_update BOOLEAN DEFAULT TRUE, date_last_sold TIMESTAMP,
PRIMARY KEY (pid) needs_update BOOLEAN DEFAULT TRUE,
) PRIMARY KEY (pid)
`); );
// Create index separately without IF NOT EXISTS CREATE INDEX idx_temp_products_needs_update ON temp_products (needs_update);
try { `);
await connection.query(` }
CREATE INDEX idx_needs_update ON temp_products (needs_update)
`); async function cleanupTemporaryTables(connection) {
} catch (err) { await connection.query('DROP TABLE IF EXISTS temp_products');
// Ignore error if index already exists }
if (!err.message.includes('already exists')) {
throw err; async function importMissingProducts(prodConnection, localConnection, missingPids) {
} if (!missingPids || missingPids.length === 0) {
return {
status: "complete",
recordsAdded: 0,
message: "No missing products to import"
};
}
try {
// Setup temporary tables
await setupTemporaryTables(localConnection);
// Get product data from production - Keep MySQL compatible
const [prodData] = await prodConnection.query(`
SELECT
p.pid,
p.description AS title,
p.notes AS description,
p.itemnumber AS SKU,
p.date_created,
p.datein AS first_received,
p.location,
p.upc AS barcode,
p.harmonized_tariff_code,
p.stamp AS updated_at,
CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
CASE
WHEN p.reorder < 0 THEN 0
WHEN (
(COALESCE(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR))
OR (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 5 YEAR))
OR (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 5 YEAR))
) THEN 0
ELSE 1
END AS replenishable,
COALESCE(si.available_local, 0) as stock_quantity,
0 as pending_qty,
COALESCE(ci.onpreorder, 0) as preorder_count,
COALESCE(pnb.inventory, 0) as notions_inv_count,
COALESCE(pcp.price_each, 0) as price,
COALESCE(p.sellingprice, 0) AS regular_price,
CASE
WHEN EXISTS (SELECT 1 FROM product_inventory WHERE pid = p.pid AND count > 0)
THEN (SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND count > 0)
ELSE (SELECT costeach FROM product_inventory WHERE pid = p.pid ORDER BY daterec DESC LIMIT 1)
END AS cost_price,
NULL as landing_cost_price,
s.companyname AS vendor,
CASE
WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber
ELSE sid.supplier_itemnumber
END AS vendor_reference,
sid.notions_itemnumber AS notions_reference,
CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink,
pc1.name AS brand,
pc2.name AS line,
pc3.name AS subline,
pc4.name AS artist,
COALESCE(CASE
WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit
ELSE sid.supplier_qty_per_unit
END, sid.notions_qty_per_unit) AS moq,
p.rating,
p.rating_votes AS reviews,
p.weight,
p.length,
p.width,
p.height,
p.country_of_origin,
(SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets,
(SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies,
p.totalsold AS total_sold,
pls.date_sold as date_last_sold,
GROUP_CONCAT(DISTINCT CASE
WHEN pc.cat_id IS NOT NULL
AND pc.type IN (10, 20, 11, 21, 12, 13)
AND pci.cat_id NOT IN (16, 17)
THEN pci.cat_id
END) as category_ids
FROM products p
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
LEFT JOIN current_inventory ci ON p.pid = ci.pid
LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1
LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
LEFT JOIN product_category_index pci ON p.pid = pci.pid
LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id
LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id
LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id
LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id
LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
WHERE p.pid IN (?)
GROUP BY p.pid
`, [missingPids]);
if (!prodData || prodData.length === 0) {
return {
status: "complete",
recordsAdded: 0,
message: "No products found in production database"
};
} }
} else {
await connection.query('DROP TABLE IF EXISTS temp_products'); // Process in batches
let recordsAdded = 0;
for (let i = 0; i < prodData.length; i += BATCH_SIZE) {
const batch = prodData.slice(i, i + BATCH_SIZE);
const placeholders = batch.map((_, idx) => {
const base = idx * 41; // 41 columns
return `(${Array.from({ length: 41 }, (_, i) => `$${base + i + 1}`).join(', ')})`;
}).join(',');
const values = batch.flatMap(row => [
row.pid,
row.title,
row.description,
row.SKU,
row.stock_quantity,
row.pending_qty,
row.preorder_count,
row.notions_inv_count,
row.price,
row.regular_price,
row.cost_price,
row.vendor,
row.vendor_reference,
row.notions_reference,
row.brand,
row.line,
row.subline,
row.artist,
row.category_ids,
row.date_created,
row.first_received,
row.landing_cost_price,
row.barcode,
row.harmonized_tariff_code,
row.updated_at,
row.visible,
row.replenishable,
row.permalink,
row.moq,
row.rating,
row.reviews,
row.weight,
row.length,
row.width,
row.height,
row.country_of_origin,
row.location,
row.total_sold,
row.baskets,
row.notifies,
row.date_last_sold
]);
const [result] = await localConnection.query(`
WITH inserted_products AS (
INSERT INTO products (
pid, title, description, SKU, stock_quantity, pending_qty, preorder_count,
notions_inv_count, price, regular_price, cost_price, vendor, vendor_reference,
notions_reference, brand, line, subline, artist, category_ids, created_at,
first_received, landing_cost_price, barcode, harmonized_tariff_code,
updated_at, visible, replenishable, permalink, moq, rating, reviews,
weight, length, width, height, country_of_origin, location, total_sold,
baskets, notifies, date_last_sold
)
VALUES ${placeholders}
ON CONFLICT (pid) DO NOTHING
RETURNING pid
)
SELECT COUNT(*) as inserted FROM inserted_products
`, values);
recordsAdded += result.rows[0].inserted;
}
return {
status: "complete",
recordsAdded,
message: `Successfully imported ${recordsAdded} missing products`
};
} catch (error) {
console.error('Error importing missing products:', error);
throw error;
} }
} }
@@ -108,7 +292,7 @@ async function materializeCalculations(prodConnection, localConnection, incremen
message: "Fetching product data from production" message: "Fetching product data from production"
}); });
// Get all product data in a single optimized query // Get all product data in a single optimized query - Keep MySQL compatible
const [prodData] = await prodConnection.query(` const [prodData] = await prodConnection.query(`
SELECT SELECT
p.pid, p.pid,
@@ -238,11 +422,9 @@ async function materializeCalculations(prodConnection, localConnection, incremen
const batch = prodData.slice(i, Math.min(i + BATCH_SIZE, prodData.length)); const batch = prodData.slice(i, Math.min(i + BATCH_SIZE, prodData.length));
await withRetry(async () => { await withRetry(async () => {
// Build the parameterized query for PostgreSQL const placeholders = batch.map((_, idx) => {
const valueBlocks = batch.map((_, idx) => {
const offset = idx * 41; // 41 columns const offset = idx * 41; // 41 columns
const params = Array.from({ length: 41 }, (_, i) => `$${offset + i + 1}`); return `(${Array.from({ length: 41 }, (_, i) => `$${offset + i + 1}`).join(', ')})`;
return `(${params.join(', ')})`;
}).join(','); }).join(',');
const values = batch.flatMap(row => [ const values = batch.flatMap(row => [
@@ -299,7 +481,7 @@ async function materializeCalculations(prodConnection, localConnection, incremen
updated_at, visible, replenishable, permalink, moq, rating, reviews, updated_at, visible, replenishable, permalink, moq, rating, reviews,
weight, length, width, height, country_of_origin, location, total_sold, weight, length, width, height, country_of_origin, location, total_sold,
baskets, notifies, date_last_sold baskets, notifies, date_last_sold
) VALUES ${valueBlocks} ) VALUES ${placeholders}
ON CONFLICT (pid) DO UPDATE SET ON CONFLICT (pid) DO UPDATE SET
title = EXCLUDED.title, title = EXCLUDED.title,
description = EXCLUDED.description, description = EXCLUDED.description,
@@ -341,9 +523,6 @@ async function materializeCalculations(prodConnection, localConnection, incremen
baskets = EXCLUDED.baskets, baskets = EXCLUDED.baskets,
notifies = EXCLUDED.notifies, notifies = EXCLUDED.notifies,
date_last_sold = EXCLUDED.date_last_sold date_last_sold = EXCLUDED.date_last_sold
RETURNING
CASE WHEN xmax::text::int > 0 THEN 0 ELSE 1 END AS inserted,
CASE WHEN xmax::text::int > 0 THEN 1 ELSE 0 END AS updated
`, values); `, values);
}, `Error inserting batch ${i} to ${i + batch.length}`); }, `Error inserting batch ${i} to ${i + batch.length}`);
@@ -376,13 +555,13 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
const [syncResult] = await localConnection.query( const [syncResult] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'products'" "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'products'"
); );
if (syncResult.length > 0) { if (syncResult.rows.length > 0) {
lastSyncTime = syncResult[0].last_sync_timestamp; lastSyncTime = syncResult.rows[0].last_sync_timestamp;
} }
} }
// Setup temporary tables // Setup temporary tables
await setupAndCleanupTempTables(localConnection, 'setup'); await setupTemporaryTables(localConnection);
// Materialize calculations into temp table // Materialize calculations into temp table
await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime); await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime);
@@ -442,11 +621,9 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
const batch = products.slice(i, Math.min(i + BATCH_SIZE, products.length)); const batch = products.slice(i, Math.min(i + BATCH_SIZE, products.length));
await withRetry(async () => { await withRetry(async () => {
// Build the parameterized query for PostgreSQL const placeholders = batch.map((_, idx) => {
const valueBlocks = batch.map((_, idx) => {
const offset = idx * 41; // 41 columns const offset = idx * 41; // 41 columns
const params = Array.from({ length: 41 }, (_, i) => `$${offset + i + 1}`); return `(${Array.from({ length: 41 }, (_, i) => `$${offset + i + 1}`).join(', ')})`;
return `(${params.join(', ')})`;
}).join(','); }).join(',');
const values = batch.flatMap(row => [ const values = batch.flatMap(row => [
@@ -494,70 +671,69 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
]); ]);
const result = await localConnection.query(` const result = await localConnection.query(`
INSERT INTO products ( WITH upserted_products AS (
pid, title, description, SKU, stock_quantity, pending_qty, preorder_count, INSERT INTO products (
notions_inv_count, price, regular_price, cost_price, vendor, vendor_reference, pid, title, description, SKU, stock_quantity, pending_qty, preorder_count,
notions_reference, brand, line, subline, artist, category_ids, created_at, notions_inv_count, price, regular_price, cost_price, vendor, vendor_reference,
first_received, landing_cost_price, barcode, harmonized_tariff_code, notions_reference, brand, line, subline, artist, category_ids, created_at,
updated_at, visible, replenishable, permalink, moq, rating, reviews, first_received, landing_cost_price, barcode, harmonized_tariff_code,
weight, length, width, height, country_of_origin, location, total_sold, updated_at, visible, replenishable, permalink, moq, rating, reviews,
baskets, notifies, date_last_sold weight, length, width, height, country_of_origin, location, total_sold,
) VALUES ${valueBlocks} baskets, notifies, date_last_sold
ON CONFLICT (pid) DO UPDATE SET ) VALUES ${placeholders}
title = EXCLUDED.title, ON CONFLICT (pid) DO UPDATE SET
description = EXCLUDED.description, title = EXCLUDED.title,
SKU = EXCLUDED.SKU, description = EXCLUDED.description,
stock_quantity = EXCLUDED.stock_quantity, SKU = EXCLUDED.SKU,
pending_qty = EXCLUDED.pending_qty, stock_quantity = EXCLUDED.stock_quantity,
preorder_count = EXCLUDED.preorder_count, pending_qty = EXCLUDED.pending_qty,
notions_inv_count = EXCLUDED.notions_inv_count, preorder_count = EXCLUDED.preorder_count,
price = EXCLUDED.price, notions_inv_count = EXCLUDED.notions_inv_count,
regular_price = EXCLUDED.regular_price, price = EXCLUDED.price,
cost_price = EXCLUDED.cost_price, regular_price = EXCLUDED.regular_price,
vendor = EXCLUDED.vendor, cost_price = EXCLUDED.cost_price,
vendor_reference = EXCLUDED.vendor_reference, vendor = EXCLUDED.vendor,
notions_reference = EXCLUDED.notions_reference, vendor_reference = EXCLUDED.vendor_reference,
brand = EXCLUDED.brand, notions_reference = EXCLUDED.notions_reference,
line = EXCLUDED.line, brand = EXCLUDED.brand,
subline = EXCLUDED.subline, line = EXCLUDED.line,
artist = EXCLUDED.artist, subline = EXCLUDED.subline,
category_ids = EXCLUDED.category_ids, artist = EXCLUDED.artist,
created_at = EXCLUDED.created_at, category_ids = EXCLUDED.category_ids,
first_received = EXCLUDED.first_received, created_at = EXCLUDED.created_at,
landing_cost_price = EXCLUDED.landing_cost_price, first_received = EXCLUDED.first_received,
barcode = EXCLUDED.barcode, landing_cost_price = EXCLUDED.landing_cost_price,
harmonized_tariff_code = EXCLUDED.harmonized_tariff_code, barcode = EXCLUDED.barcode,
updated_at = EXCLUDED.updated_at, harmonized_tariff_code = EXCLUDED.harmonized_tariff_code,
visible = EXCLUDED.visible, updated_at = EXCLUDED.updated_at,
replenishable = EXCLUDED.replenishable, visible = EXCLUDED.visible,
permalink = EXCLUDED.permalink, replenishable = EXCLUDED.replenishable,
moq = EXCLUDED.moq, permalink = EXCLUDED.permalink,
rating = EXCLUDED.rating, moq = EXCLUDED.moq,
reviews = EXCLUDED.reviews, rating = EXCLUDED.rating,
weight = EXCLUDED.weight, reviews = EXCLUDED.reviews,
length = EXCLUDED.length, weight = EXCLUDED.weight,
width = EXCLUDED.width, length = EXCLUDED.length,
height = EXCLUDED.height, width = EXCLUDED.width,
country_of_origin = EXCLUDED.country_of_origin, height = EXCLUDED.height,
location = EXCLUDED.location, country_of_origin = EXCLUDED.country_of_origin,
total_sold = EXCLUDED.total_sold, location = EXCLUDED.location,
baskets = EXCLUDED.baskets, total_sold = EXCLUDED.total_sold,
notifies = EXCLUDED.notifies, baskets = EXCLUDED.baskets,
date_last_sold = EXCLUDED.date_last_sold notifies = EXCLUDED.notifies,
RETURNING date_last_sold = EXCLUDED.date_last_sold
CASE WHEN xmax::text::int > 0 THEN 0 ELSE 1 END AS inserted, RETURNING
CASE WHEN xmax::text::int > 0 THEN 1 ELSE 0 END AS updated CASE WHEN xmax::text::int > 0 THEN 0 ELSE 1 END AS inserted,
CASE WHEN xmax::text::int > 0 THEN 1 ELSE 0 END AS updated
)
SELECT
COUNT(*) FILTER (WHERE inserted = 1) as inserted,
COUNT(*) FILTER (WHERE updated = 1) as updated
FROM upserted_products
`, values); `, values);
// Count inserted and updated records recordsAdded += result.rows[0].inserted;
const stats = result.rows.reduce((acc, row) => { recordsUpdated += result.rows[0].updated;
if (row.inserted) acc.inserted++;
if (row.updated) acc.updated++;
return acc;
}, { inserted: 0, updated: 0 });
recordsAdded += stats.inserted;
recordsUpdated += stats.updated;
}, `Error inserting batch ${i} to ${i + batch.length}`); }, `Error inserting batch ${i} to ${i + batch.length}`);
outputProgress({ outputProgress({
@@ -575,13 +751,13 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
// Update sync status // Update sync status
await localConnection.query(` await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp) INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('products', CURRENT_TIMESTAMP) VALUES ('products', NOW())
ON CONFLICT (table_name) DO UPDATE SET ON CONFLICT (table_name) DO UPDATE SET
last_sync_timestamp = CURRENT_TIMESTAMP last_sync_timestamp = NOW()
`); `);
// Cleanup temporary tables // Cleanup temporary tables
await setupAndCleanupTempTables(localConnection, 'cleanup'); await cleanupTemporaryTables(localConnection);
return { return {
status: "complete", status: "complete",
@@ -593,7 +769,7 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
console.error('Error in importProducts:', error); console.error('Error in importProducts:', error);
// Attempt cleanup on error // Attempt cleanup on error
try { try {
await setupAndCleanupTempTables(localConnection, 'cleanup'); await cleanupTemporaryTables(localConnection);
} catch (cleanupError) { } catch (cleanupError) {
console.error('Error during cleanup:', cleanupError); console.error('Error during cleanup:', cleanupError);
} }
@@ -601,199 +777,10 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
} }
} }
async function importMissingProducts(prodConnection, localConnection, missingPids) {
if (!missingPids || missingPids.length === 0) {
return {
status: "complete",
recordsAdded: 0,
message: "No missing products to import"
};
}
try {
// Setup temporary tables
await setupAndCleanupTempTables(localConnection, 'setup');
// Get product data from production
const [prodData] = await prodConnection.query(`
SELECT
p.pid,
p.description AS title,
p.notes AS description,
p.itemnumber AS SKU,
p.date_created,
p.datein AS first_received,
p.location,
p.upc AS barcode,
p.harmonized_tariff_code,
p.stamp AS updated_at,
CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
CASE
WHEN p.reorder < 0 THEN 0
WHEN (
(COALESCE(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR))
OR (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 5 YEAR))
OR (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 5 YEAR))
) THEN 0
ELSE 1
END AS replenishable,
COALESCE(si.available_local, 0) as stock_quantity,
0 as pending_qty,
COALESCE(ci.onpreorder, 0) as preorder_count,
COALESCE(pnb.inventory, 0) as notions_inv_count,
COALESCE(pcp.price_each, 0) as price,
COALESCE(p.sellingprice, 0) AS regular_price,
CASE
WHEN EXISTS (SELECT 1 FROM product_inventory WHERE pid = p.pid AND count > 0)
THEN (SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND count > 0)
ELSE (SELECT costeach FROM product_inventory WHERE pid = p.pid ORDER BY daterec DESC LIMIT 1)
END AS cost_price,
NULL as landing_cost_price,
s.companyname AS vendor,
CASE
WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber
ELSE sid.supplier_itemnumber
END AS vendor_reference,
sid.notions_itemnumber AS notions_reference,
CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink,
pc1.name AS brand,
pc2.name AS line,
pc3.name AS subline,
pc4.name AS artist,
COALESCE(CASE
WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit
ELSE sid.supplier_qty_per_unit
END, sid.notions_qty_per_unit) AS moq,
p.rating,
p.rating_votes AS reviews,
p.weight,
p.length,
p.width,
p.height,
p.country_of_origin,
(SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets,
(SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies,
p.totalsold AS total_sold,
pls.date_sold as date_last_sold,
GROUP_CONCAT(DISTINCT CASE
WHEN pc.cat_id IS NOT NULL
AND pc.type IN (10, 20, 11, 21, 12, 13)
AND pci.cat_id NOT IN (16, 17)
THEN pci.cat_id
END) as category_ids
FROM products p
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
LEFT JOIN current_inventory ci ON p.pid = ci.pid
LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1
LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
LEFT JOIN product_category_index pci ON p.pid = pci.pid
LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id
LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id
LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id
LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id
LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
WHERE p.pid IN (?)
GROUP BY p.pid
`, [missingPids]);
if (!prodData || prodData.length === 0) {
return {
status: "complete",
recordsAdded: 0,
message: "No products found in production database"
};
}
// Process in batches
let recordsAdded = 0;
for (let i = 0; i < prodData.length; i += 1000) {
const batch = prodData.slice(i, i + 1000);
// Build the parameterized query for PostgreSQL
const valueBlocks = batch.map((_, idx) => {
const offset = idx * 41; // 41 columns
const params = Array.from({ length: 41 }, (_, i) => `$${offset + i + 1}`);
return `(${params.join(', ')})`;
}).join(',');
const values = batch.flatMap(row => [
row.pid,
row.title,
row.description,
row.SKU,
row.stock_quantity,
row.pending_qty,
row.preorder_count,
row.notions_inv_count,
row.price,
row.regular_price,
row.cost_price,
row.vendor,
row.vendor_reference,
row.notions_reference,
row.brand,
row.line,
row.subline,
row.artist,
row.category_ids,
row.date_created,
row.first_received,
row.landing_cost_price,
row.barcode,
row.harmonized_tariff_code,
row.updated_at,
row.visible,
row.replenishable,
row.permalink,
row.moq,
row.rating,
row.reviews,
row.weight,
row.length,
row.width,
row.height,
row.country_of_origin,
row.location,
row.total_sold,
row.baskets,
row.notifies,
row.date_last_sold
]);
const result = await localConnection.query(`
INSERT INTO products (
pid, title, description, SKU, stock_quantity, pending_qty, preorder_count,
notions_inv_count, price, regular_price, cost_price, vendor, vendor_reference,
notions_reference, brand, line, subline, artist, category_ids, created_at,
first_received, landing_cost_price, barcode, harmonized_tariff_code,
updated_at, visible, replenishable, permalink, moq, rating, reviews,
weight, length, width, height, country_of_origin, location, total_sold,
baskets, notifies, date_last_sold
) VALUES ${valueBlocks}
ON CONFLICT (pid) DO NOTHING
RETURNING pid
`, values);
recordsAdded += result.rows.length;
}
return {
status: "complete",
recordsAdded,
message: `Successfully imported ${recordsAdded} missing products`
};
} catch (error) {
console.error('Error importing missing products:', error);
throw error;
}
}
module.exports = { module.exports = {
importProducts, importProducts,
importMissingProducts, importMissingProducts,
setupAndCleanupTempTables, setupTemporaryTables,
cleanupTemporaryTables,
materializeCalculations materializeCalculations
}; };

View File

@@ -10,22 +10,38 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
const [syncInfo] = await localConnection.query( const [syncInfo] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'" "SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'"
); );
const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01'; const lastSyncTime = syncInfo?.rows?.[0]?.last_sync_timestamp || '1970-01-01';
console.log('Purchase Orders: Using last sync time:', lastSyncTime); console.log('Purchase Orders: Using last sync time:', lastSyncTime);
// Insert temporary table creation query for purchase orders // Create temporary tables with PostgreSQL syntax
await localConnection.query(` await localConnection.query(`
CREATE TABLE IF NOT EXISTS temp_purchase_orders ( DROP TABLE IF EXISTS temp_purchase_orders;
po_id INT UNSIGNED NOT NULL, DROP TABLE IF EXISTS temp_po_receivings;
pid INT UNSIGNED NOT NULL,
CREATE TEMP TABLE temp_purchase_orders (
po_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
vendor VARCHAR(255), vendor VARCHAR(255),
date DATE, date DATE,
expected_date DATE, expected_date DATE,
status INT, status INTEGER,
notes TEXT, notes TEXT,
PRIMARY KEY (po_id, pid) PRIMARY KEY (po_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8; );
CREATE TEMP TABLE temp_po_receivings (
po_id INTEGER,
pid INTEGER NOT NULL,
receiving_id INTEGER NOT NULL,
qty_each INTEGER,
cost_each DECIMAL(10,3),
received_date TIMESTAMP,
received_by INTEGER,
received_by_name VARCHAR(255),
is_alt_po INTEGER,
PRIMARY KEY (receiving_id, pid)
);
`); `);
outputProgress({ outputProgress({
@@ -33,8 +49,8 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
status: "running", status: "running",
}); });
// Get column names first // Get column names - Keep MySQL compatible for production
const [columns] = await localConnection.query(` const [columns] = await prodConnection.query(`
SELECT COLUMN_NAME SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'purchase_orders' WHERE TABLE_NAME = 'purchase_orders'
@@ -60,7 +76,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]
: []; : [];
// First get all relevant PO IDs with basic info // First get all relevant PO IDs with basic info - Keep MySQL compatible for production
const [[{ total }]] = await prodConnection.query(` const [[{ total }]] = await prodConnection.query(`
SELECT COUNT(*) as total SELECT COUNT(*) as total
FROM ( FROM (
@@ -99,6 +115,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
console.log('Purchase Orders: Found changes:', total); console.log('Purchase Orders: Found changes:', total);
// Get PO list - Keep MySQL compatible for production
const [poList] = await prodConnection.query(` const [poList] = await prodConnection.query(`
SELECT DISTINCT SELECT DISTINCT
COALESCE(p.po_id, r.receiving_id) as po_id, COALESCE(p.po_id, r.receiving_id) as po_id,
@@ -185,7 +202,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
const batch = poList.slice(i, Math.min(i + BATCH_SIZE, poList.length)); const batch = poList.slice(i, Math.min(i + BATCH_SIZE, poList.length));
const poIds = batch.map(po => po.po_id); const poIds = batch.map(po => po.po_id);
// Get all products for these POs in one query // Get all products for these POs in one query - Keep MySQL compatible for production
const [poProducts] = await prodConnection.query(` const [poProducts] = await prodConnection.query(`
SELECT SELECT
pop.po_id, pop.po_id,
@@ -207,7 +224,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
const productPids = [...new Set(productBatch.map(p => p.pid))]; const productPids = [...new Set(productBatch.map(p => p.pid))];
const batchPoIds = [...new Set(productBatch.map(p => p.po_id))]; const batchPoIds = [...new Set(productBatch.map(p => p.po_id))];
// Get receivings for this batch with employee names // Get receivings for this batch with employee names - Keep MySQL compatible for production
const [receivings] = await prodConnection.query(` const [receivings] = await prodConnection.query(`
SELECT SELECT
r.po_id, r.po_id,
@@ -232,315 +249,176 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
ORDER BY r.po_id, rp.pid, rp.received_date ORDER BY r.po_id, rp.pid, rp.received_date
`, [batchPoIds, productPids]); `, [batchPoIds, productPids]);
// Create maps for this sub-batch // Insert receivings into temp table
const poProductMap = new Map(); if (receivings.length > 0) {
productBatch.forEach(product => { const placeholders = receivings.map((_, idx) => {
const key = `${product.po_id}-${product.pid}`; const base = idx * 9;
poProductMap.set(key, product); return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9})`;
}); }).join(',');
const receivingMap = new Map(); const values = receivings.flatMap(r => [
const altReceivingMap = new Map(); r.po_id,
const noPOReceivingMap = new Map(); r.pid,
r.receiving_id,
receivings.forEach(receiving => { r.qty_each,
const key = `${receiving.po_id}-${receiving.pid}`; r.cost_each,
if (receiving.is_alt_po === 2) { r.received_date,
// No PO r.received_by,
if (!noPOReceivingMap.has(receiving.pid)) { r.received_by_name,
noPOReceivingMap.set(receiving.pid, []); r.is_alt_po
} ]);
noPOReceivingMap.get(receiving.pid).push(receiving);
} else if (receiving.is_alt_po === 1) {
// Different PO
if (!altReceivingMap.has(receiving.pid)) {
altReceivingMap.set(receiving.pid, []);
}
altReceivingMap.get(receiving.pid).push(receiving);
} else {
// Original PO
if (!receivingMap.has(key)) {
receivingMap.set(key, []);
}
receivingMap.get(key).push(receiving);
}
});
// Verify PIDs exist await localConnection.query(`
const [existingPids] = await localConnection.query( INSERT INTO temp_po_receivings (
'SELECT pid FROM products WHERE pid IN (?)', po_id, pid, receiving_id, qty_each, cost_each, received_date,
[productPids] received_by, received_by_name, is_alt_po
); )
const validPids = new Set(existingPids.map(p => p.pid)); VALUES ${placeholders}
ON CONFLICT (receiving_id, pid) DO UPDATE SET
po_id = EXCLUDED.po_id,
qty_each = EXCLUDED.qty_each,
cost_each = EXCLUDED.cost_each,
received_date = EXCLUDED.received_date,
received_by = EXCLUDED.received_by,
received_by_name = EXCLUDED.received_by_name,
is_alt_po = EXCLUDED.is_alt_po
`, values);
}
// First check which PO lines already exist and get their current values // Process each PO product
const poLines = Array.from(poProductMap.values()) for (const product of productBatch) {
.filter(p => validPids.has(p.pid)) const po = batch.find(p => p.po_id === product.po_id);
.map(p => [p.po_id, p.pid]); if (!po) continue;
const [existingPOs] = await localConnection.query( // Insert into temp_purchase_orders
`SELECT ${columnNames.join(',')} FROM purchase_orders WHERE (po_id, pid) IN (${poLines.map(() => "(?,?)").join(",")})`, const placeholders = `($1, $2, $3, $4, $5, $6, $7)`;
poLines.flat() const values = [
); product.po_id,
const existingPOMap = new Map( product.pid,
existingPOs.map(po => [`${po.po_id}-${po.pid}`, po]) po.vendor,
); po.date,
po.expected_date,
po.status,
po.notes || po.long_note
];
// Split into inserts and updates await localConnection.query(`
const insertsAndUpdates = { inserts: [], updates: [] }; INSERT INTO temp_purchase_orders (
let batchProcessed = 0; po_id, pid, vendor, date, expected_date, status, notes
)
VALUES ${placeholders}
ON CONFLICT (po_id, pid) DO UPDATE SET
vendor = EXCLUDED.vendor,
date = EXCLUDED.date,
expected_date = EXCLUDED.expected_date,
status = EXCLUDED.status,
notes = EXCLUDED.notes
`, values);
for (const po of batch) { processed++;
const poProducts = Array.from(poProductMap.values())
.filter(p => p.po_id === po.po_id && validPids.has(p.pid)); // Update progress periodically
if (Date.now() - lastProgressUpdate > PROGRESS_INTERVAL) {
for (const product of poProducts) { outputProgress({
const key = `${po.po_id}-${product.pid}`; status: "running",
const receivingHistory = receivingMap.get(key) || []; operation: "Purchase orders import",
const altReceivingHistory = altReceivingMap.get(product.pid) || []; message: `Processing purchase orders: ${processed} of ${totalItems}`,
const noPOReceivingHistory = noPOReceivingMap.get(product.pid) || []; current: processed,
total: totalItems,
// Combine all receivings and sort by date elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
const allReceivings = [ remaining: estimateRemaining(startTime, processed, totalItems),
...receivingHistory.map(r => ({ ...r, type: 'original' })), rate: calculateRate(startTime, processed)
...altReceivingHistory.map(r => ({ ...r, type: 'alternate' })),
...noPOReceivingHistory.map(r => ({ ...r, type: 'no_po' }))
].sort((a, b) => new Date(a.received_date || '9999-12-31') - new Date(b.received_date || '9999-12-31'));
// Split receivings into original PO and others
const originalPOReceivings = allReceivings.filter(r => r.type === 'original');
const otherReceivings = allReceivings.filter(r => r.type !== 'original');
// Track FIFO fulfillment
let remainingToFulfill = product.ordered;
const fulfillmentTracking = [];
let totalReceived = 0;
let actualCost = null; // Will store the cost of the first receiving that fulfills this PO
let firstFulfillmentReceiving = null;
let lastFulfillmentReceiving = null;
for (const receiving of allReceivings) {
// Convert quantities to base units using supplier data
const baseQtyReceived = receiving.qty_each * (
receiving.type === 'original' ? 1 :
Math.max(1, product.supplier_qty_per_unit || 1)
);
const qtyToApply = Math.min(remainingToFulfill, baseQtyReceived);
if (qtyToApply > 0) {
// If this is the first receiving being applied, use its cost
if (actualCost === null && receiving.cost_each > 0) {
actualCost = receiving.cost_each;
firstFulfillmentReceiving = receiving;
}
lastFulfillmentReceiving = receiving;
fulfillmentTracking.push({
receiving_id: receiving.receiving_id,
qty_applied: qtyToApply,
qty_total: baseQtyReceived,
cost: receiving.cost_each || actualCost || product.cost_each,
date: receiving.received_date,
received_by: receiving.received_by,
received_by_name: receiving.received_by_name || 'Unknown',
type: receiving.type,
remaining_qty: baseQtyReceived - qtyToApply
});
remainingToFulfill -= qtyToApply;
} else {
// Track excess receivings
fulfillmentTracking.push({
receiving_id: receiving.receiving_id,
qty_applied: 0,
qty_total: baseQtyReceived,
cost: receiving.cost_each || actualCost || product.cost_each,
date: receiving.received_date,
received_by: receiving.received_by,
received_by_name: receiving.received_by_name || 'Unknown',
type: receiving.type,
is_excess: true
});
}
totalReceived += baseQtyReceived;
}
const receiving_status = !totalReceived ? 1 : // created
remainingToFulfill > 0 ? 30 : // partial
40; // full
function formatDate(dateStr) {
if (!dateStr) return null;
if (dateStr === '0000-00-00' || dateStr === '0000-00-00 00:00:00') return null;
if (typeof dateStr === 'string' && !dateStr.match(/^\d{4}-\d{2}-\d{2}/)) return null;
try {
const date = new Date(dateStr);
if (isNaN(date.getTime())) return null;
if (date.getFullYear() < 1900 || date.getFullYear() > 2100) return null;
return date.toISOString().split('T')[0];
} catch (e) {
return null;
}
}
const rowValues = columnNames.map(col => {
switch (col) {
case 'po_id': return po.po_id;
case 'vendor': return po.vendor;
case 'date': return formatDate(po.date);
case 'expected_date': return formatDate(po.expected_date);
case 'pid': return product.pid;
case 'sku': return product.sku;
case 'name': return product.name;
case 'cost_price': return actualCost || product.cost_each;
case 'po_cost_price': return product.cost_each;
case 'status': return po.status;
case 'notes': return po.notes;
case 'long_note': return po.long_note;
case 'ordered': return product.ordered;
case 'received': return totalReceived;
case 'unfulfilled': return remainingToFulfill;
case 'excess_received': return Math.max(0, totalReceived - product.ordered);
case 'received_date': return formatDate(firstFulfillmentReceiving?.received_date);
case 'last_received_date': return formatDate(lastFulfillmentReceiving?.received_date);
case 'received_by': return firstFulfillmentReceiving?.received_by_name || null;
case 'receiving_status': return receiving_status;
case 'receiving_history': return JSON.stringify({
fulfillment: fulfillmentTracking,
ordered_qty: product.ordered,
total_received: totalReceived,
remaining_unfulfilled: remainingToFulfill,
excess_received: Math.max(0, totalReceived - product.ordered),
po_cost: product.cost_each,
actual_cost: actualCost || product.cost_each
});
default: return null;
}
}); });
lastProgressUpdate = Date.now();
if (existingPOMap.has(key)) {
const existing = existingPOMap.get(key);
// Check if any values are different
const hasChanges = columnNames.some(col => {
const newVal = rowValues[columnNames.indexOf(col)];
const oldVal = existing[col] ?? null;
// Special handling for numbers to avoid type coercion issues
if (typeof newVal === 'number' && typeof oldVal === 'number') {
return Math.abs(newVal - oldVal) > 0.00001; // Allow for tiny floating point differences
}
// Special handling for receiving_history - parse and compare
if (col === 'receiving_history') {
const newHistory = JSON.parse(newVal || '{}');
const oldHistory = JSON.parse(oldVal || '{}');
return JSON.stringify(newHistory) !== JSON.stringify(oldHistory);
}
return newVal !== oldVal;
});
if (hasChanges) {
insertsAndUpdates.updates.push({
po_id: po.po_id,
pid: product.pid,
values: rowValues
});
}
} else {
insertsAndUpdates.inserts.push({
po_id: po.po_id,
pid: product.pid,
values: rowValues
});
}
batchProcessed++;
} }
} }
// Handle inserts
if (insertsAndUpdates.inserts.length > 0) {
const insertPlaceholders = insertsAndUpdates.inserts
.map(() => `(${Array(columnNames.length).fill("?").join(",")})`)
.join(",");
const insertResult = await localConnection.query(`
INSERT INTO purchase_orders (${columnNames.join(",")})
VALUES ${insertPlaceholders}
`, insertsAndUpdates.inserts.map(i => i.values).flat());
const affectedRows = insertResult[0].affectedRows;
// For an upsert, MySQL counts rows twice for updates
// So if affectedRows is odd, we have (updates * 2 + inserts)
const updates = Math.floor(affectedRows / 2);
const inserts = affectedRows - (updates * 2);
recordsAdded += inserts;
recordsUpdated += Math.floor(updates); // Ensure we never have fractional updates
processed += batchProcessed;
}
// Handle updates - now we know these actually have changes
if (insertsAndUpdates.updates.length > 0) {
const updatePlaceholders = insertsAndUpdates.updates
.map(() => `(${Array(columnNames.length).fill("?").join(",")})`)
.join(",");
const updateResult = await localConnection.query(`
INSERT INTO purchase_orders (${columnNames.join(",")})
VALUES ${updatePlaceholders}
ON DUPLICATE KEY UPDATE ${columnNames
.filter((col) => col !== "po_id" && col !== "pid")
.map((col) => `${col} = VALUES(${col})`)
.join(",")};
`, insertsAndUpdates.updates.map(u => u.values).flat());
const affectedRows = updateResult[0].affectedRows;
// For an upsert, MySQL counts rows twice for updates
// So if affectedRows is odd, we have (updates * 2 + inserts)
const updates = Math.floor(affectedRows / 2);
const inserts = affectedRows - (updates * 2);
recordsUpdated += Math.floor(updates); // Ensure we never have fractional updates
processed += batchProcessed;
}
// Update progress based on time interval
const now = Date.now();
if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) {
outputProgress({
status: "running",
operation: "Purchase orders import",
current: processed,
total: totalItems,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, processed, totalItems),
rate: calculateRate(startTime, processed)
});
lastProgressUpdate = now;
}
} }
} }
// Only update sync status if we get here (no errors thrown) // Insert final data into purchase_orders table
const [result] = await localConnection.query(`
WITH inserted_pos AS (
INSERT INTO purchase_orders (
po_id, pid, vendor, date, expected_date, status, notes,
received_qty, received_cost, last_received_date, last_received_by,
alt_po_received_qty, alt_po_last_received_date,
no_po_received_qty, no_po_last_received_date
)
SELECT
po.po_id,
po.pid,
po.vendor,
po.date,
po.expected_date,
po.status,
po.notes,
COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0) as received_qty,
COALESCE(AVG(CASE WHEN r.is_alt_po = 0 THEN r.cost_each END), 0) as received_cost,
MAX(CASE WHEN r.is_alt_po = 0 THEN r.received_date END) as last_received_date,
MAX(CASE WHEN r.is_alt_po = 0 THEN r.received_by_name END) as last_received_by,
COALESCE(SUM(CASE WHEN r.is_alt_po = 1 THEN r.qty_each END), 0) as alt_po_received_qty,
MAX(CASE WHEN r.is_alt_po = 1 THEN r.received_date END) as alt_po_last_received_date,
COALESCE(SUM(CASE WHEN r.is_alt_po = 2 THEN r.qty_each END), 0) as no_po_received_qty,
MAX(CASE WHEN r.is_alt_po = 2 THEN r.received_date END) as no_po_last_received_date
FROM temp_purchase_orders po
LEFT JOIN temp_po_receivings r ON po.pid = r.pid
GROUP BY po.po_id, po.pid, po.vendor, po.date, po.expected_date, po.status, po.notes
ON CONFLICT (po_id, pid) DO UPDATE SET
vendor = EXCLUDED.vendor,
date = EXCLUDED.date,
expected_date = EXCLUDED.expected_date,
status = EXCLUDED.status,
notes = EXCLUDED.notes,
received_qty = EXCLUDED.received_qty,
received_cost = EXCLUDED.received_cost,
last_received_date = EXCLUDED.last_received_date,
last_received_by = EXCLUDED.last_received_by,
alt_po_received_qty = EXCLUDED.alt_po_received_qty,
alt_po_last_received_date = EXCLUDED.alt_po_last_received_date,
no_po_received_qty = EXCLUDED.no_po_received_qty,
no_po_last_received_date = EXCLUDED.no_po_last_received_date
RETURNING xmax
)
SELECT
COUNT(*) FILTER (WHERE xmax = 0) as inserted,
COUNT(*) FILTER (WHERE xmax <> 0) as updated
FROM inserted_pos
`);
recordsAdded = result.rows[0].inserted;
recordsUpdated = result.rows[0].updated;
// Update sync status
await localConnection.query(` await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp) INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('purchase_orders', NOW()) VALUES ('purchase_orders', NOW())
ON DUPLICATE KEY UPDATE ON CONFLICT (table_name) DO UPDATE SET
last_sync_timestamp = NOW(), last_sync_timestamp = NOW()
last_sync_id = LAST_INSERT_ID(last_sync_id) `);
// Clean up temporary tables
await localConnection.query(`
DROP TABLE IF EXISTS temp_purchase_orders;
DROP TABLE IF EXISTS temp_po_receivings;
`); `);
return { return {
status: "complete", status: "complete",
totalImported: totalItems, recordsAdded,
recordsAdded: recordsAdded || 0, recordsUpdated,
recordsUpdated: recordsUpdated || 0, totalRecords: processed
incrementalUpdate,
lastSyncTime
}; };
} catch (error) { } catch (error) {
outputProgress({ console.error("Error during purchase orders import:", error);
operation: `${incrementalUpdate ? 'Incremental' : 'Full'} purchase orders import failed`, // Attempt cleanup on error
status: "error", try {
error: error.message, await localConnection.query(`
}); DROP TABLE IF EXISTS temp_purchase_orders;
DROP TABLE IF EXISTS temp_po_receivings;
`);
} catch (cleanupError) {
console.error('Error during cleanup:', cleanupError);
}
throw error; throw error;
} }
} }

View File

@@ -1,5 +1,6 @@
const mysql = require("mysql2/promise"); const mysql = require("mysql2/promise");
const { Client } = require("ssh2"); const { Client } = require("ssh2");
const { Pool } = require('pg');
const dotenv = require("dotenv"); const dotenv = require("dotenv");
const path = require("path"); const path = require("path");
@@ -41,17 +42,41 @@ async function setupSshTunnel(sshConfig) {
async function setupConnections(sshConfig) { async function setupConnections(sshConfig) {
const tunnel = await setupSshTunnel(sshConfig); const tunnel = await setupSshTunnel(sshConfig);
// Setup MySQL connection for production
const prodConnection = await mysql.createConnection({ const prodConnection = await mysql.createConnection({
...sshConfig.prodDbConfig, ...sshConfig.prodDbConfig,
stream: tunnel.stream, stream: tunnel.stream,
}); });
const localConnection = await mysql.createPool({ // Setup PostgreSQL connection pool for local
...sshConfig.localDbConfig, const localPool = new Pool(sshConfig.localDbConfig);
waitForConnections: true,
connectionLimit: 10, // Test the PostgreSQL connection
queueLimit: 0 try {
}); const client = await localPool.connect();
await client.query('SELECT NOW()');
client.release();
console.log('PostgreSQL connection successful');
} catch (err) {
console.error('PostgreSQL connection error:', err);
throw err;
}
// Create a wrapper for the PostgreSQL pool to match MySQL interface
const localConnection = {
query: async (text, params) => {
const client = await localPool.connect();
try {
const result = await client.query(text, params);
return [result];
} finally {
client.release();
}
},
end: async () => {
await localPool.end();
}
};
return { return {
ssh: tunnel.ssh, ssh: tunnel.ssh,