diff --git a/README.md b/README.md index 2805f4a..6911eeb 100644 --- a/README.md +++ b/README.md @@ -505,6 +505,42 @@ is_authorized = await verify_agent_for_property( See `examples/adagents_validation.py` for complete examples. +### Authorization Discovery + +Discover which publishers have authorized your agent using two approaches: + +**1. "Push" Approach** - Ask the agent (recommended, fastest): +```python +from adcp import ADCPClient + +async with ADCPClient(agent_config) as client: + # Single API call to agent + response = await client.simple.list_authorized_properties() + print(f"Authorized for: {response.publisher_domains}") +``` + +**2. "Pull" Approach** - Check publisher adagents.json files (when you need property details): +```python +from adcp import fetch_agent_authorizations + +# Check specific publishers (fetches in parallel) +contexts = await fetch_agent_authorizations( + "https://our-sales-agent.com", + ["nytimes.com", "wsj.com", "cnn.com"] +) + +for domain, ctx in contexts.items(): + print(f"{domain}:") + print(f" Property IDs: {ctx.property_ids}") + print(f" Tags: {ctx.property_tags}") +``` + +**When to use which:** +- **Push**: Quick discovery, portfolio overview, high-level authorization check +- **Pull**: Property-level details, specific publisher list, no agent API call required (reads each publisher's adagents.json over HTTP) + +See `examples/fetch_agent_authorizations.py` for complete examples. + ## CLI Tool The `adcp` command-line tool provides easy interaction with AdCP agents without writing code. diff --git a/examples/fetch_agent_authorizations.py b/examples/fetch_agent_authorizations.py new file mode 100644 index 0000000..5fe0da7 --- /dev/null +++ b/examples/fetch_agent_authorizations.py @@ -0,0 +1,144 @@ +""" +Example showing how to discover which publishers have authorized your agent. + +This example demonstrates TWO approaches: + +1. 
"""
Example showing how to discover which publishers have authorized your agent.

This example demonstrates TWO approaches:

1. "Push" approach - Ask the agent what it's authorized for:
   - Use the agent's list_authorized_properties endpoint
   - Agent tells you which publisher domains it represents
   - Fast and efficient - single API call

2. "Pull" approach - Check publisher adagents.json files:
   - Use fetch_agent_authorizations to check multiple publishers
   - Fetch adagents.json from each publisher's .well-known directory
   - Useful when you have a specific list of publishers to check
   - Supports connection pooling for better performance
"""

import asyncio

from adcp import ADCPClient, AgentConfig, Protocol, fetch_agent_authorizations

# Number of portfolio-description characters printed before truncating.
DESCRIPTION_PREVIEW_CHARS = 100


def _join_labels(values):
    """Return *values* joined with ", " for display.

    Items exposing a ``.value`` attribute are rendered through it; everything
    is passed through ``str`` because ``str.join`` raises ``TypeError`` on
    non-string items.

    NOTE(review): the response model may expose channels/countries as enum
    members rather than plain strings - confirm against the generated types.
    """
    return ", ".join(str(getattr(value, "value", value)) for value in values)


async def approach_1_push():
    """APPROACH 1: Ask the agent what it's authorized for (RECOMMENDED).

    Opens an ``ADCPClient`` for a sample A2A agent, calls
    ``list_authorized_properties`` once and prints the publisher domains plus
    whichever portfolio summary fields the response carries.
    """
    print("=" * 70)
    print("APPROACH 1: Push - Ask agent what it's authorized for")
    print("=" * 70)
    print()

    # Configure the agent client
    agent_config = AgentConfig(
        id="sales_agent",
        agent_uri="https://our-sales-agent.com",
        protocol=Protocol.A2A,
    )

    async with ADCPClient(agent_config) as client:
        # Ask the agent directly what publishers it represents
        # This is fast - just one API call!
        response = await client.simple.list_authorized_properties()

        print(f"✅ Agent represents {len(response.publisher_domains)} publishers:\n")

        for domain in response.publisher_domains:
            print(f" • {domain}")

        print()
        print("📊 Portfolio Summary:")
        if response.primary_channels:
            print(f" Primary Channels: {_join_labels(response.primary_channels)}")
        if response.primary_countries:
            print(f" Primary Countries: {_join_labels(response.primary_countries)}")
        if response.portfolio_description:
            description = response.portfolio_description
            # Only add the ellipsis when text was actually cut off.
            if len(description) > DESCRIPTION_PREVIEW_CHARS:
                description = f"{description[:DESCRIPTION_PREVIEW_CHARS]}..."
            print(f" Description: {description}")

        print()
        print("💡 TIP: Now fetch each publisher's adagents.json to see property details")
        print()


async def approach_2_pull():
    """APPROACH 2: Check publisher adagents.json files (when you know which publishers to check).

    Calls ``fetch_agent_authorizations`` for a fixed list of publisher domains
    and prints, per authorized domain, the property IDs, tags and property
    count, followed by the union of tags across all returned publishers.
    """
    print("=" * 70)
    print("APPROACH 2: Pull - Check specific publisher adagents.json files")
    print("=" * 70)
    print()

    # Your agent's URL
    agent_url = "https://our-sales-agent.com"

    # Publisher domains to check
    publisher_domains = [
        "nytimes.com",
        "wsj.com",
        "cnn.com",
        "espn.com",
        "techcrunch.com",
    ]

    print(f"Checking authorization for {agent_url} across {len(publisher_domains)} publishers...\n")

    # Fetch authorization contexts (fetches all in parallel)
    contexts = await fetch_agent_authorizations(agent_url, publisher_domains)

    # Display results
    if not contexts:
        print("No authorizations found.")
        return

    print(f"✅ Authorized for {len(contexts)}/{len(publisher_domains)} publishers:\n")

    for domain, ctx in contexts.items():
        print(f"{domain}:")
        print(f" Property IDs: {ctx.property_ids}")
        print(f" Property Tags: {ctx.property_tags}")
        print(f" Total Properties: {len(ctx.raw_properties)}")
        print()

    # Example: Check if specific tags are available
    all_tags = {tag for ctx in contexts.values() for tag in ctx.property_tags}

    print(f"📊 Total unique tags across all publishers: {len(all_tags)}")
    print(f"Tags: {sorted(all_tags)}")
    print()


async def main():
    """Demonstrate both approaches, push first and then pull."""
    # APPROACH 1: Fast - ask agent what it's authorized for
    await approach_1_push()

    print("\n" + "=" * 70 + "\n")

    # APPROACH 2: Check specific publishers
    await approach_2_pull()


async def main_with_connection_pooling():
    """More efficient version using connection pooling for multiple requests.

    Passes one shared ``httpx.AsyncClient`` to ``fetch_agent_authorizations``
    via its ``client`` argument.
    """
    # Imported here so the basic example does not import httpx itself.
    import httpx

    agent_url = "https://our-sales-agent.com"
    publisher_domains = ["nytimes.com", "wsj.com", "cnn.com"]

    # Use a shared HTTP client for connection pooling
    async with httpx.AsyncClient(
        limits=httpx.Limits(max_keepalive_connections=10, max_connections=20)
    ) as client:
        print("Using connection pooling for better performance...\n")

        contexts = await fetch_agent_authorizations(agent_url, publisher_domains, client=client)

        for domain, ctx in contexts.items():
            print(f"{domain}: {len(ctx.property_ids)} properties")


if __name__ == "__main__":
    # Run basic example
    asyncio.run(main())

    # Uncomment to run connection pooling example
    # asyncio.run(main_with_connection_pooling())
class AuthorizationContext:
    """Authorization context for a publisher domain.

    Attributes:
        property_ids: List of property IDs the agent is authorized for
        property_tags: List of property tags the agent is authorized for
            (each tag once, in first-seen order)
        raw_properties: Raw property data from adagents.json, stored as passed in
    """

    def __init__(self, properties: list[dict[str, Any]]):
        """Initialize from list of properties.

        Entries that are not dicts are ignored. An ``id`` is collected only
        when it is a non-empty string; ``tags`` is read only when it is a list
        and only its string items are kept.

        Args:
            properties: List of property dictionaries from adagents.json
        """
        self.property_ids: list[str] = []
        self.property_tags: list[str] = []
        self.raw_properties = properties

        # Set for O(1) duplicate checks; the list preserves first-seen order.
        seen_tags: set[str] = set()

        for prop in properties:
            if not isinstance(prop, dict):
                continue

            prop_id = prop.get("id")
            if isinstance(prop_id, str) and prop_id:
                self.property_ids.append(prop_id)

            tags = prop.get("tags", [])
            if not isinstance(tags, list):
                continue
            for tag in tags:
                if isinstance(tag, str) and tag not in seen_tags:
                    seen_tags.add(tag)
                    self.property_tags.append(tag)

    def __repr__(self) -> str:
        return (
            f"AuthorizationContext("
            f"property_ids={self.property_ids}, "
            f"property_tags={self.property_tags})"
        )


async def fetch_agent_authorizations(
    agent_url: str,
    publisher_domains: list[str],
    timeout: float = 10.0,
    client: httpx.AsyncClient | None = None,
) -> dict[str, AuthorizationContext]:
    """Fetch authorization contexts by checking publisher adagents.json files.

    This function discovers what publishers have authorized your agent by fetching
    their adagents.json files from the .well-known directory and extracting the
    properties your agent can access.

    This is the "pull" approach - you query publishers to see if they've authorized you.
    For the "push" approach where the agent tells you what it's authorized for,
    use the agent's list_authorized_properties endpoint via ADCPClient.

    Args:
        agent_url: URL of your sales agent
        publisher_domains: List of publisher domains to check (e.g., ["nytimes.com", "wsj.com"]).
            Repeated entries are fetched only once.
        timeout: Request timeout in seconds for each fetch
        client: Optional httpx.AsyncClient for connection pooling

    Returns:
        Dictionary mapping publisher domain to AuthorizationContext.
        Only includes domains where the agent is authorized.

    Example:
        >>> # "Pull" approach - check what publishers have authorized you
        >>> contexts = await fetch_agent_authorizations(
        ...     "https://our-sales-agent.com",
        ...     ["nytimes.com", "wsj.com", "cnn.com"]
        ... )
        >>> for domain, ctx in contexts.items():
        ...     print(f"{domain}:")
        ...     print(f"  Property IDs: {ctx.property_ids}")
        ...     print(f"  Tags: {ctx.property_tags}")

    See Also:
        ADCPClient.list_authorized_properties: "Push" approach using the agent's API

    Notes:
        - Silently skips domains where fetching adagents.json raises
          AdagentsNotFoundError, AdagentsValidationError or AdagentsTimeoutError
        - Any other exception raised while checking a domain propagates to the caller
        - Only returns domains where the agent is explicitly authorized
        - For production use with many domains, pass a shared httpx.AsyncClient
          to enable connection pooling
    """
    import asyncio

    # Drop repeated domains (keeping first-seen order) so each adagents.json
    # is requested once; the result is keyed by domain, so output is unchanged.
    unique_domains = list(dict.fromkeys(publisher_domains))
    if not unique_domains:
        return {}

    async def fetch_authorization_for_domain(domain: str) -> AuthorizationContext | None:
        """Return the agent's context for *domain*, or None when not authorized/unavailable."""
        try:
            adagents_data = await fetch_adagents(domain, timeout=timeout, client=client)
        except (AdagentsNotFoundError, AdagentsValidationError, AdagentsTimeoutError):
            # Silently skip domains with missing or invalid adagents.json
            return None

        if not verify_agent_authorization(adagents_data, agent_url):
            return None

        return AuthorizationContext(get_properties_by_agent(adagents_data, agent_url))

    # Fetch all domains in parallel; gather returns results in input order.
    results = await asyncio.gather(
        *(fetch_authorization_for_domain(domain) for domain in unique_domains)
    )

    # Build result dictionary, filtering out None values
    return {
        domain: ctx
        for domain, ctx in zip(unique_domains, results)
        if ctx is not None
    }
_OUR_AGENT = "https://our-agent.com"
_OTHER_AGENT = "https://different-agent.com"
_FETCH_TARGET = "adcp.adagents.fetch_adagents"


def _adagents_for(agent_url, properties):
    """Build a minimal adagents.json payload authorizing *agent_url* for *properties*."""
    return {"authorized_agents": [{"url": agent_url, "properties": properties}]}


def _fetch_stub(outcomes):
    """Return an async stand-in for fetch_adagents driven by *outcomes*.

    *outcomes* maps a domain to either the payload to return or an exception
    instance to raise; any other domain raises ``Exception("Unexpected domain")``.
    """

    async def fake_fetch_adagents(domain, **kwargs):
        if domain not in outcomes:
            raise Exception("Unexpected domain")
        outcome = outcomes[domain]
        if isinstance(outcome, Exception):
            raise outcome
        return outcome

    return fake_fetch_adagents


class TestAuthorizationContext:
    """Test AuthorizationContext class."""

    def test_extract_property_ids(self):
        """Property IDs are collected in input order."""
        site = {
            "id": "prop1",
            "property_type": "website",
            "name": "Site 1",
            "identifiers": [{"type": "domain", "value": "site1.com"}],
        }
        app = {
            "id": "prop2",
            "property_type": "mobile_app",
            "name": "App 1",
            "identifiers": [{"type": "bundle_id", "value": "com.site1.app"}],
        }

        assert AuthorizationContext([site, app]).property_ids == ["prop1", "prop2"]

    def test_extract_property_tags(self):
        """Tags from every property are gathered into one collection."""
        ctx = AuthorizationContext(
            [
                {
                    "id": "prop1",
                    "property_type": "website",
                    "name": "Site 1",
                    "tags": ["premium", "news"],
                },
                {
                    "id": "prop2",
                    "property_type": "website",
                    "name": "Site 2",
                    "tags": ["premium", "sports"],
                },
            ]
        )
        assert set(ctx.property_tags) == {"premium", "news", "sports"}

    def test_deduplicate_tags(self):
        """A tag shared by several properties is listed only once."""
        ctx = AuthorizationContext(
            [
                {"id": "prop1", "tags": ["premium", "news"]},
                {"id": "prop2", "tags": ["premium", "sports"]},
            ]
        )
        assert ctx.property_tags.count("premium") == 1

    def test_handle_missing_fields(self):
        """Properties lacking both ID and tags contribute nothing."""
        ctx = AuthorizationContext([{"property_type": "website", "name": "Site 1"}])
        assert ctx.property_ids == []
        assert ctx.property_tags == []

    def test_raw_properties_preserved(self):
        """The original property dictionaries stay reachable, extra keys included."""
        properties = [
            {
                "id": "prop1",
                "property_type": "website",
                "name": "Site 1",
                "custom_field": "custom_value",
            }
        ]

        ctx = AuthorizationContext(properties)
        assert ctx.raw_properties == properties
        assert ctx.raw_properties[0]["custom_field"] == "custom_value"

    def test_repr(self):
        """repr() names the class and both extracted attributes."""
        text = repr(AuthorizationContext([{"id": "prop1", "tags": ["premium"]}]))
        for fragment in ("AuthorizationContext", "property_ids", "property_tags"):
            assert fragment in text


@pytest.mark.asyncio
class TestFetchAgentAuthorizations:
    """Test fetch_agent_authorizations function."""

    async def test_single_publisher_authorized(self):
        """An authorizing publisher yields a context with its IDs and tags."""
        from unittest.mock import patch

        payload = _adagents_for(
            _OUR_AGENT,
            [
                {
                    "id": "prop1",
                    "property_type": "website",
                    "name": "Site 1",
                    "identifiers": [{"type": "domain", "value": "nytimes.com"}],
                    "tags": ["premium", "news"],
                }
            ],
        )

        # Keyword arguments given to patch() configure the created AsyncMock.
        with patch(_FETCH_TARGET, new_callable=AsyncMock, return_value=payload):
            contexts = await fetch_agent_authorizations(_OUR_AGENT, ["nytimes.com"])

        assert len(contexts) == 1
        assert "nytimes.com" in contexts
        ctx = contexts["nytimes.com"]
        assert ctx.property_ids == ["prop1"]
        assert "premium" in ctx.property_tags
        assert "news" in ctx.property_tags

    async def test_multiple_publishers(self):
        """Each authorizing publisher gets its own context."""
        from unittest.mock import patch

        stub = _fetch_stub(
            {
                "nytimes.com": _adagents_for(
                    _OUR_AGENT, [{"id": "nyt_prop1", "tags": ["news"]}]
                ),
                "wsj.com": _adagents_for(
                    _OUR_AGENT, [{"id": "wsj_prop1", "tags": ["business"]}]
                ),
            }
        )

        with patch(_FETCH_TARGET, side_effect=stub):
            contexts = await fetch_agent_authorizations(
                _OUR_AGENT, ["nytimes.com", "wsj.com"]
            )

        assert len(contexts) == 2
        assert "nytimes.com" in contexts
        assert "wsj.com" in contexts
        assert contexts["nytimes.com"].property_ids == ["nyt_prop1"]
        assert contexts["wsj.com"].property_ids == ["wsj_prop1"]

    async def test_skip_unauthorized_publishers(self):
        """Publishers that authorize only some other agent are left out."""
        from unittest.mock import patch

        stub = _fetch_stub(
            {
                # nytimes authorizes our agent
                "nytimes.com": _adagents_for(_OUR_AGENT, [{"id": "prop1"}]),
                # wsj does NOT authorize our agent
                "wsj.com": _adagents_for(_OTHER_AGENT, [{"id": "prop2"}]),
            }
        )

        with patch(_FETCH_TARGET, side_effect=stub):
            contexts = await fetch_agent_authorizations(
                _OUR_AGENT, ["nytimes.com", "wsj.com"]
            )

        # Should only include nytimes
        assert len(contexts) == 1
        assert "nytimes.com" in contexts
        assert "wsj.com" not in contexts

    async def test_skip_missing_adagents_json(self):
        """A publisher whose adagents.json is missing is skipped without error."""
        from unittest.mock import patch

        from adcp.exceptions import AdagentsNotFoundError

        stub = _fetch_stub(
            {
                "nytimes.com": _adagents_for(_OUR_AGENT, [{"id": "prop1"}]),
                # wsj doesn't have adagents.json (404)
                "wsj.com": AdagentsNotFoundError("wsj.com"),
            }
        )

        with patch(_FETCH_TARGET, side_effect=stub):
            contexts = await fetch_agent_authorizations(
                _OUR_AGENT, ["nytimes.com", "wsj.com"]
            )

        # Should only include nytimes
        assert len(contexts) == 1
        assert "nytimes.com" in contexts
        assert "wsj.com" not in contexts

    async def test_skip_invalid_adagents_json(self):
        """A publisher whose adagents.json fails validation is skipped without error."""
        from unittest.mock import patch

        from adcp.exceptions import AdagentsValidationError

        stub = _fetch_stub(
            {
                "nytimes.com": _adagents_for(_OUR_AGENT, [{"id": "prop1"}]),
                # wsj has invalid adagents.json
                "wsj.com": AdagentsValidationError("Invalid JSON"),
            }
        )

        with patch(_FETCH_TARGET, side_effect=stub):
            contexts = await fetch_agent_authorizations(
                _OUR_AGENT, ["nytimes.com", "wsj.com"]
            )

        # Should only include nytimes
        assert len(contexts) == 1
        assert "nytimes.com" in contexts
        assert "wsj.com" not in contexts

    async def test_empty_result_when_no_authorizations(self):
        """No authorizing publisher means an empty dict comes back."""
        from unittest.mock import patch

        # No publishers authorize our agent
        payload = _adagents_for(_OTHER_AGENT, [{"id": "prop1"}])

        with patch(_FETCH_TARGET, new_callable=AsyncMock, return_value=payload):
            contexts = await fetch_agent_authorizations(
                _OUR_AGENT, ["nytimes.com", "wsj.com"]
            )

        assert len(contexts) == 0
        assert contexts == {}

    async def test_uses_provided_http_client(self):
        """A caller-supplied HTTP client is forwarded to fetch_adagents."""
        from unittest.mock import MagicMock, patch

        import httpx

        payload = _adagents_for(_OUR_AGENT, [{"id": "prop1"}])
        mock_client = MagicMock(spec=httpx.AsyncClient)

        with patch(_FETCH_TARGET, new_callable=AsyncMock, return_value=payload) as mock_fetch:
            await fetch_agent_authorizations(
                _OUR_AGENT, ["nytimes.com"], client=mock_client
            )

        # Verify fetch_adagents was called with the provided client
        mock_fetch.assert_called_once()
        assert mock_fetch.call_args.kwargs.get("client") == mock_client