const dotenv = require("dotenv"); const path = require("path"); const { outputProgress, formatElapsedTime } = require('./metrics/utils/progress'); const { setupConnections, closeConnections } = require('./import/utils'); const importCategories = require('./import/categories'); const { importProducts } = require('./import/products'); const importOrders = require('./import/orders'); const importPurchaseOrders = require('./import/purchase-orders'); dotenv.config({ path: path.join(__dirname, "../.env") }); // Constants to control which imports run const IMPORT_CATEGORIES = false; const IMPORT_PRODUCTS = false; const IMPORT_ORDERS = true; const IMPORT_PURCHASE_ORDERS = true; // Add flag for incremental updates const INCREMENTAL_UPDATE = process.env.INCREMENTAL_UPDATE !== 'false'; // Default to true unless explicitly set to false // SSH configuration // In import-from-prod.js const sshConfig = { ssh: { host: process.env.PROD_SSH_HOST, port: process.env.PROD_SSH_PORT || 22, username: process.env.PROD_SSH_USER, privateKey: process.env.PROD_SSH_KEY_PATH ? require("fs").readFileSync(process.env.PROD_SSH_KEY_PATH) : undefined, compress: true, // Enable SSH compression }, prodDbConfig: { host: process.env.PROD_DB_HOST || "localhost", user: process.env.PROD_DB_USER, password: process.env.PROD_DB_PASSWORD, database: process.env.PROD_DB_NAME, port: process.env.PROD_DB_PORT || 3306, timezone: 'Z', }, localDbConfig: { host: process.env.DB_HOST, user: process.env.DB_USER, password: process.env.DB_PASSWORD, database: process.env.DB_NAME, multipleStatements: true, waitForConnections: true, connectionLimit: 10, queueLimit: 0, namedPlaceholders: true, maxAllowedPacket: 64 * 1024 * 1024, // 64MB connectTimeout: 60000, enableKeepAlive: true, keepAliveInitialDelay: 10000, compress: true, timezone: 'Z', stringifyObjects: false, } }; let isImportCancelled = false; // Add cancel function function cancelImport() { isImportCancelled = true; outputProgress({ status: 'cancelled', operation: 'Import process', message: 'Import cancelled by user', current: 0, total: 0, elapsed: null, remaining: null, rate: 0 }); } async function main() { const startTime = Date.now(); let connections; let completedSteps = 0; let importHistoryId; const totalSteps = [ IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS ].filter(Boolean).length; try { // Initial progress update outputProgress({ status: "running", operation: "Import process", message: `Initializing SSH tunnel for ${INCREMENTAL_UPDATE ? 'incremental' : 'full'} import...`, current: completedSteps, total: totalSteps, elapsed: formatElapsedTime(startTime) }); connections = await setupConnections(sshConfig); const { prodConnection, localConnection } = connections; if (isImportCancelled) throw new Error("Import cancelled"); // Clean up any previously running imports that weren't completed await localConnection.query(` UPDATE import_history SET status = 'cancelled', end_time = NOW(), duration_seconds = TIMESTAMPDIFF(SECOND, start_time, NOW()), error_message = 'Previous import was not completed properly' WHERE status = 'running' `); // Initialize sync_status table if it doesn't exist await localConnection.query(` CREATE TABLE IF NOT EXISTS sync_status ( table_name VARCHAR(50) PRIMARY KEY, last_sync_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, last_sync_id BIGINT, INDEX idx_last_sync (last_sync_timestamp) ); `); // Create import history record for the overall session const [historyResult] = await localConnection.query(` INSERT INTO import_history ( table_name, start_time, is_incremental, status, additional_info ) VALUES ( 'all_tables', NOW(), ?, 'running', JSON_OBJECT( 'categories_enabled', ?, 'products_enabled', ?, 'orders_enabled', ?, 'purchase_orders_enabled', ? ) ) `, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS]); importHistoryId = historyResult.insertId; const results = { categories: null, products: null, orders: null, purchaseOrders: null }; let totalRecordsAdded = 0; let totalRecordsUpdated = 0; // Run each import based on constants if (IMPORT_CATEGORIES) { results.categories = await importCategories(prodConnection, localConnection); if (isImportCancelled) throw new Error("Import cancelled"); completedSteps++; if (results.categories.recordsAdded) totalRecordsAdded += results.categories.recordsAdded; if (results.categories.recordsUpdated) totalRecordsUpdated += results.categories.recordsUpdated; } if (IMPORT_PRODUCTS) { results.products = await importProducts(prodConnection, localConnection, INCREMENTAL_UPDATE); if (isImportCancelled) throw new Error("Import cancelled"); completedSteps++; if (results.products.recordsAdded) totalRecordsAdded += results.products.recordsAdded; if (results.products.recordsUpdated) totalRecordsUpdated += results.products.recordsUpdated; } if (IMPORT_ORDERS) { results.orders = await importOrders(prodConnection, localConnection, INCREMENTAL_UPDATE); if (isImportCancelled) throw new Error("Import cancelled"); completedSteps++; if (results.orders.recordsAdded) totalRecordsAdded += results.orders.recordsAdded; if (results.orders.recordsUpdated) totalRecordsUpdated += results.orders.recordsUpdated; } if (IMPORT_PURCHASE_ORDERS) { results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection, INCREMENTAL_UPDATE); if (isImportCancelled) throw new Error("Import cancelled"); completedSteps++; if (results.purchaseOrders.recordsAdded) totalRecordsAdded += results.purchaseOrders.recordsAdded; if (results.purchaseOrders.recordsUpdated) totalRecordsUpdated += results.purchaseOrders.recordsUpdated; } const endTime = Date.now(); const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); // Update import history with final stats await localConnection.query(` UPDATE import_history SET end_time = NOW(), duration_seconds = ?, records_added = ?, records_updated = ?, status = 'completed', additional_info = JSON_OBJECT( 'categories_enabled', ?, 'products_enabled', ?, 'orders_enabled', ?, 'purchase_orders_enabled', ?, 'categories_result', CAST(? AS JSON), 'products_result', CAST(? AS JSON), 'orders_result', CAST(? AS JSON), 'purchase_orders_result', CAST(? AS JSON) ) WHERE id = ? `, [ totalElapsedSeconds, totalRecordsAdded, totalRecordsUpdated, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS, JSON.stringify(results.categories), JSON.stringify(results.products), JSON.stringify(results.orders), JSON.stringify(results.purchaseOrders), importHistoryId ]); outputProgress({ status: "complete", operation: "Import process", message: `${INCREMENTAL_UPDATE ? 'Incremental' : 'Full'} import completed successfully in ${formatElapsedTime(totalElapsedSeconds)}`, current: completedSteps, total: totalSteps, elapsed: formatElapsedTime(startTime), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date(endTime).toISOString(), elapsed_time: formatElapsedTime(startTime), elapsed_seconds: totalElapsedSeconds, total_duration: formatElapsedTime(totalElapsedSeconds) }, results }); return results; } catch (error) { const endTime = Date.now(); const totalElapsedSeconds = Math.round((endTime - startTime) / 1000); // Update import history with error if (importHistoryId && connections?.localConnection) { await connections.localConnection.query(` UPDATE import_history SET end_time = NOW(), duration_seconds = ?, status = ?, error_message = ? WHERE id = ? `, [totalElapsedSeconds, error.message === "Import cancelled" ? 'cancelled' : 'failed', error.message, importHistoryId]); } console.error("Error during import process:", error); outputProgress({ status: error.message === "Import cancelled" ? "cancelled" : "error", operation: "Import process", message: error.message === "Import cancelled" ? `${INCREMENTAL_UPDATE ? 'Incremental' : 'Full'} import cancelled by user after ${formatElapsedTime(totalElapsedSeconds)}` : `${INCREMENTAL_UPDATE ? 'Incremental' : 'Full'} import failed after ${formatElapsedTime(totalElapsedSeconds)}`, error: error.message, current: completedSteps, total: totalSteps, elapsed: formatElapsedTime(startTime), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date(endTime).toISOString(), elapsed_time: formatElapsedTime(startTime), elapsed_seconds: totalElapsedSeconds, total_duration: formatElapsedTime(totalElapsedSeconds) } }); throw error; } finally { if (connections) { await closeConnections(connections); } } } // Run the import only if this is the main module if (require.main === module) { main().catch((error) => { console.error("Unhandled error in main process:", error); process.exit(1); }); } // Export the functions needed by the route module.exports = { main, cancelImport, };