feat: Enhance session processing and metrics

- Updated session processing commands in documentation for clarity.
- Removed transcript content fetching from session processing, allowing on-demand retrieval.
- Improved session metrics calculations and added new metrics for dashboard.
- Refactored processing scheduler to handle sessions in parallel with concurrency limits.
- Added manual trigger API for processing unprocessed sessions with admin checks.
- Implemented scripts for fetching and parsing transcripts, checking transcript content, and testing processing status.
- Updated Prisma schema to enforce default values for processed sessions.
- Added error handling and logging improvements throughout the processing workflow.
This commit is contained in:
Max Kowalski
2025-06-26 17:12:42 +02:00
parent 8f3c1e0f7c
commit 8c43a35632
20 changed files with 851 additions and 229 deletions

View File

@ -14,7 +14,7 @@ const envPath = join(__dirname, '..', '.env.local');
try {
const envFile = readFileSync(envPath, 'utf8');
const envVars = envFile.split('\n').filter(line => line.trim() && !line.startsWith('#'));
envVars.forEach(line => {
const [key, ...valueParts] = line.split('=');
if (key && valueParts.length > 0) {
@ -216,24 +216,130 @@ function validateOpenAIResponse(data) {
}
/**
* Process unprocessed sessions
* Process a single session
* @param {Object} session The session to process
* @returns {Promise<Object>} Result object with success/error info
*/
export async function processUnprocessedSessions() {
async function processSingleSession(session) {
if (session.messages.length === 0) {
return {
sessionId: session.id,
success: false,
error: "Session has no messages",
};
}
try {
// Convert messages back to transcript format for OpenAI processing
const transcript = session.messages
.map(
(msg) =>
`[${new Date(msg.timestamp)
.toLocaleString("en-GB", {
day: "2-digit",
month: "2-digit",
year: "numeric",
hour: "2-digit",
minute: "2-digit",
second: "2-digit",
})
.replace(",", "")}] ${msg.role}: ${msg.content}`
)
.join("\n");
const processedData = await processTranscriptWithOpenAI(
session.id,
transcript
);
// Map sentiment string to float value for compatibility with existing data
const sentimentMap = {
positive: 0.8,
neutral: 0.0,
negative: -0.8,
};
// Update the session with processed data
await prisma.session.update({
where: { id: session.id },
data: {
language: processedData.language,
messagesSent: processedData.messages_sent,
sentiment: sentimentMap[processedData.sentiment] || 0,
sentimentCategory: processedData.sentiment,
escalated: processedData.escalated,
forwardedHr: processedData.forwarded_hr,
category: processedData.category,
questions: JSON.stringify(processedData.questions),
summary: processedData.summary,
processed: true,
},
});
return {
sessionId: session.id,
success: true,
};
} catch (error) {
return {
sessionId: session.id,
success: false,
error: error.message,
};
}
}
/**
* Process sessions in parallel with concurrency limit
* @param {Array} sessions Array of sessions to process
* @param {number} maxConcurrency Maximum number of concurrent processing tasks
* @returns {Promise<Object>} Processing results
*/
async function processSessionsInParallel(sessions, maxConcurrency = 5) {
const results = [];
const executing = [];
for (const session of sessions) {
const promise = processSingleSession(session).then((result) => {
process.stdout.write(
result.success
? `[ProcessingScheduler] ✓ Successfully processed session ${result.sessionId}\n`
: `[ProcessingScheduler] ✗ Failed to process session ${result.sessionId}: ${result.error}\n`
);
return result;
});
results.push(promise);
executing.push(promise);
if (executing.length >= maxConcurrency) {
await Promise.race(executing);
executing.splice(
executing.findIndex((p) => p === promise),
1
);
}
}
return Promise.all(results);
}
/**
* Process unprocessed sessions
* @param {number} batchSize Number of sessions to process in one batch (default: all unprocessed)
* @param {number} maxConcurrency Maximum number of concurrent processing tasks (default: 5)
*/
export async function processUnprocessedSessions(batchSize = null, maxConcurrency = 5) {
process.stdout.write(
"[ProcessingScheduler] Starting to process unprocessed sessions...\n"
);
// Find sessions that have messages but haven't been processed
const sessionsToProcess = await prisma.session.findMany({
const queryOptions = {
where: {
AND: [
{ messages: { some: {} } }, // Must have messages
{
OR: [
{ processed: false },
{ processed: null }
]
}
{ processed: false }, // Only unprocessed sessions (no longer checking for null)
],
},
include: {
@ -241,8 +347,14 @@ export async function processUnprocessedSessions() {
orderBy: { order: "asc" },
},
},
take: 10, // Process in batches to avoid overloading the system
});
};
// Add batch size limit if specified
if (batchSize && batchSize > 0) {
queryOptions.take = batchSize;
}
const sessionsToProcess = await prisma.session.findMany(queryOptions);
// Filter to only sessions that have messages
const sessionsWithMessages = sessionsToProcess.filter(
@ -257,80 +369,15 @@ export async function processUnprocessedSessions() {
}
process.stdout.write(
`[ProcessingScheduler] Found ${sessionsWithMessages.length} sessions to process.\n`
`[ProcessingScheduler] Found ${sessionsWithMessages.length} sessions to process (max concurrency: ${maxConcurrency}).\n`
);
let successCount = 0;
let errorCount = 0;
for (const session of sessionsWithMessages) {
if (session.messages.length === 0) {
process.stderr.write(
`[ProcessingScheduler] Session ${session.id} has no messages, skipping.\n`
);
continue;
}
const startTime = Date.now();
const results = await processSessionsInParallel(sessionsWithMessages, maxConcurrency);
const endTime = Date.now();
process.stdout.write(
`[ProcessingScheduler] Processing messages for session ${session.id}...\n`
);
try {
// Convert messages back to transcript format for OpenAI processing
const transcript = session.messages
.map(
(msg) =>
`[${new Date(msg.timestamp)
.toLocaleString("en-GB", {
day: "2-digit",
month: "2-digit",
year: "numeric",
hour: "2-digit",
minute: "2-digit",
second: "2-digit",
})
.replace(",", "")}] ${msg.role}: ${msg.content}`
)
.join("\n");
const processedData = await processTranscriptWithOpenAI(
session.id,
transcript
);
// Map sentiment string to float value for compatibility with existing data
const sentimentMap = {
positive: 0.8,
neutral: 0.0,
negative: -0.8,
};
// Update the session with processed data
await prisma.session.update({
where: { id: session.id },
data: {
language: processedData.language,
messagesSent: processedData.messages_sent,
sentiment: sentimentMap[processedData.sentiment] || 0,
sentimentCategory: processedData.sentiment,
escalated: processedData.escalated,
forwardedHr: processedData.forwarded_hr,
category: processedData.category,
questions: JSON.stringify(processedData.questions),
summary: processedData.summary,
processed: true,
},
});
process.stdout.write(
`[ProcessingScheduler] Successfully processed session ${session.id}.\n`
);
successCount++;
} catch (error) {
process.stderr.write(
`[ProcessingScheduler] Error processing session ${session.id}: ${error}\n`
);
errorCount++;
}
}
const successCount = results.filter((r) => r.success).length;
const errorCount = results.filter((r) => !r.success).length;
process.stdout.write("[ProcessingScheduler] Session processing complete.\n");
process.stdout.write(
@ -339,6 +386,9 @@ export async function processUnprocessedSessions() {
process.stdout.write(
`[ProcessingScheduler] Failed to process: ${errorCount} sessions.\n`
);
process.stdout.write(
`[ProcessingScheduler] Total processing time: ${((endTime - startTime) / 1000).toFixed(2)}s\n`
);
}
/**