Status: Proven
Background tasks in asynchronous applications run independently of the main request/response cycle, making error handling and task lifecycle management critical for application stability. Without proper error handling, background task failures can crash applications, lose important work, or leave the system in inconsistent states. The Background Task Error Handling Pattern addresses these failure modes.
The Background Task Error Handling Pattern provides robust error handling and lifecycle management for background tasks through comprehensive task tracking, context preservation, and structured error recovery. The pattern wraps background task execution with monitoring, logging, and recovery mechanisms to ensure system stability and observability.
# Background task error handling framework
class BackgroundTaskManager:
    """Skeleton interface for managed background-task execution.

    Tracks in-flight asyncio tasks, per-task bookkeeping records, and
    per-type error handlers.  The methods are stubs that outline the
    pattern's contract; see ``RobustTaskManager`` below for a concrete
    implementation.

    NOTE(review): ``TaskInfo`` and ``TaskResult`` are not defined in this
    file.  They are written as string forward references so the class can
    be imported without a ``NameError`` (attribute and return annotations
    are evaluated at runtime) — confirm where these types actually live.
    """

    def __init__(self):
        # Strong references to running tasks so they are not garbage
        # collected mid-flight.
        self.active_tasks: Set[asyncio.Task] = set()
        # Task identifier -> bookkeeping record.
        self.task_registry: Dict[str, "TaskInfo"] = {}
        # Task type -> callback invoked when a task of that type fails.
        self.error_handlers: Dict[str, Callable] = {}

    async def execute_task(self, task_func: Callable, task_name: str, **kwargs) -> "TaskResult":
        """Execute background task with comprehensive error handling"""
        pass

    def register_error_handler(self, task_type: str, handler: Callable):
        """Register custom error handler for specific task types"""
        pass

    async def cleanup_completed_tasks(self):
        """Clean up completed tasks to prevent resource leaks"""
        pass
import asyncio
import uuid
from typing import Set, Dict, Any, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
import structlog
logger = structlog.get_logger()
@dataclass
class TaskMetrics:
    """Lifecycle record for one background task.

    Captures creation/start/completion timestamps, success state, error
    details, the retry count, and the execution context the task carried.
    """
    task_id: str
    name: str
    created_at: datetime = field(default_factory=datetime.utcnow)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    error_at: Optional[datetime] = None
    success: bool = False
    error_message: Optional[str] = None
    retry_count: int = 0
    context: Dict[str, Any] = field(default_factory=dict)

    def mark_started(self):
        """Record the moment execution began."""
        self.started_at = datetime.utcnow()

    def mark_completed(self, success: bool = True, error: Optional[str] = None):
        """Record completion; on failure, also capture the error details."""
        self.completed_at = datetime.utcnow()
        self.success = success
        if success or not error:
            return
        self.error_message = error
        self.error_at = datetime.utcnow()

    def duration(self) -> Optional[float]:
        """Seconds from start to completion, or None if either is unset."""
        if self.started_at is None or self.completed_at is None:
            return None
        return (self.completed_at - self.started_at).total_seconds()
