Files
inventory/inventory-server/dashboard/acot-server/db/connection.js
2025-10-04 16:14:09 -04:00

297 lines
8.9 KiB
JavaScript

const { Client } = require('ssh2');
const mysql = require('mysql2/promise');
const fs = require('fs');
// Time units (in milliseconds) used to express the cache lifetimes below
const MS_PER_SECOND = 1000;
const MS_PER_MINUTE = 60 * MS_PER_SECOND;

// Shared, mutable state for the connection pool, the query cache and the
// circuit breaker. Every function in this module reads and updates this object.
const connectionPool = {
  // Idle connections waiting to be handed out again
  connections: [],
  // Upper bound on the number of connections that may be open at once
  maxConnections: 20,
  // Number of connections counted as open (idle and handed out)
  currentConnections: 0,
  // Callers waiting for a connection because the pool is at its limit
  pendingRequests: [],

  // Query result cache: cache key -> { data, timestamp }
  queryCache: new Map(),

  // How long a cached result stays valid, per query type (milliseconds)
  cacheDuration: {
    stats: MS_PER_MINUTE,
    products: 5 * MS_PER_MINUTE,
    orders: MS_PER_MINUTE,
    default: MS_PER_MINUTE,
  },

  // Circuit breaker: opens after `threshold` connection failures and is
  // allowed to reset once `timeout` ms have passed since the last failure
  circuitBreaker: {
    failures: 0,
    lastFailure: 0,
    isOpen: false,
    threshold: 5,
    timeout: 30 * MS_PER_SECOND,
  },
};
/**
 * Get a database connection from the pool.
 *
 * Resolution order:
 *  1. Fail fast while the circuit breaker is open (it is reset once its
 *     timeout has elapsed since the last failure).
 *  2. Reuse an idle pooled connection if one is available.
 *  3. Open a new SSH-tunnelled MySQL connection while the pool is below
 *     `maxConnections`.
 *  4. Otherwise queue the request until a connection is released, giving up
 *     after 15 seconds.
 *
 * @returns {Promise<{connection: object, release: function}>} The database connection and release function
 * @throws {Error} If the circuit breaker is open, the tunnel or database
 *   connection cannot be established, or the queued request times out
 */
async function getDbConnection() {
  const breaker = connectionPool.circuitBreaker;

  // Check circuit breaker
  if (breaker.isOpen) {
    if (Date.now() - breaker.lastFailure > breaker.timeout) {
      // Enough time has passed since the last failure: allow attempts again
      breaker.isOpen = false;
      breaker.failures = 0;
      console.log('Circuit breaker reset');
    } else {
      throw new Error('Circuit breaker is open - too many connection failures');
    }
  }

  // Check if there's an available connection in the pool
  if (connectionPool.connections.length > 0) {
    const conn = connectionPool.connections.pop();
    // releaseConnection() cleared this flag when the connection went idle
    conn.inUse = true;
    console.log(`Using pooled connection. Pool size: ${connectionPool.connections.length}`);
    return {
      connection: conn.connection,
      release: () => releaseConnection(conn)
    };
  }

  // If we haven't reached max connections, create a new one
  if (connectionPool.currentConnections < connectionPool.maxConnections) {
    console.log(`Creating new connection. Current: ${connectionPool.currentConnections}/${connectionPool.maxConnections}`);
    // Reserve the slot before awaiting so concurrent callers cannot overshoot the limit
    connectionPool.currentConnections++;

    let ssh;
    try {
      const tunnel = await setupSshTunnel();
      ssh = tunnel.ssh;
      const connection = await mysql.createConnection({
        ...tunnel.dbConfig,
        stream: tunnel.stream
      });
      const conn = { ssh, connection, inUse: true, created: Date.now() };
      console.log('Database connection established');

      // Reset circuit breaker on successful connection
      if (breaker.failures > 0) {
        breaker.failures = 0;
        breaker.isOpen = false;
      }

      return {
        connection: conn.connection,
        release: () => releaseConnection(conn)
      };
    } catch (error) {
      connectionPool.currentConnections--;

      // The tunnel may already be up when the database handshake fails;
      // close it so the SSH connection is not leaked.
      if (ssh) {
        try {
          ssh.end();
        } catch (closeError) {
          console.error('Error closing SSH tunnel after failed connection:', closeError);
        }
      }

      // Track circuit breaker failures
      breaker.failures++;
      breaker.lastFailure = Date.now();
      if (breaker.failures >= breaker.threshold) {
        breaker.isOpen = true;
        console.log(`Circuit breaker opened after ${breaker.failures} failures`);
      }
      throw error;
    }
  }

  // Pool is full, queue the request with timeout
  console.log('Connection pool full, queuing request...');
  return new Promise((resolve, reject) => {
    const request = {
      resolve,
      reject,
      timeoutId: undefined,
      timestamp: Date.now()
    };
    request.timeoutId = setTimeout(() => {
      // Remove from queue if still there
      const index = connectionPool.pendingRequests.indexOf(request);
      if (index !== -1) {
        connectionPool.pendingRequests.splice(index, 1);
      }
      // releaseConnection() clears this timer before serving a request, so
      // reaching this point means the request was never served. Reject even
      // if the entry is no longer queued, otherwise the caller waits forever.
      reject(new Error('Connection pool queue timeout after 15 seconds'));
    }, 15000);
    connectionPool.pendingRequests.push(request);
  });
}
/**
 * Hand a connection back after use.
 *
 * If a caller is waiting in the queue, the connection is passed straight to
 * the oldest waiter; otherwise it goes back into the idle pool.
 *
 * @param {{connection: object, inUse: boolean}} conn - Pool entry being released
 */
