
Claude/pluggable pubsub design x thf q #98

Open

ogrodnek wants to merge 9 commits into main from
claude/pluggable-pubsub-design-xThfQ

Conversation

@ogrodnek
Owner

No description provided.

claude and others added 6 commits December 14, 2025 17:56
Design for making PyView's pub/sub system pluggable to support
multi-machine deployments with backends like Redis, while:
- Adding no new dependencies to core PyView
- Maintaining backward compatibility
- Following the existing InstrumentationProvider pattern

Includes:
- PubSubProvider abstract interface
- InMemoryPubSub default implementation
- SessionPubSub wrapper for ergonomic socket API
- Complete Redis implementation example
- Migration path and considerations

- Use typing.Protocol for structural typing (duck typing)
- Consistent with existing InfoEventScheduler pattern
- No inheritance required for implementations
- Use ... (Ellipsis) convention for protocol method stubs
- Add @runtime_checkable for isinstance() support if needed
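Taken together, those conventions could be sketched as below. The method names and the handler signature are assumptions drawn from the per-file summary later in this PR, not the actual contents of pyview/pubsub/interfaces.py:

```python
from typing import Any, Awaitable, Callable, Protocol, runtime_checkable

# Assumed handler shape: invoked with (topic, message) when a broadcast arrives.
TopicHandler = Callable[[str, Any], Awaitable[None]]


@runtime_checkable
class PubSubProvider(Protocol):
    """Structural interface: any class with these methods satisfies it,
    no inheritance required. Method stubs use the ... convention."""

    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    async def subscribe_topic(
        self, session_id: str, topic: str, handler: TopicHandler
    ) -> None: ...
    async def unsubscribe_topic(self, session_id: str, topic: str) -> None: ...
    async def unsubscribe_all(self, session_id: str) -> None: ...
    async def broadcast(self, topic: str, message: Any) -> None: ...
```

Because of @runtime_checkable, `isinstance(obj, PubSubProvider)` checks that the methods exist, which is handy for validating a user-supplied backend at startup.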

Add a pluggable pub/sub architecture that allows custom backends
(Redis, PostgreSQL, NATS, etc.) for multi-machine deployments.

New modules:
- pyview/pubsub/interfaces.py: PubSubProvider Protocol
- pyview/pubsub/memory.py: InMemoryPubSub (default implementation)
- pyview/pubsub/session.py: SessionPubSub wrapper for socket API

Changes:
- PyView now accepts optional `pubsub` parameter
- Lifecycle hooks (start/stop) called during app lifespan
- ConnectedLiveViewSocket uses new pub/sub interface
- PubSubProvider exported from main pyview module

The default InMemoryPubSub maintains backward compatibility.
Custom implementations can be provided for distributed deployments:

    from pyview import PyView
    from myapp.redis_pubsub import RedisPubSub

    app = PyView(pubsub=RedisPubSub("redis://localhost:6379"))
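The "lifecycle hooks called during app lifespan" point above follows the usual ASGI lifespan pattern; a sketch of that pattern (illustrative only, not PyView's actual lifespan code):

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def lifespan(app):
    # Start the pub/sub provider before the app serves traffic...
    await app.pubsub.start()
    try:
        yield
    finally:
        # ...and stop it on shutdown, even if serving raised.
        await app.pubsub.stop()
```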

Includes complete, copy-pasteable implementations for:
- Redis (using redis.asyncio)
- PostgreSQL (using asyncpg with NOTIFY/LISTEN)

Also covers:
- The PubSubProvider protocol
- Key implementation notes (handlers are local, serialization)
- Error handling best practices
- Test implementation pattern

Instead of reimplementing the pub/sub logic, InMemoryPubSub now
wraps the existing battle-tested Flet PubSubHub implementation.

This reduces code duplication while providing a clean adapter
that satisfies our PubSubProvider protocol.
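For orientation, a minimal from-scratch in-memory provider satisfying the protocol might look like this. This is a sketch only: the PR's actual InMemoryPubSub delegates to Flet's PubSubHub rather than reimplementing this logic, and the method names mirror the interface summary below:

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

TopicHandler = Callable[[str, Any], Awaitable[None]]


class SimpleInMemoryPubSub:
    """Single-process pub/sub: topic -> {session_id: handler}."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, Dict[str, TopicHandler]] = {}
        self._lock = asyncio.Lock()

    async def start(self) -> None: ...

    async def stop(self) -> None:
        self._subscribers.clear()

    async def subscribe_topic(self, session_id: str, topic: str, handler: TopicHandler) -> None:
        async with self._lock:
            self._subscribers.setdefault(topic, {})[session_id] = handler

    async def unsubscribe_topic(self, session_id: str, topic: str) -> None:
        async with self._lock:
            handlers = self._subscribers.get(topic)
            if handlers:
                handlers.pop(session_id, None)
                if not handlers:
                    del self._subscribers[topic]

    async def unsubscribe_all(self, session_id: str) -> None:
        async with self._lock:
            for topic in list(self._subscribers):
                self._subscribers[topic].pop(session_id, None)
                if not self._subscribers[topic]:
                    del self._subscribers[topic]

    async def broadcast(self, topic: str, message: Any) -> None:
        # Snapshot under the lock so handlers may (un)subscribe while we iterate.
        async with self._lock:
            handlers = list(self._subscribers.get(topic, {}).values())
        for handler in handlers:
            await handler(topic, message)
```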
Contributor

Copilot AI left a comment


Pull request overview

This PR introduces a pluggable pub/sub architecture for PyView, enabling distributed real-time communication across multiple server instances using backends like Redis or PostgreSQL.

Key Changes:

  • Introduces PubSubProvider protocol for pluggable pub/sub implementations
  • Adds in-memory default implementation and example Redis/PostgreSQL backends
  • Updates core framework to inject pub/sub provider through dependency injection

Reviewed changes

Copilot reviewed 18 out of 18 changed files in this pull request and generated 7 comments.

File Description
pyview/pubsub/interfaces.py Defines PubSubProvider protocol with async methods for subscribe, unsubscribe, broadcast, start/stop lifecycle
pyview/pubsub/memory.py In-memory pub/sub implementation delegating to existing Flet PubSubHub
pyview/pubsub/session.py Session-scoped wrapper that automatically includes session_id in operations
pyview/pubsub/__init__.py Module exports for public API
pyview/pyview.py Integrates pub/sub provider with optional injection, lifecycle management in lifespan context
pyview/ws_handler.py Updates LiveSocketHandler to accept and pass pub/sub provider to sockets
pyview/live_socket.py Replaces global pub_sub_hub with injected provider, uses SessionPubSub wrapper
pyview/__init__.py Exports PubSubProvider in public API
examples/custom_pubsub/redis_pubsub.py Redis-backed pub/sub with reference counting and background listener task
examples/custom_pubsub/postgres_pubsub.py PostgreSQL NOTIFY/LISTEN pub/sub backend
examples/custom_pubsub/test_pubsub.py Test implementation that records operations for testing
examples/custom_pubsub/app.py Multi-instance counter demo using Redis pub/sub
examples/custom_pubsub/README.md Comprehensive documentation with setup instructions and implementation guide
docs/pubsub-backends.md API documentation and usage guide for custom backends
examples/custom_pubsub/docker-compose.yml Docker Compose setup with Redis, PostgreSQL, and three app instances
examples/custom_pubsub/nginx.conf Nginx load balancer configuration for multi-instance setup
examples/custom_pubsub/requirements.txt Dependencies for example implementations
examples/custom_pubsub/Dockerfile Container image for demo app


    if not self._topic_subscribers[topic]:
        del self._topic_subscribers[topic]
        if channel in self._subscribed_channels:
            await self._listen_conn.remove_listener(channel, None)

Copilot AI Dec 14, 2025


Same issue as line 114: passing None to remove_listener instead of the actual callback. The listener callback should be stored when added and passed here for proper cleanup.

    channel = f"{self._prefix}{topic}"
    payload = json.dumps(message)
    # PostgreSQL NOTIFY has an 8000 byte payload limit
    await self._conn.execute(f"NOTIFY {channel}, $1", payload)

Copilot AI Dec 14, 2025


SQL injection vulnerability: the channel name is directly interpolated into the SQL command using an f-string. Although the channel is typically constructed from a prefix and topic, this should use parameterized queries or proper identifier quoting.

Use asyncpg's proper NOTIFY syntax or quote the identifier using asyncpg.utils.quote_ident().
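An alternative worth noting: PostgreSQL's pg_notify() function takes the channel as an ordinary function argument, so both channel and payload can travel as bound parameters and no identifier quoting is needed at all. The quote_ident helper below is an illustrative fallback written here, not asyncpg's implementation:

```python
def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier: wrap in double quotes and
    double any embedded double quotes (standard SQL escaping)."""
    return '"' + name.replace('"', '""') + '"'


# With pg_notify(), both values are plain bind parameters instead:
#     await conn.execute("SELECT pg_notify($1, $2)", channel, payload)
```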

        continue

    try:
        data = json.loads(message["data"])

Copilot AI Dec 14, 2025


Missing KeyError handling: if the message data doesn't contain "topic" or "message" keys, a KeyError will be raised which will be caught by the generic Exception handler. Consider adding explicit KeyError handling or validating the data structure before accessing these keys to provide more specific error messages.

Suggested change

    -   data = json.loads(message["data"])
    +   data = json.loads(message["data"])
    +   if "topic" not in data or "message" not in data:
    +       logger.warning(
    +           "Missing 'topic' or 'message' key in Redis message: %s", message["data"]
    +       )
    +       continue

    async def start(self) -> None:
        """Connect to Redis and start the message listener."""
        self._client = redis.from_url(self._url)

Copilot AI Dec 14, 2025


Missing decode_responses=True configuration for Redis client. By default, redis-py returns bytes for message data, which will cause JSON decoding issues in the _listen method. Either set decode_responses=True when creating the client, or explicitly decode bytes to strings before JSON parsing.

Suggested change

    -   self._client = redis.from_url(self._url)
    +   self._client = redis.from_url(self._url, decode_responses=True)
    if not self._topic_subscribers[topic]:
        del self._topic_subscribers[topic]
        if channel in self._subscribed_channels:
            await self._listen_conn.remove_listener(channel, None)

Copilot AI Dec 14, 2025


The remove_listener call should store the listener callback to properly remove it later. Currently passing None as the callback argument, which may not properly remove the listener.

The callback created by _make_listener(topic) should be stored (e.g., in _listener_callbacks dict) when adding the listener, then passed to remove_listener instead of None.

    async def stop(self) -> None:
        """Disconnect from Redis and clean up."""
        if self._listener_task:
            self._listener_task.cancel()

Copilot AI Dec 14, 2025


'except' clause does nothing but pass and there is no explanatory comment.

Suggested change

    -       self._listener_task.cancel()
    +       self._listener_task.cancel()
    +       # Suppress CancelledError since task cancellation is expected during shutdown
    except asyncio.CancelledError:
        pass
    except Exception:
        logger.exception("Redis listener crashed")

Copilot AI Dec 14, 2025


'except' clause does nothing but pass and there is no explanatory comment.

Suggested change

    -   logger.exception("Redis listener crashed")
    +   # Task was cancelled (e.g., on shutdown); safe to ignore.
@codecov-commenter

codecov-commenter commented Dec 14, 2025

⚠️ Please install the Codecov GitHub app to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

❌ Patch coverage is 46.78899% with 58 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.82%. Comparing base (205cf46) to head (1e1c0a8).
⚠️ Report is 15 commits behind head on main.

Files with missing lines Patch % Lines
examples/custom_pubsub/test_pubsub.py 26.19% 31 Missing ⚠️
pyview/pyview.py 25.00% 9 Missing ⚠️
pyview/pubsub/memory.py 61.11% 7 Missing ⚠️
pyview/pubsub/session.py 57.14% 6 Missing ⚠️
pyview/live_socket.py 20.00% 4 Missing ⚠️
pyview/ws_handler.py 66.66% 1 Missing ⚠️
❗ Your organization needs to install the Codecov GitHub app to enable full functionality.
Additional details and impacted files
@@            Coverage Diff             @@
##             main      #98      +/-   ##
==========================================
- Coverage   69.34%   68.82%   -0.52%     
==========================================
  Files          66       71       +5     
  Lines        4687     4783      +96     
  Branches      391      399       +8     
==========================================
+ Hits         3250     3292      +42     
- Misses       1370     1424      +54     
  Partials       67       67              

☔ View full report in Codecov by Sentry.

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 18 out of 18 changed files in this pull request and generated 10 comments.



    self._pool = redis.ConnectionPool.from_url(
        url,
        max_connections=max_connections,
        decode_responses=False,
    )

Copilot AI Dec 15, 2025


The connection pooling example code snippet has an issue: it sets decode_responses=False but the actual RedisPubSub implementation expects decode_responses=True (as seen in line 60 of redis_pubsub.py). This inconsistency would cause the implementation to fail when trying to decode responses. The example should use decode_responses=True to match the working implementation.

Suggested change

    -   decode_responses=False
    +   decode_responses=True
Comment on lines +151 to +157
    async def broadcast(self, topic: str, message: Any) -> None:
        """Broadcast using NOTIFY."""
        channel = f"{self._prefix}{topic}"
        payload = json.dumps(message)
        # PostgreSQL NOTIFY has an 8000 byte payload limit
        # Use quote_ident to prevent SQL injection
        await self._conn.execute(f"NOTIFY {quote_ident(channel)}, $1", payload)

Copilot AI Dec 15, 2025


The broadcast method can fail if called before start() or after stop(). The _conn will be None in these states, causing an AttributeError. Consider adding a check for self._conn is None and either raise a clear error message or log and return early.
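A guard of the kind this comment asks for might be sketched as follows; PubSubNotStartedError and require_started are illustrative names, not part of the PR:

```python
class PubSubNotStartedError(RuntimeError):
    """Raised when a backend method is used before start() or after stop()."""


def require_started(conn, what: str):
    """Return conn, or raise a clear error if the backend isn't running."""
    if conn is None:
        raise PubSubNotStartedError(
            f"{what} called before start() or after stop(); no active connection"
        )
    return conn


# Usage inside broadcast():
#     conn = require_started(self._conn, "broadcast")
#     await conn.execute(f"NOTIFY {quote_ident(channel)}, $1", payload)
```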

Comment on lines +81 to +103
    async def subscribe_topic(
        self,
        session_id: str,
        topic: str,
        handler: TopicHandler,
    ) -> None:
        """Subscribe a session to a topic."""
        channel = f"{self._prefix}{topic}"

        async with self._lock:
            # Track handler locally
            if topic not in self._topic_subscribers:
                self._topic_subscribers[topic] = {}
                # First local subscriber - subscribe to Redis channel
                await self._pubsub.subscribe(channel)
                logger.debug("Subscribed to Redis channel: %s", channel)

            self._topic_subscribers[topic][session_id] = handler

            # Track for session cleanup
            if session_id not in self._session_topics:
                self._session_topics[session_id] = {}
            self._session_topics[session_id][topic] = handler

Copilot AI Dec 15, 2025


The subscribe_topic method can fail if called before start(). The _pubsub will be None, causing an AttributeError at line 95. Consider adding a check for self._pubsub is None at the beginning of the method and raising a clear error message.

Comment on lines +54 to +60
    async def stop(self) -> None:
        """Disconnect from PostgreSQL."""
        if self._conn:
            await self._conn.close()
        if self._listen_conn:
            await self._listen_conn.close()
        logger.info("PostgreSQL pub/sub disconnected")

Copilot AI Dec 15, 2025


The stop method doesn't properly clean up active listeners before closing connections. If there are active listeners when stop() is called, they should be removed first to avoid potential errors or warnings. Consider iterating through _channel_listeners and calling remove_listener for each before closing the connections, wrapped in try-except blocks for robustness.
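The cleanup this comment describes might look like the sketch below, shown as a standalone helper for illustration; the attribute names follow the snippets above, and the function itself is not part of the PR:

```python
import logging

logger = logging.getLogger(__name__)


async def drain_listeners(listen_conn, channel_listeners: dict) -> None:
    """Remove every registered LISTEN callback before the connection
    closes, tolerating failures on an already-broken connection."""
    for channel, listener in list(channel_listeners.items()):
        try:
            await listen_conn.remove_listener(channel, listener)
        except Exception:
            logger.exception("Failed to remove listener for %s", channel)
    channel_listeners.clear()


# In stop(), before closing the connections:
#     await drain_listeners(self._listen_conn, self._channel_listeners)
```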

    channel = f"{self._prefix}{topic}"
    # Include topic in payload for routing on receive
    payload = json.dumps({"topic": topic, "message": message})
    await self._client.publish(channel, payload)

Copilot AI Dec 15, 2025


The broadcast method lacks error handling for Redis connection failures. If Redis is temporarily unavailable or the connection is lost, this will raise an unhandled exception. Consider wrapping the publish call in a try-except block and logging the error to prevent broadcast failures from crashing the application.

Suggested change

    -   await self._client.publish(channel, payload)
    +   try:
    +       await self._client.publish(channel, payload)
    +   except Exception as e:
    +       logger.exception("Failed to publish message to Redis channel '%s': %s", channel, e)
    payload = json.dumps(message)
    # PostgreSQL NOTIFY has an 8000 byte payload limit
    # Use quote_ident to prevent SQL injection
    await self._conn.execute(f"NOTIFY {quote_ident(channel)}, $1", payload)

Copilot AI Dec 15, 2025


The broadcast method lacks error handling for PostgreSQL connection failures. If the database connection is lost or times out, this will raise an unhandled exception. Consider wrapping the NOTIFY execution in a try-except block and logging the error to prevent broadcast failures from crashing the application.

Suggested change

    -   await self._conn.execute(f"NOTIFY {quote_ident(channel)}, $1", payload)
    +   try:
    +       await self._conn.execute(f"NOTIFY {quote_ident(channel)}, $1", payload)
    +   except Exception:
    +       logger.exception(f"Failed to broadcast message on topic '{topic}'")
Comment on lines +142 to +147
    async def broadcast(self, topic: str, message: Any) -> None:
        """Publish a message to all subscribers across all instances."""
        channel = f"{self._prefix}{topic}"
        # Include topic in payload for routing on receive
        payload = json.dumps({"topic": topic, "message": message})
        await self._client.publish(channel, payload)

Copilot AI Dec 15, 2025


The broadcast method can fail if called before start() or after stop(). The _client will be None in these states, causing an AttributeError. Consider adding a check for self._client is None and either raise a clear error message or log and return early.

Comment on lines +62 to +85
    async def subscribe_topic(
        self,
        session_id: str,
        topic: str,
        handler: TopicHandler,
    ) -> None:
        """Subscribe to a topic using LISTEN."""
        channel = f"{self._prefix}{topic}"

        async with self._lock:
            if topic not in self._topic_subscribers:
                self._topic_subscribers[topic] = {}

            if channel not in self._subscribed_channels:
                listener = self._make_listener(topic)
                self._channel_listeners[channel] = listener
                await self._listen_conn.add_listener(channel, listener)
                self._subscribed_channels.add(channel)

            self._topic_subscribers[topic][session_id] = handler

            if session_id not in self._session_topics:
                self._session_topics[session_id] = {}
            self._session_topics[session_id][topic] = handler

Copilot AI Dec 15, 2025


The subscribe_topic method can fail if called before start(). The _listen_conn will be None, causing an AttributeError at line 78. Consider adding a check for self._listen_conn is None at the beginning of the method and raising a clear error message.

Comment on lines +105 to +140
    async def unsubscribe_topic(self, session_id: str, topic: str) -> None:
        """Unsubscribe a session from a topic."""
        channel = f"{self._prefix}{topic}"

        async with self._lock:
            if topic in self._topic_subscribers:
                self._topic_subscribers[topic].pop(session_id, None)

                if not self._topic_subscribers[topic]:
                    # Last local subscriber - unsubscribe from Redis
                    del self._topic_subscribers[topic]
                    await self._pubsub.unsubscribe(channel)
                    logger.debug("Unsubscribed from Redis channel: %s", channel)

            if session_id in self._session_topics:
                self._session_topics[session_id].pop(topic, None)
                if not self._session_topics[session_id]:
                    del self._session_topics[session_id]

    async def unsubscribe_all(self, session_id: str) -> None:
        """Remove all subscriptions for a session (called on disconnect)."""
        async with self._lock:
            if session_id not in self._session_topics:
                return

            for topic in list(self._session_topics[session_id].keys()):
                channel = f"{self._prefix}{topic}"

                if topic in self._topic_subscribers:
                    self._topic_subscribers[topic].pop(session_id, None)

                    if not self._topic_subscribers[topic]:
                        del self._topic_subscribers[topic]
                        await self._pubsub.unsubscribe(channel)

            del self._session_topics[session_id]

Copilot AI Dec 15, 2025


The unsubscribe_topic and unsubscribe_all methods can fail if called after stop(). The _pubsub will be None, causing an AttributeError when trying to unsubscribe. Consider adding null checks or wrapping the unsubscribe calls in try-except blocks to handle graceful shutdown scenarios.

Comment on lines +108 to +149
    async def unsubscribe_topic(self, session_id: str, topic: str) -> None:
        """Unsubscribe from a topic."""
        channel = f"{self._prefix}{topic}"

        async with self._lock:
            if topic in self._topic_subscribers:
                self._topic_subscribers[topic].pop(session_id, None)

                if not self._topic_subscribers[topic]:
                    del self._topic_subscribers[topic]
                    if channel in self._subscribed_channels:
                        listener = self._channel_listeners.pop(channel, None)
                        if listener:
                            await self._listen_conn.remove_listener(channel, listener)
                        self._subscribed_channels.discard(channel)

            if session_id in self._session_topics:
                self._session_topics[session_id].pop(topic, None)
                if not self._session_topics[session_id]:
                    del self._session_topics[session_id]

    async def unsubscribe_all(self, session_id: str) -> None:
        """Remove all subscriptions for a session."""
        async with self._lock:
            if session_id not in self._session_topics:
                return

            for topic in list(self._session_topics[session_id].keys()):
                channel = f"{self._prefix}{topic}"

                if topic in self._topic_subscribers:
                    self._topic_subscribers[topic].pop(session_id, None)

                    if not self._topic_subscribers[topic]:
                        del self._topic_subscribers[topic]
                        if channel in self._subscribed_channels:
                            listener = self._channel_listeners.pop(channel, None)
                            if listener:
                                await self._listen_conn.remove_listener(channel, listener)
                            self._subscribed_channels.discard(channel)

            del self._session_topics[session_id]

Copilot AI Dec 15, 2025


The unsubscribe_topic and unsubscribe_all methods can fail if called after stop(). The _listen_conn will be None, causing an AttributeError when trying to remove listeners. Consider adding null checks or wrapping the remove_listener calls in try-except blocks to handle graceful shutdown scenarios.


Labels: none yet
Projects: none yet
Development: successfully merging this pull request may close these issues (none listed)
4 participants