const mysql = require("mysql2/promise"); const dotenv = require("dotenv"); const path = require("path"); const { setupSshTunnel, outputProgress, formatElapsedTime, prodDbConfig, localDbConfig } = require('./import/utils'); const importCategories = require('./import/categories'); const { importProducts } = require('./import/products'); const importOrders = require('./import/orders'); const importPurchaseOrders = require('./import/purchase-orders'); dotenv.config({ path: path.join(__dirname, "../.env") }); // Constants to control which imports run const IMPORT_CATEGORIES = true; const IMPORT_PRODUCTS = true; const IMPORT_ORDERS = true; const IMPORT_PURCHASE_ORDERS = true; let isImportCancelled = false; // Add cancel function function cancelImport() { isImportCancelled = true; outputProgress({ status: 'cancelled', operation: 'Import process', message: 'Import cancelled by user', current: 0, total: 0, elapsed: null, remaining: null, rate: 0 }); } // Modify main function to handle cancellation and avoid process.exit async function main() { let ssh; let prodConnection; let localConnection; const startTime = Date.now(); try { // Initial progress update outputProgress({ status: "running", operation: "Import process", message: "Initializing SSH tunnel...", current: 0, total: 4, // Total number of major steps elapsed: formatElapsedTime((Date.now() - startTime) / 1000) }); const tunnel = await setupSshTunnel(); ssh = tunnel.ssh; outputProgress({ status: "running", operation: "Import process", message: "Connecting to production database...", current: 0, total: 4, elapsed: formatElapsedTime((Date.now() - startTime) / 1000) }); prodConnection = await mysql.createConnection({ ...prodDbConfig, stream: tunnel.stream, }); outputProgress({ status: "running", operation: "Import process", message: "Connecting to local database...", current: 0, total: 4, elapsed: formatElapsedTime((Date.now() - startTime) / 1000) }); localConnection = await mysql.createPool({ ...localDbConfig, waitForConnections: true, connectionLimit: 10, queueLimit: 0 }); if (isImportCancelled) throw new Error("Import cancelled"); let currentStep = 0; // Run each import based on constants if (IMPORT_CATEGORIES) { outputProgress({ status: "running", operation: "Import process", message: "Starting categories import...", current: currentStep, total: 4, elapsed: formatElapsedTime((Date.now() - startTime) / 1000) }); await importCategories(prodConnection, localConnection); if (isImportCancelled) throw new Error("Import cancelled"); currentStep++; } if (IMPORT_PRODUCTS) { outputProgress({ status: "running", operation: "Import process", message: "Starting products import...", current: currentStep, total: 4, elapsed: formatElapsedTime((Date.now() - startTime) / 1000) }); await importProducts(prodConnection, localConnection); if (isImportCancelled) throw new Error("Import cancelled"); currentStep++; } if (IMPORT_ORDERS) { outputProgress({ status: "running", operation: "Import process", message: "Starting orders import...", current: currentStep, total: 4, elapsed: formatElapsedTime((Date.now() - startTime) / 1000) }); await importOrders(prodConnection, localConnection); if (isImportCancelled) throw new Error("Import cancelled"); currentStep++; } if (IMPORT_PURCHASE_ORDERS) { outputProgress({ status: "running", operation: "Import process", message: "Starting purchase orders import...", current: currentStep, total: 4, elapsed: formatElapsedTime((Date.now() - startTime) / 1000) }); await importPurchaseOrders(prodConnection, localConnection); if (isImportCancelled) throw new Error("Import cancelled"); currentStep++; } const endTime = Date.now(); outputProgress({ status: "complete", operation: "Import process", message: "All imports completed successfully", current: 4, total: 4, elapsed: formatElapsedTime((endTime - startTime) / 1000), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date(endTime).toISOString(), elapsed_time: formatElapsedTime((endTime - startTime) / 1000), elapsed_seconds: Math.round((endTime - startTime) / 1000) } }); } catch (error) { const endTime = Date.now(); console.error("Error during import process:", error); outputProgress({ status: error.message === "Import cancelled" ? "cancelled" : "error", operation: "Import process", message: error.message === "Import cancelled" ? "Import cancelled by user" : "Import failed", error: error.message, current: 0, total: 4, elapsed: formatElapsedTime((endTime - startTime) / 1000), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date(endTime).toISOString(), elapsed_time: formatElapsedTime((endTime - startTime) / 1000), elapsed_seconds: Math.round((endTime - startTime) / 1000) } }); throw error; } finally { try { // Close connections in order if (prodConnection) await prodConnection.end(); if (localConnection) await localConnection.end(); // Wait a bit for any pending data to be written before closing SSH await new Promise(resolve => setTimeout(resolve, 100)); if (ssh) { // Properly close the SSH connection ssh.on('close', () => { console.log('SSH connection closed cleanly'); }); ssh.end(); } } catch (err) { console.error('Error during cleanup:', err); } } } // Run the import only if this is the main module if (require.main === module) { main().catch((error) => { console.error("Unhandled error in main process:", error); process.exit(1); }); } // Export the functions needed by the route module.exports = { main, outputProgress, cancelImport, };