Mirror of https://github.com/kjanat/livedash-node.git (synced 2026-01-16 10:52:08 +01:00)
feat: add rawTranscriptContent field to SessionImport model
feat: enhance server initialization with environment validation and import processing scheduler
test: add Jest setup for unit tests and mock console methods
test: implement unit tests for environment management and validation
test: create unit tests for transcript fetcher functionality
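
The server initialization and environment validation changes mentioned above are not part of the diff shown below. A minimal sketch of how they might be wired together at startup, assuming a hypothetical bootstrap file and a hypothetical validateEnvironment helper in lib/env (only getSchedulerConfig and startImportProcessingScheduler are confirmed by the code below):

// server-init.ts (hypothetical file name) -- sketch only, not the actual server entry point.
// validateEnvironment is an assumed helper in ./lib/env; startImportProcessingScheduler
// is exported by lib/importProcessor.ts as shown in the diff below.
import { validateEnvironment } from "./lib/env";
import { startImportProcessingScheduler } from "./lib/importProcessor";

export function initServer(): void {
  // Fail fast if required environment variables are missing or malformed.
  validateEnvironment();

  // Start the cron-based processor that promotes SessionImport rows to Sessions.
  startImportProcessingScheduler();
}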
lib/importProcessor.ts (new file, 225 lines)
@@ -0,0 +1,225 @@
// SessionImport to Session processor
import { PrismaClient, ImportStatus, SentimentCategory } from "@prisma/client";
import { getSchedulerConfig } from "./env";
import { fetchTranscriptContent, isValidTranscriptUrl } from "./transcriptFetcher";
import cron from "node-cron";

const prisma = new PrismaClient();

/**
 * Process a single SessionImport record into a Session record
 */
async function processSingleImport(importRecord: any): Promise<{ success: boolean; error?: string }> {
  try {
    // Parse dates
    const startTime = new Date(importRecord.startTimeRaw);
    const endTime = new Date(importRecord.endTimeRaw);

    // Validate dates
    if (isNaN(startTime.getTime()) || isNaN(endTime.getTime())) {
      throw new Error(`Invalid date format: start=${importRecord.startTimeRaw}, end=${importRecord.endTimeRaw}`);
    }

    // Process sentiment
    let sentiment: number | null = null;
    let sentimentCategory: SentimentCategory | null = null;

    if (importRecord.sentimentRaw) {
      const sentimentStr = importRecord.sentimentRaw.toLowerCase();
      if (sentimentStr.includes('positive')) {
        sentiment = 0.8;
        sentimentCategory = SentimentCategory.POSITIVE;
      } else if (sentimentStr.includes('negative')) {
        sentiment = -0.8;
        sentimentCategory = SentimentCategory.NEGATIVE;
      } else {
        sentiment = 0.0;
        sentimentCategory = SentimentCategory.NEUTRAL;
      }
    }

    // Process boolean fields
    const escalated = importRecord.escalatedRaw ?
      ['true', '1', 'yes', 'escalated'].includes(importRecord.escalatedRaw.toLowerCase()) : null;

    const forwardedHr = importRecord.forwardedHrRaw ?
      ['true', '1', 'yes', 'forwarded'].includes(importRecord.forwardedHrRaw.toLowerCase()) : null;

    // Keep country code as-is, will be processed by OpenAI later
    const country = importRecord.countryCode;

    // Fetch transcript content if URL is provided and not already fetched
    let transcriptContent = importRecord.rawTranscriptContent;
    if (!transcriptContent && importRecord.fullTranscriptUrl && isValidTranscriptUrl(importRecord.fullTranscriptUrl)) {
      console.log(`[Import Processor] Fetching transcript for ${importRecord.externalSessionId}...`);

      // Get company credentials for transcript fetching
      const company = await prisma.company.findUnique({
        where: { id: importRecord.companyId },
        select: { csvUsername: true, csvPassword: true },
      });

      const transcriptResult = await fetchTranscriptContent(
        importRecord.fullTranscriptUrl,
        company?.csvUsername || undefined,
        company?.csvPassword || undefined
      );

      if (transcriptResult.success) {
        transcriptContent = transcriptResult.content;
        console.log(`[Import Processor] ✓ Fetched transcript for ${importRecord.externalSessionId} (${transcriptContent?.length} chars)`);

        // Update the import record with the fetched content
        await prisma.sessionImport.update({
          where: { id: importRecord.id },
          data: { rawTranscriptContent: transcriptContent },
        });
      } else {
        console.log(`[Import Processor] ⚠️ Failed to fetch transcript for ${importRecord.externalSessionId}: ${transcriptResult.error}`);
      }
    }

    // Create or update Session record
    await prisma.session.upsert({
      where: {
        importId: importRecord.id,
      },
      update: {
        startTime,
        endTime,
        ipAddress: importRecord.ipAddress,
        country,
        language: importRecord.language,
        messagesSent: importRecord.messagesSent,
        sentiment,
        sentimentCategory,
        escalated,
        forwardedHr,
        fullTranscriptUrl: importRecord.fullTranscriptUrl,
        avgResponseTime: importRecord.avgResponseTimeSeconds,
        tokens: importRecord.tokens,
        tokensEur: importRecord.tokensEur,
        category: importRecord.category,
        initialMsg: importRecord.initialMessage,
        processed: false, // Will be processed later by AI
      },
      create: {
        companyId: importRecord.companyId,
        importId: importRecord.id,
        startTime,
        endTime,
        ipAddress: importRecord.ipAddress,
        country,
        language: importRecord.language,
        messagesSent: importRecord.messagesSent,
        sentiment,
        sentimentCategory,
        escalated,
        forwardedHr,
        fullTranscriptUrl: importRecord.fullTranscriptUrl,
        avgResponseTime: importRecord.avgResponseTimeSeconds,
        tokens: importRecord.tokens,
        tokensEur: importRecord.tokensEur,
        category: importRecord.category,
        initialMsg: importRecord.initialMessage,
        processed: false, // Will be processed later by AI
      },
    });

    // Update import status to DONE
    await prisma.sessionImport.update({
      where: { id: importRecord.id },
      data: {
        status: ImportStatus.DONE,
        processedAt: new Date(),
        errorMsg: null,
      },
    });

    return { success: true };
  } catch (error) {
    // Update import status to ERROR
    await prisma.sessionImport.update({
      where: { id: importRecord.id },
      data: {
        status: ImportStatus.ERROR,
        errorMsg: error instanceof Error ? error.message : String(error),
      },
    });

    return {
      success: false,
      error: error instanceof Error ? error.message : String(error),
    };
  }
}

/**
 * Process queued SessionImport records into Session records
 */
export async function processQueuedImports(batchSize: number = 50): Promise<void> {
  console.log('[Import Processor] Starting to process queued imports...');

  // Find queued imports
  const queuedImports = await prisma.sessionImport.findMany({
    where: {
      status: ImportStatus.QUEUED,
    },
    take: batchSize,
    orderBy: {
      createdAt: 'asc', // Process oldest first
    },
  });

  if (queuedImports.length === 0) {
    console.log('[Import Processor] No queued imports found');
    return;
  }

  console.log(`[Import Processor] Processing ${queuedImports.length} queued imports...`);

  let successCount = 0;
  let errorCount = 0;

  // Process each import
  for (const importRecord of queuedImports) {
    const result = await processSingleImport(importRecord);

    if (result.success) {
      successCount++;
      console.log(`[Import Processor] ✓ Processed import ${importRecord.externalSessionId}`);
    } else {
      errorCount++;
      console.log(`[Import Processor] ✗ Failed to process import ${importRecord.externalSessionId}: ${result.error}`);
    }
  }

  console.log(`[Import Processor] Completed: ${successCount} successful, ${errorCount} failed`);
}

/**
 * Start the import processing scheduler
 */
export function startImportProcessingScheduler(): void {
  const config = getSchedulerConfig();

  if (!config.enabled) {
    console.log('[Import Processing Scheduler] Disabled via configuration');
    return;
  }

  // Use a more frequent interval for import processing (every 5 minutes by default)
  const interval = process.env.IMPORT_PROCESSING_INTERVAL || '*/5 * * * *';
  const batchSize = parseInt(process.env.IMPORT_PROCESSING_BATCH_SIZE || '50', 10);

  console.log(`[Import Processing Scheduler] Starting with interval: ${interval}`);
  console.log(`[Import Processing Scheduler] Batch size: ${batchSize}`);

  cron.schedule(interval, async () => {
    try {
      await processQueuedImports(batchSize);
    } catch (error) {
      console.error(`[Import Processing Scheduler] Error: ${error}`);
    }
  });
}
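
Outside the cron schedule, the exported processQueuedImports function can also be invoked directly. A minimal sketch of a one-off backfill script, where the script path and the batch size of 100 are illustrative and the two environment variables are the ones read by startImportProcessingScheduler above:

// scripts/process-imports-once.ts (hypothetical path) -- sketch of a manual backfill run.
// The scheduler above reads these environment variables:
//   IMPORT_PROCESSING_INTERVAL    cron expression, default "*/5 * * * *"
//   IMPORT_PROCESSING_BATCH_SIZE  number of queued rows per run, default 50
import { processQueuedImports } from "../lib/importProcessor";

async function main(): Promise<void> {
  // Process one batch of queued SessionImport rows and exit.
  await processQueuedImports(100);
}

main().catch((error) => {
  console.error(`[Manual Import Run] Error: ${error}`);
  process.exit(1);
});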
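
The transcript fetcher unit tests named in the commit message are not shown in this diff. A minimal Jest sketch of what such a test might cover, assuming fetchTranscriptContent performs its HTTP request with the global fetch API and resolves to the { success, content?, error? } shape consumed in processSingleImport above; the test file path and the mocked transcript body are illustrative:

// __tests__/transcriptFetcher.test.ts (hypothetical path) -- sketch only.
import { fetchTranscriptContent, isValidTranscriptUrl } from "../lib/transcriptFetcher";

describe("transcriptFetcher", () => {
  afterEach(() => {
    jest.restoreAllMocks();
  });

  it("rejects obviously invalid transcript URLs", () => {
    expect(isValidTranscriptUrl("not-a-url")).toBe(false);
  });

  it("returns the transcript body on a successful fetch", async () => {
    // Assumes the fetcher uses the global fetch API under the hood.
    jest.spyOn(global, "fetch").mockResolvedValue(
      new Response("USER: hello\nBOT: hi", { status: 200 })
    );

    const result = await fetchTranscriptContent("https://example.com/transcript.txt");

    expect(result.success).toBe(true);
    expect(result.content).toContain("USER: hello");
  });
});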