diff --git a/pyproject.toml b/pyproject.toml index f233508f2..4e2e242ed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -117,6 +117,11 @@ dev = [ "pytest>=8.4.2", "freezegun>=1.5.5", ] +tensorrt = [ + "tensorrt>=10.0", + "polygraphy>=0.49", + "onnxscript>=0.1.0", +] [tool.pytest.ini_options] testpaths = ["tests"] diff --git a/src/scope/core/pipelines/__init__.py b/src/scope/core/pipelines/__init__.py index 3a307da15..bfc9d42c4 100644 --- a/src/scope/core/pipelines/__init__.py +++ b/src/scope/core/pipelines/__init__.py @@ -40,6 +40,10 @@ def __getattr__(name): from .rife.pipeline import RIFEPipeline return RIFEPipeline + elif name == "OpticalFlowPipeline": + from .optical_flow.pipeline import OpticalFlowPipeline + + return OpticalFlowPipeline # Config classes elif name == "BasePipelineConfig": from .base_schema import BasePipelineConfig @@ -91,6 +95,7 @@ def __getattr__(name): "VideoDepthAnythingPipeline", "ControllerVisualizerPipeline", "RIFEPipeline", + "OpticalFlowPipeline", # Config classes "BasePipelineConfig", "LongLiveConfig", diff --git a/src/scope/core/pipelines/optical_flow/__init__.py b/src/scope/core/pipelines/optical_flow/__init__.py new file mode 100644 index 000000000..7a0e91901 --- /dev/null +++ b/src/scope/core/pipelines/optical_flow/__init__.py @@ -0,0 +1,5 @@ +"""Optical Flow pipeline for VACE conditioning.""" + +from .pipeline import OpticalFlowPipeline + +__all__ = ["OpticalFlowPipeline"] diff --git a/src/scope/core/pipelines/optical_flow/download.py b/src/scope/core/pipelines/optical_flow/download.py new file mode 100644 index 000000000..444d55426 --- /dev/null +++ b/src/scope/core/pipelines/optical_flow/download.py @@ -0,0 +1,60 @@ +"""Model directory utilities for optical flow pipeline.""" + +import logging +from pathlib import Path + +from scope.core.config import get_model_file_path + +logger = logging.getLogger(__name__) + + +def get_models_dir() -> Path: + """Get the models directory for optical flow models. 
+ + Returns: + Path to the models directory (created if needed) + """ + flow_dir = get_model_file_path("optical-flow") + flow_dir.mkdir(parents=True, exist_ok=True) + return flow_dir + + +def get_onnx_path( + models_dir: Path, height: int, width: int, model_name: str = "raft_small" +) -> Path: + """Get the path for a resolution-specific ONNX model. + + Args: + models_dir: Base models directory + height: Model input height + width: Model input width + model_name: Model name ("raft_small" or "raft_large") + + Returns: + Path where the ONNX model should be stored + """ + onnx_name = f"{model_name}_{height}x{width}.onnx" + return models_dir / onnx_name + + +def get_engine_path( + models_dir: Path, + height: int, + width: int, + gpu_name: str, + model_name: str = "raft_small", +) -> Path: + """Get the path for a GPU-specific TensorRT engine. + + Args: + models_dir: Base models directory + height: Model input height + width: Model input width + gpu_name: Sanitized GPU name for engine file naming + model_name: Model name ("raft_small" or "raft_large") + + Returns: + Path where the TRT engine should be stored + """ + engine_name = f"{model_name}_{height}x{width}_{gpu_name}.trt" + return models_dir / engine_name diff --git a/src/scope/core/pipelines/optical_flow/engine.py b/src/scope/core/pipelines/optical_flow/engine.py new file mode 100644 index 000000000..e1ded4406 --- /dev/null +++ b/src/scope/core/pipelines/optical_flow/engine.py @@ -0,0 +1,470 @@ +"""TensorRT engine wrapper and RAFT ONNX compilation utilities. 

Ported from StreamDiffusion's temporal_net_tensorrt.py and compile_raft_tensorrt.py
"""

import logging
from collections import OrderedDict
from pathlib import Path

import numpy as np
import torch

logger = logging.getLogger(__name__)

# Default resolution for optical flow computation
DEFAULT_HEIGHT = 512
DEFAULT_WIDTH = 512

# Map of numpy dtype -> torch dtype
numpy_to_torch_dtype_dict = {
    np.uint8: torch.uint8,
    np.int8: torch.int8,
    np.int16: torch.int16,
    np.int32: torch.int32,
    np.int64: torch.int64,
    np.float16: torch.float16,
    np.float32: torch.float32,
    np.float64: torch.float64,
    np.complex64: torch.complex64,
    np.complex128: torch.complex128,
}
# np.bool_ exists in every NumPy version; the bare np.bool alias was removed in
# NumPy 1.24, and the previous lexicographic version-string comparison
# (full_version >= "1.24.0") misordered versions anyway (e.g. "1.9" > "1.24"),
# so map the canonical np.bool_ unconditionally.
numpy_to_torch_dtype_dict[np.bool_] = torch.bool


def get_gpu_name() -> str:
    """Get a sanitized GPU name for engine file naming."""
    if not torch.cuda.is_available():
        return "cpu"
    name = torch.cuda.get_device_name(0)
    # Sanitize for use in filenames
    return name.lower().replace(" ", "_").replace("-", "_")


def load_raft_model(use_large_model: bool, device: str = "cuda"):
    """Load RAFT model from torchvision.

    Args:
        use_large_model: If True, load RAFT Large. Otherwise load RAFT Small.
        device: Device to load model on.

    Returns:
        Tuple of (model, weights) where weights can be used for transforms.
