From 176307474e3e2cc8bf40ba5640aed6ad4c327f21 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 14 Dec 2025 17:56:37 +0000 Subject: [PATCH 1/9] Add pluggable pub/sub design document Design for making PyView's pub/sub system pluggable to support multi-machine deployments with backends like Redis, while: - Adding no new dependencies to core PyView - Maintaining backward compatibility - Following the existing InstrumentationProvider pattern Includes: - PubSubProvider abstract interface - InMemoryPubSub default implementation - SessionPubSub wrapper for ergonomic socket API - Complete Redis implementation example - Migration path and considerations --- docs/design/pluggable-pubsub.md | 579 ++++++++++++++++++++++++++++++++ 1 file changed, 579 insertions(+) create mode 100644 docs/design/pluggable-pubsub.md diff --git a/docs/design/pluggable-pubsub.md b/docs/design/pluggable-pubsub.md new file mode 100644 index 0000000..1351e6e --- /dev/null +++ b/docs/design/pluggable-pubsub.md @@ -0,0 +1,579 @@ +# Pluggable Pub/Sub Design for PyView + +## Overview + +This document outlines the design for making PyView's pub/sub system pluggable, enabling multi-machine deployments with backends like Redis, while maintaining backward compatibility and adding no new dependencies to the core library. 
+ +## Current State + +### Architecture +- **`PubSubHub`** (`pyview/vendor/flet/pubsub/pub_sub.py`): Central message hub that stores subscriptions in memory using dictionaries +- **`PubSub`**: Session-scoped wrapper that delegates to `PubSubHub` with automatic `session_id` injection +- **Global singleton**: `pub_sub_hub = PubSubHub()` instantiated at module level in `live_socket.py:40` + +### Current Usage in ConnectedLiveViewSocket + +```python +# live_socket.py +self.pub_sub = PubSub(pub_sub_hub, topic) + +# Methods used: +await self.pub_sub.subscribe_topic_async(topic, handler) +await self.pub_sub.send_all_on_topic_async(topic, message) +await self.pub_sub.unsubscribe_all_async() +``` + +### Limitations +1. In-memory only - doesn't work across multiple server instances +2. Not configurable - hardcoded singleton instantiation +3. No abstraction layer for alternative implementations + +--- + +## Proposed Design + +### Design Goals +1. **Pluggable backends**: Support Redis, PostgreSQL NOTIFY/LISTEN, or custom implementations +2. **Zero new dependencies**: Core PyView remains lightweight +3. **Backward compatible**: Existing code works unchanged +4. **Follow existing patterns**: Mirror the `InstrumentationProvider` architecture +5. **Minimal API surface**: Only abstract what's actually needed + +### Core Interface + +Create an abstract base class following the existing `InstrumentationProvider` pattern: + +```python +# pyview/pubsub/interfaces.py +from abc import ABC, abstractmethod +from typing import Any, Callable, Coroutine + +# Type alias for async handlers +TopicHandler = Callable[[str, Any], Coroutine[Any, Any, None]] + + +class PubSubProvider(ABC): + """Abstract base class for pub/sub implementations. 
+ + Implementations must handle: + - Topic-based subscriptions per session + - Broadcasting messages to all subscribers on a topic + - Proper cleanup when sessions disconnect + + For distributed implementations (Redis, etc.): + - Handlers are local Python callables (not serializable) + - Messages must be serializable (JSON-compatible recommended) + - Implementation should handle cross-instance message routing + """ + + @abstractmethod + async def subscribe_topic( + self, + session_id: str, + topic: str, + handler: TopicHandler + ) -> None: + """Subscribe a session to a topic with a handler. + + Args: + session_id: Unique identifier for the session + topic: Topic name to subscribe to + handler: Async callable(topic, message) to invoke on messages + """ + pass + + @abstractmethod + async def unsubscribe_topic(self, session_id: str, topic: str) -> None: + """Unsubscribe a session from a specific topic.""" + pass + + @abstractmethod + async def unsubscribe_all(self, session_id: str) -> None: + """Remove all subscriptions for a session (cleanup on disconnect).""" + pass + + @abstractmethod + async def broadcast(self, topic: str, message: Any) -> None: + """Broadcast a message to all subscribers on a topic. + + Note: For distributed implementations, `message` should be + JSON-serializable. Complex objects should be converted before + broadcasting. + """ + pass + + # Optional lifecycle hooks for implementations that need them + async def start(self) -> None: + """Called when the PyView app starts. Override for setup.""" + pass + + async def stop(self) -> None: + """Called when the PyView app shuts down. Override for cleanup.""" + pass +``` + +### Default In-Memory Implementation + +Refactor the existing `PubSubHub` to implement the new interface: + +```python +# pyview/pubsub/memory.py +import asyncio +from typing import Any + +from .interfaces import PubSubProvider, TopicHandler + + +class InMemoryPubSub(PubSubProvider): + """Default in-memory pub/sub implementation. 
+ + Suitable for single-instance deployments. Messages are delivered + only within the same Python process. + """ + + def __init__(self): + self._lock = asyncio.Lock() + # topic -> {session_id -> handler} + self._topic_subscribers: dict[str, dict[str, TopicHandler]] = {} + # session_id -> {topic -> handler} + self._session_topics: dict[str, dict[str, TopicHandler]] = {} + + async def subscribe_topic( + self, + session_id: str, + topic: str, + handler: TopicHandler + ) -> None: + async with self._lock: + # Add to topic subscribers + if topic not in self._topic_subscribers: + self._topic_subscribers[topic] = {} + self._topic_subscribers[topic][session_id] = handler + + # Track for session cleanup + if session_id not in self._session_topics: + self._session_topics[session_id] = {} + self._session_topics[session_id][topic] = handler + + async def unsubscribe_topic(self, session_id: str, topic: str) -> None: + async with self._lock: + if topic in self._topic_subscribers: + self._topic_subscribers[topic].pop(session_id, None) + if not self._topic_subscribers[topic]: + del self._topic_subscribers[topic] + + if session_id in self._session_topics: + self._session_topics[session_id].pop(topic, None) + if not self._session_topics[session_id]: + del self._session_topics[session_id] + + async def unsubscribe_all(self, session_id: str) -> None: + async with self._lock: + if session_id in self._session_topics: + for topic in list(self._session_topics[session_id].keys()): + if topic in self._topic_subscribers: + self._topic_subscribers[topic].pop(session_id, None) + if not self._topic_subscribers[topic]: + del self._topic_subscribers[topic] + del self._session_topics[session_id] + + async def broadcast(self, topic: str, message: Any) -> None: + async with self._lock: + handlers = list(self._topic_subscribers.get(topic, {}).values()) + + # Dispatch outside the lock to prevent deadlocks + for handler in handlers: + asyncio.create_task(handler(topic, message)) +``` + +### 
Session-Scoped Wrapper + +Keep a thin wrapper for ergonomic use in sockets: + +```python +# pyview/pubsub/session.py +from typing import Any + +from .interfaces import PubSubProvider, TopicHandler + + +class SessionPubSub: + """Session-scoped pub/sub wrapper. + + Provides a convenient API that automatically includes the session_id + in all operations. + """ + + def __init__(self, provider: PubSubProvider, session_id: str): + self._provider = provider + self._session_id = session_id + + async def subscribe(self, topic: str, handler: TopicHandler) -> None: + """Subscribe to a topic.""" + await self._provider.subscribe_topic(self._session_id, topic, handler) + + async def unsubscribe(self, topic: str) -> None: + """Unsubscribe from a specific topic.""" + await self._provider.unsubscribe_topic(self._session_id, topic) + + async def unsubscribe_all(self) -> None: + """Unsubscribe from all topics (called on disconnect).""" + await self._provider.unsubscribe_all(self._session_id) + + async def broadcast(self, topic: str, message: Any) -> None: + """Broadcast a message to all subscribers on a topic.""" + await self._provider.broadcast(topic, message) +``` + +### Integration with PyView + +Update `PyView.__init__()` to accept an optional pub/sub provider: + +```python +# pyview/pyview.py +from pyview.pubsub import PubSubProvider, InMemoryPubSub + +class PyView(Starlette): + def __init__( + self, + *args, + instrumentation: Optional[InstrumentationProvider] = None, + pubsub: Optional[PubSubProvider] = None, # NEW + **kwargs + ): + # ... 
+ self.pubsub = pubsub or InMemoryPubSub() + self.live_handler = LiveSocketHandler( + self.view_lookup, + self.instrumentation, + self.pubsub # Pass to handler + ) +``` + +Update lifecycle management: + +```python +# In _create_lifespan() +async def lifespan(app): + app.live_handler.start_scheduler() + await app.pubsub.start() # NEW: Initialize pub/sub + + if user_lifespan: + async with user_lifespan(app): + yield + else: + yield + + await app.pubsub.stop() # NEW: Cleanup pub/sub + await app.live_handler.shutdown_scheduler() +``` + +Update `ConnectedLiveViewSocket`: + +```python +# live_socket.py +from pyview.pubsub import SessionPubSub, PubSubProvider + +class ConnectedLiveViewSocket(Generic[T]): + def __init__( + self, + websocket: WebSocket, + topic: str, + liveview: LiveView, + scheduler: AsyncIOScheduler, + instrumentation: InstrumentationProvider, + pubsub_provider: PubSubProvider, # NEW + ): + # ... + self._pubsub = SessionPubSub(pubsub_provider, topic) + + async def subscribe(self, topic: str): + await self._pubsub.subscribe(topic, self._topic_callback_internal) + + async def broadcast(self, topic: str, message: Any): + await self._pubsub.broadcast(topic, message) + + async def close(self): + # ... + await self._pubsub.unsubscribe_all() +``` + +--- + +## Redis Implementation Example + +This would be a **separate package** (e.g., `pyview-redis`) that users install if needed: + +```python +# Example: pyview_redis/pubsub.py +import asyncio +import json +from typing import Any + +import redis.asyncio as redis + +from pyview.pubsub import PubSubProvider, TopicHandler + + +class RedisPubSub(PubSubProvider): + """Redis-backed pub/sub for multi-instance deployments. 
+ + Install: pip install pyview-redis + + Usage: + from pyview_redis import RedisPubSub + + app = PyView(pubsub=RedisPubSub("redis://localhost:6379")) + """ + + def __init__( + self, + url: str = "redis://localhost:6379", + channel_prefix: str = "pyview:", + ): + self._url = url + self._prefix = channel_prefix + self._client: redis.Redis | None = None + self._pubsub: redis.client.PubSub | None = None + self._listener_task: asyncio.Task | None = None + + # Local handler tracking (handlers are not serializable) + self._lock = asyncio.Lock() + self._topic_subscribers: dict[str, dict[str, TopicHandler]] = {} + self._session_topics: dict[str, dict[str, TopicHandler]] = {} + + async def start(self) -> None: + """Connect to Redis and start listening for messages.""" + self._client = redis.from_url(self._url) + self._pubsub = self._client.pubsub() + self._listener_task = asyncio.create_task(self._listen()) + + async def stop(self) -> None: + """Disconnect from Redis and cleanup.""" + if self._listener_task: + self._listener_task.cancel() + try: + await self._listener_task + except asyncio.CancelledError: + pass + + if self._pubsub: + await self._pubsub.close() + + if self._client: + await self._client.close() + + async def subscribe_topic( + self, + session_id: str, + topic: str, + handler: TopicHandler + ) -> None: + channel = f"{self._prefix}{topic}" + + async with self._lock: + # Track handler locally + if topic not in self._topic_subscribers: + self._topic_subscribers[topic] = {} + # First subscriber to topic - subscribe in Redis + await self._pubsub.subscribe(channel) + + self._topic_subscribers[topic][session_id] = handler + + if session_id not in self._session_topics: + self._session_topics[session_id] = {} + self._session_topics[session_id][topic] = handler + + async def unsubscribe_topic(self, session_id: str, topic: str) -> None: + channel = f"{self._prefix}{topic}" + + async with self._lock: + if topic in self._topic_subscribers: + 
self._topic_subscribers[topic].pop(session_id, None) + + if not self._topic_subscribers[topic]: + # Last subscriber - unsubscribe from Redis + del self._topic_subscribers[topic] + await self._pubsub.unsubscribe(channel) + + if session_id in self._session_topics: + self._session_topics[session_id].pop(topic, None) + if not self._session_topics[session_id]: + del self._session_topics[session_id] + + async def unsubscribe_all(self, session_id: str) -> None: + async with self._lock: + if session_id in self._session_topics: + for topic in list(self._session_topics[session_id].keys()): + channel = f"{self._prefix}{topic}" + + if topic in self._topic_subscribers: + self._topic_subscribers[topic].pop(session_id, None) + + if not self._topic_subscribers[topic]: + del self._topic_subscribers[topic] + await self._pubsub.unsubscribe(channel) + + del self._session_topics[session_id] + + async def broadcast(self, topic: str, message: Any) -> None: + """Publish message to Redis channel.""" + channel = f"{self._prefix}{topic}" + payload = json.dumps({"topic": topic, "message": message}) + await self._client.publish(channel, payload) + + async def _listen(self) -> None: + """Background task to receive Redis messages and dispatch to handlers.""" + async for message in self._pubsub.listen(): + if message["type"] == "message": + try: + data = json.loads(message["data"]) + topic = data["topic"] + payload = data["message"] + + async with self._lock: + handlers = list( + self._topic_subscribers.get(topic, {}).values() + ) + + for handler in handlers: + asyncio.create_task(handler(topic, payload)) + + except Exception as e: + # Log but don't crash the listener + import logging + logging.error(f"Error processing Redis message: {e}") +``` + +### Usage Example + +```python +# app.py +from pyview import PyView +from pyview_redis import RedisPubSub # Separate package + +# Single instance (default) +app = PyView() + +# Multi-instance with Redis +app = PyView( + pubsub=RedisPubSub( + 
url="redis://localhost:6379", + channel_prefix="myapp:" + ) +) +``` + +--- + +## Alternative Backend Examples + +### PostgreSQL NOTIFY/LISTEN + +```python +# Example structure for PostgreSQL backend +class PostgresPubSub(PubSubProvider): + """PostgreSQL NOTIFY/LISTEN for deployments already using PostgreSQL. + + Pros: No additional infrastructure if you have PostgreSQL + Cons: Less throughput than Redis, doesn't persist messages + """ + + def __init__(self, dsn: str, channel_prefix: str = "pyview_"): + self._dsn = dsn + self._prefix = channel_prefix + # ... similar pattern to Redis + + async def broadcast(self, topic: str, message: Any) -> None: + channel = f"{self._prefix}{topic}" + payload = json.dumps(message) + await self._conn.execute(f"NOTIFY {channel}, '{payload}'") +``` + +### NATS + +```python +# Example structure for NATS backend +class NatsPubSub(PubSubProvider): + """NATS for high-performance distributed messaging. + + Pros: Very high throughput, built for pub/sub + Cons: Additional infrastructure to manage + """ + pass +``` + +--- + +## File Structure + +``` +pyview/ +├── pubsub/ +│ ├── __init__.py # Exports: PubSubProvider, InMemoryPubSub, SessionPubSub +│ ├── interfaces.py # Abstract PubSubProvider class +│ ├── memory.py # InMemoryPubSub implementation +│ └── session.py # SessionPubSub wrapper +├── vendor/ +│ └── flet/ +│ └── pubsub/ # DEPRECATED - keep for backward compat temporarily +├── live_socket.py # Updated to use new pub/sub +├── pyview.py # Updated with pubsub parameter +└── ws_handler.py # Updated to pass pubsub to sockets +``` + +--- + +## Migration Path + +### Phase 1: Add New Interface (Non-Breaking) +1. Create `pyview/pubsub/` module with interface and in-memory impl +2. Add optional `pubsub` parameter to `PyView.__init__()` +3. Default to `InMemoryPubSub` when not provided +4. Update internal code to use new interface +5. Keep old `vendor/flet/pubsub` working for any direct imports + +### Phase 2: Documentation +1. 
Document the `PubSubProvider` interface +2. Provide Redis implementation example +3. Add guide for implementing custom backends + +### Phase 3: Deprecation (Future) +1. Add deprecation warning to `vendor/flet/pubsub` imports +2. Eventually remove in a future major version + +--- + +## Considerations + +### Message Serialization +For distributed backends, messages must be serializable. Recommend documenting: +- Use JSON-serializable types (dict, list, str, int, float, bool, None) +- For dataclasses, convert with `asdict()` before broadcasting +- Complex objects need explicit serialization + +### Handler Execution +- Handlers are async callables executed via `asyncio.create_task()` +- Distributed implementations must route messages to local handlers only +- A session's handler is only callable within the same Python process + +### Error Handling +- Implementations should not raise on message delivery failure to one handler +- Failed handlers shouldn't prevent delivery to other subscribers +- Connection errors (Redis down) should be logged, with optional retry logic + +### Testing +- Add mock/test implementation: `TestPubSub` that records all operations +- Useful for integration testing LiveViews that use pub/sub + +--- + +## Summary + +This design: +1. **Mirrors existing patterns** - Follows `InstrumentationProvider` architecture +2. **Minimal interface** - Only 4 abstract methods needed +3. **Zero new deps** - Core PyView stays lightweight +4. **Easy to implement** - Redis example is ~100 lines +5. **Backward compatible** - Default in-memory behavior unchanged +6. 
**Production ready** - Lifecycle hooks for proper startup/shutdown From 379f7fe3b4f2cf3499d0bbd179e9bbfcac124680 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 14 Dec 2025 18:00:49 +0000 Subject: [PATCH 2/9] Update pub/sub design to use Protocol instead of ABC - Use typing.Protocol for structural typing (duck typing) - Consistent with existing InfoEventScheduler pattern - No inheritance required for implementations - Use ... (Ellipsis) convention for protocol method stubs - Add @runtime_checkable for isinstance() support if needed --- docs/design/pluggable-pubsub.md | 79 ++++++++++++++++++++------------- 1 file changed, 47 insertions(+), 32 deletions(-) diff --git a/docs/design/pluggable-pubsub.md b/docs/design/pluggable-pubsub.md index 1351e6e..bb2214f 100644 --- a/docs/design/pluggable-pubsub.md +++ b/docs/design/pluggable-pubsub.md @@ -36,24 +36,24 @@ await self.pub_sub.unsubscribe_all_async() 1. **Pluggable backends**: Support Redis, PostgreSQL NOTIFY/LISTEN, or custom implementations 2. **Zero new dependencies**: Core PyView remains lightweight 3. **Backward compatible**: Existing code works unchanged -4. **Follow existing patterns**: Mirror the `InstrumentationProvider` architecture +4. **Follow existing patterns**: Use `Protocol` for structural typing (like `InfoEventScheduler`) 5. **Minimal API surface**: Only abstract what's actually needed ### Core Interface -Create an abstract base class following the existing `InstrumentationProvider` pattern: +Create a Protocol following the existing `InfoEventScheduler` pattern in `pyview/events/info_event.py`: ```python # pyview/pubsub/interfaces.py -from abc import ABC, abstractmethod -from typing import Any, Callable, Coroutine +from typing import Any, Callable, Coroutine, Protocol, runtime_checkable # Type alias for async handlers TopicHandler = Callable[[str, Any], Coroutine[Any, Any, None]] -class PubSubProvider(ABC): - """Abstract base class for pub/sub implementations. 
+@runtime_checkable +class PubSubProvider(Protocol): + """Protocol for pub/sub implementations. Implementations must handle: - Topic-based subscriptions per session @@ -64,9 +64,11 @@ class PubSubProvider(ABC): - Handlers are local Python callables (not serializable) - Messages must be serializable (JSON-compatible recommended) - Implementation should handle cross-instance message routing + + Using Protocol enables structural typing - any class implementing + these methods is compatible, no inheritance required. """ - @abstractmethod async def subscribe_topic( self, session_id: str, @@ -80,19 +82,16 @@ class PubSubProvider(ABC): topic: Topic name to subscribe to handler: Async callable(topic, message) to invoke on messages """ - pass + ... - @abstractmethod async def unsubscribe_topic(self, session_id: str, topic: str) -> None: """Unsubscribe a session from a specific topic.""" - pass + ... - @abstractmethod async def unsubscribe_all(self, session_id: str) -> None: """Remove all subscriptions for a session (cleanup on disconnect).""" - pass + ... - @abstractmethod async def broadcast(self, topic: str, message: Any) -> None: """Broadcast a message to all subscribers on a topic. @@ -100,31 +99,32 @@ class PubSubProvider(ABC): JSON-serializable. Complex objects should be converted before broadcasting. """ - pass + ... - # Optional lifecycle hooks for implementations that need them async def start(self) -> None: """Called when the PyView app starts. Override for setup.""" - pass + ... async def stop(self) -> None: """Called when the PyView app shuts down. Override for cleanup.""" - pass + ... ``` +Note: Using `...` (Ellipsis) instead of `pass` is the convention for Protocol method stubs. 
+ ### Default In-Memory Implementation -Refactor the existing `PubSubHub` to implement the new interface: +Refactor the existing `PubSubHub` to satisfy the protocol (no inheritance needed): ```python # pyview/pubsub/memory.py import asyncio from typing import Any -from .interfaces import PubSubProvider, TopicHandler +from .interfaces import TopicHandler -class InMemoryPubSub(PubSubProvider): +class InMemoryPubSub: """Default in-memory pub/sub implementation. Suitable for single-instance deployments. Messages are delivered @@ -184,6 +184,14 @@ class InMemoryPubSub(PubSubProvider): # Dispatch outside the lock to prevent deadlocks for handler in handlers: asyncio.create_task(handler(topic, message)) + + async def start(self) -> None: + """No-op for in-memory implementation.""" + pass + + async def stop(self) -> None: + """No-op for in-memory implementation.""" + pass ``` ### Session-Scoped Wrapper @@ -312,10 +320,10 @@ from typing import Any import redis.asyncio as redis -from pyview.pubsub import PubSubProvider, TopicHandler +from pyview.pubsub import TopicHandler -class RedisPubSub(PubSubProvider): +class RedisPubSub: """Redis-backed pub/sub for multi-instance deployments. Install: pip install pyview-redis @@ -472,11 +480,13 @@ app = PyView( ```python # Example structure for PostgreSQL backend -class PostgresPubSub(PubSubProvider): +class PostgresPubSub: """PostgreSQL NOTIFY/LISTEN for deployments already using PostgreSQL. Pros: No additional infrastructure if you have PostgreSQL Cons: Less throughput than Redis, doesn't persist messages + + Satisfies PubSubProvider protocol via structural typing. """ def __init__(self, dsn: str, channel_prefix: str = "pyview_"): @@ -488,19 +498,23 @@ class PostgresPubSub(PubSubProvider): channel = f"{self._prefix}{topic}" payload = json.dumps(message) await self._conn.execute(f"NOTIFY {channel}, '{payload}'") + + # ... 
implement other protocol methods ``` ### NATS ```python # Example structure for NATS backend -class NatsPubSub(PubSubProvider): +class NatsPubSub: """NATS for high-performance distributed messaging. Pros: Very high throughput, built for pub/sub Cons: Additional infrastructure to manage + + Satisfies PubSubProvider protocol via structural typing. """ - pass + # ... implement protocol methods ``` --- @@ -511,7 +525,7 @@ class NatsPubSub(PubSubProvider): pyview/ ├── pubsub/ │ ├── __init__.py # Exports: PubSubProvider, InMemoryPubSub, SessionPubSub -│ ├── interfaces.py # Abstract PubSubProvider class +│ ├── interfaces.py # PubSubProvider Protocol │ ├── memory.py # InMemoryPubSub implementation │ └── session.py # SessionPubSub wrapper ├── vendor/ @@ -571,9 +585,10 @@ For distributed backends, messages must be serializable. Recommend documenting: ## Summary This design: -1. **Mirrors existing patterns** - Follows `InstrumentationProvider` architecture -2. **Minimal interface** - Only 4 abstract methods needed -3. **Zero new deps** - Core PyView stays lightweight -4. **Easy to implement** - Redis example is ~100 lines -5. **Backward compatible** - Default in-memory behavior unchanged -6. **Production ready** - Lifecycle hooks for proper startup/shutdown +1. **Mirrors existing patterns** - Uses `Protocol` like `InfoEventScheduler` +2. **Structural typing** - No inheritance required, just implement the methods +3. **Minimal interface** - Only 4 core methods + 2 lifecycle hooks +4. **Zero new deps** - Core PyView stays lightweight +5. **Easy to implement** - Redis example is ~100 lines +6. **Backward compatible** - Default in-memory behavior unchanged +7. 
**Production ready** - Lifecycle hooks for proper startup/shutdown From 81eea6296a011184934b5af84a2645d4ba7ded99 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 14 Dec 2025 18:05:54 +0000 Subject: [PATCH 3/9] Implement pluggable pub/sub system Add a pluggable pub/sub architecture that allows custom backends (Redis, PostgreSQL, NATS, etc.) for multi-machine deployments. New modules: - pyview/pubsub/interfaces.py: PubSubProvider Protocol - pyview/pubsub/memory.py: InMemoryPubSub (default implementation) - pyview/pubsub/session.py: SessionPubSub wrapper for socket API Changes: - PyView now accepts optional `pubsub` parameter - Lifecycle hooks (start/stop) called during app lifespan - ConnectedLiveViewSocket uses new pub/sub interface - PubSubProvider exported from main pyview module The default InMemoryPubSub maintains backward compatibility. Custom implementations can be provided for distributed deployments: from pyview import PyView from myapp.redis_pubsub import RedisPubSub app = PyView(pubsub=RedisPubSub("redis://localhost:6379")) --- pyview/__init__.py | 2 + pyview/live_socket.py | 14 +++--- pyview/pubsub/__init__.py | 12 +++++ pyview/pubsub/interfaces.py | 65 +++++++++++++++++++++++++++ pyview/pubsub/memory.py | 89 +++++++++++++++++++++++++++++++++++++ pyview/pubsub/session.py | 33 ++++++++++++++ pyview/pyview.py | 23 +++++++--- pyview/ws_handler.py | 13 ++++-- 8 files changed, 235 insertions(+), 16 deletions(-) create mode 100644 pyview/pubsub/__init__.py create mode 100644 pyview/pubsub/interfaces.py create mode 100644 pyview/pubsub/memory.py create mode 100644 pyview/pubsub/session.py diff --git a/pyview/__init__.py b/pyview/__init__.py index 6a3f1b4..2c6a931 100644 --- a/pyview/__init__.py +++ b/pyview/__init__.py @@ -7,6 +7,7 @@ ) from pyview.live_view import LiveView from pyview.playground import playground +from pyview.pubsub import PubSubProvider from pyview.pyview import PyView, RootTemplate, RootTemplateContext, defaultRootTemplate from pyview.stream 
import Stream @@ -14,6 +15,7 @@ "LiveView", "LiveViewSocket", "PyView", + "PubSubProvider", "defaultRootTemplate", "JsCommand", "RootTemplateContext", diff --git a/pyview/live_socket.py b/pyview/live_socket.py index 9b3d773..bf6416a 100644 --- a/pyview/live_socket.py +++ b/pyview/live_socket.py @@ -25,9 +25,9 @@ from pyview.async_stream_runner import AsyncStreamRunner from pyview.events import InfoEvent from pyview.meta import PyViewMeta +from pyview.pubsub import PubSubProvider, SessionPubSub from pyview.template.render_diff import calc_diff from pyview.uploads import UploadConfig, UploadConstraints, UploadManager -from pyview.vendor.flet.pubsub import PubSub, PubSubHub logger = logging.getLogger(__name__) @@ -36,9 +36,6 @@ from .instrumentation import InstrumentationProvider from .live_view import LiveView - -pub_sub_hub = PubSubHub() - T = TypeVar("T") @@ -84,6 +81,7 @@ def __init__( liveview: LiveView, scheduler: AsyncIOScheduler, instrumentation: InstrumentationProvider, + pubsub: PubSubProvider, ): self.websocket = websocket self.topic = topic @@ -91,7 +89,7 @@ def __init__( self.instrumentation = instrumentation self.scheduled_jobs = set() self.connected = True - self.pub_sub = PubSub(pub_sub_hub, topic) + self._pubsub = SessionPubSub(pubsub, topic) self.pending_events = [] self.upload_manager = UploadManager() self.stream_runner = AsyncStreamRunner(self) @@ -102,10 +100,10 @@ def meta(self) -> PyViewMeta: return PyViewMeta() async def subscribe(self, topic: str): - await self.pub_sub.subscribe_topic_async(topic, self._topic_callback_internal) + await self._pubsub.subscribe(topic, self._topic_callback_internal) async def broadcast(self, topic: str, message: Any): - await self.pub_sub.send_all_on_topic_async(topic, message) + await self._pubsub.broadcast(topic, message) async def _topic_callback_internal(self, topic, message): await self.send_info(InfoEvent(topic, message)) @@ -268,7 +266,7 @@ async def close(self): for id in list(self.scheduled_jobs): with 
suppress(JobLookupError): self.scheduler.remove_job(id) - await self.pub_sub.unsubscribe_all_async() + await self._pubsub.unsubscribe_all() with suppress(Exception): self.upload_manager.close() diff --git a/pyview/pubsub/__init__.py b/pyview/pubsub/__init__.py new file mode 100644 index 0000000..66245c6 --- /dev/null +++ b/pyview/pubsub/__init__.py @@ -0,0 +1,12 @@ +"""Pluggable pub/sub module for PyView.""" + +from .interfaces import PubSubProvider, TopicHandler +from .memory import InMemoryPubSub +from .session import SessionPubSub + +__all__ = [ + "PubSubProvider", + "TopicHandler", + "InMemoryPubSub", + "SessionPubSub", +] diff --git a/pyview/pubsub/interfaces.py b/pyview/pubsub/interfaces.py new file mode 100644 index 0000000..ff7e318 --- /dev/null +++ b/pyview/pubsub/interfaces.py @@ -0,0 +1,65 @@ +"""Protocol definition for pluggable pub/sub implementations.""" + +from typing import Any, Callable, Coroutine, Protocol, runtime_checkable + +# Type alias for async topic handlers +TopicHandler = Callable[[str, Any], Coroutine[Any, Any, None]] + + +@runtime_checkable +class PubSubProvider(Protocol): + """Protocol for pub/sub implementations. + + Implementations must handle: + - Topic-based subscriptions per session + - Broadcasting messages to all subscribers on a topic + - Proper cleanup when sessions disconnect + + For distributed implementations (Redis, etc.): + - Handlers are local Python callables (not serializable) + - Messages must be serializable (JSON-compatible recommended) + - Implementation should handle cross-instance message routing + + Using Protocol enables structural typing - any class implementing + these methods is compatible, no inheritance required. + """ + + async def subscribe_topic( + self, + session_id: str, + topic: str, + handler: TopicHandler, + ) -> None: + """Subscribe a session to a topic with a handler. 
+ + Args: + session_id: Unique identifier for the session + topic: Topic name to subscribe to + handler: Async callable(topic, message) to invoke on messages + """ + ... + + async def unsubscribe_topic(self, session_id: str, topic: str) -> None: + """Unsubscribe a session from a specific topic.""" + ... + + async def unsubscribe_all(self, session_id: str) -> None: + """Remove all subscriptions for a session (cleanup on disconnect).""" + ... + + async def broadcast(self, topic: str, message: Any) -> None: + """Broadcast a message to all subscribers on a topic. + + Note: For distributed implementations, `message` should be + JSON-serializable. Complex objects should be converted before + broadcasting. + """ + ... + + async def start(self) -> None: + """Called when the PyView app starts. Override for setup.""" + ... + + async def stop(self) -> None: + """Called when the PyView app shuts down. Override for cleanup.""" + ... diff --git a/pyview/pubsub/memory.py b/pyview/pubsub/memory.py new file mode 100644 index 0000000..2371b23 --- /dev/null +++ b/pyview/pubsub/memory.py @@ -0,0 +1,89 @@ +"""Default in-memory pub/sub implementation.""" + +import asyncio +import logging +from typing import Any + +from .interfaces import TopicHandler + +logger = logging.getLogger(__name__) + + +class InMemoryPubSub: + """Default in-memory pub/sub implementation. + + Suitable for single-instance deployments. Messages are delivered + only within the same Python process. + + Satisfies the PubSubProvider protocol via structural typing. 
+ """ + + def __init__(self): + self._lock = asyncio.Lock() + # topic -> {session_id -> handler} + self._topic_subscribers: dict[str, dict[str, TopicHandler]] = {} + # session_id -> {topic -> handler} + self._session_topics: dict[str, dict[str, TopicHandler]] = {} + + async def subscribe_topic( + self, + session_id: str, + topic: str, + handler: TopicHandler, + ) -> None: + """Subscribe a session to a topic with a handler.""" + logger.debug("pubsub.subscribe_topic(%s, %s)", session_id, topic) + async with self._lock: + # Add to topic subscribers + if topic not in self._topic_subscribers: + self._topic_subscribers[topic] = {} + self._topic_subscribers[topic][session_id] = handler + + # Track for session cleanup + if session_id not in self._session_topics: + self._session_topics[session_id] = {} + self._session_topics[session_id][topic] = handler + + async def unsubscribe_topic(self, session_id: str, topic: str) -> None: + """Unsubscribe a session from a specific topic.""" + logger.debug("pubsub.unsubscribe_topic(%s, %s)", session_id, topic) + async with self._lock: + self._unsubscribe_topic(session_id, topic) + + async def unsubscribe_all(self, session_id: str) -> None: + """Remove all subscriptions for a session.""" + logger.debug("pubsub.unsubscribe_all(%s)", session_id) + async with self._lock: + if session_id in self._session_topics: + for topic in list(self._session_topics[session_id].keys()): + self._unsubscribe_topic(session_id, topic) + + def _unsubscribe_topic(self, session_id: str, topic: str) -> None: + """Internal unsubscribe (must be called with lock held).""" + if topic in self._topic_subscribers: + self._topic_subscribers[topic].pop(session_id, None) + if not self._topic_subscribers[topic]: + del self._topic_subscribers[topic] + + if session_id in self._session_topics: + self._session_topics[session_id].pop(topic, None) + if not self._session_topics[session_id]: + del self._session_topics[session_id] + + async def broadcast(self, topic: str, message: 
Any) -> None: + """Broadcast a message to all subscribers on a topic.""" + logger.debug("pubsub.broadcast(%s, %s)", topic, message) + async with self._lock: + handlers = list(self._topic_subscribers.get(topic, {}).values()) + + # Dispatch outside the lock to prevent deadlocks + for handler in handlers: + asyncio.create_task(handler(topic, message)) + + async def start(self) -> None: + """No-op for in-memory implementation.""" + pass + + async def stop(self) -> None: + """No-op for in-memory implementation.""" + pass diff --git a/pyview/pubsub/session.py b/pyview/pubsub/session.py new file mode 100644 index 0000000..d368933 --- /dev/null +++ b/pyview/pubsub/session.py @@ -0,0 +1,33 @@ +"""Session-scoped pub/sub wrapper.""" + +from typing import Any + +from .interfaces import PubSubProvider, TopicHandler + + +class SessionPubSub: + """Session-scoped pub/sub wrapper. + + Provides a convenient API that automatically includes the session_id + in all operations. Used internally by ConnectedLiveViewSocket. 
+ """ + + def __init__(self, provider: PubSubProvider, session_id: str): + self._provider = provider + self._session_id = session_id + + async def subscribe(self, topic: str, handler: TopicHandler) -> None: + """Subscribe to a topic.""" + await self._provider.subscribe_topic(self._session_id, topic, handler) + + async def unsubscribe(self, topic: str) -> None: + """Unsubscribe from a specific topic.""" + await self._provider.unsubscribe_topic(self._session_id, topic) + + async def unsubscribe_all(self) -> None: + """Unsubscribe from all topics (called on disconnect).""" + await self._provider.unsubscribe_all(self._session_id) + + async def broadcast(self, topic: str, message: Any) -> None: + """Broadcast a message to all subscribers on a topic.""" + await self._provider.broadcast(topic, message) diff --git a/pyview/pyview.py b/pyview/pyview.py index 0a6cfc5..15d5647 100644 --- a/pyview/pyview.py +++ b/pyview/pyview.py @@ -13,6 +13,7 @@ from pyview.csrf import generate_csrf_token from pyview.instrumentation import InstrumentationProvider, NoOpInstrumentation from pyview.live_socket import UnconnectedSocket +from pyview.pubsub import InMemoryPubSub, PubSubProvider from pyview.meta import PyViewMeta from pyview.session import serialize_session @@ -30,8 +31,15 @@ class PyView(Starlette): rootTemplate: RootTemplate instrumentation: InstrumentationProvider - - def __init__(self, *args, instrumentation: Optional[InstrumentationProvider] = None, **kwargs): + pubsub: PubSubProvider + + def __init__( + self, + *args, + instrumentation: Optional[InstrumentationProvider] = None, + pubsub: Optional[PubSubProvider] = None, + **kwargs, + ): # Extract user's lifespan if provided, then always use our composed lifespan user_lifespan = kwargs.pop("lifespan", None) kwargs["lifespan"] = self._create_lifespan(user_lifespan) @@ -39,8 +47,11 @@ def __init__(self, *args, instrumentation: Optional[InstrumentationProvider] = N super().__init__(*args, **kwargs) self.rootTemplate = 
defaultRootTemplate() self.instrumentation = instrumentation or NoOpInstrumentation() + self.pubsub = pubsub or InMemoryPubSub() self.view_lookup = LiveViewLookup() - self.live_handler = LiveSocketHandler(self.view_lookup, self.instrumentation) + self.live_handler = LiveSocketHandler( + self.view_lookup, self.instrumentation, self.pubsub + ) self.routes.append(WebSocketRoute("/live/websocket", self.live_handler.handle)) self.add_middleware(GZipMiddleware) @@ -54,8 +65,9 @@ def _create_lifespan(self, user_lifespan=None): @asynccontextmanager async def lifespan(app): - # Startup: Start the scheduler + # Startup app.live_handler.start_scheduler() + await app.pubsub.start() # Run user's lifespan if they provided one if user_lifespan: @@ -64,7 +76,8 @@ async def lifespan(app): else: yield - # Shutdown: Stop the scheduler + # Shutdown + await app.pubsub.stop() await app.live_handler.shutdown_scheduler() return lifespan diff --git a/pyview/ws_handler.py b/pyview/ws_handler.py index fe9a6c5..d11b4a2 100644 --- a/pyview/ws_handler.py +++ b/pyview/ws_handler.py @@ -13,6 +13,7 @@ from pyview.live_routes import LiveViewLookup from pyview.live_socket import ConnectedLiveViewSocket, LiveViewSocket from pyview.phx_message import parse_message +from pyview.pubsub import PubSubProvider from pyview.session import deserialize_session logger = logging.getLogger(__name__) @@ -47,9 +48,15 @@ def __init__(self, instrumentation: InstrumentationProvider): class LiveSocketHandler: - def __init__(self, routes: LiveViewLookup, instrumentation: InstrumentationProvider): + def __init__( + self, + routes: LiveViewLookup, + instrumentation: InstrumentationProvider, + pubsub: PubSubProvider, + ): self.routes = routes self.instrumentation = instrumentation + self.pubsub = pubsub self.metrics = LiveSocketMetrics(instrumentation) self.manager = ConnectionManager() self.sessions = 0 @@ -94,7 +101,7 @@ async def handle(self, websocket: WebSocket): lv, path_params = self.routes.get(url.path) await 
self.check_auth(websocket, lv) socket = ConnectedLiveViewSocket( - websocket, topic, lv, self.scheduler, self.instrumentation + websocket, topic, lv, self.scheduler, self.instrumentation, self.pubsub ) session = {} @@ -283,7 +290,7 @@ async def handle_connected(self, myJoinId, socket: ConnectedLiveViewSocket): # Create new socket for new LiveView socket = ConnectedLiveViewSocket( - socket.websocket, topic, lv, self.scheduler, self.instrumentation + socket.websocket, topic, lv, self.scheduler, self.instrumentation, self.pubsub ) session = {} From 1e354eeb686ac4afd1e37ea2ddc8855bda8cf2b0 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 14 Dec 2025 18:11:08 +0000 Subject: [PATCH 4/9] Add documentation for custom pub/sub backends Includes complete, copy-pasteable implementations for: - Redis (using redis.asyncio) - PostgreSQL (using asyncpg with NOTIFY/LISTEN) Also covers: - The PubSubProvider protocol - Key implementation notes (handlers are local, serialization) - Error handling best practices - Test implementation pattern --- docs/pubsub-backends.md | 496 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 496 insertions(+) create mode 100644 docs/pubsub-backends.md diff --git a/docs/pubsub-backends.md b/docs/pubsub-backends.md new file mode 100644 index 0000000..26f459e --- /dev/null +++ b/docs/pubsub-backends.md @@ -0,0 +1,496 @@ +# Custom Pub/Sub Backends + +PyView's pub/sub system is pluggable, allowing you to use distributed backends like Redis for multi-machine deployments. This guide shows how to implement custom backends. + +## Overview + +By default, PyView uses an in-memory pub/sub implementation suitable for single-instance deployments. For horizontally scaled applications, you'll need a distributed backend. 
+ +```python +# Default (in-memory) +app = PyView() + +# Custom backend +app = PyView(pubsub=RedisPubSub("redis://localhost:6379")) +``` + +## The PubSubProvider Protocol + +Any class implementing these methods is compatible: + +```python +from typing import Any, Callable, Coroutine + +TopicHandler = Callable[[str, Any], Coroutine[Any, Any, None]] + +class PubSubProvider: + async def subscribe_topic(self, session_id: str, topic: str, handler: TopicHandler) -> None: + """Subscribe a session to a topic.""" + ... + + async def unsubscribe_topic(self, session_id: str, topic: str) -> None: + """Unsubscribe a session from a topic.""" + ... + + async def unsubscribe_all(self, session_id: str) -> None: + """Remove all subscriptions for a session.""" + ... + + async def broadcast(self, topic: str, message: Any) -> None: + """Broadcast a message to all subscribers on a topic.""" + ... + + async def start(self) -> None: + """Called when the app starts.""" + ... + + async def stop(self) -> None: + """Called when the app shuts down.""" + ... +``` + +## Redis Implementation + +Here's a complete Redis implementation you can use as a starting point: + +```python +# redis_pubsub.py +import asyncio +import json +import logging +from typing import Any, Callable, Coroutine + +import redis.asyncio as redis + +logger = logging.getLogger(__name__) + +TopicHandler = Callable[[str, Any], Coroutine[Any, Any, None]] + + +class RedisPubSub: + """Redis-backed pub/sub for multi-instance PyView deployments. 
+ + Requirements: + pip install redis + + Usage: + from redis_pubsub import RedisPubSub + + app = PyView(pubsub=RedisPubSub("redis://localhost:6379")) + + How it works: + - Handlers are stored locally (they're Python callables, not serializable) + - When broadcast() is called, the message is published to Redis + - All instances receive the message and dispatch to their local handlers + - This enables real-time updates across multiple server instances + """ + + def __init__( + self, + url: str = "redis://localhost:6379", + channel_prefix: str = "pyview:", + ): + """Initialize Redis pub/sub. + + Args: + url: Redis connection URL + channel_prefix: Prefix for Redis channel names (helps avoid collisions) + """ + self._url = url + self._prefix = channel_prefix + self._client: redis.Redis | None = None + self._pubsub: redis.client.PubSub | None = None + self._listener_task: asyncio.Task | None = None + + # Local handler tracking (handlers can't be serialized to Redis) + self._lock = asyncio.Lock() + # topic -> {session_id -> handler} + self._topic_subscribers: dict[str, dict[str, TopicHandler]] = {} + # session_id -> {topic -> handler} + self._session_topics: dict[str, dict[str, TopicHandler]] = {} + + async def start(self) -> None: + """Connect to Redis and start the message listener.""" + self._client = redis.from_url(self._url) + self._pubsub = self._client.pubsub() + self._listener_task = asyncio.create_task(self._listen()) + logger.info("Redis pub/sub connected to %s", self._url) + + async def stop(self) -> None: + """Disconnect from Redis and clean up.""" + if self._listener_task: + self._listener_task.cancel() + try: + await self._listener_task + except asyncio.CancelledError: + pass + + if self._pubsub: + await self._pubsub.close() + + if self._client: + await self._client.close() + + logger.info("Redis pub/sub disconnected") + + async def subscribe_topic( + self, + session_id: str, + topic: str, + handler: TopicHandler, + ) -> None: + """Subscribe a session to a 
topic.""" + channel = f"{self._prefix}{topic}" + + async with self._lock: + # Track handler locally + if topic not in self._topic_subscribers: + self._topic_subscribers[topic] = {} + # First local subscriber - subscribe to Redis channel + await self._pubsub.subscribe(channel) + logger.debug("Subscribed to Redis channel: %s", channel) + + self._topic_subscribers[topic][session_id] = handler + + # Track for session cleanup + if session_id not in self._session_topics: + self._session_topics[session_id] = {} + self._session_topics[session_id][topic] = handler + + async def unsubscribe_topic(self, session_id: str, topic: str) -> None: + """Unsubscribe a session from a topic.""" + channel = f"{self._prefix}{topic}" + + async with self._lock: + if topic in self._topic_subscribers: + self._topic_subscribers[topic].pop(session_id, None) + + if not self._topic_subscribers[topic]: + # Last local subscriber - unsubscribe from Redis + del self._topic_subscribers[topic] + await self._pubsub.unsubscribe(channel) + logger.debug("Unsubscribed from Redis channel: %s", channel) + + if session_id in self._session_topics: + self._session_topics[session_id].pop(topic, None) + if not self._session_topics[session_id]: + del self._session_topics[session_id] + + async def unsubscribe_all(self, session_id: str) -> None: + """Remove all subscriptions for a session (called on disconnect).""" + async with self._lock: + if session_id not in self._session_topics: + return + + for topic in list(self._session_topics[session_id].keys()): + channel = f"{self._prefix}{topic}" + + if topic in self._topic_subscribers: + self._topic_subscribers[topic].pop(session_id, None) + + if not self._topic_subscribers[topic]: + del self._topic_subscribers[topic] + await self._pubsub.unsubscribe(channel) + + del self._session_topics[session_id] + + async def broadcast(self, topic: str, message: Any) -> None: + """Publish a message to all subscribers across all instances.""" + channel = f"{self._prefix}{topic}" + # 
Include topic in payload for routing on receive + payload = json.dumps({"topic": topic, "message": message}) + await self._client.publish(channel, payload) + + async def _listen(self) -> None: + """Background task that receives Redis messages and dispatches to handlers.""" + try: + async for message in self._pubsub.listen(): + if message["type"] != "message": + continue + + try: + data = json.loads(message["data"]) + topic = data["topic"] + payload = data["message"] + + # Get handlers while holding lock + async with self._lock: + handlers = list( + self._topic_subscribers.get(topic, {}).values() + ) + + # Dispatch outside lock to prevent deadlocks + for handler in handlers: + asyncio.create_task(handler(topic, payload)) + + except json.JSONDecodeError: + logger.warning("Invalid JSON in Redis message: %s", message["data"]) + except Exception: + logger.exception("Error processing Redis message") + + except asyncio.CancelledError: + pass + except Exception: + logger.exception("Redis listener crashed") +``` + +### Usage Example + +```python +# app.py +from pyview import PyView +from redis_pubsub import RedisPubSub + +# Create app with Redis pub/sub +app = PyView( + pubsub=RedisPubSub( + url="redis://localhost:6379", + channel_prefix="myapp:", # Namespace your channels + ) +) + +# Your LiveViews work exactly the same +@app.add_live_view("/counter") +class CounterLiveView(LiveView): + async def mount(self, socket, session): + socket.context = {"count": 0} + if is_connected(socket): + await socket.subscribe("counter") + + async def handle_event(self, event, payload, socket): + if event == "increment": + socket.context["count"] += 1 + # This now broadcasts via Redis to ALL instances + await socket.broadcast("counter", socket.context["count"]) + + async def handle_info(self, event, socket): + socket.context["count"] = event.payload +``` + +### Running Multiple Instances + +```bash +# Terminal 1 +uvicorn app:app --port 8000 + +# Terminal 2 +uvicorn app:app --port 8001 + +# 
Terminal 3 +uvicorn app:app --port 8002 +``` + +With a load balancer in front, users on different instances will see real-time updates from each other. + +## PostgreSQL Implementation + +If you're already using PostgreSQL, you can use NOTIFY/LISTEN: + +```python +# postgres_pubsub.py +import asyncio +import json +import logging +from typing import Any, Callable, Coroutine + +import asyncpg + +logger = logging.getLogger(__name__) + +TopicHandler = Callable[[str, Any], Coroutine[Any, Any, None]] + + +class PostgresPubSub: + """PostgreSQL NOTIFY/LISTEN pub/sub backend. + + Requirements: + pip install asyncpg + + Usage: + app = PyView(pubsub=PostgresPubSub("postgresql://user:pass@localhost/db")) + + Pros: + - No additional infrastructure if you already use PostgreSQL + - Transactional guarantees available if needed + + Cons: + - Lower throughput than Redis + - Not designed for high-volume pub/sub + """ + + def __init__(self, dsn: str, channel_prefix: str = "pyview_"): + self._dsn = dsn + self._prefix = channel_prefix + self._conn: asyncpg.Connection | None = None + self._listen_conn: asyncpg.Connection | None = None + + self._lock = asyncio.Lock() + self._topic_subscribers: dict[str, dict[str, TopicHandler]] = {} + self._session_topics: dict[str, dict[str, TopicHandler]] = {} + self._subscribed_channels: set[str] = set() + + async def start(self) -> None: + """Connect to PostgreSQL.""" + # Separate connections for publish and listen + self._conn = await asyncpg.connect(self._dsn) + self._listen_conn = await asyncpg.connect(self._dsn) + logger.info("PostgreSQL pub/sub connected") + + async def stop(self) -> None: + """Disconnect from PostgreSQL.""" + if self._conn: + await self._conn.close() + if self._listen_conn: + await self._listen_conn.close() + logger.info("PostgreSQL pub/sub disconnected") + + async def subscribe_topic( + self, + session_id: str, + topic: str, + handler: TopicHandler, + ) -> None: + """Subscribe to a topic using LISTEN.""" + channel = 
f"{self._prefix}{topic}" + + async with self._lock: + if topic not in self._topic_subscribers: + self._topic_subscribers[topic] = {} + + if channel not in self._subscribed_channels: + await self._listen_conn.add_listener( + channel, self._make_listener(topic) + ) + self._subscribed_channels.add(channel) + + self._topic_subscribers[topic][session_id] = handler + + if session_id not in self._session_topics: + self._session_topics[session_id] = {} + self._session_topics[session_id][topic] = handler + + def _make_listener(self, topic: str): + """Create a listener callback for a topic.""" + def listener(conn, pid, channel, payload): + asyncio.create_task(self._handle_notification(topic, payload)) + return listener + + async def _handle_notification(self, topic: str, payload: str) -> None: + """Handle incoming NOTIFY.""" + try: + message = json.loads(payload) + + async with self._lock: + handlers = list(self._topic_subscribers.get(topic, {}).values()) + + for handler in handlers: + asyncio.create_task(handler(topic, message)) + except Exception: + logger.exception("Error handling PostgreSQL notification") + + async def unsubscribe_topic(self, session_id: str, topic: str) -> None: + """Unsubscribe from a topic.""" + channel = f"{self._prefix}{topic}" + + async with self._lock: + if topic in self._topic_subscribers: + self._topic_subscribers[topic].pop(session_id, None) + + if not self._topic_subscribers[topic]: + del self._topic_subscribers[topic] + if channel in self._subscribed_channels: + await self._listen_conn.remove_listener(channel, None) + self._subscribed_channels.discard(channel) + + if session_id in self._session_topics: + self._session_topics[session_id].pop(topic, None) + if not self._session_topics[session_id]: + del self._session_topics[session_id] + + async def unsubscribe_all(self, session_id: str) -> None: + """Remove all subscriptions for a session.""" + async with self._lock: + if session_id not in self._session_topics: + return + + for topic in 
list(self._session_topics[session_id].keys()): + channel = f"{self._prefix}{topic}" + + if topic in self._topic_subscribers: + self._topic_subscribers[topic].pop(session_id, None) + + if not self._topic_subscribers[topic]: + del self._topic_subscribers[topic] + if channel in self._subscribed_channels: + await self._listen_conn.remove_listener(channel, None) + self._subscribed_channels.discard(channel) + + del self._session_topics[session_id] + + async def broadcast(self, topic: str, message: Any) -> None: + """Broadcast using NOTIFY.""" + channel = f"{self._prefix}{topic}" + payload = json.dumps(message) + # PostgreSQL NOTIFY has an 8000 byte payload limit + await self._conn.execute(f"NOTIFY {channel}, $1", payload) +``` + +## Key Implementation Notes + +### Handlers Are Local + +Handlers are Python async functions - they can't be serialized and sent over the network. Your implementation must: + +1. Store handlers in local memory (per-instance) +2. Publish only the message data to the distributed backend +3. 
Route received messages to local handlers + +### Message Serialization + +Messages must be JSON-serializable for distributed backends: + +```python +# Good - JSON serializable +await socket.broadcast("updates", {"user_id": 123, "action": "joined"}) +await socket.broadcast("counter", 42) + +# Bad - not serializable +await socket.broadcast("data", my_dataclass) # Convert with asdict() first +await socket.broadcast("func", some_function) # Can't serialize functions +``` + +### Error Handling + +Your implementation should: + +- Not crash if one handler fails (isolate errors) +- Log but continue on malformed messages +- Handle reconnection for network failures + +### Testing + +Consider creating a test implementation: + +```python +class TestPubSub: + """Records all pub/sub operations for testing.""" + + def __init__(self): + self.subscriptions: list[tuple[str, str]] = [] # (session_id, topic) + self.broadcasts: list[tuple[str, Any]] = [] # (topic, message) + self.handlers: dict[str, dict[str, TopicHandler]] = {} + + async def subscribe_topic(self, session_id: str, topic: str, handler: TopicHandler) -> None: + self.subscriptions.append((session_id, topic)) + if topic not in self.handlers: + self.handlers[topic] = {} + self.handlers[topic][session_id] = handler + + async def broadcast(self, topic: str, message: Any) -> None: + self.broadcasts.append((topic, message)) + # Immediately dispatch to local handlers for testing + for handler in self.handlers.get(topic, {}).values(): + await handler(topic, message) + + # ... other methods +``` From 1e727530fd932417075a79af6e71686093c44cfe Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 14 Dec 2025 20:35:25 +0000 Subject: [PATCH 5/9] Refactor InMemoryPubSub to delegate to Flet PubSubHub Instead of reimplementing the pub/sub logic, InMemoryPubSub now wraps the existing battle-tested Flet PubSubHub implementation. This reduces code duplication while providing a clean adapter that satisfies our PubSubProvider protocol. 
--- pyview/pubsub/memory.py | 55 ++++++----------------------------------- 1 file changed, 8 insertions(+), 47 deletions(-) diff --git a/pyview/pubsub/memory.py b/pyview/pubsub/memory.py index 2371b23..93911cc 100644 --- a/pyview/pubsub/memory.py +++ b/pyview/pubsub/memory.py @@ -1,17 +1,16 @@ """Default in-memory pub/sub implementation.""" -import asyncio -import logging from typing import Any -from .interfaces import TopicHandler +from pyview.vendor.flet.pubsub import PubSubHub -logger = logging.getLogger(__name__) +from .interfaces import TopicHandler class InMemoryPubSub: """Default in-memory pub/sub implementation. + Delegates to the battle-tested Flet PubSubHub implementation. Suitable for single-instance deployments. Messages are delivered only within the same Python process. @@ -19,11 +18,7 @@ class InMemoryPubSub: """ def __init__(self): - self._lock = asyncio.Lock() - # topic -> {session_id -> handler} - self._topic_subscribers: dict[str, dict[str, TopicHandler]] = {} - # session_id -> {topic -> handler} - self._session_topics: dict[str, dict[str, TopicHandler]] = {} + self._hub = PubSubHub() async def subscribe_topic( self, @@ -32,53 +27,19 @@ async def subscribe_topic( handler: TopicHandler, ) -> None: """Subscribe a session to a topic with a handler.""" - logger.debug("pubsub.subscribe_topic(%s, %s)", session_id, topic) - async with self._lock: - # Add to topic subscribers - if topic not in self._topic_subscribers: - self._topic_subscribers[topic] = {} - self._topic_subscribers[topic][session_id] = handler - - # Track for session cleanup - if session_id not in self._session_topics: - self._session_topics[session_id] = {} - self._session_topics[session_id][topic] = handler + await self._hub.subscribe_topic_async(session_id, topic, handler) async def unsubscribe_topic(self, session_id: str, topic: str) -> None: """Unsubscribe a session from a specific topic.""" - logger.debug("pubsub.unsubscribe_topic(%s, %s)", session_id, topic) - async with self._lock: 
- self._unsubscribe_topic(session_id, topic) + await self._hub.unsubscribe_topic_async(session_id, topic) async def unsubscribe_all(self, session_id: str) -> None: """Remove all subscriptions for a session.""" - logger.debug("pubsub.unsubscribe_all(%s)", session_id) - async with self._lock: - if session_id in self._session_topics: - for topic in list(self._session_topics[session_id].keys()): - self._unsubscribe_topic(session_id, topic) - - def _unsubscribe_topic(self, session_id: str, topic: str) -> None: - """Internal unsubscribe (must be called with lock held).""" - if topic in self._topic_subscribers: - self._topic_subscribers[topic].pop(session_id, None) - if not self._topic_subscribers[topic]: - del self._topic_subscribers[topic] - - if session_id in self._session_topics: - self._session_topics[session_id].pop(topic, None) - if not self._session_topics[session_id]: - del self._session_topics[session_id] + await self._hub.unsubscribe_all_async(session_id) async def broadcast(self, topic: str, message: Any) -> None: """Broadcast a message to all subscribers on a topic.""" - logger.debug("pubsub.broadcast(%s, %s)", topic, message) - async with self._lock: - handlers = list(self._topic_subscribers.get(topic, {}).values()) - - # Dispatch outside the lock to prevent deadlocks - for handler in handlers: - asyncio.create_task(handler(topic, message)) + await self._hub.send_all_on_topic_async(topic, message) async def start(self) -> None: """No-op for in-memory implementation.""" From 239939bab3b2435c689c7ab25a03b8c1f62f309c Mon Sep 17 00:00:00 2001 From: Larry Ogrodnek Date: Sun, 14 Dec 2025 18:52:29 -0500 Subject: [PATCH 6/9] some cleanup --- docs/design/pluggable-pubsub.md | 594 ---------------------- docs/pubsub-backends.md | 448 ++++------------ examples/custom_pubsub/Dockerfile | 13 + examples/custom_pubsub/README.md | 283 +++++++++++ examples/custom_pubsub/app.py | 159 ++++++ examples/custom_pubsub/docker-compose.yml | 72 +++ examples/custom_pubsub/nginx.conf | 
35 ++ examples/custom_pubsub/postgres_pubsub.py | 147 ++++++ examples/custom_pubsub/redis_pubsub.py | 179 +++++++ examples/custom_pubsub/requirements.txt | 11 + examples/custom_pubsub/test_pubsub.py | 81 +++ pyview/pyview.py | 21 +- 12 files changed, 1076 insertions(+), 967 deletions(-) delete mode 100644 docs/design/pluggable-pubsub.md create mode 100644 examples/custom_pubsub/Dockerfile create mode 100644 examples/custom_pubsub/README.md create mode 100644 examples/custom_pubsub/app.py create mode 100644 examples/custom_pubsub/docker-compose.yml create mode 100644 examples/custom_pubsub/nginx.conf create mode 100644 examples/custom_pubsub/postgres_pubsub.py create mode 100644 examples/custom_pubsub/redis_pubsub.py create mode 100644 examples/custom_pubsub/requirements.txt create mode 100644 examples/custom_pubsub/test_pubsub.py diff --git a/docs/design/pluggable-pubsub.md b/docs/design/pluggable-pubsub.md deleted file mode 100644 index bb2214f..0000000 --- a/docs/design/pluggable-pubsub.md +++ /dev/null @@ -1,594 +0,0 @@ -# Pluggable Pub/Sub Design for PyView - -## Overview - -This document outlines the design for making PyView's pub/sub system pluggable, enabling multi-machine deployments with backends like Redis, while maintaining backward compatibility and adding no new dependencies to the core library. 
- -## Current State - -### Architecture -- **`PubSubHub`** (`pyview/vendor/flet/pubsub/pub_sub.py`): Central message hub that stores subscriptions in memory using dictionaries -- **`PubSub`**: Session-scoped wrapper that delegates to `PubSubHub` with automatic `session_id` injection -- **Global singleton**: `pub_sub_hub = PubSubHub()` instantiated at module level in `live_socket.py:40` - -### Current Usage in ConnectedLiveViewSocket - -```python -# live_socket.py -self.pub_sub = PubSub(pub_sub_hub, topic) - -# Methods used: -await self.pub_sub.subscribe_topic_async(topic, handler) -await self.pub_sub.send_all_on_topic_async(topic, message) -await self.pub_sub.unsubscribe_all_async() -``` - -### Limitations -1. In-memory only - doesn't work across multiple server instances -2. Not configurable - hardcoded singleton instantiation -3. No abstraction layer for alternative implementations - ---- - -## Proposed Design - -### Design Goals -1. **Pluggable backends**: Support Redis, PostgreSQL NOTIFY/LISTEN, or custom implementations -2. **Zero new dependencies**: Core PyView remains lightweight -3. **Backward compatible**: Existing code works unchanged -4. **Follow existing patterns**: Use `Protocol` for structural typing (like `InfoEventScheduler`) -5. **Minimal API surface**: Only abstract what's actually needed - -### Core Interface - -Create a Protocol following the existing `InfoEventScheduler` pattern in `pyview/events/info_event.py`: - -```python -# pyview/pubsub/interfaces.py -from typing import Any, Callable, Coroutine, Protocol, runtime_checkable - -# Type alias for async handlers -TopicHandler = Callable[[str, Any], Coroutine[Any, Any, None]] - - -@runtime_checkable -class PubSubProvider(Protocol): - """Protocol for pub/sub implementations. 
- - Implementations must handle: - - Topic-based subscriptions per session - - Broadcasting messages to all subscribers on a topic - - Proper cleanup when sessions disconnect - - For distributed implementations (Redis, etc.): - - Handlers are local Python callables (not serializable) - - Messages must be serializable (JSON-compatible recommended) - - Implementation should handle cross-instance message routing - - Using Protocol enables structural typing - any class implementing - these methods is compatible, no inheritance required. - """ - - async def subscribe_topic( - self, - session_id: str, - topic: str, - handler: TopicHandler - ) -> None: - """Subscribe a session to a topic with a handler. - - Args: - session_id: Unique identifier for the session - topic: Topic name to subscribe to - handler: Async callable(topic, message) to invoke on messages - """ - ... - - async def unsubscribe_topic(self, session_id: str, topic: str) -> None: - """Unsubscribe a session from a specific topic.""" - ... - - async def unsubscribe_all(self, session_id: str) -> None: - """Remove all subscriptions for a session (cleanup on disconnect).""" - ... - - async def broadcast(self, topic: str, message: Any) -> None: - """Broadcast a message to all subscribers on a topic. - - Note: For distributed implementations, `message` should be - JSON-serializable. Complex objects should be converted before - broadcasting. - """ - ... - - async def start(self) -> None: - """Called when the PyView app starts. Override for setup.""" - ... - - async def stop(self) -> None: - """Called when the PyView app shuts down. Override for cleanup.""" - ... -``` - -Note: Using `...` (Ellipsis) instead of `pass` is the convention for Protocol method stubs. 
- -### Default In-Memory Implementation - -Refactor the existing `PubSubHub` to satisfy the protocol (no inheritance needed): - -```python -# pyview/pubsub/memory.py -import asyncio -from typing import Any - -from .interfaces import TopicHandler - - -class InMemoryPubSub: - """Default in-memory pub/sub implementation. - - Suitable for single-instance deployments. Messages are delivered - only within the same Python process. - """ - - def __init__(self): - self._lock = asyncio.Lock() - # topic -> {session_id -> handler} - self._topic_subscribers: dict[str, dict[str, TopicHandler]] = {} - # session_id -> {topic -> handler} - self._session_topics: dict[str, dict[str, TopicHandler]] = {} - - async def subscribe_topic( - self, - session_id: str, - topic: str, - handler: TopicHandler - ) -> None: - async with self._lock: - # Add to topic subscribers - if topic not in self._topic_subscribers: - self._topic_subscribers[topic] = {} - self._topic_subscribers[topic][session_id] = handler - - # Track for session cleanup - if session_id not in self._session_topics: - self._session_topics[session_id] = {} - self._session_topics[session_id][topic] = handler - - async def unsubscribe_topic(self, session_id: str, topic: str) -> None: - async with self._lock: - if topic in self._topic_subscribers: - self._topic_subscribers[topic].pop(session_id, None) - if not self._topic_subscribers[topic]: - del self._topic_subscribers[topic] - - if session_id in self._session_topics: - self._session_topics[session_id].pop(topic, None) - if not self._session_topics[session_id]: - del self._session_topics[session_id] - - async def unsubscribe_all(self, session_id: str) -> None: - async with self._lock: - if session_id in self._session_topics: - for topic in list(self._session_topics[session_id].keys()): - if topic in self._topic_subscribers: - self._topic_subscribers[topic].pop(session_id, None) - if not self._topic_subscribers[topic]: - del self._topic_subscribers[topic] - del 
self._session_topics[session_id] - - async def broadcast(self, topic: str, message: Any) -> None: - async with self._lock: - handlers = list(self._topic_subscribers.get(topic, {}).values()) - - # Dispatch outside the lock to prevent deadlocks - for handler in handlers: - asyncio.create_task(handler(topic, message)) - - async def start(self) -> None: - """No-op for in-memory implementation.""" - pass - - async def stop(self) -> None: - """No-op for in-memory implementation.""" - pass -``` - -### Session-Scoped Wrapper - -Keep a thin wrapper for ergonomic use in sockets: - -```python -# pyview/pubsub/session.py -from typing import Any - -from .interfaces import PubSubProvider, TopicHandler - - -class SessionPubSub: - """Session-scoped pub/sub wrapper. - - Provides a convenient API that automatically includes the session_id - in all operations. - """ - - def __init__(self, provider: PubSubProvider, session_id: str): - self._provider = provider - self._session_id = session_id - - async def subscribe(self, topic: str, handler: TopicHandler) -> None: - """Subscribe to a topic.""" - await self._provider.subscribe_topic(self._session_id, topic, handler) - - async def unsubscribe(self, topic: str) -> None: - """Unsubscribe from a specific topic.""" - await self._provider.unsubscribe_topic(self._session_id, topic) - - async def unsubscribe_all(self) -> None: - """Unsubscribe from all topics (called on disconnect).""" - await self._provider.unsubscribe_all(self._session_id) - - async def broadcast(self, topic: str, message: Any) -> None: - """Broadcast a message to all subscribers on a topic.""" - await self._provider.broadcast(topic, message) -``` - -### Integration with PyView - -Update `PyView.__init__()` to accept an optional pub/sub provider: - -```python -# pyview/pyview.py -from pyview.pubsub import PubSubProvider, InMemoryPubSub - -class PyView(Starlette): - def __init__( - self, - *args, - instrumentation: Optional[InstrumentationProvider] = None, - pubsub: 
Optional[PubSubProvider] = None, # NEW - **kwargs - ): - # ... - self.pubsub = pubsub or InMemoryPubSub() - self.live_handler = LiveSocketHandler( - self.view_lookup, - self.instrumentation, - self.pubsub # Pass to handler - ) -``` - -Update lifecycle management: - -```python -# In _create_lifespan() -async def lifespan(app): - app.live_handler.start_scheduler() - await app.pubsub.start() # NEW: Initialize pub/sub - - if user_lifespan: - async with user_lifespan(app): - yield - else: - yield - - await app.pubsub.stop() # NEW: Cleanup pub/sub - await app.live_handler.shutdown_scheduler() -``` - -Update `ConnectedLiveViewSocket`: - -```python -# live_socket.py -from pyview.pubsub import SessionPubSub, PubSubProvider - -class ConnectedLiveViewSocket(Generic[T]): - def __init__( - self, - websocket: WebSocket, - topic: str, - liveview: LiveView, - scheduler: AsyncIOScheduler, - instrumentation: InstrumentationProvider, - pubsub_provider: PubSubProvider, # NEW - ): - # ... - self._pubsub = SessionPubSub(pubsub_provider, topic) - - async def subscribe(self, topic: str): - await self._pubsub.subscribe(topic, self._topic_callback_internal) - - async def broadcast(self, topic: str, message: Any): - await self._pubsub.broadcast(topic, message) - - async def close(self): - # ... - await self._pubsub.unsubscribe_all() -``` - ---- - -## Redis Implementation Example - -This would be a **separate package** (e.g., `pyview-redis`) that users install if needed: - -```python -# Example: pyview_redis/pubsub.py -import asyncio -import json -from typing import Any - -import redis.asyncio as redis - -from pyview.pubsub import TopicHandler - - -class RedisPubSub: - """Redis-backed pub/sub for multi-instance deployments. 
- - Install: pip install pyview-redis - - Usage: - from pyview_redis import RedisPubSub - - app = PyView(pubsub=RedisPubSub("redis://localhost:6379")) - """ - - def __init__( - self, - url: str = "redis://localhost:6379", - channel_prefix: str = "pyview:", - ): - self._url = url - self._prefix = channel_prefix - self._client: redis.Redis | None = None - self._pubsub: redis.client.PubSub | None = None - self._listener_task: asyncio.Task | None = None - - # Local handler tracking (handlers are not serializable) - self._lock = asyncio.Lock() - self._topic_subscribers: dict[str, dict[str, TopicHandler]] = {} - self._session_topics: dict[str, dict[str, TopicHandler]] = {} - - async def start(self) -> None: - """Connect to Redis and start listening for messages.""" - self._client = redis.from_url(self._url) - self._pubsub = self._client.pubsub() - self._listener_task = asyncio.create_task(self._listen()) - - async def stop(self) -> None: - """Disconnect from Redis and cleanup.""" - if self._listener_task: - self._listener_task.cancel() - try: - await self._listener_task - except asyncio.CancelledError: - pass - - if self._pubsub: - await self._pubsub.close() - - if self._client: - await self._client.close() - - async def subscribe_topic( - self, - session_id: str, - topic: str, - handler: TopicHandler - ) -> None: - channel = f"{self._prefix}{topic}" - - async with self._lock: - # Track handler locally - if topic not in self._topic_subscribers: - self._topic_subscribers[topic] = {} - # First subscriber to topic - subscribe in Redis - await self._pubsub.subscribe(channel) - - self._topic_subscribers[topic][session_id] = handler - - if session_id not in self._session_topics: - self._session_topics[session_id] = {} - self._session_topics[session_id][topic] = handler - - async def unsubscribe_topic(self, session_id: str, topic: str) -> None: - channel = f"{self._prefix}{topic}" - - async with self._lock: - if topic in self._topic_subscribers: - 
self._topic_subscribers[topic].pop(session_id, None) - - if not self._topic_subscribers[topic]: - # Last subscriber - unsubscribe from Redis - del self._topic_subscribers[topic] - await self._pubsub.unsubscribe(channel) - - if session_id in self._session_topics: - self._session_topics[session_id].pop(topic, None) - if not self._session_topics[session_id]: - del self._session_topics[session_id] - - async def unsubscribe_all(self, session_id: str) -> None: - async with self._lock: - if session_id in self._session_topics: - for topic in list(self._session_topics[session_id].keys()): - channel = f"{self._prefix}{topic}" - - if topic in self._topic_subscribers: - self._topic_subscribers[topic].pop(session_id, None) - - if not self._topic_subscribers[topic]: - del self._topic_subscribers[topic] - await self._pubsub.unsubscribe(channel) - - del self._session_topics[session_id] - - async def broadcast(self, topic: str, message: Any) -> None: - """Publish message to Redis channel.""" - channel = f"{self._prefix}{topic}" - payload = json.dumps({"topic": topic, "message": message}) - await self._client.publish(channel, payload) - - async def _listen(self) -> None: - """Background task to receive Redis messages and dispatch to handlers.""" - async for message in self._pubsub.listen(): - if message["type"] == "message": - try: - data = json.loads(message["data"]) - topic = data["topic"] - payload = data["message"] - - async with self._lock: - handlers = list( - self._topic_subscribers.get(topic, {}).values() - ) - - for handler in handlers: - asyncio.create_task(handler(topic, payload)) - - except Exception as e: - # Log but don't crash the listener - import logging - logging.error(f"Error processing Redis message: {e}") -``` - -### Usage Example - -```python -# app.py -from pyview import PyView -from pyview_redis import RedisPubSub # Separate package - -# Single instance (default) -app = PyView() - -# Multi-instance with Redis -app = PyView( - pubsub=RedisPubSub( - 
url="redis://localhost:6379", - channel_prefix="myapp:" - ) -) -``` - ---- - -## Alternative Backend Examples - -### PostgreSQL NOTIFY/LISTEN - -```python -# Example structure for PostgreSQL backend -class PostgresPubSub: - """PostgreSQL NOTIFY/LISTEN for deployments already using PostgreSQL. - - Pros: No additional infrastructure if you have PostgreSQL - Cons: Less throughput than Redis, doesn't persist messages - - Satisfies PubSubProvider protocol via structural typing. - """ - - def __init__(self, dsn: str, channel_prefix: str = "pyview_"): - self._dsn = dsn - self._prefix = channel_prefix - # ... similar pattern to Redis - - async def broadcast(self, topic: str, message: Any) -> None: - channel = f"{self._prefix}{topic}" - payload = json.dumps(message) - await self._conn.execute(f"NOTIFY {channel}, '{payload}'") - - # ... implement other protocol methods -``` - -### NATS - -```python -# Example structure for NATS backend -class NatsPubSub: - """NATS for high-performance distributed messaging. - - Pros: Very high throughput, built for pub/sub - Cons: Additional infrastructure to manage - - Satisfies PubSubProvider protocol via structural typing. - """ - # ... implement protocol methods -``` - ---- - -## File Structure - -``` -pyview/ -├── pubsub/ -│ ├── __init__.py # Exports: PubSubProvider, InMemoryPubSub, SessionPubSub -│ ├── interfaces.py # PubSubProvider Protocol -│ ├── memory.py # InMemoryPubSub implementation -│ └── session.py # SessionPubSub wrapper -├── vendor/ -│ └── flet/ -│ └── pubsub/ # DEPRECATED - keep for backward compat temporarily -├── live_socket.py # Updated to use new pub/sub -├── pyview.py # Updated with pubsub parameter -└── ws_handler.py # Updated to pass pubsub to sockets -``` - ---- - -## Migration Path - -### Phase 1: Add New Interface (Non-Breaking) -1. Create `pyview/pubsub/` module with interface and in-memory impl -2. Add optional `pubsub` parameter to `PyView.__init__()` -3. Default to `InMemoryPubSub` when not provided -4. 
Update internal code to use new interface -5. Keep old `vendor/flet/pubsub` working for any direct imports - -### Phase 2: Documentation -1. Document the `PubSubProvider` interface -2. Provide Redis implementation example -3. Add guide for implementing custom backends - -### Phase 3: Deprecation (Future) -1. Add deprecation warning to `vendor/flet/pubsub` imports -2. Eventually remove in a future major version - ---- - -## Considerations - -### Message Serialization -For distributed backends, messages must be serializable. Recommend documenting: -- Use JSON-serializable types (dict, list, str, int, float, bool, None) -- For dataclasses, convert with `asdict()` before broadcasting -- Complex objects need explicit serialization - -### Handler Execution -- Handlers are async callables executed via `asyncio.create_task()` -- Distributed implementations must route messages to local handlers only -- A session's handler is only callable within the same Python process - -### Error Handling -- Implementations should not raise on message delivery failure to one handler -- Failed handlers shouldn't prevent delivery to other subscribers -- Connection errors (Redis down) should be logged, with optional retry logic - -### Testing -- Add mock/test implementation: `TestPubSub` that records all operations -- Useful for integration testing LiveViews that use pub/sub - ---- - -## Summary - -This design: -1. **Mirrors existing patterns** - Uses `Protocol` like `InfoEventScheduler` -2. **Structural typing** - No inheritance required, just implement the methods -3. **Minimal interface** - Only 4 core methods + 2 lifecycle hooks -4. **Zero new deps** - Core PyView stays lightweight -5. **Easy to implement** - Redis example is ~100 lines -6. **Backward compatible** - Default in-memory behavior unchanged -7. 
**Production ready** - Lifecycle hooks for proper startup/shutdown diff --git a/docs/pubsub-backends.md b/docs/pubsub-backends.md index 26f459e..cffa7cd 100644 --- a/docs/pubsub-backends.md +++ b/docs/pubsub-backends.md @@ -1,6 +1,6 @@ # Custom Pub/Sub Backends -PyView's pub/sub system is pluggable, allowing you to use distributed backends like Redis for multi-machine deployments. This guide shows how to implement custom backends. +PyView's pub/sub system is pluggable, allowing you to use distributed backends like Redis for multi-machine deployments. ## Overview @@ -10,7 +10,7 @@ By default, PyView uses an in-memory pub/sub implementation suitable for single- # Default (in-memory) app = PyView() -# Custom backend +# Custom backend (Redis, PostgreSQL, etc.) app = PyView(pubsub=RedisPubSub("redis://localhost:6379")) ``` @@ -49,207 +49,31 @@ class PubSubProvider: ... ``` -## Redis Implementation +## Quick Start with Redis -Here's a complete Redis implementation you can use as a starting point: +### 1. Install Redis Client -```python -# redis_pubsub.py -import asyncio -import json -import logging -from typing import Any, Callable, Coroutine - -import redis.asyncio as redis - -logger = logging.getLogger(__name__) - -TopicHandler = Callable[[str, Any], Coroutine[Any, Any, None]] - - -class RedisPubSub: - """Redis-backed pub/sub for multi-instance PyView deployments. - - Requirements: - pip install redis - - Usage: - from redis_pubsub import RedisPubSub - - app = PyView(pubsub=RedisPubSub("redis://localhost:6379")) - - How it works: - - Handlers are stored locally (they're Python callables, not serializable) - - When broadcast() is called, the message is published to Redis - - All instances receive the message and dispatch to their local handlers - - This enables real-time updates across multiple server instances - """ - - def __init__( - self, - url: str = "redis://localhost:6379", - channel_prefix: str = "pyview:", - ): - """Initialize Redis pub/sub. 
- - Args: - url: Redis connection URL - channel_prefix: Prefix for Redis channel names (helps avoid collisions) - """ - self._url = url - self._prefix = channel_prefix - self._client: redis.Redis | None = None - self._pubsub: redis.client.PubSub | None = None - self._listener_task: asyncio.Task | None = None - - # Local handler tracking (handlers can't be serialized to Redis) - self._lock = asyncio.Lock() - # topic -> {session_id -> handler} - self._topic_subscribers: dict[str, dict[str, TopicHandler]] = {} - # session_id -> {topic -> handler} - self._session_topics: dict[str, dict[str, TopicHandler]] = {} - - async def start(self) -> None: - """Connect to Redis and start the message listener.""" - self._client = redis.from_url(self._url) - self._pubsub = self._client.pubsub() - self._listener_task = asyncio.create_task(self._listen()) - logger.info("Redis pub/sub connected to %s", self._url) - - async def stop(self) -> None: - """Disconnect from Redis and clean up.""" - if self._listener_task: - self._listener_task.cancel() - try: - await self._listener_task - except asyncio.CancelledError: - pass - - if self._pubsub: - await self._pubsub.close() - - if self._client: - await self._client.close() - - logger.info("Redis pub/sub disconnected") - - async def subscribe_topic( - self, - session_id: str, - topic: str, - handler: TopicHandler, - ) -> None: - """Subscribe a session to a topic.""" - channel = f"{self._prefix}{topic}" - - async with self._lock: - # Track handler locally - if topic not in self._topic_subscribers: - self._topic_subscribers[topic] = {} - # First local subscriber - subscribe to Redis channel - await self._pubsub.subscribe(channel) - logger.debug("Subscribed to Redis channel: %s", channel) - - self._topic_subscribers[topic][session_id] = handler - - # Track for session cleanup - if session_id not in self._session_topics: - self._session_topics[session_id] = {} - self._session_topics[session_id][topic] = handler - - async def 
unsubscribe_topic(self, session_id: str, topic: str) -> None: - """Unsubscribe a session from a topic.""" - channel = f"{self._prefix}{topic}" - - async with self._lock: - if topic in self._topic_subscribers: - self._topic_subscribers[topic].pop(session_id, None) - - if not self._topic_subscribers[topic]: - # Last local subscriber - unsubscribe from Redis - del self._topic_subscribers[topic] - await self._pubsub.unsubscribe(channel) - logger.debug("Unsubscribed from Redis channel: %s", channel) - - if session_id in self._session_topics: - self._session_topics[session_id].pop(topic, None) - if not self._session_topics[session_id]: - del self._session_topics[session_id] - - async def unsubscribe_all(self, session_id: str) -> None: - """Remove all subscriptions for a session (called on disconnect).""" - async with self._lock: - if session_id not in self._session_topics: - return - - for topic in list(self._session_topics[session_id].keys()): - channel = f"{self._prefix}{topic}" - - if topic in self._topic_subscribers: - self._topic_subscribers[topic].pop(session_id, None) - - if not self._topic_subscribers[topic]: - del self._topic_subscribers[topic] - await self._pubsub.unsubscribe(channel) - - del self._session_topics[session_id] - - async def broadcast(self, topic: str, message: Any) -> None: - """Publish a message to all subscribers across all instances.""" - channel = f"{self._prefix}{topic}" - # Include topic in payload for routing on receive - payload = json.dumps({"topic": topic, "message": message}) - await self._client.publish(channel, payload) - - async def _listen(self) -> None: - """Background task that receives Redis messages and dispatches to handlers.""" - try: - async for message in self._pubsub.listen(): - if message["type"] != "message": - continue - - try: - data = json.loads(message["data"]) - topic = data["topic"] - payload = data["message"] - - # Get handlers while holding lock - async with self._lock: - handlers = list( - 
self._topic_subscribers.get(topic, {}).values() - ) - - # Dispatch outside lock to prevent deadlocks - for handler in handlers: - asyncio.create_task(handler(topic, payload)) - - except json.JSONDecodeError: - logger.warning("Invalid JSON in Redis message: %s", message["data"]) - except Exception: - logger.exception("Error processing Redis message") - - except asyncio.CancelledError: - pass - except Exception: - logger.exception("Redis listener crashed") +```bash +pip install redis ``` -### Usage Example +### 2. Configure Your App ```python -# app.py from pyview import PyView -from redis_pubsub import RedisPubSub +from redis_pubsub import RedisPubSub # See examples below -# Create app with Redis pub/sub app = PyView( pubsub=RedisPubSub( url="redis://localhost:6379", - channel_prefix="myapp:", # Namespace your channels + channel_prefix="myapp:" ) ) +``` + +### 3. Your LiveViews Work Exactly the Same -# Your LiveViews work exactly the same +```python @app.add_live_view("/counter") class CounterLiveView(LiveView): async def mount(self, socket, session): @@ -267,7 +91,7 @@ class CounterLiveView(LiveView): socket.context["count"] = event.payload ``` -### Running Multiple Instances +### 4. Run Multiple Instances ```bash # Terminal 1 @@ -282,215 +106,117 @@ uvicorn app:app --port 8002 With a load balancer in front, users on different instances will see real-time updates from each other. 
-## PostgreSQL Implementation - -If you're already using PostgreSQL, you can use NOTIFY/LISTEN: +## Complete Examples -```python -# postgres_pubsub.py -import asyncio -import json -import logging -from typing import Any, Callable, Coroutine - -import asyncpg +Full working implementations are available in [`examples/custom_pubsub/`](../examples/custom_pubsub/): -logger = logging.getLogger(__name__) - -TopicHandler = Callable[[str, Any], Coroutine[Any, Any, None]] +- **[`redis_pubsub.py`](../examples/custom_pubsub/redis_pubsub.py)** - Redis backend (recommended for production) +- **[`postgres_pubsub.py`](../examples/custom_pubsub/postgres_pubsub.py)** - PostgreSQL NOTIFY/LISTEN backend +- **[`test_pubsub.py`](../examples/custom_pubsub/test_pubsub.py)** - Test backend for unit testing +- **[`app.py`](../examples/custom_pubsub/app.py)** - Demo counter app +- **[`docker-compose.yml`](../examples/custom_pubsub/docker-compose.yml)** - Redis + Postgres services +See the [examples README](../examples/custom_pubsub/README.md) for setup instructions and a working demo. -class PostgresPubSub: -    """PostgreSQL NOTIFY/LISTEN pub/sub backend. +## Implementation Guide - Requirements: - pip install asyncpg +### Key Concepts - Usage: - app = PyView(pubsub=PostgresPubSub("postgresql://user:pass@localhost/db")) +When implementing a custom backend, understand these constraints: - Pros: - - No additional infrastructure if you already use PostgreSQL - - Transactional guarantees available if needed +**Handlers are local** - Handlers are Python async callables that can't be serialized. Each instance must: +1. Store handlers in local memory +2. Publish only message data to the distributed backend +3.
Route received messages to local handlers only - Cons: - - Lower throughput than Redis - - Not designed for high-volume pub/sub - """ +**Messages must be serializable** - For distributed backends, messages should be JSON-compatible: - def __init__(self, dsn: str, channel_prefix: str = "pyview_"): - self._dsn = dsn - self._prefix = channel_prefix - self._conn: asyncpg.Connection | None = None - self._listen_conn: asyncpg.Connection | None = None +```python +# Good - JSON serializable +await socket.broadcast("updates", {"user_id": 123, "action": "joined"}) +await socket.broadcast("counter", 42) - self._lock = asyncio.Lock() - self._topic_subscribers: dict[str, dict[str, TopicHandler]] = {} - self._session_topics: dict[str, dict[str, TopicHandler]] = {} - self._subscribed_channels: set[str] = set() +# Bad - not serializable +await socket.broadcast("data", my_dataclass) # Convert with asdict() first +await socket.broadcast("func", some_function) # Can't serialize functions +``` - async def start(self) -> None: - """Connect to PostgreSQL.""" - # Separate connections for publish and listen - self._conn = await asyncpg.connect(self._dsn) - self._listen_conn = await asyncpg.connect(self._dsn) - logger.info("PostgreSQL pub/sub connected") +### Implementation Checklist - async def stop(self) -> None: - """Disconnect from PostgreSQL.""" - if self._conn: - await self._conn.close() - if self._listen_conn: - await self._listen_conn.close() - logger.info("PostgreSQL pub/sub disconnected") - - async def subscribe_topic( - self, - session_id: str, - topic: str, - handler: TopicHandler, - ) -> None: - """Subscribe to a topic using LISTEN.""" - channel = f"{self._prefix}{topic}" - - async with self._lock: - if topic not in self._topic_subscribers: - self._topic_subscribers[topic] = {} - - if channel not in self._subscribed_channels: - await self._listen_conn.add_listener( - channel, self._make_listener(topic) - ) - self._subscribed_channels.add(channel) - - 
self._topic_subscribers[topic][session_id] = handler - - if session_id not in self._session_topics: - self._session_topics[session_id] = {} - self._session_topics[session_id][topic] = handler - - def _make_listener(self, topic: str): - """Create a listener callback for a topic.""" - def listener(conn, pid, channel, payload): - asyncio.create_task(self._handle_notification(topic, payload)) - return listener - - async def _handle_notification(self, topic: str, payload: str) -> None: - """Handle incoming NOTIFY.""" - try: - message = json.loads(payload) - - async with self._lock: - handlers = list(self._topic_subscribers.get(topic, {}).values()) - - for handler in handlers: - asyncio.create_task(handler(topic, message)) - except Exception: - logger.exception("Error handling PostgreSQL notification") +When building a custom backend: - async def unsubscribe_topic(self, session_id: str, topic: str) -> None: - """Unsubscribe from a topic.""" - channel = f"{self._prefix}{topic}" +- [ ] Store handlers in memory per instance (dict[str, dict[str, TopicHandler]]) +- [ ] Subscribe to distributed backend only when first local handler subscribes +- [ ] Publish messages as JSON to distributed backend +- [ ] Listen for messages from distributed backend +- [ ] Route received messages to local handlers using `asyncio.create_task()` +- [ ] Handle errors gracefully (one failing handler shouldn't affect others) +- [ ] Clean up connections in `stop()` - async with self._lock: - if topic in self._topic_subscribers: - self._topic_subscribers[topic].pop(session_id, None) +### Backend Comparison - if not self._topic_subscribers[topic]: - del self._topic_subscribers[topic] - if channel in self._subscribed_channels: - await self._listen_conn.remove_listener(channel, None) - self._subscribed_channels.discard(channel) +| Backend | Best For | Throughput | Setup Complexity | +|---------|----------|------------|------------------| +| **InMemory** (default) | Single instance, development | High | 
None | +| **Redis** | Production, multi-instance | Very High | Low (just Redis) | +| **PostgreSQL** | Already using Postgres | Medium | Low (use existing DB) | +| **Test** | Unit/integration tests | N/A | None | - if session_id in self._session_topics: - self._session_topics[session_id].pop(topic, None) - if not self._session_topics[session_id]: - del self._session_topics[session_id] +### Testing Your Backend - async def unsubscribe_all(self, session_id: str) -> None: - """Remove all subscriptions for a session.""" - async with self._lock: - if session_id not in self._session_topics: - return +Use the test implementation to verify your LiveView's pub/sub behavior: - for topic in list(self._session_topics[session_id].keys()): - channel = f"{self._prefix}{topic}" +```python +from test_pubsub import TestPubSub - if topic in self._topic_subscribers: - self._topic_subscribers[topic].pop(session_id, None) +def test_counter_broadcasts(): + test_pubsub = TestPubSub() + app = PyView(pubsub=test_pubsub) - if not self._topic_subscribers[topic]: - del self._topic_subscribers[topic] - if channel in self._subscribed_channels: - await self._listen_conn.remove_listener(channel, None) - self._subscribed_channels.discard(channel) + # ... test your LiveView ... - del self._session_topics[session_id] + # Verify subscriptions + assert ("session_123", "counter") in test_pubsub.subscriptions - async def broadcast(self, topic: str, message: Any) -> None: - """Broadcast using NOTIFY.""" - channel = f"{self._prefix}{topic}" - payload = json.dumps(message) - # PostgreSQL NOTIFY has an 8000 byte payload limit - await self._conn.execute(f"NOTIFY {channel}, $1", payload) + # Verify broadcasts + assert ("counter", 5) in test_pubsub.broadcasts ``` -## Key Implementation Notes - -### Handlers Are Local - -Handlers are Python async functions - they can't be serialized and sent over the network. Your implementation must: - -1. Store handlers in local memory (per-instance) -2. 
Publish only the message data to the distributed backend -3. Route received messages to local handlers +## Production Considerations -### Message Serialization +### Environment Configuration -Messages must be JSON-serializable for distributed backends: +Use environment variables for connection URLs: ```python -# Good - JSON serializable -await socket.broadcast("updates", {"user_id": 123, "action": "joined"}) -await socket.broadcast("counter", 42) +import os -# Bad - not serializable -await socket.broadcast("data", my_dataclass) # Convert with asdict() first -await socket.broadcast("func", some_function) # Can't serialize functions +redis_url = os.getenv("REDIS_URL", "redis://localhost:6379") +app = PyView(pubsub=RedisPubSub(redis_url)) ``` ### Error Handling -Your implementation should: - -- Not crash if one handler fails (isolate errors) -- Log but continue on malformed messages +Implementations should: +- Log but not crash on malformed messages +- Isolate handler errors (use try/except around each handler call) - Handle reconnection for network failures +- Clean up resources properly in `stop()` -### Testing +### Channel Prefixes -Consider creating a test implementation: +Use channel prefixes to avoid collisions when multiple apps share the same Redis/Postgres: ```python -class TestPubSub: - """Records all pub/sub operations for testing.""" - - def __init__(self): - self.subscriptions: list[tuple[str, str]] = [] # (session_id, topic) - self.broadcasts: list[tuple[str, Any]] = [] # (topic, message) - self.handlers: dict[str, dict[str, TopicHandler]] = {} +# App 1 +app1 = PyView(pubsub=RedisPubSub(url, channel_prefix="app1:")) - async def subscribe_topic(self, session_id: str, topic: str, handler: TopicHandler) -> None: - self.subscriptions.append((session_id, topic)) - if topic not in self.handlers: - self.handlers[topic] = {} - self.handlers[topic][session_id] = handler +# App 2 +app2 = PyView(pubsub=RedisPubSub(url, channel_prefix="app2:")) +``` - async def 
broadcast(self, topic: str, message: Any) -> None: - self.broadcasts.append((topic, message)) - # Immediately dispatch to local handlers for testing - for handler in self.handlers.get(topic, {}).values(): - await handler(topic, message) +## Next Steps - # ... other methods -``` +- Check out the [working examples](../../examples/custom_pubsub/) with Docker Compose +- Review the [PubSubProvider Protocol source](../../pyview/pubsub/interfaces.py) +- Consider implementing backends for NATS, RabbitMQ, or AWS SNS/SQS diff --git a/examples/custom_pubsub/Dockerfile b/examples/custom_pubsub/Dockerfile new file mode 100644 index 0000000..82febbd --- /dev/null +++ b/examples/custom_pubsub/Dockerfile @@ -0,0 +1,13 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY . . + +# Run the app +CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/examples/custom_pubsub/README.md b/examples/custom_pubsub/README.md new file mode 100644 index 0000000..b0bfdb1 --- /dev/null +++ b/examples/custom_pubsub/README.md @@ -0,0 +1,283 @@ +# Custom Pub/Sub Backend Examples + +This directory contains complete, working implementations of custom pub/sub backends for PyView, along with a demo app and Docker Compose setup for testing. 
+ +## What's Included + +- **`redis_pubsub.py`** - Redis-backed pub/sub (recommended for production) +- **`postgres_pubsub.py`** - PostgreSQL NOTIFY/LISTEN pub/sub +- **`test_pubsub.py`** - Test implementation for unit testing +- **`app.py`** - Demo counter app showing multi-instance pub/sub +- **`docker-compose.yml`** - Redis and PostgreSQL services + +## Quick Start + +### Option 1: Everything with Docker (Easiest) + +Run the entire demo with one command: + +```bash +docker-compose up --build +``` + +This starts: +- Redis on `localhost:6379` +- PostgreSQL on `localhost:5432` +- Three app instances (app1, app2, app3) +- Nginx load balancer on `localhost:8000` + +**Test it**: Open http://localhost:8000 in multiple browsers and click increment. All browsers stay in sync via Redis pub/sub! 🎉 + +To see which backend instance you're connected to: +- http://localhost:8001 (app1 directly) +- http://localhost:8002 (app2 directly) +- http://localhost:8003 (app3 directly) +- http://localhost:8000 (load balanced across all three) + +### Option 2: Run Locally + +If you want to run the app locally (for development): + +#### 1. Install Dependencies + +```bash +pip install -r requirements.txt +``` + +#### 2. Start Redis + +```bash +docker-compose up -d redis postgres +``` + +This starts just Redis and PostgreSQL, without the app instances. + +#### 3. Run Multiple App Instances + +Open three terminals and run: + +```bash +# Terminal 1 +uvicorn app:app --port 8000 + +# Terminal 2 +uvicorn app:app --port 8001 + +# Terminal 3 +uvicorn app:app --port 8002 +``` + +#### 4. Test It Out + +1. Open http://localhost:8000 in one browser +2. Open http://localhost:8001 in another browser +3. Click increment on either one +4. Watch both update in real-time! 🎉 + +The counter stays in sync across all instances because they're communicating through Redis pub/sub. 
+ +## Using in Your App + +### Redis (Recommended) + +```python +from pyview import PyView +from redis_pubsub import RedisPubSub + +app = PyView( + pubsub=RedisPubSub( + url="redis://localhost:6379", + channel_prefix="myapp:" + ) +) +``` + +**When to use**: Production deployments with multiple app instances behind a load balancer. + +### PostgreSQL + +```python +from pyview import PyView +from postgres_pubsub import PostgresPubSub + +app = PyView( + pubsub=PostgresPubSub( + dsn="postgresql://pyview:pyview@localhost/pyview", + channel_prefix="myapp_" + ) +) +``` + +**When to use**: You're already using PostgreSQL and want to avoid adding Redis. Note: Lower throughput than Redis. + +### Testing + +```python +from pyview import PyView +from test_pubsub import TestPubSub + +test_pubsub = TestPubSub() +app = PyView(pubsub=test_pubsub) + +# In your tests +assert ("session_123", "updates") in test_pubsub.subscriptions +assert ("updates", {"action": "joined"}) in test_pubsub.broadcasts +``` + +**When to use**: Unit and integration tests where you don't want external dependencies. + +## Implementation Details + +### How It Works + +Distributed pub/sub backends must handle a key challenge: **handlers are local Python callables** that can't be serialized and sent over the network. The solution: + +1. **Local handler storage**: Each instance stores handlers in memory +2. **Message broadcasting**: Only message data is sent through Redis/Postgres +3. 
**Local dispatch**: Each instance routes received messages to its own handlers + +### Redis Implementation + +The Redis implementation uses: +- `redis.asyncio` for async operations +- Pub/sub channels for message distribution +- Reference counting for channel subscriptions (only subscribe when first local handler subscribes) +- Background listener task for receiving messages + +### PostgreSQL Implementation + +The PostgreSQL implementation uses: +- `asyncpg` for async operations +- `NOTIFY`/`LISTEN` for pub/sub +- Separate connections for publish and listen +- Callback-based notification handling + +### Message Serialization + +Messages must be JSON-serializable: + +```python +# Good +await socket.broadcast("updates", {"user_id": 123, "action": "joined"}) +await socket.broadcast("counter", 42) + +# Bad - won't work with distributed backends +await socket.broadcast("data", my_dataclass) # Use asdict() first +await socket.broadcast("func", some_function) # Can't serialize functions +``` + +## Implementing Your Own Backend + +To create a custom backend, implement these methods: + +```python +class MyPubSub: + async def subscribe_topic(self, session_id: str, topic: str, handler: TopicHandler) -> None: + """Subscribe a session to a topic.""" + ... + + async def unsubscribe_topic(self, session_id: str, topic: str) -> None: + """Unsubscribe from a topic.""" + ... + + async def unsubscribe_all(self, session_id: str) -> None: + """Remove all subscriptions for a session.""" + ... + + async def broadcast(self, topic: str, message: Any) -> None: + """Broadcast a message to all subscribers.""" + ... + + async def start(self) -> None: + """Called when the app starts.""" + ... + + async def stop(self) -> None: + """Called when the app shuts down.""" + ... +``` + +See the [PubSubProvider Protocol](../../pyview/pubsub/interfaces.py) for full documentation. 
+ +## Production Deployment + +### Environment Variables + +The demo app reads `REDIS_URL` from the environment: + +```bash +export REDIS_URL="redis://your-redis-host:6379" +uvicorn app:app +``` + +### Connection Pooling + +For production, consider connection pooling: + +```python +class RedisPubSub: + def __init__(self, url: str, max_connections: int = 50): + self._pool = redis.ConnectionPool.from_url( + url, + max_connections=max_connections, + decode_responses=False + ) + self._client = redis.Redis(connection_pool=self._pool) +``` + +### Error Handling + +Both implementations include: +- Graceful handling of malformed messages +- Isolated error handling (one failing handler doesn't affect others) +- Proper cleanup on shutdown + +### Monitoring + +Add metrics for: +- Active subscriptions per instance +- Message publish rate +- Message receive rate +- Handler execution time + +## Troubleshooting + +### Redis connection errors + +``` +redis.exceptions.ConnectionError: Error connecting to Redis +``` + +Make sure Redis is running: +```bash +docker-compose up -d redis +redis-cli ping # Should return PONG +``` + +### PostgreSQL connection errors + +``` +asyncpg.exceptions.CannotConnectNowError +``` + +Check PostgreSQL is running and credentials are correct: +```bash +docker-compose up -d postgres +psql postgresql://pyview:pyview@localhost/pyview -c "SELECT 1" +``` + +### Messages not syncing across instances + +- Check all instances are connected to the same Redis/Postgres +- Verify `channel_prefix` is the same across all instances +- Check logs for subscription/broadcast errors + +## Next Steps + +- See the [pub/sub documentation](../../docs/pubsub-backends.md) for more details +- Check out [production deployment patterns](../../docs/deployment.md) (if available) +- Consider implementing NATS, RabbitMQ, or AWS SNS/SQS backends + +## License + +These examples are part of the PyView project and use the same license. 
"""Demo app showing multi-instance pub/sub with Redis.

This counter app demonstrates how pub/sub enables real-time updates across
multiple server instances. When any user clicks increment/decrement, ALL
connected users see the update immediately - even if they're connected to
different server instances.

To test locally:
    1. Start Redis: docker-compose up -d
    2. Run multiple instances:
       Terminal 1: uvicorn app:app --port 8000
       Terminal 2: uvicorn app:app --port 8001
       Terminal 3: uvicorn app:app --port 8002
    3. Open http://localhost:8000 and http://localhost:8001 in different browsers
    4. Click increment on one - see it update on both!

To switch backends:
    - Redis (recommended): RedisPubSub("redis://localhost:6379")
    - PostgreSQL: PostgresPubSub("postgresql://user:pass@localhost/db")
    - Testing: TestPubSub()
"""

import os
from typing import TypedDict

from redis_pubsub import RedisPubSub

from pyview import LiveView, PyView, is_connected


class CounterContext(TypedDict):
    # Current counter value shown to every connected client.
    count: int


class CounterLiveView(LiveView[CounterContext]):
    """Shared counter across all instances."""

    async def mount(self, socket, session):
        # NOTE(review): every instance starts at 0 and only converges once a
        # broadcast arrives; a real app would load the current value from a
        # shared store here.
        socket.context = {"count": 0}
        if is_connected(socket):
            # Subscribe to counter updates published by any instance.
            await socket.subscribe("counter")

    async def handle_event(self, event, payload, socket):
        """Apply the local change, then broadcast the new value to all instances."""
        if event == "increment":
            socket.context["count"] += 1
        elif event == "decrement":
            socket.context["count"] -= 1
        elif event == "reset":
            socket.context["count"] = 0
        else:
            # Unknown event: ignore instead of broadcasting stale state.
            return
        # Single broadcast site instead of one per branch; for "reset" the
        # new count is 0, matching the original's broadcast of 0.
        await socket.broadcast("counter", socket.context["count"])

    async def handle_info(self, event, socket):
        # Update pushed from another instance via the pub/sub backend.
        socket.context["count"] = event.payload

    async def render(self, context, meta):
        # NOTE(review): the original inline HTML template was garbled in this
        # patch; this is a minimal reconstruction that keeps the same template
        # binding ({{ count }}) and events (increment/decrement/reset) —
        # confirm styling against the original app before shipping.
        return """
        <div style="text-align:center; font-family:sans-serif; margin-top:3em;">
          <h1>Multi-Instance Counter</h1>
          <div style="font-size:4em;">{{ count }}</div>
          <button phx-click="decrement">-</button>
          <button phx-click="reset">Reset</button>
          <button phx-click="increment">+</button>
          <p>
            🚀 Try opening this page in multiple browsers or run multiple
            instances on different ports. All instances stay in sync via
            Redis pub/sub!
          </p>
        </div>
        """


# Create the app with the Redis pub/sub backend. REDIS_URL overrides the
# default localhost connection (docker-compose sets it to redis://redis:6379).
redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")
app = PyView(pubsub=RedisPubSub(redis_url))

app.add_live_view("/", CounterLiveView)
+ environment: + - REDIS_URL=redis://redis:6379 + depends_on: + redis: + condition: service_healthy + ports: + - "8003:8000" + + nginx: + image: nginx:alpine + ports: + - "8000:80" + volumes: + - ./nginx.conf:/etc/nginx/nginx.conf:ro + depends_on: + - app1 + - app2 + - app3 + +volumes: + postgres_data: diff --git a/examples/custom_pubsub/nginx.conf b/examples/custom_pubsub/nginx.conf new file mode 100644 index 0000000..8be50e9 --- /dev/null +++ b/examples/custom_pubsub/nginx.conf @@ -0,0 +1,35 @@ +events { + worker_connections 1024; +} + +http { + upstream pyview_backends { + # Round-robin load balancing across three app instances + server app1:8000; + server app2:8000; + server app3:8000; + } + + server { + listen 80; + + location / { + proxy_pass http://pyview_backends; + proxy_http_version 1.1; + + # WebSocket support + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + + # Standard proxy headers + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + + # Timeouts + proxy_read_timeout 86400; + proxy_send_timeout 86400; + } + } +} diff --git a/examples/custom_pubsub/postgres_pubsub.py b/examples/custom_pubsub/postgres_pubsub.py new file mode 100644 index 0000000..30a27ec --- /dev/null +++ b/examples/custom_pubsub/postgres_pubsub.py @@ -0,0 +1,147 @@ +"""PostgreSQL NOTIFY/LISTEN pub/sub backend. 
"""PostgreSQL NOTIFY/LISTEN pub/sub backend.

Requirements:
    pip install asyncpg

Usage:
    app = PyView(pubsub=PostgresPubSub("postgresql://user:pass@localhost/db"))

Pros:
    - No additional infrastructure if you already use PostgreSQL
    - Transactional guarantees available if needed

Cons:
    - Lower throughput than Redis
    - Not designed for high-volume pub/sub
"""

import asyncio
import json
import logging
from typing import TYPE_CHECKING, Any, Callable, Coroutine

if TYPE_CHECKING:
    # asyncpg is only needed once start() is called; deferring the runtime
    # import keeps this module importable (and unit-testable) without it.
    import asyncpg

logger = logging.getLogger(__name__)

# Async callback invoked as ``await handler(topic, message)``.
TopicHandler = Callable[[str, Any], Coroutine[Any, Any, None]]


class PostgresPubSub:
    """PostgreSQL NOTIFY/LISTEN pub/sub implementation.

    Handlers are plain Python coroutines, so they live in local per-instance
    memory; only JSON payloads travel through PostgreSQL. Channel names are
    prefixed to avoid collisions between applications sharing one database.
    """

    def __init__(self, dsn: str, channel_prefix: str = "pyview_"):
        """Store connection settings; no I/O happens until start().

        Args:
            dsn: PostgreSQL connection string.
            channel_prefix: Prefix for NOTIFY channel names.
        """
        self._dsn = dsn
        self._prefix = channel_prefix
        # Separate connections: one for NOTIFY, one dedicated to LISTEN.
        self._conn: "asyncpg.Connection | None" = None
        self._listen_conn: "asyncpg.Connection | None" = None

        self._lock = asyncio.Lock()
        # topic -> {session_id -> handler}
        self._topic_subscribers: dict[str, dict[str, TopicHandler]] = {}
        # session_id -> {topic -> handler}, for fast per-session cleanup
        self._session_topics: dict[str, dict[str, TopicHandler]] = {}
        self._subscribed_channels: set[str] = set()
        # channel -> registered asyncpg callback. asyncpg's remove_listener()
        # only detaches the exact callback object that add_listener() got;
        # the original code passed None, which never removed anything and
        # leaked listeners. We must remember each callback to remove it.
        self._channel_listeners: dict[str, Callable[..., None]] = {}

    async def start(self) -> None:
        """Connect to PostgreSQL (imports asyncpg lazily)."""
        import asyncpg  # deferred so the module imports without asyncpg

        self._conn = await asyncpg.connect(self._dsn)
        self._listen_conn = await asyncpg.connect(self._dsn)
        logger.info("PostgreSQL pub/sub connected")

    async def stop(self) -> None:
        """Close both connections."""
        if self._conn:
            await self._conn.close()
        if self._listen_conn:
            await self._listen_conn.close()
        logger.info("PostgreSQL pub/sub disconnected")

    async def subscribe_topic(
        self,
        session_id: str,
        topic: str,
        handler: TopicHandler,
    ) -> None:
        """Subscribe a session to a topic, issuing LISTEN on first local use."""
        channel = f"{self._prefix}{topic}"

        async with self._lock:
            if topic not in self._topic_subscribers:
                self._topic_subscribers[topic] = {}

            if channel not in self._subscribed_channels:
                listener = self._make_listener(topic)
                await self._listen_conn.add_listener(channel, listener)
                self._subscribed_channels.add(channel)
                # Keep the callback so unsubscribe can actually remove it.
                self._channel_listeners[channel] = listener

            self._topic_subscribers[topic][session_id] = handler
            self._session_topics.setdefault(session_id, {})[topic] = handler

    def _make_listener(self, topic: str):
        """Create an asyncpg notification callback bound to *topic*."""

        def listener(conn, pid, channel, payload):
            # asyncpg invokes this synchronously; hop back into a task.
            asyncio.create_task(self._handle_notification(topic, payload))

        return listener

    async def _handle_notification(self, topic: str, payload: str) -> None:
        """Decode a NOTIFY payload and fan out to local handlers."""
        try:
            message = json.loads(payload)

            async with self._lock:
                handlers = list(self._topic_subscribers.get(topic, {}).values())

            # Each handler runs in its own task so one failing (or slow)
            # handler cannot affect the others.
            for handler in handlers:
                asyncio.create_task(handler(topic, message))
        except Exception:
            logger.exception("Error handling PostgreSQL notification")

    async def _drop_channel_if_unused(self, topic: str, channel: str) -> None:
        """UNLISTEN and forget *channel* if *topic* has no subscribers left.

        Must be called while holding ``self._lock``.
        """
        if not self._topic_subscribers[topic]:
            del self._topic_subscribers[topic]
            if channel in self._subscribed_channels:
                listener = self._channel_listeners.pop(channel, None)
                if listener is not None:
                    # Pass the original callback — remove_listener(channel,
                    # None) (the previous code) never matches anything.
                    await self._listen_conn.remove_listener(channel, listener)
                self._subscribed_channels.discard(channel)

    async def unsubscribe_topic(self, session_id: str, topic: str) -> None:
        """Unsubscribe one session from one topic."""
        channel = f"{self._prefix}{topic}"

        async with self._lock:
            if topic in self._topic_subscribers:
                self._topic_subscribers[topic].pop(session_id, None)
                await self._drop_channel_if_unused(topic, channel)

            if session_id in self._session_topics:
                self._session_topics[session_id].pop(topic, None)
                if not self._session_topics[session_id]:
                    del self._session_topics[session_id]

    async def unsubscribe_all(self, session_id: str) -> None:
        """Remove every subscription held by a session (on disconnect)."""
        async with self._lock:
            if session_id not in self._session_topics:
                return

            for topic in list(self._session_topics[session_id]):
                channel = f"{self._prefix}{topic}"
                if topic in self._topic_subscribers:
                    self._topic_subscribers[topic].pop(session_id, None)
                    await self._drop_channel_if_unused(topic, channel)

            del self._session_topics[session_id]

    async def broadcast(self, topic: str, message: Any) -> None:
        """Broadcast *message* to all instances via NOTIFY.

        Uses ``SELECT pg_notify($1, $2)`` because NOTIFY is a utility
        statement and cannot take bind parameters — the original
        ``NOTIFY {channel}, $1`` fails at execute time and also allowed
        channel-name injection via the topic. Payloads are JSON and subject
        to PostgreSQL's ~8000-byte NOTIFY payload limit.
        """
        payload = json.dumps(message)
        await self._conn.execute(
            "SELECT pg_notify($1, $2)", f"{self._prefix}{topic}", payload
        )
"""Redis-backed pub/sub for multi-instance PyView deployments.

Requirements:
    pip install redis

Usage:
    from redis_pubsub import RedisPubSub

    app = PyView(pubsub=RedisPubSub("redis://localhost:6379"))

How it works:
    - Handlers are stored locally (they're Python callables, not serializable)
    - When broadcast() is called, the message is published to Redis
    - All instances receive the message and dispatch to their local handlers
    - This enables real-time updates across multiple server instances
"""

import asyncio
import json
import logging
from contextlib import suppress
from typing import TYPE_CHECKING, Any, Callable, Coroutine

if TYPE_CHECKING:
    # redis is only needed once start() is called; deferring the runtime
    # import keeps this module importable (and unit-testable) without it.
    import redis.asyncio as redis

logger = logging.getLogger(__name__)

# Async callback invoked as ``await handler(topic, message)``.
TopicHandler = Callable[[str, Any], Coroutine[Any, Any, None]]


class RedisPubSub:
    """Redis-backed pub/sub implementation."""

    def __init__(
        self,
        url: str = "redis://localhost:6379",
        channel_prefix: str = "pyview:",
    ):
        """Initialize Redis pub/sub.

        Args:
            url: Redis connection URL
            channel_prefix: Prefix for Redis channel names (helps avoid collisions)
        """
        self._url = url
        self._prefix = channel_prefix
        self._client: "redis.Redis | None" = None
        self._pubsub: "redis.client.PubSub | None" = None
        self._listener_task: "asyncio.Task | None" = None

        # Local handler tracking (handlers can't be serialized to Redis)
        self._lock = asyncio.Lock()
        # topic -> {session_id -> handler}
        self._topic_subscribers: dict[str, dict[str, TopicHandler]] = {}
        # session_id -> {topic -> handler}
        self._session_topics: dict[str, dict[str, TopicHandler]] = {}

    async def start(self) -> None:
        """Connect to Redis and start the message listener."""
        import redis.asyncio as redis  # deferred: only needed to connect

        self._client = redis.from_url(self._url)
        self._pubsub = self._client.pubsub()
        # Keep at least one channel subscribed at all times. redis-py's async
        # PubSub.listen() returns immediately while there are no
        # subscriptions, so a listener task started on an unsubscribed pubsub
        # (as the original code did) exits before the first real subscriber
        # arrives and no message is ever delivered. Nothing publishes to the
        # control channel; it exists purely to keep listen() blocking.
        await self._pubsub.subscribe(f"{self._prefix}__control__")
        self._listener_task = asyncio.create_task(self._listen())
        logger.info("Redis pub/sub connected to %s", self._url)

    async def stop(self) -> None:
        """Cancel the listener and close the Redis connections."""
        if self._listener_task:
            self._listener_task.cancel()
            with suppress(asyncio.CancelledError):
                await self._listener_task

        if self._pubsub:
            await self._pubsub.close()

        if self._client:
            await self._client.close()

        logger.info("Redis pub/sub disconnected")

    async def subscribe_topic(
        self,
        session_id: str,
        topic: str,
        handler: TopicHandler,
    ) -> None:
        """Subscribe a session to a topic (Redis SUBSCRIBE on first local use)."""
        channel = f"{self._prefix}{topic}"

        async with self._lock:
            if topic not in self._topic_subscribers:
                self._topic_subscribers[topic] = {}
                # First local subscriber - join the Redis channel.
                await self._pubsub.subscribe(channel)
                logger.debug("Subscribed to Redis channel: %s", channel)

            self._topic_subscribers[topic][session_id] = handler
            # Track for per-session cleanup on disconnect.
            self._session_topics.setdefault(session_id, {})[topic] = handler

    async def unsubscribe_topic(self, session_id: str, topic: str) -> None:
        """Unsubscribe a session from a topic."""
        channel = f"{self._prefix}{topic}"

        async with self._lock:
            if topic in self._topic_subscribers:
                self._topic_subscribers[topic].pop(session_id, None)

                if not self._topic_subscribers[topic]:
                    # Last local subscriber - leave the Redis channel.
                    del self._topic_subscribers[topic]
                    await self._pubsub.unsubscribe(channel)
                    logger.debug("Unsubscribed from Redis channel: %s", channel)

            if session_id in self._session_topics:
                self._session_topics[session_id].pop(topic, None)
                if not self._session_topics[session_id]:
                    del self._session_topics[session_id]

    async def unsubscribe_all(self, session_id: str) -> None:
        """Remove all subscriptions for a session (called on disconnect)."""
        async with self._lock:
            if session_id not in self._session_topics:
                return

            for topic in list(self._session_topics[session_id]):
                channel = f"{self._prefix}{topic}"

                if topic in self._topic_subscribers:
                    self._topic_subscribers[topic].pop(session_id, None)

                    if not self._topic_subscribers[topic]:
                        del self._topic_subscribers[topic]
                        await self._pubsub.unsubscribe(channel)

            del self._session_topics[session_id]

    async def broadcast(self, topic: str, message: Any) -> None:
        """Publish a message to all subscribers across all instances.

        *message* must be JSON-serializable; the topic is embedded in the
        payload so the receiving side can route without parsing the channel.
        """
        channel = f"{self._prefix}{topic}"
        payload = json.dumps({"topic": topic, "message": message})
        await self._client.publish(channel, payload)

    async def _listen(self) -> None:
        """Background task: receive Redis messages and dispatch to local handlers."""
        try:
            async for message in self._pubsub.listen():
                if message["type"] != "message":
                    continue

                try:
                    data = json.loads(message["data"])
                    topic = data["topic"]
                    payload = data["message"]
                except (json.JSONDecodeError, KeyError, TypeError):
                    logger.warning("Malformed Redis message: %r", message["data"])
                    continue

                # Snapshot handlers while holding the lock...
                async with self._lock:
                    handlers = list(self._topic_subscribers.get(topic, {}).values())

                # ...then dispatch outside it, one task per handler, so a
                # failing or slow handler can't block the loop or its peers.
                for handler in handlers:
                    asyncio.create_task(handler(topic, payload))

        except asyncio.CancelledError:
            pass
        except Exception:
            logger.exception("Redis listener crashed")
"""In-memory pub/sub double that records every operation.

Lets you integration-test LiveViews that use pub/sub without Redis or any
other external service.

Usage:
    test_pubsub = TestPubSub()
    app = PyView(pubsub=test_pubsub)

    # In your tests
    assert ("user_123", "updates") in test_pubsub.subscriptions
    assert ("updates", {"action": "joined"}) in test_pubsub.broadcasts
"""

from typing import Any, Callable, Coroutine

# Async callback invoked as ``await handler(topic, message)``.
TopicHandler = Callable[[str, Any], Coroutine[Any, Any, None]]


class TestPubSub:
    """Pub/sub stand-in that logs calls and dispatches to local handlers."""

    def __init__(self):
        # (session_id, topic) pairs, in call order.
        self.subscriptions: list[tuple[str, str]] = []
        # (session_id, topic) pairs removed, in call order.
        self.unsubscriptions: list[tuple[str, str]] = []
        # (topic, message) pairs, in call order.
        self.broadcasts: list[tuple[str, Any]] = []
        # topic -> {session_id -> handler}, the live subscription table.
        self.handlers: dict[str, dict[str, TopicHandler]] = {}
        self.started = False
        self.stopped = False

    async def start(self) -> None:
        """Record that the app started this backend."""
        self.started = True

    async def stop(self) -> None:
        """Record that the app stopped this backend."""
        self.stopped = True

    async def subscribe_topic(
        self,
        session_id: str,
        topic: str,
        handler: TopicHandler,
    ) -> None:
        """Log the subscription and retain the handler for local dispatch."""
        self.subscriptions.append((session_id, topic))
        self.handlers.setdefault(topic, {})[session_id] = handler

    async def unsubscribe_topic(self, session_id: str, topic: str) -> None:
        """Log the unsubscription and drop the stored handler, if present."""
        self.unsubscriptions.append((session_id, topic))
        per_topic = self.handlers.get(topic)
        if per_topic is None or session_id not in per_topic:
            return
        del per_topic[session_id]
        if not per_topic:
            del self.handlers[topic]

    async def unsubscribe_all(self, session_id: str) -> None:
        """Drop every subscription held by *session_id*, logging each one."""
        for topic in list(self.handlers):
            per_topic = self.handlers[topic]
            if session_id not in per_topic:
                continue
            self.unsubscriptions.append((session_id, topic))
            del per_topic[session_id]
            if not per_topic:
                del self.handlers[topic]

    async def broadcast(self, topic: str, message: Any) -> None:
        """Log the broadcast and synchronously await every local handler."""
        self.broadcasts.append((topic, message))
        # Copy the values so a handler that (un)subscribes mid-dispatch
        # cannot mutate the dict we are iterating.
        for handler in list(self.handlers.get(topic, {}).values()):
            await handler(topic, message)

    def clear(self) -> None:
        """Forget all recorded operations (start/stop flags are kept)."""
        self.subscriptions.clear()
        self.unsubscriptions.clear()
        self.broadcasts.clear()
        self.handlers.clear()
@@ import os from typing import TypedDict -from pyview import LiveView, PyView, is_connected from redis_pubsub import RedisPubSub +from pyview import LiveView, PyView, is_connected + class CounterContext(TypedDict): count: int diff --git a/examples/custom_pubsub/redis_pubsub.py b/examples/custom_pubsub/redis_pubsub.py index dc4e382..842971a 100644 --- a/examples/custom_pubsub/redis_pubsub.py +++ b/examples/custom_pubsub/redis_pubsub.py @@ -18,6 +18,7 @@ import asyncio import json import logging +from contextlib import suppress from typing import Any, Callable, Coroutine import redis.asyncio as redis @@ -65,10 +66,8 @@ async def stop(self) -> None: """Disconnect from Redis and clean up.""" if self._listener_task: self._listener_task.cancel() - try: + with suppress(asyncio.CancelledError): await self._listener_task - except asyncio.CancelledError: - pass if self._pubsub: await self._pubsub.close() diff --git a/pyview/pyview.py b/pyview/pyview.py index 8d6a257..1d5e1ad 100644 --- a/pyview/pyview.py +++ b/pyview/pyview.py @@ -13,8 +13,8 @@ from pyview.csrf import generate_csrf_token from pyview.instrumentation import InstrumentationProvider, NoOpInstrumentation from pyview.live_socket import UnconnectedSocket -from pyview.pubsub import InMemoryPubSub, PubSubProvider from pyview.meta import PyViewMeta +from pyview.pubsub import InMemoryPubSub, PubSubProvider from pyview.session import serialize_session from .live_routes import LiveViewLookup From ac12dd88c81da325da59970151b946cb5ce0512c Mon Sep 17 00:00:00 2001 From: Larry Ogrodnek Date: Sun, 14 Dec 2025 18:58:26 -0500 Subject: [PATCH 8/9] formatting --- examples/custom_pubsub/postgres_pubsub.py | 6 +++--- examples/custom_pubsub/redis_pubsub.py | 4 +--- pyview/ws_handler.py | 7 ++++++- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/examples/custom_pubsub/postgres_pubsub.py b/examples/custom_pubsub/postgres_pubsub.py index 30a27ec..b6262bc 100644 --- a/examples/custom_pubsub/postgres_pubsub.py +++ 
b/examples/custom_pubsub/postgres_pubsub.py @@ -70,9 +70,7 @@ async def subscribe_topic( self._topic_subscribers[topic] = {} if channel not in self._subscribed_channels: - await self._listen_conn.add_listener( - channel, self._make_listener(topic) - ) + await self._listen_conn.add_listener(channel, self._make_listener(topic)) self._subscribed_channels.add(channel) self._topic_subscribers[topic][session_id] = handler @@ -83,8 +81,10 @@ async def subscribe_topic( def _make_listener(self, topic: str): """Create a listener callback for a topic.""" + def listener(conn, pid, channel, payload): asyncio.create_task(self._handle_notification(topic, payload)) + return listener async def _handle_notification(self, topic: str, payload: str) -> None: diff --git a/examples/custom_pubsub/redis_pubsub.py b/examples/custom_pubsub/redis_pubsub.py index 842971a..34d0214 100644 --- a/examples/custom_pubsub/redis_pubsub.py +++ b/examples/custom_pubsub/redis_pubsub.py @@ -159,9 +159,7 @@ async def _listen(self) -> None: # Get handlers while holding lock async with self._lock: - handlers = list( - self._topic_subscribers.get(topic, {}).values() - ) + handlers = list(self._topic_subscribers.get(topic, {}).values()) # Dispatch outside lock to prevent deadlocks for handler in handlers: diff --git a/pyview/ws_handler.py b/pyview/ws_handler.py index d11b4a2..4dd7f59 100644 --- a/pyview/ws_handler.py +++ b/pyview/ws_handler.py @@ -290,7 +290,12 @@ async def handle_connected(self, myJoinId, socket: ConnectedLiveViewSocket): # Create new socket for new LiveView socket = ConnectedLiveViewSocket( - socket.websocket, topic, lv, self.scheduler, self.instrumentation, self.pubsub + socket.websocket, + topic, + lv, + self.scheduler, + self.instrumentation, + self.pubsub, ) session = {} From 1e1c0a8c65b2c6cc395ca29ac5b15166684bd467 Mon Sep 17 00:00:00 2001 From: Larry Ogrodnek Date: Sun, 14 Dec 2025 19:03:07 -0500 Subject: [PATCH 9/9] cleanup --- examples/custom_pubsub/postgres_pubsub.py | 18 
++++++++++++++---- examples/custom_pubsub/redis_pubsub.py | 15 ++++++++++++++- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/examples/custom_pubsub/postgres_pubsub.py b/examples/custom_pubsub/postgres_pubsub.py index b6262bc..452d222 100644 --- a/examples/custom_pubsub/postgres_pubsub.py +++ b/examples/custom_pubsub/postgres_pubsub.py @@ -21,6 +21,7 @@ from typing import Any, Callable, Coroutine import asyncpg +from asyncpg.utils import quote_ident logger = logging.getLogger(__name__) @@ -40,6 +41,8 @@ def __init__(self, dsn: str, channel_prefix: str = "pyview_"): self._topic_subscribers: dict[str, dict[str, TopicHandler]] = {} self._session_topics: dict[str, dict[str, TopicHandler]] = {} self._subscribed_channels: set[str] = set() + # Store listener callbacks for proper cleanup + self._channel_listeners: dict[str, Callable] = {} async def start(self) -> None: """Connect to PostgreSQL.""" @@ -70,7 +73,9 @@ async def subscribe_topic( self._topic_subscribers[topic] = {} if channel not in self._subscribed_channels: - await self._listen_conn.add_listener(channel, self._make_listener(topic)) + listener = self._make_listener(topic) + self._channel_listeners[channel] = listener + await self._listen_conn.add_listener(channel, listener) self._subscribed_channels.add(channel) self._topic_subscribers[topic][session_id] = handler @@ -111,7 +116,9 @@ async def unsubscribe_topic(self, session_id: str, topic: str) -> None: if not self._topic_subscribers[topic]: del self._topic_subscribers[topic] if channel in self._subscribed_channels: - await self._listen_conn.remove_listener(channel, None) + listener = self._channel_listeners.pop(channel, None) + if listener: + await self._listen_conn.remove_listener(channel, listener) self._subscribed_channels.discard(channel) if session_id in self._session_topics: @@ -134,7 +141,9 @@ async def unsubscribe_all(self, session_id: str) -> None: if not self._topic_subscribers[topic]: del self._topic_subscribers[topic] if channel 
in self._subscribed_channels: - await self._listen_conn.remove_listener(channel, None) + listener = self._channel_listeners.pop(channel, None) + if listener: + await self._listen_conn.remove_listener(channel, listener) self._subscribed_channels.discard(channel) del self._session_topics[session_id] @@ -144,4 +153,5 @@ async def broadcast(self, topic: str, message: Any) -> None: channel = f"{self._prefix}{topic}" payload = json.dumps(message) # PostgreSQL NOTIFY has an 8000 byte payload limit - await self._conn.execute(f"NOTIFY {channel}, $1", payload) + # Use quote_ident to prevent SQL injection + await self._conn.execute(f"NOTIFY {quote_ident(channel)}, $1", payload) diff --git a/examples/custom_pubsub/redis_pubsub.py b/examples/custom_pubsub/redis_pubsub.py index 34d0214..751f4a6 100644 --- a/examples/custom_pubsub/redis_pubsub.py +++ b/examples/custom_pubsub/redis_pubsub.py @@ -57,7 +57,7 @@ def __init__( async def start(self) -> None: """Connect to Redis and start the message listener.""" - self._client = redis.from_url(self._url) + self._client = redis.from_url(self._url, decode_responses=True) self._pubsub = self._client.pubsub() self._listener_task = asyncio.create_task(self._listen()) logger.info("Redis pub/sub connected to %s", self._url) @@ -66,6 +66,7 @@ async def stop(self) -> None: """Disconnect from Redis and clean up.""" if self._listener_task: self._listener_task.cancel() + # Suppress CancelledError from listener task - expected during shutdown with suppress(asyncio.CancelledError): await self._listener_task @@ -154,6 +155,14 @@ async def _listen(self) -> None: try: data = json.loads(message["data"]) + + # Validate required fields + if "topic" not in data or "message" not in data: + logger.warning( + "Redis message missing required fields: %s", list(data.keys()) + ) + continue + topic = data["topic"] payload = data["message"] @@ -166,11 +175,15 @@ async def _listen(self) -> None: asyncio.create_task(handler(topic, payload)) except 
json.JSONDecodeError: + # Malformed JSON from Redis - log and continue logger.warning("Invalid JSON in Redis message: %s", message["data"]) except Exception: + # Unexpected error processing message - log and continue logger.exception("Error processing Redis message") except asyncio.CancelledError: + # Task cancelled during shutdown - this is expected pass except Exception: + # Unexpected listener crash - log for debugging logger.exception("Redis listener crashed")