/**
 * CSV / maintenance routes.
 *
 * Exposes endpoints that:
 *  - start the full-update / full-reset scripts as child processes,
 *  - stream their output to browsers over Server-Sent Events (SSE),
 *  - cancel a running script,
 *  - report import/calculation history, sync status and table row counts.
 */
const express = require('express');
const router = express.Router();
const { spawn } = require('child_process');
const path = require('path');
const db = require('../utils/db');

// Debug middleware MUST be first
router.use((req, res, next) => {
  console.log(`[CSV Route Debug] ${req.method} ${req.path}`);
  next();
});

// Store active processes and their progress.
// NOTE: activeImport and importProgress are not referenced anywhere else in this file.
let activeImport = null;
let importProgress = null;
let activeFullUpdate = null;
let activeFullReset = null;

// SSE clients for progress updates.
// NOTE: only fullUpdateClients and fullResetClients are used by the routes in this file.
const updateClients = new Set();
const importClients = new Set();
const resetClients = new Set();
const resetMetricsClients = new Set();
const calculateMetricsClients = new Set();
const fullUpdateClients = new Set();
const fullResetClients = new Set();

// Tables reported by GET /status/table-counts, keyed by the group name used in the response.
// These names are interpolated into SQL, so this list must only ever contain trusted literals.
const TABLE_GROUPS = Object.freeze({
  // Core tables
  core: ['products', 'categories', 'product_categories', 'orders', 'purchase_orders'],
  // New metrics tables
  metrics: ['product_metrics', 'daily_product_snapshots', 'brand_metrics', 'category_metrics', 'vendor_metrics'],
  // Config tables
  config: ['settings_global', 'settings_vendor', 'settings_product'],
});

/**
 * Returns the child process currently tracked for an operation type.
 *
 * @param {string} type - 'update' or 'reset'; any other value is untracked.
 * @returns {import('child_process').ChildProcess|null} The tracked child, or null.
 */
function getActiveProcess(type) {
  switch (type) {
    case 'update':
      return activeFullUpdate;
    case 'reset':
      return activeFullReset;
    default:
      return null;
  }
}

/**
 * Stores (or clears, when `child` is null) the tracked child process for an operation type.
 *
 * @param {string} type - 'update' or 'reset'; any other value is ignored.
 * @param {import('child_process').ChildProcess|null} child - Process to track.
 */
function setActiveProcess(type, child) {
  switch (type) {
    case 'update':
      activeFullUpdate = child;
      break;
    case 'reset':
      activeFullReset = child;
      break;
    default:
      break;
  }
}

/**
 * Clears the tracked process for `type`, but only if it is still `child`.
 * A process that was replaced by a newer run must not wipe the newer run's slot
 * when its own 'close'/'error' event arrives late.
 *
 * @param {string} type - 'update' or 'reset'.
 * @param {import('child_process').ChildProcess} child - The process that finished.
 */
function clearActiveProcess(type, child) {
  if (getActiveProcess(type) === child) {
    setActiveProcess(type, null);
  }
}

/**
 * Sends one SSE message to every client in `clients`.
 *
 * Strings are sent as-is, anything else is JSON-encoded. Every line of the payload gets
 * its own `data:` prefix so multi-line text (e.g. stderr output) arrives as a single event
 * instead of being truncated at the first newline. Clients whose write throws are removed
 * from the set.
 *
 * @param {Set<import('http').ServerResponse>} clients - Open SSE responses.
 * @param {string|object} data - Message payload.
 */
function sendProgressToClients(clients, data) {
  const payload = typeof data === 'string' ? data : String(JSON.stringify(data));
  const body = payload
    .replace(/[\r\n]+$/, '') // a trailing newline would otherwise add an empty data line
    .split(/\r\n|\r|\n/)
    .map((line) => `data: ${line}`)
    .join('\n');
  const message = `${body}\n\n`;

  clients.forEach((client) => {
    try {
      client.write(message);
      // Immediately flush the response when the stream supports it
      if (typeof client.flush === 'function') {
        client.flush();
      }
    } catch (error) {
      // Best effort: a client we cannot write to is dropped from the set
      clients.delete(client);
    }
  });
}

