const { Client } = require('ssh2'); const mysql = require('mysql2/promise'); const fs = require('fs'); // Connection pool configuration const connectionPool = { connections: [], maxConnections: 20, currentConnections: 0, pendingRequests: [], // Cache for query results (key: query string, value: {data, timestamp}) queryCache: new Map(), // Cache duration for different query types in milliseconds cacheDuration: { 'stats': 60 * 1000, // 1 minute for stats 'products': 5 * 60 * 1000, // 5 minutes for products 'orders': 60 * 1000, // 1 minute for orders 'default': 60 * 1000 // 1 minute default }, // Circuit breaker state circuitBreaker: { failures: 0, lastFailure: 0, isOpen: false, threshold: 5, timeout: 30000 // 30 seconds } }; /** * Get a database connection from the pool * @returns {Promise<{connection: object, release: function}>} The database connection and release function */ async function getDbConnection() { return new Promise(async (resolve, reject) => { // Check circuit breaker const now = Date.now(); if (connectionPool.circuitBreaker.isOpen) { if (now - connectionPool.circuitBreaker.lastFailure > connectionPool.circuitBreaker.timeout) { // Reset circuit breaker connectionPool.circuitBreaker.isOpen = false; connectionPool.circuitBreaker.failures = 0; console.log('Circuit breaker reset'); } else { reject(new Error('Circuit breaker is open - too many connection failures')); return; } } // Check if there's an available connection in the pool if (connectionPool.connections.length > 0) { const conn = connectionPool.connections.pop(); console.log(`Using pooled connection. Pool size: ${connectionPool.connections.length}`); resolve({ connection: conn.connection, release: () => releaseConnection(conn) }); return; } // If we haven't reached max connections, create a new one if (connectionPool.currentConnections < connectionPool.maxConnections) { try { console.log(`Creating new connection. Current: ${connectionPool.currentConnections}/${connectionPool.maxConnections}`); connectionPool.currentConnections++; const tunnel = await setupSshTunnel(); const { ssh, stream, dbConfig } = tunnel; const connection = await mysql.createConnection({ ...dbConfig, stream }); const conn = { ssh, connection, inUse: true, created: Date.now() }; console.log('Database connection established'); // Reset circuit breaker on successful connection if (connectionPool.circuitBreaker.failures > 0) { connectionPool.circuitBreaker.failures = 0; connectionPool.circuitBreaker.isOpen = false; } resolve({ connection: conn.connection, release: () => releaseConnection(conn) }); } catch (error) { connectionPool.currentConnections--; // Track circuit breaker failures connectionPool.circuitBreaker.failures++; connectionPool.circuitBreaker.lastFailure = Date.now(); if (connectionPool.circuitBreaker.failures >= connectionPool.circuitBreaker.threshold) { connectionPool.circuitBreaker.isOpen = true; console.log(`Circuit breaker opened after ${connectionPool.circuitBreaker.failures} failures`); } reject(error); } return; } // Pool is full, queue the request with timeout console.log('Connection pool full, queuing request...'); const timeoutId = setTimeout(() => { // Remove from queue if still there const index = connectionPool.pendingRequests.findIndex(req => req.resolve === resolve); if (index !== -1) { connectionPool.pendingRequests.splice(index, 1); reject(new Error('Connection pool queue timeout after 15 seconds')); } }, 15000); connectionPool.pendingRequests.push({ resolve, reject, timeoutId, timestamp: Date.now() }); }); } /** * Release a connection back to the pool */ function releaseConnection(conn) { conn.inUse = false; // Check if there are pending requests if (connectionPool.pendingRequests.length > 0) { const { resolve, timeoutId } = connectionPool.pendingRequests.shift(); // Clear the timeout since we're serving the request if (timeoutId) { clearTimeout(timeoutId); } conn.inUse = true; console.log(`Serving queued request. Queue length: ${connectionPool.pendingRequests.length}`); resolve({ connection: conn.connection, release: () => releaseConnection(conn) }); } else { // Return to pool connectionPool.connections.push(conn); console.log(`Connection returned to pool. Pool size: ${connectionPool.connections.length}, Active: ${connectionPool.currentConnections}`); } } /** * Get cached query results or execute query if not cached * @param {string} cacheKey - Unique key to identify the query * @param {string} queryType - Type of query (stats, products, orders, etc.) * @param {Function} queryFn - Function to execute if cache miss * @returns {Promise} The query result */ async function getCachedQuery(cacheKey, queryType, queryFn) { // Get cache duration based on query type const cacheDuration = connectionPool.cacheDuration[queryType] || connectionPool.cacheDuration.default; // Check if we have a valid cached result const cachedResult = connectionPool.queryCache.get(cacheKey); const now = Date.now(); if (cachedResult && (now - cachedResult.timestamp < cacheDuration)) { console.log(`Cache hit for ${queryType} query: ${cacheKey}`); return cachedResult.data; } // No valid cache found, execute the query console.log(`Cache miss for ${queryType} query: ${cacheKey}`); const result = await queryFn(); // Cache the result connectionPool.queryCache.set(cacheKey, { data: result, timestamp: now }); return result; } /** * Setup SSH tunnel to production database * @private - Should only be used by getDbConnection * @returns {Promise<{ssh: object, stream: object, dbConfig: object}>} */ async function setupSshTunnel() { const sshConfig = { host: process.env.PROD_SSH_HOST, port: process.env.PROD_SSH_PORT || 22, username: process.env.PROD_SSH_USER, privateKey: process.env.PROD_SSH_KEY_PATH ? fs.readFileSync(process.env.PROD_SSH_KEY_PATH) : undefined, compress: true }; const dbConfig = { host: process.env.PROD_DB_HOST || 'localhost', user: process.env.PROD_DB_USER, password: process.env.PROD_DB_PASSWORD, database: process.env.PROD_DB_NAME, port: process.env.PROD_DB_PORT || 3306, timezone: 'Z' }; return new Promise((resolve, reject) => { const ssh = new Client(); ssh.on('error', (err) => { console.error('SSH connection error:', err); reject(err); }); ssh.on('ready', () => { ssh.forwardOut( '127.0.0.1', 0, dbConfig.host, dbConfig.port, (err, stream) => { if (err) reject(err); resolve({ ssh, stream, dbConfig }); } ); }).connect(sshConfig); }); } /** * Clear cached query results * @param {string} [cacheKey] - Specific cache key to clear (clears all if not provided) */ function clearQueryCache(cacheKey) { if (cacheKey) { connectionPool.queryCache.delete(cacheKey); console.log(`Cleared cache for key: ${cacheKey}`); } else { connectionPool.queryCache.clear(); console.log('Cleared all query cache'); } } /** * Force close all active connections * Useful for server shutdown or manual connection reset */ async function closeAllConnections() { // Close all pooled connections for (const conn of connectionPool.connections) { try { await conn.connection.end(); conn.ssh.end(); console.log('Closed pooled connection'); } catch (error) { console.error('Error closing pooled connection:', error); } } // Reset pool state connectionPool.connections = []; connectionPool.currentConnections = 0; connectionPool.pendingRequests = []; connectionPool.queryCache.clear(); console.log('All connections closed and pool reset'); } /** * Get connection pool status for debugging */ function getPoolStatus() { return { poolSize: connectionPool.connections.length, activeConnections: connectionPool.currentConnections, maxConnections: connectionPool.maxConnections, pendingRequests: connectionPool.pendingRequests.length, cacheSize: connectionPool.queryCache.size, queuedRequests: connectionPool.pendingRequests.map(req => ({ waitTime: Date.now() - req.timestamp, hasTimeout: !!req.timeoutId })) }; } module.exports = { getDbConnection, getCachedQuery, clearQueryCache, closeAllConnections, getPoolStatus };