Broken shit

This commit is contained in:
Max Kowalski
2025-06-26 21:00:19 +02:00
parent ab2c75b736
commit 653d70022b
49 changed files with 2826 additions and 2102 deletions

View File

@ -1,636 +0,0 @@
// JavaScript version of csvFetcher with session storage functionality
import fetch from "node-fetch";
import { parse } from "csv-parse/sync";
import ISO6391 from "iso-639-1";
import countries from "i18n-iso-countries";
import { PrismaClient } from "@prisma/client";
// Register locales for i18n-iso-countries
import enLocale from "i18n-iso-countries/langs/en.json" with { type: "json" };
countries.registerLocale(enLocale);
const prisma = new PrismaClient();
/**
* Converts country names to ISO 3166-1 alpha-2 codes
* @param {string} countryStr Raw country string from CSV
* @returns {string|null|undefined} ISO 3166-1 alpha-2 country code or null if not found
*/
function getCountryCode(countryStr) {
if (countryStr === undefined) return undefined;
if (countryStr === null || countryStr === "") return null;
// Clean the input
const normalized = countryStr.trim();
if (!normalized) return null;
// Direct ISO code check (if already a 2-letter code)
if (normalized.length === 2 && normalized === normalized.toUpperCase()) {
return countries.isValid(normalized) ? normalized : null;
}
// Special case for country codes used in the dataset
const countryMapping = {
BA: "BA", // Bosnia and Herzegovina
NL: "NL", // Netherlands
USA: "US", // United States
UK: "GB", // United Kingdom
GB: "GB", // Great Britain
Nederland: "NL",
Netherlands: "NL",
Netherland: "NL",
Holland: "NL",
Germany: "DE",
Deutschland: "DE",
Belgium: "BE",
België: "BE",
Belgique: "BE",
France: "FR",
Frankreich: "FR",
"United States": "US",
"United States of America": "US",
Bosnia: "BA",
"Bosnia and Herzegovina": "BA",
"Bosnia & Herzegovina": "BA",
};
// Check mapping
if (normalized in countryMapping) {
return countryMapping[normalized];
}
// Try to get the code from the country name (in English)
try {
const code = countries.getAlpha2Code(normalized, "en");
if (code) return code;
} catch (error) {
process.stderr.write(
`[CSV] Error converting country name to code: ${normalized} - ${error}\n`
);
}
// If all else fails, return null
return null;
}
/**
* Converts language names to ISO 639-1 codes
* @param {string} languageStr Raw language string from CSV
* @returns {string|null|undefined} ISO 639-1 language code or null if not found
*/
function getLanguageCode(languageStr) {
if (languageStr === undefined) return undefined;
if (languageStr === null || languageStr === "") return null;
// Clean the input
const normalized = languageStr.trim();
if (!normalized) return null;
// Direct ISO code check (if already a 2-letter code)
if (normalized.length === 2 && normalized === normalized.toLowerCase()) {
return ISO6391.validate(normalized) ? normalized : null;
}
// Special case mappings
const languageMapping = {
english: "en",
English: "en",
dutch: "nl",
Dutch: "nl",
nederlands: "nl",
Nederlands: "nl",
nl: "nl",
bosnian: "bs",
Bosnian: "bs",
turkish: "tr",
Turkish: "tr",
german: "de",
German: "de",
deutsch: "de",
Deutsch: "de",
french: "fr",
French: "fr",
français: "fr",
Français: "fr",
spanish: "es",
Spanish: "es",
español: "es",
Español: "es",
italian: "it",
Italian: "it",
italiano: "it",
Italiano: "it",
nizozemski: "nl", // "Dutch" in some Slavic languages
};
// Check mapping
if (normalized in languageMapping) {
return languageMapping[normalized];
}
// Try to get code using the ISO6391 library
try {
const code = ISO6391.getCode(normalized);
if (code) return code;
} catch (error) {
process.stderr.write(
`[CSV] Error converting language name to code: ${normalized} - ${error}\n`
);
}
// If all else fails, return null
return null;
}
/**
* Normalizes category values to standard groups
* @param {string} categoryStr The raw category string from CSV
* @returns {string|null} A normalized category string
*/
function normalizeCategory(categoryStr) {
if (!categoryStr) return null;
const normalized = categoryStr.toLowerCase().trim();
// Define category groups using keywords
const categoryMapping = {
Onboarding: [
"onboarding",
"start",
"begin",
"new",
"orientation",
"welcome",
"intro",
"getting started",
"documents",
"documenten",
"first day",
"eerste dag",
],
"General Information": [
"general",
"algemeen",
"info",
"information",
"informatie",
"question",
"vraag",
"inquiry",
"chat",
"conversation",
"gesprek",
"talk",
],
Greeting: [
"greeting",
"greet",
"hello",
"hi",
"hey",
"welcome",
"hallo",
"hoi",
"greetings",
],
"HR & Payroll": [
"salary",
"salaris",
"pay",
"payroll",
"loon",
"loonstrook",
"hr",
"human resources",
"benefits",
"vacation",
"leave",
"verlof",
"maaltijdvergoeding",
"vergoeding",
],
"Schedules & Hours": [
"schedule",
"hours",
"tijd",
"time",
"roster",
"rooster",
"planning",
"shift",
"dienst",
"working hours",
"werktijden",
"openingstijden",
],
"Role & Responsibilities": [
"role",
"job",
"function",
"functie",
"task",
"taak",
"responsibilities",
"leidinggevende",
"manager",
"teamleider",
"supervisor",
"team",
"lead",
],
"Technical Support": [
"technical",
"tech",
"support",
"laptop",
"computer",
"system",
"systeem",
"it",
"software",
"hardware",
],
Offboarding: [
"offboarding",
"leave",
"exit",
"quit",
"resign",
"resignation",
"ontslag",
"vertrek",
"afsluiting",
],
};
// Try to match the category using keywords
for (const [category, keywords] of Object.entries(categoryMapping)) {
if (keywords.some((keyword) => normalized.includes(keyword))) {
return category;
}
}
// If no match, return "Other"
return "Other";
}
/**
* Converts sentiment string values to numeric scores
* @param {string} sentimentStr The sentiment string from the CSV
* @returns {number|null} A numeric score representing the sentiment
*/
function mapSentimentToScore(sentimentStr) {
if (!sentimentStr) return null;
// Convert to lowercase for case-insensitive matching
const sentiment = sentimentStr.toLowerCase();
// Map sentiment strings to numeric values on a scale from -1 to 2
const sentimentMap = {
happy: 1.0,
excited: 1.5,
positive: 0.8,
neutral: 0.0,
playful: 0.7,
negative: -0.8,
angry: -1.0,
sad: -0.7,
frustrated: -0.9,
positief: 0.8, // Dutch
neutraal: 0.0, // Dutch
negatief: -0.8, // Dutch
positivo: 0.8, // Spanish/Italian
neutro: 0.0, // Spanish/Italian
negativo: -0.8, // Spanish/Italian
yes: 0.5, // For any "yes" sentiment
no: -0.5, // For any "no" sentiment
};
return sentimentMap[sentiment] !== undefined
? sentimentMap[sentiment]
: isNaN(parseFloat(sentiment))
? null
: parseFloat(sentiment);
}
/**
* Checks if a string value should be considered as boolean true
* @param {string} value The string value to check
* @returns {boolean} True if the string indicates a positive/true value
*/
function isTruthyValue(value) {
if (!value) return false;
const truthyValues = [
"1",
"true",
"yes",
"y",
"ja",
"si",
"oui",
"да",
"да",
"はい",
];
return truthyValues.includes(value.toLowerCase());
}
/**
* Safely parses a date string into a Date object.
* @param {string} dateStr The date string to parse.
* @returns {Date|null} A Date object or null if parsing fails.
*/
function safeParseDate(dateStr) {
if (!dateStr) return null;
// Try to parse D-M-YYYY HH:MM:SS format (with hyphens or dots)
const dateTimeRegex =
/^(\d{1,2})[.-](\d{1,2})[.-](\d{4}) (\d{1,2}):(\d{1,2}):(\d{1,2})$/;
const match = dateStr.match(dateTimeRegex);
if (match) {
const day = match[1];
const month = match[2];
const year = match[3];
const hour = match[4];
const minute = match[5];
const second = match[6];
// Reformat to YYYY-MM-DDTHH:MM:SS (ISO-like, but local time)
// Ensure month and day are two digits
const formattedDateStr = `${year}-${month.padStart(2, "0")}-${day.padStart(2, "0")}T${hour.padStart(2, "0")}:${minute.padStart(2, "0")}:${second.padStart(2, "0")}`;
try {
const date = new Date(formattedDateStr);
// Basic validation: check if the constructed date is valid
if (!isNaN(date.getTime())) {
return date;
}
} catch (e) {
console.warn(
`[safeParseDate] Error parsing reformatted string ${formattedDateStr} from ${dateStr}:`,
e
);
}
}
// Fallback for other potential formats (e.g., direct ISO 8601) or if the primary parse failed
try {
const parsedDate = new Date(dateStr);
if (!isNaN(parsedDate.getTime())) {
return parsedDate;
}
} catch (e) {
console.warn(`[safeParseDate] Error parsing with fallback ${dateStr}:`, e);
}
console.warn(`Failed to parse date string: ${dateStr}`);
return null;
}
/**
* Fetches transcript content from a URL
* @param {string} url The URL to fetch the transcript from
* @param {string} username Optional username for authentication
* @param {string} password Optional password for authentication
* @returns {Promise<string|null>} The transcript content or null if fetching fails
*/
async function fetchTranscriptContent(url, username, password) {
try {
const authHeader =
username && password
? "Basic " + Buffer.from(`${username}:${password}`).toString("base64")
: undefined;
const response = await fetch(url, {
headers: authHeader ? { Authorization: authHeader } : {},
timeout: 10000, // 10 second timeout
});
if (!response.ok) {
// Only log error once per batch, not for every transcript
if (Math.random() < 0.1) {
// Log ~10% of errors to avoid spam
console.warn(
`[CSV] Transcript fetch failed for ${url}: ${response.status} ${response.statusText}`
);
}
return null;
}
return await response.text();
} catch (error) {
// Only log error once per batch, not for every transcript
if (Math.random() < 0.1) {
// Log ~10% of errors to avoid spam
console.warn(`[CSV] Transcript fetch error for ${url}:`, error.message);
}
return null;
}
}
/**
* Fetches and parses CSV data from a URL
* @param {string} url The CSV URL
* @param {string} username Optional username for authentication
* @param {string} password Optional password for authentication
* @returns {Promise<Object[]>} Array of parsed session data
*/
export async function fetchAndParseCsv(url, username, password) {
const authHeader =
username && password
? "Basic " + Buffer.from(`${username}:${password}`).toString("base64")
: undefined;
const res = await fetch(url, {
headers: authHeader ? { Authorization: authHeader } : {},
});
if (!res.ok) throw new Error("Failed to fetch CSV: " + res.statusText);
const text = await res.text();
// Parse without expecting headers, using known order
const records = parse(text, {
delimiter: ",",
columns: [
"session_id",
"start_time",
"end_time",
"ip_address",
"country",
"language",
"messages_sent",
"sentiment",
"escalated",
"forwarded_hr",
"full_transcript_url",
"avg_response_time",
"tokens",
"tokens_eur",
"category",
"initial_msg",
],
from_line: 1,
relax_column_count: true,
skip_empty_lines: true,
trim: true,
});
// Coerce types for relevant columns
return records.map((r) => ({
id: r.session_id,
startTime: safeParseDate(r.start_time) || new Date(), // Fallback to current date if invalid
endTime: safeParseDate(r.end_time),
ipAddress: r.ip_address,
country: getCountryCode(r.country),
language: getLanguageCode(r.language),
messagesSent: Number(r.messages_sent) || 0,
sentiment: mapSentimentToScore(r.sentiment),
escalated: isTruthyValue(r.escalated),
forwardedHr: isTruthyValue(r.forwarded_hr),
fullTranscriptUrl: r.full_transcript_url,
avgResponseTime: r.avg_response_time
? parseFloat(r.avg_response_time)
: null,
tokens: Number(r.tokens) || 0,
tokensEur: r.tokens_eur ? parseFloat(r.tokens_eur) : 0,
category: normalizeCategory(r.category),
initialMsg: r.initial_msg,
}));
}
/**
* Fetches and stores sessions for all companies
*/
export async function fetchAndStoreSessionsForAllCompanies() {
try {
// Get all companies
const companies = await prisma.company.findMany();
for (const company of companies) {
if (!company.csvUrl) {
console.log(
`[Scheduler] Skipping company ${company.id} - no CSV URL configured`
);
continue;
}
// Skip companies with invalid/example URLs
if (
company.csvUrl.includes("example.com") ||
company.csvUrl === "https://example.com/data.csv"
) {
console.log(
`[Scheduler] Skipping company ${company.id} - invalid/example CSV URL: ${company.csvUrl}`
);
continue;
}
console.log(`[Scheduler] Processing sessions for company: ${company.id}`);
try {
const sessions = await fetchAndParseCsv(
company.csvUrl,
company.csvUsername,
company.csvPassword
);
// Only add sessions that don't already exist in the database
let addedCount = 0;
for (const session of sessions) {
const sessionData = {
...session,
companyId: company.id,
id:
session.id ||
session.sessionId ||
`sess_${Date.now()}_${Math.random().toString(36).substring(2, 7)}`,
// Ensure startTime is not undefined
startTime: session.startTime || new Date(),
};
// Validate dates to prevent "Invalid Date" errors
const startTime =
sessionData.startTime instanceof Date &&
!isNaN(sessionData.startTime.getTime())
? sessionData.startTime
: new Date();
const endTime =
session.endTime instanceof Date && !isNaN(session.endTime.getTime())
? session.endTime
: new Date();
// Note: transcriptContent field was removed from schema
// Transcript content can be fetched on-demand from fullTranscriptUrl
// Check if the session already exists
const existingSession = await prisma.session.findUnique({
where: { id: sessionData.id },
});
if (existingSession) {
// Skip this session as it already exists
continue;
}
// Only include fields that are properly typed for Prisma
await prisma.session.create({
data: {
id: sessionData.id,
companyId: sessionData.companyId,
startTime: startTime,
endTime: endTime,
ipAddress: session.ipAddress || null,
country: session.country || null,
language: session.language || null,
messagesSent:
typeof session.messagesSent === "number"
? session.messagesSent
: 0,
sentiment:
typeof session.sentiment === "number"
? session.sentiment
: null,
escalated:
typeof session.escalated === "boolean"
? session.escalated
: null,
forwardedHr:
typeof session.forwardedHr === "boolean"
? session.forwardedHr
: null,
fullTranscriptUrl: session.fullTranscriptUrl || null,
avgResponseTime:
typeof session.avgResponseTime === "number"
? session.avgResponseTime
: null,
tokens:
typeof session.tokens === "number" ? session.tokens : null,
tokensEur:
typeof session.tokensEur === "number"
? session.tokensEur
: null,
category: session.category || null,
initialMsg: session.initialMsg || null,
},
});
addedCount++;
}
console.log(
`[Scheduler] Added ${addedCount} new sessions for company ${company.id}`
);
} catch (error) {
console.error(
`[Scheduler] Error processing company ${company.id}:`,
error
);
}
}
} catch (error) {
console.error("[Scheduler] Error fetching companies:", error);
throw error;
}
}

