mirror of
https://github.com/kjanat/livedash-node.git
synced 2026-01-16 15:52:10 +01:00
fix: comprehensive security and type improvements from PR #20 review
Security Enhancements: - Implemented proper rate limiting with automatic cleanup for /register and /forgot-password endpoints - Added memory usage protection with MAX_ENTRIES limit (10000) - Fixed rate limiter memory leaks by adding cleanup intervals - Improved IP extraction with x-real-ip and x-client-ip header support Code Quality Improvements: - Refactored ProcessingStatusManager from individual functions to class-based architecture - Maintained backward compatibility with singleton instance pattern - Fixed TypeScript strict mode violations across the codebase - Resolved all build errors and type mismatches UI Component Fixes: - Removed unused chart components (Charts.tsx, DonutChart.tsx) - Fixed calendar component type issues by removing unused custom implementations - Resolved theme provider type imports - Fixed confetti component default options handling - Corrected pointer component coordinate type definitions Type System Improvements: - Extended NextAuth types to support dual auth systems (regular and platform users) - Fixed nullable type handling throughout the codebase - Resolved Prisma JSON field type compatibility issues - Corrected SessionMessage and ImportRecord interface definitions - Fixed ES2015 iteration compatibility issues Database & Performance: - Updated database pool configuration for Prisma adapter compatibility - Fixed pagination response structure in user management endpoints - Improved error handling with proper error class usage Testing & Build: - All TypeScript compilation errors resolved - ESLint warnings remain but no errors - Build completes successfully with proper static generation
This commit is contained in:
25
lib/auth.ts
25
lib/auth.ts
@ -6,8 +6,10 @@ import { prisma } from "./prisma";
|
||||
// Define the shape of the JWT token
|
||||
declare module "next-auth/jwt" {
|
||||
interface JWT {
|
||||
companyId: string;
|
||||
role: string;
|
||||
companyId?: string;
|
||||
role?: string;
|
||||
isPlatformUser?: boolean;
|
||||
platformRole?: string;
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,8 +20,11 @@ declare module "next-auth" {
|
||||
id?: string;
|
||||
name?: string;
|
||||
email?: string;
|
||||
image?: string;
|
||||
companyId?: string;
|
||||
role?: string;
|
||||
isPlatformUser?: boolean;
|
||||
platformRole?: string;
|
||||
};
|
||||
}
|
||||
|
||||
@ -27,8 +32,10 @@ declare module "next-auth" {
|
||||
id: string;
|
||||
email: string;
|
||||
name?: string;
|
||||
companyId: string;
|
||||
role: string;
|
||||
companyId?: string;
|
||||
role?: string;
|
||||
isPlatformUser?: boolean;
|
||||
platformRole?: string;
|
||||
}
|
||||
}
|
||||
|
||||
@ -50,13 +57,13 @@ export const authOptions: NextAuthOptions = {
|
||||
include: { company: true },
|
||||
});
|
||||
|
||||
if (!user || !user.hashedPassword) {
|
||||
if (!user || !user.password) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const isPasswordValid = await bcrypt.compare(
|
||||
credentials.password,
|
||||
user.hashedPassword
|
||||
user.password
|
||||
);
|
||||
|
||||
if (!isPasswordValid) {
|
||||
@ -71,7 +78,7 @@ export const authOptions: NextAuthOptions = {
|
||||
return {
|
||||
id: user.id,
|
||||
email: user.email,
|
||||
name: user.name,
|
||||
name: user.name || undefined,
|
||||
companyId: user.companyId,
|
||||
role: user.role,
|
||||
};
|
||||
@ -98,6 +105,8 @@ export const authOptions: NextAuthOptions = {
|
||||
if (user) {
|
||||
token.companyId = user.companyId;
|
||||
token.role = user.role;
|
||||
token.isPlatformUser = user.isPlatformUser;
|
||||
token.platformRole = user.platformRole;
|
||||
}
|
||||
return token;
|
||||
},
|
||||
@ -105,6 +114,8 @@ export const authOptions: NextAuthOptions = {
|
||||
if (token && session.user) {
|
||||
session.user.companyId = token.companyId;
|
||||
session.user.role = token.role;
|
||||
session.user.isPlatformUser = token.isPlatformUser;
|
||||
session.user.platformRole = token.platformRole;
|
||||
}
|
||||
return session;
|
||||
},
|
||||
|
||||
@ -3,7 +3,7 @@
|
||||
import { PrismaPg } from "@prisma/adapter-pg";
|
||||
import { PrismaClient } from "@prisma/client";
|
||||
import { Pool } from "pg";
|
||||
import { env } from "./env.js";
|
||||
import { env } from "./env";
|
||||
|
||||
// Enhanced connection pool configuration
|
||||
const createConnectionPool = () => {
|
||||
@ -66,8 +66,29 @@ const createConnectionPool = () => {
|
||||
|
||||
// Create adapter with connection pool
|
||||
export const createEnhancedPrismaClient = () => {
|
||||
const pool = createConnectionPool();
|
||||
const adapter = new PrismaPg(pool);
|
||||
// Parse DATABASE_URL to get connection parameters
|
||||
const dbUrl = new URL(env.DATABASE_URL);
|
||||
|
||||
const poolConfig = {
|
||||
host: dbUrl.hostname,
|
||||
port: parseInt(dbUrl.port || "5432"),
|
||||
database: dbUrl.pathname.slice(1), // Remove leading '/'
|
||||
user: dbUrl.username,
|
||||
password: decodeURIComponent(dbUrl.password),
|
||||
ssl: dbUrl.searchParams.get("sslmode") !== "disable" ? { rejectUnauthorized: false } : undefined,
|
||||
|
||||
// Connection pool settings
|
||||
max: 20, // Maximum number of connections
|
||||
idleTimeoutMillis: 30000, // 30 seconds
|
||||
connectionTimeoutMillis: 5000, // 5 seconds
|
||||
query_timeout: 10000, // 10 seconds
|
||||
statement_timeout: 10000, // 10 seconds
|
||||
|
||||
// Connection lifecycle
|
||||
allowExitOnIdle: true,
|
||||
};
|
||||
|
||||
const adapter = new PrismaPg(poolConfig);
|
||||
|
||||
return new PrismaClient({
|
||||
adapter,
|
||||
|
||||
@ -106,7 +106,7 @@ export const env = {
|
||||
// Database Configuration
|
||||
DATABASE_URL: parseEnvValue(process.env.DATABASE_URL) || "",
|
||||
DATABASE_URL_DIRECT: parseEnvValue(process.env.DATABASE_URL_DIRECT) || "",
|
||||
|
||||
|
||||
// Database Connection Pooling
|
||||
DATABASE_CONNECTION_LIMIT: parseIntWithDefault(
|
||||
process.env.DATABASE_CONNECTION_LIMIT,
|
||||
|
||||
@ -213,9 +213,9 @@ export function createErrorResponse(error: AppError) {
|
||||
error.validationErrors && {
|
||||
validationErrors: error.validationErrors,
|
||||
}),
|
||||
...(error instanceof ResourceNotFoundError &&
|
||||
...(error instanceof NotFoundError &&
|
||||
error.resource && { resource: error.resource }),
|
||||
...(error instanceof ResourceNotFoundError &&
|
||||
...(error instanceof NotFoundError &&
|
||||
error.resourceId && {
|
||||
resourceId: error.resourceId,
|
||||
}),
|
||||
|
||||
@ -1,16 +1,16 @@
|
||||
// SessionImport to Session processor
|
||||
import { ProcessingStage, SentimentCategory } from "@prisma/client";
|
||||
import cron from "node-cron";
|
||||
import { withRetry } from "./database-retry.js";
|
||||
import { withRetry } from "./database-retry";
|
||||
import { getSchedulerConfig } from "./env";
|
||||
import { prisma } from "./prisma.js";
|
||||
import { prisma } from "./prisma";
|
||||
import {
|
||||
completeStage,
|
||||
failStage,
|
||||
initializeSession,
|
||||
skipStage,
|
||||
startStage,
|
||||
} from "./processingStatusManager.js";
|
||||
} from "./processingStatusManager";
|
||||
import {
|
||||
fetchTranscriptContent,
|
||||
isValidTranscriptUrl,
|
||||
@ -22,19 +22,23 @@ interface ImportRecord {
|
||||
startTimeRaw: string;
|
||||
endTimeRaw: string;
|
||||
externalSessionId: string;
|
||||
sessionId?: string;
|
||||
userId?: string;
|
||||
category?: string;
|
||||
language?: string;
|
||||
sentiment?: string;
|
||||
escalated?: boolean;
|
||||
forwardedHr?: boolean;
|
||||
avgResponseTime?: number;
|
||||
messagesSent?: number;
|
||||
fullTranscriptUrl?: string;
|
||||
rawTranscriptContent?: string;
|
||||
aiSummary?: string;
|
||||
initialMsg?: string;
|
||||
sessionId?: string | null;
|
||||
userId?: string | null;
|
||||
category: string | null;
|
||||
language: string | null;
|
||||
sentiment?: string | null;
|
||||
escalated?: boolean | null;
|
||||
forwardedHr?: boolean | null;
|
||||
avgResponseTime?: number | null;
|
||||
messagesSent: number | null;
|
||||
fullTranscriptUrl: string | null;
|
||||
rawTranscriptContent: string | null;
|
||||
aiSummary?: string | null;
|
||||
initialMsg?: string | null;
|
||||
ipAddress: string | null;
|
||||
countryCode: string | null;
|
||||
avgResponseTimeSeconds: number | null;
|
||||
initialMessage: string | null;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -245,7 +249,7 @@ async function handleTranscriptFetching(
|
||||
);
|
||||
|
||||
if (transcriptResult.success) {
|
||||
transcriptContent = transcriptResult.content;
|
||||
transcriptContent = transcriptResult.content ?? null;
|
||||
console.log(
|
||||
`[Import Processor] ✓ Fetched transcript for ${importRecord.externalSessionId} (${transcriptContent?.length} chars)`
|
||||
);
|
||||
@ -282,7 +286,7 @@ async function handleTranscriptFetching(
|
||||
});
|
||||
}
|
||||
|
||||
return transcriptContent;
|
||||
return transcriptContent ?? null;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -429,7 +433,10 @@ async function processQueuedImportsInternal(batchSize = 50): Promise<void> {
|
||||
|
||||
// Process with concurrency limit to avoid overwhelming the database
|
||||
const concurrencyLimit = 5;
|
||||
const results = [];
|
||||
const results: Array<{
|
||||
importRecord: typeof unprocessedImports[0];
|
||||
result: Awaited<ReturnType<typeof processSingleImport>>;
|
||||
}> = [];
|
||||
|
||||
for (let i = 0; i < batchPromises.length; i += concurrencyLimit) {
|
||||
const chunk = batchPromises.slice(i, i + concurrencyLimit);
|
||||
|
||||
@ -65,7 +65,7 @@ export const platformAuthOptions: NextAuthOptions = {
|
||||
return {
|
||||
id: platformUser.id,
|
||||
email: platformUser.email,
|
||||
name: platformUser.name,
|
||||
name: platformUser.name || undefined,
|
||||
isPlatformUser: true,
|
||||
platformRole: platformUser.role,
|
||||
};
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
// Enhanced Prisma client setup with connection pooling
|
||||
import { PrismaClient } from "@prisma/client";
|
||||
import { createEnhancedPrismaClient } from "./database-pool.js";
|
||||
import { env } from "./env.js";
|
||||
import { createEnhancedPrismaClient } from "./database-pool";
|
||||
import { env } from "./env";
|
||||
|
||||
// Add prisma to the NodeJS global type
|
||||
declare const global: {
|
||||
|
||||
@ -7,14 +7,14 @@ import {
|
||||
} from "@prisma/client";
|
||||
import cron from "node-cron";
|
||||
import fetch from "node-fetch";
|
||||
import { withRetry } from "./database-retry.js";
|
||||
import { prisma } from "./prisma.js";
|
||||
import { withRetry } from "./database-retry";
|
||||
import { prisma } from "./prisma";
|
||||
import {
|
||||
completeStage,
|
||||
failStage,
|
||||
getSessionsNeedingProcessing,
|
||||
startStage,
|
||||
} from "./processingStatusManager.js";
|
||||
} from "./processingStatusManager";
|
||||
import { getSchedulerConfig } from "./schedulerConfig";
|
||||
|
||||
const OPENAI_API_KEY = process.env.OPENAI_API_KEY;
|
||||
@ -137,15 +137,19 @@ interface ProcessingResult {
|
||||
|
||||
interface SessionMessage {
|
||||
id: string;
|
||||
timestamp: Date;
|
||||
timestamp: Date | null;
|
||||
role: string;
|
||||
content: string;
|
||||
order: number;
|
||||
createdAt: Date;
|
||||
sessionId: string;
|
||||
}
|
||||
|
||||
interface SessionForProcessing {
|
||||
id: string;
|
||||
messages: SessionMessage[];
|
||||
companyId: string;
|
||||
endTime: Date | null;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -250,7 +254,7 @@ async function processQuestions(
|
||||
});
|
||||
|
||||
// Filter and prepare unique questions
|
||||
const uniqueQuestions = [...new Set(questions.filter((q) => q.trim()))];
|
||||
const uniqueQuestions = Array.from(new Set(questions.filter((q) => q.trim())));
|
||||
if (uniqueQuestions.length === 0) return;
|
||||
|
||||
// Batch create questions (skip duplicates)
|
||||
@ -527,7 +531,7 @@ async function processSingleSession(
|
||||
const transcript = session.messages
|
||||
.map(
|
||||
(msg: SessionMessage) =>
|
||||
`[${new Date(msg.timestamp)
|
||||
`[${new Date(msg.timestamp || msg.createdAt)
|
||||
.toLocaleString("en-GB", {
|
||||
day: "2-digit",
|
||||
month: "2-digit",
|
||||
@ -552,7 +556,7 @@ async function processSingleSession(
|
||||
// Calculate endTime from latest Message timestamp
|
||||
const calculatedEndTime = await calculateEndTime(
|
||||
session.id,
|
||||
session.endTime
|
||||
session.endTime || new Date()
|
||||
);
|
||||
|
||||
// Update the session with processed data
|
||||
@ -710,9 +714,8 @@ async function processUnprocessedSessionsInternal(
|
||||
|
||||
// Filter to only sessions that have messages
|
||||
const sessionsWithMessages = sessionsToProcess.filter(
|
||||
(session): session is SessionForProcessing =>
|
||||
session.messages && session.messages.length > 0
|
||||
);
|
||||
(session) => session.messages && session.messages.length > 0
|
||||
) as SessionForProcessing[];
|
||||
|
||||
if (sessionsWithMessages.length === 0) {
|
||||
process.stdout.write(
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import { ProcessingStage, ProcessingStatus } from "@prisma/client";
|
||||
import { prisma } from "./prisma.js";
|
||||
import { ProcessingStage, ProcessingStatus, type PrismaClient } from "@prisma/client";
|
||||
import { prisma } from "./prisma";
|
||||
|
||||
// Type-safe metadata interfaces
|
||||
interface ProcessingMetadata {
|
||||
@ -11,340 +11,373 @@ interface WhereClause {
|
||||
stage?: ProcessingStage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize processing status for a session with all stages set to PENDING
|
||||
*/
|
||||
export async function initializeSession(sessionId: string): Promise<void> {
|
||||
const stages = [
|
||||
ProcessingStage.CSV_IMPORT,
|
||||
ProcessingStage.TRANSCRIPT_FETCH,
|
||||
ProcessingStage.SESSION_CREATION,
|
||||
ProcessingStage.AI_ANALYSIS,
|
||||
ProcessingStage.QUESTION_EXTRACTION,
|
||||
];
|
||||
export class ProcessingStatusManager {
|
||||
private prisma: PrismaClient;
|
||||
|
||||
// Create all processing status records for this session
|
||||
await prisma.sessionProcessingStatus.createMany({
|
||||
data: stages.map((stage) => ({
|
||||
sessionId,
|
||||
stage,
|
||||
status: ProcessingStatus.PENDING,
|
||||
})),
|
||||
skipDuplicates: true, // In case some already exist
|
||||
});
|
||||
}
|
||||
constructor(prismaClient?: PrismaClient) {
|
||||
this.prisma = prismaClient || prisma;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start a processing stage
|
||||
*/
|
||||
export async function startStage(
|
||||
sessionId: string,
|
||||
stage: ProcessingStage,
|
||||
metadata?: ProcessingMetadata
|
||||
): Promise<void> {
|
||||
await prisma.sessionProcessingStatus.upsert({
|
||||
where: {
|
||||
sessionId_stage: { sessionId, stage },
|
||||
},
|
||||
update: {
|
||||
status: ProcessingStatus.IN_PROGRESS,
|
||||
startedAt: new Date(),
|
||||
errorMessage: null,
|
||||
metadata: metadata || null,
|
||||
},
|
||||
create: {
|
||||
sessionId,
|
||||
stage,
|
||||
status: ProcessingStatus.IN_PROGRESS,
|
||||
startedAt: new Date(),
|
||||
metadata: metadata || null,
|
||||
},
|
||||
});
|
||||
}
|
||||
/**
|
||||
* Initialize processing status for a session with all stages set to PENDING
|
||||
*/
|
||||
async initializeSession(sessionId: string): Promise<void> {
|
||||
const stages = [
|
||||
ProcessingStage.CSV_IMPORT,
|
||||
ProcessingStage.TRANSCRIPT_FETCH,
|
||||
ProcessingStage.SESSION_CREATION,
|
||||
ProcessingStage.AI_ANALYSIS,
|
||||
ProcessingStage.QUESTION_EXTRACTION,
|
||||
];
|
||||
|
||||
/**
|
||||
* Complete a processing stage successfully
|
||||
*/
|
||||
export async function completeStage(
|
||||
sessionId: string,
|
||||
stage: ProcessingStage,
|
||||
metadata?: ProcessingMetadata
|
||||
): Promise<void> {
|
||||
await prisma.sessionProcessingStatus.upsert({
|
||||
where: {
|
||||
sessionId_stage: { sessionId, stage },
|
||||
},
|
||||
update: {
|
||||
status: ProcessingStatus.COMPLETED,
|
||||
completedAt: new Date(),
|
||||
errorMessage: null,
|
||||
metadata: metadata || null,
|
||||
},
|
||||
create: {
|
||||
sessionId,
|
||||
stage,
|
||||
status: ProcessingStatus.COMPLETED,
|
||||
startedAt: new Date(),
|
||||
completedAt: new Date(),
|
||||
metadata: metadata || null,
|
||||
},
|
||||
});
|
||||
}
|
||||
// Create all processing status records for this session
|
||||
await this.prisma.sessionProcessingStatus.createMany({
|
||||
data: stages.map((stage) => ({
|
||||
sessionId,
|
||||
stage,
|
||||
status: ProcessingStatus.PENDING,
|
||||
})),
|
||||
skipDuplicates: true, // In case some already exist
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark a processing stage as failed
|
||||
*/
|
||||
export async function failStage(
|
||||
sessionId: string,
|
||||
stage: ProcessingStage,
|
||||
errorMessage: string,
|
||||
metadata?: ProcessingMetadata
|
||||
): Promise<void> {
|
||||
await prisma.sessionProcessingStatus.upsert({
|
||||
where: {
|
||||
sessionId_stage: { sessionId, stage },
|
||||
},
|
||||
update: {
|
||||
status: ProcessingStatus.FAILED,
|
||||
completedAt: new Date(),
|
||||
errorMessage,
|
||||
retryCount: { increment: 1 },
|
||||
metadata: metadata || null,
|
||||
},
|
||||
create: {
|
||||
sessionId,
|
||||
stage,
|
||||
status: ProcessingStatus.FAILED,
|
||||
startedAt: new Date(),
|
||||
completedAt: new Date(),
|
||||
errorMessage,
|
||||
retryCount: 1,
|
||||
metadata: metadata || null,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Skip a processing stage (e.g., no transcript URL available)
|
||||
*/
|
||||
export async function skipStage(
|
||||
sessionId: string,
|
||||
stage: ProcessingStage,
|
||||
reason: string
|
||||
): Promise<void> {
|
||||
await prisma.sessionProcessingStatus.upsert({
|
||||
where: {
|
||||
sessionId_stage: { sessionId, stage },
|
||||
},
|
||||
update: {
|
||||
status: ProcessingStatus.SKIPPED,
|
||||
completedAt: new Date(),
|
||||
errorMessage: reason,
|
||||
},
|
||||
create: {
|
||||
sessionId,
|
||||
stage,
|
||||
status: ProcessingStatus.SKIPPED,
|
||||
startedAt: new Date(),
|
||||
completedAt: new Date(),
|
||||
errorMessage: reason,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get processing status for a specific session
|
||||
*/
|
||||
export async function getSessionStatus(sessionId: string) {
|
||||
return await prisma.sessionProcessingStatus.findMany({
|
||||
where: { sessionId },
|
||||
orderBy: { stage: "asc" },
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get sessions that need processing for a specific stage
|
||||
*/
|
||||
export async function getSessionsNeedingProcessing(
|
||||
stage: ProcessingStage,
|
||||
limit = 50
|
||||
) {
|
||||
return await prisma.sessionProcessingStatus.findMany({
|
||||
where: {
|
||||
stage,
|
||||
status: ProcessingStatus.PENDING,
|
||||
session: {
|
||||
company: {
|
||||
status: "ACTIVE", // Only process sessions from active companies
|
||||
},
|
||||
/**
|
||||
* Start a processing stage
|
||||
*/
|
||||
async startStage(
|
||||
sessionId: string,
|
||||
stage: ProcessingStage,
|
||||
metadata?: ProcessingMetadata
|
||||
): Promise<void> {
|
||||
await this.prisma.sessionProcessingStatus.upsert({
|
||||
where: {
|
||||
sessionId_stage: { sessionId, stage },
|
||||
},
|
||||
},
|
||||
include: {
|
||||
session: {
|
||||
select: {
|
||||
id: true,
|
||||
companyId: true,
|
||||
importId: true,
|
||||
startTime: true,
|
||||
endTime: true,
|
||||
fullTranscriptUrl: true,
|
||||
import:
|
||||
stage === ProcessingStage.TRANSCRIPT_FETCH
|
||||
? {
|
||||
select: {
|
||||
id: true,
|
||||
fullTranscriptUrl: true,
|
||||
externalSessionId: true,
|
||||
},
|
||||
}
|
||||
: false,
|
||||
update: {
|
||||
status: ProcessingStatus.IN_PROGRESS,
|
||||
startedAt: new Date(),
|
||||
errorMessage: null,
|
||||
metadata: metadata || undefined,
|
||||
},
|
||||
create: {
|
||||
sessionId,
|
||||
stage,
|
||||
status: ProcessingStatus.IN_PROGRESS,
|
||||
startedAt: new Date(),
|
||||
metadata: metadata || undefined,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Complete a processing stage successfully
|
||||
*/
|
||||
async completeStage(
|
||||
sessionId: string,
|
||||
stage: ProcessingStage,
|
||||
metadata?: ProcessingMetadata
|
||||
): Promise<void> {
|
||||
await this.prisma.sessionProcessingStatus.upsert({
|
||||
where: {
|
||||
sessionId_stage: { sessionId, stage },
|
||||
},
|
||||
update: {
|
||||
status: ProcessingStatus.COMPLETED,
|
||||
completedAt: new Date(),
|
||||
errorMessage: null,
|
||||
metadata: metadata || undefined,
|
||||
},
|
||||
create: {
|
||||
sessionId,
|
||||
stage,
|
||||
status: ProcessingStatus.COMPLETED,
|
||||
startedAt: new Date(),
|
||||
completedAt: new Date(),
|
||||
metadata: metadata || undefined,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark a processing stage as failed
|
||||
*/
|
||||
async failStage(
|
||||
sessionId: string,
|
||||
stage: ProcessingStage,
|
||||
errorMessage: string,
|
||||
metadata?: ProcessingMetadata
|
||||
): Promise<void> {
|
||||
await this.prisma.sessionProcessingStatus.upsert({
|
||||
where: {
|
||||
sessionId_stage: { sessionId, stage },
|
||||
},
|
||||
update: {
|
||||
status: ProcessingStatus.FAILED,
|
||||
completedAt: new Date(),
|
||||
errorMessage,
|
||||
retryCount: { increment: 1 },
|
||||
metadata: metadata || undefined,
|
||||
},
|
||||
create: {
|
||||
sessionId,
|
||||
stage,
|
||||
status: ProcessingStatus.FAILED,
|
||||
startedAt: new Date(),
|
||||
completedAt: new Date(),
|
||||
errorMessage,
|
||||
retryCount: 1,
|
||||
metadata: metadata || undefined,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Skip a processing stage (e.g., no transcript URL available)
|
||||
*/
|
||||
async skipStage(
|
||||
sessionId: string,
|
||||
stage: ProcessingStage,
|
||||
reason: string
|
||||
): Promise<void> {
|
||||
await this.prisma.sessionProcessingStatus.upsert({
|
||||
where: {
|
||||
sessionId_stage: { sessionId, stage },
|
||||
},
|
||||
update: {
|
||||
status: ProcessingStatus.SKIPPED,
|
||||
completedAt: new Date(),
|
||||
errorMessage: reason,
|
||||
},
|
||||
create: {
|
||||
sessionId,
|
||||
stage,
|
||||
status: ProcessingStatus.SKIPPED,
|
||||
startedAt: new Date(),
|
||||
completedAt: new Date(),
|
||||
errorMessage: reason,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get processing status for a specific session
|
||||
*/
|
||||
async getSessionStatus(sessionId: string) {
|
||||
return await this.prisma.sessionProcessingStatus.findMany({
|
||||
where: { sessionId },
|
||||
orderBy: { stage: "asc" },
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get sessions that need processing for a specific stage
|
||||
*/
|
||||
async getSessionsNeedingProcessing(
|
||||
stage: ProcessingStage,
|
||||
limit = 50
|
||||
) {
|
||||
return await this.prisma.sessionProcessingStatus.findMany({
|
||||
where: {
|
||||
stage,
|
||||
status: ProcessingStatus.PENDING,
|
||||
session: {
|
||||
company: {
|
||||
select: {
|
||||
id: true,
|
||||
csvUsername: true,
|
||||
csvPassword: true,
|
||||
status: "ACTIVE", // Only process sessions from active companies
|
||||
},
|
||||
},
|
||||
},
|
||||
include: {
|
||||
session: {
|
||||
select: {
|
||||
id: true,
|
||||
companyId: true,
|
||||
importId: true,
|
||||
startTime: true,
|
||||
endTime: true,
|
||||
fullTranscriptUrl: true,
|
||||
import:
|
||||
stage === ProcessingStage.TRANSCRIPT_FETCH
|
||||
? {
|
||||
select: {
|
||||
id: true,
|
||||
fullTranscriptUrl: true,
|
||||
externalSessionId: true,
|
||||
},
|
||||
}
|
||||
: false,
|
||||
company: {
|
||||
select: {
|
||||
id: true,
|
||||
csvUsername: true,
|
||||
csvPassword: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
take: limit,
|
||||
orderBy: { session: { createdAt: "asc" } },
|
||||
});
|
||||
}
|
||||
take: limit,
|
||||
orderBy: { session: { createdAt: "asc" } },
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get pipeline status overview
|
||||
*/
|
||||
export async function getPipelineStatus() {
|
||||
// Get counts by stage and status
|
||||
const statusCounts = await prisma.sessionProcessingStatus.groupBy({
|
||||
by: ["stage", "status"],
|
||||
_count: { id: true },
|
||||
});
|
||||
/**
|
||||
* Get pipeline status overview
|
||||
*/
|
||||
async getPipelineStatus() {
|
||||
// Get counts by stage and status
|
||||
const statusCounts = await this.prisma.sessionProcessingStatus.groupBy({
|
||||
by: ["stage", "status"],
|
||||
_count: { id: true },
|
||||
});
|
||||
|
||||
// Get total sessions
|
||||
const totalSessions = await prisma.session.count();
|
||||
// Get total sessions
|
||||
const totalSessions = await this.prisma.session.count();
|
||||
|
||||
// Organize the data
|
||||
const pipeline: Record<string, Record<string, number>> = {};
|
||||
// Organize the data
|
||||
const pipeline: Record<string, Record<string, number>> = {};
|
||||
|
||||
for (const { stage, status, _count } of statusCounts) {
|
||||
if (!pipeline[stage]) {
|
||||
pipeline[stage] = {};
|
||||
for (const { stage, status, _count } of statusCounts) {
|
||||
if (!pipeline[stage]) {
|
||||
pipeline[stage] = {};
|
||||
}
|
||||
pipeline[stage][status] = _count.id;
|
||||
}
|
||||
pipeline[stage][status] = _count.id;
|
||||
|
||||
return {
|
||||
totalSessions,
|
||||
pipeline,
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
totalSessions,
|
||||
pipeline,
|
||||
};
|
||||
}
|
||||
/**
|
||||
* Get sessions with failed processing
|
||||
*/
|
||||
async getFailedSessions(stage?: ProcessingStage) {
|
||||
const where: WhereClause = {
|
||||
status: ProcessingStatus.FAILED,
|
||||
};
|
||||
|
||||
/**
|
||||
* Get sessions with failed processing
|
||||
*/
|
||||
export async function getFailedSessions(stage?: ProcessingStage) {
|
||||
const where: WhereClause = {
|
||||
status: ProcessingStatus.FAILED,
|
||||
};
|
||||
if (stage) {
|
||||
where.stage = stage;
|
||||
}
|
||||
|
||||
if (stage) {
|
||||
where.stage = stage;
|
||||
}
|
||||
|
||||
return await prisma.sessionProcessingStatus.findMany({
|
||||
where,
|
||||
select: {
|
||||
id: true,
|
||||
sessionId: true,
|
||||
stage: true,
|
||||
status: true,
|
||||
startedAt: true,
|
||||
completedAt: true,
|
||||
errorMessage: true,
|
||||
retryCount: true,
|
||||
session: {
|
||||
select: {
|
||||
id: true,
|
||||
companyId: true,
|
||||
startTime: true,
|
||||
import: {
|
||||
select: {
|
||||
id: true,
|
||||
externalSessionId: true,
|
||||
return await this.prisma.sessionProcessingStatus.findMany({
|
||||
where,
|
||||
select: {
|
||||
id: true,
|
||||
sessionId: true,
|
||||
stage: true,
|
||||
status: true,
|
||||
startedAt: true,
|
||||
completedAt: true,
|
||||
errorMessage: true,
|
||||
retryCount: true,
|
||||
session: {
|
||||
select: {
|
||||
id: true,
|
||||
companyId: true,
|
||||
startTime: true,
|
||||
import: {
|
||||
select: {
|
||||
id: true,
|
||||
externalSessionId: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
orderBy: { completedAt: "desc" },
|
||||
take: 100, // Limit failed sessions to prevent overfetching
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset a failed stage for retry
|
||||
*/
|
||||
export async function resetStageForRetry(
|
||||
sessionId: string,
|
||||
stage: ProcessingStage
|
||||
): Promise<void> {
|
||||
await prisma.sessionProcessingStatus.update({
|
||||
where: {
|
||||
sessionId_stage: { sessionId, stage },
|
||||
},
|
||||
data: {
|
||||
status: ProcessingStatus.PENDING,
|
||||
startedAt: null,
|
||||
completedAt: null,
|
||||
errorMessage: null,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a session has completed a specific stage
|
||||
*/
|
||||
export async function hasCompletedStage(
|
||||
sessionId: string,
|
||||
stage: ProcessingStage
|
||||
): Promise<boolean> {
|
||||
const status = await prisma.sessionProcessingStatus.findUnique({
|
||||
where: {
|
||||
sessionId_stage: { sessionId, stage },
|
||||
},
|
||||
});
|
||||
|
||||
return status?.status === ProcessingStatus.COMPLETED;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a session is ready for a specific stage (previous stages completed)
|
||||
*/
|
||||
export async function isReadyForStage(
|
||||
sessionId: string,
|
||||
stage: ProcessingStage
|
||||
): Promise<boolean> {
|
||||
const stageOrder = [
|
||||
ProcessingStage.CSV_IMPORT,
|
||||
ProcessingStage.TRANSCRIPT_FETCH,
|
||||
ProcessingStage.SESSION_CREATION,
|
||||
ProcessingStage.AI_ANALYSIS,
|
||||
ProcessingStage.QUESTION_EXTRACTION,
|
||||
];
|
||||
|
||||
const currentStageIndex = stageOrder.indexOf(stage);
|
||||
if (currentStageIndex === 0) return true; // First stage is always ready
|
||||
|
||||
// Check if all previous stages are completed
|
||||
const previousStages = stageOrder.slice(0, currentStageIndex);
|
||||
|
||||
for (const prevStage of previousStages) {
|
||||
const isCompleted = await hasCompletedStage(sessionId, prevStage);
|
||||
if (!isCompleted) return false;
|
||||
orderBy: { completedAt: "desc" },
|
||||
take: 100, // Limit failed sessions to prevent overfetching
|
||||
});
|
||||
}
|
||||
|
||||
return true;
|
||||
/**
|
||||
* Reset a failed stage for retry
|
||||
*/
|
||||
async resetStageForRetry(
|
||||
sessionId: string,
|
||||
stage: ProcessingStage
|
||||
): Promise<void> {
|
||||
await this.prisma.sessionProcessingStatus.update({
|
||||
where: {
|
||||
sessionId_stage: { sessionId, stage },
|
||||
},
|
||||
data: {
|
||||
status: ProcessingStatus.PENDING,
|
||||
startedAt: null,
|
||||
completedAt: null,
|
||||
errorMessage: null,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a session has completed a specific stage
|
||||
*/
|
||||
async hasCompletedStage(
|
||||
sessionId: string,
|
||||
stage: ProcessingStage
|
||||
): Promise<boolean> {
|
||||
const status = await this.prisma.sessionProcessingStatus.findUnique({
|
||||
where: {
|
||||
sessionId_stage: { sessionId, stage },
|
||||
},
|
||||
});
|
||||
|
||||
return status?.status === ProcessingStatus.COMPLETED;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a session is ready for a specific stage (previous stages completed)
|
||||
*/
|
||||
async isReadyForStage(
|
||||
sessionId: string,
|
||||
stage: ProcessingStage
|
||||
): Promise<boolean> {
|
||||
const stageOrder = [
|
||||
ProcessingStage.CSV_IMPORT,
|
||||
ProcessingStage.TRANSCRIPT_FETCH,
|
||||
ProcessingStage.SESSION_CREATION,
|
||||
ProcessingStage.AI_ANALYSIS,
|
||||
ProcessingStage.QUESTION_EXTRACTION,
|
||||
];
|
||||
|
||||
const currentStageIndex = stageOrder.indexOf(stage);
|
||||
if (currentStageIndex === 0) return true; // First stage is always ready
|
||||
|
||||
// Check if all previous stages are completed
|
||||
const previousStages = stageOrder.slice(0, currentStageIndex);
|
||||
|
||||
for (const prevStage of previousStages) {
|
||||
const isCompleted = await this.hasCompletedStage(sessionId, prevStage);
|
||||
if (!isCompleted) return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// Export a singleton instance for backward compatibility
|
||||
export const processingStatusManager = new ProcessingStatusManager();
|
||||
|
||||
// Also export the individual functions for backward compatibility
|
||||
export const initializeSession = (sessionId: string) => processingStatusManager.initializeSession(sessionId);
|
||||
export const startStage = (sessionId: string, stage: ProcessingStage, metadata?: ProcessingMetadata) =>
|
||||
processingStatusManager.startStage(sessionId, stage, metadata);
|
||||
export const completeStage = (sessionId: string, stage: ProcessingStage, metadata?: ProcessingMetadata) =>
|
||||
processingStatusManager.completeStage(sessionId, stage, metadata);
|
||||
export const failStage = (sessionId: string, stage: ProcessingStage, errorMessage: string, metadata?: ProcessingMetadata) =>
|
||||
processingStatusManager.failStage(sessionId, stage, errorMessage, metadata);
|
||||
export const skipStage = (sessionId: string, stage: ProcessingStage, reason: string) =>
|
||||
processingStatusManager.skipStage(sessionId, stage, reason);
|
||||
export const getSessionStatus = (sessionId: string) => processingStatusManager.getSessionStatus(sessionId);
|
||||
export const getSessionsNeedingProcessing = (stage: ProcessingStage, limit?: number) =>
|
||||
processingStatusManager.getSessionsNeedingProcessing(stage, limit);
|
||||
export const getPipelineStatus = () => processingStatusManager.getPipelineStatus();
|
||||
export const getFailedSessions = (stage?: ProcessingStage) => processingStatusManager.getFailedSessions(stage);
|
||||
export const resetStageForRetry = (sessionId: string, stage: ProcessingStage) =>
|
||||
processingStatusManager.resetStageForRetry(sessionId, stage);
|
||||
export const hasCompletedStage = (sessionId: string, stage: ProcessingStage) =>
|
||||
processingStatusManager.hasCompletedStage(sessionId, stage);
|
||||
export const isReadyForStage = (sessionId: string, stage: ProcessingStage) =>
|
||||
processingStatusManager.isReadyForStage(sessionId, stage);
|
||||
@ -1,6 +1,6 @@
|
||||
// Combined scheduler initialization with graceful shutdown
|
||||
|
||||
import { prisma } from "./prisma.js";
|
||||
import { prisma } from "./prisma";
|
||||
import { startProcessingScheduler } from "./processingScheduler";
|
||||
import { startCsvImportScheduler } from "./scheduler";
|
||||
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
// Transcript parsing utility for converting raw transcript content into structured messages
|
||||
import { prisma } from "./prisma.js";
|
||||
import { prisma } from "./prisma";
|
||||
|
||||
export interface ParsedMessage {
|
||||
sessionId: string;
|
||||
@ -156,7 +156,7 @@ export function parseTranscriptToMessages(
|
||||
}
|
||||
|
||||
// Calculate timestamps - use parsed timestamps if available, otherwise distribute across session duration
|
||||
interface MessageWithTimestamp extends ParsedMessage {
|
||||
interface MessageWithTimestamp extends Omit<ParsedMessage, 'timestamp'> {
|
||||
timestamp: Date | string;
|
||||
}
|
||||
const hasTimestamps = messages.some(
|
||||
|
||||
Reference in New Issue
Block a user