From 2dfc49f8402809cab47ffcceb797b81821823903 Mon Sep 17 00:00:00 2001 From: Max Kowalski Date: Fri, 27 Jun 2025 23:05:46 +0200 Subject: [PATCH] DB refactor --- .gitignore | 3 +- check-pipeline-status.ts | 96 ++++++ check-refactored-pipeline-status.ts | 78 +++++ debug-import-status.ts | 81 +++++ demo-admin-user.txt | 2 - docs/dashboard-components.md | 66 ++-- docs/scheduler-fixes.md | 46 +-- docs/scheduler-workflow.md | 114 ++++--- docs/session-processing.md | 56 ++-- docs/transcript-parsing-implementation.md | 135 ++++---- fix-import-status.ts | 88 +++++ lib/importProcessor.ts | 300 ++++++++++++------ lib/processingScheduler.ts | 98 ++++-- lib/processingStatusManager.ts | 295 +++++++++++++++++ migrate-to-refactored-system.ts | 129 ++++++++ .../migration.sql | 63 ++++ prisma/schema.prisma | 135 ++++++-- prisma/seed.ts | 127 +++++++- test-ai-processing.ts | 17 + test-import-processing.ts | 17 + 20 files changed, 1607 insertions(+), 339 deletions(-) create mode 100644 check-pipeline-status.ts create mode 100644 check-refactored-pipeline-status.ts create mode 100644 debug-import-status.ts delete mode 100644 demo-admin-user.txt create mode 100644 fix-import-status.ts create mode 100644 lib/processingStatusManager.ts create mode 100644 migrate-to-refactored-system.ts create mode 100644 prisma/migrations/20250627194930_add_ai_model_management/migration.sql create mode 100644 test-ai-processing.ts create mode 100644 test-import-processing.ts diff --git a/.gitignore b/.gitignore index 242dfda..223b193 100644 --- a/.gitignore +++ b/.gitignore @@ -263,4 +263,5 @@ Thumbs.db /playwright/.cache/ # OpenAI API request samples -sample-openai-request.json \ No newline at end of file +sample-openai-request.json +admin-user.txt \ No newline at end of file diff --git a/check-pipeline-status.ts b/check-pipeline-status.ts new file mode 100644 index 0000000..5bb5bc6 --- /dev/null +++ b/check-pipeline-status.ts @@ -0,0 +1,96 @@ +import { PrismaClient } from '@prisma/client'; + +const prisma = new PrismaClient(); + +async function checkPipelineStatus() { + try { + console.log('=== COMPLETE PIPELINE STATUS ===\n'); + + // Stage 1: SessionImport status + console.log('1. SessionImport Status:'); + const importCounts = await prisma.sessionImport.groupBy({ + by: ['status'], + _count: { status: true } + }); + + const totalImports = await prisma.sessionImport.count(); + console.log(` Total imports: ${totalImports}`); + importCounts.forEach(({ status, _count }) => { + console.log(` ${status}: ${_count.status}`); + }); + + // Stage 2: Session creation status + console.log('\n2. Session Creation Status:'); + const totalSessions = await prisma.session.count(); + const sessionsWithMessages = await prisma.session.count({ + where: { messages: { some: {} } } + }); + const sessionsWithoutMessages = await prisma.session.count({ + where: { messages: { none: {} } } + }); + + console.log(` Total sessions: ${totalSessions}`); + console.log(` Sessions with messages: ${sessionsWithMessages}`); + console.log(` Sessions without messages: ${sessionsWithoutMessages}`); + + // Stage 3: AI Processing status + console.log('\n3. AI Processing Status:'); + const processedSessions = await prisma.session.count({ + where: { processed: true } + }); + const unprocessedSessions = await prisma.session.count({ + where: { processed: false } + }); + + console.log(` Processed sessions: ${processedSessions}`); + console.log(` Unprocessed sessions: ${unprocessedSessions}`); + + // Stage 4: Questions extracted + console.log('\n4. 
Question Extraction Status:'); + const sessionsWithQuestions = await prisma.session.count({ + where: { sessionQuestions: { some: {} } } + }); + const totalQuestions = await prisma.question.count(); + + console.log(` Sessions with questions: ${sessionsWithQuestions}`); + console.log(` Total unique questions: ${totalQuestions}`); + + // Show what needs processing + console.log('\n=== WHAT NEEDS PROCESSING ==='); + + const queuedImports = await prisma.sessionImport.count({ + where: { status: 'QUEUED' } + }); + console.log(`• ${queuedImports} SessionImports need import processing`); + + const sessionsNeedingAI = await prisma.session.count({ + where: { + AND: [ + { messages: { some: {} } }, + { processed: false } + ] + } + }); + console.log(`• ${sessionsNeedingAI} Sessions need AI processing`); + + // Sample of what's pending + if (queuedImports > 0) { + console.log('\nSample queued imports:'); + const sampleImports = await prisma.sessionImport.findMany({ + where: { status: 'QUEUED' }, + select: { externalSessionId: true, createdAt: true }, + take: 5 + }); + sampleImports.forEach(imp => { + console.log(` ${imp.externalSessionId} (created: ${imp.createdAt})`); + }); + } + + } catch (error) { + console.error('Error checking pipeline status:', error); + } finally { + await prisma.$disconnect(); + } +} + +checkPipelineStatus(); diff --git a/check-refactored-pipeline-status.ts b/check-refactored-pipeline-status.ts new file mode 100644 index 0000000..0bb7038 --- /dev/null +++ b/check-refactored-pipeline-status.ts @@ -0,0 +1,78 @@ +import { PrismaClient } from '@prisma/client'; +import { ProcessingStatusManager } from './lib/processingStatusManager'; + +const prisma = new PrismaClient(); + +async function checkRefactoredPipelineStatus() { + try { + console.log('=== REFACTORED PIPELINE STATUS ===\n'); + + // Get pipeline status using the new system + const pipelineStatus = await ProcessingStatusManager.getPipelineStatus(); + + console.log(`Total Sessions: ${pipelineStatus.totalSessions}\n`); + + // Display status for each stage + const stages = ['CSV_IMPORT', 'TRANSCRIPT_FETCH', 'SESSION_CREATION', 'AI_ANALYSIS', 'QUESTION_EXTRACTION']; + + for (const stage of stages) { + console.log(`${stage}:`); + const stageData = pipelineStatus.pipeline[stage] || {}; + + const pending = stageData.PENDING || 0; + const inProgress = stageData.IN_PROGRESS || 0; + const completed = stageData.COMPLETED || 0; + const failed = stageData.FAILED || 0; + const skipped = stageData.SKIPPED || 0; + + console.log(` PENDING: ${pending}`); + console.log(` IN_PROGRESS: ${inProgress}`); + console.log(` COMPLETED: ${completed}`); + console.log(` FAILED: ${failed}`); + console.log(` SKIPPED: ${skipped}`); + console.log(''); + } + + // Show what needs processing + console.log('=== WHAT NEEDS PROCESSING ==='); + + for (const stage of stages) { + const stageData = pipelineStatus.pipeline[stage] || {}; + const pending = stageData.PENDING || 0; + const failed = stageData.FAILED || 0; + + if (pending > 0 || failed > 0) { + console.log(`• ${stage}: ${pending} pending, ${failed} failed`); + } + } + + // Show failed sessions if any + const failedSessions = await ProcessingStatusManager.getFailedSessions(); + if (failedSessions.length > 0) { + console.log('\n=== FAILED SESSIONS ==='); + failedSessions.slice(0, 5).forEach(failure => { + console.log(` ${failure.session.import?.externalSessionId || failure.sessionId}: ${failure.stage} - ${failure.errorMessage}`); + }); + + if (failedSessions.length > 5) { + console.log(` ... 
and ${failedSessions.length - 5} more failed sessions`); + } + } + + // Show sessions ready for AI processing + const readyForAI = await ProcessingStatusManager.getSessionsNeedingProcessing('AI_ANALYSIS', 5); + if (readyForAI.length > 0) { + console.log('\n=== SESSIONS READY FOR AI PROCESSING ==='); + readyForAI.forEach(status => { + console.log(` ${status.session.import?.externalSessionId || status.sessionId} (created: ${status.session.createdAt})`); + }); + } + + } catch (error) { + console.error('Error checking pipeline status:', error); + } finally { + await prisma.$disconnect(); + } +} + +checkRefactoredPipelineStatus(); diff --git a/debug-import-status.ts b/debug-import-status.ts new file mode 100644 index 0000000..7a4eaa8 --- /dev/null +++ b/debug-import-status.ts @@ -0,0 +1,81 @@ +import { PrismaClient } from '@prisma/client'; +import { ProcessingStatusManager } from './lib/processingStatusManager'; + +const prisma = new PrismaClient(); + +async function debugImportStatus() { + try { + console.log('=== DEBUGGING PROCESSING STATUS (REFACTORED SYSTEM) ===\n'); + + // Get pipeline status using the new system + const pipelineStatus = await ProcessingStatusManager.getPipelineStatus(); + + console.log(`Total Sessions: ${pipelineStatus.totalSessions}\n`); + + // Display status for each stage + const stages = ['CSV_IMPORT', 'TRANSCRIPT_FETCH', 'SESSION_CREATION', 'AI_ANALYSIS', 'QUESTION_EXTRACTION']; + + for (const stage of stages) { + console.log(`${stage}:`); + const stageData = pipelineStatus.pipeline[stage] || {}; + + const pending = stageData.PENDING || 0; + const inProgress = stageData.IN_PROGRESS || 0; + const completed = stageData.COMPLETED || 0; + const failed = stageData.FAILED || 0; + const skipped = stageData.SKIPPED || 0; + + console.log(` PENDING: ${pending}`); + console.log(` IN_PROGRESS: ${inProgress}`); + console.log(` COMPLETED: ${completed}`); + console.log(` FAILED: ${failed}`); + console.log(` SKIPPED: ${skipped}`); + console.log(''); + } + + // Check Sessions vs SessionImports + console.log('=== SESSION IMPORT RELATIONSHIP ==='); + const sessionsWithImports = await prisma.session.count({ + where: { importId: { not: null } } + }); + const totalSessions = await prisma.session.count(); + + console.log(` Sessions with importId: ${sessionsWithImports}`); + console.log(` Total sessions: ${totalSessions}`); + + // Show failed sessions if any + const failedSessions = await ProcessingStatusManager.getFailedSessions(); + if (failedSessions.length > 0) { + console.log('\n=== FAILED SESSIONS ==='); + failedSessions.slice(0, 10).forEach(failure => { + console.log(` ${failure.session.import?.externalSessionId || failure.sessionId}: ${failure.stage} - ${failure.errorMessage}`); + }); + + if (failedSessions.length > 10) { + console.log(` ... 
and ${failedSessions.length - 10} more failed sessions`); + } + } else { + console.log('\n✓ No failed sessions found'); + } + + // Show what needs processing + console.log('\n=== WHAT NEEDS PROCESSING ==='); + + for (const stage of stages) { + const stageData = pipelineStatus.pipeline[stage] || {}; + const pending = stageData.PENDING || 0; + const failed = stageData.FAILED || 0; + + if (pending > 0 || failed > 0) { + console.log(`• ${stage}: ${pending} pending, ${failed} failed`); + } + } + + } catch (error) { + console.error('Error debugging processing status:', error); + } finally { + await prisma.$disconnect(); + } +} + +debugImportStatus(); diff --git a/demo-admin-user.txt b/demo-admin-user.txt deleted file mode 100644 index 5ccc906..0000000 --- a/demo-admin-user.txt +++ /dev/null @@ -1,2 +0,0 @@ -user: admin@demo.com -password: admin123 diff --git a/docs/dashboard-components.md b/docs/dashboard-components.md index 4bf8970..c56bf9f 100644 --- a/docs/dashboard-components.md +++ b/docs/dashboard-components.md @@ -12,10 +12,10 @@ The WordCloud component visualizes categories or topics based on their frequency **Features:** -- Dynamic sizing based on frequency -- Colorful display with a pleasing color palette -- Responsive design -- Interactive hover effects +- Dynamic sizing based on frequency +- Colorful display with a pleasing color palette +- Responsive design +- Interactive hover effects ### 2. GeographicMap @@ -25,10 +25,10 @@ This component displays a world map with circles representing the number of sess **Features:** -- Interactive map using React Leaflet -- Circle sizes scaled by session count -- Tooltips showing country names and session counts -- Responsive design +- Interactive map using React Leaflet +- Circle sizes scaled by session count +- Tooltips showing country names and session counts +- Responsive design ### 3. MetricCard @@ -38,10 +38,10 @@ A modern, visually appealing card for displaying key metrics. **Features:** -- Multiple design variants (default, primary, success, warning, danger) -- Support for trend indicators -- Icons and descriptions -- Clean, modern styling +- Multiple design variants (default, primary, success, warning, danger) +- Support for trend indicators +- Icons and descriptions +- Clean, modern styling ### 4. DonutChart @@ -51,10 +51,10 @@ An enhanced donut chart with better styling and a central text display capabilit **Features:** -- Customizable colors -- Center text area for displaying summaries -- Interactive tooltips with percentages -- Well-balanced legend display +- Customizable colors +- Center text area for displaying summaries +- Interactive tooltips with percentages +- Well-balanced legend display ### 5. ResponseTimeDistribution @@ -64,28 +64,28 @@ Visualizes the distribution of response times as a histogram. **Features:** -- Color-coded bars (green for fast, yellow for medium, red for slow) -- Target time indicator -- Automatic binning of response times -- Clear labeling and scales +- Color-coded bars (green for fast, yellow for medium, red for slow) +- Target time indicator +- Automatic binning of response times +- Clear labeling and scales ## Dashboard Enhancements The dashboard has been enhanced with: -1. **Improved Layout**: Better use of space and responsive grid layouts -2. **Visual Hierarchies**: Clear heading styles and consistent spacing -3. **Color Coding**: Semantic use of colors to indicate statuses -4. **Interactive Elements**: Better button styles with loading indicators -5. 
**Data Context**: More complete view of metrics with additional visualizations -6. **Geographic Insights**: Map view of session distribution by country -7. **Language Analysis**: Improved language distribution visualization -8. **Category Analysis**: Word cloud for category popularity -9. **Performance Metrics**: Response time distribution for better insight into system performance +1. **Improved Layout**: Better use of space and responsive grid layouts +2. **Visual Hierarchies**: Clear heading styles and consistent spacing +3. **Color Coding**: Semantic use of colors to indicate statuses +4. **Interactive Elements**: Better button styles with loading indicators +5. **Data Context**: More complete view of metrics with additional visualizations +6. **Geographic Insights**: Map view of session distribution by country +7. **Language Analysis**: Improved language distribution visualization +8. **Category Analysis**: Word cloud for category popularity +9. **Performance Metrics**: Response time distribution for better insight into system performance ## Usage Notes -- The geographic map and response time distribution use simulated data where actual data is not available -- All components are responsive and will adjust to different screen sizes -- The dashboard automatically refreshes data when using the refresh button -- Admin users have access to additional controls at the bottom of the dashboard +- The geographic map and response time distribution use simulated data where actual data is not available +- All components are responsive and will adjust to different screen sizes +- The dashboard automatically refreshes data when using the refresh button +- Admin users have access to additional controls at the bottom of the dashboard diff --git a/docs/scheduler-fixes.md b/docs/scheduler-fixes.md index 913570a..e096fc9 100644 --- a/docs/scheduler-fixes.md +++ b/docs/scheduler-fixes.md @@ -8,8 +8,8 @@ **Solution**: -- Added validation in `fetchAndStoreSessionsForAllCompanies()` to skip companies with example/invalid URLs -- Removed the invalid company record from the database using `fix_companies.js` +- Added validation in `fetchAndStoreSessionsForAllCompanies()` to skip companies with example/invalid URLs +- Removed the invalid company record from the database using `fix_companies.js` ### 2. Transcript Fetching Errors @@ -17,10 +17,10 @@ **Solution**: -- Improved error handling in `fetchTranscriptContent()` function -- Added probabilistic logging (only ~10% of errors logged) to prevent log spam -- Added timeout (10 seconds) for transcript fetching -- Made transcript fetching failures non-blocking (sessions are still created without transcript content) +- Improved error handling in `fetchTranscriptContent()` function +- Added probabilistic logging (only ~10% of errors logged) to prevent log spam +- Added timeout (10 seconds) for transcript fetching +- Made transcript fetching failures non-blocking (sessions are still created without transcript content) ### 3. 
CSV Fetching Errors @@ -28,8 +28,8 @@ **Solution**: -- Added URL validation to skip companies with `example.com` URLs -- Improved error logging to be more descriptive +- Added URL validation to skip companies with `example.com` URLs +- Improved error logging to be more descriptive ## Current Status @@ -42,22 +42,22 @@ After cleanup, only valid companies remain: -- **Demo Company** (`790b9233-d369-451f-b92c-f4dceb42b649`) - - CSV URL: `https://proto.notso.ai/jumbo/chats` - - Has valid authentication credentials - - 107 sessions in database +- **Demo Company** (`790b9233-d369-451f-b92c-f4dceb42b649`) + - CSV URL: `https://proto.notso.ai/jumbo/chats` + - Has valid authentication credentials + - 107 sessions in database ## Files Modified -1. **lib/csvFetcher.js** +1. **lib/csvFetcher.js** - - Added company URL validation - - Improved transcript fetching error handling - - Reduced error log verbosity + - Added company URL validation + - Improved transcript fetching error handling + - Reduced error log verbosity -2. **fix_companies.js** (cleanup script) - - Removes invalid company records - - Can be run again if needed +2. **fix_companies.js** (cleanup script) + - Removes invalid company records + - Can be run again if needed ## Monitoring @@ -73,7 +73,7 @@ node -e "import('./lib/csvFetcher.js').then(m => m.fetchAndStoreSessionsForAllCo ## Future Improvements -1. Add health check endpoint for scheduler status -2. Add metrics for successful/failed fetches -3. Consider retry logic for temporary failures -4. Add alerting for persistent failures +1. Add health check endpoint for scheduler status +2. Add metrics for successful/failed fetches +3. Consider retry logic for temporary failures +4. Add alerting for persistent failures diff --git a/docs/scheduler-workflow.md b/docs/scheduler-workflow.md index 3d55093..cc270b6 100644 --- a/docs/scheduler-workflow.md +++ b/docs/scheduler-workflow.md @@ -1,24 +1,28 @@ # Scheduler Workflow Documentation ## Overview + The LiveDash system has two main schedulers that work together to fetch and process session data: -1. **Session Refresh Scheduler** - Fetches new sessions from CSV files -2. **Processing Scheduler** - Processes session transcripts with AI +1. **Session Refresh Scheduler** - Fetches new sessions from CSV files +2. 
**Processing Scheduler** - Processes session transcripts with AI ## Current Status (as of latest check) -- **Total sessions**: 107 -- **Processed sessions**: 0 -- **Sessions with transcript**: 0 -- **Ready for processing**: 0 + +- **Total sessions**: 107 +- **Processed sessions**: 0 +- **Sessions with transcript**: 0 +- **Ready for processing**: 0 ## How the `processed` Field Works The ProcessingScheduler picks up sessions where `processed` is **NOT** `true`, which includes: -- `processed = false` -- `processed = null` + +- `processed = false` +- `processed = null` **Query used:** + ```javascript { processed: { not: true } } // Either false or null ``` @@ -26,50 +30,60 @@ The ProcessingScheduler picks up sessions where `processed` is **NOT** `true`, w ## Complete Workflow ### Step 1: Session Refresh (CSV Fetching) + **What it does:** -- Fetches session data from company CSV URLs -- Creates session records in database with basic metadata -- Sets `transcriptContent = null` initially -- Sets `processed = null` initially + +- Fetches session data from company CSV URLs +- Creates session records in database with basic metadata +- Sets `transcriptContent = null` initially +- Sets `processed = null` initially **Runs:** Every 30 minutes (cron: `*/30 * * * *`) ### Step 2: Transcript Fetching + **What it does:** -- Downloads full transcript content for sessions -- Updates `transcriptContent` field with actual conversation data -- Sessions remain `processed = null` until AI processing + +- Downloads full transcript content for sessions +- Updates `transcriptContent` field with actual conversation data +- Sessions remain `processed = null` until AI processing **Runs:** As part of session refresh process ### Step 3: AI Processing + **What it does:** -- Finds sessions with transcript content where `processed != true` -- Sends transcripts to OpenAI for analysis -- Extracts: sentiment, category, questions, summary, etc. -- Updates session with processed data -- Sets `processed = true` + +- Finds sessions with transcript content where `processed != true` +- Sends transcripts to OpenAI for analysis +- Extracts: sentiment, category, questions, summary, etc. +- Updates session with processed data +- Sets `processed = true` **Runs:** Every hour (cron: `0 * * * *`) ## Manual Trigger Commands ### Check Current Status + ```bash node scripts/manual-triggers.js status ``` ### Trigger Session Refresh (Fetch new sessions from CSV) + ```bash node scripts/manual-triggers.js refresh ``` ### Trigger AI Processing (Process unprocessed sessions) + ```bash node scripts/manual-triggers.js process ``` ### Run Both Schedulers + ```bash node scripts/manual-triggers.js both ``` @@ -77,36 +91,42 @@ node scripts/manual-triggers.js both ## Troubleshooting ### No Sessions Being Processed? -1. **Check if sessions have transcripts:** + +1. **Check if sessions have transcripts:** + ```bash node scripts/manual-triggers.js status ``` -2. **If "Sessions with transcript" is 0:** - - Sessions exist but transcripts haven't been fetched yet - - Run session refresh: `node scripts/manual-triggers.js refresh` +2. **If "Sessions with transcript" is 0:** + - Sessions exist but transcripts haven't been fetched yet + - Run session refresh: `node scripts/manual-triggers.js refresh` -3. **If "Ready for processing" is 0 but "Sessions with transcript" > 0:** - - All sessions with transcripts have already been processed - - Check if `OPENAI_API_KEY` is set in environment +3. 
**If "Ready for processing" is 0 but "Sessions with transcript" > 0:** + - All sessions with transcripts have already been processed + - Check if `OPENAI_API_KEY` is set in environment ### Common Issues #### "No sessions found requiring processing" -- All sessions with transcripts have been processed (`processed = true`) -- Or no sessions have transcript content yet + +- All sessions with transcripts have been processed (`processed = true`) +- Or no sessions have transcript content yet #### "OPENAI_API_KEY environment variable is not set" -- Add OpenAI API key to `.env.development` file -- Restart the application + +- Add OpenAI API key to `.env.development` file +- Restart the application #### "Error fetching transcript: Unauthorized" -- CSV credentials are incorrect or expired -- Check company CSV username/password in database + +- CSV credentials are incorrect or expired +- Check company CSV username/password in database ## Database Field Mapping ### Before AI Processing + ```javascript { id: "session-uuid", @@ -120,6 +140,7 @@ node scripts/manual-triggers.js both ``` ### After AI Processing + ```javascript { id: "session-uuid", @@ -141,15 +162,17 @@ node scripts/manual-triggers.js both ## Scheduler Configuration ### Session Refresh Scheduler -- **File**: `lib/scheduler.js` -- **Frequency**: Every 30 minutes -- **Cron**: `*/30 * * * *` + +- **File**: `lib/scheduler.js` +- **Frequency**: Every 30 minutes +- **Cron**: `*/30 * * * *` ### Processing Scheduler -- **File**: `lib/processingScheduler.js` -- **Frequency**: Every hour -- **Cron**: `0 * * * *` -- **Batch size**: 10 sessions per run + +- **File**: `lib/processingScheduler.js` +- **Frequency**: Every hour +- **Cron**: `0 * * * *` +- **Batch size**: 10 sessions per run ## Environment Variables Required @@ -167,19 +190,22 @@ NEXTAUTH_URL="http://localhost:3000" ## Next Steps for Testing -1. **Trigger session refresh** to fetch transcripts: +1. **Trigger session refresh** to fetch transcripts: + ```bash node scripts/manual-triggers.js refresh ``` -2. **Check status** to see if transcripts were fetched: +2. **Check status** to see if transcripts were fetched: + ```bash node scripts/manual-triggers.js status ``` -3. **Trigger processing** if transcripts are available: +3. **Trigger processing** if transcripts are available: + ```bash node scripts/manual-triggers.js process ``` -4. **View results** in the dashboard session details pages +4. **View results** in the dashboard session details pages diff --git a/docs/session-processing.md b/docs/session-processing.md index 18c424d..abb1a31 100644 --- a/docs/session-processing.md +++ b/docs/session-processing.md @@ -6,47 +6,47 @@ This document explains how the session processing system works in LiveDash-Node. The system now includes an automated process for analyzing chat session transcripts using OpenAI's API. This process: -1. Fetches session data from CSV sources -2. Only adds new sessions that don't already exist in the database -3. Processes session transcripts with OpenAI to extract valuable insights -4. Updates the database with the processed information +1. Fetches session data from CSV sources +2. Only adds new sessions that don't already exist in the database +3. Processes session transcripts with OpenAI to extract valuable insights +4. 
Updates the database with the processed information ## How It Works ### Session Fetching -- The system fetches session data from configured CSV URLs for each company -- Unlike the previous implementation, it now only adds sessions that don't already exist in the database -- This prevents duplicate sessions and allows for incremental updates +- The system fetches session data from configured CSV URLs for each company +- Unlike the previous implementation, it now only adds sessions that don't already exist in the database +- This prevents duplicate sessions and allows for incremental updates ### Transcript Processing -- For sessions with transcript content that haven't been processed yet, the system calls OpenAI's API -- The API analyzes the transcript and extracts the following information: - - Primary language used (ISO 639-1 code) - - Number of messages sent by the user - - Overall sentiment (positive, neutral, negative) - - Whether the conversation was escalated - - Whether HR contact was mentioned or provided - - Best-fitting category for the conversation - - Up to 5 paraphrased questions asked by the user - - A brief summary of the conversation +- For sessions with transcript content that haven't been processed yet, the system calls OpenAI's API +- The API analyzes the transcript and extracts the following information: + - Primary language used (ISO 639-1 code) + - Number of messages sent by the user + - Overall sentiment (positive, neutral, negative) + - Whether the conversation was escalated + - Whether HR contact was mentioned or provided + - Best-fitting category for the conversation + - Up to 5 paraphrased questions asked by the user + - A brief summary of the conversation ### Scheduling The system includes two schedulers: -1. **Session Refresh Scheduler**: Runs every 15 minutes to fetch new sessions from CSV sources -2. **Session Processing Scheduler**: Runs every hour to process unprocessed sessions with OpenAI +1. **Session Refresh Scheduler**: Runs every 15 minutes to fetch new sessions from CSV sources +2. **Session Processing Scheduler**: Runs every hour to process unprocessed sessions with OpenAI ## Database Schema The Session model has been updated with new fields to store the processed data: -- `processed`: Boolean flag indicating whether the session has been processed -- `sentimentCategory`: String value ("positive", "neutral", "negative") from OpenAI -- `questions`: JSON array of questions asked by the user -- `summary`: Brief summary of the conversation +- `processed`: Boolean flag indicating whether the session has been processed +- `sentimentCategory`: String value ("positive", "neutral", "negative") from OpenAI +- `questions`: JSON array of questions asked by the user +- `summary`: Brief summary of the conversation ## Configuration @@ -62,9 +62,9 @@ OPENAI_API_KEY=your_api_key_here To run the application with schedulers enabled: -- Development: `npm run dev` -- Development (with schedulers disabled): `npm run dev:no-schedulers` -- Production: `npm run start` +- Development: `npm run dev` +- Development (with schedulers disabled): `npm run dev:no-schedulers` +- Production: `npm run start` Note: These commands will start a custom Next.js server with the schedulers enabled. You'll need to have an OpenAI API key set in your `.env.local` file for the session processing to work. @@ -82,5 +82,5 @@ This will process all unprocessed sessions that have transcript content. 
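+In outline, the script runs a loop like the following (a sketch only — `analyzeTranscript` stands in for the OpenAI call in `lib/processingScheduler.ts`):
+
+```typescript
+const sessions = await prisma.session.findMany({
+  where: {
+    processed: { not: true },         // false or null
+    transcriptContent: { not: null }, // nothing to analyze otherwise
+  },
+  take: 10, // matches the hourly scheduler's batch size
+});
+
+for (const session of sessions) {
+  const analysis = await analyzeTranscript(session.transcriptContent!);
+  await prisma.session.update({
+    where: { id: session.id },
+    data: { ...analysis, processed: true },
+  });
+}
+```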
The processing logic can be customized by modifying: -- `lib/processingScheduler.ts`: Contains the OpenAI processing logic -- `scripts/process_sessions.ts`: Standalone script for manual processing +- `lib/processingScheduler.ts`: Contains the OpenAI processing logic +- `scripts/process_sessions.ts`: Standalone script for manual processing diff --git a/docs/transcript-parsing-implementation.md b/docs/transcript-parsing-implementation.md index 51fabba..0f0c018 100644 --- a/docs/transcript-parsing-implementation.md +++ b/docs/transcript-parsing-implementation.md @@ -1,11 +1,13 @@ # Transcript Parsing Implementation ## Overview + Added structured message parsing to the LiveDash system, allowing transcripts to be broken down into individual messages with timestamps, roles, and content. This provides a much better user experience for viewing conversations. ## Database Changes ### New Message Table + ```sql CREATE TABLE Message ( id TEXT PRIMARY KEY DEFAULT (uuid()), @@ -22,12 +24,14 @@ CREATE INDEX Message_sessionId_order_idx ON Message(sessionId, order); ``` ### Updated Session Table -- Added `messages` relation to Session model -- Sessions can now have both raw transcript content AND parsed messages + +- Added `messages` relation to Session model +- Sessions can now have both raw transcript content AND parsed messages ## New Components ### 1. Message Interface (`lib/types.ts`) + ```typescript export interface Message { id: string; @@ -41,36 +45,43 @@ export interface Message { ``` ### 2. Transcript Parser (`lib/transcriptParser.js`) -- **`parseChatLogToJSON(logString)`** - Parses raw transcript text into structured messages -- **`storeMessagesForSession(sessionId, messages)`** - Stores parsed messages in database -- **`processTranscriptForSession(sessionId, transcriptContent)`** - Complete processing for one session -- **`processAllUnparsedTranscripts()`** - Batch process all unparsed transcripts -- **`getMessagesForSession(sessionId)`** - Retrieve messages for a session + +- **`parseChatLogToJSON(logString)`** - Parses raw transcript text into structured messages +- **`storeMessagesForSession(sessionId, messages)`** - Stores parsed messages in database +- **`processTranscriptForSession(sessionId, transcriptContent)`** - Complete processing for one session +- **`processAllUnparsedTranscripts()`** - Batch process all unparsed transcripts +- **`getMessagesForSession(sessionId)`** - Retrieve messages for a session ### 3. MessageViewer Component (`components/MessageViewer.tsx`) -- Chat-like interface for displaying parsed messages -- Color-coded by role (User: blue, Assistant: gray, System: yellow) -- Shows timestamps and message order -- Scrollable with conversation metadata + +- Chat-like interface for displaying parsed messages +- Color-coded by role (User: blue, Assistant: gray, System: yellow) +- Shows timestamps and message order +- Scrollable with conversation metadata ## Updated Components ### 1. Session API (`pages/api/dashboard/session/[id].ts`) -- Now includes parsed messages in session response -- Messages are ordered by `order` field (ascending) + +- Now includes parsed messages in session response +- Messages are ordered by `order` field (ascending) ### 2. Session Details Page (`app/dashboard/sessions/[id]/page.tsx`) -- Added MessageViewer component -- Shows both parsed messages AND raw transcript -- Prioritizes parsed messages when available + +- Added MessageViewer component +- Shows both parsed messages AND raw transcript +- Prioritizes parsed messages when available ### 3. 
ChatSession Interface (`lib/types.ts`) -- Added optional `messages?: Message[]` field + +- Added optional `messages?: Message[]` field ## Parsing Logic ### Supported Format + The parser expects transcript format: + ``` [DD.MM.YYYY HH:MM:SS] Role: Message content [DD.MM.YYYY HH:MM:SS] User: Hello, I need help @@ -78,15 +89,17 @@ The parser expects transcript format: ``` ### Features -- **Multi-line support** - Messages can span multiple lines -- **Timestamp parsing** - Converts DD.MM.YYYY HH:MM:SS to ISO format -- **Role detection** - Extracts sender role from each message -- **Ordering** - Maintains conversation order with explicit order field -- **Sorting** - Messages sorted by timestamp, then by role (User before Assistant) + +- **Multi-line support** - Messages can span multiple lines +- **Timestamp parsing** - Converts DD.MM.YYYY HH:MM:SS to ISO format +- **Role detection** - Extracts sender role from each message +- **Ordering** - Maintains conversation order with explicit order field +- **Sorting** - Messages sorted by timestamp, then by role (User before Assistant) ## Manual Commands ### New Commands Added + ```bash # Parse transcripts into structured messages node scripts/manual-triggers.js parse @@ -99,17 +112,20 @@ node scripts/manual-triggers.js status ``` ### Updated Commands -- **`status`** - Now shows transcript and parsing statistics -- **`all`** - New command that runs refresh → parse → process in sequence + +- **`status`** - Now shows transcript and parsing statistics +- **`all`** - New command that runs refresh → parse → process in sequence ## Workflow Integration ### Complete Processing Pipeline -1. **Session Refresh** - Fetch sessions from CSV, download transcripts -2. **Transcript Parsing** - Parse raw transcripts into structured messages -3. **AI Processing** - Process sessions with OpenAI for sentiment, categories, etc. + +1. **Session Refresh** - Fetch sessions from CSV, download transcripts +2. **Transcript Parsing** - Parse raw transcripts into structured messages +3. **AI Processing** - Process sessions with OpenAI for sentiment, categories, etc. ### Database States + ```javascript // After CSV fetch { @@ -139,21 +155,24 @@ node scripts/manual-triggers.js status ## User Experience Improvements ### Before -- Only raw transcript text in a text area -- Difficult to follow conversation flow -- No clear distinction between speakers + +- Only raw transcript text in a text area +- Difficult to follow conversation flow +- No clear distinction between speakers ### After -- **Chat-like interface** with message bubbles -- **Color-coded roles** for easy identification -- **Timestamps** for each message -- **Conversation metadata** (first/last message times) -- **Fallback to raw transcript** if parsing fails -- **Both views available** - structured AND raw + +- **Chat-like interface** with message bubbles +- **Color-coded roles** for easy identification +- **Timestamps** for each message +- **Conversation metadata** (first/last message times) +- **Fallback to raw transcript** if parsing fails +- **Both views available** - structured AND raw ## Testing ### Manual Testing Commands + ```bash # Check current status node scripts/manual-triggers.js status @@ -166,38 +185,44 @@ node scripts/manual-triggers.js all ``` ### Expected Results -1. Sessions with transcript content get parsed into individual messages -2. Session detail pages show chat-like interface -3. Both parsed messages and raw transcript are available -4. No data loss - original transcript content preserved + +1. 
Sessions with transcript content get parsed into individual messages +2. Session detail pages show chat-like interface +3. Both parsed messages and raw transcript are available +4. No data loss - original transcript content preserved ## Technical Benefits ### Performance -- **Indexed queries** - Messages indexed by sessionId and order -- **Efficient loading** - Only load messages when needed -- **Cascading deletes** - Messages automatically deleted with sessions + +- **Indexed queries** - Messages indexed by sessionId and order +- **Efficient loading** - Only load messages when needed +- **Cascading deletes** - Messages automatically deleted with sessions ### Maintainability -- **Separation of concerns** - Parsing logic isolated in dedicated module -- **Type safety** - Full TypeScript support for Message interface -- **Error handling** - Graceful fallbacks when parsing fails + +- **Separation of concerns** - Parsing logic isolated in dedicated module +- **Type safety** - Full TypeScript support for Message interface +- **Error handling** - Graceful fallbacks when parsing fails ### Extensibility -- **Role flexibility** - Supports any role names (User, Assistant, System, etc.) -- **Content preservation** - Multi-line messages fully supported -- **Metadata ready** - Easy to add message-level metadata in future + +- **Role flexibility** - Supports any role names (User, Assistant, System, etc.) +- **Content preservation** - Multi-line messages fully supported +- **Metadata ready** - Easy to add message-level metadata in future ## Migration Notes ### Existing Data -- **No data loss** - Original transcript content preserved -- **Backward compatibility** - Pages work with or without parsed messages -- **Gradual migration** - Can parse transcripts incrementally + +- **No data loss** - Original transcript content preserved +- **Backward compatibility** - Pages work with or without parsed messages +- **Gradual migration** - Can parse transcripts incrementally ### Database Migration -- New Message table created with foreign key constraints -- Existing Session table unchanged (only added relation) -- Index created for efficient message queries + +- New Message table created with foreign key constraints +- Existing Session table unchanged (only added relation) +- Index created for efficient message queries This implementation provides a solid foundation for enhanced conversation analysis and user experience while maintaining full backward compatibility. 
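+As a concrete sketch, the role and timestamp extraction described under "Parsing Logic" comes down to two regular expressions applied per line (these mirror `lib/importProcessor.ts` in this change; lines matching neither are kept as `unknown`-role content):
+
+```typescript
+// "[27.06.2025 14:03:21] User: Hello" -> captures the timestamp and the remainder
+const timestampMatch = line.match(/^\[([^\]]+)\]\s*(.+)$/);
+
+// "User: Hello" -> captures the role and the message content (case-insensitive)
+const roleMatch = content.match(/^(User|Assistant|System):\s*(.*)$/i);
+```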
diff --git a/fix-import-status.ts b/fix-import-status.ts
new file mode 100644
index 0000000..8026c99
--- /dev/null
+++ b/fix-import-status.ts
@@ -0,0 +1,88 @@
+import { PrismaClient, ProcessingStage, ProcessingStatus } from '@prisma/client';
+import { ProcessingStatusManager } from './lib/processingStatusManager';
+
+const prisma = new PrismaClient();
+
+async function fixProcessingStatus() {
+  try {
+    console.log('=== FIXING PROCESSING STATUS (REFACTORED SYSTEM) ===\n');
+
+    // Check for any failed processing stages that might need retry
+    const failedSessions = await ProcessingStatusManager.getFailedSessions();
+
+    console.log(`Found ${failedSessions.length} failed processing stages`);
+
+    if (failedSessions.length > 0) {
+      console.log('\nFailed sessions by stage:');
+      const failuresByStage: Record<string, number> = {};
+
+      failedSessions.forEach(failure => {
+        failuresByStage[failure.stage] = (failuresByStage[failure.stage] || 0) + 1;
+      });
+
+      Object.entries(failuresByStage).forEach(([stage, count]) => {
+        console.log(`  ${stage}: ${count} failures`);
+      });
+
+      // Show sample failed sessions
+      console.log('\nSample failed sessions:');
+      failedSessions.slice(0, 5).forEach(failure => {
+        console.log(`  ${failure.session.import?.externalSessionId || failure.sessionId}: ${failure.stage} - ${failure.errorMessage}`);
+      });
+
+      // Ask if user wants to reset failed stages for retry
+      console.log('\nTo reset failed stages for retry, you can use:');
+      console.log('ProcessingStatusManager.resetStageForRetry(sessionId, stage)');
+    }
+
+    // Check for sessions that might be stuck in IN_PROGRESS
+    const stuckSessions = await prisma.sessionProcessingStatus.findMany({
+      where: {
+        status: ProcessingStatus.IN_PROGRESS,
+        startedAt: {
+          lt: new Date(Date.now() - 30 * 60 * 1000) // Started more than 30 minutes ago
+        }
+      },
+      include: {
+        session: {
+          include: {
+            import: true
+          }
+        }
+      }
+    });
+
+    if (stuckSessions.length > 0) {
+      console.log(`\nFound ${stuckSessions.length} sessions stuck in IN_PROGRESS state:`);
+      stuckSessions.forEach(stuck => {
+        console.log(`  ${stuck.session.import?.externalSessionId || stuck.sessionId}: ${stuck.stage} (started: ${stuck.startedAt})`);
+      });
+
+      console.log('\nThese sessions may need to be reset to PENDING status for retry.');
+    }
+
+    // Show current pipeline status
+    console.log('\n=== CURRENT PIPELINE STATUS ===');
+    const pipelineStatus = await ProcessingStatusManager.getPipelineStatus();
+
+    const stages = ['CSV_IMPORT', 'TRANSCRIPT_FETCH', 'SESSION_CREATION', 'AI_ANALYSIS', 'QUESTION_EXTRACTION'];
+
+    for (const stage of stages) {
+      const stageData = pipelineStatus.pipeline[stage] || {};
+      const pending = stageData.PENDING || 0;
+      const inProgress = stageData.IN_PROGRESS || 0;
+      const completed = stageData.COMPLETED || 0;
+      const failed = stageData.FAILED || 0;
+      const skipped = stageData.SKIPPED || 0;
+
+      console.log(`${stage}: ${completed} completed, ${pending} pending, ${inProgress} in progress, ${failed} failed, ${skipped} skipped`);
+    }
+
+  } catch (error) {
+    console.error('Error fixing processing status:', error);
+  } finally {
+    await prisma.$disconnect();
+  }
+}
+
+fixProcessingStatus();
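Following up on the hint this script prints about stuck sessions: re-queuing them is one call per stage with the manager introduced later in this patch (a sketch; the selection mirrors the 30-minute query above):

```typescript
// Reset stages stuck in IN_PROGRESS back to PENDING so the schedulers retry them
for (const stuck of stuckSessions) {
  await ProcessingStatusManager.resetStageForRetry(stuck.sessionId, stuck.stage);
}
```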
"@prisma/client"; import { getSchedulerConfig } from "./env"; import { fetchTranscriptContent, isValidTranscriptUrl } from "./transcriptFetcher"; +import { ProcessingStatusManager } from "./processingStatusManager"; import cron from "node-cron"; const prisma = new PrismaClient(); @@ -62,21 +63,130 @@ function parseFallbackBoolean(rawValue: string | null): boolean | null { return ['true', '1', 'yes', 'escalated', 'forwarded'].includes(rawValue.toLowerCase()); } +/** + * Parse transcript content into Message records + */ +async function parseTranscriptIntoMessages(sessionId: string, transcriptContent: string): Promise { + // Clear existing messages for this session + await prisma.message.deleteMany({ + where: { sessionId } + }); + + // Split transcript into lines and parse each message + const lines = transcriptContent.split('\n').filter(line => line.trim()); + let order = 0; + + for (const line of lines) { + const trimmedLine = line.trim(); + if (!trimmedLine) continue; + + // Try to parse different formats: + // Format 1: "User: message" or "Assistant: message" + // Format 2: "[timestamp] User: message" or "[timestamp] Assistant: message" + + let role = 'unknown'; + let content = trimmedLine; + let timestamp: Date | null = null; + + // Check for timestamp format: [DD.MM.YYYY HH:mm:ss] Role: content + const timestampMatch = trimmedLine.match(/^\[([^\]]+)\]\s*(.+)$/); + if (timestampMatch) { + try { + timestamp = parseEuropeanDate(timestampMatch[1]); + content = timestampMatch[2]; + } catch (error) { + // If timestamp parsing fails, treat the whole line as content + content = trimmedLine; + } + } + + // Extract role and message content + const roleMatch = content.match(/^(User|Assistant|System):\s*(.*)$/i); + if (roleMatch) { + role = roleMatch[1].toLowerCase(); + content = roleMatch[2].trim(); + } else { + // If no role prefix found, try to infer from context or use 'unknown' + role = 'unknown'; + } + + // Skip empty content + if (!content) continue; + + // Create message record + await prisma.message.create({ + data: { + sessionId, + timestamp, + role, + content, + order, + }, + }); + + order++; + } + + console.log(`[Import Processor] ✓ Parsed ${order} messages for session ${sessionId}`); +} + /** * Process a single SessionImport record into a Session record - * NEW STRATEGY: Only copy minimal fields, let AI processing handle the rest + * Uses new unified processing status tracking */ async function processSingleImport(importRecord: any): Promise<{ success: boolean; error?: string }> { + let sessionId: string | null = null; + try { // Parse dates using European format parser const startTime = parseEuropeanDate(importRecord.startTimeRaw); const endTime = parseEuropeanDate(importRecord.endTimeRaw); - console.log(`[Import Processor] Parsed dates for ${importRecord.externalSessionId}: ${startTime.toISOString()} - ${endTime.toISOString()}`); + console.log(`[Import Processor] Processing ${importRecord.externalSessionId}: ${startTime.toISOString()} - ${endTime.toISOString()}`); - // Fetch transcript content if URL is provided and not already fetched + // Create or update Session record with MINIMAL processing + const session = await prisma.session.upsert({ + where: { + importId: importRecord.id, + }, + update: { + startTime, + endTime, + // Direct copies (minimal processing) + ipAddress: importRecord.ipAddress, + country: importRecord.countryCode, // Keep as country code + fullTranscriptUrl: importRecord.fullTranscriptUrl, + avgResponseTime: importRecord.avgResponseTimeSeconds, + initialMsg: 
 /**
  * Process a single SessionImport record into a Session record
- * NEW STRATEGY: Only copy minimal fields, let AI processing handle the rest
+ * Uses new unified processing status tracking
  */
 async function processSingleImport(importRecord: any): Promise<{ success: boolean; error?: string }> {
+  let sessionId: string | null = null;
+
   try {
     // Parse dates using European format parser
     const startTime = parseEuropeanDate(importRecord.startTimeRaw);
     const endTime = parseEuropeanDate(importRecord.endTimeRaw);
 
-    console.log(`[Import Processor] Parsed dates for ${importRecord.externalSessionId}: ${startTime.toISOString()} - ${endTime.toISOString()}`);
+    console.log(`[Import Processor] Processing ${importRecord.externalSessionId}: ${startTime.toISOString()} - ${endTime.toISOString()}`);
 
-    // Fetch transcript content if URL is provided and not already fetched
+    // Create or update Session record with MINIMAL processing
+    const session = await prisma.session.upsert({
+      where: {
+        importId: importRecord.id,
+      },
+      update: {
+        startTime,
+        endTime,
+        // Direct copies (minimal processing)
+        ipAddress: importRecord.ipAddress,
+        country: importRecord.countryCode, // Keep as country code
+        fullTranscriptUrl: importRecord.fullTranscriptUrl,
+        avgResponseTime: importRecord.avgResponseTimeSeconds,
+        initialMsg: importRecord.initialMessage,
+      },
+      create: {
+        companyId: importRecord.companyId,
+        importId: importRecord.id,
+        startTime,
+        endTime,
+        // Direct copies (minimal processing)
+        ipAddress: importRecord.ipAddress,
+        country: importRecord.countryCode, // Keep as country code
+        fullTranscriptUrl: importRecord.fullTranscriptUrl,
+        avgResponseTime: importRecord.avgResponseTimeSeconds,
+        initialMsg: importRecord.initialMessage,
+      },
+    });
+
+    sessionId = session.id;
+
+    // Initialize processing status for this session
+    await ProcessingStatusManager.initializeSession(sessionId);
+
+    // Mark CSV_IMPORT as completed
+    await ProcessingStatusManager.completeStage(sessionId, ProcessingStage.CSV_IMPORT);
+
+    // Handle transcript fetching
     let transcriptContent = importRecord.rawTranscriptContent;
+
     if (!transcriptContent && importRecord.fullTranscriptUrl && isValidTranscriptUrl(importRecord.fullTranscriptUrl)) {
+      await ProcessingStatusManager.startStage(sessionId, ProcessingStage.TRANSCRIPT_FETCH);
+
       console.log(`[Import Processor] Fetching transcript for ${importRecord.externalSessionId}...`);
 
       // Get company credentials for transcript fetching
@@ -100,125 +210,123 @@
         where: { id: importRecord.id },
         data: { rawTranscriptContent: transcriptContent },
       });
+
+      await ProcessingStatusManager.completeStage(sessionId, ProcessingStage.TRANSCRIPT_FETCH, {
+        contentLength: transcriptContent?.length || 0,
+        url: importRecord.fullTranscriptUrl
+      });
     } else {
       console.log(`[Import Processor] ⚠️ Failed to fetch transcript for ${importRecord.externalSessionId}: ${transcriptResult.error}`);
+      await ProcessingStatusManager.failStage(sessionId, ProcessingStage.TRANSCRIPT_FETCH, transcriptResult.error || 'Unknown error');
     }
+    } else if (!importRecord.fullTranscriptUrl) {
+      // No transcript URL available - skip this stage
+      await ProcessingStatusManager.skipStage(sessionId, ProcessingStage.TRANSCRIPT_FETCH, 'No transcript URL provided');
+    } else {
+      // Transcript already fetched
+      await ProcessingStatusManager.completeStage(sessionId, ProcessingStage.TRANSCRIPT_FETCH, {
+        contentLength: transcriptContent?.length || 0,
+        source: 'already_fetched'
+      });
+    }
 
-    // Create or update Session record with MINIMAL processing
-    // Only copy fields that don't need AI analysis
-    const session = await prisma.session.upsert({
-      where: {
-        importId: importRecord.id,
-      },
-      update: {
-        startTime,
-        endTime,
-        // Direct copies (minimal processing)
-        ipAddress: importRecord.ipAddress,
-        country: importRecord.countryCode, // Keep as country code
-        fullTranscriptUrl: importRecord.fullTranscriptUrl,
-        avgResponseTime: importRecord.avgResponseTimeSeconds,
-        initialMsg: importRecord.initialMessage,
-
-        // AI-processed fields: Leave empty, will be filled by AI processing
-        // language: null, // AI will detect
-        // messagesSent: null, // AI will count from Messages
-        // sentiment: null, // AI will analyze
-        // escalated: null, // AI will detect
-        // forwardedHr: null, // AI will detect
-        // category: null, // AI will categorize
-        // summary: null, // AI will generate
-
-        processed: false, // Will be processed later by AI
-      },
-      create: {
-        companyId: importRecord.companyId,
-        importId: importRecord.id,
-        startTime,
-        endTime,
-        // Direct copies (minimal processing)
-        ipAddress: importRecord.ipAddress,
-        country: importRecord.countryCode, // Keep as country code
-        fullTranscriptUrl: importRecord.fullTranscriptUrl,
-        avgResponseTime: importRecord.avgResponseTimeSeconds,
-        initialMsg: importRecord.initialMessage,
-
-        // AI-processed fields: Leave empty, will be filled by AI processing
-        // All these will be null initially and filled by AI
-        processed: false, // Will be processed later by AI
-      },
-    });
+    // Handle session creation (parse messages)
+    await ProcessingStatusManager.startStage(sessionId, ProcessingStage.SESSION_CREATION);
+
+    if (transcriptContent) {
+      await parseTranscriptIntoMessages(sessionId, transcriptContent);
+    }
 
-    // Update import status to DONE
-    await prisma.sessionImport.update({
-      where: { id: importRecord.id },
-      data: {
-        status: ImportStatus.DONE,
-        processedAt: new Date(),
-        errorMsg: null,
-      },
+    await ProcessingStatusManager.completeStage(sessionId, ProcessingStage.SESSION_CREATION, {
+      hasTranscript: !!transcriptContent,
+      transcriptLength: transcriptContent?.length || 0
     });
 
     return { success: true };
 
   } catch (error) {
-    // Update import status to ERROR
-    await prisma.sessionImport.update({
-      where: { id: importRecord.id },
-      data: {
-        status: ImportStatus.ERROR,
-        errorMsg: error instanceof Error ? error.message : String(error),
-      },
-    });
+    const errorMessage = error instanceof Error ? error.message : String(error);
+
+    // Mark the current stage as failed if we have a sessionId
+    if (sessionId) {
+      // Determine which stage failed based on the error
+      if (errorMessage.includes('transcript') || errorMessage.includes('fetch')) {
+        await ProcessingStatusManager.failStage(sessionId, ProcessingStage.TRANSCRIPT_FETCH, errorMessage);
+      } else if (errorMessage.includes('message') || errorMessage.includes('parse')) {
+        await ProcessingStatusManager.failStage(sessionId, ProcessingStage.SESSION_CREATION, errorMessage);
+      } else {
+        // General failure - mark CSV_IMPORT as failed
+        await ProcessingStatusManager.failStage(sessionId, ProcessingStage.CSV_IMPORT, errorMessage);
+      }
+    }
 
     return {
       success: false,
-      error: error instanceof Error ? error.message : String(error),
+      error: errorMessage,
     };
   }
 }
 
 /**
- * Process queued SessionImport records into Session records
+ * Process unprocessed SessionImport records into Session records
+ * Uses new processing status system to find imports that need processing
  */
 export async function processQueuedImports(batchSize: number = 50): Promise<void> {
-  console.log('[Import Processor] Starting to process queued imports...');
+  console.log('[Import Processor] Starting to process unprocessed imports...');
 
-  // Find queued imports
-  const queuedImports = await prisma.sessionImport.findMany({
-    where: {
-      status: ImportStatus.QUEUED,
-    },
-    take: batchSize,
-    orderBy: {
-      createdAt: 'asc', // Process oldest first
-    },
-  });
+  let totalSuccessCount = 0;
+  let totalErrorCount = 0;
+  let batchNumber = 1;
 
-  if (queuedImports.length === 0) {
-    console.log('[Import Processor] No queued imports found');
-    return;
-  }
+  while (true) {
+    // Find SessionImports that don't have a corresponding Session yet
+    const unprocessedImports = await prisma.sessionImport.findMany({
+      where: {
+        session: null, // No session created yet
+      },
+      take: batchSize,
+      orderBy: {
+        createdAt: 'asc', // Process oldest first
+      },
+    });
+    if (unprocessedImports.length === 0) {
+      if (batchNumber === 1) {
+        console.log('[Import Processor] No unprocessed imports found');
+      } else {
+        console.log(`[Import Processor] All batches completed. Total: ${totalSuccessCount} successful, ${totalErrorCount} failed`);
+      }
+      return;
+    }
 
-  console.log(`[Import Processor] Processing ${queuedImports.length} queued imports...`);
+    console.log(`[Import Processor] Processing batch ${batchNumber}: ${unprocessedImports.length} imports...`);
 
-  let successCount = 0;
-  let errorCount = 0;
+    let batchSuccessCount = 0;
+    let batchErrorCount = 0;
 
-  // Process each import
-  for (const importRecord of queuedImports) {
-    const result = await processSingleImport(importRecord);
-
-    if (result.success) {
-      successCount++;
-      console.log(`[Import Processor] ✓ Processed import ${importRecord.externalSessionId}`);
-    } else {
-      errorCount++;
-      console.log(`[Import Processor] ✗ Failed to process import ${importRecord.externalSessionId}: ${result.error}`);
+    // Process each import in this batch
+    for (const importRecord of unprocessedImports) {
+      const result = await processSingleImport(importRecord);
+
+      if (result.success) {
+        batchSuccessCount++;
+        totalSuccessCount++;
+        console.log(`[Import Processor] ✓ Processed import ${importRecord.externalSessionId}`);
+      } else {
+        batchErrorCount++;
+        totalErrorCount++;
+        console.log(`[Import Processor] ✗ Failed to process import ${importRecord.externalSessionId}: ${result.error}`);
+      }
+    }
+
+    console.log(`[Import Processor] Batch ${batchNumber} completed: ${batchSuccessCount} successful, ${batchErrorCount} failed`);
+    batchNumber++;
+
+    // If this batch was smaller than the batch size, we're done
+    if (unprocessedImports.length < batchSize) {
+      console.log(`[Import Processor] All batches completed. Total: ${totalSuccessCount} successful, ${totalErrorCount} failed`);
+      return;
    }
  }
-
-  console.log(`[Import Processor] Completed: ${successCount} successful, ${errorCount} failed`);
 }
 
 /**
diff --git a/lib/processingScheduler.ts b/lib/processingScheduler.ts
index 11d9220..d7c854e 100644
--- a/lib/processingScheduler.ts
+++ b/lib/processingScheduler.ts
@@ -7,25 +7,62 @@ import { getSchedulerConfig } from "./schedulerConfig";
 const prisma = new PrismaClient();
 const OPENAI_API_KEY = process.env.OPENAI_API_KEY;
 const OPENAI_API_URL = "https://api.openai.com/v1/chat/completions";
-
-// Model pricing in USD (update as needed)
-const MODEL_PRICING = {
-  'gpt-4o-2024-08-06': {
-    promptTokenCost: 0.0000025, // $2.50 per 1M tokens
-    completionTokenCost: 0.00001, // $10.00 per 1M tokens
-  },
-  'gpt-4-turbo': {
-    promptTokenCost: 0.00001, // $10.00 per 1M tokens
-    completionTokenCost: 0.00003, // $30.00 per 1M tokens
-  },
-  'gpt-4o': {
-    promptTokenCost: 0.000005, // $5.00 per 1M tokens
-    completionTokenCost: 0.000015, // $15.00 per 1M tokens
-  }
-} as const;
+const DEFAULT_MODEL = process.env.OPENAI_MODEL || "gpt-4o";
 
 const USD_TO_EUR_RATE = 0.85; // Update periodically or fetch from API
 
+/**
+ * Get company's default AI model
+ */
+async function getCompanyAIModel(companyId: string): Promise<string> {
+  const companyModel = await prisma.companyAIModel.findFirst({
+    where: {
+      companyId,
+      isDefault: true,
+    },
+    include: {
+      aiModel: true,
+    },
+  });
+
+  return companyModel?.aiModel.name || DEFAULT_MODEL;
+}
+
+/**
+ * Get current pricing for an AI model
+ */
+async function getCurrentModelPricing(modelName: string): Promise<{
+  promptTokenCost: number;
+  completionTokenCost: number;
+} | null> {
+  const model = await prisma.aIModel.findUnique({
+    where: { name: modelName },
+    include: {
+      pricing: {
+        where: {
+          effectiveFrom: { lte: new Date() },
+          OR: [
+            { effectiveUntil: null },
+            { effectiveUntil: { gte: new Date() } }
+          ]
+        },
+        orderBy: { effectiveFrom: 'desc' },
+        take: 1,
+      },
+    },
+  });
+
+  if (!model || model.pricing.length === 0) {
+    return null;
+  }
+
+  const pricing = model.pricing[0];
+  return {
+    promptTokenCost: pricing.promptTokenCost,
+    completionTokenCost: pricing.completionTokenCost,
+  };
+}
+
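+// Worked example of the cost arithmetic used below, at the fallback rates
+// ($10 / $30 per 1M tokens, i.e. the gpt-4-turbo figures):
+//   1,200 prompt tokens     * 0.00001 USD = 0.012 USD
+//     300 completion tokens * 0.00003 USD = 0.009 USD
+//   total = 0.021 USD * 0.85 (USD->EUR)  = ~0.018 EUR
+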
 interface ProcessedData {
   language: string;
   sentiment: "POSITIVE" | "NEUTRAL" | "NEGATIVE";
@@ -53,10 +90,20 @@ async function recordAIProcessingRequest(
 ): Promise<void> {
   const usage = openaiResponse.usage;
   const model = openaiResponse.model;
-  const pricing = MODEL_PRICING[model as keyof typeof MODEL_PRICING] || MODEL_PRICING['gpt-4-turbo']; // fallback
 
-  const promptCost = usage.prompt_tokens * pricing.promptTokenCost;
-  const completionCost = usage.completion_tokens * pricing.completionTokenCost;
+  // Get current pricing from database
+  const pricing = await getCurrentModelPricing(model);
+
+  // Fallback pricing if not found in database
+  const fallbackPricing = {
+    promptTokenCost: 0.00001, // $10.00 per 1M tokens (gpt-4-turbo rate)
+    completionTokenCost: 0.00003, // $30.00 per 1M tokens
+  };
+
+  const finalPricing = pricing || fallbackPricing;
+
+  const promptCost = usage.prompt_tokens * finalPricing.promptTokenCost;
+  const completionCost = usage.completion_tokens * finalPricing.completionTokenCost;
   const totalCostUsd = promptCost + completionCost;
   const totalCostEur = totalCostUsd * USD_TO_EUR_RATE;
 
@@ -80,8 +127,8 @@
       acceptedPredictionTokens: usage.completion_tokens_details?.accepted_prediction_tokens || null,
       rejectedPredictionTokens: usage.completion_tokens_details?.rejected_prediction_tokens || null,
 
-      promptTokenCost: pricing.promptTokenCost,
-      completionTokenCost: pricing.completionTokenCost,
+      promptTokenCost: finalPricing.promptTokenCost,
+      completionTokenCost: finalPricing.completionTokenCost,
       totalCostEur,
 
       processingType,
@@ -177,11 +224,14 @@ async function calculateEndTime(sessionId: string, fallbackEndTime: Date): Promi
 /**
  * Processes a session transcript using OpenAI API
  */
-async function processTranscriptWithOpenAI(sessionId: string, transcript: string): Promise<ProcessedData> {
+async function processTranscriptWithOpenAI(sessionId: string, transcript: string, companyId: string): Promise<ProcessedData> {
   if (!OPENAI_API_KEY) {
     throw new Error("OPENAI_API_KEY environment variable is not set");
   }
 
+  // Get company's AI model
+  const aiModel = await getCompanyAIModel(companyId);
+
   // Updated system message with exact enum values
   const systemMessage = `
 You are an AI assistant tasked with analyzing chat transcripts.
@@ -177,11 +224,14 @@ async function calculateEndTime(sessionId: string, fallbackEndTime: Date): Promise<Date> {
 /**
  * Processes a session transcript using OpenAI API
  */
-async function processTranscriptWithOpenAI(sessionId: string, transcript: string): Promise<ProcessedData> {
+async function processTranscriptWithOpenAI(sessionId: string, transcript: string, companyId: string): Promise<ProcessedData> {
   if (!OPENAI_API_KEY) {
     throw new Error("OPENAI_API_KEY environment variable is not set");
   }
 
+  // Get company's AI model
+  const aiModel = await getCompanyAIModel(companyId);
+
   // Updated system message with exact enum values
   const systemMessage = `
 You are an AI assistant tasked with analyzing chat transcripts.
@@ -218,7 +268,7 @@ async function processTranscriptWithOpenAI(sessionId: string, transcript: string
       Authorization: `Bearer ${OPENAI_API_KEY}`,
     },
     body: JSON.stringify({
-      model: "gpt-4o", // Use latest model
+      model: aiModel, // Use company's configured AI model
       messages: [
         {
           role: "system",
@@ -348,7 +398,7 @@ async function processSingleSession(session: any): Promise<void> {
     )
     .join("\n");
 
-  const processedData = await processTranscriptWithOpenAI(session.id, transcript);
+  const processedData = await processTranscriptWithOpenAI(session.id, transcript, session.companyId);
 
   // Calculate messagesSent from actual Message records
   const messagesSent = await calculateMessagesSent(session.id);
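The two lookups above compose at call time: company default model first, then the effective-dated pricing row for whatever name comes back. A minimal sketch of that resolution, assuming the helpers defined in this file:

// Resolve model then pricing; getCompanyAIModel falls back to
// OPENAI_MODEL / "gpt-4o", getCurrentModelPricing may return null,
// in which case the caller applies the hardcoded fallback rates.
async function resolveModelAndPricing(companyId: string) {
  const modelName = await getCompanyAIModel(companyId);
  const pricing = await getCurrentModelPricing(modelName);
  return { modelName, pricing };
}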
diff --git a/lib/processingStatusManager.ts b/lib/processingStatusManager.ts
new file mode 100644
index 0000000..1d7591e
--- /dev/null
+++ b/lib/processingStatusManager.ts
@@ -0,0 +1,295 @@
+import { PrismaClient, ProcessingStage, ProcessingStatus } from '@prisma/client';
+
+const prisma = new PrismaClient();
+
+/**
+ * Centralized processing status management
+ */
+export class ProcessingStatusManager {
+
+  /**
+   * Initialize processing status for a session with all stages set to PENDING
+   */
+  static async initializeSession(sessionId: string): Promise<void> {
+    const stages = [
+      ProcessingStage.CSV_IMPORT,
+      ProcessingStage.TRANSCRIPT_FETCH,
+      ProcessingStage.SESSION_CREATION,
+      ProcessingStage.AI_ANALYSIS,
+      ProcessingStage.QUESTION_EXTRACTION,
+    ];
+
+    // Create all processing status records for this session
+    await prisma.sessionProcessingStatus.createMany({
+      data: stages.map(stage => ({
+        sessionId,
+        stage,
+        status: ProcessingStatus.PENDING,
+      })),
+      skipDuplicates: true, // In case some already exist
+    });
+  }
+
+  /**
+   * Start a processing stage
+   */
+  static async startStage(
+    sessionId: string,
+    stage: ProcessingStage,
+    metadata?: any
+  ): Promise<void> {
+    await prisma.sessionProcessingStatus.upsert({
+      where: {
+        sessionId_stage: { sessionId, stage }
+      },
+      update: {
+        status: ProcessingStatus.IN_PROGRESS,
+        startedAt: new Date(),
+        errorMessage: null,
+        metadata: metadata || null,
+      },
+      create: {
+        sessionId,
+        stage,
+        status: ProcessingStatus.IN_PROGRESS,
+        startedAt: new Date(),
+        metadata: metadata || null,
+      },
+    });
+  }
+
+  /**
+   * Complete a processing stage successfully
+   */
+  static async completeStage(
+    sessionId: string,
+    stage: ProcessingStage,
+    metadata?: any
+  ): Promise<void> {
+    await prisma.sessionProcessingStatus.upsert({
+      where: {
+        sessionId_stage: { sessionId, stage }
+      },
+      update: {
+        status: ProcessingStatus.COMPLETED,
+        completedAt: new Date(),
+        errorMessage: null,
+        metadata: metadata || null,
+      },
+      create: {
+        sessionId,
+        stage,
+        status: ProcessingStatus.COMPLETED,
+        startedAt: new Date(),
+        completedAt: new Date(),
+        metadata: metadata || null,
+      },
+    });
+  }
+
+  /**
+   * Mark a processing stage as failed
+   */
+  static async failStage(
+    sessionId: string,
+    stage: ProcessingStage,
+    errorMessage: string,
+    metadata?: any
+  ): Promise<void> {
+    await prisma.sessionProcessingStatus.upsert({
+      where: {
+        sessionId_stage: { sessionId, stage }
+      },
+      update: {
+        status: ProcessingStatus.FAILED,
+        completedAt: new Date(),
+        errorMessage,
+        retryCount: { increment: 1 },
+        metadata: metadata || null,
+      },
+      create: {
+        sessionId,
+        stage,
+        status: ProcessingStatus.FAILED,
+        startedAt: new Date(),
+        completedAt: new Date(),
+        errorMessage,
+        retryCount: 1,
+        metadata: metadata || null,
+      },
+    });
+  }
+
+  /**
+   * Skip a processing stage (e.g., no transcript URL available)
+   */
+  static async skipStage(
+    sessionId: string,
+    stage: ProcessingStage,
+    reason: string
+  ): Promise<void> {
+    await prisma.sessionProcessingStatus.upsert({
+      where: {
+        sessionId_stage: { sessionId, stage }
+      },
+      update: {
+        status: ProcessingStatus.SKIPPED,
+        completedAt: new Date(),
+        errorMessage: reason,
+      },
+      create: {
+        sessionId,
+        stage,
+        status: ProcessingStatus.SKIPPED,
+        startedAt: new Date(),
+        completedAt: new Date(),
+        errorMessage: reason,
+      },
+    });
+  }
+
+  /**
+   * Get processing status for a specific session
+   */
+  static async getSessionStatus(sessionId: string) {
+    return await prisma.sessionProcessingStatus.findMany({
+      where: { sessionId },
+      orderBy: { stage: 'asc' },
+    });
+  }
+
+  /**
+   * Get sessions that need processing for a specific stage
+   */
+  static async getSessionsNeedingProcessing(
+    stage: ProcessingStage,
+    limit: number = 50
+  ) {
+    return await prisma.sessionProcessingStatus.findMany({
+      where: {
+        stage,
+        status: ProcessingStatus.PENDING,
+      },
+      include: {
+        session: {
+          include: {
+            import: true,
+            company: true,
+          },
+        },
+      },
+      take: limit,
+      orderBy: { session: { createdAt: 'asc' } },
+    });
+  }
+
+  /**
+   * Get pipeline status overview
+   */
+  static async getPipelineStatus() {
+    // Get counts by stage and status
+    const statusCounts = await prisma.sessionProcessingStatus.groupBy({
+      by: ['stage', 'status'],
+      _count: { id: true },
+    });
+
+    // Get total sessions
+    const totalSessions = await prisma.session.count();
+
+    // Organize the data
+    const pipeline: Record<string, Record<string, number>> = {};
+
+    for (const { stage, status, _count } of statusCounts) {
+      if (!pipeline[stage]) {
+        pipeline[stage] = {};
+      }
+      pipeline[stage][status] = _count.id;
+    }
+
+    return {
+      totalSessions,
+      pipeline,
+    };
+  }
+
+  /**
+   * Get sessions with failed processing
+   */
+  static async getFailedSessions(stage?: ProcessingStage) {
+    const where: any = {
+      status: ProcessingStatus.FAILED,
+    };
+
+    if (stage) {
+      where.stage = stage;
+    }
+
+    return await prisma.sessionProcessingStatus.findMany({
+      where,
+      include: {
+        session: {
+          include: {
+            import: true,
+          },
+        },
+      },
+      orderBy: { completedAt: 'desc' },
+    });
+  }
+
+  /**
+   * Reset a failed stage for retry
+   */
+  static async resetStageForRetry(sessionId: string, stage: ProcessingStage): Promise<void> {
+    await prisma.sessionProcessingStatus.update({
+      where: {
+        sessionId_stage: { sessionId, stage }
+      },
+      data: {
+        status: ProcessingStatus.PENDING,
+        startedAt: null,
+        completedAt: null,
+        errorMessage: null,
+      },
+    });
+  }
+
+  /**
+   * Check if a session has completed a specific stage
+   */
+  static async hasCompletedStage(sessionId: string, stage: ProcessingStage): Promise<boolean> {
+    const status = await prisma.sessionProcessingStatus.findUnique({
+      where: {
+        sessionId_stage: { sessionId, stage }
+      },
+    });
+
+    return status?.status === ProcessingStatus.COMPLETED;
+  }
+
+  /**
+   * Check if a session is ready for a specific stage (previous stages completed)
+   */
+  static async isReadyForStage(sessionId: string, stage: ProcessingStage): Promise<boolean> {
+    const stageOrder = [
+      ProcessingStage.CSV_IMPORT,
+      ProcessingStage.TRANSCRIPT_FETCH,
+      ProcessingStage.SESSION_CREATION,
+      ProcessingStage.AI_ANALYSIS,
+      ProcessingStage.QUESTION_EXTRACTION,
+    ];
+
+    const currentStageIndex = stageOrder.indexOf(stage);
+    if (currentStageIndex === 0) return true; // First stage is always ready
+
+    // Check if all previous stages are completed
+    const previousStages = stageOrder.slice(0, currentStageIndex);
+
+    for (const prevStage of previousStages) {
+      const isCompleted = await this.hasCompletedStage(sessionId, prevStage);
+      if (!isCompleted) return false;
+    }
+
+    return true;
+  }
+}
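A minimal sketch of how a pipeline worker is expected to drive this class; the `fetchTranscript` callback is a hypothetical stand-in for the real fetch logic, and the start/complete/fail wrapper mirrors the methods defined above:

import { ProcessingStage } from '@prisma/client';
import { ProcessingStatusManager } from './lib/processingStatusManager';

async function runTranscriptFetch(sessionId: string, fetchTranscript: () => Promise<string>) {
  // Respect stage ordering: bail out if earlier stages are unfinished.
  if (!(await ProcessingStatusManager.isReadyForStage(sessionId, ProcessingStage.TRANSCRIPT_FETCH))) {
    return;
  }

  await ProcessingStatusManager.startStage(sessionId, ProcessingStage.TRANSCRIPT_FETCH);
  try {
    const content = await fetchTranscript();
    await ProcessingStatusManager.completeStage(sessionId, ProcessingStage.TRANSCRIPT_FETCH, {
      contentLength: content.length, // stage-specific metadata
    });
  } catch (err) {
    // failStage records the message and bumps retryCount for later retries.
    await ProcessingStatusManager.failStage(
      sessionId,
      ProcessingStage.TRANSCRIPT_FETCH,
      err instanceof Error ? err.message : String(err)
    );
  }
}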
diff --git a/migrate-to-refactored-system.ts b/migrate-to-refactored-system.ts
new file mode 100644
index 0000000..84108e2
--- /dev/null
+++ b/migrate-to-refactored-system.ts
@@ -0,0 +1,129 @@
+import { PrismaClient, ProcessingStage, ProcessingStatus } from '@prisma/client';
+import { ProcessingStatusManager } from './lib/processingStatusManager';
+
+const prisma = new PrismaClient();
+
+async function migrateToRefactoredSystem() {
+  try {
+    console.log('=== MIGRATING TO REFACTORED PROCESSING SYSTEM ===\n');
+
+    // Get all existing sessions
+    const sessions = await prisma.session.findMany({
+      include: {
+        import: true,
+        messages: true,
+        sessionQuestions: true,
+      },
+      orderBy: { createdAt: 'asc' }
+    });
+
+    console.log(`Found ${sessions.length} sessions to migrate...\n`);
+
+    let migratedCount = 0;
+
+    for (const session of sessions) {
+      console.log(`Migrating session ${session.import?.externalSessionId || session.id}...`);
+
+      // Initialize processing status for this session
+      await ProcessingStatusManager.initializeSession(session.id);
+
+      // Determine the current state of each stage based on existing data
+
+      // 1. CSV_IMPORT - Always completed if session exists
+      await ProcessingStatusManager.completeStage(session.id, ProcessingStage.CSV_IMPORT, {
+        migratedFrom: 'existing_session',
+        importId: session.importId
+      });
+
+      // 2. TRANSCRIPT_FETCH - Check if transcript content exists
+      if (session.import?.rawTranscriptContent) {
+        await ProcessingStatusManager.completeStage(session.id, ProcessingStage.TRANSCRIPT_FETCH, {
+          migratedFrom: 'existing_transcript',
+          contentLength: session.import.rawTranscriptContent.length
+        });
+      } else if (!session.import?.fullTranscriptUrl) {
+        // No transcript URL - skip this stage
+        await ProcessingStatusManager.skipStage(session.id, ProcessingStage.TRANSCRIPT_FETCH, 'No transcript URL in original import');
+      } else {
+        // Has URL but no content - mark as pending for retry
+        console.log(`  - Transcript fetch pending for ${session.import.externalSessionId}`);
+      }
+
+      // 3. SESSION_CREATION - Check if messages exist
+      if (session.messages.length > 0) {
+        await ProcessingStatusManager.completeStage(session.id, ProcessingStage.SESSION_CREATION, {
+          migratedFrom: 'existing_messages',
+          messageCount: session.messages.length
+        });
+      } else if (session.import?.rawTranscriptContent) {
+        // Has transcript but no messages - needs reprocessing
+        console.log(`  - Session creation pending for ${session.import.externalSessionId} (has transcript but no messages)`);
+      } else {
+        // No transcript content - skip or mark as pending based on transcript fetch status
+        if (!session.import?.fullTranscriptUrl) {
+          await ProcessingStatusManager.skipStage(session.id, ProcessingStage.SESSION_CREATION, 'No transcript content available');
+        }
+      }
+
+      // 4. AI_ANALYSIS - Check if AI fields are populated
+      const hasAIAnalysis = session.summary || session.sentiment || session.category || session.language;
+      if (hasAIAnalysis) {
+        await ProcessingStatusManager.completeStage(session.id, ProcessingStage.AI_ANALYSIS, {
+          migratedFrom: 'existing_ai_analysis',
+          hasSummary: !!session.summary,
+          hasSentiment: !!session.sentiment,
+          hasCategory: !!session.category,
+          hasLanguage: !!session.language
+        });
+      } else {
+        // No AI analysis - mark as pending if session creation is complete
+        if (session.messages.length > 0) {
+          console.log(`  - AI analysis pending for ${session.import?.externalSessionId}`);
+        }
+      }
+
+      // 5. QUESTION_EXTRACTION - Check if questions exist
+      if (session.sessionQuestions.length > 0) {
+        await ProcessingStatusManager.completeStage(session.id, ProcessingStage.QUESTION_EXTRACTION, {
+          migratedFrom: 'existing_questions',
+          questionCount: session.sessionQuestions.length
+        });
+      } else {
+        // No questions - mark as pending if AI analysis is complete
+        if (hasAIAnalysis) {
+          console.log(`  - Question extraction pending for ${session.import?.externalSessionId}`);
+        }
+      }
+
+      migratedCount++;
+
+      if (migratedCount % 10 === 0) {
+        console.log(`  Migrated ${migratedCount}/${sessions.length} sessions...`);
+      }
+    }
+
+    console.log(`\n✓ Successfully migrated ${migratedCount} sessions to the new processing system`);
+
+    // Show final status
+    console.log('\n=== MIGRATION COMPLETE - FINAL STATUS ===');
+    const pipelineStatus = await ProcessingStatusManager.getPipelineStatus();
+
+    const stages = ['CSV_IMPORT', 'TRANSCRIPT_FETCH', 'SESSION_CREATION', 'AI_ANALYSIS', 'QUESTION_EXTRACTION'];
+
+    for (const stage of stages) {
+      const stageData = pipelineStatus.pipeline[stage] || {};
+      const pending = stageData.PENDING || 0;
+      const completed = stageData.COMPLETED || 0;
+      const skipped = stageData.SKIPPED || 0;
+
+      console.log(`${stage}: ${completed} completed, ${pending} pending, ${skipped} skipped`);
+    }
+
+  } catch (error) {
+    console.error('Error migrating to refactored system:', error);
+  } finally {
+    await prisma.$disconnect();
+  }
+}
+
+migrateToRefactoredSystem();
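To make the migration's inference rules concrete, one illustrative case with hypothetical data: a session that already has transcript content, messages, and a summary, but no extracted questions, would be backfilled as:

// Hypothetical session: transcript + messages + summary, no questions yet.
// The migration above would record these stage statuses for it:
const migratedStatuses = {
  CSV_IMPORT: 'COMPLETED',        // session exists, so the import happened
  TRANSCRIPT_FETCH: 'COMPLETED',  // rawTranscriptContent is present
  SESSION_CREATION: 'COMPLETED',  // messages.length > 0
  AI_ANALYSIS: 'COMPLETED',       // summary is populated
  QUESTION_EXTRACTION: 'PENDING', // no sessionQuestions, but AI analysis is done
};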
diff --git a/prisma/migrations/20250627194930_add_ai_model_management/migration.sql b/prisma/migrations/20250627194930_add_ai_model_management/migration.sql
new file mode 100644
index 0000000..0acfe3d
--- /dev/null
+++ b/prisma/migrations/20250627194930_add_ai_model_management/migration.sql
@@ -0,0 +1,63 @@
+-- CreateTable
+CREATE TABLE "AIModel" (
+    "id" TEXT NOT NULL,
+    "name" TEXT NOT NULL,
+    "provider" TEXT NOT NULL,
+    "maxTokens" INTEGER,
+    "isActive" BOOLEAN NOT NULL DEFAULT true,
+    "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    "updatedAt" TIMESTAMP(3) NOT NULL,
+
+    CONSTRAINT "AIModel_pkey" PRIMARY KEY ("id")
+);
+
+-- CreateTable
+CREATE TABLE "AIModelPricing" (
+    "id" TEXT NOT NULL,
+    "aiModelId" TEXT NOT NULL,
+    "promptTokenCost" DOUBLE PRECISION NOT NULL,
+    "completionTokenCost" DOUBLE PRECISION NOT NULL,
+    "effectiveFrom" TIMESTAMP(3) NOT NULL,
+    "effectiveUntil" TIMESTAMP(3),
+    "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
+
+    CONSTRAINT "AIModelPricing_pkey" PRIMARY KEY ("id")
+);
+
+-- CreateTable
+CREATE TABLE "CompanyAIModel" (
+    "id" TEXT NOT NULL,
+    "companyId" TEXT NOT NULL,
+    "aiModelId" TEXT NOT NULL,
+    "isDefault" BOOLEAN NOT NULL DEFAULT false,
+    "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
+
+    CONSTRAINT "CompanyAIModel_pkey" PRIMARY KEY ("id")
+);
+
+-- CreateIndex
+CREATE UNIQUE INDEX "AIModel_name_key" ON "AIModel"("name");
+
+-- CreateIndex
+CREATE INDEX "AIModel_provider_isActive_idx" ON "AIModel"("provider", "isActive");
+
+-- CreateIndex
+CREATE INDEX "AIModelPricing_aiModelId_effectiveFrom_idx" ON "AIModelPricing"("aiModelId", "effectiveFrom");
+
+-- CreateIndex
+CREATE INDEX "AIModelPricing_effectiveFrom_effectiveUntil_idx" ON "AIModelPricing"("effectiveFrom", "effectiveUntil");
+
+-- CreateIndex
+CREATE INDEX "CompanyAIModel_companyId_isDefault_idx" ON "CompanyAIModel"("companyId", "isDefault");
+
+-- CreateIndex
+CREATE UNIQUE INDEX "CompanyAIModel_companyId_aiModelId_key" ON "CompanyAIModel"("companyId", "aiModelId");
+
+-- AddForeignKey
+ALTER TABLE "AIModelPricing" ADD CONSTRAINT "AIModelPricing_aiModelId_fkey" FOREIGN KEY ("aiModelId") REFERENCES "AIModel"("id") ON DELETE CASCADE ON UPDATE CASCADE;
+
+-- AddForeignKey
+ALTER TABLE "CompanyAIModel" ADD CONSTRAINT "CompanyAIModel_companyId_fkey" FOREIGN KEY ("companyId") REFERENCES "Company"("id") ON DELETE CASCADE ON UPDATE CASCADE;
+
+-- AddForeignKey
+ALTER TABLE "CompanyAIModel" ADD CONSTRAINT "CompanyAIModel_aiModelId_fkey" FOREIGN KEY ("aiModelId") REFERENCES "AIModel"("id") ON DELETE CASCADE ON UPDATE CASCADE;
diff --git a/prisma/schema.prisma b/prisma/schema.prisma
index b8690d6..1e4701c 100644
--- a/prisma/schema.prisma
+++ b/prisma/schema.prisma
@@ -1,10 +1,12 @@
 generator client {
   provider        = "prisma-client-js"
+  previewFeatures = ["driverAdapters"]
 }
 
 datasource db {
-  provider = "postgresql"
-  url      = env("DATABASE_URL")
+  provider  = "postgresql"
+  url       = env("DATABASE_URL")
+  directUrl = env("DATABASE_URL_DIRECT")
 }
 
 /**
@@ -38,6 +40,22 @@ enum SessionCategory {
   UNRECOGNIZED_OTHER
 }
 
+enum ProcessingStage {
+  CSV_IMPORT          // SessionImport created
+  TRANSCRIPT_FETCH    // Transcript content fetched
+  SESSION_CREATION    // Session + Messages created
+  AI_ANALYSIS         // AI processing completed
+  QUESTION_EXTRACTION // Questions extracted
+}
+
+enum ProcessingStatus {
+  PENDING
+  IN_PROGRESS
+  COMPLETED
+  FAILED
+  SKIPPED
+}
+
 /**
  * COMPANY (multi-tenant root)
  */
@@ -50,9 +68,10 @@ model Company {
   sentimentAlert Float?
   dashboardOpts  Json? // JSON column instead of opaque string
 
-  users    User[]          @relation("CompanyUsers")
-  sessions Session[]
-  imports  SessionImport[]
+  users           User[]           @relation("CompanyUsers")
+  sessions        Session[]
+  imports         SessionImport[]
+  companyAiModels CompanyAIModel[]
 
   createdAt DateTime @default(now())
   updatedAt DateTime @updatedAt
@@ -118,9 +137,6 @@ model Session {
 
   // AI-generated fields
   summary String? // AI-generated summary
-
-  // Processing metadata
-  processed Boolean @default(false)
 
   /**
    * Relationships
@@ -128,6 +144,7 @@ model Session {
   messages             Message[]             // Individual conversation messages
   sessionQuestions     SessionQuestion[]     // Questions asked in this session
   aiProcessingRequests AIProcessingRequest[] // AI processing cost tracking
+  processingStatus     SessionProcessingStatus[] // Processing pipeline status
 
   createdAt DateTime @default(now())
   updatedAt DateTime @updatedAt
@@ -136,15 +153,8 @@ model Session {
 }
 
 /**
- * 2. Raw CSV row waiting to be processed ----------
+ * 2. Raw CSV row (pure data storage) ----------
  */
-enum ImportStatus {
-  QUEUED
-  PROCESSING
-  DONE
-  ERROR
-}
-
 model SessionImport {
   id      String  @id @default(uuid())
   company Company @relation(fields: [companyId], references: [id], onDelete: Cascade)
@@ -177,13 +187,9 @@ model SessionImport {
   rawTranscriptContent String? // Fetched content from fullTranscriptUrl
 
   // ─── bookkeeping ─────────────────────────────────
-  status      ImportStatus @default(QUEUED)
-  errorMsg    String?
-  processedAt DateTime?
-  createdAt   DateTime     @default(now())
+  createdAt DateTime @default(now())
 
   @@unique([companyId, externalSessionId]) // idempotent re-imports
-  @@index([status])
 }
 
 /**
@@ -206,6 +212,30 @@ model Message {
   @@index([sessionId, order])
 }
 
+/**
+ * UNIFIED PROCESSING STATUS TRACKING
+ */
+model SessionProcessingStatus {
+  id        String           @id @default(uuid())
+  sessionId String
+  stage     ProcessingStage
+  status    ProcessingStatus @default(PENDING)
+
+  startedAt    DateTime?
+  completedAt  DateTime?
+  errorMessage String?
+  retryCount   Int       @default(0)
+
+  // Stage-specific metadata (e.g., AI costs, token usage, fetch details)
+  metadata Json?
+
+  session Session @relation(fields: [sessionId], references: [id], onDelete: Cascade)
+
+  @@unique([sessionId, stage])
+  @@index([stage, status])
+  @@index([sessionId])
+}
+
 /**
  * QUESTION MANAGEMENT (separate from Session for better analytics)
  */
@@ -281,3 +311,66 @@ model AIProcessingRequest {
   @@index([requestedAt])
   @@index([model])
 }
+
+/**
+ * AI MODEL MANAGEMENT SYSTEM
+ */
+
+/**
+ * AI Model definitions (without pricing)
+ */
+model AIModel {
+  id        String  @id @default(uuid())
+  name      String  @unique // "gpt-4o", "gpt-4-turbo", etc.
+  provider  String  // "openai", "anthropic", etc.
+  maxTokens Int?    // Maximum tokens for this model
+  isActive  Boolean @default(true)
+
+  // Relationships
+  pricing       AIModelPricing[]
+  companyModels CompanyAIModel[]
+
+  createdAt DateTime @default(now())
+  updatedAt DateTime @updatedAt
+
+  @@index([provider, isActive])
+}
+
+/**
+ * Time-based pricing for AI models
+ */
+model AIModelPricing {
+  id                  String    @id @default(uuid())
+  aiModelId           String
+  promptTokenCost     Float     // Cost per prompt token in USD
+  completionTokenCost Float     // Cost per completion token in USD
+  effectiveFrom       DateTime  // When this pricing becomes effective
+  effectiveUntil      DateTime? // When this pricing expires (null = current)
+
+  // Relationships
+  aiModel AIModel @relation(fields: [aiModelId], references: [id], onDelete: Cascade)
+
+  createdAt DateTime @default(now())
+
+  @@index([aiModelId, effectiveFrom])
+  @@index([effectiveFrom, effectiveUntil])
+}
+
+/**
+ * Company-specific AI model assignments
+ */
+model CompanyAIModel {
+  id        String  @id @default(uuid())
+  companyId String
+  aiModelId String
+  isDefault Boolean @default(false) // Is this the default model for the company?
+
+  // Relationships
+  company Company @relation(fields: [companyId], references: [id], onDelete: Cascade)
+  aiModel AIModel @relation(fields: [aiModelId], references: [id], onDelete: Cascade)
+
+  createdAt DateTime @default(now())
+
+  @@unique([companyId, aiModelId]) // Prevent duplicate assignments
+  @@index([companyId, isDefault])
+}
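With the boolean `processed` flag and the `ImportStatus` enum gone, "what still needs work" becomes a status query against SessionProcessingStatus, served by the `[stage, status]` index. A minimal sketch of the query that replaces the old `where: { processed: false }` lookup:

import { PrismaClient, ProcessingStage, ProcessingStatus } from '@prisma/client';

const prisma = new PrismaClient();

// Sessions whose AI_ANALYSIS stage is still PENDING, i.e. the new
// equivalent of the old `processed: false` filter on Session.
async function sessionsNeedingAIAnalysis() {
  return prisma.sessionProcessingStatus.findMany({
    where: { stage: ProcessingStage.AI_ANALYSIS, status: ProcessingStatus.PENDING },
    include: { session: true },
  });
}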
diff --git a/prisma/seed.ts b/prisma/seed.ts
index c7557f5..ed22f79 100644
--- a/prisma/seed.ts
+++ b/prisma/seed.ts
@@ -1,4 +1,4 @@
-// seed.ts - Create initial admin user and company
+// seed.ts - Create initial admin user, company, and AI models
 import { PrismaClient } from "@prisma/client";
 import bcrypt from "bcryptjs";
 
@@ -6,30 +6,133 @@ const prisma = new PrismaClient();
 
 async function main() {
   try {
-    // Create a company
+    console.log("🌱 Starting database seeding...");
+
+    // Create the Jumbo company
     const company = await prisma.company.create({
       data: {
-        name: "Demo Company",
-        csvUrl: "https://proto.notso.ai/jumbo/chats", // Replace with a real URL if available
+        name: "Jumbo Bas Bobbeldijk",
+        csvUrl: "https://proto.notso.ai/jumbo/chats",
+        csvUsername: "jumboadmin",
+        csvPassword: "jumboadmin",
       },
     });
+    console.log(`✅ Created company: ${company.name}`);
 
-    // Create an admin user
-    const hashedPassword = await bcrypt.hash("admin123", 10);
-    await prisma.user.create({
+    // Create admin user
+    const hashedPassword = await bcrypt.hash("8QbL26tB7fWS", 10);
+    const adminUser = await prisma.user.create({
       data: {
-        email: "admin@demo.com",
+        email: "max.kowalski.contact@gmail.com",
         password: hashedPassword,
         role: "ADMIN",
         companyId: company.id,
       },
     });
+    console.log(`✅ Created admin user: ${adminUser.email}`);
+
+    // Create AI Models
+    const aiModels = [
+      {
+        name: "gpt-4o",
+        provider: "openai",
+        maxTokens: 128000,
+        isActive: true,
+      },
+      {
+        name: "gpt-4o-2024-08-06",
+        provider: "openai",
+        maxTokens: 128000,
+        isActive: true,
+      },
+      {
+        name: "gpt-4-turbo",
+        provider: "openai",
+        maxTokens: 128000,
+        isActive: true,
+      },
+      {
+        name: "gpt-4o-mini",
+        provider: "openai",
+        maxTokens: 128000,
+        isActive: true,
+      },
+    ];
+
+    const createdModels: any[] = [];
+    for (const modelData of aiModels) {
+      const model = await prisma.aIModel.create({
+        data: modelData,
+      });
+      createdModels.push(model);
+      console.log(`✅ Created AI model: ${model.name}`);
+    }
+
+    // Create current pricing for AI models (as of December 2024)
+    const currentTime = new Date();
+    const pricingData = [
+      {
+        modelName: "gpt-4o",
+        promptTokenCost: 0.0000025, // $2.50 per 1M tokens
+        completionTokenCost: 0.00001, // $10.00 per 1M tokens
+      },
+      {
+        modelName: "gpt-4o-2024-08-06",
+        promptTokenCost: 0.0000025, // $2.50 per 1M tokens
+        completionTokenCost: 0.00001, // $10.00 per 1M tokens
+      },
+      {
+        modelName: "gpt-4-turbo",
+        promptTokenCost: 0.00001, // $10.00 per 1M tokens
+        completionTokenCost: 0.00003, // $30.00 per 1M tokens
+      },
+      {
+        modelName: "gpt-4o-mini",
+        promptTokenCost: 0.00000015, // $0.15 per 1M tokens
+        completionTokenCost: 0.0000006, // $0.60 per 1M tokens
+      },
+    ];
+
+    for (const pricing of pricingData) {
+      const model = createdModels.find(m => m.name === pricing.modelName);
+      if (model) {
+        await prisma.aIModelPricing.create({
+          data: {
+            aiModelId: model.id,
+            promptTokenCost: pricing.promptTokenCost,
+            completionTokenCost: pricing.completionTokenCost,
+            effectiveFrom: currentTime,
+            effectiveUntil: null, // Current pricing
+          },
+        });
+        console.log(`✅ Created pricing for: ${model.name}`);
+      }
+    }
+
+    // Assign default AI model to company (gpt-4o)
+    const defaultModel = createdModels.find(m => m.name === "gpt-4o");
+    if (defaultModel) {
+      await prisma.companyAIModel.create({
+        data: {
+          companyId: company.id,
+          aiModelId: defaultModel.id,
+          isDefault: true,
+        },
+      });
+      console.log(`✅ Set default AI model for company: ${defaultModel.name}`);
+    }
+
+    console.log("\n🎉 Database seeding completed successfully!");
+    console.log("\n📋 Summary:");
+    console.log(`Company: ${company.name}`);
+    console.log(`Admin user: ${adminUser.email}`);
+    console.log(`Password: 8QbL26tB7fWS`);
+    console.log(`AI Models: ${createdModels.length} models created with current pricing`);
+    console.log(`Default model: ${defaultModel?.name}`);
+    console.log("\n🚀 Ready to start importing CSV data!");
 
-    console.log("Seed data created successfully:");
-    console.log("Company: Demo Company");
-    console.log("Admin user: admin@demo.com (password: admin123)");
   } catch (error) {
-    console.error("Error seeding database:", error);
+    console.error("❌ Error seeding database:", error);
     process.exit(1);
   } finally {
     await prisma.$disconnect();
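The seed creates each pricing row open-ended (`effectiveUntil: null`). Because pricing history is append-only, a later price change is modeled by end-dating the current row and inserting a new one. A sketch under that assumption (the `rotatePricing` helper is hypothetical; field names come from the AIModelPricing model):

import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();

// Close the open-ended pricing row for a model and append a new one,
// atomically, so getCurrentModelPricing always finds exactly one match.
async function rotatePricing(aiModelId: string, promptTokenCost: number, completionTokenCost: number) {
  const now = new Date();
  await prisma.$transaction([
    prisma.aIModelPricing.updateMany({
      where: { aiModelId, effectiveUntil: null },
      data: { effectiveUntil: now }, // end-date the current pricing
    }),
    prisma.aIModelPricing.create({
      data: { aiModelId, promptTokenCost, completionTokenCost, effectiveFrom: now, effectiveUntil: null },
    }),
  ]);
}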
diff --git a/test-ai-processing.ts b/test-ai-processing.ts
new file mode 100644
index 0000000..357aa8d
--- /dev/null
+++ b/test-ai-processing.ts
@@ -0,0 +1,17 @@
+import { processUnprocessedSessions } from './lib/processingScheduler';
+
+async function testAIProcessing() {
+  console.log('=== TESTING AI PROCESSING ===\n');
+
+  try {
+    // Process with batch size of 10 to test multiple batches (since we have 109 sessions)
+    await processUnprocessedSessions(10, 3); // batch size 10, max concurrency 3
+
+    console.log('\n=== AI PROCESSING COMPLETED ===');
+
+  } catch (error) {
+    console.error('Error during AI processing:', error);
+  }
+}
+
+testAIProcessing();
diff --git a/test-import-processing.ts b/test-import-processing.ts
new file mode 100644
index 0000000..5028c90
--- /dev/null
+++ b/test-import-processing.ts
@@ -0,0 +1,17 @@
+import { processQueuedImports } from './lib/importProcessor';
+
+async function testImportProcessing() {
+  console.log('=== TESTING IMPORT PROCESSING ===\n');
+
+  try {
+    // Process with batch size of 50 to test multiple batches
+    await processQueuedImports(50);
+
+    console.log('\n=== IMPORT PROCESSING COMPLETED ===');
+
+  } catch (error) {
+    console.error('Error during import processing:', error);
+  }
+}
+
+testImportProcessing();
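Taken together, the two test entry points run the pipeline stages in order. A sketch of a combined runner, assuming only functions introduced in this patch (batch sizes match the test scripts; the status printout mirrors check-refactored-pipeline-status.ts):

import { processQueuedImports } from './lib/importProcessor';
import { processUnprocessedSessions } from './lib/processingScheduler';
import { ProcessingStatusManager } from './lib/processingStatusManager';

// Run the import stage, then AI analysis, then print where the pipeline stands.
async function runPipeline() {
  await processQueuedImports(50);
  await processUnprocessedSessions(10, 3);

  const { totalSessions, pipeline } = await ProcessingStatusManager.getPipelineStatus();
  console.log(`Total sessions: ${totalSessions}`);
  for (const [stage, counts] of Object.entries(pipeline)) {
    console.log(stage, counts); // e.g. AI_ANALYSIS { COMPLETED: 90, PENDING: 19 }
  }
}

runPipeline().catch(console.error);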