diff --git a/.env.example b/.env.example index 56f99bf..32859b1 100644 --- a/.env.example +++ b/.env.example @@ -20,3 +20,7 @@ IMPORT_PROCESSING_BATCH_SIZE="50" # Number of imports to process at on SESSION_PROCESSING_INTERVAL="0 * * * *" # Cron expression for AI session processing (every hour) SESSION_PROCESSING_BATCH_SIZE="0" # 0 = unlimited sessions, >0 = specific limit SESSION_PROCESSING_CONCURRENCY="5" # How many sessions to process in parallel + +# Postgres Database Configuration +DATABASE_URL_TEST="postgresql://" +DATABASE_URL="postgresql://" \ No newline at end of file diff --git a/.env.local.example b/.env.local.example index 74ec9f9..2cc4bf2 100644 --- a/.env.local.example +++ b/.env.local.example @@ -17,6 +17,10 @@ SESSION_PROCESSING_INTERVAL="0 * * * *" # Every hour (cron format) - AI pro SESSION_PROCESSING_BATCH_SIZE="0" # 0 = process all sessions, >0 = limit number SESSION_PROCESSING_CONCURRENCY="5" # Number of sessions to process in parallel +# Postgres Database Configuration +DATABASE_URL_TEST="postgresql://" +DATABASE_URL="postgresql://" + # Example configurations: # - For development (no schedulers): SCHEDULER_ENABLED=false # - For testing (every 5 minutes): CSV_IMPORT_INTERVAL=*/5 * * * * diff --git a/.gitignore b/.gitignore index 3312044..242dfda 100644 --- a/.gitignore +++ b/.gitignore @@ -261,3 +261,6 @@ Thumbs.db /playwright-report/ /blob-report/ /playwright/.cache/ + +# OpenAI API request samples +sample-openai-request.json \ No newline at end of file diff --git a/lib/importProcessor.ts b/lib/importProcessor.ts index 65ab20b..16d9da0 100644 --- a/lib/importProcessor.ts +++ b/lib/importProcessor.ts @@ -1,5 +1,5 @@ // SessionImport to Session processor -import { PrismaClient, ImportStatus, SentimentCategory } from "@prisma/client"; +import { PrismaClient, ImportStatus, SentimentCategory, SessionCategory } from "@prisma/client"; import { getSchedulerConfig } from "./env"; import { fetchTranscriptContent, isValidTranscriptUrl } from 
"./transcriptFetcher"; import cron from "node-cron"; @@ -38,8 +38,33 @@ function parseEuropeanDate(dateStr: string): Date { return date; } +/** + * Helper function to parse sentiment from raw string (fallback only) + */ +function parseFallbackSentiment(sentimentRaw: string | null): SentimentCategory | null { + if (!sentimentRaw) return null; + + const sentimentStr = sentimentRaw.toLowerCase(); + if (sentimentStr.includes('positive')) { + return SentimentCategory.POSITIVE; + } else if (sentimentStr.includes('negative')) { + return SentimentCategory.NEGATIVE; + } else { + return SentimentCategory.NEUTRAL; + } +} + +/** + * Helper function to parse boolean from raw string (fallback only) + */ +function parseFallbackBoolean(rawValue: string | null): boolean | null { + if (!rawValue) return null; + return ['true', '1', 'yes', 'escalated', 'forwarded'].includes(rawValue.toLowerCase()); +} + /** * Process a single SessionImport record into a Session record + * NEW STRATEGY: Only copy minimal fields, let AI processing handle the rest */ async function processSingleImport(importRecord: any): Promise<{ success: boolean; error?: string }> { try { @@ -49,34 +74,6 @@ async function processSingleImport(importRecord: any): Promise<{ success: boolea console.log(`[Import Processor] Parsed dates for ${importRecord.externalSessionId}: ${startTime.toISOString()} - ${endTime.toISOString()}`); - // Process sentiment - let sentiment: number | null = null; - let sentimentCategory: SentimentCategory | null = null; - - if (importRecord.sentimentRaw) { - const sentimentStr = importRecord.sentimentRaw.toLowerCase(); - if (sentimentStr.includes('positive')) { - sentiment = 0.8; - sentimentCategory = SentimentCategory.POSITIVE; - } else if (sentimentStr.includes('negative')) { - sentiment = -0.8; - sentimentCategory = SentimentCategory.NEGATIVE; - } else { - sentiment = 0.0; - sentimentCategory = SentimentCategory.NEUTRAL; - } - } - - // Process boolean fields - const escalated = 
importRecord.escalatedRaw ? - ['true', '1', 'yes', 'escalated'].includes(importRecord.escalatedRaw.toLowerCase()) : null; - - const forwardedHr = importRecord.forwardedHrRaw ? - ['true', '1', 'yes', 'forwarded'].includes(importRecord.forwardedHrRaw.toLowerCase()) : null; - - // Keep country code as-is, will be processed by OpenAI later - const country = importRecord.countryCode; - // Fetch transcript content if URL is provided and not already fetched let transcriptContent = importRecord.rawTranscriptContent; if (!transcriptContent && importRecord.fullTranscriptUrl && isValidTranscriptUrl(importRecord.fullTranscriptUrl)) { @@ -108,7 +105,8 @@ async function processSingleImport(importRecord: any): Promise<{ success: boolea } } - // Create or update Session record + // Create or update Session record with MINIMAL processing + // Only copy fields that don't need AI analysis const session = await prisma.session.upsert({ where: { importId: importRecord.id, @@ -116,20 +114,22 @@ async function processSingleImport(importRecord: any): Promise<{ success: boolea update: { startTime, endTime, + // Direct copies (minimal processing) ipAddress: importRecord.ipAddress, - country, - language: importRecord.language, - messagesSent: importRecord.messagesSent, - sentiment, - sentimentCategory, - escalated, - forwardedHr, + country: importRecord.countryCode, // Keep as country code fullTranscriptUrl: importRecord.fullTranscriptUrl, avgResponseTime: importRecord.avgResponseTimeSeconds, - tokens: importRecord.tokens, - tokensEur: importRecord.tokensEur, - category: importRecord.category, initialMsg: importRecord.initialMessage, + + // AI-processed fields: Leave empty, will be filled by AI processing + // language: null, // AI will detect + // messagesSent: null, // AI will count from Messages + // sentiment: null, // AI will analyze + // escalated: null, // AI will detect + // forwardedHr: null, // AI will detect + // category: null, // AI will categorize + // summary: null, // AI will 
generate + processed: false, // Will be processed later by AI }, create: { @@ -137,20 +137,15 @@ async function processSingleImport(importRecord: any): Promise<{ success: boolea importId: importRecord.id, startTime, endTime, + // Direct copies (minimal processing) ipAddress: importRecord.ipAddress, - country, - language: importRecord.language, - messagesSent: importRecord.messagesSent, - sentiment, - sentimentCategory, - escalated, - forwardedHr, + country: importRecord.countryCode, // Keep as country code fullTranscriptUrl: importRecord.fullTranscriptUrl, avgResponseTime: importRecord.avgResponseTimeSeconds, - tokens: importRecord.tokens, - tokensEur: importRecord.tokensEur, - category: importRecord.category, initialMsg: importRecord.initialMessage, + + // AI-processed fields: Leave empty, will be filled by AI processing + // All these will be null initially and filled by AI processed: false, // Will be processed later by AI }, }); diff --git a/lib/processingScheduler.ts b/lib/processingScheduler.ts index ef1e24e..11d9220 100644 --- a/lib/processingScheduler.ts +++ b/lib/processingScheduler.ts @@ -1,6 +1,6 @@ -// Session processing scheduler with configurable intervals and batch sizes +// Enhanced session processing scheduler with AI cost tracking and question management import cron from "node-cron"; -import { PrismaClient } from "@prisma/client"; +import { PrismaClient, SentimentCategory, SessionCategory } from "@prisma/client"; import fetch from "node-fetch"; import { getSchedulerConfig } from "./schedulerConfig"; @@ -8,13 +8,30 @@ const prisma = new PrismaClient(); const OPENAI_API_KEY = process.env.OPENAI_API_KEY; const OPENAI_API_URL = "https://api.openai.com/v1/chat/completions"; +// Model pricing in USD (update as needed) +const MODEL_PRICING = { + 'gpt-4o-2024-08-06': { + promptTokenCost: 0.0000025, // $2.50 per 1M tokens + completionTokenCost: 0.00001, // $10.00 per 1M tokens + }, + 'gpt-4-turbo': { + promptTokenCost: 0.00001, // $10.00 per 1M tokens + 
completionTokenCost: 0.00003, // $30.00 per 1M tokens + }, + 'gpt-4o': { + promptTokenCost: 0.000005, // $5.00 per 1M tokens + completionTokenCost: 0.000015, // $15.00 per 1M tokens + } +} as const; + +const USD_TO_EUR_RATE = 0.85; // Update periodically or fetch from API + interface ProcessedData { language: string; - messages_sent: number; - sentiment: "positive" | "neutral" | "negative"; + sentiment: "POSITIVE" | "NEUTRAL" | "NEGATIVE"; escalated: boolean; forwarded_hr: boolean; - category: string; + category: "SCHEDULE_HOURS" | "LEAVE_VACATION" | "SICK_LEAVE_RECOVERY" | "SALARY_COMPENSATION" | "CONTRACT_HOURS" | "ONBOARDING" | "OFFBOARDING" | "WORKWEAR_STAFF_PASS" | "TEAM_CONTACTS" | "PERSONAL_QUESTIONS" | "ACCESS_LOGIN" | "SOCIAL_QUESTIONS" | "UNRECOGNIZED_OTHER"; questions: string[]; summary: string; session_id: string; @@ -26,6 +43,137 @@ interface ProcessingResult { error?: string; } +/** + * Record AI processing request with detailed token tracking + */ +async function recordAIProcessingRequest( + sessionId: string, + openaiResponse: any, + processingType: string = 'session_analysis' +): Promise<void> { + const usage = openaiResponse.usage; + const model = openaiResponse.model; + const pricing = MODEL_PRICING[model as keyof typeof MODEL_PRICING] || MODEL_PRICING['gpt-4-turbo']; // fallback + + const promptCost = usage.prompt_tokens * pricing.promptTokenCost; + const completionCost = usage.completion_tokens * pricing.completionTokenCost; + const totalCostUsd = promptCost + completionCost; + const totalCostEur = totalCostUsd * USD_TO_EUR_RATE; + + await prisma.aIProcessingRequest.create({ + data: { + sessionId, + openaiRequestId: openaiResponse.id, + model: openaiResponse.model, + serviceTier: openaiResponse.service_tier, + systemFingerprint: openaiResponse.system_fingerprint, + + promptTokens: usage.prompt_tokens, + completionTokens: usage.completion_tokens, + totalTokens: usage.total_tokens, + + // Detailed breakdown + cachedTokens: 
usage.prompt_tokens_details?.cached_tokens || null, + audioTokensPrompt: usage.prompt_tokens_details?.audio_tokens || null, + reasoningTokens: usage.completion_tokens_details?.reasoning_tokens || null, + audioTokensCompletion: usage.completion_tokens_details?.audio_tokens || null, + acceptedPredictionTokens: usage.completion_tokens_details?.accepted_prediction_tokens || null, + rejectedPredictionTokens: usage.completion_tokens_details?.rejected_prediction_tokens || null, + + promptTokenCost: pricing.promptTokenCost, + completionTokenCost: pricing.completionTokenCost, + totalCostEur, + + processingType, + success: true, + completedAt: new Date(), + } + }); +} + +/** + * Record failed AI processing request + */ +async function recordFailedAIProcessingRequest( + sessionId: string, + processingType: string, + errorMessage: string +): Promise<void> { + await prisma.aIProcessingRequest.create({ + data: { + sessionId, + model: 'unknown', + promptTokens: 0, + completionTokens: 0, + totalTokens: 0, + promptTokenCost: 0, + completionTokenCost: 0, + totalCostEur: 0, + processingType, + success: false, + errorMessage, + completedAt: new Date(), + } + }); +} + +/** + * Process questions into separate Question and SessionQuestion tables + */ +async function processQuestions(sessionId: string, questions: string[]): Promise<void> { + // Clear existing questions for this session + await prisma.sessionQuestion.deleteMany({ + where: { sessionId } + }); + + // Process each question + for (let index = 0; index < questions.length; index++) { + const questionText = questions[index]; + if (!questionText.trim()) continue; // Skip empty questions + + // Find or create question + const question = await prisma.question.upsert({ + where: { content: questionText.trim() }, + create: { content: questionText.trim() }, + update: {} + }); + + // Link to session + await prisma.sessionQuestion.create({ + data: { + sessionId, + questionId: question.id, + order: index + } + }); + } +} + +/** + * Calculate 
messagesSent from actual Message records + */ +async function calculateMessagesSent(sessionId: string): Promise<number> { + const userMessageCount = await prisma.message.count({ + where: { + sessionId, + role: { in: ['user', 'User'] } // Handle both cases + } + }); + return userMessageCount; +} + +/** + * Calculate endTime from latest Message timestamp + */ +async function calculateEndTime(sessionId: string, fallbackEndTime: Date): Promise<Date> { + const latestMessage = await prisma.message.findFirst({ + where: { sessionId }, + orderBy: { timestamp: 'desc' } + }); + + return latestMessage?.timestamp || fallbackEndTime; +} + /** * Processes a session transcript using OpenAI API */ @@ -34,44 +182,32 @@ async function processTranscriptWithOpenAI(sessionId: string, transcript: string throw new Error("OPENAI_API_KEY environment variable is not set"); } - // Create a system message with instructions + // Updated system message with exact enum values const systemMessage = ` You are an AI assistant tasked with analyzing chat transcripts. - Extract the following information from the transcript: - 1. The primary language used by the user (ISO 639-1 code) - 2. Number of messages sent by the user - 3. Overall sentiment (positive, neutral, or negative) - 4. Whether the conversation was escalated - 5. Whether HR contact was mentioned or provided - 6. The best-fitting category for the conversation from this list: - - Schedule & Hours - - Leave & Vacation - - Sick Leave & Recovery - - Salary & Compensation - - Contract & Hours - - Onboarding - - Offboarding - - Workwear & Staff Pass - - Team & Contacts - - Personal Questions - - Access & Login - - Social questions - - Unrecognized / Other - 7. Up to 5 paraphrased questions asked by the user (in English) - 8. 
A brief summary of the conversation (10-300 characters) + Extract the following information from the transcript and return it in EXACT JSON format: - Return the data in JSON format matching this schema: { - "language": "ISO 639-1 code", - "messages_sent": number, - "sentiment": "positive|neutral|negative", + "language": "ISO 639-1 code (e.g., 'en', 'nl', 'de')", + "sentiment": "POSITIVE|NEUTRAL|NEGATIVE", "escalated": boolean, "forwarded_hr": boolean, - "category": "one of the categories listed above", + "category": "SCHEDULE_HOURS|LEAVE_VACATION|SICK_LEAVE_RECOVERY|SALARY_COMPENSATION|CONTRACT_HOURS|ONBOARDING|OFFBOARDING|WORKWEAR_STAFF_PASS|TEAM_CONTACTS|PERSONAL_QUESTIONS|ACCESS_LOGIN|SOCIAL_QUESTIONS|UNRECOGNIZED_OTHER", "questions": ["question 1", "question 2", ...], - "summary": "brief summary", + "summary": "brief summary (10-300 chars)", "session_id": "${sessionId}" } + + Rules: + - language: Primary language used by the user (ISO 639-1 code) + - sentiment: Overall emotional tone of the conversation + - escalated: Was the issue escalated to a supervisor/manager? + - forwarded_hr: Was HR contact mentioned or provided? + - category: Best fitting category for the main topic (use exact enum values above) + - questions: Up to 5 paraphrased user questions (in English) + - summary: Brief conversation summary (10-300 characters) + + IMPORTANT: Use EXACT enum values as specified above. 
`; try { @@ -82,7 +218,7 @@ async function processTranscriptWithOpenAI(sessionId: string, transcript: string Authorization: `Bearer ${OPENAI_API_KEY}`, }, body: JSON.stringify({ - model: "gpt-4-turbo", + model: "gpt-4o", // Use latest model messages: [ { role: "system", @@ -103,14 +239,25 @@ async function processTranscriptWithOpenAI(sessionId: string, transcript: string throw new Error(`OpenAI API error: ${response.status} - ${errorText}`); } - const data: any = await response.json(); - const processedData = JSON.parse(data.choices[0].message.content); + const openaiResponse: any = await response.json(); + + // Record the AI processing request for cost tracking + await recordAIProcessingRequest(sessionId, openaiResponse, 'session_analysis'); + + const processedData = JSON.parse(openaiResponse.choices[0].message.content); // Validate the response against our expected schema validateOpenAIResponse(processedData); return processedData; } catch (error) { + // Record failed request + await recordFailedAIProcessingRequest( + sessionId, + 'session_analysis', + error instanceof Error ? 
error.message : String(error) + ); + process.stderr.write(`Error processing transcript with OpenAI: ${error}\n`); throw error; } @@ -120,17 +267,9 @@ async function processTranscriptWithOpenAI(sessionId: string, transcript: string * Validates the OpenAI response against our expected schema */ function validateOpenAIResponse(data: any): void { - // Check required fields const requiredFields = [ - "language", - "messages_sent", - "sentiment", - "escalated", - "forwarded_hr", - "category", - "questions", - "summary", - "session_id", + "language", "sentiment", "escalated", "forwarded_hr", + "category", "questions", "summary", "session_id" ]; for (const field of requiredFields) { @@ -139,21 +278,13 @@ function validateOpenAIResponse(data: any): void { } } - // Validate field types + // Validate field types and values if (typeof data.language !== "string" || !/^[a-z]{2}$/.test(data.language)) { - throw new Error( - "Invalid language format. Expected ISO 639-1 code (e.g., 'en')" - ); + throw new Error("Invalid language format. Expected ISO 639-1 code (e.g., 'en')"); } - if (typeof data.messages_sent !== "number" || data.messages_sent < 0) { - throw new Error("Invalid messages_sent. Expected non-negative number"); - } - - if (!["positive", "neutral", "negative"].includes(data.sentiment)) { - throw new Error( - "Invalid sentiment. Expected 'positive', 'neutral', or 'negative'" - ); + if (!["POSITIVE", "NEUTRAL", "NEGATIVE"].includes(data.sentiment)) { + throw new Error("Invalid sentiment. 
Expected 'POSITIVE', 'NEUTRAL', or 'NEGATIVE'"); } if (typeof data.escalated !== "boolean") { @@ -165,39 +296,22 @@ function validateOpenAIResponse(data: any): void { } const validCategories = [ - "Schedule & Hours", - "Leave & Vacation", - "Sick Leave & Recovery", - "Salary & Compensation", - "Contract & Hours", - "Onboarding", - "Offboarding", - "Workwear & Staff Pass", - "Team & Contacts", - "Personal Questions", - "Access & Login", - "Social questions", - "Unrecognized / Other", + "SCHEDULE_HOURS", "LEAVE_VACATION", "SICK_LEAVE_RECOVERY", "SALARY_COMPENSATION", + "CONTRACT_HOURS", "ONBOARDING", "OFFBOARDING", "WORKWEAR_STAFF_PASS", + "TEAM_CONTACTS", "PERSONAL_QUESTIONS", "ACCESS_LOGIN", "SOCIAL_QUESTIONS", + "UNRECOGNIZED_OTHER" ]; if (!validCategories.includes(data.category)) { - throw new Error( - `Invalid category. Expected one of: ${validCategories.join(", ")}` - ); + throw new Error(`Invalid category. Expected one of: ${validCategories.join(", ")}`); } if (!Array.isArray(data.questions)) { throw new Error("Invalid questions. Expected array of strings"); } - if ( - typeof data.summary !== "string" || - data.summary.length < 10 || - data.summary.length > 300 - ) { - throw new Error( - "Invalid summary. Expected string between 10-300 characters" - ); + if (typeof data.summary !== "string" || data.summary.length < 10 || data.summary.length > 300) { + throw new Error("Invalid summary. 
Expected string between 10-300 characters"); } if (typeof data.session_id !== "string") { @@ -220,45 +334,42 @@ async function processSingleSession(session: any): Promise { try { // Convert messages back to transcript format for OpenAI processing const transcript = session.messages - .map( - (msg: any) => - `[${new Date(msg.timestamp) - .toLocaleString("en-GB", { - day: "2-digit", - month: "2-digit", - year: "numeric", - hour: "2-digit", - minute: "2-digit", - second: "2-digit", - }) - .replace(",", "")}] ${msg.role}: ${msg.content}` + .map((msg: any) => + `[${new Date(msg.timestamp) + .toLocaleString("en-GB", { + day: "2-digit", + month: "2-digit", + year: "numeric", + hour: "2-digit", + minute: "2-digit", + second: "2-digit", + }) + .replace(",", "")}] ${msg.role}: ${msg.content}` ) .join("\n"); - const processedData = await processTranscriptWithOpenAI( - session.id, - transcript - ); + const processedData = await processTranscriptWithOpenAI(session.id, transcript); - // Map sentiment string to float value for compatibility with existing data - const sentimentMap = { - positive: 0.8, - neutral: 0.0, - negative: -0.8, - }; + // Calculate messagesSent from actual Message records + const messagesSent = await calculateMessagesSent(session.id); + + // Calculate endTime from latest Message timestamp + const calculatedEndTime = await calculateEndTime(session.id, session.endTime); + + // Process questions into separate tables + await processQuestions(session.id, processedData.questions); // Update the session with processed data await prisma.session.update({ where: { id: session.id }, data: { language: processedData.language, - messagesSent: processedData.messages_sent, - sentiment: sentimentMap[processedData.sentiment] || 0, - sentimentCategory: processedData.sentiment.toUpperCase() as "POSITIVE" | "NEUTRAL" | "NEGATIVE", + messagesSent: messagesSent, // Calculated from Messages, not AI + endTime: calculatedEndTime, // Use calculated endTime if different + sentiment: 
processedData.sentiment as SentimentCategory, escalated: processedData.escalated, forwardedHr: processedData.forwarded_hr, - category: processedData.category, - questions: JSON.stringify(processedData.questions), + category: processedData.category as SessionCategory, summary: processedData.summary, processed: true, }, @@ -313,9 +424,7 @@ async function processSessionsInParallel(sessions: any[], maxConcurrency: number * Process unprocessed sessions */ export async function processUnprocessedSessions(batchSize: number | null = null, maxConcurrency: number = 5): Promise<void> { - process.stdout.write( - "[ProcessingScheduler] Starting to process unprocessed sessions...\n" - ); + process.stdout.write("[ProcessingScheduler] Starting to process unprocessed sessions...\n"); // Find sessions that have messages but haven't been processed const queryOptions: any = { @@ -345,9 +454,7 @@ export async function processUnprocessedSessions(batchSize: number | null = null ); if (sessionsWithMessages.length === 0) { - process.stdout.write( - "[ProcessingScheduler] No sessions found requiring processing.\n" - ); + process.stdout.write("[ProcessingScheduler] No sessions found requiring processing.\n"); return; } @@ -363,15 +470,46 @@ export async function processUnprocessedSessions(batchSize: number | null = null const errorCount = results.filter((r) => !r.success).length; process.stdout.write("[ProcessingScheduler] Session processing complete.\n"); - process.stdout.write( - `[ProcessingScheduler] Successfully processed: ${successCount} sessions.\n` - ); - process.stdout.write( - `[ProcessingScheduler] Failed to process: ${errorCount} sessions.\n` - ); - process.stdout.write( - `[ProcessingScheduler] Total processing time: ${((endTime - startTime) / 1000).toFixed(2)}s\n` - ); + process.stdout.write(`[ProcessingScheduler] Successfully processed: ${successCount} sessions.\n`); + process.stdout.write(`[ProcessingScheduler] Failed to process: ${errorCount} sessions.\n`); + 
process.stdout.write(`[ProcessingScheduler] Total processing time: ${((endTime - startTime) / 1000).toFixed(2)}s\n`); +} + +/** + * Get total AI processing costs for reporting + */ +export async function getAIProcessingCosts(): Promise<{ + totalCostEur: number; + totalTokens: number; + requestCount: number; + successfulRequests: number; + failedRequests: number; +}> { + const result = await prisma.aIProcessingRequest.aggregate({ + _sum: { + totalCostEur: true, + totalTokens: true, + }, + _count: { + id: true, + }, + }); + + const successfulRequests = await prisma.aIProcessingRequest.count({ + where: { success: true } + }); + + const failedRequests = await prisma.aIProcessingRequest.count({ + where: { success: false } + }); + + return { + totalCostEur: result._sum.totalCostEur || 0, + totalTokens: result._sum.totalTokens || 0, + requestCount: result._count.id || 0, + successfulRequests, + failedRequests, + }; } /** @@ -396,9 +534,7 @@ export function startProcessingScheduler(): void { config.sessionProcessing.concurrency ); } catch (error) { - process.stderr.write( - `[ProcessingScheduler] Error in scheduler: ${error}\n` - ); + process.stderr.write(`[ProcessingScheduler] Error in scheduler: ${error}\n`); } }); } diff --git a/next.config.js b/next.config.js index 690ec80..7f1e761 100644 --- a/next.config.js +++ b/next.config.js @@ -5,10 +5,8 @@ const nextConfig = { reactStrictMode: true, // Allow cross-origin requests from specific origins in development allowedDevOrigins: [ - "192.168.1.2", "localhost", - "propc", - "test123.kjanat.com", + "127.0.0.1" ], }; diff --git a/prisma/migrations/20250627185818_initial_refactored_schema/migration.sql b/prisma/migrations/20250627185818_initial_refactored_schema/migration.sql new file mode 100644 index 0000000..86dd815 --- /dev/null +++ b/prisma/migrations/20250627185818_initial_refactored_schema/migration.sql @@ -0,0 +1,183 @@ +-- CreateTable +CREATE TABLE "Company" ( + "id" TEXT NOT NULL PRIMARY KEY, + "name" TEXT NOT NULL, 
+ "csvUrl" TEXT NOT NULL, + "csvUsername" TEXT, + "csvPassword" TEXT, + "sentimentAlert" REAL, + "dashboardOpts" JSONB, + "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" DATETIME NOT NULL +); + +-- CreateTable +CREATE TABLE "User" ( + "id" TEXT NOT NULL PRIMARY KEY, + "email" TEXT NOT NULL, + "password" TEXT NOT NULL, + "role" TEXT NOT NULL DEFAULT 'USER', + "companyId" TEXT NOT NULL, + "resetToken" TEXT, + "resetTokenExpiry" DATETIME, + "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" DATETIME NOT NULL, + CONSTRAINT "User_companyId_fkey" FOREIGN KEY ("companyId") REFERENCES "Company" ("id") ON DELETE CASCADE ON UPDATE CASCADE +); + +-- CreateTable +CREATE TABLE "Session" ( + "id" TEXT NOT NULL PRIMARY KEY, + "companyId" TEXT NOT NULL, + "importId" TEXT, + "startTime" DATETIME NOT NULL, + "endTime" DATETIME NOT NULL, + "ipAddress" TEXT, + "country" TEXT, + "fullTranscriptUrl" TEXT, + "avgResponseTime" REAL, + "initialMsg" TEXT, + "language" TEXT, + "messagesSent" INTEGER, + "sentiment" TEXT, + "escalated" BOOLEAN, + "forwardedHr" BOOLEAN, + "category" TEXT, + "summary" TEXT, + "processed" BOOLEAN NOT NULL DEFAULT false, + "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" DATETIME NOT NULL, + CONSTRAINT "Session_companyId_fkey" FOREIGN KEY ("companyId") REFERENCES "Company" ("id") ON DELETE CASCADE ON UPDATE CASCADE, + CONSTRAINT "Session_importId_fkey" FOREIGN KEY ("importId") REFERENCES "SessionImport" ("id") ON DELETE SET NULL ON UPDATE CASCADE +); + +-- CreateTable +CREATE TABLE "SessionImport" ( + "id" TEXT NOT NULL PRIMARY KEY, + "companyId" TEXT NOT NULL, + "externalSessionId" TEXT NOT NULL, + "startTimeRaw" TEXT NOT NULL, + "endTimeRaw" TEXT NOT NULL, + "ipAddress" TEXT, + "countryCode" TEXT, + "language" TEXT, + "messagesSent" INTEGER, + "sentimentRaw" TEXT, + "escalatedRaw" TEXT, + "forwardedHrRaw" TEXT, + "fullTranscriptUrl" TEXT, + "avgResponseTimeSeconds" REAL, + "tokens" INTEGER, + 
"tokensEur" REAL, + "category" TEXT, + "initialMessage" TEXT, + "rawTranscriptContent" TEXT, + "status" TEXT NOT NULL DEFAULT 'QUEUED', + "errorMsg" TEXT, + "processedAt" DATETIME, + "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "SessionImport_companyId_fkey" FOREIGN KEY ("companyId") REFERENCES "Company" ("id") ON DELETE CASCADE ON UPDATE CASCADE +); + +-- CreateTable +CREATE TABLE "Message" ( + "id" TEXT NOT NULL PRIMARY KEY, + "sessionId" TEXT NOT NULL, + "timestamp" DATETIME, + "role" TEXT NOT NULL, + "content" TEXT NOT NULL, + "order" INTEGER NOT NULL, + "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "Message_sessionId_fkey" FOREIGN KEY ("sessionId") REFERENCES "Session" ("id") ON DELETE CASCADE ON UPDATE CASCADE +); + +-- CreateTable +CREATE TABLE "Question" ( + "id" TEXT NOT NULL PRIMARY KEY, + "content" TEXT NOT NULL, + "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +-- CreateTable +CREATE TABLE "SessionQuestion" ( + "id" TEXT NOT NULL PRIMARY KEY, + "sessionId" TEXT NOT NULL, + "questionId" TEXT NOT NULL, + "order" INTEGER NOT NULL, + "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "SessionQuestion_sessionId_fkey" FOREIGN KEY ("sessionId") REFERENCES "Session" ("id") ON DELETE CASCADE ON UPDATE CASCADE, + CONSTRAINT "SessionQuestion_questionId_fkey" FOREIGN KEY ("questionId") REFERENCES "Question" ("id") ON DELETE RESTRICT ON UPDATE CASCADE +); + +-- CreateTable +CREATE TABLE "AIProcessingRequest" ( + "id" TEXT NOT NULL PRIMARY KEY, + "sessionId" TEXT NOT NULL, + "openaiRequestId" TEXT, + "model" TEXT NOT NULL, + "serviceTier" TEXT, + "systemFingerprint" TEXT, + "promptTokens" INTEGER NOT NULL, + "completionTokens" INTEGER NOT NULL, + "totalTokens" INTEGER NOT NULL, + "cachedTokens" INTEGER, + "audioTokensPrompt" INTEGER, + "reasoningTokens" INTEGER, + "audioTokensCompletion" INTEGER, + "acceptedPredictionTokens" INTEGER, + "rejectedPredictionTokens" INTEGER, + 
"promptTokenCost" REAL NOT NULL, + "completionTokenCost" REAL NOT NULL, + "totalCostEur" REAL NOT NULL, + "processingType" TEXT NOT NULL, + "success" BOOLEAN NOT NULL, + "errorMessage" TEXT, + "requestedAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + "completedAt" DATETIME, + CONSTRAINT "AIProcessingRequest_sessionId_fkey" FOREIGN KEY ("sessionId") REFERENCES "Session" ("id") ON DELETE CASCADE ON UPDATE CASCADE +); + +-- CreateIndex +CREATE UNIQUE INDEX "User_email_key" ON "User"("email"); + +-- CreateIndex +CREATE UNIQUE INDEX "Session_importId_key" ON "Session"("importId"); + +-- CreateIndex +CREATE INDEX "Session_companyId_startTime_idx" ON "Session"("companyId", "startTime"); + +-- CreateIndex +CREATE UNIQUE INDEX "SessionImport_externalSessionId_key" ON "SessionImport"("externalSessionId"); + +-- CreateIndex +CREATE INDEX "SessionImport_status_idx" ON "SessionImport"("status"); + +-- CreateIndex +CREATE UNIQUE INDEX "SessionImport_companyId_externalSessionId_key" ON "SessionImport"("companyId", "externalSessionId"); + +-- CreateIndex +CREATE INDEX "Message_sessionId_order_idx" ON "Message"("sessionId", "order"); + +-- CreateIndex +CREATE UNIQUE INDEX "Message_sessionId_order_key" ON "Message"("sessionId", "order"); + +-- CreateIndex +CREATE UNIQUE INDEX "Question_content_key" ON "Question"("content"); + +-- CreateIndex +CREATE INDEX "SessionQuestion_sessionId_idx" ON "SessionQuestion"("sessionId"); + +-- CreateIndex +CREATE UNIQUE INDEX "SessionQuestion_sessionId_questionId_key" ON "SessionQuestion"("sessionId", "questionId"); + +-- CreateIndex +CREATE UNIQUE INDEX "SessionQuestion_sessionId_order_key" ON "SessionQuestion"("sessionId", "order"); + +-- CreateIndex +CREATE INDEX "AIProcessingRequest_sessionId_idx" ON "AIProcessingRequest"("sessionId"); + +-- CreateIndex +CREATE INDEX "AIProcessingRequest_requestedAt_idx" ON "AIProcessingRequest"("requestedAt"); + +-- CreateIndex +CREATE INDEX "AIProcessingRequest_model_idx" ON "AIProcessingRequest"("model"); 
diff --git a/prisma/migrations/migration_lock.toml b/prisma/migrations/migration_lock.toml new file mode 100644 index 0000000..2a5a444 --- /dev/null +++ b/prisma/migrations/migration_lock.toml @@ -0,0 +1,3 @@ +# Please do not edit this file manually +# It should be added in your version-control system (e.g., Git) +provider = "sqlite" diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 3cf8dac..adc58cb 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -22,6 +22,22 @@ enum SentimentCategory { NEGATIVE } +enum SessionCategory { + SCHEDULE_HOURS + LEAVE_VACATION + SICK_LEAVE_RECOVERY + SALARY_COMPENSATION + CONTRACT_HOURS + ONBOARDING + OFFBOARDING + WORKWEAR_STAFF_PASS + TEAM_CONTACTS + PERSONAL_QUESTIONS + ACCESS_LOGIN + SOCIAL_QUESTIONS + UNRECOGNIZED_OTHER +} + /** * COMPANY (multi-tenant root) */ @@ -85,31 +101,33 @@ model Session { startTime DateTime endTime DateTime - // Processed fields from SessionImport data + // Direct copies from SessionImport (minimal processing) ipAddress String? - country String? // processed from countryCode - language String? // processed from language - messagesSent Int? - sentiment Float? // processed from sentimentRaw - sentimentCategory SentimentCategory? - escalated Boolean? - forwardedHr Boolean? + country String? // from countryCode fullTranscriptUrl String? - avgResponseTime Float? // processed from avgResponseTimeSeconds - tokens Int? - tokensEur Float? - category String? - initialMsg String? // processed from initialMessage + avgResponseTime Float? // from avgResponseTimeSeconds + initialMsg String? // from initialMessage + + // AI-processed fields (calculated from Messages or AI analysis) + language String? // AI-detected from Messages + messagesSent Int? // Calculated from Message count + sentiment SentimentCategory? // AI-analyzed (changed from Float to enum) + escalated Boolean? // AI-detected + forwardedHr Boolean? // AI-detected + category SessionCategory? 
// AI-categorized (changed to enum) + + // AI-generated fields + summary String? // AI-generated summary // Processing metadata processed Boolean @default(false) - questions String? // JSON array of extracted questions - summary String? // AI-generated summary /** - * ---------- the missing opposite side ---------- + * Relationships */ - messages Message[] // <-- satisfies Message.session + messages Message[] // Individual conversation messages + sessionQuestions SessionQuestion[] // Questions asked in this session + aiProcessingRequests AIProcessingRequest[] // AI processing cost tracking createdAt DateTime @default(now()) updatedAt DateTime @updatedAt @@ -187,3 +205,79 @@ model Message { @@unique([sessionId, order]) // guards against duplicate order values @@index([sessionId, order]) } + +/** + * QUESTION MANAGEMENT (separate from Session for better analytics) + */ +model Question { + id String @id @default(uuid()) + content String @unique // The actual question text + createdAt DateTime @default(now()) + + // Relationships + sessionQuestions SessionQuestion[] +} + +model SessionQuestion { + id String @id @default(uuid()) + sessionId String + questionId String + order Int // Order within the session + createdAt DateTime @default(now()) + + // Relationships + session Session @relation(fields: [sessionId], references: [id], onDelete: Cascade) + question Question @relation(fields: [questionId], references: [id]) + + @@unique([sessionId, questionId]) // Prevent duplicate questions per session + @@unique([sessionId, order]) // Ensure unique ordering + @@index([sessionId]) +} + +/** + * AI PROCESSING COST TRACKING + */ +model AIProcessingRequest { + id String @id @default(uuid()) + sessionId String + + // OpenAI Request Details + openaiRequestId String? // "chatcmpl-Bn8IH9UM8t7luZVWnwZG7CVJ0kjPo" + model String // "gpt-4o-2024-08-06" + serviceTier String? // "default" + systemFingerprint String? 
// "fp_07871e2ad8" + + // Token Usage (from usage object) + promptTokens Int // 11 + completionTokens Int // 9 + totalTokens Int // 20 + + // Detailed Token Breakdown + cachedTokens Int? // prompt_tokens_details.cached_tokens + audioTokensPrompt Int? // prompt_tokens_details.audio_tokens + reasoningTokens Int? // completion_tokens_details.reasoning_tokens + audioTokensCompletion Int? // completion_tokens_details.audio_tokens + acceptedPredictionTokens Int? // completion_tokens_details.accepted_prediction_tokens + rejectedPredictionTokens Int? // completion_tokens_details.rejected_prediction_tokens + + // Cost Calculation + promptTokenCost Float // Cost per prompt token (varies by model) + completionTokenCost Float // Cost per completion token (varies by model) + totalCostEur Float // Calculated total cost in EUR + + // Processing Context + processingType String // "session_analysis", "reprocessing", etc. + success Boolean // Whether the request succeeded + errorMessage String? // If failed, what went wrong + + // Timestamps + requestedAt DateTime @default(now()) + completedAt DateTime? 
+ + // Relationships + session Session @relation(fields: [sessionId], references: [id], onDelete: Cascade) + + @@index([sessionId]) + @@index([requestedAt]) + @@index([model]) +} diff --git a/test-refactored-pipeline.js b/test-refactored-pipeline.js new file mode 100644 index 0000000..27c8168 --- /dev/null +++ b/test-refactored-pipeline.js @@ -0,0 +1,129 @@ +// Test script for the refactored data processing pipeline +import { PrismaClient } from '@prisma/client'; +import { processQueuedImports } from './lib/importProcessor.ts'; +import { processAllUnparsedTranscripts } from './lib/transcriptParser.ts'; +import { processUnprocessedSessions, getAIProcessingCosts } from './lib/processingScheduler.ts'; + +const prisma = new PrismaClient(); + +async function testRefactoredPipeline() { + console.log('๐Ÿงช Testing Refactored Data Processing Pipeline\n'); + + // Step 1: Check current state + console.log('๐Ÿ“Š Current Database State:'); + const stats = await getDatabaseStats(); + console.log(stats); + console.log(''); + + // Step 2: Test import processing (minimal fields only) + console.log('๐Ÿ”„ Testing Import Processing (Phase 1)...'); + await processQueuedImports(5); // Process 5 imports + console.log(''); + + // Step 3: Test transcript parsing + console.log('๐Ÿ“ Testing Transcript Parsing (Phase 2)...'); + await processAllUnparsedTranscripts(); + console.log(''); + + // Step 4: Test AI processing with cost tracking + console.log('๐Ÿค– Testing AI Processing with Cost Tracking (Phase 3)...'); + await processUnprocessedSessions(3, 2); // Process 3 sessions with concurrency 2 + console.log(''); + + // Step 5: Show final results + console.log('๐Ÿ“ˆ Final Results:'); + const finalStats = await getDatabaseStats(); + console.log(finalStats); + console.log(''); + + // Step 6: Show AI processing costs + console.log('๐Ÿ’ฐ AI Processing Costs:'); + const costs = await getAIProcessingCosts(); + console.log(costs); + console.log(''); + + // Step 7: Show sample processed session + 
console.log('๐Ÿ” Sample Processed Session:'); + const sampleSession = await getSampleProcessedSession(); + if (sampleSession) { + console.log(`Session ID: ${sampleSession.id}`); + console.log(`Language: ${sampleSession.language}`); + console.log(`Messages Sent: ${sampleSession.messagesSent}`); + console.log(`Sentiment: ${sampleSession.sentiment}`); + console.log(`Category: ${sampleSession.category}`); + console.log(`Escalated: ${sampleSession.escalated}`); + console.log(`Forwarded HR: ${sampleSession.forwardedHr}`); + console.log(`Summary: ${sampleSession.summary}`); + console.log(`Questions: ${sampleSession.sessionQuestions.length} questions`); + console.log(`AI Requests: ${sampleSession.aiProcessingRequests.length} requests`); + + if (sampleSession.sessionQuestions.length > 0) { + console.log('Sample Questions:'); + sampleSession.sessionQuestions.slice(0, 3).forEach((sq, i) => { + console.log(` ${i + 1}. ${sq.question.content}`); + }); + } + } + console.log(''); + + console.log('โœ… Pipeline test completed!'); +} + +async function getDatabaseStats() { + const [ + totalSessions, + sessionsWithImports, + sessionsWithMessages, + processedSessions, + totalMessages, + totalQuestions, + totalSessionQuestions, + totalAIRequests + ] = await Promise.all([ + prisma.session.count(), + prisma.session.count({ where: { importId: { not: null } } }), + prisma.session.count({ where: { messages: { some: {} } } }), + prisma.session.count({ where: { processed: true } }), + prisma.message.count(), + prisma.question.count(), + prisma.sessionQuestion.count(), + prisma.aIProcessingRequest.count() + ]); + + return { + totalSessions, + sessionsWithImports, + sessionsWithMessages, + processedSessions, + unprocessedSessions: sessionsWithMessages - processedSessions, + totalMessages, + totalQuestions, + totalSessionQuestions, + totalAIRequests + }; +} + +async function getSampleProcessedSession() { + return await prisma.session.findFirst({ + where: { + processed: true, + messages: { some: 
{} } + }, + include: { + sessionQuestions: { + include: { + question: true + }, + orderBy: { order: 'asc' } + }, + aiProcessingRequests: { + orderBy: { requestedAt: 'desc' } + } + } + }); +} + +// Run the test +testRefactoredPipeline() + .catch(console.error) + .finally(() => prisma.$disconnect());