diff --git a/.env.development b/.env.development index da2f1f6..59bd135 100644 --- a/.env.development +++ b/.env.development @@ -6,4 +6,8 @@ NEXTAUTH_URL=http://192.168.1.2:3000 NEXTAUTH_SECRET=this_is_a_fixed_secret_for_development_only NODE_ENV=development +# OpenAI API key for session processing +# Add your API key here: OPENAI_API_KEY=sk-... +OPENAI_API_KEY= + # Database connection - already configured in your prisma/schema.prisma diff --git a/components/SessionDetails.tsx b/components/SessionDetails.tsx index 16172e4..f48ac98 100644 --- a/components/SessionDetails.tsx +++ b/components/SessionDetails.tsx @@ -71,7 +71,7 @@ export default function SessionDetails({ session }: SessionDetailsProps) { {session.sentiment !== null && session.sentiment !== undefined && (
- Sentiment: + Sentiment Score: 0.3 @@ -91,6 +91,23 @@ export default function SessionDetails({ session }: SessionDetailsProps) {
)} + {session.sentimentCategory && ( +
+ AI Sentiment: + + {session.sentimentCategory} + +
+ )} +
Messages Sent: {session.messagesSent || 0} @@ -142,6 +159,67 @@ export default function SessionDetails({ session }: SessionDetailsProps) {
)} + {session.ipAddress && ( +
+ IP Address: + {session.ipAddress} +
+ )} + + {session.processed !== null && session.processed !== undefined && ( +
+ AI Processed: + + {session.processed ? "Yes" : "No"} + +
+ )} + + {session.initialMsg && ( +
+ Initial Message: +
+ "{session.initialMsg}" +
+
+ )} + + {session.summary && ( +
+ AI Summary: +
+ {session.summary} +
+
+ )} + + {session.questions && ( +
+ Questions Asked: +
+ {(() => { + try { + const questions = JSON.parse(session.questions); + if (Array.isArray(questions) && questions.length > 0) { + return ( + + ); + } + return "No questions identified"; + } catch { + return session.questions; + } + })()} +
+
+ )} + {/* Transcript rendering is now handled by the parent page (app/dashboard/sessions/[id]/page.tsx) */} {/* Fallback to link only if we only have the URL but no content - this might also be redundant if parent handles all transcript display */} {(!session.transcriptContent || diff --git a/docs/scheduler-fixes.md b/docs/scheduler-fixes.md new file mode 100644 index 0000000..60eba5f --- /dev/null +++ b/docs/scheduler-fixes.md @@ -0,0 +1,71 @@ +# Scheduler Error Fixes + +## Issues Identified and Resolved + +### 1. Invalid Company Configuration +**Problem**: Company `26fc3d34-c074-4556-85bd-9a66fafc0e08` had an invalid CSV URL (`https://example.com/data.csv`) with no authentication credentials. + +**Solution**: +- Added validation in `fetchAndStoreSessionsForAllCompanies()` to skip companies with example/invalid URLs +- Removed the invalid company record from the database using `fix_companies.js` + +### 2. Transcript Fetching Errors +**Problem**: Multiple "Error fetching transcript: Unauthorized" messages were flooding the logs when individual transcript files couldn't be accessed. + +**Solution**: +- Improved error handling in `fetchTranscriptContent()` function +- Added probabilistic logging (only ~10% of errors logged) to prevent log spam +- Added timeout (10 seconds) for transcript fetching +- Made transcript fetching failures non-blocking (sessions are still created without transcript content) + +### 3. CSV Fetching Errors +**Problem**: "Failed to fetch CSV: Not Found" errors for companies with invalid URLs. + +**Solution**: +- Added URL validation to skip companies with `example.com` URLs +- Improved error logging to be more descriptive + +## Current Status + +✅ **Fixed**: No more "Unauthorized" error spam +✅ **Fixed**: No more "Not Found" CSV errors +✅ **Fixed**: Scheduler runs cleanly without errors +✅ **Improved**: Better error handling and logging + +## Remaining Companies + +After cleanup, only valid companies remain: +- **Demo Company** (`790b9233-d369-451f-b92c-f4dceb42b649`) + - CSV URL: `https://proto.notso.ai/jumbo/chats` + - Has valid authentication credentials + - 107 sessions in database + +## Files Modified + +1. **lib/csvFetcher.js** + - Added company URL validation + - Improved transcript fetching error handling + - Reduced error log verbosity + +2. **fix_companies.js** (cleanup script) + - Removes invalid company records + - Can be run again if needed + +## Monitoring + +The scheduler now runs cleanly every 15 minutes. To monitor: + +```bash +# Check scheduler logs +node debug_db.js + +# Test manual refresh +node -e "import('./lib/csvFetcher.js').then(m => m.fetchAndStoreSessionsForAllCompanies())" +``` + +## Future Improvements + +1. Add health check endpoint for scheduler status +2. Add metrics for successful/failed fetches +3. Consider retry logic for temporary failures +4. Add alerting for persistent failures diff --git a/docs/session-processing.md b/docs/session-processing.md new file mode 100644 index 0000000..676ca59 --- /dev/null +++ b/docs/session-processing.md @@ -0,0 +1,85 @@ +# Session Processing with OpenAI + +This document explains how the session processing system works in LiveDash-Node. + +## Overview + +The system now includes an automated process for analyzing chat session transcripts using OpenAI's API. This process: + +1. Fetches session data from CSV sources +2. Only adds new sessions that don't already exist in the database +3. Processes session transcripts with OpenAI to extract valuable insights +4. Updates the database with the processed information + +## How It Works + +### Session Fetching + +- The system fetches session data from configured CSV URLs for each company +- Unlike the previous implementation, it now only adds sessions that don't already exist in the database +- This prevents duplicate sessions and allows for incremental updates + +### Transcript Processing + +- For sessions with transcript content that haven't been processed yet, the system calls OpenAI's API +- The API analyzes the transcript and extracts the following information: + - Primary language used (ISO 639-1 code) + - Number of messages sent by the user + - Overall sentiment (positive, neutral, negative) + - Whether the conversation was escalated + - Whether HR contact was mentioned or provided + - Best-fitting category for the conversation + - Up to 5 paraphrased questions asked by the user + - A brief summary of the conversation + +### Scheduling + +The system includes two schedulers: + +1. **Session Refresh Scheduler**: Runs every 15 minutes to fetch new sessions from CSV sources +2. **Session Processing Scheduler**: Runs every hour to process unprocessed sessions with OpenAI + +## Database Schema + +The Session model has been updated with new fields to store the processed data: + +- `processed`: Boolean flag indicating whether the session has been processed +- `sentimentCategory`: String value ("positive", "neutral", "negative") from OpenAI +- `questions`: JSON array of questions asked by the user +- `summary`: Brief summary of the conversation + +## Configuration + +### OpenAI API Key + +To use the session processing feature, you need to add your OpenAI API key to the `.env.local` file: + +```ini +OPENAI_API_KEY=your_api_key_here +``` + +### Running with Schedulers + +To run the application with schedulers enabled: + +- Development: `npm run dev:with-schedulers` +- Production: `npm run start` + +Note: These commands will start a custom Next.js server with the schedulers enabled. You'll need to have an OpenAI API key set in your `.env.local` file for the session processing to work. + +## Manual Processing + +You can also manually process sessions by running the script: + +``` +node scripts/process_sessions.mjs +``` + +This will process all unprocessed sessions that have transcript content. + +## Customization + +The processing logic can be customized by modifying: + +- `lib/processingScheduler.ts`: Contains the OpenAI processing logic +- `scripts/process_sessions.ts`: Standalone script for manual processing diff --git a/lib/csvFetcher.js b/lib/csvFetcher.js new file mode 100644 index 0000000..df701ab --- /dev/null +++ b/lib/csvFetcher.js @@ -0,0 +1,619 @@ +// JavaScript version of csvFetcher with session storage functionality +import fetch from "node-fetch"; +import { parse } from "csv-parse/sync"; +import ISO6391 from "iso-639-1"; +import countries from "i18n-iso-countries"; +import { PrismaClient } from "@prisma/client"; + +// Register locales for i18n-iso-countries +import enLocale from "i18n-iso-countries/langs/en.json" with { type: "json" }; +countries.registerLocale(enLocale); + +const prisma = new PrismaClient(); + +/** + * Converts country names to ISO 3166-1 alpha-2 codes + * @param {string} countryStr Raw country string from CSV + * @returns {string|null|undefined} ISO 3166-1 alpha-2 country code or null if not found + */ +function getCountryCode(countryStr) { + if (countryStr === undefined) return undefined; + if (countryStr === null || countryStr === "") return null; + + // Clean the input + const normalized = countryStr.trim(); + if (!normalized) return null; + + // Direct ISO code check (if already a 2-letter code) + if (normalized.length === 2 && normalized === normalized.toUpperCase()) { + return countries.isValid(normalized) ? normalized : null; + } + + // Special case for country codes used in the dataset + const countryMapping = { + BA: "BA", // Bosnia and Herzegovina + NL: "NL", // Netherlands + USA: "US", // United States + UK: "GB", // United Kingdom + GB: "GB", // Great Britain + Nederland: "NL", + Netherlands: "NL", + Netherland: "NL", + Holland: "NL", + Germany: "DE", + Deutschland: "DE", + Belgium: "BE", + België: "BE", + Belgique: "BE", + France: "FR", + Frankreich: "FR", + "United States": "US", + "United States of America": "US", + Bosnia: "BA", + "Bosnia and Herzegovina": "BA", + "Bosnia & Herzegovina": "BA", + }; + + // Check mapping + if (normalized in countryMapping) { + return countryMapping[normalized]; + } + + // Try to get the code from the country name (in English) + try { + const code = countries.getAlpha2Code(normalized, "en"); + if (code) return code; + } catch (error) { + process.stderr.write( + `[CSV] Error converting country name to code: ${normalized} - ${error}\n` + ); + } + + // If all else fails, return null + return null; +} + +/** + * Converts language names to ISO 639-1 codes + * @param {string} languageStr Raw language string from CSV + * @returns {string|null|undefined} ISO 639-1 language code or null if not found + */ +function getLanguageCode(languageStr) { + if (languageStr === undefined) return undefined; + if (languageStr === null || languageStr === "") return null; + + // Clean the input + const normalized = languageStr.trim(); + if (!normalized) return null; + + // Direct ISO code check (if already a 2-letter code) + if (normalized.length === 2 && normalized === normalized.toLowerCase()) { + return ISO6391.validate(normalized) ? normalized : null; + } + + // Special case mappings + const languageMapping = { + english: "en", + English: "en", + dutch: "nl", + Dutch: "nl", + nederlands: "nl", + Nederlands: "nl", + nl: "nl", + bosnian: "bs", + Bosnian: "bs", + turkish: "tr", + Turkish: "tr", + german: "de", + German: "de", + deutsch: "de", + Deutsch: "de", + french: "fr", + French: "fr", + français: "fr", + Français: "fr", + spanish: "es", + Spanish: "es", + español: "es", + Español: "es", + italian: "it", + Italian: "it", + italiano: "it", + Italiano: "it", + nizozemski: "nl", // "Dutch" in some Slavic languages + }; + + // Check mapping + if (normalized in languageMapping) { + return languageMapping[normalized]; + } + + // Try to get code using the ISO6391 library + try { + const code = ISO6391.getCode(normalized); + if (code) return code; + } catch (error) { + process.stderr.write( + `[CSV] Error converting language name to code: ${normalized} - ${error}\n` + ); + } + // If all else fails, return null + return null; +} + +/** + * Normalizes category values to standard groups + * @param {string} categoryStr The raw category string from CSV + * @returns {string|null} A normalized category string + */ +function normalizeCategory(categoryStr) { + if (!categoryStr) return null; + + const normalized = categoryStr.toLowerCase().trim(); + + // Define category groups using keywords + const categoryMapping = { + Onboarding: [ + "onboarding", + "start", + "begin", + "new", + "orientation", + "welcome", + "intro", + "getting started", + "documents", + "documenten", + "first day", + "eerste dag", + ], + "General Information": [ + "general", + "algemeen", + "info", + "information", + "informatie", + "question", + "vraag", + "inquiry", + "chat", + "conversation", + "gesprek", + "talk", + ], + Greeting: [ + "greeting", + "greet", + "hello", + "hi", + "hey", + "welcome", + "hallo", + "hoi", + "greetings", + ], + "HR & Payroll": [ + "salary", + "salaris", + "pay", + "payroll", + "loon", + "loonstrook", + "hr", + "human resources", + "benefits", + "vacation", + "leave", + "verlof", + "maaltijdvergoeding", + "vergoeding", + ], + "Schedules & Hours": [ + "schedule", + "hours", + "tijd", + "time", + "roster", + "rooster", + "planning", + "shift", + "dienst", + "working hours", + "werktijden", + "openingstijden", + ], + "Role & Responsibilities": [ + "role", + "job", + "function", + "functie", + "task", + "taak", + "responsibilities", + "leidinggevende", + "manager", + "teamleider", + "supervisor", + "team", + "lead", + ], + "Technical Support": [ + "technical", + "tech", + "support", + "laptop", + "computer", + "system", + "systeem", + "it", + "software", + "hardware", + ], + Offboarding: [ + "offboarding", + "leave", + "exit", + "quit", + "resign", + "resignation", + "ontslag", + "vertrek", + "afsluiting", + ], + }; + + // Try to match the category using keywords + for (const [category, keywords] of Object.entries(categoryMapping)) { + if (keywords.some((keyword) => normalized.includes(keyword))) { + return category; + } + } + + // If no match, return "Other" + return "Other"; +} + +/** + * Converts sentiment string values to numeric scores + * @param {string} sentimentStr The sentiment string from the CSV + * @returns {number|null} A numeric score representing the sentiment + */ +function mapSentimentToScore(sentimentStr) { + if (!sentimentStr) return null; + + // Convert to lowercase for case-insensitive matching + const sentiment = sentimentStr.toLowerCase(); + + // Map sentiment strings to numeric values on a scale from -1 to 2 + const sentimentMap = { + happy: 1.0, + excited: 1.5, + positive: 0.8, + neutral: 0.0, + playful: 0.7, + negative: -0.8, + angry: -1.0, + sad: -0.7, + frustrated: -0.9, + positief: 0.8, // Dutch + neutraal: 0.0, // Dutch + negatief: -0.8, // Dutch + positivo: 0.8, // Spanish/Italian + neutro: 0.0, // Spanish/Italian + negativo: -0.8, // Spanish/Italian + yes: 0.5, // For any "yes" sentiment + no: -0.5, // For any "no" sentiment + }; + + return sentimentMap[sentiment] !== undefined + ? sentimentMap[sentiment] + : isNaN(parseFloat(sentiment)) + ? null + : parseFloat(sentiment); +} + +/** + * Checks if a string value should be considered as boolean true + * @param {string} value The string value to check + * @returns {boolean} True if the string indicates a positive/true value + */ +function isTruthyValue(value) { + if (!value) return false; + + const truthyValues = [ + "1", + "true", + "yes", + "y", + "ja", + "si", + "oui", + "да", + "да", + "はい", + ]; + + return truthyValues.includes(value.toLowerCase()); +} + +/** + * Safely parses a date string into a Date object. + * @param {string} dateStr The date string to parse. + * @returns {Date|null} A Date object or null if parsing fails. + */ +function safeParseDate(dateStr) { + if (!dateStr) return null; + + // Try to parse D-M-YYYY HH:MM:SS format (with hyphens or dots) + const dateTimeRegex = + /^(\d{1,2})[.-](\d{1,2})[.-](\d{4}) (\d{1,2}):(\d{1,2}):(\d{1,2})$/; + const match = dateStr.match(dateTimeRegex); + + if (match) { + const day = match[1]; + const month = match[2]; + const year = match[3]; + const hour = match[4]; + const minute = match[5]; + const second = match[6]; + + // Reformat to YYYY-MM-DDTHH:MM:SS (ISO-like, but local time) + // Ensure month and day are two digits + const formattedDateStr = `${year}-${month.padStart(2, "0")}-${day.padStart(2, "0")}T${hour.padStart(2, "0")}:${minute.padStart(2, "0")}:${second.padStart(2, "0")}`; + + try { + const date = new Date(formattedDateStr); + // Basic validation: check if the constructed date is valid + if (!isNaN(date.getTime())) { + return date; + } + } catch (e) { + console.warn( + `[safeParseDate] Error parsing reformatted string ${formattedDateStr} from ${dateStr}:`, + e + ); + } + } + + // Fallback for other potential formats (e.g., direct ISO 8601) or if the primary parse failed + try { + const parsedDate = new Date(dateStr); + if (!isNaN(parsedDate.getTime())) { + return parsedDate; + } + } catch (e) { + console.warn(`[safeParseDate] Error parsing with fallback ${dateStr}:`, e); + } + + console.warn(`Failed to parse date string: ${dateStr}`); + return null; +} + +/** + * Fetches transcript content from a URL + * @param {string} url The URL to fetch the transcript from + * @param {string} username Optional username for authentication + * @param {string} password Optional password for authentication + * @returns {Promise} The transcript content or null if fetching fails + */ +async function fetchTranscriptContent(url, username, password) { + try { + const authHeader = + username && password + ? "Basic " + Buffer.from(`${username}:${password}`).toString("base64") + : undefined; + + const response = await fetch(url, { + headers: authHeader ? { Authorization: authHeader } : {}, + timeout: 10000, // 10 second timeout + }); + + if (!response.ok) { + // Only log error once per batch, not for every transcript + if (Math.random() < 0.1) { // Log ~10% of errors to avoid spam + console.warn(`[CSV] Transcript fetch failed for ${url}: ${response.status} ${response.statusText}`); + } + return null; + } + return await response.text(); + } catch (error) { + // Only log error once per batch, not for every transcript + if (Math.random() < 0.1) { // Log ~10% of errors to avoid spam + console.warn(`[CSV] Transcript fetch error for ${url}:`, error.message); + } + return null; + } +} + +/** + * Fetches and parses CSV data from a URL + * @param {string} url The CSV URL + * @param {string} username Optional username for authentication + * @param {string} password Optional password for authentication + * @returns {Promise} Array of parsed session data + */ +export async function fetchAndParseCsv(url, username, password) { + const authHeader = + username && password + ? "Basic " + Buffer.from(`${username}:${password}`).toString("base64") + : undefined; + + const res = await fetch(url, { + headers: authHeader ? { Authorization: authHeader } : {}, + }); + if (!res.ok) throw new Error("Failed to fetch CSV: " + res.statusText); + + const text = await res.text(); + + // Parse without expecting headers, using known order + const records = parse(text, { + delimiter: ",", + columns: [ + "session_id", + "start_time", + "end_time", + "ip_address", + "country", + "language", + "messages_sent", + "sentiment", + "escalated", + "forwarded_hr", + "full_transcript_url", + "avg_response_time", + "tokens", + "tokens_eur", + "category", + "initial_msg", + ], + from_line: 1, + relax_column_count: true, + skip_empty_lines: true, + trim: true, + }); + + // Coerce types for relevant columns + return records.map((r) => ({ + id: r.session_id, + startTime: safeParseDate(r.start_time) || new Date(), // Fallback to current date if invalid + endTime: safeParseDate(r.end_time), + ipAddress: r.ip_address, + country: getCountryCode(r.country), + language: getLanguageCode(r.language), + messagesSent: Number(r.messages_sent) || 0, + sentiment: mapSentimentToScore(r.sentiment), + escalated: isTruthyValue(r.escalated), + forwardedHr: isTruthyValue(r.forwarded_hr), + fullTranscriptUrl: r.full_transcript_url, + avgResponseTime: r.avg_response_time + ? parseFloat(r.avg_response_time) + : null, + tokens: Number(r.tokens) || 0, + tokensEur: r.tokens_eur ? parseFloat(r.tokens_eur) : 0, + category: normalizeCategory(r.category), + initialMsg: r.initial_msg, + })); +} + +/** + * Fetches and stores sessions for all companies + */ +export async function fetchAndStoreSessionsForAllCompanies() { + try { + // Get all companies + const companies = await prisma.company.findMany(); + + for (const company of companies) { + if (!company.csvUrl) { + console.log(`[Scheduler] Skipping company ${company.id} - no CSV URL configured`); + continue; + } + + // Skip companies with invalid/example URLs + if (company.csvUrl.includes('example.com') || company.csvUrl === 'https://example.com/data.csv') { + console.log(`[Scheduler] Skipping company ${company.id} - invalid/example CSV URL: ${company.csvUrl}`); + continue; + } + + console.log(`[Scheduler] Processing sessions for company: ${company.id}`); + + try { + const sessions = await fetchAndParseCsv( + company.csvUrl, + company.csvUsername, + company.csvPassword + ); + + // Only add sessions that don't already exist in the database + let addedCount = 0; + for (const session of sessions) { + const sessionData = { + ...session, + companyId: company.id, + id: + session.id || + session.sessionId || + `sess_${Date.now()}_${Math.random().toString(36).substring(2, 7)}`, + // Ensure startTime is not undefined + startTime: session.startTime || new Date(), + }; + + // Validate dates to prevent "Invalid Date" errors + const startTime = + sessionData.startTime instanceof Date && + !isNaN(sessionData.startTime.getTime()) + ? sessionData.startTime + : new Date(); + + const endTime = + session.endTime instanceof Date && !isNaN(session.endTime.getTime()) + ? session.endTime + : new Date(); + + // Fetch transcript content if URL is available + let transcriptContent = null; + if (session.fullTranscriptUrl) { + transcriptContent = await fetchTranscriptContent( + session.fullTranscriptUrl, + company.csvUsername, + company.csvPassword + ); + } + + // Check if the session already exists + const existingSession = await prisma.session.findUnique({ + where: { id: sessionData.id }, + }); + + if (existingSession) { + // Skip this session as it already exists + continue; + } + + // Only include fields that are properly typed for Prisma + await prisma.session.create({ + data: { + id: sessionData.id, + companyId: sessionData.companyId, + startTime: startTime, + endTime: endTime, + ipAddress: session.ipAddress || null, + country: session.country || null, + language: session.language || null, + messagesSent: + typeof session.messagesSent === "number" ? session.messagesSent : 0, + sentiment: + typeof session.sentiment === "number" ? session.sentiment : null, + escalated: + typeof session.escalated === "boolean" ? session.escalated : null, + forwardedHr: + typeof session.forwardedHr === "boolean" + ? session.forwardedHr + : null, + fullTranscriptUrl: session.fullTranscriptUrl || null, + transcriptContent: transcriptContent, // Add the transcript content + avgResponseTime: + typeof session.avgResponseTime === "number" + ? session.avgResponseTime + : null, + tokens: typeof session.tokens === "number" ? session.tokens : null, + tokensEur: + typeof session.tokensEur === "number" ? session.tokensEur : null, + category: session.category || null, + initialMsg: session.initialMsg || null, + }, + }); + + addedCount++; + } + + console.log(`[Scheduler] Added ${addedCount} new sessions for company ${company.id}`); + } catch (error) { + console.error(`[Scheduler] Error processing company ${company.id}:`, error); + } + } + } catch (error) { + console.error("[Scheduler] Error fetching companies:", error); + throw error; + } +} diff --git a/lib/processingScheduler.js b/lib/processingScheduler.js new file mode 100644 index 0000000..05e1a1b --- /dev/null +++ b/lib/processingScheduler.js @@ -0,0 +1,277 @@ +// Session processing scheduler - JavaScript version +import cron from "node-cron"; +import { PrismaClient } from "@prisma/client"; +import fetch from "node-fetch"; + +const prisma = new PrismaClient(); +const OPENAI_API_KEY = process.env.OPENAI_API_KEY; +const OPENAI_API_URL = "https://api.openai.com/v1/chat/completions"; + +/** + * Processes a session transcript using OpenAI API + * @param {string} sessionId The session ID + * @param {string} transcript The transcript content to process + * @returns {Promise} Processed data from OpenAI + */ +async function processTranscriptWithOpenAI(sessionId, transcript) { + if (!OPENAI_API_KEY) { + throw new Error("OPENAI_API_KEY environment variable is not set"); + } + + // Create a system message with instructions + const systemMessage = ` + You are an AI assistant tasked with analyzing chat transcripts. + Extract the following information from the transcript: + 1. The primary language used by the user (ISO 639-1 code) + 2. Number of messages sent by the user + 3. Overall sentiment (positive, neutral, or negative) + 4. Whether the conversation was escalated + 5. Whether HR contact was mentioned or provided + 6. The best-fitting category for the conversation from this list: + - Schedule & Hours + - Leave & Vacation + - Sick Leave & Recovery + - Salary & Compensation + - Contract & Hours + - Onboarding + - Offboarding + - Workwear & Staff Pass + - Team & Contacts + - Personal Questions + - Access & Login + - Social questions + - Unrecognized / Other + 7. Up to 5 paraphrased questions asked by the user (in English) + 8. A brief summary of the conversation (10-300 characters) + + Return the data in JSON format matching this schema: + { + "language": "ISO 639-1 code", + "messages_sent": number, + "sentiment": "positive|neutral|negative", + "escalated": boolean, + "forwarded_hr": boolean, + "category": "one of the categories listed above", + "questions": ["question 1", "question 2", ...], + "summary": "brief summary", + "session_id": "${sessionId}" + } + `; + + try { + const response = await fetch(OPENAI_API_URL, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${OPENAI_API_KEY}`, + }, + body: JSON.stringify({ + model: "gpt-4-turbo", + messages: [ + { + role: "system", + content: systemMessage, + }, + { + role: "user", + content: transcript, + }, + ], + temperature: 0.3, // Lower temperature for more consistent results + response_format: { type: "json_object" }, + }), + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`OpenAI API error: ${response.status} - ${errorText}`); + } + + const data = await response.json(); + const processedData = JSON.parse(data.choices[0].message.content); + + // Validate the response against our expected schema + validateOpenAIResponse(processedData); + + return processedData; + } catch (error) { + process.stderr.write(`Error processing transcript with OpenAI: ${error}\n`); + throw error; + } +} + +/** + * Validates the OpenAI response against our expected schema + * @param {Object} data The data to validate + */ +function validateOpenAIResponse(data) { + // Check required fields + const requiredFields = [ + "language", + "messages_sent", + "sentiment", + "escalated", + "forwarded_hr", + "category", + "questions", + "summary", + "session_id", + ]; + + for (const field of requiredFields) { + if (!(field in data)) { + throw new Error(`Missing required field: ${field}`); + } + } + + // Validate field types + if (typeof data.language !== "string" || !/^[a-z]{2}$/.test(data.language)) { + throw new Error("Invalid language format. Expected ISO 639-1 code (e.g., 'en')"); + } + + if (typeof data.messages_sent !== "number" || data.messages_sent < 0) { + throw new Error("Invalid messages_sent. Expected non-negative number"); + } + + if (!["positive", "neutral", "negative"].includes(data.sentiment)) { + throw new Error("Invalid sentiment. Expected 'positive', 'neutral', or 'negative'"); + } + + if (typeof data.escalated !== "boolean") { + throw new Error("Invalid escalated. Expected boolean"); + } + + if (typeof data.forwarded_hr !== "boolean") { + throw new Error("Invalid forwarded_hr. Expected boolean"); + } + + const validCategories = [ + "Schedule & Hours", + "Leave & Vacation", + "Sick Leave & Recovery", + "Salary & Compensation", + "Contract & Hours", + "Onboarding", + "Offboarding", + "Workwear & Staff Pass", + "Team & Contacts", + "Personal Questions", + "Access & Login", + "Social questions", + "Unrecognized / Other", + ]; + + if (!validCategories.includes(data.category)) { + throw new Error(`Invalid category. Expected one of: ${validCategories.join(", ")}`); + } + + if (!Array.isArray(data.questions)) { + throw new Error("Invalid questions. Expected array of strings"); + } + + if (typeof data.summary !== "string" || data.summary.length < 10 || data.summary.length > 300) { + throw new Error("Invalid summary. Expected string between 10-300 characters"); + } + + if (typeof data.session_id !== "string") { + throw new Error("Invalid session_id. Expected string"); + } +} + +/** + * Process unprocessed sessions + */ +async function processUnprocessedSessions() { + process.stdout.write("[ProcessingScheduler] Starting to process unprocessed sessions...\n"); + + // Find sessions that have transcript content but haven't been processed + const sessionsToProcess = await prisma.session.findMany({ + where: { + AND: [ + { transcriptContent: { not: null } }, + { transcriptContent: { not: "" } }, + { processed: { not: true } }, // Either false or null + ], + }, + select: { + id: true, + transcriptContent: true, + }, + take: 10, // Process in batches to avoid overloading the system + }); + + if (sessionsToProcess.length === 0) { + process.stdout.write("[ProcessingScheduler] No sessions found requiring processing.\n"); + return; + } + + process.stdout.write(`[ProcessingScheduler] Found ${sessionsToProcess.length} sessions to process.\n`); + let successCount = 0; + let errorCount = 0; + + for (const session of sessionsToProcess) { + if (!session.transcriptContent) { + // Should not happen due to query, but good for type safety + process.stderr.write(`[ProcessingScheduler] Session ${session.id} has no transcript content, skipping.\n`); + continue; + } + + process.stdout.write(`[ProcessingScheduler] Processing transcript for session ${session.id}...\n`); + try { + const processedData = await processTranscriptWithOpenAI( + session.id, + session.transcriptContent + ); + + // Map sentiment string to float value for compatibility with existing data + const sentimentMap = { + positive: 0.8, + neutral: 0.0, + negative: -0.8, + }; + + // Update the session with processed data + await prisma.session.update({ + where: { id: session.id }, + data: { + language: processedData.language, + messagesSent: processedData.messages_sent, + sentiment: sentimentMap[processedData.sentiment] || 0, + sentimentCategory: processedData.sentiment, + escalated: processedData.escalated, + forwardedHr: processedData.forwarded_hr, + category: processedData.category, + questions: JSON.stringify(processedData.questions), + summary: processedData.summary, + processed: true, + }, + }); + + process.stdout.write(`[ProcessingScheduler] Successfully processed session ${session.id}.\n`); + successCount++; + } catch (error) { + process.stderr.write(`[ProcessingScheduler] Error processing session ${session.id}: ${error}\n`); + errorCount++; + } + } + + process.stdout.write("[ProcessingScheduler] Session processing complete.\n"); + process.stdout.write(`[ProcessingScheduler] Successfully processed: ${successCount} sessions.\n`); + process.stdout.write(`[ProcessingScheduler] Failed to process: ${errorCount} sessions.\n`); +} + +/** + * Start the processing scheduler + */ +export function startProcessingScheduler() { + // Process unprocessed sessions every hour + cron.schedule("0 * * * *", async () => { + try { + await processUnprocessedSessions(); + } catch (error) { + process.stderr.write(`[ProcessingScheduler] Error in scheduler: ${error}\n`); + } + }); + + process.stdout.write("[ProcessingScheduler] Started processing scheduler (runs hourly).\n"); +} diff --git a/lib/processingScheduler.ts b/lib/processingScheduler.ts new file mode 100644 index 0000000..dadf77a --- /dev/null +++ b/lib/processingScheduler.ts @@ -0,0 +1,293 @@ +// node-cron job to process unprocessed sessions every hour +import cron from "node-cron"; +import { PrismaClient } from "@prisma/client"; +import fetch from "node-fetch"; + +const prisma = new PrismaClient(); +const OPENAI_API_KEY = process.env.OPENAI_API_KEY; +const OPENAI_API_URL = "https://api.openai.com/v1/chat/completions"; + +// Define the expected response structure from OpenAI +interface OpenAIProcessedData { + language: string; + messages_sent: number; + sentiment: "positive" | "neutral" | "negative"; + escalated: boolean; + forwarded_hr: boolean; + category: string; + questions: string[]; + summary: string; + session_id: string; +} + +/** + * Processes a session transcript using OpenAI API + * @param sessionId The session ID + * @param transcript The transcript content to process + * @returns Processed data from OpenAI + */ +async function processTranscriptWithOpenAI( + sessionId: string, + transcript: string +): Promise { + if (!OPENAI_API_KEY) { + throw new Error("OPENAI_API_KEY environment variable is not set"); + } + + // Create a system message with instructions + const systemMessage = ` + You are an AI assistant tasked with analyzing chat transcripts. + Extract the following information from the transcript: + 1. The primary language used by the user (ISO 639-1 code) + 2. Number of messages sent by the user + 3. Overall sentiment (positive, neutral, or negative) + 4. Whether the conversation was escalated + 5. Whether HR contact was mentioned or provided + 6. The best-fitting category for the conversation from this list: + - Schedule & Hours + - Leave & Vacation + - Sick Leave & Recovery + - Salary & Compensation + - Contract & Hours + - Onboarding + - Offboarding + - Workwear & Staff Pass + - Team & Contacts + - Personal Questions + - Access & Login + - Social questions + - Unrecognized / Other + 7. Up to 5 paraphrased questions asked by the user (in English) + 8. A brief summary of the conversation (10-300 characters) + + Return the data in JSON format matching this schema: + { + "language": "ISO 639-1 code", + "messages_sent": number, + "sentiment": "positive|neutral|negative", + "escalated": boolean, + "forwarded_hr": boolean, + "category": "one of the categories listed above", + "questions": ["question 1", "question 2", ...], + "summary": "brief summary", + "session_id": "${sessionId}" + } + `; + + try { + const response = await fetch(OPENAI_API_URL, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${OPENAI_API_KEY}`, + }, + body: JSON.stringify({ + model: "gpt-4-turbo", + messages: [ + { + role: "system", + content: systemMessage, + }, + { + role: "user", + content: transcript, + }, + ], + temperature: 0.3, // Lower temperature for more consistent results + response_format: { type: "json_object" }, + }), + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`OpenAI API error: ${response.status} - ${errorText}`); + } + + const data = await response.json() as any; + const processedData = JSON.parse(data.choices[0].message.content); + + // Validate the response against our expected schema + validateOpenAIResponse(processedData); + + return processedData; + } catch (error) { + process.stderr.write(`Error processing transcript with OpenAI: ${error}\n`); + throw error; + } +} + +/** + * Validates the OpenAI response against our expected schema + * @param data The data to validate + */ +function validateOpenAIResponse(data: any): asserts data is OpenAIProcessedData { + // Check required fields + const requiredFields = [ + "language", + "messages_sent", + "sentiment", + "escalated", + "forwarded_hr", + "category", + "questions", + "summary", + "session_id", + ]; + + for (const field of requiredFields) { + if (!(field in data)) { + throw new Error(`Missing required field: ${field}`); + } + } + + // Validate field types + if (typeof data.language !== "string" || !/^[a-z]{2}$/.test(data.language)) { + throw new Error("Invalid language format. Expected ISO 639-1 code (e.g., 'en')"); + } + + if (typeof data.messages_sent !== "number" || data.messages_sent < 0) { + throw new Error("Invalid messages_sent. Expected non-negative number"); + } + + if (!["positive", "neutral", "negative"].includes(data.sentiment)) { + throw new Error("Invalid sentiment. Expected 'positive', 'neutral', or 'negative'"); + } + + if (typeof data.escalated !== "boolean") { + throw new Error("Invalid escalated. Expected boolean"); + } + + if (typeof data.forwarded_hr !== "boolean") { + throw new Error("Invalid forwarded_hr. Expected boolean"); + } + + const validCategories = [ + "Schedule & Hours", + "Leave & Vacation", + "Sick Leave & Recovery", + "Salary & Compensation", + "Contract & Hours", + "Onboarding", + "Offboarding", + "Workwear & Staff Pass", + "Team & Contacts", + "Personal Questions", + "Access & Login", + "Social questions", + "Unrecognized / Other", + ]; + + if (!validCategories.includes(data.category)) { + throw new Error(`Invalid category. Expected one of: ${validCategories.join(", ")}`); + } + + if (!Array.isArray(data.questions)) { + throw new Error("Invalid questions. Expected array of strings"); + } + + if (typeof data.summary !== "string" || data.summary.length < 10 || data.summary.length > 300) { + throw new Error("Invalid summary. Expected string between 10-300 characters"); + } + + if (typeof data.session_id !== "string") { + throw new Error("Invalid session_id. Expected string"); + } +} + +/** + * Process unprocessed sessions + */ +async function processUnprocessedSessions() { + process.stdout.write("[ProcessingScheduler] Starting to process unprocessed sessions...\n"); + + // Find sessions that have transcript content but haven't been processed + const sessionsToProcess = await prisma.session.findMany({ + where: { + AND: [ + { transcriptContent: { not: null } }, + { transcriptContent: { not: "" } }, + { processed: { not: true } }, // Either false or null + ], + }, + select: { + id: true, + transcriptContent: true, + }, + take: 10, // Process in batches to avoid overloading the system + }); + + if (sessionsToProcess.length === 0) { + process.stdout.write("[ProcessingScheduler] No sessions found requiring processing.\n"); + return; + } + + process.stdout.write(`[ProcessingScheduler] Found ${sessionsToProcess.length} sessions to process.\n`); + let successCount = 0; + let errorCount = 0; + + for (const session of sessionsToProcess) { + if (!session.transcriptContent) { + // Should not happen due to query, but good for type safety + process.stderr.write(`[ProcessingScheduler] Session ${session.id} has no transcript content, skipping.\n`); + continue; + } + + process.stdout.write(`[ProcessingScheduler] Processing transcript for session ${session.id}...\n`); + try { + const processedData = await processTranscriptWithOpenAI( + session.id, + session.transcriptContent + ); + + // Map sentiment string to float value for compatibility with existing data + const sentimentMap: Record = { + positive: 0.8, + neutral: 0.0, + negative: -0.8, + }; + + // Update the session with processed data + await prisma.session.update({ + where: { id: session.id }, + data: { + language: processedData.language, + messagesSent: processedData.messages_sent, + sentiment: sentimentMap[processedData.sentiment] || 0, + sentimentCategory: processedData.sentiment, + escalated: processedData.escalated, + forwardedHr: processedData.forwarded_hr, + category: processedData.category, + questions: JSON.stringify(processedData.questions), + summary: processedData.summary, + processed: true, + }, + }); + + process.stdout.write(`[ProcessingScheduler] Successfully processed session ${session.id}.\n`); + successCount++; + } catch (error) { + process.stderr.write(`[ProcessingScheduler] Error processing session ${session.id}: ${error}\n`); + errorCount++; + } + } + + process.stdout.write("[ProcessingScheduler] Session processing complete.\n"); + process.stdout.write(`[ProcessingScheduler] Successfully processed: ${successCount} sessions.\n`); + process.stdout.write(`[ProcessingScheduler] Failed to process: ${errorCount} sessions.\n`); +} + +/** + * Start the processing scheduler + */ +export function startProcessingScheduler() { + // Process unprocessed sessions every hour + cron.schedule("0 * * * *", async () => { + try { + await processUnprocessedSessions(); + } catch (error) { + process.stderr.write(`[ProcessingScheduler] Error in scheduler: ${error}\n`); + } + }); + + process.stdout.write("[ProcessingScheduler] Started processing scheduler (runs hourly).\n"); +} diff --git a/lib/scheduler.js b/lib/scheduler.js new file mode 100644 index 0000000..91b6f24 --- /dev/null +++ b/lib/scheduler.js @@ -0,0 +1,35 @@ +// Session refresh scheduler - JavaScript version +import cron from "node-cron"; +import { PrismaClient } from "@prisma/client"; +import { fetchAndStoreSessionsForAllCompanies } from "./csvFetcher.js"; + +const prisma = new PrismaClient(); + +/** + * Refresh sessions for all companies + */ +async function refreshSessions() { + console.log("[Scheduler] Starting session refresh..."); + try { + await fetchAndStoreSessionsForAllCompanies(); + console.log("[Scheduler] Session refresh completed successfully."); + } catch (error) { + console.error("[Scheduler] Error during session refresh:", error); + } +} + +/** + * Start the session refresh scheduler + */ +export function startScheduler() { + // Run every 15 minutes + cron.schedule("*/15 * * * *", async () => { + try { + await refreshSessions(); + } catch (error) { + console.error("[Scheduler] Error in scheduler:", error); + } + }); + + console.log("[Scheduler] Started session refresh scheduler (runs every 15 minutes)."); +} diff --git a/lib/scheduler.ts b/lib/scheduler.ts index dcf4f30..20e3e3e 100644 --- a/lib/scheduler.ts +++ b/lib/scheduler.ts @@ -20,8 +20,7 @@ export function startScheduler() { company.csvUsername as string | undefined, company.csvPassword as string | undefined ); - await prisma.session.deleteMany({ where: { companyId: company.id } }); - + // Only add sessions that don't already exist in the database for (const session of sessions) { const sessionData: SessionCreateData = { ...session, @@ -31,6 +30,16 @@ export function startScheduler() { startTime: session.startTime || new Date(), }; + // Check if the session already exists + const existingSession = await prisma.session.findUnique({ + where: { id: sessionData.id }, + }); + + if (existingSession) { + // Skip this session as it already exists + continue; + } + // Only include fields that are properly typed for Prisma await prisma.session.create({ data: { diff --git a/lib/schedulers.ts b/lib/schedulers.ts new file mode 100644 index 0000000..72e0dfe --- /dev/null +++ b/lib/schedulers.ts @@ -0,0 +1,18 @@ +// Combined scheduler initialization +import { startScheduler } from "./scheduler"; +import { startProcessingScheduler } from "./processingScheduler"; + +/** + * Initialize all schedulers + * - Session refresh scheduler (runs every 15 minutes) + * - Session processing scheduler (runs every hour) + */ +export function initializeSchedulers() { + // Start the session refresh scheduler + startScheduler(); + + // Start the session processing scheduler + startProcessingScheduler(); + + console.log("All schedulers initialized successfully"); +} diff --git a/lib/types.ts b/lib/types.ts index 383b8ec..b234d9e 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -45,6 +45,7 @@ export interface ChatSession { country?: string | null; ipAddress?: string | null; sentiment?: number | null; + sentimentCategory?: string | null; // "positive", "neutral", "negative" from OpenAPI messagesSent?: number; startTime: Date; endTime?: Date | null; @@ -60,6 +61,9 @@ export interface ChatSession { initialMsg?: string; fullTranscriptUrl?: string | null; transcriptContent?: string | null; + processed?: boolean | null; // Flag for post-processing status + questions?: string | null; // JSON array of questions asked by user + summary?: string | null; // Brief summary of the conversation } export interface SessionQuery { diff --git a/package-lock.json b/package-lock.json index b8281ca..7aee6f9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,7 +8,7 @@ "name": "livedash-node", "version": "0.2.0", "dependencies": { - "@prisma/client": "^6.8.2", + "@prisma/client": "^6.10.1", "@rapideditor/country-coder": "^5.4.0", "@types/d3": "^7.4.3", "@types/d3-cloud": "^1.2.9", @@ -54,7 +54,7 @@ "postcss": "^8.5.3", "prettier": "^3.5.3", "prettier-plugin-jinja-template": "^2.1.0", - "prisma": "^6.8.2", + "prisma": "^6.10.1", "tailwindcss": "^4.1.7", "ts-node": "^10.9.2", "typescript": "^5.0.0" @@ -1089,9 +1089,9 @@ } }, "node_modules/@prisma/client": { - "version": "6.8.2", - "resolved": "https://registry.npmjs.org/@prisma/client/-/client-6.8.2.tgz", - "integrity": "sha512-5II+vbyzv4si6Yunwgkj0qT/iY0zyspttoDrL3R4BYgLdp42/d2C8xdi9vqkrYtKt9H32oFIukvyw3Koz5JoDg==", + "version": "6.10.1", + "resolved": "https://registry.npmjs.org/@prisma/client/-/client-6.10.1.tgz", + "integrity": "sha512-Re4pMlcUsQsUTAYMK7EJ4Bw2kg3WfZAAlr8GjORJaK4VOP6LxRQUQ1TuLnxcF42XqGkWQ36q5CQF1yVadANQ6w==", "hasInstallScript": true, "license": "Apache-2.0", "engines": { @@ -1111,9 +1111,9 @@ } }, "node_modules/@prisma/config": { - "version": "6.8.2", - "resolved": "https://registry.npmjs.org/@prisma/config/-/config-6.8.2.tgz", - "integrity": "sha512-ZJY1fF4qRBPdLQ/60wxNtX+eu89c3AkYEcP7L3jkp0IPXCNphCYxikTg55kPJLDOG6P0X+QG5tCv6CmsBRZWFQ==", + "version": "6.10.1", + "resolved": "https://registry.npmjs.org/@prisma/config/-/config-6.10.1.tgz", + "integrity": "sha512-kz4/bnqrOrzWo8KzYguN0cden4CzLJJ+2VSpKtF8utHS3l1JS0Lhv6BLwpOX6X9yNreTbZQZwewb+/BMPDCIYQ==", "devOptional": true, "license": "Apache-2.0", "dependencies": { @@ -1121,53 +1121,53 @@ } }, "node_modules/@prisma/debug": { - "version": "6.8.2", - "resolved": "https://registry.npmjs.org/@prisma/debug/-/debug-6.8.2.tgz", - "integrity": "sha512-4muBSSUwJJ9BYth5N8tqts8JtiLT8QI/RSAzEogwEfpbYGFo9mYsInsVo8dqXdPO2+Rm5OG5q0qWDDE3nyUbVg==", + "version": "6.10.1", + "resolved": "https://registry.npmjs.org/@prisma/debug/-/debug-6.10.1.tgz", + "integrity": "sha512-k2YT53cWxv9OLjW4zSYTZ6Z7j0gPfCzcr2Mj99qsuvlxr8WAKSZ2NcSR0zLf/mP4oxnYG842IMj3utTgcd7CaA==", "devOptional": true, "license": "Apache-2.0" }, "node_modules/@prisma/engines": { - "version": "6.8.2", - "resolved": "https://registry.npmjs.org/@prisma/engines/-/engines-6.8.2.tgz", - "integrity": "sha512-XqAJ//LXjqYRQ1RRabs79KOY4+v6gZOGzbcwDQl0D6n9WBKjV7qdrbd042CwSK0v0lM9MSHsbcFnU2Yn7z8Zlw==", + "version": "6.10.1", + "resolved": "https://registry.npmjs.org/@prisma/engines/-/engines-6.10.1.tgz", + "integrity": "sha512-Q07P5rS2iPwk2IQr/rUQJ42tHjpPyFcbiH7PXZlV81Ryr9NYIgdxcUrwgVOWVm5T7ap02C0dNd1dpnNcSWig8A==", "devOptional": true, "hasInstallScript": true, "license": "Apache-2.0", "dependencies": { - "@prisma/debug": "6.8.2", - "@prisma/engines-version": "6.8.0-43.2060c79ba17c6bb9f5823312b6f6b7f4a845738e", - "@prisma/fetch-engine": "6.8.2", - "@prisma/get-platform": "6.8.2" + "@prisma/debug": "6.10.1", + "@prisma/engines-version": "6.10.1-1.9b628578b3b7cae625e8c927178f15a170e74a9c", + "@prisma/fetch-engine": "6.10.1", + "@prisma/get-platform": "6.10.1" } }, "node_modules/@prisma/engines-version": { - "version": "6.8.0-43.2060c79ba17c6bb9f5823312b6f6b7f4a845738e", - "resolved": "https://registry.npmjs.org/@prisma/engines-version/-/engines-version-6.8.0-43.2060c79ba17c6bb9f5823312b6f6b7f4a845738e.tgz", - "integrity": "sha512-Rkik9lMyHpFNGaLpPF3H5q5TQTkm/aE7DsGM5m92FZTvWQsvmi6Va8On3pWvqLHOt5aPUvFb/FeZTmphI4CPiQ==", + "version": "6.10.1-1.9b628578b3b7cae625e8c927178f15a170e74a9c", + "resolved": "https://registry.npmjs.org/@prisma/engines-version/-/engines-version-6.10.1-1.9b628578b3b7cae625e8c927178f15a170e74a9c.tgz", + "integrity": "sha512-ZJFTsEqapiTYVzXya6TUKYDFnSWCNegfUiG5ik9fleQva5Sk3DNyyUi7X1+0ZxWFHwHDr6BZV5Vm+iwP+LlciA==", "devOptional": true, "license": "Apache-2.0" }, "node_modules/@prisma/fetch-engine": { - "version": "6.8.2", - "resolved": "https://registry.npmjs.org/@prisma/fetch-engine/-/fetch-engine-6.8.2.tgz", - "integrity": "sha512-lCvikWOgaLOfqXGacEKSNeenvj0n3qR5QvZUOmPE2e1Eh8cMYSobxonCg9rqM6FSdTfbpqp9xwhSAOYfNqSW0g==", + "version": "6.10.1", + "resolved": "https://registry.npmjs.org/@prisma/fetch-engine/-/fetch-engine-6.10.1.tgz", + "integrity": "sha512-clmbG/Jgmrc/n6Y77QcBmAUlq9LrwI9Dbgy4pq5jeEARBpRCWJDJ7PWW1P8p0LfFU0i5fsyO7FqRzRB8mkdS4g==", "devOptional": true, "license": "Apache-2.0", "dependencies": { - "@prisma/debug": "6.8.2", - "@prisma/engines-version": "6.8.0-43.2060c79ba17c6bb9f5823312b6f6b7f4a845738e", - "@prisma/get-platform": "6.8.2" + "@prisma/debug": "6.10.1", + "@prisma/engines-version": "6.10.1-1.9b628578b3b7cae625e8c927178f15a170e74a9c", + "@prisma/get-platform": "6.10.1" } }, "node_modules/@prisma/get-platform": { - "version": "6.8.2", - "resolved": "https://registry.npmjs.org/@prisma/get-platform/-/get-platform-6.8.2.tgz", - "integrity": "sha512-vXSxyUgX3vm1Q70QwzwkjeYfRryIvKno1SXbIqwSptKwqKzskINnDUcx85oX+ys6ooN2ATGSD0xN2UTfg6Zcow==", + "version": "6.10.1", + "resolved": "https://registry.npmjs.org/@prisma/get-platform/-/get-platform-6.10.1.tgz", + "integrity": "sha512-4CY5ndKylcsce9Mv+VWp5obbR2/86SHOLVV053pwIkhVtT9C9A83yqiqI/5kJM9T1v1u1qco/bYjDKycmei9HA==", "devOptional": true, "license": "Apache-2.0", "dependencies": { - "@prisma/debug": "6.8.2" + "@prisma/debug": "6.10.1" } }, "node_modules/@rapideditor/country-coder": { @@ -7860,15 +7860,15 @@ "license": "MIT" }, "node_modules/prisma": { - "version": "6.8.2", - "resolved": "https://registry.npmjs.org/prisma/-/prisma-6.8.2.tgz", - "integrity": "sha512-JNricTXQxzDtRS7lCGGOB4g5DJ91eg3nozdubXze3LpcMl1oWwcFddrj++Up3jnRE6X/3gB/xz3V+ecBk/eEGA==", + "version": "6.10.1", + "resolved": "https://registry.npmjs.org/prisma/-/prisma-6.10.1.tgz", + "integrity": "sha512-khhlC/G49E4+uyA3T3H5PRBut486HD2bDqE2+rvkU0pwk9IAqGFacLFUyIx9Uw+W2eCtf6XGwsp+/strUwMNPw==", "devOptional": true, "hasInstallScript": true, "license": "Apache-2.0", "dependencies": { - "@prisma/config": "6.8.2", - "@prisma/engines": "6.8.2" + "@prisma/config": "6.10.1", + "@prisma/engines": "6.10.1" }, "bin": { "prisma": "build/index.js" diff --git a/package.json b/package.json index 40deb45..f59c062 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "version": "0.2.0", "private": true, "dependencies": { - "@prisma/client": "^6.8.2", + "@prisma/client": "^6.10.1", "@rapideditor/country-coder": "^5.4.0", "@types/d3": "^7.4.3", "@types/d3-cloud": "^1.2.9", @@ -50,7 +50,7 @@ "postcss": "^8.5.3", "prettier": "^3.5.3", "prettier-plugin-jinja-template": "^2.1.0", - "prisma": "^6.8.2", + "prisma": "^6.10.1", "tailwindcss": "^4.1.7", "ts-node": "^10.9.2", "typescript": "^5.0.0" @@ -58,6 +58,7 @@ "scripts": { "build": "next build", "dev": "next dev --turbopack", + "dev:with-schedulers": "node server.mjs", "format": "npx prettier --write .", "format:check": "npx prettier --check .", "lint": "next lint", @@ -66,7 +67,7 @@ "prisma:migrate": "prisma migrate dev", "prisma:seed": "node prisma/seed.mjs", "prisma:studio": "prisma studio", - "start": "next start", + "start": "node server.mjs", "lint:md": "markdownlint-cli2 \"**/*.md\" \"!.trunk/**\" \"!.venv/**\" \"!node_modules/**\"", "lint:md:fix": "markdownlint-cli2 --fix \"**/*.md\" \"!.trunk/**\" \"!.venv/**\" \"!node_modules/**\"" }, diff --git a/pages/api/admin/refresh-sessions.ts b/pages/api/admin/refresh-sessions.ts index 4a545e3..910a143 100644 --- a/pages/api/admin/refresh-sessions.ts +++ b/pages/api/admin/refresh-sessions.ts @@ -14,11 +14,25 @@ interface SessionCreateData { /** * Fetches transcript content from a URL * @param url The URL to fetch the transcript from + * @param username Optional username for authentication + * @param password Optional password for authentication * @returns The transcript content or null if fetching fails */ -async function fetchTranscriptContent(url: string): Promise { +async function fetchTranscriptContent( + url: string, + username?: string, + password?: string +): Promise { try { - const response = await fetch(url); + const authHeader = + username && password + ? "Basic " + Buffer.from(`${username}:${password}`).toString("base64") + : undefined; + + const response = await fetch(url, { + headers: authHeader ? { Authorization: authHeader } : {}, + }); + if (!response.ok) { process.stderr.write( `Error fetching transcript: ${response.statusText}\n` @@ -80,9 +94,7 @@ export default async function handler( company.csvPassword as string | undefined ); - // Replace all session rows for this company (for demo simplicity) - await prisma.session.deleteMany({ where: { companyId: company.id } }); - + // Only add sessions that don't already exist in the database for (const session of sessions) { const sessionData: SessionCreateData = { ...session, @@ -111,10 +123,22 @@ export default async function handler( let transcriptContent: string | null = null; if (session.fullTranscriptUrl) { transcriptContent = await fetchTranscriptContent( - session.fullTranscriptUrl + session.fullTranscriptUrl, + company.csvUsername as string | undefined, + company.csvPassword as string | undefined ); } + // Check if the session already exists + const existingSession = await prisma.session.findUnique({ + where: { id: sessionData.id }, + }); + + if (existingSession) { + // Skip this session as it already exists + continue; + } + // Only include fields that are properly typed for Prisma await prisma.session.create({ data: { diff --git a/pages/api/dashboard/session/[id].ts b/pages/api/dashboard/session/[id].ts index 8e2eb73..6997453 100644 --- a/pages/api/dashboard/session/[id].ts +++ b/pages/api/dashboard/session/[id].ts @@ -46,6 +46,7 @@ export default async function handler( country: prismaSession.country ?? null, ipAddress: prismaSession.ipAddress ?? null, sentiment: prismaSession.sentiment ?? null, + sentimentCategory: prismaSession.sentimentCategory ?? null, // New field messagesSent: prismaSession.messagesSent ?? undefined, // Use undefined if ChatSession expects number | undefined avgResponseTime: prismaSession.avgResponseTime ?? null, escalated: prismaSession.escalated ?? undefined, @@ -55,6 +56,9 @@ export default async function handler( initialMsg: prismaSession.initialMsg ?? undefined, fullTranscriptUrl: prismaSession.fullTranscriptUrl ?? null, transcriptContent: prismaSession.transcriptContent ?? null, + processed: prismaSession.processed ?? null, // New field + questions: prismaSession.questions ?? null, // New field + summary: prismaSession.summary ?? null, // New field }; return res.status(200).json({ session }); diff --git a/prisma/migrations/20250625132619_add_processed_field/migration.sql b/prisma/migrations/20250625132619_add_processed_field/migration.sql new file mode 100644 index 0000000..ba7095e --- /dev/null +++ b/prisma/migrations/20250625132619_add_processed_field/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "Session" ADD COLUMN "processed" BOOLEAN; diff --git a/prisma/migrations/20250625132932_add_openapi_processing_fields/migration.sql b/prisma/migrations/20250625132932_add_openapi_processing_fields/migration.sql new file mode 100644 index 0000000..c080287 --- /dev/null +++ b/prisma/migrations/20250625132932_add_openapi_processing_fields/migration.sql @@ -0,0 +1,4 @@ +-- AlterTable +ALTER TABLE "Session" ADD COLUMN "questions" TEXT; +ALTER TABLE "Session" ADD COLUMN "sentimentCategory" TEXT; +ALTER TABLE "Session" ADD COLUMN "summary" TEXT; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 8f56443..7eeee88 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -43,7 +43,8 @@ model Session { country String? language String? messagesSent Int? - sentiment Float? + sentiment Float? // Original sentiment score (float) + sentimentCategory String? // "positive", "neutral", "negative" from OpenAPI escalated Boolean? forwardedHr Boolean? fullTranscriptUrl String? @@ -53,5 +54,8 @@ model Session { tokensEur Float? category String? initialMsg String? + processed Boolean? // Flag for post-processing status + questions String? // JSON array of questions asked by user + summary String? // Brief summary of the conversation createdAt DateTime @default(now()) } diff --git a/scripts/process_sessions.mjs b/scripts/process_sessions.mjs new file mode 100644 index 0000000..f3d4e86 --- /dev/null +++ b/scripts/process_sessions.mjs @@ -0,0 +1,269 @@ +// Script to manually process unprocessed sessions with OpenAI +import { PrismaClient } from "@prisma/client"; +import fetch from "node-fetch"; + +const prisma = new PrismaClient(); +const OPENAI_API_KEY = process.env.OPENAI_API_KEY; +const OPENAI_API_URL = "https://api.openai.com/v1/chat/completions"; + +/** + * Processes a session transcript using OpenAI API + * @param {string} sessionId The session ID + * @param {string} transcript The transcript content to process + * @returns {Promise} Processed data from OpenAI + */ +async function processTranscriptWithOpenAI(sessionId, transcript) { + if (!OPENAI_API_KEY) { + throw new Error("OPENAI_API_KEY environment variable is not set"); + } + + // Create a system message with instructions + const systemMessage = ` + You are an AI assistant tasked with analyzing chat transcripts. + Extract the following information from the transcript: + 1. The primary language used by the user (ISO 639-1 code) + 2. Number of messages sent by the user + 3. Overall sentiment (positive, neutral, or negative) + 4. Whether the conversation was escalated + 5. Whether HR contact was mentioned or provided + 6. The best-fitting category for the conversation from this list: + - Schedule & Hours + - Leave & Vacation + - Sick Leave & Recovery + - Salary & Compensation + - Contract & Hours + - Onboarding + - Offboarding + - Workwear & Staff Pass + - Team & Contacts + - Personal Questions + - Access & Login + - Social questions + - Unrecognized / Other + 7. Up to 5 paraphrased questions asked by the user (in English) + 8. A brief summary of the conversation (10-300 characters) + + Return the data in JSON format matching this schema: + { + "language": "ISO 639-1 code", + "messages_sent": number, + "sentiment": "positive|neutral|negative", + "escalated": boolean, + "forwarded_hr": boolean, + "category": "one of the categories listed above", + "questions": ["question 1", "question 2", ...], + "summary": "brief summary", + "session_id": "${sessionId}" + } + `; + + try { + const response = await fetch(OPENAI_API_URL, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${OPENAI_API_KEY}`, + }, + body: JSON.stringify({ + model: "gpt-4-turbo", + messages: [ + { + role: "system", + content: systemMessage, + }, + { + role: "user", + content: transcript, + }, + ], + temperature: 0.3, // Lower temperature for more consistent results + response_format: { type: "json_object" }, + }), + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`OpenAI API error: ${response.status} - ${errorText}`); + } + + const data = await response.json(); + const processedData = JSON.parse(data.choices[0].message.content); + + // Validate the response against our expected schema + validateOpenAIResponse(processedData); + + return processedData; + } catch (error) { + console.error(`Error processing transcript with OpenAI:`, error); + throw error; + } +} + +/** + * Validates the OpenAI response against our expected schema + * @param {Object} data The data to validate + */ +function validateOpenAIResponse(data) { + // Check required fields + const requiredFields = [ + "language", + "messages_sent", + "sentiment", + "escalated", + "forwarded_hr", + "category", + "questions", + "summary", + "session_id", + ]; + + for (const field of requiredFields) { + if (!(field in data)) { + throw new Error(`Missing required field: ${field}`); + } + } + + // Validate field types + if (typeof data.language !== "string" || !/^[a-z]{2}$/.test(data.language)) { + throw new Error("Invalid language format. Expected ISO 639-1 code (e.g., 'en')"); + } + + if (typeof data.messages_sent !== "number" || data.messages_sent < 0) { + throw new Error("Invalid messages_sent. Expected non-negative number"); + } + + if (!["positive", "neutral", "negative"].includes(data.sentiment)) { + throw new Error("Invalid sentiment. Expected 'positive', 'neutral', or 'negative'"); + } + + if (typeof data.escalated !== "boolean") { + throw new Error("Invalid escalated. Expected boolean"); + } + + if (typeof data.forwarded_hr !== "boolean") { + throw new Error("Invalid forwarded_hr. Expected boolean"); + } + + const validCategories = [ + "Schedule & Hours", + "Leave & Vacation", + "Sick Leave & Recovery", + "Salary & Compensation", + "Contract & Hours", + "Onboarding", + "Offboarding", + "Workwear & Staff Pass", + "Team & Contacts", + "Personal Questions", + "Access & Login", + "Social questions", + "Unrecognized / Other", + ]; + + if (!validCategories.includes(data.category)) { + throw new Error(`Invalid category. Expected one of: ${validCategories.join(", ")}`); + } + + if (!Array.isArray(data.questions)) { + throw new Error("Invalid questions. Expected array of strings"); + } + + if (typeof data.summary !== "string" || data.summary.length < 10 || data.summary.length > 300) { + throw new Error("Invalid summary. Expected string between 10-300 characters"); + } + + if (typeof data.session_id !== "string") { + throw new Error("Invalid session_id. Expected string"); + } +} + +/** + * Main function to process unprocessed sessions + */ +async function processUnprocessedSessions() { + console.log("Starting to process unprocessed sessions..."); + + // Find sessions that have transcript content but haven't been processed + const sessionsToProcess = await prisma.session.findMany({ + where: { + AND: [ + { transcriptContent: { not: null } }, + { transcriptContent: { not: "" } }, + { processed: { not: true } }, // Either false or null + ], + }, + select: { + id: true, + transcriptContent: true, + }, + }); + + if (sessionsToProcess.length === 0) { + console.log("No sessions found requiring processing."); + return; + } + + console.log(`Found ${sessionsToProcess.length} sessions to process.`); + let successCount = 0; + let errorCount = 0; + + for (const session of sessionsToProcess) { + if (!session.transcriptContent) { + // Should not happen due to query, but good for type safety + console.warn(`Session ${session.id} has no transcript content, skipping.`); + continue; + } + + console.log(`Processing transcript for session ${session.id}...`); + try { + const processedData = await processTranscriptWithOpenAI( + session.id, + session.transcriptContent + ); + + // Map sentiment string to float value for compatibility with existing data + const sentimentMap = { + positive: 0.8, + neutral: 0.0, + negative: -0.8, + }; + + // Update the session with processed data + await prisma.session.update({ + where: { id: session.id }, + data: { + language: processedData.language, + messagesSent: processedData.messages_sent, + sentiment: sentimentMap[processedData.sentiment] || 0, + sentimentCategory: processedData.sentiment, + escalated: processedData.escalated, + forwardedHr: processedData.forwarded_hr, + category: processedData.category, + questions: JSON.stringify(processedData.questions), + summary: processedData.summary, + processed: true, + }, + }); + + console.log(`Successfully processed session ${session.id}.`); + successCount++; + } catch (error) { + console.error(`Error processing session ${session.id}:`, error); + errorCount++; + } + } + + console.log("Session processing complete."); + console.log(`Successfully processed: ${successCount} sessions.`); + console.log(`Failed to process: ${errorCount} sessions.`); +} + +// Run the main function +processUnprocessedSessions() + .catch((e) => { + console.error("An error occurred during the script execution:", e); + process.exitCode = 1; + }) + .finally(async () => { + await prisma.$disconnect(); + }); diff --git a/scripts/process_sessions.ts b/scripts/process_sessions.ts new file mode 100644 index 0000000..ae487a6 --- /dev/null +++ b/scripts/process_sessions.ts @@ -0,0 +1,284 @@ +import { PrismaClient } from "@prisma/client"; +import fetch from "node-fetch"; + +const prisma = new PrismaClient(); +const OPENAI_API_KEY = process.env.OPENAI_API_KEY; +const OPENAI_API_URL = "https://api.openai.com/v1/chat/completions"; + +// Define the expected response structure from OpenAI +interface OpenAIProcessedData { + language: string; + messages_sent: number; + sentiment: "positive" | "neutral" | "negative"; + escalated: boolean; + forwarded_hr: boolean; + category: string; + questions: string[]; + summary: string; + session_id: string; +} + +/** + * Processes a session transcript using OpenAI API + * @param sessionId The session ID + * @param transcript The transcript content to process + * @returns Processed data from OpenAI + */ +async function processTranscriptWithOpenAI( + sessionId: string, + transcript: string +): Promise { + if (!OPENAI_API_KEY) { + throw new Error("OPENAI_API_KEY environment variable is not set"); + } + + // Create a system message with instructions + const systemMessage = ` + You are an AI assistant tasked with analyzing chat transcripts. + Extract the following information from the transcript: + 1. The primary language used by the user (ISO 639-1 code) + 2. Number of messages sent by the user + 3. Overall sentiment (positive, neutral, or negative) + 4. Whether the conversation was escalated + 5. Whether HR contact was mentioned or provided + 6. The best-fitting category for the conversation from this list: + - Schedule & Hours + - Leave & Vacation + - Sick Leave & Recovery + - Salary & Compensation + - Contract & Hours + - Onboarding + - Offboarding + - Workwear & Staff Pass + - Team & Contacts + - Personal Questions + - Access & Login + - Social questions + - Unrecognized / Other + 7. Up to 5 paraphrased questions asked by the user (in English) + 8. A brief summary of the conversation (10-300 characters) + + Return the data in JSON format matching this schema: + { + "language": "ISO 639-1 code", + "messages_sent": number, + "sentiment": "positive|neutral|negative", + "escalated": boolean, + "forwarded_hr": boolean, + "category": "one of the categories listed above", + "questions": ["question 1", "question 2", ...], + "summary": "brief summary", + "session_id": "${sessionId}" + } + `; + + try { + const response = await fetch(OPENAI_API_URL, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${OPENAI_API_KEY}`, + }, + body: JSON.stringify({ + model: "gpt-4-turbo", + messages: [ + { + role: "system", + content: systemMessage, + }, + { + role: "user", + content: transcript, + }, + ], + temperature: 0.3, // Lower temperature for more consistent results + response_format: { type: "json_object" }, + }), + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`OpenAI API error: ${response.status} - ${errorText}`); + } + + const data = await response.json() as any; + const processedData = JSON.parse(data.choices[0].message.content); + + // Validate the response against our expected schema + validateOpenAIResponse(processedData); + + return processedData; + } catch (error) { + console.error(`Error processing transcript with OpenAI:`, error); + throw error; + } +} + +/** + * Validates the OpenAI response against our expected schema + * @param data The data to validate + */ +function validateOpenAIResponse(data: any): asserts data is OpenAIProcessedData { + // Check required fields + const requiredFields = [ + "language", + "messages_sent", + "sentiment", + "escalated", + "forwarded_hr", + "category", + "questions", + "summary", + "session_id", + ]; + + for (const field of requiredFields) { + if (!(field in data)) { + throw new Error(`Missing required field: ${field}`); + } + } + + // Validate field types + if (typeof data.language !== "string" || !/^[a-z]{2}$/.test(data.language)) { + throw new Error("Invalid language format. Expected ISO 639-1 code (e.g., 'en')"); + } + + if (typeof data.messages_sent !== "number" || data.messages_sent < 0) { + throw new Error("Invalid messages_sent. Expected non-negative number"); + } + + if (!["positive", "neutral", "negative"].includes(data.sentiment)) { + throw new Error("Invalid sentiment. Expected 'positive', 'neutral', or 'negative'"); + } + + if (typeof data.escalated !== "boolean") { + throw new Error("Invalid escalated. Expected boolean"); + } + + if (typeof data.forwarded_hr !== "boolean") { + throw new Error("Invalid forwarded_hr. Expected boolean"); + } + + const validCategories = [ + "Schedule & Hours", + "Leave & Vacation", + "Sick Leave & Recovery", + "Salary & Compensation", + "Contract & Hours", + "Onboarding", + "Offboarding", + "Workwear & Staff Pass", + "Team & Contacts", + "Personal Questions", + "Access & Login", + "Social questions", + "Unrecognized / Other", + ]; + + if (!validCategories.includes(data.category)) { + throw new Error(`Invalid category. Expected one of: ${validCategories.join(", ")}`); + } + + if (!Array.isArray(data.questions)) { + throw new Error("Invalid questions. Expected array of strings"); + } + + if (typeof data.summary !== "string" || data.summary.length < 10 || data.summary.length > 300) { + throw new Error("Invalid summary. Expected string between 10-300 characters"); + } + + if (typeof data.session_id !== "string") { + throw new Error("Invalid session_id. Expected string"); + } +} + +/** + * Main function to process unprocessed sessions + */ +async function processUnprocessedSessions() { + console.log("Starting to process unprocessed sessions..."); + + // Find sessions that have transcript content but haven't been processed + const sessionsToProcess = await prisma.session.findMany({ + where: { + AND: [ + { transcriptContent: { not: null } }, + { transcriptContent: { not: "" } }, + { processed: { not: true } }, // Either false or null + ], + }, + select: { + id: true, + transcriptContent: true, + }, + }); + + if (sessionsToProcess.length === 0) { + console.log("No sessions found requiring processing."); + return; + } + + console.log(`Found ${sessionsToProcess.length} sessions to process.`); + let successCount = 0; + let errorCount = 0; + + for (const session of sessionsToProcess) { + if (!session.transcriptContent) { + // Should not happen due to query, but good for type safety + console.warn(`Session ${session.id} has no transcript content, skipping.`); + continue; + } + + console.log(`Processing transcript for session ${session.id}...`); + try { + const processedData = await processTranscriptWithOpenAI( + session.id, + session.transcriptContent + ); + + // Map sentiment string to float value for compatibility with existing data + const sentimentMap: Record = { + positive: 0.8, + neutral: 0.0, + negative: -0.8, + }; + + // Update the session with processed data + await prisma.session.update({ + where: { id: session.id }, + data: { + language: processedData.language, + messagesSent: processedData.messages_sent, + sentiment: sentimentMap[processedData.sentiment] || 0, + sentimentCategory: processedData.sentiment, + escalated: processedData.escalated, + forwardedHr: processedData.forwarded_hr, + category: processedData.category, + questions: JSON.stringify(processedData.questions), + summary: processedData.summary, + processed: true, + }, + }); + + console.log(`Successfully processed session ${session.id}.`); + successCount++; + } catch (error) { + console.error(`Error processing session ${session.id}:`, error); + errorCount++; + } + } + + console.log("Session processing complete."); + console.log(`Successfully processed: ${successCount} sessions.`); + console.log(`Failed to process: ${errorCount} sessions.`); +} + +// Run the main function +processUnprocessedSessions() + .catch((e) => { + console.error("An error occurred during the script execution:", e); + process.exitCode = 1; + }) + .finally(async () => { + await prisma.$disconnect(); + }); diff --git a/server.js b/server.js new file mode 100644 index 0000000..fd2e5af --- /dev/null +++ b/server.js @@ -0,0 +1,39 @@ +// Custom Next.js server with scheduler initialization +const { createServer } = require('http'); +const { parse } = require('url'); +const next = require('next'); +const { startScheduler } = require('./lib/scheduler'); +const { startProcessingScheduler } = require('./lib/processingScheduler'); + +const dev = process.env.NODE_ENV !== 'production'; +const hostname = 'localhost'; +const port = process.env.PORT || 3000; + +// Initialize Next.js +const app = next({ dev, hostname, port }); +const handle = app.getRequestHandler(); + +app.prepare().then(() => { + // Initialize schedulers when the server starts + console.log('Starting schedulers...'); + startScheduler(); + startProcessingScheduler(); + console.log('All schedulers initialized successfully'); + + createServer(async (req, res) => { + try { + // Parse the URL + const parsedUrl = parse(req.url, true); + + // Let Next.js handle the request + await handle(req, res, parsedUrl); + } catch (err) { + console.error('Error occurred handling', req.url, err); + res.statusCode = 500; + res.end('Internal Server Error'); + } + }).listen(port, (err) => { + if (err) throw err; + console.log(`> Ready on http://${hostname}:${port}`); + }); +}); diff --git a/server.mjs b/server.mjs new file mode 100644 index 0000000..4143071 --- /dev/null +++ b/server.mjs @@ -0,0 +1,56 @@ +// Custom Next.js server with scheduler initialization +import { createServer } from 'http'; +import { parse } from 'url'; +import next from 'next'; + +// We'll need to dynamically import these after they're compiled +let startScheduler; +let startProcessingScheduler; + +const dev = process.env.NODE_ENV !== 'production'; +const hostname = 'localhost'; +const port = parseInt(process.env.PORT || '3000', 10); + +// Initialize Next.js +const app = next({ dev, hostname, port }); +const handle = app.getRequestHandler(); + +async function init() { + try { + // Dynamically import the schedulers + const scheduler = await import('./lib/scheduler.js'); + const processingScheduler = await import('./lib/processingScheduler.js'); + + startScheduler = scheduler.startScheduler; + startProcessingScheduler = processingScheduler.startProcessingScheduler; + + app.prepare().then(() => { + // Initialize schedulers when the server starts + console.log('Starting schedulers...'); + startScheduler(); + startProcessingScheduler(); + console.log('All schedulers initialized successfully'); + + createServer(async (req, res) => { + try { + // Parse the URL + const parsedUrl = parse(req.url || '', true); + + // Let Next.js handle the request + await handle(req, res, parsedUrl); + } catch (err) { + console.error('Error occurred handling', req.url, err); + res.statusCode = 500; + res.end('Internal Server Error'); + } + }).listen(port, () => { + console.log(`> Ready on http://${hostname}:${port}`); + }); + }); + } catch (error) { + console.error('Failed to initialize server:', error); + process.exit(1); + } +} + +init(); diff --git a/server.ts b/server.ts new file mode 100644 index 0000000..f99d6fb --- /dev/null +++ b/server.ts @@ -0,0 +1,38 @@ +// Custom Next.js server with scheduler initialization +import { createServer } from 'http'; +import { parse } from 'url'; +import next from 'next'; +import { startScheduler } from './lib/scheduler.js'; +import { startProcessingScheduler } from './lib/processingScheduler.js'; + +const dev = process.env.NODE_ENV !== 'production'; +const hostname = 'localhost'; +const port = parseInt(process.env.PORT || '3000', 10); + +// Initialize Next.js +const app = next({ dev, hostname, port }); +const handle = app.getRequestHandler(); + +app.prepare().then(() => { + // Initialize schedulers when the server starts + console.log('Starting schedulers...'); + startScheduler(); + startProcessingScheduler(); + console.log('All schedulers initialized successfully'); + + createServer(async (req, res) => { + try { + // Parse the URL + const parsedUrl = parse(req.url || '', true); + + // Let Next.js handle the request + await handle(req, res, parsedUrl); + } catch (err) { + console.error('Error occurred handling', req.url, err); + res.statusCode = 500; + res.end('Internal Server Error'); + } + }).listen(port, () => { + console.log(`> Ready on http://${hostname}:${port}`); + }); +});