π ARCHITECTURAL EXCELLENCE MILESTONE ACHIEVED Date: September 12, 2025 Status: MVP Deployment Ready with Complete DDD Compliance Validation: Perfect 5/5 Steps Completed with Zero Regressions
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β USER INTERFACE LAYER β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
FastAPI Web Server β β
Web UI (DDD, TDD, resizable chat window) β π Admin Interface β
β (Built & Running) β (Built & Working) β (Not Yet Designed) β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β APPLICATION LAYER β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
Intent Classifier β β
Workflow Factory β π Learning Engine β
β (Built & Working) β (Built & ValidationRegistry) β (Not Yet Designed) β
β β β β
β π Query Service β β
Orchestration β π Analytics Engine β
β (Being Added) β Engine β (Not Yet Designed) β
β β (Pre-execution Validation) β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β DOMAIN SERVICES LAYER β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
GitHubDomainService β β
SlackDomainService β β
NotionDomainService β
β (Router Architecture) β (Mediation Complete) β (Mediation Complete) β
β β’ Router-based operations β β’ Webhook handling β β’ Workspace mgmt β
β β’ Spatial intelligence β β’ Spatial events β β’ Database ops β
β β’ Feature flag control β β’ Health monitoring β β’ Page operations β
β β β β
β β
StandupOrchestrationService β β
PortConfigurationService β π Future Domain Services β
β (Workflow Coordination) β (Centralized Config) β (As Needed) β
β β’ Domain workflow mgmt β β’ Environment-aware β β
β β’ Integration orchestrationβ β’ URL generation β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β SERVICE LAYER β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
Domain Models β β
Workflow Service β π Feedback Service β
β (Built) β (Built & Working) β (Not Yet Designed) β
β β β β
β β
Event System β β
GitHub Integration β β
Slack Integration β
β (Built) β (Fully Integrated) β (Spatial Metaphors) β
β β (Issue Creation Working)β (OAuth + Workflows) β
β β
Knowledge Base β β
Document Processor β π Report Generator β
β (Built & Working) β (Built & Working) β (Not Yet Designed) β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β RESPONSE ENHANCEMENT LAYER β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
ResponsePersonalityEnhancerβ β
PersonalityProfile β β
TransformationService β
β (Production Ready) β (Database + YAML) β (Warmth + Confidence) β
β β’ <70ms performance β β’ User preferences β β’ Action guidance β
β β’ Circuit breaker β β’ LRU caching β β’ Context adaptation β
β β’ Graceful degradation β β’ Config overrides β β’ Performance optimized β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β
βΌ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β UI MESSAGE LAYER β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β ActionHumanizer β TemplateRenderer β Message Templates β
β β’ Cache-first lookup β β’ Template selection β β’ Intent-based β
β β’ Rule-based conversion β β’ Variable substitution β β’ Workflow-based β
β β’ Usage tracking β β’ Humanization integration β β’ Fallbacks β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β β
ββββββββββββββββββββββββββ΄ββββββββββββββββββββββββββ
β
βΌ
User-Facing Messages
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β DATA LAYER β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
PostgreSQL β β
ChromaDB β β
Redis β
β (Domain-First Schema) β (Deployed & Working) β (Deployed & Working) β
β β β β
β β
Domain Persistence β β
Vector Storage β β
Event Queue β
β (Working) β (Working) β (Working) β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β INFRASTRUCTURE LAYER β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
Docker Compose β β
Traefik Gateway β β
Temporal β
β (Deployed & Running) β (Deployed & Running) β (Deployed & Running) β
β β β β
β β
Service Discovery β β
Load Balancing β β
Workflow Engine β
β (Working) β (Working) β (Working) β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β EXTERNAL INTEGRATIONS β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
Claude API β β
GitHub API β π Slack/Teams β
β (Connected & Working) β (Fully Integrated) β (Not Yet Designed) β
β β (Issue Creation Working)β β
β β
OpenAI API β π Jira API β π Analytics APIs β
β (Connected & Working) β (Not Yet Designed) β (Planned Q3 2025) β
β β β - Datadog β
β β β - New Relic β
β β β - Google Analytics β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
## Infrastructure Constants
### Development Environment
- **Port**: 8001 (all local development, NOT 8080)
- **Web Pattern**: Single app.py (~750 lines as of Sept 2025)
- **Database**: PostgreSQL with AsyncSessionFactory pattern
- **Config**: PIPER.user.md in config/ directory
- **Python**: 3.11+ required
### API Patterns
- **REST Response Structure**:
```json
{
"status": "success",
"data": {
"field_name": "value",
"nested_fields": {...}
}
}
{
"status": "error",
"error": "message",
"details": {}
}
piper-morgan/
βββ cli/
β βββ commands/ # Standalone CLI scripts
βββ web/
β βββ app.py # Single file (NO routes/ directory)
βββ services/ # Domain-driven design
β βββ features/ # Feature services
β βββ infrastructure/ # Infrastructure services
β βββ shared_types.py # Shared type definitions
βββ config/
β βββ PIPER.user.md # User configuration
βββ docs/
βββ architecture/ # Technical documentation
βββ planning/ # Roadmaps, backlogs
The database layer uses PostgreSQL with SQLAlchemy 2.0+ async patterns.
# Current pattern in ALL services
from services.infrastructure.database import AsyncSessionFactory
async def get_data():
async with AsyncSessionFactory() as session:
result = await session.execute(query)
return result.scalars().all()
# Query pattern
async with AsyncSessionFactory() as session:
stmt = select(Model).where(Model.field == value)
result = await session.execute(stmt)
# Insert pattern
async with AsyncSessionFactory() as session:
session.add(new_object)
await session.commit()
The web layer uses a single FastAPI file (MVP pattern) with embedded HTML.
File: web/app.py (~750 lines as of Sept 2025)
# API endpoint returning JSON
@app.get("/api/standup")
async def morning_standup_api():
return {
"status": "success",
"data": {
"yesterday_accomplishments": [...],
"today_priorities": [...],
"blockers": []
}
}
# UI endpoint returning HTML
@app.get("/standup")
async def standup_ui():
return HTMLResponse(content="""
<!DOCTYPE html>
<html>
<!-- Embedded HTML with JavaScript -->
</html>
""")
yesterday_accomplishments, not accomplishmentsdata.data.field_name in JavaScriptThe MUX (Modeled User Experience) Object Model provides consciousness and ownership tracking for Piper Morgan. This enables Piper to have identity, awareness, emotional states, and epistemological tracking of knowledge sources.
Location: services/mux/
Tests: 165+ tests covering consciousness (104) and ownership (61)
Patterns: 055-058 (documented in pattern catalog)
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β MUX Object Model Layer β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β Consciousness Module β Ownership Module β Grammar β
β (services/mux/ β (services/mux/ β Foundation β
β consciousness.py) β ownership.py) β β
β β’ PiperEntity β β’ OwnershipCategory β "Entities β
β β’ AwarenessLevel β β’ OwnershipMetadata β experience β
β β’ EmotionalState β β’ OwnershipResolver β Moments β
β β’ ConsciousnessAttrs β β’ OwnershipTransform β at Places" β
β β’ EntityContext β β’ HasOwnership Proto β β
β β’ ConsciousnessExpr β β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
The MUX grammar provides semantic grounding for the object model:
Piper and other entities can have:
All knowledge has an epistemological relationship:
The grammar-driven classification (Pattern-057) uses MUX concepts:
ConsciousnessExpression generates first-person language:
OwnershipMetadata can be embedded in any domain model:
@dataclass
class Session:
id: str
ownership: OwnershipMetadata = field(
default_factory=lambda: OwnershipMetadata.native("piper-core")
)
Model Context Protocol (MCP) integration enables tool federation and spatial intelligence.
# MCP Consumer pattern
from services.infrastructure.mcp_consumer import MCPConsumerCore
async def fetch_github_issues():
consumer = MCPConsumerCore()
return await consumer.fetch_resources("github-issues")
8-dimensional analysis across:
# Always deploy both Code and Cursor for critical fixes
# Phase 0: Investigation (both agents)
# Phase 1: Implementation (agent-specific)
# Phase 2: Cross-validation (verify each other)
The Slack integration implements a revolutionary spatial metaphor approach, enabling Piper Morgan to understand and navigate Slack environments as physical spaces. This creates an embodied AI experience where Piper develops spatial awareness and memory.
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Slack Spatial Integration β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β OAuth Handler β Spatial Mapper β Webhook Router β
β β’ Territory Init β β’ Metaphor Engine β β’ Event Process β
β β’ State Management β β’ Spatial Objects β β’ Signature Verifyβ
β β β’ Coordinate System β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β Workspace Navigator β Attention Model β Spatial Memory β
β β’ Multi-Territory β β’ Priority Algorithmsβ β’ Pattern Learningβ
β β’ State Tracking β β’ Decay Models β β’ JSON Persistenceβ
β β’ Risk Assessment β β’ Focus Management β β’ Cross-Session β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Slack Event β Spatial Mapping β Attention Processing β Navigation Decision β Workflow Creation
β β β β β
WebHook Event β Room/Territory β Attention Event β Priority Score β Piper Workflow
@piper help with feature β CREATE_FEATURE workflow
User Intent β Intent Classifier β EXECUTION/SYNTHESIS β Workflow Factory (ValidationRegistry) β Context Validation β Orchestration Engine β External Systems
β β
Pre-execution Validation State Changes
(User-friendly errors)
User Intent β Intent Classifier β QUERY β OrchestrationEngine.handle_query_intent()
β
QueryRouter.get_query_router() β Session-Aware Services β Repository β Direct Data Access
β
Formatted Results β Web Response
Current Implementation Details:
web/app.py routes QUERY intents to orchestration_engine.handle_query_intent(intent)The Context Validation Framework provides pre-execution validation for all workflow types, preventing runtime failures and delivering user-friendly error messages with actionable guidance.
services/orchestration/workflow_factory.pyservices/orchestration/validation.pyWorkflow Creation Request
β
ValidationRegistry (Context Requirements)
β
WorkflowContextValidator (Rule Validation)
β
[Valid Context] β Workflow Execution
β
[Invalid Context] β User-Friendly Error Message
original_message, validates project_id and repositoryoriginal_message onlyoriginal_message, validates file_id and resolved_file_idoriginal_message, validates data_sourceoriginal_message, validates github_urloriginal_message, validates scope and objectivesThe CQRS-lite pattern separates read operations (queries) from write operations (commands) in the Piper Morgan system. This provides clear architectural boundaries, better performance for simple data fetches, and prevents forcing query-like operations into complex workflow patterns.
Queries are identified through intent classification:
python
# Intent classifier recognizes QUERY category for read-only operations
if intent.category == IntentCategory.QUERY:
# Route to QueryRouter
result = await query_router.route_query(intent)
else:
# Route to WorkflowFactory for commands
workflow = await workflow_factory.create_from_intent(intent)
The QueryRouter handles QUERY category intents by dispatching them to appropriate query services based on intent analysis. Status: β
Operational and integrated.
The QueryRouter is integrated into the OrchestrationEngine using an on-demand initialization pattern with session-aware wrappers:
class OrchestrationEngine:
def __init__(self, llm_client: Optional[LLMClient] = None):
# QueryRouter initialized on-demand using async session pattern
self.query_router = None
async def get_query_router(self) -> QueryRouter:
"""Get QueryRouter, initializing on-demand with session-aware wrappers"""
if self.query_router is None:
# Initialize QueryRouter with session-aware services
self.query_router = QueryRouter(
project_query_service=SessionAwareProjectQueryService(),
conversation_query_service=ConversationQueryService(),
file_query_service=SessionAwareFileQueryService(),
)
return self.query_router
async def handle_query_intent(self, intent: Intent) -> Dict[str, Any]:
"""Handle QUERY intents using QueryRouter integration (GREAT-1B bridge method)"""
query_router = await self.get_query_router()
if intent.action in ["search_projects", "list_projects", "find_projects"]:
projects = await query_router.project_queries.list_active_projects()
return {"message": f"Found {len(projects)} active projects", "data": projects}
# ... other query routing logic
The QueryRouter uses a comprehensive initialization pattern with multiple service integrations:
class QueryRouter:
"""Routes QUERY intents to appropriate query services with LLM enhancement"""
def __init__(
self,
project_query_service: ProjectQueryService,
conversation_query_service: ConversationQueryService,
file_query_service: FileQueryService,
# PM-034 Phase 2B: LLM Intent Classification Integration
llm_classifier: Optional[LLMIntentClassifier] = None,
knowledge_graph_service: Optional[KnowledgeGraphService] = None,
semantic_indexing_service: Optional[SemanticIndexingService] = None,
# Performance and reliability features
performance_targets: Optional[Dict[str, float]] = None,
degradation_config: Optional[Dict] = None,
# MCP Consumer integration for external tool federation
mcp_consumer: Optional["MCPConsumerCore"] = None,
enable_mcp_federation: bool = True,
):
# Service initialization with comprehensive configuration
The complete flow from user input to QueryRouter execution:
web/app.py): Receives user input and creates workfloworchestration_engine.handle_query_intent(intent)# Actual flow in web/app.py
if intent.category.value == "QUERY":
print(f"π§ Routing generic QUERY intent to QueryRouter: {intent.action}")
result = await orchestration_engine.handle_query_intent(intent)
return {
"message": f"Query processed successfully: {intent.action}",
"result": result,
"workflow_id": workflow.id,
}
The QueryRouter implementation includes comprehensive error handling:
response_format={"type": "json_object"} parameter ensures consistent JSON responsestests/regression/test_queryrouter_lock.py prevents accidental disablingSeptember 2025 - PM-034 QueryRouter Resurrection:
response_format={"type": "json_object"} parameter to LLM callshandle_query_intent() for seamless integrationKey Technical Improvements:
Implementation Status: β Complete and operational
services/queries/query_router.py (935 lines total)__init__, route_query, classify_and_route, federated_search)handle_query_intent() bridge methodtests/regression/test_queryrouter_lock.py prevent accidental disablingVerified Metrics (October 13, 2025):
dev/2025/10/13/proof-1-great-1-evidence.md)Query services provide read-only access to domain data:
python
class ProjectQueryService:
async def list_active_projects(self) -> List[Project]:
return await self.repo.list_active_projects()
async def get_project_by_id(self, project_id: str) -> Optional[Project]:
return await self.repo.get_by_id(project_id)
list_projects - List all active projectsget_project - Get specific project by IDget_default_project - Get the default projectfind_project - Find project by namecount_projects - Count active projectsThe web UI is now implemented as a DDD-compliant, test-driven interface. All bot message rendering and response handling is unified in a shared domain module (bot-message-renderer.js), ensuring:
Key Features:
marked.js (battle-tested library)Architecture Impact:
Example Flow:
User Input: "Users are complaining about crashes when uploading photos"
β
Intent: ANALYSIS / investigate_crash
β
Workflow: GENERATE_REPORT (crash analysis)
β
Template: "I'll {human_action} you reported. Let me analyze this for you."
β
Humanization: "investigate_crash" β "investigate the crash"
β
Response: "I'll investigate the crash you reported. Let me analyze this for you."
All core infrastructure services are deployed and operational:
Core AI capabilities are operational:
Clean separation of concerns with PM concepts driving architecture:
Adopted per-call pattern for context injection rather than stateful factories. Benefits:
Introduced Query Service pattern to separate reads from writes:
Implemented sophisticated project resolution with:
Moved from hardcoded SQL to SQLAlchemy model-driven schema:
Discovered and documented during GitHub integration:
self.task_handlers = {TaskType.X: self._method_x}TaskType.GITHUB_CREATE_ISSUE: self._create_github_issueImplemented automatic repository lookup for GitHub workflows:
create_workflow_from_intentNamed Volumes (Recommended):
volumes:
piper_postgres_data:
name: piper_postgres_data_v1 # Explicit versioned name
Benefits:
Avoid Bind Mounts for Databases:
Lesson Learned: PM-011 - Directory rename caused data loss with bind mounts
Status: Not implemented Impact: Users get technical errors instead of helpful messages Solution: Implement comprehensive error handling with user-friendly messages
Status: Not started Impact: API-only interaction blocks user testing Solution: Build simple Streamlit or FastAPI chat interface
Status: Partially implemented (PM-009 query work in progress) Impact: Some queries forced into workflow pattern Solution: Complete Query Service implementation for LIST_PROJECTS and similar operations
All services communicate through events for:
Different models for different tasks:
External systems as plugins for:
Status: 100% Complete
Goals: Complete CQRS, activate learning, enhance workflows
Vision: Autonomous assistance and strategic insights
Date: July 13, 2025 Impact: High - Runtime reliability improvements, test contract changes
Problem Solved: File analysis workflow failures due to type mismatches between workflow context (integers) and database queries (strings).
Root Cause: Workflow context handling evolved to pass file IDs as integers from session management, but PostgreSQL repository interfaces expected string parameters.
Solution Pattern:
# Type conversion at service boundaries
file_id = str(file_id) # Convert to expected type before repository call
Architectural Lesson: Always validate and convert types at service boundaries to maintain contract integrity between layers.
Enhancement: Added intent metadata to workflow context for template system integration.
Pattern:
# Enhanced context propagation
context["intent_category"] = intent.category.value
context["intent_action"] = intent.action
Impact: Enables context-aware messaging without breaking domain model isolation.
Problem: Rule-based pre-classification became too aggressive, matching compound messages that require LLM analysis.
Evolution:
Required Fix: Add complexity detection to distinguish simple vs. compound messages.
Architectural Principle: Pre-classification should handle only unambiguous patterns. Complex or compound messages must flow through full LLM analysis.
Date: July 13, 2025 Status: Successfully integrated with minimal architectural impact
# Intent-based template selection
template = get_message_template(
intent_category=workflow.context.get("intent_category"),
intent_action=workflow.context.get("intent_action"),
workflow_type=workflow.type
)
Issue: Architectural improvements broke test assumptions about context handling and pre-classification behavior.
Root Cause: Tests written against earlier patterns didnβt evolve with architectural refinements.
Lessons Learned:
Achievement: Complete end-to-end file analysis pipeline now functional after resolving type safety issues.
Components Validated:
Performance: File analysis workflow success rate: 100% (post-fix)
AsyncSessionFactory Migration:
AsyncSessionFactory).Business Logic Test Suite Modernization:
Current Infrastructure TODOs:
See session logs and migration guide for full details.
Date: July 18, 2025 Impact: 642x performance improvement, production-ready infrastructure Status: Complete with feature flag deployment
The initial MCP integration suffered from a critical connection-per-request pattern causing:
Singleton Connection Pool with Circuit Breaker Protection:
class MCPConnectionPool:
"""Thread-safe singleton with circuit breaker and health monitoring"""
@asynccontextmanager
async def connection(self, server_config: Dict[str, Any]):
"""Context manager for automatic connection lifecycle"""
connection = await self.get_connection(server_config)
try:
yield connection
finally:
await self.return_connection(connection)
_instance = None
_instance_lock = threading.Lock()
@classmethod
def get_instance(cls):
if cls._instance is None:
with cls._instance_lock:
if cls._instance is None: # Double-checked locking
cls._instance = cls()
return cls._instance
async def _ensure_async_resources(self):
"""Initialize async resources only when needed"""
if self._connection_semaphore is None:
self._connection_semaphore = asyncio.Semaphore(self.max_connections)
if self._pool_lock is None:
self._pool_lock = asyncio.Lock()
Critical Discovery: Never hold async locks during I/O operations. Initial implementation deadlocked due to nested lock acquisition during connection creation.
async def _check_circuit_breaker(self):
"""Prevent cascade failures with configurable thresholds"""
if self._circuit_state == "open":
if time.time() - self._last_failure_time > self.circuit_breaker_timeout:
self._circuit_state = "half-open"
else:
raise MCPCircuitBreakerOpenError("Circuit breaker is open")
Configuration:
Safe Deployment Pattern:
# Feature flag with graceful fallback
USE_MCP_POOL = os.getenv("USE_MCP_POOL", "false").lower() == "true"
# Dual-mode operation in MCPResourceManager
if self.use_pool:
async with self.connection_pool.connection(self.client_config) as client:
content_results = await client.search_content(query)
else:
content_results = await self.client.search_content(query)
Benefits:
| Metric | Before (POC) | After (Pool) | Improvement |
|---|---|---|---|
| Connection Time | 103ms | 0.16ms | 642x faster |
| Memory Usage | Growing | Stable | Leak eliminated |
| Concurrent Scaling | Linear degradation | Constant performance | Unlimited scaling |
Real-World Impact:
TDD Metrics:
Test Categories:
Centralized Fault Tolerance: Pool-level circuit breaker provides system-wide protection against cascade failures.
class MCPConnectionPool:
def __init__(self):
# Circuit breaker configuration
self.circuit_breaker_threshold = 5 # Failures before opening
self.circuit_breaker_timeout = 60 # Recovery timeout (seconds)
self._circuit_state = "closed" # closed, open, half-open
self._failure_count = 0
self._last_failure_time = 0
async def _record_failure(self):
"""Record failure and potentially open circuit breaker"""
self._failure_count += 1
self._last_failure_time = time.time()
if self._failure_count >= self.circuit_breaker_threshold:
self._circuit_state = "open"
logger.error(f"Circuit breaker opened after {self._failure_count} failures")
async def _record_success(self):
"""Record success and potentially close circuit breaker"""
if self._circuit_state == "half-open":
self._circuit_state = "closed"
self._failure_count = 0
logger.info("Circuit breaker closed after successful connection")
β Anti-Pattern: Holding Locks During I/O
# WRONG - Causes deadlocks
async def _create_new_connection(self, server_config):
async with self._pool_lock: # Lock acquired
connection = PiperMCPClient(server_config)
await connection.connect() # I/O operation while holding lock
async with self._pool_lock: # DEADLOCK - nested acquisition
self._all_connections.append(connection)
β Correct Pattern: Minimize Lock Scope
# CORRECT - Lock only for shared state modification
async def _create_new_connection(self, server_config):
# I/O outside lock scope
connection = PiperMCPClient(server_config)
await connection.connect()
# Lock only for state modification
async with self._pool_lock:
self._all_connections.append(connection)
Problem: Async resources (locks, semaphores) cannot be created in __init__ due to event loop requirements.
Solution: Lazy initialization pattern:
async def _ensure_async_resources(self):
"""Initialize async resources when first needed"""
if self._connection_semaphore is None:
self._connection_semaphore = asyncio.Semaphore(self.max_connections)
if self._pool_lock is None:
self._pool_lock = asyncio.Lock()
Automatic Resource Management:
@asynccontextmanager
async def connection(self, server_config: Dict[str, Any]):
"""Automatic connection acquisition and return"""
connection = await self.get_connection(server_config)
try:
yield connection
finally:
await self.return_connection(connection)
Usage Benefits:
Connection Limiting Pattern:
async def get_connection(self, server_config):
# Acquire semaphore with timeout
await asyncio.wait_for(
self._connection_semaphore.acquire(),
timeout=self.connection_timeout
)
try:
return await self._get_or_create_connection(server_config)
except Exception:
# Always release semaphore on failure
self._connection_semaphore.release()
raise
Dual-Mode Integration: Enhanced existing MCPResourceManager to support both direct connections and pooled connections through feature flag.
Key Integration Points:
# Initialize method - detects pool availability
async def initialize(self, enabled: bool = False):
if self.use_pool:
# Test pool connectivity
async with self.connection_pool.connection(self.client_config) as test_client:
if await test_client.is_connected():
self.initialized = True
else:
# Direct connection (legacy mode)
self.client = PiperMCPClient(self.client_config)
# ... existing logic
Enhanced Statistics: Combined pool and connection metrics for comprehensive monitoring:
async def get_connection_stats(self):
base_stats = {
"using_pool": self.use_pool,
"enabled": self.enabled,
"initialized": self.initialized,
"available": await self.is_available()
}
if self.use_pool and self.connection_pool:
pool_stats = self.connection_pool.get_stats()
base_stats.update(pool_stats)
elif self.client:
client_stats = self.client.get_connection_stats()
base_stats.update(client_stats)
return base_stats
Backward Compatibility: All existing MCPResourceManager APIs maintained without modification.
Deployment Strategy:
USE_MCP_POOL=false by defaultUSE_MCP_POOL=true to enable poolMeasurement Methodology:
Key Findings:
Production Load Simulation:
Productivity Gains:
Complete Technical Documentation: MCP Connection Pool - 642x Performance Improvement
Comprehensive Analysis Including:
Key Architectural Contributions:
asyncio.timeout(): Critical for async operation timeoutsAll GitHub operations now flow through GitHubIntegrationRouter, enabling feature flag control between spatial intelligence and legacy operations.
services/orchestration/engine.py - OrchestrationEngineservices/domain/github_domain_service.py - GitHubDomainServiceservices/domain/pm_number_manager.py - PMNumberManagerservices/domain/standup_orchestration_service.py - StandupOrchestrationServiceservices/integrations/github/issue_analyzer.py - GitHubIssueAnalyzerservices/queries/query_router.py - QueryRoutertests/test_architecture_enforcement.py (7 comprehensive tests).pre-commit-config.yaml (automated violation blocking).github/workflows/architecture-enforcement.ymldocs/architecture/github-integration-router.mdUSE_SPATIAL_GITHUB=true: Enables spatial intelligence (8-dimensional analysis)USE_SPATIAL_GITHUB=false: Uses legacy GitHub operationsLast Updated: January 21, 2026