mirror of
https://github.com/kjanat/livedash-node.git
synced 2026-01-16 07:52:10 +01:00
feat: Implement structured message parsing and display in MessageViewer component
- Added MessageViewer component to display parsed messages in a chat-like format.
- Introduced new Message table in the database to store individual messages with timestamps, roles, and content.
- Updated Session model to include a relation to parsed messages.
- Created transcript parsing logic to convert raw transcripts into structured messages.
- Enhanced processing scheduler to handle sessions with parsed messages.
- Updated API endpoints to return parsed messages alongside session details.
- Added manual trigger commands for session refresh, transcript parsing, and processing.
- Improved user experience with color-coded message roles and timestamps in the UI.
- Documented the new scheduler workflow and transcript parsing implementation.
This commit is contained in:
@ -407,7 +407,7 @@ async function fetchTranscriptContent(url, username, password) {
|
||||
headers: authHeader ? { Authorization: authHeader } : {},
|
||||
timeout: 10000, // 10 second timeout
|
||||
});
|
||||
|
||||
|
||||
if (!response.ok) {
|
||||
// Only log error once per batch, not for every transcript
|
||||
if (Math.random() < 0.1) { // Log ~10% of errors to avoid spam
|
||||
@ -502,7 +502,7 @@ export async function fetchAndStoreSessionsForAllCompanies() {
|
||||
try {
|
||||
// Get all companies
|
||||
const companies = await prisma.company.findMany();
|
||||
|
||||
|
||||
for (const company of companies) {
|
||||
if (!company.csvUrl) {
|
||||
console.log(`[Scheduler] Skipping company ${company.id} - no CSV URL configured`);
|
||||
@ -516,7 +516,7 @@ export async function fetchAndStoreSessionsForAllCompanies() {
|
||||
}
|
||||
|
||||
console.log(`[Scheduler] Processing sessions for company: ${company.id}`);
|
||||
|
||||
|
||||
try {
|
||||
const sessions = await fetchAndParseCsv(
|
||||
company.csvUrl,
|
||||
@ -603,7 +603,7 @@ export async function fetchAndStoreSessionsForAllCompanies() {
|
||||
initialMsg: session.initialMsg || null,
|
||||
},
|
||||
});
|
||||
|
||||
|
||||
addedCount++;
|
||||
}
|
||||
|
||||
|
||||
@ -181,46 +181,59 @@ function validateOpenAIResponse(data) {
|
||||
/**
|
||||
* Process unprocessed sessions
|
||||
*/
|
||||
async function processUnprocessedSessions() {
|
||||
export async function processUnprocessedSessions() {
|
||||
process.stdout.write("[ProcessingScheduler] Starting to process unprocessed sessions...\n");
|
||||
|
||||
// Find sessions that have transcript content but haven't been processed
|
||||
// Find sessions that have messages but haven't been processed
|
||||
const sessionsToProcess = await prisma.session.findMany({
|
||||
where: {
|
||||
AND: [
|
||||
{ transcriptContent: { not: null } },
|
||||
{ transcriptContent: { not: "" } },
|
||||
{ processed: { not: true } }, // Either false or null
|
||||
],
|
||||
},
|
||||
select: {
|
||||
id: true,
|
||||
transcriptContent: true,
|
||||
include: {
|
||||
messages: {
|
||||
orderBy: { order: 'asc' }
|
||||
}
|
||||
},
|
||||
take: 10, // Process in batches to avoid overloading the system
|
||||
});
|
||||
|
||||
if (sessionsToProcess.length === 0) {
|
||||
// Filter to only sessions that have messages
|
||||
const sessionsWithMessages = sessionsToProcess.filter(session => session.messages.length > 0);
|
||||
|
||||
if (sessionsWithMessages.length === 0) {
|
||||
process.stdout.write("[ProcessingScheduler] No sessions found requiring processing.\n");
|
||||
return;
|
||||
}
|
||||
|
||||
process.stdout.write(`[ProcessingScheduler] Found ${sessionsToProcess.length} sessions to process.\n`);
|
||||
process.stdout.write(`[ProcessingScheduler] Found ${sessionsWithMessages.length} sessions to process.\n`);
|
||||
let successCount = 0;
|
||||
let errorCount = 0;
|
||||
|
||||
for (const session of sessionsToProcess) {
|
||||
if (!session.transcriptContent) {
|
||||
// Should not happen due to query, but good for type safety
|
||||
process.stderr.write(`[ProcessingScheduler] Session ${session.id} has no transcript content, skipping.\n`);
|
||||
for (const session of sessionsWithMessages) {
|
||||
if (session.messages.length === 0) {
|
||||
process.stderr.write(`[ProcessingScheduler] Session ${session.id} has no messages, skipping.\n`);
|
||||
continue;
|
||||
}
|
||||
|
||||
process.stdout.write(`[ProcessingScheduler] Processing transcript for session ${session.id}...\n`);
|
||||
process.stdout.write(`[ProcessingScheduler] Processing messages for session ${session.id}...\n`);
|
||||
try {
|
||||
// Convert messages back to transcript format for OpenAI processing
|
||||
const transcript = session.messages.map(msg =>
|
||||
`[${new Date(msg.timestamp).toLocaleString('en-GB', {
|
||||
day: '2-digit',
|
||||
month: '2-digit',
|
||||
year: 'numeric',
|
||||
hour: '2-digit',
|
||||
minute: '2-digit',
|
||||
second: '2-digit'
|
||||
}).replace(',', '')}] ${msg.role}: ${msg.content}`
|
||||
).join('\n');
|
||||
|
||||
const processedData = await processTranscriptWithOpenAI(
|
||||
session.id,
|
||||
session.transcriptContent
|
||||
transcript
|
||||
);
|
||||
|
||||
// Map sentiment string to float value for compatibility with existing data
|
||||
|
||||
@ -10,9 +10,9 @@ import { startProcessingScheduler } from "./processingScheduler";
|
||||
/**
 * Boots the background schedulers used by the application.
 *
 * Calls `startScheduler()` first and `startProcessingScheduler()` second,
 * then logs a confirmation line once both calls have returned.
 */
export function initializeSchedulers() {
  // Session refresh scheduler goes first.
  startScheduler();

  // Session processing scheduler follows.
  startProcessingScheduler();

  const doneMessage = "All schedulers initialized successfully";
  console.log(doneMessage);
}
|
||||
|
||||
209
lib/transcriptParser.js
Normal file
209
lib/transcriptParser.js
Normal file
@ -0,0 +1,209 @@
|
||||
// Transcript parser utility - converts raw transcript text to structured messages
|
||||
import { PrismaClient } from "@prisma/client";
|
||||
|
||||
const prisma = new PrismaClient();
|
||||
|
||||
/**
 * Parses a raw chat transcript into an ordered list of structured messages.
 *
 * A message starts on a line shaped like
 * `[DD.MM.YYYY HH:MM:SS] Sender: content`. Any following non-blank line that
 * does not match that shape is appended to the current message's content,
 * joined with `\n`. Non-blank lines that appear before the first header are
 * ignored. Both LF and CRLF line endings are accepted.
 *
 * Timestamps are interpreted in the local time zone of the running process
 * and returned as ISO-8601 strings.
 *
 * @param {string} logString - Raw transcript content (non-strings are coerced with `String()`)
 * @returns {Object} `{ messages, totalMessages }` where `messages` is sorted by
 *   timestamp ascending, ties broken by role in descending order
 *   (so "User" comes before "Assistant")
 */
export function parseChatLogToJSON(logString) {
  const text = typeof logString === 'string' ? logString : String(logString);

  // "[DD.MM.YYYY HH:MM:SS] Sender: content" with each date/time part captured separately.
  const headerPattern =
    /^\[(\d{2})\.(\d{2})\.(\d{4}) (\d{2}):(\d{2}):(\d{2})\] (.+?): (.*)$/;

  const messages = [];

  // Split on CRLF as well as LF: `.` never matches "\r", so a trailing "\r"
  // would otherwise stop every header line from matching.
  for (const line of text.split(/\r?\n/)) {
    if (line.trim() === '') {
      continue;
    }

    const header = headerPattern.exec(line);
    if (header) {
      const [, day, month, year, hour, minute, second, sender, content] = header;
      const localDate = new Date(
        Number(year),
        Number(month) - 1, // Date months are zero-based
        Number(day),
        Number(hour),
        Number(minute),
        Number(second)
      );

      messages.push({
        timestamp: localDate.toISOString(),
        role: sender,
        content: content
      });
    } else if (messages.length > 0) {
      // Continuation of the most recent message (multiline content).
      messages[messages.length - 1].content += '\n' + line;
    }
  }

  messages.sort((a, b) => {
    const timeComparison =
      new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime();
    if (timeComparison !== 0) {
      return timeComparison;
    }
    // Equal timestamps: descending role order puts "User" before "Assistant".
    return b.role.localeCompare(a.role);
  });

  return {
    messages: messages,
    totalMessages: messages.length
  };
}
|
||||
|
||||
/**
 * Replaces the stored messages of a session with the given parsed messages.
 *
 * Existing rows for the session are deleted and the new rows inserted inside
 * a single database transaction, so a failed insert does not leave the
 * session without its previous messages. Each row's `order` is the message's
 * index in the `messages` array.
 *
 * @param {string} sessionId - The session ID
 * @param {Array} messages - Array of parsed message objects (`timestamp`, `role`, `content`)
 * @returns {Promise<number>} Number of messages stored
 * @throws Rethrows any error after logging it to stderr
 */
export async function storeMessagesForSession(sessionId, messages) {
  try {
    // Build the rows first so malformed input fails before anything is deleted.
    const messageData = messages.map((message, index) => ({
      sessionId,
      timestamp: new Date(message.timestamp),
      role: message.role,
      content: message.content,
      order: index
    }));

    const operations = [
      prisma.message.deleteMany({
        where: { sessionId }
      })
    ];

    if (messageData.length > 0) {
      operations.push(
        prisma.message.createMany({
          data: messageData
        })
      );
    }

    // Delete + insert commit or roll back together.
    await prisma.$transaction(operations);

    process.stdout.write(`[TranscriptParser] Stored ${messageData.length} messages for session ${sessionId}\n`);
    return messageData.length;
  } catch (error) {
    process.stderr.write(`[TranscriptParser] Error storing messages for session ${sessionId}: ${error}\n`);
    throw error;
  }
}
|
||||
|
||||
/**
 * Parses a session's raw transcript and stores the resulting messages.
 *
 * @param {string} sessionId - The session ID
 * @param {string} transcriptContent - Raw transcript content
 * @returns {Promise<Object>} `{ sessionId, messageCount, totalMessages, success: true }`
 * @throws {Error} "No transcript content provided" when `transcriptContent`
 *   is not a string or contains only whitespace; parsing/storage errors are
 *   logged to stderr and rethrown
 */
export async function processTranscriptForSession(sessionId, transcriptContent) {
  // The typeof guard keeps non-string values (numbers, buffers, ...) from
  // crashing on `.trim()` with an unrelated TypeError.
  if (typeof transcriptContent !== 'string' || transcriptContent.trim() === '') {
    throw new Error('No transcript content provided');
  }

  try {
    // Parse the transcript into structured messages
    const parsed = parseChatLogToJSON(transcriptContent);

    // Persist them for this session
    const messageCount = await storeMessagesForSession(sessionId, parsed.messages);

    return {
      sessionId,
      messageCount,
      totalMessages: parsed.totalMessages,
      success: true
    };
  } catch (error) {
    process.stderr.write(`[TranscriptParser] Error processing transcript for session ${sessionId}: ${error}\n`);
    throw error;
  }
}
|
||||
|
||||
/**
 * Parses and stores messages for every session that has non-empty transcript
 * content but no parsed messages yet.
 *
 * Sessions are processed one at a time; a failure on one session is logged
 * and counted, and processing continues with the next.
 *
 * @returns {Promise<Object>} `{ processed, errors }` counts of sessions that
 *   succeeded and failed
 * @throws Rethrows any error raised outside the per-session loop (e.g. by the
 *   initial query) after logging it to stderr
 */
export async function processAllUnparsedTranscripts() {
  process.stdout.write("[TranscriptParser] Starting to process unparsed transcripts...\n");

  try {
    // Let the database pick sessions without messages instead of loading
    // every session together with all of its message rows.
    const unparsedSessions = await prisma.session.findMany({
      where: {
        AND: [
          { transcriptContent: { not: null } },
          { transcriptContent: { not: "" } },
          { messages: { none: {} } },
        ]
      },
      select: {
        id: true,
        transcriptContent: true
      }
    });

    if (unparsedSessions.length === 0) {
      process.stdout.write("[TranscriptParser] No unparsed transcripts found.\n");
      return { processed: 0, errors: 0 };
    }

    process.stdout.write(`[TranscriptParser] Found ${unparsedSessions.length} sessions with unparsed transcripts.\n`);

    let successCount = 0;
    let errorCount = 0;

    for (const session of unparsedSessions) {
      try {
        const result = await processTranscriptForSession(session.id, session.transcriptContent);
        process.stdout.write(`[TranscriptParser] Processed session ${session.id}: ${result.messageCount} messages\n`);
        successCount++;
      } catch (error) {
        // Keep going: one bad transcript must not block the rest.
        process.stderr.write(`[TranscriptParser] Failed to process session ${session.id}: ${error}\n`);
        errorCount++;
      }
    }

    process.stdout.write(`[TranscriptParser] Completed processing. Success: ${successCount}, Errors: ${errorCount}\n`);
    return { processed: successCount, errors: errorCount };
  } catch (error) {
    process.stderr.write(`[TranscriptParser] Error in processAllUnparsedTranscripts: ${error}\n`);
    throw error;
  }
}
|
||||
|
||||
/**
 * Loads the stored messages of one session, sorted by their `order` column
 * in ascending order.
 *
 * @param {string} sessionId - The session ID
 * @returns {Promise<Array>} Array of message objects
 * @throws Rethrows any query error after logging it to stderr
 */
export async function getMessagesForSession(sessionId) {
  const query = {
    where: { sessionId },
    orderBy: { order: 'asc' }
  };

  try {
    return await prisma.message.findMany(query);
  } catch (error) {
    process.stderr.write(`[TranscriptParser] Error getting messages for session ${sessionId}: ${error}\n`);
    throw error;
  }
}
|
||||
12
lib/types.ts
12
lib/types.ts
@ -35,6 +35,16 @@ export interface User {
|
||||
updatedAt: Date;
|
||||
}
|
||||
|
||||
/**
 * A single parsed message belonging to a chat session.
 */
export interface Message {
  id: string;
  sessionId: string;
  timestamp: Date;
  /** Sender label, e.g. "User", "Assistant", "System". */
  role: string;
  content: string;
  /** Position within the conversation (0, 1, 2, ...). */
  order: number;
  createdAt: Date;
}
|
||||
|
||||
export interface ChatSession {
|
||||
id: string;
|
||||
sessionId: string;
|
||||
@ -60,10 +70,10 @@ export interface ChatSession {
|
||||
tokensEur?: number;
|
||||
initialMsg?: string;
|
||||
fullTranscriptUrl?: string | null;
|
||||
transcriptContent?: string | null;
|
||||
processed?: boolean | null; // Flag for post-processing status
|
||||
questions?: string | null; // JSON array of questions asked by user
|
||||
summary?: string | null; // Brief summary of the conversation
|
||||
messages?: Message[]; // Parsed messages from transcript
|
||||
}
|
||||
|
||||
export interface SessionQuery {
|
||||
|
||||
Reference in New Issue
Block a user