diff --git a/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/.env.example b/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/.env.example new file mode 100644 index 000000000..3fa781d8c --- /dev/null +++ b/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/.env.example @@ -0,0 +1,5 @@ +AWS_REGION=us-west-2 +AWS_ACCOUNT_ID= +REGISTRY_ID= +REGISTRY_CP_ENDPOINT=https://bedrock-agentcore-control.us-west-2.amazonaws.com +REGISTRY_DP_ENDPOINT=https://bedrock-agentcore.us-west-2.amazonaws.com diff --git a/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/Images/image.png b/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/Images/image.png new file mode 100644 index 000000000..9d96d6273 Binary files /dev/null and b/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/Images/image.png differ diff --git a/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/README.md b/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/README.md new file mode 100644 index 000000000..8a5b339e8 --- /dev/null +++ b/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/README.md @@ -0,0 +1,118 @@ +# RegistryToolProvider — Dynamic Tool Discovery for Strands Agents + +A Strands `ToolProvider` that discovers tools from AWS Agent Registry via semantic search. Instead of hardcoding tools at startup, the agent gets only the tools relevant to the current domain. + +![image](Images/image.png) + + +## How It Works + +1. Agent calls `load_tools()` before each LLM turn +2. Provider searches Registry with each domain keyword (semantic search) +3. Matching records are parsed based on protocol: + - **MCP** → extracts tools from `tools`, routes `tools/call` through Gateway + - **A2A** → creates an `invoke_` tool, routes through AgentCore Runtime + - **Custom** → creates a passthrough tool from record metadata +4. Only tools from `APPROVED` records are returned (configurable) +5. Results are cached for `cache_ttl` seconds (default 300) + +## Supported Protocols + +| Protocol | Record contains | Provider creates | Invocation path | +|---|---|---|---| +| MCP | `tools` with tool definitions | One `PythonAgentTool` per tool | Gateway `tools/call` | +| A2A | `agentCard` with skills and endpoint | Single `invoke_` tool | Runtime `invoke_agent_runtime` | +| Custom | Free-form metadata | Passthrough tool | Returns record metadata | + +For A2A records, the runtime ARN or endpoint URL is extracted from the `agentCard.inlineContent` JSON (`url` or `runtimeArn` field). If not found there, falls back to the top-level record field. + +## How It Works + +On each request, the provider searches the Registry for tools matching your domains, injects only the relevant ones into the LLM context, and routes tool calls through the Gateway to upstream MCP servers. The LLM picks the right tools, gets responses, and synthesizes a final answer. + +## Example: Enterprise Customer Support Agent +An org has 50+ tools registered across teams — Salesforce, ServiceNow, knowledge bases, Databricks, SAP, and escalation agents. Instead of hardcoding all tools, the provider dynamically selects what's relevant per request: + +"Status of ticket INC-4821?" → finds ServiceNow tools, returns ticket status +"Last quarter's invoices for Acme Corp" → finds SAP + Salesforce tools, returns invoices with account context +"Escalate this, customer is upset" → finds the A2A escalation agent, creates a priority ticket +Same agent, same code, different tools per request. When a team adds a new tool to the Registry, the agent picks it up automatically — no code change, no redeploy. + +## Prerequisites + +### IAM Policy + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "RegistryRecordManagement", + "Effect": "Allow", + "Action": [ + "bedrock-agentcore:CreateRegistryRecord", + "bedrock-agentcore:GetRegistryRecord", + "bedrock-agentcore:UpdateRegistryRecord", + "bedrock-agentcore:ListRegistryRecords", + "bedrock-agentcore:DeleteRegistryRecord" + ], + "Resource": [ + "arn:aws:bedrock-agentcore:REGION:ACCOUNT_ID:registry/*", + "arn:aws:bedrock-agentcore:REGION:ACCOUNT_ID:registry/*/record/*" + ] + }, + { + "Sid": "RegistryManagement", + "Effect": "Allow", + "Action": [ + "bedrock-agentcore:CreateRegistry", + "bedrock-agentcore:GetRegistry", + "bedrock-agentcore:ListRegistries" + ], + "Resource": "arn:aws:bedrock-agentcore:REGION:ACCOUNT_ID:*" + }, + { + "Sid": "OAuthTokenForSync", + "Effect": "Allow", + "Action": "bedrock-agentcore:GetResourceOauth2Token", + "Resource": "arn:aws:bedrock-agentcore:REGION:ACCOUNT_ID:token-vault/*/oauth2credentialprovider/*" + }, + { + "Sid": "IAMPassRoleForSync", + "Effect": "Allow", + "Action": "iam:PassRole", + "Resource": "arn:aws:iam::ACCOUNT_ID:role/*", + "Condition": { + "StringEquals": { + "iam:PassedToService": "bedrock-agentcore.amazonaws.com" + } + } + } + ] +} +``` + +## Key Benefits + +- **Dynamic Discovery**: Tools are resolved at runtime via semantic search — no hardcoded tool lists +- **Protocol Aware**: Handles MCP, A2A, and Custom records with the right invocation path for each +- **Scoped Context**: Only injects tools relevant to the current domain, keeping LLM context small and cost-effective +- **Zero Redeploy**: New tools added to the Registry are picked up automatically — no code changes needed +- **Production Ready**: Supports approval filtering, runtime ARN restrictions, caching, and configurable failure modes +- **Pluggable Auth**: Bring your own token function for Gateway authentication (Cognito, custom OAuth, etc.) + +## Getting Started + +The included notebook walks through the full setup end-to-end: + +1. Creating a Registry and seeding it with MCP and A2A records +2. Configuring the `RegistryToolProvider` with domains and Gateway auth +3. Running an agent that dynamically discovers and invokes tools +4. Testing with different queries to see domain-scoped tool selection in action + +## Next Steps + +- Add your own MCP servers and A2A agents to the Registry +- Tune domain keywords to match your agent's use cases +- Lock down with `required_status`, `allowed_runtime_arns`, and `fail_open` for production +- Shorten `cache_ttl` for environments where records change frequently diff --git a/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/example_tool_provider_agent.py b/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/example_tool_provider_agent.py new file mode 100644 index 000000000..d37034b6b --- /dev/null +++ b/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/example_tool_provider_agent.py @@ -0,0 +1,51 @@ +"""Example: Strands agent using RegistryToolProvider. + +The agent discovers tools dynamically from AWS Agent Registry +instead of hardcoding them at startup. +""" + +import boto3 +import json +from strands import Agent +from strands.models import BedrockModel +from registry_tool_provider import RegistryToolProvider + +# --- Config --- +REGISTRY_ID = "Vf4gtZ5mreKG" # Your registry ID +GATEWAY_URL = "https://gw-xxx.gateway.bedrock-agentcore.us-east-1.amazonaws.com/mcp" +REGION = "us-west-2" + + +def get_gateway_token() -> str: + """Get a Cognito token for the Gateway. Replace with your auth logic.""" + sm = boto3.client("secretsmanager", region_name="us-east-1") + creds = json.loads(sm.get_secret_value(SecretId="my-gateway-mcp-cognito-credentials")["SecretString"]) + import httpx + resp = httpx.post(f"https://{creds['domain']}/oauth2/token", data={ + "grant_type": "client_credentials", + "client_id": creds["client_id"], + "client_secret": creds["client_secret"], + "scope": creds["scope"], + }, headers={"Content-Type": "application/x-www-form-urlencoded"}) + return resp.json()["access_token"] + + +# --- Agent setup --- +provider = RegistryToolProvider( + registry_ids=[REGISTRY_ID], + domains=["weather", "database", "email"], # Semantic search keywords + gateway_url=GATEWAY_URL, + gateway_token_fn=get_gateway_token, + region=REGION, + cache_ttl=300, +) + +agent = Agent( + model=BedrockModel(model_id="us.anthropic.claude-sonnet-4-20250514"), + tool_providers=[provider], + system_prompt="You are a helpful assistant. Use the available tools to answer questions.", +) + +if __name__ == "__main__": + result = agent("What tools do you have available? List them.") + print(result) diff --git a/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/registry_tool_provider.py b/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/registry_tool_provider.py new file mode 100644 index 000000000..6333e66a1 --- /dev/null +++ b/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/registry_tool_provider.py @@ -0,0 +1,320 @@ +"""AWS Agent Registry ToolProvider for Strands Agents. + +Dynamically discovers and loads tools from AWS Agent Registry via semantic search. +Instead of hardcoding tools at startup, the agent gets only the tools relevant +to the current domain — bounded context, automatic, no LLM decision needed. + +Usage: + from strands import Agent + from registry_tool_provider import RegistryToolProvider + + provider = RegistryToolProvider( + registry_ids=["Vf4gtZ5mreKG"], + domains=["order management", "CRM"], + gateway_url="https://gw-xxx.gateway.bedrock-agentcore.us-east-1.amazonaws.com/mcp", + gateway_token_fn=lambda: get_my_token(), + ) + agent = Agent(tool_providers=[provider]) + agent("Check my open orders and update the CRM") + +Security notes: + - Only APPROVED records are loaded by default (required_status parameter) + - Tool names are validated against a strict regex + - Tool descriptions are truncated to prevent prompt injection surface + - Gateway and endpoint URLs must be HTTPS + - Runtime ARNs can be restricted via allowed_runtime_arns allowlist + - JSON payloads from Registry are size-bounded before parsing +""" + +import json +import re +import time +import logging +import uuid +from typing import Any, Callable, Sequence + +import boto3 +from strands.tools.tool_provider import ToolProvider +from strands.tools.tools import PythonAgentTool + +logger = logging.getLogger(__name__) + +_TOOL_NAME_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_-]{0,127}$") +_MAX_DESCRIPTION_LEN = 256 +_MAX_SCHEMA_BYTES = 1_048_576 # 1 MB + + +def _validate_https(url: str, label: str) -> None: + if url and not url.startswith("https://"): + raise ValueError(f"{label} must use HTTPS, got: {url[:40]}") + + +def _safe_json_loads(raw: str, label: str) -> Any: + if len(raw) > _MAX_SCHEMA_BYTES: + logger.warning("Rejecting oversized %s payload (%d bytes)", label, len(raw)) + return None + try: + return json.loads(raw) + except (json.JSONDecodeError, TypeError): + return None + + +def _sanitize_description(desc: str) -> str: + # Strip control characters and truncate + cleaned = re.sub(r"[\x00-\x1f\x7f-\x9f]", "", desc) + return cleaned[:_MAX_DESCRIPTION_LEN] + + +class RegistryToolProvider(ToolProvider): + """Discovers tools from AWS Agent Registry via semantic search. + + Args: + registry_ids: Registry identifiers to search across. + domains: Domain keywords for semantic search. + gateway_url: Gateway URL for MCP tool invocation (must be HTTPS). + gateway_token_fn: Callable returning a Bearer token for the Gateway. + region: AWS region for Registry and Runtime API calls. + endpoint_url: Custom Registry endpoint (must be HTTPS). + max_results: Max records per search query. + cache_ttl: Seconds to cache results. 0 disables caching. + required_status: Only load records with this status. None disables filtering. + allowed_runtime_arns: Allowlist of Runtime ARNs for A2A invocation. None allows all. + fail_open: If True (default), search failures return empty. If False, raise. + """ + + def __init__( + self, + registry_ids: list[str], + domains: list[str], + gateway_url: str | None = None, + gateway_token_fn: Callable[[], str] | None = None, + region: str = "us-west-2", + endpoint_url: str | None = None, + max_results: int = 10, + cache_ttl: int = 300, + required_status: str | None = "APPROVED", + allowed_runtime_arns: list[str] | None = None, + fail_open: bool = True, + ): + _validate_https(gateway_url or "", "gateway_url") + _validate_https(endpoint_url or "", "endpoint_url") + + self._registry_ids = registry_ids + self._domains = domains + self._gateway_url = gateway_url + self._gateway_token_fn = gateway_token_fn + self._region = region + self._max_results = max_results + self._cache_ttl = cache_ttl + self._required_status = required_status + self._allowed_runtime_arns = set(allowed_runtime_arns) if allowed_runtime_arns else None + self._fail_open = fail_open + self._cache: list[PythonAgentTool] = [] + self._cache_ts: float = 0 + self._consumers: set = set() + + kw = {"region_name": region} + if endpoint_url: + kw["endpoint_url"] = endpoint_url + self._dp = boto3.client("bedrock-agentcore", **kw) + + # --- ToolProvider interface --- + + async def load_tools(self, **kwargs: Any) -> Sequence[PythonAgentTool]: + if self._cache_ttl and (time.time() - self._cache_ts) < self._cache_ttl and self._cache: + return self._cache + + seen: dict[str, PythonAgentTool] = {} + for domain in self._domains: + for record in self._search(domain): + if self._required_status and record.get("status") != self._required_status: + continue + for tool in self._record_to_tools(record): + seen[tool.tool_name] = tool + + self._cache = list(seen.values()) + self._cache_ts = time.time() + logger.info("RegistryToolProvider: loaded %d tools from %d domain(s)", len(self._cache), len(self._domains)) + return self._cache + + def add_consumer(self, consumer_id: Any, **kwargs: Any) -> None: + self._consumers.add(consumer_id) + + def remove_consumer(self, consumer_id: Any, **kwargs: Any) -> None: + self._consumers.discard(consumer_id) + if not self._consumers: + self._cache.clear() + + # --- Registry search --- + + def _search(self, query: str) -> list[dict]: + try: + resp = self._dp.search_registry_records( + registryIds=self._registry_ids, + searchQuery=query, + maxResults=self._max_results, + ) + return resp.get("registryRecords", []) + except Exception as e: + if not self._fail_open: + raise + logger.warning("Registry search failed for '%s': %s", query, e) + return [] + + # --- Record → Tool conversion --- + + def _record_to_tools(self, record: dict) -> list[PythonAgentTool]: + protocol = record.get("descriptorType", "") + descriptors = record.get("descriptors", {}) + name = record.get("name", "unknown") + + if protocol == "MCP": + return self._mcp_tools(name, descriptors) + elif protocol == "A2A": + return self._a2a_tool(name, record, descriptors) + return self._custom_tool(name, record) + + def _mcp_tools(self, record_name: str, descriptors: dict) -> list[PythonAgentTool]: + raw = descriptors.get("mcp", {}).get("tools", {}).get("inlineContent", "") + if not raw: + return [] + + defs = _safe_json_loads(raw, f"tools:{record_name}") + if defs is None: + return [] + if isinstance(defs, dict): + defs = defs.get("tools", []) + + tools = [] + for td in defs: + tool_name = td.get("name", "") + if not _TOOL_NAME_RE.match(tool_name): + logger.warning("Skipping invalid tool name: %s", tool_name[:50]) + continue + + spec = { + "name": tool_name, + "description": _sanitize_description(td.get("description", f"Tool from {record_name}")), + "inputSchema": {"json": td.get("inputSchema", {"type": "object", "properties": {}})}, + } + + gw, tk = self._gateway_url, self._gateway_token_fn + + def make_fn(n=tool_name, g=gw, t=tk): + def fn(tool, **kwargs): + return _call_gateway(g, t, n, kwargs) + return fn + + tools.append(PythonAgentTool(tool_name=tool_name, tool_spec=spec, tool_func=make_fn())) + return tools + + def _a2a_tool(self, record_name: str, record: dict, descriptors: dict) -> list[PythonAgentTool]: + card_raw = descriptors.get("a2a", {}).get("agentCard", {}).get("inlineContent", "") + card = _safe_json_loads(card_raw, f"agentCard:{record_name}") if card_raw else None + + # Extract endpoint URL from agent card (url field per A2A spec) + arn = "" + if card: + arn = card.get("url", "") or card.get("runtimeArn", "") + # Fallback to top-level record field + if not arn: + arn = record.get("runtimeArn", "") + if not arn: + logger.warning("No runtime ARN or endpoint found for A2A record: %s", record_name) + return [] + + if self._allowed_runtime_arns is not None and arn not in self._allowed_runtime_arns: + logger.warning("Runtime ARN not in allowlist, skipping: %s", arn[:80]) + return [] + + safe = re.sub(r"[^a-zA-Z0-9_]", "_", record_name) + desc = _sanitize_description(record.get("description", f"Invoke A2A agent: {record_name}")) + + if card: + skills = card.get("skills", []) + if skills: + skill_names = ", ".join(s.get("name", "") for s in skills[:5]) + desc = _sanitize_description(f"{desc}. Skills: {skill_names}") + + spec = { + "name": f"invoke_{safe}", + "description": desc, + "inputSchema": {"json": { + "type": "object", + "properties": {"message": {"type": "string", "description": "Message to send to the agent"}}, + "required": ["message"], + }}, + } + + region = self._region + + def fn(tool, message: str = "", **kwargs): + return _call_runtime(region, arn, message) + + return [PythonAgentTool(tool_name=f"invoke_{safe}", tool_spec=spec, tool_func=fn)] + + def _custom_tool(self, record_name: str, record: dict) -> list[PythonAgentTool]: + safe = re.sub(r"[^a-zA-Z0-9_]", "_", record_name) + if not _TOOL_NAME_RE.match(safe): + return [] + + spec = { + "name": safe, + "description": _sanitize_description(record.get("description", f"Custom resource: {record_name}")), + "inputSchema": {"json": { + "type": "object", + "properties": {"input": {"type": "string"}}, + }}, + } + + def fn(tool, **kwargs): + return json.dumps({"status": "custom_protocol", "record": record_name}) + + return [PythonAgentTool(tool_name=safe, tool_spec=spec, tool_func=fn)] + + +# --- Invocation helpers --- + +def _call_gateway(gateway_url: str, token_fn: Callable | None, tool_name: str, arguments: dict) -> str: + if not gateway_url: + return json.dumps({"error": "No gateway_url configured"}) + + import httpx + try: + token = token_fn() if callable(token_fn) else "" + resp = httpx.post( + gateway_url, + headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"}, + json={"jsonrpc": "2.0", "id": "1", "method": "tools/call", + "params": {"name": tool_name, "arguments": arguments}}, + timeout=120, + ) + except Exception as e: + # Never leak token in exception traces + return json.dumps({"error": f"Gateway call failed: {type(e).__name__}"}) + + body = resp.json() + if "error" in body: + return json.dumps(body["error"]) + content = body.get("result", {}).get("content", []) + return content[0].get("text", json.dumps(content)) if content else json.dumps(body) + + +def _call_runtime(region: str, runtime_arn: str, message: str) -> str: + if not runtime_arn: + return json.dumps({"error": "No runtimeArn in registry record"}) + + try: + client = boto3.client("bedrock-agentcore", region_name=region) + resp = client.invoke_agent_runtime( + agentRuntimeArn=runtime_arn, + sessionId=f"registry-{uuid.uuid4().hex[:12]}", + payload=json.dumps({"messages": [{"role": "user", "content": [{"text": message}]}]}), + ) + chunks = [] + for event in resp.get("body", []): + if "chunk" in event: + chunks.append(event["chunk"].get("bytes", b"").decode()) + return "".join(chunks) or json.dumps({"status": "no_response"}) + except Exception as e: + return json.dumps({"error": f"Runtime invocation failed: {type(e).__name__}"}) diff --git a/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/requirements.txt b/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/requirements.txt new file mode 100644 index 000000000..263a4903e --- /dev/null +++ b/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/requirements.txt @@ -0,0 +1,4 @@ +boto3>=1.42.87 +strands-agents +httpx +python-dotenv diff --git a/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/tool_provider.ipynb b/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/tool_provider.ipynb new file mode 100644 index 000000000..4c5aaf9a4 --- /dev/null +++ b/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/tool_provider.ipynb @@ -0,0 +1,422 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# RegistryToolProvider — Dynamic Tool Discovery for Strands Agents\n", + "\n", + "## Overview\n", + "\n", + "This notebook demonstrates the **RegistryToolProvider** — a Strands `ToolProvider` that discovers tools\n", + "from AWS Agent Registry via semantic search and injects them into the agent's LLM context automatically.\n", + "\n", + "### The Problem\n", + "\n", + "Every tool given to an LLM costs ~200-500 tokens. An agent with 200 tools burns 40K-100K tokens of context\n", + "before the user even asks a question — expensive, slow, and the model gets confused.\n", + "\n", + "### The Solution\n", + "\n", + "The `RegistryToolProvider` searches the Registry with domain keywords before each LLM turn and returns\n", + "only the relevant tools. The agent sees 5-10 tools instead of 200.\n", + "\n", + "![image](Images/image.png)\n", + "\n", + "## What You'll Build\n", + "\n", + "1. **Create a Registry** and register 3 public MCP servers via URL synchronization (AWS, Cloudflare, Astro)\n", + "2. **Use the RegistryToolProvider** to dynamically discover tools via semantic search\n", + "3. **Test domain scoping** — narrow (single server) vs broad (all servers)\n", + "4. **Test caching and security controls**\n", + "5. **Wire it into a Strands agent** for end-to-end dynamic tool usage\n", + "6. **Clean up** all resources\n", + "\n", + "## Prerequisites\n", + "\n", + "- AWS credentials configured\n", + "- `boto3>=1.42.87` (for URL sync support)\n", + "- `strands-agents`, `httpx`\n", + "\n", + "---\n", + "\n", + "| Information | Details |\n", + "|:---|:---|\n", + "| Tutorial type | Interactive |\n", + "| AgentCore components | Agent Registry, RegistryToolProvider |\n", + "| Auth method | IAM |\n", + "| Record creation | URL Synchronization (no deployment needed) |\n", + "| Example complexity | Intermediate |\n", + "| SDK used | boto3 (>=1.42.87), strands-agents |" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Setup" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!pip install -q \"boto3>=1.42.87\" strands-agents httpx python-dotenv" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import boto3\n", + "import json\n", + "import time\n", + "import sys, os\n", + "sys.path.insert(0, os.path.abspath('.'))\n", + "\n", + "from utils import (\n", + " create_registry, seed, wait_for_search_index,\n", + " search, delete_registry, get_cp_client, get_dp_client, REGION,\n", + ")\n", + "\n", + "print(f\"Region: {REGION}\")\n", + "print(f\"boto3: {boto3.__version__}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## Step 1: Create Registry and Seed MCP Records\n", + "\n", + "We seed 3 inline MCP records with distinct domains (weather, orders, inventory)\n", + "plus public MCP servers via URL sync (AWS Knowledge, Find-A-Domain, Peek).\n", + "The inline records have 2-3 tools each — small and focused, ideal for demonstrating domain scoping.\n", + "\n", + "- **weather-tools** — `get_current_weather`, `get_weather_forecast`\n", + "- **order-management-tools** — `get_order_status`, `list_orders`, `create_order`\n", + "- **inventory-tools** — `check_inventory`, `search_products`\n", + "- **aws-knowledge-mcp** — (URL sync) AWS documentation search\n", + "- **find-a-domain-mcp** — (URL sync) domain name availability\n", + "- **peek-mcp** — (URL sync) travel activities and tours" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "registry = create_registry(\n", + " name=\"tool-provider-demo\",\n", + " description=\"Registry for RegistryToolProvider demo — inline MCP records + URL sync\",\n", + ")\n", + "REGISTRY_ID = registry[\"registryId\"]\n", + "REGISTRY_ARN = registry[\"registryArn\"]\n", + "print(f\"Registry ID: {REGISTRY_ID}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Register public MCP servers via URL sync, wait for sync, and approve\n", + "seeded = seed(REGISTRY_ID)\n", + "\n", + "print(f\"\\nSeeded {len(seeded)} records:\")\n", + "total_tools = 0\n", + "for r in seeded:\n", + " print(f\" 📦 {r['name']} — {r['tool_count']} tools\")\n", + " total_tools += r['tool_count']\n", + "print(f\"\\nTotal tools: {total_tools}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Wait for search index to propagate\n", + "wait_for_search_index(REGISTRY_ID, expected_count=len(seeded))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## Step 2: RegistryToolProvider — Dynamic Tool Discovery\n", + "\n", + "The `RegistryToolProvider` searches the Registry with domain keywords\n", + "and returns only the relevant tools. Different keywords = different tools = bounded LLM context." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from registry_tool_provider import RegistryToolProvider\n", + "import logging\n", + "logging.basicConfig(level=logging.INFO)\n", + "\n", + "DP_ENDPOINT = f\"https://bedrock-agentcore.{REGION}.amazonaws.com\"\n", + "\n", + "provider = RegistryToolProvider(\n", + " registry_ids=[REGISTRY_ID],\n", + " domains=[\"weather forecast\", \"order tracking\", \"inventory stock\"],\n", + " region=REGION,\n", + " endpoint_url=DP_ENDPOINT,\n", + " cache_ttl=0,\n", + " required_status=\"APPROVED\",\n", + ")\n", + "\n", + "tools = await provider.load_tools()\n", + "\n", + "print(f\"🔍 Discovered {len(tools)} tools from {len(provider._domains)} domain(s):\\n\")\n", + "for t in tools:\n", + " print(f\" 🔧 {t.tool_name}\")\n", + " print(f\" {t.tool_spec['description'][:100]}\")\n", + " print()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Domain scoping — narrow vs broad\n", + "\n", + "This is the core value: different keywords = different tools = bounded context." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Narrow: only weather\n", + "narrow = RegistryToolProvider(\n", + " registry_ids=[REGISTRY_ID], domains=[\"weather forecast\"],\n", + " region=REGION, endpoint_url=DP_ENDPOINT, cache_ttl=0, required_status=\"APPROVED\",\n", + ")\n", + "narrow_tools = await narrow.load_tools()\n", + "print(f\"Narrow ('weather forecast'): {len(narrow_tools)} tools\")\n", + "for t in narrow_tools: print(f\" - {t.tool_name}\")\n", + "\n", + "print()\n", + "\n", + "# Medium: orders only\n", + "medium = RegistryToolProvider(\n", + " registry_ids=[REGISTRY_ID], domains=[\"order tracking shipping\"],\n", + " region=REGION, endpoint_url=DP_ENDPOINT, cache_ttl=0, required_status=\"APPROVED\",\n", + ")\n", + "medium_tools = await medium.load_tools()\n", + "print(f\"Medium ('order tracking shipping'): {len(medium_tools)} tools\")\n", + "for t in medium_tools: print(f\" - {t.tool_name}\")\n", + "\n", + "print()\n", + "\n", + "# Broad: all domains\n", + "broad = RegistryToolProvider(\n", + " registry_ids=[REGISTRY_ID],\n", + " domains=[\"weather\", \"orders\", \"inventory\", \"shipping\", \"stock levels\"],\n", + " region=REGION, endpoint_url=DP_ENDPOINT, cache_ttl=0, required_status=\"APPROVED\",\n", + ")\n", + "broad_tools = await broad.load_tools()\n", + "print(f\"Broad (5 domains): {len(broad_tools)} tools\")\n", + "for t in broad_tools: print(f\" - {t.tool_name}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Caching and consumer lifecycle" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "provider._cache_ttl = 300\n", + "provider._cache = []\n", + "provider._cache_ts = 0\n", + "\n", + "start = time.time()\n", + "tools1 = await provider.load_tools()\n", + "print(f\"First call: {len(tools1)} tools in {(time.time()-start)*1000:.0f}ms (API call)\")\n", + "\n", + "start = time.time()\n", + "tools2 = await provider.load_tools()\n", + "print(f\"Second call: {len(tools2)} tools in {(time.time()-start)*1000:.1f}ms (cache hit)\")\n", + "\n", + "provider.add_consumer(\"agent-1\")\n", + "provider.add_consumer(\"agent-2\")\n", + "print(f\"\\nConsumers: 2 — cache alive: {bool(provider._cache)}\")\n", + "provider.remove_consumer(\"agent-1\")\n", + "print(f\"Consumers: 1 — cache alive: {bool(provider._cache)}\")\n", + "provider.remove_consumer(\"agent-2\")\n", + "print(f\"Consumers: 0 — cache alive: {bool(provider._cache)} (cleared!)\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Security controls" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# HTTPS enforcement\n", + "try:\n", + " RegistryToolProvider(registry_ids=[\"x\"], domains=[\"x\"], gateway_url=\"http://not-secure.com/mcp\")\n", + " print(\"❌ HTTP should have been rejected\")\n", + "except ValueError as e:\n", + " print(f\"✅ HTTPS enforcement: {e}\")\n", + "\n", + "# APPROVED-only filtering\n", + "draft_provider = RegistryToolProvider(\n", + " registry_ids=[REGISTRY_ID], domains=[\"weather\"],\n", + " region=REGION, endpoint_url=DP_ENDPOINT, cache_ttl=0, required_status=\"DRAFT\",\n", + ")\n", + "draft_tools = await draft_provider.load_tools()\n", + "print(f\"✅ Status filtering: APPROVED={len(narrow_tools)} tools, DRAFT={len(draft_tools)} tools\")\n", + "\n", + "# Runtime ARN allowlist\n", + "restricted = RegistryToolProvider(\n", + " registry_ids=[REGISTRY_ID], domains=[\"weather\"],\n", + " region=REGION, endpoint_url=DP_ENDPOINT, cache_ttl=0,\n", + " allowed_runtime_arns=[\"arn:aws:bedrock-agentcore:us-east-1:123:runtime/only-this-one\"],\n", + ")\n", + "print(\"✅ Runtime ARN allowlist configured\")\n", + "\n", + "print(\"\\n✅ All security controls working\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## Step 3: Wire Into a Strands Agent\n", + "\n", + "End-to-end: the agent discovers tools from Registry automatically.\n", + "\n", + "**Note:** Tool invocation requires a Gateway URL and token. Without them, the agent\n", + "discovers tools but calls return an error. The discovery itself is the demo." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from strands import Agent\n", + "from strands.models import BedrockModel\n", + "\n", + "agent_provider = RegistryToolProvider(\n", + " registry_ids=[REGISTRY_ID],\n", + " domains=[\"weather\", \"orders\", \"inventory\"],\n", + " region=REGION,\n", + " endpoint_url=DP_ENDPOINT,\n", + " cache_ttl=300,\n", + " required_status=\"APPROVED\",\n", + ")\n", + "\n", + "discovered_tools = await agent_provider.load_tools()\n", + "print(f\"Discovered {len(discovered_tools)} tools from Registry\")\n", + "\n", + "agent = Agent(\n", + " model=BedrockModel(model_id=\"us.anthropic.claude-sonnet-4-20250514-v1:0\", region_name=\"us-east-1\"),\n", + " tools=list(discovered_tools),\n", + " system_prompt=\"You are a helpful assistant. Use the available tools to answer questions. \"\n", + " \"If a tool call fails, explain what happened.\",\n", + ")\n", + "\n", + "print(f\"✅ Agent created with {len(discovered_tools)} tools from Registry\")\n", + "result = agent(\"What tools do you have available? List them with a brief description of each.\")\n", + "print(result)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## Step 4: Clean Up" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "delete_registry(REGISTRY_ID)\n", + "print(\"\\n✅ All resources cleaned up\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## Summary\n", + "\n", + "| What we tested | Result |\n", + "|---|---|\n", + "| Registry creation | ✅ IAM auth, no OAuth needed |\n", + "| URL sync from public MCP servers | ✅ Auto-populated server + tool schemas |\n", + "| Tool discovery via RegistryToolProvider | ✅ Semantic search returns relevant tools |\n", + "| Domain scoping | ✅ Different keywords = different tool sets |\n", + "| Caching | ✅ First call ~500ms, cached calls <1ms |\n", + "| Security controls | ✅ HTTPS enforcement, APPROVED-only filtering |\n", + "| Strands agent integration | ✅ Agent discovers tools automatically |" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.14.2" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/utils.py b/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/utils.py new file mode 100644 index 000000000..1dcf2bb6a --- /dev/null +++ b/01-tutorials/10-Agent-Registry/01-advanced/registry-tool-provider/utils.py @@ -0,0 +1,241 @@ +"""Utilities for AWS Agent Registry: create, seed, search, and cleanup. + +Seeds the registry with inline MCP records (small, focused tool sets) plus +a public MCP server via URL sync. Requires boto3>=1.42.87. +""" + +import boto3 +import json +import time +import logging +import os +from pathlib import Path +from dotenv import load_dotenv + +load_dotenv(Path(__file__).parent / ".env") +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") +logger = logging.getLogger(__name__) + +REGION = os.getenv("AWS_REGION", "us-west-2") + + +def get_session(): + return boto3.Session(region_name=REGION) + + +def get_cp_client(): + return get_session().client("bedrock-agentcore-control", region_name=REGION) + + +def get_dp_client(): + return get_session().client("bedrock-agentcore", region_name=REGION) + + +# ── Registry ────────────────────────────────────────────────────────────────── + +def create_registry(name="tool-provider-registry", description="Registry for RegistryToolProvider demo"): + cp = get_cp_client() + logger.info("Creating registry '%s'...", name) + resp = cp.create_registry( + name=name, description=description, + approvalConfiguration={"autoApproval": False}, + ) + registry_arn = resp["registryArn"] + registry_id = registry_arn.split("/")[-1] + + for _ in range(30): + time.sleep(5) + status = cp.get_registry(registryId=registry_id).get("status", "UNKNOWN") + logger.info(" status: %s", status) + if status == "READY": + break + logger.info("✅ Registry ready: %s", registry_id) + return {"registryId": registry_id, "registryArn": registry_arn} + + +# ── Inline MCP + URL Sync Records ──────────────────────────────────────────── + +INLINE_RECORDS = [ + { + "name": "weather_tools_mcp", + "description": "Weather MCP server — current conditions and multi-day forecasts for any city worldwide", + "server_info": { + "name": "io.example/weather-tools", + "description": "MCP server for weather data", + "version": "1.0.0", + "packages": [{"registryType": "npm", "identifier": "@example/weather-mcp", "version": "1.0.0", "transport": {"type": "stdio"}}], + }, + "tools": [ + {"name": "get_current_weather", "description": "Get current weather conditions for a city including temperature, humidity, and wind speed", + "inputSchema": {"type": "object", "properties": {"city": {"type": "string", "description": "City name (e.g. Seattle)"}, "units": {"type": "string", "enum": ["celsius", "fahrenheit"]}}, "required": ["city"]}}, + {"name": "get_weather_forecast", "description": "Get a multi-day weather forecast for a city", + "inputSchema": {"type": "object", "properties": {"city": {"type": "string"}, "days": {"type": "integer", "description": "Number of forecast days (1-5)"}}, "required": ["city"]}}, + ], + }, + { + "name": "order_management_mcp", + "description": "Order management MCP server — track order status, list orders, and create new e-commerce orders", + "server_info": { + "name": "io.example/order-management", + "description": "MCP server for order tracking and management", + "version": "1.0.0", + "packages": [{"registryType": "npm", "identifier": "@example/order-mcp", "version": "1.0.0", "transport": {"type": "stdio"}}], + }, + "tools": [ + {"name": "get_order_status", "description": "Get the current status and details of an order by order ID", + "inputSchema": {"type": "object", "properties": {"order_id": {"type": "string", "description": "The order identifier"}}, "required": ["order_id"]}}, + {"name": "list_orders", "description": "List orders optionally filtered by status (pending, shipped, delivered)", + "inputSchema": {"type": "object", "properties": {"status": {"type": "string", "enum": ["pending", "shipped", "delivered", "ALL"]}}}}, + {"name": "create_order", "description": "Create a new order for a customer with product and quantity", + "inputSchema": {"type": "object", "properties": {"customer_name": {"type": "string"}, "product": {"type": "string"}, "quantity": {"type": "integer"}}, "required": ["customer_name", "product", "quantity"]}}, + ], + }, + { + "name": "inventory_tools_mcp", + "description": "Inventory MCP server — check real-time stock levels across warehouses and search products by name or category", + "server_info": { + "name": "io.example/inventory-tools", + "description": "MCP server for product inventory and stock levels", + "version": "1.0.0", + "packages": [{"registryType": "npm", "identifier": "@example/inventory-mcp", "version": "1.0.0", "transport": {"type": "stdio"}}], + }, + "tools": [ + {"name": "check_inventory", "description": "Check stock levels for a product SKU across warehouses", + "inputSchema": {"type": "object", "properties": {"sku": {"type": "string", "description": "Product SKU"}, "warehouse": {"type": "string", "description": "Warehouse ID (optional)"}}, "required": ["sku"]}}, + {"name": "search_products", "description": "Search products by name or category", + "inputSchema": {"type": "object", "properties": {"query": {"type": "string", "description": "Product name or keyword"}, "category": {"type": "string"}}, "required": ["query"]}}, + ], + }, +] + +URL_SYNC_SERVER = { + "name": "aws_knowledge_mcp", + "url": "https://knowledge-mcp.global.api.aws", + "description": "AWS Knowledge MCP — search AWS documentation, guides, and best practices", +} + + +def _wait_for_draft(cp, registry_id, record_id, timeout=120): + """Poll until record reaches DRAFT (or terminal) status.""" + for _ in range(timeout // 5): + r = cp.get_registry_record(registryId=registry_id, recordId=record_id) + status = r.get("status", "UNKNOWN") + if status in ("DRAFT", "APPROVED", "CREATE_FAILED", "PENDING_APPROVAL"): + return r + time.sleep(5) + return r + + +def _approve(cp, registry_id, record_id, name=""): + """Submit for approval and approve.""" + cp.submit_registry_record_for_approval(registryId=registry_id, recordId=record_id) + cp.update_registry_record_status( + registryId=registry_id, recordId=record_id, + status="APPROVED", statusReason="Auto-approved for demo", + ) + logger.info(" ✅ %s approved", name) + + +def seed(registry_id, include_url_sync=True): + """Seed registry with inline MCP records + optional URL sync. + + Returns list of {name, recordId, tool_count} dicts. + """ + cp = get_cp_client() + results = [] + + # 1. Inline MCP records — uses the same pattern as getting-started-registry-end-to-end + for rec in INLINE_RECORDS: + logger.info("Creating '%s' (MCP, %d tools)...", rec["name"], len(rec["tools"])) + try: + resp = cp.create_registry_record( + registryId=registry_id, + name=rec["name"], + description=rec["description"], + descriptorType="MCP", + descriptors={"mcp": { + "server": {"inlineContent": json.dumps(rec["server_info"])}, + "tools": {"inlineContent": json.dumps({"tools": rec["tools"]})}, + }}, + recordVersion="1.0", + ) + record_id = resp["recordArn"].split("/")[-1] + logger.info(" Created: %s", record_id) + except Exception as e: + logger.error(" Failed: %s", e) + continue + + r = _wait_for_draft(cp, registry_id, record_id) + if r.get("status") == "CREATE_FAILED": + logger.error(" ❌ %s failed: %s", rec["name"], r.get("statusReason", "")) + continue + + _approve(cp, registry_id, record_id, rec["name"]) + results.append({"name": rec["name"], "recordId": record_id, "tool_count": len(rec["tools"])}) + + # 2. URL sync (AWS Knowledge MCP — may be rate-limited) + if include_url_sync: + server = URL_SYNC_SERVER + logger.info("Registering '%s' via URL sync: %s", server["name"], server["url"]) + try: + resp = cp.create_registry_record( + registryId=registry_id, + name=server["name"], + description=server.get("description", ""), + descriptorType="MCP", + synchronizationType="URL", + synchronizationConfiguration={"fromUrl": {"url": server["url"]}}, + ) + record_id = resp["recordArn"].split("/")[-1] + r = _wait_for_draft(cp, registry_id, record_id) + if r.get("status") == "CREATE_FAILED": + logger.warning(" ⚠️ URL sync failed: %s — skipping", r.get("statusReason", "")) + cp.delete_registry_record(registryId=registry_id, recordId=record_id) + else: + tools = [] + try: + tools = json.loads(r["descriptors"]["mcp"]["tools"]["inlineContent"]).get("tools", []) + except Exception: + pass + _approve(cp, registry_id, record_id, server["name"]) + results.append({"name": server["name"], "recordId": record_id, "tool_count": len(tools)}) + except Exception as e: + logger.warning(" ⚠️ URL sync skipped: %s", e) + + return results + + +def wait_for_search_index(registry_id, expected_count, query="weather orders inventory documentation", max_wait=120): + dp = get_dp_client() + logger.info("Waiting for search index (%d records)...", expected_count) + for _ in range(max_wait // 10): + time.sleep(10) + resp = dp.search_registry_records( + registryIds=[registry_id], searchQuery=query, maxResults=10, + ) + found = len(resp.get("registryRecords", [])) + if found >= expected_count: + logger.info(" All %d records indexed.", found) + return + logger.info(" %d/%d indexed — waiting...", found, expected_count) + + +def search(query, registry_id, max_results=10): + dp = get_dp_client() + resp = dp.search_registry_records( + registryIds=[registry_id], searchQuery=query, maxResults=max_results, + ) + return resp.get("registryRecords", []) + + +def delete_registry(registry_id): + cp = get_cp_client() + records = cp.list_registry_records(registryId=registry_id).get("registryRecords", []) + for rec in records: + rid = rec["recordId"] + logger.info("Deleting record %s (%s)...", rec.get("name", ""), rid) + cp.delete_registry_record(registryId=registry_id, recordId=rid) + time.sleep(5) + logger.info("Deleting registry %s...", registry_id) + cp.delete_registry(registryId=registry_id) + logger.info("✅ Registry deleted") diff --git a/01-tutorials/10-Agent-Registry/01-advanced/url-synchronization/README.md b/01-tutorials/10-Agent-Registry/01-advanced/url-synchronization/README.md new file mode 100644 index 000000000..196a8b44d --- /dev/null +++ b/01-tutorials/10-Agent-Registry/01-advanced/url-synchronization/README.md @@ -0,0 +1,99 @@ +# Synchronize MCP and A2A Server Metadata with Registry + +Auto-populate registry records by pointing to an MCP server or A2A agent URL. The registry connects to the endpoint, fetches metadata (tools, server info, agent card), and populates the record — no manual entry needed. + +![image](image/image.png) + +## Use Cases + +Catalog MCP servers and A2A agents by pointing to their URLs — the registry auto-imports tools, schemas, and metadata. Keep records in sync on re-trigger, with support for public, OAuth-protected, and IAM-protected endpoints. + +## Prerequisites + +### IAM Policy + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "RegistryRecordManagement", + "Effect": "Allow", + "Action": [ + "bedrock-agentcore:CreateRegistryRecord", + "bedrock-agentcore:GetRegistryRecord", + "bedrock-agentcore:UpdateRegistryRecord", + "bedrock-agentcore:ListRegistryRecords", + "bedrock-agentcore:DeleteRegistryRecord" + ], + "Resource": [ + "arn:aws:bedrock-agentcore:REGION:ACCOUNT_ID:registry/*", + "arn:aws:bedrock-agentcore:REGION:ACCOUNT_ID:registry/*/record/*" + ] + }, + { + "Sid": "RegistryManagement", + "Effect": "Allow", + "Action": [ + "bedrock-agentcore:CreateRegistry", + "bedrock-agentcore:GetRegistry", + "bedrock-agentcore:ListRegistries" + ], + "Resource": "arn:aws:bedrock-agentcore:REGION:ACCOUNT_ID:*" + }, + { + "Sid": "OAuthTokenForSync", + "Effect": "Allow", + "Action": "bedrock-agentcore:GetResourceOauth2Token", + "Resource": "arn:aws:bedrock-agentcore:REGION:ACCOUNT_ID:token-vault/*/oauth2credentialprovider/*" + }, + { + "Sid": "IAMPassRoleForSync", + "Effect": "Allow", + "Action": "iam:PassRole", + "Resource": "arn:aws:iam::ACCOUNT_ID:role/*", + "Condition": { + "StringEquals": { + "iam:PassedToService": "bedrock-agentcore.amazonaws.com" + } + } + } + ] +} +``` + +### New registry required + +Registries created before the URL sync feature was deployed lack the workload identity needed for credential resolution. **Create a new registry** to use OAuth or IAM sync. + +## Tutorial Examples + +[`url_synchronization.ipynb`](url_synchronization.ipynb): Interactive notebook covering all sync scenarios end-to-end: public MCP, A2A agent cards, OAuth-protected servers, re-sync, failure handling, and governance workflow + + +## Key Benefits + +- **Zero Manual Entry**: Point to a URL and the registry auto-populates tools, schemas, server info, and agent cards +- **Multi-Protocol**: Supports both MCP servers and A2A agent cards +- **Auth Flexible**: Works with public endpoints, OAuth-protected gateways (Cognito), and IAM-protected gateways (SigV4) +- **Keep In Sync**: Re-trigger sync anytime to refresh records when upstream servers change +- **Governance Built-In**: Synced records follow the same approval workflow as manually created ones + +## Getting Started + +The included notebook walks through the full setup end-to-end: + +1. Creating a new registry with workload identity support +2. Syncing a public MCP server (no auth) +3. Syncing an A2A agent card +4. Syncing an OAuth-protected MCP server via Cognito + AgentCore Gateway +5. Updating and re-syncing an existing record +6. Handling failure cases and inspecting error reasons +7. Walking through the full record lifecycle (Publisher → Approver → Consumer) + +## Next Steps + +- Sync your own MCP servers and A2A agents into the Registry +- Set up OAuth or IAM credential providers for protected endpoints +- Integrate with the [RegistryToolProvider](../registry-tool-provider) for dynamic tool discovery at runtime +- Use `triggerSynchronization` to keep records up to date as upstream servers evolve diff --git a/01-tutorials/10-Agent-Registry/01-advanced/url-synchronization/image/image.png b/01-tutorials/10-Agent-Registry/01-advanced/url-synchronization/image/image.png new file mode 100644 index 000000000..523e245f2 Binary files /dev/null and b/01-tutorials/10-Agent-Registry/01-advanced/url-synchronization/image/image.png differ diff --git a/01-tutorials/10-Agent-Registry/01-advanced/url-synchronization/url_synchronization.ipynb b/01-tutorials/10-Agent-Registry/01-advanced/url-synchronization/url_synchronization.ipynb new file mode 100644 index 000000000..c9c0410b1 --- /dev/null +++ b/01-tutorials/10-Agent-Registry/01-advanced/url-synchronization/url_synchronization.ipynb @@ -0,0 +1,608 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Synchronize MCP and A2A Server Metadata with Registry\n", + "\n", + "Auto-populate registry records by providing an MCP server or A2A agent URL. The system connects to the endpoint, fetches metadata, and populates the record automatically.\n", + "\n", + "![image](image/image.png)\n", + "\n", + "## What You'll Learn\n", + "\n", + "1. Sync a public MCP server (no auth)\n", + "2. Sync an A2A agent card\n", + "3. Sync an OAuth-protected MCP server (Cognito + AgentCore Gateway)\n", + "4. Update and re-sync a record\n", + "5. Handle failure cases\n", + "6. Full governance workflow — Publisher → Approver → Consumer\n", + "\n", + "## Prerequisites\n", + "\n", + "- Updated CLI models with sync support (see `URL_SYNC_GUIDE.md`)\n", + "- IAM permissions including `*WorkloadIdentity`, `GetWorkloadAccessToken`, `GetResourceOauth2Token` (see `IAM_PERMISSIONS.md`)\n", + "- **New registry required** — old registries lack the workload identity needed for OAuth/IAM sync" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!pip install -q \"boto3>=1.42.87\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import boto3\n", + "import json\n", + "import time\n", + "import os\n", + "\n", + "# Configuration\n", + "AWS_REGION = \"us-west-2\"\n", + "AWS_PROFILE = \"\" # Change to your profile\n", + "\n", + "session = boto3.Session(profile_name=AWS_PROFILE, region_name=AWS_REGION)\n", + "ac = session.client(\"bedrock-agentcore-control\")\n", + "iam = session.client(\"iam\")\n", + "cognito = session.client(\"cognito-idp\")\n", + "sts = session.client(\"sts\")\n", + "\n", + "ACCOUNT_ID = sts.get_caller_identity()[\"Account\"]\n", + "\n", + "def pp(resp):\n", + " \"\"\"Pretty-print API response.\"\"\"\n", + " print(json.dumps({k: v for k, v in resp.items() if k != \"ResponseMetadata\"}, indent=2, default=str))\n", + "\n", + "def wait_record(registry_id, record_id, timeout=60):\n", + " \"\"\"Poll until record reaches a terminal status.\"\"\"\n", + " for _ in range(timeout // 5):\n", + " r = cp.get_registry_record(registryId=registry_id, recordId=record_id)\n", + " status = r[\"status\"]\n", + " print(f\" status: {status}\")\n", + " if status in (\"DRAFT\", \"CREATE_FAILED\", \"APPROVED\"):\n", + " return r\n", + " time.sleep(5)\n", + " return r\n", + "\n", + "print(f\"Account: {ACCOUNT_ID} | Region: {AWS_REGION}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## 1. Create a New Registry\n", + "\n", + "A new registry is required — it gets a workload identity that enables OAuth/IAM sync." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from utils import (\n", + " create_registry, seed, wait_for_search_index,\n", + " search, delete_registry, get_cp_client, get_dp_client, REGION,\n", + ")\n", + "\n", + "registry = create_registry(name=\"URLSyncTest\", description=\"URL sync testing\")\n", + "REGISTRY_ID = registry[\"registryArn\"].split(\"/\")[-1]\n", + "print(f\"Registry ID: {REGISTRY_ID}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "REGISTRY_ID = registry[\"registryId\"]\n", + "REGISTRY_ARN = registry[\"registryArn\"]\n", + "print(f\"Registry ID: {REGISTRY_ID}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## 2. Sync a Public MCP Server (No Auth)\n", + "\n", + "The simplest case — just provide a URL." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "cp = get_cp_client()\n", + "r = cp.create_registry_record(\n", + " registryId=REGISTRY_ID,\n", + " name=\"mcp_no_auth\",\n", + " descriptorType=\"MCP\",\n", + " synchronizationType=\"URL\",\n", + " synchronizationConfiguration={\n", + " \"fromUrl\": {\"url\": \"https://knowledge-mcp.global.api.aws\"}\n", + " },\n", + ")\n", + "RECORD_MCP = r[\"recordArn\"].split(\"/\")[-1]\n", + "print(f\"Record: {RECORD_MCP} | Status: {r['status']}\")\n", + "\n", + "# Wait for sync to complete\n", + "for _ in range(24):\n", + " record = cp.get_registry_record(registryId=REGISTRY_ID, recordId=RECORD_MCP)\n", + " status = record.get(\"status\", \"UNKNOWN\")\n", + " print(f\" status: {status}\")\n", + " if status in (\"DRAFT\", \"APPROVED\", \"CREATE_FAILED\"):\n", + " break\n", + " time.sleep(5)\n", + "\n", + "print(f\"\\nName: {record['name']}\")\n", + "print(f\"Status: {record['status']}\")\n", + "\n", + "if record[\"status\"] == \"CREATE_FAILED\":\n", + " print(f\"Reason: {record.get('statusReason', 'unknown')}\")\n", + " print(\"⚠️ Rate limited — delete this record and retry in 60 seconds:\")\n", + " print(f\" cp.delete_registry_record(registryId=REGISTRY_ID, recordId='{RECORD_MCP}')\")\n", + "else:\n", + " tools = json.loads(record[\"descriptors\"][\"mcp\"][\"tools\"][\"inlineContent\"])\n", + " print(f\"Tools synced: {len(tools['tools'])}\")\n", + " for t in tools[\"tools\"]:\n", + " print(f\" - {t['name']}: {t.get('description', '')[:80]}\")\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## 3. Sync an A2A Agent Card" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "cp = get_cp_client()\n", + "r = cp.create_registry_record(\n", + " registryId=REGISTRY_ID,\n", + " name=\"a2a_agent\",\n", + " descriptorType=\"A2A\",\n", + " synchronizationType=\"URL\",\n", + " synchronizationConfiguration={\n", + " \"fromUrl\": {\"url\": \"https://agent.willform.ai/.well-known/agent-card.json\"}\n", + " },\n", + ")\n", + "RECORD_A2A = r[\"recordArn\"].split(\"/\")[-1]\n", + "print(f\"Record: {RECORD_A2A} | Status: {r['status']}\")\n", + "\n", + "record = wait_record(REGISTRY_ID, RECORD_A2A)\n", + "print(f\"\\nName: {record['name']} | Status: {record['status']}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## 4. Sync an OAuth-Protected MCP Server\n", + "\n", + "This sets up a Cognito user pool, an AgentCore Gateway with Cognito JWT auth, and an OAuth2 credential provider. Then syncs from the protected gateway." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# 4a. Create Cognito user pool + M2M client\n", + "pool = cognito.create_user_pool(PoolName=\"url-sync-test-pool\")\n", + "POOL_ID = pool[\"UserPool\"][\"Id\"]\n", + "COGNITO_DOMAIN = f\"url-sync-test-{ACCOUNT_ID[:8]}\"\n", + "\n", + "cognito.create_user_pool_domain(UserPoolId=POOL_ID, Domain=COGNITO_DOMAIN)\n", + "cognito.create_resource_server(\n", + " UserPoolId=POOL_ID, Identifier=\"mcp-gateway\", Name=\"MCP Gateway\",\n", + " Scopes=[{\"ScopeName\": \"invoke\", \"ScopeDescription\": \"Invoke MCP\"}],\n", + ")\n", + "\n", + "client = cognito.create_user_pool_client(\n", + " UserPoolId=POOL_ID, ClientName=\"sync-m2m\", GenerateSecret=True,\n", + " AllowedOAuthFlows=[\"client_credentials\"],\n", + " AllowedOAuthScopes=[\"mcp-gateway/invoke\"],\n", + " AllowedOAuthFlowsUserPoolClient=True,\n", + ")\n", + "CLIENT_ID = client[\"UserPoolClient\"][\"ClientId\"]\n", + "CLIENT_SECRET = client[\"UserPoolClient\"][\"ClientSecret\"]\n", + "print(f\"Cognito Pool: {POOL_ID} | Client: {CLIENT_ID}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# 4b. Create gateway with Cognito JWT auth\n", + "gw = ac.create_gateway(\n", + " name=\"url-sync-test-gw\", protocolType=\"MCP\", authorizerType=\"CUSTOM_JWT\",\n", + " authorizerConfiguration={\"customJWTAuthorizer\": {\n", + " \"discoveryUrl\": f\"https://cognito-idp.{AWS_REGION}.amazonaws.com/{POOL_ID}/.well-known/openid-configuration\",\n", + " \"allowedClients\": [CLIENT_ID],\n", + " }},\n", + " roleArn=f\"arn:aws:iam::{ACCOUNT_ID}:role/AgentCoreGatewayExecutionRole\",\n", + ")\n", + "GW_ID = gw[\"gatewayId\"]\n", + "print(f\"Gateway: {GW_ID} — waiting for READY...\")\n", + "\n", + "for _ in range(12):\n", + " s = ac.get_gateway(gatewayIdentifier=GW_ID)[\"status\"]\n", + " if s == \"READY\": break\n", + " time.sleep(5)\n", + "\n", + "ac.create_gateway_target(\n", + " gatewayIdentifier=GW_ID, name=\"knowledge-mcp\",\n", + " targetConfiguration={\"mcp\": {\"mcpServer\": {\"endpoint\": \"https://knowledge-mcp.global.api.aws\"}}},\n", + ")\n", + "MCP_OAUTH_URL = ac.get_gateway(gatewayIdentifier=GW_ID)[\"gatewayUrl\"]\n", + "print(f\"Gateway URL: {MCP_OAUTH_URL}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# 4c. Create OAuth2 credential provider\n", + "TOKEN_EP = f\"https://{COGNITO_DOMAIN}.auth.{AWS_REGION}.amazoncognito.com/oauth2/token\"\n", + "AUTH_EP = f\"https://{COGNITO_DOMAIN}.auth.{AWS_REGION}.amazoncognito.com/oauth2/authorize\"\n", + "ISSUER = f\"https://cognito-idp.{AWS_REGION}.amazonaws.com/{POOL_ID}\"\n", + "\n", + "provider = ac.create_oauth2_credential_provider(\n", + " name=\"url-sync-test-cognito\",\n", + " credentialProviderVendor=\"CognitoOauth2\",\n", + " oauth2ProviderConfigInput={\"includedOauth2ProviderConfig\": {\n", + " \"clientId\": CLIENT_ID, \"clientSecret\": CLIENT_SECRET,\n", + " \"tokenEndpoint\": TOKEN_EP, \"issuer\": ISSUER, \"authorizationEndpoint\": AUTH_EP,\n", + " }},\n", + ")\n", + "OAUTH_ARN = provider[\"credentialProviderArn\"]\n", + "print(f\"OAuth Provider: {OAUTH_ARN}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# 4d. Sync from OAuth-protected gateway\n", + "r = cp.create_registry_record(\n", + " registryId=REGISTRY_ID,\n", + " name=\"mcp_oauth\",\n", + " descriptorType=\"MCP\",\n", + " synchronizationType=\"URL\",\n", + " synchronizationConfiguration={\n", + " \"fromUrl\": {\n", + " \"url\": MCP_OAUTH_URL,\n", + " \"credentialProviderConfigurations\": [{\n", + " \"credentialProviderType\": \"OAUTH\",\n", + " \"credentialProvider\": {\n", + " \"oauthCredentialProvider\": {\n", + " \"providerArn\": OAUTH_ARN,\n", + " \"grantType\": \"CLIENT_CREDENTIALS\",\n", + " \"scopes\": [\"mcp-gateway/invoke\"],\n", + " }\n", + " },\n", + " }],\n", + " }\n", + " },\n", + ")\n", + "RECORD_OAUTH = r[\"recordArn\"].split(\"/\")[-1]\n", + "print(f\"Record: {RECORD_OAUTH} | Status: {r['status']}\")\n", + "\n", + "record = wait_record(REGISTRY_ID, RECORD_OAUTH)\n", + "print(f\"\\nName: {record['name']} | Status: {record['status']}\")\n", + "if record.get(\"statusReason\"):\n", + " print(f\"Reason: {record['statusReason']}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## 5. Update and Re-sync\n", + "\n", + "Change the sync URL on an existing record and trigger re-sync." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "cp.update_registry_record(\n", + " registryId=REGISTRY_ID,\n", + " recordId=RECORD_MCP,\n", + " synchronizationConfiguration={\n", + " \"optionalValue\": {\"fromUrl\": {\"url\": \"https://knowledge-mcp.global.api.aws\"}}\n", + " },\n", + " triggerSynchronization=True,\n", + ")\n", + "print(\"Re-sync triggered\")\n", + "record = wait_record(REGISTRY_ID, RECORD_MCP)\n", + "print(f\"Name: {record['name']} | Status: {record['status']}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## 6. Failure Case — Bad URL" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "r = cp.create_registry_record(\n", + " registryId=REGISTRY_ID,\n", + " name=\"bad_url\",\n", + " descriptorType=\"MCP\",\n", + " synchronizationType=\"URL\",\n", + " synchronizationConfiguration={\n", + " \"fromUrl\": {\"url\": \"https://nonexistent.example.com/mcp\"}\n", + " },\n", + ")\n", + "RECORD_BAD = r[\"recordArn\"].split(\"/\")[-1]\n", + "record = wait_record(REGISTRY_ID, RECORD_BAD)\n", + "print(f\"\\nStatus: {record['status']}\")\n", + "print(f\"Reason: {record.get('statusReason', 'N/A')}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## 7. Registry Record Lifecycle\n", + "\n", + "Walk through the full lifecycle of a registry record — from creation via URL sync, through approval, to consumer discovery. Each step uses a different persona (Publisher, Approver, Consumer) to illustrate the API calls involved." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 7a. Publisher — Create and Submit for Approval" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Publisher: create a record via URL sync\n", + "r = cp.create_registry_record(\n", + " registryId=REGISTRY_ID,\n", + " name=\"publisher_mcp_record\",\n", + " descriptorType=\"MCP\",\n", + " synchronizationType=\"URL\",\n", + " synchronizationConfiguration={\n", + " \"fromUrl\": {\"url\": \"https://knowledge-mcp.global.api.aws\"}\n", + " },\n", + ")\n", + "RECORD_GOV = r[\"recordArn\"].split(\"/\")[-1]\n", + "record = wait_record(REGISTRY_ID, RECORD_GOV)\n", + "print(f\"Publisher created: {record['name']} | Status: {record['status']}\")\n", + "\n", + "# Publisher: submit for approval\n", + "cp.submit_registry_record_for_approval(\n", + " registryId=REGISTRY_ID, recordId=RECORD_GOV\n", + ")\n", + "record = cp.get_registry_record(registryId=REGISTRY_ID, recordId=RECORD_GOV)\n", + "print(f\"Publisher submitted → Status: {record['status']}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 7b. Approver — Review and Approve" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Approver: review the record\n", + "record = cp.get_registry_record(registryId=REGISTRY_ID, recordId=RECORD_GOV)\n", + "print(f\"Approver reviewing: {record['name']}\")\n", + "print(f\" Status: {record['status']}\")\n", + "print(f\" Type: {record['descriptorType']}\")\n", + "\n", + "# Show tools that were auto-populated\n", + "tools = json.loads(record[\"descriptors\"][\"mcp\"][\"tools\"][\"inlineContent\"])\n", + "print(f\" Tools ({len(tools['tools'])}):\\n\")\n", + "for t in tools[\"tools\"]:\n", + " print(f\" - {t['name']}: {t.get('description', '')[:60]}\")\n", + "\n", + "# Approver: approve the record\n", + "cp.update_registry_record_status(\n", + " registryId=REGISTRY_ID,\n", + " recordId=RECORD_GOV,\n", + " status=\"APPROVED\",\n", + ")\n", + "record = cp.get_registry_record(registryId=REGISTRY_ID, recordId=RECORD_GOV)\n", + "print(f\"\\nApprover approved → Status: {record['status']}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 7c. Consumer — Discover via Search" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Consumer: search for approved records\n", + "dp = session.client(\n", + " \"bedrock-agentcore\",\n", + ")\n", + "\n", + "results = dp.search_registry_records(\n", + " registryIds=[REGISTRY_ID],\n", + " searchQuery=\"documentation\",\n", + ")\n", + "\n", + "print(f\"Consumer search results ({len(results.get('registryRecords', []))}):\\n\")\n", + "for rec in results.get(\"registryRecords\", []):\n", + " print(f\" - {rec['name']} [{rec['descriptorType']}] — {rec['status']}\")\n", + " print(f\" {rec.get('description', 'No description')[:80]}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 7d. Publisher — Update Approved Record (triggers re-review)\n", + "\n", + "When a publisher updates an approved record and re-syncs, it moves back to `DRAFT`. The approved revision remains searchable until the new version is approved." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Publisher: update and re-sync an approved record\n", + "cp.update_registry_record(\n", + " registryId=REGISTRY_ID,\n", + " recordId=RECORD_GOV,\n", + " synchronizationConfiguration={\n", + " \"optionalValue\": {\"fromUrl\": {\"url\": \"https://knowledge-mcp.global.api.aws\"}}\n", + " },\n", + " triggerSynchronization=True,\n", + ")\n", + "record = wait_record(REGISTRY_ID, RECORD_GOV)\n", + "print(f\"\\nPublisher re-synced → Status: {record['status']}\")\n", + "print(\"(Approved revision still searchable by consumers until this version is re-approved)\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## 8. Cleanup\n", + "\n", + "Delete all test resources." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Delete records\n", + "records = cp.list_registry_records(registryId=REGISTRY_ID)\n", + "for rec in records.get(\"registryRecords\", []):\n", + " rid = rec[\"recordId\"]\n", + " print(f\"Deleting record {rid}...\")\n", + " cp.delete_registry_record(registryId=REGISTRY_ID, recordId=rid)\n", + "\n", + "time.sleep(10)\n", + "\n", + "# Delete registry\n", + "print(\"Deleting registry...\")\n", + "cp.delete_registry(registryId=REGISTRY_ID)\n", + "\n", + "# Delete gateway targets + gateway\n", + "try:\n", + " targets = ac.list_gateway_targets(gatewayIdentifier=GW_ID)\n", + " for t in targets.get(\"items\", []):\n", + " ac.delete_gateway_target(gatewayIdentifier=GW_ID, targetId=t[\"targetId\"])\n", + " time.sleep(5)\n", + " ac.delete_gateway(gatewayIdentifier=GW_ID)\n", + " print(f\"Deleted gateway {GW_ID}\")\n", + "except: pass\n", + "\n", + "# Delete OAuth provider\n", + "try:\n", + " ac.delete_oauth2_credential_provider(name=\"url-sync-test-cognito\")\n", + " print(\"Deleted OAuth provider\")\n", + "except: pass\n", + "\n", + "# Delete Cognito\n", + "try:\n", + " cognito.delete_user_pool_domain(UserPoolId=POOL_ID, Domain=COGNITO_DOMAIN)\n", + " cognito.delete_user_pool(UserPoolId=POOL_ID)\n", + " print(f\"Deleted Cognito pool {POOL_ID}\")\n", + "except: pass\n", + "\n", + "print(\"\\nCleanup complete!\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.14.2" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/01-tutorials/10-Agent-Registry/01-advanced/url-synchronization/utils.py b/01-tutorials/10-Agent-Registry/01-advanced/url-synchronization/utils.py new file mode 100644 index 000000000..1dcf2bb6a --- /dev/null +++ b/01-tutorials/10-Agent-Registry/01-advanced/url-synchronization/utils.py @@ -0,0 +1,241 @@ +"""Utilities for AWS Agent Registry: create, seed, search, and cleanup. + +Seeds the registry with inline MCP records (small, focused tool sets) plus +a public MCP server via URL sync. Requires boto3>=1.42.87. +""" + +import boto3 +import json +import time +import logging +import os +from pathlib import Path +from dotenv import load_dotenv + +load_dotenv(Path(__file__).parent / ".env") +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") +logger = logging.getLogger(__name__) + +REGION = os.getenv("AWS_REGION", "us-west-2") + + +def get_session(): + return boto3.Session(region_name=REGION) + + +def get_cp_client(): + return get_session().client("bedrock-agentcore-control", region_name=REGION) + + +def get_dp_client(): + return get_session().client("bedrock-agentcore", region_name=REGION) + + +# ── Registry ────────────────────────────────────────────────────────────────── + +def create_registry(name="tool-provider-registry", description="Registry for RegistryToolProvider demo"): + cp = get_cp_client() + logger.info("Creating registry '%s'...", name) + resp = cp.create_registry( + name=name, description=description, + approvalConfiguration={"autoApproval": False}, + ) + registry_arn = resp["registryArn"] + registry_id = registry_arn.split("/")[-1] + + for _ in range(30): + time.sleep(5) + status = cp.get_registry(registryId=registry_id).get("status", "UNKNOWN") + logger.info(" status: %s", status) + if status == "READY": + break + logger.info("✅ Registry ready: %s", registry_id) + return {"registryId": registry_id, "registryArn": registry_arn} + + +# ── Inline MCP + URL Sync Records ──────────────────────────────────────────── + +INLINE_RECORDS = [ + { + "name": "weather_tools_mcp", + "description": "Weather MCP server — current conditions and multi-day forecasts for any city worldwide", + "server_info": { + "name": "io.example/weather-tools", + "description": "MCP server for weather data", + "version": "1.0.0", + "packages": [{"registryType": "npm", "identifier": "@example/weather-mcp", "version": "1.0.0", "transport": {"type": "stdio"}}], + }, + "tools": [ + {"name": "get_current_weather", "description": "Get current weather conditions for a city including temperature, humidity, and wind speed", + "inputSchema": {"type": "object", "properties": {"city": {"type": "string", "description": "City name (e.g. Seattle)"}, "units": {"type": "string", "enum": ["celsius", "fahrenheit"]}}, "required": ["city"]}}, + {"name": "get_weather_forecast", "description": "Get a multi-day weather forecast for a city", + "inputSchema": {"type": "object", "properties": {"city": {"type": "string"}, "days": {"type": "integer", "description": "Number of forecast days (1-5)"}}, "required": ["city"]}}, + ], + }, + { + "name": "order_management_mcp", + "description": "Order management MCP server — track order status, list orders, and create new e-commerce orders", + "server_info": { + "name": "io.example/order-management", + "description": "MCP server for order tracking and management", + "version": "1.0.0", + "packages": [{"registryType": "npm", "identifier": "@example/order-mcp", "version": "1.0.0", "transport": {"type": "stdio"}}], + }, + "tools": [ + {"name": "get_order_status", "description": "Get the current status and details of an order by order ID", + "inputSchema": {"type": "object", "properties": {"order_id": {"type": "string", "description": "The order identifier"}}, "required": ["order_id"]}}, + {"name": "list_orders", "description": "List orders optionally filtered by status (pending, shipped, delivered)", + "inputSchema": {"type": "object", "properties": {"status": {"type": "string", "enum": ["pending", "shipped", "delivered", "ALL"]}}}}, + {"name": "create_order", "description": "Create a new order for a customer with product and quantity", + "inputSchema": {"type": "object", "properties": {"customer_name": {"type": "string"}, "product": {"type": "string"}, "quantity": {"type": "integer"}}, "required": ["customer_name", "product", "quantity"]}}, + ], + }, + { + "name": "inventory_tools_mcp", + "description": "Inventory MCP server — check real-time stock levels across warehouses and search products by name or category", + "server_info": { + "name": "io.example/inventory-tools", + "description": "MCP server for product inventory and stock levels", + "version": "1.0.0", + "packages": [{"registryType": "npm", "identifier": "@example/inventory-mcp", "version": "1.0.0", "transport": {"type": "stdio"}}], + }, + "tools": [ + {"name": "check_inventory", "description": "Check stock levels for a product SKU across warehouses", + "inputSchema": {"type": "object", "properties": {"sku": {"type": "string", "description": "Product SKU"}, "warehouse": {"type": "string", "description": "Warehouse ID (optional)"}}, "required": ["sku"]}}, + {"name": "search_products", "description": "Search products by name or category", + "inputSchema": {"type": "object", "properties": {"query": {"type": "string", "description": "Product name or keyword"}, "category": {"type": "string"}}, "required": ["query"]}}, + ], + }, +] + +URL_SYNC_SERVER = { + "name": "aws_knowledge_mcp", + "url": "https://knowledge-mcp.global.api.aws", + "description": "AWS Knowledge MCP — search AWS documentation, guides, and best practices", +} + + +def _wait_for_draft(cp, registry_id, record_id, timeout=120): + """Poll until record reaches DRAFT (or terminal) status.""" + for _ in range(timeout // 5): + r = cp.get_registry_record(registryId=registry_id, recordId=record_id) + status = r.get("status", "UNKNOWN") + if status in ("DRAFT", "APPROVED", "CREATE_FAILED", "PENDING_APPROVAL"): + return r + time.sleep(5) + return r + + +def _approve(cp, registry_id, record_id, name=""): + """Submit for approval and approve.""" + cp.submit_registry_record_for_approval(registryId=registry_id, recordId=record_id) + cp.update_registry_record_status( + registryId=registry_id, recordId=record_id, + status="APPROVED", statusReason="Auto-approved for demo", + ) + logger.info(" ✅ %s approved", name) + + +def seed(registry_id, include_url_sync=True): + """Seed registry with inline MCP records + optional URL sync. + + Returns list of {name, recordId, tool_count} dicts. + """ + cp = get_cp_client() + results = [] + + # 1. Inline MCP records — uses the same pattern as getting-started-registry-end-to-end + for rec in INLINE_RECORDS: + logger.info("Creating '%s' (MCP, %d tools)...", rec["name"], len(rec["tools"])) + try: + resp = cp.create_registry_record( + registryId=registry_id, + name=rec["name"], + description=rec["description"], + descriptorType="MCP", + descriptors={"mcp": { + "server": {"inlineContent": json.dumps(rec["server_info"])}, + "tools": {"inlineContent": json.dumps({"tools": rec["tools"]})}, + }}, + recordVersion="1.0", + ) + record_id = resp["recordArn"].split("/")[-1] + logger.info(" Created: %s", record_id) + except Exception as e: + logger.error(" Failed: %s", e) + continue + + r = _wait_for_draft(cp, registry_id, record_id) + if r.get("status") == "CREATE_FAILED": + logger.error(" ❌ %s failed: %s", rec["name"], r.get("statusReason", "")) + continue + + _approve(cp, registry_id, record_id, rec["name"]) + results.append({"name": rec["name"], "recordId": record_id, "tool_count": len(rec["tools"])}) + + # 2. URL sync (AWS Knowledge MCP — may be rate-limited) + if include_url_sync: + server = URL_SYNC_SERVER + logger.info("Registering '%s' via URL sync: %s", server["name"], server["url"]) + try: + resp = cp.create_registry_record( + registryId=registry_id, + name=server["name"], + description=server.get("description", ""), + descriptorType="MCP", + synchronizationType="URL", + synchronizationConfiguration={"fromUrl": {"url": server["url"]}}, + ) + record_id = resp["recordArn"].split("/")[-1] + r = _wait_for_draft(cp, registry_id, record_id) + if r.get("status") == "CREATE_FAILED": + logger.warning(" ⚠️ URL sync failed: %s — skipping", r.get("statusReason", "")) + cp.delete_registry_record(registryId=registry_id, recordId=record_id) + else: + tools = [] + try: + tools = json.loads(r["descriptors"]["mcp"]["tools"]["inlineContent"]).get("tools", []) + except Exception: + pass + _approve(cp, registry_id, record_id, server["name"]) + results.append({"name": server["name"], "recordId": record_id, "tool_count": len(tools)}) + except Exception as e: + logger.warning(" ⚠️ URL sync skipped: %s", e) + + return results + + +def wait_for_search_index(registry_id, expected_count, query="weather orders inventory documentation", max_wait=120): + dp = get_dp_client() + logger.info("Waiting for search index (%d records)...", expected_count) + for _ in range(max_wait // 10): + time.sleep(10) + resp = dp.search_registry_records( + registryIds=[registry_id], searchQuery=query, maxResults=10, + ) + found = len(resp.get("registryRecords", [])) + if found >= expected_count: + logger.info(" All %d records indexed.", found) + return + logger.info(" %d/%d indexed — waiting...", found, expected_count) + + +def search(query, registry_id, max_results=10): + dp = get_dp_client() + resp = dp.search_registry_records( + registryIds=[registry_id], searchQuery=query, maxResults=max_results, + ) + return resp.get("registryRecords", []) + + +def delete_registry(registry_id): + cp = get_cp_client() + records = cp.list_registry_records(registryId=registry_id).get("registryRecords", []) + for rec in records: + rid = rec["recordId"] + logger.info("Deleting record %s (%s)...", rec.get("name", ""), rid) + cp.delete_registry_record(registryId=registry_id, recordId=rid) + time.sleep(5) + logger.info("Deleting registry %s...", registry_id) + cp.delete_registry(registryId=registry_id) + logger.info("✅ Registry deleted")