From 8b40fe017910c19a0c85d911f5be72dfdbccc2f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Mon, 16 Feb 2026 11:42:30 +0100 Subject: [PATCH 01/11] DAG PoC --- src/scope/core/pipelines/base_schema.py | 7 + src/scope/core/pipelines/longlive/schema.py | 3 + src/scope/server/dag_executor.py | 191 +++++++++++++++ src/scope/server/dag_schema.py | 110 +++++++++ src/scope/server/frame_processor.py | 113 +++++---- src/scope/server/pipeline_processor.py | 254 +++++++++++--------- tests/test_dag.py | 113 +++++++++ 7 files changed, 625 insertions(+), 166 deletions(-) create mode 100644 src/scope/server/dag_executor.py create mode 100644 src/scope/server/dag_schema.py create mode 100644 tests/test_dag.py diff --git a/src/scope/core/pipelines/base_schema.py b/src/scope/core/pipelines/base_schema.py index 714e988d0..b5fdf895f 100644 --- a/src/scope/core/pipelines/base_schema.py +++ b/src/scope/core/pipelines/base_schema.py @@ -245,6 +245,13 @@ class BasePipelineConfig(BaseModel): # to appear in the preprocessor dropdown. usage: ClassVar[list[UsageType]] = [] + # DAG port declaration: named inputs/outputs for graph wiring. + # stream_inputs = port names this pipeline reads from (e.g. "video", "vace_input_frames"). + # stream_outputs = port names this pipeline writes to (e.g. "video", "vace_input_masks"). + # Used by the DAG executor to connect queues and parameter forwarding. + stream_inputs: ClassVar[list[str]] = ["video"] + stream_outputs: ClassVar[list[str]] = ["video"] + # Mode configuration - keys are mode names, values are ModeDefaults with field overrides # Use default=True to mark the default mode. Only include fields that differ from base. modes: ClassVar[dict[str, ModeDefaults]] = {"text": ModeDefaults(default=True)} diff --git a/src/scope/core/pipelines/longlive/schema.py b/src/scope/core/pipelines/longlive/schema.py index c2598d2a9..53215f0a8 100644 --- a/src/scope/core/pipelines/longlive/schema.py +++ b/src/scope/core/pipelines/longlive/schema.py @@ -17,6 +17,9 @@ class LongLiveConfig(BasePipelineConfig): pipeline_id = "longlive" pipeline_name = "LongLive" + # DAG: accepts main video and optional VACE inputs from upstream (e.g. YOLO plugin) + stream_inputs = ["video", "vace_input_frames", "vace_input_masks"] + stream_outputs = ["video"] pipeline_description = ( "A streaming pipeline and autoregressive video diffusion model from Nvidia, MIT, HKUST, HKU and THU. " "The model is trained using Self-Forcing on Wan2.1 1.3b with modifications to support smoother prompt " diff --git a/src/scope/server/dag_executor.py b/src/scope/server/dag_executor.py new file mode 100644 index 000000000..6770b7d9f --- /dev/null +++ b/src/scope/server/dag_executor.py @@ -0,0 +1,191 @@ +"""DAG executor: builds and runs pipeline graphs from DagConfig. + +Wires source queues (for put()), pipeline processors (one per pipeline node), +and identifies the sink for get(). All frame ports (video, vace_input_frames, +vace_input_masks) use stream edges (queues); no separate parameter path. +""" + +from __future__ import annotations + +import logging +import queue +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +from .dag_schema import DagConfig, DagEdge, DagNode +from .pipeline_processor import PipelineProcessor + +if TYPE_CHECKING: + from .pipeline_manager import PipelineManager + +logger = logging.getLogger(__name__) + +# Default queue sizes (match pipeline_processor) +DEFAULT_INPUT_QUEUE_MAXSIZE = 30 +DEFAULT_OUTPUT_QUEUE_MAXSIZE = 8 + + +@dataclass +class DagRun: + """Result of building a DAG: queues and processors to run.""" + + # When put(frame) is called, put to each of these queues (source fan-out) + source_queues: list[queue.Queue] + # The processor whose output_queue we read from for get() + sink_processor: PipelineProcessor | None + # All pipeline processors (for start/stop/update_parameters) + processors: list[PipelineProcessor] + # Pipeline IDs in graph order (for logging/events) + pipeline_ids: list[str] + # Node id of the sink (for clarity) + sink_node_id: str | None + + +def build_dag( + dag: DagConfig, + pipeline_manager: PipelineManager, + initial_parameters: dict[str, Any], + session_id: str | None = None, + user_id: str | None = None, + connection_id: str | None = None, + connection_info: dict | None = None, +) -> DagRun: + """Build executable DAG: create queues and processors, wire edges. + + Args: + dag: Parsed DAG config (nodes + edges). + pipeline_manager: Manager to resolve pipeline_id -> instance. + initial_parameters: Parameters passed to all pipelines. + session_id, user_id, connection_id, connection_info: For processors. + + Returns: + DagRun with source_queues, sink_processor, processors, pipeline_ids. + """ + # 1) Create one queue per edge (all edges are stream; frame-by-frame) + stream_queues: dict[tuple[str, str], queue.Queue] = {} + for e in dag.edges: + if e.kind == "stream": + stream_queues[(e.to_node, e.to_port)] = queue.Queue( + maxsize=DEFAULT_INPUT_QUEUE_MAXSIZE + ) + + # 2) Source queues: all queues that receive from a source node + source_queues: list[queue.Queue] = [] + for node_id in dag.get_source_node_ids(): + for e in dag.stream_edges_from(node_id): + q = stream_queues.get((e.to_node, e.to_port)) + if q is not None: + source_queues.append(q) + + # 3) Create a processor per pipeline node and wire input_queues per port + node_processors: dict[str, PipelineProcessor] = {} + pipeline_ids: list[str] = [] + + for node in dag.nodes: + if node.type != "pipeline" or node.pipeline_id is None: + continue + pipeline = pipeline_manager.get_pipeline_by_id(node.pipeline_id) + processor = PipelineProcessor( + pipeline=pipeline, + pipeline_id=node.id, + initial_parameters=initial_parameters.copy(), + session_id=session_id, + user_id=user_id, + connection_id=connection_id, + connection_info=connection_info, + ) + node_processors[node.id] = processor + pipeline_ids.append(node.pipeline_id) + + for e in dag.edges_to(node.id): + if e.kind != "stream": + continue + q = stream_queues.get((node.id, e.to_port)) + if q is not None: + processor.input_queues[e.to_port] = q + with processor.input_queue_lock: + processor.input_queue = processor.input_queues.get("video") + + # 4) Set each producer's output_queues per port (replace with wired queues) + for node in dag.nodes: + if node.type != "pipeline" or node.id not in node_processors: + continue + proc = node_processors[node.id] + out_by_port: dict[str, list[queue.Queue]] = {} + for e in dag.edges_from(node.id): + if e.kind != "stream": + continue + q = stream_queues.get((e.to_node, e.to_port)) + if q is not None and q not in out_by_port.get(e.from_port, []): + out_by_port.setdefault(e.from_port, []).append(q) + for port, qlist in out_by_port.items(): + proc.output_queues[port] = qlist + + # 4) Identify sink: node that has an edge to "output" (or type sink) + sink_node_id: str | None = None + for e in dag.edges: + if e.to_node == "output" and e.kind == "stream": + sink_node_id = e.from_node + break + if sink_node_id is None: + # No explicit output node: treat last pipeline node as sink (linear) + pipeline_node_ids = dag.get_pipeline_node_ids() + if pipeline_node_ids: + sink_node_id = pipeline_node_ids[-1] + + sink_processor = node_processors.get(sink_node_id) if sink_node_id else None + + return DagRun( + source_queues=source_queues, + sink_processor=sink_processor, + processors=list(node_processors.values()), + pipeline_ids=pipeline_ids, + sink_node_id=sink_node_id, + ) + + +def linear_dag_from_pipeline_ids(pipeline_ids: list[str]) -> DagConfig: + """Build a linear DAG from a list of pipeline IDs (backward compatibility). + + Produces: source -> p0 -> p1 -> ... -> sink. + """ + nodes = [ + DagNode(id="input", type="source"), + *[DagNode(id=pid, type="pipeline", pipeline_id=pid) for pid in pipeline_ids], + DagNode(id="output", type="sink"), + ] + edges: list[DagEdge] = [] + prev = "input" + for pid in pipeline_ids: + edges.append( + DagEdge( + from_node=prev, + from_port="video", + to_node=pid, + to_port="video", + kind="stream", + ) + ) + prev = pid + edges.append( + DagEdge( + from_node=prev, + from_port="video", + to_node="output", + to_port="video", + kind="stream", + ) + ) + # Vace streams: preprocessor outputs (vace_*) to next in chain (frame-by-frame queues) + for i in range(len(pipeline_ids) - 1): + for port in ("vace_input_frames", "vace_input_masks"): + edges.append( + DagEdge( + from_node=pipeline_ids[i], + from_port=port, + to_node=pipeline_ids[i + 1], + to_port=port, + kind="stream", + ) + ) + return DagConfig(nodes=nodes, edges=edges) diff --git a/src/scope/server/dag_schema.py b/src/scope/server/dag_schema.py new file mode 100644 index 000000000..327a2ae11 --- /dev/null +++ b/src/scope/server/dag_schema.py @@ -0,0 +1,110 @@ +"""DAG (Directed Acyclic Graph) schema for pipeline execution. + +Defines a JSON-friendly format to describe: +- Nodes: source (input), pipeline instances, sink (output) +- Edges: connections between nodes via named ports + +All frame ports (video, vace_input_frames, vace_input_masks) use stream edges +(frame-by-frame queues). The optional "parameter" kind is for future event-like +data only. + +Example (YOLO plugin + Longlive with shared input video): + + { + "nodes": [ + {"id": "input", "type": "source"}, + {"id": "yolo_plugin", "type": "pipeline", "pipeline_id": "yolo_plugin"}, + {"id": "longlive", "type": "pipeline", "pipeline_id": "longlive"}, + {"id": "output", "type": "sink"} + ], + "edges": [ + {"from": "input", "from_port": "video", "to_node": "yolo_plugin", "to_port": "video", "kind": "stream"}, + {"from": "input", "from_port": "video", "to_node": "longlive", "to_port": "video", "kind": "stream"}, + {"from": "yolo_plugin", "from_port": "vace_input_frames", "to_node": "longlive", "to_port": "vace_input_frames", "kind": "stream"}, + {"from": "yolo_plugin", "from_port": "vace_input_masks", "to_node": "longlive", "to_port": "vace_input_masks", "kind": "stream"}, + {"from": "longlive", "from_port": "video", "to_node": "output", "to_port": "video", "kind": "stream"} + ] + } +""" + +from __future__ import annotations + +from typing import Literal + +from pydantic import BaseModel, Field + + +class DagNode(BaseModel): + """A node in the pipeline DAG.""" + + id: str = Field( + ..., + description="Unique node id (e.g. 'input', 'yolo_plugin', 'longlive', 'output')", + ) + type: Literal["source", "pipeline", "sink"] = Field( + ..., + description="source = external input, pipeline = pipeline instance, sink = output", + ) + pipeline_id: str | None = Field( + default=None, + description="Pipeline ID (registry key) when type is 'pipeline'", + ) + + +class DagEdge(BaseModel): + """An edge connecting an output port to an input port.""" + + from_node: str = Field(..., alias="from", description="Source node id") + from_port: str = Field( + ..., description="Source port (e.g. 'video', 'vace_input_frames')" + ) + to_node: str = Field(..., description="Target node id") + to_port: str = Field(..., description="Target port name") + kind: Literal["stream", "parameter"] = Field( + default="stream", + description="stream = queue (frame-by-frame), parameter = chunk-level pass-through", + ) + + model_config = {"populate_by_name": True} + + +class DagConfig(BaseModel): + """Root DAG configuration (graph definition).""" + + nodes: list[DagNode] = Field(..., description="DAG nodes") + edges: list[DagEdge] = Field(..., description="Connections between nodes") + + def get_pipeline_node_ids(self) -> list[str]: + """Return node ids that are pipeline nodes, in definition order.""" + return [n.id for n in self.nodes if n.type == "pipeline"] + + def get_source_node_ids(self) -> list[str]: + """Return node ids that are source nodes.""" + return [n.id for n in self.nodes if n.type == "source"] + + def get_sink_node_ids(self) -> list[str]: + """Return node ids that are sink nodes.""" + return [n.id for n in self.nodes if n.type == "sink"] + + def edges_from(self, node_id: str) -> list[DagEdge]: + """Return edges whose source is the given node.""" + return [e for e in self.edges if e.from_node == node_id] + + def edges_to(self, node_id: str) -> list[DagEdge]: + """Return edges whose target is the given node.""" + return [e for e in self.edges if e.to_node == node_id] + + def stream_edges_from(self, node_id: str) -> list[DagEdge]: + """Return stream edges whose source is the given node.""" + return [e for e in self.edges_from(node_id) if e.kind == "stream"] + + def parameter_edges_from(self, node_id: str) -> list[DagEdge]: + """Return parameter edges whose source is the given node.""" + return [e for e in self.edges_from(node_id) if e.kind == "parameter"] + + def node_by_id(self, node_id: str) -> DagNode | None: + """Return the node with the given id.""" + for n in self.nodes: + if n.id == node_id: + return n + return None diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py index 9a0bfca13..c78f17e38 100644 --- a/src/scope/server/frame_processor.py +++ b/src/scope/server/frame_processor.py @@ -8,6 +8,8 @@ import torch from aiortc.mediastreams import VideoFrame +from .dag_executor import DagRun, build_dag, linear_dag_from_pipeline_ids +from .dag_schema import DagConfig from .kafka_publisher import publish_event from .pipeline_manager import PipelineManager from .pipeline_processor import PipelineProcessor @@ -101,9 +103,11 @@ def __init__( # Input mode: video waits for frames, text generates immediately self._video_mode = (initial_parameters or {}).get("input_mode") == "video" - # Pipeline chaining support + # Pipeline chaining / DAG support self.pipeline_processors: list[PipelineProcessor] = [] self.pipeline_ids: list[str] = [] + # DAG run (set when using DAG executor; enables shared input and multi-port) + self._dag_run: DagRun | None = None # Frame counting for debug logging self._frames_in = 0 @@ -235,8 +239,9 @@ def stop(self, error_message: str = None): for processor in self.pipeline_processors: processor.stop() - # Clear pipeline processors + # Clear pipeline processors and DAG run self.pipeline_processors.clear() + self._dag_run = None # Clean up output sink self.output_sink_enabled = False @@ -365,18 +370,13 @@ def put(self, frame: VideoFrame) -> bool: return False return False - # Local mode: put into first processor's input queue + # Local mode: put into source queue(s) (DAG) or first processor's input queue (chain) if self.pipeline_processors: - first_processor = self.pipeline_processors[0] - frame_array = frame.to_ndarray(format="rgb24") if torch.cuda.is_available(): shape = frame_array.shape pinned_buffer = self._get_or_create_pinned_buffer(shape) - # Note: We reuse pinned buffers for performance. This assumes the copy_() - # operation completes before the next frame arrives. - # In practice, copy_() is very fast (~microseconds) and frames arrive at 60 FPS max pinned_buffer.copy_(torch.as_tensor(frame_array, dtype=torch.uint8)) frame_tensor = pinned_buffer.cuda(non_blocking=True) else: @@ -384,13 +384,23 @@ def put(self, frame: VideoFrame) -> bool: frame_tensor = frame_tensor.unsqueeze(0) - # Put frame into first processor's input queue - try: - first_processor.input_queue.put_nowait(frame_tensor) - except queue.Full: - # Queue full, drop frame (non-blocking) - logger.debug("First processor input queue full, dropping frame") - return False + if self._dag_run and self._dag_run.source_queues: + # DAG: fan-out to all source queues (e.g. shared input -> yolo + longlive) + for i, q in enumerate(self._dag_run.source_queues): + try: + # Clone for 2nd+ queues so each consumer has its own tensor + tensor = frame_tensor if i == 0 else frame_tensor.clone() + q.put_nowait(tensor) + except queue.Full: + logger.debug("Source queue full, dropping frame") + return False + else: + first_processor = self.pipeline_processors[0] + try: + first_processor.input_queue.put_nowait(frame_tensor) + except queue.Full: + logger.debug("First processor input queue full, dropping frame") + return False return True @@ -409,16 +419,20 @@ def get(self) -> torch.Tensor | None: except queue.Empty: return None else: - # Local mode: get from pipeline processor + # Local mode: get from sink processor (DAG) or last in chain if not self.pipeline_processors: return None - last_processor = self.pipeline_processors[-1] - if not last_processor.output_queue: + out_proc = ( + self._dag_run.sink_processor + if self._dag_run and self._dag_run.sink_processor + else self.pipeline_processors[-1] + ) + if not out_proc.output_queue: return None try: - frame = last_processor.output_queue.get_nowait() + frame = out_proc.output_queue.get_nowait() # Frame is stored as [1, H, W, C], convert to [H, W, C] for output # Move to CPU here for WebRTC streaming (frames stay on GPU between pipeline processors) frame = frame.squeeze(0) @@ -496,9 +510,12 @@ def get_fps(self) -> float: if not self.pipeline_processors: return DEFAULT_FPS - # Get FPS from the last processor in the chain - last_processor = self.pipeline_processors[-1] - return last_processor.get_fps() + out_proc = ( + self._dag_run.sink_processor + if self._dag_run and self._dag_run.sink_processor + else self.pipeline_processors[-1] + ) + return out_proc.get_fps() def _log_frame_stats(self): """Log frame processing statistics and emit heartbeat event.""" @@ -905,43 +922,45 @@ def _input_source_receiver_loop(self): ) def _setup_pipeline_chain_sync(self): - """Create pipeline processor chain (synchronous). + """Create pipeline DAG or linear chain (synchronous). + Uses DAG executor: if parameters contain "dag", build from that config; + otherwise build a linear DAG from pipeline_ids (backward compatible). Assumes all pipelines are already loaded by the pipeline manager. """ - if not self.pipeline_ids: - logger.error("No pipeline IDs provided") + if not self.pipeline_ids and not self.parameters.get("dag"): + logger.error("No pipeline IDs or DAG config provided") return - # Create pipeline processors (each creates its own queues) - for pipeline_id in self.pipeline_ids: - # Get pipeline instance from manager - pipeline = self.pipeline_manager.get_pipeline_by_id(pipeline_id) - - # Create processor with its own queues - processor = PipelineProcessor( - pipeline=pipeline, - pipeline_id=pipeline_id, - initial_parameters=self.parameters.copy(), - session_id=self.session_id, - user_id=self.user_id, - connection_id=self.connection_id, - connection_info=self.connection_info, - ) + dag_config: DagConfig + dag_raw = self.parameters.get("dag") + if isinstance(dag_raw, dict): + dag_config = DagConfig.model_validate(dag_raw) + elif isinstance(dag_raw, str): + dag_config = DagConfig.model_validate_json(dag_raw) + else: + dag_config = linear_dag_from_pipeline_ids(self.pipeline_ids) - self.pipeline_processors.append(processor) + self._dag_run = build_dag( + dag=dag_config, + pipeline_manager=self.pipeline_manager, + initial_parameters=self.parameters.copy(), + session_id=self.session_id, + user_id=self.user_id, + connection_id=self.connection_id, + connection_info=self.connection_info, + ) - for i in range(1, len(self.pipeline_processors)): - prev_processor = self.pipeline_processors[i - 1] - curr_processor = self.pipeline_processors[i] - prev_processor.set_next_processor(curr_processor) + self.pipeline_processors = self._dag_run.processors + if self._dag_run.pipeline_ids: + self.pipeline_ids = self._dag_run.pipeline_ids - # Start all processors for processor in self.pipeline_processors: processor.start() logger.info( - f"Created pipeline chain with {len(self.pipeline_processors)} processors" + f"Created pipeline DAG with {len(self.pipeline_processors)} processor(s): " + f"{self.pipeline_ids}" ) def __enter__(self): diff --git a/src/scope/server/pipeline_processor.py b/src/scope/server/pipeline_processor.py index 14559f95f..f0e1c0818 100644 --- a/src/scope/server/pipeline_processor.py +++ b/src/scope/server/pipeline_processor.py @@ -61,9 +61,15 @@ def __init__( self.connection_id = connection_id self.connection_info = connection_info - # Each processor creates its own queues - self.input_queue = queue.Queue(maxsize=30) - self.output_queue = queue.Queue(maxsize=8) + # Unified port-based queues: all frame streams (video, vace_input_frames, etc.) use queues + self.input_queues: dict[str, queue.Queue] = { + "video": queue.Queue(maxsize=30), + } + self.output_queues: dict[str, list[queue.Queue]] = { + "video": [queue.Queue(maxsize=8)], + } + # Primary queue refs for backward compat (chain mode, get_fps, resize) + self.input_queue = self.input_queues["video"] # Lock to protect input_queue assignment for thread-safe reference swapping self.input_queue_lock = threading.Lock() @@ -110,34 +116,27 @@ def __init__( self.throttler = PipelineThrottler() def _resize_output_queue(self, target_size: int): - """Resize the output queue to the target size, transferring existing frames. - - Args: - target_size: The desired maximum size for the output queue - """ - if self.output_queue is None: + """Resize the primary video output queue, transferring existing frames.""" + video_queues = self.output_queues.get("video") + if not video_queues: return - - if self.output_queue.maxsize < target_size: + primary = video_queues[0] + if primary.maxsize < target_size: logger.info( - f"Increasing output queue size to {target_size}, current size {self.output_queue.maxsize}" + f"Increasing output queue size to {target_size}, current size {primary.maxsize}" ) - - # Transfer frames from old queue to new queue - old_queue = self.output_queue - self.output_queue = queue.Queue(maxsize=target_size) - while not old_queue.empty(): + new_queue = queue.Queue(maxsize=target_size) + while not primary.empty(): try: - frame = old_queue.get_nowait() - self.output_queue.put_nowait(frame) + frame = primary.get_nowait() + new_queue.put_nowait(frame) except queue.Empty: break - - # Update next processor's input_queue to point to the new output_queue - # Use lock to ensure thread-safe reference swapping + self.output_queues["video"] = [new_queue] + video_queues[1:] if self.next_processor is not None: with self.next_processor.input_queue_lock: - self.next_processor.input_queue = self.output_queue + self.next_processor.input_queues["video"] = new_queue + self.next_processor.input_queue = new_queue def set_next_processor(self, next_processor: "PipelineProcessor"): """Set the next processor in the chain and update output queue size accordingly. @@ -161,7 +160,14 @@ def set_next_processor(self, next_processor: "PipelineProcessor"): # Update next processor's input_queue to point to this output_queue # Use lock to ensure thread-safe reference swapping with next_processor.input_queue_lock: - next_processor.input_queue = self.output_queue + next_processor.input_queues["video"] = self.output_queues["video"][0] + next_processor.input_queue = next_processor.input_queues["video"] + + @property + def output_queue(self) -> queue.Queue | None: + """Primary video output queue (for chain mode and sink get()).""" + queues = self.output_queues.get("video") + return queues[0] if queues else None def start(self): """Start the pipeline processor thread.""" @@ -199,12 +205,13 @@ def stop(self): except queue.Empty: break - if self.output_queue: - while not self.output_queue.empty(): - try: - self.output_queue.get_nowait() - except queue.Empty: - break + for queues in self.output_queues.values(): + for q in queues: + while not q.empty(): + try: + q.get_nowait() + except queue.Empty: + break logger.info(f"PipelineProcessor stopped for pipeline: {self.pipeline_id}") @@ -267,41 +274,11 @@ def prepare_chunk( self, input_queue_ref: queue.Queue, chunk_size: int ) -> list[torch.Tensor]: """ - Sample frames uniformly from the queue, convert them to tensors, and remove processed frames. - - This function implements uniform sampling across the entire queue to ensure - temporal coverage of input frames. It samples frames at evenly distributed - indices and removes all frames up to the last sampled frame to prevent - queue buildup. - - Note: - This function must be called with a queue reference obtained while holding - input_queue_lock. The caller is responsible for thread safety. - - Example: - With queue_size=8 and chunk_size=4: - - step = 8/4 = 2.0 - - indices = [0, 2, 4, 6] (uniformly distributed) - - Returns frames at positions 0, 2, 4, 6 - - Removes frames 0-6 from queue (7 frames total) - - Keeps frame 7 in queue - - Args: - input_queue_ref: Reference to the input queue (obtained while holding lock) - chunk_size: Number of frames to sample - - Returns: - List of tensor frames, each (1, H, W, C) for downstream preprocess_chunk + Sample frames uniformly from one queue (used when only video port is present). """ - - # Calculate uniform sampling step step = input_queue_ref.qsize() / chunk_size - # Generate indices for uniform sampling indices = [round(i * step) for i in range(chunk_size)] - # Extract VideoFrames at sampled indices video_frames = [] - - # Drop all frames up to and including the last sampled frame last_idx = indices[-1] for i in range(last_idx + 1): frame = input_queue_ref.get_nowait() @@ -309,6 +286,36 @@ def prepare_chunk( video_frames.append(frame) return video_frames + def prepare_multi_chunk( + self, + input_queues_ref: dict[str, queue.Queue], + primary_port: str, + chunk_size: int, + ) -> dict[str, list[torch.Tensor]]: + """ + Sample frames uniformly from the primary queue, then the same indices from other ports. + + Returns dict mapping port name to list of tensors (each 1,H,W,C). All ports + must have at least as many frames as we consume from the primary. + """ + primary = input_queues_ref.get(primary_port) + if primary is None or primary.qsize() < chunk_size: + return {} + step = primary.qsize() / chunk_size + indices = [round(i * step) for i in range(chunk_size)] + last_idx = indices[-1] + out: dict[str, list[torch.Tensor]] = {port: [] for port in input_queues_ref} + for port, q in input_queues_ref.items(): + if q.qsize() <= last_idx: + return {} + for port in input_queues_ref: + q = input_queues_ref[port] + for i in range(last_idx + 1): + frame = q.get_nowait() + if i in indices: + out[port].append(frame) + return out + def process_chunk(self): """Process a single chunk of frames.""" # Check if there are new parameters @@ -361,16 +368,16 @@ def process_chunk(self): reset_cache = self.parameters.pop("reset_cache", None) lora_scales = self.parameters.pop("lora_scales", None) - # Handle reset_cache: clear this processor's cache + # Handle reset_cache: clear this processor's output queues if reset_cache: logger.info(f"Clearing cache for pipeline processor: {self.pipeline_id}") - # Clear output queue - if self.output_queue: - while not self.output_queue.empty(): - try: - self.output_queue.get_nowait() - except queue.Empty: - break + for queues in self.output_queues.values(): + for q in queues: + while not q.empty(): + try: + q.get_nowait() + except queue.Empty: + break requirements = None if hasattr(self.pipeline, "prepare"): @@ -380,25 +387,26 @@ def process_chunk(self): prepare_params["video"] = True requirements = self.pipeline.prepare(**prepare_params) - video_input = None + chunks: dict[str, list[torch.Tensor]] = {} input_frame_count = 0 if requirements is not None: current_chunk_size = requirements.input_size - - # Capture a local reference to input_queue while holding the lock - # This ensures thread-safe access even if input_queue is reassigned with self.input_queue_lock: - input_queue_ref = self.input_queue - - # Check if queue has enough frames before consuming them - if input_queue_ref.qsize() < current_chunk_size: - # Not enough frames in queue, sleep briefly and try again next iteration + input_queues_ref = dict(self.input_queues) + primary = input_queues_ref.get("video") + if primary is None or primary.qsize() < current_chunk_size: self.shutdown_event.wait(SLEEP_TIME) return - - # Use prepare_chunk to uniformly sample frames from the queue - video_input = self.prepare_chunk(input_queue_ref, current_chunk_size) - input_frame_count = len(video_input) if video_input else 0 + if len(input_queues_ref) == 1: + chunks["video"] = self.prepare_chunk(primary, current_chunk_size) + else: + chunks = self.prepare_multi_chunk( + input_queues_ref, "video", current_chunk_size + ) + if not chunks: + self.shutdown_event.wait(SLEEP_TIME) + return + input_frame_count = len(chunks.get("video") or []) try: # Pass parameters (excluding prepare-only parameters) @@ -420,33 +428,30 @@ def process_chunk(self): # Reset mouse accumulator, keep key state self.parameters["ctrl_input"]["mouse"] = [0.0, 0.0] - # Route video input based on VACE status - # Don't overwrite if preprocessor already provided vace_input_frames - if video_input is not None and "vace_input_frames" not in call_params: - if ( - self._pipeline_supports_vace - and self.vace_enabled - and self.vace_use_input_video - ): - call_params["vace_input_frames"] = video_input - else: - call_params["video"] = video_input + # Fill call_params from stream chunks + if chunks: + if "vace_input_frames" in chunks and "vace_input_masks" in chunks: + call_params["vace_input_frames"] = chunks["vace_input_frames"] + call_params["vace_input_masks"] = chunks["vace_input_masks"] + if "video" in chunks: + if ( + self._pipeline_supports_vace + and self.vace_enabled + and self.vace_use_input_video + and "vace_input_frames" not in call_params + ): + call_params["vace_input_frames"] = chunks["video"] + else: + call_params["video"] = chunks["video"] processing_start = time.time() output_dict = self.pipeline(**call_params) processing_time = time.time() - processing_start - # Extract video from the returned dictionary output = output_dict.get("video") if output is None: return - # Forward extra params to downstream pipeline (dual-output pattern) - # Preprocessors return {"video": frames, "vace_input_frames": ..., "vace_input_masks": ...} - extra_params = {k: v for k, v in output_dict.items() if k != "video"} - if extra_params and self.next_processor is not None: - self.next_processor.update_parameters(extra_params) - # Clear one-shot parameters after use to prevent sending them on subsequent chunks # These parameters should only be sent when explicitly provided in parameter updates one_shot_params = [ @@ -489,29 +494,40 @@ def process_chunk(self): .detach() ) - # Resize output queue to meet target max size + # Resize primary video output queue to meet target max size target_output_queue_max_size = num_frames * OUTPUT_QUEUE_MAX_SIZE_FACTOR self._resize_output_queue(target_output_queue_max_size) - # Put frames in output queue - # For intermediate pipelines, output goes to next pipeline's input - # For last pipeline, output goes to frame_processor's output_queue - # Output frames are [H, W, C], convert to [1, H, W, C] for consistency - for frame in output: - frame = frame.unsqueeze(0) - # Track when a frame is ready (production rate) - self._track_output_frame() - try: - self.output_queue.put_nowait(frame) - except queue.Full: - logger.info( - f"Output queue full for {self.pipeline_id}, dropping processed frame" - ) + # Put each output port's frames to its queues (all frame ports are streamed) + for port, value in output_dict.items(): + if value is None or not isinstance(value, torch.Tensor): continue - - # Apply throttling if this pipeline is producing faster than next can consume - # Only throttle if: (1) has video input, (2) has next processor - if video_input is not None and self.next_processor is not None: + queues = self.output_queues.get(port) + if not queues: + continue + if value.dtype != torch.uint8: + value = ( + (value * 255.0) + .clamp(0, 255) + .to(dtype=torch.uint8) + .contiguous() + .detach() + ) + frames = [value[i].unsqueeze(0) for i in range(value.shape[0])] + for frame in frames: + if port == "video": + self._track_output_frame() + for q in queues: + try: + q.put_nowait(frame if q is queues[0] else frame.clone()) + except queue.Full: + if port == "video": + logger.info( + f"Output queue full for {self.pipeline_id}, dropping frame" + ) + break + + if chunks and self.next_processor is not None: self.throttler.throttle() except Exception as e: diff --git a/tests/test_dag.py b/tests/test_dag.py new file mode 100644 index 000000000..5a60f378b --- /dev/null +++ b/tests/test_dag.py @@ -0,0 +1,113 @@ +"""Tests for DAG pipeline execution (dag_schema, dag_executor, linear DAG).""" + +import queue +from unittest.mock import MagicMock + +import pytest + +from scope.server.dag_executor import build_dag, linear_dag_from_pipeline_ids +from scope.server.dag_schema import DagConfig + + +class TestLinearDagFromPipelineIds: + """Tests for linear_dag_from_pipeline_ids.""" + + def test_single_pipeline(self): + dag = linear_dag_from_pipeline_ids(["longlive"]) + assert [n.id for n in dag.nodes] == ["input", "longlive", "output"] + edges = [(e.from_node, e.to_node, e.from_port, e.kind) for e in dag.edges] + assert ("input", "longlive", "video", "stream") in edges + assert ("longlive", "output", "video", "stream") in edges + assert all(e.kind == "stream" for e in dag.edges) + + def test_two_pipelines_includes_vace_stream_edges(self): + dag = linear_dag_from_pipeline_ids(["passthrough", "longlive"]) + assert len(dag.get_pipeline_node_ids()) == 2 + vace_edges = [ + e + for e in dag.edges + if e.from_port in ("vace_input_frames", "vace_input_masks") + ] + assert len(vace_edges) == 2 + assert all(e.kind == "stream" for e in vace_edges) + + def test_explicit_dag_config_roundtrip(self): + raw = { + "nodes": [ + {"id": "input", "type": "source"}, + {"id": "p1", "type": "pipeline", "pipeline_id": "passthrough"}, + {"id": "output", "type": "sink"}, + ], + "edges": [ + { + "from": "input", + "from_port": "video", + "to_node": "p1", + "to_port": "video", + "kind": "stream", + }, + { + "from": "p1", + "from_port": "video", + "to_node": "output", + "to_port": "video", + "kind": "stream", + }, + ], + } + config = DagConfig.model_validate(raw) + assert config.node_by_id("p1").pipeline_id == "passthrough" + assert len(config.edges) == 2 + + +class TestBuildDag: + """Tests for build_dag with a mock pipeline manager.""" + + @pytest.fixture + def mock_pipeline(self): + """Minimal pipeline mock: prepare() and __call__ return video.""" + p = MagicMock() + p.prepare.return_value = MagicMock(input_size=4) + return p + + @pytest.fixture + def mock_pipeline_manager(self, mock_pipeline): + """Pipeline manager that returns the mock pipeline for any known id.""" + mgr = MagicMock() + mgr.get_pipeline_by_id.side_effect = lambda pid: mock_pipeline + return mgr + + def test_build_linear_dag_returns_dag_run(self, mock_pipeline_manager): + dag = linear_dag_from_pipeline_ids(["passthrough"]) + run = build_dag( + dag=dag, + pipeline_manager=mock_pipeline_manager, + initial_parameters={"pipeline_ids": ["passthrough"], "input_mode": "video"}, + ) + assert run.sink_processor is not None + assert run.pipeline_ids == ["passthrough"] + assert len(run.processors) == 1 + assert len(run.source_queues) == 1 + + def test_build_dag_wires_source_queues(self, mock_pipeline_manager): + dag = linear_dag_from_pipeline_ids(["passthrough"]) + run = build_dag( + dag=dag, + pipeline_manager=mock_pipeline_manager, + initial_parameters={}, + ) + # One source node (input) -> one queue to first pipeline + assert len(run.source_queues) == 1 + q = run.source_queues[0] + assert isinstance(q, queue.Queue) + + def test_build_dag_sink_has_video_output_queue(self, mock_pipeline_manager): + dag = linear_dag_from_pipeline_ids(["passthrough"]) + run = build_dag( + dag=dag, + pipeline_manager=mock_pipeline_manager, + initial_parameters={}, + ) + assert run.sink_processor is not None + assert run.sink_processor.output_queue is not None + assert "video" in run.sink_processor.output_queues From 094317736002b374684feb9788145c20bee6fbfc Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Mon, 16 Feb 2026 14:08:49 +0000 Subject: [PATCH 02/11] Improve DAG and add sample input.json --- input.json | 13 +++++ src/scope/server/app.py | 23 +++++++-- src/scope/server/frame_processor.py | 75 +++++++++++++++++++++++------ 3 files changed, 91 insertions(+), 20 deletions(-) create mode 100644 input.json diff --git a/input.json b/input.json new file mode 100644 index 000000000..b61d78281 --- /dev/null +++ b/input.json @@ -0,0 +1,13 @@ +{ + "nodes": [ + {"id": "input", "type": "source"}, + {"id": "passthrough", "type": "pipeline", "pipeline_id": "passthrough"}, + {"id": "rife", "type": "pipeline", "pipeline_id": "rife"}, + {"id": "output", "type": "sink"} + ], + "edges": [ + {"from": "input", "from_port": "video", "to_node": "passthrough", "to_port": "video", "kind": "stream"}, + {"from": "passthrough", "from_port": "video", "to_node": "rife", "to_port": "video", "kind": "stream"}, + {"from": "rife", "from_port": "video", "to_node": "output", "to_port": "video", "kind": "stream"} + ] +} diff --git a/src/scope/server/app.py b/src/scope/server/app.py index ac13a219d..330e60fb3 100644 --- a/src/scope/server/app.py +++ b/src/scope/server/app.py @@ -496,17 +496,30 @@ async def load_pipeline( cloud-hosted scope backend. """ try: - # Get pipeline IDs to load - pipeline_ids = request.pipeline_ids + # Override pipeline IDs from input.json DAG if available + from .frame_processor import get_pipeline_ids_from_input_json + + dag_pipeline_ids = get_pipeline_ids_from_input_json() + if dag_pipeline_ids is not None: + logger.info( + f"Overriding pipeline_ids from input.json: {dag_pipeline_ids} " + f"(was: {request.pipeline_ids})" + ) + pipeline_ids = dag_pipeline_ids + # Discard UI load_params — they belong to whatever pipeline the UI + # selected, not to the pipelines declared in input.json. + load_params_dict = None + else: + pipeline_ids = request.pipeline_ids + # load_params is already a dict (or None) + load_params_dict = request.load_params + if not pipeline_ids: raise HTTPException( status_code=400, detail="pipeline_ids must be provided and cannot be empty", ) - # load_params is already a dict (or None) - load_params_dict = request.load_params - # Local mode: start loading in background without blocking asyncio.create_task( pipeline_manager.load_pipelines( diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py index c78f17e38..f80a71bc2 100644 --- a/src/scope/server/frame_processor.py +++ b/src/scope/server/frame_processor.py @@ -1,8 +1,10 @@ +import json import logging import queue import threading import time import uuid +from pathlib import Path from typing import TYPE_CHECKING, Any import torch @@ -32,6 +34,37 @@ # Heartbeat interval for stream stats logging and Kafka events HEARTBEAT_INTERVAL_SECONDS = 10.0 +# Path to input.json DAG configuration file (project root) +_INPUT_JSON_PATH = Path(__file__).resolve().parents[3] / "input.json" + + +def _load_dag_from_input_json() -> DagConfig | None: + """Load DAG config from input.json if it exists. Returns None otherwise.""" + if not _INPUT_JSON_PATH.exists(): + return None + try: + raw = json.loads(_INPUT_JSON_PATH.read_text()) + dag = DagConfig.model_validate(raw) + logger.info( + f"Loaded DAG from {_INPUT_JSON_PATH}: {dag.get_pipeline_node_ids()}" + ) + return dag + except Exception as e: + logger.error(f"Failed to load DAG from {_INPUT_JSON_PATH}: {e}") + return None + + +def get_pipeline_ids_from_input_json() -> list[str] | None: + """Return pipeline IDs from input.json, or None if not available.""" + dag = _load_dag_from_input_json() + if dag is None: + return None + return [ + n.pipeline_id + for n in dag.nodes + if n.type == "pipeline" and n.pipeline_id is not None + ] + class FrameProcessor: """Processes video frames through pipelines or cloud relay. @@ -117,10 +150,17 @@ def __init__( self._playback_ready_emitted = False self._stream_start_time: float | None = None - # Store pipeline_ids from initial_parameters if provided - pipeline_ids = (initial_parameters or {}).get("pipeline_ids") - if pipeline_ids is not None: - self.pipeline_ids = pipeline_ids + # Override pipeline_ids from input.json if available, else use UI params + dag_pipeline_ids = get_pipeline_ids_from_input_json() + if dag_pipeline_ids is not None: + self.pipeline_ids = dag_pipeline_ids + logger.info( + f"[FRAME-PROCESSOR] Using pipeline_ids from input.json: {dag_pipeline_ids}" + ) + else: + pipeline_ids = (initial_parameters or {}).get("pipeline_ids") + if pipeline_ids is not None: + self.pipeline_ids = pipeline_ids def start(self): if self.running: @@ -924,22 +964,27 @@ def _input_source_receiver_loop(self): def _setup_pipeline_chain_sync(self): """Create pipeline DAG or linear chain (synchronous). - Uses DAG executor: if parameters contain "dag", build from that config; - otherwise build a linear DAG from pipeline_ids (backward compatible). + Loads DAG from input.json if it exists, otherwise falls back to + parameters or linear pipeline chain. Assumes all pipelines are already loaded by the pipeline manager. """ - if not self.pipeline_ids and not self.parameters.get("dag"): + dag_config: DagConfig + + # Try loading DAG from input.json first + dag_config = _load_dag_from_input_json() + if dag_config is not None: + logger.info("[FRAME-PROCESSOR] Using DAG from input.json") + elif not self.pipeline_ids and not self.parameters.get("dag"): logger.error("No pipeline IDs or DAG config provided") return - - dag_config: DagConfig - dag_raw = self.parameters.get("dag") - if isinstance(dag_raw, dict): - dag_config = DagConfig.model_validate(dag_raw) - elif isinstance(dag_raw, str): - dag_config = DagConfig.model_validate_json(dag_raw) else: - dag_config = linear_dag_from_pipeline_ids(self.pipeline_ids) + dag_raw = self.parameters.get("dag") + if isinstance(dag_raw, dict): + dag_config = DagConfig.model_validate(dag_raw) + elif isinstance(dag_raw, str): + dag_config = DagConfig.model_validate_json(dag_raw) + else: + dag_config = linear_dag_from_pipeline_ids(self.pipeline_ids) self._dag_run = build_dag( dag=dag_config, From 0b67f69b8034d53e80e6eda24689160e450d8621 Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Mon, 16 Feb 2026 15:23:39 +0000 Subject: [PATCH 03/11] Fix --- input-passthrough-rife.json | 13 +++++++++++++ input.json | 9 ++++++--- src/scope/server/pipeline_processor.py | 5 +++++ 3 files changed, 24 insertions(+), 3 deletions(-) create mode 100644 input-passthrough-rife.json diff --git a/input-passthrough-rife.json b/input-passthrough-rife.json new file mode 100644 index 000000000..b61d78281 --- /dev/null +++ b/input-passthrough-rife.json @@ -0,0 +1,13 @@ +{ + "nodes": [ + {"id": "input", "type": "source"}, + {"id": "passthrough", "type": "pipeline", "pipeline_id": "passthrough"}, + {"id": "rife", "type": "pipeline", "pipeline_id": "rife"}, + {"id": "output", "type": "sink"} + ], + "edges": [ + {"from": "input", "from_port": "video", "to_node": "passthrough", "to_port": "video", "kind": "stream"}, + {"from": "passthrough", "from_port": "video", "to_node": "rife", "to_port": "video", "kind": "stream"}, + {"from": "rife", "from_port": "video", "to_node": "output", "to_port": "video", "kind": "stream"} + ] +} diff --git a/input.json b/input.json index b61d78281..e09ed5e68 100644 --- a/input.json +++ b/input.json @@ -2,12 +2,15 @@ "nodes": [ {"id": "input", "type": "source"}, {"id": "passthrough", "type": "pipeline", "pipeline_id": "passthrough"}, - {"id": "rife", "type": "pipeline", "pipeline_id": "rife"}, + {"id": "depth", "type": "pipeline", "pipeline_id": "video-depth-anything"}, + {"id": "combine", "type": "pipeline", "pipeline_id": "combine_streams"}, {"id": "output", "type": "sink"} ], "edges": [ {"from": "input", "from_port": "video", "to_node": "passthrough", "to_port": "video", "kind": "stream"}, - {"from": "passthrough", "from_port": "video", "to_node": "rife", "to_port": "video", "kind": "stream"}, - {"from": "rife", "from_port": "video", "to_node": "output", "to_port": "video", "kind": "stream"} + {"from": "input", "from_port": "video", "to_node": "depth", "to_port": "video", "kind": "stream"}, + {"from": "passthrough", "from_port": "video", "to_node": "combine", "to_port": "video", "kind": "stream"}, + {"from": "depth", "from_port": "video", "to_node": "combine", "to_port": "video2", "kind": "stream"}, + {"from": "combine", "from_port": "video", "to_node": "output", "to_port": "video", "kind": "stream"} ] } diff --git a/src/scope/server/pipeline_processor.py b/src/scope/server/pipeline_processor.py index f0e1c0818..904bff747 100644 --- a/src/scope/server/pipeline_processor.py +++ b/src/scope/server/pipeline_processor.py @@ -443,6 +443,11 @@ def process_chunk(self): call_params["vace_input_frames"] = chunks["video"] else: call_params["video"] = chunks["video"] + # Pass any other stream ports (e.g. video2 for combine_streams) + for port, frame_list in chunks.items(): + if port in ("video", "vace_input_frames", "vace_input_masks"): + continue + call_params[port] = frame_list processing_start = time.time() output_dict = self.pipeline(**call_params) From 8a6b907fa491c404b42632dc75ce7f673c97c29d Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Tue, 17 Feb 2026 08:45:09 +0000 Subject: [PATCH 04/11] Add DAG API and UI --- frontend/package-lock.json | 230 +++++++++++++++++ frontend/package.json | 1 + frontend/src/components/Header.tsx | 18 +- frontend/src/components/dag/DagEditor.tsx | 238 ++++++++++++++++++ .../src/components/dag/DagEditorDialog.tsx | 33 +++ frontend/src/components/dag/PipelineNode.tsx | 89 +++++++ frontend/src/components/dag/SinkNode.tsx | 20 ++ frontend/src/components/dag/SourceNode.tsx | 20 ++ frontend/src/lib/api.ts | 75 ++++++ frontend/src/lib/dagUtils.ts | 127 ++++++++++ src/scope/server/app.py | 106 +++++++- src/scope/server/dag_schema.py | 44 ++++ src/scope/server/dag_state.py | 39 +++ src/scope/server/frame_processor.py | 30 ++- 14 files changed, 1060 insertions(+), 10 deletions(-) create mode 100644 frontend/src/components/dag/DagEditor.tsx create mode 100644 frontend/src/components/dag/DagEditorDialog.tsx create mode 100644 frontend/src/components/dag/PipelineNode.tsx create mode 100644 frontend/src/components/dag/SinkNode.tsx create mode 100644 frontend/src/components/dag/SourceNode.tsx create mode 100644 frontend/src/lib/dagUtils.ts create mode 100644 src/scope/server/dag_state.py diff --git a/frontend/package-lock.json b/frontend/package-lock.json index a615e7fe1..f85bb074f 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -18,6 +18,7 @@ "@radix-ui/react-toggle": "^1.1.10", "@radix-ui/react-toggle-group": "^1.1.11", "@radix-ui/react-tooltip": "^1.2.8", + "@xyflow/react": "^12.10.0", "class-variance-authority": "^0.7.1", "clsx": "^2.1.1", "lucide-react": "^0.544.0", @@ -2658,6 +2659,55 @@ "@babel/types": "^7.28.2" } }, + "node_modules/@types/d3-color": { + "version": "3.1.3", + "resolved": "https://registry.npmjs.org/@types/d3-color/-/d3-color-3.1.3.tgz", + "integrity": "sha512-iO90scth9WAbmgv7ogoq57O9YpKmFBbmoEoCHDB2xMBY0+/KVrqAaCDyCE16dUspeOvIxFFRI+0sEtqDqy2b4A==", + "license": "MIT" + }, + "node_modules/@types/d3-drag": { + "version": "3.0.7", + "resolved": "https://registry.npmjs.org/@types/d3-drag/-/d3-drag-3.0.7.tgz", + "integrity": "sha512-HE3jVKlzU9AaMazNufooRJ5ZpWmLIoc90A37WU2JMmeq28w1FQqCZswHZ3xR+SuxYftzHq6WU6KJHvqxKzTxxQ==", + "license": "MIT", + "dependencies": { + "@types/d3-selection": "*" + } + }, + "node_modules/@types/d3-interpolate": { + "version": "3.0.4", + "resolved": "https://registry.npmjs.org/@types/d3-interpolate/-/d3-interpolate-3.0.4.tgz", + "integrity": "sha512-mgLPETlrpVV1YRJIglr4Ez47g7Yxjl1lj7YKsiMCb27VJH9W8NVM6Bb9d8kkpG/uAQS5AmbA48q2IAolKKo1MA==", + "license": "MIT", + "dependencies": { + "@types/d3-color": "*" + } + }, + "node_modules/@types/d3-selection": { + "version": "3.0.11", + "resolved": "https://registry.npmjs.org/@types/d3-selection/-/d3-selection-3.0.11.tgz", + "integrity": "sha512-bhAXu23DJWsrI45xafYpkQ4NtcKMwWnAC/vKrd2l+nxMFuvOT3XMYTIj2opv8vq8AO5Yh7Qac/nSeP/3zjTK0w==", + "license": "MIT" + }, + "node_modules/@types/d3-transition": { + "version": "3.0.9", + "resolved": "https://registry.npmjs.org/@types/d3-transition/-/d3-transition-3.0.9.tgz", + "integrity": "sha512-uZS5shfxzO3rGlu0cC3bjmMFKsXv+SmZZcgp0KD22ts4uGXp5EVYGzu/0YdwZeKmddhcAccYtREJKkPfXkZuCg==", + "license": "MIT", + "dependencies": { + "@types/d3-selection": "*" + } + }, + "node_modules/@types/d3-zoom": { + "version": "3.0.8", + "resolved": "https://registry.npmjs.org/@types/d3-zoom/-/d3-zoom-3.0.8.tgz", + "integrity": "sha512-iqMC4/YlFCSlO8+2Ii1GGGliCAY4XdeG748w5vQUbevlbDu0zSjH/+jojorQVBK/se0j6DUFNPBGSqD3YWYnDw==", + "license": "MIT", + "dependencies": { + "@types/d3-interpolate": "*", + "@types/d3-selection": "*" + } + }, "node_modules/@types/estree": { "version": "1.0.8", "resolved": "https://registry.npmjs.org/@types/estree/-/estree-1.0.8.tgz", @@ -2984,6 +3034,38 @@ "vite": "^4.2.0 || ^5.0.0 || ^6.0.0 || ^7.0.0" } }, + "node_modules/@xyflow/react": { + "version": "12.10.0", + "resolved": "https://registry.npmjs.org/@xyflow/react/-/react-12.10.0.tgz", + "integrity": "sha512-eOtz3whDMWrB4KWVatIBrKuxECHqip6PfA8fTpaS2RUGVpiEAe+nqDKsLqkViVWxDGreq0lWX71Xth/SPAzXiw==", + "license": "MIT", + "dependencies": { + "@xyflow/system": "0.0.74", + "classcat": "^5.0.3", + "zustand": "^4.4.0" + }, + "peerDependencies": { + "react": ">=17", + "react-dom": ">=17" + } + }, + "node_modules/@xyflow/system": { + "version": "0.0.74", + "resolved": "https://registry.npmjs.org/@xyflow/system/-/system-0.0.74.tgz", + "integrity": "sha512-7v7B/PkiVrkdZzSbL+inGAo6tkR/WQHHG0/jhSvLQToCsfa8YubOGmBYd1s08tpKpihdHDZFwzQZeR69QSBb4Q==", + "license": "MIT", + "dependencies": { + "@types/d3-drag": "^3.0.7", + "@types/d3-interpolate": "^3.0.4", + "@types/d3-selection": "^3.0.10", + "@types/d3-transition": "^3.0.8", + "@types/d3-zoom": "^3.0.8", + "d3-drag": "^3.0.0", + "d3-interpolate": "^3.0.1", + "d3-selection": "^3.0.0", + "d3-zoom": "^3.0.0" + } + }, "node_modules/acorn": { "version": "8.15.0", "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.15.0.tgz", @@ -3242,6 +3324,12 @@ "url": "https://polar.sh/cva" } }, + "node_modules/classcat": { + "version": "5.0.5", + "resolved": "https://registry.npmjs.org/classcat/-/classcat-5.0.5.tgz", + "integrity": "sha512-JhZUT7JFcQy/EzW605k/ktHtncoo9vnyW/2GspNYwFlN1C/WmjuV/xtS04e9SOkL2sTdw0VAZ2UGCcQ9lR6p6w==", + "license": "MIT" + }, "node_modules/clsx": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/clsx/-/clsx-2.1.1.tgz", @@ -3307,6 +3395,111 @@ "devOptional": true, "license": "MIT" }, + "node_modules/d3-color": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/d3-color/-/d3-color-3.1.0.tgz", + "integrity": "sha512-zg/chbXyeBtMQ1LbD/WSoW2DpC3I0mpmPdW+ynRTj/x2DAWYrIY7qeZIHidozwV24m4iavr15lNwIwLxRmOxhA==", + "license": "ISC", + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-dispatch": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/d3-dispatch/-/d3-dispatch-3.0.1.tgz", + "integrity": "sha512-rzUyPU/S7rwUflMyLc1ETDeBj0NRuHKKAcvukozwhshr6g6c5d8zh4c2gQjY2bZ0dXeGLWc1PF174P2tVvKhfg==", + "license": "ISC", + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-drag": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/d3-drag/-/d3-drag-3.0.0.tgz", + "integrity": "sha512-pWbUJLdETVA8lQNJecMxoXfH6x+mO2UQo8rSmZ+QqxcbyA3hfeprFgIT//HW2nlHChWeIIMwS2Fq+gEARkhTkg==", + "license": "ISC", + "dependencies": { + "d3-dispatch": "1 - 3", + "d3-selection": "3" + }, + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-ease": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/d3-ease/-/d3-ease-3.0.1.tgz", + "integrity": "sha512-wR/XK3D3XcLIZwpbvQwQ5fK+8Ykds1ip7A2Txe0yxncXSdq1L9skcG7blcedkOX+ZcgxGAmLX1FrRGbADwzi0w==", + "license": "BSD-3-Clause", + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-interpolate": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/d3-interpolate/-/d3-interpolate-3.0.1.tgz", + "integrity": "sha512-3bYs1rOD33uo8aqJfKP3JWPAibgw8Zm2+L9vBKEHJ2Rg+viTR7o5Mmv5mZcieN+FRYaAOWX5SJATX6k1PWz72g==", + "license": "ISC", + "dependencies": { + "d3-color": "1 - 3" + }, + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-selection": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/d3-selection/-/d3-selection-3.0.0.tgz", + "integrity": "sha512-fmTRWbNMmsmWq6xJV8D19U/gw/bwrHfNXxrIN+HfZgnzqTHp9jOmKMhsTUjXOJnZOdZY9Q28y4yebKzqDKlxlQ==", + "license": "ISC", + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-timer": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/d3-timer/-/d3-timer-3.0.1.tgz", + "integrity": "sha512-ndfJ/JxxMd3nw31uyKoY2naivF+r29V+Lc0svZxe1JvvIRmi8hUsrMvdOwgS1o6uBHmiz91geQ0ylPP0aj1VUA==", + "license": "ISC", + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-transition": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/d3-transition/-/d3-transition-3.0.1.tgz", + "integrity": "sha512-ApKvfjsSR6tg06xrL434C0WydLr7JewBB3V+/39RMHsaXTOG0zmt/OAXeng5M5LBm0ojmxJrpomQVZ1aPvBL4w==", + "license": "ISC", + "dependencies": { + "d3-color": "1 - 3", + "d3-dispatch": "1 - 3", + "d3-ease": "1 - 3", + "d3-interpolate": "1 - 3", + "d3-timer": "1 - 3" + }, + "engines": { + "node": ">=12" + }, + "peerDependencies": { + "d3-selection": "2 - 3" + } + }, + "node_modules/d3-zoom": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/d3-zoom/-/d3-zoom-3.0.0.tgz", + "integrity": "sha512-b8AmV3kfQaqWAuacbPuNbL6vahnOJflOhexLzMMNLga62+/nh0JzvJ0aO/5a5MVgUFGS7Hu1P9P03o3fJkDCyw==", + "license": "ISC", + "dependencies": { + "d3-dispatch": "1 - 3", + "d3-drag": "2 - 3", + "d3-interpolate": "1 - 3", + "d3-selection": "2 - 3", + "d3-transition": "2 - 3" + }, + "engines": { + "node": ">=12" + } + }, "node_modules/debug": { "version": "4.4.1", "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.1.tgz", @@ -5194,6 +5387,15 @@ } } }, + "node_modules/use-sync-external-store": { + "version": "1.6.0", + "resolved": "https://registry.npmjs.org/use-sync-external-store/-/use-sync-external-store-1.6.0.tgz", + "integrity": "sha512-Pp6GSwGP/NrPIrxVFAIkOQeyw8lFenOHijQWkUTrDvrF4ALqylP2C/KCkeS9dpUM3KvYRQhna5vt7IL95+ZQ9w==", + "license": "MIT", + "peerDependencies": { + "react": "^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0" + } + }, "node_modules/vite": { "version": "7.3.0", "resolved": "https://registry.npmjs.org/vite/-/vite-7.3.0.tgz", @@ -5345,6 +5547,34 @@ "funding": { "url": "https://github.com/sponsors/sindresorhus" } + }, + "node_modules/zustand": { + "version": "4.5.7", + "resolved": "https://registry.npmjs.org/zustand/-/zustand-4.5.7.tgz", + "integrity": "sha512-CHOUy7mu3lbD6o6LJLfllpjkzhHXSBlX8B9+qPddUsIfeF5S/UZ5q0kmCsnRqT1UHFQZchNFDDzMbQsuesHWlw==", + "license": "MIT", + "dependencies": { + "use-sync-external-store": "^1.2.2" + }, + "engines": { + "node": ">=12.7.0" + }, + "peerDependencies": { + "@types/react": ">=16.8", + "immer": ">=9.0.6", + "react": ">=16.8" + }, + "peerDependenciesMeta": { + "@types/react": { + "optional": true + }, + "immer": { + "optional": true + }, + "react": { + "optional": true + } + } } } } diff --git a/frontend/package.json b/frontend/package.json index e1c92f6e5..329894fbd 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -24,6 +24,7 @@ "@radix-ui/react-toggle": "^1.1.10", "@radix-ui/react-toggle-group": "^1.1.11", "@radix-ui/react-tooltip": "^1.2.8", + "@xyflow/react": "^12.10.0", "class-variance-authority": "^0.7.1", "clsx": "^2.1.1", "lucide-react": "^0.544.0", diff --git a/frontend/src/components/Header.tsx b/frontend/src/components/Header.tsx index 27941f9eb..ddc4b9e98 100644 --- a/frontend/src/components/Header.tsx +++ b/frontend/src/components/Header.tsx @@ -1,7 +1,8 @@ import { useState, useEffect, useRef } from "react"; -import { Settings, Cloud, CloudOff } from "lucide-react"; +import { Settings, Cloud, CloudOff, GitBranch } from "lucide-react"; import { Button } from "./ui/button"; import { SettingsDialog } from "./SettingsDialog"; +import { DagEditorDialog } from "./dag/DagEditorDialog"; import { toast } from "sonner"; import { useCloudStatus } from "../hooks/useCloudStatus"; @@ -22,6 +23,7 @@ export function Header({ onSettingsTabOpened, }: HeaderProps) { const [settingsOpen, setSettingsOpen] = useState(false); + const [dagEditorOpen, setDagEditorOpen] = useState(false); const [initialTab, setInitialTab] = useState< "general" | "account" | "api-keys" | "plugins" >("general"); @@ -114,6 +116,15 @@ export function Header({