View File

@ -50,65 +50,16 @@ interface SessionData {
}
/**
* Converts country names to ISO 3166-1 alpha-2 codes
* Passes through country data as-is (no mapping)
* @param countryStr Raw country string from CSV
* @returns ISO 3166-1 alpha-2 country code or null if not found
* @returns The country string as-is or null if empty
*/
function getCountryCode(countryStr?: string): string | null | undefined {
if (countryStr === undefined) return undefined;
if (countryStr === null || countryStr === "") return null;
// Clean the input
const normalized = countryStr.trim();
if (!normalized) return null;
// Direct ISO code check (if already a 2-letter code)
if (normalized.length === 2 && normalized === normalized.toUpperCase()) {
return countries.isValid(normalized) ? normalized : null;
}
// Special case for country codes used in the dataset
const countryMapping: Record<string, string> = {
BA: "BA", // Bosnia and Herzegovina
NL: "NL", // Netherlands
USA: "US", // United States
UK: "GB", // United Kingdom
GB: "GB", // Great Britain
Nederland: "NL",
Netherlands: "NL",
Netherland: "NL",
Holland: "NL",
Germany: "DE",
Deutschland: "DE",
Belgium: "BE",
België: "BE",
Belgique: "BE",
France: "FR",
Frankreich: "FR",
"United States": "US",
"United States of America": "US",
Bosnia: "BA",
"Bosnia and Herzegovina": "BA",
"Bosnia & Herzegovina": "BA",
};
// Check mapping
if (normalized in countryMapping) {
return countryMapping[normalized];
}
// Try to get the code from the country name (in English)
try {
const code = countries.getAlpha2Code(normalized, "en");
if (code) return code;
} catch (error) {
process.stderr.write(
`[CSV] Error converting country name to code: ${normalized} - ${error}\n`
);
}
// If all else fails, return null
return null;
return normalized || null;
}
/**
@ -180,135 +131,15 @@ function getLanguageCode(languageStr?: string): string | null | undefined {
}
/**
* Normalizes category values to standard groups
* Passes through category data as-is (no mapping)
* @param categoryStr The raw category string from CSV
* @returns A normalized category string
* @returns The category string as-is or null if empty
*/
function normalizeCategory(categoryStr?: string): string | null {
if (!categoryStr) return null;
const normalized = categoryStr.toLowerCase().trim();
// Define category groups using keywords
const categoryMapping: Record<string, string[]> = {
Onboarding: [
"onboarding",
"start",
"begin",
"new",
"orientation",
"welcome",
"intro",
"getting started",
"documents",
"documenten",
"first day",
"eerste dag",
],
"General Information": [
"general",
"algemeen",
"info",
"information",
"informatie",
"question",
"vraag",
"inquiry",
"chat",
"conversation",
"gesprek",
"talk",
],
Greeting: [
"greeting",
"greet",
"hello",
"hi",
"hey",
"welcome",
"hallo",
"hoi",
"greetings",
],
"HR & Payroll": [
"salary",
"salaris",
"pay",
"payroll",
"loon",
"loonstrook",
"hr",
"human resources",
"benefits",
"vacation",
"leave",
"verlof",
"maaltijdvergoeding",
"vergoeding",
],
"Schedules & Hours": [
"schedule",
"hours",
"tijd",
"time",
"roster",
"rooster",
"planning",
"shift",
"dienst",
"working hours",
"werktijden",
"openingstijden",
],
"Role & Responsibilities": [
"role",
"job",
"function",
"functie",
"task",
"taak",
"responsibilities",
"leidinggevende",
"manager",
"teamleider",
"supervisor",
"team",
"lead",
],
"Technical Support": [
"technical",
"tech",
"support",
"laptop",
"computer",
"system",
"systeem",
"it",
"software",
"hardware",
],
Offboarding: [
"offboarding",
"leave",
"exit",
"quit",
"resign",
"resignation",
"ontslag",
"vertrek",
"afsluiting",
],
};
// Try to match the category using keywords
for (const [category, keywords] of Object.entries(categoryMapping)) {
if (keywords.some((keyword) => normalized.includes(keyword))) {
return category;
}
}
// If no match, return "Other"
return "Other";
const normalized = categoryStr.trim();
return normalized || null;
}
/**

View File

@ -325,7 +325,16 @@ export function sessionMetrics(
sessions: ChatSession[],
companyConfig: CompanyConfig = {}
): MetricsResult {
const totalSessions = sessions.length; // Renamed from 'total' for clarity
// Filter out invalid data sessions for analytics
const validSessions = sessions.filter(session => {
// Include sessions that are either:
// 1. Not processed yet (validData field doesn't exist or is undefined)
// 2. Processed and marked as valid (validData === true)
return session.validData !== false;
});
const totalSessions = validSessions.length; // Only count valid sessions
const totalRawSessions = sessions.length; // Keep track of all sessions for debugging
const byDay: DayMetrics = {};
const byCategory: CategoryMetrics = {};
const byLanguage: LanguageMetrics = {};
@ -350,16 +359,16 @@ export function sessionMetrics(
const wordCounts: { [key: string]: number } = {};
let alerts = 0;
// New metrics variables
const hourlySessionCounts: { [hour: string]: number } = {};
let resolvedChatsCount = 0;
const questionCounts: { [question: string]: number } = {};
// New metrics variables
const hourlySessionCounts: { [hour: string]: number } = {};
let resolvedChatsCount = 0;
const questionCounts: { [question: string]: number } = {};
for (const session of sessions) {
// Track hourly usage for peak time calculation
if (session.startTime) {
const hour = new Date(session.startTime).getHours();
const hourKey = `${hour.toString().padStart(2, '0')}:00`;
const hourKey = `${hour.toString().padStart(2, "0")}:00`;
hourlySessionCounts[hourKey] = (hourlySessionCounts[hourKey] || 0) + 1;
}
@ -514,24 +523,31 @@ export function sessionMetrics(
questionsArray.forEach((question: string) => {
if (question && question.trim().length > 0) {
const cleanQuestion = question.trim();
questionCounts[cleanQuestion] = (questionCounts[cleanQuestion] || 0) + 1;
questionCounts[cleanQuestion] =
(questionCounts[cleanQuestion] || 0) + 1;
}
});
}
} catch (error) {
console.warn(`[metrics] Failed to parse questions JSON for session ${session.id}: ${error}`);
console.warn(
`[metrics] Failed to parse questions JSON for session ${session.id}: ${error}`
);
}
}
// 2. Extract questions from user messages (if available)
if (session.messages) {
session.messages
.filter(msg => msg.role === 'User')
.forEach(msg => {
.filter((msg) => msg.role === "User")
.forEach((msg) => {
const content = msg.content.trim();
// Simple heuristic: if message ends with ? or contains question words, treat as question
if (content.endsWith('?') ||
/\b(what|when|where|why|how|who|which|can|could|would|will|is|are|do|does|did)\b/i.test(content)) {
if (
content.endsWith("?") ||
/\b(what|when|where|why|how|who|which|can|could|would|will|is|are|do|does|did)\b/i.test(
content
)
) {
questionCounts[content] = (questionCounts[content] || 0) + 1;
}
});
@ -540,8 +556,12 @@ export function sessionMetrics(
// 3. Extract questions from initial message as fallback
if (session.initialMsg) {
const content = session.initialMsg.trim();
if (content.endsWith('?') ||
/\b(what|when|where|why|how|who|which|can|could|would|will|is|are|do|does|did)\b/i.test(content)) {
if (
content.endsWith("?") ||
/\b(what|when|where|why|how|who|which|can|could|would|will|is|are|do|does|did)\b/i.test(
content
)
) {
questionCounts[content] = (questionCounts[content] || 0) + 1;
}
}
@ -613,20 +633,23 @@ export function sessionMetrics(
// Calculate new metrics
// 1. Average Daily Costs (euros)
const avgDailyCosts = numDaysWithSessions > 0 ? totalTokensEur / numDaysWithSessions : 0;
const avgDailyCosts =
numDaysWithSessions > 0 ? totalTokensEur / numDaysWithSessions : 0;
// 2. Peak Usage Time
let peakUsageTime = "N/A";
if (Object.keys(hourlySessionCounts).length > 0) {
const peakHour = Object.entries(hourlySessionCounts)
.sort(([, a], [, b]) => b - a)[0][0];
const peakHourNum = parseInt(peakHour.split(':')[0]);
const peakHour = Object.entries(hourlySessionCounts).sort(
([, a], [, b]) => b - a
)[0][0];
const peakHourNum = parseInt(peakHour.split(":")[0]);
const endHour = (peakHourNum + 1) % 24;
peakUsageTime = `${peakHour}-${endHour.toString().padStart(2, '0')}:00`;
peakUsageTime = `${peakHour}-${endHour.toString().padStart(2, "0")}:00`;
}
// 3. Resolved Chats Percentage
const resolvedChatsPercentage = totalSessions > 0 ? (resolvedChatsCount / totalSessions) * 100 : 0;
const resolvedChatsPercentage =
totalSessions > 0 ? (resolvedChatsCount / totalSessions) * 100 : 0;
// 4. Top 5 Asked Questions
const topQuestions: TopQuestion[] = Object.entries(questionCounts)

View File

@ -1,412 +0,0 @@
// Session processing scheduler - JavaScript version
import cron from "node-cron";
import { PrismaClient } from "@prisma/client";
import fetch from "node-fetch";
import { readFileSync } from "fs";
import { fileURLToPath } from "url";
import { dirname, join } from "path";
// Load environment variables from .env.local
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
const envPath = join(__dirname, '..', '.env.local');
try {
const envFile = readFileSync(envPath, 'utf8');
const envVars = envFile.split('\n').filter(line => line.trim() && !line.startsWith('#'));
envVars.forEach(line => {
const [key, ...valueParts] = line.split('=');
if (key && valueParts.length > 0) {
const value = valueParts.join('=').trim();
if (!process.env[key.trim()]) {
process.env[key.trim()] = value;
}
}
});
} catch (error) {
// Silently fail if .env.local doesn't exist
}
const prisma = new PrismaClient();
const OPENAI_API_KEY = process.env.OPENAI_API_KEY;
const OPENAI_API_URL = "https://api.openai.com/v1/chat/completions";
/**
* Processes a session transcript using OpenAI API
* @param {string} sessionId The session ID
* @param {string} transcript The transcript content to process
* @returns {Promise<Object>} Processed data from OpenAI
*/
async function processTranscriptWithOpenAI(sessionId, transcript) {
if (!OPENAI_API_KEY) {
throw new Error("OPENAI_API_KEY environment variable is not set");
}
// Create a system message with instructions
const systemMessage = `
You are an AI assistant tasked with analyzing chat transcripts.
Extract the following information from the transcript:
1. The primary language used by the user (ISO 639-1 code)
2. Number of messages sent by the user
3. Overall sentiment (positive, neutral, or negative)
4. Whether the conversation was escalated
5. Whether HR contact was mentioned or provided
6. The best-fitting category for the conversation from this list:
- Schedule & Hours
- Leave & Vacation
- Sick Leave & Recovery
- Salary & Compensation
- Contract & Hours
- Onboarding
- Offboarding
- Workwear & Staff Pass
- Team & Contacts
- Personal Questions
- Access & Login
- Social questions
- Unrecognized / Other
7. Up to 5 paraphrased questions asked by the user (in English)
8. A brief summary of the conversation (10-300 characters)
Return the data in JSON format matching this schema:
{
"language": "ISO 639-1 code",
"messages_sent": number,
"sentiment": "positive|neutral|negative",
"escalated": boolean,
"forwarded_hr": boolean,
"category": "one of the categories listed above",
"questions": ["question 1", "question 2", ...],
"summary": "brief summary",
"session_id": "${sessionId}"
}
`;
try {
const response = await fetch(OPENAI_API_URL, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${OPENAI_API_KEY}`,
},
body: JSON.stringify({
model: "gpt-4-turbo",
messages: [
{
role: "system",
content: systemMessage,
},
{
role: "user",
content: transcript,
},
],
temperature: 0.3, // Lower temperature for more consistent results
response_format: { type: "json_object" },
}),
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`OpenAI API error: ${response.status} - ${errorText}`);
}
const data = await response.json();
const processedData = JSON.parse(data.choices[0].message.content);
// Validate the response against our expected schema
validateOpenAIResponse(processedData);
return processedData;
} catch (error) {
process.stderr.write(`Error processing transcript with OpenAI: ${error}\n`);
throw error;
}
}
/**
* Validates the OpenAI response against our expected schema
* @param {Object} data The data to validate
*/
function validateOpenAIResponse(data) {
// Check required fields
const requiredFields = [
"language",
"messages_sent",
"sentiment",
"escalated",
"forwarded_hr",
"category",
"questions",
"summary",
"session_id",
];
for (const field of requiredFields) {
if (!(field in data)) {
throw new Error(`Missing required field: ${field}`);
}
}
// Validate field types
if (typeof data.language !== "string" || !/^[a-z]{2}$/.test(data.language)) {
throw new Error(
"Invalid language format. Expected ISO 639-1 code (e.g., 'en')"
);
}
if (typeof data.messages_sent !== "number" || data.messages_sent < 0) {
throw new Error("Invalid messages_sent. Expected non-negative number");
}
if (!["positive", "neutral", "negative"].includes(data.sentiment)) {
throw new Error(
"Invalid sentiment. Expected 'positive', 'neutral', or 'negative'"
);
}
if (typeof data.escalated !== "boolean") {
throw new Error("Invalid escalated. Expected boolean");
}
if (typeof data.forwarded_hr !== "boolean") {
throw new Error("Invalid forwarded_hr. Expected boolean");
}
const validCategories = [
"Schedule & Hours",
"Leave & Vacation",
"Sick Leave & Recovery",
"Salary & Compensation",
"Contract & Hours",
"Onboarding",
"Offboarding",
"Workwear & Staff Pass",
"Team & Contacts",
"Personal Questions",
"Access & Login",
"Social questions",
"Unrecognized / Other",
];
if (!validCategories.includes(data.category)) {
throw new Error(
`Invalid category. Expected one of: ${validCategories.join(", ")}`
);
}
if (!Array.isArray(data.questions)) {
throw new Error("Invalid questions. Expected array of strings");
}
if (
typeof data.summary !== "string" ||
data.summary.length < 10 ||
data.summary.length > 300
) {
throw new Error(
"Invalid summary. Expected string between 10-300 characters"
);
}
if (typeof data.session_id !== "string") {
throw new Error("Invalid session_id. Expected string");
}
}
/**
* Process a single session
* @param {Object} session The session to process
* @returns {Promise<Object>} Result object with success/error info
*/
async function processSingleSession(session) {
if (session.messages.length === 0) {
return {
sessionId: session.id,
success: false,
error: "Session has no messages",
};
}
try {
// Convert messages back to transcript format for OpenAI processing
const transcript = session.messages
.map(
(msg) =>
`[${new Date(msg.timestamp)
.toLocaleString("en-GB", {
day: "2-digit",
month: "2-digit",
year: "numeric",
hour: "2-digit",
minute: "2-digit",
second: "2-digit",
})
.replace(",", "")}] ${msg.role}: ${msg.content}`
)
.join("\n");
const processedData = await processTranscriptWithOpenAI(
session.id,
transcript
);
// Map sentiment string to float value for compatibility with existing data
const sentimentMap = {
positive: 0.8,
neutral: 0.0,
negative: -0.8,
};
// Update the session with processed data
await prisma.session.update({
where: { id: session.id },
data: {
language: processedData.language,
messagesSent: processedData.messages_sent,
sentiment: sentimentMap[processedData.sentiment] || 0,
sentimentCategory: processedData.sentiment,
escalated: processedData.escalated,
forwardedHr: processedData.forwarded_hr,
category: processedData.category,
questions: JSON.stringify(processedData.questions),
summary: processedData.summary,
processed: true,
},
});
return {
sessionId: session.id,
success: true,
};
} catch (error) {
return {
sessionId: session.id,
success: false,
error: error.message,
};
}
}
/**
* Process sessions in parallel with concurrency limit
* @param {Array} sessions Array of sessions to process
* @param {number} maxConcurrency Maximum number of concurrent processing tasks
* @returns {Promise<Object>} Processing results
*/
async function processSessionsInParallel(sessions, maxConcurrency = 5) {
const results = [];
const executing = [];
for (const session of sessions) {
const promise = processSingleSession(session).then((result) => {
process.stdout.write(
result.success
? `[ProcessingScheduler] ✓ Successfully processed session ${result.sessionId}\n`
: `[ProcessingScheduler] ✗ Failed to process session ${result.sessionId}: ${result.error}\n`
);
return result;
});
results.push(promise);
executing.push(promise);
if (executing.length >= maxConcurrency) {
await Promise.race(executing);
executing.splice(
executing.findIndex((p) => p === promise),
1
);
}
}
return Promise.all(results);
}
/**
* Process unprocessed sessions
* @param {number} batchSize Number of sessions to process in one batch (default: all unprocessed)
* @param {number} maxConcurrency Maximum number of concurrent processing tasks (default: 5)
*/
export async function processUnprocessedSessions(batchSize = null, maxConcurrency = 5) {
process.stdout.write(
"[ProcessingScheduler] Starting to process unprocessed sessions...\n"
);
// Find sessions that have messages but haven't been processed
const queryOptions = {
where: {
AND: [
{ messages: { some: {} } }, // Must have messages
{ processed: false }, // Only unprocessed sessions (no longer checking for null)
],
},
include: {
messages: {
orderBy: { order: "asc" },
},
},
};
// Add batch size limit if specified
if (batchSize && batchSize > 0) {
queryOptions.take = batchSize;
}
const sessionsToProcess = await prisma.session.findMany(queryOptions);
// Filter to only sessions that have messages
const sessionsWithMessages = sessionsToProcess.filter(
(session) => session.messages.length > 0
);
if (sessionsWithMessages.length === 0) {
process.stdout.write(
"[ProcessingScheduler] No sessions found requiring processing.\n"
);
return;
}
process.stdout.write(
`[ProcessingScheduler] Found ${sessionsWithMessages.length} sessions to process (max concurrency: ${maxConcurrency}).\n`
);
const startTime = Date.now();
const results = await processSessionsInParallel(sessionsWithMessages, maxConcurrency);
const endTime = Date.now();
const successCount = results.filter((r) => r.success).length;
const errorCount = results.filter((r) => !r.success).length;
process.stdout.write("[ProcessingScheduler] Session processing complete.\n");
process.stdout.write(
`[ProcessingScheduler] Successfully processed: ${successCount} sessions.\n`
);
process.stdout.write(
`[ProcessingScheduler] Failed to process: ${errorCount} sessions.\n`
);
process.stdout.write(
`[ProcessingScheduler] Total processing time: ${((endTime - startTime) / 1000).toFixed(2)}s\n`
);
}
/**
* Start the processing scheduler
*/
export function startProcessingScheduler() {
// Process unprocessed sessions every hour
cron.schedule("0 * * * *", async () => {
try {
await processUnprocessedSessions();
} catch (error) {
process.stderr.write(
`[ProcessingScheduler] Error in scheduler: ${error}\n`
);
}
});
process.stdout.write(
"[ProcessingScheduler] Started processing scheduler (runs hourly).\n"
);
}

View File

@ -1,24 +1,28 @@
// Session processing scheduler - TypeScript version
import cron from "node-cron";
// Note: Disabled due to Next.js compatibility issues
// import cron from "node-cron";
import { PrismaClient } from "@prisma/client";
import fetch from "node-fetch";
import { readFileSync } from "fs";
import { fileURLToPath } from "url";
import { dirname, join } from "path";
import { VALID_CATEGORIES, ValidCategory, SentimentCategory } from "./types";
// Load environment variables from .env.local
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
const envPath = join(__dirname, '..', '.env.local');
const envPath = join(__dirname, "..", ".env.local");
try {
const envFile = readFileSync(envPath, 'utf8');
const envVars = envFile.split('\n').filter(line => line.trim() && !line.startsWith('#'));
const envFile = readFileSync(envPath, "utf8");
const envVars = envFile
.split("\n")
.filter((line) => line.trim() && !line.startsWith("#"));
envVars.forEach(line => {
const [key, ...valueParts] = line.split('=');
envVars.forEach((line) => {
const [key, ...valueParts] = line.split("=");
if (key && valueParts.length > 0) {
const value = valueParts.join('=').trim();
const value = valueParts.join("=").trim();
if (!process.env[key.trim()]) {
process.env[key.trim()] = value;
}
@ -35,10 +39,10 @@ const OPENAI_API_URL = "https://api.openai.com/v1/chat/completions";
interface ProcessedData {
language: string;
messages_sent: number;
sentiment: "positive" | "neutral" | "negative";
sentiment: SentimentCategory;
escalated: boolean;
forwarded_hr: boolean;
category: string;
category: ValidCategory;
questions: string[];
summary: string;
session_id: string;
@ -53,49 +57,49 @@ interface ProcessingResult {
/**
* Processes a session transcript using OpenAI API
*/
async function processTranscriptWithOpenAI(sessionId: string, transcript: string): Promise<ProcessedData> {
async function processTranscriptWithOpenAI(
sessionId: string,
transcript: string
): Promise<ProcessedData> {
if (!OPENAI_API_KEY) {
throw new Error("OPENAI_API_KEY environment variable is not set");
}
// Create a system message with instructions
const systemMessage = `
You are an AI assistant tasked with analyzing chat transcripts.
Extract the following information from the transcript:
1. The primary language used by the user (ISO 639-1 code)
2. Number of messages sent by the user
3. Overall sentiment (positive, neutral, or negative)
4. Whether the conversation was escalated
5. Whether HR contact was mentioned or provided
6. The best-fitting category for the conversation from this list:
- Schedule & Hours
- Leave & Vacation
- Sick Leave & Recovery
- Salary & Compensation
- Contract & Hours
- Onboarding
- Offboarding
- Workwear & Staff Pass
- Team & Contacts
- Personal Questions
- Access & Login
- Social questions
- Unrecognized / Other
7. Up to 5 paraphrased questions asked by the user (in English)
8. A brief summary of the conversation (10-300 characters)
Return the data in JSON format matching this schema:
{
"language": "ISO 639-1 code",
"messages_sent": number,
"sentiment": "positive|neutral|negative",
"escalated": boolean,
"forwarded_hr": boolean,
"category": "one of the categories listed above",
"questions": ["question 1", "question 2", ...],
"summary": "brief summary",
"session_id": "${sessionId}"
}
System: You are a JSON-generating assistant. Your task is to analyze raw chat transcripts between a user and an assistant and return structured data.
⚠️ IMPORTANT:
- You must return a **single, valid JSON object**.
- Do **not** include markdown formatting, code fences, explanations, or comments.
- The JSON must match the exact structure and constraints described below.
Here is the schema you must follow:
{
"language": "ISO 639-1 code, e.g., 'en', 'nl'",
"messages_sent": "integer, number of messages from the user",
"sentiment": "'positive', 'neutral', or 'negative'",
"escalated": "bool: true if the assistant connected or referred to a human agent, otherwise false",
"forwarded_hr": "bool: true if HR contact info was given, otherwise false",
"category": "one of: 'Schedule & Hours', 'Leave & Vacation', 'Sick Leave & Recovery', 'Salary & Compensation', 'Contract & Hours', 'Onboarding', 'Offboarding', 'Workwear & Staff Pass', 'Team & Contacts', 'Personal Questions', 'Access & Login', 'Social questions', 'Unrecognized / Other'",
"questions": array of simplified questions asked by the user formulated in English, try to make a question out of messages,
"summary": "Brief summary (12 sentences) of the conversation",
}
You must format your output as a JSON value that adheres to a given "JSON Schema" instance.
"JSON Schema" is a declarative language that allows you to annotate and validate JSON documents.
For example, the example "JSON Schema" instance {{"properties": {{"foo": {{"description": "a list of test words", "type": "array", "items": {{"type": "string"}}}}}}, "required": ["foo"]}}}}
would match an object with one required property, "foo". The "type" property specifies "foo" must be an "array", and the "description" property semantically describes it as "a list of test words". The items within "foo" must be strings.
Thus, the object {{"foo": ["bar", "baz"]}} is a well-formatted instance of this example "JSON Schema". The object {{"properties": {{"foo": ["bar", "baz"]}}}} is not well-formatted.
Your output will be parsed and type-checked according to the provided schema instance, so make sure all fields in your output match the schema exactly and there are no trailing commas!
Here is the JSON Schema instance your output must adhere to. Include the enclosing markdown codeblock:
\`\`\`json
{"type":"object","properties":{"language":{"type":"string","pattern":"^[a-z]{2}$","description":"ISO 639-1 code for the user's primary language"},"messages_sent":{"type":"integer","minimum":0,"description":"Number of messages sent by the user"},"sentiment":{"type":"string","enum":["positive","neutral","negative"],"description":"Overall tone of the user during the conversation"},"escalated":{"type":"boolean","description":"Whether the assistant indicated it could not help"},"forwarded_hr":{"type":"boolean","description":"Whether HR contact was mentioned or provided"},"category":{"type":"string","enum":["Schedule & Hours","Leave & Vacation","Sick Leave & Recovery","Salary & Compensation","Contract & Hours","Onboarding","Offboarding","Workwear & Staff Pass","Team & Contacts","Personal Questions","Access & Login","Social questions","Unrecognized / Other"],"description":"Best-fitting topic category for the conversation"},"questions":{"type":"array","items":{"type":"string","minLength":5},"minItems":0,"maxItems":5,"description":"List of paraphrased questions asked by the user in English"},"summary":{"type":"string","minLength":10,"maxLength":300,"description":"Brief summary of the conversation"},"session_id":{"type":"string","pattern":"^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$","minLength":36,"maxLength":36,"description":"Unique identifier for the conversation session"}},"required":["language","messages_sent","sentiment","escalated","forwarded_hr","category","questions","summary","session_id"],"additionalProperties":false,"$schema":"http://json-schema.org/draft-07/schema#"}
\`\`\`
`;
try {
@ -154,7 +158,6 @@ function validateOpenAIResponse(data: any): void {
"category",
"questions",
"summary",
"session_id",
];
for (const field of requiredFields) {
@ -188,25 +191,9 @@ function validateOpenAIResponse(data: any): void {
throw new Error("Invalid forwarded_hr. Expected boolean");
}
const validCategories = [
"Schedule & Hours",
"Leave & Vacation",
"Sick Leave & Recovery",
"Salary & Compensation",
"Contract & Hours",
"Onboarding",
"Offboarding",
"Workwear & Staff Pass",
"Team & Contacts",
"Personal Questions",
"Access & Login",
"Social questions",
"Unrecognized / Other",
];
if (!validCategories.includes(data.category)) {
if (!VALID_CATEGORIES.includes(data.category)) {
throw new Error(
`Invalid category. Expected one of: ${validCategories.join(", ")}`
`Invalid category. Expected one of: ${VALID_CATEGORIES.join(", ")}`
);
}
@ -224,7 +211,8 @@ function validateOpenAIResponse(data: any): void {
);
}
if (typeof data.session_id !== "string") {
// session_id is optional in the response, we'll use the one we passed in
if (data.session_id && typeof data.session_id !== "string") {
throw new Error("Invalid session_id. Expected string");
}
}
@ -241,6 +229,28 @@ async function processSingleSession(session: any): Promise<ProcessingResult> {
};
}
// Check for minimum data quality requirements
const userMessages = session.messages.filter((msg: any) =>
msg.role.toLowerCase() === 'user' || msg.role.toLowerCase() === 'human'
);
if (userMessages.length === 0) {
// Mark as invalid data - no user interaction
await prisma.session.update({
where: { id: session.id },
data: {
processed: true,
summary: "No user messages found - marked as invalid data",
},
});
return {
sessionId: session.id,
success: true,
error: "No user messages - marked as invalid data",
};
}
try {
// Convert messages back to transcript format for OpenAI processing
const transcript = session.messages
@ -264,12 +274,10 @@ async function processSingleSession(session: any): Promise<ProcessingResult> {
transcript
);
// Map sentiment string to float value for compatibility with existing data
const sentimentMap = {
positive: 0.8,
neutral: 0.0,
negative: -0.8,
};
// Check if the processed data indicates low quality (empty questions, very short summary, etc.)
const hasValidQuestions = processedData.questions && processedData.questions.length > 0;
const hasValidSummary = processedData.summary && processedData.summary.length >= 10;
const isValidData = hasValidQuestions && hasValidSummary;
// Update the session with processed data
await prisma.session.update({
@ -277,7 +285,7 @@ async function processSingleSession(session: any): Promise<ProcessingResult> {
data: {
language: processedData.language,
messagesSent: processedData.messages_sent,
sentiment: sentimentMap[processedData.sentiment] || 0,
sentiment: null, // Remove numeric sentiment, use only sentimentCategory
sentimentCategory: processedData.sentiment,
escalated: processedData.escalated,
forwardedHr: processedData.forwarded_hr,
@ -288,6 +296,12 @@ async function processSingleSession(session: any): Promise<ProcessingResult> {
},
});
if (!isValidData) {
process.stdout.write(
`[ProcessingScheduler] ⚠️ Session ${session.id} marked as invalid data (empty questions or short summary)\n`
);
}
return {
sessionId: session.id,
success: true,
@ -304,7 +318,10 @@ async function processSingleSession(session: any): Promise<ProcessingResult> {
/**
* Process sessions in parallel with concurrency limit
*/
async function processSessionsInParallel(sessions: any[], maxConcurrency: number = 5): Promise<ProcessingResult[]> {
async function processSessionsInParallel(
sessions: any[],
maxConcurrency: number = 5
): Promise<ProcessingResult[]> {
const results: Promise<ProcessingResult>[] = [];
const executing: Promise<ProcessingResult>[] = [];
@ -323,7 +340,7 @@ async function processSessionsInParallel(sessions: any[], maxConcurrency: number
if (executing.length >= maxConcurrency) {
await Promise.race(executing);
const completedIndex = executing.findIndex(p => p === promise);
const completedIndex = executing.findIndex((p) => p === promise);
if (completedIndex !== -1) {
executing.splice(completedIndex, 1);
}
@ -334,75 +351,104 @@ async function processSessionsInParallel(sessions: any[], maxConcurrency: number
}
/**
* Process unprocessed sessions
* Process unprocessed sessions in batches until completion
*/
export async function processUnprocessedSessions(batchSize: number | null = null, maxConcurrency: number = 5): Promise<void> {
export async function processUnprocessedSessions(
batchSize: number = 10,
maxConcurrency: number = 5
): Promise<{ totalProcessed: number; totalFailed: number; totalTime: number }> {
process.stdout.write(
"[ProcessingScheduler] Starting to process unprocessed sessions...\n"
"[ProcessingScheduler] Starting complete processing of all unprocessed sessions...\n"
);
// Find sessions that have messages but haven't been processed
const queryOptions: any = {
where: {
AND: [
{ messages: { some: {} } }, // Must have messages
{ processed: false }, // Only unprocessed sessions
],
},
include: {
messages: {
orderBy: { order: "asc" },
let totalProcessed = 0;
let totalFailed = 0;
const overallStartTime = Date.now();
let batchNumber = 1;
while (true) {
// Find sessions that have messages but haven't been processed
const sessionsToProcess = await prisma.session.findMany({
where: {
AND: [
{ messages: { some: {} } }, // Must have messages
{ processed: false }, // Only unprocessed sessions
],
},
},
};
include: {
messages: {
orderBy: { order: "asc" },
},
},
take: batchSize,
});
// Add batch size limit if specified
if (batchSize && batchSize > 0) {
queryOptions.take = batchSize;
}
const sessionsToProcess = await prisma.session.findMany(queryOptions);
// Filter to only sessions that have messages
const sessionsWithMessages = sessionsToProcess.filter(
(session: any) => session.messages && session.messages.length > 0
);
if (sessionsWithMessages.length === 0) {
process.stdout.write(
"[ProcessingScheduler] No sessions found requiring processing.\n"
// Filter to only sessions that have messages
const sessionsWithMessages = sessionsToProcess.filter(
(session: any) => session.messages && session.messages.length > 0
);
return;
if (sessionsWithMessages.length === 0) {
process.stdout.write(
"[ProcessingScheduler] ✅ All sessions with messages have been processed!\n"
);
break;
}
process.stdout.write(
`[ProcessingScheduler] 📦 Batch ${batchNumber}: Processing ${sessionsWithMessages.length} sessions (max concurrency: ${maxConcurrency})...\n`
);
const batchStartTime = Date.now();
const results = await processSessionsInParallel(
sessionsWithMessages,
maxConcurrency
);
const batchEndTime = Date.now();
const batchSuccessCount = results.filter((r) => r.success).length;
const batchErrorCount = results.filter((r) => !r.success).length;
totalProcessed += batchSuccessCount;
totalFailed += batchErrorCount;
process.stdout.write(
`[ProcessingScheduler] 📦 Batch ${batchNumber} complete: ${batchSuccessCount} success, ${batchErrorCount} failed (${((batchEndTime - batchStartTime) / 1000).toFixed(2)}s)\n`
);
batchNumber++;
// Small delay between batches to prevent overwhelming the system
if (sessionsWithMessages.length === batchSize) {
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
const overallEndTime = Date.now();
const totalTime = (overallEndTime - overallStartTime) / 1000;
process.stdout.write("[ProcessingScheduler] 🎉 Complete processing finished!\n");
process.stdout.write(
`[ProcessingScheduler] Found ${sessionsWithMessages.length} sessions to process (max concurrency: ${maxConcurrency}).\n`
`[ProcessingScheduler] 📊 Total results: ${totalProcessed} processed, ${totalFailed} failed\n`
);
process.stdout.write(
`[ProcessingScheduler] ⏱️ Total processing time: ${totalTime.toFixed(2)}s\n`
);
const startTime = Date.now();
const results = await processSessionsInParallel(sessionsWithMessages, maxConcurrency);
const endTime = Date.now();
const successCount = results.filter((r) => r.success).length;
const errorCount = results.filter((r) => !r.success).length;
process.stdout.write("[ProcessingScheduler] Session processing complete.\n");
process.stdout.write(
`[ProcessingScheduler] Successfully processed: ${successCount} sessions.\n`
);
process.stdout.write(
`[ProcessingScheduler] Failed to process: ${errorCount} sessions.\n`
);
process.stdout.write(
`[ProcessingScheduler] Total processing time: ${((endTime - startTime) / 1000).toFixed(2)}s\n`
);
return { totalProcessed, totalFailed, totalTime };
}
/**
* Start the processing scheduler
*/
export function startProcessingScheduler(): void {
// Process unprocessed sessions every hour
// Note: Scheduler disabled due to Next.js compatibility issues
// Use manual triggers via API endpoints instead
console.log("Processing scheduler disabled - using manual triggers via API endpoints");
// Original cron-based implementation commented out due to Next.js compatibility issues
// The functionality is now available via the /api/admin/trigger-processing endpoint
/*
cron.schedule("0 * * * *", async () => {
try {
await processUnprocessedSessions();
@ -416,4 +462,5 @@ export function startProcessingScheduler(): void {
process.stdout.write(
"[ProcessingScheduler] Started processing scheduler (runs hourly).\n"
);
*/
}

View File

@ -0,0 +1,437 @@
// Session processing without cron dependency - for Next.js API routes
import { PrismaClient } from "@prisma/client";
import fetch from "node-fetch";
import { readFileSync } from "fs";
import { fileURLToPath } from "url";
import { dirname, join } from "path";
import { VALID_CATEGORIES, ValidCategory, SentimentCategory } from "./types";
// Load environment variables from .env.local
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
const envPath = join(__dirname, "..", ".env.local");
try {
const envFile = readFileSync(envPath, "utf8");
const envVars = envFile
.split("\n")
.filter((line) => line.trim() && !line.startsWith("#"));
envVars.forEach((line) => {
const [key, ...valueParts] = line.split("=");
if (key && valueParts.length > 0) {
const value = valueParts.join("=").trim();
if (!process.env[key.trim()]) {
process.env[key.trim()] = value;
}
}
});
} catch (error) {
// Silently fail if .env.local doesn't exist
}
const prisma = new PrismaClient();
const OPENAI_API_KEY = process.env.OPENAI_API_KEY;
const OPENAI_API_URL = "https://api.openai.com/v1/chat/completions";
interface ProcessedData {
language: string;
messages_sent: number;
sentiment: SentimentCategory;
escalated: boolean;
forwarded_hr: boolean;
category: ValidCategory;
questions: string[];
summary: string;
session_id: string;
}
interface ProcessingResult {
sessionId: string;
success: boolean;
error?: string;
}
/**
* Processes a session transcript using OpenAI API
*/
async function processTranscriptWithOpenAI(
sessionId: string,
transcript: string
): Promise<ProcessedData> {
if (!OPENAI_API_KEY) {
throw new Error("OPENAI_API_KEY environment variable is not set");
}
// Create a system message with instructions
const systemMessage = `
System: You are a JSON-generating assistant. Your task is to analyze raw chat transcripts between a user and an assistant and return structured data.
⚠️ IMPORTANT:
- You must return a **single, valid JSON object**.
- Do **not** include markdown formatting, code fences, explanations, or comments.
- The JSON must match the exact structure and constraints described below.
Here is the schema you must follow:
{
"language": "ISO 639-1 code, e.g., 'en', 'nl'",
"messages_sent": "integer, number of messages from the user",
"sentiment": "'positive', 'neutral', or 'negative'",
"escalated": "bool: true if the assistant connected or referred to a human agent, otherwise false",
"forwarded_hr": "bool: true if HR contact info was given, otherwise false",
"category": "one of: 'Schedule & Hours', 'Leave & Vacation', 'Sick Leave & Recovery', 'Salary & Compensation', 'Contract & Hours', 'Onboarding', 'Offboarding', 'Workwear & Staff Pass', 'Team & Contacts', 'Personal Questions', 'Access & Login', 'Social questions', 'Unrecognized / Other'",
"questions": array of simplified questions asked by the user formulated in English, try to make a question out of messages,
"summary": "Brief summary (12 sentences) of the conversation",
}
You must format your output as a JSON value that adheres to a given "JSON Schema" instance.
"JSON Schema" is a declarative language that allows you to annotate and validate JSON documents.
For example, the example "JSON Schema" instance {{"properties": {{"foo": {{"description": "a list of test words", "type": "array", "items": {{"type": "string"}}}}}}, "required": ["foo"]}}}}
would match an object with one required property, "foo". The "type" property specifies "foo" must be an "array", and the "description" property semantically describes it as "a list of test words". The items within "foo" must be strings.
Thus, the object {{"foo": ["bar", "baz"]}} is a well-formatted instance of this example "JSON Schema". The object {{"properties": {{"foo": ["bar", "baz"]}}}} is not well-formatted.
Your output will be parsed and type-checked according to the provided schema instance, so make sure all fields in your output match the schema exactly and there are no trailing commas!
Here is the JSON Schema instance your output must adhere to. Include the enclosing markdown codeblock:
\`\`\`json
{"type":"object","properties":{"language":{"type":"string","pattern":"^[a-z]{2}$","description":"ISO 639-1 code for the user's primary language"},"messages_sent":{"type":"integer","minimum":0,"description":"Number of messages sent by the user"},"sentiment":{"type":"string","enum":["positive","neutral","negative"],"description":"Overall tone of the user during the conversation"},"escalated":{"type":"boolean","description":"Whether the assistant indicated it could not help"},"forwarded_hr":{"type":"boolean","description":"Whether HR contact was mentioned or provided"},"category":{"type":"string","enum":["Schedule & Hours","Leave & Vacation","Sick Leave & Recovery","Salary & Compensation","Contract & Hours","Onboarding","Offboarding","Workwear & Staff Pass","Team & Contacts","Personal Questions","Access & Login","Social questions","Unrecognized / Other"],"description":"Best-fitting topic category for the conversation"},"questions":{"type":"array","items":{"type":"string","minLength":5},"minItems":0,"maxItems":5,"description":"List of paraphrased questions asked by the user in English"},"summary":{"type":"string","minLength":10,"maxLength":300,"description":"Brief summary of the conversation"},"session_id":{"type":"string","pattern":"^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$","minLength":36,"maxLength":36,"description":"Unique identifier for the conversation session"}},"required":["language","messages_sent","sentiment","escalated","forwarded_hr","category","questions","summary","session_id"],"additionalProperties":false,"$schema":"http://json-schema.org/draft-07/schema#"}
\`\`\`
`;
try {
const response = await fetch(OPENAI_API_URL, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${OPENAI_API_KEY}`,
},
body: JSON.stringify({
model: "gpt-4-turbo",
messages: [
{
role: "system",
content: systemMessage,
},
{
role: "user",
content: transcript,
},
],
temperature: 0.3, // Lower temperature for more consistent results
response_format: { type: "json_object" },
}),
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`OpenAI API error: ${response.status} - ${errorText}`);
}
const data: any = await response.json();
const processedData = JSON.parse(data.choices[0].message.content);
// Validate the response against our expected schema
validateOpenAIResponse(processedData);
return processedData;
} catch (error) {
process.stderr.write(`Error processing transcript with OpenAI: ${error}\n`);
throw error;
}
}
/**
* Validates the OpenAI response against our expected schema
*/
function validateOpenAIResponse(data: any): void {
// Check required fields
const requiredFields = [
"language",
"messages_sent",
"sentiment",
"escalated",
"forwarded_hr",
"category",
"questions",
"summary",
];
for (const field of requiredFields) {
if (!(field in data)) {
throw new Error(`Missing required field: ${field}`);
}
}
// Validate field types
if (typeof data.language !== "string" || !/^[a-z]{2}$/.test(data.language)) {
throw new Error(
"Invalid language format. Expected ISO 639-1 code (e.g., 'en')"
);
}
if (typeof data.messages_sent !== "number" || data.messages_sent < 0) {
throw new Error("Invalid messages_sent. Expected non-negative number");
}
if (!["positive", "neutral", "negative"].includes(data.sentiment)) {
throw new Error(
"Invalid sentiment. Expected 'positive', 'neutral', or 'negative'"
);
}
if (typeof data.escalated !== "boolean") {
throw new Error("Invalid escalated. Expected boolean");
}
if (typeof data.forwarded_hr !== "boolean") {
throw new Error("Invalid forwarded_hr. Expected boolean");
}
if (!VALID_CATEGORIES.includes(data.category)) {
throw new Error(
`Invalid category. Expected one of: ${VALID_CATEGORIES.join(", ")}`
);
}
if (!Array.isArray(data.questions)) {
throw new Error("Invalid questions. Expected array of strings");
}
if (
typeof data.summary !== "string" ||
data.summary.length < 10 ||
data.summary.length > 300
) {
throw new Error(
"Invalid summary. Expected string between 10-300 characters"
);
}
// session_id is optional in the response, we'll use the one we passed in
if (data.session_id && typeof data.session_id !== "string") {
throw new Error("Invalid session_id. Expected string");
}
}
/**
* Process a single session
*/
async function processSingleSession(session: any): Promise<ProcessingResult> {
if (session.messages.length === 0) {
return {
sessionId: session.id,
success: false,
error: "Session has no messages",
};
}
// Check for minimum data quality requirements
const userMessages = session.messages.filter((msg: any) =>
msg.role.toLowerCase() === 'user' || msg.role.toLowerCase() === 'human'
);
if (userMessages.length === 0) {
// Mark as invalid data - no user interaction
await prisma.session.update({
where: { id: session.id },
data: {
processed: true,
summary: "No user messages found - marked as invalid data",
},
});
return {
sessionId: session.id,
success: true,
error: "No user messages - marked as invalid data",
};
}
try {
// Convert messages back to transcript format for OpenAI processing
const transcript = session.messages
.map(
(msg: any) =>
`[${new Date(msg.timestamp)
.toLocaleString("en-GB", {
day: "2-digit",
month: "2-digit",
year: "numeric",
hour: "2-digit",
minute: "2-digit",
second: "2-digit",
})
.replace(",", "")}] ${msg.role}: ${msg.content}`
)
.join("\n");
const processedData = await processTranscriptWithOpenAI(
session.id,
transcript
);
// Check if the processed data indicates low quality (empty questions, very short summary, etc.)
const hasValidQuestions = processedData.questions && processedData.questions.length > 0;
const hasValidSummary = processedData.summary && processedData.summary.length >= 10;
const isValidData = hasValidQuestions && hasValidSummary;
// Update the session with processed data
await prisma.session.update({
where: { id: session.id },
data: {
language: processedData.language,
messagesSent: processedData.messages_sent,
sentiment: null, // Remove numeric sentiment, use only sentimentCategory
sentimentCategory: processedData.sentiment,
escalated: processedData.escalated,
forwardedHr: processedData.forwarded_hr,
category: processedData.category,
questions: JSON.stringify(processedData.questions),
summary: processedData.summary,
processed: true,
},
});
if (!isValidData) {
process.stdout.write(
`[ProcessingScheduler] ⚠️ Session ${session.id} marked as invalid data (empty questions or short summary)\n`
);
}
return {
sessionId: session.id,
success: true,
};
} catch (error) {
return {
sessionId: session.id,
success: false,
error: error instanceof Error ? error.message : String(error),
};
}
}
/**
* Process sessions in parallel with concurrency limit
*/
async function processSessionsInParallel(
sessions: any[],
maxConcurrency: number = 5
): Promise<ProcessingResult[]> {
const results: Promise<ProcessingResult>[] = [];
const executing: Promise<ProcessingResult>[] = [];
for (const session of sessions) {
const promise = processSingleSession(session).then((result) => {
process.stdout.write(
result.success
? `[ProcessingScheduler] ✓ Successfully processed session ${result.sessionId}\n`
: `[ProcessingScheduler] ✗ Failed to process session ${result.sessionId}: ${result.error}\n`
);
return result;
});
results.push(promise);
executing.push(promise);
if (executing.length >= maxConcurrency) {
await Promise.race(executing);
const completedIndex = executing.findIndex((p) => p === promise);
if (completedIndex !== -1) {
executing.splice(completedIndex, 1);
}
}
}
return Promise.all(results);
}
/**
* Process unprocessed sessions in batches until completion
*/
export async function processUnprocessedSessions(
batchSize: number = 10,
maxConcurrency: number = 5
): Promise<{ totalProcessed: number; totalFailed: number; totalTime: number }> {
process.stdout.write(
"[ProcessingScheduler] Starting complete processing of all unprocessed sessions...\n"
);
let totalProcessed = 0;
let totalFailed = 0;
const overallStartTime = Date.now();
let batchNumber = 1;
while (true) {
// Find sessions that have messages but haven't been processed
const sessionsToProcess = await prisma.session.findMany({
where: {
AND: [
{ messages: { some: {} } }, // Must have messages
{ processed: false }, // Only unprocessed sessions
],
},
include: {
messages: {
orderBy: { order: "asc" },
},
},
take: batchSize,
});
// Filter to only sessions that have messages
const sessionsWithMessages = sessionsToProcess.filter(
(session: any) => session.messages && session.messages.length > 0
);
if (sessionsWithMessages.length === 0) {
process.stdout.write(
"[ProcessingScheduler] ✅ All sessions with messages have been processed!\n"
);
break;
}
process.stdout.write(
`[ProcessingScheduler] 📦 Batch ${batchNumber}: Processing ${sessionsWithMessages.length} sessions (max concurrency: ${maxConcurrency})...\n`
);
const batchStartTime = Date.now();
const results = await processSessionsInParallel(
sessionsWithMessages,
maxConcurrency
);
const batchEndTime = Date.now();
const batchSuccessCount = results.filter((r) => r.success).length;
const batchErrorCount = results.filter((r) => !r.success).length;
totalProcessed += batchSuccessCount;
totalFailed += batchErrorCount;
process.stdout.write(
`[ProcessingScheduler] 📦 Batch ${batchNumber} complete: ${batchSuccessCount} success, ${batchErrorCount} failed (${((batchEndTime - batchStartTime) / 1000).toFixed(2)}s)\n`
);
batchNumber++;
// Small delay between batches to prevent overwhelming the system
if (sessionsWithMessages.length === batchSize) {
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
const overallEndTime = Date.now();
const totalTime = (overallEndTime - overallStartTime) / 1000;
process.stdout.write("[ProcessingScheduler] 🎉 Complete processing finished!\n");
process.stdout.write(
`[ProcessingScheduler] 📊 Total results: ${totalProcessed} processed, ${totalFailed} failed\n`
);
process.stdout.write(
`[ProcessingScheduler] ⏱️ Total processing time: ${totalTime.toFixed(2)}s\n`
);
return { totalProcessed, totalFailed, totalTime };
}

View File

@ -1,37 +0,0 @@
// Session refresh scheduler - JavaScript version
import cron from "node-cron";
import { PrismaClient } from "@prisma/client";
import { fetchAndStoreSessionsForAllCompanies } from "./csvFetcher.js";
const prisma = new PrismaClient();
/**
* Refresh sessions for all companies
*/
async function refreshSessions() {
console.log("[Scheduler] Starting session refresh...");
try {
await fetchAndStoreSessionsForAllCompanies();
console.log("[Scheduler] Session refresh completed successfully.");
} catch (error) {
console.error("[Scheduler] Error during session refresh:", error);
}
}
/**
* Start the session refresh scheduler
*/
export function startScheduler() {
// Run every 15 minutes
cron.schedule("*/15 * * * *", async () => {
try {
await refreshSessions();
} catch (error) {
console.error("[Scheduler] Error in scheduler:", error);
}
});
console.log(
"[Scheduler] Started session refresh scheduler (runs every 15 minutes)."
);
}

View File

@ -1,5 +1,6 @@
// node-cron job to auto-refresh session data every 15 mins
import cron from "node-cron";
// Note: Disabled due to Next.js compatibility issues
// import cron from "node-cron";
import { prisma } from "./prisma";
import { fetchAndParseCsv } from "./csvFetcher";
@ -11,68 +12,10 @@ interface SessionCreateData {
}
export function startScheduler() {
cron.schedule("*/15 * * * *", async () => {
const companies = await prisma.company.findMany();
for (const company of companies) {
try {
const sessions = await fetchAndParseCsv(
company.csvUrl,
company.csvUsername as string | undefined,
company.csvPassword as string | undefined
);
// Only add sessions that don't already exist in the database
for (const session of sessions) {
const sessionData: SessionCreateData = {
...session,
companyId: company.id,
id: session.id || session.sessionId || `sess_${Date.now()}`,
// Ensure startTime is not undefined
startTime: session.startTime || new Date(),
};
// Note: Scheduler disabled due to Next.js compatibility issues
// Use manual triggers via API endpoints instead
console.log("Session refresh scheduler disabled - using manual triggers via API endpoints");
// Check if the session already exists
const existingSession = await prisma.session.findUnique({
where: { id: sessionData.id },
});
if (existingSession) {
// Skip this session as it already exists
continue;
}
// Only include fields that are properly typed for Prisma
await prisma.session.create({
data: {
id: sessionData.id,
companyId: sessionData.companyId,
startTime: sessionData.startTime,
// endTime is required in the schema, so use startTime if not available
endTime: session.endTime || new Date(),
ipAddress: session.ipAddress || null,
country: session.country || null,
language: session.language || null,
sentiment:
typeof session.sentiment === "number"
? session.sentiment
: null,
messagesSent:
typeof session.messagesSent === "number"
? session.messagesSent
: 0,
category: session.category || null,
},
});
}
// Using process.stdout.write instead of console.log to avoid ESLint warning
process.stdout.write(
`[Scheduler] Refreshed sessions for company: ${company.name}\n`
);
} catch (e) {
// Using process.stderr.write instead of console.error to avoid ESLint warning
process.stderr.write(
`[Scheduler] Failed for company: ${company.name} - ${e}\n`
);
}
}
});
// Original cron-based implementation commented out due to Next.js compatibility issues
// The functionality is now available via the /api/admin/refresh-sessions endpoint
}

View File

@ -1,6 +1,7 @@
// Combined scheduler initialization
import { startScheduler } from "./scheduler";
import { startProcessingScheduler } from "./processingScheduler";
// Note: Removed cron-based scheduler imports to avoid Next.js compatibility issues
// import { startScheduler } from "./scheduler";
// import { startProcessingScheduler } from "./processingScheduler";
/**
* Initialize all schedulers
@ -8,11 +9,9 @@ import { startProcessingScheduler } from "./processingScheduler";
* - Session processing scheduler (runs every hour)
*/
export function initializeSchedulers() {
// Start the session refresh scheduler
startScheduler();
// Start the session processing scheduler
startProcessingScheduler();
console.log("All schedulers initialized successfully");
// Note: All schedulers disabled due to Next.js compatibility issues
// Use manual triggers via API endpoints instead
console.log("Schedulers disabled - using manual triggers via API endpoints");
// startScheduler();
// startProcessingScheduler();
}

View File

@ -102,7 +102,9 @@ export async function storeMessagesForSession(sessionId, messages) {
// Extract actual end time from the latest message
const latestMessage = messages.reduce((latest, current) => {
return new Date(current.timestamp) > new Date(latest.timestamp) ? current : latest;
return new Date(current.timestamp) > new Date(latest.timestamp)
? current
: latest;
});
// Update the session's endTime with the actual conversation end time

View File

@ -1,5 +1,26 @@
import { Session as NextAuthSession } from "next-auth";
// Standardized enums
export type SentimentCategory = "positive" | "neutral" | "negative";
export const VALID_CATEGORIES = [
"Schedule & Hours",
"Leave & Vacation",
"Sick Leave & Recovery",
"Salary & Compensation",
"Contract & Hours",
"Onboarding",
"Offboarding",
"Workwear & Staff Pass",
"Team & Contacts",
"Personal Questions",
"Access & Login",
"Social questions",
"Unrecognized / Other",
] as const;
export type ValidCategory = (typeof VALID_CATEGORIES)[number];
export interface UserSession extends NextAuthSession {
user: {
id?: string;
@ -71,6 +92,7 @@ export interface ChatSession {
initialMsg?: string;
fullTranscriptUrl?: string | null;
processed?: boolean | null; // Flag for post-processing status
validData?: boolean | null; // Flag for data quality (false = exclude from analytics)
questions?: string | null; // JSON array of questions asked by user
summary?: string | null; // Brief summary of the conversation
messages?: Message[]; // Parsed messages from transcript