Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/scope/cloud/fal_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def cleanup_session_data():
# fal deploy fal_app.py --auth public

# Configuration
DOCKER_IMAGE = "daydreamlive/scope:6334b27"
DOCKER_IMAGE = "daydreamlive/scope:5340825"

# Create a Dockerfile that uses your existing image as base
dockerfile_str = f"""
Expand Down
31 changes: 31 additions & 0 deletions src/scope/core/pipelines/interface.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Base interface for all pipelines."""

import gc
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -76,3 +77,33 @@ def __call__(self, **kwargs) -> dict:
The video tensor is in THWC format and [0, 1] range.
"""
pass

def cleanup(self) -> None:
    """Best-effort release of GPU resources before the pipeline is unloaded.

    Empties the component registry and cached state, drops every instance
    attribute so nothing keeps tensors alive, then forces a GC pass and
    empties the CUDA allocator cache. Every step is wrapped defensively so
    cleanup itself never raises.
    """
    import torch

    # Empty the registry backing self.components, if this pipeline has one.
    if hasattr(self, "components"):
        try:
            registry = getattr(self.components, "_components", None)
            if registry is not None:
                for key in list(registry):
                    del registry[key]
        except Exception:
            # Best-effort: a broken registry must not abort teardown.
            pass

    # Clear any cached state values.
    if hasattr(self, "state"):
        try:
            if hasattr(self.state, "values"):
                self.state.values.clear()
        except Exception:
            pass

    # Drop every remaining instance attribute.
    for name in list(self.__dict__):
        try:
            delattr(self, name)
        except Exception:
            pass

    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
22 changes: 22 additions & 0 deletions src/scope/server/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ def start(self):
self.running = False
return

# Register for pipeline unload notifications to release references
if self.pipeline_manager:
self.pipeline_manager.register_unload_callback(
self._on_pipeline_unloading
)

logger.info(
f"[FRAME-PROCESSOR] Started with {len(self.pipeline_ids)} pipeline(s): {self.pipeline_ids}"
)
Expand All @@ -189,13 +195,22 @@ def stop(self, error_message: str = None):

self.running = False

# Unregister pipeline unload callback
if self.pipeline_manager:
self.pipeline_manager.unregister_unload_callback(
self._on_pipeline_unloading
)

# Stop all pipeline processors
for processor in self.pipeline_processors:
processor.stop()

# Clear pipeline processors
self.pipeline_processors.clear()

# Clear pinned memory buffers
self._pinned_buffer_cache.clear()

# Clean up Spout sender
self.spout_sender_enabled = False
if self.spout_sender_thread and self.spout_sender_thread.is_alive():
Expand Down Expand Up @@ -486,6 +501,13 @@ def _get_pipeline_dimensions(self) -> tuple[int, int]:
logger.warning(f"Could not get pipeline dimensions: {e}")
return 512, 512

def _on_pipeline_unloading(self, pipeline_id: str):
    """Stop and detach any processor bound to the pipeline being unloaded.

    Invoked by the pipeline manager via the registered unload callback,
    right before teardown, so this frame processor no longer holds a
    reference that would keep the pipeline's GPU memory alive.
    """
    affected = (p for p in self.pipeline_processors if p.pipeline_id == pipeline_id)
    for proc in affected:
        proc.stop()
        proc.release_pipeline()

def update_parameters(self, parameters: dict[str, Any]):
"""Update parameters that will be used in the next pipeline call."""
# Handle Spout output settings
Expand Down
68 changes: 68 additions & 0 deletions src/scope/server/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import threading
from enum import Enum
from collections.abc import Callable
from typing import Any

import torch
Expand Down Expand Up @@ -51,6 +52,20 @@ def __init__(self):
self._pipeline_statuses: dict[str, PipelineStatus] = {} # pipeline_id -> status
self._pipeline_load_params: dict[str, dict] = {} # pipeline_id -> load_params

# Callbacks invoked before a pipeline is unloaded (to release external references)
self._unload_callbacks: list[Callable[[str], None]] = []

def register_unload_callback(self, callback: Callable[[str], None]):
    """Subscribe *callback* to pipeline-unload notifications.

    The callback receives the pipeline id and is invoked just before that
    pipeline is torn down. Duplicate registrations are kept as-is; each
    registered entry is called once per unload.
    """
    callbacks = self._unload_callbacks
    callbacks.append(callback)

def unregister_unload_callback(self, callback: Callable[[str], None]):
    """Remove *callback* from the unload-notification list.

    Unknown callbacks are ignored. If the same callback was registered
    more than once, only the first matching entry is removed.
    """
    if callback in self._unload_callbacks:
        self._unload_callbacks.remove(callback)

@property
def status(self) -> PipelineStatus:
"""Get current pipeline status."""
Expand Down Expand Up @@ -369,10 +384,34 @@ def _load_pipelines_sync(
if loaded_id not in pipeline_ids or current_params != new_params:
pipelines_to_unload.add(loaded_id)

# Log GPU memory before unload
if pipelines_to_unload and torch.cuda.is_available():
try:
from scope.core.pipelines.memory import get_cuda_free_memory_gb

device = torch.device("cuda")
logger.info(
f"GPU memory free before unload: {get_cuda_free_memory_gb(device):.2f} GiB"
)
except Exception as e:
logger.warning(f"Failed to log GPU memory before unload: {e}")

# Unload pipelines that need to be unloaded
for pipeline_id_to_unload in pipelines_to_unload:
self._unload_pipeline_by_id_unsafe(pipeline_id_to_unload)

# Log GPU memory after unload
if pipelines_to_unload and torch.cuda.is_available():
try:
from scope.core.pipelines.memory import get_cuda_free_memory_gb

device = torch.device("cuda")
logger.info(
f"GPU memory free after unload: {get_cuda_free_memory_gb(device):.2f} GiB"
)
except Exception as e:
logger.warning(f"Failed to log GPU memory after unload: {e}")

# Clean up stale status entries for pipelines not in the new request list
# This handles cases where a previous pipeline failed to load (status=ERROR/NOT_LOADED)
# but its status entry was never cleaned up because it wasn't in _pipelines
Expand Down Expand Up @@ -400,6 +439,18 @@ def _load_pipelines_sync(
else:
logger.error("Some pipelines failed to load")

# Log GPU memory after load
if torch.cuda.is_available():
try:
from scope.core.pipelines.memory import get_cuda_free_memory_gb

device = torch.device("cuda")
logger.info(
f"GPU memory free after load: {get_cuda_free_memory_gb(device):.2f} GiB"
)
except Exception as e:
logger.warning(f"Failed to log GPU memory after load: {e}")

return success

def _get_vace_checkpoint_path(self) -> str:
Expand Down Expand Up @@ -512,6 +563,23 @@ def _unload_pipeline_by_id_unsafe(self, pipeline_id: str):

logger.info(f"Unloading pipeline: {pipeline_id}")

# Notify listeners to release pipeline references before cleanup
for callback in self._unload_callbacks:
try:
callback(pipeline_id)
except Exception as e:
logger.warning(f"Unload callback failed for {pipeline_id}: {e}")

# Call cleanup to explicitly free GPU resources before removing references
pipeline_obj = self._pipelines.get(pipeline_id)
if pipeline_obj is None and self._pipeline_id == pipeline_id:
pipeline_obj = self._pipeline
if pipeline_obj is not None and hasattr(pipeline_obj, "cleanup"):
try:
pipeline_obj.cleanup()
except Exception as e:
logger.warning(f"Pipeline cleanup failed for {pipeline_id}: {e}")

# Remove from tracked pipelines
if pipeline_id in self._pipelines:
del self._pipelines[pipeline_id]
Expand Down
12 changes: 12 additions & 0 deletions src/scope/server/pipeline_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,16 @@ def stop(self):
except queue.Empty:
break

# Release pipeline reference to allow GC of GPU resources
self.pipeline = None

logger.info(f"PipelineProcessor stopped for pipeline: {self.pipeline_id}")

def release_pipeline(self):
    """Drop this processor's reference to its pipeline object.

    Called from the frame processor's pipeline-unload callback so that no
    lingering reference here keeps the pipeline's GPU resources alive once
    the pipeline manager unloads it. Dropping the attribute is enough on
    this side; actual freeing happens via GC and the pipeline's own cleanup.
    """
    logger.info(f"Releasing pipeline reference for {self.pipeline_id}")
    self.pipeline = None

def update_parameters(self, parameters: dict[str, Any]):
"""Update parameters that will be used in the next pipeline call."""
try:
Expand Down Expand Up @@ -347,6 +355,10 @@ def process_chunk(self):
except queue.Empty:
break

# Pipeline may have been released during a switch
if self.pipeline is None:
return

requirements = None
if hasattr(self.pipeline, "prepare"):
prepare_params = dict(self.parameters.items())
Expand Down