Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/scope/core/pipelines/interface.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Base interface for all pipelines."""

import gc
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -76,3 +77,41 @@ def __call__(self, **kwargs) -> dict:
The video tensor is in THWC format and [0, 1] range.
"""
pass

def cleanup(self) -> None:
    """Release GPU resources held by this pipeline.

    Called by the pipeline manager just before the pipeline object is
    unloaded.  Drops every instance attribute (models, component
    registries, streaming state, ...) so the tensors they reference become
    unreachable, then forces garbage collection and empties the CUDA
    allocator caches.

    Safe to call more than once; every step is best-effort and failures
    are swallowed because this runs on a teardown path.
    """
    import torch

    # Delete every instance attribute instead of special-casing known
    # fields such as ``components`` or ``state``: subclasses may hold GPU
    # references under any attribute name, and removing the attribute is
    # all the garbage collector needs to reclaim what it pointed to.
    for attr_name in list(self.__dict__.keys()):
        try:
            delattr(self, attr_name)
        except Exception:
            # Best-effort: e.g. a property without a deleter.
            pass

    # Two passes so objects freed by the first pass (which may break
    # reference cycles) are themselves collected by the second.
    gc.collect()
    gc.collect()
    if torch.cuda.is_available():
        try:
            # Reset compiled-graph caches first so tensors pinned by
            # torch.compile/dynamo are actually returned to the allocator.
            torch._dynamo.reset()
            torch.cuda.empty_cache()
            torch.cuda.synchronize()
        except Exception:
            # CUDA teardown is best-effort; never fail an unload over it.
            pass
22 changes: 22 additions & 0 deletions src/scope/server/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ def start(self):
self.running = False
return

# Register for pipeline unload notifications to release references
if self.pipeline_manager:
self.pipeline_manager.register_unload_callback(self._on_pipeline_unloading)

logger.info(
f"[FRAME-PROCESSOR] Started with {len(self.pipeline_ids)} pipeline(s): {self.pipeline_ids}"
)
Expand All @@ -212,13 +216,24 @@ def stop(self, error_message: str = None):

self.running = False

# Unregister pipeline unload callback
if self.pipeline_manager:
self.pipeline_manager.unregister_unload_callback(
self._on_pipeline_unloading
)

# Stop all pipeline processors
for processor in self.pipeline_processors:
processor.stop()

# Clear pipeline processors
self.pipeline_processors.clear()

with self._pinned_buffer_lock:
for key in list(self._pinned_buffer_cache.keys()):
tensor = self._pinned_buffer_cache.pop(key)
del tensor

# Clean up Spout sender
self.spout_sender_enabled = False
if self.spout_sender_thread and self.spout_sender_thread.is_alive():
Expand Down Expand Up @@ -572,6 +587,13 @@ def _get_pipeline_dimensions(self) -> tuple[int, int]:
logger.warning(f"Could not get pipeline dimensions: {e}")
return 512, 512

def _on_pipeline_unloading(self, pipeline_id: str):
"""Release pipeline reference when a pipeline is being unloaded."""
for processor in self.pipeline_processors:
if processor.pipeline_id == pipeline_id:
processor.stop()
processor.release_pipeline()

def update_parameters(self, parameters: dict[str, Any]):
"""Update parameters that will be used in the next pipeline call."""
# Handle Spout output settings
Expand Down
82 changes: 81 additions & 1 deletion src/scope/server/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import threading
import time
from collections.abc import Callable
from enum import Enum
from typing import Any

Expand Down Expand Up @@ -52,6 +53,20 @@ def __init__(self):
self._pipeline_statuses: dict[str, PipelineStatus] = {} # pipeline_id -> status
self._pipeline_load_params: dict[str, dict] = {} # pipeline_id -> load_params

# Callbacks invoked before a pipeline is unloaded (to release external references)
self._unload_callbacks: list[Callable[[str], None]] = []

def register_unload_callback(self, callback: Callable[[str], None]):
    """Subscribe *callback* to pipeline-unload notifications.

    The callback receives the pipeline id and is invoked right before
    that pipeline is unloaded.  Registering the same callback twice will
    invoke it twice.
    """
    self._unload_callbacks.append(callback)

def unregister_unload_callback(self, callback: Callable[[str], None]):
    """Remove a previously registered unload callback.

    Unknown callbacks are silently ignored, so tear-down paths may call
    this unconditionally.  If the callback was registered more than once,
    only its first occurrence is removed.
    """
    if callback in self._unload_callbacks:
        self._unload_callbacks.remove(callback)

