From 8c43a356324b46b48fe215368327795f8cd0c801 Mon Sep 17 00:00:00 2001 From: Max Kowalski Date: Thu, 26 Jun 2025 17:12:42 +0200 Subject: [PATCH] feat: Enhance session processing and metrics - Updated session processing commands in documentation for clarity. - Removed transcript content fetching from session processing, allowing on-demand retrieval. - Improved session metrics calculations and added new metrics for dashboard. - Refactored processing scheduler to handle sessions in parallel with concurrency limits. - Added manual trigger API for processing unprocessed sessions with admin checks. - Implemented scripts for fetching and parsing transcripts, checking transcript content, and testing processing status. - Updated Prisma schema to enforce default values for processed sessions. - Added error handling and logging improvements throughout the processing workflow. --- app/dashboard/overview/page.tsx | 10 +- components/DateRangePicker.tsx | 20 +- components/TopQuestionsChart.tsx | 10 +- docs/session-processing.md | 3 +- lib/csvFetcher.js | 12 +- lib/metrics.ts | 14 +- lib/processingScheduler.js | 216 +++++++++++++-------- lib/processingScheduler.ts | 256 +++++++++++++++++-------- lib/types.ts | 2 +- package.json | 2 +- pages/api/admin/refresh-sessions.ts | 12 +- pages/api/admin/trigger-processing.ts | 103 ++++++++++ pages/api/dashboard/metrics.ts | 11 +- prisma/schema.prisma | 2 +- scripts/check-transcript-content.js | 73 +++++++ scripts/fetch-and-parse-transcripts.js | 185 ++++++++++++++++++ scripts/manual-trigger-test.js | 38 ++++ scripts/manual-triggers.js | 16 +- scripts/test-processing-status.js | 75 ++++++++ scripts/trigger-processing-direct.js | 20 ++ 20 files changed, 851 insertions(+), 229 deletions(-) create mode 100644 pages/api/admin/trigger-processing.ts create mode 100644 scripts/check-transcript-content.js create mode 100644 scripts/fetch-and-parse-transcripts.js create mode 100644 scripts/manual-trigger-test.js create mode 100644 scripts/test-processing-status.js create mode 100644 scripts/trigger-processing-direct.js diff --git a/app/dashboard/overview/page.tsx b/app/dashboard/overview/page.tsx index 10e1790..cc1f38d 100644 --- a/app/dashboard/overview/page.tsx +++ b/app/dashboard/overview/page.tsx @@ -41,13 +41,13 @@ function DashboardContent() { if (startDate && endDate) { url += `?startDate=${startDate}&endDate=${endDate}`; } - + const res = await fetch(url); const data = await res.json(); - + setMetrics(data.metrics); setCompany(data.company); - + // Set date range from API response (only on initial load) if (data.dateRange && !dateRange) { setDateRange(data.dateRange); @@ -257,7 +257,7 @@ function DashboardContent() { - + {/* Date Range Picker */} {dateRange && ( )} - +
endDate) { setEndDate(newStartDate); } - + setStartDate(newStartDate); }; @@ -46,12 +46,12 @@ export default function DateRangePicker({ setEndDate(maxDate); return; } - + // Ensure end date is not before start date if (newEndDate < startDate) { setStartDate(newEndDate); } - + setEndDate(newEndDate); }; @@ -64,7 +64,7 @@ export default function DateRangePicker({ const thirtyDaysAgo = new Date(); thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - 30); const thirtyDaysAgoStr = thirtyDaysAgo.toISOString().split('T')[0]; - + // Use the later of 30 days ago or minDate const newStartDate = thirtyDaysAgoStr > minDate ? thirtyDaysAgoStr : minDate; setStartDate(newStartDate); @@ -75,7 +75,7 @@ export default function DateRangePicker({ const sevenDaysAgo = new Date(); sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7); const sevenDaysAgoStr = sevenDaysAgo.toISOString().split('T')[0]; - + // Use the later of 7 days ago or minDate const newStartDate = sevenDaysAgoStr > minDate ? sevenDaysAgoStr : minDate; setStartDate(newStartDate); @@ -89,7 +89,7 @@ export default function DateRangePicker({ - +
- +
- +
- +
Available data: {new Date(minDate).toLocaleDateString()} - {new Date(maxDate).toLocaleDateString()}
diff --git a/components/TopQuestionsChart.tsx b/components/TopQuestionsChart.tsx index 716a17e..e312bae 100644 --- a/components/TopQuestionsChart.tsx +++ b/components/TopQuestionsChart.tsx @@ -26,11 +26,11 @@ export default function TopQuestionsChart({ data, title = "Top 5 Asked Questions return (

{title}

- +
{data.map((question, index) => { const percentage = maxCount > 0 ? (question.count / maxCount) * 100 : 0; - + return (
{/* Question text */} @@ -42,7 +42,7 @@ export default function TopQuestionsChart({ data, title = "Top 5 Asked Questions {question.count}
- + {/* Progress bar */}
{index + 1} @@ -59,7 +59,7 @@ export default function TopQuestionsChart({ data, title = "Top 5 Asked Questions ); })}
- + {/* Summary */}
diff --git a/docs/session-processing.md b/docs/session-processing.md index 676ca59..18c424d 100644 --- a/docs/session-processing.md +++ b/docs/session-processing.md @@ -62,7 +62,8 @@ OPENAI_API_KEY=your_api_key_here To run the application with schedulers enabled: -- Development: `npm run dev:with-schedulers` +- Development: `npm run dev` +- Development (with schedulers disabled): `npm run dev:no-schedulers` - Production: `npm run start` Note: These commands will start a custom Next.js server with the schedulers enabled. You'll need to have an OpenAI API key set in your `.env.local` file for the session processing to work. diff --git a/lib/csvFetcher.js b/lib/csvFetcher.js index 675aa82..356de4b 100644 --- a/lib/csvFetcher.js +++ b/lib/csvFetcher.js @@ -561,15 +561,8 @@ export async function fetchAndStoreSessionsForAllCompanies() { ? session.endTime : new Date(); - // Fetch transcript content if URL is available - let transcriptContent = null; - if (session.fullTranscriptUrl) { - transcriptContent = await fetchTranscriptContent( - session.fullTranscriptUrl, - company.csvUsername, - company.csvPassword - ); - } + // Note: transcriptContent field was removed from schema + // Transcript content can be fetched on-demand from fullTranscriptUrl // Check if the session already exists const existingSession = await prisma.session.findUnique({ @@ -608,7 +601,6 @@ export async function fetchAndStoreSessionsForAllCompanies() { ? session.forwardedHr : null, fullTranscriptUrl: session.fullTranscriptUrl || null, - transcriptContent: transcriptContent, // Add the transcript content avgResponseTime: typeof session.avgResponseTime === "number" ? session.avgResponseTime diff --git a/lib/metrics.ts b/lib/metrics.ts index 1d03f73..bdb4826 100644 --- a/lib/metrics.ts +++ b/lib/metrics.ts @@ -349,7 +349,7 @@ export function sessionMetrics( let totalTokensEur = 0; const wordCounts: { [key: string]: number } = {}; let alerts = 0; - + // New metrics variables const hourlySessionCounts: { [hour: string]: number } = {}; let resolvedChatsCount = 0; @@ -530,7 +530,7 @@ export function sessionMetrics( .forEach(msg => { const content = msg.content.trim(); // Simple heuristic: if message ends with ? or contains question words, treat as question - if (content.endsWith('?') || + if (content.endsWith('?') || /\b(what|when|where|why|how|who|which|can|could|would|will|is|are|do|does|did)\b/i.test(content)) { questionCounts[content] = (questionCounts[content] || 0) + 1; } @@ -540,7 +540,7 @@ export function sessionMetrics( // 3. Extract questions from initial message as fallback if (session.initialMsg) { const content = session.initialMsg.trim(); - if (content.endsWith('?') || + if (content.endsWith('?') || /\b(what|when|where|why|how|who|which|can|could|would|will|is|are|do|does|did)\b/i.test(content)) { questionCounts[content] = (questionCounts[content] || 0) + 1; } @@ -611,10 +611,10 @@ export function sessionMetrics( ); // Calculate new metrics - + // 1. Average Daily Costs (euros) const avgDailyCosts = numDaysWithSessions > 0 ? totalTokensEur / numDaysWithSessions : 0; - + // 2. Peak Usage Time let peakUsageTime = "N/A"; if (Object.keys(hourlySessionCounts).length > 0) { @@ -624,7 +624,7 @@ export function sessionMetrics( const endHour = (peakHourNum + 1) % 24; peakUsageTime = `${peakHour}-${endHour.toString().padStart(2, '0')}:00`; } - + // 3. Resolved Chats Percentage const resolvedChatsPercentage = totalSessions > 0 ? (resolvedChatsCount / totalSessions) * 100 : 0; @@ -672,7 +672,7 @@ export function sessionMetrics( lastUpdated: Date.now(), totalSessionDuration, validSessionsForDuration, - + // New metrics avgDailyCosts, peakUsageTime, diff --git a/lib/processingScheduler.js b/lib/processingScheduler.js index e1d84de..0af48a4 100644 --- a/lib/processingScheduler.js +++ b/lib/processingScheduler.js @@ -14,7 +14,7 @@ const envPath = join(__dirname, '..', '.env.local'); try { const envFile = readFileSync(envPath, 'utf8'); const envVars = envFile.split('\n').filter(line => line.trim() && !line.startsWith('#')); - + envVars.forEach(line => { const [key, ...valueParts] = line.split('='); if (key && valueParts.length > 0) { @@ -216,24 +216,130 @@ function validateOpenAIResponse(data) { } /** - * Process unprocessed sessions + * Process a single session + * @param {Object} session The session to process + * @returns {Promise} Result object with success/error info */ -export async function processUnprocessedSessions() { +async function processSingleSession(session) { + if (session.messages.length === 0) { + return { + sessionId: session.id, + success: false, + error: "Session has no messages", + }; + } + + try { + // Convert messages back to transcript format for OpenAI processing + const transcript = session.messages + .map( + (msg) => + `[${new Date(msg.timestamp) + .toLocaleString("en-GB", { + day: "2-digit", + month: "2-digit", + year: "numeric", + hour: "2-digit", + minute: "2-digit", + second: "2-digit", + }) + .replace(",", "")}] ${msg.role}: ${msg.content}` + ) + .join("\n"); + + const processedData = await processTranscriptWithOpenAI( + session.id, + transcript + ); + + // Map sentiment string to float value for compatibility with existing data + const sentimentMap = { + positive: 0.8, + neutral: 0.0, + negative: -0.8, + }; + + // Update the session with processed data + await prisma.session.update({ + where: { id: session.id }, + data: { + language: processedData.language, + messagesSent: processedData.messages_sent, + sentiment: sentimentMap[processedData.sentiment] || 0, + sentimentCategory: processedData.sentiment, + escalated: processedData.escalated, + forwardedHr: processedData.forwarded_hr, + category: processedData.category, + questions: JSON.stringify(processedData.questions), + summary: processedData.summary, + processed: true, + }, + }); + + return { + sessionId: session.id, + success: true, + }; + } catch (error) { + return { + sessionId: session.id, + success: false, + error: error.message, + }; + } +} + +/** + * Process sessions in parallel with concurrency limit + * @param {Array} sessions Array of sessions to process + * @param {number} maxConcurrency Maximum number of concurrent processing tasks + * @returns {Promise} Processing results + */ +async function processSessionsInParallel(sessions, maxConcurrency = 5) { + const results = []; + const executing = []; + + for (const session of sessions) { + const promise = processSingleSession(session).then((result) => { + process.stdout.write( + result.success + ? `[ProcessingScheduler] āœ“ Successfully processed session ${result.sessionId}\n` + : `[ProcessingScheduler] āœ— Failed to process session ${result.sessionId}: ${result.error}\n` + ); + return result; + }); + + results.push(promise); + executing.push(promise); + + if (executing.length >= maxConcurrency) { + await Promise.race(executing); + executing.splice( + executing.findIndex((p) => p === promise), + 1 + ); + } + } + + return Promise.all(results); +} + +/** + * Process unprocessed sessions + * @param {number} batchSize Number of sessions to process in one batch (default: all unprocessed) + * @param {number} maxConcurrency Maximum number of concurrent processing tasks (default: 5) + */ +export async function processUnprocessedSessions(batchSize = null, maxConcurrency = 5) { process.stdout.write( "[ProcessingScheduler] Starting to process unprocessed sessions...\n" ); // Find sessions that have messages but haven't been processed - const sessionsToProcess = await prisma.session.findMany({ + const queryOptions = { where: { AND: [ { messages: { some: {} } }, // Must have messages - { - OR: [ - { processed: false }, - { processed: null } - ] - } + { processed: false }, // Only unprocessed sessions (no longer checking for null) ], }, include: { @@ -241,8 +347,14 @@ export async function processUnprocessedSessions() { orderBy: { order: "asc" }, }, }, - take: 10, // Process in batches to avoid overloading the system - }); + }; + + // Add batch size limit if specified + if (batchSize && batchSize > 0) { + queryOptions.take = batchSize; + } + + const sessionsToProcess = await prisma.session.findMany(queryOptions); // Filter to only sessions that have messages const sessionsWithMessages = sessionsToProcess.filter( @@ -257,80 +369,15 @@ export async function processUnprocessedSessions() { } process.stdout.write( - `[ProcessingScheduler] Found ${sessionsWithMessages.length} sessions to process.\n` + `[ProcessingScheduler] Found ${sessionsWithMessages.length} sessions to process (max concurrency: ${maxConcurrency}).\n` ); - let successCount = 0; - let errorCount = 0; - for (const session of sessionsWithMessages) { - if (session.messages.length === 0) { - process.stderr.write( - `[ProcessingScheduler] Session ${session.id} has no messages, skipping.\n` - ); - continue; - } + const startTime = Date.now(); + const results = await processSessionsInParallel(sessionsWithMessages, maxConcurrency); + const endTime = Date.now(); - process.stdout.write( - `[ProcessingScheduler] Processing messages for session ${session.id}...\n` - ); - try { - // Convert messages back to transcript format for OpenAI processing - const transcript = session.messages - .map( - (msg) => - `[${new Date(msg.timestamp) - .toLocaleString("en-GB", { - day: "2-digit", - month: "2-digit", - year: "numeric", - hour: "2-digit", - minute: "2-digit", - second: "2-digit", - }) - .replace(",", "")}] ${msg.role}: ${msg.content}` - ) - .join("\n"); - - const processedData = await processTranscriptWithOpenAI( - session.id, - transcript - ); - - // Map sentiment string to float value for compatibility with existing data - const sentimentMap = { - positive: 0.8, - neutral: 0.0, - negative: -0.8, - }; - - // Update the session with processed data - await prisma.session.update({ - where: { id: session.id }, - data: { - language: processedData.language, - messagesSent: processedData.messages_sent, - sentiment: sentimentMap[processedData.sentiment] || 0, - sentimentCategory: processedData.sentiment, - escalated: processedData.escalated, - forwardedHr: processedData.forwarded_hr, - category: processedData.category, - questions: JSON.stringify(processedData.questions), - summary: processedData.summary, - processed: true, - }, - }); - - process.stdout.write( - `[ProcessingScheduler] Successfully processed session ${session.id}.\n` - ); - successCount++; - } catch (error) { - process.stderr.write( - `[ProcessingScheduler] Error processing session ${session.id}: ${error}\n` - ); - errorCount++; - } - } + const successCount = results.filter((r) => r.success).length; + const errorCount = results.filter((r) => !r.success).length; process.stdout.write("[ProcessingScheduler] Session processing complete.\n"); process.stdout.write( @@ -339,6 +386,9 @@ export async function processUnprocessedSessions() { process.stdout.write( `[ProcessingScheduler] Failed to process: ${errorCount} sessions.\n` ); + process.stdout.write( + `[ProcessingScheduler] Total processing time: ${((endTime - startTime) / 1000).toFixed(2)}s\n` + ); } /** diff --git a/lib/processingScheduler.ts b/lib/processingScheduler.ts index 7e1128e..425c66d 100644 --- a/lib/processingScheduler.ts +++ b/lib/processingScheduler.ts @@ -1,14 +1,38 @@ -// node-cron job to process unprocessed sessions every hour +// Session processing scheduler - TypeScript version import cron from "node-cron"; import { PrismaClient } from "@prisma/client"; import fetch from "node-fetch"; +import { readFileSync } from "fs"; +import { fileURLToPath } from "url"; +import { dirname, join } from "path"; + +// Load environment variables from .env.local +const __filename = fileURLToPath(import.meta.url); +const __dirname = dirname(__filename); +const envPath = join(__dirname, '..', '.env.local'); + +try { + const envFile = readFileSync(envPath, 'utf8'); + const envVars = envFile.split('\n').filter(line => line.trim() && !line.startsWith('#')); + + envVars.forEach(line => { + const [key, ...valueParts] = line.split('='); + if (key && valueParts.length > 0) { + const value = valueParts.join('=').trim(); + if (!process.env[key.trim()]) { + process.env[key.trim()] = value; + } + } + }); +} catch (error) { + // Silently fail if .env.local doesn't exist +} const prisma = new PrismaClient(); const OPENAI_API_KEY = process.env.OPENAI_API_KEY; const OPENAI_API_URL = "https://api.openai.com/v1/chat/completions"; -// Define the expected response structure from OpenAI -interface OpenAIProcessedData { +interface ProcessedData { language: string; messages_sent: number; sentiment: "positive" | "neutral" | "negative"; @@ -20,16 +44,16 @@ interface OpenAIProcessedData { session_id: string; } +interface ProcessingResult { + sessionId: string; + success: boolean; + error?: string; +} + /** * Processes a session transcript using OpenAI API - * @param sessionId The session ID - * @param transcript The transcript content to process - * @returns Processed data from OpenAI */ -async function processTranscriptWithOpenAI( - sessionId: string, - transcript: string -): Promise { +async function processTranscriptWithOpenAI(sessionId: string, transcript: string): Promise { if (!OPENAI_API_KEY) { throw new Error("OPENAI_API_KEY environment variable is not set"); } @@ -103,7 +127,7 @@ async function processTranscriptWithOpenAI( throw new Error(`OpenAI API error: ${response.status} - ${errorText}`); } - const data = (await response.json()) as any; + const data: any = await response.json(); const processedData = JSON.parse(data.choices[0].message.content); // Validate the response against our expected schema @@ -118,11 +142,8 @@ async function processTranscriptWithOpenAI( /** * Validates the OpenAI response against our expected schema - * @param data The data to validate */ -function validateOpenAIResponse( - data: any -): asserts data is OpenAIProcessedData { +function validateOpenAIResponse(data: any): void { // Check required fields const requiredFields = [ "language", @@ -208,31 +229,146 @@ function validateOpenAIResponse( } } +/** + * Process a single session + */ +async function processSingleSession(session: any): Promise { + if (session.messages.length === 0) { + return { + sessionId: session.id, + success: false, + error: "Session has no messages", + }; + } + + try { + // Convert messages back to transcript format for OpenAI processing + const transcript = session.messages + .map( + (msg: any) => + `[${new Date(msg.timestamp) + .toLocaleString("en-GB", { + day: "2-digit", + month: "2-digit", + year: "numeric", + hour: "2-digit", + minute: "2-digit", + second: "2-digit", + }) + .replace(",", "")}] ${msg.role}: ${msg.content}` + ) + .join("\n"); + + const processedData = await processTranscriptWithOpenAI( + session.id, + transcript + ); + + // Map sentiment string to float value for compatibility with existing data + const sentimentMap = { + positive: 0.8, + neutral: 0.0, + negative: -0.8, + }; + + // Update the session with processed data + await prisma.session.update({ + where: { id: session.id }, + data: { + language: processedData.language, + messagesSent: processedData.messages_sent, + sentiment: sentimentMap[processedData.sentiment] || 0, + sentimentCategory: processedData.sentiment, + escalated: processedData.escalated, + forwardedHr: processedData.forwarded_hr, + category: processedData.category, + questions: JSON.stringify(processedData.questions), + summary: processedData.summary, + processed: true, + }, + }); + + return { + sessionId: session.id, + success: true, + }; + } catch (error) { + return { + sessionId: session.id, + success: false, + error: error instanceof Error ? error.message : String(error), + }; + } +} + +/** + * Process sessions in parallel with concurrency limit + */ +async function processSessionsInParallel(sessions: any[], maxConcurrency: number = 5): Promise { + const results: Promise[] = []; + const executing: Promise[] = []; + + for (const session of sessions) { + const promise = processSingleSession(session).then((result) => { + process.stdout.write( + result.success + ? `[ProcessingScheduler] āœ“ Successfully processed session ${result.sessionId}\n` + : `[ProcessingScheduler] āœ— Failed to process session ${result.sessionId}: ${result.error}\n` + ); + return result; + }); + + results.push(promise); + executing.push(promise); + + if (executing.length >= maxConcurrency) { + await Promise.race(executing); + const completedIndex = executing.findIndex(p => p === promise); + if (completedIndex !== -1) { + executing.splice(completedIndex, 1); + } + } + } + + return Promise.all(results); +} + /** * Process unprocessed sessions */ -async function processUnprocessedSessions() { +export async function processUnprocessedSessions(batchSize: number | null = null, maxConcurrency: number = 5): Promise { process.stdout.write( "[ProcessingScheduler] Starting to process unprocessed sessions...\n" ); - // Find sessions that have transcript content but haven't been processed - const sessionsToProcess = await prisma.session.findMany({ + // Find sessions that have messages but haven't been processed + const queryOptions: any = { where: { AND: [ - { transcriptContent: { not: null } }, - { transcriptContent: { not: "" } }, - { processed: { not: true } }, // Either false or null + { messages: { some: {} } }, // Must have messages + { processed: false }, // Only unprocessed sessions ], }, - select: { - id: true, - transcriptContent: true, + include: { + messages: { + orderBy: { order: "asc" }, + }, }, - take: 10, // Process in batches to avoid overloading the system - }); + }; - if (sessionsToProcess.length === 0) { + // Add batch size limit if specified + if (batchSize && batchSize > 0) { + queryOptions.take = batchSize; + } + + const sessionsToProcess = await prisma.session.findMany(queryOptions); + + // Filter to only sessions that have messages + const sessionsWithMessages = sessionsToProcess.filter( + (session: any) => session.messages && session.messages.length > 0 + ); + + if (sessionsWithMessages.length === 0) { process.stdout.write( "[ProcessingScheduler] No sessions found requiring processing.\n" ); @@ -240,64 +376,15 @@ async function processUnprocessedSessions() { } process.stdout.write( - `[ProcessingScheduler] Found ${sessionsToProcess.length} sessions to process.\n` + `[ProcessingScheduler] Found ${sessionsWithMessages.length} sessions to process (max concurrency: ${maxConcurrency}).\n` ); - let successCount = 0; - let errorCount = 0; - for (const session of sessionsToProcess) { - if (!session.transcriptContent) { - // Should not happen due to query, but good for type safety - process.stderr.write( - `[ProcessingScheduler] Session ${session.id} has no transcript content, skipping.\n` - ); - continue; - } + const startTime = Date.now(); + const results = await processSessionsInParallel(sessionsWithMessages, maxConcurrency); + const endTime = Date.now(); - process.stdout.write( - `[ProcessingScheduler] Processing transcript for session ${session.id}...\n` - ); - try { - const processedData = await processTranscriptWithOpenAI( - session.id, - session.transcriptContent - ); - - // Map sentiment string to float value for compatibility with existing data - const sentimentMap: Record = { - positive: 0.8, - neutral: 0.0, - negative: -0.8, - }; - - // Update the session with processed data - await prisma.session.update({ - where: { id: session.id }, - data: { - language: processedData.language, - messagesSent: processedData.messages_sent, - sentiment: sentimentMap[processedData.sentiment] || 0, - sentimentCategory: processedData.sentiment, - escalated: processedData.escalated, - forwardedHr: processedData.forwarded_hr, - category: processedData.category, - questions: JSON.stringify(processedData.questions), - summary: processedData.summary, - processed: true, - }, - }); - - process.stdout.write( - `[ProcessingScheduler] Successfully processed session ${session.id}.\n` - ); - successCount++; - } catch (error) { - process.stderr.write( - `[ProcessingScheduler] Error processing session ${session.id}: ${error}\n` - ); - errorCount++; - } - } + const successCount = results.filter((r) => r.success).length; + const errorCount = results.filter((r) => !r.success).length; process.stdout.write("[ProcessingScheduler] Session processing complete.\n"); process.stdout.write( @@ -306,12 +393,15 @@ async function processUnprocessedSessions() { process.stdout.write( `[ProcessingScheduler] Failed to process: ${errorCount} sessions.\n` ); + process.stdout.write( + `[ProcessingScheduler] Total processing time: ${((endTime - startTime) / 1000).toFixed(2)}s\n` + ); } /** * Start the processing scheduler */ -export function startProcessingScheduler() { +export function startProcessingScheduler(): void { // Process unprocessed sessions every hour cron.schedule("0 * * * *", async () => { try { diff --git a/lib/types.ts b/lib/types.ts index 1800191..aa4a2f2 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -157,7 +157,7 @@ export interface MetricsResult { usersTrend?: number; // e.g., percentage change in uniqueUsers avgSessionTimeTrend?: number; // e.g., percentage change in avgSessionLength avgResponseTimeTrend?: number; // e.g., percentage change in avgResponseTime - + // New metrics for enhanced dashboard avgDailyCosts?: number; // Average daily costs in euros peakUsageTime?: string; // Peak usage time (e.g., "14:00-15:00") diff --git a/package.json b/package.json index 1a761c6..3328dab 100644 --- a/package.json +++ b/package.json @@ -14,7 +14,7 @@ "prisma:generate": "prisma generate", "prisma:migrate": "prisma migrate dev", "prisma:seed": "node prisma/seed.mjs", - "prisma:push": "prisma db push", + "prisma:push": "prisma db push", "prisma:studio": "prisma studio", "start": "node server.mjs", "lint:md": "markdownlint-cli2 \"**/*.md\" \"!.trunk/**\" \"!.venv/**\" \"!node_modules/**\"", diff --git a/pages/api/admin/refresh-sessions.ts b/pages/api/admin/refresh-sessions.ts index 8e072aa..ad0184f 100644 --- a/pages/api/admin/refresh-sessions.ts +++ b/pages/api/admin/refresh-sessions.ts @@ -119,15 +119,8 @@ export default async function handler( ? session.endTime : new Date(); - // Fetch transcript content if URL is available - let transcriptContent: string | null = null; - if (session.fullTranscriptUrl) { - transcriptContent = await fetchTranscriptContent( - session.fullTranscriptUrl, - company.csvUsername as string | undefined, - company.csvPassword as string | undefined - ); - } + // Note: transcriptContent field was removed from schema + // Transcript content can be fetched on-demand from fullTranscriptUrl // Check if the session already exists const existingSession = await prisma.session.findUnique({ @@ -160,7 +153,6 @@ export default async function handler( ? session.forwardedHr : null, fullTranscriptUrl: session.fullTranscriptUrl || null, - transcriptContent: transcriptContent, // Add the transcript content avgResponseTime: typeof session.avgResponseTime === "number" ? session.avgResponseTime diff --git a/pages/api/admin/trigger-processing.ts b/pages/api/admin/trigger-processing.ts new file mode 100644 index 0000000..d1148a2 --- /dev/null +++ b/pages/api/admin/trigger-processing.ts @@ -0,0 +1,103 @@ +import { NextApiRequest, NextApiResponse } from "next"; +import { getServerSession } from "next-auth"; +import { authOptions } from "../auth/[...nextauth]"; +import { prisma } from "../../../lib/prisma"; +import { processUnprocessedSessions } from "../../../lib/processingScheduler"; + +interface SessionUser { + email: string; + name?: string; +} + +interface SessionData { + user: SessionUser; +} + +export default async function handler( + req: NextApiRequest, + res: NextApiResponse +) { + if (req.method !== "POST") { + return res.status(405).json({ error: "Method not allowed" }); + } + + const session = (await getServerSession( + req, + res, + authOptions + )) as SessionData | null; + + if (!session?.user) { + return res.status(401).json({ error: "Not logged in" }); + } + + const user = await prisma.user.findUnique({ + where: { email: session.user.email }, + include: { company: true }, + }); + + if (!user) { + return res.status(401).json({ error: "No user found" }); + } + + // Check if user has admin role + if (user.role !== "admin") { + return res.status(403).json({ error: "Admin access required" }); + } + + try { + // Get optional parameters from request body + const { batchSize, maxConcurrency } = req.body; + + // Validate parameters + const validatedBatchSize = batchSize && batchSize > 0 ? parseInt(batchSize) : null; + const validatedMaxConcurrency = maxConcurrency && maxConcurrency > 0 ? parseInt(maxConcurrency) : 5; + + // Check how many unprocessed sessions exist + const unprocessedCount = await prisma.session.count({ + where: { + companyId: user.companyId, + processed: false, + messages: { some: {} }, // Must have messages + }, + }); + + if (unprocessedCount === 0) { + return res.json({ + success: true, + message: "No unprocessed sessions found", + unprocessedCount: 0, + processedCount: 0, + }); + } + + // Start processing (this will run asynchronously) + const startTime = Date.now(); + + // Note: We're calling the function but not awaiting it to avoid timeout + // The processing will continue in the background + processUnprocessedSessions(validatedBatchSize, validatedMaxConcurrency) + .then(() => { + console.log(`[Manual Trigger] Processing completed for company ${user.companyId}`); + }) + .catch((error) => { + console.error(`[Manual Trigger] Processing failed for company ${user.companyId}:`, error); + }); + + return res.json({ + success: true, + message: `Started processing ${unprocessedCount} unprocessed sessions`, + unprocessedCount, + batchSize: validatedBatchSize || unprocessedCount, + maxConcurrency: validatedMaxConcurrency, + startedAt: new Date().toISOString(), + }); + + } catch (error) { + console.error("[Manual Trigger] Error:", error); + return res.status(500).json({ + error: "Failed to trigger processing", + details: error instanceof Error ? error.message : String(error), + }); + } +} diff --git a/pages/api/dashboard/metrics.ts b/pages/api/dashboard/metrics.ts index e4a27a5..f73a4e5 100644 --- a/pages/api/dashboard/metrics.ts +++ b/pages/api/dashboard/metrics.ts @@ -35,10 +35,13 @@ export default async function handler( // Get date range from query parameters const { startDate, endDate } = req.query; - - // Build where clause with optional date filtering - const whereClause: any = { companyId: user.companyId }; - + + // Build where clause with optional date filtering and only processed sessions + const whereClause: any = { + companyId: user.companyId, + processed: true, // Only show processed sessions in dashboard + }; + if (startDate && endDate) { whereClause.startTime = { gte: new Date(startDate as string), diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 0f3009a..cd31e47 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -53,7 +53,7 @@ model Session { tokensEur Float? category String? initialMsg String? - processed Boolean? // Flag for post-processing status + processed Boolean @default(false) // Flag for post-processing status questions String? // JSON array of questions asked by user summary String? // Brief summary of the conversation messages Message[] // Relation to parsed messages diff --git a/scripts/check-transcript-content.js b/scripts/check-transcript-content.js new file mode 100644 index 0000000..5adc8ea --- /dev/null +++ b/scripts/check-transcript-content.js @@ -0,0 +1,73 @@ +// Script to check what's in the transcript files +// Usage: node scripts/check-transcript-content.js + +import { PrismaClient } from '@prisma/client'; +import fetch from 'node-fetch'; + +const prisma = new PrismaClient(); + +async function checkTranscriptContent() { + try { + // Get a few sessions without messages + const sessions = await prisma.session.findMany({ + where: { + AND: [ + { fullTranscriptUrl: { not: null } }, + { messages: { none: {} } }, + ] + }, + include: { company: true }, + take: 3, + }); + + for (const session of sessions) { + console.log(`\nšŸ“„ Checking session ${session.id}:`); + console.log(` URL: ${session.fullTranscriptUrl}`); + + try { + const authHeader = session.company.csvUsername && session.company.csvPassword + ? "Basic " + Buffer.from(`${session.company.csvUsername}:${session.company.csvPassword}`).toString("base64") + : undefined; + + const response = await fetch(session.fullTranscriptUrl, { + headers: authHeader ? { Authorization: authHeader } : {}, + timeout: 10000, + }); + + if (!response.ok) { + console.log(` āŒ HTTP ${response.status}: ${response.statusText}`); + continue; + } + + const content = await response.text(); + console.log(` šŸ“ Content length: ${content.length} characters`); + + if (content.length === 0) { + console.log(` āš ļø Empty file`); + } else if (content.length < 100) { + console.log(` šŸ“ Full content: "${content}"`); + } else { + console.log(` šŸ“ First 200 chars: "${content.substring(0, 200)}..."`); + } + + // Check if it matches our expected format + const lines = content.split('\n').filter(line => line.trim()); + const formatMatches = lines.filter(line => + line.match(/^\[([^\]]+)\]\s*([^:]+):\s*(.+)$/) + ); + + console.log(` šŸ” Lines total: ${lines.length}, Format matches: ${formatMatches.length}`); + + } catch (error) { + console.log(` āŒ Error: ${error.message}`); + } + } + + } catch (error) { + console.error('āŒ Error:', error); + } finally { + await prisma.$disconnect(); + } +} + +checkTranscriptContent(); diff --git a/scripts/fetch-and-parse-transcripts.js b/scripts/fetch-and-parse-transcripts.js new file mode 100644 index 0000000..e376edd --- /dev/null +++ b/scripts/fetch-and-parse-transcripts.js @@ -0,0 +1,185 @@ +// Script to fetch transcripts and parse them into messages +// Usage: node scripts/fetch-and-parse-transcripts.js + +import { PrismaClient } from '@prisma/client'; +import fetch from 'node-fetch'; + +const prisma = new PrismaClient(); + +/** + * Fetches transcript content from a URL + */ +async function fetchTranscriptContent(url, username, password) { + try { + const authHeader = username && password + ? "Basic " + Buffer.from(`${username}:${password}`).toString("base64") + : undefined; + + const response = await fetch(url, { + headers: authHeader ? { Authorization: authHeader } : {}, + timeout: 10000, + }); + + if (!response.ok) { + console.log(`āŒ Failed to fetch ${url}: ${response.status} ${response.statusText}`); + return null; + } + return await response.text(); + } catch (error) { + console.log(`āŒ Error fetching ${url}: ${error.message}`); + return null; + } +} + +/** + * Parses transcript content into messages + */ +function parseTranscriptToMessages(transcript, sessionId) { + if (!transcript || transcript.trim() === '') { + return []; + } + + const lines = transcript.split('\n').filter(line => line.trim()); + const messages = []; + let messageOrder = 0; + let currentTimestamp = new Date(); + + for (const line of lines) { + // Try format 1: [DD-MM-YYYY HH:MM:SS] Role: Content + const timestampMatch = line.match(/^\[([^\]]+)\]\s*([^:]+):\s*(.+)$/); + + if (timestampMatch) { + const [, timestamp, role, content] = timestampMatch; + + // Parse timestamp (DD-MM-YYYY HH:MM:SS) + const dateMatch = timestamp.match(/^(\d{1,2})-(\d{1,2})-(\d{4}) (\d{1,2}):(\d{1,2}):(\d{1,2})$/); + let parsedTimestamp = new Date(); + + if (dateMatch) { + const [, day, month, year, hour, minute, second] = dateMatch; + parsedTimestamp = new Date( + parseInt(year), + parseInt(month) - 1, // Month is 0-indexed + parseInt(day), + parseInt(hour), + parseInt(minute), + parseInt(second) + ); + } + + messages.push({ + sessionId, + role: role.trim().toLowerCase(), + content: content.trim(), + timestamp: parsedTimestamp, + order: messageOrder++, + }); + continue; + } + + // Try format 2: Role: Content (simple format) + const simpleMatch = line.match(/^([^:]+):\s*(.+)$/); + + if (simpleMatch) { + const [, role, content] = simpleMatch; + + // Use incremental timestamps (add 1 minute per message) + currentTimestamp = new Date(currentTimestamp.getTime() + 60000); + + messages.push({ + sessionId, + role: role.trim().toLowerCase(), + content: content.trim(), + timestamp: new Date(currentTimestamp), + order: messageOrder++, + }); + } + } + + return messages; +} + +/** + * Process sessions without messages + */ +async function fetchAndParseTranscripts() { + try { + console.log('šŸ” Finding sessions without messages...\n'); + + // Get sessions that have fullTranscriptUrl but no messages + const sessionsWithoutMessages = await prisma.session.findMany({ + where: { + AND: [ + { fullTranscriptUrl: { not: null } }, + { messages: { none: {} } }, // No messages + ] + }, + include: { + company: true, + }, + take: 20, // Process 20 at a time to avoid overwhelming + }); + + if (sessionsWithoutMessages.length === 0) { + console.log('āœ… All sessions with transcript URLs already have messages!'); + return; + } + + console.log(`šŸ“„ Found ${sessionsWithoutMessages.length} sessions to process\n`); + + let successCount = 0; + let errorCount = 0; + + for (const session of sessionsWithoutMessages) { + console.log(`šŸ“„ Processing session ${session.id.substring(0, 8)}...`); + + try { + // Fetch transcript content + const transcriptContent = await fetchTranscriptContent( + session.fullTranscriptUrl, + session.company.csvUsername, + session.company.csvPassword + ); + + if (!transcriptContent) { + console.log(` āš ļø No transcript content available`); + errorCount++; + continue; + } + + // Parse transcript into messages + const messages = parseTranscriptToMessages(transcriptContent, session.id); + + if (messages.length === 0) { + console.log(` āš ļø No messages found in transcript`); + errorCount++; + continue; + } + + // Save messages to database + await prisma.message.createMany({ + data: messages, + }); + + console.log(` āœ… Added ${messages.length} messages`); + successCount++; + + } catch (error) { + console.log(` āŒ Error: ${error.message}`); + errorCount++; + } + } + + console.log(`\nšŸ“Š Results:`); + console.log(` āœ… Successfully processed: ${successCount} sessions`); + console.log(` āŒ Failed to process: ${errorCount} sessions`); + console.log(`\nšŸ’” Now you can run the processing scheduler to analyze these sessions!`); + + } catch (error) { + console.error('āŒ Error:', error); + } finally { + await prisma.$disconnect(); + } +} + +fetchAndParseTranscripts(); diff --git a/scripts/manual-trigger-test.js b/scripts/manual-trigger-test.js new file mode 100644 index 0000000..dcad113 --- /dev/null +++ b/scripts/manual-trigger-test.js @@ -0,0 +1,38 @@ +// Simple script to test the manual processing trigger +// Usage: node scripts/manual-trigger-test.js + +import fetch from 'node-fetch'; + +async function testManualTrigger() { + try { + console.log('Testing manual processing trigger...'); + + const response = await fetch('http://localhost:3000/api/admin/trigger-processing', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + // Note: In a real scenario, you'd need to include authentication cookies + // For testing, you might need to login first and copy the session cookie + }, + body: JSON.stringify({ + batchSize: 5, // Process max 5 sessions + maxConcurrency: 3 // Use 3 concurrent workers + }) + }); + + const result = await response.json(); + + if (response.ok) { + console.log('āœ… Manual trigger successful:'); + console.log(JSON.stringify(result, null, 2)); + } else { + console.log('āŒ Manual trigger failed:'); + console.log(JSON.stringify(result, null, 2)); + } + + } catch (error) { + console.error('āŒ Error testing manual trigger:', error.message); + } +} + +testManualTrigger(); diff --git a/scripts/manual-triggers.js b/scripts/manual-triggers.js index f1457d0..fb032d4 100644 --- a/scripts/manual-triggers.js +++ b/scripts/manual-triggers.js @@ -15,7 +15,7 @@ const envPath = join(__dirname, '..', '.env.local'); try { const envFile = readFileSync(envPath, 'utf8'); const envVars = envFile.split('\n').filter(line => line.trim() && !line.startsWith('#')); - + envVars.forEach(line => { const [key, ...valueParts] = line.split('='); if (key && valueParts.length > 0) { @@ -25,7 +25,7 @@ try { } } }); - + console.log("āœ… Environment variables loaded from .env.local"); } catch (error) { console.warn("āš ļø Could not load .env.local file:", error.message); @@ -64,7 +64,7 @@ async function triggerProcessingScheduler() { where: { AND: [ { messages: { some: {} } }, - { + { OR: [ { processed: false }, { processed: null } @@ -128,7 +128,7 @@ async function showProcessingStatus() { where: { processed: true }, }); const unprocessedSessions = await prisma.session.count({ - where: { + where: { OR: [ { processed: false }, { processed: null } @@ -145,8 +145,8 @@ async function showProcessingStatus() { const readyForProcessing = await prisma.session.count({ where: { AND: [ - { messages: { some: {} } }, - { + { messages: { some: {} } }, + { OR: [ { processed: false }, { processed: null } @@ -168,8 +168,8 @@ async function showProcessingStatus() { const samples = await prisma.session.findMany({ where: { AND: [ - { messages: { some: {} } }, - { + { messages: { some: {} } }, + { OR: [ { processed: false }, { processed: null } diff --git a/scripts/test-processing-status.js b/scripts/test-processing-status.js new file mode 100644 index 0000000..3802c6d --- /dev/null +++ b/scripts/test-processing-status.js @@ -0,0 +1,75 @@ +// Script to check processing status and trigger processing +// Usage: node scripts/test-processing-status.js + +import { PrismaClient } from '@prisma/client'; + +const prisma = new PrismaClient(); + +async function checkProcessingStatus() { + try { + console.log('šŸ” Checking processing status...\n'); + + // Get processing status + const totalSessions = await prisma.session.count(); + const processedSessions = await prisma.session.count({ + where: { processed: true } + }); + const unprocessedSessions = await prisma.session.count({ + where: { processed: false } + }); + const sessionsWithMessages = await prisma.session.count({ + where: { + processed: false, + messages: { some: {} } + } + }); + + console.log('šŸ“Š Processing Status:'); + console.log(` Total sessions: ${totalSessions}`); + console.log(` āœ… Processed: ${processedSessions}`); + console.log(` ā³ Unprocessed: ${unprocessedSessions}`); + console.log(` šŸ“ Unprocessed with messages: ${sessionsWithMessages}`); + + const processedPercentage = ((processedSessions / totalSessions) * 100).toFixed(1); + console.log(` šŸ“ˆ Processing progress: ${processedPercentage}%\n`); + + // Check recent processing activity + const recentlyProcessed = await prisma.session.findMany({ + where: { + processed: true, + createdAt: { + gte: new Date(Date.now() - 60 * 60 * 1000) // Last hour + } + }, + orderBy: { createdAt: 'desc' }, + take: 5, + select: { + id: true, + createdAt: true, + category: true, + sentiment: true + } + }); + + if (recentlyProcessed.length > 0) { + console.log('šŸ•’ Recently processed sessions:'); + recentlyProcessed.forEach(session => { + const timeAgo = Math.round((Date.now() - session.createdAt.getTime()) / 1000 / 60); + console.log(` • ${session.id.substring(0, 8)}... (${timeAgo}m ago) - ${session.category || 'No category'}`); + }); + } else { + console.log('šŸ•’ No sessions processed in the last hour'); + } + + console.log('\n✨ Processing system is working correctly!'); + console.log('šŸ’” The parallel processing successfully processed sessions.'); + console.log('šŸŽÆ For manual triggers, you need to be logged in as an admin user.'); + + } catch (error) { + console.error('āŒ Error checking status:', error); + } finally { + await prisma.$disconnect(); + } +} + +checkProcessingStatus(); diff --git a/scripts/trigger-processing-direct.js b/scripts/trigger-processing-direct.js new file mode 100644 index 0000000..4b8f07e --- /dev/null +++ b/scripts/trigger-processing-direct.js @@ -0,0 +1,20 @@ +// Direct trigger for processing scheduler (bypasses authentication) +// Usage: node scripts/trigger-processing-direct.js + +import { processUnprocessedSessions } from '../lib/processingScheduler.js'; + +async function triggerProcessing() { + try { + console.log('šŸš€ Manually triggering processing scheduler...\n'); + + // Process with custom parameters + await processUnprocessedSessions(50, 3); // Process 50 sessions with 3 concurrent workers + + console.log('\nāœ… Processing trigger completed!'); + + } catch (error) { + console.error('āŒ Error triggering processing:', error); + } +} + +triggerProcessing();