Enterprise-grade agent orchestration framework implementing the Model Context Protocol (MCP) standard. Build, deploy, and manage AI agents at scale with standardized interfaces, context management, and observability.
Model Context Protocol (MCP) is an open standard by Anthropic that enables AI agents to:
- Share Context - Maintain consistent state across interactions
- Communicate - Exchange structured messages and data
- Collaborate - Work together on complex tasks
- Integrate - Connect with external tools and data sources
This framework provides a production-ready implementation of MCP for building sophisticated AI agent systems.
- π Standardized Agent Interface - Consistent API across all agents
- π§ Context Management - Automatic state tracking and persistence
- π Agent Orchestration - Sequential, parallel, and conditional workflows
- π Full Observability - Tracing, metrics, and logging built-in
- π‘οΈ Error Recovery - Automatic retries and graceful degradation
- β‘ Async-First - Built for high-performance async execution
- π§ Extensible - Easy to add custom agents and tools
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β MCP Agent Framework β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Agent Registry β β
β β β’ Agent Discovery β’ Lifecycle Management β’ Versioning β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Agent Orchestrator β β
β β β’ Workflow Execution β’ Dependency Resolution β’ Retry β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Context Manager β β
β β β’ State Storage β’ Cross-Agent Sharing β’ Persistence β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β ββββββββββββββ ββββββββββββββ ββββββββββββββ β
β β Agent 1 β β Agent 2 β β Agent N β β
β β(Extraction)β β(Validation)β β (Custom) β β
β ββββββββββββββ ββββββββββββββ ββββββββββββββ β
β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Observability Layer β β
β β β’ Tracing (OpenTelemetry) β’ Metrics β’ Structured Logs β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
git clone https://github.com/yourusername/mcp-agent-framework.git
cd mcp-agent-framework
pip install -r requirements.txtfrom mcp_framework import BaseAgent, AgentRequest, AgentResponse
from dataclasses import dataclass
@dataclass
class GreetingRequest(AgentRequest):
name: str
@dataclass
class GreetingResponse(AgentResponse):
message: str
class GreetingAgent(BaseAgent):
"""A simple greeting agent."""
async def execute(self, request: GreetingRequest) -> GreetingResponse:
return GreetingResponse(
message=f"Hello, {request.name}!",
success=True
)
# Use it
agent = GreetingAgent()
response = await agent.execute(GreetingRequest(name="World"))
print(response.message) # "Hello, World!"from mcp_framework import AgentOrchestrator
orchestrator = AgentOrchestrator()
# Register agents
orchestrator.register("extract", ExtractionAgent())
orchestrator.register("validate", ValidationAgent())
orchestrator.register("store", StorageAgent())
# Execute workflow
result = await orchestrator.execute([
("extract", {"source": "document.pdf"}),
("validate", {}), # Uses output from previous step
("store", {})
])The foundation for all agents:
from mcp_framework import BaseAgent
class MyAgent(BaseAgent):
async def execute(self, request):
# Your agent logic here
return response
async def initialize(self):
# Optional: Setup resources
pass
async def cleanup(self):
# Optional: Cleanup resources
passShare state across agents:
from mcp_framework import ContextManager
context = ContextManager()
# Store context
await context.set("user_id", "12345")
await context.set("session", {"key": "value"}, ttl=3600)
# Retrieve context
user_id = await context.get("user_id")
# Context is automatically shared across agents in a workflowCompose complex workflows:
from mcp_framework import AgentOrchestrator
orchestrator = AgentOrchestrator()
# Sequential execution
result = await orchestrator.execute_sequential([agent1, agent2, agent3])
# Parallel execution
results = await orchestrator.execute_parallel([agent1, agent2, agent3])
# Conditional branching
result = await orchestrator.execute_conditional(
condition=lambda ctx: ctx.get("type") == "premium",
if_true=premium_agent,
if_false=standard_agent
)from mcp_framework.agents.kg import (
EntityExtractionAgent,
RelationshipDiscoveryAgent,
KnowledgeGraphBuilder
)from mcp_framework.agents.retrieval import (
VectorSearchAgent,
WebContentLoaderAgent,
DocumentReaderAgent
)from mcp_framework.agents.validation import (
DataQualityAgent,
SchemaValidationAgent,
ContentModerationAgent
)from mcp_framework.agents.extraction import (
StructuredDataExtractor,
JSONExtractor,
TableExtractor
)mcp-agent-framework/
βββ mcp_framework/
β βββ __init__.py
β βββ core/
β β βββ base_agent.py # BaseAgent class
β β βββ orchestrator.py # AgentOrchestrator
β β βββ context.py # ContextManager
β β βββ registry.py # Agent registry
β β βββ protocol.py # MCP protocol types
β βββ agents/
β β βββ kg/ # Knowledge graph agents
β β βββ retrieval/ # Data retrieval agents
β β βββ validation/ # Validation agents
β β βββ extraction/ # Data extraction agents
β βββ integrations/
β βββ langfuse.py # Observability
β βββ redis.py # Context persistence
β βββ qdrant.py # Vector storage
βββ examples/
β βββ basic_agent.py
β βββ multi_agent_workflow.py
β βββ context_sharing.py
βββ tests/
βββ requirements.txt
βββ README.md
# Context storage (optional - defaults to in-memory)
MCP_CONTEXT_STORE=redis://localhost:6379
# Observability (optional)
LANGFUSE_PUBLIC_KEY=pk-lf-...
LANGFUSE_SECRET_KEY=sk-lf-...
# Tracing (optional)
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317# config/agents.yaml
agents:
- name: entity_extractor
class: mcp_framework.agents.extraction.EntityExtractor
config:
model: gemini-1.5-pro
max_entities: 100
- name: validator
class: mcp_framework.agents.validation.DataValidator
config:
strict_mode: trueBuilt-in support for tracing and metrics:
from mcp_framework import BaseAgent, trace
class MyAgent(BaseAgent):
@trace("my_agent_execution")
async def execute(self, request):
with self.tracer.span("processing"):
result = await self.process(request)
self.metrics.increment("requests_processed")
return resultEach agent should do one thing well.
Define clear request/response types.
class RobustAgent(BaseAgent):
async def execute(self, request):
try:
return await self.primary_method(request)
except PrimaryError:
return await self.fallback_method(request)class ResourceAgent(BaseAgent):
async def initialize(self):
self.connection = await create_connection()
async def cleanup(self):
await self.connection.close()Contributions welcome! Please read our contributing guidelines.
MIT License - See LICENSE for details.
Ravi Teja K - AI/ML Engineer
- GitHub: @TEJA4704