From d415162d5b874ffd48c23d6575ee84c88b9ada9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Mon, 23 Feb 2026 10:51:11 +0100 Subject: [PATCH 1/3] Add Graph Mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Rafał Leszko --- frontend/package-lock.json | 230 ++++++ frontend/package.json | 1 + frontend/src/components/Header.tsx | 45 +- frontend/src/components/SettingsPanel.tsx | 14 +- frontend/src/components/graph/GraphEditor.tsx | 451 +++++++++++ .../components/graph/NodeParametersPanel.tsx | 127 +++ .../src/components/graph/PipelineNode.tsx | 105 +++ frontend/src/components/graph/SinkNode.tsx | 20 + frontend/src/components/graph/SourceNode.tsx | 20 + frontend/src/hooks/useUnifiedWebRTC.ts | 2 + frontend/src/lib/api.ts | 79 ++ frontend/src/lib/graphUtils.ts | 294 +++++++ frontend/src/pages/StreamPage.tsx | 730 ++++++++++-------- src/scope/core/pipelines/base_schema.py | 6 + src/scope/core/pipelines/longlive/schema.py | 3 + src/scope/server/app.py | 95 +++ src/scope/server/frame_processor.py | 124 ++- src/scope/server/graph_executor.py | 173 +++++ src/scope/server/graph_schema.py | 150 ++++ src/scope/server/graph_state.py | 41 + src/scope/server/pipeline_processor.py | 306 +++++--- src/scope/server/schema.py | 6 + 22 files changed, 2540 insertions(+), 482 deletions(-) create mode 100644 frontend/src/components/graph/GraphEditor.tsx create mode 100644 frontend/src/components/graph/NodeParametersPanel.tsx create mode 100644 frontend/src/components/graph/PipelineNode.tsx create mode 100644 frontend/src/components/graph/SinkNode.tsx create mode 100644 frontend/src/components/graph/SourceNode.tsx create mode 100644 frontend/src/lib/graphUtils.ts create mode 100644 src/scope/server/graph_executor.py create mode 100644 src/scope/server/graph_schema.py create mode 100644 src/scope/server/graph_state.py diff --git a/frontend/package-lock.json b/frontend/package-lock.json index 6c6688f4c..864304484 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -18,6 +18,7 @@ "@radix-ui/react-toggle": "^1.1.10", "@radix-ui/react-toggle-group": "^1.1.11", "@radix-ui/react-tooltip": "^1.2.8", + "@xyflow/react": "^12.10.1", "class-variance-authority": "^0.7.1", "clsx": "^2.1.1", "lucide-react": "^0.544.0", @@ -2658,6 +2659,55 @@ "@babel/types": "^7.28.2" } }, + "node_modules/@types/d3-color": { + "version": "3.1.3", + "resolved": "https://registry.npmjs.org/@types/d3-color/-/d3-color-3.1.3.tgz", + "integrity": "sha512-iO90scth9WAbmgv7ogoq57O9YpKmFBbmoEoCHDB2xMBY0+/KVrqAaCDyCE16dUspeOvIxFFRI+0sEtqDqy2b4A==", + "license": "MIT" + }, + "node_modules/@types/d3-drag": { + "version": "3.0.7", + "resolved": "https://registry.npmjs.org/@types/d3-drag/-/d3-drag-3.0.7.tgz", + "integrity": "sha512-HE3jVKlzU9AaMazNufooRJ5ZpWmLIoc90A37WU2JMmeq28w1FQqCZswHZ3xR+SuxYftzHq6WU6KJHvqxKzTxxQ==", + "license": "MIT", + "dependencies": { + "@types/d3-selection": "*" + } + }, + "node_modules/@types/d3-interpolate": { + "version": "3.0.4", + "resolved": "https://registry.npmjs.org/@types/d3-interpolate/-/d3-interpolate-3.0.4.tgz", + "integrity": "sha512-mgLPETlrpVV1YRJIglr4Ez47g7Yxjl1lj7YKsiMCb27VJH9W8NVM6Bb9d8kkpG/uAQS5AmbA48q2IAolKKo1MA==", + "license": "MIT", + "dependencies": { + "@types/d3-color": "*" + } + }, + "node_modules/@types/d3-selection": { + "version": "3.0.11", + "resolved": "https://registry.npmjs.org/@types/d3-selection/-/d3-selection-3.0.11.tgz", + "integrity": "sha512-bhAXu23DJWsrI45xafYpkQ4NtcKMwWnAC/vKrd2l+nxMFuvOT3XMYTIj2opv8vq8AO5Yh7Qac/nSeP/3zjTK0w==", + "license": "MIT" + }, + "node_modules/@types/d3-transition": { + "version": "3.0.9", + "resolved": "https://registry.npmjs.org/@types/d3-transition/-/d3-transition-3.0.9.tgz", + "integrity": "sha512-uZS5shfxzO3rGlu0cC3bjmMFKsXv+SmZZcgp0KD22ts4uGXp5EVYGzu/0YdwZeKmddhcAccYtREJKkPfXkZuCg==", + "license": "MIT", + "dependencies": { + "@types/d3-selection": "*" + } + }, + "node_modules/@types/d3-zoom": { + "version": "3.0.8", + "resolved": "https://registry.npmjs.org/@types/d3-zoom/-/d3-zoom-3.0.8.tgz", + "integrity": "sha512-iqMC4/YlFCSlO8+2Ii1GGGliCAY4XdeG748w5vQUbevlbDu0zSjH/+jojorQVBK/se0j6DUFNPBGSqD3YWYnDw==", + "license": "MIT", + "dependencies": { + "@types/d3-interpolate": "*", + "@types/d3-selection": "*" + } + }, "node_modules/@types/estree": { "version": "1.0.8", "resolved": "https://registry.npmjs.org/@types/estree/-/estree-1.0.8.tgz", @@ -2984,6 +3034,38 @@ "vite": "^4.2.0 || ^5.0.0 || ^6.0.0 || ^7.0.0" } }, + "node_modules/@xyflow/react": { + "version": "12.10.1", + "resolved": "https://registry.npmjs.org/@xyflow/react/-/react-12.10.1.tgz", + "integrity": "sha512-5eSWtIK/+rkldOuFbOOz44CRgQRjtS9v5nufk77DV+XBnfCGL9HAQ8PG00o2ZYKqkEU/Ak6wrKC95Tu+2zuK3Q==", + "license": "MIT", + "dependencies": { + "@xyflow/system": "0.0.75", + "classcat": "^5.0.3", + "zustand": "^4.4.0" + }, + "peerDependencies": { + "react": ">=17", + "react-dom": ">=17" + } + }, + "node_modules/@xyflow/system": { + "version": "0.0.75", + "resolved": "https://registry.npmjs.org/@xyflow/system/-/system-0.0.75.tgz", + "integrity": "sha512-iXs+AGFLi8w/VlAoc/iSxk+CxfT6o64Uw/k0CKASOPqjqz6E0rb5jFZgJtXGZCpfQI6OQpu5EnumP5fGxQheaQ==", + "license": "MIT", + "dependencies": { + "@types/d3-drag": "^3.0.7", + "@types/d3-interpolate": "^3.0.4", + "@types/d3-selection": "^3.0.10", + "@types/d3-transition": "^3.0.8", + "@types/d3-zoom": "^3.0.8", + "d3-drag": "^3.0.0", + "d3-interpolate": "^3.0.1", + "d3-selection": "^3.0.0", + "d3-zoom": "^3.0.0" + } + }, "node_modules/acorn": { "version": "8.15.0", "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.15.0.tgz", @@ -3242,6 +3324,12 @@ "url": "https://polar.sh/cva" } }, + "node_modules/classcat": { + "version": "5.0.5", + "resolved": "https://registry.npmjs.org/classcat/-/classcat-5.0.5.tgz", + "integrity": "sha512-JhZUT7JFcQy/EzW605k/ktHtncoo9vnyW/2GspNYwFlN1C/WmjuV/xtS04e9SOkL2sTdw0VAZ2UGCcQ9lR6p6w==", + "license": "MIT" + }, "node_modules/clsx": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/clsx/-/clsx-2.1.1.tgz", @@ -3307,6 +3395,111 @@ "devOptional": true, "license": "MIT" }, + "node_modules/d3-color": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/d3-color/-/d3-color-3.1.0.tgz", + "integrity": "sha512-zg/chbXyeBtMQ1LbD/WSoW2DpC3I0mpmPdW+ynRTj/x2DAWYrIY7qeZIHidozwV24m4iavr15lNwIwLxRmOxhA==", + "license": "ISC", + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-dispatch": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/d3-dispatch/-/d3-dispatch-3.0.1.tgz", + "integrity": "sha512-rzUyPU/S7rwUflMyLc1ETDeBj0NRuHKKAcvukozwhshr6g6c5d8zh4c2gQjY2bZ0dXeGLWc1PF174P2tVvKhfg==", + "license": "ISC", + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-drag": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/d3-drag/-/d3-drag-3.0.0.tgz", + "integrity": "sha512-pWbUJLdETVA8lQNJecMxoXfH6x+mO2UQo8rSmZ+QqxcbyA3hfeprFgIT//HW2nlHChWeIIMwS2Fq+gEARkhTkg==", + "license": "ISC", + "dependencies": { + "d3-dispatch": "1 - 3", + "d3-selection": "3" + }, + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-ease": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/d3-ease/-/d3-ease-3.0.1.tgz", + "integrity": "sha512-wR/XK3D3XcLIZwpbvQwQ5fK+8Ykds1ip7A2Txe0yxncXSdq1L9skcG7blcedkOX+ZcgxGAmLX1FrRGbADwzi0w==", + "license": "BSD-3-Clause", + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-interpolate": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/d3-interpolate/-/d3-interpolate-3.0.1.tgz", + "integrity": "sha512-3bYs1rOD33uo8aqJfKP3JWPAibgw8Zm2+L9vBKEHJ2Rg+viTR7o5Mmv5mZcieN+FRYaAOWX5SJATX6k1PWz72g==", + "license": "ISC", + "dependencies": { + "d3-color": "1 - 3" + }, + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-selection": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/d3-selection/-/d3-selection-3.0.0.tgz", + "integrity": "sha512-fmTRWbNMmsmWq6xJV8D19U/gw/bwrHfNXxrIN+HfZgnzqTHp9jOmKMhsTUjXOJnZOdZY9Q28y4yebKzqDKlxlQ==", + "license": "ISC", + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-timer": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/d3-timer/-/d3-timer-3.0.1.tgz", + "integrity": "sha512-ndfJ/JxxMd3nw31uyKoY2naivF+r29V+Lc0svZxe1JvvIRmi8hUsrMvdOwgS1o6uBHmiz91geQ0ylPP0aj1VUA==", + "license": "ISC", + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-transition": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/d3-transition/-/d3-transition-3.0.1.tgz", + "integrity": "sha512-ApKvfjsSR6tg06xrL434C0WydLr7JewBB3V+/39RMHsaXTOG0zmt/OAXeng5M5LBm0ojmxJrpomQVZ1aPvBL4w==", + "license": "ISC", + "dependencies": { + "d3-color": "1 - 3", + "d3-dispatch": "1 - 3", + "d3-ease": "1 - 3", + "d3-interpolate": "1 - 3", + "d3-timer": "1 - 3" + }, + "engines": { + "node": ">=12" + }, + "peerDependencies": { + "d3-selection": "2 - 3" + } + }, + "node_modules/d3-zoom": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/d3-zoom/-/d3-zoom-3.0.0.tgz", + "integrity": "sha512-b8AmV3kfQaqWAuacbPuNbL6vahnOJflOhexLzMMNLga62+/nh0JzvJ0aO/5a5MVgUFGS7Hu1P9P03o3fJkDCyw==", + "license": "ISC", + "dependencies": { + "d3-dispatch": "1 - 3", + "d3-drag": "2 - 3", + "d3-interpolate": "1 - 3", + "d3-selection": "2 - 3", + "d3-transition": "2 - 3" + }, + "engines": { + "node": ">=12" + } + }, "node_modules/debug": { "version": "4.4.1", "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.1.tgz", @@ -5194,6 +5387,15 @@ } } }, + "node_modules/use-sync-external-store": { + "version": "1.6.0", + "resolved": "https://registry.npmjs.org/use-sync-external-store/-/use-sync-external-store-1.6.0.tgz", + "integrity": "sha512-Pp6GSwGP/NrPIrxVFAIkOQeyw8lFenOHijQWkUTrDvrF4ALqylP2C/KCkeS9dpUM3KvYRQhna5vt7IL95+ZQ9w==", + "license": "MIT", + "peerDependencies": { + "react": "^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0" + } + }, "node_modules/vite": { "version": "7.3.0", "resolved": "https://registry.npmjs.org/vite/-/vite-7.3.0.tgz", @@ -5345,6 +5547,34 @@ "funding": { "url": "https://github.com/sponsors/sindresorhus" } + }, + "node_modules/zustand": { + "version": "4.5.7", + "resolved": "https://registry.npmjs.org/zustand/-/zustand-4.5.7.tgz", + "integrity": "sha512-CHOUy7mu3lbD6o6LJLfllpjkzhHXSBlX8B9+qPddUsIfeF5S/UZ5q0kmCsnRqT1UHFQZchNFDDzMbQsuesHWlw==", + "license": "MIT", + "dependencies": { + "use-sync-external-store": "^1.2.2" + }, + "engines": { + "node": ">=12.7.0" + }, + "peerDependencies": { + "@types/react": ">=16.8", + "immer": ">=9.0.6", + "react": ">=16.8" + }, + "peerDependenciesMeta": { + "@types/react": { + "optional": true + }, + "immer": { + "optional": true + }, + "react": { + "optional": true + } + } } } } diff --git a/frontend/package.json b/frontend/package.json index 2c50024fe..d1dc3f961 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -24,6 +24,7 @@ "@radix-ui/react-toggle": "^1.1.10", "@radix-ui/react-toggle-group": "^1.1.11", "@radix-ui/react-tooltip": "^1.2.8", + "@xyflow/react": "^12.10.1", "class-variance-authority": "^0.7.1", "clsx": "^2.1.1", "lucide-react": "^0.544.0", diff --git a/frontend/src/components/Header.tsx b/frontend/src/components/Header.tsx index 580e77b99..42425e1fa 100644 --- a/frontend/src/components/Header.tsx +++ b/frontend/src/components/Header.tsx @@ -1,5 +1,12 @@ import { useState, useEffect, useRef } from "react"; -import { Settings, Cloud, CloudOff, Plug } from "lucide-react"; +import { + Settings, + Cloud, + CloudOff, + Plug, + Workflow, + Monitor, +} from "lucide-react"; import { Button } from "./ui/button"; import { SettingsDialog } from "./SettingsDialog"; import { PluginsDialog } from "./PluginsDialog"; @@ -13,6 +20,9 @@ interface HeaderProps { // External settings tab control openSettingsTab?: string | null; onSettingsTabOpened?: () => void; + // Graph mode toggle + graphMode?: boolean; + onGraphModeToggle?: () => void; } export function Header({ @@ -21,6 +31,8 @@ export function Header({ cloudDisabled, openSettingsTab, onSettingsTabOpened, + graphMode = false, + onGraphModeToggle, }: HeaderProps) { const [settingsOpen, setSettingsOpen] = useState(false); const [pluginsOpen, setPluginsOpen] = useState(false); @@ -119,7 +131,36 @@ export function Header({ return (
-

