Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
485 changes: 485 additions & 0 deletions docs/registry-guide.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ select = [
]
exclude = [
"src/amp/FlightSql_pb2.py", # Generated protobuf file
"src/amp/registry/models.py", # Generated from OpenAPI spec
"*notebook*"
]

Expand Down
3 changes: 2 additions & 1 deletion src/amp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Amp - Flight SQL client with comprehensive data loading capabilities."""

from amp.client import Client, QueryBuilder
from amp.registry import RegistryClient

__all__ = ['Client', 'QueryBuilder']
__all__ = ['Client', 'QueryBuilder', 'RegistryClient']
75 changes: 67 additions & 8 deletions src/amp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,13 @@ class Client:
"""Enhanced Flight SQL client with data loading capabilities.

Supports both query operations (via Flight SQL) and optional admin operations
(via HTTP Admin API).
(via HTTP Admin API) and registry operations (via Registry API).

Args:
url: Flight SQL URL (for backward compatibility, treated as query_url)
query_url: Query endpoint URL via Flight SQL (e.g., 'grpc://localhost:1602')
admin_url: Optional Admin API URL (e.g., 'http://localhost:8080')
registry_url: Optional Registry API URL (default: staging registry)
auth_token: Optional Bearer token for authentication (highest priority)
auth: If True, load auth token from ~/.amp/cache (shared with TS CLI)

Expand All @@ -293,13 +294,23 @@ class Client:
>>> # Client with auth from environment variable
>>> # export AMP_AUTH_TOKEN="eyJhbGci..."
>>> client = Client(query_url='grpc://localhost:1602')
>>>
>>> # Client with registry support
>>> client = Client(
... query_url='grpc://localhost:1602',
... admin_url='http://localhost:8080',
... registry_url='https://api.registry.amp.staging.thegraph.com',
... auth=True
... )
>>> results = client.registry.datasets.search('ethereum')
"""

def __init__(
self,
url: Optional[str] = None,
query_url: Optional[str] = None,
admin_url: Optional[str] = None,
registry_url: str = 'https://api.registry.amp.staging.thegraph.com',
auth_token: Optional[str] = None,
auth: bool = False,
):
Expand Down Expand Up @@ -346,21 +357,40 @@ def get_token():
if admin_url:
from amp.admin.client import AdminClient

# Pass auth=True if we have a get_token callable from auth file
# Otherwise pass the static token if available
# Pass through auth parameters to AdminClient so it can set up its own auth
# (AdminClient needs to manage its own AuthService for token refresh)
if auth:
# Use auth file (auto-refreshing)
# Use auth file - AdminClient will set up AuthService for auto-refresh
self._admin_client = AdminClient(admin_url, auth=True)
elif auth_token or os.getenv('AMP_AUTH_TOKEN'):
# Use static token
static_token = auth_token or os.getenv('AMP_AUTH_TOKEN')
self._admin_client = AdminClient(admin_url, auth_token=static_token)
# Use static token (explicit param takes priority)
token = auth_token or os.getenv('AMP_AUTH_TOKEN')
self._admin_client = AdminClient(admin_url, auth_token=token)
else:
# No auth
# No authentication
self._admin_client = AdminClient(admin_url)
else:
self._admin_client = None

# Initialize optional Registry API client
if registry_url:
from amp.registry import RegistryClient

# Pass through auth parameters to RegistryClient so it can set up its own auth
# (RegistryClient needs to manage its own AuthService for token refresh)
if auth:
# Use auth file - RegistryClient will set up AuthService for auto-refresh
self._registry_client = RegistryClient(registry_url, auth=True)
elif auth_token or os.getenv('AMP_AUTH_TOKEN'):
# Use static token (explicit param takes priority)
token = auth_token or os.getenv('AMP_AUTH_TOKEN')
self._registry_client = RegistryClient(registry_url, auth_token=token)
else:
# No authentication
self._registry_client = RegistryClient(registry_url)
else:
self._registry_client = None

def sql(self, query: str) -> QueryBuilder:
"""
Create a chainable query builder
Expand Down Expand Up @@ -460,6 +490,35 @@ def schema(self):
)
return self._admin_client.schema

