Files
livegraphs-django/dashboard_project/data_integration/utils.py
Kaj Kowalski f0ae061fa7 Enhance data integration and transcript parsing
- Improved date parsing in fetch_and_store_chat_data to support multiple formats and added error logging for unparseable dates.
- Enhanced parse_and_store_transcript_messages to handle empty transcripts and expanded message pattern recognition for both User and Assistant.
- Implemented intelligent splitting of transcripts based on detected patterns and timestamps, with fallback mechanisms for unrecognized formats.
- Updated documentation for Celery and Redis setup, troubleshooting, and project structure.
- Added markdown linting configuration and scripts for code formatting.
- Updated Nginx configuration to change the web server port.
- Added xlsxwriter dependency for Excel file handling in project requirements.
2025-05-18 19:18:48 +00:00

622 lines
22 KiB
Python

import csv
import logging
import re
from datetime import datetime

import bleach
import requests
from bleach.css_sanitizer import CSSSanitizer
from django.utils.timezone import make_aware

from .models import ChatMessage, ChatSession, ExternalDataSource
# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)

# Column order of the external CSV feed. The feed is known to ship without a
# header row, so fetch_and_store_chat_data() zips these names against each
# data row instead of reading a header from the file.
EXPECTED_HEADERS = [
    "session_id",
    "start_time",
    "end_time",
    "ip_address",
    "country",
    "language",
    "messages_sent",
    "sentiment",
    "escalated",
    "forwarded_hr",
    "full_transcript",
    "avg_response_time",
    "tokens",
    "tokens_eur",
    "category",
    "initial_msg",
    "user_rating",
]
# Datetime formats accepted for the CSV ``start_time``/``end_time`` columns,
# tried in order until one parses. Hoisted to module level so the list is not
# rebuilt for every CSV row.
DATE_FORMATS = [
    "%d.%m.%Y %H:%M:%S",  # European format: DD.MM.YYYY HH:MM:SS
    "%Y-%m-%d %H:%M:%S",  # ISO format: YYYY-MM-DD HH:MM:SS
    "%m/%d/%Y %H:%M:%S",  # US format: MM/DD/YYYY HH:MM:SS
    "%Y-%m-%dT%H:%M:%S",  # ISO format with T separator
    "%Y-%m-%dT%H:%M:%S.%fZ",  # ISO format with milliseconds and Z
]


def _parse_datetime(value):
    """Parse *value* against each format in DATE_FORMATS.

    Returns a timezone-aware datetime, or None when no format matches
    (including when *value* is None or empty).
    """
    for date_format in DATE_FORMATS:
        try:
            return make_aware(datetime.strptime(value, date_format))
        except (ValueError, TypeError):
            continue
    return None


