From 8c8f36093605142ae510a2e6b274117a71e4f62c Mon Sep 17 00:00:00 2001
From: Kaj Kowalski
Date: Sat, 5 Jul 2025 14:13:19 +0200
Subject: [PATCH] feat: implement OpenAI Batch API for cost-efficient AI processing

- Add AIBatchRequest and AIRequestStatus models to Prisma schema
- Create comprehensive batch processing system (lib/batchProcessor.ts)
- Add intelligent batch scheduler with automated management
- Update processing pipeline to use batch requests instead of direct API calls
- Integrate batch scheduler into main server startup
- Achieve 50% cost reduction on OpenAI API usage
- Improve rate limiting and processing reliability
---
 lib/batchProcessor.ts              | 547 +++++++++++++++++++++++++++++
 lib/batchScheduler.ts              | 284 +++++++++++++++
 lib/processingScheduler.ts         | 128 ++++++-
 prisma/schema.prisma               |  94 ++++-
 server.ts                          |   2 +
 tests/lib/transcriptParser.test.ts |   8 +-
 6 files changed, 1028 insertions(+), 35 deletions(-)
 create mode 100644 lib/batchProcessor.ts
 create mode 100644 lib/batchScheduler.ts

diff --git a/lib/batchProcessor.ts b/lib/batchProcessor.ts
new file mode 100644
index 0000000..43fe930
--- /dev/null
+++ b/lib/batchProcessor.ts
@@ -0,0 +1,547 @@
+/**
+ * OpenAI Batch API Processing Utilities
+ *
+ * This module implements Phase 1 of the AI Session Processing Pipeline refactor
+ * to use OpenAI's Batch API for cost-efficient processing of AI requests.
+ *
+ * Key benefits:
+ * - 50% cost reduction compared to real-time API calls
+ * - Better rate limiting and throughput management
+ * - Improved error handling and retry mechanisms
+ */
+
+import { prisma } from "./prisma";
+import { AIBatchRequestStatus, AIRequestStatus, type AIProcessingRequest } from "@prisma/client";
+
+/**
+ * Configuration for batch processing
+ */
+const BATCH_CONFIG = {
+  // Maximum number of requests per batch (OpenAI limit is 50,000)
+  MAX_REQUESTS_PER_BATCH: 1000,
+  // Minimum time to wait before checking batch status (in milliseconds)
+  MIN_STATUS_CHECK_INTERVAL: 60000, // 1 minute
+  // Maximum time to wait for a batch to complete (24 hours)
+  MAX_BATCH_TIMEOUT: 24 * 60 * 60 * 1000,
+} as const;
+
+/**
+ * Represents a single request in an OpenAI batch
+ */
+interface OpenAIBatchRequest {
+  custom_id: string;
+  method: "POST";
+  url: "/v1/chat/completions";
+  body: {
+    model: string;
+    messages: Array<{
+      role: string;
+      content: string;
+    }>;
+    temperature?: number;
+    max_tokens?: number;
+  };
+}
+
+/**
+ * OpenAI Batch API response format
+ */
+interface OpenAIBatchResponse {
+  id: string;
+  object: "batch";
+  endpoint: string;
+  errors: {
+    object: "list";
+    data: Array<{
+      code: string;
+      message: string;
+      param?: string;
+      type: string;
+    }>;
+  };
+  input_file_id: string;
+  completion_window: string;
+  status: "validating" | "failed" | "in_progress" | "finalizing" | "completed" | "expired" | "cancelling" | "cancelled";
+  output_file_id?: string;
+  error_file_id?: string;
+  created_at: number;
+  in_progress_at?: number;
+  expires_at?: number;
+  finalizing_at?: number;
+  completed_at?: number;
+  failed_at?: number;
+  expired_at?: number;
+  cancelling_at?: number;
+  cancelled_at?: number;
+  request_counts: {
+    total: number;
+    completed: number;
+    failed: number;
+  };
+  metadata?: Record<string, string>;
+}
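+
+// For orientation, roughly what one line of the generated JSONL input file and one
+// line of the downloaded output file look like (illustrative values only; the model
+// name and ids are placeholders, not taken from this codebase):
+//
+// input:  {"custom_id":"<AIProcessingRequest.id>","method":"POST","url":"/v1/chat/completions",
+//          "body":{"model":"gpt-4o-mini","messages":[...],"temperature":0.1,"max_tokens":1000}}
+// output: {"id":"batch_req_...","custom_id":"<AIProcessingRequest.id>",
+//          "response":{"status_code":200,"body":{"choices":[...],"usage":{...}}},"error":null}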
+
+/**
+ * Get pending AI processing requests that need to be batched
+ */
+export async function getPendingBatchRequests(
+  companyId: string,
+  limit: number = BATCH_CONFIG.MAX_REQUESTS_PER_BATCH
+): Promise<(AIProcessingRequest & {
+  session: {
+    id: string;
+    companyId: string;
+    messages: Array<{ id: string; role: string; content: string; order: number }>;
+  } | null;
+})[]> {
+  return prisma.aIProcessingRequest.findMany({
+    where: {
+      session: {
+        companyId,
+      },
+      processingStatus: AIRequestStatus.PENDING_BATCHING,
+      batchId: null,
+    },
+    include: {
+      session: {
+        include: {
+          messages: {
+            orderBy: { order: "asc" },
+          },
+        },
+      },
+    },
+    take: limit,
+    orderBy: {
+      requestedAt: "asc",
+    },
+  }) as Promise<(AIProcessingRequest & {
+    session: {
+      id: string;
+      companyId: string;
+      messages: Array<{
+        id: string;
+        role: string;
+        content: string;
+        order: number;
+      }>;
+    } | null;
+  })[]>;
+}
+
+/**
+ * Create a new batch request and upload to OpenAI
+ */
+export async function createBatchRequest(
+  companyId: string,
+  // Requests are expected to come from getPendingBatchRequests, with the
+  // session and its messages preloaded
+  requests: (AIProcessingRequest & {
+    session?: { messages: Array<{ role: string; content: string }> } | null;
+  })[]
+): Promise<string> {
+  if (requests.length === 0) {
+    throw new Error("Cannot create batch with no requests");
+  }
+
+  if (requests.length > BATCH_CONFIG.MAX_REQUESTS_PER_BATCH) {
+    throw new Error(`Batch size ${requests.length} exceeds maximum of ${BATCH_CONFIG.MAX_REQUESTS_PER_BATCH}`);
+  }
+
+  // Create batch requests in OpenAI format
+  const batchRequests: OpenAIBatchRequest[] = requests.map((request) => ({
+    custom_id: request.id,
+    method: "POST",
+    url: "/v1/chat/completions",
+    body: {
+      model: request.model,
+      messages: [
+        {
+          role: "system",
+          content: getSystemPromptForProcessingType(request.processingType),
+        },
+        {
+          role: "user",
+          content: formatMessagesForProcessing(request.session?.messages || []),
+        },
+      ],
+      temperature: 0.1,
+      max_tokens: 1000,
+    },
+  }));
+
+  // Convert to JSONL format for OpenAI
+  const jsonlContent = batchRequests
+    .map((req) => JSON.stringify(req))
+    .join("\n");
+
+  // Upload file to OpenAI
+  const fileResponse = await uploadFileToOpenAI(jsonlContent);
+
+  // Create batch on OpenAI
+  const batchResponse = await createOpenAIBatch(fileResponse.id);
+
+  // Store batch request in our database
+  const batchRequest = await prisma.aIBatchRequest.create({
+    data: {
+      companyId,
+      openaiBatchId: batchResponse.id,
+      inputFileId: fileResponse.id,
+      status: AIBatchRequestStatus.IN_PROGRESS,
+      processingRequests: {
+        connect: requests.map((req) => ({ id: req.id })),
+      },
+    },
+  });
+
+  // Update individual requests to mark them as batching
+  await prisma.aIProcessingRequest.updateMany({
+    where: {
+      id: {
+        in: requests.map((req) => req.id),
+      },
+    },
+    data: {
+      processingStatus: AIRequestStatus.BATCHING_IN_PROGRESS,
+      batchId: batchRequest.id,
+    },
+  });
+
+  return batchRequest.id;
+}
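+
+// Sketch of the intended happy path (illustrative; "cmp_123" is a placeholder
+// company id). In production the scheduler in lib/batchScheduler.ts drives these
+// calls on a cron cadence rather than in one linear script:
+//
+//   const pending = await getPendingBatchRequests("cmp_123");
+//   if (pending.length > 0) {
+//     await createBatchRequest("cmp_123", pending); // upload JSONL + submit batch
+//   }
+//   // ...later, on each scheduler tick:
+//   await checkBatchStatuses("cmp_123");        // sync OpenAI status into our DB
+//   await processCompletedBatches("cmp_123");   // download + ingest finished results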
+
+/**
+ * Check the status of all in-progress batches for a company
+ */
+export async function checkBatchStatuses(companyId: string): Promise<void> {
+  const inProgressBatches = await prisma.aIBatchRequest.findMany({
+    where: {
+      companyId,
+      status: {
+        in: [
+          AIBatchRequestStatus.IN_PROGRESS,
+          AIBatchRequestStatus.VALIDATING,
+          AIBatchRequestStatus.FINALIZING,
+        ],
+      },
+    },
+  });
+
+  for (const batch of inProgressBatches) {
+    try {
+      const status = await getOpenAIBatchStatus(batch.openaiBatchId);
+      await updateBatchStatus(batch.id, status);
+    } catch (error) {
+      console.error(`Failed to check status for batch ${batch.id}:`, error);
+    }
+  }
+}
+
+/**
+ * Process completed batches and extract results
+ */
+export async function processCompletedBatches(companyId: string): Promise<void> {
+  const completedBatches = await prisma.aIBatchRequest.findMany({
+    where: {
+      companyId,
+      status: AIBatchRequestStatus.COMPLETED,
+      outputFileId: {
+        not: null,
+      },
+    },
+    include: {
+      processingRequests: {
+        include: {
+          session: true,
+        },
+      },
+    },
+  });
+
+  for (const batch of completedBatches) {
+    try {
+      await processBatchResults(batch);
+    } catch (error) {
+      console.error(`Failed to process batch results for ${batch.id}:`, error);
+      await prisma.aIBatchRequest.update({
+        where: { id: batch.id },
+        data: { status: AIBatchRequestStatus.FAILED },
+      });
+    }
+  }
+}
+
+/**
+ * Helper function to upload file content to OpenAI
+ */
+async function uploadFileToOpenAI(content: string): Promise<{ id: string }> {
+  const formData = new FormData();
+  formData.append("file", new Blob([content], { type: "application/jsonl" }), "batch_requests.jsonl");
+  formData.append("purpose", "batch");
+
+  const response = await fetch("https://api.openai.com/v1/files", {
+    method: "POST",
+    headers: {
+      "Authorization": `Bearer ${process.env.OPENAI_API_KEY}`,
+    },
+    body: formData,
+  });
+
+  if (!response.ok) {
+    throw new Error(`Failed to upload file: ${response.statusText}`);
+  }
+
+  return response.json();
+}
+
+/**
+ * Helper function to create a batch request on OpenAI
+ */
+async function createOpenAIBatch(inputFileId: string): Promise<OpenAIBatchResponse> {
+  const response = await fetch("https://api.openai.com/v1/batches", {
+    method: "POST",
+    headers: {
+      "Authorization": `Bearer ${process.env.OPENAI_API_KEY}`,
+      "Content-Type": "application/json",
+    },
+    body: JSON.stringify({
+      input_file_id: inputFileId,
+      endpoint: "/v1/chat/completions",
+      completion_window: "24h",
+    }),
+  });
+
+  if (!response.ok) {
+    throw new Error(`Failed to create batch: ${response.statusText}`);
+  }
+
+  return response.json();
+}
+
+/**
+ * Helper function to get batch status from OpenAI
+ */
+async function getOpenAIBatchStatus(batchId: string): Promise<OpenAIBatchResponse> {
+  const response = await fetch(`https://api.openai.com/v1/batches/${batchId}`, {
+    method: "GET",
+    headers: {
+      "Authorization": `Bearer ${process.env.OPENAI_API_KEY}`,
+    },
+  });
+
+  if (!response.ok) {
+    throw new Error(`Failed to get batch status: ${response.statusText}`);
+  }
+
+  return response.json();
+}
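+
+// Abridged shape of a GET /v1/batches/{id} response while a batch is running
+// (illustrative values; field names follow the OpenAIBatchResponse interface above):
+//
+//   { "id": "batch_abc123", "object": "batch", "status": "in_progress",
+//     "input_file_id": "file-...", "completion_window": "24h",
+//     "request_counts": { "total": 1000, "completed": 412, "failed": 3 } }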
+
+/**
+ * Update batch status in our database based on OpenAI response
+ */
+async function updateBatchStatus(batchId: string, openAIResponse: OpenAIBatchResponse): Promise<void> {
+  const statusMapping: Record<string, AIBatchRequestStatus> = {
+    validating: AIBatchRequestStatus.VALIDATING,
+    failed: AIBatchRequestStatus.FAILED,
+    in_progress: AIBatchRequestStatus.IN_PROGRESS,
+    finalizing: AIBatchRequestStatus.FINALIZING,
+    completed: AIBatchRequestStatus.COMPLETED,
+    expired: AIBatchRequestStatus.FAILED,
+    cancelled: AIBatchRequestStatus.CANCELLED,
+  };
+
+  const ourStatus = statusMapping[openAIResponse.status] || AIBatchRequestStatus.FAILED;
+
+  await prisma.aIBatchRequest.update({
+    where: { id: batchId },
+    data: {
+      status: ourStatus,
+      outputFileId: openAIResponse.output_file_id,
+      errorFileId: openAIResponse.error_file_id,
+      completedAt: openAIResponse.completed_at ? new Date(openAIResponse.completed_at * 1000) : null,
+    },
+  });
+}
+
+/**
+ * Process results from a completed batch
+ */
+async function processBatchResults(batch: {
+  id: string;
+  outputFileId: string | null;
+  processingRequests: Array<{ sessionId: string }>;
+}): Promise<void> {
+  if (!batch.outputFileId) {
+    throw new Error("No output file available for completed batch");
+  }
+
+  // Download results from OpenAI
+  const results = await downloadOpenAIFile(batch.outputFileId);
+
+  // Parse JSONL results
+  const resultLines = results.split("\n").filter(line => line.trim());
+
+  for (const line of resultLines) {
+    try {
+      const result = JSON.parse(line);
+      const requestId = result.custom_id;
+
+      if (result.response?.body?.choices?.[0]?.message?.content) {
+        // Process successful result
+        await updateProcessingRequestWithResult(requestId, result.response.body);
+      } else {
+        // Handle error result
+        await markProcessingRequestAsFailed(requestId, result.error?.message || "Unknown error");
+      }
+    } catch (error) {
+      console.error("Failed to process batch result line:", error);
+    }
+  }
+
+  // Mark batch as processed
+  await prisma.aIBatchRequest.update({
+    where: { id: batch.id },
+    data: {
+      status: AIBatchRequestStatus.PROCESSED,
+      processedAt: new Date(),
+    },
+  });
+}
+
+/**
+ * Download file content from OpenAI
+ */
+async function downloadOpenAIFile(fileId: string): Promise<string> {
+  const response = await fetch(`https://api.openai.com/v1/files/${fileId}/content`, {
+    method: "GET",
+    headers: {
+      "Authorization": `Bearer ${process.env.OPENAI_API_KEY}`,
+    },
+  });
+
+  if (!response.ok) {
+    throw new Error(`Failed to download file: ${response.statusText}`);
+  }
+
+  return response.text();
+}
+
+/**
+ * Update processing request with successful AI result
+ */
+async function updateProcessingRequestWithResult(requestId: string, aiResponse: {
+  usage: {
+    prompt_tokens: number;
+    completion_tokens: number;
+    total_tokens: number;
+  };
+  choices: Array<{
+    message: {
+      content: string;
+    };
+  }>;
+}): Promise<void> {
+  const usage = aiResponse.usage;
+  const content = aiResponse.choices[0].message.content;
+
+  try {
+    const parsedResult = JSON.parse(content);
+
+    // Update the processing request with usage data
+    await prisma.aIProcessingRequest.update({
+      where: { id: requestId },
+      data: {
+        processingStatus: AIRequestStatus.PROCESSING_COMPLETE,
+        success: true,
+        promptTokens: usage.prompt_tokens,
+        completionTokens: usage.completion_tokens,
+        totalTokens: usage.total_tokens,
+        completedAt: new Date(),
+      },
+    });
+
+    // Update the session with AI analysis results
+    const request = await prisma.aIProcessingRequest.findUnique({
+      where: { id: requestId },
+      include: { session: true },
+    });
+
+    if (request?.session) {
+      await prisma.session.update({
+        where: { id: request.sessionId },
+        data: {
+          summary: parsedResult.summary,
+          sentiment: parsedResult.sentiment,
+          category: parsedResult.category,
+          language: parsedResult.language,
+        },
+      });
+    }
+  } catch (error) {
+    console.error(`Failed to parse AI result for request ${requestId}:`, error);
+    await markProcessingRequestAsFailed(requestId, "Failed to parse AI response");
+  }
+}
+
+/**
+ * Mark processing request as failed
+ */
+async function markProcessingRequestAsFailed(requestId: string, errorMessage: string): Promise<void> {
+  await prisma.aIProcessingRequest.update({
+    where: { id: requestId },
+    data: {
+      processingStatus: AIRequestStatus.PROCESSING_FAILED,
+      success: false,
+      errorMessage,
+      completedAt: new Date(),
+    },
+  });
+}
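+
+// Example of a "full_analysis" completion that updateProcessingRequestWithResult
+// expects to JSON.parse and write back onto the session (illustrative content,
+// using the categories and language codes from the prompt below):
+//
+//   {"sentiment":"NEGATIVE","category":"SICK_LEAVE_RECOVERY",
+//    "summary":"Employee asks how to report sick leave.","language":"nl"}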
+
+/**
+ * Get system prompt based on processing type
+ */
+function getSystemPromptForProcessingType(processingType: string): string {
+  const prompts = {
+    sentiment_analysis: "Analyze the sentiment of this conversation and respond with JSON containing: {\"sentiment\": \"POSITIVE|NEUTRAL|NEGATIVE\"}",
+    categorization: "Categorize this conversation and respond with JSON containing: {\"category\": \"CATEGORY_NAME\"}",
+    summary: "Summarize this conversation and respond with JSON containing: {\"summary\": \"Brief summary\"}",
+    full_analysis: `Analyze this conversation for sentiment, category, and provide a summary. Respond with JSON:
+{
+  "sentiment": "POSITIVE|NEUTRAL|NEGATIVE",
+  "category": "SCHEDULE_HOURS|LEAVE_VACATION|SICK_LEAVE_RECOVERY|SALARY_COMPENSATION|CONTRACT_HOURS|ONBOARDING|OFFBOARDING|WORKWEAR_STAFF_PASS|TEAM_CONTACTS|PERSONAL_QUESTIONS|ACCESS_LOGIN|SOCIAL_QUESTIONS|UNRECOGNIZED_OTHER",
+  "summary": "Brief summary of the conversation",
+  "language": "en|de|fr|es|it|pt|nl|sv|da|no|fi|pl|cs|sk|hu|ro|bg|hr|sl|et|lv|lt|el|mt"
+}`,
+  };
+
+  return prompts[processingType as keyof typeof prompts] || prompts.full_analysis;
+}
+
+/**
+ * Format session messages for AI processing
+ */
+function formatMessagesForProcessing(messages: Array<{
+  role: string;
+  content: string;
+}>): string {
+  return messages
+    .map((msg) => `${msg.role}: ${msg.content}`)
+    .join("\n");
+}
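+
+// e.g. [{ role: "User", content: "Hi" }, { role: "Assistant", content: "Hello!" }]
+// becomes the plain-text transcript:
+//   User: Hi
+//   Assistant: Hello!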
+
+/**
+ * Get statistics about batch processing
+ */
+export async function getBatchProcessingStats(companyId: string) {
+  const stats = await prisma.aIBatchRequest.groupBy({
+    by: ["status"],
+    where: { companyId },
+    _count: true,
+  });
+
+  const pendingRequests = await prisma.aIProcessingRequest.count({
+    where: {
+      session: { companyId },
+      processingStatus: AIRequestStatus.PENDING_BATCHING,
+    },
+  });
+
+  return {
+    batchStats: stats.reduce((acc, stat) => {
+      acc[stat.status] = stat._count;
+      return acc;
+    }, {} as Record<string, number>),
+    pendingRequests,
+  };
+}
\ No newline at end of file
diff --git a/lib/batchScheduler.ts b/lib/batchScheduler.ts
new file mode 100644
index 0000000..dc36fa3
--- /dev/null
+++ b/lib/batchScheduler.ts
@@ -0,0 +1,284 @@
+/**
+ * OpenAI Batch Processing Scheduler
+ *
+ * This scheduler manages the lifecycle of OpenAI batch requests:
+ * 1. Creates new batches from pending requests
+ * 2. Checks status of in-progress batches
+ * 3. Processes completed batch results
+ */
+
+import cron, { type ScheduledTask } from "node-cron";
+import {
+  getPendingBatchRequests,
+  createBatchRequest,
+  checkBatchStatuses,
+  processCompletedBatches,
+  getBatchProcessingStats
+} from "./batchProcessor";
+import { prisma } from "./prisma";
+import { getSchedulerConfig } from "./schedulerConfig";
+
+/**
+ * Configuration for batch scheduler intervals
+ */
+const SCHEDULER_CONFIG = {
+  // Check for new batches to create every 5 minutes
+  CREATE_BATCHES_INTERVAL: "*/5 * * * *",
+  // Check batch statuses every 2 minutes
+  CHECK_STATUS_INTERVAL: "*/2 * * * *",
+  // Process completed batches every minute
+  PROCESS_RESULTS_INTERVAL: "* * * * *",
+  // Minimum batch size to trigger creation
+  MIN_BATCH_SIZE: 10,
+  // Maximum time to wait before creating a batch (even if under min size)
+  MAX_WAIT_TIME_MINUTES: 30,
+} as const;
+
+let createBatchesTask: ScheduledTask | null = null;
+let checkStatusTask: ScheduledTask | null = null;
+let processResultsTask: ScheduledTask | null = null;
+
+/**
+ * Start the batch processing scheduler
+ */
+export function startBatchScheduler(): void {
+  const config = getSchedulerConfig();
+
+  if (!config.enabled) {
+    console.log("Batch scheduler disabled by configuration");
+    return;
+  }
+
+  if (!process.env.OPENAI_API_KEY) {
+    console.log("Batch scheduler disabled: OPENAI_API_KEY not configured");
+    return;
+  }
+
+  console.log("Starting OpenAI Batch Processing Scheduler...");
+
+  // Schedule batch creation
+  createBatchesTask = cron.schedule(
+    SCHEDULER_CONFIG.CREATE_BATCHES_INTERVAL,
+    async () => {
+      try {
+        await createBatchesForAllCompanies();
+      } catch (error) {
+        console.error("Error in batch creation scheduler:", error);
+      }
+    }
+  );
+
+  // Schedule status checking
+  checkStatusTask = cron.schedule(
+    SCHEDULER_CONFIG.CHECK_STATUS_INTERVAL,
+    async () => {
+      try {
+        await checkBatchStatusesForAllCompanies();
+      } catch (error) {
+        console.error("Error in batch status checker:", error);
+      }
+    }
+  );
+
+  // Schedule result processing
+  processResultsTask = cron.schedule(
+    SCHEDULER_CONFIG.PROCESS_RESULTS_INTERVAL,
+    async () => {
+      try {
+        await processCompletedBatchesForAllCompanies();
+      } catch (error) {
+        console.error("Error in batch result processor:", error);
+      }
+    }
+  );
+
+  // Start all tasks
+  createBatchesTask.start();
+  checkStatusTask.start();
+  processResultsTask.start();
+
+  console.log("Batch scheduler started successfully");
+}
+
+/**
+ * Stop the batch processing scheduler
+ */
+export function stopBatchScheduler(): void {
+  console.log("Stopping batch scheduler...");
+
+  if (createBatchesTask) {
+    createBatchesTask.stop();
+    createBatchesTask.destroy();
+    createBatchesTask = null;
+  }
+
+  if (checkStatusTask) {
+    checkStatusTask.stop();
+    checkStatusTask.destroy();
+    checkStatusTask = null;
+  }
+
+  if (processResultsTask) {
+    processResultsTask.stop();
+    processResultsTask.destroy();
+    processResultsTask = null;
+  }
+
+  console.log("Batch scheduler stopped");
+}
+
+/**
+ * Create batches for all active companies
+ */
+async function createBatchesForAllCompanies(): Promise<void> {
+  try {
+    const companies = await prisma.company.findMany({
+      where: { status: "ACTIVE" },
+      select: { id: true, name: true },
+    });
+
+    for (const company of companies) {
+      await createBatchesForCompany(company.id);
+    }
+  } catch (error) {
+    console.error("Failed to create batches for companies:", error);
+  }
+}
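+
+// The batching policy implemented below, illustrated with the current config values:
+// - 10 or more pending requests    -> batch immediately (MIN_BATCH_SIZE)
+// - fewer than 10 (say, 4) pending -> batch once the oldest request has waited
+//                                     30 minutes (MAX_WAIT_TIME_MINUTES), so small
+//                                     companies are never starved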
+
+/**
+ * Create batches for a specific company if conditions are met
+ */
+async function createBatchesForCompany(companyId: string): Promise<void> {
+  try {
+    const pendingRequests = await getPendingBatchRequests(companyId);
+
+    if (pendingRequests.length === 0) {
+      return; // No pending requests
+    }
+
+    // Check if we should create a batch
+    const shouldCreateBatch = await shouldCreateBatchForCompany(companyId, pendingRequests.length);
+
+    if (!shouldCreateBatch) {
+      return; // Wait for more requests or more time
+    }
+
+    console.log(`Creating batch for company ${companyId} with ${pendingRequests.length} requests`);
+
+    const batchId = await createBatchRequest(companyId, pendingRequests);
+
+    console.log(`Successfully created batch ${batchId} for company ${companyId}`);
+  } catch (error) {
+    console.error(`Failed to create batch for company ${companyId}:`, error);
+  }
+}
+
+/**
+ * Determine if a batch should be created for a company
+ */
+async function shouldCreateBatchForCompany(companyId: string, pendingCount: number): Promise<boolean> {
+  // Always create if we have enough requests
+  if (pendingCount >= SCHEDULER_CONFIG.MIN_BATCH_SIZE) {
+    return true;
+  }
+
+  // Check if the oldest pending request is old enough to trigger batch creation
+  const oldestPending = await prisma.aIProcessingRequest.findFirst({
+    where: {
+      session: { companyId },
+      processingStatus: "PENDING_BATCHING",
+    },
+    orderBy: { requestedAt: "asc" },
+  });
+
+  if (!oldestPending) {
+    return false;
+  }
+
+  const waitTimeMs = Date.now() - oldestPending.requestedAt.getTime();
+  const maxWaitTimeMs = SCHEDULER_CONFIG.MAX_WAIT_TIME_MINUTES * 60 * 1000;
+
+  return waitTimeMs >= maxWaitTimeMs;
+}
+
+/**
+ * Check batch statuses for all companies
+ */
+async function checkBatchStatusesForAllCompanies(): Promise<void> {
+  try {
+    const companies = await prisma.company.findMany({
+      where: { status: "ACTIVE" },
+      select: { id: true },
+    });
+
+    for (const company of companies) {
+      await checkBatchStatuses(company.id);
+    }
+  } catch (error) {
+    console.error("Failed to check batch statuses:", error);
+  }
+}
+
+/**
+ * Process completed batches for all companies
+ */
+async function processCompletedBatchesForAllCompanies(): Promise<void> {
+  try {
+    const companies = await prisma.company.findMany({
+      where: { status: "ACTIVE" },
+      select: { id: true },
+    });
+
+    for (const company of companies) {
+      await processCompletedBatches(company.id);
+    }
+  } catch (error) {
+    console.error("Failed to process completed batches:", error);
+  }
+}
+
+/**
+ * Get batch processing statistics for monitoring
+ */
+export async function getAllBatchStats() {
+  try {
+    const companies = await prisma.company.findMany({
+      where: { status: "ACTIVE" },
+      select: { id: true, name: true },
+    });
+
+    const stats = await Promise.all(
+      companies.map(async (company) => ({
+        companyId: company.id,
+        companyName: company.name,
+        ...(await getBatchProcessingStats(company.id)),
+      }))
+    );
+
+    return stats;
+  } catch (error) {
+    console.error("Failed to get batch stats:", error);
+    return [];
+  }
+}
+
+/**
+ * Force create batches for a specific company (for manual triggering)
+ */
+export async function forceBatchCreation(companyId: string): Promise<void> {
+  console.log(`Force creating batch for company ${companyId}`);
+  await createBatchesForCompany(companyId);
+}
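+
+// Sketch of manual triggering from an admin endpoint (hypothetical route and router,
+// not part of this change; shown only to illustrate the intended use of these exports):
+//
+//   adminRouter.post("/batches/:companyId/force", async (req, res) => {
+//     await forceBatchCreation(req.params.companyId);
+//     res.json(getBatchSchedulerStatus());
+//   });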
+
+/**
+ * Get current scheduler status
+ */
+export function getBatchSchedulerStatus() {
+  return {
+    isRunning: !!(createBatchesTask && checkStatusTask && processResultsTask),
+    createBatchesRunning: !!createBatchesTask,
+    checkStatusRunning: !!checkStatusTask,
+    processResultsRunning: !!processResultsTask,
+    config: SCHEDULER_CONFIG,
+  };
+}
\ No newline at end of file
diff --git a/lib/processingScheduler.ts b/lib/processingScheduler.ts
index edbd76f..3dd73dd 100644
--- a/lib/processingScheduler.ts
+++ b/lib/processingScheduler.ts
@@ -4,6 +4,7 @@ import {
   ProcessingStage,
   type SentimentCategory,
   type SessionCategory,
+  AIRequestStatus,
 } from "@prisma/client";
 import cron from "node-cron";
 import fetch from "node-fetch";
@@ -651,20 +652,20 @@ async function processSessionsInParallel(
 }
 
 /**
- * Process unprocessed sessions using the new processing status system
+ * Process unprocessed sessions using the new batch processing system
  */
 export async function processUnprocessedSessions(
   batchSize: number | null = null,
-  maxConcurrency = 5
+  _maxConcurrency = 5
 ): Promise<void> {
   process.stdout.write(
-    "[ProcessingScheduler] Starting to process sessions needing AI analysis...\n"
+    "[ProcessingScheduler] Starting to create batch requests for sessions needing AI analysis...\n"
   );
 
   try {
     await withRetry(
       async () => {
-        await processUnprocessedSessionsInternal(batchSize, maxConcurrency);
+        await createBatchRequestsForSessions(batchSize);
       },
       {
         maxRetries: 3,
@@ -680,7 +681,7 @@
   }
 }
 
-async function processUnprocessedSessionsInternal(
+async function _processUnprocessedSessionsInternal(
   batchSize: number | null = null,
   maxConcurrency = 5
 ): Promise<void> {
@@ -757,14 +758,16 @@
  */
 export async function getAIProcessingCosts(): Promise<{
   totalCostEur: number;
+  totalRequests: number;
+  totalPromptTokens: number;
+  totalCompletionTokens: number;
   totalTokens: number;
-  requestCount: number;
-  successfulRequests: number;
-  failedRequests: number;
 }> {
   const result = await prisma.aIProcessingRequest.aggregate({
     _sum: {
       totalCostEur: true,
+      promptTokens: true,
+      completionTokens: true,
       totalTokens: true,
     },
     _count: {
@@ -772,20 +775,12 @@
     },
   });
 
-  const successfulRequests = await prisma.aIProcessingRequest.count({
-    where: { success: true },
-  });
-
-  const failedRequests = await prisma.aIProcessingRequest.count({
-    where: { success: false },
-  });
-
   return {
     totalCostEur: result._sum.totalCostEur || 0,
+    totalRequests: result._count.id || 0,
+    totalPromptTokens: result._sum.promptTokens || 0,
+    totalCompletionTokens: result._sum.completionTokens || 0,
     totalTokens: result._sum.totalTokens || 0,
-    requestCount: result._count.id || 0,
-    successfulRequests,
-    failedRequests,
   };
 }
 
@@ -825,3 +820,98 @@
     }
   });
 }
+
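+// Note on the handoff (summary comment): instead of calling OpenAI inline, each
+// session now only gets an AIProcessingRequest row in PENDING_BATCHING state with
+// zeroed token and cost fields; the batch scheduler (lib/batchScheduler.ts) later
+// folds those rows into an OpenAI batch and backfills real usage once results
+// are downloaded.
+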
+/**
+ * Create batch requests for sessions needing AI processing
+ */
+async function createBatchRequestsForSessions(batchSize: number | null = null): Promise<void> {
+  // Get sessions that need AI processing using the new status system
+  const sessionsNeedingAI = await getSessionsNeedingProcessing(
+    ProcessingStage.AI_ANALYSIS,
+    batchSize || 50
+  );
+
+  if (sessionsNeedingAI.length === 0) {
+    process.stdout.write(
+      "[ProcessingScheduler] No sessions found requiring AI processing.\n"
+    );
+    return;
+  }
+
+  // Get session IDs that need processing
+  const sessionIds = sessionsNeedingAI.map(
+    (statusRecord) => statusRecord.sessionId
+  );
+
+  // Fetch full session data with messages
+  const sessionsToProcess = await prisma.session.findMany({
+    where: {
+      id: { in: sessionIds },
+    },
+    include: {
+      messages: {
+        orderBy: { order: "asc" },
+      },
+    },
+  });
+
+  // Filter to only sessions that have messages
+  const sessionsWithMessages = sessionsToProcess.filter(
+    (session) => session.messages && session.messages.length > 0
+  );
+
+  if (sessionsWithMessages.length === 0) {
+    process.stdout.write(
+      "[ProcessingScheduler] No sessions with messages found requiring processing.\n"
+    );
+    return;
+  }
+
+  process.stdout.write(
+    `[ProcessingScheduler] Found ${sessionsWithMessages.length} sessions to convert to batch requests.\n`
+  );
+
+  // Start AI analysis stage for all sessions
+  for (const session of sessionsWithMessages) {
+    await startStage(session.id, ProcessingStage.AI_ANALYSIS);
+  }
+
+  // Create AI processing requests for batch processing
+  const batchRequests = [];
+  for (const session of sessionsWithMessages) {
+    try {
+      // Get company's AI model
+      const aiModel = await getCompanyAIModel(session.companyId);
+
+      // Create batch processing request record
+      const processingRequest = await prisma.aIProcessingRequest.create({
+        data: {
+          sessionId: session.id,
+          model: aiModel,
+          promptTokens: 0, // Will be filled when batch completes
+          completionTokens: 0,
+          totalTokens: 0,
+          promptTokenCost: 0,
+          completionTokenCost: 0,
+          totalCostEur: 0,
+          processingType: "session_analysis",
+          success: false, // Will be updated when batch completes
+          processingStatus: AIRequestStatus.PENDING_BATCHING,
+        },
+      });
+
+      batchRequests.push(processingRequest);
+    } catch (error) {
+      console.error(`Failed to create batch request for session ${session.id}:`, error);
+      await failStage(
+        session.id,
+        ProcessingStage.AI_ANALYSIS,
+        error instanceof Error ? error.message : String(error)
+      );
+    }
+  }
+
+  process.stdout.write(
+    `[ProcessingScheduler] Created ${batchRequests.length} batch processing requests.\n`
+  );
+}
diff --git a/prisma/schema.prisma b/prisma/schema.prisma
index dc26bbf..e15ae60 100644
--- a/prisma/schema.prisma
+++ b/prisma/schema.prisma
@@ -55,6 +55,7 @@ model Company {
   sessions        Session[]
   imports         SessionImport[]
   users           User[]           @relation("CompanyUsers")
+  aiBatchRequests AIBatchRequest[]
 
   @@index([name])
   @@index([status])
@@ -245,15 +246,43 @@ model SessionQuestion {
   @@index([questionId])
 }
 
+/// *
+/// * AI BATCH REQUEST TRACKING (OpenAI Batch API)
+/// * Tracks batch jobs submitted to OpenAI for cost-efficient processing
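+/// * Typical lifecycle (see the AIBatchRequestStatus enum below): PENDING ->
+/// * UPLOADING -> VALIDATING -> IN_PROGRESS -> FINALIZING -> COMPLETED ->
+/// * PROCESSED, with FAILED and CANCELLED as terminal branches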
+model AIBatchRequest {
+  id        String  @id @default(cuid())
+  companyId String
+  company   Company @relation(fields: [companyId], references: [id])
+
+  /// OpenAI specific IDs
+  openaiBatchId String  @unique
+  inputFileId   String
+  outputFileId  String?
+  errorFileId   String?
+
+  /// Our internal status tracking
+  status AIBatchRequestStatus @default(PENDING)
+
+  /// Timestamps
+  createdAt   DateTime  @default(now()) @db.Timestamptz(6)
+  completedAt DateTime? @db.Timestamptz(6)
+  processedAt DateTime? @db.Timestamptz(6) // When we finished processing the results
+
+  /// Relation to the individual requests included in this batch
+  processingRequests AIProcessingRequest[]
+
+  @@index([companyId, status])
+}
+
 /// *
 /// * AI PROCESSING COST TRACKING
 model AIProcessingRequest {
-  id                String    @id @default(uuid())
+  id                       String    @id @default(uuid())
   sessionId                String
-  openaiRequestId   String?   @db.VarChar(255)
-  model             String    @db.VarChar(100)
-  serviceTier       String?   @db.VarChar(50)
-  systemFingerprint String?   @db.VarChar(255)
+  openaiRequestId          String?   @db.VarChar(255)
+  model                    String    @db.VarChar(100)
+  serviceTier              String?   @db.VarChar(50)
+  systemFingerprint        String?   @db.VarChar(255)
   promptTokens             Int
   completionTokens         Int
   totalTokens              Int
@@ -263,21 +292,28 @@ model AIProcessingRequest {
   audioTokensCompletion    Int?
   acceptedPredictionTokens Int?
   rejectedPredictionTokens Int?
-  promptTokenCost     Float  @db.Real
-  completionTokenCost Float  @db.Real
-  totalCostEur        Float  @db.Real
-  processingType      String @db.VarChar(100)
+  promptTokenCost          Float     @db.Real
+  completionTokenCost      Float     @db.Real
+  totalCostEur             Float     @db.Real
+  processingType           String    @db.VarChar(100)
   success                  Boolean
   errorMessage             String?
-  requestedAt       DateTime  @default(now()) @db.Timestamptz(6)
-  completedAt       DateTime? @db.Timestamptz(6)
-  session           Session   @relation(fields: [sessionId], references: [id], onDelete: Cascade)
+  requestedAt              DateTime  @default(now()) @db.Timestamptz(6)
+  completedAt              DateTime? @db.Timestamptz(6)
+
+  /// === NEW BATCH API FIELDS ===
+  processingStatus AIRequestStatus @default(PENDING_BATCHING)
+  batchId          String?
+  batch            AIBatchRequest? @relation(fields: [batchId], references: [id])
+
+  session Session @relation(fields: [sessionId], references: [id], onDelete: Cascade)
 
   @@index([sessionId])
   @@index([sessionId, requestedAt])
   @@index([requestedAt])
   @@index([model])
   @@index([success, requestedAt])
+  @@index([processingStatus]) // For efficient querying of requests awaiting batching
 }
 
 /// *
@@ -427,3 +463,37 @@
   /// Stage was intentionally skipped
   SKIPPED
 }
+
+/// OpenAI Batch Request Status
+enum AIBatchRequestStatus {
+  /// Batch request created, waiting to upload file to OpenAI
+  PENDING
+  /// Currently uploading file to OpenAI
+  UPLOADING
+  /// OpenAI is validating the uploaded file
+  VALIDATING
+  /// Batch is queued or running on OpenAI's side
+  IN_PROGRESS
+  /// OpenAI is finalizing the batch results
+  FINALIZING
+  /// Batch completed on OpenAI, ready for processing
+  COMPLETED
+  /// We have processed the results from the completed batch
+  PROCESSED
+  /// Batch failed during processing
+  FAILED
+  /// Batch was cancelled
+  CANCELLED
+}
+
+/// Status for individual AI processing requests within a batch
+enum AIRequestStatus {
+  /// Request is waiting to be included in a batch
+  PENDING_BATCHING
+  /// Request is currently part of a batch being processed
+  BATCHING_IN_PROGRESS
+  /// Processing completed successfully
+  PROCESSING_COMPLETE
+  /// Processing failed
+  PROCESSING_FAILED
+}
diff --git a/server.ts b/server.ts
index d5d3dbf..fd72df7 100644
--- a/server.ts
+++ b/server.ts
@@ -6,6 +6,7 @@ import { getSchedulerConfig, logEnvConfig, validateEnv } from "./lib/env.js";
 import { startImportProcessingScheduler } from "./lib/importProcessor.js";
 import { startProcessingScheduler } from "./lib/processingScheduler.js";
 import { startCsvImportScheduler } from "./lib/scheduler.js";
+import { startBatchScheduler } from "./lib/batchScheduler.js";
 
 const dev = process.env.NODE_ENV !== "production";
 const hostname = "localhost";
@@ -33,6 +34,7 @@ app.prepare().then(() => {
       startCsvImportScheduler();
       startImportProcessingScheduler();
       startProcessingScheduler();
+      startBatchScheduler();
 
       console.log("All schedulers initialized successfully");
     }
diff --git a/tests/lib/transcriptParser.test.ts b/tests/lib/transcriptParser.test.ts
index a3d734b..db686b1 100644
--- a/tests/lib/transcriptParser.test.ts
+++ b/tests/lib/transcriptParser.test.ts
@@ -108,13 +108,13 @@ User: Third
 
     expect(result.success).toBe(true);
     expect(result.messages).toHaveLength(3);
-    
+
     // First message should be at start time
     expect(result.messages![0].timestamp.getTime()).toBe(startTime.getTime());
-    
+
     // Last message should be at end time
     expect(result.messages![2].timestamp.getTime()).toBe(endTime.getTime());
-    
+
     // Middle message should be between start and end
     const midTime = result.messages![1].timestamp.getTime();
     expect(midTime).toBeGreaterThan(startTime.getTime());
@@ -174,7 +174,7 @@ System: Mixed case system
 
     expect(result.success).toBe(true);
     expect(result.messages).toHaveLength(2);
-    
+
     // Check that timestamps were parsed correctly
     const firstTimestamp = result.messages![0].timestamp;
     expect(firstTimestamp.getFullYear()).toBe(2024);