Daydream Scope

+ + + +
+ + + {status && ( + + {status} + {dagSource && ( + ({dagSource}) + )} + + )} +
+ + {/* Flow Canvas */} +
+ + + + + +
+
+ ); +} diff --git a/frontend/src/components/dag/DagEditorDialog.tsx b/frontend/src/components/dag/DagEditorDialog.tsx new file mode 100644 index 000000000..1fa822d0f --- /dev/null +++ b/frontend/src/components/dag/DagEditorDialog.tsx @@ -0,0 +1,33 @@ +import * as Dialog from "@radix-ui/react-dialog"; +import { X } from "lucide-react"; +import { DagEditor } from "./DagEditor"; + +interface DagEditorDialogProps { + open: boolean; + onClose: () => void; +} + +export function DagEditorDialog({ open, onClose }: DagEditorDialogProps) { + return ( + !val && onClose()}> + + + +
+ + DAG Editor + + + + +
+
+ +
+
+
+
+ ); +} diff --git a/frontend/src/components/dag/PipelineNode.tsx b/frontend/src/components/dag/PipelineNode.tsx new file mode 100644 index 000000000..61233e41a --- /dev/null +++ b/frontend/src/components/dag/PipelineNode.tsx @@ -0,0 +1,89 @@ +import { Handle, Position, useReactFlow } from "@xyflow/react"; +import type { NodeProps, Node } from "@xyflow/react"; +import type { FlowNodeData } from "../../lib/dagUtils"; + +type PipelineNodeType = Node; + +export function PipelineNode({ id, data }: NodeProps) { + const { setNodes } = useReactFlow(); + + const pipelineIds = data.availablePipelineIds || []; + + const handleChange = (e: React.ChangeEvent) => { + const newPipelineId = e.target.value; + setNodes(nds => + nds.map(n => + n.id === id + ? { + ...n, + data: { + ...n.data, + pipelineId: newPipelineId, + label: newPipelineId || n.id, + }, + } + : n + ) + ); + }; + + return ( +
+
Pipeline
+ + + + + + + +
+ ); +} diff --git a/frontend/src/components/dag/SinkNode.tsx b/frontend/src/components/dag/SinkNode.tsx new file mode 100644 index 000000000..c1a963451 --- /dev/null +++ b/frontend/src/components/dag/SinkNode.tsx @@ -0,0 +1,20 @@ +import { Handle, Position } from "@xyflow/react"; +import type { NodeProps, Node } from "@xyflow/react"; +import type { FlowNodeData } from "../../lib/dagUtils"; + +type SinkNodeType = Node; + +export function SinkNode({ data }: NodeProps) { + return ( +
+
Sink
+
{data.label}
+ +
+ ); +} diff --git a/frontend/src/components/dag/SourceNode.tsx b/frontend/src/components/dag/SourceNode.tsx new file mode 100644 index 000000000..bc89be903 --- /dev/null +++ b/frontend/src/components/dag/SourceNode.tsx @@ -0,0 +1,20 @@ +import { Handle, Position } from "@xyflow/react"; +import type { NodeProps, Node } from "@xyflow/react"; +import type { FlowNodeData } from "../../lib/dagUtils"; + +type SourceNodeType = Node; + +export function SourceNode({ data }: NodeProps) { + return ( +
+
Source
+
{data.label}
+ +
+ ); +} diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 511a79ca1..05b03342e 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -797,6 +797,81 @@ export const deleteApiKey = async ( return response.json(); }; +// DAG Configuration types and API functions + +export interface DagNode { + id: string; + type: "source" | "pipeline" | "sink"; + pipeline_id?: string | null; +} + +export interface DagEdge { + from: string; + from_port: string; + to_node: string; + to_port: string; + kind?: "stream" | "parameter"; +} + +export interface DagConfig { + nodes: DagNode[]; + edges: DagEdge[]; +} + +export interface DagResponse { + source: "api" | "input.json" | null; + dag: DagConfig | null; +} + +export const getDag = async (): Promise => { + const response = await fetch("/api/v1/dag", { + method: "GET", + headers: { "Content-Type": "application/json" }, + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error( + `Get DAG failed: ${response.status} ${response.statusText}: ${errorText}` + ); + } + + return response.json(); +}; + +export const setDag = async ( + dag: DagConfig +): Promise<{ message: string; nodes: number; edges: number }> => { + const response = await fetch("/api/v1/dag", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(dag), + }); + + if (!response.ok) { + const detail = await extractErrorDetail(response); + throw new Error(detail); + } + + return response.json(); +}; + +export const clearDag = async (): Promise<{ message: string }> => { + const response = await fetch("/api/v1/dag", { + method: "DELETE", + headers: { "Content-Type": "application/json" }, + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error( + `Clear DAG failed: ${response.status} ${response.statusText}: ${errorText}` + ); + } + + return response.json(); +}; + export const downloadRecording = async (sessionId: string): Promise => { if (!sessionId) { throw new Error("Session ID is required to download recording"); diff --git a/frontend/src/lib/dagUtils.ts b/frontend/src/lib/dagUtils.ts new file mode 100644 index 000000000..a8adbbc80 --- /dev/null +++ b/frontend/src/lib/dagUtils.ts @@ -0,0 +1,127 @@ +import type { Node, Edge } from "@xyflow/react"; +import type { DagConfig, DagNode, DagEdge } from "./api"; + +// Layout constants +const NODE_WIDTH = 200; +const NODE_HEIGHT = 60; +const COLUMN_GAP = 300; +const ROW_GAP = 100; +const START_X = 50; +const START_Y = 50; + +export interface FlowNodeData { + label: string; + pipelineId?: string | null; + nodeType: "source" | "pipeline" | "sink"; + availablePipelineIds?: string[]; + [key: string]: unknown; +} + +/** + * Convert backend DagConfig to React Flow nodes and edges. + * Auto-layout: sources on the left, pipelines in the middle, sinks on the right. + */ +export function dagConfigToFlow(dag: DagConfig): { + nodes: Node[]; + edges: Edge[]; +} { + const sources = dag.nodes.filter(n => n.type === "source"); + const pipelines = dag.nodes.filter(n => n.type === "pipeline"); + const sinks = dag.nodes.filter(n => n.type === "sink"); + + const nodes: Node[] = []; + + // Layout sources (column 0) + sources.forEach((n, i) => { + nodes.push({ + id: n.id, + type: "source", + position: { x: START_X, y: START_Y + i * (NODE_HEIGHT + ROW_GAP) }, + data: { label: n.id, nodeType: "source" }, + }); + }); + + // Layout pipelines (column 1) + pipelines.forEach((n, i) => { + nodes.push({ + id: n.id, + type: "pipeline", + position: { + x: START_X + COLUMN_GAP, + y: START_Y + i * (NODE_HEIGHT + ROW_GAP), + }, + data: { + label: n.pipeline_id || n.id, + pipelineId: n.pipeline_id, + nodeType: "pipeline", + }, + }); + }); + + // Layout sinks (column 2) + sinks.forEach((n, i) => { + nodes.push({ + id: n.id, + type: "sink", + position: { + x: START_X + COLUMN_GAP * 2, + y: START_Y + i * (NODE_HEIGHT + ROW_GAP), + }, + data: { label: n.id, nodeType: "sink" }, + }); + }); + + // Convert edges + const edges: Edge[] = dag.edges.map((e, i) => ({ + id: `e-${i}-${e.from}-${e.to_node}`, + source: e.from, + sourceHandle: e.from_port, + target: e.to_node, + targetHandle: e.to_port, + label: e.from_port !== "video" ? e.from_port : undefined, + animated: true, + })); + + return { nodes, edges }; +} + +/** + * Convert React Flow state back to backend DagConfig JSON. + */ +export function flowToDagConfig( + nodes: Node[], + edges: Edge[] +): DagConfig { + const dagNodes: DagNode[] = nodes.map(n => ({ + id: n.id, + type: n.data.nodeType, + pipeline_id: + n.data.nodeType === "pipeline" ? (n.data.pipelineId ?? null) : undefined, + })); + + const dagEdges: DagEdge[] = edges.map(e => ({ + from: e.source, + from_port: e.sourceHandle || "video", + to_node: e.target, + to_port: e.targetHandle || "video", + kind: "stream" as const, + })); + + return { nodes: dagNodes, edges: dagEdges }; +} + +/** + * Generate a unique node ID with a given prefix. + */ +export function generateNodeId( + prefix: string, + existingIds: Set +): string { + if (!existingIds.has(prefix)) return prefix; + let i = 1; + while (existingIds.has(`${prefix}_${i}`)) i++; + return `${prefix}_${i}`; +} + +// Default node dimensions for reference +export { NODE_WIDTH, NODE_HEIGHT }; diff --git a/src/scope/server/app.py b/src/scope/server/app.py index 330e60fb3..c99c49893 100644 --- a/src/scope/server/app.py +++ b/src/scope/server/app.py @@ -496,11 +496,24 @@ async def load_pipeline( cloud-hosted scope backend. """ try: - # Override pipeline IDs from input.json DAG if available + # Override pipeline IDs: API DAG > input.json > request + from .dag_state import get_api_dag from .frame_processor import get_pipeline_ids_from_input_json - dag_pipeline_ids = get_pipeline_ids_from_input_json() - if dag_pipeline_ids is not None: + api_dag = get_api_dag() + if api_dag is not None: + dag_pipeline_ids = [ + n.pipeline_id + for n in api_dag.nodes + if n.type == "pipeline" and n.pipeline_id is not None + ] + logger.info( + f"Overriding pipeline_ids from API DAG: {dag_pipeline_ids} " + f"(was: {request.pipeline_ids})" + ) + pipeline_ids = dag_pipeline_ids + load_params_dict = None + elif (dag_pipeline_ids := get_pipeline_ids_from_input_json()) is not None: logger.info( f"Overriding pipeline_ids from input.json: {dag_pipeline_ids} " f"(was: {request.pipeline_ids})" @@ -1714,6 +1727,93 @@ async def reload_plugin( ) from e +# ============================================================================= +# DAG Configuration Endpoints +# ============================================================================= + + +@app.post("/api/v1/dag") +async def set_dag_config(request: Request): + """Accept and store a DAG configuration. + + Validates the structure and checks that pipeline_ids exist in the registry. + """ + from scope.core.pipelines.registry import PipelineRegistry + + from .dag_schema import DagConfig + from .dag_state import set_api_dag + + try: + body = await request.json() + dag = DagConfig.model_validate(body) + + # Structural validation + errors = dag.validate_structure() + if errors: + raise HTTPException(status_code=422, detail={"errors": errors}) + + # Validate that pipeline_ids exist in the registry + available = set(PipelineRegistry.list_pipelines()) + for node in dag.nodes: + if node.type == "pipeline" and node.pipeline_id not in available: + raise HTTPException( + status_code=422, + detail={ + "errors": [ + f"Pipeline '{node.pipeline_id}' not found in registry. " + f"Available: {sorted(available)}" + ] + }, + ) + + set_api_dag(dag) + return { + "message": "DAG configuration saved", + "nodes": len(dag.nodes), + "edges": len(dag.edges), + } + except HTTPException: + raise + except Exception as e: + logger.error(f"Error setting DAG config: {e}") + raise HTTPException(status_code=400, detail=str(e)) from e + + +@app.get("/api/v1/dag") +async def get_dag_config(): + """Return the current DAG configuration. + + Priority: API-set DAG > input.json > none. + """ + from .dag_state import get_api_dag + from .frame_processor import _load_dag_from_input_json + + api_dag = get_api_dag() + if api_dag is not None: + return { + "source": "api", + "dag": api_dag.model_dump(by_alias=True), + } + + file_dag = _load_dag_from_input_json() + if file_dag is not None: + return { + "source": "input.json", + "dag": file_dag.model_dump(by_alias=True), + } + + return {"source": None, "dag": None} + + +@app.delete("/api/v1/dag") +async def clear_dag_config(): + """Clear the API-set DAG, reverting to fallback behavior.""" + from .dag_state import clear_api_dag + + clear_api_dag() + return {"message": "DAG configuration cleared"} + + # ============================================================================= # Cloud Integration Endpoints # ============================================================================= diff --git a/src/scope/server/dag_schema.py b/src/scope/server/dag_schema.py index 327a2ae11..c29b4c1d3 100644 --- a/src/scope/server/dag_schema.py +++ b/src/scope/server/dag_schema.py @@ -108,3 +108,47 @@ def node_by_id(self, node_id: str) -> DagNode | None: if n.id == node_id: return n return None + + def validate_structure(self) -> list[str]: + """Validate the DAG structure and return a list of error messages. + + Checks: + - No duplicate node IDs + - At least one source and one sink node + - Pipeline nodes have a pipeline_id + - All edge references point to existing nodes + """ + errors: list[str] = [] + node_ids = [n.id for n in self.nodes] + + # Check for duplicate node IDs + seen: set[str] = set() + for nid in node_ids: + if nid in seen: + errors.append(f"Duplicate node ID: '{nid}'") + seen.add(nid) + + # At least one source and one sink + if not self.get_source_node_ids(): + errors.append("DAG must have at least one source node") + if not self.get_sink_node_ids(): + errors.append("DAG must have at least one sink node") + + # Pipeline nodes must have pipeline_id + for node in self.nodes: + if node.type == "pipeline" and not node.pipeline_id: + errors.append(f"Pipeline node '{node.id}' is missing pipeline_id") + + # Edge references must point to existing nodes + node_id_set = set(node_ids) + for edge in self.edges: + if edge.from_node not in node_id_set: + errors.append( + f"Edge references non-existent source node: '{edge.from_node}'" + ) + if edge.to_node not in node_id_set: + errors.append( + f"Edge references non-existent target node: '{edge.to_node}'" + ) + + return errors diff --git a/src/scope/server/dag_state.py b/src/scope/server/dag_state.py new file mode 100644 index 000000000..8a472c5df --- /dev/null +++ b/src/scope/server/dag_state.py @@ -0,0 +1,39 @@ +"""In-memory DAG configuration store. + +Holds a DAG config set via the API that takes priority over input.json. +Thread-safe via a threading lock. +""" + +from __future__ import annotations + +import logging +import threading + +from .dag_schema import DagConfig + +logger = logging.getLogger(__name__) + +_lock = threading.Lock() +_dag_config: DagConfig | None = None + + +def get_api_dag() -> DagConfig | None: + """Return the API-set DAG config, or None if not set.""" + with _lock: + return _dag_config + + +def set_api_dag(dag: DagConfig) -> None: + """Store a DAG config set via the API.""" + with _lock: + global _dag_config + _dag_config = dag + logger.info(f"API DAG set with {len(dag.nodes)} nodes and {len(dag.edges)} edges") + + +def clear_api_dag() -> None: + """Clear the API-set DAG config, reverting to fallback behavior.""" + with _lock: + global _dag_config + _dag_config = None + logger.info("API DAG cleared") diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py index f80a71bc2..abc081efd 100644 --- a/src/scope/server/frame_processor.py +++ b/src/scope/server/frame_processor.py @@ -150,9 +150,21 @@ def __init__( self._playback_ready_emitted = False self._stream_start_time: float | None = None - # Override pipeline_ids from input.json if available, else use UI params - dag_pipeline_ids = get_pipeline_ids_from_input_json() - if dag_pipeline_ids is not None: + # Override pipeline_ids: API DAG > input.json > UI params + from .dag_state import get_api_dag + + api_dag = get_api_dag() + if api_dag is not None: + dag_pipeline_ids = [ + n.pipeline_id + for n in api_dag.nodes + if n.type == "pipeline" and n.pipeline_id is not None + ] + self.pipeline_ids = dag_pipeline_ids + logger.info( + f"[FRAME-PROCESSOR] Using pipeline_ids from API DAG: {dag_pipeline_ids}" + ) + elif (dag_pipeline_ids := get_pipeline_ids_from_input_json()) is not None: self.pipeline_ids = dag_pipeline_ids logger.info( f"[FRAME-PROCESSOR] Using pipeline_ids from input.json: {dag_pipeline_ids}" @@ -970,9 +982,15 @@ def _setup_pipeline_chain_sync(self): """ dag_config: DagConfig - # Try loading DAG from input.json first - dag_config = _load_dag_from_input_json() - if dag_config is not None: + # Priority: API DAG > input.json > parameters > linear fallback + from .dag_state import get_api_dag + + api_dag = get_api_dag() + if api_dag is not None: + dag_config = api_dag + logger.info("[FRAME-PROCESSOR] Using DAG from API") + elif (file_dag := _load_dag_from_input_json()) is not None: + dag_config = file_dag logger.info("[FRAME-PROCESSOR] Using DAG from input.json") elif not self.pipeline_ids and not self.parameters.get("dag"): logger.error("No pipeline IDs or DAG config provided") From 91057e3df2819aa0641b37c9b5dd0696761264cf Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Tue, 17 Feb 2026 10:27:49 +0000 Subject: [PATCH 05/11] Improve DAG UI --- frontend/src/components/ComplexFields.tsx | 4 +- frontend/src/components/SettingsPanel.tsx | 293 +++++++++++-------- frontend/src/components/dag/DagEditor.tsx | 77 +++-- frontend/src/components/dag/PipelineNode.tsx | 108 ++++--- frontend/src/hooks/useStreamState.ts | 36 ++- frontend/src/lib/api.ts | 3 + frontend/src/lib/dagUtils.ts | 36 ++- frontend/src/pages/StreamPage.tsx | 53 +++- frontend/src/types/index.ts | 2 + src/scope/core/pipelines/base_schema.py | 2 + src/scope/server/app.py | 11 +- src/scope/server/dag_executor.py | 11 +- 12 files changed, 421 insertions(+), 215 deletions(-) diff --git a/frontend/src/components/ComplexFields.tsx b/frontend/src/components/ComplexFields.tsx index 77e162d32..0234c03d5 100644 --- a/frontend/src/components/ComplexFields.tsx +++ b/frontend/src/components/ComplexFields.tsx @@ -72,6 +72,8 @@ export interface SchemaComplexFieldContext { isStreaming?: boolean; isLoading?: boolean; isCloudMode?: boolean; + /** When true, VACE controls are disabled (pipeline chain from DAG editor). */ + graphMode?: boolean; /** Per-field overrides for schema-driven fields (e.g. image path). */ schemaFieldOverrides?: Record; onSchemaFieldOverrideChange?: ( @@ -132,7 +134,7 @@ export function SchemaComplexField({ if (component === "vace" && !rendered.has("vace")) { rendered.add("vace"); return ( -
+
void; isCloudMode?: boolean; + // Graph mode: pipeline chain comes from DAG editor + graphMode?: boolean; + onGraphModeChange?: (enabled: boolean) => void; } export function SettingsPanel({ @@ -243,6 +246,8 @@ export function SettingsPanel({ onPreprocessorSchemaFieldOverrideChange, onPostprocessorSchemaFieldOverrideChange, isCloudMode = false, + graphMode = false, + onGraphModeChange, }: SettingsPanelProps) { // Local slider state management hooks const noiseScaleSlider = useLocalSliderValue(noiseScale, onNoiseScaleChange); @@ -334,141 +339,168 @@ export function SettingsPanel({ Settings -
-

