const express = require('express');
const router = express.Router();
const { spawn } = require('child_process');
const path = require('path');
const db = require('../utils/db');

// Debug middleware MUST be first
router.use((req, res, next) => {
  console.log(`[CSV Route Debug] ${req.method} ${req.path}`);
  next();
});

// Store active processes and their progress
let activeImport = null;
let importProgress = null;
let activeFullUpdate = null;
let activeFullReset = null;

// SSE clients for progress updates
const updateClients = new Set();
const importClients = new Set();
const resetClients = new Set();
const resetMetricsClients = new Set();
const calculateMetricsClients = new Set();
const fullUpdateClients = new Set();
const fullResetClients = new Set();

// Helper to send progress to specific clients
function sendProgressToClients(clients, data) {
  // If data is a string, send it directly; if it's an object, convert it to JSON
  const message = typeof data === 'string'
    ? `data: ${data}\n\n`
    : `data: ${JSON.stringify(data)}\n\n`;

  clients.forEach(client => {
    try {
      client.write(message);
      // Immediately flush the response (e.g. when compression middleware adds flush())
      if (typeof client.flush === 'function') {
        client.flush();
      }
    } catch (error) {
      // Silently remove failed client
      clients.delete(client);
    }
  });
}

// Helper to run a script and stream progress
function runScript(scriptPath, type, clients) {
  return new Promise((resolve, reject) => {
    // Kill any existing process of this type
    switch (type) {
      case 'update':
        if (activeFullUpdate) {
          try { activeFullUpdate.kill(); } catch (e) { /* process already gone */ }
        }
        break;
      case 'reset':
        if (activeFullReset) {
          try { activeFullReset.kill(); } catch (e) { /* process already gone */ }
        }
        break;
    }

    const child = spawn('node', [scriptPath], {
      stdio: ['inherit', 'pipe', 'pipe']
    });

    switch (type) {
      case 'update':
        activeFullUpdate = child;
        break;
      case 'reset':
        activeFullReset = child;
        break;
    }

    let output = '';

    child.stdout.on('data', (data) => {
      const text = data.toString();
      output += text;

      // Split by lines to handle multiple JSON outputs
      const lines = text.split('\n');
      lines.filter(line => line.trim()).forEach(line => {
        try {
          // Try to parse as JSON but don't let it affect the display
          const jsonData = JSON.parse(line);

          // Only end the process if we get a final status
          if (jsonData.status === 'complete' || jsonData.status === 'error' || jsonData.status === 'cancelled') {
            if (jsonData.status === 'complete' && !jsonData.operation?.includes('complete')) {
              // Don't close for intermediate completion messages
              sendProgressToClients(clients, line);
              return;
            }

            // Close only on final completion/error/cancellation
            switch (type) {
              case 'update':
                activeFullUpdate = null;
                break;
              case 'reset':
                activeFullReset = null;
                break;
            }

            if (jsonData.status === 'error') {
              reject(new Error(jsonData.error || 'Unknown error'));
            } else {
              resolve({ output });
            }
          }
        } catch (e) {
          // Not JSON, just display as is
        }

        // Always send the raw line
        sendProgressToClients(clients, line);
      });
    });

    child.stderr.on('data', (data) => {
      const text = data.toString();
      console.error(text);
      // Send stderr output directly too
      sendProgressToClients(clients, text);
    });

    child.on('close', (code) => {
      switch (type) {
        case 'update':
          activeFullUpdate = null;
          break;
        case 'reset':
          activeFullReset = null;
          break;
      }

      if (code !== 0) {
        const error = `Script ${scriptPath} exited with code ${code}`;
        sendProgressToClients(clients, error);
        reject(new Error(error));
      }
      // Don't resolve here - let the completion message from the script trigger the resolve
    });

    child.on('error', (err) => {
      switch (type) {
        case 'update':
          activeFullUpdate = null;
          break;
        case 'reset':
          activeFullReset = null;
          break;
      }
      sendProgressToClients(clients, err.message);
      reject(err);
    });
  });
}

// Progress endpoints
router.get('/:type/progress', (req, res) => {
  const { type } = req.params;
  if (!['update', 'reset'].includes(type)) {
    return res.status(400).json({ error: 'Invalid operation type' });
  }

  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': req.headers.origin || '*',
    'Access-Control-Allow-Credentials': 'true'
  });

  // Add this client to the correct set
  const clients = type === 'update' ? fullUpdateClients : fullResetClients;
  clients.add(res);

  // Send initial connection message
  sendProgressToClients(new Set([res]), JSON.stringify({
    status: 'running',
    operation: 'Initializing connection...'
  }));

  // Handle client disconnect
  req.on('close', () => {
    clients.delete(res);
  });
});
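// Illustrative only: a minimal sketch of how a browser client could consume the
// SSE stream served above, assuming the router is mounted at /csv (the mount
// path and the element id are assumptions, not part of this module).
//
//   const source = new EventSource('/csv/update/progress');
//   source.onmessage = (event) => {
//     let update;
//     try {
//       update = JSON.parse(event.data);   // structured progress message
//     } catch {
//       update = { progress: event.data }; // raw script output line
//     }
//     document.getElementById('progress').textContent =
//       update.operation || update.progress || update.error || '';
//     if (['complete', 'error', 'cancelled'].includes(update.status)) {
//       source.close();
//     }
//   };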
// Debug endpoint to verify route registration
router.get('/test', (req, res) => {
  console.log('CSV test endpoint hit');
  res.json({ message: 'CSV routes are working' });
});

// Route to check import status
router.get('/status', (req, res) => {
  console.log('CSV status endpoint hit');
  res.json({
    active: !!activeImport,
    progress: importProgress
  });
});

// Add calculate-metrics status endpoint
router.get('/calculate-metrics/status', (req, res) => {
  const calculateMetrics = require('../../scripts/calculate-metrics');
  const progress = calculateMetrics.getProgress();

  // Only consider it active if both the process is running and we have progress
  const isActive = !!activeImport && !!progress;

  res.json({
    active: isActive,
    progress: isActive ? progress : null
  });
});

// Route to update CSV files
router.post('/update', async (req, res, next) => {
  if (activeImport) {
    return res.status(409).json({ error: 'Import already in progress' });
  }

  try {
    const scriptPath = path.join(__dirname, '..', '..', 'scripts', 'update-csv.js');
    if (!require('fs').existsSync(scriptPath)) {
      return res.status(500).json({ error: 'Update script not found' });
    }

    activeImport = spawn('node', [scriptPath]);

    activeImport.stdout.on('data', (data) => {
      const output = data.toString().trim();
      try {
        // Try to parse as JSON
        const jsonData = JSON.parse(output);
        sendProgressToClients(updateClients, { status: 'running', ...jsonData });
      } catch (e) {
        // If not JSON, send as plain progress
        sendProgressToClients(updateClients, { status: 'running', progress: output });
      }
    });

    activeImport.stderr.on('data', (data) => {
      const error = data.toString().trim();
      try {
        // Try to parse as JSON
        const jsonData = JSON.parse(error);
        sendProgressToClients(updateClients, { status: 'error', ...jsonData });
      } catch {
        sendProgressToClients(updateClients, { status: 'error', error });
      }
    });

    await new Promise((resolve, reject) => {
      activeImport.on('close', (code) => {
        // Don't treat cancellation (code 143/SIGTERM) as an error
        if (code === 0 || code === 143) {
          sendProgressToClients(updateClients, {
            status: 'complete',
            operation: code === 143 ? 'Operation cancelled' : 'Update complete'
          });
          resolve();
        } else {
          const errorMsg = `Update process exited with code ${code}`;
          sendProgressToClients(updateClients, { status: 'error', error: errorMsg });
          reject(new Error(errorMsg));
        }
        activeImport = null;
        importProgress = null;
      });
    });

    res.json({ success: true });
  } catch (error) {
    console.error('Error updating CSV files:', error);
    activeImport = null;
    importProgress = null;
    sendProgressToClients(updateClients, { status: 'error', error: error.message });
    next(error);
  }
});

// Route to import CSV files
router.post('/import', async (req, res) => {
  if (activeImport) {
    return res.status(409).json({ error: 'Import already in progress' });
  }

  try {
    const scriptPath = path.join(__dirname, '..', '..', 'scripts', 'import-csv.js');
    if (!require('fs').existsSync(scriptPath)) {
      return res.status(500).json({ error: 'Import script not found' });
    }

    // Get test limits from request body
    const { products = 0, orders = 10000, purchaseOrders = 10000 } = req.body;

    // Create environment variables for the script
    const env = {
      ...process.env,
      PRODUCTS_TEST_LIMIT: products.toString(),
      ORDERS_TEST_LIMIT: orders.toString(),
      PURCHASE_ORDERS_TEST_LIMIT: purchaseOrders.toString()
    };

    activeImport = spawn('node', [scriptPath], { env });

    activeImport.stdout.on('data', (data) => {
      const output = data.toString().trim();
      try {
        // Try to parse as JSON
        const jsonData = JSON.parse(output);
        sendProgressToClients(importClients, { status: 'running', ...jsonData });
      } catch {
        // If not JSON, send as plain progress
        sendProgressToClients(importClients, { status: 'running', progress: output });
      }
    });

    activeImport.stderr.on('data', (data) => {
      const error = data.toString().trim();
      try {
        // Try to parse as JSON
        const jsonData = JSON.parse(error);
        sendProgressToClients(importClients, { status: 'error', ...jsonData });
      } catch {
        sendProgressToClients(importClients, { status: 'error', error });
      }
    });

    await new Promise((resolve, reject) => {
      activeImport.on('close', (code) => {
        // Don't treat cancellation (code 143/SIGTERM) as an error
        if (code === 0 || code === 143) {
          sendProgressToClients(importClients, {
            status: 'complete',
            operation: code === 143 ? 'Operation cancelled' : 'Import complete'
          });
          resolve();
        } else {
          sendProgressToClients(importClients, { status: 'error', error: `Process exited with code ${code}` });
          reject(new Error(`Import process exited with code ${code}`));
        }
        activeImport = null;
        importProgress = null;
      });
    });

    res.json({ success: true });
  } catch (error) {
    console.error('Error importing CSV files:', error);
    activeImport = null;
    importProgress = null;
    sendProgressToClients(importClients, { status: 'error', error: error.message });
    res.status(500).json({ error: 'Failed to import CSV files', details: error.message });
  }
});

// Route to cancel active process
router.post('/cancel', (req, res) => {
  let killed = false;

  // Get the operation type from the request
  const { type } = req.query;
  const clients = type === 'update' ? fullUpdateClients : fullResetClients;
  const activeProcess = type === 'update' ? activeFullUpdate : activeFullReset;
  if (activeProcess) {
    try {
      activeProcess.kill('SIGTERM');
      if (type === 'update') {
        activeFullUpdate = null;
      } else {
        activeFullReset = null;
      }
      killed = true;
      sendProgressToClients(clients, JSON.stringify({
        status: 'cancelled',
        operation: 'Operation cancelled'
      }));
    } catch (err) {
      console.error(`Error killing ${type} process:`, err);
    }
  }

  if (killed) {
    res.json({ success: true });
  } else {
    res.status(404).json({ error: 'No active process to cancel' });
  }
});

// Route to reset database
router.post('/reset', async (req, res) => {
  if (activeImport) {
    return res.status(409).json({ error: 'Import already in progress' });
  }

  try {
    const scriptPath = path.join(__dirname, '..', '..', 'scripts', 'reset-db.js');
    if (!require('fs').existsSync(scriptPath)) {
      return res.status(500).json({ error: 'Reset script not found' });
    }

    activeImport = spawn('node', [scriptPath]);

    activeImport.stdout.on('data', (data) => {
      const output = data.toString().trim();
      try {
        // Try to parse as JSON
        const jsonData = JSON.parse(output);
        sendProgressToClients(resetClients, { status: 'running', ...jsonData });
      } catch (e) {
        // If not JSON, send as plain progress
        sendProgressToClients(resetClients, { status: 'running', progress: output });
      }
    });

    activeImport.stderr.on('data', (data) => {
      const error = data.toString().trim();
      try {
        // Try to parse as JSON
        const jsonData = JSON.parse(error);
        sendProgressToClients(resetClients, { status: 'error', ...jsonData });
      } catch {
        sendProgressToClients(resetClients, { status: 'error', error });
      }
    });

    await new Promise((resolve, reject) => {
      activeImport.on('close', (code) => {
        // Don't treat cancellation (code 143/SIGTERM) as an error
        if (code === 0 || code === 143) {
          sendProgressToClients(resetClients, {
            status: 'complete',
            operation: code === 143 ? 'Operation cancelled' : 'Reset complete'
          });
          resolve();
        } else {
          const errorMsg = `Reset process exited with code ${code}`;
          sendProgressToClients(resetClients, { status: 'error', error: errorMsg });
          reject(new Error(errorMsg));
        }
        activeImport = null;
        importProgress = null;
      });
    });

    res.json({ success: true });
  } catch (error) {
    console.error('Error resetting database:', error);
    activeImport = null;
    importProgress = null;
    sendProgressToClients(resetClients, { status: 'error', error: error.message });
    res.status(500).json({ error: 'Failed to reset database', details: error.message });
  }
});

// Add reset-metrics endpoint
router.post('/reset-metrics', async (req, res) => {
  if (activeImport) {
    res.status(400).json({ error: 'Operation already in progress' });
    return;
  }

  try {
    // Set active import to prevent concurrent operations
    activeImport = {
      type: 'reset-metrics',
      status: 'running',
      operation: 'Starting metrics reset'
    };

    // Send initial response
    res.status(200).json({ message: 'Reset metrics started' });

    // Send initial progress through SSE
    sendProgressToClients(resetMetricsClients, {
      status: 'running',
      operation: 'Starting metrics reset'
    });

    // Run the reset metrics script
    const resetMetrics = require('../../scripts/reset-metrics');
    await resetMetrics();

    // Send completion through SSE
    sendProgressToClients(resetMetricsClients, {
      status: 'complete',
      operation: 'Metrics reset completed'
    });

    activeImport = null;
  } catch (error) {
    console.error('Error during metrics reset:', error);

    // Send error through SSE
    sendProgressToClients(resetMetricsClients, {
      status: 'error',
      error: error.message || 'Failed to reset metrics'
    });

    activeImport = null;
    // The initial 200 response has usually been sent already, so only reply if we still can
    if (!res.headersSent) {
      res.status(500).json({ error: error.message || 'Failed to reset metrics' });
    }
  }
});

// Add calculate-metrics endpoint
router.post('/calculate-metrics', async (req, res) => {
  if (activeImport) {
    return res.status(409).json({ error: 'Import already in progress' });
  }

  try {
    const scriptPath = path.join(__dirname, '..', '..', 'scripts', 'calculate-metrics.js');
    if (!require('fs').existsSync(scriptPath)) {
      return res.status(500).json({ error: 'Calculate metrics script not found' });
    }

    activeImport = spawn('node', [scriptPath]);
    let wasCancelled = false;

    activeImport.stdout.on('data', (data) => {
      const output = data.toString().trim();
      try {
        // Try to parse as JSON
        const jsonData = JSON.parse(output);
        importProgress = { status: 'running', ...jsonData.progress };
        sendProgressToClients(calculateMetricsClients, importProgress);
      } catch (e) {
        // If not JSON, send as plain progress
        importProgress = { status: 'running', progress: output };
        sendProgressToClients(calculateMetricsClients, importProgress);
      }
    });

    activeImport.stderr.on('data', (data) => {
      if (wasCancelled) return; // Don't send errors if cancelled
      const error = data.toString().trim();
      try {
        // Try to parse as JSON
        const jsonData = JSON.parse(error);
        importProgress = { status: 'error', ...jsonData.progress };
        sendProgressToClients(calculateMetricsClients, importProgress);
      } catch {
        importProgress = { status: 'error', error };
        sendProgressToClients(calculateMetricsClients, importProgress);
      }
    });

    await new Promise((resolve, reject) => {
      activeImport.on('close', (code, signal) => {
        wasCancelled = signal === 'SIGTERM' || code === 143;
        activeImport = null;

        if (code === 0 || wasCancelled) {
          if (wasCancelled) {
            importProgress = { status: 'cancelled', operation: 'Operation cancelled' };
            sendProgressToClients(calculateMetricsClients, importProgress);
          } else {
            importProgress = { status: 'complete', operation: 'Metrics calculation complete' };
            sendProgressToClients(calculateMetricsClients, importProgress);
          }
          resolve();
        } else {
          importProgress = null;
          reject(new Error(`Metrics calculation process exited with code ${code}`));
        }
      });
    });

    res.json({ success: true });
  } catch (error) {
    console.error('Error calculating metrics:', error);
    activeImport = null;
    importProgress = null;

    // Only send error if it wasn't a cancellation
    if (!error.message?.includes('code 143') && !error.message?.includes('SIGTERM')) {
      sendProgressToClients(calculateMetricsClients, { status: 'error', error: error.message });
      res.status(500).json({ error: 'Failed to calculate metrics', details: error.message });
    } else {
      res.json({ success: true });
    }
  }
});

// Route to import from production database
router.post('/import-from-prod', async (req, res) => {
  if (activeImport) {
    return res.status(409).json({ error: 'Import already in progress' });
  }

  try {
    const importFromProd = require('../../scripts/import-from-prod');

    // Set up progress handler
    const progressHandler = (data) => {
      importProgress = data;
      sendProgressToClients(importClients, data);
    };

    // Start the import process
    importFromProd.outputProgress = progressHandler;
    activeImport = importFromProd; // Store the module for cancellation

    // Run the import in the background
    importFromProd.main().catch(error => {
      console.error('Error in import process:', error);
      activeImport = null;
      importProgress = {
        status: error.message === 'Import cancelled' ? 'cancelled' : 'error',
        operation: 'Import process',
        error: error.message
      };
      sendProgressToClients(importClients, importProgress);
    }).finally(() => {
      activeImport = null;
    });

    res.json({ message: 'Import from production started' });
  } catch (error) {
    console.error('Error starting production import:', error);
    activeImport = null;
    res.status(500).json({ error: error.message || 'Failed to start production import' });
  }
});

// POST /csv/full-update - Run full update script
router.post('/full-update', async (req, res) => {
  try {
    const scriptPath = path.join(__dirname, '../../scripts/full-update.js');
    runScript(scriptPath, 'update', fullUpdateClients)
      .catch(error => {
        console.error('Update failed:', error);
      });
    res.status(202).json({ message: 'Update started' });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// POST /csv/full-reset - Run full reset script
router.post('/full-reset', async (req, res) => {
  try {
    const scriptPath = path.join(__dirname, '../../scripts/full-reset.js');
    runScript(scriptPath, 'reset', fullResetClients)
      .catch(error => {
        console.error('Reset failed:', error);
      });
    res.status(202).json({ message: 'Reset started' });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// GET /history/import - Get recent import history
router.get('/history/import', async (req, res) => {
  try {
    const pool = req.app.locals.pool;
    const { rows } = await pool.query(`
      SELECT
        id,
        start_time,
        end_time,
        status,
        error_message,
        rows_processed::integer,
        files_processed::integer
      FROM import_history
      ORDER BY start_time DESC
      LIMIT 20
    `);
    res.json(rows || []);
  } catch (error) {
    console.error('Error fetching import history:', error);
    res.status(500).json({ error: error.message });
  }
});

// GET /history/calculate - Get recent calculation history
router.get('/history/calculate', async (req, res) => {
  try {
    const pool = req.app.locals.pool;
    const { rows } = await pool.query(`
      SELECT
        id,
        start_time,
        end_time,
        status,
        error_message,
        modules_processed::integer,
        total_modules::integer
      FROM calculate_history
      ORDER BY start_time DESC
      LIMIT 20
    `);
    res.json(rows || []);
  } catch (error) {
    console.error('Error fetching calculate history:', error);
    res.status(500).json({ error: error.message });
  }
});
// GET /status/modules - Get module calculation status
router.get('/status/modules', async (req, res) => {
  try {
    const pool = req.app.locals.pool;
    const { rows } = await pool.query(`
      SELECT
        module_name,
        last_calculation_timestamp::timestamp
      FROM calculate_status
      ORDER BY module_name
    `);
    res.json(rows || []);
  } catch (error) {
    console.error('Error fetching module status:', error);
    res.status(500).json({ error: error.message });
  }
});

// GET /status/tables - Get table sync status
router.get('/status/tables', async (req, res) => {
  try {
    const pool = req.app.locals.pool;
    const { rows } = await pool.query(`
      SELECT
        table_name,
        last_sync_timestamp::timestamp
      FROM sync_status
      ORDER BY table_name
    `);
    res.json(rows || []);
  } catch (error) {
    console.error('Error fetching table status:', error);
    res.status(500).json({ error: error.message });
  }
});

module.exports = router;
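
// Illustrative only: a minimal sketch of how this router might be wired into an
// Express app. The '/csv' mount path is suggested by the route comments above;
// the pg Pool in app.locals.pool is assumed because the history/status routes
// read it from there. Adjust paths and setup to match the actual server.
//
//   const express = require('express');
//   const { Pool } = require('pg');
//   const csvRoutes = require('./routes/csv');
//
//   const app = express();
//   app.use(express.json());       // /import reads test limits from req.body
//   app.locals.pool = new Pool();  // used by the history/status queries
//   app.use('/csv', csvRoutes);
//   app.listen(3000);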