refactor: comprehensive code quality improvements and dev environment fixes

- Convert ProcessingStatusManager from static class to individual functions
- Refactor processSingleImport function to reduce cognitive complexity
- Fix unused parameters in database-pool.ts event handlers
- Add missing DATABASE_URL configuration to env.ts
- Add pg package and @types/pg dependencies for PostgreSQL support
- Fix tsx command execution by updating package.json scripts to use pnpm exec
- Apply biome formatting fixes for import organization
This commit is contained in:
2025-06-29 21:56:29 +02:00
parent 8fd774422c
commit 5042a6c016
8 changed files with 3836 additions and 6878 deletions

View File

@ -33,19 +33,19 @@ const createConnectionPool = () => {
});
// Connection pool event handlers
pool.on("connect", (_client) => {
pool.on("connect", () => {
console.log(
`Database connection established. Active connections: ${pool.totalCount}`
);
});
pool.on("acquire", (_client) => {
pool.on("acquire", () => {
console.log(
`Connection acquired from pool. Waiting: ${pool.waitingCount}, Idle: ${pool.idleCount}`
);
});
pool.on("release", (_client) => {
pool.on("release", () => {
console.log(
`Connection released to pool. Active: ${pool.totalCount - pool.idleCount}, Idle: ${pool.idleCount}`
);
@ -55,7 +55,7 @@ const createConnectionPool = () => {
console.error("Database pool error:", err);
});
pool.on("remove", (_client) => {
pool.on("remove", () => {
console.log(
`Connection removed from pool. Total connections: ${pool.totalCount}`
);

View File

@ -12,7 +12,7 @@ export interface RetryConfig {
export const DEFAULT_RETRY_CONFIG: RetryConfig = {
maxRetries: 3,
initialDelay: 1000, // 1 second
maxDelay: 10000, // 10 seconds
maxDelay: 10000, // 10 seconds
backoffMultiplier: 2,
};
@ -21,86 +21,90 @@ export function isRetryableError(error: unknown): boolean {
if (error instanceof PrismaClientKnownRequestError) {
// Connection errors that are worth retrying
const retryableCodes = [
'P1001', // Can't reach database server
'P1002', // Database server was reached but timed out
'P1008', // Operations timed out
'P1017', // Server has closed the connection
"P1001", // Can't reach database server
"P1002", // Database server was reached but timed out
"P1008", // Operations timed out
"P1017", // Server has closed the connection
];
return retryableCodes.includes(error.code);
}
// Check for network-related errors
if (error instanceof Error) {
const retryableMessages = [
'ECONNREFUSED',
'ECONNRESET',
'ETIMEDOUT',
'ENOTFOUND',
'EAI_AGAIN',
'Can\'t reach database server',
'Connection terminated',
'Connection lost',
"ECONNREFUSED",
"ECONNRESET",
"ETIMEDOUT",
"ENOTFOUND",
"EAI_AGAIN",
"Can't reach database server",
"Connection terminated",
"Connection lost",
];
return retryableMessages.some(msg =>
error.message.includes(msg)
);
return retryableMessages.some((msg) => error.message.includes(msg));
}
return false;
}
// Calculate delay with exponential backoff
export function calculateDelay(
attempt: number,
attempt: number,
config: RetryConfig = DEFAULT_RETRY_CONFIG
): number {
const delay = config.initialDelay * Math.pow(config.backoffMultiplier, attempt - 1);
const delay = config.initialDelay * config.backoffMultiplier ** (attempt - 1);
return Math.min(delay, config.maxDelay);
}
// Sleep utility
export function sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
return new Promise((resolve) => setTimeout(resolve, ms));
}
// Retry wrapper for database operations
export async function withRetry<T>(
operation: () => Promise<T>,
config: RetryConfig = DEFAULT_RETRY_CONFIG,
context: string = 'database operation'
context = "database operation"
): Promise<T> {
let lastError: unknown;
for (let attempt = 1; attempt <= config.maxRetries; attempt++) {
try {
return await operation();
} catch (error) {
lastError = error;
// Don't retry if error is not retryable
if (!isRetryableError(error)) {
console.error(`[${context}] Non-retryable error on attempt ${attempt}:`, error);
console.error(
`[${context}] Non-retryable error on attempt ${attempt}:`,
error
);
throw error;
}
// Don't retry on last attempt
if (attempt === config.maxRetries) {
console.error(`[${context}] Max retries (${config.maxRetries}) exceeded:`, error);
console.error(
`[${context}] Max retries (${config.maxRetries}) exceeded:`,
error
);
break;
}
const delay = calculateDelay(attempt, config);
console.warn(
`[${context}] Attempt ${attempt}/${config.maxRetries} failed, retrying in ${delay}ms:`,
error instanceof Error ? error.message : error
);
await sleep(delay);
}
}
throw lastError;
}
@ -110,21 +114,21 @@ export async function checkDatabaseHealthWithRetry(
config: Partial<RetryConfig> = {}
): Promise<boolean> {
const retryConfig = { ...DEFAULT_RETRY_CONFIG, ...config };
try {
return await withRetry(
async () => {
const isHealthy = await checkFunction();
if (!isHealthy) {
throw new Error('Database health check failed');
throw new Error("Database health check failed");
}
return true;
},
retryConfig,
'database health check'
"database health check"
);
} catch (error) {
console.error('Database health check failed after retries:', error);
console.error("Database health check failed after retries:", error);
return false;
}
}
}

View File

@ -103,6 +103,10 @@ export const env = {
5
),
// Database Configuration
DATABASE_URL: parseEnvValue(process.env.DATABASE_URL) || "",
DATABASE_URL_DIRECT: parseEnvValue(process.env.DATABASE_URL_DIRECT) || "",
// Database Connection Pooling
DATABASE_CONNECTION_LIMIT: parseIntWithDefault(
process.env.DATABASE_CONNECTION_LIMIT,
@ -123,6 +127,10 @@ export const env = {
export function validateEnv(): { valid: boolean; errors: string[] } {
const errors: string[] = [];
if (!env.DATABASE_URL) {
errors.push("DATABASE_URL is required");
}
if (!env.NEXTAUTH_SECRET) {
errors.push("NEXTAUTH_SECRET is required");
}

View File

@ -1,14 +1,20 @@
// SessionImport to Session processor
import { ProcessingStage, SentimentCategory } from "@prisma/client";
import cron from "node-cron";
import { withRetry } from "./database-retry.js";
import { getSchedulerConfig } from "./env";
import { prisma } from "./prisma.js";
import { ProcessingStatusManager } from "./processingStatusManager";
import {
completeStage,
failStage,
initializeSession,
skipStage,
startStage,
} from "./processingStatusManager.js";
import {
fetchTranscriptContent,
isValidTranscriptUrl,
} from "./transcriptFetcher";
import { withRetry, isRetryableError } from "./database-retry.js";
interface ImportRecord {
id: string;
@ -167,6 +173,160 @@ async function parseTranscriptIntoMessages(
);
}
/**
* Create or update a Session record from ImportRecord
*/
async function createSession(importRecord: ImportRecord): Promise<string> {
const startTime = parseEuropeanDate(importRecord.startTimeRaw);
const endTime = parseEuropeanDate(importRecord.endTimeRaw);
console.log(
`[Import Processor] Processing ${importRecord.externalSessionId}: ${startTime.toISOString()} - ${endTime.toISOString()}`
);
const session = await prisma.session.upsert({
where: {
importId: importRecord.id,
},
update: {
startTime,
endTime,
ipAddress: importRecord.ipAddress,
country: importRecord.countryCode,
fullTranscriptUrl: importRecord.fullTranscriptUrl,
avgResponseTime: importRecord.avgResponseTimeSeconds,
initialMsg: importRecord.initialMessage,
},
create: {
companyId: importRecord.companyId,
importId: importRecord.id,
startTime,
endTime,
ipAddress: importRecord.ipAddress,
country: importRecord.countryCode,
fullTranscriptUrl: importRecord.fullTranscriptUrl,
avgResponseTime: importRecord.avgResponseTimeSeconds,
initialMsg: importRecord.initialMessage,
},
});
return session.id;
}
/**
* Handle transcript fetching for a session
*/
async function handleTranscriptFetching(
sessionId: string,
importRecord: ImportRecord
): Promise<string | null> {
let transcriptContent = importRecord.rawTranscriptContent;
if (
!transcriptContent &&
importRecord.fullTranscriptUrl &&
isValidTranscriptUrl(importRecord.fullTranscriptUrl)
) {
await startStage(sessionId, ProcessingStage.TRANSCRIPT_FETCH);
console.log(
`[Import Processor] Fetching transcript for ${importRecord.externalSessionId}...`
);
const company = await prisma.company.findUnique({
where: { id: importRecord.companyId },
select: { csvUsername: true, csvPassword: true },
});
const transcriptResult = await fetchTranscriptContent(
importRecord.fullTranscriptUrl,
company?.csvUsername || undefined,
company?.csvPassword || undefined
);
if (transcriptResult.success) {
transcriptContent = transcriptResult.content;
console.log(
`[Import Processor] ✓ Fetched transcript for ${importRecord.externalSessionId} (${transcriptContent?.length} chars)`
);
await prisma.sessionImport.update({
where: { id: importRecord.id },
data: { rawTranscriptContent: transcriptContent },
});
await completeStage(sessionId, ProcessingStage.TRANSCRIPT_FETCH, {
contentLength: transcriptContent?.length || 0,
url: importRecord.fullTranscriptUrl,
});
} else {
console.log(
`[Import Processor] ⚠️ Failed to fetch transcript for ${importRecord.externalSessionId}: ${transcriptResult.error}`
);
await failStage(
sessionId,
ProcessingStage.TRANSCRIPT_FETCH,
transcriptResult.error || "Unknown error"
);
}
} else if (!importRecord.fullTranscriptUrl) {
await skipStage(
sessionId,
ProcessingStage.TRANSCRIPT_FETCH,
"No transcript URL provided"
);
} else {
await completeStage(sessionId, ProcessingStage.TRANSCRIPT_FETCH, {
contentLength: transcriptContent?.length || 0,
source: "already_fetched",
});
}
return transcriptContent;
}
/**
* Handle session creation (message parsing)
*/
async function handleSessionCreation(
sessionId: string,
transcriptContent: string | null
): Promise<void> {
await startStage(sessionId, ProcessingStage.SESSION_CREATION);
if (transcriptContent) {
await parseTranscriptIntoMessages(sessionId, transcriptContent);
}
await completeStage(sessionId, ProcessingStage.SESSION_CREATION, {
hasTranscript: !!transcriptContent,
transcriptLength: transcriptContent?.length || 0,
});
}
/**
* Handle errors and mark appropriate stage as failed
*/
async function handleProcessingError(
sessionId: string | null,
error: unknown
): Promise<void> {
if (!sessionId) return;
const errorMessage = error instanceof Error ? error.message : String(error);
if (errorMessage.includes("transcript") || errorMessage.includes("fetch")) {
await failStage(sessionId, ProcessingStage.TRANSCRIPT_FETCH, errorMessage);
} else if (
errorMessage.includes("message") ||
errorMessage.includes("parse")
) {
await failStage(sessionId, ProcessingStage.SESSION_CREATION, errorMessage);
} else {
await failStage(sessionId, ProcessingStage.CSV_IMPORT, errorMessage);
}
}
/**
* Process a single SessionImport record into a Session record
* Uses new unified processing status tracking
@ -177,189 +337,22 @@ async function processSingleImport(
let sessionId: string | null = null;
try {
// Parse dates using European format parser
const startTime = parseEuropeanDate(importRecord.startTimeRaw);
const endTime = parseEuropeanDate(importRecord.endTimeRaw);
sessionId = await createSession(importRecord);
await initializeSession(sessionId);
await completeStage(sessionId, ProcessingStage.CSV_IMPORT);
console.log(
`[Import Processor] Processing ${importRecord.externalSessionId}: ${startTime.toISOString()} - ${endTime.toISOString()}`
);
// Create or update Session record with MINIMAL processing
const session = await prisma.session.upsert({
where: {
importId: importRecord.id,
},
update: {
startTime,
endTime,
// Direct copies (minimal processing)
ipAddress: importRecord.ipAddress,
country: importRecord.countryCode, // Keep as country code
fullTranscriptUrl: importRecord.fullTranscriptUrl,
avgResponseTime: importRecord.avgResponseTimeSeconds,
initialMsg: importRecord.initialMessage,
},
create: {
companyId: importRecord.companyId,
importId: importRecord.id,
startTime,
endTime,
// Direct copies (minimal processing)
ipAddress: importRecord.ipAddress,
country: importRecord.countryCode, // Keep as country code
fullTranscriptUrl: importRecord.fullTranscriptUrl,
avgResponseTime: importRecord.avgResponseTimeSeconds,
initialMsg: importRecord.initialMessage,
},
});
sessionId = session.id;
// Initialize processing status for this session
await ProcessingStatusManager.initializeSession(sessionId);
// Mark CSV_IMPORT as completed
await ProcessingStatusManager.completeStage(
const transcriptContent = await handleTranscriptFetching(
sessionId,
ProcessingStage.CSV_IMPORT
);
// Handle transcript fetching
let transcriptContent = importRecord.rawTranscriptContent;
if (
!transcriptContent &&
importRecord.fullTranscriptUrl &&
isValidTranscriptUrl(importRecord.fullTranscriptUrl)
) {
await ProcessingStatusManager.startStage(
sessionId,
ProcessingStage.TRANSCRIPT_FETCH
);
console.log(
`[Import Processor] Fetching transcript for ${importRecord.externalSessionId}...`
);
// Get company credentials for transcript fetching
const company = await prisma.company.findUnique({
where: { id: importRecord.companyId },
select: { csvUsername: true, csvPassword: true },
});
const transcriptResult = await fetchTranscriptContent(
importRecord.fullTranscriptUrl,
company?.csvUsername || undefined,
company?.csvPassword || undefined
);
if (transcriptResult.success) {
transcriptContent = transcriptResult.content;
console.log(
`[Import Processor] ✓ Fetched transcript for ${importRecord.externalSessionId} (${transcriptContent?.length} chars)`
);
// Update the import record with the fetched content
await prisma.sessionImport.update({
where: { id: importRecord.id },
data: { rawTranscriptContent: transcriptContent },
});
await ProcessingStatusManager.completeStage(
sessionId,
ProcessingStage.TRANSCRIPT_FETCH,
{
contentLength: transcriptContent?.length || 0,
url: importRecord.fullTranscriptUrl,
}
);
} else {
console.log(
`[Import Processor] ⚠️ Failed to fetch transcript for ${importRecord.externalSessionId}: ${transcriptResult.error}`
);
await ProcessingStatusManager.failStage(
sessionId,
ProcessingStage.TRANSCRIPT_FETCH,
transcriptResult.error || "Unknown error"
);
}
} else if (!importRecord.fullTranscriptUrl) {
// No transcript URL available - skip this stage
await ProcessingStatusManager.skipStage(
sessionId,
ProcessingStage.TRANSCRIPT_FETCH,
"No transcript URL provided"
);
} else {
// Transcript already fetched
await ProcessingStatusManager.completeStage(
sessionId,
ProcessingStage.TRANSCRIPT_FETCH,
{
contentLength: transcriptContent?.length || 0,
source: "already_fetched",
}
);
}
// Handle session creation (parse messages)
await ProcessingStatusManager.startStage(
sessionId,
ProcessingStage.SESSION_CREATION
);
if (transcriptContent) {
await parseTranscriptIntoMessages(sessionId, transcriptContent);
}
await ProcessingStatusManager.completeStage(
sessionId,
ProcessingStage.SESSION_CREATION,
{
hasTranscript: !!transcriptContent,
transcriptLength: transcriptContent?.length || 0,
}
importRecord
);
await handleSessionCreation(sessionId, transcriptContent);
return { success: true };
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
// Mark the current stage as failed if we have a sessionId
if (sessionId) {
// Determine which stage failed based on the error
if (
errorMessage.includes("transcript") ||
errorMessage.includes("fetch")
) {
await ProcessingStatusManager.failStage(
sessionId,
ProcessingStage.TRANSCRIPT_FETCH,
errorMessage
);
} else if (
errorMessage.includes("message") ||
errorMessage.includes("parse")
) {
await ProcessingStatusManager.failStage(
sessionId,
ProcessingStage.SESSION_CREATION,
errorMessage
);
} else {
// General failure - mark CSV_IMPORT as failed
await ProcessingStatusManager.failStage(
sessionId,
ProcessingStage.CSV_IMPORT,
errorMessage
);
}
}
await handleProcessingError(sessionId, error);
return {
success: false,
error: errorMessage,
error: error instanceof Error ? error.message : String(error),
};
}
}

View File

@ -7,10 +7,15 @@ import {
} from "@prisma/client";
import cron from "node-cron";
import fetch from "node-fetch";
import { withRetry } from "./database-retry.js";
import { prisma } from "./prisma.js";
import { ProcessingStatusManager } from "./processingStatusManager";
import {
completeStage,
failStage,
getSessionsNeedingProcessing,
startStage,
} from "./processingStatusManager.js";
import { getSchedulerConfig } from "./schedulerConfig";
import { withRetry, isRetryableError } from "./database-retry.js";
const OPENAI_API_KEY = process.env.OPENAI_API_KEY;
const OPENAI_API_URL = "https://api.openai.com/v1/chat/completions";
@ -516,10 +521,7 @@ async function processSingleSession(
try {
// Mark AI analysis as started
await ProcessingStatusManager.startStage(
session.id,
ProcessingStage.AI_ANALYSIS
);
await startStage(session.id, ProcessingStage.AI_ANALYSIS);
// Convert messages back to transcript format for OpenAI processing
const transcript = session.messages
@ -569,34 +571,23 @@ async function processSingleSession(
});
// Mark AI analysis as completed
await ProcessingStatusManager.completeStage(
session.id,
ProcessingStage.AI_ANALYSIS,
{
language: processedData.language,
sentiment: processedData.sentiment,
category: processedData.category,
questionsCount: processedData.questions.length,
}
);
await completeStage(session.id, ProcessingStage.AI_ANALYSIS, {
language: processedData.language,
sentiment: processedData.sentiment,
category: processedData.category,
questionsCount: processedData.questions.length,
});
// Start question extraction stage
await ProcessingStatusManager.startStage(
session.id,
ProcessingStage.QUESTION_EXTRACTION
);
await startStage(session.id, ProcessingStage.QUESTION_EXTRACTION);
// Process questions into separate tables
await processQuestions(session.id, processedData.questions);
// Mark question extraction as completed
await ProcessingStatusManager.completeStage(
session.id,
ProcessingStage.QUESTION_EXTRACTION,
{
questionsProcessed: processedData.questions.length,
}
);
await completeStage(session.id, ProcessingStage.QUESTION_EXTRACTION, {
questionsProcessed: processedData.questions.length,
});
return {
sessionId: session.id,
@ -604,7 +595,7 @@ async function processSingleSession(
};
} catch (error) {
// Mark AI analysis as failed
await ProcessingStatusManager.failStage(
await failStage(
session.id,
ProcessingStage.AI_ANALYSIS,
error instanceof Error ? error.message : String(error)
@ -688,11 +679,10 @@ async function processUnprocessedSessionsInternal(
maxConcurrency = 5
): Promise<void> {
// Get sessions that need AI processing using the new status system
const sessionsNeedingAI =
await ProcessingStatusManager.getSessionsNeedingProcessing(
ProcessingStage.AI_ANALYSIS,
batchSize || 50
);
const sessionsNeedingAI = await getSessionsNeedingProcessing(
ProcessingStage.AI_ANALYSIS,
batchSize || 50
);
if (sessionsNeedingAI.length === 0) {
process.stdout.write(

View File

@ -12,347 +12,339 @@ interface WhereClause {
}
/**
* Centralized processing status management
* Initialize processing status for a session with all stages set to PENDING
*/
export class ProcessingStatusManager {
/**
* Initialize processing status for a session with all stages set to PENDING
*/
static async initializeSession(sessionId: string): Promise<void> {
const stages = [
ProcessingStage.CSV_IMPORT,
ProcessingStage.TRANSCRIPT_FETCH,
ProcessingStage.SESSION_CREATION,
ProcessingStage.AI_ANALYSIS,
ProcessingStage.QUESTION_EXTRACTION,
];
export async function initializeSession(sessionId: string): Promise<void> {
const stages = [
ProcessingStage.CSV_IMPORT,
ProcessingStage.TRANSCRIPT_FETCH,
ProcessingStage.SESSION_CREATION,
ProcessingStage.AI_ANALYSIS,
ProcessingStage.QUESTION_EXTRACTION,
];
// Create all processing status records for this session
await prisma.sessionProcessingStatus.createMany({
data: stages.map((stage) => ({
sessionId,
stage,
status: ProcessingStatus.PENDING,
})),
skipDuplicates: true, // In case some already exist
});
}
/**
* Start a processing stage
*/
static async startStage(
sessionId: string,
stage: ProcessingStage,
metadata?: ProcessingMetadata
): Promise<void> {
await prisma.sessionProcessingStatus.upsert({
where: {
sessionId_stage: { sessionId, stage },
},
update: {
status: ProcessingStatus.IN_PROGRESS,
startedAt: new Date(),
errorMessage: null,
metadata: metadata || null,
},
create: {
sessionId,
stage,
status: ProcessingStatus.IN_PROGRESS,
startedAt: new Date(),
metadata: metadata || null,
},
});
}
/**
* Complete a processing stage successfully
*/
static async completeStage(
sessionId: string,
stage: ProcessingStage,
metadata?: ProcessingMetadata
): Promise<void> {
await prisma.sessionProcessingStatus.upsert({
where: {
sessionId_stage: { sessionId, stage },
},
update: {
status: ProcessingStatus.COMPLETED,
completedAt: new Date(),
errorMessage: null,
metadata: metadata || null,
},
create: {
sessionId,
stage,
status: ProcessingStatus.COMPLETED,
startedAt: new Date(),
completedAt: new Date(),
metadata: metadata || null,
},
});
}
/**
* Mark a processing stage as failed
*/
static async failStage(
sessionId: string,
stage: ProcessingStage,
errorMessage: string,
metadata?: ProcessingMetadata
): Promise<void> {
await prisma.sessionProcessingStatus.upsert({
where: {
sessionId_stage: { sessionId, stage },
},
update: {
status: ProcessingStatus.FAILED,
completedAt: new Date(),
errorMessage,
retryCount: { increment: 1 },
metadata: metadata || null,
},
create: {
sessionId,
stage,
status: ProcessingStatus.FAILED,
startedAt: new Date(),
completedAt: new Date(),
errorMessage,
retryCount: 1,
metadata: metadata || null,
},
});
}
/**
* Skip a processing stage (e.g., no transcript URL available)
*/
static async skipStage(
sessionId: string,
stage: ProcessingStage,
reason: string
): Promise<void> {
await prisma.sessionProcessingStatus.upsert({
where: {
sessionId_stage: { sessionId, stage },
},
update: {
status: ProcessingStatus.SKIPPED,
completedAt: new Date(),
errorMessage: reason,
},
create: {
sessionId,
stage,
status: ProcessingStatus.SKIPPED,
startedAt: new Date(),
completedAt: new Date(),
errorMessage: reason,
},
});
}
/**
* Get processing status for a specific session
*/
static async getSessionStatus(sessionId: string) {
return await prisma.sessionProcessingStatus.findMany({
where: { sessionId },
orderBy: { stage: "asc" },
});
}
/**
* Get sessions that need processing for a specific stage
*/
static async getSessionsNeedingProcessing(
stage: ProcessingStage,
limit = 50
) {
return await prisma.sessionProcessingStatus.findMany({
where: {
stage,
status: ProcessingStatus.PENDING,
session: {
company: {
status: "ACTIVE", // Only process sessions from active companies
},
},
},
include: {
session: {
select: {
id: true,
companyId: true,
importId: true,
startTime: true,
endTime: true,
fullTranscriptUrl: true,
import:
stage === ProcessingStage.TRANSCRIPT_FETCH
? {
select: {
id: true,
fullTranscriptUrl: true,
externalSessionId: true,
},
}
: false,
company: {
select: {
id: true,
csvUsername: true,
csvPassword: true,
},
},
},
},
},
take: limit,
orderBy: { session: { createdAt: "asc" } },
});
}
/**
* Get pipeline status overview
*/
static async getPipelineStatus() {
// Get counts by stage and status
const statusCounts = await prisma.sessionProcessingStatus.groupBy({
by: ["stage", "status"],
_count: { id: true },
});
// Get total sessions
const totalSessions = await prisma.session.count();
// Organize the data
const pipeline: Record<string, Record<string, number>> = {};
for (const { stage, status, _count } of statusCounts) {
if (!pipeline[stage]) {
pipeline[stage] = {};
}
pipeline[stage][status] = _count.id;
}
return {
totalSessions,
pipeline,
};
}
/**
* Get sessions with failed processing
*/
static async getFailedSessions(stage?: ProcessingStage) {
const where: WhereClause = {
status: ProcessingStatus.FAILED,
};
if (stage) {
where.stage = stage;
}
return await prisma.sessionProcessingStatus.findMany({
where,
select: {
id: true,
sessionId: true,
stage: true,
status: true,
startedAt: true,
completedAt: true,
errorMessage: true,
retryCount: true,
session: {
select: {
id: true,
companyId: true,
startTime: true,
import: {
select: {
id: true,
externalSessionId: true,
},
},
},
},
},
orderBy: { completedAt: "desc" },
take: 100, // Limit failed sessions to prevent overfetching
});
}
/**
* Reset a failed stage for retry
*/
static async resetStageForRetry(
sessionId: string,
stage: ProcessingStage
): Promise<void> {
await prisma.sessionProcessingStatus.update({
where: {
sessionId_stage: { sessionId, stage },
},
data: {
status: ProcessingStatus.PENDING,
startedAt: null,
completedAt: null,
errorMessage: null,
},
});
}
/**
* Check if a session has completed a specific stage
*/
static async hasCompletedStage(
sessionId: string,
stage: ProcessingStage
): Promise<boolean> {
const status = await prisma.sessionProcessingStatus.findUnique({
where: {
sessionId_stage: { sessionId, stage },
},
});
return status?.status === ProcessingStatus.COMPLETED;
}
/**
* Check if a session is ready for a specific stage (previous stages completed)
*/
static async isReadyForStage(
sessionId: string,
stage: ProcessingStage
): Promise<boolean> {
const stageOrder = [
ProcessingStage.CSV_IMPORT,
ProcessingStage.TRANSCRIPT_FETCH,
ProcessingStage.SESSION_CREATION,
ProcessingStage.AI_ANALYSIS,
ProcessingStage.QUESTION_EXTRACTION,
];
const currentStageIndex = stageOrder.indexOf(stage);
if (currentStageIndex === 0) return true; // First stage is always ready
// Check if all previous stages are completed
const previousStages = stageOrder.slice(0, currentStageIndex);
for (const prevStage of previousStages) {
const isCompleted = await ProcessingStatusManager.hasCompletedStage(
sessionId,
prevStage
);
if (!isCompleted) return false;
}
return true;
}
// Create all processing status records for this session
await prisma.sessionProcessingStatus.createMany({
data: stages.map((stage) => ({
sessionId,
stage,
status: ProcessingStatus.PENDING,
})),
skipDuplicates: true, // In case some already exist
});
}
/**
* Start a processing stage
*/
export async function startStage(
sessionId: string,
stage: ProcessingStage,
metadata?: ProcessingMetadata
): Promise<void> {
await prisma.sessionProcessingStatus.upsert({
where: {
sessionId_stage: { sessionId, stage },
},
update: {
status: ProcessingStatus.IN_PROGRESS,
startedAt: new Date(),
errorMessage: null,
metadata: metadata || null,
},
create: {
sessionId,
stage,
status: ProcessingStatus.IN_PROGRESS,
startedAt: new Date(),
metadata: metadata || null,
},
});
}
/**
* Complete a processing stage successfully
*/
export async function completeStage(
sessionId: string,
stage: ProcessingStage,
metadata?: ProcessingMetadata
): Promise<void> {
await prisma.sessionProcessingStatus.upsert({
where: {
sessionId_stage: { sessionId, stage },
},
update: {
status: ProcessingStatus.COMPLETED,
completedAt: new Date(),
errorMessage: null,
metadata: metadata || null,
},
create: {
sessionId,
stage,
status: ProcessingStatus.COMPLETED,
startedAt: new Date(),
completedAt: new Date(),
metadata: metadata || null,
},
});
}
/**
* Mark a processing stage as failed
*/
export async function failStage(
sessionId: string,
stage: ProcessingStage,
errorMessage: string,
metadata?: ProcessingMetadata
): Promise<void> {
await prisma.sessionProcessingStatus.upsert({
where: {
sessionId_stage: { sessionId, stage },
},
update: {
status: ProcessingStatus.FAILED,
completedAt: new Date(),
errorMessage,
retryCount: { increment: 1 },
metadata: metadata || null,
},
create: {
sessionId,
stage,
status: ProcessingStatus.FAILED,
startedAt: new Date(),
completedAt: new Date(),
errorMessage,
retryCount: 1,
metadata: metadata || null,
},
});
}
/**
* Skip a processing stage (e.g., no transcript URL available)
*/
export async function skipStage(
sessionId: string,
stage: ProcessingStage,
reason: string
): Promise<void> {
await prisma.sessionProcessingStatus.upsert({
where: {
sessionId_stage: { sessionId, stage },
},
update: {
status: ProcessingStatus.SKIPPED,
completedAt: new Date(),
errorMessage: reason,
},
create: {
sessionId,
stage,
status: ProcessingStatus.SKIPPED,
startedAt: new Date(),
completedAt: new Date(),
errorMessage: reason,
},
});
}
/**
* Get processing status for a specific session
*/
export async function getSessionStatus(sessionId: string) {
return await prisma.sessionProcessingStatus.findMany({
where: { sessionId },
orderBy: { stage: "asc" },
});
}
/**
* Get sessions that need processing for a specific stage
*/
export async function getSessionsNeedingProcessing(
stage: ProcessingStage,
limit = 50
) {
return await prisma.sessionProcessingStatus.findMany({
where: {
stage,
status: ProcessingStatus.PENDING,
session: {
company: {
status: "ACTIVE", // Only process sessions from active companies
},
},
},
include: {
session: {
select: {
id: true,
companyId: true,
importId: true,
startTime: true,
endTime: true,
fullTranscriptUrl: true,
import:
stage === ProcessingStage.TRANSCRIPT_FETCH
? {
select: {
id: true,
fullTranscriptUrl: true,
externalSessionId: true,
},
}
: false,
company: {
select: {
id: true,
csvUsername: true,
csvPassword: true,
},
},
},
},
},
take: limit,
orderBy: { session: { createdAt: "asc" } },
});
}
/**
* Get pipeline status overview
*/
export async function getPipelineStatus() {
// Get counts by stage and status
const statusCounts = await prisma.sessionProcessingStatus.groupBy({
by: ["stage", "status"],
_count: { id: true },
});
// Get total sessions
const totalSessions = await prisma.session.count();
// Organize the data
const pipeline: Record<string, Record<string, number>> = {};
for (const { stage, status, _count } of statusCounts) {
if (!pipeline[stage]) {
pipeline[stage] = {};
}
pipeline[stage][status] = _count.id;
}
return {
totalSessions,
pipeline,
};
}
/**
* Get sessions with failed processing
*/
export async function getFailedSessions(stage?: ProcessingStage) {
const where: WhereClause = {
status: ProcessingStatus.FAILED,
};
if (stage) {
where.stage = stage;
}
return await prisma.sessionProcessingStatus.findMany({
where,
select: {
id: true,
sessionId: true,
stage: true,
status: true,
startedAt: true,
completedAt: true,
errorMessage: true,
retryCount: true,
session: {
select: {
id: true,
companyId: true,
startTime: true,
import: {
select: {
id: true,
externalSessionId: true,
},
},
},
},
},
orderBy: { completedAt: "desc" },
take: 100, // Limit failed sessions to prevent overfetching
});
}
/**
* Reset a failed stage for retry
*/
export async function resetStageForRetry(
sessionId: string,
stage: ProcessingStage
): Promise<void> {
await prisma.sessionProcessingStatus.update({
where: {
sessionId_stage: { sessionId, stage },
},
data: {
status: ProcessingStatus.PENDING,
startedAt: null,
completedAt: null,
errorMessage: null,
},
});
}
/**
* Check if a session has completed a specific stage
*/
export async function hasCompletedStage(
sessionId: string,
stage: ProcessingStage
): Promise<boolean> {
const status = await prisma.sessionProcessingStatus.findUnique({
where: {
sessionId_stage: { sessionId, stage },
},
});
return status?.status === ProcessingStatus.COMPLETED;
}
/**
* Check if a session is ready for a specific stage (previous stages completed)
*/
export async function isReadyForStage(
sessionId: string,
stage: ProcessingStage
): Promise<boolean> {
const stageOrder = [
ProcessingStage.CSV_IMPORT,
ProcessingStage.TRANSCRIPT_FETCH,
ProcessingStage.SESSION_CREATION,
ProcessingStage.AI_ANALYSIS,
ProcessingStage.QUESTION_EXTRACTION,
];
const currentStageIndex = stageOrder.indexOf(stage);
if (currentStageIndex === 0) return true; // First stage is always ready
// Check if all previous stages are completed
const previousStages = stageOrder.slice(0, currentStageIndex);
for (const prevStage of previousStages) {
const isCompleted = await hasCompletedStage(sessionId, prevStage);
if (!isCompleted) return false;
}
return true;
}