Pattern-015: Internal Task Handler Pattern

Status

Proven

Context

Complex orchestration systems need to handle various task types without creating excessive indirection or separate handler classes. Traditional approaches create separate handler classes for each task type, leading to scattered logic, difficult debugging, and complex state management. The Internal Task Handler Pattern addresses these problems by keeping task handling inside the orchestration engine itself.

Pattern Description

The Internal Task Handler Pattern handles orchestration tasks using internal engine methods rather than separate handler classes, ensuring direct access to engine state, reducing indirection, and simplifying the codebase.

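Core concept: the engine keeps a mapping from each task type to one of its own private methods and dispatches every task through that mapping.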

Implementation

Orchestration Engine with Internal Handlers

import logging
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable, Dict, List

class TaskType(Enum):
    """Kinds of task the orchestration engine can dispatch.

    Members of this enum are the keys of the engine's handler mapping.
    """

    # Each value is the string identifier associated with the task kind.
    FILE_ANALYSIS = "file_analysis"
    GITHUB_CREATE_ISSUE = "github_create_issue"
    WORKFLOW_EXECUTE = "workflow_execute"
    VALIDATION_CHECK = "validation_check"

class Task:
    """A unit of work: a task type together with its payload.

    Attributes:
        type: The ``TaskType`` passed as ``task_type``.
        data: Free-form payload dictionary passed as ``data``.
    """

    def __init__(self, task_type: TaskType, data: Dict[str, Any]):
        # The constructor argument is ``task_type`` but the attribute is
        # ``type``.
        self.type, self.data = task_type, data

class OrchestrationEngine:
    """Orchestration engine that dispatches tasks to its own private methods.

    Every ``TaskType`` is mapped to a bound coroutine method of this class in
    ``task_handlers``. ``handle_task`` looks the handler up and awaits it, so
    handlers reach the injected collaborators directly through ``self``.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, github_agent, file_service, validation_service):
        """Store the collaborators and build the task-type -> handler mapping.

        Args:
            github_agent: Object used by ``_create_github_issue`` (awaited
                ``create_issue``) and ``_get_current_state`` (``is_connected``).
            file_service: Object whose awaited ``analyze`` is used by
                ``_analyze_file``.
            validation_service: Object whose awaited ``validate_rule`` is used
                by ``_validate_context``.
        """
        self.github_agent = github_agent
        self.file_service = file_service
        self.validation_service = validation_service

        # Internal handler mapping; entries may be added or replaced later
        # by assigning into this dict.
        self.task_handlers: Dict[TaskType, Callable] = {
            TaskType.FILE_ANALYSIS: self._analyze_file,
            TaskType.GITHUB_CREATE_ISSUE: self._create_github_issue,
            TaskType.WORKFLOW_EXECUTE: self._execute_workflow,
            TaskType.VALIDATION_CHECK: self._validate_context,
        }

    async def handle_task(self, task: Task, context: Dict[str, Any]) -> Dict[str, Any]:
        """Route a task to its registered handler and wrap the outcome.

        Args:
            task: The task to execute; ``task.type`` selects the handler.
            context: Shared context dictionary passed through to the handler.

        Returns:
            ``{"status": "success", "result": <handler return value>}`` on
            success, or ``{"status": "error", "error": <message>,
            "error_type": <exception class name>}`` if the handler raised.

        Raises:
            NotImplementedError: If no handler is registered for ``task.type``.
        """
        handler = self.task_handlers.get(task.type)
        if handler is None:
            raise NotImplementedError(f"No handler for task type: {task.type}")

        try:
            result = await handler(task, context)
        except Exception as e:
            # Handler failures are reported to the caller as data rather than
            # raised, but the traceback is logged so it is not lost.
            self._log.exception("Handler for task type %s failed", task.type)
            return {
                "status": "error",
                "error": str(e),
                "error_type": type(e).__name__,
            }
        return {"status": "success", "result": result}

    async def _analyze_file(self, task: Task, context: Dict[str, Any]) -> Dict[str, Any]:
        """Handle file analysis tasks.

        Reads ``file_path`` (required) and ``analysis_type`` (default
        ``"basic"``) from ``task.data`` and ``project`` / ``timestamp`` from
        ``context``.

        Raises:
            ValueError: If ``file_path`` is missing or falsy.
        """
        file_path = task.data.get("file_path")
        analysis_type = task.data.get("analysis_type", "basic")

        if not file_path:
            raise ValueError("file_path required for file analysis")

        # Direct access to engine dependencies
        analysis_result = await self.file_service.analyze(
            file_path=file_path,
            analysis_type=analysis_type,
            project_context=context.get("project"),
        )

        return {
            "file_path": file_path,
            "analysis": analysis_result,
            "timestamp": context.get("timestamp"),
        }

    async def _create_github_issue(self, task: Task, context: Dict[str, Any]) -> Dict[str, Any]:
        """Handle GitHub issue creation tasks.

        The repository comes from ``context["repository"]``; ``title``,
        ``body`` and optional ``labels`` come from ``task.data``.

        Raises:
            ValueError: If repository, title or body is missing or falsy.
        """
        repo = context.get("repository")
        title = task.data.get("title")
        body = task.data.get("body")
        labels = task.data.get("labels", [])

        if not all([repo, title, body]):
            raise ValueError("repository, title, and body required for issue creation")

        # Direct access to GitHub agent
        issue_result = await self.github_agent.create_issue(
            repository=repo,
            title=title,
            body=body,
            labels=labels,
        )

        # NOTE(review): assumes create_issue returns a mapping with
        # "html_url" and "number" keys -- confirm against the agent.
        return {
            "issue_url": issue_result.get("html_url"),
            "issue_number": issue_result.get("number"),
            "repository": repo,
        }

    async def _execute_workflow(self, task: Task, context: Dict[str, Any]) -> Dict[str, Any]:
        """Handle workflow execution tasks.

        Reads ``workflow_id`` (required) and ``parameters`` (default ``{}``)
        from ``task.data`` and runs the workflow with an enriched context.

        Raises:
            ValueError: If ``workflow_id`` is missing or falsy.
        """
        workflow_id = task.data.get("workflow_id")
        parameters = task.data.get("parameters", {})

        if not workflow_id:
            raise ValueError("workflow_id required for workflow execution")

        # Direct access to engine state for workflow coordination.
        # "engine_state" and "parameters" are placed after **context, so they
        # override any same-named keys the caller supplied.
        execution_context = {
            **context,
            "engine_state": self._get_current_state(),
            "parameters": parameters,
        }

        workflow_result = await self._execute_workflow_steps(
            workflow_id, execution_context
        )

        return {
            "workflow_id": workflow_id,
            "execution_result": workflow_result,
            "final_state": self._get_current_state(),
        }

    async def _validate_context(self, task: Task, context: Dict[str, Any]) -> Dict[str, Any]:
        """Handle context validation tasks.

        Awaits ``validation_service.validate_rule`` once per entry of
        ``task.data["rules"]`` (default: no rules), in order.

        Returns:
            A dict with the per-rule results and ``all_valid``, which is
            ``True`` when every rule passed (and also when there are no rules).
        """
        validation_rules = task.data.get("rules", [])

        validation_results = []
        for rule in validation_rules:
            result = await self.validation_service.validate_rule(rule, context)
            validation_results.append({
                "rule": rule,
                "valid": result.is_valid,
                "details": result.details,
            })

        return {
            "validation_results": validation_results,
            "all_valid": all(r["valid"] for r in validation_results),
        }

    def _get_current_state(self) -> Dict[str, Any]:
        """Return a snapshot of engine state for use in workflow contexts."""
        return {
            # NOTE(review): despite the key name, this is the number of
            # registered handlers, not a count of in-flight tasks.
            "active_tasks": len(self.task_handlers),
            "github_connected": self.github_agent.is_connected,
            # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated
            # and returns a naive value.
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

    async def _execute_workflow_steps(self, workflow_id: str, context: Dict[str, Any]):
        """Internal workflow execution logic.

        Placeholder: currently does nothing and returns ``None``, which
        ``_execute_workflow`` reports as ``execution_result``.
        """
        # Implementation for executing workflow steps
        pass

Task Registration and Management

class TaskRegistry:
    """Registry facade over an engine's ``task_handlers`` mapping.

    The registry holds no state of its own; every operation reads or writes
    ``engine.task_handlers`` directly.
    """

    def __init__(self, engine: OrchestrationEngine):
        """Bind the registry to the engine whose handlers it manages."""
        self.engine = engine

    def register_handler(self, task_type: TaskType, handler_method_name: str):
        """Register an existing engine method as the handler for a task type.

        Any handler previously registered for ``task_type`` is replaced.

        Args:
            task_type: The task type to route.
            handler_method_name: Name of an attribute on the engine to use as
                the handler.

        Raises:
            ValueError: If the engine has no such attribute, or the attribute
                is not callable.
        """
        if not hasattr(self.engine, handler_method_name):
            raise ValueError(f"Handler method {handler_method_name} not found on engine")

        handler_method = getattr(self.engine, handler_method_name)
        # Reject plain attributes (e.g. an injected service) here, so the
        # mistake surfaces at registration time instead of at dispatch.
        if not callable(handler_method):
            raise ValueError(f"Handler method {handler_method_name} on engine is not callable")

        self.engine.task_handlers[task_type] = handler_method

    def get_supported_tasks(self) -> List[TaskType]:
        """Return the task types that currently have a registered handler."""
        return list(self.engine.task_handlers)

    def validate_task(self, task: Task) -> bool:
        """Return whether the task's type has a registered handler."""
        return task.type in self.engine.task_handlers

# Usage example
async def run_usage_example(github_agent, file_service, validation_service, project_instance):
    """Build an engine, create one issue-creation task, and dispatch it.

    The collaborators are taken as parameters because this example does not
    define them; the ``await`` needs to run inside a coroutine.

    Returns:
        The result dictionary produced by ``engine.handle_task``.
    """
    engine = OrchestrationEngine(github_agent, file_service, validation_service)
    # Shown for completeness: the registry is a view over the engine's
    # handler mapping and is not needed to dispatch a task.
    registry = TaskRegistry(engine)

    # Task execution
    task = Task(
        task_type=TaskType.GITHUB_CREATE_ISSUE,
        data={
            "title": "New Feature Request",
            "body": "Detailed description...",
            "labels": ["enhancement"],
        },
    )

    context = {
        "repository": "owner/repo",
        "project": project_instance,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

    result = await engine.handle_task(task, context)
    return result

# From synchronous code, run it with e.g.:
#   asyncio.run(run_usage_example(github_agent, file_service, validation_service, project_instance))

Usage Guidelines

Handler Method Design

State Management

Error Handling

Benefits

Trade-offs

Anti-patterns to Avoid

References

Migration Notes

Consolidated from: