mirror of
https://github.com/kjanat/livedash-node.git
synced 2026-01-16 08:32:09 +01:00
- Add Redis cache implementation with LRU eviction - Enhance Content Security Policy with nonce generation - Optimize database queries with connection pooling - Add cache invalidation API endpoints - Improve security monitoring performance
546 lines
14 KiB
TypeScript
546 lines
14 KiB
TypeScript
/**
 * Optimized Database Queries for OpenAI Batch Processing
 *
 * This module provides optimized versions of batch processing queries
 * with improved performance through:
 * - Reduced data fetching with selective includes
 * - Company caching to eliminate redundant lookups
 * - Batch operations to reduce N+1 queries
 * - Query result pooling and reuse
 */
|
|
|
|
import {
|
|
AIBatchRequestStatus,
|
|
type AIProcessingRequest,
|
|
AIRequestStatus,
|
|
} from "@prisma/client";
|
|
import { BatchLogLevel, BatchOperation, batchLogger } from "./batchLogger";
|
|
import { Cache } from "./cache";
|
|
import { prisma } from "./prisma";
|
|
|
|
/**
 * One active company as held by {@link CompanyCache}.
 */
interface CachedCompany {
  /** Company id as selected from the `company` table. */
  id: string;
  /** Company name as selected from the `company` table. */
  name: string;
  /** `Date.now()` value captured when the entry was loaded from the database. */
  cachedAt: number;
}

/**
 * Cache for the list of ACTIVE companies, used to reduce database lookups.
 *
 * Two tiers are consulted, in this order:
 * 1. the shared `Cache` store (Redis, per the original comments),
 * 2. an in-process copy that is honoured for `CACHE_TTL` milliseconds.
 *
 * Only when both miss is the database queried, after which both tiers are
 * refreshed.
 */
class CompanyCache {
  // Per-id lookup table. Nothing in this class populates it; it is only
  // cleared by invalidate().
  private cache = new Map<string, CachedCompany>();
  private allActiveCompanies: CachedCompany[] | null = null;
  private allActiveCompaniesCachedAt = 0;
  private readonly CACHE_TTL = 5 * 60 * 1000; // 5 minutes
  private readonly REDIS_CACHE_KEY = "active-companies";

  /**
   * Returns the active companies, preferring cached data over the database.
   *
   * An empty array found in the shared cache is treated as a miss, so an
   * installation without active companies always falls through to the
   * in-process copy and then to the database.
   *
   * @returns The cached or freshly loaded list of active companies.
   */
  async getActiveCompanies(): Promise<CachedCompany[]> {
    const now = Date.now();

    // Try the shared cache first
    const redisCachedCompanies = await Cache.get<CachedCompany[]>(
      this.REDIS_CACHE_KEY
    );
    if (redisCachedCompanies && redisCachedCompanies.length > 0) {
      this.allActiveCompanies = redisCachedCompanies;
      // Age the in-process copy from the moment the data left the database,
      // not from the moment it was read back from the shared cache. Stamping
      // it with `now` would hand already-old data a brand new TTL and let it
      // outlive the shared entry by up to another CACHE_TTL. `Math.min`
      // guards against a timestamp written by a host whose clock runs ahead.
      const loadedAt = redisCachedCompanies[0]?.cachedAt;
      this.allActiveCompaniesCachedAt =
        typeof loadedAt === "number" ? Math.min(loadedAt, now) : now;
      return redisCachedCompanies;
    }

    // Fall back to the in-process copy while it is still within its TTL
    if (
      this.allActiveCompanies &&
      now - this.allActiveCompaniesCachedAt < this.CACHE_TTL
    ) {
      return this.allActiveCompanies;
    }

    // Cache miss - fetch from database
    const companies = await prisma.company.findMany({
      where: { status: "ACTIVE" },
      select: { id: true, name: true },
    });

    const cachedCompanies = companies.map((company) => ({
      ...company,
      cachedAt: now,
    }));

    // Update both caches
    this.allActiveCompanies = cachedCompanies;
    this.allActiveCompaniesCachedAt = now;

    // Shared-cache TTL (300, previously a separate literal) is now derived
    // from CACHE_TTL so that both tiers cannot drift apart.
    await Cache.set(
      this.REDIS_CACHE_KEY,
      cachedCompanies,
      Math.floor(this.CACHE_TTL / 1000)
    );

    await batchLogger.log(
      BatchLogLevel.DEBUG,
      `Refreshed company cache with ${companies.length} active companies`,
      {
        operation: BatchOperation.SCHEDULER_ACTION,
        requestCount: companies.length,
      }
    );

    return cachedCompanies;
  }