+ """ + from torchvision.models.optical_flow import ( + Raft_Large_Weights, + Raft_Small_Weights, + raft_large, + raft_small, + ) + + if use_large_model: + weights = Raft_Large_Weights.DEFAULT + model = raft_large(weights=weights, progress=True) + else: + weights = Raft_Small_Weights.DEFAULT + model = raft_small(weights=weights, progress=True) + + model = model.to(device=device) + model.eval() + return model, weights + + +def apply_raft_transforms( + weights, frame1: torch.Tensor, frame2: torch.Tensor +) -> tuple[torch.Tensor, torch.Tensor]: + """Apply RAFT preprocessing transforms to frame pair. + + Args: + weights: RAFT weights object (Raft_Small_Weights or Raft_Large_Weights) + frame1: First frame tensor (BCHW format) + frame2: Second frame tensor (BCHW format) + + Returns: + Tuple of transformed (frame1, frame2) + """ + if hasattr(weights, "transforms") and weights.transforms is not None: + transforms = weights.transforms() + return transforms(frame1, frame2) + return frame1, frame2 + + +class _RAFTWrapper(torch.nn.Module): + """Wrapper to make RAFT return only the final flow prediction. + + RAFT returns a list of flow predictions (one per iteration). + TensorRT doesn't support SequenceConstruct, so we wrap the model + to return only the final (most refined) prediction. + """ + + def __init__(self, raft_model: torch.nn.Module): + super().__init__() + self.raft = raft_model + + def forward(self, frame1: torch.Tensor, frame2: torch.Tensor) -> torch.Tensor: + # RAFT returns list of predictions, take the last one + flow_predictions = self.raft(frame1, frame2) + return flow_predictions[-1] + + +def export_raft_to_onnx( + onnx_path: Path, + height: int = DEFAULT_HEIGHT, + width: int = DEFAULT_WIDTH, + device: str = "cuda", + use_large_model: bool = False, +) -> bool: + """Export RAFT model from torchvision to ONNX format. 

    Ported from StreamDiffusion's compile_raft_tensorrt.py

    Args:
        onnx_path: Path to save the ONNX model
        height: Input height for the model
        width: Input width for the model
        device: Device to use for export
        use_large_model: If True, export RAFT Large instead of RAFT Small

    Returns:
        True if successful, False otherwise
    """
    try:
        import torchvision.models.optical_flow  # noqa: F401
    except ImportError:
        logger.error("torchvision is required but not installed")
        return False

    model_size = "Large" if use_large_model else "Small"
    logger.info(f"Exporting RAFT {model_size} model to ONNX: {onnx_path}")
    logger.info(f"Resolution: {height}x{width}")

    try:
        # Load RAFT model using helper
        logger.info(f"Loading RAFT {model_size} model from torchvision...")
        raft_model, weights = load_raft_model(use_large_model, device)

        # Wrap model to return only final prediction (avoids SequenceConstruct)
        wrapped_model = _RAFTWrapper(raft_model)
        wrapped_model.eval()

        # Create dummy inputs at target resolution
        dummy_frame1 = torch.randn(1, 3, height, width).to(device)
        dummy_frame2 = torch.randn(1, 3, height, width).to(device)

        # Apply RAFT preprocessing transforms
        dummy_frame1, dummy_frame2 = apply_raft_transforms(
            weights, dummy_frame1, dummy_frame2
        )

        # Dynamic axes for batch, height, width
        dynamic_axes = {
            "frame1": {0: "batch_size", 2: "height", 3: "width"},
            "frame2": {0: "batch_size", 2: "height", 3: "width"},
            "flow": {0: "batch_size", 2: "height", 3: "width"},
        }

        logger.info("Exporting to ONNX...")
        onnx_path.parent.mkdir(parents=True, exist_ok=True)

        with torch.no_grad():
            torch.onnx.export(
                wrapped_model,
                (dummy_frame1, dummy_frame2),
                str(onnx_path),
                verbose=False,
                input_names=["frame1", "frame2"],
                output_names=["flow"],
                opset_version=17,
                export_params=True,
                dynamic_axes=dynamic_axes,
                dynamo=False,  # Use legacy exporter (torch.export doesn't support cudnn_grid_sampler)
            )

        del wrapped_model
        del raft_model
        torch.cuda.empty_cache()

        logger.info(f"Successfully exported ONNX model to {onnx_path}")
        return True

    except Exception as e:
        logger.error(f"Failed to export ONNX model: {e}")
        import traceback

        traceback.print_exc()
        return False


def build_tensorrt_engine(
    onnx_path: Path,
    engine_path: Path,
    min_height: int = DEFAULT_HEIGHT,
    min_width: int = DEFAULT_WIDTH,
    max_height: int = DEFAULT_HEIGHT,
    max_width: int = DEFAULT_WIDTH,
    fp16: bool = True,
    workspace_size_gb: int = 4,
) -> bool:
    """Build TensorRT engine from ONNX model with optimization profiles.

    Ported from StreamDiffusion's compile_raft_tensorrt.py

    Args:
        onnx_path: Path to the ONNX model
        engine_path: Path to save the TensorRT engine
        min_height: Minimum input height for optimization
        min_width: Minimum input width for optimization
        max_height: Maximum input height for optimization
        max_width: Maximum input width for optimization
        fp16: Enable FP16 precision mode
        workspace_size_gb: Maximum workspace size in GB

    Returns:
        True if successful, False otherwise
    """
    try:
        import tensorrt as trt
    except ImportError:
        logger.error("TensorRT is required but not installed")
        return False

    if not onnx_path.exists():
        logger.error(f"ONNX model not found: {onnx_path}")
        return False

    logger.info(f"Building TensorRT engine from ONNX model: {onnx_path}")
    logger.info(f"Output path: {engine_path}")
    logger.info(
        f"Resolution range: {min_height}x{min_width} - {max_height}x{max_width}"
    )
    logger.info(f"FP16 mode: {fp16}")
    logger.info("This may take several minutes...")

    try:
        trt_logger = trt.Logger(trt.Logger.INFO)
        builder = trt.Builder(trt_logger)
        network = builder.create_network(
            1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
        )
        parser = trt.OnnxParser(network, trt.Logger(trt.Logger.WARNING))

        logger.info("Parsing ONNX model...")
        with open(onnx_path, "rb") as model:
            if not parser.parse(model.read()):
                logger.error("Failed to parse ONNX model")
                for error in range(parser.num_errors):
                    logger.error(f"Parser error: {parser.get_error(error)}")
                return False

        logger.info("Configuring TensorRT builder...")
        config = builder.create_builder_config()
        config.set_memory_pool_limit(
            trt.MemoryPoolType.WORKSPACE, workspace_size_gb * (1 << 30)
        )

        if fp16:
            config.set_flag(trt.BuilderFlag.FP16)
            logger.info("FP16 mode enabled")

        # Calculate optimal resolution (middle point)
        opt_height = (min_height + max_height) // 2
        opt_width = (min_width + max_width) // 2

        # Create optimization profile for dynamic shapes
        profile = builder.create_optimization_profile()
        min_shape = (1, 3, min_height, min_width)
        opt_shape = (1, 3, opt_height, opt_width)
        max_shape = (1, 3, max_height, max_width)

        profile.set_shape("frame1", min_shape, opt_shape, max_shape)
        profile.set_shape("frame2", min_shape, opt_shape, max_shape)
        config.add_optimization_profile(profile)

        logger.info("Building TensorRT engine... (this will take a while)")
        engine = builder.build_serialized_network(network, config)

        if engine is None:
            logger.error("Failed to build TensorRT engine")
            return False

        logger.info(f"Saving engine to {engine_path}")
        engine_path.parent.mkdir(parents=True, exist_ok=True)
        with open(engine_path, "wb") as f:
            f.write(engine)

        logger.info(f"Successfully built TensorRT engine: {engine_path}")
        logger.info(f"Engine size: {engine_path.stat().st_size / (1024 * 1024):.2f} MB")

        return True

    except Exception as e:
        logger.error(f"Failed to build TensorRT engine: {e}")
        import traceback

        traceback.print_exc()
        return False


class TensorRTEngine:
    """TensorRT engine wrapper for RAFT optical flow inference.

    Ported from StreamDiffusion's temporal_net_tensorrt.py.
    Handles dynamic shapes and buffer reallocation.
    """

    def __init__(self, engine_path: str | Path):
        """Initialize the engine wrapper.
