feat: Implement configurable scheduler settings and enhance CSV import functionality

Max Kowalski
2025-06-27 16:55:25 +02:00
parent 1dd618b666
commit 50b230aa9b
10 changed files with 457 additions and 48 deletions
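All of the new knobs are read from environment variables in lib/schedulerConfig.ts (see below). For reference, a .env.local using only the names and defaults that appear in this commit might look like the following; the values themselves are illustrative:

# Master switch for both schedulers
SCHEDULER_ENABLED=true
# Cron expression for CSV imports (default: every 15 minutes)
CSV_IMPORT_INTERVAL=*/15 * * * *
# Cron expression for session processing (default: hourly)
SESSION_PROCESSING_INTERVAL=0 * * * *
# 0 = unlimited batch size
SESSION_PROCESSING_BATCH_SIZE=0
SESSION_PROCESSING_CONCURRENCY=5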

View File

@@ -1,32 +1,8 @@
// Session processing scheduler - TypeScript version
// Session processing scheduler with configurable intervals and batch sizes
import cron from "node-cron";
import { PrismaClient } from "@prisma/client";
import fetch from "node-fetch";
import { readFileSync } from "fs";
import { fileURLToPath } from "url";
import { dirname, join } from "path";
// Load environment variables from .env.local
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
const envPath = join(__dirname, '..', '.env.local');
try {
  const envFile = readFileSync(envPath, 'utf8');
  const envVars = envFile.split('\n').filter(line => line.trim() && !line.startsWith('#'));
  envVars.forEach(line => {
    const [key, ...valueParts] = line.split('=');
    if (key && valueParts.length > 0) {
      const value = valueParts.join('=').trim();
      if (!process.env[key.trim()]) {
        process.env[key.trim()] = value;
      }
    }
  });
} catch (error) {
  // Silently fail if .env.local doesn't exist
}
import { getSchedulerConfig } from "./schedulerConfig";
const prisma = new PrismaClient();
const OPENAI_API_KEY = process.env.OPENAI_API_KEY;
@@ -399,21 +375,30 @@ export async function processUnprocessedSessions(batchSize: number | null = null
}
/**
 * Start the processing scheduler
 * Start the processing scheduler with configurable settings
 */
export function startProcessingScheduler(): void {
  // Process unprocessed sessions every hour
  cron.schedule("0 * * * *", async () => {
  const config = getSchedulerConfig();
  if (!config.enabled) {
    console.log('[Processing Scheduler] Disabled via configuration');
    return;
  }
  console.log(`[Processing Scheduler] Starting with interval: ${config.sessionProcessing.interval}`);
  console.log(`[Processing Scheduler] Batch size: ${config.sessionProcessing.batchSize === 0 ? 'unlimited' : config.sessionProcessing.batchSize}`);
  console.log(`[Processing Scheduler] Concurrency: ${config.sessionProcessing.concurrency}`);
  cron.schedule(config.sessionProcessing.interval, async () => {
    try {
      await processUnprocessedSessions();
      await processUnprocessedSessions(
        config.sessionProcessing.batchSize === 0 ? null : config.sessionProcessing.batchSize,
        config.sessionProcessing.concurrency
      );
    } catch (error) {
      process.stderr.write(
        `[ProcessingScheduler] Error in scheduler: ${error}\n`
      );
    }
  });
  process.stdout.write(
    "[ProcessingScheduler] Started processing scheduler (runs hourly).\n"
  );
}
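Because the cron expressions are now user-supplied, a malformed SESSION_PROCESSING_INTERVAL would make cron.schedule throw at startup. node-cron ships a validate helper, so a small guard along these lines (not part of this commit; the helper name is hypothetical) could fall back to the default instead:

import cron from "node-cron";

// Hypothetical helper: use the configured expression only if it is
// valid cron syntax, otherwise fall back to a known-good default.
function safeInterval(expression: string, fallback: string): string {
  if (!cron.validate(expression)) {
    console.warn(`[Scheduler] Invalid cron expression "${expression}", using "${fallback}"`);
    return fallback;
  }
  return expression;
}

// e.g. cron.schedule(safeInterval(config.sessionProcessing.interval, "0 * * * *"), ...)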

View File

@@ -1,10 +1,20 @@
// node-cron job to auto-refresh session data every 15 mins
// CSV import scheduler with configurable intervals
import cron from "node-cron";
import { prisma } from "./prisma";
import { fetchAndParseCsv } from "./csvFetcher";
import { getSchedulerConfig } from "./schedulerConfig";
export function startScheduler() {
cron.schedule("*/15 * * * *", async () => {
export function startCsvImportScheduler() {
const config = getSchedulerConfig();
if (!config.enabled) {
console.log('[CSV Import Scheduler] Disabled via configuration');
return;
}
console.log(`[CSV Import Scheduler] Starting with interval: ${config.csvImport.interval}`);
cron.schedule(config.csvImport.interval, async () => {
const companies = await prisma.company.findMany();
for (const company of companies) {
try {

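Both schedulers currently discard the handle that cron.schedule returns. node-cron's ScheduledTask exposes stop(), so a caller could shut the jobs down cleanly; a minimal sketch of what that might look like (not in this commit):

import cron from "node-cron";
import type { ScheduledTask } from "node-cron";

const tasks: ScheduledTask[] = [];

// Hypothetical: collect the handle when a scheduler starts, e.g.
// tasks.push(cron.schedule(config.csvImport.interval, runCsvImport));

process.on("SIGTERM", () => {
  tasks.forEach((task) => task.stop()); // prevent further cron triggers
  process.exit(0);
});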
lib/schedulerConfig.ts Normal file
View File

@@ -0,0 +1,82 @@
// Unified scheduler configuration
import { readFileSync } from "fs";
import { fileURLToPath } from "url";
import { dirname, join } from "path";
// Load environment variables from .env.local
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
const envPath = join(__dirname, '..', '.env.local');
// Load .env.local if it exists
try {
  const envFile = readFileSync(envPath, 'utf8');
  const envVars = envFile.split('\n').filter(line => line.trim() && !line.startsWith('#'));
  envVars.forEach(line => {
    const [key, ...valueParts] = line.split('=');
    if (key && valueParts.length > 0) {
      const value = valueParts.join('=').trim();
      if (!process.env[key.trim()]) {
        process.env[key.trim()] = value;
      }
    }
  });
} catch (error) {
  // Silently fail if .env.local doesn't exist
}
export interface SchedulerConfig {
  enabled: boolean;
  csvImport: {
    interval: string;
  };
  sessionProcessing: {
    interval: string;
    batchSize: number; // 0 = unlimited
    concurrency: number;
  };
}
/**
 * Get scheduler configuration from environment variables
 */
export function getSchedulerConfig(): SchedulerConfig {
  const enabled = process.env.SCHEDULER_ENABLED === 'true';
  // Default values
  const defaults = {
    csvImportInterval: '*/15 * * * *', // Every 15 minutes
    sessionProcessingInterval: '0 * * * *', // Every hour
    sessionProcessingBatchSize: 0, // Unlimited
    sessionProcessingConcurrency: 5,
  };
  return {
    enabled,
    csvImport: {
      interval: process.env.CSV_IMPORT_INTERVAL || defaults.csvImportInterval,
    },
    sessionProcessing: {
      interval: process.env.SESSION_PROCESSING_INTERVAL || defaults.sessionProcessingInterval,
      batchSize: parseInt(process.env.SESSION_PROCESSING_BATCH_SIZE || '0', 10) || defaults.sessionProcessingBatchSize,
      concurrency: parseInt(process.env.SESSION_PROCESSING_CONCURRENCY || '5', 10) || defaults.sessionProcessingConcurrency,
    },
  };
}
/**
 * Log scheduler configuration
 */
export function logSchedulerConfig(config: SchedulerConfig): void {
  if (!config.enabled) {
    console.log('[Scheduler] Schedulers are DISABLED (SCHEDULER_ENABLED=false)');
    return;
  }
  console.log('[Scheduler] Configuration:');
  console.log(`  CSV Import: ${config.csvImport.interval}`);
  console.log(`  Session Processing: ${config.sessionProcessing.interval}`);
  console.log(`  Batch Size: ${config.sessionProcessing.batchSize === 0 ? 'unlimited' : config.sessionProcessing.batchSize}`);
  console.log(`  Concurrency: ${config.sessionProcessing.concurrency}`);
}
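The commit does not include the call site, but the intended startup sequence appears to be: resolve the config once, log it, then start both schedulers. A sketch of an entry point using the new API (the import paths for the two scheduler modules are assumptions, since their file names are not shown in this diff):

// startup sketch; import paths are hypothetical
import { getSchedulerConfig, logSchedulerConfig } from "./lib/schedulerConfig";
import { startCsvImportScheduler } from "./lib/scheduler";
import { startProcessingScheduler } from "./lib/processingScheduler";

logSchedulerConfig(getSchedulerConfig());
startCsvImportScheduler();   // no-op unless SCHEDULER_ENABLED === 'true'
startProcessingScheduler();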