From a9e41450011fa7283b5a07e3deccb8f49433e462 Mon Sep 17 00:00:00 2001 From: Max Kowalski Date: Wed, 25 Jun 2025 17:45:08 +0200 Subject: [PATCH] feat: Implement structured message parsing and display in MessageViewer component - Added MessageViewer component to display parsed messages in a chat-like format. - Introduced new Message table in the database to store individual messages with timestamps, roles, and content. - Updated Session model to include a relation to parsed messages. - Created transcript parsing logic to convert raw transcripts into structured messages. - Enhanced processing scheduler to handle sessions with parsed messages. - Updated API endpoints to return parsed messages alongside session details. - Added manual trigger commands for session refresh, transcript parsing, and processing. - Improved user experience with color-coded message roles and timestamps in the UI. - Documented the new scheduler workflow and transcript parsing implementation. --- .env.development => .env.example | 2 +- app/dashboard/sessions/[id]/page.tsx | 41 ++-- components/MessageViewer.tsx | 75 +++++++ components/SessionDetails.tsx | 35 ++- docs/scheduler-workflow.md | 185 ++++++++++++++++ docs/transcript-parsing-implementation.md | 203 +++++++++++++++++ lib/csvFetcher.js | 8 +- lib/processingScheduler.js | 43 ++-- lib/schedulers.ts | 4 +- lib/transcriptParser.js | 209 ++++++++++++++++++ lib/types.ts | 12 +- package.json | 32 +-- pages/api/admin/refresh-sessions.ts | 2 +- pages/api/dashboard/session/[id].ts | 15 +- .../migration.sql | 14 ++ .../migration.sql | 39 ++++ prisma/schema.prisma | 15 +- scripts/manual-triggers.js | 187 ++++++++++++++++ server.mjs | 8 +- server.ts | 4 +- 20 files changed, 1043 insertions(+), 90 deletions(-) rename .env.development => .env.example (91%) create mode 100644 components/MessageViewer.tsx create mode 100644 docs/scheduler-workflow.md create mode 100644 docs/transcript-parsing-implementation.md create mode 100644 lib/transcriptParser.js 
create mode 100644 prisma/migrations/20250625152312_add_message_table/migration.sql create mode 100644 prisma/migrations/20250625153042_remove_transcript_content/migration.sql create mode 100644 scripts/manual-triggers.js diff --git a/.env.development b/.env.example similarity index 91% rename from .env.development rename to .env.example index 59bd135..56ca41f 100644 --- a/.env.development +++ b/.env.example @@ -8,6 +8,6 @@ NODE_ENV=development # OpenAI API key for session processing # Add your API key here: OPENAI_API_KEY=sk-... -OPENAI_API_KEY= +OPENAI_API_KEY=your_openai_api_key_here # Database connection - already configured in your prisma/schema.prisma diff --git a/app/dashboard/sessions/[id]/page.tsx b/app/dashboard/sessions/[id]/page.tsx index 7ff90d8..7b3f863 100644 --- a/app/dashboard/sessions/[id]/page.tsx +++ b/app/dashboard/sessions/[id]/page.tsx @@ -5,6 +5,7 @@ import { useParams, useRouter } from "next/navigation"; // Import useRouter import { useSession } from "next-auth/react"; // Import useSession import SessionDetails from "../../../../components/SessionDetails"; import TranscriptViewer from "../../../../components/TranscriptViewer"; +import MessageViewer from "../../../../components/MessageViewer"; import { ChatSession } from "../../../../lib/types"; import Link from "next/link"; @@ -136,30 +137,26 @@ export default function SessionViewPage() {
- {session.transcriptContent && - session.transcriptContent.trim() !== "" ? ( -
- + + {/* Show parsed messages if available */} + {session.messages && session.messages.length > 0 && ( +
+
- ) : ( + )} + + {/* Show transcript URL if available */} + {session.fullTranscriptUrl && (
-

Transcript

-

- No transcript content available for this session. -

- {session.fullTranscriptUrl && ( - - View Source Transcript URL - - )} +

Source Transcript

+ + View Original Transcript +
)}
diff --git a/components/MessageViewer.tsx b/components/MessageViewer.tsx new file mode 100644 index 0000000..1c4093b --- /dev/null +++ b/components/MessageViewer.tsx @@ -0,0 +1,75 @@ +"use client"; + +import { Message } from "../lib/types"; + +interface MessageViewerProps { + messages: Message[]; +} + +/** + * Component to display parsed messages in a chat-like format + */ +export default function MessageViewer({ messages }: MessageViewerProps) { + if (!messages || messages.length === 0) { + return ( +
+

Conversation

+

No parsed messages available

+
+ ); + } + + return ( +
+

+ Conversation ({messages.length} messages) +

+ +
+ {messages.map((message) => ( +
+
+
+ + {message.role} + + + {new Date(message.timestamp).toLocaleTimeString()} + +
+
+ {message.content} +
+
+
+ ))} +
+ +
+
+ + First message: {new Date(messages[0].timestamp).toLocaleString()} + + + Last message: {new Date(messages[messages.length - 1].timestamp).toLocaleString()} + +
+
+
+ ); +} diff --git a/components/SessionDetails.tsx b/components/SessionDetails.tsx index f48ac98..4093b2b 100644 --- a/components/SessionDetails.tsx +++ b/components/SessionDetails.tsx @@ -15,11 +15,10 @@ export default function SessionDetails({ session }: SessionDetailsProps) { return (

Session Details

- -
+
Session ID: - {session.sessionId || session.id} + {session.id}
@@ -220,23 +219,19 @@ export default function SessionDetails({ session }: SessionDetailsProps) {
)} - {/* Transcript rendering is now handled by the parent page (app/dashboard/sessions/[id]/page.tsx) */} - {/* Fallback to link only if we only have the URL but no content - this might also be redundant if parent handles all transcript display */} - {(!session.transcriptContent || - session.transcriptContent.length === 0) && - session.fullTranscriptUrl && ( -
- Transcript: - - View Full Transcript - -
- )} + {session.fullTranscriptUrl && ( +
+ Transcript: + + View Full Transcript + +
+ )}
); diff --git a/docs/scheduler-workflow.md b/docs/scheduler-workflow.md new file mode 100644 index 0000000..3d55093 --- /dev/null +++ b/docs/scheduler-workflow.md @@ -0,0 +1,185 @@ +# Scheduler Workflow Documentation + +## Overview +The LiveDash system has two main schedulers that work together to fetch and process session data: + +1. **Session Refresh Scheduler** - Fetches new sessions from CSV files +2. **Processing Scheduler** - Processes session transcripts with AI + +## Current Status (as of latest check) +- **Total sessions**: 107 +- **Processed sessions**: 0 +- **Sessions with transcript**: 0 +- **Ready for processing**: 0 + +## How the `processed` Field Works + +The ProcessingScheduler picks up sessions where `processed` is **NOT** `true`, which includes: +- `processed = false` +- `processed = null` + +**Query used:** +```javascript +{ processed: { not: true } } // Either false or null +``` + +## Complete Workflow + +### Step 1: Session Refresh (CSV Fetching) +**What it does:** +- Fetches session data from company CSV URLs +- Creates session records in database with basic metadata +- Sets `transcriptContent = null` initially +- Sets `processed = null` initially + +**Runs:** Every 30 minutes (cron: `*/30 * * * *`) + +### Step 2: Transcript Fetching +**What it does:** +- Downloads full transcript content for sessions +- Updates `transcriptContent` field with actual conversation data +- Sessions remain `processed = null` until AI processing + +**Runs:** As part of session refresh process + +### Step 3: AI Processing +**What it does:** +- Finds sessions with transcript content where `processed != true` +- Sends transcripts to OpenAI for analysis +- Extracts: sentiment, category, questions, summary, etc. 
+- Updates session with processed data +- Sets `processed = true` + +**Runs:** Every hour (cron: `0 * * * *`) + +## Manual Trigger Commands + +### Check Current Status +```bash +node scripts/manual-triggers.js status +``` + +### Trigger Session Refresh (Fetch new sessions from CSV) +```bash +node scripts/manual-triggers.js refresh +``` + +### Trigger AI Processing (Process unprocessed sessions) +```bash +node scripts/manual-triggers.js process +``` + +### Run Both Schedulers +```bash +node scripts/manual-triggers.js both +``` + +## Troubleshooting + +### No Sessions Being Processed? +1. **Check if sessions have transcripts:** + ```bash + node scripts/manual-triggers.js status + ``` + +2. **If "Sessions with transcript" is 0:** + - Sessions exist but transcripts haven't been fetched yet + - Run session refresh: `node scripts/manual-triggers.js refresh` + +3. **If "Ready for processing" is 0 but "Sessions with transcript" > 0:** + - All sessions with transcripts have already been processed + - Check if `OPENAI_API_KEY` is set in environment + +### Common Issues + +#### "No sessions found requiring processing" +- All sessions with transcripts have been processed (`processed = true`) +- Or no sessions have transcript content yet + +#### "OPENAI_API_KEY environment variable is not set" +- Add the OpenAI API key to your `.env.development` file (create it from `.env.example` if it does not exist) +- Restart the application + +#### "Error fetching transcript: Unauthorized" +- CSV credentials are incorrect or expired +- Check company CSV username/password in database + +## Database Field Mapping + +### Before AI Processing +```javascript +{ + id: "session-uuid", + transcriptContent: "full conversation text" | null, + processed: null, + sentimentCategory: null, + questions: null, + summary: null, + // ... 
other fields +} +``` + +### After AI Processing +```javascript +{ + id: "session-uuid", + transcriptContent: "full conversation text", + processed: true, + sentimentCategory: "positive" | "neutral" | "negative", + questions: '["question 1", "question 2"]', // JSON string + summary: "Brief conversation summary", + language: "en", // ISO 639-1 code + messagesSent: 5, + sentiment: 0.8, // Float value (-1 to 1) + escalated: false, + forwardedHr: false, + category: "Schedule & Hours", + // ... other fields +} +``` + +## Scheduler Configuration + +### Session Refresh Scheduler +- **File**: `lib/scheduler.js` +- **Frequency**: Every 30 minutes +- **Cron**: `*/30 * * * *` + +### Processing Scheduler +- **File**: `lib/processingScheduler.js` +- **Frequency**: Every hour +- **Cron**: `0 * * * *` +- **Batch size**: 10 sessions per run + +## Environment Variables Required + +```bash +# Database +DATABASE_URL="postgresql://..." + +# OpenAI (for processing) +OPENAI_API_KEY="sk-..." + +# NextAuth +NEXTAUTH_SECRET="..." +NEXTAUTH_URL="http://localhost:3000" +``` + +## Next Steps for Testing + +1. **Trigger session refresh** to fetch transcripts: + ```bash + node scripts/manual-triggers.js refresh + ``` + +2. **Check status** to see if transcripts were fetched: + ```bash + node scripts/manual-triggers.js status + ``` + +3. **Trigger processing** if transcripts are available: + ```bash + node scripts/manual-triggers.js process + ``` + +4. **View results** in the dashboard session details pages diff --git a/docs/transcript-parsing-implementation.md b/docs/transcript-parsing-implementation.md new file mode 100644 index 0000000..51fabba --- /dev/null +++ b/docs/transcript-parsing-implementation.md @@ -0,0 +1,203 @@ +# Transcript Parsing Implementation + +## Overview +Added structured message parsing to the LiveDash system, allowing transcripts to be broken down into individual messages with timestamps, roles, and content. 
This provides a much better user experience for viewing conversations. + +## Database Changes + +### New Message Table +```sql +CREATE TABLE Message ( + id TEXT PRIMARY KEY DEFAULT (uuid()), + sessionId TEXT NOT NULL, + timestamp DATETIME NOT NULL, + role TEXT NOT NULL, + content TEXT NOT NULL, + order INTEGER NOT NULL, + createdAt DATETIME DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (sessionId) REFERENCES Session(id) ON DELETE CASCADE +); + +CREATE INDEX Message_sessionId_order_idx ON Message(sessionId, order); +``` + +### Updated Session Table +- Added `messages` relation to Session model +- Sessions can now have both raw transcript content AND parsed messages + +## New Components + +### 1. Message Interface (`lib/types.ts`) +```typescript +export interface Message { + id: string; + sessionId: string; + timestamp: Date; + role: string; // "User", "Assistant", "System", etc. + content: string; + order: number; // Order within the conversation (0, 1, 2, ...) + createdAt: Date; +} +``` + +### 2. Transcript Parser (`lib/transcriptParser.js`) +- **`parseChatLogToJSON(logString)`** - Parses raw transcript text into structured messages +- **`storeMessagesForSession(sessionId, messages)`** - Stores parsed messages in database +- **`processTranscriptForSession(sessionId, transcriptContent)`** - Complete processing for one session +- **`processAllUnparsedTranscripts()`** - Batch process all unparsed transcripts +- **`getMessagesForSession(sessionId)`** - Retrieve messages for a session + +### 3. MessageViewer Component (`components/MessageViewer.tsx`) +- Chat-like interface for displaying parsed messages +- Color-coded by role (User: blue, Assistant: gray, System: yellow) +- Shows timestamps and message order +- Scrollable with conversation metadata + +## Updated Components + +### 1. Session API (`pages/api/dashboard/session/[id].ts`) +- Now includes parsed messages in session response +- Messages are ordered by `order` field (ascending) + +### 2. 
Session Details Page (`app/dashboard/sessions/[id]/page.tsx`) +- Added MessageViewer component +- Shows parsed messages, plus a link to the original transcript when `fullTranscriptUrl` is set +- Prioritizes parsed messages when available + +### 3. ChatSession Interface (`lib/types.ts`) +- Added optional `messages?: Message[]` field + +## Parsing Logic + +### Supported Format +The parser expects transcript format: +``` +[DD.MM.YYYY HH:MM:SS] Role: Message content +[DD.MM.YYYY HH:MM:SS] User: Hello, I need help +[DD.MM.YYYY HH:MM:SS] Assistant: How can I help you today? +``` + +### Features +- **Multi-line support** - Messages can span multiple lines +- **Timestamp parsing** - Converts DD.MM.YYYY HH:MM:SS to ISO format +- **Role detection** - Extracts sender role from each message +- **Ordering** - Maintains conversation order with explicit order field +- **Sorting** - Messages sorted by timestamp, then by role (User before Assistant) + +## Manual Commands + +### New Commands Added +```bash +# Parse transcripts into structured messages +node scripts/manual-triggers.js parse + +# Complete workflow: refresh → parse → process +node scripts/manual-triggers.js all + +# Check status (now shows parsing info) +node scripts/manual-triggers.js status +``` + +### Updated Commands +- **`status`** - Now shows transcript and parsing statistics +- **`all`** - New command that runs refresh → parse → process in sequence + +## Workflow Integration + +### Complete Processing Pipeline +1. **Session Refresh** - Fetch sessions from CSV, download transcripts +2. **Transcript Parsing** - Parse raw transcripts into structured messages +3. **AI Processing** - Process sessions with OpenAI for sentiment, categories, etc. 
+ +### Database States +```javascript +// After CSV fetch +{ + transcriptContent: "raw text...", + messages: [], // Empty + processed: null +} + +// After parsing +{ + transcriptContent: "raw text...", + messages: [Message, Message, ...], // Parsed + processed: null +} + +// After AI processing +{ + transcriptContent: "raw text...", + messages: [Message, Message, ...], // Parsed + processed: true, + sentimentCategory: "positive", + summary: "Brief summary...", + // ... other AI fields +} +``` + +## User Experience Improvements + +### Before +- Only raw transcript text in a text area +- Difficult to follow conversation flow +- No clear distinction between speakers + +### After +- **Chat-like interface** with message bubbles +- **Color-coded roles** for easy identification +- **Timestamps** for each message +- **Conversation metadata** (first/last message times) +- **Fallback to raw transcript** if parsing fails +- **Both views available** - structured AND raw + +## Testing + +### Manual Testing Commands +```bash +# Check current status +node scripts/manual-triggers.js status + +# Parse existing transcripts +node scripts/manual-triggers.js parse + +# Full pipeline test +node scripts/manual-triggers.js all +``` + +### Expected Results +1. Sessions with transcript content get parsed into individual messages +2. Session detail pages show chat-like interface +3. Both parsed messages and raw transcript are available +4. 
No data loss - original transcript content preserved + +## Technical Benefits + +### Performance +- **Indexed queries** - Messages indexed by sessionId and order +- **Efficient loading** - Only load messages when needed +- **Cascading deletes** - Messages automatically deleted with sessions + +### Maintainability +- **Separation of concerns** - Parsing logic isolated in dedicated module +- **Type safety** - Full TypeScript support for Message interface +- **Error handling** - Graceful fallbacks when parsing fails + +### Extensibility +- **Role flexibility** - Supports any role names (User, Assistant, System, etc.) +- **Content preservation** - Multi-line messages fully supported +- **Metadata ready** - Easy to add message-level metadata in future + +## Migration Notes + +### Existing Data +- **No data loss** - Original transcript content preserved +- **Backward compatibility** - Pages work with or without parsed messages +- **Gradual migration** - Can parse transcripts incrementally + +### Database Migration +- New Message table created with foreign key constraints +- Existing Session table unchanged (only added relation) +- Index created for efficient message queries + +This implementation provides a solid foundation for enhanced conversation analysis and user experience while maintaining full backward compatibility. diff --git a/lib/csvFetcher.js b/lib/csvFetcher.js index df701ab..5d32b5e 100644 --- a/lib/csvFetcher.js +++ b/lib/csvFetcher.js @@ -407,7 +407,7 @@ async function fetchTranscriptContent(url, username, password) { headers: authHeader ? 
{ Authorization: authHeader } : {}, timeout: 10000, // 10 second timeout }); - + if (!response.ok) { // Only log error once per batch, not for every transcript if (Math.random() < 0.1) { // Log ~10% of errors to avoid spam @@ -502,7 +502,7 @@ export async function fetchAndStoreSessionsForAllCompanies() { try { // Get all companies const companies = await prisma.company.findMany(); - + for (const company of companies) { if (!company.csvUrl) { console.log(`[Scheduler] Skipping company ${company.id} - no CSV URL configured`); @@ -516,7 +516,7 @@ export async function fetchAndStoreSessionsForAllCompanies() { } console.log(`[Scheduler] Processing sessions for company: ${company.id}`); - + try { const sessions = await fetchAndParseCsv( company.csvUrl, @@ -603,7 +603,7 @@ export async function fetchAndStoreSessionsForAllCompanies() { initialMsg: session.initialMsg || null, }, }); - + addedCount++; } diff --git a/lib/processingScheduler.js b/lib/processingScheduler.js index 05e1a1b..5517d63 100644 --- a/lib/processingScheduler.js +++ b/lib/processingScheduler.js @@ -181,46 +181,59 @@ function validateOpenAIResponse(data) { /** * Process unprocessed sessions */ -async function processUnprocessedSessions() { +export async function processUnprocessedSessions() { process.stdout.write("[ProcessingScheduler] Starting to process unprocessed sessions...\n"); - // Find sessions that have transcript content but haven't been processed + // Find sessions that have messages but haven't been processed const sessionsToProcess = await prisma.session.findMany({ where: { AND: [ - { transcriptContent: { not: null } }, - { transcriptContent: { not: "" } }, { processed: { not: true } }, // Either false or null ], }, - select: { - id: true, - transcriptContent: true, + include: { + messages: { + orderBy: { order: 'asc' } + } }, take: 10, // Process in batches to avoid overloading the system }); - if (sessionsToProcess.length === 0) { + // Filter to only sessions that have messages + const 
sessionsWithMessages = sessionsToProcess.filter(session => session.messages.length > 0); + + if (sessionsWithMessages.length === 0) { process.stdout.write("[ProcessingScheduler] No sessions found requiring processing.\n"); return; } - process.stdout.write(`[ProcessingScheduler] Found ${sessionsToProcess.length} sessions to process.\n`); + process.stdout.write(`[ProcessingScheduler] Found ${sessionsWithMessages.length} sessions to process.\n`); let successCount = 0; let errorCount = 0; - for (const session of sessionsToProcess) { - if (!session.transcriptContent) { - // Should not happen due to query, but good for type safety - process.stderr.write(`[ProcessingScheduler] Session ${session.id} has no transcript content, skipping.\n`); + for (const session of sessionsWithMessages) { + if (session.messages.length === 0) { + process.stderr.write(`[ProcessingScheduler] Session ${session.id} has no messages, skipping.\n`); continue; } - process.stdout.write(`[ProcessingScheduler] Processing transcript for session ${session.id}...\n`); + process.stdout.write(`[ProcessingScheduler] Processing messages for session ${session.id}...\n`); try { + // Convert messages back to transcript format for OpenAI processing + const transcript = session.messages.map(msg => + `[${new Date(msg.timestamp).toLocaleString('en-GB', { + day: '2-digit', + month: '2-digit', + year: 'numeric', + hour: '2-digit', + minute: '2-digit', + second: '2-digit' + }).replace(',', '')}] ${msg.role}: ${msg.content}` + ).join('\n'); + const processedData = await processTranscriptWithOpenAI( session.id, - session.transcriptContent + transcript ); // Map sentiment string to float value for compatibility with existing data diff --git a/lib/schedulers.ts b/lib/schedulers.ts index 72e0dfe..7e738ac 100644 --- a/lib/schedulers.ts +++ b/lib/schedulers.ts @@ -10,9 +10,9 @@ import { startProcessingScheduler } from "./processingScheduler"; export function initializeSchedulers() { // Start the session refresh scheduler 
startScheduler(); - + // Start the session processing scheduler startProcessingScheduler(); - + console.log("All schedulers initialized successfully"); } diff --git a/lib/transcriptParser.js b/lib/transcriptParser.js new file mode 100644 index 0000000..4d0430f --- /dev/null +++ b/lib/transcriptParser.js @@ -0,0 +1,209 @@ +// Transcript parser utility - converts raw transcript text to structured messages +import { PrismaClient } from "@prisma/client"; + +const prisma = new PrismaClient(); + +/** + * Parses chat log string to JSON format with individual messages + * @param {string} logString - Raw transcript content + * @returns {Object} Parsed data with messages array and metadata + */ +export function parseChatLogToJSON(logString) { + // Convert to string if it's not already + const stringData = typeof logString === 'string' ? logString : String(logString); + + // Split by lines and filter out empty lines + const lines = stringData.split('\n').filter(line => line.trim() !== ''); + + const messages = []; + let currentMessage = null; + + for (const line of lines) { + // Check if line starts with a timestamp pattern [DD.MM.YYYY HH:MM:SS] + const timestampMatch = line.match(/^\[(\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})\] (.+?): (.*)$/); + + if (timestampMatch) { + // If we have a previous message, push it to the array + if (currentMessage) { + messages.push(currentMessage); + } + + // Parse the timestamp + const [, timestamp, sender, content] = timestampMatch; + + // Convert DD.MM.YYYY HH:MM:SS to ISO format + const [datePart, timePart] = timestamp.split(' '); + const [day, month, year] = datePart.split('.'); + const [hour, minute, second] = timePart.split(':'); + + const dateObject = new Date(year, month - 1, day, hour, minute, second); + + // Create new message object + currentMessage = { + timestamp: dateObject.toISOString(), + role: sender, + content: content + }; + } else if (currentMessage) { + // This is a continuation of the previous message (multiline) + 
currentMessage.content += '\n' + line; + } + } + + // Don't forget the last message + if (currentMessage) { + messages.push(currentMessage); + } + + return { + messages: messages.sort((a, b) => { + // First sort by timestamp (ascending) + const timeComparison = new Date(a.timestamp) - new Date(b.timestamp); + if (timeComparison !== 0) { + return timeComparison; + } + + // If timestamps are equal, sort by role (descending) + // This puts "User" before "Assistant" when timestamps are the same + return b.role.localeCompare(a.role); + }), + totalMessages: messages.length + }; +} + +/** + * Stores parsed messages in the database for a session + * @param {string} sessionId - The session ID + * @param {Array} messages - Array of parsed message objects + */ +export async function storeMessagesForSession(sessionId, messages) { + try { + // First, delete any existing messages for this session + await prisma.message.deleteMany({ + where: { sessionId } + }); + + // Then insert the new messages + const messageData = messages.map((message, index) => ({ + sessionId, + timestamp: new Date(message.timestamp), + role: message.role, + content: message.content, + order: index + })); + + if (messageData.length > 0) { + await prisma.message.createMany({ + data: messageData + }); + } + + process.stdout.write(`[TranscriptParser] Stored ${messageData.length} messages for session ${sessionId}\n`); + return messageData.length; + } catch (error) { + process.stderr.write(`[TranscriptParser] Error storing messages for session ${sessionId}: ${error}\n`); + throw error; + } +} + +/** + * Processes and stores transcript for a single session + * @param {string} sessionId - The session ID + * @param {string} transcriptContent - Raw transcript content + * @returns {Promise} Processing result with message count + */ +export async function processTranscriptForSession(sessionId, transcriptContent) { + if (!transcriptContent || transcriptContent.trim() === '') { + throw new Error('No transcript content 
provided'); + } + + try { + // Parse the transcript + const parsed = parseChatLogToJSON(transcriptContent); + + // Store messages in database + const messageCount = await storeMessagesForSession(sessionId, parsed.messages); + + return { + sessionId, + messageCount, + totalMessages: parsed.totalMessages, + success: true + }; + } catch (error) { + process.stderr.write(`[TranscriptParser] Error processing transcript for session ${sessionId}: ${error}\n`); + throw error; + } +} + +/** + * Processes transcripts for all sessions that have transcript content but no parsed messages + */ +export async function processAllUnparsedTranscripts() { + process.stdout.write("[TranscriptParser] Starting to process unparsed transcripts...\n"); + + try { + // Find sessions with transcript content but no messages + const sessionsToProcess = await prisma.session.findMany({ + where: { + AND: [ + { transcriptContent: { not: null } }, + { transcriptContent: { not: "" } }, + ] + }, + include: { + messages: true + } + }); + + // Filter to only sessions without messages + const unparsedSessions = sessionsToProcess.filter(session => session.messages.length === 0); + + if (unparsedSessions.length === 0) { + process.stdout.write("[TranscriptParser] No unparsed transcripts found.\n"); + return { processed: 0, errors: 0 }; + } + + process.stdout.write(`[TranscriptParser] Found ${unparsedSessions.length} sessions with unparsed transcripts.\n`); + + let successCount = 0; + let errorCount = 0; + + for (const session of unparsedSessions) { + try { + const result = await processTranscriptForSession(session.id, session.transcriptContent); + process.stdout.write(`[TranscriptParser] Processed session ${session.id}: ${result.messageCount} messages\n`); + successCount++; + } catch (error) { + process.stderr.write(`[TranscriptParser] Failed to process session ${session.id}: ${error}\n`); + errorCount++; + } + } + + process.stdout.write(`[TranscriptParser] Completed processing. 
Success: ${successCount}, Errors: ${errorCount}\n`); + return { processed: successCount, errors: errorCount }; + + } catch (error) { + process.stderr.write(`[TranscriptParser] Error in processAllUnparsedTranscripts: ${error}\n`); + throw error; + } +} + +/** + * Gets parsed messages for a session + * @param {string} sessionId - The session ID + * @returns {Promise} Array of message objects + */ +export async function getMessagesForSession(sessionId) { + try { + const messages = await prisma.message.findMany({ + where: { sessionId }, + orderBy: { order: 'asc' } + }); + + return messages; + } catch (error) { + process.stderr.write(`[TranscriptParser] Error getting messages for session ${sessionId}: ${error}\n`); + throw error; + } +} diff --git a/lib/types.ts b/lib/types.ts index b234d9e..2f04ff0 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -35,6 +35,16 @@ export interface User { updatedAt: Date; } +export interface Message { + id: string; + sessionId: string; + timestamp: Date; + role: string; // "User", "Assistant", "System", etc. + content: string; + order: number; // Order within the conversation (0, 1, 2, ...) 
+ createdAt: Date; +} + export interface ChatSession { id: string; sessionId: string; @@ -60,10 +70,10 @@ export interface ChatSession { tokensEur?: number; initialMsg?: string; fullTranscriptUrl?: string | null; - transcriptContent?: string | null; processed?: boolean | null; // Flag for post-processing status questions?: string | null; // JSON array of questions asked by user summary?: string | null; // Brief summary of the conversation + messages?: Message[]; // Parsed messages from transcript } export interface SessionQuery { diff --git a/package.json b/package.json index f59c062..45dfe58 100644 --- a/package.json +++ b/package.json @@ -3,6 +3,22 @@ "type": "module", "version": "0.2.0", "private": true, + "scripts": { + "build": "next build", + "dev": "next dev --turbopack", + "dev:with-schedulers": "node server.mjs", + "format": "npx prettier --write .", + "format:check": "npx prettier --check .", + "lint": "next lint", + "lint:fix": "npx eslint --fix", + "prisma:generate": "prisma generate", + "prisma:migrate": "prisma migrate dev", + "prisma:seed": "node prisma/seed.mjs", + "prisma:studio": "prisma studio", + "start": "node server.mjs", + "lint:md": "markdownlint-cli2 \"**/*.md\" \"!.trunk/**\" \"!.venv/**\" \"!node_modules/**\"", + "lint:md:fix": "markdownlint-cli2 --fix \"**/*.md\" \"!.trunk/**\" \"!.venv/**\" \"!node_modules/**\"" + }, "dependencies": { "@prisma/client": "^6.10.1", "@rapideditor/country-coder": "^5.4.0", @@ -55,22 +71,6 @@ "ts-node": "^10.9.2", "typescript": "^5.0.0" }, - "scripts": { - "build": "next build", - "dev": "next dev --turbopack", - "dev:with-schedulers": "node server.mjs", - "format": "npx prettier --write .", - "format:check": "npx prettier --check .", - "lint": "next lint", - "lint:fix": "npx eslint --fix", - "prisma:generate": "prisma generate", - "prisma:migrate": "prisma migrate dev", - "prisma:seed": "node prisma/seed.mjs", - "prisma:studio": "prisma studio", - "start": "node server.mjs", - "lint:md": "markdownlint-cli2 
\"**/*.md\" \"!.trunk/**\" \"!.venv/**\" \"!node_modules/**\"", - "lint:md:fix": "markdownlint-cli2 --fix \"**/*.md\" \"!.trunk/**\" \"!.venv/**\" \"!node_modules/**\"" - }, "prettier": { "bracketSpacing": true, "endOfLine": "auto", diff --git a/pages/api/admin/refresh-sessions.ts b/pages/api/admin/refresh-sessions.ts index 910a143..8e072aa 100644 --- a/pages/api/admin/refresh-sessions.ts +++ b/pages/api/admin/refresh-sessions.ts @@ -32,7 +32,7 @@ async function fetchTranscriptContent( const response = await fetch(url, { headers: authHeader ? { Authorization: authHeader } : {}, }); - + if (!response.ok) { process.stderr.write( `Error fetching transcript: ${response.statusText}\n` diff --git a/pages/api/dashboard/session/[id].ts b/pages/api/dashboard/session/[id].ts index 6997453..37e4c6c 100644 --- a/pages/api/dashboard/session/[id].ts +++ b/pages/api/dashboard/session/[id].ts @@ -19,6 +19,11 @@ export default async function handler( try { const prismaSession = await prisma.session.findUnique({ where: { id }, + include: { + messages: { + orderBy: { order: 'asc' } + } + } }); if (!prismaSession) { @@ -55,10 +60,18 @@ export default async function handler( tokensEur: prismaSession.tokensEur ?? undefined, initialMsg: prismaSession.initialMsg ?? undefined, fullTranscriptUrl: prismaSession.fullTranscriptUrl ?? null, - transcriptContent: prismaSession.transcriptContent ?? null, processed: prismaSession.processed ?? null, // New field questions: prismaSession.questions ?? null, // New field summary: prismaSession.summary ?? null, // New field + messages: prismaSession.messages?.map(msg => ({ + id: msg.id, + sessionId: msg.sessionId, + timestamp: new Date(msg.timestamp), + role: msg.role, + content: msg.content, + order: msg.order, + createdAt: new Date(msg.createdAt) + })) ?? 
[], // New field - parsed messages }; return res.status(200).json({ session }); diff --git a/prisma/migrations/20250625152312_add_message_table/migration.sql b/prisma/migrations/20250625152312_add_message_table/migration.sql new file mode 100644 index 0000000..e97b78b --- /dev/null +++ b/prisma/migrations/20250625152312_add_message_table/migration.sql @@ -0,0 +1,14 @@ +-- CreateTable +CREATE TABLE "Message" ( + "id" TEXT NOT NULL PRIMARY KEY, + "sessionId" TEXT NOT NULL, + "timestamp" DATETIME NOT NULL, + "role" TEXT NOT NULL, + "content" TEXT NOT NULL, + "order" INTEGER NOT NULL, + "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "Message_sessionId_fkey" FOREIGN KEY ("sessionId") REFERENCES "Session" ("id") ON DELETE CASCADE ON UPDATE CASCADE +); + +-- CreateIndex +CREATE INDEX "Message_sessionId_order_idx" ON "Message"("sessionId", "order"); diff --git a/prisma/migrations/20250625153042_remove_transcript_content/migration.sql b/prisma/migrations/20250625153042_remove_transcript_content/migration.sql new file mode 100644 index 0000000..b13885c --- /dev/null +++ b/prisma/migrations/20250625153042_remove_transcript_content/migration.sql @@ -0,0 +1,39 @@ +/* + Warnings: + + - You are about to drop the column `transcriptContent` on the `Session` table. All the data in the column will be lost. 
+ +*/ +-- RedefineTables +PRAGMA defer_foreign_keys=ON; +PRAGMA foreign_keys=OFF; +CREATE TABLE "new_Session" ( + "id" TEXT NOT NULL PRIMARY KEY, + "companyId" TEXT NOT NULL, + "startTime" DATETIME NOT NULL, + "endTime" DATETIME NOT NULL, + "ipAddress" TEXT, + "country" TEXT, + "language" TEXT, + "messagesSent" INTEGER, + "sentiment" REAL, + "sentimentCategory" TEXT, + "escalated" BOOLEAN, + "forwardedHr" BOOLEAN, + "fullTranscriptUrl" TEXT, + "avgResponseTime" REAL, + "tokens" INTEGER, + "tokensEur" REAL, + "category" TEXT, + "initialMsg" TEXT, + "processed" BOOLEAN, + "questions" TEXT, + "summary" TEXT, + "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "Session_companyId_fkey" FOREIGN KEY ("companyId") REFERENCES "Company" ("id") ON DELETE RESTRICT ON UPDATE CASCADE +); +INSERT INTO "new_Session" ("avgResponseTime", "category", "companyId", "country", "createdAt", "endTime", "escalated", "forwardedHr", "fullTranscriptUrl", "id", "initialMsg", "ipAddress", "language", "messagesSent", "processed", "questions", "sentiment", "sentimentCategory", "startTime", "summary", "tokens", "tokensEur") SELECT "avgResponseTime", "category", "companyId", "country", "createdAt", "endTime", "escalated", "forwardedHr", "fullTranscriptUrl", "id", "initialMsg", "ipAddress", "language", "messagesSent", "processed", "questions", "sentiment", "sentimentCategory", "startTime", "summary", "tokens", "tokensEur" FROM "Session"; +DROP TABLE "Session"; +ALTER TABLE "new_Session" RENAME TO "Session"; +PRAGMA foreign_keys=ON; +PRAGMA defer_foreign_keys=OFF; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 7eeee88..0f3009a 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -48,7 +48,6 @@ model Session { escalated Boolean? forwardedHr Boolean? fullTranscriptUrl String? - transcriptContent String? // Added to store the fetched transcript avgResponseTime Float? tokens Int? tokensEur Float? @@ -57,5 +56,19 @@ model Session { processed Boolean? 
// Flag for post-processing status questions String? // JSON array of questions asked by user summary String? // Brief summary of the conversation + messages Message[] // Relation to parsed messages createdAt DateTime @default(now()) } + +model Message { + id String @id @default(uuid()) + session Session @relation(fields: [sessionId], references: [id], onDelete: Cascade) + sessionId String + timestamp DateTime // When the message was sent + role String // "User", "Assistant", "System", etc. + content String // The message content + order Int // Order within the conversation (0, 1, 2, ...) + createdAt DateTime @default(now()) + + @@index([sessionId, order]) // Index for efficient ordering queries +} diff --git a/scripts/manual-triggers.js b/scripts/manual-triggers.js new file mode 100644 index 0000000..0345064 --- /dev/null +++ b/scripts/manual-triggers.js @@ -0,0 +1,187 @@ +// Manual trigger scripts for both schedulers +import { fetchAndStoreSessionsForAllCompanies } from "../lib/csvFetcher.js"; +import { processAllUnparsedTranscripts } from "../lib/transcriptParser.js"; +import { PrismaClient } from "@prisma/client"; +import fetch from "node-fetch"; + +const prisma = new PrismaClient(); + +/** + * Manually trigger the session refresh scheduler + */ +async function triggerSessionRefresh() { + console.log("=== Manual Session Refresh Trigger ==="); + try { + await fetchAndStoreSessionsForAllCompanies(); + console.log("✅ Session refresh completed successfully"); + } catch (error) { + console.error("❌ Session refresh failed:", error); + } +} + +/** + * Manually trigger the processing scheduler + */ +async function triggerProcessingScheduler() { + console.log("=== Manual Processing Scheduler Trigger ==="); + + const OPENAI_API_KEY = process.env.OPENAI_API_KEY; + if (!OPENAI_API_KEY) { + console.error("❌ OPENAI_API_KEY environment variable is not set"); + return; + } + + try { + // Find sessions that need processing + const sessionsToProcess = await 
prisma.session.findMany({ + where: { + AND: [ + { messages: { some: {} } }, + { processed: { not: true } }, // Either false or null + ], + }, + select: { + id: true, + processed: true, + }, + take: 5, // Process 5 sessions for manual testing + }); + + console.log(`Found ${sessionsToProcess.length} sessions to process:`); + sessionsToProcess.forEach(session => { + console.log(`- Session ${session.id}: processed=${session.processed}`); + }); + + if (sessionsToProcess.length === 0) { + console.log("✅ No sessions found requiring processing"); + return; + } + + // Import and run the processing function + const { processUnprocessedSessions } = await import("../lib/processingScheduler.js"); + await processUnprocessedSessions(); + + console.log("✅ Processing scheduler completed"); + } catch (error) { + console.error("❌ Processing scheduler failed:", error); + } +} + +/** + * Manually trigger transcript parsing + */ +async function triggerTranscriptParsing() { + console.log("=== Manual Transcript Parsing Trigger ==="); + try { + const result = await processAllUnparsedTranscripts(); + console.log(`✅ Transcript parsing completed: ${result.processed} processed, ${result.errors} errors`); + } catch (error) { + console.error("❌ Transcript parsing failed:", error); + } +} + +/** + * Show current processing status + */ +async function showProcessingStatus() { + console.log("=== Processing Status ==="); + + try { + const totalSessions = await prisma.session.count(); + const processedSessions = await prisma.session.count({ + where: { processed: true } + }); + const unprocessedSessions = await prisma.session.count({ + where: { processed: { not: true } } + }); + const withMessages = await prisma.session.count({ + where: { + messages: { + some: {} + } + } + }); + const readyForProcessing = await prisma.session.count({ + where: { + AND: [ + { messages: { some: {} } }, + { processed: { not: true } } + ] + } + }); + + console.log(`📊 Total sessions: ${totalSessions}`); + console.log(`✅ 
Processed sessions: ${processedSessions}`); + console.log(`⏳ Unprocessed sessions: ${unprocessedSessions}`); + console.log(`📄 Sessions with messages: ${withMessages}`); + console.log(`🔄 Ready for processing: ${readyForProcessing}`); + + // Show some examples of unprocessed sessions + if (readyForProcessing > 0) { + console.log("\n📋 Sample unprocessed sessions:"); + const samples = await prisma.session.findMany({ + where: { + AND: [ + { messages: { some: {} } }, + { processed: { not: true } } + ] + }, + select: { + id: true, + processed: true, + startTime: true, + }, + take: 3 + }); + + samples.forEach(session => { + console.log(`- ${session.id} (${session.startTime.toISOString()}) - processed: ${session.processed}`); + }); + } + + } catch (error) { + console.error("❌ Failed to get processing status:", error); + } +} + +// Main execution based on command line argument +const command = process.argv[2]; + +switch (command) { + case 'refresh': + await triggerSessionRefresh(); + break; + case 'process': + await triggerProcessingScheduler(); + break; + case 'parse': + await triggerTranscriptParsing(); + break; + case 'status': + await showProcessingStatus(); + break; + case 'both': + await triggerSessionRefresh(); + console.log("\n" + "=".repeat(50) + "\n"); + await triggerProcessingScheduler(); + break; + case 'all': + await triggerSessionRefresh(); + console.log("\n" + "=".repeat(50) + "\n"); + await triggerTranscriptParsing(); + console.log("\n" + "=".repeat(50) + "\n"); + await triggerProcessingScheduler(); + break; + default: + console.log("Usage: node scripts/manual-triggers.js [command]"); + console.log("Commands:"); + console.log(" refresh - Trigger session refresh (fetch new sessions from CSV)"); + console.log(" parse - Parse transcripts into structured messages"); + console.log(" process - Trigger processing scheduler (process unprocessed sessions)"); + console.log(" status - Show current processing status"); + console.log(" both - Run both refresh and 
processing"); + console.log(" all - Run refresh, parse, and processing in sequence"); + break; +} + +await prisma.$disconnect(); diff --git a/server.mjs b/server.mjs index 4143071..4ede829 100644 --- a/server.mjs +++ b/server.mjs @@ -20,22 +20,22 @@ async function init() { // Dynamically import the schedulers const scheduler = await import('./lib/scheduler.js'); const processingScheduler = await import('./lib/processingScheduler.js'); - + startScheduler = scheduler.startScheduler; startProcessingScheduler = processingScheduler.startProcessingScheduler; - + app.prepare().then(() => { // Initialize schedulers when the server starts console.log('Starting schedulers...'); startScheduler(); startProcessingScheduler(); console.log('All schedulers initialized successfully'); - + createServer(async (req, res) => { try { // Parse the URL const parsedUrl = parse(req.url || '', true); - + // Let Next.js handle the request await handle(req, res, parsedUrl); } catch (err) { diff --git a/server.ts b/server.ts index f99d6fb..806242c 100644 --- a/server.ts +++ b/server.ts @@ -19,12 +19,12 @@ app.prepare().then(() => { startScheduler(); startProcessingScheduler(); console.log('All schedulers initialized successfully'); - + createServer(async (req, res) => { try { // Parse the URL const parsedUrl = parse(req.url || '', true); - + // Let Next.js handle the request await handle(req, res, parsedUrl); } catch (err) {