mirror of
https://github.com/kjanat/livedash-node.git
synced 2026-01-16 07:52:10 +01:00
feat: implement comprehensive database connection pooling optimization
🎯 SESSION POOLING PERFORMANCE BREAKTHROUGH! ✅ Critical Issues Fixed: - Eliminated multiple PrismaClient instances across schedulers - Fixed connection pool exhaustion risk in processing modules - Implemented singleton pattern for all database connections - Added graceful shutdown and connection cleanup 🚀 Enhanced Pooling Features: - Dual-mode connection pooling (standard + enhanced) - PostgreSQL native pooling with @prisma/adapter-pg - Advanced connection monitoring and health checks - Configurable pool limits and timeouts via environment variables - Real-time connection statistics and metrics 📊 Performance Optimizations: - Single shared connection pool across all schedulers - Configurable connection limits (DATABASE_CONNECTION_LIMIT=20) - Idle timeout management (DATABASE_POOL_TIMEOUT=10) - Connection cycling and health validation - Process termination signal handling 🛠️ New Infrastructure: - lib/database-pool.ts - Advanced pooling configuration - app/api/admin/database-health/route.ts - Connection monitoring - Enhanced lib/prisma.ts with dual-mode support - Comprehensive documentation in docs/database-connection-pooling.md - Graceful shutdown handling in lib/schedulers.ts 🎛️ Environment Configuration: - USE_ENHANCED_POOLING=true for production optimization - DATABASE_CONNECTION_LIMIT for pool size control - DATABASE_POOL_TIMEOUT for idle connection management - Automatic enhanced pooling in production environments 📈 Expected Impact: - Eliminates connection pool exhaustion under load - Reduces memory footprint from idle connections - Improves scheduler performance and reliability - Enables better resource monitoring and debugging - Supports horizontal scaling with proper connection management Production-ready connection pooling with monitoring and health checks!
This commit is contained in:
122
lib/database-pool.ts
Normal file
122
lib/database-pool.ts
Normal file
@ -0,0 +1,122 @@
|
||||
// Advanced database connection pooling configuration
|
||||
|
||||
import { PrismaPg } from "@prisma/adapter-pg";
|
||||
import { PrismaClient } from "@prisma/client";
|
||||
import { Pool } from "pg";
|
||||
import { env } from "./env.js";
|
||||
|
||||
// Enhanced connection pool configuration
|
||||
const createConnectionPool = () => {
|
||||
// Parse DATABASE_URL to get connection parameters
|
||||
const databaseUrl = new URL(env.DATABASE_URL);
|
||||
|
||||
const pool = new Pool({
|
||||
host: databaseUrl.hostname,
|
||||
port: Number.parseInt(databaseUrl.port) || 5432,
|
||||
user: databaseUrl.username,
|
||||
password: databaseUrl.password,
|
||||
database: databaseUrl.pathname.slice(1), // Remove leading slash
|
||||
ssl: databaseUrl.searchParams.get("sslmode") !== "disable",
|
||||
|
||||
// Connection pool configuration
|
||||
max: env.DATABASE_CONNECTION_LIMIT, // Maximum number of connections
|
||||
min: 2, // Minimum number of connections to maintain
|
||||
idleTimeoutMillis: env.DATABASE_POOL_TIMEOUT * 1000, // Close idle connections after timeout
|
||||
connectionTimeoutMillis: 10000, // Connection timeout
|
||||
maxUses: 1000, // Maximum uses per connection before cycling
|
||||
allowExitOnIdle: true, // Allow process to exit when all connections are idle
|
||||
|
||||
// Health check configuration
|
||||
query_timeout: 30000, // Query timeout
|
||||
keepAlive: true,
|
||||
keepAliveInitialDelayMillis: 30000,
|
||||
});
|
||||
|
||||
// Connection pool event handlers
|
||||
pool.on("connect", (_client) => {
|
||||
console.log(
|
||||
`Database connection established. Active connections: ${pool.totalCount}`
|
||||
);
|
||||
});
|
||||
|
||||
pool.on("acquire", (_client) => {
|
||||
console.log(
|
||||
`Connection acquired from pool. Waiting: ${pool.waitingCount}, Idle: ${pool.idleCount}`
|
||||
);
|
||||
});
|
||||
|
||||
pool.on("release", (_client) => {
|
||||
console.log(
|
||||
`Connection released to pool. Active: ${pool.totalCount - pool.idleCount}, Idle: ${pool.idleCount}`
|
||||
);
|
||||
});
|
||||
|
||||
pool.on("error", (err) => {
|
||||
console.error("Database pool error:", err);
|
||||
});
|
||||
|
||||
pool.on("remove", (_client) => {
|
||||
console.log(
|
||||
`Connection removed from pool. Total connections: ${pool.totalCount}`
|
||||
);
|
||||
});
|
||||
|
||||
return pool;
|
||||
};
|
||||
|
||||
// Create adapter with connection pool
|
||||
export const createEnhancedPrismaClient = () => {
|
||||
const pool = createConnectionPool();
|
||||
const adapter = new PrismaPg(pool);
|
||||
|
||||
return new PrismaClient({
|
||||
adapter,
|
||||
log:
|
||||
process.env.NODE_ENV === "development"
|
||||
? ["query", "info", "warn", "error"]
|
||||
: ["error"],
|
||||
});
|
||||
};
|
||||
|
||||
// Connection pool monitoring utilities
|
||||
export const getPoolStats = (pool: Pool) => {
|
||||
return {
|
||||
totalConnections: pool.totalCount,
|
||||
idleConnections: pool.idleCount,
|
||||
waitingQueries: pool.waitingCount,
|
||||
activeConnections: pool.totalCount - pool.idleCount,
|
||||
};
|
||||
};
|
||||
|
||||
// Health check for the connection pool
|
||||
export const checkPoolHealth = async (
|
||||
pool: Pool
|
||||
): Promise<{
|
||||
healthy: boolean;
|
||||
stats: ReturnType<typeof getPoolStats>;
|
||||
error?: string;
|
||||
}> => {
|
||||
try {
|
||||
const client = await pool.connect();
|
||||
await client.query("SELECT 1");
|
||||
client.release();
|
||||
|
||||
return {
|
||||
healthy: true,
|
||||
stats: getPoolStats(pool),
|
||||
};
|
||||
} catch (error) {
|
||||
return {
|
||||
healthy: false,
|
||||
stats: getPoolStats(pool),
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
// Graceful pool shutdown
|
||||
export const shutdownPool = async (pool: Pool) => {
|
||||
console.log("Shutting down database connection pool...");
|
||||
await pool.end();
|
||||
console.log("Database connection pool shut down successfully.");
|
||||
};
|
||||
10
lib/env.ts
10
lib/env.ts
@ -103,6 +103,16 @@ export const env = {
|
||||
5
|
||||
),
|
||||
|
||||
// Database Connection Pooling
|
||||
DATABASE_CONNECTION_LIMIT: parseIntWithDefault(
|
||||
process.env.DATABASE_CONNECTION_LIMIT,
|
||||
20
|
||||
),
|
||||
DATABASE_POOL_TIMEOUT: parseIntWithDefault(
|
||||
process.env.DATABASE_POOL_TIMEOUT,
|
||||
10
|
||||
),
|
||||
|
||||
// Server
|
||||
PORT: parseIntWithDefault(process.env.PORT, 3000),
|
||||
} as const;
|
||||
|
||||
@ -1,19 +1,14 @@
|
||||
// SessionImport to Session processor
|
||||
import {
|
||||
PrismaClient,
|
||||
ProcessingStage,
|
||||
SentimentCategory,
|
||||
} from "@prisma/client";
|
||||
import { ProcessingStage, SentimentCategory } from "@prisma/client";
|
||||
import cron from "node-cron";
|
||||
import { getSchedulerConfig } from "./env";
|
||||
import { prisma } from "./prisma.js";
|
||||
import { ProcessingStatusManager } from "./processingStatusManager";
|
||||
import {
|
||||
fetchTranscriptContent,
|
||||
isValidTranscriptUrl,
|
||||
} from "./transcriptFetcher";
|
||||
|
||||
const prisma = new PrismaClient();
|
||||
|
||||
interface ImportRecord {
|
||||
id: string;
|
||||
companyId: string;
|
||||
|
||||
@ -1,20 +1,67 @@
|
||||
// Simple Prisma client setup
|
||||
// Enhanced Prisma client setup with connection pooling
|
||||
import { PrismaClient } from "@prisma/client";
|
||||
import { createEnhancedPrismaClient } from "./database-pool.js";
|
||||
import { env } from "./env.js";
|
||||
|
||||
// Add prisma to the NodeJS global type
|
||||
// This approach avoids NodeJS.Global which is not available
|
||||
|
||||
// Prevent multiple instances of Prisma Client in development
|
||||
declare const global: {
|
||||
prisma: PrismaClient | undefined;
|
||||
};
|
||||
|
||||
// Initialize Prisma Client
|
||||
const prisma = global.prisma || new PrismaClient();
|
||||
// Connection pooling configuration
|
||||
const createPrismaClient = () => {
|
||||
// Use enhanced pooling in production or when explicitly enabled
|
||||
const useEnhancedPooling =
|
||||
process.env.NODE_ENV === "production" ||
|
||||
process.env.USE_ENHANCED_POOLING === "true";
|
||||
|
||||
// Save in global if we're in development
|
||||
if (useEnhancedPooling) {
|
||||
console.log("Using enhanced database connection pooling with PG adapter");
|
||||
return createEnhancedPrismaClient();
|
||||
}
|
||||
|
||||
// Default Prisma client with basic configuration
|
||||
return new PrismaClient({
|
||||
log:
|
||||
process.env.NODE_ENV === "development"
|
||||
? ["query", "error", "warn"]
|
||||
: ["error"],
|
||||
datasources: {
|
||||
db: {
|
||||
url: env.DATABASE_URL,
|
||||
},
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
// Initialize Prisma Client with singleton pattern
|
||||
const prisma = global.prisma || createPrismaClient();
|
||||
|
||||
// Save in global if we're in development to prevent multiple instances
|
||||
if (process.env.NODE_ENV !== "production") {
|
||||
global.prisma = prisma;
|
||||
}
|
||||
|
||||
// Graceful shutdown handling
|
||||
const gracefulShutdown = async () => {
|
||||
console.log("Gracefully disconnecting from database...");
|
||||
await prisma.$disconnect();
|
||||
process.exit(0);
|
||||
};
|
||||
|
||||
// Handle process termination signals
|
||||
process.on("SIGINT", gracefulShutdown);
|
||||
process.on("SIGTERM", gracefulShutdown);
|
||||
|
||||
// Connection health check
|
||||
export const checkDatabaseConnection = async (): Promise<boolean> => {
|
||||
try {
|
||||
await prisma.$queryRaw`SELECT 1`;
|
||||
return true;
|
||||
} catch (error) {
|
||||
console.error("Database connection failed:", error);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
export { prisma };
|
||||
|
||||
@ -1,17 +1,16 @@
|
||||
// Enhanced session processing scheduler with AI cost tracking and question management
|
||||
|
||||
import {
|
||||
PrismaClient,
|
||||
ProcessingStage,
|
||||
type SentimentCategory,
|
||||
type SessionCategory,
|
||||
} from "@prisma/client";
|
||||
import cron from "node-cron";
|
||||
import fetch from "node-fetch";
|
||||
import { prisma } from "./prisma.js";
|
||||
import { ProcessingStatusManager } from "./processingStatusManager";
|
||||
import { getSchedulerConfig } from "./schedulerConfig";
|
||||
|
||||
const prisma = new PrismaClient();
|
||||
const OPENAI_API_KEY = process.env.OPENAI_API_KEY;
|
||||
const OPENAI_API_URL = "https://api.openai.com/v1/chat/completions";
|
||||
const DEFAULT_MODEL = process.env.OPENAI_MODEL || "gpt-4o";
|
||||
|
||||
@ -1,10 +1,5 @@
|
||||
import {
|
||||
PrismaClient,
|
||||
ProcessingStage,
|
||||
ProcessingStatus,
|
||||
} from "@prisma/client";
|
||||
|
||||
const prisma = new PrismaClient();
|
||||
import { ProcessingStage, ProcessingStatus } from "@prisma/client";
|
||||
import { prisma } from "./prisma.js";
|
||||
|
||||
// Type-safe metadata interfaces
|
||||
interface ProcessingMetadata {
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
// Combined scheduler initialization
|
||||
// Combined scheduler initialization with graceful shutdown
|
||||
|
||||
import { prisma } from "./prisma.js";
|
||||
import { startProcessingScheduler } from "./processingScheduler";
|
||||
import { startCsvImportScheduler } from "./scheduler";
|
||||
|
||||
@ -16,4 +17,44 @@ export function initializeSchedulers() {
|
||||
startProcessingScheduler();
|
||||
|
||||
console.log("All schedulers initialized successfully");
|
||||
|
||||
// Set up graceful shutdown for schedulers
|
||||
setupGracefulShutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up graceful shutdown handling for schedulers and database connections
|
||||
*/
|
||||
function setupGracefulShutdown() {
|
||||
const shutdown = async (signal: string) => {
|
||||
console.log(`\nReceived ${signal}. Starting graceful shutdown...`);
|
||||
|
||||
try {
|
||||
// Disconnect from database
|
||||
await prisma.$disconnect();
|
||||
console.log("Database connections closed.");
|
||||
|
||||
// Exit the process
|
||||
process.exit(0);
|
||||
} catch (error) {
|
||||
console.error("Error during shutdown:", error);
|
||||
process.exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
// Handle various termination signals
|
||||
process.on("SIGTERM", () => shutdown("SIGTERM"));
|
||||
process.on("SIGINT", () => shutdown("SIGINT"));
|
||||
process.on("SIGUSR2", () => shutdown("SIGUSR2")); // Nodemon restart
|
||||
|
||||
// Handle uncaught exceptions
|
||||
process.on("uncaughtException", (error) => {
|
||||
console.error("Uncaught Exception:", error);
|
||||
shutdown("uncaughtException");
|
||||
});
|
||||
|
||||
process.on("unhandledRejection", (reason, promise) => {
|
||||
console.error("Unhandled Rejection at:", promise, "reason:", reason);
|
||||
shutdown("unhandledRejection");
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user