Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/scope/server/cloud_track.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ async def _start(self) -> None:
connection_id=self.connection_id,
connection_info=self.connection_info,
)
self.frame_processor.set_event_loop(asyncio.get_running_loop())
self.frame_processor.start()

# Start input processing if we have a source track
Expand Down Expand Up @@ -194,8 +195,7 @@ async def recv(self) -> VideoFrame:
self._last_frame = frame
return frame

# No frame yet, wait a bit
await asyncio.sleep(0.01)
await self.frame_processor.wait_for_output()

def update_parameters(self, params: dict) -> None:
"""Update pipeline parameters on cloud."""
Expand Down
80 changes: 67 additions & 13 deletions src/scope/server/frame_processor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import logging
import queue
import threading
Expand Down Expand Up @@ -76,8 +77,10 @@ def __init__(

self.paused = False

# Pinned memory buffer cache for faster GPU transfers (local mode only)
self._pinned_buffer_cache = {}
# Pinned memory double-buffer cache for faster GPU transfers (local mode only)
# Maps shape → (buffer_a, buffer_b) to avoid DMA race conditions
self._pinned_buffer_cache: dict[tuple, tuple[torch.Tensor, torch.Tensor]] = {}
self._pinned_buffer_index: dict[tuple, int] = {}
self._pinned_buffer_lock = threading.Lock()

# Cloud mode: send frames to cloud instead of local processing
Expand All @@ -97,6 +100,10 @@ def __init__(
# Input mode: video waits for frames, text generates immediately
self._video_mode = (initial_parameters or {}).get("input_mode") == "video"

# Event-based output signaling (replaces 10ms polling sleep)
self._output_event: asyncio.Event | None = None
self._event_loop: asyncio.AbstractEventLoop | None = None

# Pipeline chaining support
self.pipeline_processors: list[PipelineProcessor] = []
self.pipeline_ids: list[str] = []
Expand Down Expand Up @@ -330,17 +337,23 @@ def stop(self, error_message: str = None):
)

def _get_or_create_pinned_buffer(self, shape):
"""Get or create a reusable pinned memory buffer for the given shape.
"""Get the next available pinned memory buffer for the given shape.

This avoids repeated pinned memory allocations, which are expensive.
Pinned memory enables faster DMA transfers to GPU.
Uses double-buffering to prevent data corruption: while one buffer's
DMA transfer is in flight (via cuda(non_blocking=True)), the other
buffer is available for the next frame's copy_() call.
"""
with self._pinned_buffer_lock:
if shape not in self._pinned_buffer_cache:
self._pinned_buffer_cache[shape] = torch.empty(
shape, dtype=torch.uint8, pin_memory=True
)
return self._pinned_buffer_cache[shape]
buffer_a = torch.empty(shape, dtype=torch.uint8, pin_memory=True)
buffer_b = torch.empty(shape, dtype=torch.uint8, pin_memory=True)
self._pinned_buffer_cache[shape] = (buffer_a, buffer_b)
self._pinned_buffer_index[shape] = 0

idx = self._pinned_buffer_index[shape]
buffer = self._pinned_buffer_cache[shape][idx]
self._pinned_buffer_index[shape] = 1 - idx
return buffer

def put(self, frame: VideoFrame) -> bool:
if not self.running:
Expand Down Expand Up @@ -377,10 +390,9 @@ def put(self, frame: VideoFrame) -> bool:

if torch.cuda.is_available():
shape = frame_array.shape
# Double-buffered: alternates between two pinned buffers so the
# prior DMA (non_blocking=True) can finish while we write into the other.
pinned_buffer = self._get_or_create_pinned_buffer(shape)
# Note: We reuse pinned buffers for performance. This assumes the copy_()
# operation completes before the next frame arrives.
# In practice, copy_() is very fast (~microseconds) and frames arrive at 60 FPS max
pinned_buffer.copy_(torch.as_tensor(frame_array, dtype=torch.uint8))
frame_tensor = pinned_buffer.cuda(non_blocking=True)
else:
Expand Down Expand Up @@ -424,9 +436,12 @@ def get(self) -> torch.Tensor | None:
try:
frame = last_processor.output_queue.get_nowait()
# Frame is stored as [1, H, W, C], convert to [H, W, C] for output
# Move to CPU here for WebRTC streaming (frames stay on GPU between pipeline processors)
# Frames are already on CPU (moved by the last PipelineProcessor).
frame = frame.squeeze(0)
if frame.is_cuda:
logger.warning(
"Frame from last processor is still on GPU, moving to CPU"
)
frame = frame.cpu()
except queue.Empty:
return None
Expand Down Expand Up @@ -490,6 +505,7 @@ def _on_frame_from_cloud(self, frame: "VideoFrame") -> None:
self._cloud_output_queue.put_nowait(frame_np)
except queue.Empty:
pass
self._signal_output_ready()
except Exception as e:
logger.error(f"[FRAME-PROCESSOR] Error processing frame from cloud: {e}")

Expand Down Expand Up @@ -949,6 +965,11 @@ def _setup_pipeline_chain_sync(self):
curr_processor = self.pipeline_processors[i]
prev_processor.set_next_processor(curr_processor)

if self.pipeline_processors:
self.pipeline_processors[-1].set_output_ready_callback(
self._signal_output_ready
)

# Start all processors
for processor in self.pipeline_processors:
processor.start()
Expand All @@ -957,6 +978,39 @@ def _setup_pipeline_chain_sync(self):
f"Created pipeline chain with {len(self.pipeline_processors)} processors"
)

def set_event_loop(self, loop: asyncio.AbstractEventLoop):
    """Attach the asyncio loop used for cross-thread output signaling.

    Call this from the asyncio thread, before recv() polling starts, so
    that output-ready notifications can be scheduled onto a live loop.
    """
    self._output_event = asyncio.Event()
    self._event_loop = loop

def _signal_output_ready(self):
"""Signal that a new output frame is available.

Thread-safe: can be called from any thread.
"""
if self._event_loop is not None and self._output_event is not None:
try:
self._event_loop.call_soon_threadsafe(self._output_event.set)
except RuntimeError:
pass

async def wait_for_output(self, timeout: float = 0.1) -> None:
"""Wait until a new output frame is available or timeout expires.

Returns immediately if a frame is already available.
"""
if self._output_event is None:
await asyncio.sleep(0.01)
return
try:
await asyncio.wait_for(self._output_event.wait(), timeout=timeout)
except TimeoutError:
pass
self._output_event.clear()

def __enter__(self):
    """Context-manager entry: start the processor and return it."""
    self.start()
    return self
Expand Down
18 changes: 17 additions & 1 deletion src/scope/server/pipeline_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,18 @@ def __init__(
# Set when reset_cache flushes queues, cleared after successful pipeline call
self._pending_cache_init = False

# Callback to signal that output frames are available (set by FrameProcessor)
self._output_ready_callback: callable | None = None

# Throttler for controlling processing rate in chained pipelines
# Throttling is applied when this pipeline produces frames faster than
# the next pipeline in the chain can consume them
self.throttler = PipelineThrottler()

def set_output_ready_callback(self, callback: callable):
    """Set callback to invoke when frames are placed in the output queue.

    Args:
        callback: Zero-argument callable, invoked by process_chunk() after
            at least one frame was enqueued in the output queue. It runs on
            the processing thread, so it must be thread-safe.
    """
    self._output_ready_callback = callback

def _resize_output_queue(self, target_size: int):
"""Resize the output queue to the target size, transferring existing frames.

Expand Down Expand Up @@ -492,7 +499,6 @@ def process_chunk(self):
self.throttler.record_output_batch(num_frames, processing_time)

# Normalize to [0, 255] and convert to uint8
# Keep frames on GPU - frame_processor handles CPU transfer for streaming
output = (
(output * 255.0)
.clamp(0, 255)
Expand All @@ -501,6 +507,11 @@ def process_chunk(self):
.detach()
)

# Move to CPU in the worker thread for the last processor in the chain.
# Intermediate processors keep frames on GPU for the next pipeline.
if self.next_processor is None:
output = output.cpu()

# Resize output queue to meet target max size
target_output_queue_max_size = num_frames * OUTPUT_QUEUE_MAX_SIZE_FACTOR
self._resize_output_queue(target_output_queue_max_size)
Expand All @@ -509,18 +520,23 @@ def process_chunk(self):
# For intermediate pipelines, output goes to next pipeline's input
# For last pipeline, output goes to frame_processor's output_queue
# Output frames are [H, W, C], convert to [1, H, W, C] for consistency
frames_queued = False
for frame in output:
frame = frame.unsqueeze(0)
# Track when a frame is ready (production rate)
self._track_output_frame()
try:
self.output_queue.put_nowait(frame)
frames_queued = True
except queue.Full:
logger.debug(
f"Output queue full for {self.pipeline_id}, dropping processed frame"
)
continue

if frames_queued and self._output_ready_callback is not None:
self._output_ready_callback()

# Apply throttling if this pipeline is producing faster than next can consume
# Only throttle if: (1) has video input, (2) has next processor
if video_input is not None and self.next_processor is not None:
Expand Down
5 changes: 3 additions & 2 deletions src/scope/server/tracks.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def initialize_output_processing(self):
connection_id=self.connection_id,
connection_info=self.connection_info,
)
self.frame_processor.set_event_loop(asyncio.get_running_loop())
self.frame_processor.start()

def initialize_input_processing(self, track: MediaStreamTrack):
Expand Down Expand Up @@ -158,8 +159,8 @@ async def recv(self) -> VideoFrame:
self._last_frame = frame
return frame

# No frame available, wait a bit before trying again
await asyncio.sleep(0.01)
# Wait for a frame to be produced
await self.frame_processor.wait_for_output()

except Exception as e:
logger.error(f"Error getting processed frame: {e}")
Expand Down
Loading