From 7892ef54a9347cf4406c9db832b89fd314a6424f Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Wed, 4 Feb 2026 17:51:12 -0500 Subject: [PATCH] feat(media, protocol): Add error callback support for subscription and publishing processes --- pytrickle/media.py | 13 +++++++++---- pytrickle/protocol.py | 4 +++- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/pytrickle/media.py b/pytrickle/media.py index 2f20c58..41d19c2 100644 --- a/pytrickle/media.py +++ b/pytrickle/media.py @@ -35,6 +35,7 @@ async def run_subscribe( target_height: Optional[int] = DEFAULT_HEIGHT, max_framerate: Union[int, Callable[[], int], None] = DEFAULT_MAX_FRAMERATE, subscriber_timeout: Optional[float] = None, + error_callback: Optional[Callable] = None, ): """ Run subscription loop to receive and decode video streams. @@ -48,6 +49,7 @@ async def run_subscribe( target_height: Target height for decoded frames max_framerate: Maximum framerate or callable returning framerate for decoded frames subscriber_timeout: Optional timeout for subscriber connection + error_callback: Optional callback for error handling """ # Ensure default values are applied if None if target_width is None: @@ -64,7 +66,7 @@ async def run_subscribe( _decode_in(in_pipe, frame_callback, put_metadata, write_fd, target_width, target_height, max_framerate) ) subscribe_task = asyncio.create_task( - _subscribe(subscribe_url, write_fd, monitoring_callback, subscriber_timeout) + _subscribe(subscribe_url, write_fd, monitoring_callback, subscriber_timeout, error_callback) ) await asyncio.gather(subscribe_task, parse_task) logger.info("run_subscribe complete") @@ -79,14 +81,15 @@ async def _subscribe( out_pipe, monitoring_callback: Optional[Callable] = None, subscriber_timeout: Optional[float] = None, + error_callback: Optional[Callable] = None, ): """Subscribe to trickle stream and write data to pipe.""" first_segment = True if subscriber_timeout is not None: - subscriber_ctx = TrickleSubscriber(url=subscribe_url, connect_timeout_seconds=subscriber_timeout) + subscriber_ctx = TrickleSubscriber(url=subscribe_url, connect_timeout_seconds=subscriber_timeout, error_callback=error_callback) else: - subscriber_ctx = TrickleSubscriber(url=subscribe_url) + subscriber_ctx = TrickleSubscriber(url=subscribe_url, error_callback=error_callback) async with subscriber_ctx as subscriber: logger.info(f"Launching subscribe loop for {subscribe_url}") while True: @@ -191,6 +194,7 @@ async def run_publish( monitoring_callback: Optional[Callable] = None, publisher_timeout: Optional[float] = None, detect_out_resolution: bool = True, + error_callback: Optional[Callable] = None, ): """ Run publishing loop to encode and publish video streams. @@ -204,12 +208,13 @@ async def run_publish( detect_out_resolution: If True, detect output resolution from first frame's tensor shape. If False, use target_width/target_height from decoder metadata. Default is True to support Super Resolution workflows. + error_callback: Optional callback for error handling """ first_segment = True publisher = None try: - publisher = TricklePublisher(url=publish_url, mime_type="video/mp2t") + publisher = TricklePublisher(url=publish_url, mime_type="video/mp2t", error_callback=error_callback) if publisher_timeout is not None: publisher.connect_timeout_seconds = publisher_timeout diff --git a/pytrickle/protocol.py b/pytrickle/protocol.py index 9a0e063..5f8945b 100644 --- a/pytrickle/protocol.py +++ b/pytrickle/protocol.py @@ -158,6 +158,7 @@ async def start(self): self.height or DEFAULT_HEIGHT, lambda: self.max_framerate or DEFAULT_MAX_FRAMERATE, self.subscriber_timeout, + self._notify_error, ) ) else: @@ -176,6 +177,7 @@ async def start(self): self.emit_monitoring_event, self.publisher_timeout, self.detect_out_resolution, + self._notify_error, ) ) @@ -328,7 +330,7 @@ def dequeue_frame(): except queue.Empty: return None - while not done.is_set() and not self.shutdown_event.is_set(): + while not done.is_set() and not self.shutdown_event.is_set() and not self.error_event.is_set(): frame = await asyncio.to_thread(dequeue_frame) if frame is None: continue