From 601e2e402698599401a39d1794ab27e1727ab64e Mon Sep 17 00:00:00 2001 From: Max Kowalski Date: Fri, 27 Jun 2025 20:02:16 +0200 Subject: [PATCH] feat: enhance environment variable parsing to handle quotes, comments, and whitespace; add transcript parsing utility for structured message extraction --- lib/env.ts | 64 +++++-- lib/transcriptParser.ts | 360 ++++++++++++++++++++++++++++++++++++++++ tests/unit/env.test.ts | 61 ++++++- 3 files changed, 463 insertions(+), 22 deletions(-) create mode 100644 lib/transcriptParser.ts diff --git a/lib/env.ts b/lib/env.ts index 3fc827f..64477a6 100644 --- a/lib/env.ts +++ b/lib/env.ts @@ -3,6 +3,41 @@ import { readFileSync } from "fs"; import { fileURLToPath } from "url"; import { dirname, join } from "path"; +/** + * Parse environment variable value by removing quotes, comments, and trimming whitespace + */ +function parseEnvValue(value: string | undefined): string { + if (!value) return ''; + + // Trim whitespace + let cleaned = value.trim(); + + // Remove inline comments (everything after #) + const commentIndex = cleaned.indexOf('#'); + if (commentIndex !== -1) { + cleaned = cleaned.substring(0, commentIndex).trim(); + } + + // Remove surrounding quotes (both single and double) + if ((cleaned.startsWith('"') && cleaned.endsWith('"')) || + (cleaned.startsWith("'") && cleaned.endsWith("'"))) { + cleaned = cleaned.slice(1, -1); + } + + return cleaned; +} + +/** + * Parse integer with fallback to default value + */ +function parseIntWithDefault(value: string | undefined, defaultValue: number): number { + const cleaned = parseEnvValue(value); + if (!cleaned) return defaultValue; + + const parsed = parseInt(cleaned, 10); + return isNaN(parsed) ? defaultValue : parsed; +} + // Load environment variables from .env.local const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); @@ -16,9 +51,10 @@ try { envVars.forEach(line => { const [key, ...valueParts] = line.split('='); if (key && valueParts.length > 0) { - const value = valueParts.join('=').trim(); + const rawValue = valueParts.join('='); + const cleanedValue = parseEnvValue(rawValue); if (!process.env[key.trim()]) { - process.env[key.trim()] = value; + process.env[key.trim()] = cleanedValue; } } }); @@ -31,24 +67,24 @@ try { */ export const env = { // NextAuth - NEXTAUTH_URL: process.env.NEXTAUTH_URL || 'http://localhost:3000', - NEXTAUTH_SECRET: process.env.NEXTAUTH_SECRET || '', - NODE_ENV: process.env.NODE_ENV || 'development', + NEXTAUTH_URL: parseEnvValue(process.env.NEXTAUTH_URL) || 'http://localhost:3000', + NEXTAUTH_SECRET: parseEnvValue(process.env.NEXTAUTH_SECRET) || '', + NODE_ENV: parseEnvValue(process.env.NODE_ENV) || 'development', // OpenAI - OPENAI_API_KEY: process.env.OPENAI_API_KEY || '', + OPENAI_API_KEY: parseEnvValue(process.env.OPENAI_API_KEY) || '', // Scheduler Configuration - SCHEDULER_ENABLED: process.env.SCHEDULER_ENABLED === 'true', - CSV_IMPORT_INTERVAL: process.env.CSV_IMPORT_INTERVAL || '*/15 * * * *', - IMPORT_PROCESSING_INTERVAL: process.env.IMPORT_PROCESSING_INTERVAL || '*/5 * * * *', - IMPORT_PROCESSING_BATCH_SIZE: parseInt(process.env.IMPORT_PROCESSING_BATCH_SIZE || '50', 10), - SESSION_PROCESSING_INTERVAL: process.env.SESSION_PROCESSING_INTERVAL || '0 * * * *', - SESSION_PROCESSING_BATCH_SIZE: parseInt(process.env.SESSION_PROCESSING_BATCH_SIZE || '0', 10), - SESSION_PROCESSING_CONCURRENCY: parseInt(process.env.SESSION_PROCESSING_CONCURRENCY || '5', 10), + SCHEDULER_ENABLED: parseEnvValue(process.env.SCHEDULER_ENABLED) === 'true', + CSV_IMPORT_INTERVAL: parseEnvValue(process.env.CSV_IMPORT_INTERVAL) || '*/15 * * * *', + IMPORT_PROCESSING_INTERVAL: parseEnvValue(process.env.IMPORT_PROCESSING_INTERVAL) || '*/5 * * * *', + IMPORT_PROCESSING_BATCH_SIZE: parseIntWithDefault(process.env.IMPORT_PROCESSING_BATCH_SIZE, 50), + SESSION_PROCESSING_INTERVAL: parseEnvValue(process.env.SESSION_PROCESSING_INTERVAL) || '0 * * * *', + SESSION_PROCESSING_BATCH_SIZE: parseIntWithDefault(process.env.SESSION_PROCESSING_BATCH_SIZE, 0), + SESSION_PROCESSING_CONCURRENCY: parseIntWithDefault(process.env.SESSION_PROCESSING_CONCURRENCY, 5), // Server - PORT: parseInt(process.env.PORT || '3000', 10), + PORT: parseIntWithDefault(process.env.PORT, 3000), } as const; /** diff --git a/lib/transcriptParser.ts b/lib/transcriptParser.ts new file mode 100644 index 0000000..102bd85 --- /dev/null +++ b/lib/transcriptParser.ts @@ -0,0 +1,360 @@ +// Transcript parsing utility for converting raw transcript content into structured messages +import { prisma } from './prisma.js'; + +export interface ParsedMessage { + sessionId: string; + timestamp: Date; + role: string; + content: string; + order: number; +} + +export interface TranscriptParseResult { + success: boolean; + messages?: ParsedMessage[]; + error?: string; +} + +/** + * Parse European date format (DD.MM.YYYY HH:mm:ss) to Date object + */ +function parseEuropeanDate(dateStr: string): Date { + const match = dateStr.match(/(\d{2})\.(\d{2})\.(\d{4}) (\d{2}):(\d{2}):(\d{2})/); + if (!match) { + throw new Error(`Invalid date format: ${dateStr}`); + } + + const [, day, month, year, hour, minute, second] = match; + return new Date( + parseInt(year, 10), + parseInt(month, 10) - 1, // JavaScript months are 0-indexed + parseInt(day, 10), + parseInt(hour, 10), + parseInt(minute, 10), + parseInt(second, 10) + ); +} + +/** + * Parse raw transcript content into structured messages + * @param content Raw transcript content + * @param startTime Session start time + * @param endTime Session end time + * @returns Parsed messages with timestamps + */ +export function parseTranscriptToMessages( + content: string, + startTime: Date, + endTime: Date +): TranscriptParseResult { + try { + if (!content || !content.trim()) { + return { + success: false, + error: 'Empty transcript content' + }; + } + + const messages: ParsedMessage[] = []; + const lines = content.split('\n'); + let currentMessage: { role: string; content: string; timestamp?: string } | null = null; + let order = 0; + + for (const line of lines) { + const trimmedLine = line.trim(); + + // Skip empty lines + if (!trimmedLine) { + continue; + } + + // Check if line starts with a timestamp and role [DD.MM.YYYY HH:MM:SS] Role: content + const timestampRoleMatch = trimmedLine.match(/^\[(\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})\]\s+(User|Assistant|System|user|assistant|system):\s*(.*)$/i); + + // Check if line starts with just a role (User:, Assistant:, System:, etc.) + const roleMatch = trimmedLine.match(/^(User|Assistant|System|user|assistant|system):\s*(.*)$/i); + + if (timestampRoleMatch) { + // Save previous message if exists + if (currentMessage) { + messages.push({ + sessionId: '', // Will be set by caller + timestamp: new Date(), // Will be calculated below + role: currentMessage.role, + content: currentMessage.content.trim(), + order: order++ + }); + } + + // Start new message with timestamp + const timestamp = timestampRoleMatch[1]; + const role = timestampRoleMatch[2].charAt(0).toUpperCase() + timestampRoleMatch[2].slice(1).toLowerCase(); + const content = timestampRoleMatch[3] || ''; + + currentMessage = { + role, + content, + timestamp // Store the timestamp for later parsing + }; + } else if (roleMatch) { + // Save previous message if exists + if (currentMessage) { + messages.push({ + sessionId: '', // Will be set by caller + timestamp: new Date(), // Will be calculated below + role: currentMessage.role, + content: currentMessage.content.trim(), + order: order++ + }); + } + + // Start new message without timestamp + const role = roleMatch[1].charAt(0).toUpperCase() + roleMatch[1].slice(1).toLowerCase(); + const content = roleMatch[2] || ''; + + currentMessage = { + role, + content + }; + } else if (currentMessage) { + // Continue previous message (multi-line) + currentMessage.content += '\n' + trimmedLine; + } + // If no current message and no role match, skip the line (orphaned content) + } + + // Save the last message + if (currentMessage) { + messages.push({ + sessionId: '', // Will be set by caller + timestamp: new Date(), // Will be calculated below + role: currentMessage.role, + content: currentMessage.content.trim(), + order: order++ + }); + } + + if (messages.length === 0) { + return { + success: false, + error: 'No messages found in transcript' + }; + } + + // Calculate timestamps - use parsed timestamps if available, otherwise distribute across session duration + const hasTimestamps = messages.some(msg => (msg as any).timestamp); + + if (hasTimestamps) { + // Use parsed timestamps from the transcript + messages.forEach((message, index) => { + const msgWithTimestamp = message as any; + if (msgWithTimestamp.timestamp) { + try { + message.timestamp = parseEuropeanDate(msgWithTimestamp.timestamp); + } catch (error) { + // Fallback to distributed timestamp if parsing fails + const sessionDurationMs = endTime.getTime() - startTime.getTime(); + const messageInterval = messages.length > 1 ? sessionDurationMs / (messages.length - 1) : 0; + message.timestamp = new Date(startTime.getTime() + (index * messageInterval)); + } + } else { + // Fallback to distributed timestamp + const sessionDurationMs = endTime.getTime() - startTime.getTime(); + const messageInterval = messages.length > 1 ? sessionDurationMs / (messages.length - 1) : 0; + message.timestamp = new Date(startTime.getTime() + (index * messageInterval)); + } + }); + } else { + // Distribute messages across session duration + const sessionDurationMs = endTime.getTime() - startTime.getTime(); + const messageInterval = messages.length > 1 ? sessionDurationMs / (messages.length - 1) : 0; + + messages.forEach((message, index) => { + message.timestamp = new Date(startTime.getTime() + (index * messageInterval)); + }); + } + + return { + success: true, + messages + }; + + } catch (error) { + return { + success: false, + error: error instanceof Error ? error.message : String(error) + }; + } +} + +/** + * Store parsed messages in the database for a session + * @param sessionId The session ID + * @param messages Array of parsed messages + */ +export async function storeMessagesForSession( + sessionId: string, + messages: ParsedMessage[] +): Promise { + // Delete existing messages for this session (in case of re-processing) + await prisma.message.deleteMany({ + where: { sessionId } + }); + + // Create new messages + const messagesWithSessionId = messages.map(msg => ({ + ...msg, + sessionId + })); + + await prisma.message.createMany({ + data: messagesWithSessionId + }); +} + +/** + * Process transcript for a single session + * @param sessionId The session ID to process + */ +export async function processSessionTranscript(sessionId: string): Promise { + // Get the session and its import data + const session = await prisma.session.findUnique({ + where: { id: sessionId }, + include: { + import: true + } + }); + + if (!session) { + throw new Error(`Session not found: ${sessionId}`); + } + + if (!session.import) { + throw new Error(`No import data found for session: ${sessionId}`); + } + + if (!session.import.rawTranscriptContent) { + throw new Error(`No transcript content found for session: ${sessionId}`); + } + + // Parse the start and end times + const startTime = parseEuropeanDate(session.import.startTimeRaw); + const endTime = parseEuropeanDate(session.import.endTimeRaw); + + // Parse the transcript + const parseResult = parseTranscriptToMessages( + session.import.rawTranscriptContent, + startTime, + endTime + ); + + if (!parseResult.success) { + throw new Error(`Failed to parse transcript: ${parseResult.error}`); + } + + // Store the messages + await storeMessagesForSession(sessionId, parseResult.messages!); + + console.log(`āœ… Processed ${parseResult.messages!.length} messages for session ${sessionId}`); +} + +/** + * Process all sessions that have transcript content but no messages + */ +export async function processAllUnparsedTranscripts(): Promise { + console.log('šŸ” Finding sessions with unparsed transcripts...'); + + // Find sessions that have transcript content but no messages + const sessionsToProcess = await prisma.session.findMany({ + where: { + import: { + rawTranscriptContent: { + not: null + } + }, + messages: { + none: {} + } + }, + include: { + import: true, + _count: { + select: { + messages: true + } + } + } + }); + + console.log(`šŸ“‹ Found ${sessionsToProcess.length} sessions to process`); + + let processed = 0; + let errors = 0; + + for (const session of sessionsToProcess) { + try { + await processSessionTranscript(session.id); + processed++; + } catch (error) { + console.error(`āŒ Error processing session ${session.id}:`, error); + errors++; + } + } + + console.log(`\nšŸ“Š Processing complete:`); + console.log(` āœ… Successfully processed: ${processed} sessions`); + console.log(` āŒ Errors: ${errors} sessions`); + console.log(` šŸ“ Total messages created: ${await getTotalMessageCount()}`); +} + +/** + * Get total count of messages in the database + */ +export async function getTotalMessageCount(): Promise { + const result = await prisma.message.count(); + return result; +} + +/** + * Get messages for a specific session + * @param sessionId The session ID + * @returns Array of messages ordered by order field + */ +export async function getMessagesForSession(sessionId: string) { + return await prisma.message.findMany({ + where: { sessionId }, + orderBy: { order: 'asc' } + }); +} + +/** + * Get parsing statistics + */ +export async function getParsingStats() { + const totalSessions = await prisma.session.count(); + const sessionsWithTranscripts = await prisma.session.count({ + where: { + import: { + rawTranscriptContent: { + not: null + } + } + } + }); + const sessionsWithMessages = await prisma.session.count({ + where: { + messages: { + some: {} + } + } + }); + const totalMessages = await getTotalMessageCount(); + + return { + totalSessions, + sessionsWithTranscripts, + sessionsWithMessages, + unparsedSessions: sessionsWithTranscripts - sessionsWithMessages, + totalMessages + }; +} diff --git a/tests/unit/env.test.ts b/tests/unit/env.test.ts index 3258cd2..983d69a 100644 --- a/tests/unit/env.test.ts +++ b/tests/unit/env.test.ts @@ -27,7 +27,8 @@ describe('Environment Management', () => { const { env: freshEnv } = await import('../../lib/env'); expect(freshEnv.NEXTAUTH_URL).toBe('http://localhost:3000'); - expect(freshEnv.SCHEDULER_ENABLED).toBe(false); + // Note: SCHEDULER_ENABLED will be true because .env.local sets it to "true" + expect(freshEnv.SCHEDULER_ENABLED).toBe(true); expect(freshEnv.PORT).toBe(3000); }); @@ -62,9 +63,52 @@ describe('Environment Management', () => { vi.resetModules(); const { env: freshEnv } = await import('../../lib/env'); - expect(freshEnv.IMPORT_PROCESSING_BATCH_SIZE).toBeNaN(); // parseInt returns NaN for invalid values - // The .env.local file provides a default value of 5, so empty string gets overridden - expect(freshEnv.SESSION_PROCESSING_CONCURRENCY).toBe(5); + expect(freshEnv.IMPORT_PROCESSING_BATCH_SIZE).toBe(50); // Falls back to default value + expect(freshEnv.SESSION_PROCESSING_CONCURRENCY).toBe(5); // Falls back to default value + }); + + it('should parse quoted environment variables correctly', async () => { + process.env.NEXTAUTH_URL = '"https://quoted.example.com"'; + process.env.NEXTAUTH_SECRET = "'single-quoted-secret'"; + + vi.resetModules(); + const { env: freshEnv } = await import('../../lib/env'); + + expect(freshEnv.NEXTAUTH_URL).toBe('https://quoted.example.com'); + expect(freshEnv.NEXTAUTH_SECRET).toBe('single-quoted-secret'); + }); + + it('should strip inline comments from environment variables', async () => { + process.env.CSV_IMPORT_INTERVAL = '*/10 * * * * # Custom comment'; + process.env.IMPORT_PROCESSING_INTERVAL = '*/3 * * * * # Another comment'; + + vi.resetModules(); + const { env: freshEnv } = await import('../../lib/env'); + + expect(freshEnv.CSV_IMPORT_INTERVAL).toBe('*/10 * * * *'); + expect(freshEnv.IMPORT_PROCESSING_INTERVAL).toBe('*/3 * * * *'); + }); + + it('should handle whitespace around environment variables', async () => { + process.env.NEXTAUTH_URL = ' https://spaced.example.com '; + process.env.PORT = ' 8080 '; + + vi.resetModules(); + const { env: freshEnv } = await import('../../lib/env'); + + expect(freshEnv.NEXTAUTH_URL).toBe('https://spaced.example.com'); + expect(freshEnv.PORT).toBe(8080); + }); + + it('should handle complex combinations of quotes, comments, and whitespace', async () => { + process.env.NEXTAUTH_URL = ' "https://complex.example.com" # Production URL'; + process.env.IMPORT_PROCESSING_BATCH_SIZE = " '100' # Batch size"; + + vi.resetModules(); + const { env: freshEnv } = await import('../../lib/env'); + + expect(freshEnv.NEXTAUTH_URL).toBe('https://complex.example.com'); + expect(freshEnv.IMPORT_PROCESSING_BATCH_SIZE).toBe(100); }); }); @@ -166,10 +210,11 @@ describe('Environment Management', () => { const config = freshGetSchedulerConfig(); - expect(config.enabled).toBe(false); - // The .env.local file is loaded and contains comments, so we expect the actual values - expect(config.csvImport.interval).toBe('*/15 * * * * # CSV import frequency (every 15 min)'); - expect(config.importProcessing.interval).toBe('*/5 * * * * # Import processing frequency (every 5 min)'); + // Note: SCHEDULER_ENABLED will be true because .env.local sets it to "true" + expect(config.enabled).toBe(true); + // The .env.local file is loaded and comments are now stripped, so we expect clean values + expect(config.csvImport.interval).toBe('*/15 * * * *'); + expect(config.importProcessing.interval).toBe('*/5 * * * *'); expect(config.importProcessing.batchSize).toBe(50); }); });