From 8ba61923f3706687600c50e72073ed918002f529 Mon Sep 17 00:00:00 2001 From: emranemran Date: Thu, 5 Feb 2026 23:39:29 -0800 Subject: [PATCH 1/7] Add pipeline cleanup and GPU memory logging - Add cleanup() to Pipeline base class (gc.collect + empty_cache) - Call cleanup on pipeline unload to free GPU memory - Log GPU memory before/after unload and after load - Clear pinned buffer cache on frame processor stop - Release pipeline reference on pipeline processor stop Co-Authored-By: Claude Opus 4.6 Signed-off-by: emranemran --- src/scope/core/pipelines/interface.py | 15 +++++++ src/scope/server/frame_processor.py | 3 ++ src/scope/server/pipeline_manager.py | 56 ++++++++++++++++++++++++++ src/scope/server/pipeline_processor.py | 3 ++ 4 files changed, 77 insertions(+) diff --git a/src/scope/core/pipelines/interface.py b/src/scope/core/pipelines/interface.py index 7e56017b6..d40c8fa5a 100644 --- a/src/scope/core/pipelines/interface.py +++ b/src/scope/core/pipelines/interface.py @@ -1,5 +1,6 @@ """Base interface for all pipelines.""" +import gc from abc import ABC, abstractmethod from typing import TYPE_CHECKING @@ -76,3 +77,17 @@ def __call__(self, **kwargs) -> dict: The video tensor is in THWC format and [0, 1] range. """ pass + + def cleanup(self) -> None: + """Release GPU resources. Called before pipeline is unloaded. + + Subclasses may override for custom cleanup behavior. + """ + gc.collect() + try: + import torch + + if torch.cuda.is_available(): + torch.cuda.empty_cache() + except ImportError: + pass diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py index 906dae5c2..128886105 100644 --- a/src/scope/server/frame_processor.py +++ b/src/scope/server/frame_processor.py @@ -219,6 +219,9 @@ def stop(self, error_message: str = None): # Clear pipeline processors self.pipeline_processors.clear() + # Clear pinned memory buffers + self._pinned_buffer_cache.clear() + # Clean up Spout sender self.spout_sender_enabled = False if self.spout_sender_thread and self.spout_sender_thread.is_alive(): diff --git a/src/scope/server/pipeline_manager.py b/src/scope/server/pipeline_manager.py index c02182f97..90a2155c0 100644 --- a/src/scope/server/pipeline_manager.py +++ b/src/scope/server/pipeline_manager.py @@ -420,6 +420,18 @@ def _load_pipelines_sync( if loaded_id not in pipeline_ids or current_params != new_params: pipelines_to_unload.add(loaded_id) + # Log GPU memory before unload + if pipelines_to_unload and torch.cuda.is_available(): + try: + from scope.core.pipelines.memory import get_cuda_free_memory_gb + + device = torch.device("cuda") + logger.info( + f"GPU memory free before unload: {get_cuda_free_memory_gb(device):.2f} GiB" + ) + except Exception as e: + logger.warning(f"Failed to log GPU memory before unload: {e}") + # Unload pipelines that need to be unloaded for pipeline_id_to_unload in pipelines_to_unload: self._unload_pipeline_by_id_unsafe( @@ -428,6 +440,28 @@ def _load_pipelines_sync( connection_info=connection_info, ) + # Log GPU memory after unload + if pipelines_to_unload and torch.cuda.is_available(): + try: + from scope.core.pipelines.memory import get_cuda_free_memory_gb + + device = torch.device("cuda") + logger.info( + f"GPU memory free after unload: {get_cuda_free_memory_gb(device):.2f} GiB" + ) + except Exception as e: + logger.warning(f"Failed to log GPU memory after unload: {e}") + + # Clean up stale status entries for pipelines not in the new request list + # This handles cases where a previous pipeline failed to load (status=ERROR/NOT_LOADED) + # but its status entry was never cleaned up because it wasn't in _pipelines + stale_status_ids = [ + pid for pid in self._pipeline_statuses.keys() if pid not in pipeline_ids + ] + for stale_id in stale_status_ids: + del self._pipeline_statuses[stale_id] + logger.debug(f"Cleaned up stale status entry for pipeline: {stale_id}") + # Load all pipelines success = True for pipeline_id in pipeline_ids: @@ -450,6 +484,18 @@ def _load_pipelines_sync( else: logger.error("Some pipelines failed to load") + # Log GPU memory after load + if torch.cuda.is_available(): + try: + from scope.core.pipelines.memory import get_cuda_free_memory_gb + + device = torch.device("cuda") + logger.info( + f"GPU memory free after load: {get_cuda_free_memory_gb(device):.2f} GiB" + ) + except Exception as e: + logger.warning(f"Failed to log GPU memory after load: {e}") + return success def _get_vace_checkpoint_path(self) -> str: @@ -576,6 +622,16 @@ def _unload_pipeline_by_id_unsafe( logger.info(f"Unloading pipeline: {pipeline_id}") + # Call cleanup to explicitly free GPU resources before removing references + pipeline_obj = self._pipelines.get(pipeline_id) + if pipeline_obj is None and self._pipeline_id == pipeline_id: + pipeline_obj = self._pipeline + if pipeline_obj is not None and hasattr(pipeline_obj, "cleanup"): + try: + pipeline_obj.cleanup() + except Exception as e: + logger.warning(f"Pipeline cleanup failed for {pipeline_id}: {e}") + # Remove from tracked pipelines if pipeline_id in self._pipelines: del self._pipelines[pipeline_id] diff --git a/src/scope/server/pipeline_processor.py b/src/scope/server/pipeline_processor.py index 495fa8116..97a9828ee 100644 --- a/src/scope/server/pipeline_processor.py +++ b/src/scope/server/pipeline_processor.py @@ -196,6 +196,9 @@ def stop(self): except queue.Empty: break + # Release pipeline reference to allow GC of GPU resources + self.pipeline = None + logger.info(f"PipelineProcessor stopped for pipeline: {self.pipeline_id}") def update_parameters(self, parameters: dict[str, Any]): From 040cfdd82c765cf3122bf9a827197f6e324dff8b Mon Sep 17 00:00:00 2001 From: emranemran Date: Fri, 6 Feb 2026 00:47:50 -0800 Subject: [PATCH 2/7] Release pipeline references before cleanup during switches Add unload callback mechanism so PipelineManager can notify FrameProcessors to release pipeline references before calling cleanup(). This allows gc.collect() + empty_cache() to actually free GPU memory during pipeline switches, not just on session end. Co-Authored-By: Claude Opus 4.6 Signed-off-by: emranemran --- src/scope/server/frame_processor.py | 18 ++++++++++++++++++ src/scope/server/pipeline_manager.py | 22 ++++++++++++++++++++++ src/scope/server/pipeline_processor.py | 9 +++++++++ 3 files changed, 49 insertions(+) diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py index 128886105..0f18c654a 100644 --- a/src/scope/server/frame_processor.py +++ b/src/scope/server/frame_processor.py @@ -191,6 +191,12 @@ def start(self): self.running = False return + # Register for pipeline unload notifications to release references + if self.pipeline_manager: + self.pipeline_manager.register_unload_callback( + self._on_pipeline_unloading + ) + logger.info( f"[FRAME-PROCESSOR] Started with {len(self.pipeline_ids)} pipeline(s): {self.pipeline_ids}" ) @@ -212,6 +218,12 @@ def stop(self, error_message: str = None): self.running = False + # Unregister pipeline unload callback + if self.pipeline_manager: + self.pipeline_manager.unregister_unload_callback( + self._on_pipeline_unloading + ) + # Stop all pipeline processors for processor in self.pipeline_processors: processor.stop() @@ -575,6 +587,12 @@ def _get_pipeline_dimensions(self) -> tuple[int, int]: logger.warning(f"Could not get pipeline dimensions: {e}") return 512, 512 + def _on_pipeline_unloading(self, pipeline_id: str): + """Release pipeline reference when a pipeline is being unloaded.""" + for processor in self.pipeline_processors: + if processor.pipeline_id == pipeline_id: + processor.release_pipeline() + def update_parameters(self, parameters: dict[str, Any]): """Update parameters that will be used in the next pipeline call.""" # Handle Spout output settings diff --git a/src/scope/server/pipeline_manager.py b/src/scope/server/pipeline_manager.py index 90a2155c0..b2edb6f3f 100644 --- a/src/scope/server/pipeline_manager.py +++ b/src/scope/server/pipeline_manager.py @@ -6,6 +6,7 @@ import threading import time from enum import Enum +from collections.abc import Callable from typing import Any import torch @@ -52,6 +53,20 @@ def __init__(self): self._pipeline_statuses: dict[str, PipelineStatus] = {} # pipeline_id -> status self._pipeline_load_params: dict[str, dict] = {} # pipeline_id -> load_params + # Callbacks invoked before a pipeline is unloaded (to release external references) + self._unload_callbacks: list[Callable[[str], None]] = [] + + def register_unload_callback(self, callback: Callable[[str], None]): + """Register a callback to be called before a pipeline is unloaded.""" + self._unload_callbacks.append(callback) + + def unregister_unload_callback(self, callback: Callable[[str], None]): + """Unregister a pipeline unload callback.""" + try: + self._unload_callbacks.remove(callback) + except ValueError: + pass + @property def status(self) -> PipelineStatus: """Get current pipeline status.""" @@ -622,6 +637,13 @@ def _unload_pipeline_by_id_unsafe( logger.info(f"Unloading pipeline: {pipeline_id}") + # Notify listeners to release pipeline references before cleanup + for callback in self._unload_callbacks: + try: + callback(pipeline_id) + except Exception as e: + logger.warning(f"Unload callback failed for {pipeline_id}: {e}") + # Call cleanup to explicitly free GPU resources before removing references pipeline_obj = self._pipelines.get(pipeline_id) if pipeline_obj is None and self._pipeline_id == pipeline_id: diff --git a/src/scope/server/pipeline_processor.py b/src/scope/server/pipeline_processor.py index 97a9828ee..cc0001a10 100644 --- a/src/scope/server/pipeline_processor.py +++ b/src/scope/server/pipeline_processor.py @@ -201,6 +201,11 @@ def stop(self): logger.info(f"PipelineProcessor stopped for pipeline: {self.pipeline_id}") + def release_pipeline(self): + """Release pipeline reference to allow GC of GPU resources.""" + logger.info(f"Releasing pipeline reference for {self.pipeline_id}") + self.pipeline = None + def update_parameters(self, parameters: dict[str, Any]): """Update parameters that will be used in the next pipeline call.""" try: @@ -364,6 +369,10 @@ def process_chunk(self): except queue.Empty: break + # Pipeline may have been released during a switch + if self.pipeline is None: + return + requirements = None if hasattr(self.pipeline, "prepare"): prepare_params = dict(self.parameters.items()) From 774575691216d90dddb78bcbb9ebf46c495ef64a Mon Sep 17 00:00:00 2001 From: Varshith B Date: Fri, 6 Feb 2026 12:01:24 +0000 Subject: [PATCH 3/7] fix: cleanup Signed-off-by: Varshith B --- src/scope/core/pipelines/interface.py | 28 ++++++++++++++++++++++++--- src/scope/server/frame_processor.py | 1 + 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/src/scope/core/pipelines/interface.py b/src/scope/core/pipelines/interface.py index d40c8fa5a..4e6f25cda 100644 --- a/src/scope/core/pipelines/interface.py +++ b/src/scope/core/pipelines/interface.py @@ -79,10 +79,32 @@ def __call__(self, **kwargs) -> dict: pass def cleanup(self) -> None: - """Release GPU resources. Called before pipeline is unloaded. + """Release GPU resources. Called before pipeline is unloaded.""" + + if hasattr(self, "components"): + try: + components_dict = getattr(self.components, "_components", None) + if components_dict is not None: + for name in list(components_dict.keys()): + del components_dict[name] + del self.components + except Exception as e: + pass + + if hasattr(self, "state"): + try: + if hasattr(self.state, "values"): + self.state.values.clear() + del self.state + except Exception as e: + pass + + if hasattr(self, "blocks"): + try: + del self.blocks + except Exception as e: + pass - Subclasses may override for custom cleanup behavior. - """ gc.collect() try: import torch diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py index 0f18c654a..f2f3b8fb6 100644 --- a/src/scope/server/frame_processor.py +++ b/src/scope/server/frame_processor.py @@ -591,6 +591,7 @@ def _on_pipeline_unloading(self, pipeline_id: str): """Release pipeline reference when a pipeline is being unloaded.""" for processor in self.pipeline_processors: if processor.pipeline_id == pipeline_id: + processor.stop() processor.release_pipeline() def update_parameters(self, parameters: dict[str, Any]): From 718e76ebf97507197d417da81ab7422f139ecb75 Mon Sep 17 00:00:00 2001 From: Varshith B Date: Fri, 6 Feb 2026 17:32:28 +0000 Subject: [PATCH 4/7] feat: multilevel cleanup Signed-off-by: Varshith B --- src/scope/core/pipelines/interface.py | 64 +++++++++++++++++++++++---- 1 file changed, 55 insertions(+), 9 deletions(-) diff --git a/src/scope/core/pipelines/interface.py b/src/scope/core/pipelines/interface.py index 4e6f25cda..c112dd728 100644 --- a/src/scope/core/pipelines/interface.py +++ b/src/scope/core/pipelines/interface.py @@ -80,6 +80,7 @@ def __call__(self, **kwargs) -> dict: def cleanup(self) -> None: """Release GPU resources. Called before pipeline is unloaded.""" + import torch if hasattr(self, "components"): try: @@ -88,7 +89,7 @@ def cleanup(self) -> None: for name in list(components_dict.keys()): del components_dict[name] del self.components - except Exception as e: + except Exception: pass if hasattr(self, "state"): @@ -96,20 +97,65 @@ def cleanup(self) -> None: if hasattr(self.state, "values"): self.state.values.clear() del self.state - except Exception as e: + except Exception: pass if hasattr(self, "blocks"): try: del self.blocks - except Exception as e: + except Exception: pass + self._cleanup_gpu_objects(self, depth=0, max_depth=2, visited=set()) + gc.collect() - try: - import torch + if torch.cuda.is_available(): + torch.cuda.empty_cache() + + def _cleanup_gpu_objects( + self, obj: object, depth: int, max_depth: int, visited: set + ) -> None: + """Recursively find and delete GPU objects (nn.Module, Tensor) from __dict__. + + Args: + obj: Object to inspect + depth: Current recursion depth + max_depth: Maximum recursion depth (2 levels catches wrapper patterns) + visited: Set of object ids already visited (prevents circular refs) + """ + import torch + + obj_id = id(obj) + if obj_id in visited: + return + visited.add(obj_id) + + if depth >= max_depth: + return + + if not hasattr(obj, "__dict__"): + return + + attrs_to_delete = [] + objects_to_recurse = [] + + for attr_name, attr_value in list(obj.__dict__.items()): + if isinstance(attr_value, (torch.nn.Module, torch.Tensor)): + attrs_to_delete.append(attr_name) + elif ( + hasattr(attr_value, "__dict__") + and not isinstance(attr_value, type) + and not isinstance( + attr_value, (str, bytes, int, float, bool, list, dict, tuple) + ) + ): + objects_to_recurse.append(attr_value) + + for attr_name in attrs_to_delete: + try: + delattr(obj, attr_name) + except Exception: + pass - if torch.cuda.is_available(): - torch.cuda.empty_cache() - except ImportError: - pass + for nested_obj in objects_to_recurse: + self._cleanup_gpu_objects(nested_obj, depth + 1, max_depth, visited) From b8774b1f1bc3000251f88dc80d5ceebddf92024a Mon Sep 17 00:00:00 2001 From: Varshith B Date: Fri, 6 Feb 2026 19:18:52 +0000 Subject: [PATCH 5/7] fix: cleanup logic Signed-off-by: Varshith B --- src/scope/core/pipelines/interface.py | 56 +-------------------------- 1 file changed, 2 insertions(+), 54 deletions(-) diff --git a/src/scope/core/pipelines/interface.py b/src/scope/core/pipelines/interface.py index c112dd728..c52158897 100644 --- a/src/scope/core/pipelines/interface.py +++ b/src/scope/core/pipelines/interface.py @@ -88,7 +88,6 @@ def cleanup(self) -> None: if components_dict is not None: for name in list(components_dict.keys()): del components_dict[name] - del self.components except Exception: pass @@ -96,66 +95,15 @@ def cleanup(self) -> None: try: if hasattr(self.state, "values"): self.state.values.clear() - del self.state except Exception: pass - if hasattr(self, "blocks"): + for attr_name in list(self.__dict__.keys()): try: - del self.blocks + delattr(self, attr_name) except Exception: pass - self._cleanup_gpu_objects(self, depth=0, max_depth=2, visited=set()) - gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() - - def _cleanup_gpu_objects( - self, obj: object, depth: int, max_depth: int, visited: set - ) -> None: - """Recursively find and delete GPU objects (nn.Module, Tensor) from __dict__. - - Args: - obj: Object to inspect - depth: Current recursion depth - max_depth: Maximum recursion depth (2 levels catches wrapper patterns) - visited: Set of object ids already visited (prevents circular refs) - """ - import torch - - obj_id = id(obj) - if obj_id in visited: - return - visited.add(obj_id) - - if depth >= max_depth: - return - - if not hasattr(obj, "__dict__"): - return - - attrs_to_delete = [] - objects_to_recurse = [] - - for attr_name, attr_value in list(obj.__dict__.items()): - if isinstance(attr_value, (torch.nn.Module, torch.Tensor)): - attrs_to_delete.append(attr_name) - elif ( - hasattr(attr_value, "__dict__") - and not isinstance(attr_value, type) - and not isinstance( - attr_value, (str, bytes, int, float, bool, list, dict, tuple) - ) - ): - objects_to_recurse.append(attr_value) - - for attr_name in attrs_to_delete: - try: - delattr(obj, attr_name) - except Exception: - pass - - for nested_obj in objects_to_recurse: - self._cleanup_gpu_objects(nested_obj, depth + 1, max_depth, visited) From e488ca7a940a87367704942770313e1825ec19b3 Mon Sep 17 00:00:00 2001 From: Varshith B Date: Thu, 12 Feb 2026 06:29:50 +0000 Subject: [PATCH 6/7] fix: ruff Signed-off-by: Varshith B --- src/scope/server/frame_processor.py | 4 +--- src/scope/server/pipeline_manager.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py index f2f3b8fb6..ef6498f23 100644 --- a/src/scope/server/frame_processor.py +++ b/src/scope/server/frame_processor.py @@ -193,9 +193,7 @@ def start(self): # Register for pipeline unload notifications to release references if self.pipeline_manager: - self.pipeline_manager.register_unload_callback( - self._on_pipeline_unloading - ) + self.pipeline_manager.register_unload_callback(self._on_pipeline_unloading) logger.info( f"[FRAME-PROCESSOR] Started with {len(self.pipeline_ids)} pipeline(s): {self.pipeline_ids}" diff --git a/src/scope/server/pipeline_manager.py b/src/scope/server/pipeline_manager.py index b2edb6f3f..2bd9b6019 100644 --- a/src/scope/server/pipeline_manager.py +++ b/src/scope/server/pipeline_manager.py @@ -5,8 +5,8 @@ import logging import threading import time -from enum import Enum from collections.abc import Callable +from enum import Enum from typing import Any import torch From 99e93cab1e904e3ff621cc411717241afd226a3b Mon Sep 17 00:00:00 2001 From: Varshith B Date: Thu, 12 Feb 2026 07:55:18 +0000 Subject: [PATCH 7/7] fix: thorough clean up Signed-off-by: Varshith B --- src/scope/core/pipelines/interface.py | 10 +++++++++- src/scope/server/frame_processor.py | 6 ++++-- src/scope/server/pipeline_manager.py | 4 +++- src/scope/server/pipeline_processor.py | 10 ++++++---- src/scope/server/tracks.py | 1 + 5 files changed, 23 insertions(+), 8 deletions(-) diff --git a/src/scope/core/pipelines/interface.py b/src/scope/core/pipelines/interface.py index c52158897..925eba102 100644 --- a/src/scope/core/pipelines/interface.py +++ b/src/scope/core/pipelines/interface.py @@ -88,6 +88,7 @@ def cleanup(self) -> None: if components_dict is not None: for name in list(components_dict.keys()): del components_dict[name] + del self.components except Exception: pass @@ -95,6 +96,7 @@ def cleanup(self) -> None: try: if hasattr(self.state, "values"): self.state.values.clear() + del self.state except Exception: pass @@ -104,6 +106,12 @@ def cleanup(self) -> None: except Exception: pass + gc.collect() gc.collect() if torch.cuda.is_available(): - torch.cuda.empty_cache() + try: + torch._dynamo.reset() + torch.cuda.empty_cache() + torch.cuda.synchronize() + except Exception: + pass diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py index ef6498f23..888862f4d 100644 --- a/src/scope/server/frame_processor.py +++ b/src/scope/server/frame_processor.py @@ -229,8 +229,10 @@ def stop(self, error_message: str = None): # Clear pipeline processors self.pipeline_processors.clear() - # Clear pinned memory buffers - self._pinned_buffer_cache.clear() + with self._pinned_buffer_lock: + for key in list(self._pinned_buffer_cache.keys()): + tensor = self._pinned_buffer_cache.pop(key) + del tensor # Clean up Spout sender self.spout_sender_enabled = False diff --git a/src/scope/server/pipeline_manager.py b/src/scope/server/pipeline_manager.py index 2bd9b6019..aa59936a5 100644 --- a/src/scope/server/pipeline_manager.py +++ b/src/scope/server/pipeline_manager.py @@ -670,12 +670,14 @@ def _unload_pipeline_by_id_unsafe( self._load_params = None self._error_message = None - # Cleanup resources + gc.collect() gc.collect() if torch.cuda.is_available(): try: + torch._dynamo.reset() torch.cuda.empty_cache() torch.cuda.synchronize() + torch.cuda.ipc_collect() logger.info("CUDA cache cleared") except Exception as e: logger.warning(f"CUDA cleanup failed: {e}") diff --git a/src/scope/server/pipeline_processor.py b/src/scope/server/pipeline_processor.py index cc0001a10..7a9dda20f 100644 --- a/src/scope/server/pipeline_processor.py +++ b/src/scope/server/pipeline_processor.py @@ -179,20 +179,21 @@ def stop(self): if threading.current_thread() != self.worker_thread: self.worker_thread.join(timeout=5.0) - # Clear queues with self.input_queue_lock: input_queue_ref = self.input_queue if input_queue_ref: while not input_queue_ref.empty(): try: - input_queue_ref.get_nowait() + frame = input_queue_ref.get_nowait() + del frame except queue.Empty: break if self.output_queue: while not self.output_queue.empty(): try: - self.output_queue.get_nowait() + frame = self.output_queue.get_nowait() + del frame except queue.Empty: break @@ -365,7 +366,8 @@ def process_chunk(self): if self.output_queue: while not self.output_queue.empty(): try: - self.output_queue.get_nowait() + frame = self.output_queue.get_nowait() + del frame except queue.Empty: break diff --git a/src/scope/server/tracks.py b/src/scope/server/tracks.py index 90911e152..a769f1344 100644 --- a/src/scope/server/tracks.py +++ b/src/scope/server/tracks.py @@ -184,5 +184,6 @@ async def stop(self): if self.frame_processor is not None: self.frame_processor.stop() + self.frame_processor = None await super().stop()