def fetch_and_store_chat_data(source_id=None):
    """Fetch chat data from an external API and store it in the database.

    Args:
        source_id: Optional ID of specific ExternalDataSource to use.
            If None, will use the first active source.

    Returns:
        dict: Stats about the operation (sessions created, updated, errors)
    """
    if source_id:
        source = ExternalDataSource.objects.filter(id=source_id, is_active=True).first()
        if not source:
            logger.error(f"Data source with ID {source_id} not found or not active.")
            return {
                "success": False,
                "error": f"Data source with ID {source_id} not found or not active.",
            }
    else:
        source = ExternalDataSource.objects.filter(is_active=True).first()
        if not source:
            logger.warning("No active data source found.")
            return {"success": False, "error": "No active data source found."}

    stats = {
        "sessions_created": 0,
        "sessions_updated": 0,
        "transcripts_processed": 0,
        "errors": 0,
        "success": True,
    }

    try:
        # Fetch data from API with timeout from source settings or default
        timeout = getattr(source, "timeout", 30)
        response = requests.get(
            source.api_url,
            auth=((source.get_auth_username(), source.get_auth_password()) if source.get_auth_username() else None),
            timeout=timeout,
        )
        response.raise_for_status()
    except requests.RequestException as e:
        error_msg = f"Error fetching data from API {source.api_url}: {e}"
        logger.error(error_msg)
        return {"success": False, "error": error_msg}

    # Process CSV data. The feed is known to ship without a header row, so the
    # expected column order is applied instead of reading one from the data.
    csv_data = response.content.decode("utf-8").splitlines()
    reader = csv.reader(csv_data)
    header = EXPECTED_HEADERS

    for row in reader:
        if not row:  # Skip empty rows
            continue
        try:
            # Pad the row with empty strings so zip() covers every header even
            # when the row is short.
            padded_row = row + [""] * (len(header) - len(row))
            data = dict(zip(header, padded_row, strict=False))

            # Parse date fields with multiple format support
            start_time = _parse_datetime(data["start_time"])
            end_time = _parse_datetime(data["end_time"])

            # If we couldn't parse the dates, log an error and skip this row
            if not start_time or not end_time:
                error_msg = f"Could not parse date fields for session {data['session_id']}: start_time={data['start_time']}, end_time={data['end_time']}"
                logger.error(error_msg)
                stats["errors"] += 1
                continue

            # Convert optional numeric/boolean columns; empty strings map to None.
            messages_sent = int(data["messages_sent"]) if data["messages_sent"] else None
            escalated = data["escalated"].lower() == "true" if data["escalated"] else None
            forwarded_hr = data["forwarded_hr"].lower() == "true" if data["forwarded_hr"] else None
            avg_response_time = float(data["avg_response_time"]) if data["avg_response_time"] else None
            tokens = int(data["tokens"]) if data["tokens"] else None
            tokens_eur = float(data["tokens_eur"]) if data["tokens_eur"] else None
            user_rating = int(data["user_rating"]) if data["user_rating"] and data["user_rating"].isdigit() else None

            session, created = ChatSession.objects.update_or_create(
                session_id=data["session_id"],
                defaults={
                    "start_time": start_time,
                    "end_time": end_time,
                    "ip_address": data.get("ip_address"),
                    "country": data.get("country"),
                    "language": data.get("language"),
                    "messages_sent": messages_sent,
                    "sentiment": data.get("sentiment"),
                    "escalated": escalated,
                    "forwarded_hr": forwarded_hr,
                    "full_transcript_url": data.get("full_transcript"),
                    "avg_response_time": avg_response_time,
                    "tokens": tokens,
                    "tokens_eur": tokens_eur,
                    "category": data.get("category"),
                    "initial_msg": data.get("initial_msg"),
                    "user_rating": user_rating,
                },
            )
            if created:
                stats["sessions_created"] += 1
                logger.info(f"Created session: {session.session_id}")
            else:
                stats["sessions_updated"] += 1
                logger.info(f"Updated session: {session.session_id}")

            # Fetch and process transcript if URL is present
            if session.full_transcript_url:
                transcript_result = fetch_and_store_transcript(session, timeout)
                if transcript_result["success"]:
                    stats["transcripts_processed"] += 1
        except Exception as e:
            logger.error(f"Error processing row: {row}. Error: {e}", exc_info=True)
            stats["errors"] += 1
            continue

    source.last_synced = make_aware(datetime.now())
    source.save()
    # Fixed: the original message lacked the f-prefix and logged the literal
    # text "{stats}"; lazy %-args are used per logging best practice.
    logger.info("Data sync complete. Stats: %s", stats)
    return stats
def fetch_and_store_transcript(session, timeout=30):
    """Fetch and process transcript for a chat session.

    Args:
        session: The ChatSession object
        timeout: Timeout in seconds for the request

    Returns:
        dict: Result of the operation
    """
    outcome = {"success": False, "messages_created": 0, "error": None}
    try:
        resp = requests.get(session.full_transcript_url, timeout=timeout)
        resp.raise_for_status()
        body = resp.content.decode("utf-8")
        created = parse_and_store_transcript_messages(session, body)
    except requests.RequestException as exc:
        # Network/HTTP failure while downloading the transcript.
        outcome["error"] = f"Error fetching transcript for session {session.session_id}: {exc}"
        logger.error(outcome["error"])
        return outcome
    except Exception as exc:
        # Anything that went wrong after the download (decode, parse, store).
        outcome["error"] = f"Error processing transcript for session {session.session_id}: {exc}"
        logger.error(outcome["error"], exc_info=True)
        return outcome
    outcome["success"] = True
    outcome["messages_created"] = created
    return outcome
