diff --git a/docs/pubsub-backends.md b/docs/pubsub-backends.md new file mode 100644 index 0000000..cffa7cd --- /dev/null +++ b/docs/pubsub-backends.md @@ -0,0 +1,222 @@ +# Custom Pub/Sub Backends + +PyView's pub/sub system is pluggable, allowing you to use distributed backends like Redis for multi-machine deployments. + +## Overview + +By default, PyView uses an in-memory pub/sub implementation suitable for single-instance deployments. For horizontally scaled applications, you'll need a distributed backend. + +```python +# Default (in-memory) +app = PyView() + +# Custom backend (Redis, PostgreSQL, etc.) +app = PyView(pubsub=RedisPubSub("redis://localhost:6379")) +``` + +## The PubSubProvider Protocol + +Any class implementing these methods is compatible: + +```python +from typing import Any, Callable, Coroutine + +TopicHandler = Callable[[str, Any], Coroutine[Any, Any, None]] + +class PubSubProvider: + async def subscribe_topic(self, session_id: str, topic: str, handler: TopicHandler) -> None: + """Subscribe a session to a topic.""" + ... + + async def unsubscribe_topic(self, session_id: str, topic: str) -> None: + """Unsubscribe a session from a topic.""" + ... + + async def unsubscribe_all(self, session_id: str) -> None: + """Remove all subscriptions for a session.""" + ... + + async def broadcast(self, topic: str, message: Any) -> None: + """Broadcast a message to all subscribers on a topic.""" + ... + + async def start(self) -> None: + """Called when the app starts.""" + ... + + async def stop(self) -> None: + """Called when the app shuts down.""" + ... +``` + +## Quick Start with Redis + +### 1. Install Redis Client + +```bash +pip install redis +``` + +### 2. Configure Your App + +```python +from pyview import PyView +from redis_pubsub import RedisPubSub # See examples below + +app = PyView( + pubsub=RedisPubSub( + url="redis://localhost:6379", + channel_prefix="myapp:" + ) +) +``` + +### 3. Your LiveViews Work Exactly the Same + +```python +@app.add_live_view("/counter") +class CounterLiveView(LiveView): + async def mount(self, socket, session): + socket.context = {"count": 0} + if is_connected(socket): + await socket.subscribe("counter") + + async def handle_event(self, event, payload, socket): + if event == "increment": + socket.context["count"] += 1 + # This now broadcasts via Redis to ALL instances + await socket.broadcast("counter", socket.context["count"]) + + async def handle_info(self, event, socket): + socket.context["count"] = event.payload +``` + +### 4. Run Multiple Instances + +```bash +# Terminal 1 +uvicorn app:app --port 8000 + +# Terminal 2 +uvicorn app:app --port 8001 + +# Terminal 3 +uvicorn app:app --port 8002 +``` + +With a load balancer in front, users on different instances will see real-time updates from each other. + +## Complete Examples + +Full working implementations are available in [`examples/custom_pubsub/`](../../examples/custom_pubsub/): + +- **[`redis_pubsub.py`](../../examples/custom_pubsub/redis_pubsub.py)** - Redis backend (recommended for production) +- **[`postgres_pubsub.py`](../../examples/custom_pubsub/postgres_pubsub.py)** - PostgreSQL NOTIFY/LISTEN backend +- **[`test_pubsub.py`](../../examples/custom_pubsub/test_pubsub.py)** - Test backend for unit testing +- **[`app.py`](../../examples/custom_pubsub/app.py)** - Demo counter app +- **[`docker-compose.yml`](../../examples/custom_pubsub/docker-compose.yml)** - Redis + Postgres services + +See the [examples README](../../examples/custom_pubsub/README.md) for setup instructions and a working demo. + +## Implementation Guide + +### Key Concepts + +When implementing a custom backend, understand these constraints: + +**Handlers are local** - Handlers are Python async callables that can't be serialized. Each instance must: +1. Store handlers in local memory +2. Publish only message data to the distributed backend +3. Route received messages to local handlers only + +**Messages must be serializable** - For distributed backends, messages should be JSON-compatible: + +```python +# Good - JSON serializable +await socket.broadcast("updates", {"user_id": 123, "action": "joined"}) +await socket.broadcast("counter", 42) + +# Bad - not serializable +await socket.broadcast("data", my_dataclass) # Convert with asdict() first +await socket.broadcast("func", some_function) # Can't serialize functions +``` + +### Implementation Checklist + +When building a custom backend: + +- [ ] Store handlers in memory per instance (dict[str, dict[str, TopicHandler]]) +- [ ] Subscribe to distributed backend only when first local handler subscribes +- [ ] Publish messages as JSON to distributed backend +- [ ] Listen for messages from distributed backend +- [ ] Route received messages to local handlers using `asyncio.create_task()` +- [ ] Handle errors gracefully (one failing handler shouldn't affect others) +- [ ] Clean up connections in `stop()` + +### Backend Comparison + +| Backend | Best For | Throughput | Setup Complexity | +|---------|----------|------------|------------------| +| **InMemory** (default) | Single instance, development | High | None | +| **Redis** | Production, multi-instance | Very High | Low (just Redis) | +| **PostgreSQL** | Already using Postgres | Medium | Low (use existing DB) | +| **Test** | Unit/integration tests | N/A | None | + +### Testing Your Backend + +Use the test implementation to verify your LiveView's pub/sub behavior: + +```python +from test_pubsub import TestPubSub + +def test_counter_broadcasts(): + test_pubsub = TestPubSub() + app = PyView(pubsub=test_pubsub) + + # ... test your LiveView ... + + # Verify subscriptions + assert ("session_123", "counter") in test_pubsub.subscriptions + + # Verify broadcasts + assert ("counter", 5) in test_pubsub.broadcasts +``` + +## Production Considerations + +### Environment Configuration + +Use environment variables for connection URLs: + +```python +import os + +redis_url = os.getenv("REDIS_URL", "redis://localhost:6379") +app = PyView(pubsub=RedisPubSub(redis_url)) +``` + +### Error Handling + +Implementations should: +- Log but not crash on malformed messages +- Isolate handler errors (use try/except around each handler call) +- Handle reconnection for network failures +- Clean up resources properly in `stop()` + +### Channel Prefixes + +Use channel prefixes to avoid collisions when multiple apps share the same Redis/Postgres: + +```python +# App 1 +app1 = PyView(pubsub=RedisPubSub(url, channel_prefix="app1:")) + +# App 2 +app2 = PyView(pubsub=RedisPubSub(url, channel_prefix="app2:")) +``` + +## Next Steps + +- Check out the [working examples](../../examples/custom_pubsub/) with Docker Compose +- Review the [PubSubProvider Protocol source](../../pyview/pubsub/interfaces.py) +- Consider implementing backends for NATS, RabbitMQ, or AWS SNS/SQS diff --git a/examples/custom_pubsub/Dockerfile b/examples/custom_pubsub/Dockerfile new file mode 100644 index 0000000..82febbd --- /dev/null +++ b/examples/custom_pubsub/Dockerfile @@ -0,0 +1,13 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY . . + +# Run the app +CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/examples/custom_pubsub/README.md b/examples/custom_pubsub/README.md new file mode 100644 index 0000000..b0bfdb1 --- /dev/null +++ b/examples/custom_pubsub/README.md @@ -0,0 +1,283 @@ +# Custom Pub/Sub Backend Examples + +This directory contains complete, working implementations of custom pub/sub backends for PyView, along with a demo app and Docker Compose setup for testing. + +## What's Included + +- **`redis_pubsub.py`** - Redis-backed pub/sub (recommended for production) +- **`postgres_pubsub.py`** - PostgreSQL NOTIFY/LISTEN pub/sub +- **`test_pubsub.py`** - Test implementation for unit testing +- **`app.py`** - Demo counter app showing multi-instance pub/sub +- **`docker-compose.yml`** - Redis and PostgreSQL services + +## Quick Start + +### Option 1: Everything with Docker (Easiest) + +Run the entire demo with one command: + +```bash +docker-compose up --build +``` + +This starts: +- Redis on `localhost:6379` +- PostgreSQL on `localhost:5432` +- Three app instances (app1, app2, app3) +- Nginx load balancer on `localhost:8000` + +**Test it**: Open http://localhost:8000 in multiple browsers and click increment. All browsers stay in sync via Redis pub/sub! 🎉 + +To see which backend instance you're connected to: +- http://localhost:8001 (app1 directly) +- http://localhost:8002 (app2 directly) +- http://localhost:8003 (app3 directly) +- http://localhost:8000 (load balanced across all three) + +### Option 2: Run Locally + +If you want to run the app locally (for development): + +#### 1. Install Dependencies + +```bash +pip install -r requirements.txt +``` + +#### 2. Start Redis + +```bash +docker-compose up -d redis postgres +``` + +This starts just Redis and PostgreSQL, without the app instances. + +#### 3. Run Multiple App Instances + +Open three terminals and run: + +```bash +# Terminal 1 +uvicorn app:app --port 8000 + +# Terminal 2 +uvicorn app:app --port 8001 + +# Terminal 3 +uvicorn app:app --port 8002 +``` + +#### 4. Test It Out + +1. Open http://localhost:8000 in one browser +2. Open http://localhost:8001 in another browser +3. Click increment on either one +4. Watch both update in real-time! 🎉 + +The counter stays in sync across all instances because they're communicating through Redis pub/sub. + +## Using in Your App + +### Redis (Recommended) + +```python +from pyview import PyView +from redis_pubsub import RedisPubSub + +app = PyView( + pubsub=RedisPubSub( + url="redis://localhost:6379", + channel_prefix="myapp:" + ) +) +``` + +**When to use**: Production deployments with multiple app instances behind a load balancer. + +### PostgreSQL + +```python +from pyview import PyView +from postgres_pubsub import PostgresPubSub + +app = PyView( + pubsub=PostgresPubSub( + dsn="postgresql://pyview:pyview@localhost/pyview", + channel_prefix="myapp_" + ) +) +``` + +**When to use**: You're already using PostgreSQL and want to avoid adding Redis. Note: Lower throughput than Redis. + +### Testing + +```python +from pyview import PyView +from test_pubsub import TestPubSub + +test_pubsub = TestPubSub() +app = PyView(pubsub=test_pubsub) + +# In your tests +assert ("session_123", "updates") in test_pubsub.subscriptions +assert ("updates", {"action": "joined"}) in test_pubsub.broadcasts +``` + +**When to use**: Unit and integration tests where you don't want external dependencies. + +## Implementation Details + +### How It Works + +Distributed pub/sub backends must handle a key challenge: **handlers are local Python callables** that can't be serialized and sent over the network. The solution: + +1. **Local handler storage**: Each instance stores handlers in memory +2. **Message broadcasting**: Only message data is sent through Redis/Postgres +3. **Local dispatch**: Each instance routes received messages to its own handlers + +### Redis Implementation + +The Redis implementation uses: +- `redis.asyncio` for async operations +- Pub/sub channels for message distribution +- Reference counting for channel subscriptions (only subscribe when first local handler subscribes) +- Background listener task for receiving messages + +### PostgreSQL Implementation + +The PostgreSQL implementation uses: +- `asyncpg` for async operations +- `NOTIFY`/`LISTEN` for pub/sub +- Separate connections for publish and listen +- Callback-based notification handling + +### Message Serialization + +Messages must be JSON-serializable: + +```python +# Good +await socket.broadcast("updates", {"user_id": 123, "action": "joined"}) +await socket.broadcast("counter", 42) + +# Bad - won't work with distributed backends +await socket.broadcast("data", my_dataclass) # Use asdict() first +await socket.broadcast("func", some_function) # Can't serialize functions +``` + +## Implementing Your Own Backend + +To create a custom backend, implement these methods: + +```python +class MyPubSub: + async def subscribe_topic(self, session_id: str, topic: str, handler: TopicHandler) -> None: + """Subscribe a session to a topic.""" + ... + + async def unsubscribe_topic(self, session_id: str, topic: str) -> None: + """Unsubscribe from a topic.""" + ... + + async def unsubscribe_all(self, session_id: str) -> None: + """Remove all subscriptions for a session.""" + ... + + async def broadcast(self, topic: str, message: Any) -> None: + """Broadcast a message to all subscribers.""" + ... + + async def start(self) -> None: + """Called when the app starts.""" + ... + + async def stop(self) -> None: + """Called when the app shuts down.""" + ... +``` + +See the [PubSubProvider Protocol](../../pyview/pubsub/interfaces.py) for full documentation. + +## Production Deployment + +### Environment Variables + +The demo app reads `REDIS_URL` from the environment: + +```bash +export REDIS_URL="redis://your-redis-host:6379" +uvicorn app:app +``` + +### Connection Pooling + +For production, consider connection pooling: + +```python +class RedisPubSub: + def __init__(self, url: str, max_connections: int = 50): + self._pool = redis.ConnectionPool.from_url( + url, + max_connections=max_connections, + decode_responses=False + ) + self._client = redis.Redis(connection_pool=self._pool) +``` + +### Error Handling + +Both implementations include: +- Graceful handling of malformed messages +- Isolated error handling (one failing handler doesn't affect others) +- Proper cleanup on shutdown + +### Monitoring + +Add metrics for: +- Active subscriptions per instance +- Message publish rate +- Message receive rate +- Handler execution time + +## Troubleshooting + +### Redis connection errors + +``` +redis.exceptions.ConnectionError: Error connecting to Redis +``` + +Make sure Redis is running: +```bash +docker-compose up -d redis +redis-cli ping # Should return PONG +``` + +### PostgreSQL connection errors + +``` +asyncpg.exceptions.CannotConnectNowError +``` + +Check PostgreSQL is running and credentials are correct: +```bash +docker-compose up -d postgres +psql postgresql://pyview:pyview@localhost/pyview -c "SELECT 1" +``` + +### Messages not syncing across instances + +- Check all instances are connected to the same Redis/Postgres +- Verify `channel_prefix` is the same across all instances +- Check logs for subscription/broadcast errors + +## Next Steps + +- See the [pub/sub documentation](../../docs/pubsub-backends.md) for more details +- Check out [production deployment patterns](../../docs/deployment.md) (if available) +- Consider implementing NATS, RabbitMQ, or AWS SNS/SQS backends + +## License + +These examples are part of the PyView project and use the same license. diff --git a/examples/custom_pubsub/app.py b/examples/custom_pubsub/app.py new file mode 100644 index 0000000..be2bf34 --- /dev/null +++ b/examples/custom_pubsub/app.py @@ -0,0 +1,160 @@ +"""Demo app showing multi-instance pub/sub with Redis. + +This counter app demonstrates how pub/sub enables real-time updates across +multiple server instances. When any user clicks increment/decrement, ALL +connected users see the update immediately - even if they're connected to +different server instances. + +To test locally: + 1. Start Redis: docker-compose up -d + 2. Run multiple instances: + Terminal 1: uvicorn app:app --port 8000 + Terminal 2: uvicorn app:app --port 8001 + Terminal 3: uvicorn app:app --port 8002 + 3. Open http://localhost:8000 and http://localhost:8001 in different browsers + 4. Click increment on one - see it update on both! + +To switch backends: + - Redis (recommended): RedisPubSub("redis://localhost:6379") + - PostgreSQL: PostgresPubSub("postgresql://user:pass@localhost/db") + - Testing: TestPubSub() +""" + +import os +from typing import TypedDict + +from redis_pubsub import RedisPubSub + +from pyview import LiveView, PyView, is_connected + + +class CounterContext(TypedDict): + count: int + + +class CounterLiveView(LiveView[CounterContext]): + """Shared counter across all instances.""" + + async def mount(self, socket, session): + socket.context = {"count": 0} + if is_connected(socket): + # Subscribe to counter updates from any instance + await socket.subscribe("counter") + + async def handle_event(self, event, payload, socket): + if event == "increment": + socket.context["count"] += 1 + # Broadcast to ALL instances via Redis + await socket.broadcast("counter", socket.context["count"]) + elif event == "decrement": + socket.context["count"] -= 1 + await socket.broadcast("counter", socket.context["count"]) + elif event == "reset": + socket.context["count"] = 0 + await socket.broadcast("counter", 0) + + async def handle_info(self, event, socket): + # Received update from another instance + socket.context["count"] = event.payload + + async def render(self, context, meta): + return """ + + + + Multi-Instance Counter + + + +
+

