diff --git a/src/scope/cloud/fal_app.py b/src/scope/cloud/fal_app.py
index 659262eb9..18af20a6d 100644
--- a/src/scope/cloud/fal_app.py
+++ b/src/scope/cloud/fal_app.py
@@ -169,7 +169,7 @@ def cleanup_session_data():
 # fal deploy fal_app.py --auth public
 
 # Configuration
-DOCKER_IMAGE = "daydreamlive/scope:6334b27"
+DOCKER_IMAGE = "daydreamlive/scope:5340825"
 
 # Create a Dockerfile that uses your existing image as base
 dockerfile_str = f"""
diff --git a/src/scope/core/pipelines/interface.py b/src/scope/core/pipelines/interface.py
index 7e56017b6..c52158897 100644
--- a/src/scope/core/pipelines/interface.py
+++ b/src/scope/core/pipelines/interface.py
@@ -1,5 +1,7 @@
 """Base interface for all pipelines."""
 
+import gc
+import logging
 from abc import ABC, abstractmethod
 from typing import TYPE_CHECKING
 
@@ -76,3 +78,43 @@ def __call__(self, **kwargs) -> dict:
         The video tensor is in THWC format and [0, 1] range.
         """
         pass
+
+    def cleanup(self) -> None:
+        """Release GPU resources held by this pipeline before it is unloaded.
+
+        Best-effort teardown: drops the entries of ``self.components._components``,
+        clears ``self.state.values``, deletes every instance attribute, then runs
+        the garbage collector and empties the CUDA cache. A failing step is logged
+        at debug level and does not stop the remaining steps. Because all instance
+        attributes are removed, the object is not usable after this call.
+        """
+        import torch
+
+        log = logging.getLogger(__name__)
+
+        if hasattr(self, "components"):
+            try:
+                components_dict = getattr(self.components, "_components", None)
+                if components_dict is not None:
+                    for name in list(components_dict.keys()):
+                        del components_dict[name]
+            except Exception:
+                log.debug("Failed to drop pipeline components", exc_info=True)
+
+        if hasattr(self, "state"):
+            try:
+                if hasattr(self.state, "values"):
+                    self.state.values.clear()
+            except Exception:
+                log.debug("Failed to clear pipeline state", exc_info=True)
+
+        # Snapshot the names first: delattr mutates the instance __dict__.
+        for attr_name in list(self.__dict__):
+            try:
+                delattr(self, attr_name)
+            except Exception:
+                log.debug("Failed to delete attribute %s", attr_name, exc_info=True)
+
+        gc.collect()
+        if torch.cuda.is_available():
+            torch.cuda.empty_cache()
diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py
index a4f441521..f5fa4d098 100644
--- a/src/scope/server/frame_processor.py
+++ b/src/scope/server/frame_processor.py
@@ -170,6 +170,12 @@ def start(self):
             self.running = False
             return
 
+        # Register for pipeline unload notifications to release references
+        if self.pipeline_manager:
+            self.pipeline_manager.register_unload_callback(
+                self._on_pipeline_unloading
+            )
+
         logger.info(
             f"[FRAME-PROCESSOR] Started with {len(self.pipeline_ids)} pipeline(s): {self.pipeline_ids}"
         )
@@ -189,6 +195,12 @@ def stop(self, error_message: str = None):
 
         self.running = False
 
+        # Unregister pipeline unload callback
+        if self.pipeline_manager:
+            self.pipeline_manager.unregister_unload_callback(
+                self._on_pipeline_unloading
+            )
+
         # Stop all pipeline processors
         for processor in self.pipeline_processors:
             processor.stop()
@@ -196,6 +208,9 @@ def stop(self, error_message: str = None):
         # Clear pipeline processors
         self.pipeline_processors.clear()
 
+        # Clear pinned memory buffers
+        self._pinned_buffer_cache.clear()
+
         # Clean up Spout sender
         self.spout_sender_enabled = False
         if self.spout_sender_thread and self.spout_sender_thread.is_alive():
@@ -486,6 +501,13 @@ def _get_pipeline_dimensions(self) -> tuple[int, int]:
             logger.warning(f"Could not get pipeline dimensions: {e}")
             return 512, 512
 
+    def _on_pipeline_unloading(self, pipeline_id: str):
+        """Release pipeline reference when a pipeline is being unloaded."""
+        for processor in self.pipeline_processors:
+            if processor.pipeline_id == pipeline_id:
+                processor.stop()
+                processor.release_pipeline()
+
     def update_parameters(self, parameters: dict[str, Any]):
         """Update parameters that will be used in the next pipeline call."""
         # Handle Spout output settings
diff --git a/src/scope/server/pipeline_manager.py b/src/scope/server/pipeline_manager.py
index 2fbcbfbc6..f2cd449a6 100644
--- a/src/scope/server/pipeline_manager.py
+++ b/src/scope/server/pipeline_manager.py
@@ -5,6 +5,7 @@
 import logging
 import threading
+from collections.abc import Callable
 from enum import Enum
 from typing import Any
 
 import torch
