297 lines
8.9 KiB
JavaScript
297 lines
8.9 KiB
JavaScript
const { Client } = require('ssh2');
|
|
const mysql = require('mysql2/promise');
|
|
const fs = require('fs');
|
|
|
|
// Millisecond helpers used by the pool settings below.
const SECOND_MS = 1000;
const MINUTE_MS = 60 * SECOND_MS;

/**
 * Module-level state shared by the connection pool, the query cache and the
 * circuit breaker. Every function in this file reads and mutates this object.
 */
const connectionPool = {
  // Idle connection records ({ ssh, connection, inUse, created }) ready for reuse.
  connections: [],
  // Upper bound on the number of connections this module will create.
  maxConnections: 20,
  // Number of connections created so far (incremented before each creation attempt).
  currentConnections: 0,
  // Callers waiting for a connection: { resolve, reject, timeoutId, timestamp }.
  pendingRequests: [],
  // Query result cache: cache key -> { data, timestamp }.
  queryCache: new Map(),
  // How long a cached result stays valid, per query type, in milliseconds.
  cacheDuration: {
    stats: MINUTE_MS,
    products: 5 * MINUTE_MS,
    orders: MINUTE_MS,
    default: MINUTE_MS,
  },
  // Circuit breaker: opens after `threshold` failed connection attempts and
  // may be reset once `timeout` milliseconds have passed since `lastFailure`.
  circuitBreaker: {
    failures: 0,
    lastFailure: 0,
    isOpen: false,
    threshold: 5,
    timeout: 30 * SECOND_MS,
  },
};
|
|
|
|
/**
 * Get a database connection from the pool.
 *
 * Resolution order:
 *   1. Fail fast while the circuit breaker is open (it resets itself once its
 *      timeout has elapsed since the last failure).
 *   2. Reuse an idle pooled connection when one is available.
 *   3. Otherwise open a new SSH tunnel + MySQL connection while the pool is
 *      below `maxConnections`.
 *   4. Otherwise queue the request; it is served by `releaseConnection` or
 *      rejected after 15 seconds.
 *
 * @returns {Promise<{connection: object, release: function}>} The database
 *   connection and a function that hands it back to the pool. Calling
 *   `release` more than once is a no-op.
 * @throws {Error} (as a rejection) when the circuit breaker is open, when the
 *   tunnel or database connection cannot be established, or when a queued
 *   request times out.
 */
async function getDbConnection() {
  const breaker = connectionPool.circuitBreaker;

  // Build the handle returned to callers. The release function is one-shot so
  // an accidental double release cannot put the same record in the pool twice.
  const toHandle = (conn) => {
    let released = false;
    return {
      connection: conn.connection,
      release: () => {
        if (released) {
          return;
        }
        released = true;
        releaseConnection(conn);
      },
    };
  };

  // Check circuit breaker
  if (breaker.isOpen) {
    if (Date.now() - breaker.lastFailure <= breaker.timeout) {
      throw new Error('Circuit breaker is open - too many connection failures');
    }
    // Cool-down elapsed: reset and allow a new attempt.
    breaker.isOpen = false;
    breaker.failures = 0;
    console.log('Circuit breaker reset');
  }

  // Reuse an idle connection when one is available.
  if (connectionPool.connections.length > 0) {
    const conn = connectionPool.connections.pop();
    // Keep the flag consistent with the other paths that hand out a record.
    conn.inUse = true;
    console.log(`Using pooled connection. Pool size: ${connectionPool.connections.length}`);
    return toHandle(conn);
  }

  // If we haven't reached max connections, create a new one.
  if (connectionPool.currentConnections < connectionPool.maxConnections) {
    let ssh;
    try {
      console.log(`Creating new connection. Current: ${connectionPool.currentConnections}/${connectionPool.maxConnections}`);
      // Reserve the slot before awaiting so concurrent callers cannot overshoot the limit.
      connectionPool.currentConnections++;

      const tunnel = await setupSshTunnel();
      ssh = tunnel.ssh;

      const connection = await mysql.createConnection({
        ...tunnel.dbConfig,
        stream: tunnel.stream,
      });

      const conn = { ssh, connection, inUse: true, created: Date.now() };
      console.log('Database connection established');

      // Reset circuit breaker on successful connection
      if (breaker.failures > 0) {
        breaker.failures = 0;
        breaker.isOpen = false;
      }

      return toHandle(conn);
    } catch (error) {
      // Give the reserved slot back.
      connectionPool.currentConnections--;

      // The tunnel came up but the database connection did not: close the SSH
      // client, otherwise it stays open with nothing referencing it.
      if (ssh) {
        try {
          ssh.end();
        } catch (closeError) {
          console.error('Error closing SSH tunnel after failed connection:', closeError);
        }
      }

      // Track circuit breaker failures
      breaker.failures++;
      breaker.lastFailure = Date.now();
      if (breaker.failures >= breaker.threshold) {
        breaker.isOpen = true;
        console.log(`Circuit breaker opened after ${breaker.failures} failures`);
      }

      throw error;
    }
  }

  // Pool is full, queue the request with timeout
  console.log('Connection pool full, queuing request...');
  return new Promise((resolve, reject) => {
    const request = {
      resolve,
      reject,
      timeoutId: undefined,
      timestamp: Date.now(),
    };

    request.timeoutId = setTimeout(() => {
      // Remove from queue if still there
      const index = connectionPool.pendingRequests.indexOf(request);
      if (index !== -1) {
        connectionPool.pendingRequests.splice(index, 1);
      }
      // Reject unconditionally: a served request has its timer cleared before
      // this can run, and rejecting also settles a request that was dropped
      // from the queue by a pool reset, which would otherwise hang forever.
      reject(new Error('Connection pool queue timeout after 15 seconds'));
    }, 15000);

    connectionPool.pendingRequests.push(request);
  });
}
|
|
|
|
/**
 * Release a connection back to the pool.
 *
 * If callers are queued, the connection is handed straight to the oldest one
 * (its queue timeout is cancelled); otherwise the record is stored as idle.
 * Releasing a record that is already idle in the pool is ignored, so a double
 * release cannot make two callers share one connection.
 *
 * @param {{connection: object, inUse: boolean}} conn - Pool record created by getDbConnection
 */
