const express = require('express'); const router = express.Router(); const { spawn } = require('child_process'); const path = require('path'); // Debug middleware MUST be first router.use((req, res, next) => { console.log(`[CSV Route Debug] ${req.method} ${req.path}`); next(); }); // Store active import process and its progress let activeImport = null; let importProgress = null; // SSE clients for progress updates const updateClients = new Set(); const importClients = new Set(); const resetClients = new Set(); const resetMetricsClients = new Set(); const calculateMetricsClients = new Set(); // Helper to send progress to specific clients function sendProgressToClients(clients, progress) { const data = typeof progress === 'string' ? { progress } : progress; // Ensure we have a status field if (!data.status) { data.status = 'running'; } const message = `data: ${JSON.stringify(data)}\n\n`; clients.forEach(client => { try { client.write(message); // Immediately flush the response if (typeof client.flush === 'function') { client.flush(); } } catch (error) { // Silently remove failed client clients.delete(client); } }); } // Progress endpoints router.get('/update/progress', (req, res) => { res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': req.headers.origin || '*', 'Access-Control-Allow-Credentials': 'true' }); // Send an initial message to test the connection res.write('data: {"status":"running","operation":"Initializing connection..."}\n\n'); // Add this client to the update set updateClients.add(res); // Remove client when connection closes req.on('close', () => { updateClients.delete(res); }); }); router.get('/import/progress', (req, res) => { res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': req.headers.origin || '*', 'Access-Control-Allow-Credentials': 'true' }); // Send an initial message to test the connection res.write('data: {"status":"running","operation":"Initializing connection..."}\n\n'); // Add this client to the import set importClients.add(res); // Remove client when connection closes req.on('close', () => { importClients.delete(res); }); }); router.get('/reset/progress', (req, res) => { res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': req.headers.origin || '*', 'Access-Control-Allow-Credentials': 'true' }); // Send an initial message to test the connection res.write('data: {"status":"running","operation":"Initializing connection..."}\n\n'); // Add this client to the reset set resetClients.add(res); // Remove client when connection closes req.on('close', () => { resetClients.delete(res); }); }); // Add reset-metrics progress endpoint router.get('/reset-metrics/progress', (req, res) => { res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': req.headers.origin || '*', 'Access-Control-Allow-Credentials': 'true' }); // Send an initial message to test the connection res.write('data: {"status":"running","operation":"Initializing connection..."}\n\n'); // Add this client to the reset-metrics set resetMetricsClients.add(res); // Remove client when connection closes req.on('close', () => { resetMetricsClients.delete(res); }); }); // Add calculate-metrics progress endpoint router.get('/calculate-metrics/progress', (req, res) => { res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': req.headers.origin || '*', 'Access-Control-Allow-Credentials': 'true' }); // Send current progress if it exists if (importProgress) { res.write(`data: ${JSON.stringify(importProgress)}\n\n`); } else { res.write('data: {"status":"running","operation":"Initializing connection..."}\n\n'); } // Add this client to the calculate-metrics set calculateMetricsClients.add(res); // Remove client when connection closes req.on('close', () => { calculateMetricsClients.delete(res); }); }); // Debug endpoint to verify route registration router.get('/test', (req, res) => { console.log('CSV test endpoint hit'); res.json({ message: 'CSV routes are working' }); }); // Route to check import status router.get('/status', (req, res) => { console.log('CSV status endpoint hit'); res.json({ active: !!activeImport, progress: importProgress }); }); // Add calculate-metrics status endpoint router.get('/calculate-metrics/status', (req, res) => { console.log('Calculate metrics status endpoint hit'); const calculateMetrics = require('../../scripts/calculate-metrics'); const progress = calculateMetrics.getProgress(); // Only consider it active if both the process is running and we have progress const isActive = !!activeImport && !!progress; res.json({ active: isActive, progress: isActive ? progress : null }); }); // Route to update CSV files router.post('/update', async (req, res, next) => { if (activeImport) { return res.status(409).json({ error: 'Import already in progress' }); } try { const scriptPath = path.join(__dirname, '..', '..', 'scripts', 'update-csv.js'); if (!require('fs').existsSync(scriptPath)) { return res.status(500).json({ error: 'Update script not found' }); } activeImport = spawn('node', [scriptPath]); activeImport.stdout.on('data', (data) => { const output = data.toString().trim(); try { // Try to parse as JSON const jsonData = JSON.parse(output); sendProgressToClients(updateClients, { status: 'running', ...jsonData }); } catch (e) { // If not JSON, send as plain progress sendProgressToClients(updateClients, { status: 'running', progress: output }); } }); activeImport.stderr.on('data', (data) => { const error = data.toString().trim(); try { // Try to parse as JSON const jsonData = JSON.parse(error); sendProgressToClients(updateClients, { status: 'error', ...jsonData }); } catch { sendProgressToClients(updateClients, { status: 'error', error }); } }); await new Promise((resolve, reject) => { activeImport.on('close', (code) => { // Don't treat cancellation (code 143/SIGTERM) as an error if (code === 0 || code === 143) { sendProgressToClients(updateClients, { status: 'complete', operation: code === 143 ? 'Operation cancelled' : 'Update complete' }); resolve(); } else { const errorMsg = `Update process exited with code ${code}`; sendProgressToClients(updateClients, { status: 'error', error: errorMsg }); reject(new Error(errorMsg)); } activeImport = null; importProgress = null; }); }); res.json({ success: true }); } catch (error) { console.error('Error updating CSV files:', error); activeImport = null; importProgress = null; sendProgressToClients(updateClients, { status: 'error', error: error.message }); next(error); } }); // Route to import CSV files router.post('/import', async (req, res) => { if (activeImport) { return res.status(409).json({ error: 'Import already in progress' }); } try { const scriptPath = path.join(__dirname, '..', '..', 'scripts', 'import-csv.js'); if (!require('fs').existsSync(scriptPath)) { return res.status(500).json({ error: 'Import script not found' }); } // Get test limits from request body const { products = 0, orders = 10000, purchaseOrders = 10000 } = req.body; // Create environment variables for the script const env = { ...process.env, PRODUCTS_TEST_LIMIT: products.toString(), ORDERS_TEST_LIMIT: orders.toString(), PURCHASE_ORDERS_TEST_LIMIT: purchaseOrders.toString() }; activeImport = spawn('node', [scriptPath], { env }); activeImport.stdout.on('data', (data) => { const output = data.toString().trim(); try { // Try to parse as JSON const jsonData = JSON.parse(output); sendProgressToClients(importClients, { status: 'running', ...jsonData }); } catch { // If not JSON, send as plain progress sendProgressToClients(importClients, { status: 'running', progress: output }); } }); activeImport.stderr.on('data', (data) => { const error = data.toString().trim(); try { // Try to parse as JSON const jsonData = JSON.parse(error); sendProgressToClients(importClients, { status: 'error', ...jsonData }); } catch { sendProgressToClients(importClients, { status: 'error', error }); } }); await new Promise((resolve, reject) => { activeImport.on('close', (code) => { // Don't treat cancellation (code 143/SIGTERM) as an error if (code === 0 || code === 143) { sendProgressToClients(importClients, { status: 'complete', operation: code === 143 ? 'Operation cancelled' : 'Import complete' }); resolve(); } else { sendProgressToClients(importClients, { status: 'error', error: `Process exited with code ${code}` }); reject(new Error(`Import process exited with code ${code}`)); } activeImport = null; importProgress = null; }); }); res.json({ success: true }); } catch (error) { console.error('Error importing CSV files:', error); activeImport = null; importProgress = null; sendProgressToClients(importClients, { status: 'error', error: error.message }); res.status(500).json({ error: 'Failed to import CSV files', details: error.message }); } }); // Route to cancel active process router.post('/cancel', (req, res) => { if (!activeImport) { return res.status(404).json({ error: 'No active process to cancel' }); } try { // Kill the process with SIGTERM signal activeImport.kill('SIGTERM'); // Clean up activeImport = null; importProgress = null; // Get the operation type from the request const { operation } = req.query; // Send cancel message only to the appropriate client set const cancelMessage = { status: 'complete', operation: 'Operation cancelled' }; switch (operation) { case 'update': sendProgressToClients(updateClients, cancelMessage); break; case 'import': sendProgressToClients(importClients, cancelMessage); break; case 'reset': sendProgressToClients(resetClients, cancelMessage); break; case 'calculate-metrics': sendProgressToClients(calculateMetricsClients, cancelMessage); break; } res.json({ success: true }); } catch (error) { // Even if there's an error, try to clean up activeImport = null; importProgress = null; res.status(500).json({ error: 'Failed to cancel process' }); } }); // Route to reset database router.post('/reset', async (req, res) => { if (activeImport) { return res.status(409).json({ error: 'Import already in progress' }); } try { const scriptPath = path.join(__dirname, '..', '..', 'scripts', 'reset-db.js'); if (!require('fs').existsSync(scriptPath)) { return res.status(500).json({ error: 'Reset script not found' }); } activeImport = spawn('node', [scriptPath]); activeImport.stdout.on('data', (data) => { const output = data.toString().trim(); try { // Try to parse as JSON const jsonData = JSON.parse(output); sendProgressToClients(resetClients, { status: 'running', ...jsonData }); } catch (e) { // If not JSON, send as plain progress sendProgressToClients(resetClients, { status: 'running', progress: output }); } }); activeImport.stderr.on('data', (data) => { const error = data.toString().trim(); try { // Try to parse as JSON const jsonData = JSON.parse(error); sendProgressToClients(resetClients, { status: 'error', ...jsonData }); } catch { sendProgressToClients(resetClients, { status: 'error', error }); } }); await new Promise((resolve, reject) => { activeImport.on('close', (code) => { // Don't treat cancellation (code 143/SIGTERM) as an error if (code === 0 || code === 143) { sendProgressToClients(resetClients, { status: 'complete', operation: code === 143 ? 'Operation cancelled' : 'Reset complete' }); resolve(); } else { const errorMsg = `Reset process exited with code ${code}`; sendProgressToClients(resetClients, { status: 'error', error: errorMsg }); reject(new Error(errorMsg)); } activeImport = null; importProgress = null; }); }); res.json({ success: true }); } catch (error) { console.error('Error resetting database:', error); activeImport = null; importProgress = null; sendProgressToClients(resetClients, { status: 'error', error: error.message }); res.status(500).json({ error: 'Failed to reset database', details: error.message }); } }); // Add reset-metrics endpoint router.post('/reset-metrics', async (req, res) => { if (activeImport) { res.status(400).json({ error: 'Operation already in progress' }); return; } try { // Set active import to prevent concurrent operations activeImport = { type: 'reset-metrics', status: 'running', operation: 'Starting metrics reset' }; // Send initial response res.status(200).json({ message: 'Reset metrics started' }); // Send initial progress through SSE sendProgressToClients(resetMetricsClients, { status: 'running', operation: 'Starting metrics reset' }); // Run the reset metrics script const resetMetrics = require('../../scripts/reset-metrics'); await resetMetrics(); // Send completion through SSE sendProgressToClients(resetMetricsClients, { status: 'complete', operation: 'Metrics reset completed' }); activeImport = null; } catch (error) { console.error('Error during metrics reset:', error); // Send error through SSE sendProgressToClients(resetMetricsClients, { status: 'error', error: error.message || 'Failed to reset metrics' }); activeImport = null; res.status(500).json({ error: error.message || 'Failed to reset metrics' }); } }); // Add calculate-metrics status endpoint router.get('/calculate-metrics/status', (req, res) => { const calculateMetrics = require('../../scripts/calculate-metrics'); const progress = calculateMetrics.getProgress(); // Only consider it active if both the process is running and we have progress const isActive = !!activeImport && !!progress; res.json({ active: isActive, progress: isActive ? progress : null }); }); // Add calculate-metrics endpoint router.post('/calculate-metrics', async (req, res) => { if (activeImport) { return res.status(409).json({ error: 'Import already in progress' }); } try { const scriptPath = path.join(__dirname, '..', '..', 'scripts', 'calculate-metrics.js'); if (!require('fs').existsSync(scriptPath)) { return res.status(500).json({ error: 'Calculate metrics script not found' }); } activeImport = spawn('node', [scriptPath]); let wasCancelled = false; activeImport.stdout.on('data', (data) => { const output = data.toString().trim(); try { // Try to parse as JSON const jsonData = JSON.parse(output); importProgress = { status: 'running', ...jsonData.progress }; sendProgressToClients(calculateMetricsClients, importProgress); } catch (e) { // If not JSON, send as plain progress importProgress = { status: 'running', progress: output }; sendProgressToClients(calculateMetricsClients, importProgress); } }); activeImport.stderr.on('data', (data) => { if (wasCancelled) return; // Don't send errors if cancelled const error = data.toString().trim(); try { // Try to parse as JSON const jsonData = JSON.parse(error); importProgress = { status: 'error', ...jsonData.progress }; sendProgressToClients(calculateMetricsClients, importProgress); } catch { importProgress = { status: 'error', error }; sendProgressToClients(calculateMetricsClients, importProgress); } }); await new Promise((resolve, reject) => { activeImport.on('close', (code, signal) => { wasCancelled = signal === 'SIGTERM' || code === 143; activeImport = null; if (code === 0 || wasCancelled) { if (wasCancelled) { importProgress = { status: 'cancelled', operation: 'Operation cancelled' }; sendProgressToClients(calculateMetricsClients, importProgress); } else { importProgress = { status: 'complete', operation: 'Metrics calculation complete' }; sendProgressToClients(calculateMetricsClients, importProgress); } resolve(); } else { importProgress = null; reject(new Error(`Metrics calculation process exited with code ${code}`)); } }); }); res.json({ success: true }); } catch (error) { console.error('Error calculating metrics:', error); activeImport = null; importProgress = null; // Only send error if it wasn't a cancellation if (!error.message?.includes('code 143') && !error.message?.includes('SIGTERM')) { sendProgressToClients(calculateMetricsClients, { status: 'error', error: error.message }); res.status(500).json({ error: 'Failed to calculate metrics', details: error.message }); } else { res.json({ success: true }); } } }); module.exports = router;