Refactor AI Session Processing Pipeline
This is a significant but valuable refactoring project. A detailed, well-structured prompt is key to getting a good result from a code-focused AI like Claude.

Project: LiveDash-Node (~/Projects/livedash-node-max-branch)
Objective: Refactor our AI session processing pipeline to use the OpenAI Batch API for cost savings and higher throughput. Implement a new internal admin API under /api/admin/legacy/* to monitor and manage this new asynchronous workflow.
Assignee: Claude Code
Context
Our current system processes AI analysis requests (like sentiment analysis, summarization, etc.) in a synchronous or simple asynchronous loop, likely via processingScheduler.ts. This is inefficient and costly at scale. We are moving to OpenAI's Batch API, which is fully asynchronous and requires a stateful, multi-stage processing architecture.
The term "legacy" in the API path /api/admin/legacy/* is intentional. It refers to the fact that our current method of getting data (CSV imports) is the "legacy" workflow. In the future, we plan to introduce a new API for clients to submit session data directly. This admin API is for monitoring the processing of data from our legacy import system.
Please follow the phased plan below precisely.
Phase 1: Database Schema Changes (prisma/schema.prisma)
First, we need to update our database schema to track the state of batch jobs and the individual requests within them.
- Add the `AIBatchRequest` model and `AIBatchRequestStatus` enum. This table will track the status of each batch job submitted to OpenAI.

  ```prisma
  // Add this new model to your schema.prisma
  model AIBatchRequest {
    id        String  @id @default(cuid())
    companyId String
    company   Company @relation(fields: [companyId], references: [id])

    // OpenAI specific IDs
    openaiBatchId String  @unique
    inputFileId   String
    outputFileId  String?
    errorFileId   String?

    // Our internal status tracking
    status AIBatchRequestStatus @default(PENDING)

    // Timestamps
    createdAt   DateTime  @default(now())
    completedAt DateTime?
    processedAt DateTime? // When we finished processing the results

    // Relation to the individual requests included in this batch
    processingRequests AIProcessingRequest[]

    @@index([companyId, status])
  }

  enum AIBatchRequestStatus {
    PENDING     // We have created the batch in our DB, preparing to send to OpenAI
    UPLOADING   // Uploading the .jsonl file
    VALIDATING  // OpenAI is validating the file
    IN_PROGRESS // OpenAI is processing the batch
    FINALIZING  // OpenAI is finalizing the results
    COMPLETED   // OpenAI job is done, results are available for download
    PROCESSED   // We have successfully downloaded and processed all results
    FAILED      // The batch failed validation or expired
    CANCELLED   // The batch was cancelled
  }
  ```

- Update the `AIProcessingRequest` model and add the `AIRequestStatus` enum. We need to track the state of each individual request as it moves through the batching pipeline.

  ```prisma
  // In your existing AIProcessingRequest model, add the new fields and enum.
  model AIProcessingRequest {
    // ... all existing fields (id, sessionId, token counts, etc.)

    // === ADD THESE NEW FIELDS ===
    processingStatus AIRequestStatus @default(PENDING_BATCHING)
    batchId          String?
    batch            AIBatchRequest? @relation(fields: [batchId], references: [id])
    // ============================

    @@index([processingStatus]) // Add this index for efficient querying
  }

  enum AIRequestStatus {
    PENDING_BATCHING     // Default state: waiting to be picked up by the batch creator
    BATCHING_IN_PROGRESS // It has been assigned to a batch that is currently running
    PROCESSING_COMPLETE  // The batch finished and we successfully got a result for this request
    PROCESSING_FAILED    // The batch finished but this specific request failed
  }
  ```
After modifying the schema, please run `pnpm prisma:generate`.
Phase 2: Implement the Batch Processing Schedulers
The core of this refactor is to replace the existing logic in lib/processingScheduler.ts with a two-stage scheduler system. You can create new files for this logic (e.g., lib/batchCreator.ts, lib/batchPoller.ts) and integrate them into lib/schedulers.ts.
Scheduler 1: Batch Creation (lib/batchCreator.ts)
This scheduler runs periodically (e.g., every 10 minutes) to bundle pending requests into a batch.
Functionality (a sketch of this scheduler follows the list):

- Query the database for `AIProcessingRequest` records with `processingStatus: PENDING_BATCHING`.
- Group these requests by the AI model they need to use (e.g., `gpt-4-turbo`). The Batch API requires one model per batch file.
- For each model group:
  - Generate a `.jsonl` string. Each line must be a valid OpenAI batch request.
  - Crucially, use our internal `AIProcessingRequest.id` as the `custom_id` in each JSON line. This is how we will map results back.
  - Upload the `.jsonl` content to OpenAI using `openai.files.create({ file: Buffer.from(jsonlContent), purpose: 'batch' })`.
  - Create the batch job using `openai.batches.create()` with the returned `input_file_id`.
  - In a single database transaction:
    - Create a new `AIBatchRequest` record in our database, storing the `openaiBatchId`, `inputFileId`, and setting the initial status to `VALIDATING`.
    - Update all the `AIProcessingRequest` records included in this batch to set their `processingStatus` to `BATCHING_IN_PROGRESS` and link them via the `batchId`.
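To make the intended flow concrete, here is a minimal sketch of `lib/batchCreator.ts`. It assumes shared `prisma` and `openai` client instances exported from `lib/prisma` and `lib/openai`, a `model` column on `AIProcessingRequest`, and a hypothetical `buildMessagesFor()` helper; none of these names exist yet, so adapt them to the real codebase.

```typescript
// lib/batchCreator.ts -- illustrative sketch, not a drop-in implementation.
import { toFile } from "openai";
import { prisma } from "./prisma"; // assumed shared Prisma client
import { openai } from "./openai"; // assumed shared OpenAI client

export async function runBatchCreation(): Promise<void> {
  // 1. Fetch all requests waiting to be batched.
  const pending = await prisma.aIProcessingRequest.findMany({
    where: { processingStatus: "PENDING_BATCHING" },
  });
  if (pending.length === 0) return;

  // 2. Group by model (the Batch API allows one model per batch file).
  const byModel = new Map<string, typeof pending>();
  for (const req of pending) {
    const model = req.model ?? "gpt-4-turbo"; // assumes a `model` column on the record
    const group: typeof pending = byModel.get(model) ?? [];
    group.push(req);
    byModel.set(model, group);
  }

  for (const [model, requests] of byModel) {
    // 3. Build the .jsonl payload; custom_id maps each result back to our row.
    const jsonl = requests
      .map((req) =>
        JSON.stringify({
          custom_id: req.id,
          method: "POST",
          url: "/v1/chat/completions",
          body: { model, messages: buildMessagesFor(req) },
        })
      )
      .join("\n");

    // 4. Upload the file and create the batch job.
    const file = await openai.files.create({
      file: await toFile(Buffer.from(jsonl), "batch.jsonl"),
      purpose: "batch",
    });
    const batch = await openai.batches.create({
      input_file_id: file.id,
      endpoint: "/v1/chat/completions",
      completion_window: "24h",
    });

    // 5. Record the batch and flag its requests in a single transaction.
    await prisma.$transaction([
      prisma.aIBatchRequest.create({
        data: {
          companyId: requests[0].companyId, // assumes the request carries a companyId
          openaiBatchId: batch.id,
          inputFileId: file.id,
          status: "VALIDATING",
          processingRequests: { connect: requests.map((r) => ({ id: r.id })) },
        },
      }),
      prisma.aIProcessingRequest.updateMany({
        where: { id: { in: requests.map((r) => r.id) } },
        data: { processingStatus: "BATCHING_IN_PROGRESS" },
      }),
    ]);
  }
}

// Hypothetical helper: builds the chat messages for one request.
function buildMessagesFor(req: { id: string }): Array<{ role: string; content: string }> {
  return [{ role: "user", content: `Analyze the session for request ${req.id}` }];
}
```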
Scheduler 2: Result Polling (lib/batchPoller.ts)
This scheduler runs more frequently (e.g., every 2 minutes) to check for and process completed jobs.
Functionality (a sketch of the poller follows the list):

- Query our database for `AIBatchRequest` records with a status that is still in-flight (e.g., `VALIDATING`, `IN_PROGRESS`, `FINALIZING`).
- For each active batch, call `openai.batches.retrieve(batch.openaiBatchId)` to get the latest status from OpenAI.
- Update the status of our `AIBatchRequest` record to match the one from OpenAI.
- If a batch's status becomes `completed`:
  - Update its status in our DB and store the `output_file_id` and `error_file_id`.
  - Download the content of the `output_file_id` from OpenAI.
  - Parse the resulting `.jsonl` file line by line. For each line:
    - Use the `custom_id` to find our original `AIProcessingRequest` record.
    - If the line contains a response, parse the AI content and usage data. Update our `AIProcessingRequest` record with this data and set its `processingStatus` to `PROCESSING_COMPLETE`.
    - If the line contains an error, log it and set the `processingStatus` to `PROCESSING_FAILED`.
  - Do the same for the `error_file_id` if it exists.
  - Once all results are processed, update the parent `AIBatchRequest` status to `PROCESSED` and set its `processedAt` timestamp.
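A corresponding sketch of `lib/batchPoller.ts`, under the same assumptions about shared `prisma` and `openai` clients. The OpenAI SDK calls (`batches.retrieve`, `files.content`) are real methods; persisting the parsed AI content and usage data is left schematic because the exact column names live in the existing schema.

```typescript
// lib/batchPoller.ts -- illustrative sketch of the polling/result-processing loop.
import { prisma } from "./prisma"; // assumed shared Prisma client
import { openai } from "./openai"; // assumed shared OpenAI client

const IN_FLIGHT = ["VALIDATING", "IN_PROGRESS", "FINALIZING"] as const;

export async function runBatchPolling(): Promise<void> {
  const active = await prisma.aIBatchRequest.findMany({
    where: { status: { in: [...IN_FLIGHT] } },
  });

  for (const batch of active) {
    const remote = await openai.batches.retrieve(batch.openaiBatchId);

    // Mirror OpenAI's lowercase status into our uppercase enum. A real
    // implementation must map "expired"/"cancelling" explicitly to FAILED/CANCELLED.
    await prisma.aIBatchRequest.update({
      where: { id: batch.id },
      data: { status: remote.status.toUpperCase() as any },
    });

    if (remote.status !== "completed") continue;

    // Store the result file IDs before downloading them.
    await prisma.aIBatchRequest.update({
      where: { id: batch.id },
      data: {
        outputFileId: remote.output_file_id,
        errorFileId: remote.error_file_id,
        completedAt: new Date(),
      },
    });

    // Process both the output and error files, line by line.
    for (const fileId of [remote.output_file_id, remote.error_file_id]) {
      if (!fileId) continue;
      const content = await (await openai.files.content(fileId)).text();
      for (const line of content.split("\n").filter(Boolean)) {
        const result = JSON.parse(line);
        const requestId = result.custom_id; // maps back to AIProcessingRequest.id
        if (result.error || result.response?.status_code !== 200) {
          console.error("Batch item failed", { requestId, error: result.error });
          await prisma.aIProcessingRequest.update({
            where: { id: requestId },
            data: { processingStatus: "PROCESSING_FAILED" },
          });
        } else {
          await prisma.aIProcessingRequest.update({
            where: { id: requestId },
            data: {
              processingStatus: "PROCESSING_COMPLETE",
              // Persisting result.response.body.choices[0].message.content and
              // result.response.body.usage is left to the real implementation.
            },
          });
        }
      }
    }

    await prisma.aIBatchRequest.update({
      where: { id: batch.id },
      data: { status: "PROCESSED", processedAt: new Date() },
    });
  }
}
```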
Phase 3: Implement the Internal Admin API
Create a new set of internal API endpoints for monitoring and managing this process.
- Location: `app/api/admin/legacy/`
- Authentication: Protect all these endpoints with our most secure admin-level authentication middleware (e.g., from `lib/platform-auth.ts`). Access should be strictly limited.
Endpoint 1: Get Summary
- Route: `GET /api/admin/legacy/summary`
- Description: Returns a count of all `AIProcessingRequest` records, grouped by `processingStatus`.
- Response:

```json
{
  "ok": true,
  "summary": {
    "pending_batching": 15231,
    "batching_in_progress": 2500,
    "processing_complete": 85432,
    "processing_failed": 78
  }
}
```
Endpoint 2: List Requests
- Route: `GET /api/admin/legacy/requests`
- Description: Retrieves a paginated list of `AIProcessingRequest` records, filterable by `status`.
- Query Params: `status` (required), `limit` (optional), `cursor` (optional).
- Response:

```json
{ "ok": true, "requests": [ { "id": "...", "sessionId": "...", "status": "processing_failed", ... } ], "nextCursor": "..." }
```
Endpoint 3: Re-queue Failed Requests
- Route: `POST /api/admin/legacy/requests/requeue`
- Description: Resets the status of specified failed requests back to `PENDING_BATCHING` so they can be re-processed in a new batch.
- Request Body:

```json
{ "requestIds": ["req_id_1", "req_id_2", ...] }
```

- Response:

```json
{ "ok": true, "requeuedCount": 2, "notFoundCount": 0 }
```
Phase 4: Final Integration and Cleanup
- Update `server.ts` and `lib/schedulers.ts`: Disable the old `processingScheduler` and enable the two new schedulers (`batchCreator`, `batchPoller`). Ensure they are controlled by environment variables (e.g., `BATCH_CREATION_ENABLED`, `BATCH_POLLING_ENABLED`). A wiring sketch follows this list.
- Documentation: Add a section to `CLAUDE.md` or a new file in `docs/` explaining the new batch processing architecture and the purpose of the admin API endpoints.
- Environment Variables: Add any new required environment variables to `.env.example`.
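An illustrative sketch of how `lib/schedulers.ts` could gate the two schedulers behind the new environment variables; the exported function names and intervals are placeholders, not the existing scheduler API.

```typescript
// lib/schedulers.ts -- sketch of env-gated wiring for the two new schedulers.
import { runBatchCreation } from "./batchCreator";
import { runBatchPolling } from "./batchPoller";

const TEN_MINUTES = 10 * 60 * 1000;
const TWO_MINUTES = 2 * 60 * 1000;

export function startSchedulers(): void {
  // The old processingScheduler is intentionally no longer started here.

  if (process.env.BATCH_CREATION_ENABLED === "true") {
    setInterval(() => {
      runBatchCreation().catch((err) => console.error("batchCreator failed", err));
    }, TEN_MINUTES);
  }

  if (process.env.BATCH_POLLING_ENABLED === "true") {
    setInterval(() => {
      runBatchPolling().catch((err) => console.error("batchPoller failed", err));
    }, TWO_MINUTES);
  }
}
```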
Please proceed with this refactoring plan. Implement robust logging throughout the new schedulers to ensure we can debug the pipeline effectively.