  /**
   * Drops the in-process state and deletes the shared cache entry, so the
   * next `getActiveCompanies()` call on this instance reloads from the
   * database.
   */
  async invalidate(): Promise<void> {
    this.cache.clear();
    this.allActiveCompanies = null;
    this.allActiveCompaniesCachedAt = 0;

    // Clear the shared cache entry
    await Cache.delete(this.REDIS_CACHE_KEY);
  }

  /**
   * Describes the in-process copy only; the shared cache is not consulted.
   *
   * @returns `isActive` (a list is held), `cachedAt` (timestamp of that list,
   *   the Unix epoch when nothing is held) and `cacheSize` (number of
   *   companies held, 0 when nothing is held).
   */
  getStats() {
    return {
      isActive: this.allActiveCompanies !== null,
      cachedAt: new Date(this.allActiveCompaniesCachedAt),
      cacheSize: this.allActiveCompanies?.length ?? 0,
    };
  }
}
|
|
|
|
// Module-level CompanyCache instance shared by the query helpers in this file.
const companyCache = new CompanyCache();
|
|
|
|
/**
 * Fetches the requests of one company that are waiting to be batched
 * (status PENDING_BATCHING and no batch assigned yet), oldest first.
 *
 * The session is joined with only its id, company id and message count, so
 * no message bodies are loaded.
 *
 * @param companyId - Company whose sessions own the requests.
 * @param limit - Maximum number of rows to return (defaults to 1000).
 * @returns The matching requests ordered by `requestedAt` ascending.
 */
export async function getPendingBatchRequestsOptimized(
  companyId: string,
  limit = 1000
): Promise<AIProcessingRequest[]> {
  const startedAt = Date.now();

  const pendingFilter = {
    session: { companyId },
    processingStatus: AIRequestStatus.PENDING_BATCHING,
    batchId: null,
  };

  const pending = await prisma.aIProcessingRequest.findMany({
    where: pendingFilter,
    include: {
      session: {
        select: {
          id: true,
          companyId: true,
          // Message count only; the messages themselves are not fetched
          _count: { select: { messages: true } },
        },
      },
    },
    take: limit,
    orderBy: { requestedAt: "asc" },
  });

  const elapsedMs = Date.now() - startedAt;
  const found = pending.length;

  await batchLogger.log(
    BatchLogLevel.DEBUG,
    `Retrieved ${found} pending batch requests for company ${companyId} in ${elapsedMs}ms`,
    {
      operation: BatchOperation.BATCH_CREATION,
      companyId,
      requestCount: found,
      duration: elapsedMs,
    }
  );

  return pending;
}
|
|
|
|
/**
 * A processing request together with the messages of its session, as
 * returned by {@link getPendingBatchRequestsForAllCompanies}.
 */
type AIProcessingRequestWithSession = AIProcessingRequest & {
  session: {
    messages: {
      id: string;
      order: number;
      role: string;
      content: string;
    }[];
  };
};

/**
 * Batch operation to get pending requests for multiple companies.
 *
 * Runs a single query across all active companies (taken from the company
 * cache) for requests with status PENDING_BATCHING and no batch assigned,
 * including each session's messages sorted by `order`, then groups the rows
 * by the session's company id.
 *
 * @returns Map from company id to that company's pending requests, oldest
 *   first. Companies without pending requests have no entry; the map is
 *   empty when there are no active companies.
 */
export async function getPendingBatchRequestsForAllCompanies(): Promise<
  Map<string, AIProcessingRequestWithSession[]>
