feat: comprehensive security and architecture improvements

- Add Zod validation schemas with strong password requirements (12+ chars, complexity)
- Implement rate limiting for authentication endpoints (registration, password reset)
- Remove duplicate MetricCard component, consolidate to ui/metric-card.tsx
- Update README.md to use pnpm commands consistently
- Enhance authentication security with 12-round bcrypt hashing
- Add comprehensive input validation for all API endpoints
- Fix security vulnerabilities in user registration and password reset flows

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-06-28 01:52:53 +02:00
parent 192f9497b4
commit 7f48a085bf
68 changed files with 8045 additions and 4542 deletions

View File

@ -7,20 +7,22 @@ import { dirname, join } from "path";
* Parse environment variable value by removing quotes, comments, and trimming whitespace
*/
function parseEnvValue(value: string | undefined): string {
if (!value) return '';
if (!value) return "";
// Trim whitespace
let cleaned = value.trim();
// Remove inline comments (everything after #)
const commentIndex = cleaned.indexOf('#');
const commentIndex = cleaned.indexOf("#");
if (commentIndex !== -1) {
cleaned = cleaned.substring(0, commentIndex).trim();
}
// Remove surrounding quotes (both single and double)
if ((cleaned.startsWith('"') && cleaned.endsWith('"')) ||
(cleaned.startsWith("'") && cleaned.endsWith("'"))) {
if (
(cleaned.startsWith('"') && cleaned.endsWith('"')) ||
(cleaned.startsWith("'") && cleaned.endsWith("'"))
) {
cleaned = cleaned.slice(1, -1);
}
@ -30,7 +32,10 @@ function parseEnvValue(value: string | undefined): string {
/**
* Parse integer with fallback to default value
*/
function parseIntWithDefault(value: string | undefined, defaultValue: number): number {
function parseIntWithDefault(
value: string | undefined,
defaultValue: number
): number {
const cleaned = parseEnvValue(value);
if (!cleaned) return defaultValue;
@ -41,17 +46,19 @@ function parseIntWithDefault(value: string | undefined, defaultValue: number): n
// Load environment variables from .env.local
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
const envPath = join(__dirname, '..', '.env.local');
const envPath = join(__dirname, "..", ".env.local");
// Load .env.local if it exists
try {
const envFile = readFileSync(envPath, 'utf8');
const envVars = envFile.split('\n').filter(line => line.trim() && !line.startsWith('#'));
const envFile = readFileSync(envPath, "utf8");
const envVars = envFile
.split("\n")
.filter((line) => line.trim() && !line.startsWith("#"));
envVars.forEach(line => {
const [key, ...valueParts] = line.split('=');
envVars.forEach((line) => {
const [key, ...valueParts] = line.split("=");
if (key && valueParts.length > 0) {
const rawValue = valueParts.join('=');
const rawValue = valueParts.join("=");
const cleanedValue = parseEnvValue(rawValue);
if (!process.env[key.trim()]) {
process.env[key.trim()] = cleanedValue;
@ -67,21 +74,34 @@ try {
*/
export const env = {
// NextAuth
NEXTAUTH_URL: parseEnvValue(process.env.NEXTAUTH_URL) || 'http://localhost:3000',
NEXTAUTH_SECRET: parseEnvValue(process.env.NEXTAUTH_SECRET) || '',
NODE_ENV: parseEnvValue(process.env.NODE_ENV) || 'development',
NEXTAUTH_URL:
parseEnvValue(process.env.NEXTAUTH_URL) || "http://localhost:3000",
NEXTAUTH_SECRET: parseEnvValue(process.env.NEXTAUTH_SECRET) || "",
NODE_ENV: parseEnvValue(process.env.NODE_ENV) || "development",
// OpenAI
OPENAI_API_KEY: parseEnvValue(process.env.OPENAI_API_KEY) || '',
OPENAI_API_KEY: parseEnvValue(process.env.OPENAI_API_KEY) || "",
// Scheduler Configuration
SCHEDULER_ENABLED: parseEnvValue(process.env.SCHEDULER_ENABLED) === 'true',
CSV_IMPORT_INTERVAL: parseEnvValue(process.env.CSV_IMPORT_INTERVAL) || '*/15 * * * *',
IMPORT_PROCESSING_INTERVAL: parseEnvValue(process.env.IMPORT_PROCESSING_INTERVAL) || '*/5 * * * *',
IMPORT_PROCESSING_BATCH_SIZE: parseIntWithDefault(process.env.IMPORT_PROCESSING_BATCH_SIZE, 50),
SESSION_PROCESSING_INTERVAL: parseEnvValue(process.env.SESSION_PROCESSING_INTERVAL) || '0 * * * *',
SESSION_PROCESSING_BATCH_SIZE: parseIntWithDefault(process.env.SESSION_PROCESSING_BATCH_SIZE, 0),
SESSION_PROCESSING_CONCURRENCY: parseIntWithDefault(process.env.SESSION_PROCESSING_CONCURRENCY, 5),
SCHEDULER_ENABLED: parseEnvValue(process.env.SCHEDULER_ENABLED) === "true",
CSV_IMPORT_INTERVAL:
parseEnvValue(process.env.CSV_IMPORT_INTERVAL) || "*/15 * * * *",
IMPORT_PROCESSING_INTERVAL:
parseEnvValue(process.env.IMPORT_PROCESSING_INTERVAL) || "*/5 * * * *",
IMPORT_PROCESSING_BATCH_SIZE: parseIntWithDefault(
process.env.IMPORT_PROCESSING_BATCH_SIZE,
50
),
SESSION_PROCESSING_INTERVAL:
parseEnvValue(process.env.SESSION_PROCESSING_INTERVAL) || "0 * * * *",
SESSION_PROCESSING_BATCH_SIZE: parseIntWithDefault(
process.env.SESSION_PROCESSING_BATCH_SIZE,
0
),
SESSION_PROCESSING_CONCURRENCY: parseIntWithDefault(
process.env.SESSION_PROCESSING_CONCURRENCY,
5
),
// Server
PORT: parseIntWithDefault(process.env.PORT, 3000),
@ -94,11 +114,11 @@ export function validateEnv(): { valid: boolean; errors: string[] } {
const errors: string[] = [];
if (!env.NEXTAUTH_SECRET) {
errors.push('NEXTAUTH_SECRET is required');
errors.push("NEXTAUTH_SECRET is required");
}
if (!env.OPENAI_API_KEY && env.NODE_ENV === 'production') {
errors.push('OPENAI_API_KEY is required in production');
if (!env.OPENAI_API_KEY && env.NODE_ENV === "production") {
errors.push("OPENAI_API_KEY is required in production");
}
return {
@ -132,14 +152,14 @@ export function getSchedulerConfig() {
* Log environment configuration (safe for production)
*/
export function logEnvConfig(): void {
console.log('[Environment] Configuration:');
console.log("[Environment] Configuration:");
console.log(` NODE_ENV: ${env.NODE_ENV}`);
console.log(` NEXTAUTH_URL: ${env.NEXTAUTH_URL}`);
console.log(` SCHEDULER_ENABLED: ${env.SCHEDULER_ENABLED}`);
console.log(` PORT: ${env.PORT}`);
if (env.SCHEDULER_ENABLED) {
console.log(' Scheduler intervals:');
console.log(" Scheduler intervals:");
console.log(` CSV Import: ${env.CSV_IMPORT_INTERVAL}`);
console.log(` Import Processing: ${env.IMPORT_PROCESSING_INTERVAL}`);
console.log(` Session Processing: ${env.SESSION_PROCESSING_INTERVAL}`);

View File

@ -1,7 +1,15 @@
// SessionImport to Session processor
import { PrismaClient, SentimentCategory, SessionCategory, ProcessingStage } from "@prisma/client";
import {
PrismaClient,
SentimentCategory,
SessionCategory,
ProcessingStage,
} from "@prisma/client";
import { getSchedulerConfig } from "./env";
import { fetchTranscriptContent, isValidTranscriptUrl } from "./transcriptFetcher";
import {
fetchTranscriptContent,
isValidTranscriptUrl,
} from "./transcriptFetcher";
import { ProcessingStatusManager } from "./processingStatusManager";
import cron from "node-cron";
@ -11,25 +19,29 @@ const prisma = new PrismaClient();
* Parse European date format (DD.MM.YYYY HH:mm:ss) to JavaScript Date
*/
function parseEuropeanDate(dateStr: string): Date {
if (!dateStr || typeof dateStr !== 'string') {
if (!dateStr || typeof dateStr !== "string") {
throw new Error(`Invalid date string: ${dateStr}`);
}
// Handle format: "DD.MM.YYYY HH:mm:ss"
const [datePart, timePart] = dateStr.trim().split(' ');
const [datePart, timePart] = dateStr.trim().split(" ");
if (!datePart || !timePart) {
throw new Error(`Invalid date format: ${dateStr}. Expected format: DD.MM.YYYY HH:mm:ss`);
throw new Error(
`Invalid date format: ${dateStr}. Expected format: DD.MM.YYYY HH:mm:ss`
);
}
const [day, month, year] = datePart.split('.');
const [day, month, year] = datePart.split(".");
if (!day || !month || !year) {
throw new Error(`Invalid date part: ${datePart}. Expected format: DD.MM.YYYY`);
throw new Error(
`Invalid date part: ${datePart}. Expected format: DD.MM.YYYY`
);
}
// Convert to ISO format: YYYY-MM-DD HH:mm:ss
const isoDateStr = `${year}-${month.padStart(2, '0')}-${day.padStart(2, '0')} ${timePart}`;
const isoDateStr = `${year}-${month.padStart(2, "0")}-${day.padStart(2, "0")} ${timePart}`;
const date = new Date(isoDateStr);
if (isNaN(date.getTime())) {
@ -42,13 +54,15 @@ function parseEuropeanDate(dateStr: string): Date {
/**
* Helper function to parse sentiment from raw string (fallback only)
*/
function parseFallbackSentiment(sentimentRaw: string | null): SentimentCategory | null {
function parseFallbackSentiment(
sentimentRaw: string | null
): SentimentCategory | null {
if (!sentimentRaw) return null;
const sentimentStr = sentimentRaw.toLowerCase();
if (sentimentStr.includes('positive')) {
if (sentimentStr.includes("positive")) {
return SentimentCategory.POSITIVE;
} else if (sentimentStr.includes('negative')) {
} else if (sentimentStr.includes("negative")) {
return SentimentCategory.NEGATIVE;
} else {
return SentimentCategory.NEUTRAL;
@ -60,20 +74,25 @@ function parseFallbackSentiment(sentimentRaw: string | null): SentimentCategory
*/
function parseFallbackBoolean(rawValue: string | null): boolean | null {
if (!rawValue) return null;
return ['true', '1', 'yes', 'escalated', 'forwarded'].includes(rawValue.toLowerCase());
return ["true", "1", "yes", "escalated", "forwarded"].includes(
rawValue.toLowerCase()
);
}
/**
* Parse transcript content into Message records
*/
async function parseTranscriptIntoMessages(sessionId: string, transcriptContent: string): Promise<void> {
async function parseTranscriptIntoMessages(
sessionId: string,
transcriptContent: string
): Promise<void> {
// Clear existing messages for this session
await prisma.message.deleteMany({
where: { sessionId }
where: { sessionId },
});
// Split transcript into lines and parse each message
const lines = transcriptContent.split('\n').filter(line => line.trim());
const lines = transcriptContent.split("\n").filter((line) => line.trim());
let order = 0;
for (const line of lines) {
@ -84,7 +103,7 @@ async function parseTranscriptIntoMessages(sessionId: string, transcriptContent:
// Format 1: "User: message" or "Assistant: message"
// Format 2: "[timestamp] User: message" or "[timestamp] Assistant: message"
let role = 'unknown';
let role = "unknown";
let content = trimmedLine;
let timestamp: Date | null = null;
@ -107,7 +126,7 @@ async function parseTranscriptIntoMessages(sessionId: string, transcriptContent:
content = roleMatch[2].trim();
} else {
// If no role prefix found, try to infer from context or use 'unknown'
role = 'unknown';
role = "unknown";
}
// Skip empty content
@ -127,14 +146,18 @@ async function parseTranscriptIntoMessages(sessionId: string, transcriptContent:
order++;
}
console.log(`[Import Processor] ✓ Parsed ${order} messages for session ${sessionId}`);
console.log(
`[Import Processor] ✓ Parsed ${order} messages for session ${sessionId}`
);
}
/**
* Process a single SessionImport record into a Session record
* Uses new unified processing status tracking
*/
async function processSingleImport(importRecord: any): Promise<{ success: boolean; error?: string }> {
async function processSingleImport(
importRecord: any
): Promise<{ success: boolean; error?: string }> {
let sessionId: string | null = null;
try {
@ -142,7 +165,9 @@ async function processSingleImport(importRecord: any): Promise<{ success: boolea
const startTime = parseEuropeanDate(importRecord.startTimeRaw);
const endTime = parseEuropeanDate(importRecord.endTimeRaw);
console.log(`[Import Processor] Processing ${importRecord.externalSessionId}: ${startTime.toISOString()} - ${endTime.toISOString()}`);
console.log(
`[Import Processor] Processing ${importRecord.externalSessionId}: ${startTime.toISOString()} - ${endTime.toISOString()}`
);
// Create or update Session record with MINIMAL processing
const session = await prisma.session.upsert({
@ -179,15 +204,27 @@ async function processSingleImport(importRecord: any): Promise<{ success: boolea
await ProcessingStatusManager.initializeSession(sessionId);
// Mark CSV_IMPORT as completed
await ProcessingStatusManager.completeStage(sessionId, ProcessingStage.CSV_IMPORT);
await ProcessingStatusManager.completeStage(
sessionId,
ProcessingStage.CSV_IMPORT
);
// Handle transcript fetching
let transcriptContent = importRecord.rawTranscriptContent;
if (!transcriptContent && importRecord.fullTranscriptUrl && isValidTranscriptUrl(importRecord.fullTranscriptUrl)) {
await ProcessingStatusManager.startStage(sessionId, ProcessingStage.TRANSCRIPT_FETCH);
if (
!transcriptContent &&
importRecord.fullTranscriptUrl &&
isValidTranscriptUrl(importRecord.fullTranscriptUrl)
) {
await ProcessingStatusManager.startStage(
sessionId,
ProcessingStage.TRANSCRIPT_FETCH
);
console.log(`[Import Processor] Fetching transcript for ${importRecord.externalSessionId}...`);
console.log(
`[Import Processor] Fetching transcript for ${importRecord.externalSessionId}...`
);
// Get company credentials for transcript fetching
const company = await prisma.company.findUnique({
@ -203,7 +240,9 @@ async function processSingleImport(importRecord: any): Promise<{ success: boolea
if (transcriptResult.success) {
transcriptContent = transcriptResult.content;
console.log(`[Import Processor] ✓ Fetched transcript for ${importRecord.externalSessionId} (${transcriptContent?.length} chars)`);
console.log(
`[Import Processor] ✓ Fetched transcript for ${importRecord.externalSessionId} (${transcriptContent?.length} chars)`
);
// Update the import record with the fetched content
await prisma.sessionImport.update({
@ -211,36 +250,61 @@ async function processSingleImport(importRecord: any): Promise<{ success: boolea
data: { rawTranscriptContent: transcriptContent },
});
await ProcessingStatusManager.completeStage(sessionId, ProcessingStage.TRANSCRIPT_FETCH, {
contentLength: transcriptContent?.length || 0,
url: importRecord.fullTranscriptUrl
});
await ProcessingStatusManager.completeStage(
sessionId,
ProcessingStage.TRANSCRIPT_FETCH,
{
contentLength: transcriptContent?.length || 0,
url: importRecord.fullTranscriptUrl,
}
);
} else {
console.log(`[Import Processor] ⚠️ Failed to fetch transcript for ${importRecord.externalSessionId}: ${transcriptResult.error}`);
await ProcessingStatusManager.failStage(sessionId, ProcessingStage.TRANSCRIPT_FETCH, transcriptResult.error || 'Unknown error');
console.log(
`[Import Processor] ⚠️ Failed to fetch transcript for ${importRecord.externalSessionId}: ${transcriptResult.error}`
);
await ProcessingStatusManager.failStage(
sessionId,
ProcessingStage.TRANSCRIPT_FETCH,
transcriptResult.error || "Unknown error"
);
}
} else if (!importRecord.fullTranscriptUrl) {
// No transcript URL available - skip this stage
await ProcessingStatusManager.skipStage(sessionId, ProcessingStage.TRANSCRIPT_FETCH, 'No transcript URL provided');
await ProcessingStatusManager.skipStage(
sessionId,
ProcessingStage.TRANSCRIPT_FETCH,
"No transcript URL provided"
);
} else {
// Transcript already fetched
await ProcessingStatusManager.completeStage(sessionId, ProcessingStage.TRANSCRIPT_FETCH, {
contentLength: transcriptContent?.length || 0,
source: 'already_fetched'
});
await ProcessingStatusManager.completeStage(
sessionId,
ProcessingStage.TRANSCRIPT_FETCH,
{
contentLength: transcriptContent?.length || 0,
source: "already_fetched",
}
);
}
// Handle session creation (parse messages)
await ProcessingStatusManager.startStage(sessionId, ProcessingStage.SESSION_CREATION);
await ProcessingStatusManager.startStage(
sessionId,
ProcessingStage.SESSION_CREATION
);
if (transcriptContent) {
await parseTranscriptIntoMessages(sessionId, transcriptContent);
}
await ProcessingStatusManager.completeStage(sessionId, ProcessingStage.SESSION_CREATION, {
hasTranscript: !!transcriptContent,
transcriptLength: transcriptContent?.length || 0
});
await ProcessingStatusManager.completeStage(
sessionId,
ProcessingStage.SESSION_CREATION,
{
hasTranscript: !!transcriptContent,
transcriptLength: transcriptContent?.length || 0,
}
);
return { success: true };
} catch (error) {
@ -249,13 +313,31 @@ async function processSingleImport(importRecord: any): Promise<{ success: boolea
// Mark the current stage as failed if we have a sessionId
if (sessionId) {
// Determine which stage failed based on the error
if (errorMessage.includes('transcript') || errorMessage.includes('fetch')) {
await ProcessingStatusManager.failStage(sessionId, ProcessingStage.TRANSCRIPT_FETCH, errorMessage);
} else if (errorMessage.includes('message') || errorMessage.includes('parse')) {
await ProcessingStatusManager.failStage(sessionId, ProcessingStage.SESSION_CREATION, errorMessage);
if (
errorMessage.includes("transcript") ||
errorMessage.includes("fetch")
) {
await ProcessingStatusManager.failStage(
sessionId,
ProcessingStage.TRANSCRIPT_FETCH,
errorMessage
);
} else if (
errorMessage.includes("message") ||
errorMessage.includes("parse")
) {
await ProcessingStatusManager.failStage(
sessionId,
ProcessingStage.SESSION_CREATION,
errorMessage
);
} else {
// General failure - mark CSV_IMPORT as failed
await ProcessingStatusManager.failStage(sessionId, ProcessingStage.CSV_IMPORT, errorMessage);
await ProcessingStatusManager.failStage(
sessionId,
ProcessingStage.CSV_IMPORT,
errorMessage
);
}
}
@ -270,8 +352,10 @@ async function processSingleImport(importRecord: any): Promise<{ success: boolea
* Process unprocessed SessionImport records into Session records
* Uses new processing status system to find imports that need processing
*/
export async function processQueuedImports(batchSize: number = 50): Promise<void> {
console.log('[Import Processor] Starting to process unprocessed imports...');
export async function processQueuedImports(
batchSize: number = 50
): Promise<void> {
console.log("[Import Processor] Starting to process unprocessed imports...");
let totalSuccessCount = 0;
let totalErrorCount = 0;
@ -285,20 +369,24 @@ export async function processQueuedImports(batchSize: number = 50): Promise<void
},
take: batchSize,
orderBy: {
createdAt: 'asc', // Process oldest first
createdAt: "asc", // Process oldest first
},
});
if (unprocessedImports.length === 0) {
if (batchNumber === 1) {
console.log('[Import Processor] No unprocessed imports found');
console.log("[Import Processor] No unprocessed imports found");
} else {
console.log(`[Import Processor] All batches completed. Total: ${totalSuccessCount} successful, ${totalErrorCount} failed`);
console.log(
`[Import Processor] All batches completed. Total: ${totalSuccessCount} successful, ${totalErrorCount} failed`
);
}
return;
}
console.log(`[Import Processor] Processing batch ${batchNumber}: ${unprocessedImports.length} imports...`);
console.log(
`[Import Processor] Processing batch ${batchNumber}: ${unprocessedImports.length} imports...`
);
let batchSuccessCount = 0;
let batchErrorCount = 0;
@ -310,20 +398,28 @@ export async function processQueuedImports(batchSize: number = 50): Promise<void
if (result.success) {
batchSuccessCount++;
totalSuccessCount++;
console.log(`[Import Processor] ✓ Processed import ${importRecord.externalSessionId}`);
console.log(
`[Import Processor] ✓ Processed import ${importRecord.externalSessionId}`
);
} else {
batchErrorCount++;
totalErrorCount++;
console.log(`[Import Processor] ✗ Failed to process import ${importRecord.externalSessionId}: ${result.error}`);
console.log(
`[Import Processor] ✗ Failed to process import ${importRecord.externalSessionId}: ${result.error}`
);
}
}
console.log(`[Import Processor] Batch ${batchNumber} completed: ${batchSuccessCount} successful, ${batchErrorCount} failed`);
console.log(
`[Import Processor] Batch ${batchNumber} completed: ${batchSuccessCount} successful, ${batchErrorCount} failed`
);
batchNumber++;
// If this batch was smaller than the batch size, we're done
if (unprocessedImports.length < batchSize) {
console.log(`[Import Processor] All batches completed. Total: ${totalSuccessCount} successful, ${totalErrorCount} failed`);
console.log(
`[Import Processor] All batches completed. Total: ${totalSuccessCount} successful, ${totalErrorCount} failed`
);
return;
}
}
@ -336,15 +432,20 @@ export function startImportProcessingScheduler(): void {
const config = getSchedulerConfig();
if (!config.enabled) {
console.log('[Import Processing Scheduler] Disabled via configuration');
console.log("[Import Processing Scheduler] Disabled via configuration");
return;
}
// Use a more frequent interval for import processing (every 5 minutes by default)
const interval = process.env.IMPORT_PROCESSING_INTERVAL || '*/5 * * * *';
const batchSize = parseInt(process.env.IMPORT_PROCESSING_BATCH_SIZE || '50', 10);
const interval = process.env.IMPORT_PROCESSING_INTERVAL || "*/5 * * * *";
const batchSize = parseInt(
process.env.IMPORT_PROCESSING_BATCH_SIZE || "50",
10
);
console.log(`[Import Processing Scheduler] Starting with interval: ${interval}`);
console.log(
`[Import Processing Scheduler] Starting with interval: ${interval}`
);
console.log(`[Import Processing Scheduler] Batch size: ${batchSize}`);
cron.schedule(interval, async () => {

View File

@ -350,16 +350,16 @@ export function sessionMetrics(
const wordCounts: { [key: string]: number } = {};
let alerts = 0;
// New metrics variables
const hourlySessionCounts: { [hour: string]: number } = {};
let resolvedChatsCount = 0;
const questionCounts: { [question: string]: number } = {};
// New metrics variables
const hourlySessionCounts: { [hour: string]: number } = {};
let resolvedChatsCount = 0;
const questionCounts: { [question: string]: number } = {};
for (const session of sessions) {
// Track hourly usage for peak time calculation
if (session.startTime) {
const hour = new Date(session.startTime).getHours();
const hourKey = `${hour.toString().padStart(2, '0')}:00`;
const hourKey = `${hour.toString().padStart(2, "0")}:00`;
hourlySessionCounts[hourKey] = (hourlySessionCounts[hourKey] || 0) + 1;
}
@ -493,12 +493,16 @@ export function sessionMetrics(
// 1. Extract questions from user messages (if available)
if (session.messages) {
session.messages
.filter(msg => msg.role === 'User')
.forEach(msg => {
.filter((msg) => msg.role === "User")
.forEach((msg) => {
const content = msg.content.trim();
// Simple heuristic: if message ends with ? or contains question words, treat as question
if (content.endsWith('?') ||
/\b(what|when|where|why|how|who|which|can|could|would|will|is|are|do|does|did)\b/i.test(content)) {
if (
content.endsWith("?") ||
/\b(what|when|where|why|how|who|which|can|could|would|will|is|are|do|does|did)\b/i.test(
content
)
) {
questionCounts[content] = (questionCounts[content] || 0) + 1;
}
});
@ -507,8 +511,12 @@ export function sessionMetrics(
// 3. Extract questions from initial message as fallback
if (session.initialMsg) {
const content = session.initialMsg.trim();
if (content.endsWith('?') ||
/\b(what|when|where|why|how|who|which|can|could|would|will|is|are|do|does|did)\b/i.test(content)) {
if (
content.endsWith("?") ||
/\b(what|when|where|why|how|who|which|can|could|would|will|is|are|do|does|did)\b/i.test(
content
)
) {
questionCounts[content] = (questionCounts[content] || 0) + 1;
}
}
@ -580,20 +588,23 @@ export function sessionMetrics(
// Calculate new metrics
// 1. Average Daily Costs (euros)
const avgDailyCosts = numDaysWithSessions > 0 ? totalTokensEur / numDaysWithSessions : 0;
const avgDailyCosts =
numDaysWithSessions > 0 ? totalTokensEur / numDaysWithSessions : 0;
// 2. Peak Usage Time
let peakUsageTime = "N/A";
if (Object.keys(hourlySessionCounts).length > 0) {
const peakHour = Object.entries(hourlySessionCounts)
.sort(([, a], [, b]) => b - a)[0][0];
const peakHourNum = parseInt(peakHour.split(':')[0]);
const peakHour = Object.entries(hourlySessionCounts).sort(
([, a], [, b]) => b - a
)[0][0];
const peakHourNum = parseInt(peakHour.split(":")[0]);
const endHour = (peakHourNum + 1) % 24;
peakUsageTime = `${peakHour}-${endHour.toString().padStart(2, '0')}:00`;
peakUsageTime = `${peakHour}-${endHour.toString().padStart(2, "0")}:00`;
}
// 3. Resolved Chats Percentage
const resolvedChatsPercentage = totalSessions > 0 ? (resolvedChatsCount / totalSessions) * 100 : 0;
const resolvedChatsPercentage =
totalSessions > 0 ? (resolvedChatsCount / totalSessions) * 100 : 0;
// 4. Top 5 Asked Questions
const topQuestions: TopQuestion[] = Object.entries(questionCounts)

View File

@ -1,6 +1,11 @@
// Enhanced session processing scheduler with AI cost tracking and question management
import cron from "node-cron";
import { PrismaClient, SentimentCategory, SessionCategory, ProcessingStage } from "@prisma/client";
import {
PrismaClient,
SentimentCategory,
SessionCategory,
ProcessingStage,
} from "@prisma/client";
import fetch from "node-fetch";
import { getSchedulerConfig } from "./schedulerConfig";
import { ProcessingStatusManager } from "./processingStatusManager";
@ -44,10 +49,10 @@ async function getCurrentModelPricing(modelName: string): Promise<{
effectiveFrom: { lte: new Date() },
OR: [
{ effectiveUntil: null },
{ effectiveUntil: { gte: new Date() } }
]
{ effectiveUntil: { gte: new Date() } },
],
},
orderBy: { effectiveFrom: 'desc' },
orderBy: { effectiveFrom: "desc" },
take: 1,
},
},
@ -69,7 +74,20 @@ interface ProcessedData {
sentiment: "POSITIVE" | "NEUTRAL" | "NEGATIVE";
escalated: boolean;
forwarded_hr: boolean;
category: "SCHEDULE_HOURS" | "LEAVE_VACATION" | "SICK_LEAVE_RECOVERY" | "SALARY_COMPENSATION" | "CONTRACT_HOURS" | "ONBOARDING" | "OFFBOARDING" | "WORKWEAR_STAFF_PASS" | "TEAM_CONTACTS" | "PERSONAL_QUESTIONS" | "ACCESS_LOGIN" | "SOCIAL_QUESTIONS" | "UNRECOGNIZED_OTHER";
category:
| "SCHEDULE_HOURS"
| "LEAVE_VACATION"
| "SICK_LEAVE_RECOVERY"
| "SALARY_COMPENSATION"
| "CONTRACT_HOURS"
| "ONBOARDING"
| "OFFBOARDING"
| "WORKWEAR_STAFF_PASS"
| "TEAM_CONTACTS"
| "PERSONAL_QUESTIONS"
| "ACCESS_LOGIN"
| "SOCIAL_QUESTIONS"
| "UNRECOGNIZED_OTHER";
questions: string[];
summary: string;
session_id: string;
@ -87,7 +105,7 @@ interface ProcessingResult {
async function recordAIProcessingRequest(
sessionId: string,
openaiResponse: any,
processingType: string = 'session_analysis'
processingType: string = "session_analysis"
): Promise<void> {
const usage = openaiResponse.usage;
const model = openaiResponse.model;
@ -97,14 +115,15 @@ async function recordAIProcessingRequest(
// Fallback pricing if not found in database
const fallbackPricing = {
promptTokenCost: 0.00001, // $10.00 per 1M tokens (gpt-4-turbo rate)
completionTokenCost: 0.00003, // $30.00 per 1M tokens
promptTokenCost: 0.00001, // $10.00 per 1M tokens (gpt-4-turbo rate)
completionTokenCost: 0.00003, // $30.00 per 1M tokens
};
const finalPricing = pricing || fallbackPricing;
const promptCost = usage.prompt_tokens * finalPricing.promptTokenCost;
const completionCost = usage.completion_tokens * finalPricing.completionTokenCost;
const completionCost =
usage.completion_tokens * finalPricing.completionTokenCost;
const totalCostUsd = promptCost + completionCost;
const totalCostEur = totalCostUsd * USD_TO_EUR_RATE;
@ -123,10 +142,14 @@ async function recordAIProcessingRequest(
// Detailed breakdown
cachedTokens: usage.prompt_tokens_details?.cached_tokens || null,
audioTokensPrompt: usage.prompt_tokens_details?.audio_tokens || null,
reasoningTokens: usage.completion_tokens_details?.reasoning_tokens || null,
audioTokensCompletion: usage.completion_tokens_details?.audio_tokens || null,
acceptedPredictionTokens: usage.completion_tokens_details?.accepted_prediction_tokens || null,
rejectedPredictionTokens: usage.completion_tokens_details?.rejected_prediction_tokens || null,
reasoningTokens:
usage.completion_tokens_details?.reasoning_tokens || null,
audioTokensCompletion:
usage.completion_tokens_details?.audio_tokens || null,
acceptedPredictionTokens:
usage.completion_tokens_details?.accepted_prediction_tokens || null,
rejectedPredictionTokens:
usage.completion_tokens_details?.rejected_prediction_tokens || null,
promptTokenCost: finalPricing.promptTokenCost,
completionTokenCost: finalPricing.completionTokenCost,
@ -135,7 +158,7 @@ async function recordAIProcessingRequest(
processingType,
success: true,
completedAt: new Date(),
}
},
});
}
@ -150,7 +173,7 @@ async function recordFailedAIProcessingRequest(
await prisma.aIProcessingRequest.create({
data: {
sessionId,
model: 'unknown',
model: "unknown",
promptTokens: 0,
completionTokens: 0,
totalTokens: 0,
@ -161,17 +184,20 @@ async function recordFailedAIProcessingRequest(
success: false,
errorMessage,
completedAt: new Date(),
}
},
});
}
/**
* Process questions into separate Question and SessionQuestion tables
*/
async function processQuestions(sessionId: string, questions: string[]): Promise<void> {
async function processQuestions(
sessionId: string,
questions: string[]
): Promise<void> {
// Clear existing questions for this session
await prisma.sessionQuestion.deleteMany({
where: { sessionId }
where: { sessionId },
});
// Process each question
@ -183,7 +209,7 @@ async function processQuestions(sessionId: string, questions: string[]): Promise
const question = await prisma.question.upsert({
where: { content: questionText.trim() },
create: { content: questionText.trim() },
update: {}
update: {},
});
// Link to session
@ -191,8 +217,8 @@ async function processQuestions(sessionId: string, questions: string[]): Promise
data: {
sessionId,
questionId: question.id,
order: index
}
order: index,
},
});
}
}
@ -204,8 +230,8 @@ async function calculateMessagesSent(sessionId: string): Promise<number> {
const userMessageCount = await prisma.message.count({
where: {
sessionId,
role: { in: ['user', 'User'] } // Handle both cases
}
role: { in: ["user", "User"] }, // Handle both cases
},
});
return userMessageCount;
}
@ -213,10 +239,13 @@ async function calculateMessagesSent(sessionId: string): Promise<number> {
/**
* Calculate endTime from latest Message timestamp
*/
async function calculateEndTime(sessionId: string, fallbackEndTime: Date): Promise<Date> {
async function calculateEndTime(
sessionId: string,
fallbackEndTime: Date
): Promise<Date> {
const latestMessage = await prisma.message.findFirst({
where: { sessionId },
orderBy: { timestamp: 'desc' }
orderBy: { timestamp: "desc" },
});
return latestMessage?.timestamp || fallbackEndTime;
@ -225,7 +254,11 @@ async function calculateEndTime(sessionId: string, fallbackEndTime: Date): Promi
/**
* Processes a session transcript using OpenAI API
*/
async function processTranscriptWithOpenAI(sessionId: string, transcript: string, companyId: string): Promise<ProcessedData> {
async function processTranscriptWithOpenAI(
sessionId: string,
transcript: string,
companyId: string
): Promise<ProcessedData> {
if (!OPENAI_API_KEY) {
throw new Error("OPENAI_API_KEY environment variable is not set");
}
@ -293,7 +326,11 @@ async function processTranscriptWithOpenAI(sessionId: string, transcript: string
const openaiResponse: any = await response.json();
// Record the AI processing request for cost tracking
await recordAIProcessingRequest(sessionId, openaiResponse, 'session_analysis');
await recordAIProcessingRequest(
sessionId,
openaiResponse,
"session_analysis"
);
const processedData = JSON.parse(openaiResponse.choices[0].message.content);
@ -305,7 +342,7 @@ async function processTranscriptWithOpenAI(sessionId: string, transcript: string
// Record failed request
await recordFailedAIProcessingRequest(
sessionId,
'session_analysis',
"session_analysis",
error instanceof Error ? error.message : String(error)
);
@ -319,8 +356,14 @@ async function processTranscriptWithOpenAI(sessionId: string, transcript: string
*/
function validateOpenAIResponse(data: any): void {
const requiredFields = [
"language", "sentiment", "escalated", "forwarded_hr",
"category", "questions", "summary", "session_id"
"language",
"sentiment",
"escalated",
"forwarded_hr",
"category",
"questions",
"summary",
"session_id",
];
for (const field of requiredFields) {
@ -331,11 +374,15 @@ function validateOpenAIResponse(data: any): void {
// Validate field types and values
if (typeof data.language !== "string" || !/^[a-z]{2}$/.test(data.language)) {
throw new Error("Invalid language format. Expected ISO 639-1 code (e.g., 'en')");
throw new Error(
"Invalid language format. Expected ISO 639-1 code (e.g., 'en')"
);
}
if (!["POSITIVE", "NEUTRAL", "NEGATIVE"].includes(data.sentiment)) {
throw new Error("Invalid sentiment. Expected 'POSITIVE', 'NEUTRAL', or 'NEGATIVE'");
throw new Error(
"Invalid sentiment. Expected 'POSITIVE', 'NEUTRAL', or 'NEGATIVE'"
);
}
if (typeof data.escalated !== "boolean") {
@ -347,22 +394,39 @@ function validateOpenAIResponse(data: any): void {
}
const validCategories = [
"SCHEDULE_HOURS", "LEAVE_VACATION", "SICK_LEAVE_RECOVERY", "SALARY_COMPENSATION",
"CONTRACT_HOURS", "ONBOARDING", "OFFBOARDING", "WORKWEAR_STAFF_PASS",
"TEAM_CONTACTS", "PERSONAL_QUESTIONS", "ACCESS_LOGIN", "SOCIAL_QUESTIONS",
"UNRECOGNIZED_OTHER"
"SCHEDULE_HOURS",
"LEAVE_VACATION",
"SICK_LEAVE_RECOVERY",
"SALARY_COMPENSATION",
"CONTRACT_HOURS",
"ONBOARDING",
"OFFBOARDING",
"WORKWEAR_STAFF_PASS",
"TEAM_CONTACTS",
"PERSONAL_QUESTIONS",
"ACCESS_LOGIN",
"SOCIAL_QUESTIONS",
"UNRECOGNIZED_OTHER",
];
if (!validCategories.includes(data.category)) {
throw new Error(`Invalid category. Expected one of: ${validCategories.join(", ")}`);
throw new Error(
`Invalid category. Expected one of: ${validCategories.join(", ")}`
);
}
if (!Array.isArray(data.questions)) {
throw new Error("Invalid questions. Expected array of strings");
}
if (typeof data.summary !== "string" || data.summary.length < 10 || data.summary.length > 300) {
throw new Error("Invalid summary. Expected string between 10-300 characters");
if (
typeof data.summary !== "string" ||
data.summary.length < 10 ||
data.summary.length > 300
) {
throw new Error(
"Invalid summary. Expected string between 10-300 characters"
);
}
if (typeof data.session_id !== "string") {
@ -384,31 +448,42 @@ async function processSingleSession(session: any): Promise<ProcessingResult> {
try {
// Mark AI analysis as started
await ProcessingStatusManager.startStage(session.id, ProcessingStage.AI_ANALYSIS);
await ProcessingStatusManager.startStage(
session.id,
ProcessingStage.AI_ANALYSIS
);
// Convert messages back to transcript format for OpenAI processing
const transcript = session.messages
.map((msg: any) =>
`[${new Date(msg.timestamp)
.toLocaleString("en-GB", {
day: "2-digit",
month: "2-digit",
year: "numeric",
hour: "2-digit",
minute: "2-digit",
second: "2-digit",
})
.replace(",", "")}] ${msg.role}: ${msg.content}`
.map(
(msg: any) =>
`[${new Date(msg.timestamp)
.toLocaleString("en-GB", {
day: "2-digit",
month: "2-digit",
year: "numeric",
hour: "2-digit",
minute: "2-digit",
second: "2-digit",
})
.replace(",", "")}] ${msg.role}: ${msg.content}`
)
.join("\n");
const processedData = await processTranscriptWithOpenAI(session.id, transcript, session.companyId);
const processedData = await processTranscriptWithOpenAI(
session.id,
transcript,
session.companyId
);
// Calculate messagesSent from actual Message records
const messagesSent = await calculateMessagesSent(session.id);
// Calculate endTime from latest Message timestamp
const calculatedEndTime = await calculateEndTime(session.id, session.endTime);
const calculatedEndTime = await calculateEndTime(
session.id,
session.endTime
);
// Update the session with processed data
await prisma.session.update({
@ -426,23 +501,34 @@ async function processSingleSession(session: any): Promise<ProcessingResult> {
});
// Mark AI analysis as completed
await ProcessingStatusManager.completeStage(session.id, ProcessingStage.AI_ANALYSIS, {
language: processedData.language,
sentiment: processedData.sentiment,
category: processedData.category,
questionsCount: processedData.questions.length
});
await ProcessingStatusManager.completeStage(
session.id,
ProcessingStage.AI_ANALYSIS,
{
language: processedData.language,
sentiment: processedData.sentiment,
category: processedData.category,
questionsCount: processedData.questions.length,
}
);
// Start question extraction stage
await ProcessingStatusManager.startStage(session.id, ProcessingStage.QUESTION_EXTRACTION);
await ProcessingStatusManager.startStage(
session.id,
ProcessingStage.QUESTION_EXTRACTION
);
// Process questions into separate tables
await processQuestions(session.id, processedData.questions);
// Mark question extraction as completed
await ProcessingStatusManager.completeStage(session.id, ProcessingStage.QUESTION_EXTRACTION, {
questionsProcessed: processedData.questions.length
});
await ProcessingStatusManager.completeStage(
session.id,
ProcessingStage.QUESTION_EXTRACTION,
{
questionsProcessed: processedData.questions.length,
}
);
return {
sessionId: session.id,
@ -467,7 +553,10 @@ async function processSingleSession(session: any): Promise<ProcessingResult> {
/**
* Process sessions in parallel with concurrency limit
*/
async function processSessionsInParallel(sessions: any[], maxConcurrency: number = 5): Promise<ProcessingResult[]> {
async function processSessionsInParallel(
sessions: any[],
maxConcurrency: number = 5
): Promise<ProcessingResult[]> {
const results: Promise<ProcessingResult>[] = [];
const executing: Promise<ProcessingResult>[] = [];
@ -486,7 +575,7 @@ async function processSessionsInParallel(sessions: any[], maxConcurrency: number
if (executing.length >= maxConcurrency) {
await Promise.race(executing);
const completedIndex = executing.findIndex(p => p === promise);
const completedIndex = executing.findIndex((p) => p === promise);
if (completedIndex !== -1) {
executing.splice(completedIndex, 1);
}
@ -499,27 +588,37 @@ async function processSessionsInParallel(sessions: any[], maxConcurrency: number
/**
* Process unprocessed sessions using the new processing status system
*/
export async function processUnprocessedSessions(batchSize: number | null = null, maxConcurrency: number = 5): Promise<void> {
process.stdout.write("[ProcessingScheduler] Starting to process sessions needing AI analysis...\n");
// Get sessions that need AI processing using the new status system
const sessionsNeedingAI = await ProcessingStatusManager.getSessionsNeedingProcessing(
ProcessingStage.AI_ANALYSIS,
batchSize || 50
export async function processUnprocessedSessions(
batchSize: number | null = null,
maxConcurrency: number = 5
): Promise<void> {
process.stdout.write(
"[ProcessingScheduler] Starting to process sessions needing AI analysis...\n"
);
// Get sessions that need AI processing using the new status system
const sessionsNeedingAI =
await ProcessingStatusManager.getSessionsNeedingProcessing(
ProcessingStage.AI_ANALYSIS,
batchSize || 50
);
if (sessionsNeedingAI.length === 0) {
process.stdout.write("[ProcessingScheduler] No sessions found requiring AI processing.\n");
process.stdout.write(
"[ProcessingScheduler] No sessions found requiring AI processing.\n"
);
return;
}
// Get session IDs that need processing
const sessionIds = sessionsNeedingAI.map(statusRecord => statusRecord.sessionId);
const sessionIds = sessionsNeedingAI.map(
(statusRecord) => statusRecord.sessionId
);
// Fetch full session data with messages
const sessionsToProcess = await prisma.session.findMany({
where: {
id: { in: sessionIds }
id: { in: sessionIds },
},
include: {
messages: {
@ -534,7 +633,9 @@ export async function processUnprocessedSessions(batchSize: number | null = null
);
if (sessionsWithMessages.length === 0) {
process.stdout.write("[ProcessingScheduler] No sessions with messages found requiring processing.\n");
process.stdout.write(
"[ProcessingScheduler] No sessions with messages found requiring processing.\n"
);
return;
}
@ -543,16 +644,25 @@ export async function processUnprocessedSessions(batchSize: number | null = null
);
const startTime = Date.now();
const results = await processSessionsInParallel(sessionsWithMessages, maxConcurrency);
const results = await processSessionsInParallel(
sessionsWithMessages,
maxConcurrency
);
const endTime = Date.now();
const successCount = results.filter((r) => r.success).length;
const errorCount = results.filter((r) => !r.success).length;
process.stdout.write("[ProcessingScheduler] Session processing complete.\n");
process.stdout.write(`[ProcessingScheduler] Successfully processed: ${successCount} sessions.\n`);
process.stdout.write(`[ProcessingScheduler] Failed to process: ${errorCount} sessions.\n`);
process.stdout.write(`[ProcessingScheduler] Total processing time: ${((endTime - startTime) / 1000).toFixed(2)}s\n`);
process.stdout.write(
`[ProcessingScheduler] Successfully processed: ${successCount} sessions.\n`
);
process.stdout.write(
`[ProcessingScheduler] Failed to process: ${errorCount} sessions.\n`
);
process.stdout.write(
`[ProcessingScheduler] Total processing time: ${((endTime - startTime) / 1000).toFixed(2)}s\n`
);
}
/**
@ -576,11 +686,11 @@ export async function getAIProcessingCosts(): Promise<{
});
const successfulRequests = await prisma.aIProcessingRequest.count({
where: { success: true }
where: { success: true },
});
const failedRequests = await prisma.aIProcessingRequest.count({
where: { success: false }
where: { success: false },
});
return {
@ -599,22 +709,32 @@ export function startProcessingScheduler(): void {
const config = getSchedulerConfig();
if (!config.enabled) {
console.log('[Processing Scheduler] Disabled via configuration');
console.log("[Processing Scheduler] Disabled via configuration");
return;
}
console.log(`[Processing Scheduler] Starting with interval: ${config.sessionProcessing.interval}`);
console.log(`[Processing Scheduler] Batch size: ${config.sessionProcessing.batchSize === 0 ? 'unlimited' : config.sessionProcessing.batchSize}`);
console.log(`[Processing Scheduler] Concurrency: ${config.sessionProcessing.concurrency}`);
console.log(
`[Processing Scheduler] Starting with interval: ${config.sessionProcessing.interval}`
);
console.log(
`[Processing Scheduler] Batch size: ${config.sessionProcessing.batchSize === 0 ? "unlimited" : config.sessionProcessing.batchSize}`
);
console.log(
`[Processing Scheduler] Concurrency: ${config.sessionProcessing.concurrency}`
);
cron.schedule(config.sessionProcessing.interval, async () => {
try {
await processUnprocessedSessions(
config.sessionProcessing.batchSize === 0 ? null : config.sessionProcessing.batchSize,
config.sessionProcessing.batchSize === 0
? null
: config.sessionProcessing.batchSize,
config.sessionProcessing.concurrency
);
} catch (error) {
process.stderr.write(`[ProcessingScheduler] Error in scheduler: ${error}\n`);
process.stderr.write(
`[ProcessingScheduler] Error in scheduler: ${error}\n`
);
}
});
}

View File

@ -1,4 +1,8 @@
import { PrismaClient, ProcessingStage, ProcessingStatus } from '@prisma/client';
import {
PrismaClient,
ProcessingStage,
ProcessingStatus,
} from "@prisma/client";
const prisma = new PrismaClient();
@ -6,7 +10,6 @@ const prisma = new PrismaClient();
* Centralized processing status management
*/
export class ProcessingStatusManager {
/**
* Initialize processing status for a session with all stages set to PENDING
*/
@ -21,7 +24,7 @@ export class ProcessingStatusManager {
// Create all processing status records for this session
await prisma.sessionProcessingStatus.createMany({
data: stages.map(stage => ({
data: stages.map((stage) => ({
sessionId,
stage,
status: ProcessingStatus.PENDING,
@ -40,7 +43,7 @@ export class ProcessingStatusManager {
): Promise<void> {
await prisma.sessionProcessingStatus.upsert({
where: {
sessionId_stage: { sessionId, stage }
sessionId_stage: { sessionId, stage },
},
update: {
status: ProcessingStatus.IN_PROGRESS,
@ -68,7 +71,7 @@ export class ProcessingStatusManager {
): Promise<void> {
await prisma.sessionProcessingStatus.upsert({
where: {
sessionId_stage: { sessionId, stage }
sessionId_stage: { sessionId, stage },
},
update: {
status: ProcessingStatus.COMPLETED,
@ -98,7 +101,7 @@ export class ProcessingStatusManager {
): Promise<void> {
await prisma.sessionProcessingStatus.upsert({
where: {
sessionId_stage: { sessionId, stage }
sessionId_stage: { sessionId, stage },
},
update: {
status: ProcessingStatus.FAILED,
@ -130,7 +133,7 @@ export class ProcessingStatusManager {
): Promise<void> {
await prisma.sessionProcessingStatus.upsert({
where: {
sessionId_stage: { sessionId, stage }
sessionId_stage: { sessionId, stage },
},
update: {
status: ProcessingStatus.SKIPPED,
@ -154,7 +157,7 @@ export class ProcessingStatusManager {
static async getSessionStatus(sessionId: string) {
return await prisma.sessionProcessingStatus.findMany({
where: { sessionId },
orderBy: { stage: 'asc' },
orderBy: { stage: "asc" },
});
}
@ -179,7 +182,7 @@ export class ProcessingStatusManager {
},
},
take: limit,
orderBy: { session: { createdAt: 'asc' } },
orderBy: { session: { createdAt: "asc" } },
});
}
@ -189,7 +192,7 @@ export class ProcessingStatusManager {
static async getPipelineStatus() {
// Get counts by stage and status
const statusCounts = await prisma.sessionProcessingStatus.groupBy({
by: ['stage', 'status'],
by: ["stage", "status"],
_count: { id: true },
});
@ -233,17 +236,20 @@ export class ProcessingStatusManager {
},
},
},
orderBy: { completedAt: 'desc' },
orderBy: { completedAt: "desc" },
});
}
/**
* Reset a failed stage for retry
*/
static async resetStageForRetry(sessionId: string, stage: ProcessingStage): Promise<void> {
static async resetStageForRetry(
sessionId: string,
stage: ProcessingStage
): Promise<void> {
await prisma.sessionProcessingStatus.update({
where: {
sessionId_stage: { sessionId, stage }
sessionId_stage: { sessionId, stage },
},
data: {
status: ProcessingStatus.PENDING,
@ -257,10 +263,13 @@ export class ProcessingStatusManager {
/**
* Check if a session has completed a specific stage
*/
static async hasCompletedStage(sessionId: string, stage: ProcessingStage): Promise<boolean> {
static async hasCompletedStage(
sessionId: string,
stage: ProcessingStage
): Promise<boolean> {
const status = await prisma.sessionProcessingStatus.findUnique({
where: {
sessionId_stage: { sessionId, stage }
sessionId_stage: { sessionId, stage },
},
});
@ -270,7 +279,10 @@ export class ProcessingStatusManager {
/**
* Check if a session is ready for a specific stage (previous stages completed)
*/
static async isReadyForStage(sessionId: string, stage: ProcessingStage): Promise<boolean> {
static async isReadyForStage(
sessionId: string,
stage: ProcessingStage
): Promise<boolean> {
const stageOrder = [
ProcessingStage.CSV_IMPORT,
ProcessingStage.TRANSCRIPT_FETCH,

View File

@ -8,11 +8,13 @@ export function startCsvImportScheduler() {
const config = getSchedulerConfig();
if (!config.enabled) {
console.log('[CSV Import Scheduler] Disabled via configuration');
console.log("[CSV Import Scheduler] Disabled via configuration");
return;
}
console.log(`[CSV Import Scheduler] Starting with interval: ${config.csvImport.interval}`);
console.log(
`[CSV Import Scheduler] Starting with interval: ${config.csvImport.interval}`
);
cron.schedule(config.csvImport.interval, async () => {
const companies = await prisma.company.findMany();

View File

@ -1,7 +1,10 @@
// Legacy scheduler configuration - now uses centralized env management
// This file is kept for backward compatibility but delegates to lib/env.ts
import { getSchedulerConfig as getEnvSchedulerConfig, logEnvConfig } from "./env";
import {
getSchedulerConfig as getEnvSchedulerConfig,
logEnvConfig,
} from "./env";
export interface SchedulerConfig {
enabled: boolean;

View File

@ -23,7 +23,7 @@ export async function fetchTranscriptContent(
if (!url || !url.trim()) {
return {
success: false,
error: 'No transcript URL provided',
error: "No transcript URL provided",
};
}
@ -34,7 +34,7 @@ export async function fetchTranscriptContent(
: undefined;
const headers: Record<string, string> = {
'User-Agent': 'LiveDash-Transcript-Fetcher/1.0',
"User-Agent": "LiveDash-Transcript-Fetcher/1.0",
};
if (authHeader) {
@ -46,7 +46,7 @@ export async function fetchTranscriptContent(
const timeoutId = setTimeout(() => controller.abort(), 30000); // 30 second timeout
const response = await fetch(url, {
method: 'GET',
method: "GET",
headers,
signal: controller.signal,
});
@ -65,7 +65,7 @@ export async function fetchTranscriptContent(
if (!content || content.trim().length === 0) {
return {
success: false,
error: 'Empty transcript content',
error: "Empty transcript content",
};
}
@ -73,29 +73,28 @@ export async function fetchTranscriptContent(
success: true,
content: content.trim(),
};
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
// Handle common network errors
if (errorMessage.includes('ENOTFOUND')) {
if (errorMessage.includes("ENOTFOUND")) {
return {
success: false,
error: 'Domain not found',
error: "Domain not found",
};
}
if (errorMessage.includes('ECONNREFUSED')) {
if (errorMessage.includes("ECONNREFUSED")) {
return {
success: false,
error: 'Connection refused',
error: "Connection refused",
};
}
if (errorMessage.includes('timeout')) {
if (errorMessage.includes("timeout")) {
return {
success: false,
error: 'Request timeout',
error: "Request timeout",
};
}
@ -112,13 +111,13 @@ export async function fetchTranscriptContent(
* @returns boolean indicating if URL appears valid
*/
export function isValidTranscriptUrl(url: string): boolean {
if (!url || typeof url !== 'string') {
if (!url || typeof url !== "string") {
return false;
}
try {
const parsedUrl = new URL(url);
return parsedUrl.protocol === 'http:' || parsedUrl.protocol === 'https:';
return parsedUrl.protocol === "http:" || parsedUrl.protocol === "https:";
} catch {
return false;
}

View File

@ -1,5 +1,5 @@
// Transcript parsing utility for converting raw transcript content into structured messages
import { prisma } from './prisma.js';
import { prisma } from "./prisma.js";
export interface ParsedMessage {
sessionId: string;
@ -19,7 +19,9 @@ export interface TranscriptParseResult {
* Parse European date format (DD.MM.YYYY HH:mm:ss) to Date object
*/
function parseEuropeanDate(dateStr: string): Date {
const match = dateStr.match(/(\d{2})\.(\d{2})\.(\d{4}) (\d{2}):(\d{2}):(\d{2})/);
const match = dateStr.match(
/(\d{2})\.(\d{2})\.(\d{4}) (\d{2}):(\d{2}):(\d{2})/
);
if (!match) {
throw new Error(`Invalid date format: ${dateStr}`);
}
@ -51,13 +53,17 @@ export function parseTranscriptToMessages(
if (!content || !content.trim()) {
return {
success: false,
error: 'Empty transcript content'
error: "Empty transcript content",
};
}
const messages: ParsedMessage[] = [];
const lines = content.split('\n');
let currentMessage: { role: string; content: string; timestamp?: string } | null = null;
const lines = content.split("\n");
let currentMessage: {
role: string;
content: string;
timestamp?: string;
} | null = null;
let order = 0;
for (const line of lines) {
@ -69,56 +75,64 @@ export function parseTranscriptToMessages(
}
// Check if line starts with a timestamp and role [DD.MM.YYYY HH:MM:SS] Role: content
const timestampRoleMatch = trimmedLine.match(/^\[(\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})\]\s+(User|Assistant|System|user|assistant|system):\s*(.*)$/i);
const timestampRoleMatch = trimmedLine.match(
/^\[(\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})\]\s+(User|Assistant|System|user|assistant|system):\s*(.*)$/i
);
// Check if line starts with just a role (User:, Assistant:, System:, etc.)
const roleMatch = trimmedLine.match(/^(User|Assistant|System|user|assistant|system):\s*(.*)$/i);
const roleMatch = trimmedLine.match(
/^(User|Assistant|System|user|assistant|system):\s*(.*)$/i
);
if (timestampRoleMatch) {
// Save previous message if exists
if (currentMessage) {
messages.push({
sessionId: '', // Will be set by caller
sessionId: "", // Will be set by caller
timestamp: new Date(), // Will be calculated below
role: currentMessage.role,
content: currentMessage.content.trim(),
order: order++
order: order++,
});
}
// Start new message with timestamp
const timestamp = timestampRoleMatch[1];
const role = timestampRoleMatch[2].charAt(0).toUpperCase() + timestampRoleMatch[2].slice(1).toLowerCase();
const content = timestampRoleMatch[3] || '';
const role =
timestampRoleMatch[2].charAt(0).toUpperCase() +
timestampRoleMatch[2].slice(1).toLowerCase();
const content = timestampRoleMatch[3] || "";
currentMessage = {
role,
content,
timestamp // Store the timestamp for later parsing
timestamp, // Store the timestamp for later parsing
};
} else if (roleMatch) {
// Save previous message if exists
if (currentMessage) {
messages.push({
sessionId: '', // Will be set by caller
sessionId: "", // Will be set by caller
timestamp: new Date(), // Will be calculated below
role: currentMessage.role,
content: currentMessage.content.trim(),
order: order++
order: order++,
});
}
// Start new message without timestamp
const role = roleMatch[1].charAt(0).toUpperCase() + roleMatch[1].slice(1).toLowerCase();
const content = roleMatch[2] || '';
const role =
roleMatch[1].charAt(0).toUpperCase() +
roleMatch[1].slice(1).toLowerCase();
const content = roleMatch[2] || "";
currentMessage = {
role,
content
content,
};
} else if (currentMessage) {
// Continue previous message (multi-line)
currentMessage.content += '\n' + trimmedLine;
currentMessage.content += "\n" + trimmedLine;
}
// If no current message and no role match, skip the line (orphaned content)
}
@ -126,23 +140,23 @@ export function parseTranscriptToMessages(
// Save the last message
if (currentMessage) {
messages.push({
sessionId: '', // Will be set by caller
sessionId: "", // Will be set by caller
timestamp: new Date(), // Will be calculated below
role: currentMessage.role,
content: currentMessage.content.trim(),
order: order++
order: order++,
});
}
if (messages.length === 0) {
return {
success: false,
error: 'No messages found in transcript'
error: "No messages found in transcript",
};
}
// Calculate timestamps - use parsed timestamps if available, otherwise distribute across session duration
const hasTimestamps = messages.some(msg => (msg as any).timestamp);
const hasTimestamps = messages.some((msg) => (msg as any).timestamp);
if (hasTimestamps) {
// Use parsed timestamps from the transcript
@ -154,35 +168,45 @@ export function parseTranscriptToMessages(
} catch (error) {
// Fallback to distributed timestamp if parsing fails
const sessionDurationMs = endTime.getTime() - startTime.getTime();
const messageInterval = messages.length > 1 ? sessionDurationMs / (messages.length - 1) : 0;
message.timestamp = new Date(startTime.getTime() + (index * messageInterval));
const messageInterval =
messages.length > 1
? sessionDurationMs / (messages.length - 1)
: 0;
message.timestamp = new Date(
startTime.getTime() + index * messageInterval
);
}
} else {
// Fallback to distributed timestamp
const sessionDurationMs = endTime.getTime() - startTime.getTime();
const messageInterval = messages.length > 1 ? sessionDurationMs / (messages.length - 1) : 0;
message.timestamp = new Date(startTime.getTime() + (index * messageInterval));
const messageInterval =
messages.length > 1 ? sessionDurationMs / (messages.length - 1) : 0;
message.timestamp = new Date(
startTime.getTime() + index * messageInterval
);
}
});
} else {
// Distribute messages across session duration
const sessionDurationMs = endTime.getTime() - startTime.getTime();
const messageInterval = messages.length > 1 ? sessionDurationMs / (messages.length - 1) : 0;
const messageInterval =
messages.length > 1 ? sessionDurationMs / (messages.length - 1) : 0;
messages.forEach((message, index) => {
message.timestamp = new Date(startTime.getTime() + (index * messageInterval));
message.timestamp = new Date(
startTime.getTime() + index * messageInterval
);
});
}
return {
success: true,
messages
messages,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : String(error)
error: error instanceof Error ? error.message : String(error),
};
}
}
@ -198,17 +222,17 @@ export async function storeMessagesForSession(
): Promise<void> {
// Delete existing messages for this session (in case of re-processing)
await prisma.message.deleteMany({
where: { sessionId }
where: { sessionId },
});
// Create new messages
const messagesWithSessionId = messages.map(msg => ({
const messagesWithSessionId = messages.map((msg) => ({
...msg,
sessionId
sessionId,
}));
await prisma.message.createMany({
data: messagesWithSessionId
data: messagesWithSessionId,
});
}
@ -216,13 +240,15 @@ export async function storeMessagesForSession(
* Process transcript for a single session
* @param sessionId The session ID to process
*/
export async function processSessionTranscript(sessionId: string): Promise<void> {
export async function processSessionTranscript(
sessionId: string
): Promise<void> {
// Get the session and its import data
const session = await prisma.session.findUnique({
where: { id: sessionId },
include: {
import: true
}
import: true,
},
});
if (!session) {
@ -255,35 +281,37 @@ export async function processSessionTranscript(sessionId: string): Promise<void>
// Store the messages
await storeMessagesForSession(sessionId, parseResult.messages!);
console.log(`✅ Processed ${parseResult.messages!.length} messages for session ${sessionId}`);
console.log(
`✅ Processed ${parseResult.messages!.length} messages for session ${sessionId}`
);
}
/**
* Process all sessions that have transcript content but no messages
*/
export async function processAllUnparsedTranscripts(): Promise<void> {
console.log('🔍 Finding sessions with unparsed transcripts...');
console.log("🔍 Finding sessions with unparsed transcripts...");
// Find sessions that have transcript content but no messages
const sessionsToProcess = await prisma.session.findMany({
where: {
import: {
rawTranscriptContent: {
not: null
}
not: null,
},
},
messages: {
none: {}
}
none: {},
},
},
include: {
import: true,
_count: {
select: {
messages: true
}
}
}
messages: true,
},
},
},
});
console.log(`📋 Found ${sessionsToProcess.length} sessions to process`);
@ -323,7 +351,7 @@ export async function getTotalMessageCount(): Promise<number> {
export async function getMessagesForSession(sessionId: string) {
return await prisma.message.findMany({
where: { sessionId },
orderBy: { order: 'asc' }
orderBy: { order: "asc" },
});
}
@ -336,17 +364,17 @@ export async function getParsingStats() {
where: {
import: {
rawTranscriptContent: {
not: null
}
}
}
not: null,
},
},
},
});
const sessionsWithMessages = await prisma.session.count({
where: {
messages: {
some: {}
}
}
some: {},
},
},
});
const totalMessages = await getTotalMessageCount();
@ -355,6 +383,6 @@ export async function getParsingStats() {
sessionsWithTranscripts,
sessionsWithMessages,
unparsedSessions: sessionsWithTranscripts - sessionsWithMessages,
totalMessages
totalMessages,
};
}

View File

@ -1,6 +1,6 @@
import { clsx, type ClassValue } from "clsx"
import { twMerge } from "tailwind-merge"
import { clsx, type ClassValue } from "clsx";
import { twMerge } from "tailwind-merge";
export function cn(...inputs: ClassValue[]) {
return twMerge(clsx(inputs))
return twMerge(clsx(inputs));
}

134
lib/validation.ts Normal file
View File

@ -0,0 +1,134 @@
import { z } from "zod";
// Password policy: at least 12 characters containing at least one lowercase
// letter, one uppercase letter, one digit, and one special character from
// the set @$!%*?&. Each rule is a separate anchored lookahead regex so a
// failing input reports the specific rule it violated.
const passwordSchema = z
.string()
.min(12, "Password must be at least 12 characters long")
.regex(/^(?=.*[a-z])/, "Password must contain at least one lowercase letter")
.regex(/^(?=.*[A-Z])/, "Password must contain at least one uppercase letter")
.regex(/^(?=.*\d)/, "Password must contain at least one number")
.regex(
/^(?=.*[@$!%*?&])/,
"Password must contain at least one special character (@$!%*?&)"
);

// Email: format-checked, capped at 255 characters, and normalized to
// lowercase so downstream comparisons/lookups are case-insensitive.
const emailSchema = z
.string()
.email("Invalid email format")
.max(255, "Email must be less than 255 characters")
.toLowerCase();

// Company name: 1-100 characters, restricted to letters, digits, whitespace,
// hyphen, underscore, and dot (anything else is rejected).
const companyNameSchema = z
.string()
.min(1, "Company name is required")
.max(100, "Company name must be less than 100 characters")
.regex(/^[a-zA-Z0-9\s\-_.]+$/, "Company name contains invalid characters");
// User registration payload: new accounts must satisfy the strong password
// policy defined by passwordSchema.
export const registerSchema = z.object({
email: emailSchema,
password: passwordSchema,
company: companyNameSchema,
});

// Login payload. The password is only checked for presence here — strength
// rules apply when a password is set, not when an existing one is verified.
export const loginSchema = z.object({
email: emailSchema,
password: z.string().min(1, "Password is required"),
});

// Payload for requesting a password reset (email only).
export const forgotPasswordSchema = z.object({
email: emailSchema,
});

// Payload for completing a password reset: the opaque reset token plus a
// new password that must meet the full strength policy.
export const resetPasswordSchema = z.object({
token: z.string().min(1, "Reset token is required"),
password: passwordSchema,
});
// Query parameters for listing/filtering sessions. Dates are ISO-8601
// datetime strings; pagination defaults to page 1 / 20 rows with a hard cap
// of 100 rows per page. The category enum mirrors the categories the AI
// classifier emits.
// NOTE(review): page/limit validate as numbers, not numeric strings — callers
// parsing raw URL query params must convert first (or this should use
// z.coerce.number()); confirm against call sites.
export const sessionFilterSchema = z.object({
search: z.string().max(100).optional(),
sentiment: z.enum(["POSITIVE", "NEUTRAL", "NEGATIVE"]).optional(),
category: z
.enum([
"SCHEDULE_HOURS",
"LEAVE_VACATION",
"SICK_LEAVE_RECOVERY",
"SALARY_COMPENSATION",
"CONTRACT_HOURS",
"ONBOARDING",
"OFFBOARDING",
"WORKWEAR_STAFF_PASS",
"TEAM_CONTACTS",
"PERSONAL_QUESTIONS",
"ACCESS_LOGIN",
"SOCIAL_QUESTIONS",
"UNRECOGNIZED_OTHER",
])
.optional(),
startDate: z.string().datetime().optional(),
endDate: z.string().datetime().optional(),
page: z.number().int().min(1).default(1),
limit: z.number().int().min(1).max(100).default(20),
});
// Company settings update payload. CSV import credentials are optional;
// sentimentAlert is a 0-1 threshold. dashboardOpts is an open object —
// passthrough() keeps unknown keys instead of stripping them.
export const companySettingsSchema = z.object({
name: companyNameSchema,
csvUrl: z.string().url("Invalid CSV URL"),
csvUsername: z.string().max(100).optional(),
csvPassword: z.string().max(100).optional(),
sentimentAlert: z.number().min(0).max(1).optional(),
dashboardOpts: z.object({}).passthrough().optional(),
});

// User-management update payload: every field optional so partial updates
// validate; a new password must meet the full strength policy.
export const userUpdateSchema = z.object({
email: emailSchema.optional(),
role: z.enum(["ADMIN", "USER", "AUDITOR"]).optional(),
password: passwordSchema.optional(),
});

// Query parameters for metrics endpoints: optional ISO-8601 date range and
// an optional company UUID.
export const metricsQuerySchema = z.object({
startDate: z.string().datetime().optional(),
endDate: z.string().datetime().optional(),
companyId: z.string().uuid().optional(),
});
/**
 * Validate unknown input against a Zod schema and return a discriminated
 * result instead of throwing.
 *
 * On success the parsed (and sanitized, e.g. lowercased email) value is
 * returned. On failure each Zod issue is flattened to "path: message";
 * any non-Zod exception collapses to a generic "Invalid input" entry.
 */
export function validateInput<T>(
  schema: z.ZodSchema<T>,
  data: unknown
): { success: true; data: T } | { success: false; errors: string[] } {
  try {
    return { success: true, data: schema.parse(data) };
  } catch (error) {
    // Anything other than a validation error is reported generically.
    if (!(error instanceof z.ZodError)) {
      return { success: false, errors: ["Invalid input"] };
    }
    const messages: string[] = [];
    for (const issue of error.errors) {
      messages.push(`${issue.path.join(".")}: ${issue.message}`);
    }
    return { success: false, errors: messages };
  }
}
// Shape of one rate-limit rule: a sliding window length in milliseconds and
// the maximum number of requests allowed within that window.
export interface RateLimitConfig {
windowMs: number;
maxRequests: number;
// When true, requests that succeed are not counted against the limit.
skipSuccessfulRequests?: boolean;
}

// Preset limits per endpoint class; `as const` keeps the values readonly and
// literal so consumers get exact types.
export const rateLimitConfigs = {
auth: { windowMs: 15 * 60 * 1000, maxRequests: 5 }, // 5 requests per 15 minutes
registration: { windowMs: 60 * 60 * 1000, maxRequests: 3 }, // 3 registrations per hour
api: { windowMs: 15 * 60 * 1000, maxRequests: 100 }, // 100 API requests per 15 minutes
} as const;