class RobustTaskManager:
    """Manages background tasks with context preservation and comprehensive tracking.

    Every task gets a ``TaskMetrics`` record keyed by a generated task id;
    the shared ``context`` dict plus a correlation id are copied into each
    record so log lines from different tasks can be tied to one workflow.

    NOTE(review): ``start_task``/``complete_task``/``fail_task`` match by
    task *name* and act on the first registered record with that name that
    has not yet started/completed — tasks sharing a name are resolved in
    registration order.
    """

    def __init__(self):
        # Strong references to in-flight asyncio tasks.
        self.active_tasks: Set[asyncio.Task] = set()
        # task_id -> lifecycle metrics record.
        self.task_metrics: Dict[str, TaskMetrics] = {}
        # task_id -> result payload recorded by complete_task().
        self.task_results: Dict[str, Any] = {}
        # Shared context merged into every task's metrics.
        self.context: Dict[str, Any] = {}
        self.correlation_id: Optional[str] = None
        # task name -> callback(task_id, error, context) invoked on failure.
        self.error_handlers: Dict[str, Callable] = {}

    def set_context(self, context: Dict[str, Any], correlation_id: Optional[str] = None):
        """Set execution context for task tracking.

        Args:
            context: Base key/values copied into each subsequent task's metrics.
            correlation_id: Workflow-level id; a fresh UUID is generated if omitted.
        """
        self.context = context.copy()
        self.correlation_id = correlation_id or str(uuid.uuid4())

    def add_task(self, task_name: str, task_data: Dict[str, Any]) -> str:
        """Register a task for tracking and return its generated task id.

        The task's context is the manager context overlaid with ``task_data``
        and the correlation id (task_data keys win on collision).
        """
        task_id = str(uuid.uuid4())
        metrics = TaskMetrics(
            task_id=task_id,
            name=task_name,
            context={
                **self.context,
                **task_data,
                'correlation_id': self.correlation_id
            }
        )
        self.task_metrics[task_id] = metrics
        logger.info(
            "Task registered for execution",
            task_id=task_id,
            task_name=task_name,
            correlation_id=self.correlation_id
        )
        return task_id

    def start_task(self, task_name: str) -> bool:
        """Mark the first not-yet-started task with this name as started.

        Returns:
            True if a matching record was found and marked, else False.
        """
        for task_id, metrics in self.task_metrics.items():
            if metrics.name == task_name and metrics.started_at is None:
                metrics.mark_started()
                logger.info(
                    "Task execution started",
                    task_id=task_id,
                    task_name=task_name,
                    correlation_id=self.correlation_id
                )
                return True
        return False

    def complete_task(self, task_name: str, result: Dict[str, Any]) -> bool:
        """Mark the first uncompleted task with this name as succeeded.

        Stores ``result`` in ``task_results`` under the task's id.

        Returns:
            True if a matching record was found and marked, else False.
        """
        for task_id, metrics in self.task_metrics.items():
            if metrics.name == task_name and metrics.completed_at is None:
                metrics.mark_completed(success=True)
                self.task_results[task_id] = result
                logger.info(
                    "Task completed successfully",
                    task_id=task_id,
                    task_name=task_name,
                    duration=metrics.duration(),
                    correlation_id=self.correlation_id
                )
                return True
        return False

    def fail_task(self, task_name: str, error: Exception) -> bool:
        """Mark the first uncompleted task with this name as failed.

        Runs any error handler registered for ``task_name``; a handler
        exception is logged but never propagated, so failure bookkeeping
        always completes.

        Returns:
            True if a matching record was found and marked, else False.
        """
        for task_id, metrics in self.task_metrics.items():
            if metrics.name == task_name and metrics.completed_at is None:
                metrics.mark_completed(success=False, error=str(error))
                logger.error(
                    "Task execution failed",
                    task_id=task_id,
                    task_name=task_name,
                    error=str(error),
                    duration=metrics.duration(),
                    correlation_id=self.correlation_id
                )
                # Execute custom error handler if registered
                if task_name in self.error_handlers:
                    try:
                        self.error_handlers[task_name](task_id, error, metrics.context)
                    except Exception as handler_error:
                        logger.error(
                            "Error handler failed",
                            task_name=task_name,
                            handler_error=str(handler_error)
                        )
                return True
        return False

    async def execute_with_tracking(self, task_func: Callable, task_name: str, **kwargs) -> Any:
        """Execute ``task_func(**kwargs)`` with full lifecycle tracking.

        Registers, starts, and completes/fails the task around the call.
        Exceptions from ``task_func`` are recorded via ``fail_task`` and
        re-raised so callers keep normal error propagation; tracking data
        is cleaned up either way.
        """
        task_id = self.add_task(task_name, kwargs)
        try:
            self.start_task(task_name)
            result = await task_func(**kwargs)
            self.complete_task(task_name, {"result": result})
            return result
        except Exception as e:
            self.fail_task(task_name, e)
            # Re-raise to maintain error propagation
            raise
        finally:
            # Clean up task tracking
            await self._cleanup_task(task_id)

    async def _cleanup_task(self, task_id: str):
        """Remove a finished task's tracking entries to prevent memory leaks.

        Fix: the previous version slept but never deleted anything, so
        ``task_metrics`` and ``task_results`` grew without bound.  A short
        grace delay still lets pending log handlers observe the metrics
        before the entries disappear.
        """
        if task_id in self.task_metrics:
            await asyncio.sleep(0.1)  # Allow logging to complete
            self.task_metrics.pop(task_id, None)
            self.task_results.pop(task_id, None)