function releaseConnection(conn) {
  // Guard against double release: the record is already sitting in the idle list.
  if (connectionPool.connections.includes(conn)) {
    console.warn('Ignoring release of a connection that is already in the pool');
    return;
  }

  conn.inUse = false;

  const next = connectionPool.pendingRequests.shift();
  if (next === undefined) {
    // Nobody is waiting: return to pool
    connectionPool.connections.push(conn);
    console.log(`Connection returned to pool. Pool size: ${connectionPool.connections.length}, Active: ${connectionPool.currentConnections}`);
    return;
  }

  // Clear the timeout since we're serving the request
  if (next.timeoutId) {
    clearTimeout(next.timeoutId);
  }

  conn.inUse = true;
  console.log(`Serving queued request. Queue length: ${connectionPool.pendingRequests.length}`);

  // One-shot release for the new holder.
  let released = false;
  next.resolve({
    connection: conn.connection,
    release: () => {
      if (released) {
        return;
      }
      released = true;
      releaseConnection(conn);
    },
  });
}
|
|
|
|
/**
 * Get cached query results or execute query if not cached.
 *
 * A cached entry is reused while it is younger than the duration configured
 * for `queryType` in `connectionPool.cacheDuration`; unknown types use the
 * `default` duration. On a miss, `queryFn` is awaited and its result stored.
 * If `queryFn` rejects, nothing is cached and the rejection propagates.
 *
 * @param {string} cacheKey - Unique key to identify the query
 * @param {string} queryType - Type of query (stats, products, orders, etc.)
 * @param {Function} queryFn - Function to execute if cache miss
 * @returns {Promise<any>} The query result
 */
async function getCachedQuery(cacheKey, queryType, queryFn) {
  // Only honour durations defined on the config object itself. A plain
  // `durations[queryType]` lookup would pick up inherited members such as
  // 'toString' or 'constructor', producing a non-numeric duration that makes
  // every lookup a cache miss.
  const durations = connectionPool.cacheDuration;
  const configured = Object.prototype.hasOwnProperty.call(durations, queryType)
    ? durations[queryType]
    : undefined;
  const cacheDuration = configured || durations.default;

  // Check if we have a valid cached result
  const cachedResult = connectionPool.queryCache.get(cacheKey);
  const now = Date.now();

  if (cachedResult && now - cachedResult.timestamp < cacheDuration) {
    console.log(`Cache hit for ${queryType} query: ${cacheKey}`);
    return cachedResult.data;
  }

  // No valid cache found, execute the query
  console.log(`Cache miss for ${queryType} query: ${cacheKey}`);
  const result = await queryFn();

  // Cache the result, stamped with the time the lookup started.
  connectionPool.queryCache.set(cacheKey, {
    data: result,
    timestamp: now,
  });

  return result;
}
|
|
|
|
/**
 * Setup SSH tunnel to production database.
 *
 * Connects an SSH client using the PROD_SSH_* environment variables and opens
 * a forwarded stream to the database host/port taken from PROD_DB_*.
 *
 * @private - Should only be used by getDbConnection
 * @returns {Promise<{ssh: object, stream: object, dbConfig: object}>} The
 *   connected SSH client, the forwarded stream, and the database settings.
 *   Rejects when the SSH connection errors or the forward cannot be opened;
 *   in the latter case the SSH client is closed before rejecting.
 */
