From 3fe16555fe201729d74600e8ff0b3c60acb8ea23 Mon Sep 17 00:00:00 2001 From: emranemran Date: Thu, 5 Feb 2026 23:39:29 -0800 Subject: [PATCH 1/6] Add pipeline cleanup and GPU memory logging - Add cleanup() to Pipeline base class (gc.collect + empty_cache) - Call cleanup on pipeline unload to free GPU memory - Log GPU memory before/after unload and after load - Clear pinned buffer cache on frame processor stop - Release pipeline reference on pipeline processor stop Co-Authored-By: Claude Opus 4.6 Signed-off-by: emranemran --- src/scope/core/pipelines/interface.py | 15 +++++++++ src/scope/server/frame_processor.py | 3 ++ src/scope/server/pipeline_manager.py | 46 ++++++++++++++++++++++++++ src/scope/server/pipeline_processor.py | 3 ++ 4 files changed, 67 insertions(+) diff --git a/src/scope/core/pipelines/interface.py b/src/scope/core/pipelines/interface.py index 7e56017b6..d40c8fa5a 100644 --- a/src/scope/core/pipelines/interface.py +++ b/src/scope/core/pipelines/interface.py @@ -1,5 +1,6 @@ """Base interface for all pipelines.""" +import gc from abc import ABC, abstractmethod from typing import TYPE_CHECKING @@ -76,3 +77,17 @@ def __call__(self, **kwargs) -> dict: The video tensor is in THWC format and [0, 1] range. """ pass + + def cleanup(self) -> None: + """Release GPU resources. Called before pipeline is unloaded. + + Subclasses may override for custom cleanup behavior. + """ + gc.collect() + try: + import torch + + if torch.cuda.is_available(): + torch.cuda.empty_cache() + except ImportError: + pass diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py index a4f441521..59dd7d155 100644 --- a/src/scope/server/frame_processor.py +++ b/src/scope/server/frame_processor.py @@ -196,6 +196,9 @@ def stop(self, error_message: str = None): # Clear pipeline processors self.pipeline_processors.clear() + # Clear pinned memory buffers + self._pinned_buffer_cache.clear() + # Clean up Spout sender self.spout_sender_enabled = False if self.spout_sender_thread and self.spout_sender_thread.is_alive(): diff --git a/src/scope/server/pipeline_manager.py b/src/scope/server/pipeline_manager.py index 2fbcbfbc6..c323238bf 100644 --- a/src/scope/server/pipeline_manager.py +++ b/src/scope/server/pipeline_manager.py @@ -369,10 +369,34 @@ def _load_pipelines_sync( if loaded_id not in pipeline_ids or current_params != new_params: pipelines_to_unload.add(loaded_id) + # Log GPU memory before unload + if pipelines_to_unload and torch.cuda.is_available(): + try: + from scope.core.pipelines.memory import get_cuda_free_memory_gb + + device = torch.device("cuda") + logger.info( + f"GPU memory free before unload: {get_cuda_free_memory_gb(device):.2f} GiB" + ) + except Exception as e: + logger.warning(f"Failed to log GPU memory before unload: {e}") + # Unload pipelines that need to be unloaded for pipeline_id_to_unload in pipelines_to_unload: self._unload_pipeline_by_id_unsafe(pipeline_id_to_unload) + # Log GPU memory after unload + if pipelines_to_unload and torch.cuda.is_available(): + try: + from scope.core.pipelines.memory import get_cuda_free_memory_gb + + device = torch.device("cuda") + logger.info( + f"GPU memory free after unload: {get_cuda_free_memory_gb(device):.2f} GiB" + ) + except Exception as e: + logger.warning(f"Failed to log GPU memory after unload: {e}") + # Clean up stale status entries for pipelines not in the new request list # This handles cases where a previous pipeline failed to load (status=ERROR/NOT_LOADED) # but its status entry was never cleaned up because it wasn't in _pipelines @@ -400,6 +424,18 @@ def _load_pipelines_sync( else: logger.error("Some pipelines failed to load") + # Log GPU memory after load + if torch.cuda.is_available(): + try: + from scope.core.pipelines.memory import get_cuda_free_memory_gb + + device = torch.device("cuda") + logger.info( + f"GPU memory free after load: {get_cuda_free_memory_gb(device):.2f} GiB" + ) + except Exception as e: + logger.warning(f"Failed to log GPU memory after load: {e}") + return success def _get_vace_checkpoint_path(self) -> str: @@ -512,6 +548,16 @@ def _unload_pipeline_by_id_unsafe(self, pipeline_id: str): logger.info(f"Unloading pipeline: {pipeline_id}") + # Call cleanup to explicitly free GPU resources before removing references + pipeline_obj = self._pipelines.get(pipeline_id) + if pipeline_obj is None and self._pipeline_id == pipeline_id: + pipeline_obj = self._pipeline + if pipeline_obj is not None and hasattr(pipeline_obj, "cleanup"): + try: + pipeline_obj.cleanup() + except Exception as e: + logger.warning(f"Pipeline cleanup failed for {pipeline_id}: {e}") + # Remove from tracked pipelines if pipeline_id in self._pipelines: del self._pipelines[pipeline_id] diff --git a/src/scope/server/pipeline_processor.py b/src/scope/server/pipeline_processor.py index 13c74bc31..de87dc5f4 100644 --- a/src/scope/server/pipeline_processor.py +++ b/src/scope/server/pipeline_processor.py @@ -184,6 +184,9 @@ def stop(self): except queue.Empty: break + # Release pipeline reference to allow GC of GPU resources + self.pipeline = None + logger.info(f"PipelineProcessor stopped for pipeline: {self.pipeline_id}") def update_parameters(self, parameters: dict[str, Any]): From a012e96ad25ed6cb19a468a540f72d17879ecfd9 Mon Sep 17 00:00:00 2001 From: emranemran Date: Fri, 6 Feb 2026 00:47:50 -0800 Subject: [PATCH 2/6] Release pipeline references before cleanup during switches Add unload callback mechanism so PipelineManager can notify FrameProcessors to release pipeline references before calling cleanup(). This allows gc.collect() + empty_cache() to actually free GPU memory during pipeline switches, not just on session end. Co-Authored-By: Claude Opus 4.6 Signed-off-by: emranemran --- src/scope/server/frame_processor.py | 18 ++++++++++++++++++ src/scope/server/pipeline_manager.py | 22 ++++++++++++++++++++++ src/scope/server/pipeline_processor.py | 9 +++++++++ 3 files changed, 49 insertions(+) diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py index 59dd7d155..ce88d40a7 100644 --- a/src/scope/server/frame_processor.py +++ b/src/scope/server/frame_processor.py @@ -170,6 +170,12 @@ def start(self): self.running = False return + # Register for pipeline unload notifications to release references + if self.pipeline_manager: + self.pipeline_manager.register_unload_callback( + self._on_pipeline_unloading + ) + logger.info( f"[FRAME-PROCESSOR] Started with {len(self.pipeline_ids)} pipeline(s): {self.pipeline_ids}" ) @@ -189,6 +195,12 @@ def stop(self, error_message: str = None): self.running = False + # Unregister pipeline unload callback + if self.pipeline_manager: + self.pipeline_manager.unregister_unload_callback( + self._on_pipeline_unloading + ) + # Stop all pipeline processors for processor in self.pipeline_processors: processor.stop() @@ -489,6 +501,12 @@ def _get_pipeline_dimensions(self) -> tuple[int, int]: logger.warning(f"Could not get pipeline dimensions: {e}") return 512, 512 + def _on_pipeline_unloading(self, pipeline_id: str): + """Release pipeline reference when a pipeline is being unloaded.""" + for processor in self.pipeline_processors: + if processor.pipeline_id == pipeline_id: + processor.release_pipeline() + def update_parameters(self, parameters: dict[str, Any]): """Update parameters that will be used in the next pipeline call.""" # Handle Spout output settings diff --git a/src/scope/server/pipeline_manager.py b/src/scope/server/pipeline_manager.py index c323238bf..f2cd449a6 100644 --- a/src/scope/server/pipeline_manager.py +++ b/src/scope/server/pipeline_manager.py @@ -5,6 +5,7 @@ import logging import threading from enum import Enum +from collections.abc import Callable from typing import Any import torch @@ -51,6 +52,20 @@ def __init__(self): self._pipeline_statuses: dict[str, PipelineStatus] = {} # pipeline_id -> status self._pipeline_load_params: dict[str, dict] = {} # pipeline_id -> load_params + # Callbacks invoked before a pipeline is unloaded (to release external references) + self._unload_callbacks: list[Callable[[str], None]] = [] + + def register_unload_callback(self, callback: Callable[[str], None]): + """Register a callback to be called before a pipeline is unloaded.""" + self._unload_callbacks.append(callback) + + def unregister_unload_callback(self, callback: Callable[[str], None]): + """Unregister a pipeline unload callback.""" + try: + self._unload_callbacks.remove(callback) + except ValueError: + pass + @property def status(self) -> PipelineStatus: """Get current pipeline status.""" @@ -548,6 +563,13 @@ def _unload_pipeline_by_id_unsafe(self, pipeline_id: str): logger.info(f"Unloading pipeline: {pipeline_id}") + # Notify listeners to release pipeline references before cleanup + for callback in self._unload_callbacks: + try: + callback(pipeline_id) + except Exception as e: + logger.warning(f"Unload callback failed for {pipeline_id}: {e}") + # Call cleanup to explicitly free GPU resources before removing references pipeline_obj = self._pipelines.get(pipeline_id) if pipeline_obj is None and self._pipeline_id == pipeline_id: diff --git a/src/scope/server/pipeline_processor.py b/src/scope/server/pipeline_processor.py index de87dc5f4..8d2fbdaed 100644 --- a/src/scope/server/pipeline_processor.py +++ b/src/scope/server/pipeline_processor.py @@ -189,6 +189,11 @@ def stop(self): logger.info(f"PipelineProcessor stopped for pipeline: {self.pipeline_id}") + def release_pipeline(self): + """Release pipeline reference to allow GC of GPU resources.""" + logger.info(f"Releasing pipeline reference for {self.pipeline_id}") + self.pipeline = None + def update_parameters(self, parameters: dict[str, Any]): """Update parameters that will be used in the next pipeline call.""" try: @@ -350,6 +355,10 @@ def process_chunk(self): except queue.Empty: break + # Pipeline may have been released during a switch + if self.pipeline is None: + return + requirements = None if hasattr(self.pipeline, "prepare"): prepare_params = dict(self.parameters.items()) From d785b114b5bca0fca99a9422bffc6225ea2a2692 Mon Sep 17 00:00:00 2001 From: Varshith B Date: Fri, 6 Feb 2026 12:01:24 +0000 Subject: [PATCH 3/6] fix: cleanup Signed-off-by: Varshith B --- src/scope/core/pipelines/interface.py | 28 ++++++++++++++++++++++++--- src/scope/server/frame_processor.py | 1 + 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/src/scope/core/pipelines/interface.py b/src/scope/core/pipelines/interface.py index d40c8fa5a..4e6f25cda 100644 --- a/src/scope/core/pipelines/interface.py +++ b/src/scope/core/pipelines/interface.py @@ -79,10 +79,32 @@ def __call__(self, **kwargs) -> dict: pass def cleanup(self) -> None: - """Release GPU resources. Called before pipeline is unloaded. + """Release GPU resources. Called before pipeline is unloaded.""" + + if hasattr(self, "components"): + try: + components_dict = getattr(self.components, "_components", None) + if components_dict is not None: + for name in list(components_dict.keys()): + del components_dict[name] + del self.components + except Exception as e: + pass + + if hasattr(self, "state"): + try: + if hasattr(self.state, "values"): + self.state.values.clear() + del self.state + except Exception as e: + pass + + if hasattr(self, "blocks"): + try: + del self.blocks + except Exception as e: + pass - Subclasses may override for custom cleanup behavior. - """ gc.collect() try: import torch diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py index ce88d40a7..f5fa4d098 100644 --- a/src/scope/server/frame_processor.py +++ b/src/scope/server/frame_processor.py @@ -505,6 +505,7 @@ def _on_pipeline_unloading(self, pipeline_id: str): """Release pipeline reference when a pipeline is being unloaded.""" for processor in self.pipeline_processors: if processor.pipeline_id == pipeline_id: + processor.stop() processor.release_pipeline() def update_parameters(self, parameters: dict[str, Any]): From b541c5dce69c115786ec2e5c80a222efdc9e6c18 Mon Sep 17 00:00:00 2001 From: Varshith B Date: Fri, 6 Feb 2026 17:32:28 +0000 Subject: [PATCH 4/6] feat: multilevel cleanup Signed-off-by: Varshith B --- src/scope/core/pipelines/interface.py | 64 +++++++++++++++++++++++---- 1 file changed, 55 insertions(+), 9 deletions(-) diff --git a/src/scope/core/pipelines/interface.py b/src/scope/core/pipelines/interface.py index 4e6f25cda..c112dd728 100644 --- a/src/scope/core/pipelines/interface.py +++ b/src/scope/core/pipelines/interface.py @@ -80,6 +80,7 @@ def __call__(self, **kwargs) -> dict: def cleanup(self) -> None: """Release GPU resources. Called before pipeline is unloaded.""" + import torch if hasattr(self, "components"): try: @@ -88,7 +89,7 @@ def cleanup(self) -> None: for name in list(components_dict.keys()): del components_dict[name] del self.components - except Exception as e: + except Exception: pass if hasattr(self, "state"): @@ -96,20 +97,65 @@ def cleanup(self) -> None: if hasattr(self.state, "values"): self.state.values.clear() del self.state - except Exception as e: + except Exception: pass if hasattr(self, "blocks"): try: del self.blocks - except Exception as e: + except Exception: pass + self._cleanup_gpu_objects(self, depth=0, max_depth=2, visited=set()) + gc.collect() - try: - import torch + if torch.cuda.is_available(): + torch.cuda.empty_cache() + + def _cleanup_gpu_objects( + self, obj: object, depth: int, max_depth: int, visited: set + ) -> None: + """Recursively find and delete GPU objects (nn.Module, Tensor) from __dict__. + + Args: + obj: Object to inspect + depth: Current recursion depth + max_depth: Maximum recursion depth (2 levels catches wrapper patterns) + visited: Set of object ids already visited (prevents circular refs) + """ + import torch + + obj_id = id(obj) + if obj_id in visited: + return + visited.add(obj_id) + + if depth >= max_depth: + return + + if not hasattr(obj, "__dict__"): + return + + attrs_to_delete = [] + objects_to_recurse = [] + + for attr_name, attr_value in list(obj.__dict__.items()): + if isinstance(attr_value, (torch.nn.Module, torch.Tensor)): + attrs_to_delete.append(attr_name) + elif ( + hasattr(attr_value, "__dict__") + and not isinstance(attr_value, type) + and not isinstance( + attr_value, (str, bytes, int, float, bool, list, dict, tuple) + ) + ): + objects_to_recurse.append(attr_value) + + for attr_name in attrs_to_delete: + try: + delattr(obj, attr_name) + except Exception: + pass - if torch.cuda.is_available(): - torch.cuda.empty_cache() - except ImportError: - pass + for nested_obj in objects_to_recurse: + self._cleanup_gpu_objects(nested_obj, depth + 1, max_depth, visited) From 5340825a7f5449610c0dc6d94735e0a44beb404e Mon Sep 17 00:00:00 2001 From: Varshith B Date: Fri, 6 Feb 2026 19:18:52 +0000 Subject: [PATCH 5/6] fix: cleanup logic Signed-off-by: Varshith B --- src/scope/core/pipelines/interface.py | 56 +-------------------------- 1 file changed, 2 insertions(+), 54 deletions(-) diff --git a/src/scope/core/pipelines/interface.py b/src/scope/core/pipelines/interface.py index c112dd728..c52158897 100644 --- a/src/scope/core/pipelines/interface.py +++ b/src/scope/core/pipelines/interface.py @@ -88,7 +88,6 @@ def cleanup(self) -> None: if components_dict is not None: for name in list(components_dict.keys()): del components_dict[name] - del self.components except Exception: pass @@ -96,66 +95,15 @@ def cleanup(self) -> None: try: if hasattr(self.state, "values"): self.state.values.clear() - del self.state except Exception: pass - if hasattr(self, "blocks"): + for attr_name in list(self.__dict__.keys()): try: - del self.blocks + delattr(self, attr_name) except Exception: pass - self._cleanup_gpu_objects(self, depth=0, max_depth=2, visited=set()) - gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() - - def _cleanup_gpu_objects( - self, obj: object, depth: int, max_depth: int, visited: set - ) -> None: - """Recursively find and delete GPU objects (nn.Module, Tensor) from __dict__. - - Args: - obj: Object to inspect - depth: Current recursion depth - max_depth: Maximum recursion depth (2 levels catches wrapper patterns) - visited: Set of object ids already visited (prevents circular refs) - """ - import torch - - obj_id = id(obj) - if obj_id in visited: - return - visited.add(obj_id) - - if depth >= max_depth: - return - - if not hasattr(obj, "__dict__"): - return - - attrs_to_delete = [] - objects_to_recurse = [] - - for attr_name, attr_value in list(obj.__dict__.items()): - if isinstance(attr_value, (torch.nn.Module, torch.Tensor)): - attrs_to_delete.append(attr_name) - elif ( - hasattr(attr_value, "__dict__") - and not isinstance(attr_value, type) - and not isinstance( - attr_value, (str, bytes, int, float, bool, list, dict, tuple) - ) - ): - objects_to_recurse.append(attr_value) - - for attr_name in attrs_to_delete: - try: - delattr(obj, attr_name) - except Exception: - pass - - for nested_obj in objects_to_recurse: - self._cleanup_gpu_objects(nested_obj, depth + 1, max_depth, visited) From 15bbef13f5818d30a5692f02e7d9c0ebd2a52e2d Mon Sep 17 00:00:00 2001 From: Varshith Bathini Date: Mon, 9 Feb 2026 17:16:00 +0530 Subject: [PATCH 6/6] docker update --- src/scope/cloud/fal_app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scope/cloud/fal_app.py b/src/scope/cloud/fal_app.py index 659262eb9..18af20a6d 100644 --- a/src/scope/cloud/fal_app.py +++ b/src/scope/cloud/fal_app.py @@ -169,7 +169,7 @@ def cleanup_session_data(): # fal deploy fal_app.py --auth public # Configuration -DOCKER_IMAGE = "daydreamlive/scope:6334b27" +DOCKER_IMAGE = "daydreamlive/scope:5340825" # Create a Dockerfile that uses your existing image as base dockerfile_str = f"""