@property
def status(self) -> PipelineStatus:
"""Get current pipeline status."""
Expand Down Expand Up @@ -420,6 +435,18 @@ def _load_pipelines_sync(
if loaded_id not in pipeline_ids or current_params != new_params:
pipelines_to_unload.add(loaded_id)

# Log GPU memory before unload
if pipelines_to_unload and torch.cuda.is_available():
try:
from scope.core.pipelines.memory import get_cuda_free_memory_gb

device = torch.device("cuda")
logger.info(
f"GPU memory free before unload: {get_cuda_free_memory_gb(device):.2f} GiB"
)
except Exception as e:
logger.warning(f"Failed to log GPU memory before unload: {e}")

# Unload pipelines that need to be unloaded
for pipeline_id_to_unload in pipelines_to_unload:
self._unload_pipeline_by_id_unsafe(
Expand All @@ -428,6 +455,28 @@ def _load_pipelines_sync(
connection_info=connection_info,
)

# Log GPU memory after unload
if pipelines_to_unload and torch.cuda.is_available():
try:
from scope.core.pipelines.memory import get_cuda_free_memory_gb

device = torch.device("cuda")
logger.info(
f"GPU memory free after unload: {get_cuda_free_memory_gb(device):.2f} GiB"
)
except Exception as e:
logger.warning(f"Failed to log GPU memory after unload: {e}")

# Clean up stale status entries for pipelines not in the new request list
# This handles cases where a previous pipeline failed to load (status=ERROR/NOT_LOADED)
# but its status entry was never cleaned up because it wasn't in _pipelines
stale_status_ids = [
pid for pid in self._pipeline_statuses.keys() if pid not in pipeline_ids
]
for stale_id in stale_status_ids:
del self._pipeline_statuses[stale_id]
logger.debug(f"Cleaned up stale status entry for pipeline: {stale_id}")

# Load all pipelines
success = True
for pipeline_id in pipeline_ids:
Expand All @@ -450,6 +499,18 @@ def _load_pipelines_sync(
else:
logger.error("Some pipelines failed to load")

# Log GPU memory after load
if torch.cuda.is_available():
try:
from scope.core.pipelines.memory import get_cuda_free_memory_gb

device = torch.device("cuda")
logger.info(
f"GPU memory free after load: {get_cuda_free_memory_gb(device):.2f} GiB"
)
except Exception as e:
logger.warning(f"Failed to log GPU memory after load: {e}")

return success

def _get_vace_checkpoint_path(self) -> str:
Expand Down Expand Up @@ -576,6 +637,23 @@ def _unload_pipeline_by_id_unsafe(

logger.info(f"Unloading pipeline: {pipeline_id}")

# Notify listeners to release pipeline references before cleanup
for callback in self._unload_callbacks:
try:
callback(pipeline_id)
except Exception as e:
logger.warning(f"Unload callback failed for {pipeline_id}: {e}")

# Call cleanup to explicitly free GPU resources before removing references
pipeline_obj = self._pipelines.get(pipeline_id)
if pipeline_obj is None and self._pipeline_id == pipeline_id:
pipeline_obj = self._pipeline
if pipeline_obj is not None and hasattr(pipeline_obj, "cleanup"):
try:
pipeline_obj.cleanup()
except Exception as e:
logger.warning(f"Pipeline cleanup failed for {pipeline_id}: {e}")

# Remove from tracked pipelines
if pipeline_id in self._pipelines:
del self._pipelines[pipeline_id]
Expand All @@ -592,12 +670,14 @@ def _unload_pipeline_by_id_unsafe(
self._load_params = None
self._error_message = None

# Cleanup resources
gc.collect()
gc.collect()
if torch.cuda.is_available():
try:
torch._dynamo.reset()
torch.cuda.empty_cache()
torch.cuda.synchronize()
torch.cuda.ipc_collect()
logger.info("CUDA cache cleared")
except Exception as e:
logger.warning(f"CUDA cleanup failed: {e}")
Expand Down
22 changes: 18 additions & 4 deletions src/scope/server/pipeline_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,25 +179,34 @@ def stop(self):
if threading.current_thread() != self.worker_thread:
self.worker_thread.join(timeout=5.0)

# Clear queues
with self.input_queue_lock:
input_queue_ref = self.input_queue
if input_queue_ref:
while not input_queue_ref.empty():
try:
input_queue_ref.get_nowait()
frame = input_queue_ref.get_nowait()
del frame
except queue.Empty:
break

if self.output_queue:
while not self.output_queue.empty():
try:
self.output_queue.get_nowait()
frame = self.output_queue.get_nowait()
del frame
except queue.Empty:
break

# Release pipeline reference to allow GC of GPU resources
self.pipeline = None
Comment on lines +200 to +201
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should not need to explicitly set self.pipeline = None. The whole PipelineProcessor object should be cleaned after the stream is stopped.

I think the flow should be:

  1. We start the stream, there are the following objects created: VideoTrack/CloudTrack, FrameProcessor, and (a few) PipelineProcessor objects.
  2. We stop the stream, all these objects are deleted
    So effectively, the only object that is preserved between stream start/stop, is PipelineManager, because we want to avoid reloading the pipelines if they are already loaded. All the rest should be cleaned up in between stream start/stop.


logger.info(f"PipelineProcessor stopped for pipeline: {self.pipeline_id}")

def release_pipeline(self):
    """Drop the reference to the pipeline object.

    Allows the garbage collector to reclaim the pipeline's GPU resources
    once the pipeline manager has also released its own reference.
    """
    # Lazy %-style logging args: the message is only formatted when the
    # INFO level is actually enabled (and this matches logging best practice).
    logger.info("Releasing pipeline reference for %s", self.pipeline_id)
    self.pipeline = None

Comment on lines +205 to +209
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be needed, because we set self.pipeline = None in stop() and we always call stop() and release_pipeline() together.

def update_parameters(self, parameters: dict[str, Any]):
"""Update parameters that will be used in the next pipeline call."""
try:
Expand Down Expand Up @@ -357,10 +366,15 @@ def process_chunk(self):
if self.output_queue:
while not self.output_queue.empty():
try:
self.output_queue.get_nowait()
frame = self.output_queue.get_nowait()
del frame
except queue.Empty:
break

# Pipeline may have been released during a switch
if self.pipeline is None:
return

requirements = None
if hasattr(self.pipeline, "prepare"):
prepare_params = dict(self.parameters.items())
Expand Down
1 change: 1 addition & 0 deletions src/scope/server/tracks.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,5 +184,6 @@ async def stop(self):

if self.frame_processor is not None:
self.frame_processor.stop()
self.frame_processor = None

await super().stop()
Loading