async function setupSshTunnel() {
  const keyPath = process.env.PROD_SSH_KEY_PATH;

  const sshConfig = {
    host: process.env.PROD_SSH_HOST,
    // Environment variables are strings; normalise ports to numbers.
    port: Number(process.env.PROD_SSH_PORT) || 22,
    username: process.env.PROD_SSH_USER,
    privateKey: keyPath ? fs.readFileSync(keyPath) : undefined,
    compress: true,
  };

  const dbConfig = {
    host: process.env.PROD_DB_HOST || 'localhost',
    user: process.env.PROD_DB_USER,
    password: process.env.PROD_DB_PASSWORD,
    database: process.env.PROD_DB_NAME,
    port: Number(process.env.PROD_DB_PORT) || 3306,
    timezone: 'Z',
  };

  return new Promise((resolve, reject) => {
    const ssh = new Client();

    ssh.on('error', (err) => {
      console.error('SSH connection error:', err);
      reject(err);
    });

    ssh.on('ready', () => {
      ssh.forwardOut('127.0.0.1', 0, dbConfig.host, dbConfig.port, (err, stream) => {
        if (err) {
          // The SSH session is up but unusable: close it so it does not
          // linger, and do not fall through to resolve.
          ssh.end();
          reject(err);
          return;
        }
        resolve({ ssh, stream, dbConfig });
      });
    });

    ssh.connect(sshConfig);
  });
}
|
|
|
|
/**
 * Clear cached query results.
 *
 * @param {string} [cacheKey] - Specific cache key to clear (clears all if not provided)
 */
function clearQueryCache(cacheKey) {
  // Only a missing key (undefined/null) means "clear everything". A falsy but
  // real key such as '' must remove just that entry, not wipe the whole cache.
  if (cacheKey === undefined || cacheKey === null) {
    connectionPool.queryCache.clear();
    console.log('Cleared all query cache');
    return;
  }

  connectionPool.queryCache.delete(cacheKey);
  console.log(`Cleared cache for key: ${cacheKey}`);
}
|
|
|
|
/**
 * Force close all pooled connections and reset the pool.
 * Useful for server shutdown or manual connection reset.
 *
 * - Every idle pooled connection has its MySQL connection and its SSH client
 *   closed; the SSH client is closed even when closing MySQL fails.
 * - Every queued request is rejected and its timeout cancelled, so no caller
 *   is left waiting on a promise that can never settle.
 * - Pool counters and the query cache are reset.
 *
 * NOTE: connections currently checked out by callers are not tracked in
 * `connectionPool.connections`, so they are not closed here.
 *
 * @returns {Promise<void>}
 */
async function closeAllConnections() {
  // Detach the current lists first so nothing can take a connection that is
  // being closed, or join a queue that is being drained, while we await below.
  const idleConnections = connectionPool.connections;
  const waitingRequests = connectionPool.pendingRequests;

  // Reset pool state
  connectionPool.connections = [];
  connectionPool.pendingRequests = [];
  connectionPool.currentConnections = 0;
  connectionPool.queryCache.clear();

  // Fail queued callers explicitly instead of silently dropping them.
  for (const request of waitingRequests) {
    if (request.timeoutId) {
      clearTimeout(request.timeoutId);
    }
    request.reject(new Error('Connection pool closed while request was queued'));
  }

  // Close all pooled connections
  for (const conn of idleConnections) {
    try {
      await conn.connection.end();
      console.log('Closed pooled connection');
    } catch (error) {
      console.error('Error closing pooled connection:', error);
    } finally {
      // Always tear down the tunnel, even if the MySQL close failed.
      try {
        conn.ssh.end();
      } catch (error) {
        console.error('Error closing pooled connection:', error);
      }
    }
  }

  console.log('All connections closed and pool reset');
}
|
|
|
|
/**
 * Snapshot of the connection pool for debugging.
 *
 * @returns {{poolSize: number, activeConnections: number, maxConnections: number,
 *   pendingRequests: number, cacheSize: number,
 *   queuedRequests: Array<{waitTime: number, hasTimeout: boolean}>}}
 *   `poolSize` is the number of idle pooled connections, `activeConnections`
 *   mirrors `connectionPool.currentConnections`, and `queuedRequests` lists,
 *   for each waiting caller, how long it has waited (ms) and whether it has
 *   a timeout handle.
 */
function getPoolStatus() {
  const {
    connections,
    currentConnections,
    maxConnections,
    pendingRequests,
    queryCache,
  } = connectionPool;

  const queuedRequests = pendingRequests.map(({ timestamp, timeoutId }) => ({
    waitTime: Date.now() - timestamp,
    hasTimeout: Boolean(timeoutId),
  }));

  return {
    poolSize: connections.length,
    activeConnections: currentConnections,
    maxConnections,
    pendingRequests: pendingRequests.length,
    cacheSize: queryCache.size,
    queuedRequests,
  };
}
|
|
|
|
module.exports = {
|
|
getDbConnection,
|
|
getCachedQuery,
|
|
clearQueryCache,
|
|
closeAllConnections,
|
|
getPoolStatus
|
|
};
|