+ + Args: + engine_path: Path to the TensorRT engine file + """ + self.engine_path = ( + Path(engine_path) if isinstance(engine_path, str) else engine_path + ) + self.engine = None + self.context = None + self.tensors: OrderedDict[str, torch.Tensor] = OrderedDict() + self._cuda_stream = None + + def load(self): + """Load TensorRT engine from file.""" + try: + import tensorrt as trt # noqa: F401 + from polygraphy.backend.common import bytes_from_path + from polygraphy.backend.trt import engine_from_bytes + except ImportError as e: + raise ImportError( + "TensorRT and polygraphy are required. " + "Install with: uv sync --group tensorrt" + ) from e + + if not self.engine_path.exists(): + raise FileNotFoundError(f"Engine file not found: {self.engine_path}") + + logger.info(f"Loading TensorRT engine: {self.engine_path}") + self.engine = engine_from_bytes(bytes_from_path(str(self.engine_path))) + + def activate(self): + """Create execution context.""" + if self.engine is None: + raise RuntimeError("Engine must be loaded before activation") + + self.context = self.engine.create_execution_context() + self._cuda_stream = torch.cuda.current_stream().cuda_stream + + def allocate_buffers(self, device: str = "cuda", input_shape: tuple | None = None): + """Allocate input/output buffers. + + Args: + device: Device to allocate tensors on + input_shape: Shape for input tensors (B, C, H, W). Required for dynamic shapes. 
+ """ + try: + import tensorrt as trt + except ImportError as e: + raise ImportError("TensorRT is required") from e + + if self.context is None: + raise RuntimeError("Context must be activated before buffer allocation") + + for idx in range(self.engine.num_io_tensors): + name = self.engine.get_tensor_name(idx) + shape = self.context.get_tensor_shape(name) + dtype = trt.nptype(self.engine.get_tensor_dtype(name)) + + if self.engine.get_tensor_mode(name) == trt.TensorIOMode.INPUT: + # For dynamic shapes, use provided input_shape + if input_shape is not None and any(dim == -1 for dim in shape): + shape = input_shape + self.context.set_input_shape(name, shape) + # Update shape after setting it + shape = self.context.get_tensor_shape(name) + else: + # For output tensors, get shape after input shapes are set + shape = self.context.get_tensor_shape(name) + + # Verify shape has no dynamic dimensions + if any(dim == -1 for dim in shape): + raise RuntimeError( + f"Tensor '{name}' still has dynamic dimensions {shape}. " + f"Please provide input_shape parameter to allocate_buffers()." + ) + + tensor = torch.empty( + tuple(shape), dtype=numpy_to_torch_dtype_dict[dtype] + ).to(device=device) + self.tensors[name] = tensor + + def infer( + self, feed_dict: dict[str, torch.Tensor], stream: int | None = None + ) -> OrderedDict[str, torch.Tensor]: + """Run inference with optional stream parameter. + + Handles dynamic shape reallocation if input shapes change. + + Args: + feed_dict: Dictionary mapping input names to tensors + For RAFT: {"frame1": tensor, "frame2": tensor} + stream: Optional CUDA stream handle. Uses cached stream if None. 
+ + Returns: + OrderedDict of output tensors (contains "flow" key) + """ + try: + import tensorrt as trt + except ImportError as e: + raise ImportError("TensorRT is required") from e + + if stream is None: + stream = self._cuda_stream + + # Check if we need to update tensor shapes for dynamic dimensions + need_realloc = False + for name, buf in feed_dict.items(): + if name in self.tensors: + if self.tensors[name].shape != buf.shape: + need_realloc = True + break + + # Reallocate buffers if input shape changed + if need_realloc: + # Update input shapes + for name, buf in feed_dict.items(): + if self.engine.get_tensor_mode(name) == trt.TensorIOMode.INPUT: + self.context.set_input_shape(name, buf.shape) + + # Reallocate all tensors with new shapes + for idx in range(self.engine.num_io_tensors): + name = self.engine.get_tensor_name(idx) + shape = self.context.get_tensor_shape(name) + dtype = trt.nptype(self.engine.get_tensor_dtype(name)) + + tensor = torch.empty( + tuple(shape), dtype=numpy_to_torch_dtype_dict[dtype] + ).to(device=self.tensors[name].device) + self.tensors[name] = tensor + + # Copy input data to tensors + for name, buf in feed_dict.items(): + if name in self.tensors: + self.tensors[name].copy_(buf) + + # Set tensor addresses + for name, tensor in self.tensors.items(): + self.context.set_tensor_address(name, tensor.data_ptr()) + + # Execute inference + success = self.context.execute_async_v3(stream) + if not success: + raise ValueError("TensorRT inference failed") + + return self.tensors + + def __del__(self): + """Cleanup resources.""" + self.tensors.clear() + self.context = None + self.engine = None diff --git a/src/scope/core/pipelines/optical_flow/pipeline.py b/src/scope/core/pipelines/optical_flow/pipeline.py new file mode 100644 index 000000000..14eb79888 --- /dev/null +++ b/src/scope/core/pipelines/optical_flow/pipeline.py @@ -0,0 +1,544 @@ +"""Optical Flow Pipeline for VACE conditioning. 
+ +Computes optical flow between consecutive frames using RAFT and converts it to +RGB visualization. Uses TensorRT acceleration when available and enabled, +otherwise falls back to PyTorch RAFT. +""" + +import logging +import time +from typing import TYPE_CHECKING + +import torch +import torch.nn.functional as F +from torchvision.models.optical_flow import Raft_Small_Weights +from torchvision.utils import flow_to_image + +from ..interface import Pipeline, Requirements +from .download import get_engine_path, get_models_dir, get_onnx_path +from .schema import OpticalFlowConfig + +if TYPE_CHECKING: + from ..schema import BasePipelineConfig + +logger = logging.getLogger(__name__) + + +def _is_tensorrt_available() -> bool: + """Check if TensorRT and polygraphy are available.""" + try: + import tensorrt # noqa: F401 + from polygraphy.backend.trt import engine_from_bytes # noqa: F401 + + return True + except ImportError: + return False + + +class OpticalFlowPipeline(Pipeline): + """Optical flow pipeline for VACE conditioning. + + This pipeline computes optical flow between consecutive frames using + RAFT (Recurrent All-Pairs Field Transforms) and converts it to RGB + visualization for VACE/ControlNet conditioning. + + When TensorRT is available and enabled via config, uses TensorRT + acceleration. Otherwise falls back to PyTorch RAFT inference. + + The TensorRT engine is lazily initialized on first use, automatically + exporting RAFT from torchvision and compiling it to a GPU-specific engine. + """ + + @classmethod + def get_config_class(cls) -> type["BasePipelineConfig"]: + return OpticalFlowConfig + + def __init__( + self, + config, + device: torch.device | None = None, + ): + """Initialize the optical flow pipeline. 

        Args:
            config: Pipeline configuration with model_size and use_tensorrt settings
            device: Target device (defaults to CUDA if available)
        """
        from .engine import DEFAULT_HEIGHT, DEFAULT_WIDTH

        self.device = (
            device
            if device is not None
            else torch.device("cuda" if torch.cuda.is_available() else "cpu")
        )
        self._height = DEFAULT_HEIGHT
        self._width = DEFAULT_WIDTH
        self._flow_strength = 1.0

        # Read settings from config (set by pipeline_manager from load_params)
        # NOTE(review): the fallback must match the schema default ("small" in
        # OpticalFlowConfig); it previously defaulted to "large" here, which
        # disagreed with schema.py and silently selected a different model
        # whenever the attribute was absent from config.
        model_size = getattr(config, "model_size", "small")
        use_tensorrt_config = getattr(config, "use_tensorrt", False)

        # Model configuration
        self._use_large_model = model_size == "large"
        self._model_name = "raft_large" if self._use_large_model else "raft_small"

        # RAFT Large requires FP32 for TensorRT (FP16 causes NaN)
        # RAFT Small can use FP16
        self._fp16 = not self._use_large_model

        # Determine backend (config setting AND runtime availability)
        self._use_tensorrt = use_tensorrt_config and _is_tensorrt_available()

        self._trt_engine = None
        self._pytorch_model = None
        self._raft_weights = None  # Cached weights for transforms
        self._is_cuda_available = torch.cuda.is_available()

        # Previous frame tracking for flow computation
        self._prev_input: torch.Tensor | None = None
        self._first_frame = True

        start = time.time()
        backend = "TensorRT" if self._use_tensorrt else "PyTorch"
        model_size_str = "Large" if self._use_large_model else "Small"
        logger.info(
            f"Optical Flow pipeline initialized with {backend} backend, "
            f"RAFT {model_size_str} model (loads on first use)"
        )
        if not self._use_tensorrt and use_tensorrt_config:
            logger.warning(
                "TensorRT not available, using PyTorch fallback. "
                "Install with: uv sync --group tensorrt"
            )
        logger.info(f"Initialization time: {time.time() - start:.3f}s")

    def _ensure_tensorrt_engine(self):
        """Lazily initialize the TensorRT engine.

        Exports RAFT to ONNX and compiles to TRT on first call.

        Returns:
            Initialized TensorRTEngine

        Raises:
            RuntimeError: If ONNX export or TensorRT engine build fails
        """
        if self._trt_engine is not None:
            return self._trt_engine

        from .engine import (
            TensorRTEngine,
            build_tensorrt_engine,
            export_raft_to_onnx,
            get_gpu_name,
        )

        models_dir = get_models_dir()
        gpu_name = get_gpu_name()

        # Get paths (include model name for separate small/large engines)
        onnx_path = get_onnx_path(
            models_dir, self._height, self._width, self._model_name
        )
        engine_path = get_engine_path(
            models_dir, self._height, self._width, gpu_name, self._model_name
        )

        # Export RAFT to ONNX if needed
        if not onnx_path.exists():
            model_size = "Large" if self._use_large_model else "Small"
            logger.info(f"Exporting RAFT {model_size} model to ONNX...")
            success = export_raft_to_onnx(
                onnx_path=onnx_path,
                height=self._height,
                width=self._width,
                device=str(self.device),
                use_large_model=self._use_large_model,
            )
            if not success:
                raise RuntimeError("Failed to export RAFT to ONNX")

        # Build TensorRT engine if needed
        if not engine_path.exists():
            logger.info("Building TensorRT engine...")
            success = build_tensorrt_engine(
                onnx_path=onnx_path,
                engine_path=engine_path,
                min_height=self._height,
                min_width=self._width,
                max_height=self._height,
                max_width=self._width,
                fp16=self._fp16,
            )
            if not success:
                raise RuntimeError("Failed to build TensorRT engine")

        # Load and activate engine
        logger.info(f"Loading TensorRT engine: {engine_path}")
        self._trt_engine = TensorRTEngine(engine_path)
        self._trt_engine.load()
        self._trt_engine.activate()

        # Allocate buffers with input shape for dynamic shapes
        input_shape = (1, 3, self._height, self._width)
        self._trt_engine.allocate_buffers(
            device=str(self.device), input_shape=input_shape
        )

        logger.info(
            f"Optical Flow TensorRT engine initialized: {self._height}x{self._width}"
        )
        return self._trt_engine

    def _ensure_pytorch_model(self):
        """Lazily initialize the PyTorch RAFT model.

        Returns:
            Initialized RAFT model
        """
        if self._pytorch_model is not None:
            return self._pytorch_model

        from .engine import load_raft_model

        model_size = "Large" if self._use_large_model else "Small"
        logger.info(f"Loading PyTorch RAFT {model_size} model...")
        start = time.time()

        self._pytorch_model, self._raft_weights = load_raft_model(
            self._use_large_model, device=str(self.device)
        )

        logger.info(
            f"Loaded PyTorch RAFT {model_size} model in {time.time() - start:.3f}s"
        )
        return self._pytorch_model

    def _get_raft_weights(self):
        """Get the appropriate RAFT weights based on model size.

        Returns cached weights if available, otherwise loads them.
        """
        if self._raft_weights is not None:
            return self._raft_weights

        # Load weights on demand (for TensorRT path where model isn't loaded)
        from torchvision.models.optical_flow import Raft_Large_Weights

        self._raft_weights = (
            Raft_Large_Weights.DEFAULT
            if self._use_large_model
            else Raft_Small_Weights.DEFAULT
        )
        return self._raft_weights

    def _compute_optical_flow_tensorrt(
        self, frame1: torch.Tensor, frame2: torch.Tensor
    ) -> torch.Tensor:
        """Compute optical flow between two frames using TensorRT-accelerated RAFT.
