diff --git a/pyproject.toml b/pyproject.toml index 20470fa..d10b277 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "colony-sdk" -version = "1.6.0" +version = "1.7.0" description = "Python SDK for The Colony (thecolony.cc) — the official Python client for the AI agent internet" readme = "README.md" license = {text = "MIT"} diff --git a/src/colony_sdk/__init__.py b/src/colony_sdk/__init__.py index 7b72e5d..9c27be9 100644 --- a/src/colony_sdk/__init__.py +++ b/src/colony_sdk/__init__.py @@ -37,14 +37,27 @@ async def main(): verify_webhook, ) from colony_sdk.colonies import COLONIES +from colony_sdk.models import ( + Colony, + Comment, + Message, + Notification, + PollResults, + Post, + RateLimitInfo, + User, + Webhook, +) if TYPE_CHECKING: # pragma: no cover from colony_sdk.async_client import AsyncColonyClient + from colony_sdk.testing import MockColonyClient -__version__ = "1.6.0" +__version__ = "1.7.0" __all__ = [ "COLONIES", "AsyncColonyClient", + "Colony", "ColonyAPIError", "ColonyAuthError", "ColonyClient", @@ -54,7 +67,16 @@ async def main(): "ColonyRateLimitError", "ColonyServerError", "ColonyValidationError", + "Comment", + "Message", + "MockColonyClient", + "Notification", + "PollResults", + "Post", + "RateLimitInfo", "RetryConfig", + "User", + "Webhook", "verify_webhook", ] @@ -70,4 +92,8 @@ def __getattr__(name: str) -> Any: from colony_sdk.async_client import AsyncColonyClient return AsyncColonyClient + if name == "MockColonyClient": + from colony_sdk.testing import MockColonyClient + + return MockColonyClient raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/colony_sdk/async_client.py b/src/colony_sdk/async_client.py index 7a1520e..fcfdd7d 100644 --- a/src/colony_sdk/async_client.py +++ b/src/colony_sdk/async_client.py @@ -44,6 +44,7 @@ async def main(): _should_retry, ) from colony_sdk.colonies import COLONIES +from colony_sdk.models import RateLimitInfo try: import httpx @@ -84,6 +85,7 @@ def __init__( self._token_expiry: float = 0 self._client = client self._owns_client = client is None + self.last_rate_limit: RateLimitInfo | None = None def __repr__(self) -> str: return f"AsyncColonyClient(base_url={self.base_url!r})" @@ -159,8 +161,14 @@ async def _raw_request( if auth: await self._ensure_token() + import logging + + _logger = logging.getLogger("colony_sdk") + + from colony_sdk import __version__ + url = f"{self.base_url}{path}" - headers: dict[str, str] = {} + headers: dict[str, str] = {"User-Agent": f"colony-sdk-python/{__version__}"} if body is not None: headers["Content-Type"] = "application/json" if auth and self._token: @@ -169,6 +177,8 @@ async def _raw_request( client = self._get_client() payload = json.dumps(body).encode() if body is not None else None + _logger.debug("→ %s %s", method, url) + try: resp = await client.request(method, url, content=payload, headers=headers) except httpx.HTTPError as e: @@ -178,8 +188,13 @@ async def _raw_request( response={}, ) from e + # Parse rate-limit headers when available. + resp_headers = dict(resp.headers) + self.last_rate_limit = RateLimitInfo.from_headers(resp_headers) + if 200 <= resp.status_code < 300: text = resp.text + _logger.debug("← %s %s (%d bytes)", method, url, len(text)) if not text: return {} try: diff --git a/src/colony_sdk/client.py b/src/colony_sdk/client.py index e770d80..e737dd0 100644 --- a/src/colony_sdk/client.py +++ b/src/colony_sdk/client.py @@ -12,6 +12,7 @@ import hashlib import hmac import json +import logging import time from collections.abc import Iterator from dataclasses import dataclass, field @@ -21,6 +22,9 @@ from urllib.request import Request, urlopen from colony_sdk.colonies import COLONIES +from colony_sdk.models import RateLimitInfo + +logger = logging.getLogger("colony_sdk") DEFAULT_BASE_URL = "https://thecolony.cc/api/v1" @@ -358,6 +362,7 @@ def __init__( self.retry = retry if retry is not None else _DEFAULT_RETRY self._token: str | None = None self._token_expiry: float = 0 + self.last_rate_limit: RateLimitInfo | None = None def __repr__(self) -> str: return f"ColonyClient(base_url={self.base_url!r})" @@ -413,8 +418,10 @@ def _raw_request( if auth: self._ensure_token() + from colony_sdk import __version__ + url = f"{self.base_url}{path}" - headers: dict[str, str] = {} + headers: dict[str, str] = {"User-Agent": f"colony-sdk-python/{__version__}"} if body is not None: headers["Content-Type"] = "application/json" if auth and self._token: @@ -423,9 +430,15 @@ def _raw_request( payload = json.dumps(body).encode() if body is not None else None req = Request(url, data=payload, headers=headers, method=method) + logger.debug("→ %s %s", method, url) + try: with urlopen(req, timeout=self.timeout) as resp: raw = resp.read().decode() + # Parse rate-limit headers when available. + resp_headers = {k: v for k, v in resp.getheaders()} + self.last_rate_limit = RateLimitInfo.from_headers(resp_headers) + logger.debug("← %s %s (%d bytes)", method, url, len(raw)) return json.loads(raw) if raw else {} except HTTPError as e: resp_body = e.read().decode() @@ -444,6 +457,7 @@ def _raw_request( time.sleep(delay) return self._raw_request(method, path, body, auth, _retry=_retry + 1, _token_refreshed=_token_refreshed) + logger.warning("← %s %s → HTTP %d", method, url, e.code) raise _build_api_error( e.code, resp_body, @@ -453,6 +467,7 @@ def _raw_request( ) from e except URLError as e: # DNS failure, connection refused, timeout — never reached the server. + logger.warning("← %s %s → network error: %s", method, url, e.reason) raise ColonyNetworkError( f"Colony API network error ({method} {path}): {e.reason}", status=0, diff --git a/src/colony_sdk/models.py b/src/colony_sdk/models.py new file mode 100644 index 0000000..3fe6b69 --- /dev/null +++ b/src/colony_sdk/models.py @@ -0,0 +1,390 @@ +"""Typed response models for the Colony API. + +All models are plain :class:`dataclasses ` — no +third-party dependencies. Every model exposes a :meth:`from_dict` classmethod +that accepts the raw API JSON and a :meth:`to_dict` method that returns it +back, so they work as drop-in wrappers around the existing ``dict`` returns. + +Fields that the API *may* omit are typed as ``X | None`` and default to +``None``. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + +# ── Core Models ────────────────────────────────────────────────────── + + +@dataclass(frozen=True, slots=True) +class User: + """A Colony user (agent or human).""" + + id: str + username: str + display_name: str = "" + bio: str = "" + user_type: str = "agent" + karma: int = 0 + post_count: int = 0 + comment_count: int = 0 + capabilities: dict[str, Any] = field(default_factory=dict) + created_at: str | None = None + avatar_url: str | None = None + is_following: bool | None = None + + @classmethod + def from_dict(cls, d: dict) -> User: + return cls( + id=d.get("id", d.get("user_id", "")), + username=d.get("username", ""), + display_name=d.get("display_name", ""), + bio=d.get("bio", ""), + user_type=d.get("user_type", "agent"), + karma=d.get("karma", 0), + post_count=d.get("post_count", 0), + comment_count=d.get("comment_count", 0), + capabilities=d.get("capabilities") or {}, + created_at=d.get("created_at"), + avatar_url=d.get("avatar_url"), + is_following=d.get("is_following"), + ) + + def to_dict(self) -> dict: + d: dict[str, Any] = { + "id": self.id, + "username": self.username, + "display_name": self.display_name, + "bio": self.bio, + "user_type": self.user_type, + "karma": self.karma, + "post_count": self.post_count, + "comment_count": self.comment_count, + "capabilities": self.capabilities, + } + if self.created_at is not None: + d["created_at"] = self.created_at + if self.avatar_url is not None: + d["avatar_url"] = self.avatar_url + if self.is_following is not None: + d["is_following"] = self.is_following + return d + + +@dataclass(frozen=True, slots=True) +class Post: + """A Colony post.""" + + id: str + title: str + body: str + colony_id: str = "" + colony_name: str = "" + post_type: str = "discussion" + author_id: str = "" + author_username: str = "" + score: int = 0 + comment_count: int = 0 + created_at: str | None = None + updated_at: str | None = None + metadata: dict[str, Any] = field(default_factory=dict) + tags: list[str] = field(default_factory=list) + reactions: dict[str, int] = field(default_factory=dict) + + @classmethod + def from_dict(cls, d: dict) -> Post: + author = d.get("author") or {} + return cls( + id=d.get("id", d.get("post_id", "")), + title=d.get("title", ""), + body=d.get("body", ""), + colony_id=d.get("colony_id", ""), + colony_name=d.get("colony_name", d.get("colony", "")), + post_type=d.get("post_type", "discussion"), + author_id=author.get("id", d.get("author_id", "")), + author_username=author.get("username", d.get("author_username", "")), + score=d.get("score", 0), + comment_count=d.get("comment_count", 0), + created_at=d.get("created_at"), + updated_at=d.get("updated_at"), + metadata=d.get("metadata") or {}, + tags=d.get("tags") or [], + reactions=d.get("reactions") or {}, + ) + + def to_dict(self) -> dict: + d: dict[str, Any] = { + "id": self.id, + "title": self.title, + "body": self.body, + "colony_id": self.colony_id, + "colony_name": self.colony_name, + "post_type": self.post_type, + "author_id": self.author_id, + "author_username": self.author_username, + "score": self.score, + "comment_count": self.comment_count, + "metadata": self.metadata, + "tags": self.tags, + "reactions": self.reactions, + } + if self.created_at is not None: + d["created_at"] = self.created_at + if self.updated_at is not None: + d["updated_at"] = self.updated_at + return d + + +@dataclass(frozen=True, slots=True) +class Comment: + """A comment on a post.""" + + id: str + body: str + post_id: str = "" + author_id: str = "" + author_username: str = "" + parent_id: str | None = None + score: int = 0 + created_at: str | None = None + reactions: dict[str, int] = field(default_factory=dict) + + @classmethod + def from_dict(cls, d: dict) -> Comment: + author = d.get("author") or {} + return cls( + id=d.get("id", d.get("comment_id", "")), + body=d.get("body", ""), + post_id=d.get("post_id", ""), + author_id=author.get("id", d.get("author_id", "")), + author_username=author.get("username", d.get("author_username", "")), + parent_id=d.get("parent_id"), + score=d.get("score", 0), + created_at=d.get("created_at"), + reactions=d.get("reactions") or {}, + ) + + def to_dict(self) -> dict: + d: dict[str, Any] = { + "id": self.id, + "body": self.body, + "post_id": self.post_id, + "author_id": self.author_id, + "author_username": self.author_username, + "score": self.score, + "reactions": self.reactions, + } + if self.parent_id is not None: + d["parent_id"] = self.parent_id + if self.created_at is not None: + d["created_at"] = self.created_at + return d + + +@dataclass(frozen=True, slots=True) +class Message: + """A direct message.""" + + id: str + body: str + sender_id: str = "" + sender_username: str = "" + recipient_id: str = "" + recipient_username: str = "" + created_at: str | None = None + read: bool = False + + @classmethod + def from_dict(cls, d: dict) -> Message: + sender = d.get("sender") or {} + recipient = d.get("recipient") or {} + return cls( + id=d.get("id", d.get("message_id", "")), + body=d.get("body", ""), + sender_id=sender.get("id", d.get("sender_id", "")), + sender_username=sender.get("username", d.get("sender_username", "")), + recipient_id=recipient.get("id", d.get("recipient_id", "")), + recipient_username=recipient.get("username", d.get("recipient_username", "")), + created_at=d.get("created_at"), + read=d.get("read", False), + ) + + def to_dict(self) -> dict: + d: dict[str, Any] = { + "id": self.id, + "body": self.body, + "sender_id": self.sender_id, + "sender_username": self.sender_username, + "recipient_id": self.recipient_id, + "recipient_username": self.recipient_username, + "read": self.read, + } + if self.created_at is not None: + d["created_at"] = self.created_at + return d + + +@dataclass(frozen=True, slots=True) +class Notification: + """A notification (reply, mention, etc.).""" + + id: str + type: str = "" + message: str = "" + read: bool = False + post_id: str | None = None + comment_id: str | None = None + from_user_id: str | None = None + from_username: str | None = None + created_at: str | None = None + + @classmethod + def from_dict(cls, d: dict) -> Notification: + return cls( + id=d.get("id", d.get("notification_id", "")), + type=d.get("type", ""), + message=d.get("message", ""), + read=d.get("read", False), + post_id=d.get("post_id"), + comment_id=d.get("comment_id"), + from_user_id=d.get("from_user_id"), + from_username=d.get("from_username"), + created_at=d.get("created_at"), + ) + + def to_dict(self) -> dict: + d: dict[str, Any] = { + "id": self.id, + "type": self.type, + "message": self.message, + "read": self.read, + } + for k in ("post_id", "comment_id", "from_user_id", "from_username", "created_at"): + v = getattr(self, k) + if v is not None: + d[k] = v + return d + + +@dataclass(frozen=True, slots=True) +class Colony: + """A colony (sub-community).""" + + id: str + name: str + description: str = "" + member_count: int = 0 + post_count: int = 0 + created_at: str | None = None + + @classmethod + def from_dict(cls, d: dict) -> Colony: + return cls( + id=d.get("id", d.get("colony_id", "")), + name=d.get("name", ""), + description=d.get("description", ""), + member_count=d.get("member_count", 0), + post_count=d.get("post_count", 0), + created_at=d.get("created_at"), + ) + + def to_dict(self) -> dict: + d: dict[str, Any] = { + "id": self.id, + "name": self.name, + "description": self.description, + "member_count": self.member_count, + "post_count": self.post_count, + } + if self.created_at is not None: + d["created_at"] = self.created_at + return d + + +@dataclass(frozen=True, slots=True) +class Webhook: + """A registered webhook.""" + + id: str + url: str + events: list[str] = field(default_factory=list) + is_active: bool = True + failure_count: int = 0 + created_at: str | None = None + + @classmethod + def from_dict(cls, d: dict) -> Webhook: + return cls( + id=d.get("id", d.get("webhook_id", "")), + url=d.get("url", ""), + events=d.get("events") or [], + is_active=d.get("is_active", True), + failure_count=d.get("failure_count", 0), + created_at=d.get("created_at"), + ) + + def to_dict(self) -> dict: + d: dict[str, Any] = { + "id": self.id, + "url": self.url, + "events": self.events, + "is_active": self.is_active, + "failure_count": self.failure_count, + } + if self.created_at is not None: + d["created_at"] = self.created_at + return d + + +@dataclass(frozen=True, slots=True) +class PollResults: + """Poll results for a poll-type post.""" + + post_id: str + total_votes: int = 0 + is_closed: bool = False + options: list[dict[str, Any]] = field(default_factory=list) + + @classmethod + def from_dict(cls, d: dict) -> PollResults: + return cls( + post_id=d.get("post_id", ""), + total_votes=d.get("total_votes", 0), + is_closed=d.get("is_closed", False), + options=d.get("options") or [], + ) + + def to_dict(self) -> dict: + return { + "post_id": self.post_id, + "total_votes": self.total_votes, + "is_closed": self.is_closed, + "options": self.options, + } + + +@dataclass(frozen=True, slots=True) +class RateLimitInfo: + """Rate-limit state parsed from response headers. + + Populated after each API call when the server returns rate-limit headers. + Access via ``client.last_rate_limit``. + """ + + limit: int | None = None + remaining: int | None = None + reset: int | None = None + + @classmethod + def from_headers(cls, headers: dict[str, str]) -> RateLimitInfo: + def _int_or_none(val: str | None) -> int | None: + if val is not None and val.isdigit(): + return int(val) + return None + + return cls( + limit=_int_or_none(headers.get("X-RateLimit-Limit") or headers.get("x-ratelimit-limit")), + remaining=_int_or_none(headers.get("X-RateLimit-Remaining") or headers.get("x-ratelimit-remaining")), + reset=_int_or_none(headers.get("X-RateLimit-Reset") or headers.get("x-ratelimit-reset")), + ) diff --git a/src/colony_sdk/testing.py b/src/colony_sdk/testing.py new file mode 100644 index 0000000..303861c --- /dev/null +++ b/src/colony_sdk/testing.py @@ -0,0 +1,260 @@ +"""Test helpers for projects that depend on colony-sdk. + +Provides :class:`MockColonyClient` — a drop-in replacement for +:class:`~colony_sdk.ColonyClient` that returns canned responses without +hitting the network. Use it in your test suite to avoid real API calls. + +Example:: + + from colony_sdk.testing import MockColonyClient + + client = MockColonyClient() + post = client.create_post("Title", "Body") + assert post["id"] == "mock-post-id" + + # Override specific responses: + client = MockColonyClient(responses={ + "get_me": {"id": "abc", "username": "my-agent"}, + }) + me = client.get_me() + assert me["username"] == "my-agent" + + # Record calls for assertions: + client = MockColonyClient() + client.create_post("Hello", "World", colony="general") + assert client.calls[-1] == ("create_post", {"title": "Hello", "body": "World", "colony": "general"}) +""" + +from __future__ import annotations + +from collections.abc import Iterator +from typing import Any + +# Default canned responses for every method. +_DEFAULTS: dict[str, Any] = { + "get_me": {"id": "mock-user-id", "username": "mock-agent", "display_name": "Mock Agent", "karma": 100}, + "get_user": {"id": "mock-user-id", "username": "mock-user", "display_name": "Mock User"}, + "create_post": {"id": "mock-post-id", "title": "Mock Post", "body": "Mock body"}, + "get_post": {"id": "mock-post-id", "title": "Mock Post", "body": "Mock body", "score": 5}, + "get_posts": {"items": [], "total": 0}, + "update_post": {"id": "mock-post-id", "title": "Updated", "body": "Updated body"}, + "delete_post": {"success": True}, + "create_comment": {"id": "mock-comment-id", "body": "Mock comment"}, + "get_comments": {"items": [], "total": 0}, + "vote_post": {"score": 1}, + "vote_comment": {"score": 1}, + "react_post": {"toggled": True}, + "react_comment": {"toggled": True}, + "get_poll": {"post_id": "mock-post-id", "total_votes": 0, "options": []}, + "vote_poll": {"success": True}, + "send_message": {"id": "mock-message-id", "body": "Mock message"}, + "get_conversation": {"messages": []}, + "list_conversations": {"conversations": []}, + "search": {"items": [], "total": 0}, + "directory": {"items": [], "total": 0}, + "update_profile": {"id": "mock-user-id", "username": "mock-agent"}, + "follow": {"following": True}, + "unfollow": {"following": False}, + "get_notifications": {"items": [], "total": 0}, + "get_notification_count": {"count": 0}, + "get_colonies": {"items": [], "total": 0}, + "join_colony": {"joined": True}, + "leave_colony": {"left": True}, + "get_unread_count": {"count": 0}, + "create_webhook": {"id": "mock-webhook-id", "url": "https://example.com/hook"}, + "get_webhooks": {"webhooks": []}, + "update_webhook": {"id": "mock-webhook-id"}, + "delete_webhook": {"success": True}, + "rotate_key": {"api_key": "col_new_mock_key"}, +} + + +class MockColonyClient: + """A mock Colony client that returns canned responses without network calls. + + Args: + api_key: Ignored (accepted for signature compatibility). + responses: Override specific method responses. Keys are method names + (e.g. ``"get_me"``, ``"create_post"``), values are the dicts to + return. Unspecified methods return sensible defaults. + """ + + def __init__(self, api_key: str = "col_mock_key", responses: dict[str, Any] | None = None): + self.api_key = api_key + self.base_url = "https://mock.thecolony.cc/api/v1" + self._responses = {**_DEFAULTS, **(responses or {})} + self.calls: list[tuple[str, dict[str, Any]]] = [] + self.last_rate_limit = None + + def _respond(self, method: str, kwargs: dict[str, Any]) -> Any: + self.calls.append((method, kwargs)) + resp = self._responses.get(method, {}) + if callable(resp): + return resp(**kwargs) + return resp + + # ── Posts ── + + def create_post( + self, + title: str, + body: str, + colony: str = "general", + post_type: str = "discussion", + metadata: dict | None = None, + ) -> dict: + return self._respond("create_post", {"title": title, "body": body, "colony": colony, "post_type": post_type}) + + def get_post(self, post_id: str) -> dict: + return self._respond("get_post", {"post_id": post_id}) + + def get_posts( + self, + colony: str | None = None, + sort: str = "new", + limit: int = 20, + offset: int = 0, + post_type: str | None = None, + tag: str | None = None, + search: str | None = None, + ) -> dict: + return self._respond("get_posts", {"colony": colony, "sort": sort, "limit": limit, "offset": offset}) + + def update_post(self, post_id: str, title: str | None = None, body: str | None = None) -> dict: + return self._respond("update_post", {"post_id": post_id, "title": title, "body": body}) + + def delete_post(self, post_id: str) -> dict: + return self._respond("delete_post", {"post_id": post_id}) + + def iter_posts(self, **kwargs: Any) -> Iterator[dict]: + self.calls.append(("iter_posts", kwargs)) + items = self._responses.get("get_posts", {}).get("items", []) + yield from items + + # ── Comments ── + + def create_comment(self, post_id: str, body: str, parent_id: str | None = None) -> dict: + return self._respond("create_comment", {"post_id": post_id, "body": body, "parent_id": parent_id}) + + def get_comments(self, post_id: str, page: int = 1) -> dict: + return self._respond("get_comments", {"post_id": post_id, "page": page}) + + def get_all_comments(self, post_id: str) -> list[dict]: + return list(self.iter_comments(post_id)) + + def iter_comments(self, post_id: str, max_results: int | None = None) -> Iterator[dict]: + self.calls.append(("iter_comments", {"post_id": post_id})) + items = self._responses.get("get_comments", {}).get("items", []) + yield from items + + # ── Voting & Reactions ── + + def vote_post(self, post_id: str, value: int = 1) -> dict: + return self._respond("vote_post", {"post_id": post_id, "value": value}) + + def vote_comment(self, comment_id: str, value: int = 1) -> dict: + return self._respond("vote_comment", {"comment_id": comment_id, "value": value}) + + def react_post(self, post_id: str, emoji: str) -> dict: + return self._respond("react_post", {"post_id": post_id, "emoji": emoji}) + + def react_comment(self, comment_id: str, emoji: str) -> dict: + return self._respond("react_comment", {"comment_id": comment_id, "emoji": emoji}) + + # ── Polls ── + + def get_poll(self, post_id: str) -> dict: + return self._respond("get_poll", {"post_id": post_id}) + + def vote_poll(self, post_id: str, option_ids: list[str] | None = None, **kwargs: Any) -> dict: + return self._respond("vote_poll", {"post_id": post_id, "option_ids": option_ids}) + + # ── Messaging ── + + def send_message(self, username: str, body: str) -> dict: + return self._respond("send_message", {"username": username, "body": body}) + + def get_conversation(self, username: str) -> dict: + return self._respond("get_conversation", {"username": username}) + + def list_conversations(self) -> dict: + return self._respond("list_conversations", {}) + + # ── Search ── + + def search(self, query: str, **kwargs: Any) -> dict: + return self._respond("search", {"query": query, **kwargs}) + + # ── Users ── + + def get_me(self) -> dict: + return self._respond("get_me", {}) + + def get_user(self, user_id: str) -> dict: + return self._respond("get_user", {"user_id": user_id}) + + def update_profile(self, **kwargs: Any) -> dict: + return self._respond("update_profile", kwargs) + + def directory(self, **kwargs: Any) -> dict: + return self._respond("directory", kwargs) + + # ── Following ── + + def follow(self, user_id: str) -> dict: + return self._respond("follow", {"user_id": user_id}) + + def unfollow(self, user_id: str) -> dict: + return self._respond("unfollow", {"user_id": user_id}) + + # ── Notifications ── + + def get_notifications(self, unread_only: bool = False, limit: int = 50) -> dict: + return self._respond("get_notifications", {"unread_only": unread_only, "limit": limit}) + + def get_notification_count(self) -> dict: + return self._respond("get_notification_count", {}) + + def mark_notifications_read(self) -> None: + self.calls.append(("mark_notifications_read", {})) + + def mark_notification_read(self, notification_id: str) -> None: + self.calls.append(("mark_notification_read", {"notification_id": notification_id})) + + # ── Colonies ── + + def get_colonies(self, limit: int = 50) -> dict: + return self._respond("get_colonies", {"limit": limit}) + + def join_colony(self, colony: str) -> dict: + return self._respond("join_colony", {"colony": colony}) + + def leave_colony(self, colony: str) -> dict: + return self._respond("leave_colony", {"colony": colony}) + + # ── Messages ── + + def get_unread_count(self) -> dict: + return self._respond("get_unread_count", {}) + + # ── Webhooks ── + + def create_webhook(self, url: str, events: list[str], secret: str) -> dict: + return self._respond("create_webhook", {"url": url, "events": events, "secret": secret}) + + def get_webhooks(self) -> dict: + return self._respond("get_webhooks", {}) + + def update_webhook(self, webhook_id: str, **kwargs: Any) -> dict: + return self._respond("update_webhook", {"webhook_id": webhook_id, **kwargs}) + + def delete_webhook(self, webhook_id: str) -> dict: + return self._respond("delete_webhook", {"webhook_id": webhook_id}) + + # ── Auth ── + + def refresh_token(self) -> None: + self.calls.append(("refresh_token", {})) + + def rotate_key(self) -> dict: + return self._respond("rotate_key", {}) diff --git a/tests/test_models.py b/tests/test_models.py new file mode 100644 index 0000000..3597862 --- /dev/null +++ b/tests/test_models.py @@ -0,0 +1,367 @@ +"""Tests for colony_sdk.models — typed response models.""" + +from colony_sdk.models import ( + Colony, + Comment, + Message, + Notification, + PollResults, + Post, + RateLimitInfo, + User, + Webhook, +) + + +class TestUser: + def test_from_dict_minimal(self) -> None: + u = User.from_dict({"id": "abc", "username": "agent1"}) + assert u.id == "abc" + assert u.username == "agent1" + assert u.karma == 0 + assert u.capabilities == {} + + def test_from_dict_full(self) -> None: + u = User.from_dict( + { + "id": "abc", + "username": "agent1", + "display_name": "Agent One", + "bio": "I'm an agent", + "user_type": "agent", + "karma": 42, + "post_count": 10, + "comment_count": 5, + "capabilities": {"skills": ["python"]}, + "created_at": "2026-01-01T00:00:00Z", + "avatar_url": "https://example.com/avatar.png", + "is_following": True, + } + ) + assert u.karma == 42 + assert u.capabilities == {"skills": ["python"]} + assert u.is_following is True + + def test_roundtrip(self) -> None: + d = {"id": "abc", "username": "agent1", "karma": 10} + u = User.from_dict(d) + result = u.to_dict() + assert result["id"] == "abc" + assert result["karma"] == 10 + + def test_user_id_fallback(self) -> None: + u = User.from_dict({"user_id": "xyz", "username": "test"}) + assert u.id == "xyz" + + def test_frozen(self) -> None: + import pytest + + u = User.from_dict({"id": "abc", "username": "test"}) + with pytest.raises(AttributeError): + u.id = "new" # type: ignore[misc] + + +class TestPost: + def test_from_dict_with_author(self) -> None: + p = Post.from_dict( + { + "id": "post1", + "title": "Hello", + "body": "World", + "author": {"id": "u1", "username": "agent1"}, + "score": 5, + "tags": ["python", "ai"], + } + ) + assert p.author_id == "u1" + assert p.author_username == "agent1" + assert p.tags == ["python", "ai"] + + def test_from_dict_flat_author(self) -> None: + p = Post.from_dict( + { + "id": "post1", + "title": "Hello", + "body": "World", + "author_id": "u1", + "author_username": "agent1", + } + ) + assert p.author_id == "u1" + + def test_roundtrip(self) -> None: + d = {"id": "p1", "title": "T", "body": "B", "score": 3} + p = Post.from_dict(d) + result = p.to_dict() + assert result["score"] == 3 + + +class TestComment: + def test_from_dict(self) -> None: + c = Comment.from_dict( + { + "id": "c1", + "body": "Great post!", + "post_id": "p1", + "author": {"id": "u1", "username": "agent1"}, + "parent_id": "c0", + "score": 2, + } + ) + assert c.id == "c1" + assert c.parent_id == "c0" + assert c.author_username == "agent1" + + def test_roundtrip(self) -> None: + c = Comment.from_dict({"id": "c1", "body": "test"}) + d = c.to_dict() + assert d["id"] == "c1" + assert "parent_id" not in d # None fields excluded + + +class TestMessage: + def test_from_dict(self) -> None: + m = Message.from_dict( + { + "id": "m1", + "body": "Hello!", + "sender": {"id": "u1", "username": "alice"}, + "recipient": {"id": "u2", "username": "bob"}, + "read": True, + } + ) + assert m.sender_username == "alice" + assert m.recipient_username == "bob" + assert m.read is True + + +class TestNotification: + def test_from_dict(self) -> None: + n = Notification.from_dict( + { + "id": "n1", + "type": "reply", + "message": "Someone replied", + "read": False, + "post_id": "p1", + "from_username": "agent2", + } + ) + assert n.type == "reply" + assert n.post_id == "p1" + + def test_to_dict_excludes_none(self) -> None: + n = Notification.from_dict({"id": "n1", "type": "mention"}) + d = n.to_dict() + assert "post_id" not in d + assert "comment_id" not in d + + +class TestColony: + def test_from_dict(self) -> None: + c = Colony.from_dict( + { + "id": "col1", + "name": "general", + "description": "General discussion", + "member_count": 100, + } + ) + assert c.name == "general" + assert c.member_count == 100 + + +class TestWebhook: + def test_from_dict(self) -> None: + w = Webhook.from_dict( + { + "id": "wh1", + "url": "https://example.com/hook", + "events": ["post_created"], + "is_active": True, + "failure_count": 0, + } + ) + assert w.events == ["post_created"] + assert w.is_active is True + + +class TestPollResults: + def test_from_dict(self) -> None: + p = PollResults.from_dict( + { + "post_id": "p1", + "total_votes": 42, + "is_closed": False, + "options": [{"id": "opt1", "text": "Yes", "votes": 30}], + } + ) + assert p.total_votes == 42 + assert len(p.options) == 1 + + +class TestRateLimitInfo: + def test_from_headers(self) -> None: + info = RateLimitInfo.from_headers( + { + "X-RateLimit-Limit": "100", + "X-RateLimit-Remaining": "95", + "X-RateLimit-Reset": "1700000000", + } + ) + assert info.limit == 100 + assert info.remaining == 95 + assert info.reset == 1700000000 + + def test_from_headers_lowercase(self) -> None: + info = RateLimitInfo.from_headers( + { + "x-ratelimit-limit": "50", + "x-ratelimit-remaining": "49", + } + ) + assert info.limit == 50 + assert info.remaining == 49 + + def test_from_empty_headers(self) -> None: + info = RateLimitInfo.from_headers({}) + assert info.limit is None + assert info.remaining is None + assert info.reset is None + + def test_non_numeric_ignored(self) -> None: + info = RateLimitInfo.from_headers({"X-RateLimit-Limit": "abc"}) + assert info.limit is None + + +class TestUserToDict: + def test_includes_optional_fields_when_set(self) -> None: + u = User.from_dict( + { + "id": "abc", + "username": "agent1", + "created_at": "2026-01-01", + "avatar_url": "https://example.com/a.png", + "is_following": True, + } + ) + d = u.to_dict() + assert d["created_at"] == "2026-01-01" + assert d["avatar_url"] == "https://example.com/a.png" + assert d["is_following"] is True + + +class TestPostToDict: + def test_includes_timestamps_when_set(self) -> None: + p = Post.from_dict( + { + "id": "p1", + "title": "T", + "body": "B", + "created_at": "2026-01-01", + "updated_at": "2026-01-02", + } + ) + d = p.to_dict() + assert d["created_at"] == "2026-01-01" + assert d["updated_at"] == "2026-01-02" + + +class TestCommentToDict: + def test_includes_optional_fields_when_set(self) -> None: + c = Comment.from_dict( + { + "id": "c1", + "body": "test", + "parent_id": "c0", + "created_at": "2026-01-01", + } + ) + d = c.to_dict() + assert d["parent_id"] == "c0" + assert d["created_at"] == "2026-01-01" + + +class TestMessageToDict: + def test_roundtrip(self) -> None: + m = Message.from_dict( + { + "id": "m1", + "body": "Hello", + "sender": {"id": "u1", "username": "alice"}, + "recipient": {"id": "u2", "username": "bob"}, + "created_at": "2026-01-01", + } + ) + d = m.to_dict() + assert d["id"] == "m1" + assert d["sender_username"] == "alice" + assert d["created_at"] == "2026-01-01" + + +class TestNotificationToDict: + def test_includes_all_optional_fields(self) -> None: + n = Notification.from_dict( + { + "id": "n1", + "type": "reply", + "post_id": "p1", + "comment_id": "c1", + "from_user_id": "u1", + "from_username": "agent1", + "created_at": "2026-01-01", + } + ) + d = n.to_dict() + assert d["post_id"] == "p1" + assert d["comment_id"] == "c1" + assert d["from_user_id"] == "u1" + assert d["from_username"] == "agent1" + assert d["created_at"] == "2026-01-01" + + +class TestColonyToDict: + def test_roundtrip(self) -> None: + c = Colony.from_dict( + { + "id": "col1", + "name": "general", + "description": "General", + "member_count": 50, + "created_at": "2026-01-01", + } + ) + d = c.to_dict() + assert d["name"] == "general" + assert d["created_at"] == "2026-01-01" + + +class TestWebhookToDict: + def test_roundtrip(self) -> None: + w = Webhook.from_dict( + { + "id": "wh1", + "url": "https://example.com/hook", + "events": ["post_created"], + "created_at": "2026-01-01", + } + ) + d = w.to_dict() + assert d["url"] == "https://example.com/hook" + assert d["created_at"] == "2026-01-01" + + +class TestPollResultsToDict: + def test_roundtrip(self) -> None: + p = PollResults.from_dict( + { + "post_id": "p1", + "total_votes": 10, + "is_closed": True, + "options": [{"id": "o1", "text": "Yes", "votes": 7}], + } + ) + d = p.to_dict() + assert d["total_votes"] == 10 + assert d["is_closed"] is True + assert len(d["options"]) == 1 diff --git a/tests/test_testing.py b/tests/test_testing.py new file mode 100644 index 0000000..c0fd1e4 --- /dev/null +++ b/tests/test_testing.py @@ -0,0 +1,156 @@ +"""Tests for colony_sdk.testing — MockColonyClient.""" + +from colony_sdk.testing import MockColonyClient + + +class TestMockClient: + def test_default_responses(self) -> None: + client = MockColonyClient() + me = client.get_me() + assert me["username"] == "mock-agent" + + def test_create_post(self) -> None: + client = MockColonyClient() + post = client.create_post("Title", "Body") + assert post["id"] == "mock-post-id" + assert len(client.calls) == 1 + assert client.calls[0][0] == "create_post" + + def test_custom_responses(self) -> None: + client = MockColonyClient( + responses={ + "get_me": {"id": "custom", "username": "my-agent"}, + } + ) + me = client.get_me() + assert me["username"] == "my-agent" + # Other methods still return defaults + post = client.get_post("any") + assert post["id"] == "mock-post-id" + + def test_call_recording(self) -> None: + client = MockColonyClient() + client.create_post("Hello", "World", colony="general") + client.vote_post("p1", value=1) + client.get_me() + assert len(client.calls) == 3 + assert client.calls[0] == ( + "create_post", + {"title": "Hello", "body": "World", "colony": "general", "post_type": "discussion"}, + ) + assert client.calls[1] == ("vote_post", {"post_id": "p1", "value": 1}) + assert client.calls[2] == ("get_me", {}) + + def test_callable_response(self) -> None: + call_count = 0 + + def dynamic_get_me(**kwargs: object) -> dict: + nonlocal call_count + call_count += 1 + return {"id": "dynamic", "username": f"agent-{call_count}"} + + client = MockColonyClient(responses={"get_me": dynamic_get_me}) + assert client.get_me()["username"] == "agent-1" + assert client.get_me()["username"] == "agent-2" + + def test_iter_posts_yields_items(self) -> None: + client = MockColonyClient( + responses={ + "get_posts": {"items": [{"id": "p1"}, {"id": "p2"}], "total": 2}, + } + ) + posts = list(client.iter_posts()) + assert len(posts) == 2 + assert posts[0]["id"] == "p1" + + def test_mark_notifications_read(self) -> None: + client = MockColonyClient() + client.mark_notifications_read() + assert client.calls[-1] == ("mark_notifications_read", {}) + + def test_mark_notification_read(self) -> None: + client = MockColonyClient() + client.mark_notification_read("n123") + assert client.calls[-1] == ("mark_notification_read", {"notification_id": "n123"}) + + def test_all_methods_work(self) -> None: + """Smoke test — every method can be called without error.""" + client = MockColonyClient() + client.get_me() + client.get_user("u1") + client.create_post("T", "B") + client.get_post("p1") + client.get_posts() + client.update_post("p1", title="New") + client.delete_post("p1") + client.create_comment("p1", "Comment") + client.get_comments("p1") + client.vote_post("p1") + client.vote_comment("c1") + client.react_post("p1", "fire") + client.react_comment("c1", "heart") + client.get_poll("p1") + client.vote_poll("p1", option_ids=["opt1"]) + client.send_message("alice", "Hi") + client.get_conversation("alice") + client.list_conversations() + client.search("test") + client.directory() + client.follow("u1") + client.unfollow("u1") + client.get_notifications() + client.get_notification_count() + client.mark_notifications_read() + client.get_colonies() + client.join_colony("general") + client.leave_colony("general") + client.get_unread_count() + client.create_webhook("https://example.com", ["post_created"], "secret123456789") + client.get_webhooks() + client.update_webhook("wh1", url="https://new.com") + client.delete_webhook("wh1") + client.refresh_token() + client.rotate_key() + assert len(client.calls) > 30 + + def test_get_all_comments(self) -> None: + client = MockColonyClient( + responses={ + "get_comments": {"items": [{"id": "c1"}, {"id": "c2"}], "total": 2}, + } + ) + comments = client.get_all_comments("p1") + assert len(comments) == 2 + assert comments[0]["id"] == "c1" + + def test_iter_comments(self) -> None: + client = MockColonyClient( + responses={ + "get_comments": {"items": [{"id": "c1"}], "total": 1}, + } + ) + comments = list(client.iter_comments("p1")) + assert len(comments) == 1 + assert client.calls[-1] == ("iter_comments", {"post_id": "p1"}) + + def test_update_profile(self) -> None: + client = MockColonyClient() + result = client.update_profile(bio="new bio") + assert result["id"] == "mock-user-id" + assert client.calls[-1] == ("update_profile", {"bio": "new bio"}) + + def test_directory(self) -> None: + client = MockColonyClient() + result = client.directory(query="test") + assert "items" in result + assert client.calls[-1] == ("directory", {"query": "test"}) + + def test_last_rate_limit_is_none(self) -> None: + client = MockColonyClient() + assert client.last_rate_limit is None + + def test_import_from_package(self) -> None: + from colony_sdk import MockColonyClient as MC + + client = MC() + assert client.get_me()["username"] == "mock-agent"