function releaseConnection(conn) {
  conn.inUse = false;

  const waiting = connectionPool.pendingRequests;

  // Nobody is waiting: return to pool
  if (waiting.length === 0) {
    connectionPool.connections.push(conn);
    console.log(`Connection returned to pool. Pool size: ${connectionPool.connections.length}, Active: ${connectionPool.currentConnections}`);
    return;
  }

  // Serve the oldest queued request with this connection
  const { resolve: handOver, timeoutId: queueTimer } = waiting.shift();

  // The request is being served, so its queue timeout must not fire
  if (queueTimer) {
    clearTimeout(queueTimer);
  }

  conn.inUse = true;
  console.log(`Serving queued request. Queue length: ${waiting.length}`);
  handOver({
    connection: conn.connection,
    release: () => releaseConnection(conn)
  });
}
/**
 * Get cached query results or execute query if not cached
 *
 * A cached entry is served while it is younger than the cache duration
 * configured for `queryType` (falling back to the `default` duration for
 * unknown types). On a miss `queryFn` is executed and its result is cached.
 * If `queryFn` rejects, nothing is cached and the rejection propagates.
 *
 * @param {string} cacheKey - Unique key to identify the query
 * @param {string} queryType - Type of query (stats, products, orders, etc.)
 * @param {Function} queryFn - Function to execute if cache miss
 * @returns {Promise<any>} The query result
 */
async function getCachedQuery(cacheKey, queryType, queryFn) {
  // Get cache duration based on query type. Only own keys count, so a type
  // such as 'constructor' cannot pick up an inherited Object.prototype member.
  const durations = connectionPool.cacheDuration;
  const isKnownType = Object.prototype.hasOwnProperty.call(durations, queryType);
  const cacheDuration = (isKnownType && durations[queryType]) || durations.default;

  // Check if we have a valid cached result
  const cachedResult = connectionPool.queryCache.get(cacheKey);
  if (cachedResult && (Date.now() - cachedResult.timestamp < cacheDuration)) {
    console.log(`Cache hit for ${queryType} query: ${cacheKey}`);
    return cachedResult.data;
  }

  // No valid cache found, execute the query
  console.log(`Cache miss for ${queryType} query: ${cacheKey}`);
  const result = await queryFn();

  // Cache the result. The timestamp is taken after the query has finished so
  // the time spent running a slow query does not count against its lifetime.
  connectionPool.queryCache.set(cacheKey, {
    data: result,
    timestamp: Date.now()
  });
  return result;
}
/**
 * Setup SSH tunnel to production database
 *
 * Connects to the SSH host configured through the PROD_SSH_* environment
 * variables and opens a forwarded stream to the database host/port configured
 * through PROD_DB_*. If the forward cannot be opened, the SSH connection is
 * closed again before the promise rejects.
 *
 * @private - Should only be used by getDbConnection
 * @returns {Promise<{ssh: object, stream: object, dbConfig: object}>}
 * @throws {Error} If the key file cannot be read, the SSH connection fails,
 *   or the port forward cannot be opened
 */
