Merge branch 'master' into add-product-upload-page

This commit is contained in:
2025-02-23 15:40:54 -05:00
parent 3f16413769
commit f628774267
47 changed files with 4674 additions and 3199 deletions

View File

@@ -14,7 +14,15 @@ function outputProgress(data) {
function runScript(scriptPath) {
return new Promise((resolve, reject) => {
const child = spawn('node', [scriptPath], {
stdio: ['inherit', 'pipe', 'pipe']
stdio: ['inherit', 'pipe', 'pipe'],
env: {
...process.env,
PGHOST: process.env.DB_HOST,
PGUSER: process.env.DB_USER,
PGPASSWORD: process.env.DB_PASSWORD,
PGDATABASE: process.env.DB_NAME,
PGPORT: process.env.DB_PORT || '5432'
}
});
let output = '';

View File

@@ -19,7 +19,6 @@ const IMPORT_PURCHASE_ORDERS = true;
const INCREMENTAL_UPDATE = process.env.INCREMENTAL_UPDATE !== 'false'; // Default to true unless explicitly set to false
// SSH configuration
// In import-from-prod.js
const sshConfig = {
ssh: {
host: process.env.PROD_SSH_HOST,
@@ -31,6 +30,7 @@ const sshConfig = {
compress: true, // Enable SSH compression
},
prodDbConfig: {
// MySQL config for production
host: process.env.PROD_DB_HOST || "localhost",
user: process.env.PROD_DB_USER,
password: process.env.PROD_DB_PASSWORD,
@@ -39,21 +39,16 @@ const sshConfig = {
timezone: 'Z',
},
localDbConfig: {
// PostgreSQL config for local
host: process.env.DB_HOST,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
multipleStatements: true,
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0,
namedPlaceholders: true,
connectTimeout: 60000,
enableKeepAlive: true,
keepAliveInitialDelay: 10000,
compress: true,
timezone: 'Z',
stringifyObjects: false,
port: process.env.DB_PORT || 5432,
ssl: process.env.DB_SSL === 'true',
connectionTimeoutMillis: 60000,
idleTimeoutMillis: 30000,
max: 10 // connection pool max size
}
};
@@ -108,7 +103,7 @@ async function main() {
SET
status = 'cancelled',
end_time = NOW(),
duration_seconds = TIMESTAMPDIFF(SECOND, start_time, NOW()),
duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER,
error_message = 'Previous import was not completed properly'
WHERE status = 'running'
`);
@@ -118,9 +113,10 @@ async function main() {
CREATE TABLE IF NOT EXISTS sync_status (
table_name VARCHAR(50) PRIMARY KEY,
last_sync_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_sync_id BIGINT,
INDEX idx_last_sync (last_sync_timestamp)
last_sync_id BIGINT
);
CREATE INDEX IF NOT EXISTS idx_last_sync ON sync_status (last_sync_timestamp);
`);
// Create import history record for the overall session
@@ -134,17 +130,17 @@ async function main() {
) VALUES (
'all_tables',
NOW(),
?,
$1::boolean,
'running',
JSON_OBJECT(
'categories_enabled', ?,
'products_enabled', ?,
'orders_enabled', ?,
'purchase_orders_enabled', ?
jsonb_build_object(
'categories_enabled', $2::boolean,
'products_enabled', $3::boolean,
'orders_enabled', $4::boolean,
'purchase_orders_enabled', $5::boolean
)
)
) RETURNING id
`, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS]);
importHistoryId = historyResult.insertId;
importHistoryId = historyResult.rows[0].id;
const results = {
categories: null,
@@ -162,8 +158,8 @@ async function main() {
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
console.log('Categories import result:', results.categories);
totalRecordsAdded += results.categories?.recordsAdded || 0;
totalRecordsUpdated += results.categories?.recordsUpdated || 0;
totalRecordsAdded += parseInt(results.categories?.recordsAdded || 0);
totalRecordsUpdated += parseInt(results.categories?.recordsUpdated || 0);
}
if (IMPORT_PRODUCTS) {
@@ -171,8 +167,8 @@ async function main() {
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
console.log('Products import result:', results.products);
totalRecordsAdded += results.products?.recordsAdded || 0;
totalRecordsUpdated += results.products?.recordsUpdated || 0;
totalRecordsAdded += parseInt(results.products?.recordsAdded || 0);
totalRecordsUpdated += parseInt(results.products?.recordsUpdated || 0);
}
if (IMPORT_ORDERS) {
@@ -180,8 +176,8 @@ async function main() {
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
console.log('Orders import result:', results.orders);
totalRecordsAdded += results.orders?.recordsAdded || 0;
totalRecordsUpdated += results.orders?.recordsUpdated || 0;
totalRecordsAdded += parseInt(results.orders?.recordsAdded || 0);
totalRecordsUpdated += parseInt(results.orders?.recordsUpdated || 0);
}
if (IMPORT_PURCHASE_ORDERS) {
@@ -189,8 +185,8 @@ async function main() {
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
console.log('Purchase orders import result:', results.purchaseOrders);
totalRecordsAdded += results.purchaseOrders?.recordsAdded || 0;
totalRecordsUpdated += results.purchaseOrders?.recordsUpdated || 0;
totalRecordsAdded += parseInt(results.purchaseOrders?.recordsAdded || 0);
totalRecordsUpdated += parseInt(results.purchaseOrders?.recordsUpdated || 0);
}
const endTime = Date.now();
@@ -201,21 +197,21 @@ async function main() {
UPDATE import_history
SET
end_time = NOW(),
duration_seconds = ?,
records_added = ?,
records_updated = ?,
duration_seconds = $1,
records_added = $2,
records_updated = $3,
status = 'completed',
additional_info = JSON_OBJECT(
'categories_enabled', ?,
'products_enabled', ?,
'orders_enabled', ?,
'purchase_orders_enabled', ?,
'categories_result', CAST(? AS JSON),
'products_result', CAST(? AS JSON),
'orders_result', CAST(? AS JSON),
'purchase_orders_result', CAST(? AS JSON)
additional_info = jsonb_build_object(
'categories_enabled', $4::boolean,
'products_enabled', $5::boolean,
'orders_enabled', $6::boolean,
'purchase_orders_enabled', $7::boolean,
'categories_result', COALESCE($8::jsonb, 'null'::jsonb),
'products_result', COALESCE($9::jsonb, 'null'::jsonb),
'orders_result', COALESCE($10::jsonb, 'null'::jsonb),
'purchase_orders_result', COALESCE($11::jsonb, 'null'::jsonb)
)
WHERE id = ?
WHERE id = $12
`, [
totalElapsedSeconds,
totalRecordsAdded,
@@ -259,10 +255,10 @@ async function main() {
UPDATE import_history
SET
end_time = NOW(),
duration_seconds = ?,
status = ?,
error_message = ?
WHERE id = ?
duration_seconds = $1,
status = $2,
error_message = $3
WHERE id = $4
`, [totalElapsedSeconds, error.message === "Import cancelled" ? 'cancelled' : 'failed', error.message, importHistoryId]);
}
@@ -288,16 +284,23 @@ async function main() {
throw error;
} finally {
if (connections) {
await closeConnections(connections);
await closeConnections(connections).catch(err => {
console.error("Error closing connections:", err);
});
}
}
}
// Run the import only if this is the main module
if (require.main === module) {
main().catch((error) => {
main().then((results) => {
console.log('Import completed successfully:', results);
// Force exit after a small delay to ensure all logs are written
setTimeout(() => process.exit(0), 500);
}).catch((error) => {
console.error("Unhandled error in main process:", error);
process.exit(1);
// Force exit with error code after a small delay
setTimeout(() => process.exit(1), 500);
});
}

View File

@@ -9,170 +9,206 @@ async function importCategories(prodConnection, localConnection) {
const startTime = Date.now();
const typeOrder = [10, 20, 11, 21, 12, 13];
let totalInserted = 0;
let totalUpdated = 0;
let skippedCategories = [];
try {
// Process each type in order with its own query
// Start a single transaction for the entire import
await localConnection.query('BEGIN');
// Process each type in order with its own savepoint
for (const type of typeOrder) {
const [categories] = await prodConnection.query(
`
SELECT
pc.cat_id,
pc.name,
pc.type,
CASE
WHEN pc.type IN (10, 20) THEN NULL -- Top level categories should have no parent
WHEN pc.master_cat_id IS NULL THEN NULL
ELSE pc.master_cat_id
END as parent_id,
pc.combined_name as description
FROM product_categories pc
WHERE pc.type = ?
ORDER BY pc.cat_id
`,
[type]
);
try {
// Create a savepoint for this type
await localConnection.query(`SAVEPOINT category_type_${type}`);
if (categories.length === 0) continue;
console.log(`\nProcessing ${categories.length} type ${type} categories`);
if (type === 10) {
console.log("Type 10 categories:", JSON.stringify(categories, null, 2));
}
// For types that can have parents (11, 21, 12, 13), verify parent existence
let categoriesToInsert = categories;
if (![10, 20].includes(type)) {
// Get all parent IDs
const parentIds = [
...new Set(
categories.map((c) => c.parent_id).filter((id) => id !== null)
),
];
// Check which parents exist
const [existingParents] = await localConnection.query(
"SELECT cat_id FROM categories WHERE cat_id IN (?)",
[parentIds]
);
const existingParentIds = new Set(existingParents.map((p) => p.cat_id));
// Filter categories and track skipped ones
categoriesToInsert = categories.filter(
(cat) =>
cat.parent_id === null || existingParentIds.has(cat.parent_id)
);
const invalidCategories = categories.filter(
(cat) =>
cat.parent_id !== null && !existingParentIds.has(cat.parent_id)
// Production query remains MySQL compatible
const [categories] = await prodConnection.query(
`
SELECT
pc.cat_id,
pc.name,
pc.type,
CASE
WHEN pc.type IN (10, 20) THEN NULL -- Top level categories should have no parent
WHEN pc.master_cat_id IS NULL THEN NULL
ELSE pc.master_cat_id
END as parent_id,
pc.combined_name as description
FROM product_categories pc
WHERE pc.type = ?
ORDER BY pc.cat_id
`,
[type]
);
if (invalidCategories.length > 0) {
const skippedInfo = invalidCategories.map((c) => ({
id: c.cat_id,
name: c.name,
type: c.type,
missing_parent: c.parent_id,
}));
skippedCategories.push(...skippedInfo);
if (categories.length === 0) {
await localConnection.query(`RELEASE SAVEPOINT category_type_${type}`);
continue;
}
console.log(
"\nSkipping categories with missing parents:",
invalidCategories
.map(
(c) =>
`${c.cat_id} - ${c.name} (missing parent: ${c.parent_id})`
)
.join("\n")
);
console.log(`\nProcessing ${categories.length} type ${type} categories`);
if (type === 10) {
console.log("Type 10 categories:", JSON.stringify(categories, null, 2));
}
// For types that can have parents (11, 21, 12, 13), verify parent existence
let categoriesToInsert = categories;
if (![10, 20].includes(type)) {
// Get all parent IDs
const parentIds = [
...new Set(
categories
.filter(c => c && c.parent_id !== null)
.map(c => c.parent_id)
),
];
console.log(`Processing ${categories.length} type ${type} categories with ${parentIds.length} unique parent IDs`);
console.log('Parent IDs:', parentIds);
// No need to check for parent existence - we trust they exist since they were just inserted
categoriesToInsert = categories;
}
if (categoriesToInsert.length === 0) {
console.log(
`No valid categories of type ${type} to insert - all had missing parents`
`No valid categories of type ${type} to insert`
);
await localConnection.query(`RELEASE SAVEPOINT category_type_${type}`);
continue;
}
console.log(
`Inserting ${categoriesToInsert.length} type ${type} categories`
);
// PostgreSQL upsert query with parameterized values
const values = categoriesToInsert.flatMap((cat) => [
cat.cat_id,
cat.name,
cat.type,
cat.parent_id,
cat.description,
'active',
new Date(),
new Date()
]);
console.log('Attempting to insert/update with values:', JSON.stringify(values, null, 2));
const placeholders = categoriesToInsert
.map((_, i) => `($${i * 8 + 1}, $${i * 8 + 2}, $${i * 8 + 3}, $${i * 8 + 4}, $${i * 8 + 5}, $${i * 8 + 6}, $${i * 8 + 7}, $${i * 8 + 8})`)
.join(',');
console.log('Using placeholders:', placeholders);
// Insert categories with ON CONFLICT clause for PostgreSQL
const query = `
WITH inserted_categories AS (
INSERT INTO categories (
cat_id, name, type, parent_id, description, status, created_at, updated_at
)
VALUES ${placeholders}
ON CONFLICT (cat_id) DO UPDATE SET
name = EXCLUDED.name,
type = EXCLUDED.type,
parent_id = EXCLUDED.parent_id,
description = EXCLUDED.description,
status = EXCLUDED.status,
updated_at = EXCLUDED.updated_at
RETURNING
cat_id,
CASE
WHEN xmax = 0 THEN true
ELSE false
END as is_insert
)
SELECT
COUNT(*) as total,
COUNT(*) FILTER (WHERE is_insert) as inserted,
COUNT(*) FILTER (WHERE NOT is_insert) as updated
FROM inserted_categories`;
console.log('Executing query:', query);
const result = await localConnection.query(query, values);
console.log('Query result:', result);
// Get the first result since query returns an array
const queryResult = Array.isArray(result) ? result[0] : result;
if (!queryResult || !queryResult.rows || !queryResult.rows[0]) {
console.error('Query failed to return results. Result:', queryResult);
throw new Error('Query did not return expected results');
}
const total = parseInt(queryResult.rows[0].total) || 0;
const inserted = parseInt(queryResult.rows[0].inserted) || 0;
const updated = parseInt(queryResult.rows[0].updated) || 0;
console.log(`Total: ${total}, Inserted: ${inserted}, Updated: ${updated}`);
totalInserted += inserted;
totalUpdated += updated;
// Release the savepoint for this type
await localConnection.query(`RELEASE SAVEPOINT category_type_${type}`);
outputProgress({
status: "running",
operation: "Categories import",
message: `Imported ${inserted} (updated ${updated}) categories of type ${type}`,
current: totalInserted + totalUpdated,
total: categories.length,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
});
} catch (error) {
// Rollback to the savepoint for this type
await localConnection.query(`ROLLBACK TO SAVEPOINT category_type_${type}`);
throw error;
}
console.log(
`Inserting ${categoriesToInsert.length} type ${type} categories`
);
const placeholders = categoriesToInsert
.map(() => "(?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)")
.join(",");
const values = categoriesToInsert.flatMap((cat) => [
cat.cat_id,
cat.name,
cat.type,
cat.parent_id,
cat.description,
"active",
]);
// Insert categories and create relationships in one query to avoid race conditions
await localConnection.query(
`
INSERT INTO categories (cat_id, name, type, parent_id, description, status, created_at, updated_at)
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
name = VALUES(name),
type = VALUES(type),
parent_id = VALUES(parent_id),
description = VALUES(description),
status = VALUES(status),
updated_at = CURRENT_TIMESTAMP
`,
values
);
totalInserted += categoriesToInsert.length;
outputProgress({
status: "running",
operation: "Categories import",
current: totalInserted,
total: totalInserted,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
});
}
// After all imports, if we skipped any categories, throw an error
if (skippedCategories.length > 0) {
const error = new Error(
"Categories import completed with errors - some categories were skipped due to missing parents"
);
error.skippedCategories = skippedCategories;
throw error;
}
// Commit the entire transaction - we'll do this even if we have skipped categories
await localConnection.query('COMMIT');
outputProgress({
status: "complete",
operation: "Categories import completed",
current: totalInserted,
total: totalInserted,
current: totalInserted + totalUpdated,
total: totalInserted + totalUpdated,
duration: formatElapsedTime((Date.now() - startTime) / 1000),
warnings: skippedCategories.length > 0 ? {
message: "Some categories were skipped due to missing parents",
skippedCategories
} : undefined
});
return {
status: "complete",
totalImported: totalInserted
recordsAdded: totalInserted,
recordsUpdated: totalUpdated,
totalRecords: totalInserted + totalUpdated,
warnings: skippedCategories.length > 0 ? {
message: "Some categories were skipped due to missing parents",
skippedCategories
} : undefined
};
} catch (error) {
console.error("Error importing categories:", error);
if (error.skippedCategories) {
console.error(
"Skipped categories:",
JSON.stringify(error.skippedCategories, null, 2)
);
// Only rollback if we haven't committed yet
try {
await localConnection.query('ROLLBACK');
} catch (rollbackError) {
console.error("Error during rollback:", rollbackError);
}
outputProgress({
status: "error",
operation: "Categories import failed",
error: error.message,
skippedCategories: error.skippedCategories
error: error.message
});
throw error;

View File

@@ -2,7 +2,7 @@ const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } =
const { importMissingProducts, setupTemporaryTables, cleanupTemporaryTables, materializeCalculations } = require('./products');
/**
* Imports orders from a production MySQL database to a local MySQL database.
* Imports orders from a production MySQL database to a local PostgreSQL database.
* It can run in two modes:
* 1. Incremental update mode (default): Only fetch orders that have changed since the last sync time.
* 2. Full update mode: Fetch all eligible orders within the last 5 years regardless of timestamp.
@@ -23,93 +23,18 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
let importedCount = 0;
let totalOrderItems = 0;
let totalUniqueOrders = 0;
// Add a cumulative counter for processed orders before the loop
let cumulativeProcessedOrders = 0;
try {
// Clean up any existing temp tables first
await localConnection.query(`
DROP TEMPORARY TABLE IF EXISTS temp_order_items;
DROP TEMPORARY TABLE IF EXISTS temp_order_meta;
DROP TEMPORARY TABLE IF EXISTS temp_order_discounts;
DROP TEMPORARY TABLE IF EXISTS temp_order_taxes;
DROP TEMPORARY TABLE IF EXISTS temp_order_costs;
`);
// Create all temp tables with correct schema
await localConnection.query(`
CREATE TEMPORARY TABLE temp_order_items (
order_id INT UNSIGNED NOT NULL,
pid INT UNSIGNED NOT NULL,
SKU VARCHAR(50) NOT NULL,
price DECIMAL(10,2) NOT NULL,
quantity INT NOT NULL,
base_discount DECIMAL(10,2) DEFAULT 0,
PRIMARY KEY (order_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`);
await localConnection.query(`
CREATE TEMPORARY TABLE temp_order_meta (
order_id INT UNSIGNED NOT NULL,
date DATE NOT NULL,
customer VARCHAR(100) NOT NULL,
customer_name VARCHAR(150) NOT NULL,
status INT,
canceled TINYINT(1),
summary_discount DECIMAL(10,2) DEFAULT 0.00,
summary_subtotal DECIMAL(10,2) DEFAULT 0.00,
PRIMARY KEY (order_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`);
await localConnection.query(`
CREATE TEMPORARY TABLE temp_order_discounts (
order_id INT UNSIGNED NOT NULL,
pid INT UNSIGNED NOT NULL,
discount DECIMAL(10,2) NOT NULL,
PRIMARY KEY (order_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`);
await localConnection.query(`
CREATE TEMPORARY TABLE temp_order_taxes (
order_id INT UNSIGNED NOT NULL,
pid INT UNSIGNED NOT NULL,
tax DECIMAL(10,2) NOT NULL,
PRIMARY KEY (order_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`);
await localConnection.query(`
CREATE TEMPORARY TABLE temp_order_costs (
order_id INT UNSIGNED NOT NULL,
pid INT UNSIGNED NOT NULL,
costeach DECIMAL(10,3) DEFAULT 0.000,
PRIMARY KEY (order_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`);
// Get column names from the local table
const [columns] = await localConnection.query(`
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'orders'
AND COLUMN_NAME != 'updated' -- Exclude the updated column
ORDER BY ORDINAL_POSITION
`);
const columnNames = columns.map(col => col.COLUMN_NAME);
// Get last sync info
const [syncInfo] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'"
);
const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01';
const lastSyncTime = syncInfo?.rows?.[0]?.last_sync_timestamp || '1970-01-01';
console.log('Orders: Using last sync time:', lastSyncTime);
// First get count of order items
// First get count of order items - Keep MySQL compatible for production
const [[{ total }]] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM order_items oi
@@ -141,12 +66,13 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
totalOrderItems = total;
console.log('Orders: Found changes:', totalOrderItems);
// Get order items in batches
// Get order items - Keep MySQL compatible for production
console.log('Orders: Starting MySQL query...');
const [orderItems] = await prodConnection.query(`
SELECT
oi.order_id,
oi.prod_pid as pid,
oi.prod_itemnumber as SKU,
oi.prod_pid,
COALESCE(NULLIF(TRIM(oi.prod_itemnumber), ''), 'NO-SKU') as SKU,
oi.prod_price as price,
oi.qty_ordered as quantity,
COALESCE(oi.prod_price_reg - oi.prod_price, 0) as base_discount,
@@ -177,24 +103,78 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
` : ''}
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);
console.log('Orders: Processing', orderItems.length, 'order items');
console.log('Orders: Found', orderItems.length, 'order items to process');
// Create tables in PostgreSQL for debugging
await localConnection.query(`
DROP TABLE IF EXISTS debug_order_items;
DROP TABLE IF EXISTS debug_order_meta;
DROP TABLE IF EXISTS debug_order_discounts;
DROP TABLE IF EXISTS debug_order_taxes;
DROP TABLE IF EXISTS debug_order_costs;
CREATE TABLE debug_order_items (
order_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
SKU VARCHAR(50) NOT NULL,
price DECIMAL(10,2) NOT NULL,
quantity INTEGER NOT NULL,
base_discount DECIMAL(10,2) DEFAULT 0,
PRIMARY KEY (order_id, pid)
);
CREATE TABLE debug_order_meta (
order_id INTEGER NOT NULL,
date DATE NOT NULL,
customer VARCHAR(100) NOT NULL,
customer_name VARCHAR(150) NOT NULL,
status INTEGER,
canceled BOOLEAN,
summary_discount DECIMAL(10,2) DEFAULT 0.00,
summary_subtotal DECIMAL(10,2) DEFAULT 0.00,
PRIMARY KEY (order_id)
);
CREATE TABLE debug_order_discounts (
order_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
discount DECIMAL(10,2) NOT NULL,
PRIMARY KEY (order_id, pid)
);
CREATE TABLE debug_order_taxes (
order_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
tax DECIMAL(10,2) NOT NULL,
PRIMARY KEY (order_id, pid)
);
CREATE TABLE debug_order_costs (
order_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
costeach DECIMAL(10,3) DEFAULT 0.000,
PRIMARY KEY (order_id, pid)
);
`);
// Insert order items in batches
for (let i = 0; i < orderItems.length; i += 5000) {
const batch = orderItems.slice(i, Math.min(i + 5000, orderItems.length));
const placeholders = batch.map(() => "(?, ?, ?, ?, ?, ?)").join(",");
const placeholders = batch.map((_, idx) =>
`($${idx * 6 + 1}, $${idx * 6 + 2}, $${idx * 6 + 3}, $${idx * 6 + 4}, $${idx * 6 + 5}, $${idx * 6 + 6})`
).join(",");
const values = batch.flatMap(item => [
item.order_id, item.pid, item.SKU, item.price, item.quantity, item.base_discount
item.order_id, item.prod_pid, item.SKU, item.price, item.quantity, item.base_discount
]);
await localConnection.query(`
INSERT INTO temp_order_items (order_id, pid, SKU, price, quantity, base_discount)
INSERT INTO debug_order_items (order_id, pid, SKU, price, quantity, base_discount)
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
SKU = VALUES(SKU),
price = VALUES(price),
quantity = VALUES(quantity),
base_discount = VALUES(base_discount)
ON CONFLICT (order_id, pid) DO UPDATE SET
SKU = EXCLUDED.SKU,
price = EXCLUDED.price,
quantity = EXCLUDED.quantity,
base_discount = EXCLUDED.base_discount
`, values);
processedCount = i + batch.length;
@@ -203,24 +183,26 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
operation: "Orders import",
message: `Loading order items: ${processedCount} of ${totalOrderItems}`,
current: processedCount,
total: totalOrderItems
total: totalOrderItems,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, processedCount, totalOrderItems),
rate: calculateRate(startTime, processedCount)
});
}
// Get unique order IDs
const orderIds = [...new Set(orderItems.map(item => item.order_id))];
totalUniqueOrders = orderIds.length;
console.log('Total unique order IDs:', totalUniqueOrders);
console.log('Orders: Processing', totalUniqueOrders, 'unique orders');
// Reset processed count for order processing phase
processedCount = 0;
// Get order metadata in batches
for (let i = 0; i < orderIds.length; i += 5000) {
const batchIds = orderIds.slice(i, i + 5000);
console.log(`Processing batch ${i/5000 + 1}, size: ${batchIds.length}`);
console.log('Sample of batch IDs:', batchIds.slice(0, 5));
// Process metadata, discounts, taxes, and costs in parallel
const METADATA_BATCH_SIZE = 2000;
const PG_BATCH_SIZE = 200;
const processMetadataBatch = async (batchIds) => {
const [orders] = await prodConnection.query(`
SELECT
o.order_id,
@@ -235,64 +217,46 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
LEFT JOIN users u ON o.order_cid = u.cid
WHERE o.order_id IN (?)
`, [batchIds]);
console.log(`Retrieved ${orders.length} orders for ${batchIds.length} IDs`);
const duplicates = orders.filter((order, index, self) =>
self.findIndex(o => o.order_id === order.order_id) !== index
);
if (duplicates.length > 0) {
console.log('Found duplicates:', duplicates);
// Process in sub-batches for PostgreSQL
for (let j = 0; j < orders.length; j += PG_BATCH_SIZE) {
const subBatch = orders.slice(j, j + PG_BATCH_SIZE);
if (subBatch.length === 0) continue;
const placeholders = subBatch.map((_, idx) =>
`($${idx * 8 + 1}, $${idx * 8 + 2}, $${idx * 8 + 3}, $${idx * 8 + 4}, $${idx * 8 + 5}, $${idx * 8 + 6}, $${idx * 8 + 7}, $${idx * 8 + 8})`
).join(",");
const values = subBatch.flatMap(order => [
order.order_id,
order.date,
order.customer,
order.customer_name || '',
order.status,
order.canceled,
order.summary_discount || 0,
order.summary_subtotal || 0
]);
await localConnection.query(`
INSERT INTO debug_order_meta (
order_id, date, customer, customer_name, status, canceled,
summary_discount, summary_subtotal
)
VALUES ${placeholders}
ON CONFLICT (order_id) DO UPDATE SET
date = EXCLUDED.date,
customer = EXCLUDED.customer,
customer_name = EXCLUDED.customer_name,
status = EXCLUDED.status,
canceled = EXCLUDED.canceled,
summary_discount = EXCLUDED.summary_discount,
summary_subtotal = EXCLUDED.summary_subtotal
`, values);
}
};
const placeholders = orders.map(() => "(?, ?, ?, ?, ?, ?, ?, ?)").join(",");
const values = orders.flatMap(order => [
order.order_id,
order.date,
order.customer,
order.customer_name,
order.status,
order.canceled,
order.summary_discount,
order.summary_subtotal
]);
await localConnection.query(`
INSERT INTO temp_order_meta (
order_id,
date,
customer,
customer_name,
status,
canceled,
summary_discount,
summary_subtotal
) VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
date = VALUES(date),
customer = VALUES(customer),
customer_name = VALUES(customer_name),
status = VALUES(status),
canceled = VALUES(canceled),
summary_discount = VALUES(summary_discount),
summary_subtotal = VALUES(summary_subtotal)
`, values);
processedCount = i + orders.length;
outputProgress({
status: "running",
operation: "Orders import",
message: `Loading order metadata: ${processedCount} of ${totalUniqueOrders}`,
current: processedCount,
total: totalUniqueOrders
});
}
// Reset processed count for final phase
processedCount = 0;
// Get promotional discounts in batches
for (let i = 0; i < orderIds.length; i += 5000) {
const batchIds = orderIds.slice(i, i + 5000);
const processDiscountsBatch = async (batchIds) => {
const [discounts] = await prodConnection.query(`
SELECT order_id, pid, SUM(amount) as discount
FROM order_discount_items
@@ -300,313 +264,296 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
GROUP BY order_id, pid
`, [batchIds]);
if (discounts.length > 0) {
const placeholders = discounts.map(() => "(?, ?, ?)").join(",");
const values = discounts.flatMap(d => [d.order_id, d.pid, d.discount]);
if (discounts.length === 0) return;
for (let j = 0; j < discounts.length; j += PG_BATCH_SIZE) {
const subBatch = discounts.slice(j, j + PG_BATCH_SIZE);
if (subBatch.length === 0) continue;
const placeholders = subBatch.map((_, idx) =>
`($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})`
).join(",");
const values = subBatch.flatMap(d => [
d.order_id,
d.pid,
d.discount || 0
]);
await localConnection.query(`
INSERT INTO temp_order_discounts VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
discount = VALUES(discount)
INSERT INTO debug_order_discounts (order_id, pid, discount)
VALUES ${placeholders}
ON CONFLICT (order_id, pid) DO UPDATE SET
discount = EXCLUDED.discount
`, values);
}
}
};
// Get tax information in batches
for (let i = 0; i < orderIds.length; i += 5000) {
const batchIds = orderIds.slice(i, i + 5000);
const processTaxesBatch = async (batchIds) => {
// Optimized tax query to avoid subquery
const [taxes] = await prodConnection.query(`
SELECT DISTINCT
oti.order_id,
otip.pid,
otip.item_taxes_to_collect as tax
FROM order_tax_info oti
JOIN (
SELECT order_id, MAX(stamp) as max_stamp
SELECT oti.order_id, otip.pid, otip.item_taxes_to_collect as tax
FROM (
SELECT order_id, MAX(taxinfo_id) as latest_taxinfo_id
FROM order_tax_info
WHERE order_id IN (?)
GROUP BY order_id
) latest ON oti.order_id = latest.order_id AND oti.stamp = latest.max_stamp
) latest_info
JOIN order_tax_info oti ON oti.order_id = latest_info.order_id
AND oti.taxinfo_id = latest_info.latest_taxinfo_id
JOIN order_tax_info_products otip ON oti.taxinfo_id = otip.taxinfo_id
`, [batchIds]);
if (taxes.length > 0) {
// Remove any duplicates before inserting
const uniqueTaxes = new Map();
taxes.forEach(t => {
const key = `${t.order_id}-${t.pid}`;
uniqueTaxes.set(key, t);
});
if (taxes.length === 0) return;
const values = Array.from(uniqueTaxes.values()).flatMap(t => [t.order_id, t.pid, t.tax]);
if (values.length > 0) {
const placeholders = Array(uniqueTaxes.size).fill("(?, ?, ?)").join(",");
await localConnection.query(`
INSERT INTO temp_order_taxes VALUES ${placeholders}
ON DUPLICATE KEY UPDATE tax = VALUES(tax)
`, values);
}
}
}
for (let j = 0; j < taxes.length; j += PG_BATCH_SIZE) {
const subBatch = taxes.slice(j, j + PG_BATCH_SIZE);
if (subBatch.length === 0) continue;
// Get costeach values in batches
for (let i = 0; i < orderIds.length; i += 5000) {
const batchIds = orderIds.slice(i, i + 5000);
const [costs] = await prodConnection.query(`
SELECT
oc.orderid as order_id,
oc.pid,
COALESCE(
oc.costeach,
(SELECT pi.costeach
FROM product_inventory pi
WHERE pi.pid = oc.pid
AND pi.daterec <= o.date_placed
ORDER BY pi.daterec DESC LIMIT 1)
) as costeach
FROM order_costs oc
JOIN _order o ON oc.orderid = o.order_id
WHERE oc.orderid IN (?)
`, [batchIds]);
const placeholders = subBatch.map((_, idx) =>
`($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})`
).join(",");
const values = subBatch.flatMap(t => [
t.order_id,
t.pid,
t.tax || 0
]);
if (costs.length > 0) {
const placeholders = costs.map(() => '(?, ?, ?)').join(",");
const values = costs.flatMap(c => [c.order_id, c.pid, c.costeach || 0]);
await localConnection.query(`
INSERT INTO temp_order_costs (order_id, pid, costeach)
INSERT INTO debug_order_taxes (order_id, pid, tax)
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE costeach = VALUES(costeach)
ON CONFLICT (order_id, pid) DO UPDATE SET
tax = EXCLUDED.tax
`, values);
}
}
};
// Now combine all the data and insert into orders table
// Pre-check all products at once instead of per batch
const allOrderPids = [...new Set(orderItems.map(item => item.pid))];
const [existingProducts] = allOrderPids.length > 0 ? await localConnection.query(
"SELECT pid FROM products WHERE pid IN (?)",
[allOrderPids]
) : [[]];
const existingPids = new Set(existingProducts.map(p => p.pid));
// Process in larger batches
for (let i = 0; i < orderIds.length; i += 5000) {
const batchIds = orderIds.slice(i, i + 5000);
// Get combined data for this batch
const [orders] = await localConnection.query(`
const processCostsBatch = async (batchIds) => {
const [costs] = await prodConnection.query(`
SELECT
oi.order_id as order_number,
oi.pid,
oi.SKU,
om.date,
oi.price,
oi.quantity,
oi.base_discount + COALESCE(od.discount, 0) +
CASE
WHEN om.summary_discount > 0 THEN
ROUND((om.summary_discount * (oi.price * oi.quantity)) /
NULLIF(om.summary_subtotal, 0), 2)
ELSE 0
END as discount,
COALESCE(ot.tax, 0) as tax,
0 as tax_included,
0 as shipping,
om.customer,
om.customer_name,
om.status,
om.canceled,
COALESCE(tc.costeach, 0) as costeach
FROM temp_order_items oi
JOIN temp_order_meta om ON oi.order_id = om.order_id
LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
LEFT JOIN temp_order_costs tc ON oi.order_id = tc.order_id AND oi.pid = tc.pid
WHERE oi.order_id IN (?)
oc.orderid as order_id,
oc.pid,
oc.costeach
FROM order_costs oc
WHERE oc.orderid IN (?)
AND oc.pending = 0
`, [batchIds]);
// Filter orders and track missing products - do this in a single pass
const validOrders = [];
const values = [];
const processedOrderItems = new Set(); // Track unique order items
const processedOrders = new Set(); // Track unique orders
for (const order of orders) {
if (!existingPids.has(order.pid)) {
missingProducts.add(order.pid);
skippedOrders.add(order.order_number);
continue;
}
validOrders.push(order);
values.push(...columnNames.map(col => order[col] ?? null));
processedOrderItems.add(`${order.order_number}-${order.pid}`);
processedOrders.add(order.order_number);
}
if (costs.length === 0) return;
if (validOrders.length > 0) {
// Pre-compute the placeholders string once
const singlePlaceholder = `(${columnNames.map(() => "?").join(",")})`;
const placeholders = Array(validOrders.length).fill(singlePlaceholder).join(",");
for (let j = 0; j < costs.length; j += PG_BATCH_SIZE) {
const subBatch = costs.slice(j, j + PG_BATCH_SIZE);
if (subBatch.length === 0) continue;
const placeholders = subBatch.map((_, idx) =>
`($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})`
).join(",");
const result = await localConnection.query(`
INSERT INTO orders (${columnNames.join(",")})
const values = subBatch.flatMap(c => [
c.order_id,
c.pid,
c.costeach || 0
]);
await localConnection.query(`
INSERT INTO debug_order_costs (order_id, pid, costeach)
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
SKU = VALUES(SKU),
date = VALUES(date),
price = VALUES(price),
quantity = VALUES(quantity),
discount = VALUES(discount),
tax = VALUES(tax),
tax_included = VALUES(tax_included),
shipping = VALUES(shipping),
customer = VALUES(customer),
customer_name = VALUES(customer_name),
status = VALUES(status),
canceled = VALUES(canceled),
costeach = VALUES(costeach)
`, validOrders.map(o => columnNames.map(col => o[col] ?? null)).flat());
const affectedRows = result[0].affectedRows;
const updates = Math.floor(affectedRows / 2);
const inserts = affectedRows - (updates * 2);
recordsAdded += inserts;
recordsUpdated += updates;
importedCount += processedOrderItems.size; // Count unique order items processed
ON CONFLICT (order_id, pid) DO UPDATE SET
costeach = EXCLUDED.costeach
`, values);
}
};
// Update progress based on unique orders processed
cumulativeProcessedOrders += processedOrders.size;
// Process all data types in parallel for each batch
for (let i = 0; i < orderIds.length; i += METADATA_BATCH_SIZE) {
const batchIds = orderIds.slice(i, i + METADATA_BATCH_SIZE);
await Promise.all([
processMetadataBatch(batchIds),
processDiscountsBatch(batchIds),
processTaxesBatch(batchIds),
processCostsBatch(batchIds)
]);
processedCount = i + batchIds.length;
outputProgress({
status: "running",
operation: "Orders import",
message: `Imported ${importedCount} order items (${cumulativeProcessedOrders} of ${totalUniqueOrders} orders processed)`,
current: cumulativeProcessedOrders,
message: `Loading order data: ${processedCount} of ${totalUniqueOrders}`,
current: processedCount,
total: totalUniqueOrders,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, cumulativeProcessedOrders, totalUniqueOrders),
rate: calculateRate(startTime, cumulativeProcessedOrders)
remaining: estimateRemaining(startTime, processedCount, totalUniqueOrders),
rate: calculateRate(startTime, processedCount)
});
}
// Now try to import any orders that were skipped due to missing products
if (skippedOrders.size > 0) {
try {
outputProgress({
status: "running",
operation: "Orders import",
message: `Retrying import of ${skippedOrders.size} orders with previously missing products`,
});
// Pre-check all products at once
const allOrderPids = [...new Set(orderItems.map(item => item.prod_pid))];
console.log('Orders: Checking', allOrderPids.length, 'unique products');
const [existingProducts] = allOrderPids.length > 0 ? await localConnection.query(
"SELECT pid FROM products WHERE pid = ANY($1::bigint[])",
[allOrderPids]
) : [[]];
const existingPids = new Set(existingProducts.rows.map(p => p.pid));
// Process in smaller batches
for (let i = 0; i < orderIds.length; i += 1000) {
const batchIds = orderIds.slice(i, i + 1000);
// Get the orders that were skipped
const [skippedProdOrders] = await localConnection.query(`
SELECT DISTINCT
// Get combined data for this batch in sub-batches
const PG_BATCH_SIZE = 100; // Process 100 records at a time
for (let j = 0; j < batchIds.length; j += PG_BATCH_SIZE) {
const subBatchIds = batchIds.slice(j, j + PG_BATCH_SIZE);
const [orders] = await localConnection.query(`
WITH order_totals AS (
SELECT
oi.order_id,
oi.pid,
SUM(COALESCE(od.discount, 0)) as promo_discount,
COALESCE(ot.tax, 0) as total_tax,
COALESCE(oi.price * 0.5, 0) as costeach
FROM debug_order_items oi
LEFT JOIN debug_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
LEFT JOIN debug_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
GROUP BY oi.order_id, oi.pid, ot.tax
)
SELECT
oi.order_id as order_number,
oi.pid,
oi.SKU,
oi.pid::bigint as pid,
oi.SKU as sku,
om.date,
oi.price,
oi.quantity,
oi.base_discount + COALESCE(od.discount, 0) +
CASE
WHEN o.summary_discount > 0 THEN
ROUND((o.summary_discount * (oi.price * oi.quantity)) /
NULLIF(o.summary_subtotal, 0), 2)
(oi.base_discount +
COALESCE(ot.promo_discount, 0) +
CASE
WHEN om.summary_discount > 0 AND om.summary_subtotal > 0 THEN
ROUND((om.summary_discount * (oi.price * oi.quantity)) / NULLIF(om.summary_subtotal, 0), 2)
ELSE 0
END as discount,
COALESCE(ot.tax, 0) as tax,
0 as tax_included,
END)::DECIMAL(10,2) as discount,
COALESCE(ot.total_tax, 0)::DECIMAL(10,2) as tax,
false as tax_included,
0 as shipping,
om.customer,
om.customer_name,
om.status,
om.canceled,
COALESCE(tc.costeach, 0) as costeach
FROM temp_order_items oi
JOIN temp_order_meta om ON oi.order_id = om.order_id
LEFT JOIN _order o ON oi.order_id = o.order_id
LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
LEFT JOIN temp_order_costs tc ON oi.order_id = tc.order_id AND oi.pid = tc.pid
WHERE oi.order_id IN (?)
`, [Array.from(skippedOrders)]);
COALESCE(ot.costeach, oi.price * 0.5)::DECIMAL(10,3) as costeach
FROM (
SELECT DISTINCT ON (order_id, pid)
order_id, pid, SKU, price, quantity, base_discount
FROM debug_order_items
WHERE order_id = ANY($1)
ORDER BY order_id, pid
) oi
JOIN debug_order_meta om ON oi.order_id = om.order_id
LEFT JOIN order_totals ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
ORDER BY oi.order_id, oi.pid
`, [subBatchIds]);
// Check which products exist now
const skippedPids = [...new Set(skippedProdOrders.map(o => o.pid))];
const [existingProducts] = skippedPids.length > 0 ? await localConnection.query(
"SELECT pid FROM products WHERE pid IN (?)",
[skippedPids]
) : [[]];
const existingPids = new Set(existingProducts.map(p => p.pid));
// Filter orders that can now be imported
const validOrders = skippedProdOrders.filter(order => existingPids.has(order.pid));
const retryOrderItems = new Set(); // Track unique order items in retry
if (validOrders.length > 0) {
const placeholders = validOrders.map(() => `(${columnNames.map(() => "?").join(", ")})`).join(",");
const values = validOrders.map(o => columnNames.map(col => o[col] ?? null)).flat();
const result = await localConnection.query(`
INSERT INTO orders (${columnNames.join(", ")})
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
SKU = VALUES(SKU),
date = VALUES(date),
price = VALUES(price),
quantity = VALUES(quantity),
discount = VALUES(discount),
tax = VALUES(tax),
tax_included = VALUES(tax_included),
shipping = VALUES(shipping),
customer = VALUES(customer),
customer_name = VALUES(customer_name),
status = VALUES(status),
canceled = VALUES(canceled),
costeach = VALUES(costeach)
`, values);
const affectedRows = result[0].affectedRows;
const updates = Math.floor(affectedRows / 2);
const inserts = affectedRows - (updates * 2);
// Track unique order items
validOrders.forEach(order => {
retryOrderItems.add(`${order.order_number}-${order.pid}`);
});
outputProgress({
status: "running",
operation: "Orders import",
message: `Successfully imported ${retryOrderItems.size} previously skipped order items`,
});
// Update the main counters
recordsAdded += inserts;
recordsUpdated += updates;
importedCount += retryOrderItems.size;
// Filter orders and track missing products
const validOrders = [];
const processedOrderItems = new Set();
const processedOrders = new Set();
for (const order of orders.rows) {
if (!existingPids.has(order.pid)) {
missingProducts.add(order.pid);
skippedOrders.add(order.order_number);
continue;
}
validOrders.push(order);
processedOrderItems.add(`${order.order_number}-${order.pid}`);
processedOrders.add(order.order_number);
}
} catch (error) {
console.warn('Warning: Failed to retry skipped orders:', error.message);
console.warn(`Skipped ${skippedOrders.size} orders due to ${missingProducts.size} missing products`);
// Process valid orders in smaller sub-batches
const FINAL_BATCH_SIZE = 50;
for (let k = 0; k < validOrders.length; k += FINAL_BATCH_SIZE) {
const subBatch = validOrders.slice(k, k + FINAL_BATCH_SIZE);
const placeholders = subBatch.map((_, idx) => {
const base = idx * 14; // 14 columns (removed updated)
return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, $${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14})`;
}).join(',');
const batchValues = subBatch.flatMap(o => [
o.order_number,
o.pid,
o.sku || 'NO-SKU',
o.date,
o.price,
o.quantity,
o.discount,
o.tax,
o.tax_included,
o.shipping,
o.customer,
o.customer_name,
o.status,
o.canceled
]);
const [result] = await localConnection.query(`
WITH inserted_orders AS (
INSERT INTO orders (
order_number, pid, sku, date, price, quantity, discount,
tax, tax_included, shipping, customer, customer_name,
status, canceled
)
VALUES ${placeholders}
ON CONFLICT (order_number, pid) DO UPDATE SET
sku = EXCLUDED.sku,
date = EXCLUDED.date,
price = EXCLUDED.price,
quantity = EXCLUDED.quantity,
discount = EXCLUDED.discount,
tax = EXCLUDED.tax,
tax_included = EXCLUDED.tax_included,
shipping = EXCLUDED.shipping,
customer = EXCLUDED.customer,
customer_name = EXCLUDED.customer_name,
status = EXCLUDED.status,
canceled = EXCLUDED.canceled
RETURNING xmax = 0 as inserted
)
SELECT
COUNT(*) FILTER (WHERE inserted) as inserted,
COUNT(*) FILTER (WHERE NOT inserted) as updated
FROM inserted_orders
`, batchValues);
const { inserted, updated } = result.rows[0];
recordsAdded += inserted;
recordsUpdated += updated;
importedCount += subBatch.length;
}
cumulativeProcessedOrders += processedOrders.size;
outputProgress({
status: "running",
operation: "Orders import",
message: `Importing orders: ${cumulativeProcessedOrders} of ${totalUniqueOrders}`,
current: cumulativeProcessedOrders,
total: totalUniqueOrders,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, cumulativeProcessedOrders, totalUniqueOrders),
rate: calculateRate(startTime, cumulativeProcessedOrders)
});
}
}
// Clean up temporary tables after ALL processing is complete
await localConnection.query(`
DROP TEMPORARY TABLE IF EXISTS temp_order_items;
DROP TEMPORARY TABLE IF EXISTS temp_order_meta;
DROP TEMPORARY TABLE IF EXISTS temp_order_discounts;
DROP TEMPORARY TABLE IF EXISTS temp_order_taxes;
DROP TEMPORARY TABLE IF EXISTS temp_order_costs;
`);
// Only update sync status if we get here (no errors thrown)
// Update sync status
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('orders', NOW())
ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW()
ON CONFLICT (table_name) DO UPDATE SET
last_sync_timestamp = NOW()
`);
return {

File diff suppressed because it is too large Load Diff

View File

@@ -10,22 +10,42 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
const [syncInfo] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'"
);
const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01';
const lastSyncTime = syncInfo?.rows?.[0]?.last_sync_timestamp || '1970-01-01';
console.log('Purchase Orders: Using last sync time:', lastSyncTime);
// Insert temporary table creation query for purchase orders
// Create temporary tables with PostgreSQL syntax
await localConnection.query(`
CREATE TABLE IF NOT EXISTS temp_purchase_orders (
po_id INT UNSIGNED NOT NULL,
pid INT UNSIGNED NOT NULL,
DROP TABLE IF EXISTS temp_purchase_orders;
DROP TABLE IF EXISTS temp_po_receivings;
CREATE TEMP TABLE temp_purchase_orders (
po_id INTEGER NOT NULL,
pid INTEGER NOT NULL,
sku VARCHAR(50),
name VARCHAR(255),
vendor VARCHAR(255),
date DATE,
expected_date DATE,
status INT,
date TIMESTAMP WITH TIME ZONE,
expected_date TIMESTAMP WITH TIME ZONE,
status INTEGER,
notes TEXT,
ordered INTEGER,
cost_price DECIMAL(10,3),
PRIMARY KEY (po_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
);
CREATE TEMP TABLE temp_po_receivings (
po_id INTEGER,
pid INTEGER NOT NULL,
receiving_id INTEGER NOT NULL,
qty_each INTEGER,
cost_each DECIMAL(10,3),
received_date TIMESTAMP WITH TIME ZONE,
received_by INTEGER,
received_by_name VARCHAR(255),
is_alt_po INTEGER,
PRIMARY KEY (receiving_id, pid)
);
`);
outputProgress({
@@ -33,8 +53,8 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
status: "running",
});
// Get column names first
const [columns] = await localConnection.query(`
// Get column names - Keep MySQL compatible for production
const [columns] = await prodConnection.query(`
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'purchase_orders'
@@ -60,7 +80,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]
: [];
// First get all relevant PO IDs with basic info
// First get all relevant PO IDs with basic info - Keep MySQL compatible for production
const [[{ total }]] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM (
@@ -99,6 +119,7 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
console.log('Purchase Orders: Found changes:', total);
// Get PO list - Keep MySQL compatible for production
const [poList] = await prodConnection.query(`
SELECT DISTINCT
COALESCE(p.po_id, r.receiving_id) as po_id,
@@ -109,12 +130,12 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
) as vendor,
CASE
WHEN p.po_id IS NOT NULL THEN
DATE(COALESCE(
COALESCE(
NULLIF(p.date_ordered, '0000-00-00 00:00:00'),
p.date_created
))
)
WHEN r.receiving_id IS NOT NULL THEN
DATE(r.date_created)
r.date_created
END as date,
CASE
WHEN p.date_estin = '0000-00-00' THEN NULL
@@ -185,14 +206,14 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
const batch = poList.slice(i, Math.min(i + BATCH_SIZE, poList.length));
const poIds = batch.map(po => po.po_id);
// Get all products for these POs in one query
// Get all products for these POs in one query - Keep MySQL compatible for production
const [poProducts] = await prodConnection.query(`
SELECT
pop.po_id,
pop.pid,
pr.itemnumber as sku,
pr.description as name,
pop.cost_each,
pop.cost_each as cost_price,
pop.qty_each as ordered
FROM po_products pop
USE INDEX (PRIMARY)
@@ -232,317 +253,397 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
ORDER BY r.po_id, rp.pid, rp.received_date
`, [batchPoIds, productPids]);
// Create maps for this sub-batch
const poProductMap = new Map();
productBatch.forEach(product => {
const key = `${product.po_id}-${product.pid}`;
poProductMap.set(key, product);
});
const receivingMap = new Map();
const altReceivingMap = new Map();
const noPOReceivingMap = new Map();
receivings.forEach(receiving => {
const key = `${receiving.po_id}-${receiving.pid}`;
if (receiving.is_alt_po === 2) {
// No PO
if (!noPOReceivingMap.has(receiving.pid)) {
noPOReceivingMap.set(receiving.pid, []);
}
noPOReceivingMap.get(receiving.pid).push(receiving);
} else if (receiving.is_alt_po === 1) {
// Different PO
if (!altReceivingMap.has(receiving.pid)) {
altReceivingMap.set(receiving.pid, []);
}
altReceivingMap.get(receiving.pid).push(receiving);
} else {
// Original PO
if (!receivingMap.has(key)) {
receivingMap.set(key, []);
}
receivingMap.get(key).push(receiving);
}
});
// Verify PIDs exist
const [existingPids] = await localConnection.query(
'SELECT pid FROM products WHERE pid IN (?)',
[productPids]
);
const validPids = new Set(existingPids.map(p => p.pid));
// First check which PO lines already exist and get their current values
const poLines = Array.from(poProductMap.values())
.filter(p => validPids.has(p.pid))
.map(p => [p.po_id, p.pid]);
const [existingPOs] = await localConnection.query(
`SELECT ${columnNames.join(',')} FROM purchase_orders WHERE (po_id, pid) IN (${poLines.map(() => "(?,?)").join(",")})`,
poLines.flat()
);
const existingPOMap = new Map(
existingPOs.map(po => [`${po.po_id}-${po.pid}`, po])
);
// Split into inserts and updates
const insertsAndUpdates = { inserts: [], updates: [] };
let batchProcessed = 0;
for (const po of batch) {
const poProducts = Array.from(poProductMap.values())
.filter(p => p.po_id === po.po_id && validPids.has(p.pid));
for (const product of poProducts) {
const key = `${po.po_id}-${product.pid}`;
const receivingHistory = receivingMap.get(key) || [];
const altReceivingHistory = altReceivingMap.get(product.pid) || [];
const noPOReceivingHistory = noPOReceivingMap.get(product.pid) || [];
// Insert receivings into temp table
if (receivings.length > 0) {
// Process in smaller chunks to avoid parameter limits
const CHUNK_SIZE = 100; // Reduce chunk size to avoid parameter limits
for (let i = 0; i < receivings.length; i += CHUNK_SIZE) {
const chunk = receivings.slice(i, Math.min(i + CHUNK_SIZE, receivings.length));
// Combine all receivings and sort by date
const allReceivings = [
...receivingHistory.map(r => ({ ...r, type: 'original' })),
...altReceivingHistory.map(r => ({ ...r, type: 'alternate' })),
...noPOReceivingHistory.map(r => ({ ...r, type: 'no_po' }))
].sort((a, b) => new Date(a.received_date || '9999-12-31') - new Date(b.received_date || '9999-12-31'));
// Split receivings into original PO and others
const originalPOReceivings = allReceivings.filter(r => r.type === 'original');
const otherReceivings = allReceivings.filter(r => r.type !== 'original');
// Track FIFO fulfillment
let remainingToFulfill = product.ordered;
const fulfillmentTracking = [];
let totalReceived = 0;
let actualCost = null; // Will store the cost of the first receiving that fulfills this PO
let firstFulfillmentReceiving = null;
let lastFulfillmentReceiving = null;
for (const receiving of allReceivings) {
// Convert quantities to base units using supplier data
const baseQtyReceived = receiving.qty_each * (
receiving.type === 'original' ? 1 :
Math.max(1, product.supplier_qty_per_unit || 1)
);
const qtyToApply = Math.min(remainingToFulfill, baseQtyReceived);
if (qtyToApply > 0) {
// If this is the first receiving being applied, use its cost
if (actualCost === null && receiving.cost_each > 0) {
actualCost = receiving.cost_each;
firstFulfillmentReceiving = receiving;
}
lastFulfillmentReceiving = receiving;
fulfillmentTracking.push({
receiving_id: receiving.receiving_id,
qty_applied: qtyToApply,
qty_total: baseQtyReceived,
cost: receiving.cost_each || actualCost || product.cost_each,
date: receiving.received_date,
received_by: receiving.received_by,
received_by_name: receiving.received_by_name || 'Unknown',
type: receiving.type,
remaining_qty: baseQtyReceived - qtyToApply
});
remainingToFulfill -= qtyToApply;
} else {
// Track excess receivings
fulfillmentTracking.push({
receiving_id: receiving.receiving_id,
qty_applied: 0,
qty_total: baseQtyReceived,
cost: receiving.cost_each || actualCost || product.cost_each,
date: receiving.received_date,
received_by: receiving.received_by,
received_by_name: receiving.received_by_name || 'Unknown',
type: receiving.type,
is_excess: true
});
}
totalReceived += baseQtyReceived;
}
const receiving_status = !totalReceived ? 1 : // created
remainingToFulfill > 0 ? 30 : // partial
40; // full
function formatDate(dateStr) {
if (!dateStr) return null;
if (dateStr === '0000-00-00' || dateStr === '0000-00-00 00:00:00') return null;
if (typeof dateStr === 'string' && !dateStr.match(/^\d{4}-\d{2}-\d{2}/)) return null;
try {
const date = new Date(dateStr);
if (isNaN(date.getTime())) return null;
if (date.getFullYear() < 1900 || date.getFullYear() > 2100) return null;
return date.toISOString().split('T')[0];
} catch (e) {
return null;
}
}
const rowValues = columnNames.map(col => {
switch (col) {
case 'po_id': return po.po_id;
case 'vendor': return po.vendor;
case 'date': return formatDate(po.date);
case 'expected_date': return formatDate(po.expected_date);
case 'pid': return product.pid;
case 'sku': return product.sku;
case 'name': return product.name;
case 'cost_price': return actualCost || product.cost_each;
case 'po_cost_price': return product.cost_each;
case 'status': return po.status;
case 'notes': return po.notes;
case 'long_note': return po.long_note;
case 'ordered': return product.ordered;
case 'received': return totalReceived;
case 'unfulfilled': return remainingToFulfill;
case 'excess_received': return Math.max(0, totalReceived - product.ordered);
case 'received_date': return formatDate(firstFulfillmentReceiving?.received_date);
case 'last_received_date': return formatDate(lastFulfillmentReceiving?.received_date);
case 'received_by': return firstFulfillmentReceiving?.received_by_name || null;
case 'receiving_status': return receiving_status;
case 'receiving_history': return JSON.stringify({
fulfillment: fulfillmentTracking,
ordered_qty: product.ordered,
total_received: totalReceived,
remaining_unfulfilled: remainingToFulfill,
excess_received: Math.max(0, totalReceived - product.ordered),
po_cost: product.cost_each,
actual_cost: actualCost || product.cost_each
});
default: return null;
}
const values = [];
const placeholders = [];
chunk.forEach((r, idx) => {
values.push(
r.po_id,
r.pid,
r.receiving_id,
r.qty_each,
r.cost_each,
r.received_date,
r.received_by,
r.received_by_name || null,
r.is_alt_po
);
const offset = idx * 9;
placeholders.push(`($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8}, $${offset + 9})`);
});
if (existingPOMap.has(key)) {
const existing = existingPOMap.get(key);
// Check if any values are different
const hasChanges = columnNames.some(col => {
const newVal = rowValues[columnNames.indexOf(col)];
const oldVal = existing[col] ?? null;
// Special handling for numbers to avoid type coercion issues
if (typeof newVal === 'number' && typeof oldVal === 'number') {
return Math.abs(newVal - oldVal) > 0.00001; // Allow for tiny floating point differences
}
// Special handling for receiving_history - parse and compare
if (col === 'receiving_history') {
const newHistory = JSON.parse(newVal || '{}');
const oldHistory = JSON.parse(oldVal || '{}');
return JSON.stringify(newHistory) !== JSON.stringify(oldHistory);
}
return newVal !== oldVal;
});
if (hasChanges) {
insertsAndUpdates.updates.push({
po_id: po.po_id,
pid: product.pid,
values: rowValues
});
}
} else {
insertsAndUpdates.inserts.push({
po_id: po.po_id,
pid: product.pid,
values: rowValues
});
}
batchProcessed++;
await localConnection.query(`
INSERT INTO temp_po_receivings (
po_id, pid, receiving_id, qty_each, cost_each, received_date,
received_by, received_by_name, is_alt_po
)
VALUES ${placeholders.join(',')}
ON CONFLICT (receiving_id, pid) DO UPDATE SET
po_id = EXCLUDED.po_id,
qty_each = EXCLUDED.qty_each,
cost_each = EXCLUDED.cost_each,
received_date = EXCLUDED.received_date,
received_by = EXCLUDED.received_by,
received_by_name = EXCLUDED.received_by_name,
is_alt_po = EXCLUDED.is_alt_po
`, values);
}
}
// Handle inserts
if (insertsAndUpdates.inserts.length > 0) {
const insertPlaceholders = insertsAndUpdates.inserts
.map(() => `(${Array(columnNames.length).fill("?").join(",")})`)
.join(",");
const insertResult = await localConnection.query(`
INSERT INTO purchase_orders (${columnNames.join(",")})
VALUES ${insertPlaceholders}
`, insertsAndUpdates.inserts.map(i => i.values).flat());
const affectedRows = insertResult[0].affectedRows;
// For an upsert, MySQL counts rows twice for updates
// So if affectedRows is odd, we have (updates * 2 + inserts)
const updates = Math.floor(affectedRows / 2);
const inserts = affectedRows - (updates * 2);
// Process each PO product in chunks
const PRODUCT_CHUNK_SIZE = 100;
for (let i = 0; i < productBatch.length; i += PRODUCT_CHUNK_SIZE) {
const chunk = productBatch.slice(i, Math.min(i + PRODUCT_CHUNK_SIZE, productBatch.length));
const values = [];
const placeholders = [];
recordsAdded += inserts;
recordsUpdated += Math.floor(updates); // Ensure we never have fractional updates
processed += batchProcessed;
}
// Handle updates - now we know these actually have changes
if (insertsAndUpdates.updates.length > 0) {
const updatePlaceholders = insertsAndUpdates.updates
.map(() => `(${Array(columnNames.length).fill("?").join(",")})`)
.join(",");
const updateResult = await localConnection.query(`
INSERT INTO purchase_orders (${columnNames.join(",")})
VALUES ${updatePlaceholders}
ON DUPLICATE KEY UPDATE ${columnNames
.filter((col) => col !== "po_id" && col !== "pid")
.map((col) => `${col} = VALUES(${col})`)
.join(",")};
`, insertsAndUpdates.updates.map(u => u.values).flat());
const affectedRows = updateResult[0].affectedRows;
// For an upsert, MySQL counts rows twice for updates
// So if affectedRows is odd, we have (updates * 2 + inserts)
const updates = Math.floor(affectedRows / 2);
const inserts = affectedRows - (updates * 2);
recordsUpdated += Math.floor(updates); // Ensure we never have fractional updates
processed += batchProcessed;
}
// Update progress based on time interval
const now = Date.now();
if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) {
outputProgress({
status: "running",
operation: "Purchase orders import",
current: processed,
total: totalItems,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, processed, totalItems),
rate: calculateRate(startTime, processed)
chunk.forEach((product, idx) => {
const po = batch.find(p => p.po_id === product.po_id);
if (!po) return;
values.push(
product.po_id,
product.pid,
product.sku,
product.name,
po.vendor,
po.date,
po.expected_date,
po.status,
po.notes || po.long_note,
product.ordered,
product.cost_price
);
const offset = idx * 11; // Updated to match 11 fields
placeholders.push(`($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8}, $${offset + 9}, $${offset + 10}, $${offset + 11})`);
});
lastProgressUpdate = now;
if (placeholders.length > 0) {
await localConnection.query(`
INSERT INTO temp_purchase_orders (
po_id, pid, sku, name, vendor, date, expected_date,
status, notes, ordered, cost_price
)
VALUES ${placeholders.join(',')}
ON CONFLICT (po_id, pid) DO UPDATE SET
sku = EXCLUDED.sku,
name = EXCLUDED.name,
vendor = EXCLUDED.vendor,
date = EXCLUDED.date,
expected_date = EXCLUDED.expected_date,
status = EXCLUDED.status,
notes = EXCLUDED.notes,
ordered = EXCLUDED.ordered,
cost_price = EXCLUDED.cost_price
`, values);
}
processed += chunk.length;
// Update progress based on time interval
const now = Date.now();
if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) {
outputProgress({
status: "running",
operation: "Purchase orders import",
current: processed,
total: totalItems,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, processed, totalItems),
rate: calculateRate(startTime, processed)
});
lastProgressUpdate = now;
}
}
}
}
// Only update sync status if we get here (no errors thrown)
// Insert final data into purchase_orders table in chunks
const FINAL_CHUNK_SIZE = 1000;
let totalProcessed = 0;
const totalPosResult = await localConnection.query('SELECT COUNT(*) as total_pos FROM temp_purchase_orders');
const total_pos = parseInt(totalPosResult.rows?.[0]?.total_pos || '0', 10);
outputProgress({
status: "running",
operation: "Purchase orders final import",
message: `Processing ${total_pos} purchase orders for final import`,
current: 0,
total: total_pos
});
// Process in chunks using cursor-based pagination
let lastPoId = 0;
let lastPid = 0;
let recordsAdded = 0;
let recordsUpdated = 0;
while (true) {
console.log('Fetching next chunk with lastPoId:', lastPoId, 'lastPid:', lastPid);
const chunkResult = await localConnection.query(`
SELECT po_id, pid FROM temp_purchase_orders
WHERE (po_id, pid) > ($1, $2)
ORDER BY po_id, pid
LIMIT $3
`, [lastPoId, lastPid, FINAL_CHUNK_SIZE]);
if (!chunkResult?.rows) {
console.error('No rows returned from chunk query:', chunkResult);
break;
}
const chunk = chunkResult.rows;
console.log('Got chunk of size:', chunk.length);
if (chunk.length === 0) break;
const result = await localConnection.query(`
WITH inserted_pos AS (
INSERT INTO purchase_orders (
po_id, pid, sku, name, cost_price, po_cost_price,
vendor, date, expected_date, status, notes,
ordered, received, receiving_status,
received_date, last_received_date, received_by,
receiving_history
)
SELECT
po.po_id,
po.pid,
po.sku,
po.name,
COALESCE(
(
SELECT cost_each
FROM temp_po_receivings r2
WHERE r2.pid = po.pid
AND r2.po_id = po.po_id
AND r2.is_alt_po = 0
AND r2.cost_each > 0
ORDER BY r2.received_date
LIMIT 1
),
po.cost_price
) as cost_price,
po.cost_price as po_cost_price,
po.vendor,
po.date,
po.expected_date,
po.status,
po.notes,
po.ordered,
COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0) as received,
CASE
WHEN COUNT(r.receiving_id) = 0 THEN 1 -- created
WHEN SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END) < po.ordered THEN 30 -- partial
ELSE 40 -- full
END as receiving_status,
MIN(CASE WHEN r.is_alt_po = 0 THEN r.received_date END) as received_date,
MAX(CASE WHEN r.is_alt_po = 0 THEN r.received_date END) as last_received_date,
(
SELECT r2.received_by_name
FROM temp_po_receivings r2
WHERE r2.pid = po.pid
AND r2.is_alt_po = 0
ORDER BY r2.received_date
LIMIT 1
) as received_by,
jsonb_build_object(
'ordered_qty', po.ordered,
'total_received', COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0),
'remaining_unfulfilled', GREATEST(0, po.ordered - COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0)),
'excess_received', GREATEST(0, COALESCE(SUM(CASE WHEN r.is_alt_po = 0 THEN r.qty_each END), 0) - po.ordered),
'po_cost', po.cost_price,
'actual_cost', COALESCE(
(
SELECT cost_each
FROM temp_po_receivings r2
WHERE r2.pid = po.pid
AND r2.is_alt_po = 0
AND r2.cost_each > 0
ORDER BY r2.received_date
LIMIT 1
),
po.cost_price
),
'fulfillment', (
SELECT jsonb_agg(
jsonb_build_object(
'receiving_id', r2.receiving_id,
'qty_applied', CASE
WHEN r2.running_total <= po.ordered THEN r2.qty_each
WHEN r2.running_total - r2.qty_each < po.ordered THEN po.ordered - (r2.running_total - r2.qty_each)
ELSE 0
END,
'qty_total', r2.qty_each,
'cost', r2.cost_each,
'date', r2.received_date,
'received_by', r2.received_by,
'received_by_name', r2.received_by_name,
'type', CASE r2.is_alt_po
WHEN 0 THEN 'original'
WHEN 1 THEN 'alternate'
ELSE 'no_po'
END,
'remaining_qty', CASE
WHEN r2.running_total <= po.ordered THEN 0
WHEN r2.running_total - r2.qty_each < po.ordered THEN r2.running_total - po.ordered
ELSE r2.qty_each
END,
'is_excess', r2.running_total > po.ordered
)
ORDER BY r2.received_date
)
FROM (
SELECT
r2.*,
SUM(r2.qty_each) OVER (
PARTITION BY r2.pid
ORDER BY r2.received_date
ROWS UNBOUNDED PRECEDING
) as running_total
FROM temp_po_receivings r2
WHERE r2.pid = po.pid
) r2
),
'alternate_po_receivings', (
SELECT jsonb_agg(
jsonb_build_object(
'receiving_id', r2.receiving_id,
'qty', r2.qty_each,
'cost', r2.cost_each,
'date', r2.received_date,
'received_by', r2.received_by,
'received_by_name', r2.received_by_name
)
ORDER BY r2.received_date
)
FROM temp_po_receivings r2
WHERE r2.pid = po.pid AND r2.is_alt_po = 1
),
'no_po_receivings', (
SELECT jsonb_agg(
jsonb_build_object(
'receiving_id', r2.receiving_id,
'qty', r2.qty_each,
'cost', r2.cost_each,
'date', r2.received_date,
'received_by', r2.received_by,
'received_by_name', r2.received_by_name
)
ORDER BY r2.received_date
)
FROM temp_po_receivings r2
WHERE r2.pid = po.pid AND r2.is_alt_po = 2
)
) as receiving_history
FROM temp_purchase_orders po
LEFT JOIN temp_po_receivings r ON po.pid = r.pid
WHERE (po.po_id, po.pid) IN (
SELECT po_id, pid FROM UNNEST($1::int[], $2::int[])
)
GROUP BY po.po_id, po.pid, po.sku, po.name, po.vendor, po.date,
po.expected_date, po.status, po.notes, po.ordered, po.cost_price
ON CONFLICT (po_id, pid) DO UPDATE SET
vendor = EXCLUDED.vendor,
date = EXCLUDED.date,
expected_date = EXCLUDED.expected_date,
status = EXCLUDED.status,
notes = EXCLUDED.notes,
ordered = EXCLUDED.ordered,
received = EXCLUDED.received,
receiving_status = EXCLUDED.receiving_status,
received_date = EXCLUDED.received_date,
last_received_date = EXCLUDED.last_received_date,
received_by = EXCLUDED.received_by,
receiving_history = EXCLUDED.receiving_history,
cost_price = EXCLUDED.cost_price,
po_cost_price = EXCLUDED.po_cost_price
RETURNING xmax
)
SELECT
COUNT(*) FILTER (WHERE xmax = 0) as inserted,
COUNT(*) FILTER (WHERE xmax <> 0) as updated
FROM inserted_pos
`, [
chunk.map(r => r.po_id),
chunk.map(r => r.pid)
]);
// Add debug logging
console.log('Insert result:', result?.rows?.[0]);
// Handle the result properly for PostgreSQL with more defensive coding
const resultRow = result?.rows?.[0] || {};
const insertCount = parseInt(resultRow.inserted || '0', 10);
const updateCount = parseInt(resultRow.updated || '0', 10);
recordsAdded += insertCount;
recordsUpdated += updateCount;
totalProcessed += chunk.length;
// Update progress
outputProgress({
status: "running",
operation: "Purchase orders final import",
message: `Processed ${totalProcessed} of ${total_pos} purchase orders`,
current: totalProcessed,
total: total_pos,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, totalProcessed, total_pos),
rate: calculateRate(startTime, totalProcessed)
});
// Update last processed IDs for next chunk with safety check
if (chunk.length > 0) {
const lastItem = chunk[chunk.length - 1];
if (lastItem) {
lastPoId = lastItem.po_id;
lastPid = lastItem.pid;
}
}
}
// Update sync status
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('purchase_orders', NOW())
ON DUPLICATE KEY UPDATE
last_sync_timestamp = NOW(),
last_sync_id = LAST_INSERT_ID(last_sync_id)
ON CONFLICT (table_name) DO UPDATE SET
last_sync_timestamp = NOW()
`);
// Clean up temporary tables
await localConnection.query(`
DROP TABLE IF EXISTS temp_purchase_orders;
DROP TABLE IF EXISTS temp_po_receivings;
`);
return {
status: "complete",
totalImported: totalItems,
recordsAdded: recordsAdded || 0,
recordsUpdated: recordsUpdated || 0,
incrementalUpdate,
lastSyncTime
recordsAdded,
recordsUpdated,
totalRecords: processed
};
} catch (error) {
outputProgress({
operation: `${incrementalUpdate ? 'Incremental' : 'Full'} purchase orders import failed`,
status: "error",
error: error.message,
});
console.error("Error during purchase orders import:", error);
// Attempt cleanup on error
try {
await localConnection.query(`
DROP TABLE IF EXISTS temp_purchase_orders;
DROP TABLE IF EXISTS temp_po_receivings;
`);
} catch (cleanupError) {
console.error('Error during cleanup:', cleanupError);
}
throw error;
}
}
module.exports = importPurchaseOrders;
module.exports = importPurchaseOrders;

View File

@@ -1,5 +1,6 @@
const mysql = require("mysql2/promise");
const { Client } = require("ssh2");
const { Pool } = require('pg');
const dotenv = require("dotenv");
const path = require("path");
@@ -41,23 +42,90 @@ async function setupSshTunnel(sshConfig) {
async function setupConnections(sshConfig) {
const tunnel = await setupSshTunnel(sshConfig);
// Setup MySQL connection for production
const prodConnection = await mysql.createConnection({
...sshConfig.prodDbConfig,
stream: tunnel.stream,
});
const localConnection = await mysql.createPool({
...sshConfig.localDbConfig,
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0
});
// Setup PostgreSQL connection pool for local
const localPool = new Pool(sshConfig.localDbConfig);
return {
ssh: tunnel.ssh,
prodConnection,
localConnection
// Test the PostgreSQL connection
try {
const client = await localPool.connect();
await client.query('SELECT NOW()');
client.release();
console.log('PostgreSQL connection successful');
} catch (err) {
console.error('PostgreSQL connection error:', err);
throw err;
}
// Create a wrapper for the PostgreSQL pool to match MySQL interface
const localConnection = {
_client: null,
_transactionActive: false,
query: async (text, params) => {
// If we're not in a transaction, use the pool directly
if (!localConnection._transactionActive) {
const client = await localPool.connect();
try {
const result = await client.query(text, params);
return [result];
} finally {
client.release();
}
}
// If we're in a transaction, use the dedicated client
if (!localConnection._client) {
throw new Error('No active transaction client');
}
const result = await localConnection._client.query(text, params);
return [result];
},
beginTransaction: async () => {
if (localConnection._transactionActive) {
throw new Error('Transaction already active');
}
localConnection._client = await localPool.connect();
await localConnection._client.query('BEGIN');
localConnection._transactionActive = true;
},
commit: async () => {
if (!localConnection._transactionActive) {
throw new Error('No active transaction to commit');
}
await localConnection._client.query('COMMIT');
localConnection._client.release();
localConnection._client = null;
localConnection._transactionActive = false;
},
rollback: async () => {
if (!localConnection._transactionActive) {
throw new Error('No active transaction to rollback');
}
await localConnection._client.query('ROLLBACK');
localConnection._client.release();
localConnection._client = null;
localConnection._transactionActive = false;
},
end: async () => {
if (localConnection._client) {
localConnection._client.release();
localConnection._client = null;
}
await localPool.end();
}
};
return { prodConnection, localConnection, tunnel };
}
// Helper function to close connections

File diff suppressed because it is too large Load Diff

View File

@@ -1,180 +0,0 @@
const path = require('path');
const fs = require('fs');
const axios = require('axios');
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');
// Change working directory to script directory
process.chdir(path.dirname(__filename));
require('dotenv').config({ path: path.resolve(__dirname, '..', '.env') });
const FILES = [
{
name: '39f2x83-products.csv',
url: process.env.PRODUCTS_CSV_URL
},
{
name: '39f2x83-orders.csv',
url: process.env.ORDERS_CSV_URL
},
{
name: '39f2x83-purchase_orders.csv',
url: process.env.PURCHASE_ORDERS_CSV_URL
}
];
let isCancelled = false;
function cancelUpdate() {
isCancelled = true;
outputProgress({
status: 'cancelled',
operation: 'CSV update cancelled',
current: 0,
total: FILES.length,
elapsed: null,
remaining: null,
rate: 0
});
}
async function downloadFile(file, index, startTime) {
if (isCancelled) return;
const csvDir = path.join(__dirname, '../csv');
if (!fs.existsSync(csvDir)) {
fs.mkdirSync(csvDir, { recursive: true });
}
const writer = fs.createWriteStream(path.join(csvDir, file.name));
try {
const response = await axios({
url: file.url,
method: 'GET',
responseType: 'stream'
});
const totalLength = response.headers['content-length'];
let downloadedLength = 0;
let lastProgressUpdate = Date.now();
const PROGRESS_INTERVAL = 1000; // Update progress every second
response.data.on('data', (chunk) => {
if (isCancelled) {
writer.end();
return;
}
downloadedLength += chunk.length;
// Update progress based on time interval
const now = Date.now();
if (now - lastProgressUpdate >= PROGRESS_INTERVAL) {
const progress = (downloadedLength / totalLength) * 100;
outputProgress({
status: 'running',
operation: `Downloading ${file.name}`,
current: index + (downloadedLength / totalLength),
total: FILES.length,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, index + (downloadedLength / totalLength), FILES.length),
rate: calculateRate(startTime, index + (downloadedLength / totalLength)),
percentage: progress.toFixed(1),
file_progress: {
name: file.name,
downloaded: downloadedLength,
total: totalLength,
percentage: progress.toFixed(1)
}
});
lastProgressUpdate = now;
}
});
response.data.pipe(writer);
return new Promise((resolve, reject) => {
writer.on('finish', resolve);
writer.on('error', reject);
});
} catch (error) {
fs.unlinkSync(path.join(csvDir, file.name));
throw error;
}
}
// Main function to update all files
async function updateFiles() {
const startTime = Date.now();
outputProgress({
status: 'running',
operation: 'Starting CSV update',
current: 0,
total: FILES.length,
elapsed: '0s',
remaining: null,
rate: 0,
percentage: '0'
});
try {
for (let i = 0; i < FILES.length; i++) {
if (isCancelled) {
return;
}
const file = FILES[i];
await downloadFile(file, i, startTime);
outputProgress({
status: 'running',
operation: 'CSV update in progress',
current: i + 1,
total: FILES.length,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, i + 1, FILES.length),
rate: calculateRate(startTime, i + 1),
percentage: (((i + 1) / FILES.length) * 100).toFixed(1)
});
}
outputProgress({
status: 'complete',
operation: 'CSV update complete',
current: FILES.length,
total: FILES.length,
elapsed: formatElapsedTime(startTime),
remaining: '0s',
rate: calculateRate(startTime, FILES.length),
percentage: '100'
});
} catch (error) {
outputProgress({
status: 'error',
operation: 'CSV update failed',
error: error.message,
current: 0,
total: FILES.length,
elapsed: formatElapsedTime(startTime),
remaining: null,
rate: 0
});
throw error;
}
}
// Run the update only if this is the main module
if (require.main === module) {
updateFiles().catch((error) => {
console.error('Error updating CSV files:', error);
process.exit(1);
});
}
// Export the functions needed by the route
module.exports = {
updateFiles,
cancelUpdate
};

View File

@@ -1,4 +1,4 @@
const mysql = require('mysql2/promise');
const { Client } = require('pg');
const path = require('path');
const dotenv = require('dotenv');
const fs = require('fs');
@@ -10,7 +10,7 @@ const dbConfig = {
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
multipleStatements: true
port: process.env.DB_PORT || 5432
};
// Helper function to output progress in JSON format
@@ -54,14 +54,44 @@ function splitSQLStatements(sql) {
let currentStatement = '';
let inString = false;
let stringChar = '';
let inDollarQuote = false;
let dollarQuoteTag = '';
// Process character by character
for (let i = 0; i < sql.length; i++) {
const char = sql[i];
const nextChar = sql[i + 1] || '';
// Handle string literals
if ((char === "'" || char === '"') && sql[i - 1] !== '\\') {
// Handle dollar quotes
if (char === '$' && !inString) {
// Look ahead to find the dollar quote tag
let tag = '$';
let j = i + 1;
while (j < sql.length && sql[j] !== '$') {
tag += sql[j];
j++;
}
tag += '$';
if (j < sql.length) { // Found closing $
if (!inDollarQuote) {
inDollarQuote = true;
dollarQuoteTag = tag;
currentStatement += tag;
i = j;
continue;
} else if (sql.substring(i, j + 1) === dollarQuoteTag) {
inDollarQuote = false;
dollarQuoteTag = '';
currentStatement += tag;
i = j;
continue;
}
}
}
// Handle string literals (only if not in dollar quote)
if (!inDollarQuote && (char === "'" || char === '"') && sql[i - 1] !== '\\') {
if (!inString) {
inString = true;
stringChar = char;
@@ -70,23 +100,25 @@ function splitSQLStatements(sql) {
}
}
// Handle comments
if (!inString && char === '-' && nextChar === '-') {
// Skip to end of line
while (i < sql.length && sql[i] !== '\n') i++;
continue;
// Handle comments (only if not in string or dollar quote)
if (!inString && !inDollarQuote) {
if (char === '-' && nextChar === '-') {
// Skip to end of line
while (i < sql.length && sql[i] !== '\n') i++;
continue;
}
if (char === '/' && nextChar === '*') {
// Skip until closing */
i += 2;
while (i < sql.length && (sql[i] !== '*' || sql[i + 1] !== '/')) i++;
i++; // Skip the closing /
continue;
}
}
if (!inString && char === '/' && nextChar === '*') {
// Skip until closing */
i += 2;
while (i < sql.length && (sql[i] !== '*' || sql[i + 1] !== '/')) i++;
i++; // Skip the closing /
continue;
}
// Handle statement boundaries
if (!inString && char === ';') {
// Handle statement boundaries (only if not in string or dollar quote)
if (!inString && !inDollarQuote && char === ';') {
if (currentStatement.trim()) {
statements.push(currentStatement.trim());
}
@@ -120,30 +152,26 @@ async function resetDatabase() {
}
});
const connection = await mysql.createConnection(dbConfig);
const client = new Client(dbConfig);
await client.connect();
try {
// Check MySQL privileges
// Check PostgreSQL version and user
outputProgress({
operation: 'Checking privileges',
message: 'Verifying MySQL user privileges...'
operation: 'Checking database',
message: 'Verifying PostgreSQL version and user privileges...'
});
const [grants] = await connection.query('SHOW GRANTS');
outputProgress({
operation: 'User privileges',
message: {
grants: grants.map(g => Object.values(g)[0])
}
});
// Enable warnings as errors
await connection.query('SET SESSION sql_notes = 1');
const versionResult = await client.query('SELECT version()');
const userResult = await client.query('SELECT current_user, current_database()');
// Log database config (without sensitive info)
outputProgress({
operation: 'Database config',
message: `Using database: ${dbConfig.database} on host: ${dbConfig.host}`
operation: 'Database info',
message: {
version: versionResult.rows[0].version,
user: userResult.rows[0].current_user,
database: userResult.rows[0].current_database
}
});
// Get list of all tables in the current database
@@ -152,14 +180,14 @@ async function resetDatabase() {
message: 'Retrieving all table names...'
});
const [tables] = await connection.query(`
SELECT GROUP_CONCAT(table_name) as tables
FROM information_schema.tables
WHERE table_schema = DATABASE()
AND table_name NOT IN ('users', 'import_history', 'calculate_history')
const tablesResult = await client.query(`
SELECT string_agg(tablename, ', ') as tables
FROM pg_tables
WHERE schemaname = 'public'
AND tablename NOT IN ('users', 'calculate_history', 'import_history');
`);
if (!tables[0].tables) {
if (!tablesResult.rows[0].tables) {
outputProgress({
operation: 'No tables found',
message: 'Database is already empty'
@@ -170,20 +198,73 @@ async function resetDatabase() {
message: 'Dropping all existing tables...'
});
await connection.query('SET FOREIGN_KEY_CHECKS = 0');
const dropQuery = `
DROP TABLE IF EXISTS
${tables[0].tables
.split(',')
.filter(table => !['users', 'calculate_history'].includes(table))
.map(table => '`' + table + '`')
.join(', ')}
`;
await connection.query(dropQuery);
await connection.query('SET FOREIGN_KEY_CHECKS = 1');
// Disable triggers/foreign key checks
await client.query('SET session_replication_role = \'replica\';');
// Drop all tables except users
const tables = tablesResult.rows[0].tables.split(', ');
for (const table of tables) {
if (!['users'].includes(table)) {
await client.query(`DROP TABLE IF EXISTS "${table}" CASCADE`);
}
}
// Only drop types if we're not preserving history tables
const historyTablesExist = await client.query(`
SELECT EXISTS (
SELECT FROM pg_tables
WHERE schemaname = 'public'
AND tablename IN ('calculate_history', 'import_history')
);
`);
if (!historyTablesExist.rows[0].exists) {
await client.query('DROP TYPE IF EXISTS calculation_status CASCADE;');
await client.query('DROP TYPE IF EXISTS module_name CASCADE;');
}
// Re-enable triggers/foreign key checks
await client.query('SET session_replication_role = \'origin\';');
}
// Read and execute main schema (core tables)
// Create enum types if they don't exist
outputProgress({
operation: 'Creating enum types',
message: 'Setting up required enum types...'
});
// Check if types exist before creating
const typesExist = await client.query(`
SELECT EXISTS (
SELECT 1 FROM pg_type
WHERE typname = 'calculation_status'
) as calc_status_exists,
EXISTS (
SELECT 1 FROM pg_type
WHERE typname = 'module_name'
) as module_name_exists;
`);
if (!typesExist.rows[0].calc_status_exists) {
await client.query(`CREATE TYPE calculation_status AS ENUM ('running', 'completed', 'failed', 'cancelled')`);
}
if (!typesExist.rows[0].module_name_exists) {
await client.query(`
CREATE TYPE module_name AS ENUM (
'product_metrics',
'time_aggregates',
'financial_metrics',
'vendor_metrics',
'category_metrics',
'brand_metrics',
'sales_forecasts',
'abc_classification'
)
`);
}
// Read and execute main schema first (core tables)
outputProgress({
operation: 'Running database setup',
message: 'Creating core tables...'
@@ -223,35 +304,24 @@ async function resetDatabase() {
for (let i = 0; i < statements.length; i++) {
const stmt = statements[i];
try {
const [result, fields] = await connection.query(stmt);
// Check for warnings
const [warnings] = await connection.query('SHOW WARNINGS');
if (warnings && warnings.length > 0) {
outputProgress({
status: 'warning',
operation: 'SQL Warning',
statement: i + 1,
warnings: warnings
});
}
const result = await client.query(stmt);
// Verify if table was created (if this was a CREATE TABLE statement)
if (stmt.trim().toLowerCase().startsWith('create table')) {
const tableName = stmt.match(/create\s+table\s+(?:if\s+not\s+exists\s+)?`?(\w+)`?/i)?.[1];
const tableName = stmt.match(/create\s+table\s+(?:if\s+not\s+exists\s+)?["]?(\w+)["]?/i)?.[1];
if (tableName) {
const [tableExists] = await connection.query(`
const tableExists = await client.query(`
SELECT COUNT(*) as count
FROM information_schema.tables
WHERE table_schema = DATABASE()
AND table_name = ?
WHERE table_schema = 'public'
AND table_name = $1
`, [tableName]);
outputProgress({
operation: 'Table Creation Verification',
message: {
table: tableName,
exists: tableExists[0].count > 0
exists: tableExists.rows[0].count > 0
}
});
}
@@ -263,7 +333,7 @@ async function resetDatabase() {
statement: i + 1,
total: statements.length,
preview: stmt.substring(0, 100) + (stmt.length > 100 ? '...' : ''),
affectedRows: result.affectedRows
rowCount: result.rowCount
}
});
} catch (sqlError) {
@@ -271,8 +341,6 @@ async function resetDatabase() {
status: 'error',
operation: 'SQL Error',
error: sqlError.message,
sqlState: sqlError.sqlState,
errno: sqlError.errno,
statement: stmt,
statementNumber: i + 1
});
@@ -280,66 +348,12 @@ async function resetDatabase() {
}
}
// List all tables in the database after schema execution
outputProgress({
operation: 'Debug database',
message: {
currentDatabase: (await connection.query('SELECT DATABASE() as db'))[0][0].db
}
});
const [allTables] = await connection.query(`
SELECT
table_schema,
table_name,
engine,
create_time,
table_rows
// Verify core tables were created
const existingTables = (await client.query(`
SELECT table_name
FROM information_schema.tables
WHERE table_schema = DATABASE()
`);
if (allTables.length === 0) {
outputProgress({
operation: 'Warning',
message: 'No tables found in database after schema execution'
});
} else {
outputProgress({
operation: 'Tables after schema execution',
message: {
count: allTables.length,
tables: allTables.map(t => ({
schema: t.table_schema,
name: t.table_name,
engine: t.engine,
created: t.create_time,
rows: t.table_rows
}))
}
});
}
// Also check table status
const [tableStatus] = await connection.query('SHOW TABLE STATUS');
outputProgress({
operation: 'Table Status',
message: {
tables: tableStatus.map(t => ({
name: t.Name,
engine: t.Engine,
version: t.Version,
rowFormat: t.Row_format,
rows: t.Rows,
createTime: t.Create_time,
updateTime: t.Update_time
}))
}
});
// Verify core tables were created using SHOW TABLES
const [showTables] = await connection.query('SHOW TABLES');
const existingTables = showTables.map(t => Object.values(t)[0]);
WHERE table_schema = 'public'
`)).rows.map(t => t.table_name);
outputProgress({
operation: 'Core tables verification',
@@ -359,22 +373,12 @@ async function resetDatabase() {
);
}
// Verify all core tables use InnoDB
const [engineStatus] = await connection.query('SHOW TABLE STATUS WHERE Name IN (?)', [CORE_TABLES]);
const nonInnoDBTables = engineStatus.filter(t => t.Engine !== 'InnoDB');
if (nonInnoDBTables.length > 0) {
throw new Error(
`Tables using non-InnoDB engine: ${nonInnoDBTables.map(t => t.Name).join(', ')}`
);
}
outputProgress({
operation: 'Core tables created',
message: `Successfully created tables: ${CORE_TABLES.join(', ')}`
});
// Read and execute config schema
// Now read and execute config schema (since core tables exist)
outputProgress({
operation: 'Running config setup',
message: 'Creating configuration tables...'
@@ -400,18 +404,7 @@ async function resetDatabase() {
for (let i = 0; i < configStatements.length; i++) {
const stmt = configStatements[i];
try {
const [result, fields] = await connection.query(stmt);
// Check for warnings
const [warnings] = await connection.query('SHOW WARNINGS');
if (warnings && warnings.length > 0) {
outputProgress({
status: 'warning',
operation: 'Config SQL Warning',
statement: i + 1,
warnings: warnings
});
}
const result = await client.query(stmt);
outputProgress({
operation: 'Config SQL Progress',
@@ -419,7 +412,7 @@ async function resetDatabase() {
statement: i + 1,
total: configStatements.length,
preview: stmt.substring(0, 100) + (stmt.length > 100 ? '...' : ''),
affectedRows: result.affectedRows
rowCount: result.rowCount
}
});
} catch (sqlError) {
@@ -427,8 +420,6 @@ async function resetDatabase() {
status: 'error',
operation: 'Config SQL Error',
error: sqlError.message,
sqlState: sqlError.sqlState,
errno: sqlError.errno,
statement: stmt,
statementNumber: i + 1
});
@@ -436,33 +427,6 @@ async function resetDatabase() {
}
}
// Verify config tables were created
const [showConfigTables] = await connection.query('SHOW TABLES');
const existingConfigTables = showConfigTables.map(t => Object.values(t)[0]);
outputProgress({
operation: 'Config tables verification',
message: {
found: existingConfigTables,
expected: CONFIG_TABLES
}
});
const missingConfigTables = CONFIG_TABLES.filter(
t => !existingConfigTables.includes(t)
);
if (missingConfigTables.length > 0) {
throw new Error(
`Failed to create config tables: ${missingConfigTables.join(', ')}`
);
}
outputProgress({
operation: 'Config tables created',
message: `Successfully created tables: ${CONFIG_TABLES.join(', ')}`
});
// Read and execute metrics schema (metrics tables)
outputProgress({
operation: 'Running metrics setup',
@@ -489,18 +453,7 @@ async function resetDatabase() {
for (let i = 0; i < metricsStatements.length; i++) {
const stmt = metricsStatements[i];
try {
const [result, fields] = await connection.query(stmt);
// Check for warnings
const [warnings] = await connection.query('SHOW WARNINGS');
if (warnings && warnings.length > 0) {
outputProgress({
status: 'warning',
operation: 'Metrics SQL Warning',
statement: i + 1,
warnings: warnings
});
}
const result = await client.query(stmt);
outputProgress({
operation: 'Metrics SQL Progress',
@@ -508,7 +461,7 @@ async function resetDatabase() {
statement: i + 1,
total: metricsStatements.length,
preview: stmt.substring(0, 100) + (stmt.length > 100 ? '...' : ''),
affectedRows: result.affectedRows
rowCount: result.rowCount
}
});
} catch (sqlError) {
@@ -516,8 +469,6 @@ async function resetDatabase() {
status: 'error',
operation: 'Metrics SQL Error',
error: sqlError.message,
sqlState: sqlError.sqlState,
errno: sqlError.errno,
statement: stmt,
statementNumber: i + 1
});
@@ -539,7 +490,7 @@ async function resetDatabase() {
});
process.exit(1);
} finally {
await connection.end();
await client.end();
}
}

View File

@@ -1,4 +1,4 @@
const mysql = require('mysql2/promise');
const { Client } = require('pg');
const path = require('path');
const fs = require('fs');
require('dotenv').config({ path: path.resolve(__dirname, '../.env') });
@@ -8,7 +8,7 @@ const dbConfig = {
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
multipleStatements: true
port: process.env.DB_PORT || 5432
};
function outputProgress(data) {
@@ -34,8 +34,8 @@ const METRICS_TABLES = [
'sales_forecasts',
'temp_purchase_metrics',
'temp_sales_metrics',
'vendor_metrics', //before vendor_details for foreign key
'vendor_time_metrics', //before vendor_details for foreign key
'vendor_metrics',
'vendor_time_metrics',
'vendor_details'
];
@@ -90,31 +90,31 @@ function splitSQLStatements(sql) {
}
async function resetMetrics() {
let connection;
let client;
try {
outputProgress({
operation: 'Starting metrics reset',
message: 'Connecting to database...'
});
connection = await mysql.createConnection(dbConfig);
await connection.beginTransaction();
client = new Client(dbConfig);
await client.connect();
// First verify current state
const [initialTables] = await connection.query(`
SELECT TABLE_NAME as name
FROM information_schema.tables
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME IN (?)
const initialTables = await client.query(`
SELECT tablename as name
FROM pg_tables
WHERE schemaname = 'public'
AND tablename = ANY($1)
`, [METRICS_TABLES]);
outputProgress({
operation: 'Initial state',
message: `Found ${initialTables.length} existing metrics tables: ${initialTables.map(t => t.name).join(', ')}`
message: `Found ${initialTables.rows.length} existing metrics tables: ${initialTables.rows.map(t => t.name).join(', ')}`
});
// Disable foreign key checks at the start
await connection.query('SET FOREIGN_KEY_CHECKS = 0');
await client.query('SET session_replication_role = \'replica\'');
// Drop all metrics tables in reverse order to handle dependencies
outputProgress({
@@ -124,17 +124,17 @@ async function resetMetrics() {
for (const table of [...METRICS_TABLES].reverse()) {
try {
await connection.query(`DROP TABLE IF EXISTS ${table}`);
await client.query(`DROP TABLE IF EXISTS "${table}" CASCADE`);
// Verify the table was actually dropped
const [checkDrop] = await connection.query(`
const checkDrop = await client.query(`
SELECT COUNT(*) as count
FROM information_schema.tables
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = ?
FROM pg_tables
WHERE schemaname = 'public'
AND tablename = $1
`, [table]);
if (checkDrop[0].count > 0) {
if (parseInt(checkDrop.rows[0].count) > 0) {
throw new Error(`Failed to drop table ${table} - table still exists`);
}
@@ -153,15 +153,15 @@ async function resetMetrics() {
}
// Verify all tables were dropped
const [afterDrop] = await connection.query(`
SELECT TABLE_NAME as name
FROM information_schema.tables
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME IN (?)
const afterDrop = await client.query(`
SELECT tablename as name
FROM pg_tables
WHERE schemaname = 'public'
AND tablename = ANY($1)
`, [METRICS_TABLES]);
if (afterDrop.length > 0) {
throw new Error(`Failed to drop all tables. Remaining tables: ${afterDrop.map(t => t.name).join(', ')}`);
if (afterDrop.rows.length > 0) {
throw new Error(`Failed to drop all tables. Remaining tables: ${afterDrop.rows.map(t => t.name).join(', ')}`);
}
// Read metrics schema
@@ -187,39 +187,26 @@ async function resetMetrics() {
for (let i = 0; i < statements.length; i++) {
const stmt = statements[i];
try {
await connection.query(stmt);
// Check for warnings
const [warnings] = await connection.query('SHOW WARNINGS');
if (warnings && warnings.length > 0) {
outputProgress({
status: 'warning',
operation: 'SQL Warning',
message: {
statement: i + 1,
warnings: warnings
}
});
}
const result = await client.query(stmt);
// If this is a CREATE TABLE statement, verify the table was created
if (stmt.trim().toLowerCase().startsWith('create table')) {
const tableName = stmt.match(/create\s+table\s+(?:if\s+not\s+exists\s+)?`?(\w+)`?/i)?.[1];
const tableName = stmt.match(/create\s+table\s+(?:if\s+not\s+exists\s+)?["]?(\w+)["]?/i)?.[1];
if (tableName) {
const [checkCreate] = await connection.query(`
SELECT TABLE_NAME as name, CREATE_TIME as created
FROM information_schema.tables
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = ?
const checkCreate = await client.query(`
SELECT tablename as name
FROM pg_tables
WHERE schemaname = 'public'
AND tablename = $1
`, [tableName]);
if (checkCreate.length === 0) {
if (checkCreate.rows.length === 0) {
throw new Error(`Failed to create table ${tableName} - table does not exist after CREATE statement`);
}
outputProgress({
operation: 'Table created',
message: `Successfully created table: ${tableName} at ${checkCreate[0].created}`
message: `Successfully created table: ${tableName}`
});
}
}
@@ -229,7 +216,8 @@ async function resetMetrics() {
message: {
statement: i + 1,
total: statements.length,
preview: stmt.substring(0, 100) + (stmt.length > 100 ? '...' : '')
preview: stmt.substring(0, 100) + (stmt.length > 100 ? '...' : ''),
rowCount: result.rowCount
}
});
} catch (sqlError) {
@@ -238,8 +226,6 @@ async function resetMetrics() {
operation: 'SQL Error',
message: {
error: sqlError.message,
sqlState: sqlError.sqlState,
errno: sqlError.errno,
statement: stmt,
statementNumber: i + 1
}
@@ -249,7 +235,7 @@ async function resetMetrics() {
}
// Re-enable foreign key checks after all tables are created
await connection.query('SET FOREIGN_KEY_CHECKS = 1');
await client.query('SET session_replication_role = \'origin\'');
// Verify metrics tables were created
outputProgress({
@@ -257,37 +243,36 @@ async function resetMetrics() {
message: 'Checking all metrics tables were created...'
});
const [metricsTablesResult] = await connection.query(`
SELECT
TABLE_NAME as name,
TABLE_ROWS as \`rows\`,
CREATE_TIME as created
FROM information_schema.tables
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME IN (?)
const metricsTablesResult = await client.query(`
SELECT tablename as name
FROM pg_tables
WHERE schemaname = 'public'
AND tablename = ANY($1)
`, [METRICS_TABLES]);
outputProgress({
operation: 'Tables found',
message: `Found ${metricsTablesResult.length} tables: ${metricsTablesResult.map(t =>
`${t.name} (created: ${t.created})`
).join(', ')}`
message: `Found ${metricsTablesResult.rows.length} tables: ${metricsTablesResult.rows.map(t => t.name).join(', ')}`
});
const existingMetricsTables = metricsTablesResult.map(t => t.name);
const existingMetricsTables = metricsTablesResult.rows.map(t => t.name);
const missingMetricsTables = METRICS_TABLES.filter(t => !existingMetricsTables.includes(t));
if (missingMetricsTables.length > 0) {
// Do one final check of the actual tables
const [finalCheck] = await connection.query('SHOW TABLES');
const finalCheck = await client.query(`
SELECT tablename as name
FROM pg_tables
WHERE schemaname = 'public'
`);
outputProgress({
operation: 'Final table check',
message: `All database tables: ${finalCheck.map(t => Object.values(t)[0]).join(', ')}`
message: `All database tables: ${finalCheck.rows.map(t => t.name).join(', ')}`
});
throw new Error(`Failed to create metrics tables: ${missingMetricsTables.join(', ')}`);
}
await connection.commit();
await client.query('COMMIT');
outputProgress({
status: 'complete',
@@ -302,17 +287,17 @@ async function resetMetrics() {
stack: error.stack
});
if (connection) {
await connection.rollback();
if (client) {
await client.query('ROLLBACK');
// Make sure to re-enable foreign key checks even if there's an error
await connection.query('SET FOREIGN_KEY_CHECKS = 1').catch(() => {});
await client.query('SET session_replication_role = \'origin\'').catch(() => {});
}
throw error;
} finally {
if (connection) {
if (client) {
// One final attempt to ensure foreign key checks are enabled
await connection.query('SET FOREIGN_KEY_CHECKS = 1').catch(() => {});
await connection.end();
await client.query('SET session_replication_role = \'origin\'').catch(() => {});
await client.end();
}
}
}

View File

@@ -1,180 +0,0 @@
const readline = require('readline');
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout
});
const question = (query) => new Promise((resolve) => rl.question(query, resolve));
async function loadScript(name) {
try {
return await require(name);
} catch (error) {
console.error(`Failed to load script ${name}:`, error);
return null;
}
}
async function runWithTimeout(fn) {
return new Promise((resolve, reject) => {
// Create a child process for the script
const child = require('child_process').fork(fn.toString(), [], {
stdio: 'inherit'
});
child.on('exit', (code) => {
if (code === 0) {
resolve();
} else {
reject(new Error(`Script exited with code ${code}`));
}
});
child.on('error', (err) => {
reject(err);
});
});
}
function clearScreen() {
process.stdout.write('\x1Bc');
}
const scripts = {
'Import Scripts': {
'1': { name: 'Full Import From Production', path: './import-from-prod' },
'2': { name: 'Individual Import Scripts ▸', submenu: {
'1': { name: 'Import Orders', path: './import/orders', key: 'importOrders' },
'2': { name: 'Import Products', path: './import/products', key: 'importProducts' },
'3': { name: 'Import Purchase Orders', path: './import/purchase-orders' },
'4': { name: 'Import Categories', path: './import/categories' },
'b': { name: 'Back to Main Menu' }
}}
},
'Metrics': {
'3': { name: 'Calculate All Metrics', path: './calculate-metrics' },
'4': { name: 'Individual Metric Scripts ▸', submenu: {
'1': { name: 'Brand Metrics', path: './metrics/brand-metrics' },
'2': { name: 'Category Metrics', path: './metrics/category-metrics' },
'3': { name: 'Financial Metrics', path: './metrics/financial-metrics' },
'4': { name: 'Product Metrics', path: './metrics/product-metrics' },
'5': { name: 'Sales Forecasts', path: './metrics/sales-forecasts' },
'6': { name: 'Time Aggregates', path: './metrics/time-aggregates' },
'7': { name: 'Vendor Metrics', path: './metrics/vendor-metrics' },
'b': { name: 'Back to Main Menu' }
}}
},
'Database Management': {
'5': { name: 'Test Production Connection', path: './test-prod-connection' }
},
'Reset Scripts': {
'6': { name: 'Reset Database', path: './reset-db' },
'7': { name: 'Reset Metrics', path: './reset-metrics' }
}
};
let lastRun = null;
async function displayMenu(menuItems, title = 'Inventory Management Script Runner') {
clearScreen();
console.log(`\n${title}\n`);
for (const [category, items] of Object.entries(menuItems)) {
console.log(`\n${category}:`);
Object.entries(items).forEach(([key, script]) => {
console.log(`${key}. ${script.name}`);
});
}
if (lastRun) {
console.log('\nQuick Access:');
console.log(`r. Repeat Last Script (${lastRun.name})`);
}
console.log('\nq. Quit\n');
}
async function handleSubmenu(submenu, title) {
while (true) {
await displayMenu({"Individual Scripts": submenu}, title);
const choice = await question('Select an option (or b to go back): ');
if (choice.toLowerCase() === 'b') {
return null;
}
if (submenu[choice]) {
return submenu[choice];
}
console.log('Invalid selection. Please try again.');
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
async function runScript(script) {
console.log(`\nRunning: ${script.name}`);
try {
const scriptPath = require.resolve(script.path);
await runWithTimeout(scriptPath);
console.log('\nScript completed successfully');
lastRun = script;
} catch (error) {
console.error('\nError running script:', error);
}
await question('\nPress Enter to continue...');
}
async function main() {
while (true) {
await displayMenu(scripts);
const choice = await question('Select an option: ');
if (choice.toLowerCase() === 'q') {
break;
}
if (choice.toLowerCase() === 'r' && lastRun) {
await runScript(lastRun);
continue;
}
let selectedScript = null;
for (const category of Object.values(scripts)) {
if (category[choice]) {
selectedScript = category[choice];
break;
}
}
if (!selectedScript) {
console.log('Invalid selection. Please try again.');
await new Promise(resolve => setTimeout(resolve, 1000));
continue;
}
if (selectedScript.submenu) {
const submenuChoice = await handleSubmenu(
selectedScript.submenu,
selectedScript.name
);
if (submenuChoice && submenuChoice.path) {
await runScript(submenuChoice);
}
} else if (selectedScript.path) {
await runScript(selectedScript);
}
}
rl.close();
process.exit(0);
}
if (require.main === module) {
main().catch(error => {
console.error('Fatal error:', error);
process.exit(1);
});
}

View File

@@ -1,89 +0,0 @@
const mysql = require('mysql2/promise');
const { Client } = require('ssh2');
const dotenv = require('dotenv');
const path = require('path');
dotenv.config({ path: path.join(__dirname, '../.env') });
// SSH configuration
const sshConfig = {
host: process.env.PROD_SSH_HOST,
port: process.env.PROD_SSH_PORT || 22,
username: process.env.PROD_SSH_USER,
privateKey: process.env.PROD_SSH_KEY_PATH ? require('fs').readFileSync(process.env.PROD_SSH_KEY_PATH) : undefined
};
// Database configuration
const dbConfig = {
host: process.env.PROD_DB_HOST || 'localhost', // Usually localhost when tunneling
user: process.env.PROD_DB_USER,
password: process.env.PROD_DB_PASSWORD,
database: process.env.PROD_DB_NAME,
port: process.env.PROD_DB_PORT || 3306
};
async function testConnection() {
const ssh = new Client();
try {
// Create new Promise for SSH connection
await new Promise((resolve, reject) => {
ssh.on('ready', resolve)
.on('error', reject)
.connect(sshConfig);
});
console.log('SSH Connection successful!');
// Forward local port to remote MySQL port
const tunnel = await new Promise((resolve, reject) => {
ssh.forwardOut(
'127.0.0.1',
0,
dbConfig.host,
dbConfig.port,
(err, stream) => {
if (err) reject(err);
resolve(stream);
}
);
});
console.log('Port forwarding established');
// Create MySQL connection over SSH tunnel
const connection = await mysql.createConnection({
...dbConfig,
stream: tunnel
});
console.log('MySQL Connection successful!');
// Test query
const [rows] = await connection.query('SELECT COUNT(*) as count FROM products');
console.log('Query successful! Product count:', rows[0].count);
// Clean up
await connection.end();
ssh.end();
console.log('Connections closed successfully');
return rows[0].count;
} catch (error) {
console.error('Error:', error);
if (ssh) ssh.end();
throw error;
}
}
// If running directly (not imported)
if (require.main === module) {
testConnection()
.then(() => process.exit(0))
.catch(error => {
console.error('Test failed:', error);
process.exit(1);
});
}
module.exports = { testConnection };