Break up prod import script into pieces and move csv scripts into folder

This commit is contained in:
2025-01-29 13:23:32 -05:00
parent 814d5d1a84
commit 84baa7e7d3
6 changed files with 187 additions and 266 deletions

View File

@@ -1,7 +1,7 @@
const mysql = require("mysql2/promise");
const dotenv = require("dotenv"); const dotenv = require("dotenv");
const path = require("path"); const path = require("path");
const { setupSshTunnel, outputProgress, formatElapsedTime, prodDbConfig, localDbConfig } = require('./import/utils'); const { outputProgress, formatElapsedTime } = require('./metrics/utils/progress');
const { setupConnections, closeConnections } = require('./import/utils');
const importCategories = require('./import/categories'); const importCategories = require('./import/categories');
const { importProducts } = require('./import/products'); const { importProducts } = require('./import/products');
const importOrders = require('./import/orders'); const importOrders = require('./import/orders');
@@ -15,6 +15,38 @@ const IMPORT_PRODUCTS = true;
const IMPORT_ORDERS = true; const IMPORT_ORDERS = true;
const IMPORT_PURCHASE_ORDERS = true; const IMPORT_PURCHASE_ORDERS = true;
// SSH configuration
const sshConfig = {
ssh: {
host: process.env.PROD_SSH_HOST,
port: process.env.PROD_SSH_PORT || 22,
username: process.env.PROD_SSH_USER,
privateKey: process.env.PROD_SSH_KEY_PATH
? require("fs").readFileSync(process.env.PROD_SSH_KEY_PATH)
: undefined,
},
// Production database configuration
prodDbConfig: {
host: process.env.PROD_DB_HOST || "localhost",
user: process.env.PROD_DB_USER,
password: process.env.PROD_DB_PASSWORD,
database: process.env.PROD_DB_NAME,
port: process.env.PROD_DB_PORT || 3306,
},
// Local database configuration
localDbConfig: {
host: process.env.DB_HOST,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
multipleStatements: true,
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0,
namedPlaceholders: true,
}
};
let isImportCancelled = false; let isImportCancelled = false;
// Add cancel function // Add cancel function
@@ -32,12 +64,9 @@ function cancelImport() {
}); });
} }
// Modify main function to handle cancellation and avoid process.exit
async function main() { async function main() {
let ssh;
let prodConnection;
let localConnection;
const startTime = Date.now(); const startTime = Date.now();
let connections;
try { try {
// Initial progress update // Initial progress update
@@ -50,96 +79,39 @@ async function main() {
elapsed: formatElapsedTime((Date.now() - startTime) / 1000) elapsed: formatElapsedTime((Date.now() - startTime) / 1000)
}); });
const tunnel = await setupSshTunnel(); connections = await setupConnections(sshConfig);
ssh = tunnel.ssh; const { prodConnection, localConnection } = connections;
outputProgress({
status: "running",
operation: "Import process",
message: "Connecting to production database...",
current: 0,
total: 4,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000)
});
prodConnection = await mysql.createConnection({
...prodDbConfig,
stream: tunnel.stream,
});
outputProgress({
status: "running",
operation: "Import process",
message: "Connecting to local database...",
current: 0,
total: 4,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000)
});
localConnection = await mysql.createPool({
...localDbConfig,
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0
});
if (isImportCancelled) throw new Error("Import cancelled"); if (isImportCancelled) throw new Error("Import cancelled");
let currentStep = 0; const results = {
categories: null,
products: null,
orders: null,
purchaseOrders: null
};
// Run each import based on constants // Run each import based on constants
if (IMPORT_CATEGORIES) { if (IMPORT_CATEGORIES) {
outputProgress({ results.categories = await importCategories(prodConnection, localConnection);
status: "running",
operation: "Import process",
message: "Starting categories import...",
current: currentStep,
total: 4,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000)
});
await importCategories(prodConnection, localConnection);
if (isImportCancelled) throw new Error("Import cancelled"); if (isImportCancelled) throw new Error("Import cancelled");
currentStep++; currentStep++;
} }
if (IMPORT_PRODUCTS) { if (IMPORT_PRODUCTS) {
outputProgress({ results.products = await importProducts(prodConnection, localConnection);
status: "running",
operation: "Import process",
message: "Starting products import...",
current: currentStep,
total: 4,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000)
});
await importProducts(prodConnection, localConnection);
if (isImportCancelled) throw new Error("Import cancelled"); if (isImportCancelled) throw new Error("Import cancelled");
currentStep++; currentStep++;
} }
if (IMPORT_ORDERS) { if (IMPORT_ORDERS) {
outputProgress({ results.orders = await importOrders(prodConnection, localConnection);
status: "running",
operation: "Import process",
message: "Starting orders import...",
current: currentStep,
total: 4,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000)
});
await importOrders(prodConnection, localConnection);
if (isImportCancelled) throw new Error("Import cancelled"); if (isImportCancelled) throw new Error("Import cancelled");
currentStep++; currentStep++;
} }
if (IMPORT_PURCHASE_ORDERS) { if (IMPORT_PURCHASE_ORDERS) {
outputProgress({ results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection);
status: "running",
operation: "Import process",
message: "Starting purchase orders import...",
current: currentStep,
total: 4,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000)
});
await importPurchaseOrders(prodConnection, localConnection);
if (isImportCancelled) throw new Error("Import cancelled"); if (isImportCancelled) throw new Error("Import cancelled");
currentStep++; currentStep++;
} }
@@ -157,8 +129,11 @@ async function main() {
end_time: new Date(endTime).toISOString(), end_time: new Date(endTime).toISOString(),
elapsed_time: formatElapsedTime((endTime - startTime) / 1000), elapsed_time: formatElapsedTime((endTime - startTime) / 1000),
elapsed_seconds: Math.round((endTime - startTime) / 1000) elapsed_seconds: Math.round((endTime - startTime) / 1000)
} },
results
}); });
return results;
} catch (error) { } catch (error) {
const endTime = Date.now(); const endTime = Date.now();
console.error("Error during import process:", error); console.error("Error during import process:", error);
@@ -179,23 +154,8 @@ async function main() {
}); });
throw error; throw error;
} finally { } finally {
try { if (connections) {
// Close connections in order await closeConnections(connections);
if (prodConnection) await prodConnection.end();
if (localConnection) await localConnection.end();
// Wait a bit for any pending data to be written before closing SSH
await new Promise(resolve => setTimeout(resolve, 100));
if (ssh) {
// Properly close the SSH connection
ssh.on('close', () => {
console.log('SSH connection closed cleanly');
});
ssh.end();
}
} catch (err) {
console.error('Error during cleanup:', err);
} }
} }
} }
@@ -211,6 +171,5 @@ if (require.main === module) {
// Export the functions needed by the route // Export the functions needed by the route
module.exports = { module.exports = {
main, main,
outputProgress,
cancelImport, cancelImport,
}; };

View File

@@ -1,4 +1,4 @@
const { updateProgress, outputProgress, formatElapsedTime } = require('./utils'); const { outputProgress, formatElapsedTime } = require('../metrics/utils/progress');
async function importCategories(prodConnection, localConnection) { async function importCategories(prodConnection, localConnection) {
outputProgress({ outputProgress({
@@ -129,12 +129,13 @@ async function importCategories(prodConnection, localConnection) {
); );
totalInserted += categoriesToInsert.length; totalInserted += categoriesToInsert.length;
updateProgress( outputProgress({
totalInserted, status: "running",
totalInserted, operation: "Categories import",
"Categories import", current: totalInserted,
startTime total: totalInserted,
); elapsed: formatElapsedTime(startTime),
});
} }
// After all imports, if we skipped any categories, throw an error // After all imports, if we skipped any categories, throw an error
@@ -151,8 +152,13 @@ async function importCategories(prodConnection, localConnection) {
operation: "Categories import completed", operation: "Categories import completed",
current: totalInserted, current: totalInserted,
total: totalInserted, total: totalInserted,
duration: formatElapsedTime((Date.now() - startTime) / 1000), duration: formatElapsedTime(Date.now() - startTime),
}); });
return {
status: "complete",
totalImported: totalInserted
};
} catch (error) { } catch (error) {
console.error("Error importing categories:", error); console.error("Error importing categories:", error);
if (error.skippedCategories) { if (error.skippedCategories) {
@@ -161,6 +167,14 @@ async function importCategories(prodConnection, localConnection) {
JSON.stringify(error.skippedCategories, null, 2) JSON.stringify(error.skippedCategories, null, 2)
); );
} }
outputProgress({
status: "error",
operation: "Categories import failed",
error: error.message,
skippedCategories: error.skippedCategories
});
throw error; throw error;
} }
} }