> {
  const startedAt = Date.now();
  const activeCompanies = await companyCache.getActiveCompanies();

  if (activeCompanies.length === 0) {
    return new Map();
  }

  const companyIds = activeCompanies.map(({ id }) => id);

  // One round trip for every company instead of one query per company
  const pendingRequests = await prisma.aIProcessingRequest.findMany({
    where: {
      session: { companyId: { in: companyIds } },
      processingStatus: AIRequestStatus.PENDING_BATCHING,
      batchId: null,
    },
    include: {
      session: {
        include: {
          messages: { orderBy: { order: "asc" } },
        },
      },
    },
    orderBy: { requestedAt: "asc" },
  });

  // Bucket the rows by owning company; rows without a company id are skipped
  const grouped = new Map<string, AIProcessingRequestWithSession[]>();
  for (const pendingRequest of pendingRequests) {
    const ownerId = pendingRequest.session?.companyId;
    if (ownerId) {
      const bucket = grouped.get(ownerId);
      if (bucket) {
        bucket.push(pendingRequest);
      } else {
        grouped.set(ownerId, [pendingRequest]);
      }
    }
  }

  const elapsedMs = Date.now() - startedAt;

  await batchLogger.log(
    BatchLogLevel.INFO,
    `Retrieved pending requests for ${activeCompanies.length} companies (${pendingRequests.length} total requests) in ${elapsedMs}ms`,
    {
      operation: BatchOperation.BATCH_CREATION,
      requestCount: pendingRequests.length,
      duration: elapsedMs,
    }
  );

  return grouped;
}
|
|
|
|
/**
 * Loads, in one query, the batches of all active companies whose status is
 * IN_PROGRESS, VALIDATING or FINALIZING, and groups them by company.
 *
 * Only `id`, `companyId`, `openaiBatchId`, `status` and `createdAt` are
 * selected.
 *
 * @returns Map from company id to that company's matching batches. Companies
 *   without such batches have no entry; the map is empty when there are no
 *   active companies.
 */
export async function getInProgressBatchesForAllCompanies(): Promise<
  Map<string, unknown[]>
> {
  const startedAt = Date.now();
  const activeCompanies = await companyCache.getActiveCompanies();

  if (activeCompanies.length === 0) {
    return new Map();
  }

  const companyIds = activeCompanies.map(({ id }) => id);
  const openStatuses = [
    AIBatchRequestStatus.IN_PROGRESS,
    AIBatchRequestStatus.VALIDATING,
    AIBatchRequestStatus.FINALIZING,
  ];

  const openBatches = await prisma.aIBatchRequest.findMany({
    where: {
      companyId: { in: companyIds },
      status: { in: openStatuses },
    },
    select: {
      id: true,
      companyId: true,
      openaiBatchId: true,
      status: true,
      createdAt: true,
    },
  });

  // Bucket the rows by owning company
  const grouped = new Map<string, unknown[]>();
  for (const openBatch of openBatches) {
    const bucket = grouped.get(openBatch.companyId);
    if (bucket) {
      bucket.push(openBatch);
    } else {
      grouped.set(openBatch.companyId, [openBatch]);
    }
  }

  const elapsedMs = Date.now() - startedAt;

  await batchLogger.log(
    BatchLogLevel.DEBUG,
    `Retrieved in-progress batches for ${activeCompanies.length} companies (${openBatches.length} total batches) in ${elapsedMs}ms`,
    {
      operation: BatchOperation.BATCH_STATUS_CHECK,
      requestCount: openBatches.length,
      duration: elapsedMs,
    }
  );

  return grouped;
}
|
|
|
|
/**
 * Loads, in one query, the COMPLETED batches of all active companies that
 * have an output file id, and groups them by company.
 *
 * Each batch carries its own scalar fields plus, for every linked processing
 * request, only `id`, `sessionId` and `processingStatus`.
 *
 * @returns Map from company id to that company's completed batches.
 *   Companies without such batches have no entry; the map is empty when
 *   there are no active companies.
 */
export async function getCompletedBatchesForAllCompanies(): Promise<
  Map<string, unknown[]>