@property
def registry(self):
    """Return the Registry API client held by this ``Client``.

    Returns:
        RegistryClient for dataset discovery, search, and publishing

    Raises:
        ValueError: If no registry client is stored on this instance
            (``self._registry_client`` is falsy)

    Example:
        >>> client = Client(
        ...     query_url='grpc://localhost:1602',
        ...     registry_url='https://api.registry.amp.staging.thegraph.com'
        ... )
        >>> # Search for datasets
        >>> results = client.registry.datasets.search('ethereum blocks')
        >>> # Get a specific dataset
        >>> dataset = client.registry.datasets.get('graphops', 'ethereum-mainnet')
        >>> # Fetch manifest
        >>> manifest = client.registry.datasets.get_manifest('graphops', 'ethereum-mainnet', 'latest')
    """
    registry_client = self._registry_client
    if registry_client:
        # Happy path first: a registry client is stored on this instance.
        return registry_client

    message = (
        'Registry API not configured. Provide registry_url parameter to Client() '
        'to enable dataset discovery and search operations.'
    )
    raise ValueError(message)

# Existing methods for backward compatibility
def get_sql(self, query, read_all=False):
"""Execute SQL query and return Arrow data"""
Expand Down
27 changes: 27 additions & 0 deletions src/amp/registry/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Amp Registry API client.

The Registry provides dataset discovery, search, and publishing capabilities.

