Technical Architecture
Technology Stack
Core Technologies
Component | Technology | Purpose |
---|---|---|
Primary Language | Python 3.10+ | Main implementation language |
Web Framework | FastAPI | API and WebSocket endpoints |
Graph Database | Neo4j | Entity storage and relationship mapping |
Event Stream | Kafka | Reliable event processing |
Cache/State | Redis | In-memory data and session management |
Object Storage | MinIO | File and artifact management |
ML Framework | PyTorch Geometric | GNN implementation |
Container Runtime | Docker | Component isolation and deployment |
Orchestration | Kubernetes | Scaling and service management |
Development Tools
Tool | Purpose |
---|---|
Pydantic | Data validation and schema definition |
Protocol Classes | Service interface contracts |
pytest | Testing framework with coverage metrics |
mypy | Static type checking |
black & isort | Code formatting |
Architecture Patterns
Service Registry Pattern
All services in Agent Party are registered through a centralized Service Registry, which:
- Provides dependency injection for components
- Manages service lifecycle and initialization
- Enables mocking and testing of service dependencies
- Controls service scoping (singleton, transient, scoped)
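```python
from typing import Any, Callable, Dict, Type


class ServiceNotFound(Exception):
    """Raised when a requested service type has not been registered."""


class ServiceRegistry:
    """Central registry for all system services."""

    def __init__(self) -> None:
        self._services: Dict[Type, Dict[str, Any]] = {}

    def register(self, service_type: Type, implementation: Callable[[], Any], scope: str = "singleton") -> None:
        """Register a factory (usually a class) for a given interface type."""
        if scope not in ("singleton", "transient", "scoped"):
            raise ValueError(f"Unknown scope: {scope!r}")
        self._services[service_type] = {
            "implementation": implementation,
            "scope": scope,
            "instance": None,  # singletons are created lazily on first request
        }

    def get_service(self, service_type: Type) -> Any:
        """Retrieve a service of the specified type."""
        if service_type not in self._services:
            raise ServiceNotFound(f"Service {service_type.__name__} not registered")
        service = self._services[service_type]
        if service["scope"] == "singleton":
            # Build the instance once, then hand out the same object every time
            if service["instance"] is None:
                service["instance"] = service["implementation"]()
            return service["instance"]
        if service["scope"] == "transient":
            # Build a fresh instance on every request
            return service["implementation"]()
        # "scoped" services need a per-request scope object to own their instances
        raise NotImplementedError(f"Scope {service['scope']!r} must be resolved via a request scope")
```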
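The registry above leaves the `scoped` lifetime unhandled. One common way to support it is a small scope object that caches one instance per service type for as long as the scope lives. This sketch assumes that a scope corresponds to a single request or unit of work. `RequestScope` and `UnitOfWork` are hypothetical names and are not part of the Agent Party codebase.

```python
from typing import Any, Callable, Dict, Type


class RequestScope:
    """Hypothetical per-request scope: one instance per service type, per scope."""

    def __init__(self, factories: Dict[Type, Callable[[], Any]]) -> None:
        self._factories = factories            # service type -> factory (e.g. a class)
        self._instances: Dict[Type, Any] = {}  # instances owned by this scope

    def get(self, service_type: Type) -> Any:
        # Reuse the instance if this scope already created one
        if service_type not in self._instances:
            self._instances[service_type] = self._factories[service_type]()
        return self._instances[service_type]

    def __enter__(self) -> "RequestScope":
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Release scoped instances when the request ends
        for instance in self._instances.values():
            close = getattr(instance, "close", None)
            if callable(close):
                close()
        self._instances.clear()


class UnitOfWork:
    """Toy scoped service (hypothetical) used to demonstrate the lifetime rules."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


factories = {UnitOfWork: UnitOfWork}
with RequestScope(factories) as scope_a:
    first = scope_a.get(UnitOfWork)
    same = scope_a.get(UnitOfWork)   # same scope -> same instance
with RequestScope(factories) as scope_b:
    other = scope_b.get(UnitOfWork)  # new scope -> new instance
```

Within one scope the instance is shared, across scopes it is not, and leaving the `with` block closes everything the scope created.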
Repository Pattern
Data access is implemented through repositories, which:
- Abstract database operations from business logic
- Provide strongly-typed access methods
- Implement proper transaction management
- Optimize Neo4j queries with appropriate indexing
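```python
from __future__ import annotations  # lets the Agent annotation stay unresolved here

from typing import Protocol

# `Agent` is the project's agent domain model, defined elsewhere.

class AgentRepository(Protocol):
    """Interface for agent data access."""

    async def get_agent_by_id(self, agent_id: str) -> Agent:
        """Retrieve an agent by ID."""
        ...

    async def create_agent(self, agent: Agent) -> str:
        """Create a new agent record and return its ID."""
        ...

    async def update_agent_state(self, agent_id: str, state: str) -> None:
        """Update an agent's state."""
        ...
```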
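`AgentRepository` is a `Protocol`, so any class with matching async methods satisfies it structurally. This is what makes repositories easy to swap out in unit tests. A minimal in-memory sketch follows. The `Agent` dataclass is a stand-in with assumed fields (`name`, `state`, `id`), and the `"draft"`/`"active"` states are hypothetical.

```python
import asyncio
import uuid
from dataclasses import dataclass, replace
from typing import Dict, Optional


@dataclass(frozen=True)
class Agent:
    """Stand-in agent model; the fields and states are assumptions for this sketch."""
    name: str
    state: str = "draft"
    id: Optional[str] = None


class AgentNotFound(Exception):
    """Raised when no agent exists for the given ID."""


class InMemoryAgentRepository:
    """Dict-backed repository; structurally matches the AgentRepository protocol."""

    def __init__(self) -> None:
        self._agents: Dict[str, Agent] = {}

    async def get_agent_by_id(self, agent_id: str) -> Agent:
        try:
            return self._agents[agent_id]
        except KeyError:
            raise AgentNotFound(agent_id) from None

    async def create_agent(self, agent: Agent) -> str:
        agent_id = str(uuid.uuid4())
        self._agents[agent_id] = replace(agent, id=agent_id)
        return agent_id

    async def update_agent_state(self, agent_id: str, state: str) -> None:
        current = await self.get_agent_by_id(agent_id)
        self._agents[agent_id] = replace(current, state=state)


async def demo() -> Agent:
    repo = InMemoryAgentRepository()
    agent_id = await repo.create_agent(Agent(name="researcher"))
    await repo.update_agent_state(agent_id, "active")
    return await repo.get_agent_by_id(agent_id)


result = asyncio.run(demo())
```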
Event-Driven Architecture
The system uses event-driven communication with:
- Strongly-typed event definitions using Pydantic
- Reliable event delivery through Kafka
- Idempotent event handlers
- Clear event ownership boundaries
```python
from datetime import datetime
from typing import Any

from pydantic import BaseModel


class AgentStateChangedEvent(BaseModel):
    """Event triggered when an agent changes state."""

    agent_id: str
    previous_state: str
    new_state: str
    timestamp: datetime
    transition_approver: str | None
    transition_reason: str
    metrics: dict[str, Any]
```
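Kafka's at-least-once delivery means the same `AgentStateChangedEvent` can arrive more than once, so handlers must be idempotent. One simple approach is to derive a deduplication key from the fields that identify a transition and skip keys that were already processed. This approach is an assumption, not something taken from the codebase. The sketch makes three simplifications:

- It uses a dependency-free dataclass with a subset of the Pydantic model's fields.
- It keeps seen keys in an in-memory set, where production code would need a durable store.
- Its state names are hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Set, Tuple


@dataclass(frozen=True)
class StateChange:
    """Dependency-free stand-in for AgentStateChangedEvent (subset of fields)."""
    agent_id: str
    previous_state: str
    new_state: str
    timestamp: datetime


class AgentStateProjection:
    """Applies state-change events idempotently to an in-memory view."""

    def __init__(self) -> None:
        self.states: Dict[str, str] = {}
        # Assumption: in production this set would live in a durable store (e.g. Redis)
        self._seen: Set[Tuple[str, str, str, datetime]] = set()
        self.applied = 0

    def handle(self, event: StateChange) -> bool:
        key = (event.agent_id, event.previous_state, event.new_state, event.timestamp)
        if key in self._seen:
            return False  # duplicate delivery: no side effects
        self._seen.add(key)
        self.states[event.agent_id] = event.new_state
        self.applied += 1
        return True


ts = datetime(2024, 1, 1, tzinfo=timezone.utc)
event = StateChange("agent-1", "pending", "active", ts)
projection = AgentStateProjection()
first = projection.handle(event)
second = projection.handle(event)  # redelivered copy of the same event
```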
Component Architecture
Service Layer Components
```
┌───────────────────────────────────────────────────────────────┐
│ API Layer (FastAPI)                                           │
└───────────────▲───────────────────────────────▲───────────────┘
                │                               │
                │                               │
┌───────────────┴───────────┐   ┌───────────────┴───────────────┐
│ Service Layer             │   │ WebSocket Manager             │
│                           │   │                               │
│ - AgentService            │   │ - ConnectionManager           │
│ - TeamService             │   │ - MessageHandler              │
│ - TaskService             │   │ - VisualizationPublisher      │
│ - TemplateService         │   │ - EventSubscriber             │
└───────────────▲───────────┘   └───────────────────────────────┘
                │
                │
┌───────────────┴───────────────────────────────────────────────┐
│ Domain Services                                               │
│                                                               │
│ - AgentLifecycleManager                                       │
│ - TeamAssemblyService                                         │
│ - GNNRecommendationService                                    │
│ - TaskAnalysisService                                         │
└───────────────▲───────────────────────────────▲───────────────┘
                │                               │
                │                               │
┌───────────────┴───────────┐   ┌───────────────┴───────────────┐
│ Repository Layer          │   │ Infrastructure Services       │
│                           │   │                               │
│ - AgentRepository         │   │ - KafkaEventPublisher         │
│ - TeamRepository          │   │ - MinIOStorageService         │
│ - TaskRepository          │   │ - RedisStateManager           │
│ - TemplateRepository      │   │ - ModelProviderService        │
└───────────────▲───────────┘   └───────────────▲───────────────┘
                │                               │
                │                               │
┌───────────────┴───────────┐   ┌───────────────┴───────────────┐
│ Neo4j Database            │   │ External Services             │
└───────────────────────────┘   └───────────────────────────────┘
```
Agent System Components
```
┌───────────────────────────┐
│ Template Registry         │
│                           │
│ - Template management     │
│ - Capability registry     │
│ - Version control         │
└───────────────▲───────────┘
                │
                │
┌───────────────┴───────────┐   ┌───────────────────────────────┐
│ Agent Factory             │   │ Talent Scout Agent            │
│                           │   │                               │
│ - Agent instantiation     │◄──┤ - Template creation           │
│ - Parameter validation    │   │ - Capability analysis         │
│ - Resource allocation     │   │ - Template optimization       │
└───────────────▲───────────┘   └───────────────────────────────┘
                │
                │
┌───────────────┴───────────┐
│ Lifecycle Manager         │
│                           │
│ - State transitions       │
│ - Approval workflows      │
│ - Event generation        │
│ - Resource tracking       │
└───────────────▲───────────┘
                │
                │
┌───────────────┴───────────┐   ┌───────────────────────────────┐
│ Agent Runtime             │   │ Manager Agents                │
│                           │   │                               │
│ - Execution environment   │◄──┤ - Transition approval         │
│ - Context management      │   │ - Policy enforcement          │
│ - Output handling         │   │ - Escalation handling         │
└───────────────────────────┘   └───────────────────────────────┘
```
Team Formation Components
```
┌───────────────────────────┐   ┌───────────────────────────────┐
│ Task Analysis Service     │   │ DJ (GNN Engine)               │
│                           │   │                               │
│ - Requirement extraction  │   │ - Graph representation        │
│ - Capability mapping      ├──►│ - Embeddings generation       │
│ - Priority assignment     │   │ - Collaboration scoring       │
└───────────────▲───────────┘   └───────────────▲───────────────┘
                │                               │
                │                               │
┌───────────────┴───────────────────────────────┴───────────────┐
│ Bartender (Team Assembly Service)                             │
│                                                               │
│ - Candidate selection                                         │
│ - Role assignment                                             │
│ - Team optimization                                           │
│ - Resource balancing                                          │
└───────────────▲───────────────────────────────▲───────────────┘
                │                               │
                │                               │
┌───────────────┴───────────┐   ┌───────────────┴───────────────┐
│ Team Manager              │   │ Collaboration Analyzer        │
│                           │   │                               │
│ - Team state management   │   │ - Historical performance      │
│ - Progress tracking       │   │ - Success pattern detection   │
│ - Resource allocation     │   │ - Feedback integration        │
└───────────────────────────┘   └───────────────────────────────┘
```
Neo4j Database Design
Core Schema
```cypher
// Core node types (FOR ... REQUIRE syntax, Neo4j 4.4+; the older ON ... ASSERT form was removed in Neo4j 5)
CREATE CONSTRAINT agent_id IF NOT EXISTS FOR (a:Agent) REQUIRE a.id IS UNIQUE;
CREATE CONSTRAINT template_id IF NOT EXISTS FOR (t:Template) REQUIRE t.id IS UNIQUE;
CREATE CONSTRAINT template_version_id IF NOT EXISTS FOR (tv:TemplateVersion) REQUIRE tv.id IS UNIQUE;
CREATE CONSTRAINT team_id IF NOT EXISTS FOR (t:Team) REQUIRE t.id IS UNIQUE;
CREATE CONSTRAINT task_id IF NOT EXISTS FOR (t:Task) REQUIRE t.id IS UNIQUE;
CREATE CONSTRAINT capability_id IF NOT EXISTS FOR (c:Capability) REQUIRE c.id IS UNIQUE;
CREATE CONSTRAINT user_id IF NOT EXISTS FOR (u:User) REQUIRE u.id IS UNIQUE;

// Core relationship indices
CREATE INDEX IF NOT EXISTS FOR ()-[r:MEMBER_OF]->() ON (r.role, r.joined_at);
CREATE INDEX IF NOT EXISTS FOR ()-[r:BASED_ON]->() ON (r.version);
CREATE INDEX IF NOT EXISTS FOR ()-[r:WORKED_WITH]->() ON (r.success_score);
CREATE INDEX IF NOT EXISTS FOR ()-[r:HAS_CAPABILITY]->() ON (r.proficiency);
```
Performance Optimizations
- Use `MERGE` for nodes with unique constraints
- Apply appropriate indices for relationship properties
- Batch operations for large data imports
- Use parameterized queries to enable caching
- Implement query timeout management
- Model time-series data with appropriate patterns
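Several of these points combine in one pattern: send rows in fixed-size batches through a single parameterized `UNWIND ... MERGE` query. Because the query text never changes, Neo4j can reuse the cached plan, and the uniqueness constraint on `Agent.id` backs the `MERGE` lookup. The sketch below only builds the batches and hands each one to a `run` callable. With the official Python driver you would pass a session's `run` method. The batch size and the `name`/`state` properties are assumptions.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List

# One parameterized statement for every batch, so the query plan can be cached.
UPSERT_AGENTS = """
UNWIND $rows AS row
MERGE (a:Agent {id: row.id})
SET a.name = row.name, a.state = row.state
"""


def batched(rows: Iterable[Dict[str, Any]], size: int) -> Iterator[List[Dict[str, Any]]]:
    """Yield lists of at most `size` rows."""
    batch: List[Dict[str, Any]] = []
    for row in rows:
        batch.append(row)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def import_agents(rows: Iterable[Dict[str, Any]],
                  run: Callable[[str, Dict[str, Any]], Any],
                  batch_size: int = 1000) -> int:
    """Send rows in batches via `run` (e.g. a driver session's run); return batch count."""
    batches = 0
    for batch in batched(rows, batch_size):
        run(UPSERT_AGENTS, {"rows": batch})
        batches += 1
    return batches


# Usage with a fake runner that records parameters instead of hitting a database
calls: List[Dict[str, Any]] = []
rows = [{"id": f"agent-{i}", "name": f"Agent {i}", "state": "idle"} for i in range(5)]
count = import_agents(rows, lambda query, params: calls.append(params), batch_size=2)
```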
Integration Patterns
Event-Driven Integration
```
┌───────────────┐    ┌───────────────┐    ┌───────────────┐
│   Producer    │    │     Kafka     │    │   Consumer    │
│    Service    │───►│     Topic     │───►│    Service    │
└───────────────┘    └───────────────┘    └───────────────┘
```
- Reliable Delivery: At-least-once delivery guarantees
- Idempotent Consumers: Handle potential duplicate events
- Event Schemas: Strict typing with Pydantic models
- Error Handling: Dead-letter queues for failed processing
- Event Sourcing: Rebuild state from event streams when needed
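The error-handling bullet can be made concrete as a consumer wrapper. It retries a handler a bounded number of times, then routes the message to a dead-letter topic so the partition is not blocked. This is a dependency-free sketch. `publish` stands in for a Kafka producer call, and the `.dlq` topic suffix and the retry count are assumptions.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Message = Dict[str, Any]


def consume_with_dlq(message: Message,
                     handler: Callable[[Message], None],
                     publish: Callable[[str, Message], None],
                     topic: str,
                     max_attempts: int = 3) -> bool:
    """Run `handler`, retrying up to `max_attempts`; dead-letter on final failure.

    Returns True if the message was handled, False if it was dead-lettered.
    """
    last_error: Optional[Exception] = None
    for _ in range(max_attempts):
        try:
            handler(message)
            return True
        except Exception as exc:  # any failure counts as one attempt
            last_error = exc
    # Park the message with its failure reason so the partition keeps moving
    publish(f"{topic}.dlq", {"payload": message, "error": repr(last_error),
                             "attempts": max_attempts})
    return False


# Usage with in-memory stand-ins for the handler and the producer
attempts: List[int] = []
dead_letters: List[Tuple[str, Message]] = []


def failing_handler(msg: Message) -> None:
    attempts.append(1)
    raise ValueError("bad payload")


handled = consume_with_dlq({"agent_id": "a1"}, lambda msg: None,
                           lambda t, m: dead_letters.append((t, m)), "agent-events")
parked = consume_with_dlq({"agent_id": "a2"}, failing_handler,
                          lambda t, m: dead_letters.append((t, m)), "agent-events")
```

A production consumer would normally add a backoff delay between attempts. The sketch omits it to stay deterministic.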
Synchronous APIs
- RESTful API for external integrations
- GraphQL for flexible queries
- WebSockets for real-time updates
- Rate limiting and backpressure mechanisms
- Comprehensive authentication and authorization
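For rate limiting, a token bucket is a common choice. Each client gets a bucket that refills at a steady rate and allows short bursts up to its capacity. The sketch is framework-agnostic and uses an injectable clock so it can be tested deterministically. In a FastAPI service it would typically sit behind middleware or a dependency keyed by client identity. The rate and capacity values are assumptions.

```python
import time
from typing import Callable, Dict, Tuple


class TokenBucketLimiter:
    """Per-key token bucket: `rate` tokens per second, bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self._buckets: Dict[str, Tuple[float, float]] = {}  # key -> (tokens, last_refill)

    def allow(self, key: str) -> bool:
        now = self.clock()
        tokens, last = self._buckets.get(key, (self.capacity, now))
        # Refill in proportion to elapsed time, never above capacity
        tokens = min(self.capacity, tokens + (now - last) * self.rate)
        if tokens >= 1:
            self._buckets[key] = (tokens - 1, now)
            return True
        self._buckets[key] = (tokens, now)
        return False


fake_now = [0.0]
limiter = TokenBucketLimiter(rate=1.0, capacity=2, clock=lambda: fake_now[0])
burst = [limiter.allow("client-a") for _ in range(3)]  # third call exceeds the burst
fake_now[0] = 1.0                                      # one second later: one token back
after_refill = limiter.allow("client-a")
other_client = limiter.allow("client-b")               # separate bucket per key
```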
Testing Strategy
Testing Layers
- Unit Testing
  - Focus on one module at a time
  - Target 100% test coverage with strategic exclusions
  - Use proper mocking for dependencies
  - Follow a test-driven development approach
- Integration Testing
  - Test repository implementations with Neo4j test containers
  - Validate Kafka producer/consumer integration
  - Test WebSocket communication
- System Testing
  - End-to-end workflows through API endpoints
  - Performance testing with realistic data volumes
  - Fault injection for resilience testing
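At the unit level, "proper mocking for dependencies" usually means giving a service a test double for its repository. The sketch below uses the standard library's `unittest.mock.AsyncMock`. `AgentService.activate` and the `"active"` state are hypothetical and only illustrate the pattern. pytest would collect these `test_` functions directly.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


class AgentService:
    """Hypothetical service that depends only on the repository interface."""

    def __init__(self, repository) -> None:
        self._repository = repository

    async def activate(self, agent_id: str) -> None:
        await self._repository.get_agent_by_id(agent_id)  # raises if unknown
        await self._repository.update_agent_state(agent_id, "active")


def test_activate_updates_state() -> None:
    repository = MagicMock()
    repository.get_agent_by_id = AsyncMock(return_value=object())
    repository.update_agent_state = AsyncMock(return_value=None)

    asyncio.run(AgentService(repository).activate("agent-42"))

    repository.get_agent_by_id.assert_awaited_once_with("agent-42")
    repository.update_agent_state.assert_awaited_once_with("agent-42", "active")


def test_activate_unknown_agent_does_not_update() -> None:
    repository = MagicMock()
    repository.get_agent_by_id = AsyncMock(side_effect=LookupError("unknown agent"))
    repository.update_agent_state = AsyncMock()

    try:
        asyncio.run(AgentService(repository).activate("missing"))
    except LookupError:
        pass
    else:
        raise AssertionError("expected LookupError")
    repository.update_agent_state.assert_not_awaited()
```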
Neo4j Testing
- Use test fixtures for database setup and teardown
- Create isolated test databases
- Implement cleanup code to remove test artifacts
- Mock Neo4j connections for pure unit tests
Quality Metrics
- Code coverage with pytest-cov
- Performance benchmarks for critical paths
- Type checking compliance with mypy
- Docstring completeness
Deployment Architecture
```
┌─────────────────────────────────────────────────────────────┐
│ Kubernetes Cluster                                          │
│                                                             │
│    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐    │
│    │ API Pods    │    │ Worker Pods │    │ ML Pods     │    │
│    │             │    │             │    │             │    │
│    │ - FastAPI   │    │ - Consumers │    │ - GNN       │    │
│    │ - WebSockets│    │ - Processors│    │ - Training  │    │
│    └─────────────┘    └─────────────┘    └─────────────┘    │
│                                                             │
│    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐    │
│    │ Neo4j       │    │ Kafka       │    │ MinIO       │    │
│    │             │    │             │    │             │    │
│    │ - Graph DB  │    │ - Events    │    │ - Storage   │    │
│    │ - Metrics   │    │ - Streaming │    │ - Objects   │    │
│    └─────────────┘    └─────────────┘    └─────────────┘    │
│                                                             │
│    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐    │
│    │ Redis       │    │ Monitoring  │    │ Logging     │    │
│    │             │    │             │    │             │    │
│    │ - Cache     │    │ - Prometheus│    │ - ELK       │    │
│    │ - State     │    │ - Grafana   │    │ - Tracing   │    │
│    └─────────────┘    └─────────────┘    └─────────────┘    │
└─────────────────────────────────────────────────────────────┘
```
Security Architecture
- Authentication: OAuth2 with JWT tokens
- Authorization: Role-based access control
- API Security: Rate limiting, input validation
- Data Encryption: TLS for all communication
- Secrets Management: Kubernetes secrets or Vault
- Audit Logging: Comprehensive activity logging
- Container Security: Minimal base images, regular scanning
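Role-based access control can be reduced to a role-to-permission mapping that is checked before each guarded operation, denying by default. The role and permission names below are hypothetical. In the described stack, the caller's roles would come from validated JWT claims, which this sketch does not show.

```python
from functools import wraps
from typing import Any, Callable, Collection, Dict, FrozenSet

# Hypothetical roles and permissions, for illustration only
ROLE_PERMISSIONS: Dict[str, FrozenSet[str]] = {
    "viewer": frozenset({"agent:read"}),
    "operator": frozenset({"agent:read", "agent:update"}),
    "admin": frozenset({"agent:read", "agent:update", "agent:approve"}),
}


class PermissionDenied(Exception):
    """Raised when none of the caller's roles grants the required permission."""


def has_permission(roles: Collection[str], permission: str) -> bool:
    """True if any of the caller's roles grants the permission (unknown roles grant nothing)."""
    return any(permission in ROLE_PERMISSIONS.get(role, frozenset()) for role in roles)


def requires(permission: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Decorator; assumes the wrapped function takes the caller's roles first."""
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        def wrapper(roles: Collection[str], *args: Any, **kwargs: Any) -> Any:
            if not has_permission(roles, permission):
                raise PermissionDenied(permission)
            return func(roles, *args, **kwargs)
        return wrapper
    return decorator


@requires("agent:approve")
def approve_transition(roles: Collection[str], agent_id: str) -> str:
    """Hypothetical approval action guarded by RBAC."""
    return f"approved {agent_id}"


denied = False
try:
    approve_transition({"operator"}, "agent-7")
except PermissionDenied:
    denied = True
```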
Monitoring and Observability
- Logging: Structured JSON logs with correlation IDs
- Metrics: Prometheus for time-series metrics
- Tracing: Distributed tracing with OpenTelemetry
- Dashboards: Grafana for visualization
- Alerts: Proactive notification for anomalies
- Performance: Tracking execution times of critical paths
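Structured JSON logs with correlation IDs can be produced with the standard library alone. A `ContextVar` holds the current request's ID, which works across asyncio tasks, and a custom `Formatter` emits one JSON object per line. The logger name and the field set are assumptions.

```python
import contextvars
import io
import json
import logging
import uuid

# Correlation ID for the current request/task; "-" when none is set
correlation_id = contextvars.ContextVar("correlation_id", default="-")


class JsonFormatter(logging.Formatter):
    """Render each record as a single-line JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": correlation_id.get(),
        }
        return json.dumps(payload)


def build_logger(stream: io.TextIOBase) -> logging.Logger:
    logger = logging.getLogger("agent_party.sketch")  # hypothetical logger name
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.handlers = [handler]
    logger.propagate = False
    return logger


buffer = io.StringIO()
log = build_logger(buffer)
token = correlation_id.set(str(uuid.uuid4()))  # e.g. set by request middleware
log.info("agent %s changed state", "agent-1")
correlation_id.reset(token)
record = json.loads(buffer.getvalue().splitlines()[0])
```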
Scaling Considerations
- Horizontal Scaling: Stateless components scale horizontally
- Neo4j Clustering: Causal clustering for graph database
- Kafka Partitioning: Topic partitioning for parallel processing
- Redis Clustering: Redis cluster for cache availability
- Resource Optimization: Efficient resource utilization strategies
- Backpressure: Handling overload scenarios gracefully
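Topic partitioning preserves per-entity ordering only if related events share a message key. The sketch shows the idea with a stable hash taken modulo the partition count. Three caveats apply:

- It uses `zlib.crc32` for simplicity. Kafka's default Java partitioner uses murmur2, so these partition numbers will not match what a real producer picks.
- Python's built-in `hash()` is unsuitable here because it is randomized per process for strings.
- Keying by `agent_id` is an assumption, and the state names are hypothetical.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple

Event = Tuple[str, str]  # (agent_id, new_state)


def partition_for(key: str, num_partitions: int) -> int:
    """Stable key -> partition mapping (crc32 here; Kafka's default uses murmur2)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign(events: List[Event], num_partitions: int) -> Dict[int, List[Event]]:
    """Group events by partition, preserving send order within each partition."""
    partitions: Dict[int, List[Event]] = defaultdict(list)
    for agent_id, new_state in events:
        partitions[partition_for(agent_id, num_partitions)].append((agent_id, new_state))
    return dict(partitions)


events = [("agent-1", "pending"), ("agent-2", "pending"),
          ("agent-1", "active"), ("agent-1", "retired")]
layout = assign(events, num_partitions=6)
```

All of `agent-1`'s events land in the same partition in send order, so a single consumer sees its transitions sequentially. Different agents can still be processed in parallel.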