Pattern-025: Canonical Query Extension Pattern

Status

Proven

Context

Existing canonical query systems often need additional intelligence and capabilities, but replacing them entirely risks breaking backward compatibility and losing proven functionality. Traditional approaches either create entirely new systems (losing compatibility) or modify existing systems directly (risking stability). The Canonical Query Extension Pattern addresses both problems by adding capabilities around the existing system rather than replacing or modifying it.

Pattern Description

The Canonical Query Extension Pattern extends existing canonical query systems with additional intelligence while preserving original functionality and maintaining backward compatibility. The pattern uses delegation to preserve existing behavior while adding enhancements through composition, ensuring graceful degradation and additive functionality.

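Core concept: always delegate to the original handlers first, then layer optional enhancements on top of their unmodified response, falling back to that response whenever an enhancement fails.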

Implementation

Core Extension Framework

import asyncio
import logging
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Any, Optional, List

@dataclass
class IssueIntelligenceResult:
    """Enhanced result that carries the original response unchanged.

    Built by ``CanonicalQueryExtensionEngine._apply_enhancements`` and
    ``IssueIntelligenceCanonicalQueryEngine.enhance_canonical_query``.
    Field order is the positional constructor order.
    """
    original_response: Dict[str, Any]      # From CanonicalHandlers, passed through untouched
    enhanced_message: str                  # Original message plus any appended context (just the original on fallback)
    issue_intelligence: Dict[str, Any]     # Enhancement data, or an {"error": ...} dict when enhancement fell back
    context_source: str = "github_integration"  # Source label; not read anywhere in this file
    enhancement_time_ms: Optional[float] = None  # Milliseconds; _apply_enhancements leaves it None (caller measures)
    fallback_used: bool = False            # True when enhancement failed and the original message was reused

class CanonicalQueryExtensionEngine:
    """Base class for extending canonical query systems.

    Every query is delegated to ``original_handlers`` first and that response
    is returned unmodified; optional enhancements from
    ``enhancement_services`` are layered on top and degrade to the original
    message when they fail.

    Args:
        original_handlers: Object whose ``async handle(intent, session_id)``
            produces the original response (presumably a dict with a
            ``"message"`` key; non-dict responses are tolerated).
        enhancement_services: Mapping of service name to service object. Only
            the ``"github"``, ``"spatial"`` and ``"learning"`` entries are
            queried by ``_gather_enhancement_intelligence``.
        performance_monitor: Optional monitor instance; defaults to a new
            ``EnhancementPerformanceMonitor``.
        circuit_breaker: Optional breaker exposing ``async call(func, *args)``
            and ``is_open()``; defaults to a new ``CanonicalQueryCircuitBreaker``.
    """

    # Intent actions enhanced when the config does not supply "compatible_actions".
    DEFAULT_COMPATIBLE_ACTIONS = ("list_issues", "search_issues", "get_project_status")

    # (service key, coroutine method name, intelligence key, log label) for each
    # optional enhancement source, queried in this order.
    _ENHANCEMENT_SOURCES = (
        ("github", "get_related_issues", "github_issues", "GitHub"),
        ("spatial", "analyze_spatial_context", "spatial_context", "Spatial"),
        ("learning", "get_relevant_patterns", "learned_patterns", "Learning"),
    )

    def __init__(self, original_handlers, enhancement_services: Dict[str, Any],
                 performance_monitor=None, circuit_breaker=None):
        self.original_handlers = original_handlers  # Delegation target
        self.enhancement_services = enhancement_services
        # Default to this module's monitor/breaker classes; both may be injected
        # (shared instances, custom thresholds, or test doubles).
        self.performance_monitor = (
            performance_monitor if performance_monitor is not None
            else EnhancementPerformanceMonitor()
        )
        self.circuit_breaker = (
            circuit_breaker if circuit_breaker is not None
            else CanonicalQueryCircuitBreaker()
        )
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _elapsed_ms(start: float) -> float:
        """Return milliseconds elapsed since a ``time.perf_counter()`` reading."""
        return (time.perf_counter() - start) * 1000

    @staticmethod
    def _original_message(response: Any) -> str:
        """Return ``response["message"]`` (default ``""``) for dict responses, else ``""``."""
        return response.get("message", "") if isinstance(response, dict) else ""

    async def handle_extended_query(self, intent, session_id,
                                    enhancement_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Handle a query with optional enhancements while preserving original behavior.

        Args:
            intent: Query intent; ``intent.action`` is checked against the
                compatible actions.
            session_id: Passed through to the original handlers.
            enhancement_config: Optional dict; see ``_should_apply_enhancements``.

        Returns:
            A dict with ``"response"`` (the original response, unmodified),
            ``"enhanced"`` and ``"original_time_ms"``. Enhanced results also
            carry ``"enhanced_message"``, ``"enhancements"``,
            ``"enhancement_time_ms"`` and ``"fallback_used"``. If the original
            handler raises, the dict holds ``"error"``/``"details"`` with
            ``"response": None`` and ``"enhanced": False``.
        """
        # Step 1: always get the original response first (delegation).
        original_start = time.perf_counter()
        try:
            original_response = await self.original_handlers.handle(intent, session_id)
        except Exception as e:
            self.logger.error("Original handler failed: %s", e)
            return {
                "error": "Original query handler failed",
                "details": str(e),
                # Same keys as the success path so callers can read
                # result["enhanced"] / result["response"] unconditionally.
                "response": None,
                "enhanced": False,
                "original_time_ms": self._elapsed_ms(original_start),
            }
        original_time = self._elapsed_ms(original_start)

        # Step 2: determine if enhancements are requested and available.
        if not self._should_apply_enhancements(intent, enhancement_config):
            return {
                "response": original_response,
                "enhanced": False,
                "original_time_ms": original_time,
            }

        # Step 3: apply enhancements with circuit breaker protection.
        enhancement_start = time.perf_counter()
        enhancement_result = await self._apply_enhancements(
            original_response, intent, session_id
        )
        enhancement_time = self._elapsed_ms(enhancement_start)

        # Step 4: merge results preserving the original structure.
        return {
            "response": enhancement_result.original_response,  # Preserved exactly
            "enhanced_message": enhancement_result.enhanced_message,
            "enhancements": enhancement_result.issue_intelligence,
            "enhanced": True,
            "original_time_ms": original_time,
            "enhancement_time_ms": enhancement_time,
            "fallback_used": enhancement_result.fallback_used,
        }

    async def _apply_enhancements(self, original_response: Dict[str, Any], intent, session_id) -> IssueIntelligenceResult:
        """Apply enhancements with graceful degradation.

        Any exception from gathering (including a rejection by the circuit
        breaker) or from message building yields a fallback result whose
        message is the original message and whose ``fallback_used`` is True.
        """
        original_message = self._original_message(original_response)
        try:
            enhancement_data = await self.circuit_breaker.call(
                self._gather_enhancement_intelligence,
                intent, session_id
            )
            enhanced_message = await self._enhance_message(
                original_message, enhancement_data, intent
            )
        except Exception as e:
            self.logger.warning("Enhancement failed, using fallback: %s", e)
            # Graceful degradation: return the original with no enhancement.
            return IssueIntelligenceResult(
                original_response=original_response,
                enhanced_message=original_message,
                issue_intelligence={"error": "Enhancement unavailable", "reason": str(e)},
                fallback_used=True,
            )

        return IssueIntelligenceResult(
            original_response=original_response,
            enhanced_message=enhanced_message,
            issue_intelligence=enhancement_data,
            enhancement_time_ms=None,  # Measured by the caller
            fallback_used=False,
        )

    def _should_apply_enhancements(self, intent, config: Optional[Dict[str, Any]] = None) -> bool:
        """Return True when enhancements should run for this intent.

        ``config`` keys: ``"disable_enhancements"`` (bool) and
        ``"compatible_actions"`` (container of action names, defaulting to
        ``DEFAULT_COMPATIBLE_ACTIONS``).
        """
        config = config or {}

        if config.get("disable_enhancements", False):
            return False

        if not self.enhancement_services:
            return False

        # Skip while the breaker reports itself open. NOTE: if is_open() never
        # accounts for the recovery timeout, call() is never reached here and
        # the breaker cannot move to half-open.
        if self.circuit_breaker.is_open():
            return False

        compatible_actions = config.get("compatible_actions", self.DEFAULT_COMPATIBLE_ACTIONS)
        return intent.action in compatible_actions

    async def _gather_enhancement_intelligence(self, intent, session_id) -> Dict[str, Any]:
        """Gather enhancement intelligence from the configured external services.

        A failing service contributes ``{"error": <message>}`` under its key so
        the other services' data is kept.

        Raises:
            RuntimeError: if every attempted service failed, so the circuit
                breaker wrapping this call records a failure (otherwise it
                could never trip).
        """
        intelligence: Dict[str, Any] = {}
        attempted = 0
        failures: List[str] = []

        for service_key, method_name, result_key, label in self._ENHANCEMENT_SOURCES:
            if service_key not in self.enhancement_services:
                continue
            service = self.enhancement_services[service_key]
            attempted += 1
            try:
                intelligence[result_key] = await getattr(service, method_name)(intent)
            except Exception as e:
                self.logger.warning("%s enhancement failed: %s", label, e)
                intelligence[result_key] = {"error": str(e)}
                failures.append(f"{service_key}: {e}")

        if attempted and len(failures) == attempted:
            raise RuntimeError("All enhancement services failed: " + "; ".join(failures))

        return intelligence

    @staticmethod
    def _usable_section(intelligence: Dict[str, Any], key: str) -> Optional[Dict[str, Any]]:
        """Return ``intelligence[key]`` if it is a non-empty dict without an ``"error"`` entry, else None."""
        section = intelligence.get(key)
        if isinstance(section, dict) and section and not section.get("error"):
            return section
        return None

    async def _enhance_message(self, original_message: str, intelligence: Dict[str, Any], intent) -> str:
        """Append GitHub, spatial and learned-pattern context to the original message.

        Sections that are missing, not dicts, or carry an ``"error"`` are
        skipped. Parts are joined without a separator, so each appended part
        starts with its own newline.
        """
        parts = [original_message or ""]

        github = self._usable_section(intelligence, "github_issues")
        issues = (github.get("issues") or []) if github else []
        if issues:
            parts.append(f"\n\n🔗 Related: Found {len(issues)} related GitHub issues")
            # Top 3 most relevant issues, one per line.
            for issue in issues[:3]:
                parts.append(f"\n  ‒ #{issue.get('number')}: {issue.get('title', 'Untitled')}")

        spatial = self._usable_section(intelligence, "spatial_context")
        if spatial:
            attention_level = spatial.get("attention_level") or 0
            if attention_level > 0.7:
                parts.append(f"\n⚑ High priority context detected (attention: {attention_level:.1f})")

        learned = self._usable_section(intelligence, "learned_patterns")
        patterns = (learned.get("patterns") or []) if learned else []
        if patterns:
            parts.append(f"\n🧠 {len(patterns)} learned patterns may apply to this context")

        return "".join(parts)

class IssueIntelligenceCanonicalQueryEngine(CanonicalQueryExtensionEngine):
    """Canonical query engine with GitHub issue intelligence.

    Registers ``github_integration`` as the ``"github"`` enhancement service
    and ``session_manager`` as ``"session"``. ``enhance_canonical_query`` is
    an issue-focused path that calls the GitHub service's ``search_issues``,
    ``get_project_metrics`` and ``get_user_issues`` directly (it does not go
    through the circuit breaker).
    """

    # Filler words dropped from search terms; all are four letters long, so
    # the length filter alone would keep them.
    _SEARCH_STOPWORDS = frozenset({"this", "that", "with", "have", "will"})
    _MAX_MESSAGE_TERMS = 5   # Words taken from the intent message
    _MAX_LISTED_ISSUES = 5   # Related issues listed in the enhanced message

    def __init__(self, canonical_handlers, github_integration, session_manager):
        enhancement_services = {
            "github": github_integration,
            "session": session_manager,
        }
        super().__init__(canonical_handlers, enhancement_services)

    async def enhance_canonical_query(self, intent, session_id) -> IssueIntelligenceResult:
        """Return the original response plus a message enriched with GitHub issue context.

        Exceptions from ``original_handlers.handle`` propagate; failures while
        gathering issue intelligence are absorbed and leave the message
        unchanged.
        """
        # Step 1: get the original response (delegation).
        original_response = await self.original_handlers.handle(intent, session_id)

        # Step 2: gather GitHub issue intelligence.
        issue_intelligence = await self._gather_issue_intelligence(intent)

        # Step 3: enhance the message. .get() rather than ["message"] so a
        # response without a message still produces a result.
        enhanced_message = await self._enhance_message_with_issues(
            original_response.get("message", ""), issue_intelligence, intent
        )

        # Step 4: return the enhanced result, preserving the original.
        return IssueIntelligenceResult(
            original_response=original_response,      # Preserved
            enhanced_message=enhanced_message,        # Enhanced
            issue_intelligence=issue_intelligence,    # Additional data
        )

    async def _gather_issue_intelligence(self, intent) -> Dict[str, Any]:
        """Collect related issues, project metrics and user issues from the GitHub service.

        On any exception the partial results gathered so far are kept and an
        ``"error"`` entry is added.
        """
        intelligence: Dict[str, Any] = {}

        try:
            github_service = self.enhancement_services["github"]

            if intent.action in ("list_issues", "search_issues"):
                intelligence["related_issues"] = await github_service.search_issues(
                    self._extract_search_terms(intent)
                )

            project_id = getattr(intent, "project_id", None)
            if project_id:
                intelligence["project_metrics"] = await github_service.get_project_metrics(project_id)

            user_context = getattr(intent, "user_context", None)
            if user_context:
                intelligence["user_issues"] = await github_service.get_user_issues(user_context)

        except Exception as e:
            self.logger.error("Failed to gather issue intelligence: %s", e)
            intelligence["error"] = str(e)

        return intelligence

    async def _enhance_message_with_issues(self, original_message: str, intelligence: Dict[str, Any], intent) -> str:
        """Append related issues, project metrics and assignment counts to the message.

        Returns the original message untouched when ``intelligence`` carries an
        ``"error"``. Parts are joined without a separator, so each appended
        part starts with its own newline.
        """
        if intelligence.get("error"):
            return original_message  # No enhancement on error

        parts = [original_message]

        related_issues = intelligence.get("related_issues") or []
        if related_issues:
            parts.append(f"\n\n📋 Found {len(related_issues)} related issues:")
            for issue in related_issues[:self._MAX_LISTED_ISSUES]:
                status_emoji = "✅" if issue.get("state") == "closed" else "🔴"
                parts.append(f"\n  {status_emoji} #{issue.get('number')}: {issue.get('title', 'Untitled')}")

        project_metrics = intelligence.get("project_metrics") or {}
        if project_metrics:
            open_count = project_metrics.get("open_issues", 0)
            closed_count = project_metrics.get("closed_issues", 0)
            if open_count > 0 or closed_count > 0:
                parts.append(f"\n\n📊 Project status: {open_count} open, {closed_count} closed issues")

        user_issues = intelligence.get("user_issues") or []
        assigned_count = sum(1 for issue in user_issues if issue.get("assigned_to_user"))
        if assigned_count > 0:
            parts.append(f"\n\n👀 You have {assigned_count} assigned issues")

        return "".join(parts)

    def _extract_search_terms(self, intent) -> List[str]:
        """Extract search terms for issue search from the intent.

        Takes up to ``_MAX_MESSAGE_TERMS`` lowercased message words longer than
        three characters (surrounding punctuation stripped, stopwords removed),
        then appends ``intent.action`` when set.
        """
        import string

        terms: List[str] = []

        message = getattr(intent, "message", None)
        if message:
            words = (word.strip(string.punctuation) for word in message.lower().split())
            meaningful = [w for w in words if len(w) > 3 and w not in self._SEARCH_STOPWORDS]
            terms.extend(meaningful[:self._MAX_MESSAGE_TERMS])

        action = getattr(intent, "action", None)
        if action:
            terms.append(action)

        return terms

Performance Monitoring and Circuit Breaker

class EnhancementPerformanceMonitor:
    """Monitor performance of canonical query extensions.

    ``metrics`` holds running counters and averages. ``recent_performance``
    keeps the most recent ``max_history`` per-query records.
    """

    def __init__(self):
        self.metrics = {
            "total_queries": 0,
            "enhanced_queries": 0,
            "enhancement_failures": 0,
            "average_original_time_ms": 0.0,
            "average_enhancement_time_ms": 0.0,
            "circuit_breaker_trips": 0
        }
        self.recent_performance = []
        self.max_history = 100
        # Running totals behind the averages. The enhancement average keeps its
        # own sample count because not every enhanced query reports a timing.
        self._total_original_time_ms = 0.0
        self._total_enhancement_time_ms = 0.0
        self._enhancement_time_samples = 0

    def record_query_performance(self, original_time_ms: float, enhancement_time_ms: Optional[float] = None,
                                 enhanced: bool = False, failed: bool = False):
        """Record one query's timings and outcome.

        Args:
            original_time_ms: Time spent in the original handler.
            enhancement_time_ms: Time spent enhancing; only averaged when
                ``enhanced`` is True and the value is not None (0.0 counts).
            enhanced: Whether enhancements were applied.
            failed: Whether the query counts as an enhancement failure.
        """
        self.metrics["total_queries"] += 1
        self._total_original_time_ms += original_time_ms
        self.metrics["average_original_time_ms"] = (
            self._total_original_time_ms / self.metrics["total_queries"]
        )

        if enhanced:
            self.metrics["enhanced_queries"] += 1
            if enhancement_time_ms is not None:
                self._enhancement_time_samples += 1
                self._total_enhancement_time_ms += enhancement_time_ms
                self.metrics["average_enhancement_time_ms"] = (
                    self._total_enhancement_time_ms / self._enhancement_time_samples
                )

        if failed:
            self.metrics["enhancement_failures"] += 1

        self.recent_performance.append({
            "timestamp": datetime.utcnow(),
            "original_time_ms": original_time_ms,
            "enhancement_time_ms": enhancement_time_ms,
            "enhanced": enhanced,
            "failed": failed
        })

        # Trim the oldest records in place once history exceeds max_history.
        excess = len(self.recent_performance) - self.max_history
        if excess > 0:
            del self.recent_performance[:excess]

    def record_circuit_breaker_trip(self):
        """Count one circuit-breaker trip (reported as ``circuit_breaker_trips``)."""
        self.metrics["circuit_breaker_trips"] += 1

    def _failure_rate(self) -> float:
        """Return enhancement failures per enhanced query (0.0 when none were enhanced)."""
        return self.metrics["enhancement_failures"] / max(self.metrics["enhanced_queries"], 1)

    def get_performance_summary(self) -> Dict[str, Any]:
        """Return counters, rates, average timings and a health label."""
        enhancement_rate = self.metrics["enhanced_queries"] / max(self.metrics["total_queries"], 1)
        average_original = self.metrics["average_original_time_ms"]
        average_enhancement = self.metrics["average_enhancement_time_ms"]

        return {
            "total_queries": self.metrics["total_queries"],
            "enhancement_rate": enhancement_rate,
            "failure_rate": self._failure_rate(),
            "average_times": {
                "original_ms": average_original,
                "enhancement_ms": average_enhancement,
                "total_ms": average_original + average_enhancement
            },
            "circuit_breaker_trips": self.metrics["circuit_breaker_trips"],
            "health_status": self._calculate_health_status()
        }

    def _calculate_health_status(self) -> str:
        """Map the failure rate to "unhealthy" (>0.5), "degraded" (>0.2) or "healthy"."""
        failure_rate = self._failure_rate()
        if failure_rate > 0.5:
            return "unhealthy"
        if failure_rate > 0.2:
            return "degraded"
        return "healthy"

class CanonicalQueryCircuitBreaker:
    """Circuit breaker for canonical query enhancements.

    States: ``"closed"`` (calls pass through), ``"open"`` (calls are rejected
    until more than ``recovery_timeout`` seconds have passed since the last
    failure) and ``"half-open"`` (one trial call closes or re-opens it).

    Args:
        failure_threshold: Consecutive failures that open the breaker.
        recovery_timeout: Seconds to stay open before allowing a trial call.
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def _recovery_elapsed(self) -> bool:
        """Return True once more than ``recovery_timeout`` seconds have passed since the last failure."""
        if self.last_failure_time is None:
            return False
        # total_seconds(): timedelta.seconds is only the seconds *component*
        # (it wraps every day), not the full elapsed time.
        elapsed = (datetime.utcnow() - self.last_failure_time).total_seconds()
        return elapsed > self.recovery_timeout

    async def call(self, func, *args, **kwargs):
        """Await ``func(*args, **kwargs)`` with circuit breaker protection.

        Raises:
            Exception: "Circuit breaker is open - enhancement unavailable" while
                open and still inside the recovery window; otherwise whatever
                ``func`` raises (after recording the failure).
        """
        if self.state == "open":
            if not self._recovery_elapsed():
                raise Exception("Circuit breaker is open - enhancement unavailable")
            self.state = "half-open"

        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = datetime.utcnow()
            # A failed half-open trial re-opens immediately; otherwise open once
            # the consecutive-failure threshold is reached.
            if self.state == "half-open" or self.failure_count >= self.failure_threshold:
                self.state = "open"
            raise

        # Any success closes the breaker and resets the consecutive-failure count.
        self.state = "closed"
        self.failure_count = 0
        return result

    def is_open(self) -> bool:
        """Return True while the breaker is rejecting calls.

        An open breaker whose recovery window has elapsed reports False so
        callers that gate on ``is_open()`` still reach ``call()``, which runs
        the half-open trial.
        """
        return self.state == "open" and not self._recovery_elapsed()

    def get_state(self) -> Dict[str, Any]:
        """Return a snapshot of the breaker's state and configuration."""
        return {
            "state": self.state,
            "failure_count": self.failure_count,
            "failure_threshold": self.failure_threshold,
            "last_failure": self.last_failure_time.isoformat() if self.last_failure_time else None,
            "recovery_timeout": self.recovery_timeout
        }

Usage Examples

# Setup canonical query extension: build the existing handlers and the
# services the issue-intelligence engine wraps, then compose them.
canonical_handlers, github_integration, session_manager = (
    ExistingCanonicalHandlers(),
    GitHubIntegrationService(),
    SessionManager(),
)

# Create extended engine (positional order: handlers, GitHub service, session manager)
extended_engine = IssueIntelligenceCanonicalQueryEngine(
    canonical_handlers, github_integration, session_manager
)

# Handle query with enhancement
async def handle_enhanced_query(intent, session_id):
    """Run one query through ``extended_engine``, print a summary, and return the result dict."""
    result = await extended_engine.handle_extended_query(
        intent=intent,
        session_id=session_id,
        enhancement_config={
            "compatible_actions": ["list_issues", "search_issues", "get_project_status"],
            "disable_enhancements": False
        }
    )

    # An original-handler failure comes back as a dict with "error"; it may
    # lack "response"/"enhanced", so handle it before indexing those keys.
    if "error" in result:
        print(f"Query failed: {result['error']} ({result.get('details', '')})")
    elif result.get("enhanced", False):
        print(f"Original response: {result['response']}")
        print(f"Enhanced message: {result['enhanced_message']}")
        print(f"Performance: {result['original_time_ms']:.1f}ms original, "
              f"{result['enhancement_time_ms']:.1f}ms enhancement")
    else:
        print(f"Original response only: {result['response']}")
        print(f"Performance: {result['original_time_ms']:.1f}ms")

    return result

# Monitor performance
performance_monitor = EnhancementPerformanceMonitor()

async def monitored_query_handler(intent, session_id):
    """Run a query through ``extended_engine`` and record its timings in ``performance_monitor``.

    Enhancement fallbacks and original-handler error results are recorded as
    failures; exceptions are recorded with the total elapsed time and re-raised.
    """
    start = time.perf_counter()  # monotonic, unaffected by wall-clock changes

    try:
        result = await extended_engine.handle_extended_query(intent, session_id)
    except Exception:
        performance_monitor.record_query_performance(
            original_time_ms=(time.perf_counter() - start) * 1000,
            enhanced=False,
            failed=True
        )
        raise

    # Recorded outside the try so a recording error is not double-counted as a
    # query failure.
    performance_monitor.record_query_performance(
        original_time_ms=result.get("original_time_ms", 0),
        enhancement_time_ms=result.get("enhancement_time_ms"),
        enhanced=result.get("enhanced", False),
        failed=bool(result.get("fallback_used", False)) or "error" in result
    )

    return result

Usage Guidelines

Extension Design Principles

Integration Best Practices

Configuration Management

Benefits

Trade-offs

Anti-patterns to Avoid

References

Migration Notes

Consolidated from:

Last updated: September 15, 2025