Pipeline ID

- + {graphMode ? "ON" : "OFF"} +
- {currentPipeline && ( - - -
-

- {currentPipeline.name} - {currentPipeline.pluginName && ( - - {" "} - ({currentPipeline.pluginName}) - - )} -

-
+
+
+

Pipeline ID

+ +
-
- {(currentPipeline.about || - currentPipeline.docsUrl || - currentPipeline.modified) && ( -
- {currentPipeline.about && ( - - - - - - - - -

{currentPipeline.about}

-
-
-
+ {currentPipeline && ( + + +
+

+ {currentPipeline.name} + {currentPipeline.pluginName && ( + + {" "} + ({currentPipeline.pluginName}) + )} - {currentPipeline.modified && ( - - - - - - - - -

- This pipeline contains modifications based on the - original project. -

-
-
-
- )} - {currentPipeline.docsUrl && ( - - +

+ + -
-
- )} + + Docs + + + )} +
+ )} +
+ + + )} +
{/* Preprocessor Selector */} -
+
{/* Postprocessor Selector - fixed, always shown */} -
+
{currentPipeline?.supportsVACE && ( -
+
( [] ); + const [portsMap, setPortsMap] = useState< + Record + >({}); - // Fetch available pipeline IDs on mount + // Fetch available pipeline IDs and port schemas on mount useEffect(() => { getPipelineSchemas() .then(schemas => { setAvailablePipelineIds(Object.keys(schemas.pipelines)); + setPortsMap(buildPipelinePortsMap(schemas.pipelines)); }) .catch(err => { console.error("Failed to fetch pipeline schemas:", err); }); }, []); - // Inject availablePipelineIds into all pipeline nodes when the list changes + // Inject availablePipelineIds and portsMap into all pipeline nodes when they change useEffect(() => { if (availablePipelineIds.length === 0) return; setNodes(nds => nds.map(n => n.data.nodeType === "pipeline" - ? { ...n, data: { ...n.data, availablePipelineIds } } + ? { + ...n, + data: { + ...n.data, + availablePipelineIds, + pipelinePortsMap: portsMap, + }, + } : n ) ); - }, [availablePipelineIds, setNodes]); + }, [availablePipelineIds, portsMap, setNodes]); - // Load existing DAG on mount + // Load existing DAG on mount (after portsMap is ready) useEffect(() => { + // Wait until portsMap is populated + if (Object.keys(portsMap).length === 0) return; + getDag() .then(response => { if (response.dag) { const { nodes: flowNodes, edges: flowEdges } = dagConfigToFlow( - response.dag + response.dag, + portsMap ); - // Inject available pipeline IDs into pipeline nodes + // Inject available pipeline IDs and portsMap into pipeline nodes const enrichedNodes = flowNodes.map(n => n.data.nodeType === "pipeline" - ? { ...n, data: { ...n.data, availablePipelineIds } } + ? { + ...n, + data: { + ...n.data, + availablePipelineIds, + pipelinePortsMap: portsMap, + }, + } : n ); setNodes(enrichedNodes); @@ -90,7 +113,7 @@ export function DagEditor() { setStatus("Failed to load DAG"); }); // eslint-disable-next-line react-hooks/exhaustive-deps - }, []); + }, [portsMap]); const onConnect = useCallback( (connection: Connection) => { @@ -99,10 +122,7 @@ export function DagEditor() { [setEdges] ); - const existingIds = useMemo( - () => new Set(nodes.map(n => n.id)), - [nodes] - ); + const existingIds = useMemo(() => new Set(nodes.map(n => n.id)), [nodes]); const addSourceNode = useCallback(() => { const id = generateNodeId("input", existingIds); @@ -126,10 +146,13 @@ export function DagEditor() { pipelineId: null, nodeType: "pipeline", availablePipelineIds, + pipelinePortsMap: portsMap, + streamInputs: ["video"], + streamOutputs: ["video"], }, }; setNodes(nds => [...nds, newNode]); - }, [existingIds, nodes.length, setNodes, availablePipelineIds]); + }, [existingIds, nodes.length, setNodes, availablePipelineIds, portsMap]); const addSinkNode = useCallback(() => { const id = generateNodeId("output", existingIds); @@ -167,6 +190,21 @@ export function DagEditor() { } }, [setNodes, setEdges]); + const handleExport = useCallback(() => { + const dagConfig = flowToDagConfig(nodes, edges); + const dataStr = JSON.stringify(dagConfig, null, 2); + const blob = new Blob([dataStr], { type: "application/json" }); + const url = URL.createObjectURL(blob); + const link = document.createElement("a"); + link.href = url; + link.download = `dag-${new Date().toISOString().split("T")[0]}.json`; + document.body.appendChild(link); + link.click(); + document.body.removeChild(link); + URL.revokeObjectURL(url); + setStatus("DAG exported"); + }, [nodes, edges]); + return (
{/* Toolbar */} @@ -190,6 +228,12 @@ export function DagEditor() { + Sink
+
diff --git a/frontend/src/components/dag/PipelineNode.tsx b/frontend/src/components/dag/PipelineNode.tsx index 61233e41a..610b5a3e1 100644 --- a/frontend/src/components/dag/PipelineNode.tsx +++ b/frontend/src/components/dag/PipelineNode.tsx @@ -4,13 +4,29 @@ import type { FlowNodeData } from "../../lib/dagUtils"; type PipelineNodeType = Node; +/** Color palette for port handles - each port gets a consistent color */ +const PORT_COLORS: Record = { + video: "bg-blue-400", + video2: "bg-cyan-400", + vace_input_frames: "bg-purple-400", + vace_input_masks: "bg-pink-400", +}; + +function getPortColor(portName: string): string { + return PORT_COLORS[portName] ?? "bg-gray-400"; +} + export function PipelineNode({ id, data }: NodeProps) { const { setNodes } = useReactFlow(); const pipelineIds = data.availablePipelineIds || []; + const portsMap = data.pipelinePortsMap; + const streamInputs = data.streamInputs ?? ["video"]; + const streamOutputs = data.streamOutputs ?? ["video"]; const handleChange = (e: React.ChangeEvent) => { const newPipelineId = e.target.value; + const ports = newPipelineId && portsMap ? portsMap[newPipelineId] : null; setNodes(nds => nds.map(n => n.id === id @@ -18,8 +34,10 @@ export function PipelineNode({ id, data }: NodeProps) { ...n, data: { ...n.data, - pipelineId: newPipelineId, + pipelineId: newPipelineId || null, label: newPipelineId || n.id, + streamInputs: ports?.inputs ?? ["video"], + streamOutputs: ports?.outputs ?? ["video"], }, } : n @@ -27,6 +45,10 @@ export function PipelineNode({ id, data }: NodeProps) { ); }; + // Calculate handle positions evenly distributed + const inputCount = streamInputs.length; + const outputCount = streamOutputs.length; + return (
Pipeline
@@ -42,48 +64,48 @@ export function PipelineNode({ id, data }: NodeProps) { ))} - - - - - - + + {/* Port labels */} +
+
+ {streamInputs.map(port => ( +
+ {port} +
+ ))} +
+
+ {streamOutputs.map(port => ( +
+ {port} +
+ ))} +
+
+ + {/* Input handles (left side) */} + {streamInputs.map((port, i) => ( + + ))} + + {/* Output handles (right side) */} + {streamOutputs.map((port, i) => ( + + ))}
); } diff --git a/frontend/src/hooks/useStreamState.ts b/frontend/src/hooks/useStreamState.ts index 72ffb49e2..662270018 100644 --- a/frontend/src/hooks/useStreamState.ts +++ b/frontend/src/hooks/useStreamState.ts @@ -161,7 +161,8 @@ export function useStreamState() { const defaultPipelineId = "longlive"; // Get initial defaults (use fallback since schemas haven't loaded yet) - const initialDefaults = getFallbackDefaults("text"); + // Use video mode defaults since graphMode (default ON) uses video input + const initialDefaults = getFallbackDefaults("video"); const [settings, setSettings] = useState({ pipelineId: "longlive", @@ -177,7 +178,8 @@ export function useStreamState() { kvCacheAttentionBias: 0.3, paused: false, loraMergeStrategy: "permanent_merge", - inputMode: initialDefaults.inputMode, + inputMode: "video", + graphMode: true, }); const [promptData, setPromptData] = useState({ @@ -322,7 +324,9 @@ export function useStreamState() { // Track previous pipelineId so we only reset inputMode when the pipeline actually changes const prevPipelineIdRef = useRef(null); - // Update inputMode when schemas first load or pipeline changes + // Update inputMode and resolution when schemas first load or pipeline changes. + // When graphMode is ON, force video input and apply video-mode resolution from schema. + // When graphMode is OFF, apply pipeline's default mode. useEffect(() => { if (pipelineSchemas) { const schema = pipelineSchemas.pipelines[settings.pipelineId]; @@ -330,14 +334,30 @@ export function useStreamState() { schema?.default_mode && prevPipelineIdRef.current !== settings.pipelineId ) { - setSettings(prev => ({ - ...prev, - inputMode: schema.default_mode, - })); + if (settings.graphMode) { + // Graph mode: keep video input, but apply video-mode resolution from schema + const videoDefaults = getDefaults(settings.pipelineId, "video"); + setSettings(prev => ({ + ...prev, + inputMode: "video", + resolution: { + height: videoDefaults.height, + width: videoDefaults.width, + }, + denoisingSteps: videoDefaults.denoisingSteps, + noiseScale: videoDefaults.noiseScale, + noiseController: videoDefaults.noiseController, + })); + } else { + setSettings(prev => ({ + ...prev, + inputMode: schema.default_mode, + })); + } } prevPipelineIdRef.current = settings.pipelineId; } - }, [pipelineSchemas, settings.pipelineId]); + }, [pipelineSchemas, settings.pipelineId, settings.graphMode, getDefaults]); // Set recommended quantization based on pipeline schema and available VRAM useEffect(() => { diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 05b03342e..0bb913ef3 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -520,6 +520,9 @@ export interface PipelineSchemaInfo { recommended_quantization_vram_threshold: number | null; modified: boolean; plugin_name: string | null; + // DAG port declarations + stream_inputs: string[]; + stream_outputs: string[]; } export interface PipelineSchemasResponse { diff --git a/frontend/src/lib/dagUtils.ts b/frontend/src/lib/dagUtils.ts index a8adbbc80..9a9c938bb 100644 --- a/frontend/src/lib/dagUtils.ts +++ b/frontend/src/lib/dagUtils.ts @@ -1,5 +1,5 @@ import type { Node, Edge } from "@xyflow/react"; -import type { DagConfig, DagNode, DagEdge } from "./api"; +import type { DagConfig, DagNode, DagEdge, PipelineSchemaInfo } from "./api"; // Layout constants const NODE_WIDTH = 200; @@ -9,19 +9,48 @@ const ROW_GAP = 100; const START_X = 50; const START_Y = 50; +export interface PortInfo { + name: string; +} + export interface FlowNodeData { label: string; pipelineId?: string | null; nodeType: "source" | "pipeline" | "sink"; availablePipelineIds?: string[]; + /** Declared input ports for the selected pipeline */ + streamInputs?: string[]; + /** Declared output ports for the selected pipeline */ + streamOutputs?: string[]; + /** Pipeline schemas keyed by pipeline_id, for looking up ports on selection change */ + pipelinePortsMap?: Record; [key: string]: unknown; } +/** + * Build a map of pipeline_id -> { inputs, outputs } from schemas. + */ +export function buildPipelinePortsMap( + schemas: Record +): Record { + const map: Record = {}; + for (const [id, schema] of Object.entries(schemas)) { + map[id] = { + inputs: schema.stream_inputs ?? ["video"], + outputs: schema.stream_outputs ?? ["video"], + }; + } + return map; +} + /** * Convert backend DagConfig to React Flow nodes and edges. * Auto-layout: sources on the left, pipelines in the middle, sinks on the right. */ -export function dagConfigToFlow(dag: DagConfig): { +export function dagConfigToFlow( + dag: DagConfig, + portsMap?: Record +): { nodes: Node[]; edges: Edge[]; } { @@ -43,6 +72,7 @@ export function dagConfigToFlow(dag: DagConfig): { // Layout pipelines (column 1) pipelines.forEach((n, i) => { + const ports = n.pipeline_id && portsMap ? portsMap[n.pipeline_id] : null; nodes.push({ id: n.id, type: "pipeline", @@ -54,6 +84,8 @@ export function dagConfigToFlow(dag: DagConfig): { label: n.pipeline_id || n.id, pipelineId: n.pipeline_id, nodeType: "pipeline", + streamInputs: ports?.inputs ?? ["video"], + streamOutputs: ports?.outputs ?? ["video"], }, }); }); diff --git a/frontend/src/pages/StreamPage.tsx b/frontend/src/pages/StreamPage.tsx index 3d7099a36..7b0998768 100644 --- a/frontend/src/pages/StreamPage.tsx +++ b/frontend/src/pages/StreamPage.tsx @@ -28,7 +28,7 @@ import type { DownloadProgress, } from "../types"; import type { PromptItem, PromptTransition } from "../lib/api"; -import { getInputSourceResolution } from "../lib/api"; +import { getInputSourceResolution, getDag } from "../lib/api"; import { sendLoRAScaleUpdates } from "../utils/loraHelpers"; import { toast } from "sonner"; @@ -786,6 +786,10 @@ export function StreamPage() { }); }; + const handleGraphModeChange = (enabled: boolean) => { + updateSettings({ graphMode: enabled }); + }; + const handleSpoutSenderChange = ( spoutSender: { enabled: boolean; name: string } | undefined ) => { @@ -1000,14 +1004,43 @@ export function StreamPage() { const pipelineIdToUse = overridePipelineId || settings.pipelineId; try { - // Build pipeline chain: preprocessors + main pipeline + postprocessors - const pipelineIds: string[] = []; - if (settings.preprocessorIds && settings.preprocessorIds.length > 0) { - pipelineIds.push(...settings.preprocessorIds); - } - pipelineIds.push(pipelineIdToUse); - if (settings.postprocessorIds && settings.postprocessorIds.length > 0) { - pipelineIds.push(...settings.postprocessorIds); + // Build pipeline chain depending on graph mode + let pipelineIds: string[]; + + if (settings.graphMode) { + // Graph mode: fetch pipeline IDs from the DAG + try { + const dagResponse = await getDag(); + if (dagResponse.dag) { + const dagPipelineIds = dagResponse.dag.nodes + .filter(n => n.type === "pipeline" && n.pipeline_id) + .map(n => n.pipeline_id as string); + if (dagPipelineIds.length > 0) { + pipelineIds = dagPipelineIds; + } else { + console.warn( + "DAG has no pipeline nodes, falling back to settings" + ); + pipelineIds = [pipelineIdToUse]; + } + } else { + console.warn("No DAG configured, falling back to settings"); + pipelineIds = [pipelineIdToUse]; + } + } catch (err) { + console.error("Failed to fetch DAG, falling back to settings:", err); + pipelineIds = [pipelineIdToUse]; + } + } else { + // Manual mode: build chain from preprocessors + main + postprocessors + pipelineIds = []; + if (settings.preprocessorIds && settings.preprocessorIds.length > 0) { + pipelineIds.push(...settings.preprocessorIds); + } + pipelineIds.push(pipelineIdToUse); + if (settings.postprocessorIds && settings.postprocessorIds.length > 0) { + pipelineIds.push(...settings.postprocessorIds); + } } // Check if models are needed but not downloaded for all pipelines in the chain @@ -1699,6 +1732,8 @@ export function StreamPage() { } }} isCloudMode={isCloudMode} + graphMode={settings.graphMode ?? true} + onGraphModeChange={handleGraphModeChange} />
diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts index 97a10b799..aa30e82e5 100644 --- a/frontend/src/types/index.ts +++ b/frontend/src/types/index.ts @@ -86,6 +86,8 @@ export interface SettingsState { source_type: string; source_name: string; }; + // Graph mode: when enabled, pipeline chain comes from DAG editor instead of manual controls + graphMode?: boolean; // Dynamic schema-driven fields (key = schema field name snake_case, value = parsed value) schemaFieldOverrides?: Record; // Schema-driven overrides for preprocessor/postprocessor plugin configs diff --git a/src/scope/core/pipelines/base_schema.py b/src/scope/core/pipelines/base_schema.py index b5fdf895f..4c875bd12 100644 --- a/src/scope/core/pipelines/base_schema.py +++ b/src/scope/core/pipelines/base_schema.py @@ -389,6 +389,8 @@ def get_schema_with_metadata(cls) -> dict[str, Any]: metadata["modified"] = cls.modified # Convert UsageType enum values to strings for JSON serialization metadata["usage"] = [usage.value for usage in cls.usage] if cls.usage else [] + metadata["stream_inputs"] = list(cls.stream_inputs) + metadata["stream_outputs"] = list(cls.stream_outputs) metadata["config_schema"] = cls.model_json_schema() # Include mode-specific defaults (excluding None values and the "default" flag) diff --git a/src/scope/server/app.py b/src/scope/server/app.py index c99c49893..58937d2f5 100644 --- a/src/scope/server/app.py +++ b/src/scope/server/app.py @@ -512,16 +512,19 @@ async def load_pipeline( f"(was: {request.pipeline_ids})" ) pipeline_ids = dag_pipeline_ids - load_params_dict = None + # Pass UI load_params (resolution, seed, vae_type, etc.) so each + # pipeline in the DAG is loaded with the same params; each pipeline + # uses only the params it needs (e.g. longlive uses height/width). + load_params_dict = request.load_params elif (dag_pipeline_ids := get_pipeline_ids_from_input_json()) is not None: logger.info( f"Overriding pipeline_ids from input.json: {dag_pipeline_ids} " f"(was: {request.pipeline_ids})" ) pipeline_ids = dag_pipeline_ids - # Discard UI load_params — they belong to whatever pipeline the UI - # selected, not to the pipelines declared in input.json. - load_params_dict = None + # Pass UI load_params so resolution and other params are applied to + # each pipeline in the DAG (e.g. longlive gets height/width/seed). + load_params_dict = request.load_params else: pipeline_ids = request.pipeline_ids # load_params is already a dict (or None) diff --git a/src/scope/server/dag_executor.py b/src/scope/server/dag_executor.py index 6770b7d9f..f8b96ca34 100644 --- a/src/scope/server/dag_executor.py +++ b/src/scope/server/dag_executor.py @@ -21,7 +21,8 @@ logger = logging.getLogger(__name__) # Default queue sizes (match pipeline_processor) -DEFAULT_INPUT_QUEUE_MAXSIZE = 30 +# Use larger size for inter-pipeline queues so downstream can accumulate a full chunk +DEFAULT_INPUT_QUEUE_MAXSIZE = 64 DEFAULT_OUTPUT_QUEUE_MAXSIZE = 8 @@ -106,7 +107,7 @@ def build_dag( with processor.input_queue_lock: processor.input_queue = processor.input_queues.get("video") - # 4) Set each producer's output_queues per port (replace with wired queues) + # 4) Set each producer's output_queues per port and wire consumer input to same queue for node in dag.nodes: if node.type != "pipeline" or node.id not in node_processors: continue @@ -118,6 +119,12 @@ def build_dag( q = stream_queues.get((e.to_node, e.to_port)) if q is not None and q not in out_by_port.get(e.from_port, []): out_by_port.setdefault(e.from_port, []).append(q) + # Symmetric wiring: ensure consumer reads from this queue (fixes chained pipelines) + consumer = node_processors.get(e.to_node) + if consumer is not None: + consumer.input_queues[e.to_port] = q + with consumer.input_queue_lock: + consumer.input_queue = consumer.input_queues.get("video") for port, qlist in out_by_port.items(): proc.output_queues[port] = qlist From 353b52a2a40c1a73ff51ed6c79d297ba1a8ca083 Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Tue, 17 Feb 2026 10:31:04 +0000 Subject: [PATCH 06/11] Add Import --- frontend/src/components/dag/DagEditor.tsx | 62 ++++++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/frontend/src/components/dag/DagEditor.tsx b/frontend/src/components/dag/DagEditor.tsx index ebd20d157..29d259ad3 100644 --- a/frontend/src/components/dag/DagEditor.tsx +++ b/frontend/src/components/dag/DagEditor.tsx @@ -1,4 +1,4 @@ -import { useCallback, useEffect, useState, useMemo } from "react"; +import { useCallback, useEffect, useRef, useState, useMemo } from "react"; import { ReactFlow, Controls, @@ -190,6 +190,53 @@ export function DagEditor() { } }, [setNodes, setEdges]); + const fileInputRef = useRef(null); + + const handleImport = useCallback( + (event: React.ChangeEvent) => { + const file = event.target.files?.[0]; + if (!file) return; + + const reader = new FileReader(); + reader.onload = e => { + try { + const dagConfig = JSON.parse(e.target?.result as string); + if (!dagConfig.nodes || !dagConfig.edges) { + setStatus("Import failed: invalid DAG format"); + return; + } + const { nodes: flowNodes, edges: flowEdges } = dagConfigToFlow( + dagConfig, + portsMap + ); + const enrichedNodes = flowNodes.map(n => + n.data.nodeType === "pipeline" + ? { + ...n, + data: { + ...n.data, + availablePipelineIds, + pipelinePortsMap: portsMap, + }, + } + : n + ); + setNodes(enrichedNodes); + setEdges(flowEdges); + setDagSource(null); + setStatus(`Imported from ${file.name}`); + } catch { + setStatus("Import failed: invalid JSON"); + } + }; + reader.readAsText(file); + + // Reset so the same file can be re-imported + event.target.value = ""; + }, + [portsMap, availablePipelineIds, setNodes, setEdges] + ); + const handleExport = useCallback(() => { const dagConfig = flowToDagConfig(nodes, edges); const dataStr = JSON.stringify(dagConfig, null, 2); @@ -228,6 +275,19 @@ export function DagEditor() { + Sink
+ + + {status && ( {status} @@ -318,21 +332,23 @@ export function DagEditor() { {/* Flow Canvas */}
- - - - - + + + + + + +
); diff --git a/frontend/src/components/dag/DagPreviewContext.tsx b/frontend/src/components/dag/DagPreviewContext.tsx new file mode 100644 index 000000000..b53e82d7e --- /dev/null +++ b/frontend/src/components/dag/DagPreviewContext.tsx @@ -0,0 +1,11 @@ +import { createContext, useContext } from "react"; + +/** Maps node_id (e.g. "input", pipeline_id, or sink id) to a data URL */ +export type PreviewMap = Record; + +export const DagPreviewContext = createContext({}); + +export function useDagPreview(nodeId: string): string | undefined { + const previews = useContext(DagPreviewContext); + return previews[nodeId]; +} diff --git a/frontend/src/components/dag/PipelineNode.tsx b/frontend/src/components/dag/PipelineNode.tsx index 610b5a3e1..7641ee2cf 100644 --- a/frontend/src/components/dag/PipelineNode.tsx +++ b/frontend/src/components/dag/PipelineNode.tsx @@ -1,6 +1,7 @@ import { Handle, Position, useReactFlow } from "@xyflow/react"; import type { NodeProps, Node } from "@xyflow/react"; import type { FlowNodeData } from "../../lib/dagUtils"; +import { useDagPreview } from "./DagPreviewContext"; type PipelineNodeType = Node; @@ -18,6 +19,7 @@ function getPortColor(portName: string): string { export function PipelineNode({ id, data }: NodeProps) { const { setNodes } = useReactFlow(); + const previewUrl = useDagPreview(id); const pipelineIds = data.availablePipelineIds || []; const portsMap = data.pipelinePortsMap; @@ -50,7 +52,7 @@ export function PipelineNode({ id, data }: NodeProps) { const outputCount = streamOutputs.length; return ( -
+
Pipeline