livedash-node/lib/batchProcessor.ts
Kaj Kowalski 1eea2cc3e4 refactor: fix biome linting issues and update project documentation
- Fix 36+ biome linting issues, reducing errors/warnings from 227 to 191
- Replace explicit 'any' types with proper TypeScript interfaces
- Fix React hooks dependencies and useCallback patterns
- Resolve unused variables and parameter assignment issues
- Improve accessibility with proper label associations
- Add comprehensive API documentation for admin and security features
- Update README.md with accurate PostgreSQL setup and current tech stack
- Create complete documentation for audit logging, CSP monitoring, and batch processing
- Fix outdated project information and missing developer workflows
2025-07-12 00:28:09 +02:00


/**
* OpenAI Batch API Processing Utilities
*
* This module implements Phase 1 of the AI Session Processing Pipeline refactor
* to use OpenAI's Batch API for cost-efficient processing of AI requests.
*
* Key benefits:
* - 50% cost reduction compared to real-time API calls
* - Better rate limiting and throughput management
* - Improved error handling and retry mechanisms
*/
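/*
 * Illustrative lifecycle (a sketch, not part of this module): the scheduler
 * wiring is assumed; the exported functions named here are defined below.
 *
 *   const pending = await getPendingBatchRequests(companyId);
 *   if (pending.length > 0) await createBatchRequest(companyId, pending);
 *   await checkBatchStatuses(companyId); // poll OpenAI for progress
 *   await processCompletedBatches(companyId); // download and persist results
 */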
import {
AIBatchRequestStatus,
type AIProcessingRequest,
AIRequestStatus,
} from "@prisma/client";
import { BatchLogLevel, BatchOperation, batchLogger } from "./batchLogger";
import { env } from "./env";
import { openAIMock } from "./mocks/openai-mock-server";
import { prisma } from "./prisma";
/**
* Configuration for batch processing with retry logic
*/
const BATCH_CONFIG = {
// Maximum number of requests per batch (OpenAI limit is 50,000)
MAX_REQUESTS_PER_BATCH: 1000,
// Minimum time to wait before checking batch status (in milliseconds)
MIN_STATUS_CHECK_INTERVAL: 60000, // 1 minute
// Maximum time to wait for a batch to complete (24 hours)
MAX_BATCH_TIMEOUT: 24 * 60 * 60 * 1000,
// Retry configuration
MAX_RETRIES: 3,
BASE_RETRY_DELAY: 1000, // 1 second
MAX_RETRY_DELAY: 30000, // 30 seconds
EXPONENTIAL_BACKOFF_MULTIPLIER: 2,
// Circuit breaker configuration
CIRCUIT_BREAKER_THRESHOLD: 5, // failures before opening circuit
CIRCUIT_BREAKER_TIMEOUT: 5 * 60 * 1000, // 5 minutes
// Request timeout
REQUEST_TIMEOUT: 60000, // 60 seconds
} as const;
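// Illustration: with the defaults above, retryWithBackoff makes up to
// MAX_RETRIES + 1 = 4 attempts, sleeping 1s, 2s, then 4s between them
// (BASE_RETRY_DELAY * EXPONENTIAL_BACKOFF_MULTIPLIER ** attempt, capped at
// MAX_RETRY_DELAY).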
/**
* Circuit breaker state for API operations
*/
class CircuitBreaker {
private failures = 0;
private lastFailureTime = 0;
private isOpen = false;
async execute<T>(operation: () => Promise<T>): Promise<T> {
if (this.isOpen) {
const now = Date.now();
if (now - this.lastFailureTime < BATCH_CONFIG.CIRCUIT_BREAKER_TIMEOUT) {
await batchLogger.logCircuitBreaker(
"batch_operation",
"OPEN",
this.failures,
BATCH_CONFIG.CIRCUIT_BREAKER_THRESHOLD
);
throw new CircuitBreakerOpenError("Circuit breaker is open");
}
// Half-open state - try to recover
this.isOpen = false;
this.failures = 0;
await batchLogger.logCircuitBreaker(
"batch_operation",
"HALF_OPEN",
this.failures,
BATCH_CONFIG.CIRCUIT_BREAKER_THRESHOLD
);
}
try {
const result = await operation();
if (this.failures > 0) {
await batchLogger.logCircuitBreaker(
"batch_operation",
"CLOSED",
0,
BATCH_CONFIG.CIRCUIT_BREAKER_THRESHOLD
);
}
this.failures = 0; // Reset on success
return result;
} catch (error) {
this.failures++;
this.lastFailureTime = Date.now();
if (this.failures >= BATCH_CONFIG.CIRCUIT_BREAKER_THRESHOLD) {
this.isOpen = true;
await batchLogger.logCircuitBreaker(
"batch_operation",
"OPEN",
this.failures,
BATCH_CONFIG.CIRCUIT_BREAKER_THRESHOLD
);
}
throw error;
}
}
isCircuitOpen(): boolean {
return this.isOpen;
}
getStatus() {
return {
isOpen: this.isOpen,
failures: this.failures,
lastFailureTime: this.lastFailureTime,
};
}
}
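/*
 * Usage sketch (illustrative): wrap any flaky async call. After
 * CIRCUIT_BREAKER_THRESHOLD consecutive failures the breaker rejects calls
 * immediately with CircuitBreakerOpenError until CIRCUIT_BREAKER_TIMEOUT has
 * elapsed, then lets one half-open attempt through to probe for recovery.
 *
 *   const breaker = new CircuitBreaker();
 *   const json = await breaker.execute(() =>
 *     fetchWithTimeout(url).then((r) => r.json())
 *   );
 */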
/**
* Custom error classes for better error handling
*/
class BatchProcessingError extends Error {
constructor(
message: string,
public readonly _cause?: Error
) {
super(message);
this.name = "BatchProcessingError";
}
}
class CircuitBreakerOpenError extends Error {
constructor(message: string) {
super(message);
this.name = "CircuitBreakerOpenError";
}
}
class RetryableError extends Error {
constructor(
message: string,
public readonly _isRetryable = true
) {
super(message);
this.name = "RetryableError";
}
}
class NonRetryableError extends Error {
constructor(message: string) {
super(message);
this.name = "NonRetryableError";
}
}
// Global circuit breakers for different operations
const fileUploadCircuitBreaker = new CircuitBreaker();
const batchCreationCircuitBreaker = new CircuitBreaker();
const batchStatusCircuitBreaker = new CircuitBreaker();
const fileDownloadCircuitBreaker = new CircuitBreaker();
/**
* Retry utility with exponential backoff
*/
async function retryWithBackoff<T>(
operation: () => Promise<T>,
operationName: string,
maxRetries = BATCH_CONFIG.MAX_RETRIES
): Promise<T> {
let lastError: Error;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
const result = await operation();
if (attempt > 0) {
console.log(`${operationName} succeeded on attempt ${attempt + 1}`);
}
return result;
} catch (error) {
lastError = error as Error;
// Don't retry non-retryable errors
if (
error instanceof NonRetryableError ||
error instanceof CircuitBreakerOpenError
) {
throw error;
}
// Check if error is retryable based on type
const isRetryable = isErrorRetryable(error as Error);
if (!isRetryable) {
throw new NonRetryableError(
`Non-retryable error in ${operationName}: ${(error as Error).message}`
);
}
if (attempt === maxRetries) {
throw new BatchProcessingError(
`${operationName} failed after ${maxRetries + 1} attempts`,
lastError
);
}
const delay = Math.min(
BATCH_CONFIG.BASE_RETRY_DELAY *
BATCH_CONFIG.EXPONENTIAL_BACKOFF_MULTIPLIER ** attempt,
BATCH_CONFIG.MAX_RETRY_DELAY
);
await batchLogger.logRetry(
BatchOperation.RETRY_OPERATION,
operationName,
attempt + 1,
maxRetries + 1,
delay,
error as Error
);
console.warn(
`${operationName} failed on attempt ${attempt + 1}, retrying in ${delay}ms:`,
(error as Error).message
);
await sleep(delay);
}
}
throw lastError!;
}
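/*
 * Example (illustrative): the exported functions below combine both layers.
 * The circuit breaker short-circuits repeated failures; retryWithBackoff
 * spaces out the attempts that are allowed through.
 *
 *   const status = await retryWithBackoff(
 *     () => batchStatusCircuitBreaker.execute(() => getOpenAIBatchStatus(id)),
 *     "Check batch status"
 *   );
 */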
/**
* Determine if an error is retryable
*/
function isErrorRetryable(error: Error): boolean {
// Network errors are usually retryable
if (
error.message.includes("ECONNRESET") ||
error.message.includes("ETIMEDOUT") ||
error.message.includes("ENOTFOUND") ||
error.message.includes("socket hang up")
) {
return true;
}
// HTTP errors - check status codes
if (
error.message.includes("fetch failed") ||
error.message.includes("Failed to")
) {
// 5xx errors are retryable, 4xx errors are usually not
if (
error.message.includes("500") ||
error.message.includes("502") ||
error.message.includes("503") ||
error.message.includes("504") ||
error.message.includes("429")
) {
// Rate limit
return true;
}
// 4xx errors are usually not retryable
if (
error.message.includes("400") ||
error.message.includes("401") ||
error.message.includes("403") ||
error.message.includes("404")
) {
return false;
}
}
// Default to retryable for unknown errors
return true;
}
/**
* Sleep utility for delays
*/
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/**
* Create a fetch request with timeout
*/
async function fetchWithTimeout(
url: string,
options: RequestInit = {},
timeout = BATCH_CONFIG.REQUEST_TIMEOUT
): Promise<Response> {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeout);
try {
const response = await fetch(url, {
...options,
signal: controller.signal,
});
clearTimeout(timeoutId);
return response;
} catch (error) {
clearTimeout(timeoutId);
if ((error as Error).name === "AbortError") {
throw new RetryableError(`Request timeout after ${timeout}ms`);
}
throw error;
}
}
/**
* Represents a single request in an OpenAI batch
*/
interface OpenAIBatchRequest {
custom_id: string;
method: "POST";
url: "/v1/chat/completions";
body: {
model: string;
messages: Array<{
role: string;
content: string;
}>;
temperature?: number;
max_tokens?: number;
};
}
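/*
 * Example of a single JSONL line as uploaded to OpenAI (model name and IDs
 * are illustrative):
 *
 *   {"custom_id":"req_abc123","method":"POST","url":"/v1/chat/completions",
 *    "body":{"model":"gpt-4o-mini","messages":[...],"temperature":0.1,"max_tokens":1000}}
 */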
/**
* OpenAI Batch API response format
*/
interface OpenAIBatchResponse {
id: string;
object: "batch";
endpoint: string;
errors: {
object: "list";
data: Array<{
code: string;
message: string;
param?: string;
type: string;
}>;
};
input_file_id: string;
completion_window: string;
status:
| "validating"
| "failed"
| "in_progress"
| "finalizing"
| "completed"
| "expired"
| "cancelling"
| "cancelled";
output_file_id?: string;
error_file_id?: string;
created_at: number;
in_progress_at?: number;
expires_at?: number;
finalizing_at?: number;
completed_at?: number;
failed_at?: number;
expired_at?: number;
cancelling_at?: number;
cancelled_at?: number;
request_counts: {
total: number;
completed: number;
failed: number;
};
metadata?: Record<string, string>;
}
/**
 * An AI processing request joined with its session and ordered messages
 */
type PendingBatchRequest = AIProcessingRequest & {
session: {
id: string;
companyId: string;
messages: Array<{
id: string;
role: string;
content: string;
order: number;
}>;
} | null;
};
/**
 * Get pending AI processing requests that need to be batched
 */
export async function getPendingBatchRequests(
companyId: string,
limit: number = BATCH_CONFIG.MAX_REQUESTS_PER_BATCH
): Promise<PendingBatchRequest[]> {
return prisma.aIProcessingRequest.findMany({
where: {
session: {
companyId,
},
processingStatus: AIRequestStatus.PENDING_BATCHING,
batchId: null,
},
include: {
session: {
include: {
messages: {
orderBy: { order: "asc" },
},
},
},
},
take: limit,
orderBy: {
requestedAt: "asc",
},
}) as Promise<PendingBatchRequest[]>;
}
/**
* Create a new batch request and upload to OpenAI
*/
export async function createBatchRequest(
companyId: string,
requests: AIProcessingRequest[]
): Promise<string> {
if (requests.length === 0) {
throw new Error("Cannot create batch with no requests");
}
if (requests.length > BATCH_CONFIG.MAX_REQUESTS_PER_BATCH) {
throw new Error(
`Batch size ${requests.length} exceeds maximum of ${BATCH_CONFIG.MAX_REQUESTS_PER_BATCH}`
);
}
const _operationId = `batch-create-${crypto.randomUUID()}`;
try {
await batchLogger.log(
BatchLogLevel.INFO,
`Starting batch creation for company ${companyId} with ${requests.length} requests`,
{
operation: BatchOperation.BATCH_CREATION,
companyId,
requestCount: requests.length,
}
);
// Create batch requests in OpenAI format
const batchRequests: OpenAIBatchRequest[] = requests.map((request) => ({
custom_id: request.id,
method: "POST",
url: "/v1/chat/completions",
body: {
model: request.model,
messages: [
{
role: "system",
content: getSystemPromptForProcessingType(request.processingType),
},
{
role: "user",
content: formatMessagesForProcessing(
(request as PendingBatchRequest).session?.messages ?? []
),
},
],
temperature: 0.1,
max_tokens: 1000,
},
}));
// Convert to JSONL format for OpenAI
const jsonlContent = batchRequests
.map((req) => JSON.stringify(req))
.join("\n");
// Upload file to OpenAI
const fileResponse = await uploadFileToOpenAI(jsonlContent);
// Create batch on OpenAI
const batchResponse = await createOpenAIBatch(fileResponse.id);
// Store batch request in our database
const batchRequest = await prisma.aIBatchRequest.create({
data: {
companyId,
openaiBatchId: batchResponse.id,
inputFileId: fileResponse.id,
status: AIBatchRequestStatus.IN_PROGRESS,
processingRequests: {
connect: requests.map((req) => ({ id: req.id })),
},
},
});
// Update individual requests to mark them as batching
await prisma.aIProcessingRequest.updateMany({
where: {
id: {
in: requests.map((req) => req.id),
},
},
data: {
processingStatus: AIRequestStatus.BATCHING_IN_PROGRESS,
batchId: batchRequest.id,
},
});
await batchLogger.logBatchCreation(
companyId,
requests.length,
batchRequest.id,
batchResponse.id
);
return batchRequest.id;
} catch (error) {
await batchLogger.logBatchCreation(
companyId,
requests.length,
undefined,
undefined,
error as Error
);
throw error;
}
}
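/*
 * Illustrative caller (a sketch): fetch more work than fits in one batch and
 * chunk it to respect MAX_REQUESTS_PER_BATCH. The 5000 limit is an arbitrary
 * example, and the sketch assumes module-internal access to BATCH_CONFIG,
 * which is not exported.
 *
 *   const pending = await getPendingBatchRequests(companyId, 5000);
 *   for (let i = 0; i < pending.length; i += BATCH_CONFIG.MAX_REQUESTS_PER_BATCH) {
 *     await createBatchRequest(
 *       companyId,
 *       pending.slice(i, i + BATCH_CONFIG.MAX_REQUESTS_PER_BATCH)
 *     );
 *   }
 */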
/**
* Check the status of all in-progress batches for a company
*/
export async function checkBatchStatuses(companyId: string): Promise<void> {
const startTime = Date.now();
const inProgressBatches = await prisma.aIBatchRequest.findMany({
where: {
companyId,
status: {
in: [
AIBatchRequestStatus.IN_PROGRESS,
AIBatchRequestStatus.VALIDATING,
AIBatchRequestStatus.FINALIZING,
],
},
},
});
await batchLogger.log(
BatchLogLevel.DEBUG,
`Checking status for ${inProgressBatches.length} batches in company ${companyId}`,
{
operation: BatchOperation.BATCH_STATUS_CHECK,
companyId,
requestCount: inProgressBatches.length,
}
);
// Process batches concurrently but with error isolation
const results = await Promise.allSettled(
inProgressBatches.map(async (batch) => {
try {
const statusBefore = batch.status;
const status = await retryWithBackoff(
() =>
batchStatusCircuitBreaker.execute(() =>
getOpenAIBatchStatus(batch.openaiBatchId)
),
`Check batch status ${batch.id}`
);
await updateBatchStatus(batch.id, status);
await batchLogger.logStatusCheck(
batch.id,
batch.openaiBatchId,
statusBefore,
status.status === "completed"
? AIBatchRequestStatus.COMPLETED
: status.status === "failed"
? AIBatchRequestStatus.FAILED
: statusBefore,
Date.now() - startTime
);
} catch (error) {
console.error(`Failed to check status for batch ${batch.id}:`, error);
// Mark batch as failed if circuit breaker is open or too many retries
if (
error instanceof CircuitBreakerOpenError ||
error instanceof BatchProcessingError
) {
await markBatchAsFailed(batch.id, (error as Error).message);
}
throw error;
}
})
);
// Log any failures
const failures = results.filter((result) => result.status === "rejected");
if (failures.length > 0) {
console.warn(
`${failures.length}/${inProgressBatches.length} batch status checks failed for company ${companyId}`
);
}
}
/**
* Process completed batches and extract results
*/
export async function processCompletedBatches(
companyId: string
): Promise<void> {
const completedBatches = await prisma.aIBatchRequest.findMany({
where: {
companyId,
status: AIBatchRequestStatus.COMPLETED,
outputFileId: {
not: null,
},
},
include: {
processingRequests: {
include: {
session: true,
},
},
},
});
// Process batches concurrently but with error isolation
const results = await Promise.allSettled(
completedBatches.map(async (batch) => {
try {
await retryWithBackoff(
() =>
fileDownloadCircuitBreaker.execute(() =>
processBatchResults(batch)
),
`Process batch results ${batch.id}`
);
} catch (error) {
console.error(
`Failed to process batch results for ${batch.id}:`,
error
);
// Mark batch as failed and handle failed requests
await markBatchAsFailed(batch.id, (error as Error).message);
// Mark individual requests as failed so they can be retried individually
await handleFailedBatchRequests(
batch.processingRequests,
(error as Error).message
);
throw error;
}
})
);
// Log any failures
const failures = results.filter((result) => result.status === "rejected");
if (failures.length > 0) {
console.warn(
`${failures.length}/${completedBatches.length} batch result processing failed for company ${companyId}`
);
}
}
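/*
 * Scheduling note: MIN_STATUS_CHECK_INTERVAL and MAX_BATCH_TIMEOUT in
 * BATCH_CONFIG are not referenced elsewhere in this module; enforcing them is
 * left to the caller. A minimal polling loop (illustrative, single company):
 *
 *   setInterval(async () => {
 *     await checkBatchStatuses(companyId);
 *     await processCompletedBatches(companyId);
 *   }, BATCH_CONFIG.MIN_STATUS_CHECK_INTERVAL);
 */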
/**
* Helper function to upload file content to OpenAI (real or mock)
*/
async function uploadFileToOpenAI(content: string): Promise<{ id: string }> {
if (env.OPENAI_MOCK_MODE) {
console.log(
`[OpenAI Mock] Uploading batch file with ${content.split("\n").length} requests`
);
return openAIMock.mockUploadFile({
file: content,
purpose: "batch",
});
}
return retryWithBackoff(
() =>
fileUploadCircuitBreaker.execute(async () => {
const formData = new FormData();
formData.append(
"file",
new Blob([content], { type: "application/jsonl" }),
"batch_requests.jsonl"
);
formData.append("purpose", "batch");
const response = await fetchWithTimeout(
"https://api.openai.com/v1/files",
{
method: "POST",
headers: {
Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
},
body: formData,
}
);
if (!response.ok) {
const errorText = await response.text().catch(() => "Unknown error");
if (response.status >= 400 && response.status < 500) {
throw new NonRetryableError(
`Failed to upload file: ${response.status} ${response.statusText} - ${errorText}`
);
}
throw new RetryableError(
`Failed to upload file: ${response.status} ${response.statusText} - ${errorText}`
);
}
return response.json();
}),
"Upload file to OpenAI"
);
}
/**
* Helper function to create a batch request on OpenAI (real or mock)
*/
async function createOpenAIBatch(
inputFileId: string
): Promise<OpenAIBatchResponse> {
if (env.OPENAI_MOCK_MODE) {
console.log(`[OpenAI Mock] Creating batch with input file ${inputFileId}`);
return openAIMock.mockCreateBatch({
input_file_id: inputFileId,
endpoint: "/v1/chat/completions",
completion_window: "24h",
});
}
return retryWithBackoff(
() =>
batchCreationCircuitBreaker.execute(async () => {
const response = await fetchWithTimeout(
"https://api.openai.com/v1/batches",
{
method: "POST",
headers: {
Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
input_file_id: inputFileId,
endpoint: "/v1/chat/completions",
completion_window: "24h",
}),
}
);
if (!response.ok) {
const errorText = await response.text().catch(() => "Unknown error");
if (response.status >= 400 && response.status < 500) {
throw new NonRetryableError(
`Failed to create batch: ${response.status} ${response.statusText} - ${errorText}`
);
}
throw new RetryableError(
`Failed to create batch: ${response.status} ${response.statusText} - ${errorText}`
);
}
return response.json();
}),
"Create OpenAI batch"
);
}
/**
* Helper function to get batch status from OpenAI (real or mock)
*/
async function getOpenAIBatchStatus(
batchId: string
): Promise<OpenAIBatchResponse> {
if (env.OPENAI_MOCK_MODE) {
console.log(`[OpenAI Mock] Getting batch status for ${batchId}`);
return openAIMock.mockGetBatch(batchId);
}
const response = await fetchWithTimeout(
`https://api.openai.com/v1/batches/${batchId}`,
{
method: "GET",
headers: {
Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
},
}
);
if (!response.ok) {
const errorText = await response.text().catch(() => "Unknown error");
if (response.status >= 400 && response.status < 500) {
throw new NonRetryableError(
`Failed to get batch status: ${response.status} ${response.statusText} - ${errorText}`
);
}
throw new RetryableError(
`Failed to get batch status: ${response.status} ${response.statusText} - ${errorText}`
);
}
return response.json();
}
/**
* Update batch status in our database based on OpenAI response
*/
async function updateBatchStatus(
batchId: string,
openAIResponse: OpenAIBatchResponse
): Promise<void> {
const statusMapping: Record<string, AIBatchRequestStatus> = {
validating: AIBatchRequestStatus.VALIDATING,
failed: AIBatchRequestStatus.FAILED,
in_progress: AIBatchRequestStatus.IN_PROGRESS,
finalizing: AIBatchRequestStatus.FINALIZING,
completed: AIBatchRequestStatus.COMPLETED,
expired: AIBatchRequestStatus.FAILED,
cancelled: AIBatchRequestStatus.CANCELLED,
};
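// Any status without an explicit mapping (e.g. "cancelling") falls back to FAILED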
const ourStatus =
statusMapping[openAIResponse.status] || AIBatchRequestStatus.FAILED;
await prisma.aIBatchRequest.update({
where: { id: batchId },
data: {
status: ourStatus,
outputFileId: openAIResponse.output_file_id,
errorFileId: openAIResponse.error_file_id,
completedAt: openAIResponse.completed_at
? new Date(openAIResponse.completed_at * 1000)
: null,
},
});
}
/**
* Process results from a completed batch
*/
async function processBatchResults(batch: {
id: string;
outputFileId: string | null;
processingRequests: Array<{ sessionId: string }>;
}): Promise<void> {
const startTime = Date.now();
let successCount = 0;
let failureCount = 0;
if (!batch.outputFileId) {
throw new Error("No output file available for completed batch");
}
try {
await batchLogger.log(
BatchLogLevel.INFO,
`Starting result processing for batch ${batch.id}`,
{
operation: BatchOperation.BATCH_RESULT_PROCESSING,
batchId: batch.id,
requestCount: batch.processingRequests.length,
}
);
// Download results from OpenAI
const results = await downloadOpenAIFile(batch.outputFileId);
// Parse JSONL results
const resultLines = results.split("\n").filter((line) => line.trim());
for (const line of resultLines) {
try {
const result = JSON.parse(line);
const requestId = result.custom_id;
if (result.response?.body?.choices?.[0]?.message?.content) {
// Process successful result
await updateProcessingRequestWithResult(
requestId,
result.response.body
);
successCount++;
} else {
// Handle error result
await markProcessingRequestAsFailed(
requestId,
result.error?.message || "Unknown error"
);
failureCount++;
}
} catch (error) {
console.error("Failed to process batch result line:", error);
// Count unparseable lines as failures so the result log reflects them
failureCount++;
}
}
// Mark batch as processed
await prisma.aIBatchRequest.update({
where: { id: batch.id },
data: {
status: AIBatchRequestStatus.PROCESSED,
processedAt: new Date(),
},
});
const duration = Date.now() - startTime;
await batchLogger.logResultProcessing(
batch.id,
"processed",
successCount,
failureCount,
duration
);
} catch (error) {
const duration = Date.now() - startTime;
await batchLogger.logResultProcessing(
batch.id,
"error",
successCount,
failureCount,
duration,
error as Error
);
throw error;
}
}
/**
* Download file content from OpenAI (real or mock)
*/
async function downloadOpenAIFile(fileId: string): Promise<string> {
if (env.OPENAI_MOCK_MODE) {
console.log(`[OpenAI Mock] Downloading file content for ${fileId}`);
return openAIMock.mockGetFileContent(fileId);
}
return retryWithBackoff(
() =>
fileDownloadCircuitBreaker.execute(async () => {
const response = await fetchWithTimeout(
`https://api.openai.com/v1/files/${fileId}/content`,
{
method: "GET",
headers: {
Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
},
}
);
if (!response.ok) {
const errorText = await response.text().catch(() => "Unknown error");
if (response.status >= 400 && response.status < 500) {
throw new NonRetryableError(
`Failed to download file: ${response.status} ${response.statusText} - ${errorText}`
);
}
throw new RetryableError(
`Failed to download file: ${response.status} ${response.statusText} - ${errorText}`
);
}
return response.text();
}),
"Download file from OpenAI"
);
}
/**
* Update processing request with successful AI result
*/
async function updateProcessingRequestWithResult(
requestId: string,
aiResponse: {
usage: {
prompt_tokens: number;
completion_tokens: number;
total_tokens: number;
};
choices: Array<{
message: {
content: string;
};
}>;
}
): Promise<void> {
const usage = aiResponse.usage;
const content = aiResponse.choices[0].message.content;
try {
const parsedResult = JSON.parse(content);
// Update the processing request with usage data
await prisma.aIProcessingRequest.update({
where: { id: requestId },
data: {
processingStatus: AIRequestStatus.PROCESSING_COMPLETE,
success: true,
promptTokens: usage.prompt_tokens,
completionTokens: usage.completion_tokens,
totalTokens: usage.total_tokens,
completedAt: new Date(),
},
});
// Update the session with AI analysis results
const request = await prisma.aIProcessingRequest.findUnique({
where: { id: requestId },
include: { session: true },
});
if (request?.session) {
await prisma.session.update({
where: { id: request.sessionId },
data: {
summary: parsedResult.summary,
sentiment: parsedResult.sentiment,
category: parsedResult.category,
language: parsedResult.language,
},
});
}
} catch (error) {
console.error(`Failed to parse AI result for request ${requestId}:`, error);
await markProcessingRequestAsFailed(
requestId,
"Failed to parse AI response"
);
}
}
/**
* Mark processing request as failed
*/
async function markProcessingRequestAsFailed(
requestId: string,
errorMessage: string
): Promise<void> {
await prisma.aIProcessingRequest.update({
where: { id: requestId },
data: {
processingStatus: AIRequestStatus.PROCESSING_FAILED,
success: false,
errorMessage,
completedAt: new Date(),
},
});
}
/**
* Get system prompt based on processing type
*/
function getSystemPromptForProcessingType(processingType: string): string {
const prompts = {
sentiment_analysis:
'Analyze the sentiment of this conversation and respond with JSON containing: {"sentiment": "POSITIVE|NEUTRAL|NEGATIVE"}',
categorization:
'Categorize this conversation and respond with JSON containing: {"category": "CATEGORY_NAME"}',
summary:
'Summarize this conversation and respond with JSON containing: {"summary": "Brief summary"}',
full_analysis: `Analyze this conversation for sentiment, category, and provide a summary. Respond with JSON:
{
"sentiment": "POSITIVE|NEUTRAL|NEGATIVE",
"category": "SCHEDULE_HOURS|LEAVE_VACATION|SICK_LEAVE_RECOVERY|SALARY_COMPENSATION|CONTRACT_HOURS|ONBOARDING|OFFBOARDING|WORKWEAR_STAFF_PASS|TEAM_CONTACTS|PERSONAL_QUESTIONS|ACCESS_LOGIN|SOCIAL_QUESTIONS|UNRECOGNIZED_OTHER",
"summary": "Brief summary of the conversation",
"language": "en|de|fr|es|it|pt|nl|sv|da|no|fi|pl|cs|sk|hu|ro|bg|hr|sl|et|lv|lt|el|mt"
}`,
};
return (
prompts[processingType as keyof typeof prompts] || prompts.full_analysis
);
}
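/*
 * Example of a well-formed model response to the full_analysis prompt
 * (values illustrative):
 *
 *   {"sentiment":"POSITIVE","category":"LEAVE_VACATION",
 *    "summary":"Employee asked how to request vacation days.","language":"en"}
 */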
/**
* Format session messages for AI processing
*/
function formatMessagesForProcessing(
messages: Array<{
role: string;
content: string;
}>
): string {
return messages.map((msg) => `${msg.role}: ${msg.content}`).join("\n");
}
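// Example: [{ role: "user", content: "Hi" }, { role: "assistant", content: "Hello!" }]
// becomes "user: Hi\nassistant: Hello!".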
/**
* Get statistics about batch processing
*/
export async function getBatchProcessingStats(companyId: string) {
const stats = await prisma.aIBatchRequest.groupBy({
by: ["status"],
where: { companyId },
_count: true,
});
const pendingRequests = await prisma.aIProcessingRequest.count({
where: {
session: { companyId },
processingStatus: AIRequestStatus.PENDING_BATCHING,
},
});
return {
batchStats: stats.reduce(
(acc, stat) => {
acc[stat.status] = stat._count;
return acc;
},
{} as Record<string, number>
),
pendingRequests,
};
}
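/*
 * Example return value (illustrative):
 *
 *   { batchStats: { COMPLETED: 12, IN_PROGRESS: 2, FAILED: 1 }, pendingRequests: 37 }
 */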
/**
* Mark a batch as failed and update all related requests
*/
async function markBatchAsFailed(
batchId: string,
errorMessage: string
): Promise<void> {
try {
await prisma.aIBatchRequest.update({
where: { id: batchId },
data: {
status: AIBatchRequestStatus.FAILED,
completedAt: new Date(),
},
});
// Mark all related processing requests as failed so they can be retried individually
await prisma.aIProcessingRequest.updateMany({
where: { batchId },
data: {
processingStatus: AIRequestStatus.PROCESSING_FAILED,
batchId: null, // Remove batch association so they can be retried
errorMessage: `Batch failed: ${errorMessage}`,
},
});
console.warn(`Marked batch ${batchId} as failed: ${errorMessage}`);
} catch (error) {
console.error(`Failed to mark batch ${batchId} as failed:`, error);
}
}
/**
* Handle failed batch requests by marking them for individual retry
*/
async function handleFailedBatchRequests(
requests: Array<{ sessionId: string }>,
errorMessage: string
): Promise<void> {
try {
const sessionIds = requests.map((req) => req.sessionId);
// Reset requests to PENDING_BATCHING so they can be retried individually
await prisma.aIProcessingRequest.updateMany({
where: {
sessionId: { in: sessionIds },
processingStatus: AIRequestStatus.BATCHING_IN_PROGRESS,
},
data: {
processingStatus: AIRequestStatus.PENDING_BATCHING,
batchId: null,
errorMessage: `Batch processing failed: ${errorMessage}`,
},
});
console.warn(
`Reset ${sessionIds.length} requests for individual retry after batch failure`
);
} catch (error) {
console.error("Failed to handle failed batch requests:", error);
}
}
/**
* Retry failed individual requests using the regular OpenAI API
*/
export async function retryFailedRequests(
companyId: string,
_maxRetries = 5
): Promise<void> {
const failedRequests = await prisma.aIProcessingRequest.findMany({
where: {
session: { companyId },
processingStatus: AIRequestStatus.PROCESSING_FAILED,
},
include: {
session: {
include: {
messages: {
orderBy: { order: "asc" },
},
},
},
},
take: 10, // Process in small batches to avoid overwhelming the API
});
for (const request of failedRequests) {
try {
await retryWithBackoff(async () => {
// Process individual request using regular OpenAI API
const result = await processIndividualRequest(request);
await updateProcessingRequestWithResult(request.id, result);
}, `Retry individual request ${request.id}`);
// Mark as successful retry
console.log(`Successfully retried request ${request.id}`);
} catch (error) {
console.error(`Failed to retry request ${request.id}:`, error);
// Record the final error message; the request stays in PROCESSING_FAILED
await prisma.aIProcessingRequest.update({
where: { id: request.id },
data: {
processingStatus: AIRequestStatus.PROCESSING_FAILED,
errorMessage: `Final retry failed: ${(error as Error).message}`,
},
});
}
}
}
/**
* Process an individual request using the regular OpenAI API (fallback)
*/
async function processIndividualRequest(
request: PendingBatchRequest
): Promise<{
usage: {
prompt_tokens: number;
completion_tokens: number;
total_tokens: number;
};
choices: Array<{ message: { content: string } }>;
}> {
if (env.OPENAI_MOCK_MODE) {
console.log(`[OpenAI Mock] Processing individual request ${request.id}`);
return {
usage: { prompt_tokens: 100, completion_tokens: 50, total_tokens: 150 },
choices: [
{
message: {
content: JSON.stringify({
sentiment: "NEUTRAL",
category: "UNRECOGNIZED_OTHER",
summary: "Mock AI analysis result",
language: "en",
}),
},
},
],
};
}
const response = await fetchWithTimeout(
"https://api.openai.com/v1/chat/completions",
{
method: "POST",
headers: {
Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
model: request.model,
messages: [
{
role: "system",
content: getSystemPromptForProcessingType(request.processingType),
},
{
role: "user",
content: formatMessagesForProcessing(
request.session?.messages || []
),
},
],
temperature: 0.1,
max_tokens: 1000,
}),
}
);
if (!response.ok) {
const errorText = await response.text().catch(() => "Unknown error");
if (response.status >= 400 && response.status < 500) {
throw new NonRetryableError(
`Individual request failed: ${response.status} ${response.statusText} - ${errorText}`
);
}
throw new RetryableError(
`Individual request failed: ${response.status} ${response.statusText} - ${errorText}`
);
}
return response.json();
}
/**
* Get circuit breaker status for monitoring
*/
export function getCircuitBreakerStatus() {
return {
fileUpload: fileUploadCircuitBreaker.getStatus(),
batchCreation: batchCreationCircuitBreaker.getStatus(),
batchStatus: batchStatusCircuitBreaker.getStatus(),
fileDownload: fileDownloadCircuitBreaker.getStatus(),
};
}
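/*
 * Example shape (illustrative), e.g. for a health/monitoring endpoint:
 *
 *   { fileUpload: { isOpen: false, failures: 0, lastFailureTime: 0 }, ... }
 */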
/**
* Reset circuit breakers (for manual recovery)
*/
export function resetCircuitBreakers(): void {
// The state fields are private, so reset them in place through a structural cast
const resetCircuitBreaker = (breaker: CircuitBreaker) => {
const state = breaker as unknown as {
failures: number;
isOpen: boolean;
lastFailureTime: number;
};
state.failures = 0;
state.isOpen = false;
state.lastFailureTime = 0;
};
resetCircuitBreaker(fileUploadCircuitBreaker);
resetCircuitBreaker(batchCreationCircuitBreaker);
resetCircuitBreaker(batchStatusCircuitBreaker);
resetCircuitBreaker(fileDownloadCircuitBreaker);
console.log("All circuit breakers have been reset");
}