/**
 * Runs a Node script as a child process and streams its output to SSE clients.
 *
 * Any process already tracked for `type` is killed first. Each non-empty stdout line is
 * forwarded to the clients; lines that parse as JSON with a final `status`
 * ('complete' with an operation containing 'complete', 'error', or 'cancelled') settle
 * the returned promise. stderr chunks are logged and forwarded as-is.
 *
 * @param {string} scriptPath - Path of the script passed to `node`.
 * @param {string} type - 'update' or 'reset' (selects which active-process slot is used).
 * @param {Set<import('http').ServerResponse>} clients - SSE clients to notify.
 * @returns {Promise<{output: string}>} Resolves with the stdout collected so far when the
 *   script reports a final non-error status or exits with code 0; rejects when the script
 *   reports an error status, exits with a non-zero/null code, or cannot be spawned.
 */
function runScript(scriptPath, type, clients) {
  return new Promise((resolve, reject) => {
    // Kill any existing process of this type
    const previous = getActiveProcess(type);
    if (previous) {
      try {
        previous.kill();
      } catch (e) {
        // Best effort: the previous process may already have exited
      }
    }

    const child = spawn('node', [scriptPath], {
      stdio: ['inherit', 'pipe', 'pipe'],
    });
    setActiveProcess(type, child);

    let output = '';
    // Trailing partial line of stdout, kept until the rest of the line arrives
    let pending = '';

    // Decode as UTF-8 at the stream level so multi-byte characters split across
    // chunks are not corrupted.
    child.stdout.setEncoding('utf8');
    child.stderr.setEncoding('utf8');

    const handleLine = (line) => {
      try {
        // Try to parse as JSON but don't let it affect the display
        const jsonData = JSON.parse(line);

        // Only end the process if we get a final status
        if (jsonData.status === 'complete' || jsonData.status === 'error' || jsonData.status === 'cancelled') {
          if (jsonData.status === 'complete' && !jsonData.operation?.includes('complete')) {
            // Don't close for intermediate completion messages
            sendProgressToClients(clients, line);
            return;
          }

          // Close only on final completion/error/cancellation
          clearActiveProcess(type, child);

          if (jsonData.status === 'error') {
            reject(new Error(jsonData.error || 'Unknown error'));
          } else {
            resolve({ output });
          }
        }
      } catch (e) {
        // Not JSON (or not the expected shape), just display as is
      }

      // Always send the raw line
      sendProgressToClients(clients, line);
    };

    child.stdout.on('data', (chunk) => {
      output += chunk;
      pending += chunk;

      // Only complete lines are processed; a JSON message split across two chunks
      // would otherwise fail to parse and its status would be missed.
      const lines = pending.split('\n');
      pending = lines.pop();
      lines
        .filter((line) => line.trim())
        .forEach((line) => handleLine(line));
    });

    child.stderr.on('data', (text) => {
      console.error(text);
      // Send stderr output directly too
      sendProgressToClients(clients, text);
    });

    child.on('close', (code) => {
      // Flush output that was not terminated by a newline
      if (pending.trim()) {
        const rest = pending;
        pending = '';
        handleLine(rest);
      }

      clearActiveProcess(type, child);

      if (code !== 0) {
        const error = `Script ${scriptPath} exited with code ${code}`;
        sendProgressToClients(clients, error);
        reject(new Error(error));
        return;
      }

      // Normally the script's completion message has already settled the promise, which
      // makes this a no-op. It guarantees the promise settles when the script exits
      // cleanly without ever printing a final status.
      resolve({ output });
    });

    child.on('error', (err) => {
      clearActiveProcess(type, child);
      sendProgressToClients(clients, err.message);
      reject(err);
    });
  });
}

// Progress endpoints: GET /:type/progress opens an SSE stream for 'update' or 'reset'
router.get('/:type/progress', (req, res) => {
  const { type } = req.params;
  if (!['update', 'reset'].includes(type)) {
    return res.status(400).json({ error: 'Invalid operation type' });
  }

  // NOTE(review): the request origin is reflected together with credentials enabled;
  // confirm this is restricted elsewhere (e.g. by an app-level CORS policy).
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': req.headers.origin || '*',
    'Access-Control-Allow-Credentials': 'true',
  });

  // Add this client to the correct set
  const clients = type === 'update' ? fullUpdateClients : fullResetClients;
  clients.add(res);

  // Send initial connection message to this client only
  sendProgressToClients(new Set([res]), JSON.stringify({
    status: 'running',
    operation: 'Initializing connection...',
  }));

  // Handle client disconnect
  req.on('close', () => {
    clients.delete(res);
  });
});