async function setupSshTunnel() {
  // Environment variables are always strings; parse the ports so the SSH and
  // database options receive real numbers, falling back to the defaults when
  // the variable is unset or not numeric.
  const sshPort = Number.parseInt(process.env.PROD_SSH_PORT, 10) || 22;
  const dbPort = Number.parseInt(process.env.PROD_DB_PORT, 10) || 3306;

  const sshConfig = {
    host: process.env.PROD_SSH_HOST,
    port: sshPort,
    username: process.env.PROD_SSH_USER,
    privateKey: process.env.PROD_SSH_KEY_PATH
      ? fs.readFileSync(process.env.PROD_SSH_KEY_PATH)
      : undefined,
    compress: true
  };
  const dbConfig = {
    host: process.env.PROD_DB_HOST || 'localhost',
    user: process.env.PROD_DB_USER,
    password: process.env.PROD_DB_PASSWORD,
    database: process.env.PROD_DB_NAME,
    port: dbPort,
    timezone: 'Z'
  };

  return new Promise((resolve, reject) => {
    const ssh = new Client();
    ssh.on('error', (err) => {
      console.error('SSH connection error:', err);
      reject(err);
    });
    ssh.on('ready', () => {
      ssh.forwardOut(
        '127.0.0.1',
        0,
        dbConfig.host,
        dbConfig.port,
        (err, stream) => {
          if (err) {
            // The SSH session is up but unusable without the forward; close
            // it so the connection is not leaked.
            ssh.end();
            reject(err);
            return;
          }
          resolve({ ssh, stream, dbConfig });
        }
      );
    }).connect(sshConfig);
  });
}
/**
 * Drop cached query results.
 *
 * With a (truthy) key only that entry is removed; without one the whole
 * cache is emptied.
 *
 * @param {string} [cacheKey] - Specific cache key to clear (clears all if not provided)
 */
function clearQueryCache(cacheKey) {
  const cache = connectionPool.queryCache;

  // No key given: wipe everything
  if (!cacheKey) {
    cache.clear();
    console.log('Cleared all query cache');
    return;
  }

  cache.delete(cacheKey);
  console.log(`Cleared cache for key: ${cacheKey}`);
}
/**
 * Close all pooled connections and reset the pool
 * Useful for server shutdown or manual connection reset
 *
 * Every idle connection in `connectionPool.connections` has its database
 * connection and its SSH tunnel closed; a failure to close one does not stop
 * the others from being closed. Requests still waiting in the queue are
 * rejected (and their timeout timers cleared) so their callers do not wait
 * forever. The query cache is cleared as well.
 *
 * NOTE: only connections held in `connectionPool.connections` are closed
 * here; the connection counter is nevertheless reset to 0.
 *
 * @returns {Promise<void>} Resolves once every pooled connection has been processed
 */
async function closeAllConnections() {
  // Detach the current state first so that nothing is handed out from, or
  // queued onto, the lists being torn down while we await the closes below.
  const idleConnections = connectionPool.connections;
  const waitingRequests = connectionPool.pendingRequests;

  // Reset pool state
  connectionPool.connections = [];
  connectionPool.currentConnections = 0;
  connectionPool.pendingRequests = [];
  connectionPool.queryCache.clear();

  // Fail queued requests explicitly; dropping them silently would leave
  // their promises unsettled.
  for (const { reject, timeoutId } of waitingRequests) {
    if (timeoutId) {
      clearTimeout(timeoutId);
    }
    reject(new Error('Connection pool was closed while the request was queued'));
  }

  // Close all pooled connections
  for (const conn of idleConnections) {
    let closedCleanly = true;
    try {
      await conn.connection.end();
    } catch (error) {
      closedCleanly = false;
      console.error('Error closing pooled connection:', error);
    }
    // Always attempt to close the tunnel, even if the database close failed
    try {
      conn.ssh.end();
    } catch (error) {
      closedCleanly = false;
      console.error('Error closing pooled connection:', error);
    }
    if (closedCleanly) {
      console.log('Closed pooled connection');
    }
  }

  console.log('All connections closed and pool reset');
}
/**
* Get connection pool status for debugging
*/
function getPoolStatus() {
return {
poolSize: connectionPool.connections.length,
activeConnections: connectionPool.currentConnections,
maxConnections: connectionPool.maxConnections,
pendingRequests: connectionPool.pendingRequests.length,
cacheSize: connectionPool.queryCache.size,
queuedRequests: connectionPool.pendingRequests.map(req => ({
waitTime: Date.now() - req.timestamp,
hasTimeout: !!req.timeoutId
}))
};
}
// Public API of this module. setupSshTunnel and releaseConnection are not
// exported; a connection is released through the `release` function returned
// by getDbConnection.
module.exports = {
getDbConnection,
getCachedQuery,
clearQueryCache,
closeAllConnections,
getPoolStatus
};