Mirror of https://github.com/kjanat/livegraphs-django.git (synced 2026-01-16 09:52:09 +01:00)
Implement data integration tasks with Celery:

- periodic fetching and manual refresh of chat data
- utility functions for data processing and transcript handling
- views and URLs for manual data refresh
- Redis and Celery configuration (see the sketch below)
- improved error handling and logging
- scripts for data cleanup and for fixing dashboard data
- updated documentation for Redis and Celery setup and troubleshooting
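The Redis and Celery configuration itself is not shown on this page. For context, here is a minimal sketch of the usual broker wiring in a Django project; the module path dashboard_project/celery.py and the broker URL are assumptions for illustration, not taken from this commit:

# dashboard_project/celery.py (hypothetical path; broker URL is illustrative)
import os

from celery import Celery

# Make sure Django settings are importable before Celery reads them.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "dashboard_project.settings")

app = Celery("dashboard_project")

# Reads CELERY_* keys from Django settings, e.g.:
#   CELERY_BROKER_URL = "redis://localhost:6379/0"
app.config_from_object("django.conf:settings", namespace="CELERY")

# Finds tasks.py in each installed app, including data_integration.tasks.
app.autodiscover_tasks()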
dashboard_project/data_integration/tasks.py (new file, 116 lines added)
@@ -0,0 +1,116 @@
import logging
import os

from celery import shared_task
from django.db import utils as django_db_utils
from django.utils import timezone

from .models import ExternalDataSource
from .utils import fetch_and_store_chat_data

logger = logging.getLogger(__name__)


@shared_task(name="data_integration.tasks.test_task", bind=True)
def test_task(self):
    """A simple test task to verify Celery is working without external dependencies."""
    logger.info("Test task executed at %s (task_id: %s)", timezone.now(), self.request.id)
    return "Test task completed successfully!"


@shared_task(
    name="data_integration.tasks.periodic_fetch_chat_data",
    bind=True,
    autoretry_for=(Exception,),
    retry_kwargs={"max_retries": 3, "countdown": 60},
    soft_time_limit=int(os.environ.get("FETCH_DATA_TIMEOUT", 300)),  # 5 minutes default
)
def periodic_fetch_chat_data(self):
    """Periodically fetch and process chat data from external sources.

    This task:
    1. Fetches data from all active external data sources
    2. Processes and stores the data in the database
    3. Updates the last_synced timestamp on each source
    4. Handles errors with retries
    """
    logger.info("Starting periodic chat data fetch (task_id: %s)...", self.request.id)
    try:
        # Get all active data sources
        active_sources = ExternalDataSource.objects.filter(is_active=True)

        if not active_sources.exists():
            logger.warning("No active external data sources found. Skipping fetch.")
            return "No active data sources found"

        successful_sources = []
        failed_sources = []

        for source in active_sources:
            try:
                logger.info(f"Processing source: {source.name} (ID: {source.id})")
                fetch_and_store_chat_data(source_id=source.id)
                source.last_synced = timezone.now()
                # Check if error_count field exists in the model
                update_fields = ["last_synced"]
                try:
                    source.error_count = 0
                    source.last_error = None
                    update_fields.extend(["error_count", "last_error"])
                except AttributeError:
                    # Fields might not exist yet if migrations haven't been applied
                    logger.warning("New fields not available. Run migrations to enable error tracking.")
                source.save(update_fields=update_fields)
                successful_sources.append(source.name)
            except Exception as e:
                logger.error(f"Error fetching data from source {source.name}: {e}", exc_info=True)
                try:
                    source.error_count = getattr(source, "error_count", 0) + 1
                    source.last_error = str(e)[:255]  # Truncate to fit in the field
                    source.save(update_fields=["error_count", "last_error"])
                except (AttributeError, django_db_utils.OperationalError):
                    # If fields don't exist, just update last_synced
                    logger.warning("Could not update error fields. Run migrations to enable error tracking.")
                    source.last_synced = timezone.now()
                    source.save(update_fields=["last_synced"])
                failed_sources.append(source.name)

        if failed_sources and not successful_sources:
            # If all sources failed, we should raise an exception to trigger retry
            raise Exception(f"All data sources failed: {', '.join(failed_sources)}")

        result_message = f"Completed: {len(successful_sources)} successful, {len(failed_sources)} failed"
        logger.info(result_message)
        return result_message

    except Exception as e:
        logger.error(f"Error during periodic chat data fetch: {e}", exc_info=True)
        raise  # Re-raise to trigger Celery retry


@shared_task(name="data_integration.tasks.refresh_specific_source", bind=True)
def refresh_specific_source(self, source_id):
    """Manually refresh a specific data source.

    Args:
        source_id: ID of the ExternalDataSource to refresh
    """
    logger.info(f"Starting manual refresh of data source ID: {source_id} (task_id: {self.request.id})")
    try:
        source = ExternalDataSource.objects.get(id=source_id)
        fetch_and_store_chat_data(source_id=source_id)
        source.last_synced = timezone.now()
        source.error_count = 0
        source.last_error = None
        source.save(update_fields=["last_synced", "error_count", "last_error"])
        logger.info(f"Manual refresh of data source {source.name} completed successfully")
        return f"Successfully refreshed data source: {source.name}"
    except ExternalDataSource.DoesNotExist:
        logger.error(f"Data source with ID {source_id} does not exist")
        return f"Error: Data source with ID {source_id} does not exist"
    except Exception as e:
        logger.error(
            f"Error during manual refresh of data source {source_id}: {e}",
            exc_info=True,
        )
        return f"Error: {str(e)}"
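Second, periodic_fetch_chat_data only defines the task; the schedule lives in configuration. One way to wire it into Celery beat via Django settings (the 15-minute interval is an assumption for illustration, not from this commit):

# settings.py (interval is illustrative)
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "fetch-chat-data": {
        # Must match the explicit name= in the @shared_task decorator.
        "task": "data_integration.tasks.periodic_fetch_chat_data",
        "schedule": crontab(minute="*/15"),
    },
}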
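Third, the commit message mentions views and URLs for manual data refresh; those live in other files. A sketch of how a view might hand off to refresh_specific_source (view and URL names here are hypothetical):

# data_integration/views.py (names are hypothetical, not from this commit)
from django.contrib import messages
from django.shortcuts import redirect

from .tasks import refresh_specific_source


def refresh_source(request, source_id):
    """Queue a background refresh and return to the dashboard immediately."""
    # .delay() enqueues the task on the broker; the request does not block
    # while the fetch runs.
    refresh_specific_source.delay(source_id)
    messages.info(request, f"Refresh of data source {source_id} has been queued.")
    return redirect("dashboard")  # assumed URL name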
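Finally, test_task exists precisely so the stack can be smoke-tested without external dependencies. From a Django shell, assuming a worker, Redis, and a result backend are all configured and running:

# python manage.py shell (assumes worker + broker + result backend)
from data_integration.tasks import test_task

result = test_task.delay()     # enqueue on the broker
print(result.get(timeout=10))  # "Test task completed successfully!"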