> {
  const startedAt = Date.now();
  const activeCompanies = await companyCache.getActiveCompanies();

  if (activeCompanies.length === 0) {
    return new Map();
  }

  const companyIds = activeCompanies.map(({ id }) => id);

  const finishedBatches = await prisma.aIBatchRequest.findMany({
    where: {
      companyId: { in: companyIds },
      status: AIBatchRequestStatus.COMPLETED,
      outputFileId: { not: null },
    },
    select: {
      id: true,
      companyId: true,
      openaiBatchId: true,
      outputFileId: true,
      status: true,
      createdAt: true,
      // Identifying fields of the linked requests only, not the full rows
      processingRequests: {
        select: { id: true, sessionId: true, processingStatus: true },
      },
    },
  });

  // Bucket the rows by owning company
  const grouped = new Map<string, unknown[]>();
  for (const finishedBatch of finishedBatches) {
    const bucket = grouped.get(finishedBatch.companyId);
    if (bucket) {
      bucket.push(finishedBatch);
    } else {
      grouped.set(finishedBatch.companyId, [finishedBatch]);
    }
  }

  const elapsedMs = Date.now() - startedAt;

  await batchLogger.log(
    BatchLogLevel.DEBUG,
    `Retrieved completed batches for ${activeCompanies.length} companies (${finishedBatches.length} total batches) in ${elapsedMs}ms`,
    {
      operation: BatchOperation.BATCH_RESULT_PROCESSING,
      requestCount: finishedBatches.length,
      duration: elapsedMs,
    }
  );

  return grouped;
}
|
|
|
|
/**
 * Collects requests with status PROCESSING_FAILED for all active companies
 * in one query and keeps at most `maxPerCompany` of the oldest per company.
 *
 * The query itself is not limited: every failed request of the active
 * companies is fetched and the per-company cap is applied in memory.
 *
 * @param maxPerCompany - Upper bound on requests kept per company
 *   (defaults to 10).
 * @returns Map from company id to its retained failed requests, oldest
 *   first. A company appears as soon as it has one failed request, even if
 *   the cap leaves its list empty; the map is empty when there are no active
 *   companies.
 */
export async function getFailedRequestsForAllCompanies(
  maxPerCompany = 10
): Promise<Map<string, AIProcessingRequest[]>> {
  const startedAt = Date.now();
  const activeCompanies = await companyCache.getActiveCompanies();

  if (activeCompanies.length === 0) {
    return new Map();
  }

  const companyIds = activeCompanies.map(({ id }) => id);

  const failedRequests = await prisma.aIProcessingRequest.findMany({
    where: {
      session: { companyId: { in: companyIds } },
      processingStatus: AIRequestStatus.PROCESSING_FAILED,
    },
    include: {
      session: {
        select: {
          id: true,
          companyId: true,
          _count: { select: { messages: true } },
        },
      },
    },
    orderBy: { requestedAt: "asc" },
  });

  // Bucket by owning company, dropping rows once a bucket reaches the cap
  const grouped = new Map<string, AIProcessingRequest[]>();
  for (const failedRequest of failedRequests) {
    const ownerId = failedRequest.session?.companyId;
    if (!ownerId) continue;

    let bucket = grouped.get(ownerId);
    if (!bucket) {
      bucket = [];
      grouped.set(ownerId, bucket);
    }
    if (bucket.length < maxPerCompany) {
      bucket.push(failedRequest);
    }
  }

  const elapsedMs = Date.now() - startedAt;

  // Count what was kept, not what was fetched
  let keptCount = 0;
  for (const bucket of grouped.values()) {
    keptCount += bucket.length;
  }

  await batchLogger.log(
    BatchLogLevel.DEBUG,
    `Retrieved failed requests for ${activeCompanies.length} companies (${keptCount} total requests) in ${elapsedMs}ms`,
    {
      operation: BatchOperation.INDIVIDUAL_REQUEST_RETRY,
      requestCount: keptCount,
      duration: elapsedMs,
    }
  );

  return grouped;
}
|
|
|
|
/**
 * Looks up the `requestedAt` timestamp of the oldest request with status
 * PENDING_BATCHING for one company, selecting no other column.
 *
 * @param companyId - Company whose sessions own the requests.
 * @returns An object holding the oldest `requestedAt`, or `null` when the
 *   company has no such request.
 */