class RetryableTaskManager(RobustTaskManager):
    """Task manager that retries failed tasks with exponential backoff.

    Args:
        max_retries: Number of retries after the initial attempt
            (total attempts = max_retries + 1).
        retry_delay: Base delay in seconds before the first retry; each
            subsequent retry doubles it.
    """

    def __init__(self, max_retries: int = 3, retry_delay: float = 1.0):
        super().__init__()
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    async def execute_with_retry(self, task_func: Callable, task_name: str, **kwargs) -> Any:
        """Execute a task, retrying on failure up to ``max_retries`` times.

        On final failure the task is marked failed and the last exception
        is re-raised.  The metrics record's ``retry_count`` is updated on
        each retry attempt.
        """
        task_id = self.add_task(task_name, kwargs)
        last_exception: Optional[Exception] = None
        for attempt in range(self.max_retries + 1):
            try:
                if attempt > 0:
                    # Update retry count in metrics
                    if task_id in self.task_metrics:
                        self.task_metrics[task_id].retry_count = attempt
                    logger.info(
                        "Retrying task execution",
                        task_id=task_id,
                        task_name=task_name,
                        attempt=attempt,
                        max_retries=self.max_retries
                    )
                    # Exponential backoff: delay, 2*delay, 4*delay, ...
                    # (the previous `retry_delay * attempt` was linear,
                    # contradicting the stated intent).
                    await asyncio.sleep(self.retry_delay * (2 ** (attempt - 1)))
                self.start_task(task_name)
                result = await task_func(**kwargs)
                self.complete_task(task_name, {"result": result, "attempts": attempt + 1})
                return result
            except Exception as e:
                last_exception = e
                logger.warning(
                    "Task attempt failed",
                    task_id=task_id,
                    task_name=task_name,
                    attempt=attempt + 1,
                    error=str(e)
                )
                if attempt == self.max_retries:
                    # Final failure
                    self.fail_task(task_name, e)
                    break
        # All retries exhausted
        logger.error(
            "Task failed after all retries",
            task_id=task_id,
            task_name=task_name,
            total_attempts=self.max_retries + 1,
            final_error=str(last_exception)
        )
        if last_exception:
            raise last_exception
# Usage example with graceful degradation
async def process_background_workflow(workflow_data: Dict[str, Any]):
    """Example of background task with comprehensive error handling"""
    manager = RetryableTaskManager(max_retries=2)
    try:
        # The critical step is retried; its result feeds the optional step,
        # which runs once without retry.
        critical_result = await manager.execute_with_retry(
            process_critical_step,
            "critical_workflow_step",
            data=workflow_data,
        )
        await manager.execute_with_tracking(
            process_optional_step,
            "optional_workflow_step",
            data=workflow_data,
            result=critical_result,
        )
    except Exception as exc:
        logger.error(
            "Background workflow failed",
            error=str(exc),
            workflow_id=workflow_data.get('id'),
        )
        # Graceful degradation - report the failure instead of crashing.
        return {"status": "partial_failure", "error": str(exc)}
    return {"status": "success"}
pattern-catalog.md: Section 17 "Background Task Error Handling Pattern" - core implementation and usage guidelines
PATTERN-INDEX.md: No direct equivalent - this is an infrastructure pattern
docs/architecture/pattern-catalog.md#17-background-task-error-handling-pattern
services/orchestration/services/background/services/utils/async_helpers.py
Last updated: September 15, 2025