def parse_and_store_transcript_messages(session, transcript_content):
    """Parse and store messages from a transcript.

    This function parses a chat transcript that contains messages from both
    User and Assistant. It identifies message boundaries by looking for lines
    that start with common sender patterns, and groups all following lines
    until the next sender change as part of that message. Transcripts with no
    recognizable sender prefixes are split heuristically (by timestamps,
    paragraphs, or sentences) with senders assigned alternately.

    Args:
        session: The ChatSession object
        transcript_content: The raw transcript content

    Returns:
        int: Number of messages created
    """
    # Handle empty transcripts
    if not transcript_content or transcript_content.strip() == "":
        logger.warning(f"Empty transcript received for session {session.session_id}")
        return 0

    lines = transcript_content.splitlines()
    current_sender = None
    current_message_lines = []
    messages_created = 0

    # First, delete existing messages for this session to avoid duplicates
    existing_count = ChatMessage.objects.filter(session=session).count()
    if existing_count > 0:
        logger.info(f"Deleting {existing_count} existing messages for session {session.session_id}")
        ChatMessage.objects.filter(session=session).delete()

    # Define common message patterns to detect - expanded to include more variations
    user_patterns = [
        "User:",
        "[User]:",
        "Customer:",
        "[Customer]:",
        "Client:",
        "[Client]:",
        "Human:",
        "[Human]:",
        "Me:",
        "[Me]:",
        "Question:",
        "User >",
        "Customer >",
        "User said:",
        "Customer said:",
        "User writes:",
        "User asked:",
        "User message:",
        "From user:",
        "Client message:",
        "Q:",
        "Input:",
        "Query:",
        "Person:",
        "Visitor:",
        "Guest:",
        "User input:",
        "User query:",
    ]
    assistant_patterns = [
        "Assistant:",
        "[Assistant]:",
        "Agent:",
        "[Agent]:",
        "Bot:",
        "[Bot]:",
        "AI:",
        "[AI]:",
        "ChatGPT:",
        "[ChatGPT]:",
        "System:",
        "[System]:",
        "Support:",
        "[Support]:",
        "Answer:",
        "Assistant >",
        "Bot >",
        "Assistant said:",
        "Assistant writes:",
        "AI responded:",
        "LLM:",
        "[LLM]:",
        "Response:",
        "A:",
        "Output:",
        "AI output:",
        "Model:",
        "[Model]:",
        "Assistant message:",
        "From assistant:",
        "Bot response:",
        "AI says:",
        "NotsoAI:",
        "[NotsoAI]:",
        "Notso:",
        "[Notso]:",
    ]

    # Function to save current message before starting a new one
    def save_current_message():
        nonlocal current_sender, current_message_lines, messages_created
        if current_sender and current_message_lines:
            message_text = "\n".join(current_message_lines)
            # Only save if there's actual content (not just whitespace)
            if message_text.strip() and save_message(session, current_sender, message_text):
                messages_created += 1
                logger.debug(f"Saved {current_sender} message with {len(current_message_lines)} lines")

    # Initial scan to detect format type and potential message boundaries
    has_recognized_patterns = False
    potential_timestamps = []
    timestamp_pattern_count = 0

    # Regex patterns for common timestamp formats
    timestamp_patterns = [
        r"^\[\d{2}:\d{2}:\d{2}\]",  # [HH:MM:SS]
        r"^\[\d{2}:\d{2}\]",  # [HH:MM]
        r"^\(\d{2}:\d{2}:\d{2}\)",  # (HH:MM:SS)
        r"^\(\d{2}:\d{2}\)",  # (HH:MM)
        r"^\d{2}:\d{2}:\d{2} -",  # HH:MM:SS -
        r"^\d{2}:\d{2} -",  # HH:MM -
        r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}",  # YYYY-MM-DD HH:MM:SS
    ]

    # First pass: detect format and message boundaries
    for i, line in enumerate(lines):
        line_stripped = line.strip()
        # Check for standard message patterns
        if any(line_stripped.startswith(pattern) for pattern in user_patterns + assistant_patterns):
            has_recognized_patterns = True
        # Check for timestamp patterns that might indicate message boundaries
        for pattern in timestamp_patterns:
            if re.match(pattern, line_stripped):
                timestamp_pattern_count += 1
                potential_timestamps.append(i)
                break

    # If no recognized patterns are found, try to intelligently split the transcript
    if not has_recognized_patterns and len(lines) > 0:
        logger.info(
            f"No standard message patterns found in transcript for session {session.session_id}. Attempting intelligent split."
        )
        # Try timestamp-based parsing if we have enough consistent timestamps
        # (more than 3 in total and covering at least 20% of the lines).
        if timestamp_pattern_count > 3 and timestamp_pattern_count > 0.2 * len(lines):
            logger.info(f"Attempting timestamp-based parsing with {timestamp_pattern_count} detected timestamps")
            # Add the end of file as a boundary
            potential_timestamps.append(len(lines))
            # Process messages between timestamps
            for i in range(len(potential_timestamps) - 1):
                start_idx = potential_timestamps[i]
                end_idx = potential_timestamps[i + 1]
                message_content = "\n".join(lines[start_idx:end_idx])
                first_line = lines[start_idx].lower()
                # Simple heuristic to identify sender
                is_user = any(
                    user_word in first_line
                    for user_word in ["user", "customer", "client", "human", "question", "query"]
                )
                is_assistant = any(
                    assistant_word in first_line
                    for assistant_word in ["assistant", "agent", "bot", "ai", "system", "support", "answer", "response"]
                )
                # Unattributed messages alternate, starting with the user.
                sender = "User" if (is_user or (not is_assistant and i % 2 == 0)) else "Assistant"
                if save_message(session, sender, message_content):
                    messages_created += 1
            logger.info(f"Created {messages_created} messages using timestamp-based parsing")
            return messages_created

        # Simple heuristic: alternate between user and assistant, with first message from user
        # Start with paragraphs (blank line separations) as message boundaries
        paragraphs = []
        current_paragraph = []
        for line in lines:
            if line.strip():
                current_paragraph.append(line)
            elif current_paragraph:  # Empty line and we have a paragraph
                paragraphs.append("\n".join(current_paragraph))
                current_paragraph = []
        # Add the last paragraph if it's not empty
        if current_paragraph:
            paragraphs.append("\n".join(current_paragraph))

        # If we have just one paragraph, try to split by sentence boundaries for very long transcripts
        if len(paragraphs) == 1 and len(paragraphs[0].split()) > 100:
            # Try to split by sentence boundaries
            text = paragraphs[0]
            # Define sentence ending patterns
            sentence_endings = r"(?<=[.!?])\s+"
            sentences = re.split(sentence_endings, text)
            # Group sentences into logical chunks (assuming alternating speakers)
            chunks = []
            current_chunk = []
            for i, sentence in enumerate(sentences):
                current_chunk.append(sentence)
                # Every 2-3 sentences or on a natural break like a question mark
                if (i % 2 == 1 and sentence.endswith("?")) or len(current_chunk) >= 3:
                    chunks.append(" ".join(current_chunk))
                    current_chunk = []
            # Add any remaining sentences
            if current_chunk:
                chunks.append(" ".join(current_chunk))
            # Save the chunks alternating between user and assistant
            for i, chunk in enumerate(chunks):
                if chunk.strip():
                    sender = "User" if i % 2 == 0 else "Assistant"
                    if save_message(session, sender, chunk):
                        messages_created += 1
            logger.info(f"Created {messages_created} messages by splitting single paragraph into sentences")
            return messages_created

        # Save messages alternating between user and assistant
        for i, paragraph in enumerate(paragraphs):
            if paragraph.strip():  # Only save non-empty paragraphs
                sender = "User" if i % 2 == 0 else "Assistant"
                if save_message(session, sender, paragraph):
                    messages_created += 1
        logger.info(f"Created {messages_created} messages using intelligent split for session {session.session_id}")
        return messages_created

    # Standard processing with recognized patterns
    for line in lines:
        line_stripped = line.strip()
        # Skip empty lines at the beginning
        if not line_stripped and not current_sender:
            continue
        # Check if this line indicates a new sender
        is_user_message = any(line_stripped.startswith(pattern) for pattern in user_patterns)
        is_assistant_message = any(line_stripped.startswith(pattern) for pattern in assistant_patterns)
        if is_user_message:
            # Save previous message if any
            save_current_message()
            # Start new user message
            current_sender = "User"
            # Remove the prefix. Fixed: slice the *stripped* line — slicing
            # the raw line by the pattern length cut the wrong characters
            # whenever the line had leading whitespace.
            for pattern in user_patterns:
                if line_stripped.startswith(pattern):
                    line = line_stripped[len(pattern):].strip()
                    break
            current_message_lines = [line] if line.strip() else []
        elif is_assistant_message:
            # Save previous message if any
            save_current_message()
            # Start new assistant message
            current_sender = "Assistant"
            # Remove the prefix (same leading-whitespace fix as above).
            for pattern in assistant_patterns:
                if line_stripped.startswith(pattern):
                    line = line_stripped[len(pattern):].strip()
                    break
            current_message_lines = [line] if line.strip() else []
        elif current_sender:
            # Continue adding to current message
            current_message_lines.append(line)
        else:
            # If we get here with no current_sender, assume it's the start of a user message
            logger.warning(f"Found line without sender prefix: '{line}'. Assuming User message.")
            current_sender = "User"
            current_message_lines = [line]

    # Save the last message
    save_current_message()

    # Handle case with no messages parsed (possibly incorrectly formatted transcript)
    if messages_created == 0 and lines:
        logger.warning(
            f"No messages were parsed from transcript for session {session.session_id}. Using fallback parsing."
        )
        # Fallback: Just split the transcript in half, first part user, second part assistant
        mid_point = len(lines) // 2
        user_content = "\n".join(lines[:mid_point])
        assistant_content = "\n".join(lines[mid_point:])
        # Save the split messages if they have content
        if user_content.strip() and save_message(session, "User", user_content):
            messages_created += 1
        if assistant_content.strip() and save_message(session, "Assistant", assistant_content):
            messages_created += 1
        logger.info(f"Created {messages_created} messages using fallback parsing")

    logger.info(f"Created {messages_created} messages for session {session.session_id}")
    return messages_created