export async function getOldestPendingRequestOptimized(
  companyId: string
): Promise<{ requestedAt: Date } | null> {
  const startedAt = Date.now();

  const pendingFilter = {
    session: { companyId },
    processingStatus: AIRequestStatus.PENDING_BATCHING,
  };

  // Ascending sort + findFirst yields the earliest row
  const oldest = await prisma.aIProcessingRequest.findFirst({
    where: pendingFilter,
    select: { requestedAt: true },
    orderBy: { requestedAt: "asc" },
  });

  const elapsedMs = Date.now() - startedAt;

  await batchLogger.log(
    BatchLogLevel.DEBUG,
    `Retrieved oldest pending request timestamp for company ${companyId} in ${elapsedMs}ms`,
    {
      operation: BatchOperation.SCHEDULER_ACTION,
      companyId,
      duration: elapsedMs,
    }
  );

  return oldest;
}
|
|
|
|
/**
 * Computes batch-processing counters with five `count` queries issued
 * concurrently, so no individual records are loaded.
 *
 * @param companyId - When given (and non-empty), every counter is restricted
 *   to that company; otherwise the counters cover all companies.
 * @returns `totalBatches`, `pendingRequests` (PENDING_BATCHING),
 *   `inProgressBatches` (IN_PROGRESS, VALIDATING or FINALIZING),
 *   `completedBatches` (COMPLETED) and `failedRequests` (PROCESSING_FAILED).
 */
export async function getBatchProcessingStatsOptimized(
  companyId?: string
): Promise<{
  totalBatches: number;
  pendingRequests: number;
  inProgressBatches: number;
  completedBatches: number;
  failedRequests: number;
}> {
  const startedAt = Date.now();

  // Batches are filtered on their own companyId, requests via their session
  const batchScope = companyId ? { companyId } : {};
  const requestScope = companyId ? { session: { companyId } } : {};
  const openStatuses = [
    AIBatchRequestStatus.IN_PROGRESS,
    AIBatchRequestStatus.VALIDATING,
    AIBatchRequestStatus.FINALIZING,
  ];

  const [total, pending, inProgress, completed, failed] = await Promise.all([
    prisma.aIBatchRequest.count({ where: batchScope }),
    prisma.aIProcessingRequest.count({
      where: {
        ...requestScope,
        processingStatus: AIRequestStatus.PENDING_BATCHING,
      },
    }),
    prisma.aIBatchRequest.count({
      where: { ...batchScope, status: { in: openStatuses } },
    }),
    prisma.aIBatchRequest.count({
      where: { ...batchScope, status: AIBatchRequestStatus.COMPLETED },
    }),
    prisma.aIProcessingRequest.count({
      where: {
        ...requestScope,
        processingStatus: AIRequestStatus.PROCESSING_FAILED,
      },
    }),
  ]);

  const elapsedMs = Date.now() - startedAt;
  const stats = {
    totalBatches: total,
    pendingRequests: pending,
    inProgressBatches: inProgress,
    completedBatches: completed,
    failedRequests: failed,
  };

  const scopeLabel = companyId ? `for company ${companyId}` : "globally";

  await batchLogger.log(
    BatchLogLevel.DEBUG,
    `Retrieved batch processing stats ${scopeLabel} in ${elapsedMs}ms`,
    {
      operation: BatchOperation.SCHEDULER_ACTION,
      companyId,
      duration: elapsedMs,
      metadata: stats,
    }
  );

  return stats;
}
|
|
|
|
/**
 * Invalidates the module's company cache. Call this whenever a company is
 * added, removed or has its status changed.
 *
 * @returns A promise that settles once the cache's `invalidate()` has
 *   finished.
 */
export async function invalidateCompanyCache(): Promise<void> {
  return companyCache.invalidate();
}
|
|
|
|
/**
 * Exposes the company cache's statistics for monitoring.
 *
 * @returns Whatever the cache's `getStats()` reports at the time of the call.
 */
export function getCompanyCacheStats(): ReturnType<CompanyCache["getStats"]> {
  const snapshot = companyCache.getStats();
  return snapshot;
}
|