Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions pytrickle/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ async def run_subscribe(
target_height: Optional[int] = DEFAULT_HEIGHT,
max_framerate: Union[int, Callable[[], int], None] = DEFAULT_MAX_FRAMERATE,
subscriber_timeout: Optional[float] = None,
error_callback: Optional[Callable] = None,
):
"""
Run subscription loop to receive and decode video streams.
Expand All @@ -48,6 +49,7 @@ async def run_subscribe(
target_height: Target height for decoded frames
max_framerate: Maximum framerate or callable returning framerate for decoded frames
subscriber_timeout: Optional timeout for subscriber connection
error_callback: Optional callback for error handling
"""
# Ensure default values are applied if None
if target_width is None:
Expand All @@ -64,7 +66,7 @@ async def run_subscribe(
_decode_in(in_pipe, frame_callback, put_metadata, write_fd, target_width, target_height, max_framerate)
)
subscribe_task = asyncio.create_task(
_subscribe(subscribe_url, write_fd, monitoring_callback, subscriber_timeout)
_subscribe(subscribe_url, write_fd, monitoring_callback, subscriber_timeout, error_callback)
)
await asyncio.gather(subscribe_task, parse_task)
logger.info("run_subscribe complete")
Expand All @@ -79,14 +81,15 @@ async def _subscribe(
out_pipe,
monitoring_callback: Optional[Callable] = None,
subscriber_timeout: Optional[float] = None,
error_callback: Optional[Callable] = None,
):
"""Subscribe to trickle stream and write data to pipe."""
first_segment = True

if subscriber_timeout is not None:
subscriber_ctx = TrickleSubscriber(url=subscribe_url, connect_timeout_seconds=subscriber_timeout)
subscriber_ctx = TrickleSubscriber(url=subscribe_url, connect_timeout_seconds=subscriber_timeout, error_callback=error_callback)
else:
subscriber_ctx = TrickleSubscriber(url=subscribe_url)
subscriber_ctx = TrickleSubscriber(url=subscribe_url, error_callback=error_callback)
async with subscriber_ctx as subscriber:
logger.info(f"Launching subscribe loop for {subscribe_url}")
while True:
Expand Down Expand Up @@ -191,6 +194,7 @@ async def run_publish(
monitoring_callback: Optional[Callable] = None,
publisher_timeout: Optional[float] = None,
detect_out_resolution: bool = True,
error_callback: Optional[Callable] = None,
):
"""
Run publishing loop to encode and publish video streams.
Expand All @@ -204,12 +208,13 @@ async def run_publish(
detect_out_resolution: If True, detect output resolution from first frame's tensor shape.
If False, use target_width/target_height from decoder metadata.
Default is True to support Super Resolution workflows.
error_callback: Optional callback for error handling
"""
first_segment = True
publisher = None

try:
publisher = TricklePublisher(url=publish_url, mime_type="video/mp2t")
publisher = TricklePublisher(url=publish_url, mime_type="video/mp2t", error_callback=error_callback)
if publisher_timeout is not None:
publisher.connect_timeout_seconds = publisher_timeout

Expand Down
4 changes: 3 additions & 1 deletion pytrickle/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ async def start(self):
self.height or DEFAULT_HEIGHT,
lambda: self.max_framerate or DEFAULT_MAX_FRAMERATE,
self.subscriber_timeout,
self._notify_error,
)
)
else:
Expand All @@ -176,6 +177,7 @@ async def start(self):
self.emit_monitoring_event,
self.publisher_timeout,
self.detect_out_resolution,
self._notify_error,
)
)

Expand Down Expand Up @@ -328,7 +330,7 @@ def dequeue_frame():
except queue.Empty:
return None

while not done.is_set() and not self.shutdown_event.is_set():
while not done.is_set() and not self.shutdown_event.is_set() and not self.error_event.is_set():
frame = await asyncio.to_thread(dequeue_frame)
if frame is None:
continue
Expand Down