Complete API reference for the agentanycast Python SDK.
pip install agentanycastRequires Python 3.10+.
The main entry point. Manages the daemon lifecycle, sends and receives tasks.
Node(
card: AgentCard,
relay: str | None = None,
key_path: str | Path | None = None,
daemon_addr: str | None = None,
daemon_bin: str | Path | None = None,
daemon_path: str | Path | None = None,
home: str | Path | None = None,
)| Parameter | Description | Default |
|---|---|---|
card |
Agent's capability descriptor | Required |
relay |
Relay server multiaddr for cross-network communication | None (LAN only) |
key_path |
Path to Ed25519 identity key file | <home>/key |
daemon_addr |
Address of an externally managed daemon | Auto-managed |
daemon_bin |
Name of daemon binary to find on PATH | agentanycastd |
daemon_path |
Absolute path to daemon binary | Auto-download |
home |
Data directory for daemon state | ~/.agentanycast |
status_callback |
Optional callback for progress messages (daemon download/startup). | None |
The recommended way to use Node:
async with Node(card=card) as node:
# node.start() is called automatically
...
# node.stop() is called automatically| Property | Type | Description |
|---|---|---|
peer_id |
str |
This node's Peer ID (available after start()) |
is_running |
bool |
Whether the node is currently running |
await node.start()Starts the daemon process and connects the gRPC channel. Called automatically when using async with.
await node.stop()Stops the daemon process and closes the gRPC channel. Called automatically when using async with.
handle = await node.send_task(
peer_id: str | None = None,
skill: str | None = None,
url: str | None = None,
message: dict | Message,
target_skill_id: str | None = None,
context_id: str | None = None,
metadata: dict[str, str] | None = None,
) -> TaskHandleSends a task to a remote agent. Exactly one of peer_id, skill, or url must be provided.
| Parameter | Description |
|---|---|
peer_id |
Direct mode — target agent's Peer ID |
skill |
Anycast mode — skill ID to route by (relay resolves the target) |
url |
HTTP Bridge mode — URL of an HTTP A2A agent |
message |
The message to send (dict or Message object) |
target_skill_id |
Optional skill to target on the remote agent (for direct mode) |
context_id |
Optional context ID for multi-turn conversations |
metadata |
Optional key-value metadata for the task |
Returns: A TaskHandle for tracking the task's progress.
Examples:
# Direct — by Peer ID
handle = await node.send_task(
peer_id="12D3KooW...",
message={"role": "user", "parts": [{"text": "Hello!"}]},
)
# Anycast — by skill
handle = await node.send_task(
skill="translate",
message={"role": "user", "parts": [{"text": "Hello!"}]},
)
# HTTP Bridge — by URL
handle = await node.send_task(
url="https://agent.example.com",
message={"role": "user", "parts": [{"text": "Hello!"}]},
)agents = await node.discover(
skill: str,
tags: dict[str, str] | None = None,
limit: int | None = None,
) -> list[DiscoveredAgent]Finds agents offering a specific skill via the relay's skill registry or DHT.
| Parameter | Description |
|---|---|
skill |
Skill ID to search for |
tags |
Optional tag filters for fine-grained matching |
limit |
Max number of results to return |
Returns: A list of DiscoveredAgent objects with peer_id, name, description, and skills.
agents = await node.discover("translate", tags={"lang": "fr"})
for agent in agents:
print(f"{agent.name} ({agent.peer_id})")card = await node.get_card(peer_id: str) -> AgentCardFetches a remote agent's Agent Card, describing its capabilities and skills.
await node.connect_peer(
peer_id: str,
addresses: list[str] | None = None,
)Explicitly connects to a peer. Usually not needed — peers connect automatically via mDNS or relay.
peers = await node.list_peers() -> list[dict]Returns a list of currently connected peers with connection metadata.
@node.on_task
async def handle(task: IncomingTask):
...Decorator that registers a handler for incoming tasks. Only one handler can be registered per node.
await node.serve_forever()Blocks until the node is stopped (e.g., via Ctrl+C or node.stop()). Use this for server-mode agents that wait for incoming tasks.
Describes an agent's identity, capabilities, and metadata. Compatible with the A2A Agent Card specification.
@dataclass
class AgentCard:
name: str
description: str = ""
version: str = "1.0.0"
protocol_version: str = "a2a/0.3"
skills: list[Skill] = field(default_factory=list)
# Read-only (populated by the daemon after start):
peer_id: str | None = None
supported_transports: list[str] = field(default_factory=list)
relay_addresses: list[str] = field(default_factory=list)
did_key: str | None = None
did_web: str | None = None
did_dns: str | None = None
verifiable_credentials: list[str] = field(default_factory=list)| Field | Description |
|---|---|
name |
Human-readable agent name |
description |
What this agent does |
version |
Agent version string |
protocol_version |
A2A protocol version |
skills |
List of skills this agent can perform |
peer_id |
This agent's Peer ID (set by daemon, read-only) |
supported_transports |
Active transport protocols (read-only) |
relay_addresses |
Connected relay addresses (read-only) |
did_key |
W3C did:key derived from Ed25519 public key (read-only) |
did_web |
did:web identifier for web-based DID resolution (read-only) |
did_dns |
did:dns domain for DNS-based resolution (read-only) |
verifiable_credentials |
JSON-encoded Verifiable Credentials (read-only) |
card.to_dict() -> dict # Serialize to dictionary
AgentCard.from_dict(d) -> AgentCard # Deserialize from dictionaryDefines a single capability an agent can perform.
@dataclass
class Skill:
id: str
description: str = ""
input_schema: str | None = None # JSON Schema string
output_schema: str | None = None # JSON Schema string| Field | Description |
|---|---|
id |
Unique skill identifier (e.g., "echo", "translate") |
description |
Human-readable description of what the skill does |
input_schema |
Optional JSON Schema describing expected input |
output_schema |
Optional JSON Schema describing output format |
Example with schemas:
Skill(
id="translate",
description="Translate text between languages",
input_schema='{"type": "object", "properties": {"text": {"type": "string"}, "target_lang": {"type": "string"}}}',
output_schema='{"type": "object", "properties": {"translated": {"type": "string"}}}',
)Returned by send_task(). Tracks a remote task's progress.
| Property | Type | Description |
|---|---|---|
task_id |
str |
Unique task identifier |
status |
TaskStatus |
Current task status |
artifacts |
list[Artifact] |
Result artifacts (available after completion) |
result = await handle.wait(timeout: float | None = None) -> TaskBlocks until the task reaches a terminal state (completed, failed, or canceled).
| Parameter | Description |
|---|---|
timeout |
Max seconds to wait. None for no timeout. |
Returns: The completed Task object with all artifacts.
Raises: TaskTimeoutError, TaskFailedError, TaskCanceledError
await handle.cancel()Requests cancellation of the task.
Passed to @on_task handlers. Represents a task received from a remote agent.
| Property | Type | Description |
|---|---|---|
task_id |
str |
Unique task identifier |
peer_id |
str |
Sender's Peer ID |
messages |
list[Message] |
Messages from the sender |
target_skill_id |
str |
Which skill the sender is targeting |
sender_card |
AgentCard | None |
Sender's Agent Card (if available) |
await task.update_status(status: str)Updates the task's status. Use "working" to indicate processing has started.
await task.complete(artifacts: list[dict | Artifact] | None = None)Marks the task as completed with optional result artifacts.
Example:
await task.complete(artifacts=[
{"name": "result", "parts": [{"text": "The answer is 42"}]},
])await task.fail(error: str)Marks the task as failed with an error message.
await task.request_input(message: dict | Message | None = None)Requests additional input from the sender. Sets the task status to INPUT_REQUIRED.
Complete task state, returned by TaskHandle.wait().
@dataclass
class Task:
task_id: str
context_id: str = ""
status: TaskStatus = TaskStatus.SUBMITTED
messages: list[Message] = field(default_factory=list)
artifacts: list[Artifact] = field(default_factory=list)
target_skill_id: str = ""
originator_peer_id: str = ""
created_at: datetime | None = None
updated_at: datetime | None = NoneA single communication turn in a task conversation.
@dataclass
class Message:
role: str # "user" or "agent"
parts: list[Part] = field(default_factory=list)
message_id: str = ""
created_at: datetime | None = NoneMessages can be created from dicts for convenience:
# These are equivalent:
Message(role="user", parts=[Part(text="Hello")])
Message.from_dict({"role": "user", "parts": [{"text": "Hello"}]})A content unit within a Message or Artifact.
@dataclass
class Part:
text: str | None = None
data: dict[str, Any] | None = None
url: str | None = None
raw: bytes | None = None
media_type: str | None = None
metadata: dict[str, str] | None = NoneParts support multiple content types:
# Text content
Part(text="Hello, world!")
# Structured data
Part(data={"temperature": 72, "unit": "F"}, media_type="application/json")
# URL reference
Part(url="https://example.com/image.png", media_type="image/png")
# Binary data
Part(raw=b"\x89PNG...", media_type="image/png")An output produced by an agent when completing a task.
@dataclass
class Artifact:
artifact_id: str = ""
name: str = ""
parts: list[Part] = field(default_factory=list)Enum representing the lifecycle state of a task.
class TaskStatus(Enum):
SUBMITTED = "submitted"
WORKING = "working"
INPUT_REQUIRED = "input_required"
COMPLETED = "completed"
FAILED = "failed"
CANCELED = "canceled"
REJECTED = "rejected"| Property | Type | Description |
|---|---|---|
is_terminal |
bool |
Whether this status represents a final state |
Terminal states: COMPLETED, FAILED, CANCELED, REJECTED.
All exceptions inherit from AgentAnycastError.
AgentAnycastError
├── DaemonError
│ ├── DaemonNotFoundError — daemon binary not found
│ ├── DaemonStartError — daemon failed to start
│ └── DaemonConnectionError — gRPC connection to daemon lost
├── PeerError
│ ├── PeerNotFoundError — peer ID not found in the network
│ ├── PeerDisconnectedError — peer disconnected during operation
│ ├── PeerAuthenticationError — Noise handshake failed
│ └── CardNotAvailableError — peer's Agent Card not available
├── TaskError
│ ├── TaskNotFoundError — task ID does not exist
│ ├── TaskTimeoutError — wait() timed out
│ ├── TaskCanceledError — task was canceled
│ ├── TaskFailedError — remote agent failed the task
│ └── TaskRejectedError — remote agent rejected the task
├── RoutingError
│ └── SkillNotFoundError — no agents found for the requested skill
└── BridgeError
├── BridgeConnectionError — HTTP bridge connection failed
└── BridgeTranslationError — failed to translate HTTP ↔ P2P
Example error handling:
from agentanycast import Node, AgentCard
from agentanycast.exceptions import (
TaskTimeoutError,
PeerNotFoundError,
SkillNotFoundError,
)
async with Node(card=card) as node:
try:
handle = await node.send_task(skill="translate", message=msg)
result = await handle.wait(timeout=30)
except SkillNotFoundError:
print("No agents found for 'translate'")
except PeerNotFoundError:
print("Could not find the target agent")
except TaskTimeoutError:
print("Agent didn't respond within 30 seconds")Convert between libp2p Peer IDs and W3C Decentralized Identifiers.
from agentanycast.did import peer_id_to_did_key, did_key_to_peer_id
from agentanycast.did import did_web_to_url, url_to_did_web
# did:key — derived from Ed25519 public key
did = peer_id_to_did_key("12D3KooW...") # "did:key:z6Mk..."
pid = did_key_to_peer_id("did:key:z6Mk...") # "12D3KooW..."
# did:web — web-based DID resolution
url = did_web_to_url("did:web:example.com:agents:myagent")
# "https://example.com/agents/myagent/did.json"
did = url_to_did_web("https://example.com/agents/myagent/did.json")
# "did:web:example.com:agents:myagent"Map between MCP (Model Context Protocol) tools and A2A skills.
from agentanycast.mcp import (
MCPTool,
mcp_tool_to_skill,
skill_to_mcp_tool,
mcp_tools_to_agent_card,
)
# Convert MCP tools to an AgentCard
card = mcp_tools_to_agent_card(mcp_tools, name="MCPBridge")
# Individual conversions
skill = mcp_tool_to_skill(mcp_tool)
tool = skill_to_mcp_tool(skill)from agentanycast.adapters.crewai import CrewAIAdapter, serve_crew
# Quick start
await serve_crew(crew, card=card, relay="...")
# Or use the adapter directly
adapter = CrewAIAdapter(crew)
async with Node(card=card) as node:
@node.on_task
async def handle(task):
await adapter.handle(task)
await node.serve_forever()Install: pip install agentanycast[crewai]
from agentanycast.adapters.langgraph import LangGraphAdapter, serve_graph
# Quick start
await serve_graph(compiled_graph, card=card, relay="...")
# Or use the adapter directly
adapter = LangGraphAdapter(compiled_graph)Install: pip install agentanycast[langgraph]
from agentanycast.adapters.adk import ADKAdapter, serve_adk_agent
# Quick start
await serve_adk_agent(agent, card=card, relay="...")
# Or use the adapter directly
adapter = ADKAdapter(agent, app_name="myapp")Install: pip install agentanycast[google-adk]
from agentanycast.adapters.openai_agents import OpenAIAgentsAdapter, serve_openai_agent
# Quick start
await serve_openai_agent(agent, card=card, relay="...")Install: pip install agentanycast[openai-agents]
Bidirectional conversion between internal models and the official A2A v1.0 JSON format.
from agentanycast.compat.a2a_v1 import (
task_to_a2a_json, task_from_a2a_json,
card_to_a2a_json, card_from_a2a_json,
message_to_a2a_json, message_from_a2a_json,
)
a2a_json = task_to_a2a_json(task) # Internal Task → A2A v1.0 JSON
task = task_from_a2a_json(a2a_json) # A2A v1.0 JSON → Internal TaskConvert Agent Cards to/from OASF records for publishing to the AGNTCY Agent Directory Service.
from agentanycast.compat.oasf import card_to_oasf_record, card_from_oasf_record
record = card_to_oasf_record(card, authors=["my-org"])
card = card_from_oasf_record(record)Query the AGNTCY agent directory for cross-ecosystem discovery.
from agentanycast.compat.agntcy import AGNTCYDirectory
directory = AGNTCYDirectory(base_url="https://directory.agntcy.org")
agents = await directory.search("translation")The SDK includes a command-line interface. Pass --verbose (or -v) before any command to enable debug logging.
agentanycast [--verbose] demo [--relay ADDR] [--home DIR] # Start an echo agent
agentanycast [--verbose] discover SKILL [-t key=value] # Find agents by skill
agentanycast [--verbose] send PEER_ID MESSAGE # Send a task
agentanycast [--verbose] status # Show node status
agentanycast [--verbose] info # Version & config infoAll commands produce enhanced colored output for improved readability.