⚠️ STALE ARCHITECTURE WARNING (Apr 11, 2026): This document describes the September 2025 architecture. It predates the M1 floor inversion (#911), the Apr 8 full migration of IDENTITY to the floor, and several other M1-era architectural decisions. The pre-M1 diagrams in this document show “Intent Classifier → Workflow Factory” as the application core. The current reality is “Classifier → Action Gate → (Floor with Context | Canonical Handler | Workflow Dispatcher)” per ADR-059 and ADR-060.
Full rewrite pending in M2a doc cleanup. Use this document for high-level DDD context only; refer to the ADRs above for current routing reality.
Last Updated: 2026-04-11 (post-M1). Status: M1 complete; floor-first routing is live in production.
The application layer is now a routing graph rather than a simple “classifier -> workflow factory” pair. The graph terminates at one of three destinations: the conversational floor, a canonical handler, or the workflow dispatcher.
User Message
|
v
Pre-classifier (fast pattern match)
| miss
+-----> LLM Classifier (task_type="intent_classification")
| |
v v
Intent (category, action, confidence, context)
|
v
Action Gate
|--- _requires_canonical_handler(intent) ----> Canonical Handler
| (PORTFOLIO, EXECUTION, TEMPORAL, STATUS,
| PRIORITY, CONVERSATION+greeting,
| GUIDANCE+setup-topic)
|
|--- _should_route_to_floor(intent) ---------> Conversational Floor
| (GUIDANCE, IDENTITY, DISCOVERY, with assembled context
| TRUST, MEMORY, CONVERSATION, (ContextAssembler)
| UNKNOWN) + 6-turn history
| + domain_context block
|
+--- otherwise ------------------------------> Workflow Dispatcher
(ANALYSIS, SYNTHESIS, STRATEGY, (ADR-059)
PLANNING, REVIEW, LEARNING, QUERY)
Source of truth:
- services/intent/intent_service.py lines 9863-9962 (_requires_canonical_handler, _should_route_to_floor, _handle_floor_with_context)
- services/intent_service/canonical_handlers.py line 129 (CanonicalHandler.can_handle)
- services/intent_service/conversational_floor.py (floor impl, system prompt, fabrication guardrails)

ConversationTurn in services/intent_service/conversation_context.py gained a response field so the floor sees both the user’s message and Piper’s prior reply when assembling history.

conversational_floor.py now explicitly prohibits inventing user data (todos, projects, calendar entries) when the context block is empty.

See intent-categories-reference.md.

model_tier. See llm-configuration.md.

Conversational Floor (services/intent_service/conversational_floor.py)
- domain_context dict assembled per-category
- LLMClient.complete(task_type="conversation", ...)
- _classify_llm_error: auth/config, no-provider, or transient

Canonical Handlers (services/intent_service/canonical_handlers.py)

Workflow Dispatcher (ADR-059, services/workflows/)
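The Action Gate's three-way decision can be sketched in plain Python. This is a minimal, hypothetical standalone version, not the code in intent_service.py. Several details are assumptions: the enum string values, the Intent dataclass, and the use of `intent.action == "greeting"` and `intent.action == "setup"` to stand in for the greeting and setup-topic checks. The sketch does show the ordering the graph implies. The canonical check runs first, so CONVERSATION+greeting and GUIDANCE+setup-topic never reach the floor. Anything that neither check claims falls through to the Workflow Dispatcher.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict


class IntentCategory(Enum):
    # Category names come from the routing graph; the string values are assumed.
    PORTFOLIO = "PORTFOLIO"
    EXECUTION = "EXECUTION"
    TEMPORAL = "TEMPORAL"
    STATUS = "STATUS"
    PRIORITY = "PRIORITY"
    CONVERSATION = "CONVERSATION"
    GUIDANCE = "GUIDANCE"
    IDENTITY = "IDENTITY"
    DISCOVERY = "DISCOVERY"
    TRUST = "TRUST"
    MEMORY = "MEMORY"
    UNKNOWN = "UNKNOWN"
    ANALYSIS = "ANALYSIS"
    SYNTHESIS = "SYNTHESIS"
    STRATEGY = "STRATEGY"
    PLANNING = "PLANNING"
    REVIEW = "REVIEW"
    LEARNING = "LEARNING"
    QUERY = "QUERY"


@dataclass
class Intent:
    category: IntentCategory
    action: str
    confidence: float = 1.0
    context: Dict[str, Any] = field(default_factory=dict)


C = IntentCategory
CANONICAL_CATEGORIES = {C.PORTFOLIO, C.EXECUTION, C.TEMPORAL, C.STATUS, C.PRIORITY}
FLOOR_CATEGORIES = {
    C.GUIDANCE, C.IDENTITY, C.DISCOVERY, C.TRUST, C.MEMORY, C.CONVERSATION, C.UNKNOWN,
}


def requires_canonical_handler(intent: Intent) -> bool:
    if intent.category in CANONICAL_CATEGORIES:
        return True
    # Hypothetical signals for the two special cases in the diagram.
    if intent.category is C.CONVERSATION and intent.action == "greeting":
        return True
    if intent.category is C.GUIDANCE and intent.action == "setup":
        return True
    return False


def should_route_to_floor(intent: Intent) -> bool:
    return intent.category in FLOOR_CATEGORIES


def route(intent: Intent) -> str:
    # Canonical check first: it carves exceptions out of two floor categories.
    if requires_canonical_handler(intent):
        return "canonical_handler"
    if should_route_to_floor(intent):
        return "floor"
    # ANALYSIS, SYNTHESIS, STRATEGY, PLANNING, REVIEW, LEARNING, QUERY
    return "workflow_dispatcher"
```

The order of the two predicates matters more than their contents. If they were swapped, a greeting would be answered by the floor instead of the canonical handler.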
The diagram below is the pre-M1 application-layer view. The domain-services, data, and infrastructure layers it shows are still accurate; the application layer is superseded by the routing graph above.
┌─────────────────────────────────────────────────────────────────────────────┐
│ USER INTERFACE LAYER │
├─────────────────────────────────────────────────────────────────────────────┤
│ ✅ FastAPI Web Server │ ✅ Web UI (DDD, TDD, resizable chat window) │ 📋 Admin Interface │
│ (Built & Running) │ (Built & Working) │ (Not Yet Designed) │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ APPLICATION LAYER (SUPERSEDED — see above) │
├─────────────────────────────────────────────────────────────────────────────┤
│ ⚠️ Intent Classifier │ ⚠️ Workflow Factory │ 📋 Learning Engine │
│ (Now one of 3 terminals) │ (Now Workflow │ (Not Yet Designed) │
│ │ Dispatcher, ADR-059) │ │
│ ⚠️ Query Service │ ⚠️ Orchestration │ 📋 Analytics Engine │
│ (Part of floor context) │ Engine │ (Not Yet Designed) │
│ │ (Pre-exec Validation) │ │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ DOMAIN SERVICES LAYER │
├─────────────────────────────────────────────────────────────────────────────┤
│ ✅ GitHubDomainService │ ✅ SlackDomainService │ ✅ NotionDomainService │
│ (Router Architecture) │ (Mediation Complete) │ (Mediation Complete) │
│ • Router-based operations │ • Webhook handling │ • Workspace mgmt │
│ • Spatial intelligence │ • Spatial events │ • Database ops │
│ • Feature flag control │ • Health monitoring │ • Page operations │
│ │ │ │
│ ✅ StandupOrchestrationService │ ✅ PortConfigurationService │ 📋 Future Domain Services │
│ (Workflow Coordination) │ (Centralized Config) │ (As Needed) │
│ • Domain workflow mgmt │ • Environment-aware │ │
│ • Integration orchestration│ • URL generation │ │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ SERVICE LAYER │
├─────────────────────────────────────────────────────────────────────────────┤
│ ✅ Domain Models │ ✅ Workflow Service │ 📋 Feedback Service │
│ (Built) │ (Built & Working) │ (Not Yet Designed) │
│ │ │ │
│ ✅ Event System │ ✅ GitHub Integration │ ✅ Slack Integration │
│ (Built) │ (Fully Integrated) │ (Spatial Metaphors) │
│ │ (Issue Creation Working)│ (OAuth + Workflows) │
│ ✅ Knowledge Base │ ✅ Document Processor │ 📋 Report Generator │
│ (Built & Working) │ (Built & Working) │ (Not Yet Designed) │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ RESPONSE ENHANCEMENT LAYER │
├─────────────────────────────────────────────────────────────────────────────┤
│ ✅ ResponsePersonalityEnhancer│ ✅ PersonalityProfile │ ✅ TransformationService │
│ (Production Ready) │ (Database + YAML) │ (Warmth + Confidence) │
│ • <70ms performance │ • User preferences │ • Action guidance │
│ • Circuit breaker │ • LRU caching │ • Context adaptation │
│ • Graceful degradation │ • Config overrides │ • Performance optimized │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ UI MESSAGE LAYER │
├─────────────────────────────────────────────────────────────────────────────┤
│ ActionHumanizer │ TemplateRenderer │ Message Templates │
│ • Cache-first lookup │ • Template selection │ • Intent-based │
│ • Rule-based conversion │ • Variable substitution │ • Workflow-based │
│ • Usage tracking │ • Humanization integration │ • Fallbacks │
└─────────────────────────────────────────────────────────────────────────────┘
│ │ │
└────────────────────────┴─────────────────────────┘
│
▼
User-Facing Messages
┌─────────────────────────────────────────────────────────────────────────────┐
│ DATA LAYER │
├─────────────────────────────────────────────────────────────────────────────┤
│ ✅ PostgreSQL │ ✅ ChromaDB │ ✅ Redis │
│ (Domain-First Schema) │ (Deployed & Working) │ (Deployed & Working) │
│ │ │ │
│ ✅ Domain Persistence │ ✅ Vector Storage │ ✅ Event Queue │
│ (Working) │ (Working) │ (Working) │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ INFRASTRUCTURE LAYER │
├─────────────────────────────────────────────────────────────────────────────┤
│ ✅ Docker Compose │ ✅ Traefik Gateway │ ✅ Temporal │
│ (Deployed & Running) │ (Deployed & Running) │ (Deployed & Running) │
│ │ │ │
│ ✅ Service Discovery │ ✅ Load Balancing │ ✅ Workflow Engine │
│ (Working) │ (Working) │ (Working) │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ EXTERNAL INTEGRATIONS │
├─────────────────────────────────────────────────────────────────────────────┤
│ ✅ Claude API │ ✅ GitHub API │ 📋 Slack/Teams │
│ (Connected & Working) │ (Fully Integrated) │ (Not Yet Designed) │
│ │ (Issue Creation Working)│ │
│ ✅ OpenAI API │ 📋 Jira API │ 📋 Analytics APIs │
│ (Connected & Working) │ (Not Yet Designed) │ (Planned Q3 2025) │
│ │ │ - Datadog │
│ │ │ - New Relic │
│ │ │ - Google Analytics │
└─────────────────────────────────────────────────────────────────────────────┘
## Infrastructure Constants
### Development Environment
- **Port**: 8001 (all local development, NOT 8080)
- **Web Pattern**: Single app.py (~750 lines as of Sept 2025)
- **Database**: PostgreSQL with AsyncSessionFactory pattern
- **Config**: PIPER.user.md in config/ directory
- **Python**: 3.11+ required
### API Patterns
- **REST Response Structure** (success shape, then error shape):
```json
{
  "status": "success",
  "data": {
    "field_name": "value",
    "nested_fields": {...}
  }
}
{
  "status": "error",
  "error": "message",
  "details": {}
}
```
piper-morgan/
├── cli/
│ └── commands/ # Standalone CLI scripts
├── web/
│ └── app.py # Single file (NO routes/ directory)
├── services/ # Domain-driven design
│ ├── features/ # Feature services
│ ├── infrastructure/ # Infrastructure services
│ └── shared_types.py # Shared type definitions
├── config/
│ └── PIPER.user.md # User configuration
└── docs/
├── architecture/ # Technical documentation
└── planning/ # Roadmaps, backlogs
The database layer uses PostgreSQL with SQLAlchemy 2.0+ async patterns.
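# Current pattern in ALL services
from sqlalchemy import select
from services.infrastructure.database import AsyncSessionFactory

async def get_data(query):
    async with AsyncSessionFactory() as session:
        result = await session.execute(query)
        return result.scalars().all()

# Query pattern (Model, Model.field, and value are placeholders; function name is illustrative)
async def query_example(Model, value):
    async with AsyncSessionFactory() as session:
        stmt = select(Model).where(Model.field == value)
        result = await session.execute(stmt)
        return result.scalars().all()

# Insert pattern (function name is illustrative)
async def insert_example(new_object):
    async with AsyncSessionFactory() as session:
        session.add(new_object)
        await session.commit()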
The web layer uses a single FastAPI file (MVP pattern) with embedded HTML.
File: web/app.py (~750 lines as of Sept 2025)
from fastapi import FastAPI
from fastapi.responses import HTMLResponse

app = FastAPI()

# API endpoint returning JSON
@app.get("/api/standup")
async def morning_standup_api():
    return {
        "status": "success",
        "data": {
            "yesterday_accomplishments": [...],
            "today_priorities": [...],
            "blockers": []
        }
    }

# UI endpoint returning HTML
@app.get("/standup")
async def standup_ui():
    return HTMLResponse(content="""
    <!DOCTYPE html>
    <html>
    <!-- Embedded HTML with JavaScript -->
    </html>
    """)
- yesterday_accomplishments, not accomplishments
- data.data.field_name in JavaScript

The MUX (Modeled User Experience) Object Model provides consciousness and ownership tracking for Piper Morgan. This enables Piper to have identity, awareness, emotional states, and epistemological tracking of knowledge sources.
Location: services/mux/
Tests: 165+ tests covering consciousness (104) and ownership (61)
Patterns: 055-058 (documented in pattern catalog)
┌─────────────────────────────────────────────────────────────────┐
│ MUX Object Model Layer │
├─────────────────────────────────────────────────────────────────┤
│ Consciousness Module │ Ownership Module │ Grammar │
│ (services/mux/ │ (services/mux/ │ Foundation │
│ consciousness.py) │ ownership.py) │ │
│ • PiperEntity │ • OwnershipCategory │ "Entities │
│ • AwarenessLevel │ • OwnershipMetadata │ experience │
│ • EmotionalState │ • OwnershipResolver │ Moments │
│ • ConsciousnessAttrs │ • OwnershipTransform │ at Places" │
│ • EntityContext │ • HasOwnership Proto │ │
│ • ConsciousnessExpr │ │ │
└─────────────────────────────────────────────────────────────────┘
The MUX grammar provides semantic grounding for the object model:
Piper and other entities can have:
All knowledge has an epistemological relationship:
The grammar-driven classification (Pattern-057) uses MUX concepts:
ConsciousnessExpression generates first-person language:
OwnershipMetadata can be embedded in any domain model:
from dataclasses import dataclass, field
from services.mux.ownership import OwnershipMetadata

@dataclass
class Session:
    id: str
    ownership: OwnershipMetadata = field(
        default_factory=lambda: OwnershipMetadata.native("piper-core")
    )
Model Context Protocol (MCP) integration enables tool federation and spatial intelligence.
# MCP Consumer pattern
from services.infrastructure.mcp_consumer import MCPConsumerCore

async def fetch_github_issues():
    consumer = MCPConsumerCore()
    return await consumer.fetch_resources("github-issues")
8-dimensional analysis across:
# Always deploy both Code and Cursor for critical fixes
# Phase 0: Investigation (both agents)
# Phase 1: Implementation (agent-specific)
# Phase 2: Cross-validation (verify each other)
The Slack integration uses a spatial metaphor that lets Piper Morgan understand and navigate Slack environments as physical spaces. This gives Piper an embodied model of Slack in which it develops spatial awareness and memory.
┌─────────────────────────────────────────────────────────────────┐
│ Slack Spatial Integration │
├─────────────────────────────────────────────────────────────────┤
│ OAuth Handler │ Spatial Mapper │ Webhook Router │
│ • Territory Init │ • Metaphor Engine │ • Event Process │
│ • State Management │ • Spatial Objects │ • Signature Verify│
│ │ • Coordinate System │ │
├─────────────────────────────────────────────────────────────────┤
│ Workspace Navigator │ Attention Model │ Spatial Memory │
│ • Multi-Territory │ • Priority Algorithms│ • Pattern Learning│
│ • State Tracking │ • Decay Models │ • JSON Persistence│
│ • Risk Assessment │ • Focus Management │ • Cross-Session │
└─────────────────────────────────────────────────────────────────┘
Slack Event → Spatial Mapping → Attention Processing → Navigation Decision → Workflow Creation
↓ ↓ ↓ ↓ ↓
WebHook Event → Room/Territory → Attention Event → Priority Score → Piper Workflow
@piper help with feature → CREATE_FEATURE workflow
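The Attention Model box lists priority algorithms and decay models but does not specify them. As an illustration only, the sketch below assumes exponential decay with a configurable half-life. AttentionEvent, attention_score, rank_events, and the one-hour default are hypothetical names and values, not the integration's actual API.

```python
import math
from dataclasses import dataclass
from typing import List


@dataclass
class AttentionEvent:
    channel: str          # the Slack location, viewed as a "room" in the metaphor
    base_priority: float  # assumed weighting, e.g. higher for direct mentions
    timestamp: float      # seconds since epoch


def attention_score(event: AttentionEvent, now: float, half_life_s: float = 3600.0) -> float:
    """Halve an event's priority every half_life_s seconds (assumed decay model)."""
    age = max(0.0, now - event.timestamp)
    return event.base_priority * math.pow(0.5, age / half_life_s)


def rank_events(events: List[AttentionEvent], now: float) -> List[AttentionEvent]:
    """Highest current attention first; this ordering could feed the navigation decision."""
    return sorted(events, key=lambda e: attention_score(e, now), reverse=True)
```

Under this model, a fresh low-priority event can outrank an older urgent one. That is the intended effect of decay.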
User Intent → Intent Classifier → EXECUTION/SYNTHESIS → Workflow Factory (ValidationRegistry) → Context Validation → Orchestration Engine → External Systems
↓ ↓
Pre-execution Validation State Changes
(User-friendly errors)
User Intent → Intent Classifier → QUERY → OrchestrationEngine.handle_query_intent()
↓
QueryRouter.get_query_router() → Session-Aware Services → Repository → Direct Data Access
↓
Formatted Results → Web Response
Current Implementation Details:
- web/app.py routes QUERY intents to orchestration_engine.handle_query_intent(intent)

The Context Validation Framework provides pre-execution validation for all workflow types. It prevents runtime failures and delivers user-friendly error messages with actionable guidance.
- services/orchestration/workflow_factory.py
- services/orchestration/validation.py

Workflow Creation Request
↓
ValidationRegistry (Context Requirements)
↓
WorkflowContextValidator (Rule Validation)
↓
[Valid Context] → Workflow Execution
↓
[Invalid Context] → User-Friendly Error Message
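The validation flow above can be sketched as a registry of per-workflow requirements plus a validator that fails before execution with a user-facing message. This is a minimal sketch under stated assumptions. The workflow keys ("create_ticket", "analyze_file"), the hint strings, and ContextValidationError are hypothetical. The project's real registry lives in services/orchestration/validation.py.

```python
from dataclasses import dataclass
from typing import Any, Dict, Tuple


@dataclass(frozen=True)
class ContextRequirements:
    required: Tuple[str, ...]  # keys that must be present and not None or ""
    hint: str                  # actionable guidance shown to the user


class ContextValidationError(Exception):
    """Raised before execution; the message is safe to show to users."""


# Hypothetical registry entries; keys and hints are illustrative only.
VALIDATION_REGISTRY: Dict[str, ContextRequirements] = {
    "create_ticket": ContextRequirements(
        required=("original_message", "project_id", "repository"),
        hint="Tell me which project or repository this is for.",
    ),
    "analyze_file": ContextRequirements(
        required=("original_message", "file_id"),
        hint="Upload the file you want analyzed, then ask again.",
    ),
}


def validate_context(workflow_type: str, context: Dict[str, Any]) -> None:
    """Raise ContextValidationError if required context is missing."""
    requirements = VALIDATION_REGISTRY.get(workflow_type)
    if requirements is None:
        return  # unregistered workflow types pass through in this sketch
    missing = [k for k in requirements.required if context.get(k) in (None, "")]
    if missing:
        raise ContextValidationError(
            f"I need a bit more information first (missing: {', '.join(missing)}). "
            f"{requirements.hint}"
        )
```

Checking against `(None, "")` rather than general falsiness keeps legitimate values such as an ID of 0 from being reported as missing.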
- original_message, validates project_id and repository
- original_message only
- original_message, validates file_id and resolved_file_id
- original_message, validates data_source
- original_message, validates github_url
- original_message, validates scope and objectives

The CQRS-lite pattern separates read operations (queries) from write operations (commands) in the Piper Morgan system. This provides clear architectural boundaries and better performance for simple data fetches, and it avoids forcing query-like operations into complex workflow patterns.
Queries are identified through intent classification:
```python
# Intent classifier recognizes QUERY category for read-only operations
if intent.category == IntentCategory.QUERY:
    # Route to QueryRouter
    result = await query_router.route_query(intent)
else:
    # Route to WorkflowFactory for commands
    workflow = await workflow_factory.create_from_intent(intent)
```
The QueryRouter handles QUERY category intents by dispatching them to appropriate query services based on intent analysis. Status: ✅ Operational and integrated.
The QueryRouter is integrated into the OrchestrationEngine using an on-demand initialization pattern with session-aware wrappers:
class OrchestrationEngine:
    def __init__(self, llm_client: Optional[LLMClient] = None):
        # QueryRouter initialized on-demand using async session pattern
        self.query_router = None

    async def get_query_router(self) -> QueryRouter:
        """Get QueryRouter, initializing on-demand with session-aware wrappers"""
        if self.query_router is None:
            # Initialize QueryRouter with session-aware services
            self.query_router = QueryRouter(
                project_query_service=SessionAwareProjectQueryService(),
                conversation_query_service=ConversationQueryService(),
                file_query_service=SessionAwareFileQueryService(),
            )
        return self.query_router

    async def handle_query_intent(self, intent: Intent) -> Dict[str, Any]:
        """Handle QUERY intents using QueryRouter integration (GREAT-1B bridge method)"""
        query_router = await self.get_query_router()
        if intent.action in ["search_projects", "list_projects", "find_projects"]:
            projects = await query_router.project_queries.list_active_projects()
            return {"message": f"Found {len(projects)} active projects", "data": projects}
        # ... other query routing logic
The QueryRouter uses a comprehensive initialization pattern with multiple service integrations:
class QueryRouter:
    """Routes QUERY intents to appropriate query services with LLM enhancement"""

    def __init__(
        self,
        project_query_service: ProjectQueryService,
        conversation_query_service: ConversationQueryService,
        file_query_service: FileQueryService,
        # PM-034 Phase 2B: LLM Intent Classification Integration
        llm_classifier: Optional[LLMIntentClassifier] = None,
        knowledge_graph_service: Optional[KnowledgeGraphService] = None,
        semantic_indexing_service: Optional[SemanticIndexingService] = None,
        # Performance and reliability features
        performance_targets: Optional[Dict[str, float]] = None,
        degradation_config: Optional[Dict] = None,
        # MCP Consumer integration for external tool federation
        mcp_consumer: Optional["MCPConsumerCore"] = None,
        enable_mcp_federation: bool = True,
    ):
        # Service initialization with comprehensive configuration
        ...  # body elided in this excerpt
The complete flow from user input to QueryRouter execution:
- web/app.py: receives user input and creates a workflow
- orchestration_engine.handle_query_intent(intent)

# Actual flow in web/app.py
if intent.category.value == "QUERY":
    print(f"🔧 Routing generic QUERY intent to QueryRouter: {intent.action}")
    result = await orchestration_engine.handle_query_intent(intent)
    return {
        "message": f"Query processed successfully: {intent.action}",
        "result": result,
        "workflow_id": workflow.id,
    }
The QueryRouter implementation includes comprehensive error handling:
- response_format={"type": "json_object"} parameter ensures consistent JSON responses
- tests/regression/test_queryrouter_lock.py prevents accidental disabling

September 2025 - PM-034 QueryRouter Resurrection:
- response_format={"type": "json_object"} parameter to LLM calls
- handle_query_intent() for seamless integration

Key Technical Improvements:
Implementation Status: ✅ Complete and operational
- services/queries/query_router.py (935 lines total)
- __init__, route_query, classify_and_route, federated_search
- handle_query_intent() bridge method
- tests/regression/test_queryrouter_lock.py prevents accidental disabling

Verified Metrics (October 13, 2025):
- dev/2025/10/13/proof-1-great-1-evidence.md

Query services provide read-only access to domain data:
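```python
from typing import List, Optional

class ProjectQueryService:
    def __init__(self, repo):
        self.repo = repo  # project repository, injected by the caller

    async def list_active_projects(self) -> List[Project]:
        return await self.repo.list_active_projects()

    async def get_project_by_id(self, project_id: str) -> Optional[Project]:
        return await self.repo.get_by_id(project_id)
```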
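The following self-contained sketch shows the read-only contract end to end. It pairs an in-memory service with a dispatch table keyed by the query action names listed next. InMemoryProjectQueries, run_query, and the Project fields are hypothetical stand-ins for the repository-backed services. None of the methods mutate state, which is the property CQRS-lite relies on.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List, Optional


@dataclass(frozen=True)
class Project:
    id: str
    name: str
    active: bool = True
    is_default: bool = False


class InMemoryProjectQueries:
    """Hypothetical read-only query service over an in-memory list."""

    def __init__(self, projects: List[Project]) -> None:
        self._projects = list(projects)

    async def list_projects(self) -> List[Project]:
        return [p for p in self._projects if p.active]

    async def get_project(self, project_id: str) -> Optional[Project]:
        return next((p for p in self._projects if p.id == project_id), None)

    async def get_default_project(self) -> Optional[Project]:
        return next((p for p in self._projects if p.is_default), None)

    async def find_project(self, name: str) -> Optional[Project]:
        wanted = name.casefold()  # case-insensitive name match
        return next((p for p in self._projects if p.name.casefold() == wanted), None)

    async def count_projects(self) -> int:
        return len(await self.list_projects())


async def run_query(service: InMemoryProjectQueries, action: str, *args: Any) -> Any:
    """Dispatch a query action name to the matching read-only method."""
    handlers: Dict[str, Callable[..., Awaitable[Any]]] = {
        "list_projects": service.list_projects,
        "get_project": service.get_project,
        "get_default_project": service.get_default_project,
        "find_project": service.find_project,
        "count_projects": service.count_projects,
    }
    if action not in handlers:
        raise ValueError(f"unknown query action: {action}")
    return await handlers[action](*args)


if __name__ == "__main__":
    demo = InMemoryProjectQueries([Project("p1", "Piper", is_default=True)])
    print(asyncio.run(run_query(demo, "count_projects")))
```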
- list_projects - List all active projects
- get_project - Get specific project by ID
- get_default_project - Get the default project
- find_project - Find project by name
- count_projects - Count active projects

The web UI is now implemented as a DDD-compliant, test-driven interface. All bot message rendering and response handling are unified in a shared domain module (bot-message-renderer.js), ensuring:
Key Features:
- marked.js (battle-tested library)

Architecture Impact:
Example Flow:
User Input: "Users are complaining about crashes when uploading photos"
↓
Intent: ANALYSIS / investigate_crash
↓
Workflow: GENERATE_REPORT (crash analysis)
↓
Template: "I'll {human_action} you reported. Let me analyze this for you."
↓
Humanization: "investigate_crash" → "investigate the crash"
↓
Response: "I'll investigate the crash you reported. Let me analyze this for you."
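The humanization step in this flow can be approximated with a cache-first, rule-based converter that also counts usage. This sketch assumes actions follow a "<verb>_<object>" naming convention. The class, its single rule, and the render helper are illustrative and are not the project's ActionHumanizer implementation.

```python
from collections import Counter
from typing import Dict


class ActionHumanizer:
    """Hypothetical sketch: cache-first, rule-based humanization with usage counts."""

    def __init__(self) -> None:
        self._cache: Dict[str, str] = {}
        self.usage: Counter = Counter()

    def humanize(self, action: str) -> str:
        self.usage[action] += 1                # usage tracking
        cached = self._cache.get(action)       # cache-first lookup
        if cached is not None:
            return cached
        verb, _, obj = action.partition("_")   # rule: "<verb>_<object>"
        human = f"{verb} the {obj.replace('_', ' ')}" if obj else verb
        self._cache[action] = human
        return human


def render(template: str, action: str, humanizer: ActionHumanizer) -> str:
    """Fill the {human_action} slot of a message template."""
    return template.format(human_action=humanizer.humanize(action))
```

Running the example flow's template through `render` with "investigate_crash" reproduces the response text shown above.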
All core infrastructure services are deployed and operational:
Core AI capabilities are operational:
Clean separation of concerns with PM concepts driving architecture:
Adopted per-call pattern for context injection rather than stateful factories. Benefits:
Introduced Query Service pattern to separate reads from writes:
Implemented sophisticated project resolution with:
Moved from hardcoded SQL to SQLAlchemy model-driven schema:
Discovered and documented during GitHub integration:
- self.task_handlers = {TaskType.X: self._method_x}
- TaskType.GITHUB_CREATE_ISSUE: self._create_github_issue

Implemented automatic repository lookup for GitHub workflows:
- create_workflow_from_intent

Named Volumes (Recommended):
volumes:
  piper_postgres_data:
    name: piper_postgres_data_v1  # Explicit versioned name
Benefits:
Avoid Bind Mounts for Databases:
Lesson Learned: PM-011 - Directory rename caused data loss with bind mounts
Status: Not implemented Impact: Users get technical errors instead of helpful messages Solution: Implement comprehensive error handling with user-friendly messages
Status: Not started Impact: API-only interaction blocks user testing Solution: Build simple Streamlit or FastAPI chat interface
Status: Partially implemented (PM-009 query work in progress) Impact: Some queries forced into workflow pattern Solution: Complete Query Service implementation for LIST_PROJECTS and similar operations
All services communicate through events for:
Different models for different tasks:
External systems as plugins for:
Status: 100% Complete
Goals: Complete CQRS, activate learning, enhance workflows
Vision: Autonomous assistance and strategic insights
Date: July 13, 2025 Impact: High - Runtime reliability improvements, test contract changes
Problem Solved: File analysis workflow failures due to type mismatches between workflow context (integers) and database queries (strings).
Root Cause: Workflow context handling evolved to pass file IDs as integers from session management, but PostgreSQL repository interfaces expected string parameters.
Solution Pattern:
# Type conversion at service boundaries
file_id = str(file_id) # Convert to expected type before repository call
Architectural Lesson: Always validate and convert types at service boundaries to maintain contract integrity between layers.
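One way to apply this lesson is a small normalizer called at the repository boundary. The helper name to_repository_id is hypothetical. It accepts both the integer IDs that session management produced and the string IDs the repository expects. Any other value raises an error rather than letting a mismatched type reach PostgreSQL.

```python
from typing import Union


def to_repository_id(value: Union[int, str]) -> str:
    """Hypothetical boundary helper: coerce an ID to the str the repository expects."""
    if isinstance(value, bool):  # bool subclasses int; True is not a file ID
        raise TypeError("boolean is not a valid ID")
    if isinstance(value, int):
        return str(value)
    if isinstance(value, str) and value:
        return value
    raise TypeError(f"unsupported ID value: {value!r}")
```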
Enhancement: Added intent metadata to workflow context for template system integration.
Pattern:
# Enhanced context propagation
context["intent_category"] = intent.category.value
context["intent_action"] = intent.action
Impact: Enables context-aware messaging without breaking domain model isolation.
Problem: Rule-based pre-classification became too aggressive, matching compound messages that require LLM analysis.
Evolution:
Required Fix: Add complexity detection to distinguish simple vs. compound messages.
Architectural Principle: Pre-classification should handle only unambiguous patterns. Complex or compound messages must flow through full LLM analysis.
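A complexity gate in front of the pattern matcher is one way to implement the required fix. The word limit, the conjunction list, and the two sample patterns below are assumptions chosen for illustration, not the project's actual rules. The key behavior is that pre_classify returns None (defer to the LLM) whenever a message looks compound, even if it starts with a recognizable pattern.

```python
import re
from typing import Optional

MAX_SIMPLE_WORDS = 8  # assumed threshold
CONJUNCTIONS = re.compile(r"\b(and|but|then|also)\b", re.IGNORECASE)
SENTENCE_BREAK = re.compile(r"[.!?;]\s+\S")  # punctuation followed by more text

# Hypothetical unambiguous patterns.
SIMPLE_PATTERNS = [
    (re.compile(r"^(hi|hello|hey)\b", re.IGNORECASE), "greeting"),
    (re.compile(r"^(list|show)( my)? projects\b", re.IGNORECASE), "list_projects"),
]


def is_compound(message: str) -> bool:
    text = message.strip()
    return (
        len(text.split()) > MAX_SIMPLE_WORDS
        or bool(CONJUNCTIONS.search(text))
        or bool(SENTENCE_BREAK.search(text))
    )


def pre_classify(message: str) -> Optional[str]:
    """Return an action for unambiguous messages, or None to defer to the LLM."""
    if is_compound(message):
        return None
    for pattern, action in SIMPLE_PATTERNS:
        if pattern.search(message.strip()):
            return action
    return None
```

Checking complexity before matching is what keeps "Hello! Can you list my projects and create a ticket?" from being treated as a plain greeting.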
Date: July 13, 2025 Status: Successfully integrated with minimal architectural impact
# Intent-based template selection
template = get_message_template(
intent_category=workflow.context.get("intent_category"),
intent_action=workflow.context.get("intent_action"),
workflow_type=workflow.type
)
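The message-template layer lists intent-based templates, workflow-based templates, and fallbacks. The lookup order this implies can be sketched as below. The template tables are assumptions, apart from the crash-analysis template taken from the example flow. So is the plain-string workflow_type; the real workflow.type may be an enum. The uppercase category value follows the `intent.category.value == "QUERY"` check shown earlier.

```python
from typing import Dict, Optional, Tuple

# Hypothetical template tables for illustration.
INTENT_TEMPLATES: Dict[Tuple[str, str], str] = {
    ("ANALYSIS", "investigate_crash"):
        "I'll {human_action} you reported. Let me analyze this for you.",
}
WORKFLOW_TEMPLATES: Dict[str, str] = {
    "GENERATE_REPORT": "I'll put together a report for you.",
}
FALLBACK_TEMPLATE = "I'll {human_action} for you."


def get_message_template(
    intent_category: Optional[str],
    intent_action: Optional[str],
    workflow_type: Optional[str],
) -> str:
    """Most specific wins: intent pair, then workflow type, then fallback."""
    if intent_category and intent_action:
        template = INTENT_TEMPLATES.get((intent_category, intent_action))
        if template:
            return template
    if workflow_type and workflow_type in WORKFLOW_TEMPLATES:
        return WORKFLOW_TEMPLATES[workflow_type]
    return FALLBACK_TEMPLATE
```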
Issue: Architectural improvements broke test assumptions about context handling and pre-classification behavior.
Root Cause: Tests written against earlier patterns didn’t evolve with architectural refinements.
Lessons Learned:
Achievement: Complete end-to-end file analysis pipeline now functional after resolving type safety issues.
Components Validated:
Performance: File analysis workflow success rate: 100% (post-fix)
AsyncSessionFactory Migration:
AsyncSessionFactory).

Business Logic Test Suite Modernization:
Current Infrastructure TODOs:
See session logs and migration guide for full details.
Date: July 18, 2025 Impact: 642x performance improvement, production-ready infrastructure Status: Complete with feature flag deployment
The initial MCP integration suffered from a critical connection-per-request pattern causing:
Singleton Connection Pool with Circuit Breaker Protection:
import asyncio
import threading
from contextlib import asynccontextmanager
from typing import Any, Dict

class MCPConnectionPool:
    """Thread-safe singleton with circuit breaker and health monitoring"""

    _instance = None
    _instance_lock = threading.Lock()

    @asynccontextmanager
    async def connection(self, server_config: Dict[str, Any]):
        """Context manager for automatic connection lifecycle"""
        connection = await self.get_connection(server_config)
        try:
            yield connection
        finally:
            await self.return_connection(connection)

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            with cls._instance_lock:
                if cls._instance is None:  # Double-checked locking
                    cls._instance = cls()
        return cls._instance

    async def _ensure_async_resources(self):
        """Initialize async resources only when needed"""
        if self._connection_semaphore is None:
            self._connection_semaphore = asyncio.Semaphore(self.max_connections)
        if self._pool_lock is None:
            self._pool_lock = asyncio.Lock()
Critical Discovery: Never hold async locks during I/O operations. The initial implementation deadlocked because it tried to re-acquire the same asyncio.Lock, which is not reentrant, while already holding it during connection creation.
async def _check_circuit_breaker(self):
    """Prevent cascade failures with configurable thresholds"""
    if self._circuit_state == "open":
        if time.time() - self._last_failure_time > self.circuit_breaker_timeout:
            self._circuit_state = "half-open"
        else:
            raise MCPCircuitBreakerOpenError("Circuit breaker is open")
Configuration:
Safe Deployment Pattern:
# Feature flag with graceful fallback
USE_MCP_POOL = os.getenv("USE_MCP_POOL", "false").lower() == "true"

# Dual-mode operation in MCPResourceManager
if self.use_pool:
    async with self.connection_pool.connection(self.client_config) as client:
        content_results = await client.search_content(query)
else:
    content_results = await self.client.search_content(query)
Benefits:
| Metric | Before (POC) | After (Pool) | Improvement |
|---|---|---|---|
| Connection Time | 103ms | 0.16ms | 642x faster |
| Memory Usage | Growing | Stable | Leak eliminated |
| Concurrent Scaling | Linear degradation | Constant performance | Unlimited scaling |
Real-World Impact:
TDD Metrics:
Test Categories:
Centralized Fault Tolerance: Pool-level circuit breaker provides system-wide protection against cascade failures.
import logging
import time

logger = logging.getLogger(__name__)

class MCPConnectionPool:
    def __init__(self):
        # Circuit breaker configuration
        self.circuit_breaker_threshold = 5  # Failures before opening
        self.circuit_breaker_timeout = 60   # Recovery timeout (seconds)
        self._circuit_state = "closed"      # closed, open, half-open
        self._failure_count = 0
        self._last_failure_time = 0

    async def _record_failure(self):
        """Record failure and potentially open circuit breaker"""
        self._failure_count += 1
        self._last_failure_time = time.time()
        if self._failure_count >= self.circuit_breaker_threshold:
            self._circuit_state = "open"
            logger.error(f"Circuit breaker opened after {self._failure_count} failures")

    async def _record_success(self):
        """Record success and potentially close circuit breaker"""
        if self._circuit_state == "half-open":
            self._circuit_state = "closed"
            self._failure_count = 0
            logger.info("Circuit breaker closed after successful connection")
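For comparison, here is a self-contained breaker that runs without the pool. It is a sketch, not the project's implementation. It makes two deliberate choices that differ from the pool code above.

First, it reads time through an injectable clock defaulting to time.monotonic, which is unaffected by wall-clock adjustments and makes the state machine testable.

Second, record_success resets the failure count in every state, so only failures without an intervening success open the circuit. The pool code above resets the count only on a half-open success.

A failure while half-open reopens the circuit immediately.

```python
import time
from typing import Callable


class CircuitBreakerOpenError(Exception):
    pass


class CircuitBreaker:
    """closed -> open after `threshold` failures; open -> half-open after
    `timeout_s`; half-open -> closed on success, or back to open on failure."""

    def __init__(self, threshold: int = 5, timeout_s: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.threshold = threshold
        self.timeout_s = timeout_s
        self._clock = clock
        self.state = "closed"
        self._failures = 0
        self._opened_at = 0.0

    def before_call(self) -> None:
        if self.state == "open":
            if self._clock() - self._opened_at >= self.timeout_s:
                self.state = "half-open"  # let trial requests through
            else:
                raise CircuitBreakerOpenError("circuit breaker is open")

    def record_failure(self) -> None:
        self._failures += 1
        if self.state == "half-open" or self._failures >= self.threshold:
            self.state = "open"
            self._opened_at = self._clock()

    def record_success(self) -> None:
        self._failures = 0
        self.state = "closed"
```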
❌ Anti-Pattern: Holding Locks During I/O
# WRONG - Causes deadlocks
async def _create_new_connection(self, server_config):
    async with self._pool_lock:  # Lock acquired
        connection = PiperMCPClient(server_config)
        await connection.connect()  # I/O operation while holding lock
        async with self._pool_lock:  # DEADLOCK - nested acquisition
            self._all_connections.append(connection)
✅ Correct Pattern: Minimize Lock Scope
# CORRECT - Lock only for shared state modification
async def _create_new_connection(self, server_config):
    # I/O outside lock scope
    connection = PiperMCPClient(server_config)
    await connection.connect()
    # Lock only for state modification
    async with self._pool_lock:
        self._all_connections.append(connection)
Problem: Async resources (locks, semaphores) cannot be created in __init__ due to event loop requirements.
Solution: Lazy initialization pattern:
async def _ensure_async_resources(self):
    """Initialize async resources when first needed"""
    if self._connection_semaphore is None:
        self._connection_semaphore = asyncio.Semaphore(self.max_connections)
    if self._pool_lock is None:
        self._pool_lock = asyncio.Lock()
Automatic Resource Management:
@asynccontextmanager
async def connection(self, server_config: Dict[str, Any]):
    """Automatic connection acquisition and return"""
    connection = await self.get_connection(server_config)
    try:
        yield connection
    finally:
        await self.return_connection(connection)
Usage Benefits:
Connection Limiting Pattern:
async def get_connection(self, server_config):
    # Acquire semaphore with timeout
    await asyncio.wait_for(
        self._connection_semaphore.acquire(),
        timeout=self.connection_timeout
    )
    try:
        return await self._get_or_create_connection(server_config)
    except Exception:
        # Always release semaphore on failure
        self._connection_semaphore.release()
        raise
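The semaphore-plus-timeout pattern can be exercised end to end with a tiny generic pool. BoundedPool and its factory argument are hypothetical. get_connection above releases the semaphore only on failure and presumably leaves the release on success to return_connection. This sketch instead holds the permit for the duration of the `async with` block and releases it in a `finally`. It also creates the semaphore lazily on first use and performs connection I/O without holding any lock, matching the two lessons earlier in this section.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Awaitable, Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class BoundedPool(Generic[T]):
    """Sketch: cap concurrent checkouts with a semaphore; reuse idle connections."""

    def __init__(self, factory: Callable[[], Awaitable[T]], max_connections: int,
                 acquire_timeout_s: float) -> None:
        self._factory = factory
        self._max = max_connections
        self._timeout = acquire_timeout_s
        self._idle: List[T] = []
        self._sem: Optional[asyncio.Semaphore] = None  # created lazily in the running loop
        self.created = 0

    @asynccontextmanager
    async def connection(self) -> AsyncIterator[T]:
        if self._sem is None:
            self._sem = asyncio.Semaphore(self._max)
        # Raises asyncio.TimeoutError if no permit frees up in time.
        await asyncio.wait_for(self._sem.acquire(), timeout=self._timeout)
        try:
            if self._idle:
                conn = self._idle.pop()
            else:
                conn = await self._factory()  # connection I/O with no lock held
                self.created += 1
            try:
                yield conn
            finally:
                self._idle.append(conn)  # return to the pool for reuse
        finally:
            self._sem.release()  # released on success and on failure
```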
Dual-Mode Integration: Enhanced existing MCPResourceManager to support both direct connections and pooled connections through feature flag.
Key Integration Points:
# Initialize method - detects pool availability
async def initialize(self, enabled: bool = False):
    if self.use_pool:
        # Test pool connectivity
        async with self.connection_pool.connection(self.client_config) as test_client:
            if await test_client.is_connected():
                self.initialized = True
    else:
        # Direct connection (legacy mode)
        self.client = PiperMCPClient(self.client_config)
        # ... existing logic
Enhanced Statistics: Combined pool and connection metrics for comprehensive monitoring:
async def get_connection_stats(self):
    base_stats = {
        "using_pool": self.use_pool,
        "enabled": self.enabled,
        "initialized": self.initialized,
        "available": await self.is_available()
    }
    if self.use_pool and self.connection_pool:
        pool_stats = self.connection_pool.get_stats()
        base_stats.update(pool_stats)
    elif self.client:
        client_stats = self.client.get_connection_stats()
        base_stats.update(client_stats)
    return base_stats
Backward Compatibility: All existing MCPResourceManager APIs maintained without modification.
Deployment Strategy:
- USE_MCP_POOL=false by default
- USE_MCP_POOL=true to enable pool

Measurement Methodology:
Key Findings:
Production Load Simulation:
Productivity Gains:
Complete Technical Documentation: MCP Connection Pool - 642x Performance Improvement
Comprehensive Analysis Including:
Key Architectural Contributions:
- asyncio.timeout(): Critical for async operation timeouts

All GitHub operations now flow through GitHubIntegrationRouter, enabling feature flag control between spatial intelligence and legacy operations.
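The flag-controlled routing can be sketched as a router that picks one backend at construction time and exposes a single entry point to callers. The backend classes, the get_issue method, and treating an unset USE_SPATIAL_GITHUB as false are assumptions for illustration. Refer to docs/architecture/github-integration-router.md for the real router's interface.

```python
import os
from typing import Any, Dict, Optional, Protocol


class GitHubBackend(Protocol):
    def get_issue(self, repo: str, number: int) -> Dict[str, Any]: ...


class SpatialGitHubBackend:
    def get_issue(self, repo: str, number: int) -> Dict[str, Any]:
        return {"repo": repo, "number": number, "backend": "spatial"}


class LegacyGitHubBackend:
    def get_issue(self, repo: str, number: int) -> Dict[str, Any]:
        return {"repo": repo, "number": number, "backend": "legacy"}


def spatial_enabled(env: Dict[str, str]) -> bool:
    # Assumed default: spatial intelligence is off unless explicitly enabled.
    return env.get("USE_SPATIAL_GITHUB", "false").strip().lower() == "true"


class GitHubIntegrationRouterSketch:
    """Hypothetical router: callers use this class, never a backend directly."""

    def __init__(self, env: Optional[Dict[str, str]] = None) -> None:
        env = dict(os.environ) if env is None else env
        self._backend: GitHubBackend = (
            SpatialGitHubBackend() if spatial_enabled(env) else LegacyGitHubBackend()
        )

    def get_issue(self, repo: str, number: int) -> Dict[str, Any]:
        return self._backend.get_issue(repo, number)
```

Funneling every call through one router is what makes the architecture-enforcement tests practical, because direct backend imports become easy to flag.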
- services/orchestration/engine.py - OrchestrationEngine
- services/domain/github_domain_service.py - GitHubDomainService
- services/domain/pm_number_manager.py - PMNumberManager
- services/domain/standup_orchestration_service.py - StandupOrchestrationService
- services/integrations/github/issue_analyzer.py - GitHubIssueAnalyzer
- services/queries/query_router.py - QueryRouter

- tests/test_architecture_enforcement.py (7 comprehensive tests)
- .pre-commit-config.yaml (automated violation blocking)
- .github/workflows/architecture-enforcement.yml

- docs/architecture/github-integration-router.md

- USE_SPATIAL_GITHUB=true: Enables spatial intelligence (8-dimensional analysis)
- USE_SPATIAL_GITHUB=false: Uses legacy GitHub operations

Last Updated: January 21, 2026