diff --git a/inventory-server/scripts/import-from-prod.js b/inventory-server/scripts/import-from-prod.js index 94bb506..2c926dd 100644 --- a/inventory-server/scripts/import-from-prod.js +++ b/inventory-server/scripts/import-from-prod.js @@ -1,7 +1,7 @@ -const mysql = require("mysql2/promise"); const dotenv = require("dotenv"); const path = require("path"); -const { setupSshTunnel, outputProgress, formatElapsedTime, prodDbConfig, localDbConfig } = require('./import/utils'); +const { outputProgress, formatElapsedTime } = require('./metrics/utils/progress'); +const { setupConnections, closeConnections } = require('./import/utils'); const importCategories = require('./import/categories'); const { importProducts } = require('./import/products'); const importOrders = require('./import/orders'); @@ -15,6 +15,38 @@ const IMPORT_PRODUCTS = true; const IMPORT_ORDERS = true; const IMPORT_PURCHASE_ORDERS = true; +// SSH configuration +const sshConfig = { + ssh: { + host: process.env.PROD_SSH_HOST, + port: process.env.PROD_SSH_PORT || 22, + username: process.env.PROD_SSH_USER, + privateKey: process.env.PROD_SSH_KEY_PATH + ? require("fs").readFileSync(process.env.PROD_SSH_KEY_PATH) + : undefined, + }, + // Production database configuration + prodDbConfig: { + host: process.env.PROD_DB_HOST || "localhost", + user: process.env.PROD_DB_USER, + password: process.env.PROD_DB_PASSWORD, + database: process.env.PROD_DB_NAME, + port: process.env.PROD_DB_PORT || 3306, + }, + // Local database configuration + localDbConfig: { + host: process.env.DB_HOST, + user: process.env.DB_USER, + password: process.env.DB_PASSWORD, + database: process.env.DB_NAME, + multipleStatements: true, + waitForConnections: true, + connectionLimit: 10, + queueLimit: 0, + namedPlaceholders: true, + } +}; + let isImportCancelled = false; // Add cancel function @@ -32,12 +64,9 @@ function cancelImport() { }); } -// Modify main function to handle cancellation and avoid process.exit async function main() { - let ssh; - let prodConnection; - let localConnection; const startTime = Date.now(); + let connections; try { // Initial progress update @@ -50,96 +79,39 @@ async function main() { elapsed: formatElapsedTime((Date.now() - startTime) / 1000) }); - const tunnel = await setupSshTunnel(); - ssh = tunnel.ssh; - - outputProgress({ - status: "running", - operation: "Import process", - message: "Connecting to production database...", - current: 0, - total: 4, - elapsed: formatElapsedTime((Date.now() - startTime) / 1000) - }); - - prodConnection = await mysql.createConnection({ - ...prodDbConfig, - stream: tunnel.stream, - }); - - outputProgress({ - status: "running", - operation: "Import process", - message: "Connecting to local database...", - current: 0, - total: 4, - elapsed: formatElapsedTime((Date.now() - startTime) / 1000) - }); - - localConnection = await mysql.createPool({ - ...localDbConfig, - waitForConnections: true, - connectionLimit: 10, - queueLimit: 0 - }); + connections = await setupConnections(sshConfig); + const { prodConnection, localConnection } = connections; if (isImportCancelled) throw new Error("Import cancelled"); - let currentStep = 0; + const results = { + categories: null, + products: null, + orders: null, + purchaseOrders: null + }; // Run each import based on constants if (IMPORT_CATEGORIES) { - outputProgress({ - status: "running", - operation: "Import process", - message: "Starting categories import...", - current: currentStep, - total: 4, - elapsed: formatElapsedTime((Date.now() - startTime) / 1000) - }); - await importCategories(prodConnection, localConnection); + results.categories = await importCategories(prodConnection, localConnection); if (isImportCancelled) throw new Error("Import cancelled"); currentStep++; } if (IMPORT_PRODUCTS) { - outputProgress({ - status: "running", - operation: "Import process", - message: "Starting products import...", - current: currentStep, - total: 4, - elapsed: formatElapsedTime((Date.now() - startTime) / 1000) - }); - await importProducts(prodConnection, localConnection); + results.products = await importProducts(prodConnection, localConnection); if (isImportCancelled) throw new Error("Import cancelled"); currentStep++; } if (IMPORT_ORDERS) { - outputProgress({ - status: "running", - operation: "Import process", - message: "Starting orders import...", - current: currentStep, - total: 4, - elapsed: formatElapsedTime((Date.now() - startTime) / 1000) - }); - await importOrders(prodConnection, localConnection); + results.orders = await importOrders(prodConnection, localConnection); if (isImportCancelled) throw new Error("Import cancelled"); currentStep++; } if (IMPORT_PURCHASE_ORDERS) { - outputProgress({ - status: "running", - operation: "Import process", - message: "Starting purchase orders import...", - current: currentStep, - total: 4, - elapsed: formatElapsedTime((Date.now() - startTime) / 1000) - }); - await importPurchaseOrders(prodConnection, localConnection); + results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection); if (isImportCancelled) throw new Error("Import cancelled"); currentStep++; } @@ -157,8 +129,11 @@ async function main() { end_time: new Date(endTime).toISOString(), elapsed_time: formatElapsedTime((endTime - startTime) / 1000), elapsed_seconds: Math.round((endTime - startTime) / 1000) - } + }, + results }); + + return results; } catch (error) { const endTime = Date.now(); console.error("Error during import process:", error); @@ -179,23 +154,8 @@ async function main() { }); throw error; } finally { - try { - // Close connections in order - if (prodConnection) await prodConnection.end(); - if (localConnection) await localConnection.end(); - - // Wait a bit for any pending data to be written before closing SSH - await new Promise(resolve => setTimeout(resolve, 100)); - - if (ssh) { - // Properly close the SSH connection - ssh.on('close', () => { - console.log('SSH connection closed cleanly'); - }); - ssh.end(); - } - } catch (err) { - console.error('Error during cleanup:', err); + if (connections) { + await closeConnections(connections); } } } @@ -211,6 +171,5 @@ if (require.main === module) { // Export the functions needed by the route module.exports = { main, - outputProgress, cancelImport, }; diff --git a/inventory-server/scripts/import/categories.js b/inventory-server/scripts/import/categories.js index 34f7c61..b099acc 100644 --- a/inventory-server/scripts/import/categories.js +++ b/inventory-server/scripts/import/categories.js @@ -1,4 +1,4 @@ -const { updateProgress, outputProgress, formatElapsedTime } = require('./utils'); +const { outputProgress, formatElapsedTime } = require('../metrics/utils/progress'); async function importCategories(prodConnection, localConnection) { outputProgress({ @@ -129,12 +129,13 @@ async function importCategories(prodConnection, localConnection) { ); totalInserted += categoriesToInsert.length; - updateProgress( - totalInserted, - totalInserted, - "Categories import", - startTime - ); + outputProgress({ + status: "running", + operation: "Categories import", + current: totalInserted, + total: totalInserted, + elapsed: formatElapsedTime(startTime), + }); } // After all imports, if we skipped any categories, throw an error @@ -151,8 +152,13 @@ async function importCategories(prodConnection, localConnection) { operation: "Categories import completed", current: totalInserted, total: totalInserted, - duration: formatElapsedTime((Date.now() - startTime) / 1000), + duration: formatElapsedTime(Date.now() - startTime), }); + + return { + status: "complete", + totalImported: totalInserted + }; } catch (error) { console.error("Error importing categories:", error); if (error.skippedCategories) { @@ -161,6 +167,14 @@ async function importCategories(prodConnection, localConnection) { JSON.stringify(error.skippedCategories, null, 2) ); } + + outputProgress({ + status: "error", + operation: "Categories import failed", + error: error.message, + skippedCategories: error.skippedCategories + }); + throw error; } } diff --git a/inventory-server/scripts/import/orders.js b/inventory-server/scripts/import/orders.js index 1427d0a..d692696 100644 --- a/inventory-server/scripts/import/orders.js +++ b/inventory-server/scripts/import/orders.js @@ -1,4 +1,4 @@ -const { updateProgress, outputProgress, formatElapsedTime } = require('./utils'); +const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress'); const { importMissingProducts } = require('./products'); async function importOrders(prodConnection, localConnection) { @@ -25,11 +25,6 @@ async function importOrders(prodConnection, localConnection) { .filter((name) => name !== "id"); // Skip auto-increment ID // Get total count first for progress indication - outputProgress({ - operation: "Starting orders import - Getting total count", - status: "running", - }); - const [countResult] = await prodConnection.query(` SELECT COUNT(*) as total FROM order_items oi FORCE INDEX (PRIMARY) @@ -132,12 +127,15 @@ async function importOrders(prodConnection, localConnection) { processed += orders.length; offset += batchSize; - updateProgress( - processed, + outputProgress({ + status: "running", + operation: "Orders import", + current: processed, total, - "Orders import", - startTime - ); + elapsed: formatElapsedTime(startTime), + remaining: estimateRemaining(startTime, processed, total), + rate: calculateRate(startTime, processed) + }); } // Now handle missing products and retry skipped orders @@ -215,13 +213,20 @@ async function importOrders(prodConnection, localConnection) { } } - const endTime = Date.now(); outputProgress({ - operation: `Orders import complete in ${Math.round( - (endTime - startTime) / 1000 - )}s`, status: "complete", + operation: "Orders import completed", + current: total, + total, + duration: formatElapsedTime(Date.now() - startTime), }); + + return { + status: "complete", + totalImported: total, + missingProducts: missingProducts.size, + retriedOrders: skippedOrders.size + }; } catch (error) { outputProgress({ operation: "Orders import failed", diff --git a/inventory-server/scripts/import/products.js b/inventory-server/scripts/import/products.js index 70be5c5..f597207 100644 --- a/inventory-server/scripts/import/products.js +++ b/inventory-server/scripts/import/products.js @@ -1,4 +1,4 @@ -const { updateProgress, outputProgress, formatElapsedTime } = require('./utils'); +const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress'); async function importMissingProducts(prodConnection, localConnection, missingPids) { // First get the column names from the table structure @@ -345,76 +345,9 @@ async function importProducts(prodConnection, localConnection) { GROUP BY p.pid `); - // Debug log to check for specific product - const debugProduct = rows.find((row) => row.pid === 620972); - if (debugProduct) { - console.log("Found product 620972:", debugProduct); - } else { - console.log("Product 620972 not found in query results"); - - // Debug query to check why it's missing - const [debugResult] = await prodConnection.query( - ` - SELECT - p.pid, - p.itemnumber, - p.date_created, - p.datein, - pls.date_sold, - si.show, - si.buyable, - pcp.price_each - FROM products p - LEFT JOIN product_last_sold pls ON p.pid = pls.pid - LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 - LEFT JOIN ( - SELECT pid, MIN(price_each) as price_each - FROM product_current_prices - WHERE active = 1 - GROUP BY pid - ) pcp ON p.pid = pcp.pid - WHERE p.pid = ? - `, - [620972] - ); - - console.log("Debug query result:", debugResult); - } - - // Also check for the other missing products - const missingPids = [ - 208348, 317600, 370009, 429494, 466233, 471156, 474582, 476214, 484394, - 484755, 484756, 493549, 620972, - ]; - const [missingProducts] = await prodConnection.query( - ` - SELECT - p.pid, - p.itemnumber, - p.date_created, - p.datein, - pls.date_sold, - si.show, - si.buyable, - pcp.price_each - FROM products p - LEFT JOIN product_last_sold pls ON p.pid = pls.pid - LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 - LEFT JOIN ( - SELECT pid, MIN(price_each) as price_each - FROM product_current_prices - WHERE active = 1 - GROUP BY pid - ) pcp ON p.pid = pcp.pid - WHERE p.pid IN (?) - `, - [missingPids] - ); - - console.log("Debug results for missing products:", missingProducts); - let current = 0; const total = rows.length; + const BATCH_SIZE = 1000; // Process products in batches for (let i = 0; i < rows.length; i += BATCH_SIZE) { @@ -475,8 +408,6 @@ async function importProducts(prodConnection, localConnection) { ...new Set(categoryRelationships.map(([catId]) => catId)), ]; - console.log("Checking categories:", uniqueCatIds); - // Check which categories exist const [existingCats] = await localConnection.query( "SELECT cat_id FROM categories WHERE cat_id IN (?)", @@ -539,7 +470,15 @@ async function importProducts(prodConnection, localConnection) { } current += batch.length; - updateProgress(current, total, "Products import", startTime); + outputProgress({ + status: "running", + operation: "Products import", + current, + total, + elapsed: formatElapsedTime(startTime), + remaining: estimateRemaining(startTime, current, total), + rate: calculateRate(startTime, current) + }); } outputProgress({ @@ -547,10 +486,22 @@ async function importProducts(prodConnection, localConnection) { operation: "Products import completed", current: total, total, - duration: formatElapsedTime((Date.now() - startTime) / 1000), + duration: formatElapsedTime(Date.now() - startTime), }); + + return { + status: "complete", + totalImported: total + }; } catch (error) { console.error("Error importing products:", error); + + outputProgress({ + status: "error", + operation: "Products import failed", + error: error.message + }); + throw error; } } diff --git a/inventory-server/scripts/import/purchase-orders.js b/inventory-server/scripts/import/purchase-orders.js index 323e894..2c9ac69 100644 --- a/inventory-server/scripts/import/purchase-orders.js +++ b/inventory-server/scripts/import/purchase-orders.js @@ -1,4 +1,4 @@ -const { updateProgress, outputProgress, formatElapsedTime } = require('./utils'); +const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress'); async function importPurchaseOrders(prodConnection, localConnection) { outputProgress({ @@ -257,26 +257,32 @@ async function importPurchaseOrders(prodConnection, localConnection) { // Update progress based on time interval const now = Date.now(); if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) { - updateProgress(processed, totalItems, "Purchase orders import", startTime); + outputProgress({ + status: "running", + operation: "Purchase orders import", + current: processed, + total: totalItems, + elapsed: formatElapsedTime(startTime), + remaining: estimateRemaining(startTime, processed, totalItems), + rate: calculateRate(startTime, processed) + }); lastProgressUpdate = now; } } } - const endTime = Date.now(); outputProgress({ - operation: `Purchase orders import complete`, status: "complete", - processed_records: processed, - total_records: totalItems, - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date(endTime).toISOString(), - elapsed_time: formatElapsedTime((endTime - startTime) / 1000), - elapsed_seconds: Math.round((endTime - startTime) / 1000) - } + operation: "Purchase orders import completed", + current: totalItems, + total: totalItems, + duration: formatElapsedTime(Date.now() - startTime), }); + return { + status: "complete", + totalImported: totalItems + }; } catch (error) { outputProgress({ operation: "Purchase orders import failed", diff --git a/inventory-server/scripts/import/utils.js b/inventory-server/scripts/import/utils.js index 3f71b9c..12d8a21 100644 --- a/inventory-server/scripts/import/utils.js +++ b/inventory-server/scripts/import/utils.js @@ -2,53 +2,14 @@ const mysql = require("mysql2/promise"); const { Client } = require("ssh2"); const dotenv = require("dotenv"); const path = require("path"); -const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress'); -dotenv.config({ path: path.join(__dirname, "../../.env") }); - -// SSH configuration -const sshConfig = { - host: process.env.PROD_SSH_HOST, - port: process.env.PROD_SSH_PORT || 22, - username: process.env.PROD_SSH_USER, - privateKey: process.env.PROD_SSH_KEY_PATH - ? require("fs").readFileSync(process.env.PROD_SSH_KEY_PATH) - : undefined, -}; - -// Production database configuration -const prodDbConfig = { - host: process.env.PROD_DB_HOST || "localhost", - user: process.env.PROD_DB_USER, - password: process.env.PROD_DB_PASSWORD, - database: process.env.PROD_DB_NAME, - port: process.env.PROD_DB_PORT || 3306, -}; - -// Local database configuration -const localDbConfig = { - host: process.env.DB_HOST, - user: process.env.DB_USER, - password: process.env.DB_PASSWORD, - database: process.env.DB_NAME, - multipleStatements: true, - waitForConnections: true, - connectionLimit: 10, - queueLimit: 0, - namedPlaceholders: true, -}; - -// Constants -const BATCH_SIZE = 1000; -const PROGRESS_INTERVAL = 1000; // Update progress every second - -async function setupSshTunnel() { +// Helper function to setup SSH tunnel +async function setupSshTunnel(sshConfig) { return new Promise((resolve, reject) => { const ssh = new Client(); ssh.on('error', (err) => { console.error('SSH connection error:', err); - // Don't reject here, just log the error }); ssh.on('end', () => { @@ -64,39 +25,64 @@ async function setupSshTunnel() { ssh.forwardOut( "127.0.0.1", 0, - prodDbConfig.host, - prodDbConfig.port, + sshConfig.prodDbConfig.host, + sshConfig.prodDbConfig.port, async (err, stream) => { if (err) reject(err); resolve({ ssh, stream }); } ); }) - .connect(sshConfig); + .connect(sshConfig.ssh); }); } -// Helper function to update progress with time estimate -function updateProgress(current, total, operation, startTime) { - outputProgress({ - status: 'running', - operation, - current, - total, - rate: calculateRate(startTime, current), - elapsed: formatElapsedTime(startTime), - remaining: estimateRemaining(startTime, current, total), - percentage: ((current / total) * 100).toFixed(1) - }); +// Helper function to setup database connections +async function setupConnections(sshConfig) { + const tunnel = await setupSshTunnel(sshConfig); + + const prodConnection = await mysql.createConnection({ + ...sshConfig.prodDbConfig, + stream: tunnel.stream, + }); + + const localConnection = await mysql.createPool({ + ...sshConfig.localDbConfig, + waitForConnections: true, + connectionLimit: 10, + queueLimit: 0 + }); + + return { + ssh: tunnel.ssh, + prodConnection, + localConnection + }; +} + +// Helper function to close connections +async function closeConnections(connections) { + const { ssh, prodConnection, localConnection } = connections; + + try { + if (prodConnection) await prodConnection.end(); + if (localConnection) await localConnection.end(); + + // Wait a bit for any pending data to be written before closing SSH + await new Promise(resolve => setTimeout(resolve, 100)); + + if (ssh) { + ssh.on('close', () => { + console.log('SSH connection closed cleanly'); + }); + ssh.end(); + } + } catch (err) { + console.error('Error during cleanup:', err); + } } module.exports = { - setupSshTunnel, - updateProgress, - prodDbConfig, - localDbConfig, - BATCH_SIZE, - PROGRESS_INTERVAL, - outputProgress, - formatElapsedTime + setupConnections, + closeConnections }; \ No newline at end of file