Multi-Instance Counter

+
{{ count }}
+
+ + + +
+
+ 🚀 Try opening this page in multiple browsers or run multiple instances on different ports. + All instances stay in sync via Redis pub/sub! +
+
+ + + """ + + +# Create app with Redis pub/sub +# Uses REDIS_URL environment variable if set, otherwise defaults to localhost +redis_url = os.getenv("REDIS_URL", "redis://localhost:6379") +app = PyView(pubsub=RedisPubSub(redis_url)) + +app.add_live_view("/", CounterLiveView) diff --git a/examples/custom_pubsub/docker-compose.yml b/examples/custom_pubsub/docker-compose.yml new file mode 100644 index 0000000..e90c1ec --- /dev/null +++ b/examples/custom_pubsub/docker-compose.yml @@ -0,0 +1,72 @@ +version: '3.8' + +services: + redis: + image: redis:7-alpine + ports: + - "6379:6379" + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 3s + retries: 5 + + postgres: + image: postgres:16-alpine + environment: + POSTGRES_USER: pyview + POSTGRES_PASSWORD: pyview + POSTGRES_DB: pyview + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U pyview"] + interval: 5s + timeout: 3s + retries: 5 + volumes: + - postgres_data:/var/lib/postgresql/data + + app1: + build: . + environment: + - REDIS_URL=redis://redis:6379 + depends_on: + redis: + condition: service_healthy + ports: + - "8001:8000" + + app2: + build: . + environment: + - REDIS_URL=redis://redis:6379 + depends_on: + redis: + condition: service_healthy + ports: + - "8002:8000" + + app3: + build: . + environment: + - REDIS_URL=redis://redis:6379 + depends_on: + redis: + condition: service_healthy + ports: + - "8003:8000" + + nginx: + image: nginx:alpine + ports: + - "8000:80" + volumes: + - ./nginx.conf:/etc/nginx/nginx.conf:ro + depends_on: + - app1 + - app2 + - app3 + +volumes: + postgres_data: diff --git a/examples/custom_pubsub/nginx.conf b/examples/custom_pubsub/nginx.conf new file mode 100644 index 0000000..8be50e9 --- /dev/null +++ b/examples/custom_pubsub/nginx.conf @@ -0,0 +1,35 @@ +events { + worker_connections 1024; +} + +http { + upstream pyview_backends { + # Round-robin load balancing across three app instances + server app1:8000; + server app2:8000; + server app3:8000; + } + + server { + listen 80; + + location / { + proxy_pass http://pyview_backends; + proxy_http_version 1.1; + + # WebSocket support + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + + # Standard proxy headers + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + + # Timeouts + proxy_read_timeout 86400; + proxy_send_timeout 86400; + } + } +} diff --git a/examples/custom_pubsub/postgres_pubsub.py b/examples/custom_pubsub/postgres_pubsub.py new file mode 100644 index 0000000..452d222 --- /dev/null +++ b/examples/custom_pubsub/postgres_pubsub.py @@ -0,0 +1,157 @@ +"""PostgreSQL NOTIFY/LISTEN pub/sub backend. + +Requirements: + pip install asyncpg + +Usage: + app = PyView(pubsub=PostgresPubSub("postgresql://user:pass@localhost/db")) + +Pros: + - No additional infrastructure if you already use PostgreSQL + - Transactional guarantees available if needed + +Cons: + - Lower throughput than Redis + - Not designed for high-volume pub/sub +""" + +import asyncio +import json +import logging +from typing import Any, Callable, Coroutine + +import asyncpg +from asyncpg.utils import quote_ident + +logger = logging.getLogger(__name__) + +TopicHandler = Callable[[str, Any], Coroutine[Any, Any, None]] + + +class PostgresPubSub: + """PostgreSQL NOTIFY/LISTEN pub/sub implementation.""" + + def __init__(self, dsn: str, channel_prefix: str = "pyview_"): + self._dsn = dsn + self._prefix = channel_prefix + self._conn: asyncpg.Connection | None = None + self._listen_conn: asyncpg.Connection | None = None + + self._lock = asyncio.Lock() + self._topic_subscribers: dict[str, dict[str, TopicHandler]] = {} + self._session_topics: dict[str, dict[str, TopicHandler]] = {} + self._subscribed_channels: set[str] = set() + # Store listener callbacks for proper cleanup + self._channel_listeners: dict[str, Callable] = {} + + async def start(self) -> None: + """Connect to PostgreSQL.""" + # Separate connections for publish and listen + self._conn = await asyncpg.connect(self._dsn) + self._listen_conn = await asyncpg.connect(self._dsn) + logger.info("PostgreSQL pub/sub connected") + + async def stop(self) -> None: + """Disconnect from PostgreSQL.""" + if self._conn: + await self._conn.close() + if self._listen_conn: + await self._listen_conn.close() + logger.info("PostgreSQL pub/sub disconnected") + + async def subscribe_topic( + self, + session_id: str, + topic: str, + handler: TopicHandler, + ) -> None: + """Subscribe to a topic using LISTEN.""" + channel = f"{self._prefix}{topic}" + + async with self._lock: + if topic not in self._topic_subscribers: + self._topic_subscribers[topic] = {} + + if channel not in self._subscribed_channels: + listener = self._make_listener(topic) + self._channel_listeners[channel] = listener + await self._listen_conn.add_listener(channel, listener) + self._subscribed_channels.add(channel) + + self._topic_subscribers[topic][session_id] = handler + + if session_id not in self._session_topics: + self._session_topics[session_id] = {} + self._session_topics[session_id][topic] = handler + + def _make_listener(self, topic: str): + """Create a listener callback for a topic.""" + + def listener(conn, pid, channel, payload): + asyncio.create_task(self._handle_notification(topic, payload)) + + return listener + + async def _handle_notification(self, topic: str, payload: str) -> None: + """Handle incoming NOTIFY.""" + try: + message = json.loads(payload) + + async with self._lock: + handlers = list(self._topic_subscribers.get(topic, {}).values()) + + for handler in handlers: + asyncio.create_task(handler(topic, message)) + except Exception: + logger.exception("Error handling PostgreSQL notification") + + async def unsubscribe_topic(self, session_id: str, topic: str) -> None: + """Unsubscribe from a topic.""" + channel = f"{self._prefix}{topic}" + + async with self._lock: + if topic in self._topic_subscribers: + self._topic_subscribers[topic].pop(session_id, None) + + if not self._topic_subscribers[topic]: + del self._topic_subscribers[topic] + if channel in self._subscribed_channels: + listener = self._channel_listeners.pop(channel, None) + if listener: + await self._listen_conn.remove_listener(channel, listener) + self._subscribed_channels.discard(channel) + + if session_id in self._session_topics: + self._session_topics[session_id].pop(topic, None) + if not self._session_topics[session_id]: + del self._session_topics[session_id] + + async def unsubscribe_all(self, session_id: str) -> None: + """Remove all subscriptions for a session.""" + async with self._lock: + if session_id not in self._session_topics: + return + + for topic in list(self._session_topics[session_id].keys()): + channel = f"{self._prefix}{topic}" + + if topic in self._topic_subscribers: + self._topic_subscribers[topic].pop(session_id, None) + + if not self._topic_subscribers[topic]: + del self._topic_subscribers[topic] + if channel in self._subscribed_channels: + listener = self._channel_listeners.pop(channel, None) + if listener: + await self._listen_conn.remove_listener(channel, listener) + self._subscribed_channels.discard(channel) + + del self._session_topics[session_id] + + async def broadcast(self, topic: str, message: Any) -> None: + """Broadcast using NOTIFY.""" + channel = f"{self._prefix}{topic}" + payload = json.dumps(message) + # PostgreSQL NOTIFY has an 8000 byte payload limit + # Use quote_ident to prevent SQL injection + await self._conn.execute(f"NOTIFY {quote_ident(channel)}, $1", payload) diff --git a/examples/custom_pubsub/redis_pubsub.py b/examples/custom_pubsub/redis_pubsub.py new file mode 100644 index 0000000..751f4a6 --- /dev/null +++ b/examples/custom_pubsub/redis_pubsub.py @@ -0,0 +1,189 @@ +"""Redis-backed pub/sub for multi-instance PyView deployments. + +Requirements: + pip install redis + +Usage: + from redis_pubsub import RedisPubSub + + app = PyView(pubsub=RedisPubSub("redis://localhost:6379")) + +How it works: + - Handlers are stored locally (they're Python callables, not serializable) + - When broadcast() is called, the message is published to Redis + - All instances receive the message and dispatch to their local handlers + - This enables real-time updates across multiple server instances +""" + +import asyncio +import json +import logging +from contextlib import suppress +from typing import Any, Callable, Coroutine + +import redis.asyncio as redis + +logger = logging.getLogger(__name__) + +TopicHandler = Callable[[str, Any], Coroutine[Any, Any, None]] + + +class RedisPubSub: + """Redis-backed pub/sub implementation.""" + + def __init__( + self, + url: str = "redis://localhost:6379", + channel_prefix: str = "pyview:", + ): + """Initialize Redis pub/sub. + + Args: + url: Redis connection URL + channel_prefix: Prefix for Redis channel names (helps avoid collisions) + """ + self._url = url + self._prefix = channel_prefix + self._client: redis.Redis | None = None + self._pubsub: redis.client.PubSub | None = None + self._listener_task: asyncio.Task | None = None + + # Local handler tracking (handlers can't be serialized to Redis) + self._lock = asyncio.Lock() + # topic -> {session_id -> handler} + self._topic_subscribers: dict[str, dict[str, TopicHandler]] = {} + # session_id -> {topic -> handler} + self._session_topics: dict[str, dict[str, TopicHandler]] = {} + + async def start(self) -> None: + """Connect to Redis and start the message listener.""" + self._client = redis.from_url(self._url, decode_responses=True) + self._pubsub = self._client.pubsub() + self._listener_task = asyncio.create_task(self._listen()) + logger.info("Redis pub/sub connected to %s", self._url) + + async def stop(self) -> None: + """Disconnect from Redis and clean up.""" + if self._listener_task: + self._listener_task.cancel() + # Suppress CancelledError from listener task - expected during shutdown + with suppress(asyncio.CancelledError): + await self._listener_task + + if self._pubsub: + await self._pubsub.close() + + if self._client: + await self._client.close() + + logger.info("Redis pub/sub disconnected") + + async def subscribe_topic( + self, + session_id: str, + topic: str, + handler: TopicHandler, + ) -> None: + """Subscribe a session to a topic.""" + channel = f"{self._prefix}{topic}" + + async with self._lock: + # Track handler locally + if topic not in self._topic_subscribers: + self._topic_subscribers[topic] = {} + # First local subscriber - subscribe to Redis channel + await self._pubsub.subscribe(channel) + logger.debug("Subscribed to Redis channel: %s", channel) + + self._topic_subscribers[topic][session_id] = handler + + # Track for session cleanup + if session_id not in self._session_topics: + self._session_topics[session_id] = {} + self._session_topics[session_id][topic] = handler + + async def unsubscribe_topic(self, session_id: str, topic: str) -> None: + """Unsubscribe a session from a topic.""" + channel = f"{self._prefix}{topic}" + + async with self._lock: + if topic in self._topic_subscribers: + self._topic_subscribers[topic].pop(session_id, None) + + if not self._topic_subscribers[topic]: + # Last local subscriber - unsubscribe from Redis + del self._topic_subscribers[topic] + await self._pubsub.unsubscribe(channel) + logger.debug("Unsubscribed from Redis channel: %s", channel) + + if session_id in self._session_topics: + self._session_topics[session_id].pop(topic, None) + if not self._session_topics[session_id]: + del self._session_topics[session_id] + + async def unsubscribe_all(self, session_id: str) -> None: + """Remove all subscriptions for a session (called on disconnect).""" + async with self._lock: + if session_id not in self._session_topics: + return + + for topic in list(self._session_topics[session_id].keys()): + channel = f"{self._prefix}{topic}" + + if topic in self._topic_subscribers: + self._topic_subscribers[topic].pop(session_id, None) + + if not self._topic_subscribers[topic]: + del self._topic_subscribers[topic] + await self._pubsub.unsubscribe(channel) + + del self._session_topics[session_id] + + async def broadcast(self, topic: str, message: Any) -> None: + """Publish a message to all subscribers across all instances.""" + channel = f"{self._prefix}{topic}" + # Include topic in payload for routing on receive + payload = json.dumps({"topic": topic, "message": message}) + await self._client.publish(channel, payload) + + async def _listen(self) -> None: + """Background task that receives Redis messages and dispatches to handlers.""" + try: + async for message in self._pubsub.listen(): + if message["type"] != "message": + continue + + try: + data = json.loads(message["data"]) + + # Validate required fields + if "topic" not in data or "message" not in data: + logger.warning( + "Redis message missing required fields: %s", list(data.keys()) + ) + continue + + topic = data["topic"] + payload = data["message"] + + # Get handlers while holding lock + async with self._lock: + handlers = list(self._topic_subscribers.get(topic, {}).values()) + + # Dispatch outside lock to prevent deadlocks + for handler in handlers: + asyncio.create_task(handler(topic, payload)) + + except json.JSONDecodeError: + # Malformed JSON from Redis - log and continue + logger.warning("Invalid JSON in Redis message: %s", message["data"]) + except Exception: + # Unexpected error processing message - log and continue + logger.exception("Error processing Redis message") + + except asyncio.CancelledError: + # Task cancelled during shutdown - this is expected + pass + except Exception: + # Unexpected listener crash - log for debugging + logger.exception("Redis listener crashed") diff --git a/examples/custom_pubsub/requirements.txt b/examples/custom_pubsub/requirements.txt new file mode 100644 index 0000000..66999f7 --- /dev/null +++ b/examples/custom_pubsub/requirements.txt @@ -0,0 +1,11 @@ +# Core dependencies +pyview-web + +# Redis pub/sub backend +redis>=5.0.0 + +# PostgreSQL pub/sub backend +asyncpg>=0.29.0 + +# ASGI server +uvicorn[standard]>=0.24.0 diff --git a/examples/custom_pubsub/test_pubsub.py b/examples/custom_pubsub/test_pubsub.py new file mode 100644 index 0000000..91eac5f --- /dev/null +++ b/examples/custom_pubsub/test_pubsub.py @@ -0,0 +1,81 @@ +"""Test pub/sub implementation that records all operations. + +Useful for integration testing LiveViews that use pub/sub without needing +Redis or other external dependencies. + +Usage: + test_pubsub = TestPubSub() + app = PyView(pubsub=test_pubsub) + + # In your tests + assert ("user_123", "updates") in test_pubsub.subscriptions + assert ("updates", {"action": "joined"}) in test_pubsub.broadcasts +""" + +from typing import Any, Callable, Coroutine + +TopicHandler = Callable[[str, Any], Coroutine[Any, Any, None]] + + +class TestPubSub: + """Records all pub/sub operations for testing.""" + + def __init__(self): + self.subscriptions: list[tuple[str, str]] = [] # (session_id, topic) + self.unsubscriptions: list[tuple[str, str]] = [] # (session_id, topic) + self.broadcasts: list[tuple[str, Any]] = [] # (topic, message) + self.handlers: dict[str, dict[str, TopicHandler]] = {} + self.started = False + self.stopped = False + + async def start(self) -> None: + """Mark as started.""" + self.started = True + + async def stop(self) -> None: + """Mark as stopped.""" + self.stopped = True + + async def subscribe_topic( + self, + session_id: str, + topic: str, + handler: TopicHandler, + ) -> None: + """Record subscription and store handler.""" + self.subscriptions.append((session_id, topic)) + if topic not in self.handlers: + self.handlers[topic] = {} + self.handlers[topic][session_id] = handler + + async def unsubscribe_topic(self, session_id: str, topic: str) -> None: + """Record unsubscription.""" + self.unsubscriptions.append((session_id, topic)) + if topic in self.handlers and session_id in self.handlers[topic]: + del self.handlers[topic][session_id] + if not self.handlers[topic]: + del self.handlers[topic] + + async def unsubscribe_all(self, session_id: str) -> None: + """Record unsubscribe all.""" + for topic in list(self.handlers.keys()): + if session_id in self.handlers[topic]: + self.unsubscriptions.append((session_id, topic)) + del self.handlers[topic][session_id] + if not self.handlers[topic]: + del self.handlers[topic] + + async def broadcast(self, topic: str, message: Any) -> None: + """Record broadcast and dispatch to local handlers.""" + self.broadcasts.append((topic, message)) + # Immediately dispatch to local handlers for testing + if topic in self.handlers: + for handler in list(self.handlers[topic].values()): + await handler(topic, message) + + def clear(self) -> None: + """Clear all recorded operations.""" + self.subscriptions.clear() + self.unsubscriptions.clear() + self.broadcasts.clear() + self.handlers.clear() diff --git a/pyview/__init__.py b/pyview/__init__.py index 6a3f1b4..2c6a931 100644 --- a/pyview/__init__.py +++ b/pyview/__init__.py @@ -7,6 +7,7 @@ ) from pyview.live_view import LiveView from pyview.playground import playground +from pyview.pubsub import PubSubProvider from pyview.pyview import PyView, RootTemplate, RootTemplateContext, defaultRootTemplate from pyview.stream import Stream @@ -14,6 +15,7 @@ "LiveView", "LiveViewSocket", "PyView", + "PubSubProvider", "defaultRootTemplate", "JsCommand", "RootTemplateContext", diff --git a/pyview/live_socket.py b/pyview/live_socket.py index 9b3d773..bf6416a 100644 --- a/pyview/live_socket.py +++ b/pyview/live_socket.py @@ -25,9 +25,9 @@ from pyview.async_stream_runner import AsyncStreamRunner from pyview.events import InfoEvent from pyview.meta import PyViewMeta +from pyview.pubsub import PubSubProvider, SessionPubSub from pyview.template.render_diff import calc_diff from pyview.uploads import UploadConfig, UploadConstraints, UploadManager -from pyview.vendor.flet.pubsub import PubSub, PubSubHub logger = logging.getLogger(__name__) @@ -36,9 +36,6 @@ from .instrumentation import InstrumentationProvider from .live_view import LiveView - -pub_sub_hub = PubSubHub() - T = TypeVar("T") @@ -84,6 +81,7 @@ def __init__( liveview: LiveView, scheduler: AsyncIOScheduler, instrumentation: InstrumentationProvider, + pubsub: PubSubProvider, ): self.websocket = websocket self.topic = topic @@ -91,7 +89,7 @@ def __init__( self.instrumentation = instrumentation self.scheduled_jobs = set() self.connected = True - self.pub_sub = PubSub(pub_sub_hub, topic) + self._pubsub = SessionPubSub(pubsub, topic) self.pending_events = [] self.upload_manager = UploadManager() self.stream_runner = AsyncStreamRunner(self) @@ -102,10 +100,10 @@ def meta(self) -> PyViewMeta: return PyViewMeta() async def subscribe(self, topic: str): - await self.pub_sub.subscribe_topic_async(topic, self._topic_callback_internal) + await self._pubsub.subscribe(topic, self._topic_callback_internal) async def broadcast(self, topic: str, message: Any): - await self.pub_sub.send_all_on_topic_async(topic, message) + await self._pubsub.broadcast(topic, message) async def _topic_callback_internal(self, topic, message): await self.send_info(InfoEvent(topic, message)) @@ -268,7 +266,7 @@ async def close(self): for id in list(self.scheduled_jobs): with suppress(JobLookupError): self.scheduler.remove_job(id) - await self.pub_sub.unsubscribe_all_async() + await self._pubsub.unsubscribe_all() with suppress(Exception): self.upload_manager.close() diff --git a/pyview/pubsub/__init__.py b/pyview/pubsub/__init__.py new file mode 100644 index 0000000..66245c6 --- /dev/null +++ b/pyview/pubsub/__init__.py @@ -0,0 +1,12 @@ +"""Pluggable pub/sub module for PyView.""" + +from .interfaces import PubSubProvider, TopicHandler +from .memory import InMemoryPubSub +from .session import SessionPubSub + +__all__ = [ + "PubSubProvider", + "TopicHandler", + "InMemoryPubSub", + "SessionPubSub", +] diff --git a/pyview/pubsub/interfaces.py b/pyview/pubsub/interfaces.py new file mode 100644 index 0000000..ff7e318 --- /dev/null +++ b/pyview/pubsub/interfaces.py @@ -0,0 +1,65 @@ +"""Protocol definition for pluggable pub/sub implementations.""" + +from typing import Any, Callable, Coroutine, Protocol, runtime_checkable + +# Type alias for async topic handlers +TopicHandler = Callable[[str, Any], Coroutine[Any, Any, None]] + + +@runtime_checkable +class PubSubProvider(Protocol): + """Protocol for pub/sub implementations. + + Implementations must handle: + - Topic-based subscriptions per session + - Broadcasting messages to all subscribers on a topic + - Proper cleanup when sessions disconnect + + For distributed implementations (Redis, etc.): + - Handlers are local Python callables (not serializable) + - Messages must be serializable (JSON-compatible recommended) + - Implementation should handle cross-instance message routing + + Using Protocol enables structural typing - any class implementing + these methods is compatible, no inheritance required. + """ + + async def subscribe_topic( + self, + session_id: str, + topic: str, + handler: TopicHandler, + ) -> None: + """Subscribe a session to a topic with a handler. + + Args: + session_id: Unique identifier for the session + topic: Topic name to subscribe to + handler: Async callable(topic, message) to invoke on messages + """ + ... + + async def unsubscribe_topic(self, session_id: str, topic: str) -> None: + """Unsubscribe a session from a specific topic.""" + ... + + async def unsubscribe_all(self, session_id: str) -> None: + """Remove all subscriptions for a session (cleanup on disconnect).""" + ... + + async def broadcast(self, topic: str, message: Any) -> None: + """Broadcast a message to all subscribers on a topic. + + Note: For distributed implementations, `message` should be + JSON-serializable. Complex objects should be converted before + broadcasting. + """ + ... + + async def start(self) -> None: + """Called when the PyView app starts. Override for setup.""" + ... + + async def stop(self) -> None: + """Called when the PyView app shuts down. Override for cleanup.""" + ... diff --git a/pyview/pubsub/memory.py b/pyview/pubsub/memory.py new file mode 100644 index 0000000..93911cc --- /dev/null +++ b/pyview/pubsub/memory.py @@ -0,0 +1,50 @@ +"""Default in-memory pub/sub implementation.""" + +from typing import Any + +from pyview.vendor.flet.pubsub import PubSubHub + +from .interfaces import TopicHandler + + +class InMemoryPubSub: + """Default in-memory pub/sub implementation. + + Delegates to the battle-tested Flet PubSubHub implementation. + Suitable for single-instance deployments. Messages are delivered + only within the same Python process. + + Satisfies the PubSubProvider protocol via structural typing. + """ + + def __init__(self): + self._hub = PubSubHub() + + async def subscribe_topic( + self, + session_id: str, + topic: str, + handler: TopicHandler, + ) -> None: + """Subscribe a session to a topic with a handler.""" + await self._hub.subscribe_topic_async(session_id, topic, handler) + + async def unsubscribe_topic(self, session_id: str, topic: str) -> None: + """Unsubscribe a session from a specific topic.""" + await self._hub.unsubscribe_topic_async(session_id, topic) + + async def unsubscribe_all(self, session_id: str) -> None: + """Remove all subscriptions for a session.""" + await self._hub.unsubscribe_all_async(session_id) + + async def broadcast(self, topic: str, message: Any) -> None: + """Broadcast a message to all subscribers on a topic.""" + await self._hub.send_all_on_topic_async(topic, message) + + async def start(self) -> None: + """No-op for in-memory implementation.""" + pass + + async def stop(self) -> None: + """No-op for in-memory implementation.""" + pass diff --git a/pyview/pubsub/session.py b/pyview/pubsub/session.py new file mode 100644 index 0000000..d368933 --- /dev/null +++ b/pyview/pubsub/session.py @@ -0,0 +1,33 @@ +"""Session-scoped pub/sub wrapper.""" + +from typing import Any + +from .interfaces import PubSubProvider, TopicHandler + + +class SessionPubSub: + """Session-scoped pub/sub wrapper. + + Provides a convenient API that automatically includes the session_id + in all operations. Used internally by ConnectedLiveViewSocket. + """ + + def __init__(self, provider: PubSubProvider, session_id: str): + self._provider = provider + self._session_id = session_id + + async def subscribe(self, topic: str, handler: TopicHandler) -> None: + """Subscribe to a topic.""" + await self._provider.subscribe_topic(self._session_id, topic, handler) + + async def unsubscribe(self, topic: str) -> None: + """Unsubscribe from a specific topic.""" + await self._provider.unsubscribe_topic(self._session_id, topic) + + async def unsubscribe_all(self) -> None: + """Unsubscribe from all topics (called on disconnect).""" + await self._provider.unsubscribe_all(self._session_id) + + async def broadcast(self, topic: str, message: Any) -> None: + """Broadcast a message to all subscribers on a topic.""" + await self._provider.broadcast(topic, message) diff --git a/pyview/pyview.py b/pyview/pyview.py index 0a6cfc5..1d5e1ad 100644 --- a/pyview/pyview.py +++ b/pyview/pyview.py @@ -14,6 +14,7 @@ from pyview.instrumentation import InstrumentationProvider, NoOpInstrumentation from pyview.live_socket import UnconnectedSocket from pyview.meta import PyViewMeta +from pyview.pubsub import InMemoryPubSub, PubSubProvider from pyview.session import serialize_session from .live_routes import LiveViewLookup @@ -30,8 +31,15 @@ class PyView(Starlette): rootTemplate: RootTemplate instrumentation: InstrumentationProvider - - def __init__(self, *args, instrumentation: Optional[InstrumentationProvider] = None, **kwargs): + pubsub: PubSubProvider + + def __init__( + self, + *args, + instrumentation: Optional[InstrumentationProvider] = None, + pubsub: Optional[PubSubProvider] = None, + **kwargs, + ): # Extract user's lifespan if provided, then always use our composed lifespan user_lifespan = kwargs.pop("lifespan", None) kwargs["lifespan"] = self._create_lifespan(user_lifespan) @@ -39,8 +47,9 @@ def __init__(self, *args, instrumentation: Optional[InstrumentationProvider] = N super().__init__(*args, **kwargs) self.rootTemplate = defaultRootTemplate() self.instrumentation = instrumentation or NoOpInstrumentation() + self.pubsub = pubsub or InMemoryPubSub() self.view_lookup = LiveViewLookup() - self.live_handler = LiveSocketHandler(self.view_lookup, self.instrumentation) + self.live_handler = LiveSocketHandler(self.view_lookup, self.instrumentation, self.pubsub) self.routes.append(WebSocketRoute("/live/websocket", self.live_handler.handle)) self.add_middleware(GZipMiddleware) @@ -54,18 +63,19 @@ def _create_lifespan(self, user_lifespan=None): @asynccontextmanager async def lifespan(app): - # Startup: Start the scheduler + # Startup app.live_handler.start_scheduler() + await app.pubsub.start() - # Run user's lifespan if they provided one - if user_lifespan: - async with user_lifespan(app): + try: + if user_lifespan: + async with user_lifespan(app): + yield + else: yield - else: - yield - - # Shutdown: Stop the scheduler - await app.live_handler.shutdown_scheduler() + finally: + await app.pubsub.stop() + await app.live_handler.shutdown_scheduler() return lifespan diff --git a/pyview/ws_handler.py b/pyview/ws_handler.py index fe9a6c5..4dd7f59 100644 --- a/pyview/ws_handler.py +++ b/pyview/ws_handler.py @@ -13,6 +13,7 @@ from pyview.live_routes import LiveViewLookup from pyview.live_socket import ConnectedLiveViewSocket, LiveViewSocket from pyview.phx_message import parse_message +from pyview.pubsub import PubSubProvider from pyview.session import deserialize_session logger = logging.getLogger(__name__) @@ -47,9 +48,15 @@ def __init__(self, instrumentation: InstrumentationProvider): class LiveSocketHandler: - def __init__(self, routes: LiveViewLookup, instrumentation: InstrumentationProvider): + def __init__( + self, + routes: LiveViewLookup, + instrumentation: InstrumentationProvider, + pubsub: PubSubProvider, + ): self.routes = routes self.instrumentation = instrumentation + self.pubsub = pubsub self.metrics = LiveSocketMetrics(instrumentation) self.manager = ConnectionManager() self.sessions = 0 @@ -94,7 +101,7 @@ async def handle(self, websocket: WebSocket): lv, path_params = self.routes.get(url.path) await self.check_auth(websocket, lv) socket = ConnectedLiveViewSocket( - websocket, topic, lv, self.scheduler, self.instrumentation + websocket, topic, lv, self.scheduler, self.instrumentation, self.pubsub ) session = {} @@ -283,7 +290,12 @@ async def handle_connected(self, myJoinId, socket: ConnectedLiveViewSocket): # Create new socket for new LiveView socket = ConnectedLiveViewSocket( - socket.websocket, topic, lv, self.scheduler, self.instrumentation + socket.websocket, + topic, + lv, + self.scheduler, + self.instrumentation, + self.pubsub, ) session = {}