View File

@@ -1,4 +1,4 @@
const { updateProgress, outputProgress, formatElapsedTime } = require('./utils'); const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');
const { importMissingProducts } = require('./products'); const { importMissingProducts } = require('./products');
async function importOrders(prodConnection, localConnection) { async function importOrders(prodConnection, localConnection) {
@@ -25,11 +25,6 @@ async function importOrders(prodConnection, localConnection) {
.filter((name) => name !== "id"); // Skip auto-increment ID .filter((name) => name !== "id"); // Skip auto-increment ID
// Get total count first for progress indication // Get total count first for progress indication
outputProgress({
operation: "Starting orders import - Getting total count",
status: "running",
});
const [countResult] = await prodConnection.query(` const [countResult] = await prodConnection.query(`
SELECT COUNT(*) as total SELECT COUNT(*) as total
FROM order_items oi FORCE INDEX (PRIMARY) FROM order_items oi FORCE INDEX (PRIMARY)
@@ -132,12 +127,15 @@ async function importOrders(prodConnection, localConnection) {
processed += orders.length; processed += orders.length;
offset += batchSize; offset += batchSize;
updateProgress( outputProgress({
processed, status: "running",
operation: "Orders import",
current: processed,
total, total,
"Orders import", elapsed: formatElapsedTime(startTime),
startTime remaining: estimateRemaining(startTime, processed, total),
); rate: calculateRate(startTime, processed)
});
} }
// Now handle missing products and retry skipped orders // Now handle missing products and retry skipped orders
@@ -215,13 +213,20 @@ async function importOrders(prodConnection, localConnection) {
} }
} }
const endTime = Date.now();
outputProgress({ outputProgress({
operation: `Orders import complete in ${Math.round(
(endTime - startTime) / 1000
)}s`,
status: "complete", status: "complete",
operation: "Orders import completed",
current: total,
total,
duration: formatElapsedTime(Date.now() - startTime),
}); });
return {
status: "complete",
totalImported: total,
missingProducts: missingProducts.size,
retriedOrders: skippedOrders.size
};
} catch (error) { } catch (error) {
outputProgress({ outputProgress({
operation: "Orders import failed", operation: "Orders import failed",

View File

@@ -1,4 +1,4 @@
const { updateProgress, outputProgress, formatElapsedTime } = require('./utils'); const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');
async function importMissingProducts(prodConnection, localConnection, missingPids) { async function importMissingProducts(prodConnection, localConnection, missingPids) {
// First get the column names from the table structure // First get the column names from the table structure
@@ -345,76 +345,9 @@ async function importProducts(prodConnection, localConnection) {
GROUP BY p.pid GROUP BY p.pid
`); `);
// Debug log to check for specific product
const debugProduct = rows.find((row) => row.pid === 620972);
if (debugProduct) {
console.log("Found product 620972:", debugProduct);
} else {
console.log("Product 620972 not found in query results");
// Debug query to check why it's missing
const [debugResult] = await prodConnection.query(
`
SELECT
p.pid,
p.itemnumber,
p.date_created,
p.datein,
pls.date_sold,
si.show,
si.buyable,
pcp.price_each
FROM products p
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
LEFT JOIN (
SELECT pid, MIN(price_each) as price_each
FROM product_current_prices
WHERE active = 1
GROUP BY pid
) pcp ON p.pid = pcp.pid
WHERE p.pid = ?
`,
[620972]
);
console.log("Debug query result:", debugResult);
}
// Also check for the other missing products
const missingPids = [
208348, 317600, 370009, 429494, 466233, 471156, 474582, 476214, 484394,
484755, 484756, 493549, 620972,
];
const [missingProducts] = await prodConnection.query(
`
SELECT
p.pid,
p.itemnumber,
p.date_created,
p.datein,
pls.date_sold,
si.show,
si.buyable,
pcp.price_each
FROM products p
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
LEFT JOIN (
SELECT pid, MIN(price_each) as price_each
FROM product_current_prices
WHERE active = 1
GROUP BY pid
) pcp ON p.pid = pcp.pid
WHERE p.pid IN (?)
`,
[missingPids]
);
console.log("Debug results for missing products:", missingProducts);
let current = 0; let current = 0;
const total = rows.length; const total = rows.length;
const BATCH_SIZE = 1000;
// Process products in batches // Process products in batches
for (let i = 0; i < rows.length; i += BATCH_SIZE) { for (let i = 0; i < rows.length; i += BATCH_SIZE) {
@@ -475,8 +408,6 @@ async function importProducts(prodConnection, localConnection) {
...new Set(categoryRelationships.map(([catId]) => catId)), ...new Set(categoryRelationships.map(([catId]) => catId)),
]; ];
console.log("Checking categories:", uniqueCatIds);
// Check which categories exist // Check which categories exist
const [existingCats] = await localConnection.query( const [existingCats] = await localConnection.query(
"SELECT cat_id FROM categories WHERE cat_id IN (?)", "SELECT cat_id FROM categories WHERE cat_id IN (?)",
@@ -539,7 +470,15 @@ async function importProducts(prodConnection, localConnection) {
} }
current += batch.length; current += batch.length;
updateProgress(current, total, "Products import", startTime); outputProgress({
status: "running",
operation: "Products import",
current,
total,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, current, total),
rate: calculateRate(startTime, current)
});
} }
outputProgress({ outputProgress({
@@ -547,10 +486,22 @@ async function importProducts(prodConnection, localConnection) {
operation: "Products import completed", operation: "Products import completed",
current: total, current: total,
total, total,
duration: formatElapsedTime((Date.now() - startTime) / 1000), duration: formatElapsedTime(Date.now() - startTime),
}); });
return {
status: "complete",
totalImported: total
};
} catch (error) { } catch (error) {
console.error("Error importing products:", error); console.error("Error importing products:", error);
outputProgress({
status: "error",
operation: "Products import failed",
error: error.message
});
throw error; throw error;
} }
} }

View File

@@ -1,4 +1,4 @@
const { updateProgress, outputProgress, formatElapsedTime } = require('./utils'); const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');
async function importPurchaseOrders(prodConnection, localConnection) { async function importPurchaseOrders(prodConnection, localConnection) {
outputProgress({ outputProgress({
@@ -257,26 +257,32 @@ async function importPurchaseOrders(prodConnection, localConnection) {
// Update progress based on time interval // Update progress based on time interval
const now = Date.now(); const now = Date.now();
if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) { if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) {
updateProgress(processed, totalItems, "Purchase orders import", startTime); outputProgress({
status: "running",
operation: "Purchase orders import",
current: processed,
total: totalItems,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processed, totalItems),
rate: calculateRate(startTime, processed)
});
lastProgressUpdate = now; lastProgressUpdate = now;
} }
} }
} }
const endTime = Date.now();
outputProgress({ outputProgress({
operation: `Purchase orders import complete`,
status: "complete", status: "complete",
processed_records: processed, operation: "Purchase orders import completed",
total_records: totalItems, current: totalItems,
timing: { total: totalItems,
start_time: new Date(startTime).toISOString(), duration: formatElapsedTime(Date.now() - startTime),
end_time: new Date(endTime).toISOString(),
elapsed_time: formatElapsedTime((endTime - startTime) / 1000),
elapsed_seconds: Math.round((endTime - startTime) / 1000)
}
}); });
return {
status: "complete",
totalImported: totalItems
};
} catch (error) { } catch (error) {
outputProgress({ outputProgress({
operation: "Purchase orders import failed", operation: "Purchase orders import failed",

View File

@@ -2,53 +2,14 @@ const mysql = require("mysql2/promise");
const { Client } = require("ssh2"); const { Client } = require("ssh2");
const dotenv = require("dotenv"); const dotenv = require("dotenv");
const path = require("path"); const path = require("path");
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');
dotenv.config({ path: path.join(__dirname, "../../.env") }); // Helper function to setup SSH tunnel
async function setupSshTunnel(sshConfig) {
// SSH configuration
const sshConfig = {
host: process.env.PROD_SSH_HOST,
port: process.env.PROD_SSH_PORT || 22,
username: process.env.PROD_SSH_USER,
privateKey: process.env.PROD_SSH_KEY_PATH
? require("fs").readFileSync(process.env.PROD_SSH_KEY_PATH)
: undefined,
};
// Production database configuration
const prodDbConfig = {
host: process.env.PROD_DB_HOST || "localhost",
user: process.env.PROD_DB_USER,
password: process.env.PROD_DB_PASSWORD,
database: process.env.PROD_DB_NAME,
port: process.env.PROD_DB_PORT || 3306,
};
// Local database configuration
const localDbConfig = {
host: process.env.DB_HOST,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
multipleStatements: true,
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0,
namedPlaceholders: true,
};
// Constants
const BATCH_SIZE = 1000;
const PROGRESS_INTERVAL = 1000; // Update progress every second
async function setupSshTunnel() {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const ssh = new Client(); const ssh = new Client();
ssh.on('error', (err) => { ssh.on('error', (err) => {
console.error('SSH connection error:', err); console.error('SSH connection error:', err);
// Don't reject here, just log the error
}); });
ssh.on('end', () => { ssh.on('end', () => {
@@ -64,39 +25,64 @@ async function setupSshTunnel() {
ssh.forwardOut( ssh.forwardOut(
"127.0.0.1", "127.0.0.1",
0, 0,
prodDbConfig.host, sshConfig.prodDbConfig.host,
prodDbConfig.port, sshConfig.prodDbConfig.port,
async (err, stream) => { async (err, stream) => {
if (err) reject(err); if (err) reject(err);
resolve({ ssh, stream }); resolve({ ssh, stream });
} }
); );
}) })
.connect(sshConfig); .connect(sshConfig.ssh);
}); });
} }
// Helper function to update progress with time estimate // Helper function to setup database connections
function updateProgress(current, total, operation, startTime) { async function setupConnections(sshConfig) {
outputProgress({ const tunnel = await setupSshTunnel(sshConfig);
status: 'running',
operation, const prodConnection = await mysql.createConnection({
current, ...sshConfig.prodDbConfig,
total, stream: tunnel.stream,
rate: calculateRate(startTime, current),
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, current, total),
percentage: ((current / total) * 100).toFixed(1)
}); });
const localConnection = await mysql.createPool({
...sshConfig.localDbConfig,
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0
});
return {
ssh: tunnel.ssh,
prodConnection,
localConnection
};
}
// Helper function to close connections
async function closeConnections(connections) {
const { ssh, prodConnection, localConnection } = connections;
try {
if (prodConnection) await prodConnection.end();
if (localConnection) await localConnection.end();
// Wait a bit for any pending data to be written before closing SSH
await new Promise(resolve => setTimeout(resolve, 100));
if (ssh) {
ssh.on('close', () => {
console.log('SSH connection closed cleanly');
});
ssh.end();
}
} catch (err) {
console.error('Error during cleanup:', err);
}
} }
module.exports = { module.exports = {
setupSshTunnel, setupConnections,
updateProgress, closeConnections
prodDbConfig,
localDbConfig,
BATCH_SIZE,
PROGRESS_INTERVAL,
outputProgress,
formatElapsedTime
}; };