@@ -51,6 +52,41 @@ def __init__(self):
         self._pipeline_statuses: dict[str, PipelineStatus] = {}  # pipeline_id -> status
         self._pipeline_load_params: dict[str, dict] = {}  # pipeline_id -> load_params
 
+        # Callbacks invoked before a pipeline is unloaded (to release external references)
+        self._unload_callbacks: list[Callable[[str], None]] = []
+
+    def register_unload_callback(self, callback: Callable[[str], None]):
+        """Register a callback to be called before a pipeline is unloaded.
+
+        Registering the same callback again is a no-op, so a single
+        unregister_unload_callback() call always removes it.
+        """
+        if callback not in self._unload_callbacks:
+            self._unload_callbacks.append(callback)
+
+    def unregister_unload_callback(self, callback: Callable[[str], None]):
+        """Unregister a pipeline unload callback; unknown callbacks are ignored."""
+        try:
+            self._unload_callbacks.remove(callback)
+        except ValueError:
+            pass
+
+    def _log_gpu_free_memory(self, stage: str) -> None:
+        """Log the free CUDA memory, labelling the message with ``stage``.
+
+        Does nothing when CUDA is unavailable; a failure to read the value is
+        logged as a warning instead of being raised.
+        """
+        if not torch.cuda.is_available():
+            return
+        try:
+            from scope.core.pipelines.memory import get_cuda_free_memory_gb
+
+            free_gb = get_cuda_free_memory_gb(torch.device("cuda"))
+            logger.info(f"GPU memory free {stage}: {free_gb:.2f} GiB")
+        except Exception as e:
+            logger.warning(f"Failed to log GPU memory {stage}: {e}")
+
     @property
     def status(self) -> PipelineStatus:
         """Get current pipeline status."""
@@ -369,10 +405,16 @@ def _load_pipelines_sync(
                 if loaded_id not in pipeline_ids or current_params != new_params:
                     pipelines_to_unload.add(loaded_id)
 
+            if pipelines_to_unload:
+                self._log_gpu_free_memory("before unload")
+
             # Unload pipelines that need to be unloaded
             for pipeline_id_to_unload in pipelines_to_unload:
                 self._unload_pipeline_by_id_unsafe(pipeline_id_to_unload)
 
+            if pipelines_to_unload:
+                self._log_gpu_free_memory("after unload")
+
             # Clean up stale status entries for pipelines not in the new request list
             # This handles cases where a previous pipeline failed to load (status=ERROR/NOT_LOADED)
             # but its status entry was never cleaned up because it wasn't in _pipelines
@@ -400,6 +442,8 @@ def _load_pipelines_sync(
             else:
                 logger.error("Some pipelines failed to load")
 
+            self._log_gpu_free_memory("after load")
+
             return success
 
     def _get_vace_checkpoint_path(self) -> str:
@@ -512,6 +556,24 @@ def _unload_pipeline_by_id_unsafe(self, pipeline_id: str):
 
         logger.info(f"Unloading pipeline: {pipeline_id}")
 
+        # Notify listeners to release pipeline references before cleanup.
+        # Iterate over a snapshot: a callback may (un)register callbacks.
+        for callback in list(self._unload_callbacks):
+            try:
+                callback(pipeline_id)
+            except Exception as e:
+                logger.warning(f"Unload callback failed for {pipeline_id}: {e}")
+
+        # Call cleanup to explicitly free GPU resources before removing references
+        pipeline_obj = self._pipelines.get(pipeline_id)
+        if pipeline_obj is None and self._pipeline_id == pipeline_id:
+            pipeline_obj = self._pipeline
+        if pipeline_obj is not None and hasattr(pipeline_obj, "cleanup"):
+            try:
+                pipeline_obj.cleanup()
+            except Exception as e:
+                logger.warning(f"Pipeline cleanup failed for {pipeline_id}: {e}")
+
         # Remove from tracked pipelines
         if pipeline_id in self._pipelines:
             del self._pipelines[pipeline_id]
diff --git a/src/scope/server/pipeline_processor.py b/src/scope/server/pipeline_processor.py
index 13c74bc31..8d2fbdaed 100644
--- a/src/scope/server/pipeline_processor.py
+++ b/src/scope/server/pipeline_processor.py
@@ -184,8 +184,16 @@ def stop(self):
             except queue.Empty:
                 break
 
+        # Release pipeline reference to allow GC of GPU resources
+        self.pipeline = None
+
         logger.info(f"PipelineProcessor stopped for pipeline: {self.pipeline_id}")
 
+    def release_pipeline(self):
+        """Release pipeline reference to allow GC of GPU resources."""
+        logger.info(f"Releasing pipeline reference for {self.pipeline_id}")
+        self.pipeline = None
+
     def update_parameters(self, parameters: dict[str, Any]):
         """Update parameters that will be used in the next pipeline call."""
         try:
@@ -347,6 +355,10 @@ def process_chunk(self):
             except queue.Empty:
                 break
 
+        # Pipeline may have been released during a switch
+        if self.pipeline is None:
+            return
+
         requirements = None
         if hasattr(self.pipeline, "prepare"):
             prepare_params = dict(self.parameters.items())