// Route to cancel active process.
// The operation type comes from the query string; anything other than 'update'
// is treated as 'reset'.
router.post('/cancel', (req, res) => {
  let killed = false;

  const { type } = req.query;
  const target = type === 'update' ? 'update' : 'reset';
  const clients = target === 'update' ? fullUpdateClients : fullResetClients;
  const activeProcess = getActiveProcess(target);

  if (activeProcess) {
    try {
      activeProcess.kill('SIGTERM');
      setActiveProcess(target, null);
      killed = true;
      sendProgressToClients(clients, JSON.stringify({
        status: 'cancelled',
        operation: 'Operation cancelled',
      }));
    } catch (err) {
      console.error(`Error killing ${type} process:`, err);
    }
  }

  if (killed) {
    res.json({ success: true });
  } else {
    res.status(404).json({ error: 'No active process to cancel' });
  }
});

// POST /csv/full-update - Run full update script.
// Responds 202 immediately; progress is reported through the SSE stream.
router.post('/full-update', (req, res) => {
  try {
    const scriptPath = path.join(__dirname, '../../scripts/full-update.js');
    runScript(scriptPath, 'update', fullUpdateClients)
      .catch((error) => {
        console.error('Update failed:', error);
      });
    res.status(202).json({ message: 'Update started' });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// POST /csv/full-reset - Run full reset script.
// Responds 202 immediately; progress is reported through the SSE stream.
router.post('/full-reset', (req, res) => {
  try {
    const scriptPath = path.join(__dirname, '../../scripts/full-reset.js');
    runScript(scriptPath, 'reset', fullResetClients)
      .catch((error) => {
        console.error('Reset failed:', error);
      });
    res.status(202).json({ message: 'Reset started' });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// GET /history/import - Get recent import history
router.get('/history/import', async (req, res) => {
  try {
    const pool = req.app.locals.pool;
    const { rows } = await pool.query(`
      SELECT
        id,
        start_time,
        end_time,
        status,
        error_message,
        records_added::integer,
        records_updated::integer
      FROM import_history
      ORDER BY start_time DESC
      LIMIT 20
    `);
    res.json(rows || []);
  } catch (error) {
    console.error('Error fetching import history:', error);
    res.status(500).json({ error: error.message });
  }
});

// GET /history/calculate - Get recent calculation history
router.get('/history/calculate', async (req, res) => {
  try {
    const pool = req.app.locals.pool;
    const { rows } = await pool.query(`
      SELECT
        id,
        start_time,
        end_time,
        duration_minutes,
        status,
        error_message,
        total_products,
        total_orders,
        total_purchase_orders,
        processed_products,
        processed_orders,
        processed_purchase_orders,
        additional_info
      FROM calculate_history
      ORDER BY start_time DESC
      LIMIT 20
    `);
    res.json(rows || []);
  } catch (error) {
    console.error('Error fetching calculate history:', error);
    res.status(500).json({ error: error.message });
  }
});

// GET /status/modules - Get module calculation status
router.get('/status/modules', async (req, res) => {
  try {
    const pool = req.app.locals.pool;
    const { rows } = await pool.query(`
      SELECT
        module_name,
        last_calculation_timestamp::timestamp
      FROM calculate_status
      ORDER BY module_name
    `);
    res.json(rows || []);
  } catch (error) {
    console.error('Error fetching module status:', error);
    res.status(500).json({ error: error.message });
  }
});

// GET /status/tables - Get table sync status
router.get('/status/tables', async (req, res) => {
  try {
    const pool = req.app.locals.pool;
    const { rows } = await pool.query(`
      SELECT
        table_name,
        last_sync_timestamp::timestamp
      FROM sync_status
      ORDER BY table_name
    `);
    res.json(rows || []);
  } catch (error) {
    console.error('Error fetching table status:', error);
    res.status(500).json({ error: error.message });
  }
});

// GET /status/table-counts - Get record counts for all tables, grouped by table type.
// A table whose count query fails is reported with count: null and the error message
// instead of failing the whole request.
router.get('/status/table-counts', async (req, res) => {
  try {
    const pool = req.app.locals.pool;

    const countTable = (table) =>
      pool.query(`SELECT COUNT(*) as count FROM ${table}`)
        .then((result) => ({
          table_name: table,
          count: Number.parseInt(result.rows[0].count, 10),
        }))
        .catch((err) => ({
          table_name: table,
          count: null,
          error: err.message,
        }));

    // All count queries are started before anything is awaited
    const groupedEntries = await Promise.all(
      Object.entries(TABLE_GROUPS).map(async ([group, tables]) => [
        group,
        await Promise.all(tables.map((table) => countTable(table))),
      ])
    );

    res.json(Object.fromEntries(groupedEntries));
  } catch (error) {
    console.error('Error fetching table counts:', error);
    res.status(500).json({ error: error.message });
  }
});

module.exports = router;