Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions server/ap_server/routers/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from ..models import (
Agent,
Capabilities,
AgentSchema,
AgentsSearchPostRequest,
AgentsSearchPostResponse,
Expand All @@ -29,7 +30,16 @@ def search_agents(
"""
Search Agents
"""
pass
return AgentsSearchPostResponse(
root=[
Agent(
agent_id="agent_1",
name="find_legal_docs",
description="An agent that can find legal documents.",
capabilities=Capabilities(),
)
]
)


@router.get(
Expand All @@ -55,4 +65,21 @@ def get_agent_schemas(agent_id: str) -> Union[AgentSchema, ErrorResponse]:
"""
Get Agent Schemas
"""
pass
if agent_id == "agent_1":
return AgentSchema(
agent_id=agent_id,
input_schema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Text to search for relevant legal documents",
}
},
"required": ["query"],
"additionalProperties": False,
},
output_schema={},
state_schema={},
)
return ErrorResponse(code=404, message="No such agent.")
13 changes: 12 additions & 1 deletion server/ap_server/routers/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,15 @@ def create_and_wait_run(body: RunCreate) -> Union[RunWaitResponse, ErrorResponse
"""
Create Run, Wait for Output
"""
pass
if body.thread_id:
raise NotImplementedError("Thread ID is not supported in this example.")
if body.agent_id == "agent_1":
return RunWaitResponse(
values={
"name": "Legal Document",
"description": "This is a legal document.",
"findings": ["Finding 1", "Finding 2"],
}
)
else:
raise NotImplementedError()
112 changes: 112 additions & 0 deletions server/mcp_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""
# List tools
curl -X POST http://localhost:8000/mcp/ \
-H "Content-Type: application/json" \
-H "Accept: application/json,text/event-stream" \
-d '{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list",
"params": {}
}'

# Call tools
curl -X POST "http://localhost:8000/mcp/" \
-H "Content-Type: application/json" \
-H "Accept: application/json,text/event-stream" \
-d '{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "agent_1",
"arguments": {
"foo": "bar",
"count": 3
}
}
}'
"""

import json
from typing import Any, Sequence

import ap_client
from mcp import Tool as MCPTool
from mcp.server.fastmcp import FastMCP
from mcp.server.lowlevel import Server
from mcp.types import TextContent

URL = "http://localhost:8002"

server = Server(name="Agent Protocol MCP")
configuration = ap_client.Configuration(host=URL)


class AgentProtocolMCP(FastMCP):
"""Agent Protocol MCP."""

@server.list_tools()
async def list_tools(self) -> list[MCPTool]:
"""Override list tools to allow setting it dynamically"""
with ap_client.ApiClient(configuration) as api_client:
# Create an instance of the API class
api_instance = ap_client.AgentsApi(api_client)
response = api_instance.search_agents_with_http_info(
ap_client.models.SearchAgentsRequest()
)

mcp_tools = []
for agent in response.data:
# Can parallelize this
schema_response = api_instance.get_agent_schemas(
agent_id=agent.agent_id
)
mcp_tools.append(
MCPTool(
# Choosing the agent ID as the convention for the name.
# This is because MCP has no mechanism to avoid name collisions
# so the name should be unique.
name=agent.id,
# You could prepend the actual agent name to the description.
description=agent.description,
inputSchema=schema_response.input_schema,
)
)
return mcp_tools

@server.call_tool()
async def call_tool(
self,
name: str,
arguments: dict[str, Any],
) -> Sequence["TextContent | ImageContent | EmbeddedResource"]:
"""List tool."""
with ap_client.ApiClient(configuration) as api_client:
# Create an instance of the API class
runs_api = ap_client.RunsApi(api_client)
response = runs_api.create_and_wait_run(
ap_client.RunCreate(
# Not using a state since this is a stateless server
thread_id=None,
agent_id=name,
input=arguments,
)
)
# The easiest thing is to encode the values as JSON and put them
# into a TextContent object.
# You can use other return types if they are appropriate for your use
# case.
return [TextContent(type="text", text=json.dumps(response.values))]


mcp = AgentProtocolMCP(
"StatelessServer",
# Our server implementation is stateless, but you could use a stateful
# implementation if it makes sense for your use case.
stateless_http=True,
# Currently JSON response makes more sense since we're only doing tool calls
# and aren't sending progress notifications.
json_response=True,
)
app = mcp.streamable_http_app()
14 changes: 12 additions & 2 deletions server/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,24 @@ authors = [
{name = "Nuno Campos",email = "nuno@langchain.dev"}
]
license = {text = "MIT"}
readme = "README.md"
requires-python = ">=3.10,<4.0"
dependencies = [
"pydantic (>=2.10.6,<3.0.0)",
"fastapi (>=0.115.8,<0.116.0)"
"fastapi (>=0.115.8,<0.116.0)",
"mcp>=1.8.1",
]


[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"

[dependency-groups]
dev = [
"ap-client",
"ipython>=8.36.0",
"ruff>=0.11.9",
]

[tool.uv.sources]
ap-client = { path = "../client-python" }
Loading