# CSS properties allowed to survive sanitization of inline ``style`` attributes.
_ALLOWED_CSS_PROPERTIES = [
    "color",
    "background-color",
    "font-family",
    "font-size",
    "font-weight",
    "font-style",
    "text-decoration",
    "text-align",
    "margin",
    "margin-left",
    "margin-right",
    "margin-top",
    "margin-bottom",
    "padding",
    "padding-left",
    "padding-right",
    "padding-top",
    "padding-bottom",
    "border",
    "border-radius",
    "width",
    "height",
    "line-height",
]

# HTML tags and attributes allowed in sanitized transcript messages.
_ALLOWED_TAGS = [
    "b",
    "i",
    "u",
    "em",
    "strong",
    "a",
    "br",
    "p",
    "ul",
    "ol",
    "li",
    "span",
    "div",
    "pre",
    "code",
    "blockquote",
]
_ALLOWED_ATTRIBUTES = {
    "a": ["href", "title", "target"],
    "span": ["style", "class"],
    "div": ["style", "class"],
    "p": ["style", "class"],
    "pre": ["style", "class"],
}

# Built once at import time: the sanitizer configuration is constant, so
# reconstructing it for every stored message was loop-invariant work.
_CSS_SANITIZER = CSSSanitizer(allowed_css_properties=_ALLOWED_CSS_PROPERTIES)


def save_message(session, sender, message_text):
    """Save a message for a chat session.

    The raw text is stored verbatim; a bleach-sanitized copy (allowed tags,
    attributes, and CSS properties only, everything else stripped) is stored
    alongside it in ``safe_html_message``.

    Args:
        session: The ChatSession object
        sender: The sender of the message ("User" or "Assistant")
        message_text: The message text, which may contain HTML

    Returns:
        bool: True if message was created, False otherwise
    """
    if not message_text.strip():
        return False
    try:
        # Sanitize HTML content before saving if necessary
        safe_html = bleach.clean(
            message_text,
            tags=_ALLOWED_TAGS,
            attributes=_ALLOWED_ATTRIBUTES,
            css_sanitizer=_CSS_SANITIZER,
            strip=True,
        )
        ChatMessage.objects.create(
            session=session,
            sender=sender,
            message=message_text,
            safe_html_message=safe_html,
        )
        logger.debug(f"Stored message for session {session.session_id} from {sender}")
        return True
    except Exception as e:
        logger.error(f"Error saving message for session {session.session_id}: {e}", exc_info=True)
        return False