Example:
>>> from amp.registry import RegistryClient
>>>
>>> # Read-only operations
>>> client = RegistryClient()
>>> results = client.datasets.search('ethereum blocks')
>>> for dataset in results.datasets:
... print(f"{dataset.namespace}/{dataset.name} - Score: {dataset.score}")
>>>
>>> # Get a specific dataset
>>> dataset = client.datasets.get('graphops', 'ethereum-mainnet')
>>> manifest = client.datasets.get_manifest('graphops', 'ethereum-mainnet', 'latest')
>>>
>>> # Authenticated operations
>>> client = RegistryClient(auth_token='your-token')
>>> client.datasets.publish(...)
"""

from . import errors, models
from .client import RegistryClient
from .datasets import RegistryDatasetsClient

__all__ = ['RegistryClient', 'RegistryDatasetsClient', 'errors', 'models']
184 changes: 184 additions & 0 deletions src/amp/registry/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
"""Registry API client."""

import logging
import os
from typing import Optional

import httpx

from . import errors

logger = logging.getLogger(__name__)


class RegistryClient:
    """Client for interacting with the Amp Registry API.

    The Registry API provides dataset discovery, search, and publishing capabilities.

    Args:
        base_url: Base URL for the Registry API (default: staging registry)
        auth_token: Optional Bearer token for authenticated operations (highest priority)
        auth: If True, load auth token from ~/.amp/cache (shared with TS CLI)

    Authentication Priority (highest to lowest):
        1. Explicit auth_token parameter
        2. AMP_AUTH_TOKEN environment variable
        3. auth=True - reads from ~/.amp/cache/amp_cli_auth

    Example:
        >>> # Read-only operations (no auth required)
        >>> client = RegistryClient()
        >>> datasets = client.datasets.search('ethereum')
        >>>
        >>> # Authenticated operations with explicit token
        >>> client = RegistryClient(auth_token='your-token')
        >>> client.datasets.publish(...)
        >>>
        >>> # Authenticated operations with auth file (auto-refresh)
        >>> client = RegistryClient(auth=True)
        >>> client.datasets.publish(...)
    """

    def __init__(
        self,
        base_url: str = 'https://api.registry.amp.staging.thegraph.com',
        auth_token: Optional[str] = None,
        auth: bool = False,
    ):
        """Initialize Registry client.

        Args:
            base_url: Base URL for the Registry API
            auth_token: Optional Bearer token for authentication
            auth: If True, load auth token from ~/.amp/cache

        Raises:
            ValueError: If both auth=True and auth_token are provided
        """
        if auth and auth_token:
            raise ValueError('Cannot specify both auth=True and auth_token. Choose one authentication method.')

        self.base_url = base_url.rstrip('/')

        # Token provider: a zero-argument callable returning the bearer token,
        # or None when requests are sent unauthenticated.
        self._get_token = None

        # Priorities 1 and 2 both yield a static token; the explicit parameter
        # wins over the AMP_AUTH_TOKEN environment variable.
        static_token = auth_token or os.getenv('AMP_AUTH_TOKEN')
        if static_token:

            def get_token():
                return static_token

            self._get_token = get_token
        elif auth:
            # Priority 3: Load from ~/.amp/cache/amp_cli_auth (auto-refreshing)
            from amp.auth import AuthService

            self._get_token = AuthService().get_token  # Callable that auto-refreshes

        # Create HTTP client (no auth header yet - it is added per request)
        self._http = httpx.Client(
            base_url=self.base_url,
            headers={
                'Content-Type': 'application/json',
                'Accept': 'application/json',
            },
            timeout=30.0,
        )

        # Lazy %-formatting: the message is only built if INFO is enabled.
        logger.info('Initialized Registry client for %s', base_url)

    @property
    def datasets(self):
        """Access the datasets client.

        A new ``RegistryDatasetsClient`` bound to this client is created on
        every access.

        Returns:
            RegistryDatasetsClient: Client for dataset operations
        """
        # Imported lazily, at access time rather than at module import.
        from .datasets import RegistryDatasetsClient

        return RegistryDatasetsClient(self)

    def _request(
        self,
        method: str,
        path: str,
        **kwargs,
    ) -> httpx.Response:
        """Make an HTTP request to the Registry API.

        Args:
            method: HTTP method (GET, POST, etc.)
            path: API path (without base URL); a value starting with 'http'
                is used as the full URL as-is
            **kwargs: Additional arguments to pass to httpx

        Returns:
            httpx.Response: HTTP response (status code below 400)

        Raises:
            RegistryError: If the request fails or the API returns an error status
        """
        url = path if path.startswith('http') else f'{self.base_url}{path}'

        # Add auth header dynamically (the provider may refresh the token).
        # NOTE(review): the bearer token is also attached when `path` is an
        # absolute URL - confirm callers only pass registry URLs here.
        if self._get_token:
            # Copy so the caller's headers mapping is never mutated; also
            # tolerates an explicit headers=None.
            headers = dict(kwargs.get('headers') or {})
            headers['Authorization'] = f'Bearer {self._get_token()}'
            kwargs['headers'] = headers

        try:
            response = self._http.request(method, url, **kwargs)
        except httpx.RequestError as e:
            raise errors.RegistryError(f'Request failed: {e}') from e

        # Handle error responses
        if response.status_code >= 400:
            self._handle_error(response)

        return response

    def _handle_error(self, response: httpx.Response) -> None:
        """Handle error responses from the API.

        Always raises. A JSON object body is mapped through
        ``errors.map_error``; any other body (non-JSON, or JSON that is not
        an object) produces a generic ``RegistryError`` carrying the HTTP
        status code.

        Args:
            response: HTTP error response

        Raises:
            RegistryError: Mapped exception for the error
        """
        try:
            error_data = response.json()
        except ValueError:
            # Body is not valid JSON (json.JSONDecodeError subclasses ValueError).
            error_data = None

        # Only a JSON object has the error_code/error_message/request_id
        # fields; a list, string or number body must not reach `.get`.
        if isinstance(error_data, dict):
            try:
                mapped = errors.map_error(
                    error_data.get('error_code', ''),
                    error_data.get('error_message', response.text),
                    error_data.get('request_id', ''),
                )
            except (ValueError, KeyError):
                # Keep the previous fallback: a lookup failure while mapping
                # degrades to the generic error below.
                mapped = None

            if mapped is not None:
                # Raised outside the try so a mapped exception is never
                # swallowed by the fallback handler above.
                raise mapped

        # Couldn't parse or map the error response, raise generic error
        raise errors.RegistryError(
            f'HTTP {response.status_code}: {response.text}',
            error_code=str(response.status_code),
        )

    def close(self):
        """Close the HTTP client."""
        self._http.close()

    def __enter__(self):
        """Context manager entry; returns this client."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit; closes the HTTP client."""
        self.close()
Loading
Loading