Daydream Scope

+
+

+ Daydream Scope +

+ {onGraphModeToggle && ( + + )} +
+ + + {status && ( + + {status} + {graphSource && ( + ({graphSource}) + )} + + )} +
+ + {/* Flow Canvas */} +
+ setSelectedNodeId(node.id)} + onPaneClick={() => setSelectedNodeId(null)} + nodeTypes={nodeTypes} + colorMode="dark" + fitView + deleteKeyCode={["Backspace", "Delete"]} + > + + + +
+
+ + {/* Right panel: Node parameters */} + {selectedNodeId && selectedPipelineId && ( +
+ +
+ )} + + ); +} diff --git a/frontend/src/components/graph/NodeParametersPanel.tsx b/frontend/src/components/graph/NodeParametersPanel.tsx new file mode 100644 index 000000000..033cbbc67 --- /dev/null +++ b/frontend/src/components/graph/NodeParametersPanel.tsx @@ -0,0 +1,127 @@ +import { useMemo } from "react"; +import type { PipelineSchemaInfo } from "../../lib/api"; +import { + parseConfigurationFields, + type ParsedFieldConfig, +} from "../../lib/schemaSettings"; +import { SchemaPrimitiveField } from "../PrimitiveFields"; + +interface NodeParametersPanelProps { + /** The pipeline_id of the selected node */ + pipelineId: string | null; + /** The node_id in the graph */ + nodeId: string; + /** All available pipeline schemas */ + pipelineSchemas: Record; + /** Current parameter values for this node */ + parameterValues: Record; + /** Callback when a parameter value changes */ + onParameterChange: (nodeId: string, key: string, value: unknown) => void; + /** Whether the stream is currently active */ + isStreaming: boolean; +} + +export function NodeParametersPanel({ + pipelineId, + nodeId, + pipelineSchemas, + parameterValues, + onParameterChange, + isStreaming, +}: NodeParametersPanelProps) { + const schema = pipelineId ? pipelineSchemas[pipelineId] : null; + + const { runtimeFields, loadFields } = useMemo(() => { + if (!schema?.config_schema) { + return { runtimeFields: [], loadFields: [] }; + } + + const allFields = parseConfigurationFields( + schema.config_schema as import("../../lib/schemaSettings").ConfigSchemaLike, + undefined // Show all modes in graph editor + ); + + const runtime: ParsedFieldConfig[] = []; + const load: ParsedFieldConfig[] = []; + + for (const field of allFields) { + if (field.ui.is_load_param) { + load.push(field); + } else { + runtime.push(field); + } + } + + return { runtimeFields: runtime, loadFields: load }; + }, [schema]); + + if (!pipelineId || !schema) { + return ( +
+ Select a pipeline node to view its parameters. +
+ ); + } + + const renderPrimitiveField = ( + field: ParsedFieldConfig, + disabled: boolean + ) => { + // Only render primitive fields (skip complex components for now) + if ( + typeof field.fieldType === "string" && + ["text", "number", "slider", "toggle", "enum"].includes(field.fieldType) + ) { + return ( + onParameterChange(nodeId, field.key, val)} + disabled={disabled} + label={field.ui.label} + fieldType={ + field.fieldType as "text" | "number" | "slider" | "toggle" | "enum" + } + /> + ); + } + return null; + }; + + return ( +
+
+

+ {schema.name} +

+

Node: {nodeId}

+
+ + {/* Runtime Parameters */} + {runtimeFields.length > 0 && ( +
+

+ Properties +

+
+ {runtimeFields.map(field => renderPrimitiveField(field, false))} +
+
+ )} + + {/* Load Parameters */} + {loadFields.length > 0 && ( +
+

+ Model Parameters +

+
+ {loadFields.map(field => renderPrimitiveField(field, isStreaming))} +
+
+ )} +
+ ); +} diff --git a/frontend/src/components/graph/PipelineNode.tsx b/frontend/src/components/graph/PipelineNode.tsx new file mode 100644 index 000000000..0cbc1a05b --- /dev/null +++ b/frontend/src/components/graph/PipelineNode.tsx @@ -0,0 +1,105 @@ +import { Handle, Position } from "@xyflow/react"; +import type { NodeProps, Node } from "@xyflow/react"; +import type { FlowNodeData } from "../../lib/graphUtils"; + +type PipelineNodeType = Node; + +/** Color palette for port handles - each port gets a consistent color */ +const PORT_COLORS: Record = { + video: "bg-blue-400", + video2: "bg-cyan-400", + vace_input_frames: "bg-purple-400", + vace_input_masks: "bg-pink-400", +}; + +function getPortColor(portName: string): string { + return PORT_COLORS[portName] ?? "bg-gray-400"; +} + +export function PipelineNode({ + id, + data, + selected, +}: NodeProps) { + const pipelineIds = data.availablePipelineIds || []; + const streamInputs = data.streamInputs ?? ["video"]; + const streamOutputs = data.streamOutputs ?? ["video"]; + const onPipelineSelect = data.onPipelineSelect as + | ((nodeId: string, pipelineId: string | null) => void) + | undefined; + + const handleChange = (e: React.ChangeEvent) => { + const newPipelineId = e.target.value || null; + onPipelineSelect?.(id, newPipelineId); + }; + + // Calculate handle positions evenly distributed + const inputCount = streamInputs.length; + const outputCount = streamOutputs.length; + + return ( +
+
Pipeline
+ + + {/* Port labels */} +
+
+ {streamInputs.map(port => ( +
+ {port} +
+ ))} +
+
+ {streamOutputs.map(port => ( +
+ {port} +
+ ))} +
+
+ + {/* Input handles (left side) */} + {streamInputs.map((port, i) => ( + + ))} + + {/* Output handles (right side) */} + {streamOutputs.map((port, i) => ( + + ))} +
+ ); +} diff --git a/frontend/src/components/graph/SinkNode.tsx b/frontend/src/components/graph/SinkNode.tsx new file mode 100644 index 000000000..efa62bb48 --- /dev/null +++ b/frontend/src/components/graph/SinkNode.tsx @@ -0,0 +1,20 @@ +import { Handle, Position } from "@xyflow/react"; +import type { NodeProps, Node } from "@xyflow/react"; +import type { FlowNodeData } from "../../lib/graphUtils"; + +type SinkNodeType = Node; + +export function SinkNode({ data }: NodeProps) { + return ( +
+
Sink
+
{data.label}
+ +
+ ); +} diff --git a/frontend/src/components/graph/SourceNode.tsx b/frontend/src/components/graph/SourceNode.tsx new file mode 100644 index 000000000..9735c6b1f --- /dev/null +++ b/frontend/src/components/graph/SourceNode.tsx @@ -0,0 +1,20 @@ +import { Handle, Position } from "@xyflow/react"; +import type { NodeProps, Node } from "@xyflow/react"; +import type { FlowNodeData } from "../../lib/graphUtils"; + +type SourceNodeType = Node; + +export function SourceNode({ data }: NodeProps) { + return ( +
+
Source
+
{data.label}
+ +
+ ); +} diff --git a/frontend/src/hooks/useUnifiedWebRTC.ts b/frontend/src/hooks/useUnifiedWebRTC.ts index 190d1d41f..596e80b6f 100644 --- a/frontend/src/hooks/useUnifiedWebRTC.ts +++ b/frontend/src/hooks/useUnifiedWebRTC.ts @@ -466,6 +466,8 @@ export function useUnifiedWebRTC(options?: UseUnifiedWebRTCOptions) { images?: string[]; first_frame_image?: string; last_frame_image?: string; + node_id?: string; + [key: string]: unknown; }) => { if ( dataChannelRef.current && diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 0d1251b72..2d68907b6 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -470,6 +470,7 @@ export interface PipelineSchemaProperty { $ref?: string; /** UI hints from backend (Field json_schema_extra) */ ui?: SchemaFieldUI; + [k: string]: unknown; } export interface PipelineConfigSchema { @@ -521,6 +522,9 @@ export interface PipelineSchemaInfo { recommended_quantization_vram_threshold: number | null; modified: boolean; plugin_name: string | null; + // Graph port declarations + stream_inputs?: string[]; + stream_outputs?: string[]; } export interface PipelineSchemasResponse { @@ -798,6 +802,81 @@ export const deleteApiKey = async ( return response.json(); }; +// Graph Configuration types and API functions + +export interface GraphNode { + id: string; + type: "source" | "pipeline" | "sink"; + pipeline_id?: string | null; +} + +export interface GraphEdge { + from: string; + from_port: string; + to_node: string; + to_port: string; + kind?: "stream" | "parameter"; +} + +export interface GraphConfig { + nodes: GraphNode[]; + edges: GraphEdge[]; +} + +export interface GraphResponse { + source: "api" | null; + graph: GraphConfig | null; +} + +export const getGraph = async (): Promise => { + const response = await fetch("/api/v1/graph", { + method: "GET", + headers: { "Content-Type": "application/json" }, + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error( + `Get graph failed: ${response.status} ${response.statusText}: ${errorText}` + ); + } + + return response.json(); +}; + +export const setGraph = async ( + graph: GraphConfig +): Promise<{ message: string; nodes: number; edges: number }> => { + const response = await fetch("/api/v1/graph", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(graph), + }); + + if (!response.ok) { + const detail = await extractErrorDetail(response); + throw new Error(detail); + } + + return response.json(); +}; + +export const clearGraph = async (): Promise<{ message: string }> => { + const response = await fetch("/api/v1/graph", { + method: "DELETE", + headers: { "Content-Type": "application/json" }, + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error( + `Clear graph failed: ${response.status} ${response.statusText}: ${errorText}` + ); + } + + return response.json(); +}; + export const downloadRecording = async (sessionId: string): Promise => { if (!sessionId) { throw new Error("Session ID is required to download recording"); diff --git a/frontend/src/lib/graphUtils.ts b/frontend/src/lib/graphUtils.ts new file mode 100644 index 000000000..e8d07a349 --- /dev/null +++ b/frontend/src/lib/graphUtils.ts @@ -0,0 +1,294 @@ +import type { Node, Edge } from "@xyflow/react"; +import type { + GraphConfig, + GraphNode, + GraphEdge, + PipelineSchemaInfo, +} from "./api"; + +// Layout constants +const NODE_WIDTH = 200; +const NODE_HEIGHT = 60; +const COLUMN_GAP = 300; +const ROW_GAP = 100; +const START_X = 50; +const START_Y = 50; + +export interface PortInfo { + name: string; +} + +export interface FlowNodeData { + label: string; + pipelineId?: string | null; + nodeType: "source" | "pipeline" | "sink"; + availablePipelineIds?: string[]; + /** Declared input ports for the selected pipeline */ + streamInputs?: string[]; + /** Declared output ports for the selected pipeline */ + streamOutputs?: string[]; + /** Pipeline schemas keyed by pipeline_id, for looking up ports on selection change */ + pipelinePortsMap?: Record; + [key: string]: unknown; +} + +/** + * Build a map of pipeline_id -> { inputs, outputs } from schemas. + */ +export function buildPipelinePortsMap( + schemas: Record +): Record { + const map: Record = {}; + for (const [id, schema] of Object.entries(schemas)) { + map[id] = { + inputs: schema.stream_inputs ?? ["video"], + outputs: schema.stream_outputs ?? ["video"], + }; + } + return map; +} + +/** + * Convert backend GraphConfig to React Flow nodes and edges. + * Auto-layout: sources on the left, pipelines in the middle, sinks on the right. + */ +export function graphConfigToFlow( + graph: GraphConfig, + portsMap?: Record +): { + nodes: Node[]; + edges: Edge[]; +} { + const sources = graph.nodes.filter(n => n.type === "source"); + const pipelines = graph.nodes.filter(n => n.type === "pipeline"); + const sinks = graph.nodes.filter(n => n.type === "sink"); + + const nodes: Node[] = []; + + // Layout sources (column 0) + sources.forEach((n, i) => { + nodes.push({ + id: n.id, + type: "source", + position: { x: START_X, y: START_Y + i * (NODE_HEIGHT + ROW_GAP) }, + data: { label: n.id, nodeType: "source" }, + }); + }); + + // Layout pipelines (column 1) + pipelines.forEach((n, i) => { + const ports = n.pipeline_id && portsMap ? portsMap[n.pipeline_id] : null; + nodes.push({ + id: n.id, + type: "pipeline", + position: { + x: START_X + COLUMN_GAP, + y: START_Y + i * (NODE_HEIGHT + ROW_GAP), + }, + data: { + label: n.pipeline_id || n.id, + pipelineId: n.pipeline_id, + nodeType: "pipeline", + streamInputs: ports?.inputs ?? ["video"], + streamOutputs: ports?.outputs ?? ["video"], + }, + }); + }); + + // Layout sinks (column 2) + sinks.forEach((n, i) => { + nodes.push({ + id: n.id, + type: "sink", + position: { + x: START_X + COLUMN_GAP * 2, + y: START_Y + i * (NODE_HEIGHT + ROW_GAP), + }, + data: { label: n.id, nodeType: "sink" }, + }); + }); + + // Convert edges + const edges: Edge[] = graph.edges.map((e, i) => ({ + id: `e-${i}-${e.from}-${e.to_node}`, + source: e.from, + sourceHandle: e.from_port, + target: e.to_node, + targetHandle: e.to_port, + label: e.from_port !== "video" ? e.from_port : undefined, + animated: true, + })); + + return { nodes, edges }; +} + +/** + * Convert React Flow state back to backend GraphConfig JSON. + */ +export function flowToGraphConfig( + nodes: Node[], + edges: Edge[] +): GraphConfig { + const graphNodes: GraphNode[] = nodes.map(n => ({ + id: n.id, + type: n.data.nodeType, + pipeline_id: + n.data.nodeType === "pipeline" ? (n.data.pipelineId ?? null) : undefined, + })); + + const graphEdges: GraphEdge[] = edges.map(e => ({ + from: e.source, + from_port: e.sourceHandle || "video", + to_node: e.target, + to_port: e.targetHandle || "video", + kind: "stream" as const, + })); + + return { nodes: graphNodes, edges: graphEdges }; +} + +/** + * Generate a unique node ID with a given prefix. + */ +export function generateNodeId( + prefix: string, + existingIds: Set +): string { + if (!existingIds.has(prefix)) return prefix; + let i = 1; + while (existingIds.has(`${prefix}_${i}`)) i++; + return `${prefix}_${i}`; +} + +/** + * Build a linear graph from settings panel config (frontend-only). + * Produces: source -> preprocessor0 -> ... -> pipeline -> postprocessor0 -> ... -> sink. + */ +export function linearGraphFromSettings( + pipelineId: string, + preprocessorIds: string[], + postprocessorIds: string[] +): GraphConfig { + const allIds = [...preprocessorIds, pipelineId, ...postprocessorIds]; + const nodes: GraphNode[] = [ + { id: "input", type: "source" }, + ...allIds.map(pid => ({ + id: pid, + type: "pipeline" as const, + pipeline_id: pid, + })), + { id: "output", type: "sink" }, + ]; + + const edges: GraphEdge[] = []; + let prev = "input"; + for (const pid of allIds) { + edges.push({ + from: prev, + from_port: "video", + to_node: pid, + to_port: "video", + kind: "stream", + }); + prev = pid; + } + edges.push({ + from: prev, + from_port: "video", + to_node: "output", + to_port: "video", + kind: "stream", + }); + + return { nodes, edges }; +} + +/** + * Try to extract linear pipeline settings from a graph config. + * + * Returns null if the graph is non-linear (branching, fan-out, etc.). + * Returns the pipeline settings if the graph is a simple linear chain: + * source → preprocessors → pipeline → postprocessors → sink. + */ +export function tryExtractLinearSettings( + graph: GraphConfig, + pipelines: Record +): { + pipelineId: string; + preprocessorIds: string[]; + postprocessorIds: string[]; +} | null { + // 1. Must have exactly 1 source and 1 sink + const sources = graph.nodes.filter(n => n.type === "source"); + const sinks = graph.nodes.filter(n => n.type === "sink"); + if (sources.length !== 1 || sinks.length !== 1) return null; + + const sourceId = sources[0].id; + const sinkId = sinks[0].id; + + // 2. All edges must use "video" ports and be "stream" kind + for (const edge of graph.edges) { + if (edge.from_port !== "video" || edge.to_port !== "video") return null; + if (edge.kind && edge.kind !== "stream") return null; + } + + // 3. Walk the chain: source → ... → sink + // Build adjacency: from_node → list of to_nodes + const outgoing = new Map(); + for (const edge of graph.edges) { + const list = outgoing.get(edge.from) ?? []; + list.push(edge.to_node); + outgoing.set(edge.from, list); + } + + // Walk from source to sink + const chain: string[] = []; + let current = sourceId; + const visited = new Set(); + + while (current !== sinkId) { + if (visited.has(current)) return null; // cycle + visited.add(current); + + const next = outgoing.get(current); + if (!next || next.length !== 1) return null; // 0 or 2+ outgoing = non-linear + current = next[0]; + + // Collect pipeline nodes (skip source/sink) + if (current !== sinkId) { + chain.push(current); + } + } + + // 4. Classify pipeline nodes + const preprocessorIds: string[] = []; + const postprocessorIds: string[] = []; + let mainPipelineId: string | null = null; + + for (const nodeId of chain) { + const node = graph.nodes.find(n => n.id === nodeId); + if (!node || node.type !== "pipeline" || !node.pipeline_id) return null; + + const pipelineInfo = pipelines[node.pipeline_id]; + const usage = pipelineInfo?.usage ?? []; + + if (usage.includes("preprocessor")) { + if (mainPipelineId !== null) return null; // preprocessor after main pipeline + preprocessorIds.push(node.pipeline_id); + } else if (usage.includes("postprocessor")) { + if (mainPipelineId === null) return null; // postprocessor before main pipeline + postprocessorIds.push(node.pipeline_id); + } else { + if (mainPipelineId !== null) return null; // 2+ main pipelines + mainPipelineId = node.pipeline_id; + } + } + + // 5. Must have exactly 1 main pipeline + if (mainPipelineId === null) return null; + + return { pipelineId: mainPipelineId, preprocessorIds, postprocessorIds }; +} + +// Default node dimensions for reference +export { NODE_WIDTH, NODE_HEIGHT }; diff --git a/frontend/src/pages/StreamPage.tsx b/frontend/src/pages/StreamPage.tsx index 85230f5d6..6ba9733e0 100644 --- a/frontend/src/pages/StreamPage.tsx +++ b/frontend/src/pages/StreamPage.tsx @@ -5,6 +5,7 @@ import { VideoOutput } from "../components/VideoOutput"; import { SettingsPanel } from "../components/SettingsPanel"; import { PromptInputWithTimeline } from "../components/PromptInputWithTimeline"; import { DownloadDialog } from "../components/DownloadDialog"; +import { GraphEditor } from "../components/graph/GraphEditor"; import type { TimelinePrompt } from "../components/PromptTimeline"; import { StatusBar } from "../components/StatusBar"; import { useUnifiedWebRTC } from "../hooks/useUnifiedWebRTC"; @@ -32,7 +33,8 @@ import type { DownloadProgress, } from "../types"; import type { PromptItem, PromptTransition } from "../lib/api"; -import { getInputSourceResolution } from "../lib/api"; +import { getInputSourceResolution, setGraph, getGraph } from "../lib/api"; +import { linearGraphFromSettings } from "../lib/graphUtils"; import { sendLoRAScaleUpdates } from "../utils/loraHelpers"; import { toast } from "sonner"; @@ -188,6 +190,23 @@ export function StreamPage() { // Track when waiting for cloud WebSocket to connect after clicking Play const [isCloudConnecting, setIsCloudConnecting] = useState(false); + // Graph mode state + const [graphMode, setGraphMode] = useState(false); + + // When true, pipeline controls are disabled in Perform Mode + // (set when user edits anything in Graph Mode, cleared when user clicks Clear) + const [nonLinearGraph, setNonLinearGraph] = useState(false); + + // Called by GraphEditor whenever user edits the graph + const handleGraphChange = useCallback(() => { + setNonLinearGraph(true); + }, []); + + // Called by GraphEditor when user clicks Clear + const handleGraphClear = useCallback(() => { + setNonLinearGraph(false); + }, []); + // Video display state const [videoScaleMode, setVideoScaleMode] = useState<"fit" | "native">("fit"); @@ -394,6 +413,9 @@ export function StreamPage() { }; const handlePipelineIdChange = (pipelineId: PipelineId) => { + // User manually changed pipeline, clear non-linear flag + setNonLinearGraph(false); + // Stop the stream if it's currently running if (isStreaming) { stopStream(); @@ -728,6 +750,8 @@ export function StreamPage() { type PipelineKind = keyof typeof pipelineSettingsKeys; const makePipelineIdsHandler = (kind: PipelineKind) => (ids: string[]) => { + // User manually changed pipeline chain, clear non-linear flag + setNonLinearGraph(false); const k = pipelineSettingsKeys[kind]; // Preserve overrides for processors that remain in the list const currentOverrides = @@ -1231,6 +1255,22 @@ export function StreamPage() { ); } + // Build and save a linear graph so backend uses the unified graph path. + // Skip when user has a custom graph from Graph Mode (nonLinearGraph=true) + // — the backend already has their graph and should use it. + if (!graphMode && !nonLinearGraph) { + try { + const linearGraph = linearGraphFromSettings( + pipelineIdToUse, + settings.preprocessorIds ?? [], + settings.postprocessorIds ?? [] + ); + await setGraph(linearGraph); + } catch (err) { + console.warn("Failed to save linear graph:", err); + } + } + const loadSuccess = await loadPipeline( pipelineIds, loadParams || undefined @@ -1420,352 +1460,396 @@ export function StreamPage() { cloudDisabled={isStreaming} openSettingsTab={openSettingsTab} onSettingsTabOpened={() => setOpenSettingsTab(null)} + graphMode={graphMode} + onGraphModeToggle={async () => { + if (!graphMode) { + // Switching Perform → Graph: only seed a graph if none exists yet + try { + const response = await getGraph(); + if (!response.graph) { + const graph = linearGraphFromSettings( + settings.pipelineId, + settings.preprocessorIds ?? [], + settings.postprocessorIds ?? [] + ); + await setGraph(graph); + } + } catch { + /* ignore */ + } + } + // Graph → Perform: just switch mode (modes are independent) + setGraphMode(prev => !prev); + }} /> {/* Main Content Area */} -
- {/* Left Panel - Input & Controls */} -
- + { - updateSettings({ - schemaFieldOverrides: { - ...(settings.schemaFieldOverrides ?? {}), - [key]: value, - }, - }); - if (isRuntimeParam && isStreaming) { - sendParameterUpdate({ [key]: value }); - } + onNodeParameterChange={(nodeId, key, value) => { + sendParameterUpdate({ node_id: nodeId, [key]: value }); }} + onGraphChange={handleGraphChange} + onGraphClear={handleGraphClear} />
- - {/* Center Panel - Video Output + Timeline */} -
- {/* Video area - takes remaining space but can shrink */} -
- + {/* Left Panel - Input & Controls */} +
+ { - // Use timeline's play/pause handler instead of direct video toggle - if (timelinePlayPauseRef.current) { - timelinePlayPauseRef.current(); - } - }} - onStartStream={() => { - // Use timeline's play/pause handler to start stream - if (timelinePlayPauseRef.current) { - timelinePlayPauseRef.current(); - } - }} - onVideoPlaying={() => { - // Execute callback when video starts playing - if (onVideoPlayingCallbackRef.current) { - onVideoPlayingCallbackRef.current(); - onVideoPlayingCallbackRef.current = null; // Clear after execution + supportsImages={pipelines?.[settings.pipelineId]?.supportsImages} + firstFrameImage={settings.firstFrameImage} + onFirstFrameImageChange={handleFirstFrameImageChange} + lastFrameImage={settings.lastFrameImage} + onLastFrameImageChange={handleLastFrameImageChange} + extensionMode={settings.extensionMode || "firstframe"} + onExtensionModeChange={handleExtensionModeChange} + onSendExtensionFrames={handleSendExtensionFrames} + configSchema={ + pipelines?.[settings.pipelineId]?.configSchema as + | import("../lib/schemaSettings").ConfigSchemaLike + | undefined + } + schemaFieldOverrides={settings.schemaFieldOverrides ?? {}} + onSchemaFieldOverrideChange={(key, value, isRuntimeParam) => { + updateSettings({ + schemaFieldOverrides: { + ...(settings.schemaFieldOverrides ?? {}), + [key]: value, + }, + }); + if (isRuntimeParam && isStreaming) { + sendParameterUpdate({ [key]: value }); } }} - // Controller input props - supportsControllerInput={currentPipelineSupportsController} - isPointerLocked={isPointerLocked} - onRequestPointerLock={requestPointerLock} - videoContainerRef={videoContainerRef} - // Video scale mode - videoScaleMode={videoScaleMode} />
- {/* Timeline area - compact, always visible */} -
- { - // Update the left panel's prompt state to reflect current timeline prompt - const prompts = [{ text, weight: 100 }]; - setPromptItems(prompts); - - // Send to backend - use transition if streaming and transition steps > 0 - if (isStreaming && transitionSteps > 0) { - sendParameterUpdate({ - transition: { - target_prompts: prompts, - num_steps: transitionSteps, - temporal_interpolation_method: - temporalInterpolationMethod, - }, - }); - } else { - // Send direct prompts without transition - sendParameterUpdate({ - prompts, - prompt_interpolation_method: interpolationMethod, - denoising_step_list: settings.denoisingSteps || [700, 500], - }); - } - }} - onPromptItemsSubmit={( - prompts, - blockTransitionSteps, - blockTemporalInterpolationMethod - ) => { - // Update the left panel's prompt state to reflect current timeline prompt blend - setPromptItems(prompts); - - // Use transition params from block if provided, otherwise use global settings - const effectiveTransitionSteps = - blockTransitionSteps ?? transitionSteps; - const effectiveTemporalInterpolationMethod = - blockTemporalInterpolationMethod ?? - temporalInterpolationMethod; - - // Update the left panel's transition settings to reflect current block's values - if (blockTransitionSteps !== undefined) { - setTransitionSteps(blockTransitionSteps); + + {/* Center Panel - Video Output + Timeline */} +
+ {/* Video area - takes remaining space but can shrink */} +
+ { + // Use timeline's play/pause handler instead of direct video toggle + if (timelinePlayPauseRef.current) { + timelinePlayPauseRef.current(); + } + }} + onStartStream={() => { + // Use timeline's play/pause handler to start stream + if (timelinePlayPauseRef.current) { + timelinePlayPauseRef.current(); + } + }} + onVideoPlaying={() => { + // Execute callback when video starts playing + if (onVideoPlayingCallbackRef.current) { + onVideoPlayingCallbackRef.current(); + onVideoPlayingCallbackRef.current = null; // Clear after execution + } + }} + // Controller input props + supportsControllerInput={currentPipelineSupportsController} + isPointerLocked={isPointerLocked} + onRequestPointerLock={requestPointerLock} + videoContainerRef={videoContainerRef} + // Video scale mode + videoScaleMode={videoScaleMode} + /> +
+ {/* Timeline area - compact, always visible */} +
+ { + // Update the left panel's prompt state to reflect current timeline prompt + const prompts = [{ text, weight: 100 }]; + setPromptItems(prompts); + + // Send to backend - use transition if streaming and transition steps > 0 + if (isStreaming && transitionSteps > 0) { + sendParameterUpdate({ + transition: { + target_prompts: prompts, + num_steps: transitionSteps, + temporal_interpolation_method: + temporalInterpolationMethod, + }, + }); + } else { + // Send direct prompts without transition + sendParameterUpdate({ + prompts, + prompt_interpolation_method: interpolationMethod, + denoising_step_list: settings.denoisingSteps || [ + 700, 500, + ], + }); + } + }} + onPromptItemsSubmit={( + prompts, + blockTransitionSteps, + blockTemporalInterpolationMethod + ) => { + // Update the left panel's prompt state to reflect current timeline prompt blend + setPromptItems(prompts); + + // Use transition params from block if provided, otherwise use global settings + const effectiveTransitionSteps = + blockTransitionSteps ?? transitionSteps; + const effectiveTemporalInterpolationMethod = + blockTemporalInterpolationMethod ?? + temporalInterpolationMethod; + + // Update the left panel's transition settings to reflect current block's values + if (blockTransitionSteps !== undefined) { + setTransitionSteps(blockTransitionSteps); + } + if (blockTemporalInterpolationMethod !== undefined) { + setTemporalInterpolationMethod( + blockTemporalInterpolationMethod + ); + } + + // Send to backend - use transition if streaming and transition steps > 0 + if (isStreaming && effectiveTransitionSteps > 0) { + sendParameterUpdate({ + transition: { + target_prompts: prompts, + num_steps: effectiveTransitionSteps, + temporal_interpolation_method: + effectiveTemporalInterpolationMethod, + }, + }); + } else { + // Send direct prompts without transition + sendParameterUpdate({ + prompts, + prompt_interpolation_method: interpolationMethod, + denoising_step_list: settings.denoisingSteps || [ + 700, 500, + ], + }); + } + }} + disabled={ + isPipelineLoading || + isConnecting || + isCloudConnecting || + showDownloadDialog } - if (blockTemporalInterpolationMethod !== undefined) { - setTemporalInterpolationMethod( - blockTemporalInterpolationMethod - ); + isStreaming={isStreaming} + isVideoPaused={settings.paused} + timelineRef={timelineRef} + onLiveStateChange={setIsLive} + onLivePromptSubmit={handleLivePromptSubmit} + onDisconnect={stopStream} + onStartStream={handleStartStream} + onVideoPlayPauseToggle={handlePlayPauseToggle} + onPromptEdit={handleTimelinePromptEdit} + isCollapsed={isTimelineCollapsed} + onCollapseToggle={setIsTimelineCollapsed} + externalSelectedPromptId={externalSelectedPromptId} + settings={settings} + onSettingsImport={updateSettings} + onPlayPauseRef={timelinePlayPauseRef} + onVideoPlayingCallbackRef={onVideoPlayingCallbackRef} + onResetCache={handleResetCache} + onTimelinePromptsChange={handleTimelinePromptsChange} + onTimelineCurrentTimeChange={handleTimelineCurrentTimeChange} + onTimelinePlayingChange={handleTimelinePlayingChange} + isLoading={isLoading} + videoScaleMode={videoScaleMode} + onVideoScaleModeToggle={() => + setVideoScaleMode(prev => (prev === "fit" ? "native" : "fit")) } + isDownloading={isDownloading} + onSaveGeneration={handleSaveGeneration} + isRecording={isRecording} + onRecordingToggle={() => setIsRecording(prev => !prev)} + /> +
+
- // Send to backend - use transition if streaming and transition steps > 0 - if (isStreaming && effectiveTransitionSteps > 0) { - sendParameterUpdate({ - transition: { - target_prompts: prompts, - num_steps: effectiveTransitionSteps, - temporal_interpolation_method: - effectiveTemporalInterpolationMethod, - }, - }); - } else { - // Send direct prompts without transition - sendParameterUpdate({ - prompts, - prompt_interpolation_method: interpolationMethod, - denoising_step_list: settings.denoisingSteps || [700, 500], - }); + {/* Right Panel - Settings */} +
+ - setVideoScaleMode(prev => (prev === "fit" ? "native" : "fit")) + loras={settings.loras || []} + onLorasChange={handleLorasChange} + loraMergeStrategy={ + settings.loraMergeStrategy ?? "permanent_merge" } - isDownloading={isDownloading} - onSaveGeneration={handleSaveGeneration} - isRecording={isRecording} - onRecordingToggle={() => setIsRecording(prev => !prev)} - /> -
-
- - {/* Right Panel - Settings */} -
- { - updateSettings({ - schemaFieldOverrides: { - ...(settings.schemaFieldOverrides ?? {}), - [key]: value, - }, - }); - if (isRuntimeParam && isStreaming) { - sendParameterUpdate({ [key]: value }); + onVaceEnabledChange={handleVaceEnabledChange} + vaceUseInputVideo={settings.vaceUseInputVideo ?? false} + onVaceUseInputVideoChange={handleVaceUseInputVideoChange} + vaceContextScale={settings.vaceContextScale ?? 1.0} + onVaceContextScaleChange={handleVaceContextScaleChange} + preprocessorIds={settings.preprocessorIds ?? []} + onPreprocessorIdsChange={handlePreprocessorIdsChange} + postprocessorIds={settings.postprocessorIds ?? []} + onPostprocessorIdsChange={handlePostprocessorIdsChange} + preprocessorSchemaFieldOverrides={ + settings.preprocessorSchemaFieldOverrides ?? {} } - }} - isCloudMode={isCloudMode} - /> + postprocessorSchemaFieldOverrides={ + settings.postprocessorSchemaFieldOverrides ?? {} + } + onPreprocessorSchemaFieldOverrideChange={ + handlePreprocessorSchemaFieldOverrideChange + } + onPostprocessorSchemaFieldOverrideChange={ + handlePostprocessorSchemaFieldOverrideChange + } + schemaFieldOverrides={settings.schemaFieldOverrides ?? {}} + onSchemaFieldOverrideChange={(key, value, isRuntimeParam) => { + updateSettings({ + schemaFieldOverrides: { + ...(settings.schemaFieldOverrides ?? {}), + [key]: value, + }, + }); + if (isRuntimeParam && isStreaming) { + sendParameterUpdate({ [key]: value }); + } + }} + isCloudMode={isCloudMode} + nonLinearGraph={nonLinearGraph} + /> +
-
+ )} {/* Status Bar */} diff --git a/src/scope/core/pipelines/base_schema.py b/src/scope/core/pipelines/base_schema.py index 714e988d0..07609b57d 100644 --- a/src/scope/core/pipelines/base_schema.py +++ b/src/scope/core/pipelines/base_schema.py @@ -245,6 +245,10 @@ class BasePipelineConfig(BaseModel): # to appear in the preprocessor dropdown. usage: ClassVar[list[UsageType]] = [] + # Graph port declaration: which stream ports this pipeline exposes + stream_inputs: ClassVar[list[str]] = ["video"] + stream_outputs: ClassVar[list[str]] = ["video"] + # Mode configuration - keys are mode names, values are ModeDefaults with field overrides # Use default=True to mark the default mode. Only include fields that differ from base. modes: ClassVar[dict[str, ModeDefaults]] = {"text": ModeDefaults(default=True)} @@ -382,6 +386,8 @@ def get_schema_with_metadata(cls) -> dict[str, Any]: metadata["modified"] = cls.modified # Convert UsageType enum values to strings for JSON serialization metadata["usage"] = [usage.value for usage in cls.usage] if cls.usage else [] + metadata["stream_inputs"] = list(cls.stream_inputs) + metadata["stream_outputs"] = list(cls.stream_outputs) metadata["config_schema"] = cls.model_json_schema() # Include mode-specific defaults (excluding None values and the "default" flag) diff --git a/src/scope/core/pipelines/longlive/schema.py b/src/scope/core/pipelines/longlive/schema.py index c2598d2a9..0b6c61bb7 100644 --- a/src/scope/core/pipelines/longlive/schema.py +++ b/src/scope/core/pipelines/longlive/schema.py @@ -39,6 +39,9 @@ class LongLiveConfig(BasePipelineConfig): ), ] + stream_inputs = ["video", "vace_input_frames", "vace_input_masks"] + stream_outputs = ["video"] + supports_cache_management = True supports_quantization = True min_dimension = 16 diff --git a/src/scope/server/app.py b/src/scope/server/app.py index ac8080f58..86a46f1ad 100644 --- a/src/scope/server/app.py +++ b/src/scope/server/app.py @@ -498,6 +498,22 @@ async def load_pipeline( try: # Get pipeline IDs to load pipeline_ids = request.pipeline_ids + + # If an API graph is saved, extract pipeline_ids from it to ensure + # all graph pipelines are loaded (frontend saves graph before calling load) + from .graph_state import get_api_graph + + api_graph = get_api_graph() + if api_graph is not None: + graph_pipeline_ids = [ + n.pipeline_id + for n in api_graph.nodes + if n.type == "pipeline" and n.pipeline_id + ] + if graph_pipeline_ids: + # Use graph pipeline IDs, ensuring all are loaded + pipeline_ids = list(dict.fromkeys(graph_pipeline_ids)) + if not pipeline_ids: raise HTTPException( status_code=400, @@ -1713,6 +1729,85 @@ async def reload_plugin( ) from e +# ============================================================================= +# Graph Configuration Endpoints +# ============================================================================= + + +@app.post("/api/v1/graph") +async def set_graph_config(request: Request): + """Accept and store a graph configuration. + + Validates the structure and checks that pipeline_ids exist in the registry. + """ + from scope.core.pipelines.registry import PipelineRegistry + + from .graph_schema import GraphConfig + from .graph_state import set_api_graph + + try: + body = await request.json() + graph = GraphConfig.model_validate(body) + + # Structural validation + errors = graph.validate_structure() + if errors: + raise HTTPException(status_code=422, detail={"errors": errors}) + + # Validate that pipeline_ids exist in the registry + available = set(PipelineRegistry.list_pipelines()) + for node in graph.nodes: + if node.type == "pipeline" and node.pipeline_id not in available: + raise HTTPException( + status_code=422, + detail={ + "errors": [ + f"Pipeline '{node.pipeline_id}' not found in registry. " + f"Available: {sorted(available)}" + ] + }, + ) + + set_api_graph(graph) + return { + "message": "Graph configuration saved", + "nodes": len(graph.nodes), + "edges": len(graph.edges), + } + except HTTPException: + raise + except Exception as e: + logger.error(f"Error setting graph config: {e}") + raise HTTPException(status_code=400, detail=str(e)) from e + + +@app.get("/api/v1/graph") +async def get_graph_config(): + """Return the current graph configuration. + + Priority: API-set graph > none. + """ + from .graph_state import get_api_graph + + api_graph = get_api_graph() + if api_graph is not None: + return { + "source": "api", + "graph": api_graph.model_dump(by_alias=True), + } + + return {"source": None, "graph": None} + + +@app.delete("/api/v1/graph") +async def clear_graph_config(): + """Clear the API-set graph, reverting to fallback behavior.""" + from .graph_state import clear_api_graph + + clear_api_graph() + return {"message": "Graph configuration cleared"} + + # ============================================================================= # Cloud Integration Endpoints # ============================================================================= diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py index ef411c3e8..8878e4de8 100644 --- a/src/scope/server/frame_processor.py +++ b/src/scope/server/frame_processor.py @@ -101,6 +101,15 @@ def __init__( self.pipeline_processors: list[PipelineProcessor] = [] self.pipeline_ids: list[str] = [] + # Graph support: processors indexed by node_id for per-node routing + self._processors_by_node_id: dict[str, PipelineProcessor] = {} + # Graph source queues for fan-out from source nodes + self._graph_source_queues: list[queue.Queue] = [] + # Whether graph mode is active (vs linear chain) + self._graph_mode = False + # The processor whose output we read in graph mode + self._sink_processor: PipelineProcessor | None = None + # Frame counting for debug logging self._frames_in = 0 self._frames_out = 0 @@ -369,30 +378,35 @@ def put(self, frame: VideoFrame) -> bool: return False return False - # Local mode: put into first processor's input queue - if self.pipeline_processors: - first_processor = self.pipeline_processors[0] - - frame_array = frame.to_ndarray(format="rgb24") - - if torch.cuda.is_available(): - shape = frame_array.shape - pinned_buffer = self._get_or_create_pinned_buffer(shape) - # Note: We reuse pinned buffers for performance. This assumes the copy_() - # operation completes before the next frame arrives. - # In practice, copy_() is very fast (~microseconds) and frames arrive at 60 FPS max - pinned_buffer.copy_(torch.as_tensor(frame_array, dtype=torch.uint8)) - frame_tensor = pinned_buffer.cuda(non_blocking=True) - else: - frame_tensor = torch.as_tensor(frame_array, dtype=torch.uint8) + # Local mode: convert frame to tensor + frame_array = frame.to_ndarray(format="rgb24") + + if torch.cuda.is_available(): + shape = frame_array.shape + pinned_buffer = self._get_or_create_pinned_buffer(shape) + # Note: We reuse pinned buffers for performance. This assumes the copy_() + # operation completes before the next frame arrives. + # In practice, copy_() is very fast (~microseconds) and frames arrive at 60 FPS max + pinned_buffer.copy_(torch.as_tensor(frame_array, dtype=torch.uint8)) + frame_tensor = pinned_buffer.cuda(non_blocking=True) + else: + frame_tensor = torch.as_tensor(frame_array, dtype=torch.uint8) - frame_tensor = frame_tensor.unsqueeze(0) + frame_tensor = frame_tensor.unsqueeze(0) - # Put frame into first processor's input queue + if self._graph_mode and self._graph_source_queues: + # Graph mode: fan-out to all source queues + for sq in self._graph_source_queues: + try: + sq.put_nowait(frame_tensor) + except queue.Full: + logger.debug("Graph source queue full, dropping frame") + elif self.pipeline_processors: + # Linear chain mode: put into first processor's input queue + first_processor = self.pipeline_processors[0] try: first_processor.input_queue.put_nowait(frame_tensor) except queue.Full: - # Queue full, drop frame (non-blocking) logger.debug("First processor input queue full, dropping frame") return False @@ -417,7 +431,11 @@ def get(self) -> torch.Tensor | None: if not self.pipeline_processors: return None - last_processor = self.pipeline_processors[-1] + last_processor = ( + self._sink_processor + if self._graph_mode and self._sink_processor + else self.pipeline_processors[-1] + ) if not last_processor.output_queue: return None @@ -502,8 +520,12 @@ def get_fps(self) -> float: if not self.pipeline_processors: return DEFAULT_FPS - # Get FPS from the last processor in the chain - last_processor = self.pipeline_processors[-1] + # Get FPS from the sink processor (graph mode) or last processor (chain mode) + last_processor = ( + self._sink_processor + if self._graph_mode and self._sink_processor + else self.pipeline_processors[-1] + ) return last_processor.get_fps() def _log_frame_stats(self): @@ -610,9 +632,14 @@ def update_parameters(self, parameters: dict[str, Any]): input_source_config = parameters.pop("input_source") self._update_input_source(input_source_config) - # Update parameters for all pipeline processors - for processor in self.pipeline_processors: - processor.update_parameters(parameters) + # Per-node routing: if node_id is specified, route to that processor only + node_id = parameters.pop("node_id", None) + if node_id and self._graph_mode and node_id in self._processors_by_node_id: + self._processors_by_node_id[node_id].update_parameters(parameters) + else: + # Broadcast to all pipeline processors (backward compat) + for processor in self.pipeline_processors: + processor.update_parameters(parameters) # Update local parameters self.parameters = {**self.parameters, **parameters} @@ -918,10 +945,22 @@ def _input_source_receiver_loop(self): ) def _setup_pipeline_chain_sync(self): - """Create pipeline processor chain (synchronous). + """Create pipeline processor chain or graph (synchronous). + If a graph config is available from the API, uses build_graph() to create + the execution graph. Otherwise, falls back to linear chain mode. Assumes all pipelines are already loaded by the pipeline manager. """ + from .graph_state import get_api_graph + + api_graph = get_api_graph() + + if api_graph is not None: + # Graph mode: use build_graph() to create the execution graph + self._setup_graph(api_graph) + return + + # Linear chain mode (backward compat) if not self.pipeline_ids: logger.error("No pipeline IDs provided") return @@ -957,6 +996,39 @@ def _setup_pipeline_chain_sync(self): f"Created pipeline chain with {len(self.pipeline_processors)} processors" ) + def _setup_graph(self, graph): + """Set up graph-based execution from a GraphConfig.""" + from .graph_executor import build_graph + + graph_run = build_graph( + graph=graph, + pipeline_manager=self.pipeline_manager, + initial_parameters=self.parameters.copy(), + session_id=self.session_id, + user_id=self.user_id, + connection_id=self.connection_id, + connection_info=self.connection_info, + ) + + self._graph_mode = True + self._graph_source_queues = graph_run.source_queues + self._sink_processor = graph_run.sink_processor + self.pipeline_processors = graph_run.processors + self.pipeline_ids = graph_run.pipeline_ids + + # Index processors by node_id (pipeline_id in processor is the node_id) + for proc in self.pipeline_processors: + self._processors_by_node_id[proc.pipeline_id] = proc + + # Start all processors + for processor in self.pipeline_processors: + processor.start() + + logger.info( + f"Created graph with {len(self.pipeline_processors)} processors, " + f"sink={graph_run.sink_node_id}" + ) + def __enter__(self): self.start() return self diff --git a/src/scope/server/graph_executor.py b/src/scope/server/graph_executor.py new file mode 100644 index 000000000..4cccd0845 --- /dev/null +++ b/src/scope/server/graph_executor.py @@ -0,0 +1,173 @@ +"""Graph executor: builds and runs pipeline graphs from GraphConfig. + +Wires source queues (for put()), pipeline processors (one per pipeline node), +and identifies the sink for get(). All frame ports (video, vace_input_frames, +vace_input_masks) use stream edges (queues); no separate parameter path. +""" + +from __future__ import annotations + +import logging +import queue +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +from .graph_schema import GraphConfig +from .pipeline_processor import PipelineProcessor + +if TYPE_CHECKING: + from .pipeline_manager import PipelineManager + +logger = logging.getLogger(__name__) + +# Default queue sizes (match pipeline_processor) +# Use larger size for inter-pipeline queues so downstream can accumulate a full chunk +DEFAULT_INPUT_QUEUE_MAXSIZE = 64 +DEFAULT_OUTPUT_QUEUE_MAXSIZE = 8 + + +@dataclass +class GraphRun: + """Result of building a graph: queues and processors to run.""" + + # When put(frame) is called, put to each of these queues (source fan-out) + source_queues: list[queue.Queue] + # The processor whose output_queue we read from for get() + sink_processor: PipelineProcessor | None + # All pipeline processors (for start/stop/update_parameters) + processors: list[PipelineProcessor] + # Pipeline IDs in graph order (for logging/events) + pipeline_ids: list[str] + # Node id of the sink (for clarity) + sink_node_id: str | None + # Node ids of output/sink nodes (e.g. "output") for preview mapping + output_node_ids: list[str] + + +def build_graph( + graph: GraphConfig, + pipeline_manager: PipelineManager, + initial_parameters: dict[str, Any], + session_id: str | None = None, + user_id: str | None = None, + connection_id: str | None = None, + connection_info: dict | None = None, +) -> GraphRun: + """Build executable graph: create queues and processors, wire edges. + + Args: + graph: Parsed graph config (nodes + edges). + pipeline_manager: Manager to resolve pipeline_id -> instance. + initial_parameters: Parameters passed to all pipelines. + session_id, user_id, connection_id, connection_info: For processors. + + Returns: + GraphRun with source_queues, sink_processor, processors, pipeline_ids. + """ + # 1) Create one queue per edge (all edges are stream; frame-by-frame) + stream_queues: dict[tuple[str, str], queue.Queue] = {} + for e in graph.edges: + if e.kind == "stream": + stream_queues[(e.to_node, e.to_port)] = queue.Queue( + maxsize=DEFAULT_INPUT_QUEUE_MAXSIZE + ) + + # 2) Source queues: all queues that receive from a source node + source_queues: list[queue.Queue] = [] + for node_id in graph.get_source_node_ids(): + for e in graph.stream_edges_from(node_id): + q = stream_queues.get((e.to_node, e.to_port)) + if q is not None: + source_queues.append(q) + + # 3) Create a processor per pipeline node and wire input_queues per port + node_processors: dict[str, PipelineProcessor] = {} + pipeline_ids: list[str] = [] + + for node in graph.nodes: + if node.type != "pipeline" or node.pipeline_id is None: + continue + pipeline = pipeline_manager.get_pipeline_by_id(node.pipeline_id) + processor = PipelineProcessor( + pipeline=pipeline, + pipeline_id=node.id, + initial_parameters=initial_parameters.copy(), + session_id=session_id, + user_id=user_id, + connection_id=connection_id, + connection_info=connection_info, + ) + node_processors[node.id] = processor + pipeline_ids.append(node.pipeline_id) + + for e in graph.edges_to(node.id): + if e.kind != "stream": + continue + q = stream_queues.get((node.id, e.to_port)) + if q is not None: + processor.input_queues[e.to_port] = q + with processor.input_queue_lock: + processor.input_queue = processor.input_queues.get("video") + + # 4) Set each producer's output_queues per port and wire consumer input to same queue + for node in graph.nodes: + if node.type != "pipeline" or node.id not in node_processors: + continue + proc = node_processors[node.id] + out_by_port: dict[str, list[queue.Queue]] = {} + for e in graph.edges_from(node.id): + if e.kind != "stream": + continue + q = stream_queues.get((e.to_node, e.to_port)) + if q is not None and q not in out_by_port.get(e.from_port, []): + out_by_port.setdefault(e.from_port, []).append(q) + # Symmetric wiring: ensure consumer reads from this queue + consumer = node_processors.get(e.to_node) + if consumer is not None: + consumer.input_queues[e.to_port] = q + with consumer.input_queue_lock: + consumer.input_queue = consumer.input_queues.get("video") + for port, qlist in out_by_port.items(): + proc.output_queues[port] = qlist + + # 5) Identify sink: node that has an edge to "output" (or type sink) + sink_node_id: str | None = None + for e in graph.edges: + if e.to_node == "output" and e.kind == "stream": + sink_node_id = e.from_node + break + if sink_node_id is None: + # Check other sink nodes + for sink_id in graph.get_sink_node_ids(): + for e in graph.edges: + if e.to_node == sink_id and e.kind == "stream": + sink_node_id = e.from_node + break + if sink_node_id: + break + if sink_node_id is None: + # No explicit output node: treat last pipeline node as sink (linear) + pipeline_node_ids = graph.get_pipeline_node_ids() + if pipeline_node_ids: + sink_node_id = pipeline_node_ids[-1] + + sink_processor = node_processors.get(sink_node_id) if sink_node_id else None + if sink_node_id and sink_processor is None: + logger.warning( + "Graph sink node %s not found in processors (missing pipeline?)", + sink_node_id, + ) + elif sink_node_id: + logger.info("Graph sink for playback: node_id=%s", sink_node_id) + + # Collect output/sink node IDs for preview mapping + output_node_ids = [n.id for n in graph.nodes if n.type == "sink"] + + return GraphRun( + source_queues=source_queues, + sink_processor=sink_processor, + processors=list(node_processors.values()), + pipeline_ids=pipeline_ids, + sink_node_id=sink_node_id, + output_node_ids=output_node_ids, + ) diff --git a/src/scope/server/graph_schema.py b/src/scope/server/graph_schema.py new file mode 100644 index 000000000..63eea4a52 --- /dev/null +++ b/src/scope/server/graph_schema.py @@ -0,0 +1,150 @@ +"""Graph (Directed Acyclic Graph) schema for pipeline execution. + +Defines a JSON-friendly format to describe: +- Nodes: source (input), pipeline instances, sink (output) +- Edges: connections between nodes via named ports + +All frame ports (video, vace_input_frames, vace_input_masks) use stream edges +(frame-by-frame queues). The optional "parameter" kind is for future event-like +data only. + +Example (YOLO plugin + Longlive with shared input video): + + { + "nodes": [ + {"id": "input", "type": "source"}, + {"id": "yolo_plugin", "type": "pipeline", "pipeline_id": "yolo_plugin"}, + {"id": "longlive", "type": "pipeline", "pipeline_id": "longlive"}, + {"id": "output", "type": "sink"} + ], + "edges": [ + {"from": "input", "from_port": "video", "to_node": "yolo_plugin", "to_port": "video", "kind": "stream"}, + {"from": "input", "from_port": "video", "to_node": "longlive", "to_port": "video", "kind": "stream"}, + {"from": "yolo_plugin", "from_port": "vace_input_frames", "to_node": "longlive", "to_port": "vace_input_frames", "kind": "stream"}, + {"from": "yolo_plugin", "from_port": "vace_input_masks", "to_node": "longlive", "to_port": "vace_input_masks", "kind": "stream"}, + {"from": "longlive", "from_port": "video", "to_node": "output", "to_port": "video", "kind": "stream"} + ] + } +""" + +from __future__ import annotations + +from typing import Literal + +from pydantic import BaseModel, Field + + +class GraphNode(BaseModel): + """A node in the pipeline graph.""" + + id: str = Field( + ..., + description="Unique node id (e.g. 'input', 'yolo_plugin', 'longlive', 'output')", + ) + type: Literal["source", "pipeline", "sink"] = Field( + ..., + description="source = external input, pipeline = pipeline instance, sink = output", + ) + pipeline_id: str | None = Field( + default=None, + description="Pipeline ID (registry key) when type is 'pipeline'", + ) + + +class GraphEdge(BaseModel): + """An edge connecting an output port to an input port.""" + + from_node: str = Field(..., alias="from", description="Source node id") + from_port: str = Field( + ..., description="Source port (e.g. 'video', 'vace_input_frames')" + ) + to_node: str = Field(..., description="Target node id") + to_port: str = Field(..., description="Target port name") + kind: Literal["stream", "parameter"] = Field( + default="stream", + description="stream = queue (frame-by-frame), parameter = chunk-level pass-through", + ) + + model_config = {"populate_by_name": True} + + +class GraphConfig(BaseModel): + """Root graph configuration (graph definition).""" + + nodes: list[GraphNode] = Field(..., description="Graph nodes") + edges: list[GraphEdge] = Field(..., description="Connections between nodes") + + def get_pipeline_node_ids(self) -> list[str]: + """Return node ids that are pipeline nodes, in definition order.""" + return [n.id for n in self.nodes if n.type == "pipeline"] + + def get_source_node_ids(self) -> list[str]: + """Return node ids that are source nodes.""" + return [n.id for n in self.nodes if n.type == "source"] + + def get_sink_node_ids(self) -> list[str]: + """Return node ids that are sink nodes.""" + return [n.id for n in self.nodes if n.type == "sink"] + + def edges_from(self, node_id: str) -> list[GraphEdge]: + """Return edges whose source is the given node.""" + return [e for e in self.edges if e.from_node == node_id] + + def edges_to(self, node_id: str) -> list[GraphEdge]: + """Return edges whose target is the given node.""" + return [e for e in self.edges if e.to_node == node_id] + + def stream_edges_from(self, node_id: str) -> list[GraphEdge]: + """Return stream edges whose source is the given node.""" + return [e for e in self.edges_from(node_id) if e.kind == "stream"] + + def node_by_id(self, node_id: str) -> GraphNode | None: + """Return the node with the given id.""" + for n in self.nodes: + if n.id == node_id: + return n + return None + + def validate_structure(self) -> list[str]: + """Validate the graph structure and return a list of error messages. + + Checks: + - No duplicate node IDs + - At least one source and one sink node + - Pipeline nodes have a pipeline_id + - All edge references point to existing nodes + """ + errors: list[str] = [] + node_ids = [n.id for n in self.nodes] + + # Check for duplicate node IDs + seen: set[str] = set() + for nid in node_ids: + if nid in seen: + errors.append(f"Duplicate node ID: '{nid}'") + seen.add(nid) + + # At least one source and one sink + if not self.get_source_node_ids(): + errors.append("Graph must have at least one source node") + if not self.get_sink_node_ids(): + errors.append("Graph must have at least one sink node") + + # Pipeline nodes must have pipeline_id + for node in self.nodes: + if node.type == "pipeline" and not node.pipeline_id: + errors.append(f"Pipeline node '{node.id}' is missing pipeline_id") + + # Edge references must point to existing nodes + node_id_set = set(node_ids) + for edge in self.edges: + if edge.from_node not in node_id_set: + errors.append( + f"Edge references non-existent source node: '{edge.from_node}'" + ) + if edge.to_node not in node_id_set: + errors.append( + f"Edge references non-existent target node: '{edge.to_node}'" + ) + + return errors diff --git a/src/scope/server/graph_state.py b/src/scope/server/graph_state.py new file mode 100644 index 000000000..377832fa0 --- /dev/null +++ b/src/scope/server/graph_state.py @@ -0,0 +1,41 @@ +"""In-memory graph configuration store. + +Holds a graph config set via the API that takes priority over input.json. +Thread-safe via a threading lock. +""" + +from __future__ import annotations + +import logging +import threading + +from .graph_schema import GraphConfig + +logger = logging.getLogger(__name__) + +_lock = threading.Lock() +_graph_config: GraphConfig | None = None + + +def get_api_graph() -> GraphConfig | None: + """Return the API-set graph config, or None if not set.""" + with _lock: + return _graph_config + + +def set_api_graph(graph: GraphConfig) -> None: + """Store a graph config set via the API.""" + with _lock: + global _graph_config + _graph_config = graph + logger.info( + f"API graph set with {len(graph.nodes)} nodes and {len(graph.edges)} edges" + ) + + +def clear_api_graph() -> None: + """Clear the API-set graph config, reverting to fallback behavior.""" + with _lock: + global _graph_config + _graph_config = None + logger.info("API graph cleared") diff --git a/src/scope/server/pipeline_processor.py b/src/scope/server/pipeline_processor.py index b11638996..2b755e487 100644 --- a/src/scope/server/pipeline_processor.py +++ b/src/scope/server/pipeline_processor.py @@ -61,9 +61,15 @@ def __init__( self.connection_id = connection_id self.connection_info = connection_info - # Each processor creates its own queues - self.input_queue = queue.Queue(maxsize=30) - self.output_queue = queue.Queue(maxsize=8) + # Unified port-based queues: all frame streams (video, vace_input_frames, etc.) use queues + self.input_queues: dict[str, queue.Queue] = { + "video": queue.Queue(maxsize=30), + } + self.output_queues: dict[str, list[queue.Queue]] = { + "video": [queue.Queue(maxsize=8)], + } + # Primary queue refs for backward compat (chain mode, get_fps, resize) + self.input_queue = self.input_queues["video"] # Lock to protect input_queue assignment for thread-safe reference swapping self.input_queue_lock = threading.Lock() @@ -113,35 +119,32 @@ def __init__( # the next pipeline in the chain can consume them self.throttler = PipelineThrottler() - def _resize_output_queue(self, target_size: int): - """Resize the output queue to the target size, transferring existing frames. + # Latest output frame for graph preview (read-only from preview side) + self._preview_frame: torch.Tensor | None = None + self._preview_frame_lock = threading.Lock() - Args: - target_size: The desired maximum size for the output queue - """ - if self.output_queue is None: + def _resize_output_queue(self, target_size: int): + """Resize the primary video output queue, transferring existing frames.""" + video_queues = self.output_queues.get("video") + if not video_queues: return - - if self.output_queue.maxsize < target_size: + primary = video_queues[0] + if primary.maxsize < target_size: logger.info( - f"Increasing output queue size to {target_size}, current size {self.output_queue.maxsize}" + f"Increasing output queue size to {target_size}, current size {primary.maxsize}" ) - - # Transfer frames from old queue to new queue - old_queue = self.output_queue - self.output_queue = queue.Queue(maxsize=target_size) - while not old_queue.empty(): + new_queue = queue.Queue(maxsize=target_size) + while not primary.empty(): try: - frame = old_queue.get_nowait() - self.output_queue.put_nowait(frame) + frame = primary.get_nowait() + new_queue.put_nowait(frame) except queue.Empty: break - - # Update next processor's input_queue to point to the new output_queue - # Use lock to ensure thread-safe reference swapping + self.output_queues["video"] = [new_queue] + video_queues[1:] if self.next_processor is not None: with self.next_processor.input_queue_lock: - self.next_processor.input_queue = self.output_queue + self.next_processor.input_queues["video"] = new_queue + self.next_processor.input_queue = new_queue def set_next_processor(self, next_processor: "PipelineProcessor"): """Set the next processor in the chain and update output queue size accordingly. @@ -165,7 +168,14 @@ def set_next_processor(self, next_processor: "PipelineProcessor"): # Update next processor's input_queue to point to this output_queue # Use lock to ensure thread-safe reference swapping with next_processor.input_queue_lock: - next_processor.input_queue = self.output_queue + next_processor.input_queues["video"] = self.output_queues["video"][0] + next_processor.input_queue = next_processor.input_queues["video"] + + @property + def output_queue(self) -> queue.Queue | None: + """Primary video output queue (for chain mode and sink get()).""" + queues = self.output_queues.get("video") + return queues[0] if queues else None def start(self): """Start the pipeline processor thread.""" @@ -203,12 +213,13 @@ def stop(self): except queue.Empty: break - if self.output_queue: - while not self.output_queue.empty(): - try: - self.output_queue.get_nowait() - except queue.Empty: - break + for queues in self.output_queues.values(): + for q in queues: + while not q.empty(): + try: + q.get_nowait() + except queue.Empty: + break logger.info(f"PipelineProcessor stopped for pipeline: {self.pipeline_id}") @@ -271,41 +282,11 @@ def prepare_chunk( self, input_queue_ref: queue.Queue, chunk_size: int ) -> list[torch.Tensor]: """ - Sample frames uniformly from the queue, convert them to tensors, and remove processed frames. - - This function implements uniform sampling across the entire queue to ensure - temporal coverage of input frames. It samples frames at evenly distributed - indices and removes all frames up to the last sampled frame to prevent - queue buildup. - - Note: - This function must be called with a queue reference obtained while holding - input_queue_lock. The caller is responsible for thread safety. - - Example: - With queue_size=8 and chunk_size=4: - - step = 8/4 = 2.0 - - indices = [0, 2, 4, 6] (uniformly distributed) - - Returns frames at positions 0, 2, 4, 6 - - Removes frames 0-6 from queue (7 frames total) - - Keeps frame 7 in queue - - Args: - input_queue_ref: Reference to the input queue (obtained while holding lock) - chunk_size: Number of frames to sample - - Returns: - List of tensor frames, each (1, H, W, C) for downstream preprocess_chunk + Sample frames uniformly from one queue (used when only video port is present). """ - - # Calculate uniform sampling step step = input_queue_ref.qsize() / chunk_size - # Generate indices for uniform sampling indices = [round(i * step) for i in range(chunk_size)] - # Extract VideoFrames at sampled indices video_frames = [] - - # Drop all frames up to and including the last sampled frame last_idx = indices[-1] for i in range(last_idx + 1): frame = input_queue_ref.get_nowait() @@ -313,6 +294,36 @@ def prepare_chunk( video_frames.append(frame) return video_frames + def prepare_multi_chunk( + self, + input_queues_ref: dict[str, queue.Queue], + primary_port: str, + chunk_size: int, + ) -> dict[str, list[torch.Tensor]]: + """ + Sample frames uniformly from the primary queue, then the same indices from other ports. + + Returns dict mapping port name to list of tensors (each 1,H,W,C). All ports + must have at least as many frames as we consume from the primary. + """ + primary = input_queues_ref.get(primary_port) + if primary is None or primary.qsize() < chunk_size: + return {} + step = primary.qsize() / chunk_size + indices = [round(i * step) for i in range(chunk_size)] + last_idx = indices[-1] + out: dict[str, list[torch.Tensor]] = {port: [] for port in input_queues_ref} + for port, q in input_queues_ref.items(): + if q.qsize() <= last_idx: + return {} + for port in input_queues_ref: + q = input_queues_ref[port] + for i in range(last_idx + 1): + frame = q.get_nowait() + if i in indices: + out[port].append(frame) + return out + def process_chunk(self): """Process a single chunk of frames.""" # Check if there are new parameters @@ -365,23 +376,16 @@ def process_chunk(self): reset_cache = self.parameters.pop("reset_cache", None) lora_scales = self.parameters.pop("lora_scales", None) - # Handle reset_cache: clear this processor's queues and mark for cache init + # Handle reset_cache: clear this processor's output queues if reset_cache: logger.info(f"Clearing cache for pipeline processor: {self.pipeline_id}") - with self.input_queue_lock: - input_queue_ref = self.input_queue - if input_queue_ref: - while not input_queue_ref.empty(): - try: - input_queue_ref.get_nowait() - except queue.Empty: - break - if self.output_queue: - while not self.output_queue.empty(): - try: - self.output_queue.get_nowait() - except queue.Empty: - break + for queues in self.output_queues.values(): + for q in queues: + while not q.empty(): + try: + q.get_nowait() + except queue.Empty: + break self._pending_cache_init = True requirements = None @@ -392,34 +396,40 @@ def process_chunk(self): prepare_params["video"] = True requirements = self.pipeline.prepare(**prepare_params) - video_input = None + chunks: dict[str, list[torch.Tensor]] = {} input_frame_count = 0 if requirements is not None: current_chunk_size = requirements.input_size - - # Capture a local reference to input_queue while holding the lock - # This ensures thread-safe access even if input_queue is reassigned with self.input_queue_lock: - input_queue_ref = self.input_queue - - # Check if queue has enough frames before consuming them - if input_queue_ref.qsize() < current_chunk_size: + input_queues_ref = dict(self.input_queues) + primary = input_queues_ref.get("video") + if primary is None or primary.qsize() < current_chunk_size: # Preserve popped one-shot parameters so they are applied once frames arrive if lora_scales is not None: self.parameters["lora_scales"] = lora_scales - # Not enough frames in queue, sleep briefly and try again next iteration self.shutdown_event.wait(SLEEP_TIME) return - - # Use prepare_chunk to uniformly sample frames from the queue - video_input = self.prepare_chunk(input_queue_ref, current_chunk_size) - input_frame_count = len(video_input) if video_input else 0 + if len(input_queues_ref) == 1: + chunks["video"] = self.prepare_chunk(primary, current_chunk_size) + else: + chunks = self.prepare_multi_chunk( + input_queues_ref, "video", current_chunk_size + ) + if not chunks: + if lora_scales is not None: + self.parameters["lora_scales"] = lora_scales + self.shutdown_event.wait(SLEEP_TIME) + return + input_frame_count = len(chunks.get("video") or []) try: # Pass parameters (excluding prepare-only parameters) call_params = dict(self.parameters.items()) + # Pass reset_cache as init_cache to pipeline call_params["init_cache"] = not self.is_prepared or self._pending_cache_init + if reset_cache is not None: + call_params["init_cache"] = reset_cache # Pass lora_scales only when present if lora_scales is not None: @@ -432,33 +442,35 @@ def process_chunk(self): # Reset mouse accumulator, keep key state self.parameters["ctrl_input"]["mouse"] = [0.0, 0.0] - # Route video input based on VACE status - # Don't overwrite if preprocessor already provided vace_input_frames - if video_input is not None and "vace_input_frames" not in call_params: - if ( - self._pipeline_supports_vace - and self.vace_enabled - and self.vace_use_input_video - ): - call_params["vace_input_frames"] = video_input - else: - call_params["video"] = video_input + # Fill call_params from stream chunks + if chunks: + if "vace_input_frames" in chunks and "vace_input_masks" in chunks: + call_params["vace_input_frames"] = chunks["vace_input_frames"] + call_params["vace_input_masks"] = chunks["vace_input_masks"] + if "video" in chunks: + if ( + self._pipeline_supports_vace + and self.vace_enabled + and self.vace_use_input_video + and "vace_input_frames" not in call_params + ): + call_params["vace_input_frames"] = chunks["video"] + else: + call_params["video"] = chunks["video"] + # Pass any other stream ports (e.g. video2 for combine_streams) + for port, frame_list in chunks.items(): + if port in ("video", "vace_input_frames", "vace_input_masks"): + continue + call_params[port] = frame_list processing_start = time.time() output_dict = self.pipeline(**call_params) processing_time = time.time() - processing_start - # Extract video from the returned dictionary output = output_dict.get("video") if output is None: return - # Forward extra params to downstream pipeline (dual-output pattern) - # Preprocessors return {"video": frames, "vace_input_frames": ..., "vace_input_masks": ...} - extra_params = {k: v for k, v in output_dict.items() if k != "video"} - if extra_params and self.next_processor is not None: - self.next_processor.update_parameters(extra_params) - # Clear one-shot parameters after use to prevent sending them on subsequent chunks # These parameters should only be sent when explicitly provided in parameter updates one_shot_params = [ @@ -501,29 +513,62 @@ def process_chunk(self): .detach() ) - # Resize output queue to meet target max size - target_output_queue_max_size = num_frames * OUTPUT_QUEUE_MAX_SIZE_FACTOR - self._resize_output_queue(target_output_queue_max_size) - - # Put frames in output queue - # For intermediate pipelines, output goes to next pipeline's input - # For last pipeline, output goes to frame_processor's output_queue - # Output frames are [H, W, C], convert to [1, H, W, C] for consistency - for frame in output: - frame = frame.unsqueeze(0) - # Track when a frame is ready (production rate) - self._track_output_frame() - try: - self.output_queue.put_nowait(frame) - except queue.Full: - logger.debug( - f"Output queue full for {self.pipeline_id}, dropping processed frame" - ) + # Resize primary video output queue to meet target max size. + # Skip for the sink (no next_processor): keep the graph-wired queue so + # frame_processor.get() always reads from the same queue we write to. + if self.next_processor is not None: + target_output_queue_max_size = num_frames * OUTPUT_QUEUE_MAX_SIZE_FACTOR + self._resize_output_queue(target_output_queue_max_size) + + # Put each output port's frames to its queues (all frame ports are streamed) + for port, value in output_dict.items(): + if value is None or not isinstance(value, torch.Tensor): + continue + queues = self.output_queues.get(port) + if not queues: continue + if value.dtype != torch.uint8: + value = ( + (value * 255.0) + .clamp(0, 255) + .to(dtype=torch.uint8) + .contiguous() + .detach() + ) + frames = [value[i].unsqueeze(0) for i in range(value.shape[0])] + for frame in frames: + if port == "video": + self._track_output_frame() + for q in queues: + try: + q.put_nowait(frame if q is queues[0] else frame.clone()) + except queue.Full: + if port == "video": + logger.info( + f"Output queue full for {self.pipeline_id}, dropping frame" + ) + break + + # Store the last video output frame for graph preview + video_output = output_dict.get("video") + if ( + video_output is not None + and isinstance(video_output, torch.Tensor) + and video_output.shape[0] > 0 + ): + last_frame = video_output[-1].unsqueeze(0) + if last_frame.dtype != torch.uint8: + last_frame = ( + (last_frame * 255.0) + .clamp(0, 255) + .to(dtype=torch.uint8) + .contiguous() + .detach() + ) + with self._preview_frame_lock: + self._preview_frame = last_frame - # Apply throttling if this pipeline is producing faster than next can consume - # Only throttle if: (1) has video input, (2) has next processor - if video_input is not None and self.next_processor is not None: + if chunks and self.next_processor is not None: self.throttler.throttle() except Exception as e: @@ -566,6 +611,11 @@ def _calculate_output_fps(self): estimated_fps = max(MIN_FPS, min(MAX_FPS, estimated_fps)) self.current_output_fps = estimated_fps + def get_preview_frame(self) -> torch.Tensor | None: + """Get the latest output frame for preview (thread-safe).""" + with self._preview_frame_lock: + return self._preview_frame + def get_fps(self) -> float: """Get the current dynamically calculated pipeline FPS. diff --git a/src/scope/server/schema.py b/src/scope/server/schema.py index c50127aab..ca67ec328 100644 --- a/src/scope/server/schema.py +++ b/src/scope/server/schema.py @@ -156,6 +156,12 @@ class Parameters(BaseModel): default=None, description="Enable recording for this session. When true, the backend records the stream. ", ) + node_id: str | None = Field( + default=None, + description="Target graph node ID for per-node parameter updates. " + "When set, parameters are routed to the specific pipeline node. " + "When None, parameters are broadcast to all processors (backward compat).", + ) class OutputSinkConfig(BaseModel): From 9fc63add5c51c4fe5f98f10342eb873152eebdac Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Mon, 23 Feb 2026 14:05:48 +0000 Subject: [PATCH 2/3] Fix --- src/scope/server/pipeline_processor.py | 55 ++++++++++++++++++++------ 1 file changed, 42 insertions(+), 13 deletions(-) diff --git a/src/scope/server/pipeline_processor.py b/src/scope/server/pipeline_processor.py index 2b755e487..9178af6d7 100644 --- a/src/scope/server/pipeline_processor.py +++ b/src/scope/server/pipeline_processor.py @@ -301,27 +301,56 @@ def prepare_multi_chunk( chunk_size: int, ) -> dict[str, list[torch.Tensor]]: """ - Sample frames uniformly from the primary queue, then the same indices from other ports. - - Returns dict mapping port name to list of tensors (each 1,H,W,C). All ports - must have at least as many frames as we consume from the primary. + Sample chunk_size frames from the primary queue, then sample each + secondary queue. + + Only the primary port is required to have >= chunk_size frames. + Secondary ports with enough frames are sampled with the same indices + as the primary (synchronized). Ports with fewer frames are sampled + independently, and empty ports return an empty list so the pipeline + can handle missing inputs (e.g. text-mode with no camera). """ primary = input_queues_ref.get(primary_port) if primary is None or primary.qsize() < chunk_size: return {} + + out: dict[str, list[torch.Tensor]] = {port: [] for port in input_queues_ref} + + # Sample primary queue step = primary.qsize() / chunk_size indices = [round(i * step) for i in range(chunk_size)] last_idx = indices[-1] - out: dict[str, list[torch.Tensor]] = {port: [] for port in input_queues_ref} + for i in range(last_idx + 1): + frame = primary.get_nowait() + if i in indices: + out[primary_port].append(frame) + + # Sample each secondary queue for port, q in input_queues_ref.items(): - if q.qsize() <= last_idx: - return {} - for port in input_queues_ref: - q = input_queues_ref[port] - for i in range(last_idx + 1): - frame = q.get_nowait() - if i in indices: - out[port].append(frame) + if port == primary_port: + continue + avail = q.qsize() + if avail == 0: + # No frames — pass empty list (pipeline handles gracefully) + continue + if avail > last_idx: + # Enough frames: use same indices as primary (synchronized) + for i in range(last_idx + 1): + frame = q.get_nowait() + if i in indices: + out[port].append(frame) + else: + # Fewer frames: sample independently at this port's own rate + sec_step = max(avail / chunk_size, 1.0) + sec_indices = [ + round(i * sec_step) for i in range(min(chunk_size, avail)) + ] + sec_last = sec_indices[-1] + for i in range(sec_last + 1): + frame = q.get_nowait() + if i in sec_indices: + out[port].append(frame) + return out def process_chunk(self): From 5e503e3898a6d14f1e6e1268f8c26db646d6ec03 Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Mon, 23 Feb 2026 16:28:15 +0000 Subject: [PATCH 3/3] Fix --- src/scope/server/frame_processor.py | 4 ++-- src/scope/server/graph_executor.py | 3 ++- src/scope/server/pipeline_processor.py | 9 +++++++++ 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py index 8878e4de8..5af50fc40 100644 --- a/src/scope/server/frame_processor.py +++ b/src/scope/server/frame_processor.py @@ -1016,9 +1016,9 @@ def _setup_graph(self, graph): self.pipeline_processors = graph_run.processors self.pipeline_ids = graph_run.pipeline_ids - # Index processors by node_id (pipeline_id in processor is the node_id) + # Index processors by node_id for per-node parameter routing for proc in self.pipeline_processors: - self._processors_by_node_id[proc.pipeline_id] = proc + self._processors_by_node_id[proc.node_id] = proc # Start all processors for processor in self.pipeline_processors: diff --git a/src/scope/server/graph_executor.py b/src/scope/server/graph_executor.py index 4cccd0845..7d412350d 100644 --- a/src/scope/server/graph_executor.py +++ b/src/scope/server/graph_executor.py @@ -90,12 +90,13 @@ def build_graph( pipeline = pipeline_manager.get_pipeline_by_id(node.pipeline_id) processor = PipelineProcessor( pipeline=pipeline, - pipeline_id=node.id, + pipeline_id=node.pipeline_id, initial_parameters=initial_parameters.copy(), session_id=session_id, user_id=user_id, connection_id=connection_id, connection_info=connection_info, + node_id=node.id, ) node_processors[node.id] = processor pipeline_ids.append(node.pipeline_id) diff --git a/src/scope/server/pipeline_processor.py b/src/scope/server/pipeline_processor.py index 9178af6d7..179e0dbdd 100644 --- a/src/scope/server/pipeline_processor.py +++ b/src/scope/server/pipeline_processor.py @@ -42,6 +42,7 @@ def __init__( user_id: str | None = None, connection_id: str | None = None, connection_info: dict | None = None, + node_id: str | None = None, ): """Initialize a pipeline processor. @@ -53,9 +54,11 @@ def __init__( user_id: User ID for event tracking connection_id: Connection ID from fal.ai WebSocket for event correlation connection_info: Connection metadata (gpu_type, region, etc.) + node_id: Graph node ID (used for per-node parameter routing in graph mode) """ self.pipeline = pipeline self.pipeline_id = pipeline_id + self.node_id = node_id or pipeline_id self.session_id = session_id self.user_id = user_id self.connection_id = connection_id @@ -454,6 +457,12 @@ def process_chunk(self): try: # Pass parameters (excluding prepare-only parameters) call_params = dict(self.parameters.items()) + if not self.is_prepared: + logger.info( + f"[DEBUG] First call for {self.pipeline_id}: " + f"params keys={sorted(self.parameters.keys())}, " + f"has_prompts={'prompts' in self.parameters}" + ) # Pass reset_cache as init_cache to pipeline call_params["init_cache"] = not self.is_prepared or self._pending_cache_init