+ + Args: + frame1: First frame tensor (CHW format, [0,1]) + frame2: Second frame tensor (CHW format, [0,1]) + + Returns: + Optical flow tensor (2HW format) + """ + from .engine import apply_raft_transforms + + engine = self._ensure_tensorrt_engine() + + # Prepare inputs for TensorRT + frame1_batch = frame1.unsqueeze(0) + frame2_batch = frame2.unsqueeze(0) + + # Apply RAFT preprocessing transforms + weights = self._get_raft_weights() + frame1_batch, frame2_batch = apply_raft_transforms( + weights, frame1_batch, frame2_batch + ) + + # Run TensorRT inference + feed_dict = {"frame1": frame1_batch, "frame2": frame2_batch} + + cuda_stream = torch.cuda.current_stream().cuda_stream + result = engine.infer(feed_dict, cuda_stream) + flow = result["flow"][0] # Remove batch dimension + + return flow + + def _compute_optical_flow_pytorch( + self, frame1: torch.Tensor, frame2: torch.Tensor + ) -> torch.Tensor: + """Compute optical flow between two frames using PyTorch RAFT. + + Args: + frame1: First frame tensor (CHW format, [0,1]) + frame2: Second frame tensor (CHW format, [0,1]) + + Returns: + Optical flow tensor (2HW format) + """ + from .engine import apply_raft_transforms + + model = self._ensure_pytorch_model() + + # Prepare inputs + frame1_batch = frame1.unsqueeze(0) + frame2_batch = frame2.unsqueeze(0) + + # Apply RAFT preprocessing transforms + weights = self._get_raft_weights() + frame1_batch, frame2_batch = apply_raft_transforms( + weights, frame1_batch, frame2_batch + ) + + # Run PyTorch inference + with torch.no_grad(): + # RAFT returns a list of flow predictions at different iterations + # We take the last (most refined) prediction + flow_predictions = model(frame1_batch, frame2_batch) + flow = flow_predictions[-1][0] # Last prediction, remove batch dim + + return flow + + def _compute_optical_flow( + self, frame1: torch.Tensor, frame2: torch.Tensor + ) -> torch.Tensor: + """Compute optical flow between two frames using the configured backend. 
+ + Args: + frame1: First frame tensor (CHW format, [0,1]) + frame2: Second frame tensor (CHW format, [0,1]) + + Returns: + Optical flow tensor (2HW format) + """ + if self._use_tensorrt: + return self._compute_optical_flow_tensorrt(frame1, frame2) + else: + return self._compute_optical_flow_pytorch(frame1, frame2) + + def _compute_flow_to_rgb_tensor( + self, prev_input_tensor: torch.Tensor, current_input_tensor: torch.Tensor + ) -> torch.Tensor: + """Compute optical flow and convert to RGB visualization. + + Args: + prev_input_tensor: Previous input frame tensor (CHW format, [0,1]) on GPU + current_input_tensor: Current input frame tensor (CHW format, [0,1]) on GPU + + Returns: + Flow visualization as RGB tensor (CHW format, [0,1]) on GPU + """ + # Convert to float32 for processing + prev_tensor = prev_input_tensor.to(device=self.device, dtype=torch.float32) + current_tensor = current_input_tensor.to( + device=self.device, dtype=torch.float32 + ) + + # Resize for flow computation if needed + prev_resized = self._resize_flow_to_target( + prev_tensor, self._height, self._width + ) + current_resized = self._resize_flow_to_target( + current_tensor, self._height, self._width + ) + + # Compute optical flow: prev_input -> current_input + flow = self._compute_optical_flow(prev_resized, current_resized) + + # Apply flow strength scaling + if self._flow_strength != 1.0: + flow = flow * self._flow_strength + + # Convert flow to RGB visualization using torchvision's flow_to_image + # flow_to_image expects (2, H, W) and returns (3, H, W) in range [0, 255] + flow_rgb = flow_to_image(flow) # Returns uint8 tensor [0, 255] + + # Convert to float [0, 1] range + return flow_rgb.float() / 255.0 + + def reset(self): + """Reset the pipeline state. + + Call this between different video sequences to clear the + previous frame buffer. + """ + self._first_frame = True + self._prev_input = None + + def prepare(self, **kwargs) -> Requirements: + """Return pipeline requirements. 
+ + Returns: + Requirements specifying input_size needed for temporal consistency + """ + return Requirements(input_size=4) + + def _normalize_frame( + self, frame, target_shape: tuple | None + ) -> tuple[torch.Tensor, tuple]: + """Normalize a single frame to consistent format. + + Args: + frame: Input frame (tensor or numpy array) + target_shape: Target shape for consistency, or None to use this frame's shape + + Returns: + Tuple of (normalized frame tensor in HWC [0,1], target_shape) + """ + if isinstance(frame, torch.Tensor): + frame_tensor = frame + else: + frame_tensor = torch.from_numpy(frame) + + # Squeeze T dimension: (1, H, W, C) -> (H, W, C) + frame_tensor = frame_tensor.squeeze(0) + + # Use first frame's shape as target for consistency + if target_shape is None: + target_shape = frame_tensor.shape + elif frame_tensor.shape != target_shape: + # Resize frame to match target shape + frame_chw = frame_tensor.permute(2, 0, 1).unsqueeze(0).float() + frame_chw = F.interpolate( + frame_chw, + size=(target_shape[0], target_shape[1]), + mode="bilinear", + align_corners=False, + ) + frame_tensor = frame_chw.squeeze(0).permute(1, 2, 0) + + # Normalize to [0, 1] if needed + if frame_tensor.max() > 1.0: + frame_tensor = frame_tensor.float() / 255.0 + else: + frame_tensor = frame_tensor.float() + + return frame_tensor, target_shape + + def _resize_flow_to_target( + self, flow: torch.Tensor, h: int, w: int + ) -> torch.Tensor: + """Resize flow tensor to target dimensions if needed. + + Args: + flow: Flow tensor in CHW format + h: Target height + w: Target width + + Returns: + Resized flow tensor + """ + if flow.shape[-2] != h or flow.shape[-1] != w: + flow = F.interpolate( + flow.unsqueeze(0), + size=(h, w), + mode="bilinear", + align_corners=False, + ).squeeze(0) + return flow + + def _create_zero_flow(self, h: int, w: int) -> torch.Tensor: + """Create a zero flow tensor. 
+ + Args: + h: Height + w: Width + + Returns: + Zero flow tensor in CHW format + """ + return torch.zeros( + 3, + h, + w, + device=self.device, + dtype=torch.float32, + ) + + def __call__(self, **kwargs) -> torch.Tensor: + """Process video frames and return optical flow visualizations. + + Args: + video: Input video frames as list of tensors (THWC format, [0, 255] range) + + Returns: + Flow maps as tensor in THWC format with values in [0, 1] range, + rendered as RGB visualizations. + """ + video = kwargs.get("video") + if video is None: + raise ValueError("Input video cannot be None for OpticalFlowPipeline") + + # Reset state for each new call + self.reset() + + # Normalize all frames + frames = [] + target_shape = None + for frame in video: + frame_tensor, target_shape = self._normalize_frame(frame, target_shape) + frames.append(frame_tensor) + + num_frames = len(frames) + h, w = target_shape[0], target_shape[1] + + # Process each frame to compute optical flow + flow_frames = [] + first_computed_flow = None + + for i in range(num_frames): + # HWC -> CHW + frame_chw = frames[i].permute(2, 0, 1) + + # Ensure on GPU + if self._is_cuda_available and not frame_chw.is_cuda: + frame_chw = frame_chw.to(self.device) + + if self._prev_input is not None and not self._first_frame: + try: + # Compute optical flow between prev_input -> current_input + flow_rgb = self._compute_flow_to_rgb_tensor( + self._prev_input, frame_chw + ) + # Store the first computed flow for duplicating to frame 0 + # (matches VACE's FlowVisAnnotator behavior) + if first_computed_flow is None: + first_computed_flow = flow_rgb.clone() + except Exception as e: + logger.error(f"Optical flow computation failed: {e}") + # Fallback: zero flow + flow_rgb = self._create_zero_flow(self._height, self._width) + else: + # First frame: placeholder, will be replaced with first computed flow + self._first_frame = False + flow_rgb = None # Placeholder + + # Resize to target size if needed + if flow_rgb is not None: + 
flow_rgb = self._resize_flow_to_target(flow_rgb, h, w) + + # Store current input as previous for next frame + self._prev_input = frame_chw.clone() + + flow_frames.append(flow_rgb) + + # Replace first frame's placeholder with duplicated first computed flow + # This matches VACE's behavior: flow_up_vis_list[:1] + flow_up_vis_list + if first_computed_flow is not None: + flow_frames[0] = self._resize_flow_to_target(first_computed_flow, h, w) + else: + # Single frame or no valid flow computed - use zero flow + flow_frames[0] = self._create_zero_flow(h, w) + + # Stack flows: [T, C, H, W] + flow_tensor = torch.stack(flow_frames, dim=0) + + # Convert from CHW to HWC: [T, C, H, W] -> [T, H, W, C] + flow_tensor = flow_tensor.permute(0, 2, 3, 1) + + # Output is already in [0, 1] range in THWC format + return flow_tensor diff --git a/src/scope/core/pipelines/optical_flow/schema.py b/src/scope/core/pipelines/optical_flow/schema.py new file mode 100644 index 000000000..446f67299 --- /dev/null +++ b/src/scope/core/pipelines/optical_flow/schema.py @@ -0,0 +1,34 @@ +"""Configuration schema for Optical Flow pipeline.""" + +from typing import Literal + +from ..base_schema import BasePipelineConfig, ModeDefaults, UsageType + + +class OpticalFlowConfig(BasePipelineConfig): + """Configuration for Optical Flow pipeline. + + This pipeline computes optical flow between consecutive video frames using + RAFT (Recurrent All-Pairs Field Transforms). When TensorRT is available and + enabled, it uses TensorRT acceleration; otherwise falls back to PyTorch. + The flow is converted to RGB visualization for VACE/ControlNet conditioning. + """ + + pipeline_id = "optical-flow" + pipeline_name = "Optical Flow" + pipeline_description = ( + "Optical flow computation using RAFT model. " + "Produces RGB flow visualizations for video conditioning. " + "Supports TensorRT acceleration when available." 
+ ) + docs_url = "https://pytorch.org/vision/main/models/raft.html" + artifacts = [] # RAFT from torchvision, ONNX/TRT built locally + supports_prompts = False + modified = True + usage = [UsageType.PREPROCESSOR] + + modes = {"video": ModeDefaults(default=True)} + + # User-configurable settings + model_size: Literal["small", "large"] = "small" + use_tensorrt: bool = False diff --git a/src/scope/core/pipelines/optical_flow/test_tensorrt.py b/src/scope/core/pipelines/optical_flow/test_tensorrt.py new file mode 100644 index 000000000..da8d183bb --- /dev/null +++ b/src/scope/core/pipelines/optical_flow/test_tensorrt.py @@ -0,0 +1,186 @@ +"""Test script for RAFT TensorRT export and inference. + +Run with: uv run python -m scope.core.pipelines.optical_flow.test_tensorrt +""" + +import logging + +import torch + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + + +def test_raft_tensorrt(use_large_model: bool = True, clean_start: bool = True): + """Test RAFT ONNX export and TensorRT engine build. + + Args: + use_large_model: If True, test RAFT Large. If False, test RAFT Small. + clean_start: If True, delete existing ONNX/engine files first. 
+ """ + from .download import get_engine_path, get_models_dir, get_onnx_path + from .engine import ( + DEFAULT_HEIGHT, + DEFAULT_WIDTH, + TensorRTEngine, + build_tensorrt_engine, + export_raft_to_onnx, + get_gpu_name, + ) + + model_name = "raft_large" if use_large_model else "raft_small" + model_size = "Large" if use_large_model else "Small" + height, width = DEFAULT_HEIGHT, DEFAULT_WIDTH + device = "cuda" + + logger.info("=" * 60) + logger.info(f"Testing RAFT {model_size} with TensorRT") + logger.info("=" * 60) + + # Get paths + models_dir = get_models_dir() + gpu_name = get_gpu_name() + onnx_path = get_onnx_path(models_dir, height, width, model_name) + engine_path = get_engine_path(models_dir, height, width, gpu_name, model_name) + + logger.info(f"Models dir: {models_dir}") + logger.info(f"ONNX path: {onnx_path}") + logger.info(f"Engine path: {engine_path}") + logger.info(f"GPU: {gpu_name}") + + # Clean start if requested + if clean_start: + if onnx_path.exists(): + logger.info(f"Deleting existing ONNX: {onnx_path}") + onnx_path.unlink() + if engine_path.exists(): + logger.info(f"Deleting existing engine: {engine_path}") + engine_path.unlink() + + # Step 1: Export to ONNX + logger.info(f"\n{'=' * 60}") + logger.info("Step 1: Export RAFT to ONNX") + logger.info(f"{'=' * 60}") + + if not onnx_path.exists(): + success = export_raft_to_onnx( + onnx_path=onnx_path, + height=height, + width=width, + device=device, + use_large_model=use_large_model, + ) + if not success: + logger.error("ONNX export failed!") + return False + logger.info("ONNX export succeeded!") + else: + logger.info(f"ONNX already exists: {onnx_path}") + + # Step 2: Build TensorRT engine + logger.info(f"\n{'=' * 60}") + logger.info("Step 2: Build TensorRT engine") + logger.info(f"{'=' * 60}") + + if not engine_path.exists(): + success = build_tensorrt_engine( + onnx_path=onnx_path, + engine_path=engine_path, + min_height=height, + min_width=width, + max_height=height, + max_width=width, + fp16=True, + 
) + if not success: + logger.error("TensorRT engine build failed!") + return False + logger.info("TensorRT engine build succeeded!") + else: + logger.info(f"Engine already exists: {engine_path}") + + # Step 3: Test inference + logger.info(f"\n{'=' * 60}") + logger.info("Step 3: Test TensorRT inference") + logger.info(f"{'=' * 60}") + + try: + from torchvision.models.optical_flow import ( + Raft_Large_Weights, + Raft_Small_Weights, + ) + + from .engine import apply_raft_transforms + + # Load engine + engine = TensorRTEngine(engine_path) + engine.load() + engine.activate() + + input_shape = (1, 3, height, width) + engine.allocate_buffers(device=device, input_shape=input_shape) + + # Create test inputs + frame1 = torch.randn(1, 3, height, width, device=device) + frame2 = torch.randn(1, 3, height, width, device=device) + + # Apply RAFT transforms + weights = ( + Raft_Large_Weights.DEFAULT + if use_large_model + else Raft_Small_Weights.DEFAULT + ) + frame1, frame2 = apply_raft_transforms(weights, frame1, frame2) + + # Run inference + feed_dict = {"frame1": frame1, "frame2": frame2} + cuda_stream = torch.cuda.current_stream().cuda_stream + result = engine.infer(feed_dict, cuda_stream) + + flow = result["flow"] + logger.info("Inference succeeded!") + logger.info(f"Flow shape: {flow.shape}") + logger.info(f"Flow dtype: {flow.dtype}") + logger.info(f"Flow min: {flow.min().item():.4f}, max: {flow.max().item():.4f}") + + del engine + torch.cuda.empty_cache() + + logger.info(f"\n{'=' * 60}") + logger.info(f"SUCCESS: RAFT {model_size} + TensorRT works!") + logger.info(f"{'=' * 60}") + return True + + except Exception as e: + logger.error(f"Inference failed: {e}") + import traceback + + traceback.print_exc() + return False + + +def main(): + import argparse + + parser = argparse.ArgumentParser( + description="Test RAFT TensorRT export and inference" + ) + parser.add_argument( + "--small", action="store_true", help="Test RAFT Small instead of Large" + ) + parser.add_argument( + 
"--no-clean", action="store_true", help="Don't delete existing files" + ) + args = parser.parse_args() + + use_large = not args.small + clean_start = not args.no_clean + + success = test_raft_tensorrt(use_large_model=use_large, clean_start=clean_start) + exit(0 if success else 1) + + +if __name__ == "__main__": + main() diff --git a/src/scope/core/pipelines/registry.py b/src/scope/core/pipelines/registry.py index c0955fbbc..d8958369b 100644 --- a/src/scope/core/pipelines/registry.py +++ b/src/scope/core/pipelines/registry.py @@ -143,6 +143,16 @@ def _register_pipelines(): "ControllerVisualizerPipeline", ), ("rife", ".rife.pipeline", "RIFEPipeline"), + ( + "optical_flow", + ".optical_flow.pipeline", + "OpticalFlowPipeline", + ), + ( + "sam3", + ".sam3.pipeline", + "SAM3Pipeline", + ), ] # Try to import and register each pipeline diff --git a/src/scope/core/pipelines/sam3/__init__.py b/src/scope/core/pipelines/sam3/__init__.py new file mode 100644 index 000000000..c40802559 --- /dev/null +++ b/src/scope/core/pipelines/sam3/__init__.py @@ -0,0 +1,5 @@ +"""SAM3 pipeline for real-time object masking and tracking.""" + +from .pipeline import SAM3Pipeline + +__all__ = ["SAM3Pipeline"] diff --git a/src/scope/core/pipelines/sam3/pipeline.py b/src/scope/core/pipelines/sam3/pipeline.py new file mode 100644 index 000000000..09bc099eb --- /dev/null +++ b/src/scope/core/pipelines/sam3/pipeline.py @@ -0,0 +1,385 @@ +"""SAM3 Pipeline for real-time object masking and tracking. + +Uses Meta's Segment Anything Model 3 (SAM3) for open-vocabulary segmentation +and video tracking. Supports text prompts to specify what objects to segment. 
+""" + +import logging +import time +from typing import TYPE_CHECKING + +import torch +import torch.nn.functional as F + +from ..interface import Pipeline, Requirements +from .schema import SAM3Config + +if TYPE_CHECKING: + from ..base_schema import BasePipelineConfig + +logger = logging.getLogger(__name__) + + +def _is_sam3_available() -> bool: + """Check if sam3 package is available.""" + try: + import sam3 # noqa: F401 + + return True + except ImportError: + return False + + +class SAM3Pipeline(Pipeline): + """SAM3 pipeline for real-time object masking and tracking. + + This pipeline uses Meta's Segment Anything Model 3 (SAM3) for + open-vocabulary segmentation. Unlike SAM1/SAM2 which required + visual prompts (clicks/boxes), SAM3 can segment objects based + on text descriptions. + + The model is lazily loaded on first use to minimize startup time. + """ + + @classmethod + def get_config_class(cls) -> type["BasePipelineConfig"]: + return SAM3Config + + def __init__( + self, + config: SAM3Config, + device: torch.device | None = None, + ): + """Initialize the SAM3 pipeline. 
+ + Args: + config: Pipeline configuration + device: Target device (defaults to CUDA if available) + """ + self.device = ( + device + if device is not None + else torch.device("cuda" if torch.cuda.is_available() else "cpu") + ) + + # Configuration from schema + self._segment_prompt = getattr(config, "segment_prompt", "person") + self._output_mode = getattr(config, "output_mode", "mask") + self._mask_color = getattr(config, "mask_color", (0, 255, 0)) + self._mask_opacity = getattr(config, "mask_opacity", 0.5) + self._confidence_threshold = getattr(config, "confidence_threshold", 0.5) + self._enable_tracking = getattr(config, "enable_tracking", True) + + # Lazy-loaded model components + self._image_model = None + self._image_processor = None + self._video_predictor = None + self._video_session_id = None + + # State tracking + self._is_sam3_available = _is_sam3_available() + self._current_prompt = self._segment_prompt + self._frame_index = 0 + + start = time.time() + if self._is_sam3_available: + logger.info( + f"SAM3 pipeline initialized (model loads on first use). " + f"Segment prompt: '{self._segment_prompt}'" + ) + else: + logger.warning( + "SAM3 package not available. Install with: pip install sam3 " + "(requires access to facebook/sam3 on HuggingFace)" + ) + logger.info(f"Initialization time: {time.time() - start:.3f}s") + + def _ensure_image_model(self): + """Lazily initialize the SAM3 image model. + + Returns: + Tuple of (model, processor) + """ + if self._image_model is not None: + return self._image_model, self._image_processor + + if not self._is_sam3_available: + raise RuntimeError( + "SAM3 package not available. 
Install with: pip install sam3" + ) + + from sam3.model.sam3_image_processor import Sam3Processor + from sam3.model_builder import build_sam3_image_model + + logger.info("Loading SAM3 image model...") + start = time.time() + + self._image_model = build_sam3_image_model() + self._image_model = self._image_model.to(self.device) + self._image_model.eval() + self._image_processor = Sam3Processor(self._image_model) + + logger.info(f"SAM3 image model loaded in {time.time() - start:.3f}s") + return self._image_model, self._image_processor + + def _ensure_video_predictor(self): + """Lazily initialize the SAM3 video predictor. + + Returns: + Video predictor instance + """ + if self._video_predictor is not None: + return self._video_predictor + + if not self._is_sam3_available: + raise RuntimeError( + "SAM3 package not available. Install with: pip install sam3" + ) + + from sam3.model_builder import build_sam3_video_predictor + + logger.info("Loading SAM3 video predictor...") + start = time.time() + + self._video_predictor = build_sam3_video_predictor() + + logger.info(f"SAM3 video predictor loaded in {time.time() - start:.3f}s") + return self._video_predictor + + def _segment_frame_image_mode( + self, frame: torch.Tensor, prompt: str + ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + """Segment a single frame using image mode. 
+ + Args: + frame: Input frame tensor (HWC format, [0,1] range) + prompt: Text prompt for segmentation + + Returns: + Tuple of (masks, boxes, scores) tensors + """ + import numpy as np + from PIL import Image + + _, processor = self._ensure_image_model() + + # Convert tensor to PIL Image + frame_np = (frame.cpu().numpy() * 255).astype(np.uint8) + pil_image = Image.fromarray(frame_np) + + # Run segmentation + inference_state = processor.set_image(pil_image) + output = processor.set_text_prompt(state=inference_state, prompt=prompt) + + masks = output["masks"] + boxes = output["boxes"] + scores = output["scores"] + + return masks, boxes, scores + + def _apply_output_mode( + self, + frame: torch.Tensor, + masks: torch.Tensor, + scores: torch.Tensor, + ) -> torch.Tensor: + """Apply the configured output mode to produce final output. + + Args: + frame: Original input frame (HWC format, [0,1] range) + masks: Segmentation masks from SAM3 + scores: Confidence scores for each mask + + Returns: + Output frame tensor (HWC format, [0,1] range) + """ + h, w = frame.shape[:2] + + # Filter masks by confidence threshold + if scores is not None and len(scores) > 0: + valid_indices = scores >= self._confidence_threshold + if isinstance(valid_indices, torch.Tensor): + valid_indices = valid_indices.cpu() + masks = masks[valid_indices] if valid_indices.any() else masks[:0] + + # Combine all valid masks into a single mask + if masks is not None and len(masks) > 0: + # Convert masks to tensor if needed + if not isinstance(masks, torch.Tensor): + masks = torch.from_numpy(masks) + masks = masks.to(self.device) + + # Combine masks (union of all detected instances) + combined_mask = masks.any(dim=0).float() + + # Resize mask to frame size if needed + if combined_mask.shape[-2:] != (h, w): + combined_mask = F.interpolate( + combined_mask.unsqueeze(0).unsqueeze(0), + size=(h, w), + mode="bilinear", + align_corners=False, + ).squeeze() + else: + # No valid masks - create empty mask + 
combined_mask = torch.zeros(h, w, device=self.device) + + # Ensure frame is on device + frame = frame.to(self.device) + + if self._output_mode == "mask": + # Return binary mask as grayscale image (3 channels for consistency) + mask_3ch = combined_mask.unsqueeze(-1).expand(-1, -1, 3) + return mask_3ch + + elif self._output_mode == "overlay": + # Overlay colored mask on original frame + mask_color = ( + torch.tensor(self._mask_color, dtype=torch.float32, device=self.device) + / 255.0 + ) + mask_expanded = combined_mask.unsqueeze(-1) + + # Blend: output = frame * (1 - mask*opacity) + color * mask * opacity + overlay = frame * (1 - mask_expanded * self._mask_opacity) + overlay = overlay + mask_color * mask_expanded * self._mask_opacity + return overlay.clamp(0, 1) + + elif self._output_mode == "cutout": + # Show only the segmented object, black background + mask_expanded = combined_mask.unsqueeze(-1) + return frame * mask_expanded + + else: + # Fallback to mask mode + mask_3ch = combined_mask.unsqueeze(-1).expand(-1, -1, 3) + return mask_3ch + + def reset(self): + """Reset the pipeline state. + + Call this between different video sequences to clear tracking state. + """ + self._frame_index = 0 + self._video_session_id = None + + def prepare(self, **kwargs) -> Requirements: + """Return pipeline requirements. + + Returns: + Requirements specifying minimum input_size + """ + return Requirements(input_size=1) + + def update_prompt(self, prompt: str): + """Update the segmentation prompt. + + Args: + prompt: New text prompt for segmentation + """ + if prompt != self._current_prompt: + self._current_prompt = prompt + logger.info(f"Updated segment prompt to: '{prompt}'") + # Reset tracking when prompt changes + self.reset() + + def _normalize_frame( + self, frame, target_shape: tuple | None + ) -> tuple[torch.Tensor, tuple]: + """Normalize a single frame to consistent format. 
+ + Args: + frame: Input frame (tensor or numpy array) + target_shape: Target shape for consistency, or None to use this frame's shape + + Returns: + Tuple of (normalized frame tensor in HWC [0,1], target_shape) + """ + if isinstance(frame, torch.Tensor): + frame_tensor = frame + else: + frame_tensor = torch.from_numpy(frame) + + # Squeeze T dimension: (1, H, W, C) -> (H, W, C) + frame_tensor = frame_tensor.squeeze(0) + + # Use first frame's shape as target for consistency + if target_shape is None: + target_shape = frame_tensor.shape + elif frame_tensor.shape != target_shape: + # Resize frame to match target shape + frame_chw = frame_tensor.permute(2, 0, 1).unsqueeze(0).float() + frame_chw = F.interpolate( + frame_chw, + size=(target_shape[0], target_shape[1]), + mode="bilinear", + align_corners=False, + ) + frame_tensor = frame_chw.squeeze(0).permute(1, 2, 0) + + # Normalize to [0, 1] if needed + if frame_tensor.max() > 1.0: + frame_tensor = frame_tensor.float() / 255.0 + else: + frame_tensor = frame_tensor.float() + + return frame_tensor, target_shape + + def __call__(self, **kwargs) -> torch.Tensor: + """Process video frames and return segmentation masks/overlays. + + Args: + video: Input video frames as list of tensors (THWC format, [0, 255] range) + segment_prompt: Optional text prompt override for this call + + Returns: + Output frames as tensor in THWC format with values in [0, 1] range. + Format depends on output_mode configuration. 
+ """ + video = kwargs.get("video") + if video is None: + raise ValueError("Input video cannot be None for SAM3Pipeline") + + # Allow runtime prompt override + prompt = kwargs.get("segment_prompt", self._current_prompt) + if prompt != self._current_prompt: + self.update_prompt(prompt) + + # Normalize all frames + frames = [] + target_shape = None + for frame in video: + frame_tensor, target_shape = self._normalize_frame(frame, target_shape) + frames.append(frame_tensor) + + num_frames = len(frames) + output_frames = [] + + # Process each frame + for i in range(num_frames): + frame = frames[i] + + try: + # Segment the frame using image mode + # (Video mode with tracking can be added later for improved temporal consistency) + masks, boxes, scores = self._segment_frame_image_mode(frame, prompt) + + # Apply output mode (mask, overlay, or cutout) + output = self._apply_output_mode(frame, masks, scores) + output_frames.append(output) + + except Exception as e: + logger.error(f"SAM3 segmentation failed on frame {i}: {e}") + # Fallback: return original frame or empty mask + if self._output_mode == "mask": + h, w = frame.shape[:2] + output_frames.append(torch.zeros(h, w, 3, device=self.device)) + else: + output_frames.append(frame.to(self.device)) + + self._frame_index += 1 + + # Stack output frames: [T, H, W, C] + output_tensor = torch.stack(output_frames, dim=0) + + return output_tensor diff --git a/src/scope/core/pipelines/sam3/schema.py b/src/scope/core/pipelines/sam3/schema.py new file mode 100644 index 000000000..f9ad87bc4 --- /dev/null +++ b/src/scope/core/pipelines/sam3/schema.py @@ -0,0 +1,97 @@ +"""Configuration schema for SAM3 pipeline.""" + +from typing import Literal + +from pydantic import Field + +from ..artifacts import HuggingfaceRepoArtifact +from ..base_schema import BasePipelineConfig, ModeDefaults + + +class SAM3Config(BasePipelineConfig): + """Configuration for SAM3 (Segment Anything Model 3) pipeline. 
+ + This pipeline provides real-time object masking and tracking using Meta's + SAM3 model. It supports text prompts for open-vocabulary segmentation, + allowing you to segment and track objects by describing them in natural + language (e.g., "person", "yellow school bus", "dog"). + + The pipeline outputs segmentation masks that can be used for: + - Object isolation and masking + - Video tracking + - Compositing and effects + """ + + pipeline_id = "sam3" + pipeline_name = "SAM3 Segmentation" + pipeline_description = ( + "Real-time object masking and tracking using Meta's Segment Anything Model 3. " + "Supports text prompts for open-vocabulary segmentation and video tracking." + ) + docs_url = "https://github.com/facebookresearch/sam3" + + artifacts = [ + HuggingfaceRepoArtifact( + repo_id="facebook/sam3", + files=[ + "sam3.pt", + "config.json", + "processor_config.json", + "tokenizer.json", + "tokenizer_config.json", + "vocab.json", + "merges.txt", + "special_tokens_map.json", + ], + ), + ] + + supports_prompts = False + modified = True + estimated_vram_gb = 8.0 + + modes = {"video": ModeDefaults(default=True)} + + # Segmentation prompt - what to segment/track + segment_prompt: str = Field( + default="person", + description="Text description of what to segment (e.g., 'person', 'dog', 'car')", + ) + + # Output visualization mode + output_mode: Literal["mask", "overlay", "cutout"] = Field( + default="mask", + description=( + "Output format: 'mask' returns binary mask, " + "'overlay' shows mask on original frame, " + "'cutout' shows only the segmented object" + ), + ) + + # Mask color for overlay mode (RGB) + mask_color: tuple[int, int, int] = Field( + default=(0, 255, 0), + description="RGB color for mask overlay visualization", + ) + + # Mask opacity for overlay mode + mask_opacity: float = Field( + default=0.5, + ge=0.0, + le=1.0, + description="Opacity of mask overlay (0.0 to 1.0)", + ) + + # Confidence threshold for detections + confidence_threshold: float = 
Field( + default=0.5, + ge=0.0, + le=1.0, + description="Minimum confidence score to include a detection", + ) + + # Whether to track objects across frames + enable_tracking: bool = Field( + default=True, + description="Enable object tracking across video frames", + ) diff --git a/src/scope/server/pipeline_manager.py b/src/scope/server/pipeline_manager.py index 31ade1346..02ce2eaaa 100644 --- a/src/scope/server/pipeline_manager.py +++ b/src/scope/server/pipeline_manager.py @@ -527,6 +527,7 @@ def _load_pipeline_implementation( "video-depth-anything", "controller-viz", "rife", + "optical-flow", } if pipeline_class is not None and pipeline_id not in BUILTIN_PIPELINES: @@ -949,6 +950,33 @@ def _load_pipeline_implementation( ) logger.info("RIFE pipeline initialized") return pipeline + + elif pipeline_id == "optical-flow": + from scope.core.pipelines import OpticalFlowPipeline + from scope.core.pipelines.optical_flow.schema import OpticalFlowConfig + + # Create config with schema defaults, overridden by load_params + params = load_params or {} + config = OmegaConf.create( + { + "model_size": params.get( + "model_size", + OpticalFlowConfig.model_fields["model_size"].default, + ), + "use_tensorrt": params.get( + "use_tensorrt", + OpticalFlowConfig.model_fields["use_tensorrt"].default, + ), + } + ) + + pipeline = OpticalFlowPipeline( + config, + device=get_device(), + ) + logger.info("OpticalFlow pipeline initialized") + return pipeline + else: raise ValueError(f"Invalid pipeline ID: {pipeline_id}")