feat: Refactor data processing pipeline with AI cost tracking and enhanced session management

- Updated environment configuration to include Postgres database settings.
- Enhanced import processing to minimize field copying and rely on AI for analysis.
- Implemented detailed AI processing request tracking, including token usage and costs (per-request cost math sketched below).
- Added new models for Question and SessionQuestion to manage user inquiries separately.
- Improved session processing scheduler with AI cost reporting functionality.
- Created a test script to validate the refactored pipeline and display processing statistics.
- Updated Prisma schema and migration files to reflect new database structure and relationships.
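Below is a minimal, illustrative TypeScript sketch of the per-request cost calculation referenced above. It mirrors the math added in recordAIProcessingRequest and reuses the MODEL_PRICING and USD_TO_EUR_RATE values from the scheduler diff; the helper name estimateRequestCostEur and the Usage shape are assumptions made for this sketch, not identifiers from the commit.

// Illustrative sketch only — mirrors the cost math in recordAIProcessingRequest below.
interface Usage {
  prompt_tokens: number;
  completion_tokens: number;
}

const MODEL_PRICING = {
  "gpt-4o": { promptTokenCost: 0.000005, completionTokenCost: 0.000015 },
  "gpt-4-turbo": { promptTokenCost: 0.00001, completionTokenCost: 0.00003 },
} as const;
const USD_TO_EUR_RATE = 0.85;

function estimateRequestCostEur(model: string, usage: Usage): number {
  // Unknown models fall back to gpt-4-turbo pricing, matching the scheduler's fallback.
  const pricing =
    MODEL_PRICING[model as keyof typeof MODEL_PRICING] || MODEL_PRICING["gpt-4-turbo"];
  const usd =
    usage.prompt_tokens * pricing.promptTokenCost +
    usage.completion_tokens * pricing.completionTokenCost;
  return usd * USD_TO_EUR_RATE;
}

// Example: 1,200 prompt tokens + 300 completion tokens on gpt-4o
// => (1200 * 0.000005 + 300 * 0.000015) * 0.85 ≈ €0.0089
console.log(estimateRequestCostEur("gpt-4o", { prompt_tokens: 1200, completion_tokens: 300 }));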
Max Kowalski
2025-06-27 21:15:44 +02:00
parent 601e2e4026
commit 6f9ac219c2
10 changed files with 747 additions and 198 deletions

View File

@@ -1,5 +1,5 @@
// SessionImport to Session processor
import { PrismaClient, ImportStatus, SentimentCategory } from "@prisma/client";
import { PrismaClient, ImportStatus, SentimentCategory, SessionCategory } from "@prisma/client";
import { getSchedulerConfig } from "./env";
import { fetchTranscriptContent, isValidTranscriptUrl } from "./transcriptFetcher";
import cron from "node-cron";
@@ -38,8 +38,33 @@ function parseEuropeanDate(dateStr: string): Date {
return date;
}
/**
* Helper function to parse sentiment from raw string (fallback only)
*/
function parseFallbackSentiment(sentimentRaw: string | null): SentimentCategory | null {
if (!sentimentRaw) return null;
const sentimentStr = sentimentRaw.toLowerCase();
if (sentimentStr.includes('positive')) {
return SentimentCategory.POSITIVE;
} else if (sentimentStr.includes('negative')) {
return SentimentCategory.NEGATIVE;
} else {
return SentimentCategory.NEUTRAL;
}
}
/**
* Helper function to parse boolean from raw string (fallback only)
*/
function parseFallbackBoolean(rawValue: string | null): boolean | null {
if (!rawValue) return null;
return ['true', '1', 'yes', 'escalated', 'forwarded'].includes(rawValue.toLowerCase());
}
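// Note: these fallback parsers mirror the inline sentiment/boolean parsing removed from
// processSingleImport below; under the new strategy those fields are normally left null
// and filled in later by the AI processing step.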
/**
* Process a single SessionImport record into a Session record
* NEW STRATEGY: Only copy minimal fields, let AI processing handle the rest
*/
async function processSingleImport(importRecord: any): Promise<{ success: boolean; error?: string }> {
try {
@@ -49,34 +74,6 @@ async function processSingleImport(importRecord: any): Promise<{ success: boolea
console.log(`[Import Processor] Parsed dates for ${importRecord.externalSessionId}: ${startTime.toISOString()} - ${endTime.toISOString()}`);
// Process sentiment
let sentiment: number | null = null;
let sentimentCategory: SentimentCategory | null = null;
if (importRecord.sentimentRaw) {
const sentimentStr = importRecord.sentimentRaw.toLowerCase();
if (sentimentStr.includes('positive')) {
sentiment = 0.8;
sentimentCategory = SentimentCategory.POSITIVE;
} else if (sentimentStr.includes('negative')) {
sentiment = -0.8;
sentimentCategory = SentimentCategory.NEGATIVE;
} else {
sentiment = 0.0;
sentimentCategory = SentimentCategory.NEUTRAL;
}
}
// Process boolean fields
const escalated = importRecord.escalatedRaw ?
['true', '1', 'yes', 'escalated'].includes(importRecord.escalatedRaw.toLowerCase()) : null;
const forwardedHr = importRecord.forwardedHrRaw ?
['true', '1', 'yes', 'forwarded'].includes(importRecord.forwardedHrRaw.toLowerCase()) : null;
// Keep country code as-is, will be processed by OpenAI later
const country = importRecord.countryCode;
// Fetch transcript content if URL is provided and not already fetched
let transcriptContent = importRecord.rawTranscriptContent;
if (!transcriptContent && importRecord.fullTranscriptUrl && isValidTranscriptUrl(importRecord.fullTranscriptUrl)) {
@@ -108,7 +105,8 @@ async function processSingleImport(importRecord: any): Promise<{ success: boolea
}
}
// Create or update Session record
// Create or update Session record with MINIMAL processing
// Only copy fields that don't need AI analysis
const session = await prisma.session.upsert({
where: {
importId: importRecord.id,
@@ -116,20 +114,22 @@ async function processSingleImport(importRecord: any): Promise<{ success: boolea
update: {
startTime,
endTime,
// Direct copies (minimal processing)
ipAddress: importRecord.ipAddress,
country,
language: importRecord.language,
messagesSent: importRecord.messagesSent,
sentiment,
sentimentCategory,
escalated,
forwardedHr,
country: importRecord.countryCode, // Keep as country code
fullTranscriptUrl: importRecord.fullTranscriptUrl,
avgResponseTime: importRecord.avgResponseTimeSeconds,
tokens: importRecord.tokens,
tokensEur: importRecord.tokensEur,
category: importRecord.category,
initialMsg: importRecord.initialMessage,
// AI-processed fields: Leave empty, will be filled by AI processing
// language: null, // AI will detect
// messagesSent: null, // AI will count from Messages
// sentiment: null, // AI will analyze
// escalated: null, // AI will detect
// forwardedHr: null, // AI will detect
// category: null, // AI will categorize
// summary: null, // AI will generate
processed: false, // Will be processed later by AI
},
create: {
@@ -137,20 +137,15 @@ async function processSingleImport(importRecord: any): Promise<{ success: boolea
importId: importRecord.id,
startTime,
endTime,
// Direct copies (minimal processing)
ipAddress: importRecord.ipAddress,
country,
language: importRecord.language,
messagesSent: importRecord.messagesSent,
sentiment,
sentimentCategory,
escalated,
forwardedHr,
country: importRecord.countryCode, // Keep as country code
fullTranscriptUrl: importRecord.fullTranscriptUrl,
avgResponseTime: importRecord.avgResponseTimeSeconds,
tokens: importRecord.tokens,
tokensEur: importRecord.tokensEur,
category: importRecord.category,
initialMsg: importRecord.initialMessage,
// AI-processed fields: Leave empty, will be filled by AI processing
// All these will be null initially and filled by AI
processed: false, // Will be processed later by AI
},
});

View File

@@ -1,6 +1,6 @@
// Session processing scheduler with configurable intervals and batch sizes
// Enhanced session processing scheduler with AI cost tracking and question management
import cron from "node-cron";
import { PrismaClient } from "@prisma/client";
import { PrismaClient, SentimentCategory, SessionCategory } from "@prisma/client";
import fetch from "node-fetch";
import { getSchedulerConfig } from "./schedulerConfig";
@@ -8,13 +8,30 @@ const prisma = new PrismaClient();
const OPENAI_API_KEY = process.env.OPENAI_API_KEY;
const OPENAI_API_URL = "https://api.openai.com/v1/chat/completions";
// Model pricing in USD (update as needed)
const MODEL_PRICING = {
'gpt-4o-2024-08-06': {
promptTokenCost: 0.0000025, // $2.50 per 1M tokens
completionTokenCost: 0.00001, // $10.00 per 1M tokens
},
'gpt-4-turbo': {
promptTokenCost: 0.00001, // $10.00 per 1M tokens
completionTokenCost: 0.00003, // $30.00 per 1M tokens
},
'gpt-4o': {
promptTokenCost: 0.000005, // $5.00 per 1M tokens
completionTokenCost: 0.000015, // $15.00 per 1M tokens
}
} as const;
const USD_TO_EUR_RATE = 0.85; // Update periodically or fetch from API
interface ProcessedData {
language: string;
messages_sent: number;
sentiment: "positive" | "neutral" | "negative";
sentiment: "POSITIVE" | "NEUTRAL" | "NEGATIVE";
escalated: boolean;
forwarded_hr: boolean;
category: string;
category: "SCHEDULE_HOURS" | "LEAVE_VACATION" | "SICK_LEAVE_RECOVERY" | "SALARY_COMPENSATION" | "CONTRACT_HOURS" | "ONBOARDING" | "OFFBOARDING" | "WORKWEAR_STAFF_PASS" | "TEAM_CONTACTS" | "PERSONAL_QUESTIONS" | "ACCESS_LOGIN" | "SOCIAL_QUESTIONS" | "UNRECOGNIZED_OTHER";
questions: string[];
summary: string;
session_id: string;
@@ -26,6 +43,137 @@ interface ProcessingResult {
error?: string;
}
/**
* Record AI processing request with detailed token tracking
*/
async function recordAIProcessingRequest(
sessionId: string,
openaiResponse: any,
processingType: string = 'session_analysis'
): Promise<void> {
const usage = openaiResponse.usage;
const model = openaiResponse.model;
const pricing = MODEL_PRICING[model as keyof typeof MODEL_PRICING] || MODEL_PRICING['gpt-4-turbo']; // fallback
const promptCost = usage.prompt_tokens * pricing.promptTokenCost;
const completionCost = usage.completion_tokens * pricing.completionTokenCost;
const totalCostUsd = promptCost + completionCost;
const totalCostEur = totalCostUsd * USD_TO_EUR_RATE;
await prisma.aIProcessingRequest.create({
data: {
sessionId,
openaiRequestId: openaiResponse.id,
model: openaiResponse.model,
serviceTier: openaiResponse.service_tier,
systemFingerprint: openaiResponse.system_fingerprint,
promptTokens: usage.prompt_tokens,
completionTokens: usage.completion_tokens,
totalTokens: usage.total_tokens,
// Detailed breakdown
cachedTokens: usage.prompt_tokens_details?.cached_tokens || null,
audioTokensPrompt: usage.prompt_tokens_details?.audio_tokens || null,
reasoningTokens: usage.completion_tokens_details?.reasoning_tokens || null,
audioTokensCompletion: usage.completion_tokens_details?.audio_tokens || null,
acceptedPredictionTokens: usage.completion_tokens_details?.accepted_prediction_tokens || null,
rejectedPredictionTokens: usage.completion_tokens_details?.rejected_prediction_tokens || null,
promptTokenCost: pricing.promptTokenCost,
completionTokenCost: pricing.completionTokenCost,
totalCostEur,
processingType,
success: true,
completedAt: new Date(),
}
});
}
/**
* Record failed AI processing request
*/
async function recordFailedAIProcessingRequest(
sessionId: string,
processingType: string,
errorMessage: string
): Promise<void> {
await prisma.aIProcessingRequest.create({
data: {
sessionId,
model: 'unknown',
promptTokens: 0,
completionTokens: 0,
totalTokens: 0,
promptTokenCost: 0,
completionTokenCost: 0,
totalCostEur: 0,
processingType,
success: false,
errorMessage,
completedAt: new Date(),
}
});
}
/**
* Process questions into separate Question and SessionQuestion tables
*/
async function processQuestions(sessionId: string, questions: string[]): Promise<void> {
// Clear existing questions for this session
await prisma.sessionQuestion.deleteMany({
where: { sessionId }
});
// Process each question
for (let index = 0; index < questions.length; index++) {
const questionText = questions[index];
if (!questionText.trim()) continue; // Skip empty questions
// Find or create question
const question = await prisma.question.upsert({
where: { content: questionText.trim() },
create: { content: questionText.trim() },
update: {}
});
// Link to session
await prisma.sessionQuestion.create({
data: {
sessionId,
questionId: question.id,
order: index
}
});
}
}
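// Design note: upserting on Question.content deduplicates identical question text across
// sessions (this relies on content being a unique field in the Prisma schema, as the
// upsert's `where` implies); each SessionQuestion row links a question to the session
// and preserves its original order.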
/**
* Calculate messagesSent from actual Message records
*/
async function calculateMessagesSent(sessionId: string): Promise<number> {
const userMessageCount = await prisma.message.count({
where: {
sessionId,
role: { in: ['user', 'User'] } // Handle both cases
}
});
return userMessageCount;
}
/**
* Calculate endTime from latest Message timestamp
*/
async function calculateEndTime(sessionId: string, fallbackEndTime: Date): Promise<Date> {
const latestMessage = await prisma.message.findFirst({
where: { sessionId },
orderBy: { timestamp: 'desc' }
});
return latestMessage?.timestamp || fallbackEndTime;
}
/**
* Processes a session transcript using OpenAI API
*/
@@ -34,44 +182,32 @@ async function processTranscriptWithOpenAI(sessionId: string, transcript: string
throw new Error("OPENAI_API_KEY environment variable is not set");
}
// Create a system message with instructions
// Updated system message with exact enum values
const systemMessage = `
You are an AI assistant tasked with analyzing chat transcripts.
Extract the following information from the transcript:
1. The primary language used by the user (ISO 639-1 code)
2. Number of messages sent by the user
3. Overall sentiment (positive, neutral, or negative)
4. Whether the conversation was escalated
5. Whether HR contact was mentioned or provided
6. The best-fitting category for the conversation from this list:
- Schedule & Hours
- Leave & Vacation
- Sick Leave & Recovery
- Salary & Compensation
- Contract & Hours
- Onboarding
- Offboarding
- Workwear & Staff Pass
- Team & Contacts
- Personal Questions
- Access & Login
- Social questions
- Unrecognized / Other
7. Up to 5 paraphrased questions asked by the user (in English)
8. A brief summary of the conversation (10-300 characters)
Extract the following information from the transcript and return it in EXACT JSON format:
Return the data in JSON format matching this schema:
{
"language": "ISO 639-1 code",
"messages_sent": number,
"sentiment": "positive|neutral|negative",
"language": "ISO 639-1 code (e.g., 'en', 'nl', 'de')",
"sentiment": "POSITIVE|NEUTRAL|NEGATIVE",
"escalated": boolean,
"forwarded_hr": boolean,
"category": "one of the categories listed above",
"category": "SCHEDULE_HOURS|LEAVE_VACATION|SICK_LEAVE_RECOVERY|SALARY_COMPENSATION|CONTRACT_HOURS|ONBOARDING|OFFBOARDING|WORKWEAR_STAFF_PASS|TEAM_CONTACTS|PERSONAL_QUESTIONS|ACCESS_LOGIN|SOCIAL_QUESTIONS|UNRECOGNIZED_OTHER",
"questions": ["question 1", "question 2", ...],
"summary": "brief summary",
"summary": "brief summary (10-300 chars)",
"session_id": "${sessionId}"
}
Rules:
- language: Primary language used by the user (ISO 639-1 code)
- sentiment: Overall emotional tone of the conversation
- escalated: Was the issue escalated to a supervisor/manager?
- forwarded_hr: Was HR contact mentioned or provided?
- category: Best fitting category for the main topic (use exact enum values above)
- questions: Up to 5 paraphrased user questions (in English)
- summary: Brief conversation summary (10-300 characters)
IMPORTANT: Use EXACT enum values as specified above.
`;
try {
@@ -82,7 +218,7 @@ async function processTranscriptWithOpenAI(sessionId: string, transcript: string
Authorization: `Bearer ${OPENAI_API_KEY}`,
},
body: JSON.stringify({
model: "gpt-4-turbo",
model: "gpt-4o", // Use latest model
messages: [
{
role: "system",
@@ -103,14 +239,25 @@ async function processTranscriptWithOpenAI(sessionId: string, transcript: string
throw new Error(`OpenAI API error: ${response.status} - ${errorText}`);
}
const data: any = await response.json();
const processedData = JSON.parse(data.choices[0].message.content);
const openaiResponse: any = await response.json();
// Record the AI processing request for cost tracking
await recordAIProcessingRequest(sessionId, openaiResponse, 'session_analysis');
const processedData = JSON.parse(openaiResponse.choices[0].message.content);
// Validate the response against our expected schema
validateOpenAIResponse(processedData);
return processedData;
} catch (error) {
// Record failed request
await recordFailedAIProcessingRequest(
sessionId,
'session_analysis',
error instanceof Error ? error.message : String(error)
);
process.stderr.write(`Error processing transcript with OpenAI: ${error}\n`);
throw error;
}
@@ -120,17 +267,9 @@ async function processTranscriptWithOpenAI(sessionId: string, transcript: string
* Validates the OpenAI response against our expected schema
*/
function validateOpenAIResponse(data: any): void {
// Check required fields
const requiredFields = [
"language",
"messages_sent",
"sentiment",
"escalated",
"forwarded_hr",
"category",
"questions",
"summary",
"session_id",
"language", "sentiment", "escalated", "forwarded_hr",
"category", "questions", "summary", "session_id"
];
for (const field of requiredFields) {
@@ -139,21 +278,13 @@ function validateOpenAIResponse(data: any): void {
}
}
// Validate field types
// Validate field types and values
if (typeof data.language !== "string" || !/^[a-z]{2}$/.test(data.language)) {
throw new Error(
"Invalid language format. Expected ISO 639-1 code (e.g., 'en')"
);
throw new Error("Invalid language format. Expected ISO 639-1 code (e.g., 'en')");
}
if (typeof data.messages_sent !== "number" || data.messages_sent < 0) {
throw new Error("Invalid messages_sent. Expected non-negative number");
}
if (!["positive", "neutral", "negative"].includes(data.sentiment)) {
throw new Error(
"Invalid sentiment. Expected 'positive', 'neutral', or 'negative'"
);
if (!["POSITIVE", "NEUTRAL", "NEGATIVE"].includes(data.sentiment)) {
throw new Error("Invalid sentiment. Expected 'POSITIVE', 'NEUTRAL', or 'NEGATIVE'");
}
if (typeof data.escalated !== "boolean") {
@@ -165,39 +296,22 @@ function validateOpenAIResponse(data: any): void {
}
const validCategories = [
"Schedule & Hours",
"Leave & Vacation",
"Sick Leave & Recovery",
"Salary & Compensation",
"Contract & Hours",
"Onboarding",
"Offboarding",
"Workwear & Staff Pass",
"Team & Contacts",
"Personal Questions",
"Access & Login",
"Social questions",
"Unrecognized / Other",
"SCHEDULE_HOURS", "LEAVE_VACATION", "SICK_LEAVE_RECOVERY", "SALARY_COMPENSATION",
"CONTRACT_HOURS", "ONBOARDING", "OFFBOARDING", "WORKWEAR_STAFF_PASS",
"TEAM_CONTACTS", "PERSONAL_QUESTIONS", "ACCESS_LOGIN", "SOCIAL_QUESTIONS",
"UNRECOGNIZED_OTHER"
];
if (!validCategories.includes(data.category)) {
throw new Error(
`Invalid category. Expected one of: ${validCategories.join(", ")}`
);
throw new Error(`Invalid category. Expected one of: ${validCategories.join(", ")}`);
}
if (!Array.isArray(data.questions)) {
throw new Error("Invalid questions. Expected array of strings");
}
if (
typeof data.summary !== "string" ||
data.summary.length < 10 ||
data.summary.length > 300
) {
throw new Error(
"Invalid summary. Expected string between 10-300 characters"
);
if (typeof data.summary !== "string" || data.summary.length < 10 || data.summary.length > 300) {
throw new Error("Invalid summary. Expected string between 10-300 characters");
}
if (typeof data.session_id !== "string") {
@@ -220,45 +334,42 @@ async function processSingleSession(session: any): Promise<ProcessingResult> {
try {
// Convert messages back to transcript format for OpenAI processing
const transcript = session.messages
.map(
(msg: any) =>
`[${new Date(msg.timestamp)
.toLocaleString("en-GB", {
day: "2-digit",
month: "2-digit",
year: "numeric",
hour: "2-digit",
minute: "2-digit",
second: "2-digit",
})
.replace(",", "")}] ${msg.role}: ${msg.content}`
.map((msg: any) =>
`[${new Date(msg.timestamp)
.toLocaleString("en-GB", {
day: "2-digit",
month: "2-digit",
year: "numeric",
hour: "2-digit",
minute: "2-digit",
second: "2-digit",
})
.replace(",", "")}] ${msg.role}: ${msg.content}`
)
.join("\n");
const processedData = await processTranscriptWithOpenAI(
session.id,
transcript
);
const processedData = await processTranscriptWithOpenAI(session.id, transcript);
// Map sentiment string to float value for compatibility with existing data
const sentimentMap = {
positive: 0.8,
neutral: 0.0,
negative: -0.8,
};
// Calculate messagesSent from actual Message records
const messagesSent = await calculateMessagesSent(session.id);
// Calculate endTime from latest Message timestamp
const calculatedEndTime = await calculateEndTime(session.id, session.endTime);
// Process questions into separate tables
await processQuestions(session.id, processedData.questions);
// Update the session with processed data
await prisma.session.update({
where: { id: session.id },
data: {
language: processedData.language,
messagesSent: processedData.messages_sent,
sentiment: sentimentMap[processedData.sentiment] || 0,
sentimentCategory: processedData.sentiment.toUpperCase() as "POSITIVE" | "NEUTRAL" | "NEGATIVE",
messagesSent: messagesSent, // Calculated from Messages, not AI
endTime: calculatedEndTime, // Use calculated endTime if different
sentiment: processedData.sentiment as SentimentCategory,
escalated: processedData.escalated,
forwardedHr: processedData.forwarded_hr,
category: processedData.category,
questions: JSON.stringify(processedData.questions),
category: processedData.category as SessionCategory,
summary: processedData.summary,
processed: true,
},
@@ -313,9 +424,7 @@ async function processSessionsInParallel(sessions: any[], maxConcurrency: number
* Process unprocessed sessions
*/
export async function processUnprocessedSessions(batchSize: number | null = null, maxConcurrency: number = 5): Promise<void> {
process.stdout.write(
"[ProcessingScheduler] Starting to process unprocessed sessions...\n"
);
process.stdout.write("[ProcessingScheduler] Starting to process unprocessed sessions...\n");
// Find sessions that have messages but haven't been processed
const queryOptions: any = {
@@ -345,9 +454,7 @@ export async function processUnprocessedSessions(batchSize: number | null = null
);
if (sessionsWithMessages.length === 0) {
process.stdout.write(
"[ProcessingScheduler] No sessions found requiring processing.\n"
);
process.stdout.write("[ProcessingScheduler] No sessions found requiring processing.\n");
return;
}
@@ -363,15 +470,46 @@ export async function processUnprocessedSessions(batchSize: number | null = null
const errorCount = results.filter((r) => !r.success).length;
process.stdout.write("[ProcessingScheduler] Session processing complete.\n");
process.stdout.write(
`[ProcessingScheduler] Successfully processed: ${successCount} sessions.\n`
);
process.stdout.write(
`[ProcessingScheduler] Failed to process: ${errorCount} sessions.\n`
);
process.stdout.write(
`[ProcessingScheduler] Total processing time: ${((endTime - startTime) / 1000).toFixed(2)}s\n`
);
process.stdout.write(`[ProcessingScheduler] Successfully processed: ${successCount} sessions.\n`);
process.stdout.write(`[ProcessingScheduler] Failed to process: ${errorCount} sessions.\n`);
process.stdout.write(`[ProcessingScheduler] Total processing time: ${((endTime - startTime) / 1000).toFixed(2)}s\n`);
}
/**
* Get total AI processing costs for reporting
*/
export async function getAIProcessingCosts(): Promise<{
totalCostEur: number;
totalTokens: number;
requestCount: number;
successfulRequests: number;
failedRequests: number;
}> {
const result = await prisma.aIProcessingRequest.aggregate({
_sum: {
totalCostEur: true,
totalTokens: true,
},
_count: {
id: true,
},
});
const successfulRequests = await prisma.aIProcessingRequest.count({
where: { success: true }
});
const failedRequests = await prisma.aIProcessingRequest.count({
where: { success: false }
});
return {
totalCostEur: result._sum.totalCostEur || 0,
totalTokens: result._sum.totalTokens || 0,
requestCount: result._count.id || 0,
successfulRequests,
failedRequests,
};
}
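// Example usage (illustrative, not part of this commit):
//   const report = await getAIProcessingCosts();
//   console.log(`[ProcessingScheduler] AI cost to date: €${report.totalCostEur.toFixed(4)} ` +
//     `across ${report.requestCount} requests (${report.failedRequests} failed)`);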
/**
@@ -396,9 +534,7 @@ export function startProcessingScheduler(): void {
config.sessionProcessing.concurrency
);
} catch (error) {
process.stderr.write(
`[ProcessingScheduler] Error in scheduler: ${error}\n`
);
process.stderr.write(`[ProcessingScheduler] Error in scheduler: ${error}\n`);
}
});
}