diff --git a/frontend/package-lock.json b/frontend/package-lock.json index a615e7fe1..f85bb074f 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -18,6 +18,7 @@ "@radix-ui/react-toggle": "^1.1.10", "@radix-ui/react-toggle-group": "^1.1.11", "@radix-ui/react-tooltip": "^1.2.8", + "@xyflow/react": "^12.10.0", "class-variance-authority": "^0.7.1", "clsx": "^2.1.1", "lucide-react": "^0.544.0", @@ -2658,6 +2659,55 @@ "@babel/types": "^7.28.2" } }, + "node_modules/@types/d3-color": { + "version": "3.1.3", + "resolved": "https://registry.npmjs.org/@types/d3-color/-/d3-color-3.1.3.tgz", + "integrity": "sha512-iO90scth9WAbmgv7ogoq57O9YpKmFBbmoEoCHDB2xMBY0+/KVrqAaCDyCE16dUspeOvIxFFRI+0sEtqDqy2b4A==", + "license": "MIT" + }, + "node_modules/@types/d3-drag": { + "version": "3.0.7", + "resolved": "https://registry.npmjs.org/@types/d3-drag/-/d3-drag-3.0.7.tgz", + "integrity": "sha512-HE3jVKlzU9AaMazNufooRJ5ZpWmLIoc90A37WU2JMmeq28w1FQqCZswHZ3xR+SuxYftzHq6WU6KJHvqxKzTxxQ==", + "license": "MIT", + "dependencies": { + "@types/d3-selection": "*" + } + }, + "node_modules/@types/d3-interpolate": { + "version": "3.0.4", + "resolved": "https://registry.npmjs.org/@types/d3-interpolate/-/d3-interpolate-3.0.4.tgz", + "integrity": "sha512-mgLPETlrpVV1YRJIglr4Ez47g7Yxjl1lj7YKsiMCb27VJH9W8NVM6Bb9d8kkpG/uAQS5AmbA48q2IAolKKo1MA==", + "license": "MIT", + "dependencies": { + "@types/d3-color": "*" + } + }, + "node_modules/@types/d3-selection": { + "version": "3.0.11", + "resolved": "https://registry.npmjs.org/@types/d3-selection/-/d3-selection-3.0.11.tgz", + "integrity": "sha512-bhAXu23DJWsrI45xafYpkQ4NtcKMwWnAC/vKrd2l+nxMFuvOT3XMYTIj2opv8vq8AO5Yh7Qac/nSeP/3zjTK0w==", + "license": "MIT" + }, + "node_modules/@types/d3-transition": { + "version": "3.0.9", + "resolved": "https://registry.npmjs.org/@types/d3-transition/-/d3-transition-3.0.9.tgz", + "integrity": "sha512-uZS5shfxzO3rGlu0cC3bjmMFKsXv+SmZZcgp0KD22ts4uGXp5EVYGzu/0YdwZeKmddhcAccYtREJKkPfXkZuCg==", + "license": 
"MIT", + "dependencies": { + "@types/d3-selection": "*" + } + }, + "node_modules/@types/d3-zoom": { + "version": "3.0.8", + "resolved": "https://registry.npmjs.org/@types/d3-zoom/-/d3-zoom-3.0.8.tgz", + "integrity": "sha512-iqMC4/YlFCSlO8+2Ii1GGGliCAY4XdeG748w5vQUbevlbDu0zSjH/+jojorQVBK/se0j6DUFNPBGSqD3YWYnDw==", + "license": "MIT", + "dependencies": { + "@types/d3-interpolate": "*", + "@types/d3-selection": "*" + } + }, "node_modules/@types/estree": { "version": "1.0.8", "resolved": "https://registry.npmjs.org/@types/estree/-/estree-1.0.8.tgz", @@ -2984,6 +3034,38 @@ "vite": "^4.2.0 || ^5.0.0 || ^6.0.0 || ^7.0.0" } }, + "node_modules/@xyflow/react": { + "version": "12.10.0", + "resolved": "https://registry.npmjs.org/@xyflow/react/-/react-12.10.0.tgz", + "integrity": "sha512-eOtz3whDMWrB4KWVatIBrKuxECHqip6PfA8fTpaS2RUGVpiEAe+nqDKsLqkViVWxDGreq0lWX71Xth/SPAzXiw==", + "license": "MIT", + "dependencies": { + "@xyflow/system": "0.0.74", + "classcat": "^5.0.3", + "zustand": "^4.4.0" + }, + "peerDependencies": { + "react": ">=17", + "react-dom": ">=17" + } + }, + "node_modules/@xyflow/system": { + "version": "0.0.74", + "resolved": "https://registry.npmjs.org/@xyflow/system/-/system-0.0.74.tgz", + "integrity": "sha512-7v7B/PkiVrkdZzSbL+inGAo6tkR/WQHHG0/jhSvLQToCsfa8YubOGmBYd1s08tpKpihdHDZFwzQZeR69QSBb4Q==", + "license": "MIT", + "dependencies": { + "@types/d3-drag": "^3.0.7", + "@types/d3-interpolate": "^3.0.4", + "@types/d3-selection": "^3.0.10", + "@types/d3-transition": "^3.0.8", + "@types/d3-zoom": "^3.0.8", + "d3-drag": "^3.0.0", + "d3-interpolate": "^3.0.1", + "d3-selection": "^3.0.0", + "d3-zoom": "^3.0.0" + } + }, "node_modules/acorn": { "version": "8.15.0", "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.15.0.tgz", @@ -3242,6 +3324,12 @@ "url": "https://polar.sh/cva" } }, + "node_modules/classcat": { + "version": "5.0.5", + "resolved": "https://registry.npmjs.org/classcat/-/classcat-5.0.5.tgz", + "integrity": 
"sha512-JhZUT7JFcQy/EzW605k/ktHtncoo9vnyW/2GspNYwFlN1C/WmjuV/xtS04e9SOkL2sTdw0VAZ2UGCcQ9lR6p6w==", + "license": "MIT" + }, "node_modules/clsx": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/clsx/-/clsx-2.1.1.tgz", @@ -3307,6 +3395,111 @@ "devOptional": true, "license": "MIT" }, + "node_modules/d3-color": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/d3-color/-/d3-color-3.1.0.tgz", + "integrity": "sha512-zg/chbXyeBtMQ1LbD/WSoW2DpC3I0mpmPdW+ynRTj/x2DAWYrIY7qeZIHidozwV24m4iavr15lNwIwLxRmOxhA==", + "license": "ISC", + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-dispatch": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/d3-dispatch/-/d3-dispatch-3.0.1.tgz", + "integrity": "sha512-rzUyPU/S7rwUflMyLc1ETDeBj0NRuHKKAcvukozwhshr6g6c5d8zh4c2gQjY2bZ0dXeGLWc1PF174P2tVvKhfg==", + "license": "ISC", + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-drag": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/d3-drag/-/d3-drag-3.0.0.tgz", + "integrity": "sha512-pWbUJLdETVA8lQNJecMxoXfH6x+mO2UQo8rSmZ+QqxcbyA3hfeprFgIT//HW2nlHChWeIIMwS2Fq+gEARkhTkg==", + "license": "ISC", + "dependencies": { + "d3-dispatch": "1 - 3", + "d3-selection": "3" + }, + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-ease": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/d3-ease/-/d3-ease-3.0.1.tgz", + "integrity": "sha512-wR/XK3D3XcLIZwpbvQwQ5fK+8Ykds1ip7A2Txe0yxncXSdq1L9skcG7blcedkOX+ZcgxGAmLX1FrRGbADwzi0w==", + "license": "BSD-3-Clause", + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-interpolate": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/d3-interpolate/-/d3-interpolate-3.0.1.tgz", + "integrity": "sha512-3bYs1rOD33uo8aqJfKP3JWPAibgw8Zm2+L9vBKEHJ2Rg+viTR7o5Mmv5mZcieN+FRYaAOWX5SJATX6k1PWz72g==", + "license": "ISC", + "dependencies": { + "d3-color": "1 - 3" + }, + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-selection": { + "version": "3.0.0", 
+ "resolved": "https://registry.npmjs.org/d3-selection/-/d3-selection-3.0.0.tgz", + "integrity": "sha512-fmTRWbNMmsmWq6xJV8D19U/gw/bwrHfNXxrIN+HfZgnzqTHp9jOmKMhsTUjXOJnZOdZY9Q28y4yebKzqDKlxlQ==", + "license": "ISC", + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-timer": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/d3-timer/-/d3-timer-3.0.1.tgz", + "integrity": "sha512-ndfJ/JxxMd3nw31uyKoY2naivF+r29V+Lc0svZxe1JvvIRmi8hUsrMvdOwgS1o6uBHmiz91geQ0ylPP0aj1VUA==", + "license": "ISC", + "engines": { + "node": ">=12" + } + }, + "node_modules/d3-transition": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/d3-transition/-/d3-transition-3.0.1.tgz", + "integrity": "sha512-ApKvfjsSR6tg06xrL434C0WydLr7JewBB3V+/39RMHsaXTOG0zmt/OAXeng5M5LBm0ojmxJrpomQVZ1aPvBL4w==", + "license": "ISC", + "dependencies": { + "d3-color": "1 - 3", + "d3-dispatch": "1 - 3", + "d3-ease": "1 - 3", + "d3-interpolate": "1 - 3", + "d3-timer": "1 - 3" + }, + "engines": { + "node": ">=12" + }, + "peerDependencies": { + "d3-selection": "2 - 3" + } + }, + "node_modules/d3-zoom": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/d3-zoom/-/d3-zoom-3.0.0.tgz", + "integrity": "sha512-b8AmV3kfQaqWAuacbPuNbL6vahnOJflOhexLzMMNLga62+/nh0JzvJ0aO/5a5MVgUFGS7Hu1P9P03o3fJkDCyw==", + "license": "ISC", + "dependencies": { + "d3-dispatch": "1 - 3", + "d3-drag": "2 - 3", + "d3-interpolate": "1 - 3", + "d3-selection": "2 - 3", + "d3-transition": "2 - 3" + }, + "engines": { + "node": ">=12" + } + }, "node_modules/debug": { "version": "4.4.1", "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.1.tgz", @@ -5194,6 +5387,15 @@ } } }, + "node_modules/use-sync-external-store": { + "version": "1.6.0", + "resolved": "https://registry.npmjs.org/use-sync-external-store/-/use-sync-external-store-1.6.0.tgz", + "integrity": "sha512-Pp6GSwGP/NrPIrxVFAIkOQeyw8lFenOHijQWkUTrDvrF4ALqylP2C/KCkeS9dpUM3KvYRQhna5vt7IL95+ZQ9w==", + "license": "MIT", + "peerDependencies": { + 
"react": "^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0" + } + }, "node_modules/vite": { "version": "7.3.0", "resolved": "https://registry.npmjs.org/vite/-/vite-7.3.0.tgz", @@ -5345,6 +5547,34 @@ "funding": { "url": "https://github.com/sponsors/sindresorhus" } + }, + "node_modules/zustand": { + "version": "4.5.7", + "resolved": "https://registry.npmjs.org/zustand/-/zustand-4.5.7.tgz", + "integrity": "sha512-CHOUy7mu3lbD6o6LJLfllpjkzhHXSBlX8B9+qPddUsIfeF5S/UZ5q0kmCsnRqT1UHFQZchNFDDzMbQsuesHWlw==", + "license": "MIT", + "dependencies": { + "use-sync-external-store": "^1.2.2" + }, + "engines": { + "node": ">=12.7.0" + }, + "peerDependencies": { + "@types/react": ">=16.8", + "immer": ">=9.0.6", + "react": ">=16.8" + }, + "peerDependenciesMeta": { + "@types/react": { + "optional": true + }, + "immer": { + "optional": true + }, + "react": { + "optional": true + } + } } } } diff --git a/frontend/package.json b/frontend/package.json index e1c92f6e5..329894fbd 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -24,6 +24,7 @@ "@radix-ui/react-toggle": "^1.1.10", "@radix-ui/react-toggle-group": "^1.1.11", "@radix-ui/react-tooltip": "^1.2.8", + "@xyflow/react": "^12.10.0", "class-variance-authority": "^0.7.1", "clsx": "^2.1.1", "lucide-react": "^0.544.0", diff --git a/frontend/src/components/ComplexFields.tsx b/frontend/src/components/ComplexFields.tsx index 77e162d32..0234c03d5 100644 --- a/frontend/src/components/ComplexFields.tsx +++ b/frontend/src/components/ComplexFields.tsx @@ -72,6 +72,8 @@ export interface SchemaComplexFieldContext { isStreaming?: boolean; isLoading?: boolean; isCloudMode?: boolean; + /** When true, VACE controls are disabled (pipeline chain from DAG editor). */ + graphMode?: boolean; /** Per-field overrides for schema-driven fields (e.g. image path). 
*/ schemaFieldOverrides?: Record; onSchemaFieldOverrideChange?: ( @@ -132,7 +134,7 @@ export function SchemaComplexField({ if (component === "vace" && !rendered.has("vace")) { rendered.add("vace"); return ( -
+
("general"); @@ -114,6 +116,15 @@ export function Header({

Daydream Scope

+ + + +
+ + + + + + {status && ( + + {status} + {dagSource && ( + ({dagSource}) + )} + + )} +
+ + {/* Flow Canvas */} +
+ + + + + + + +
+
+ ); +} diff --git a/frontend/src/components/dag/DagEditorDialog.tsx b/frontend/src/components/dag/DagEditorDialog.tsx new file mode 100644 index 000000000..1fa822d0f --- /dev/null +++ b/frontend/src/components/dag/DagEditorDialog.tsx @@ -0,0 +1,33 @@ +import * as Dialog from "@radix-ui/react-dialog"; +import { X } from "lucide-react"; +import { DagEditor } from "./DagEditor"; + +interface DagEditorDialogProps { + open: boolean; + onClose: () => void; +} + +export function DagEditorDialog({ open, onClose }: DagEditorDialogProps) { + return ( + !val && onClose()}> + + + +
+ + DAG Editor + + + + +
+
+ +
+
+
+
+ ); +} diff --git a/frontend/src/components/dag/DagPreviewContext.tsx b/frontend/src/components/dag/DagPreviewContext.tsx new file mode 100644 index 000000000..b53e82d7e --- /dev/null +++ b/frontend/src/components/dag/DagPreviewContext.tsx @@ -0,0 +1,11 @@ +import { createContext, useContext } from "react"; + +/** Maps node_id (e.g. "input", pipeline_id, or sink id) to a data URL */ +export type PreviewMap = Record; + +export const DagPreviewContext = createContext({}); + +export function useDagPreview(nodeId: string): string | undefined { + const previews = useContext(DagPreviewContext); + return previews[nodeId]; +} diff --git a/frontend/src/components/dag/PipelineNode.tsx b/frontend/src/components/dag/PipelineNode.tsx new file mode 100644 index 000000000..7641ee2cf --- /dev/null +++ b/frontend/src/components/dag/PipelineNode.tsx @@ -0,0 +1,122 @@ +import { Handle, Position, useReactFlow } from "@xyflow/react"; +import type { NodeProps, Node } from "@xyflow/react"; +import type { FlowNodeData } from "../../lib/dagUtils"; +import { useDagPreview } from "./DagPreviewContext"; + +type PipelineNodeType = Node; + +/** Color palette for port handles - each port gets a consistent color */ +const PORT_COLORS: Record = { + video: "bg-blue-400", + video2: "bg-cyan-400", + vace_input_frames: "bg-purple-400", + vace_input_masks: "bg-pink-400", +}; + +function getPortColor(portName: string): string { + return PORT_COLORS[portName] ?? "bg-gray-400"; +} + +export function PipelineNode({ id, data }: NodeProps) { + const { setNodes } = useReactFlow(); + const previewUrl = useDagPreview(id); + + const pipelineIds = data.availablePipelineIds || []; + const portsMap = data.pipelinePortsMap; + const streamInputs = data.streamInputs ?? ["video"]; + const streamOutputs = data.streamOutputs ?? ["video"]; + + const handleChange = (e: React.ChangeEvent) => { + const newPipelineId = e.target.value; + const ports = newPipelineId && portsMap ? 
portsMap[newPipelineId] : null; + setNodes(nds => + nds.map(n => + n.id === id + ? { + ...n, + data: { + ...n.data, + pipelineId: newPipelineId || null, + label: newPipelineId || n.id, + streamInputs: ports?.inputs ?? ["video"], + streamOutputs: ports?.outputs ?? ["video"], + }, + } + : n + ) + ); + }; + + // Calculate handle positions evenly distributed + const inputCount = streamInputs.length; + const outputCount = streamOutputs.length; + + return ( +
+
Pipeline
+ + + {/* Port labels */} +
+
+ {streamInputs.map(port => ( +
+ {port} +
+ ))} +
+
+ {streamOutputs.map(port => ( +
+ {port} +
+ ))} +
+
+ + {/* Preview thumbnail */} + {previewUrl ? ( + preview + ) : null} + + {/* Input handles (left side) */} + {streamInputs.map((port, i) => ( + + ))} + + {/* Output handles (right side) */} + {streamOutputs.map((port, i) => ( + + ))} +
+ ); +} diff --git a/frontend/src/components/dag/SinkNode.tsx b/frontend/src/components/dag/SinkNode.tsx new file mode 100644 index 000000000..d29faa4db --- /dev/null +++ b/frontend/src/components/dag/SinkNode.tsx @@ -0,0 +1,30 @@ +import { Handle, Position } from "@xyflow/react"; +import type { NodeProps, Node } from "@xyflow/react"; +import type { FlowNodeData } from "../../lib/dagUtils"; +import { useDagPreview } from "./DagPreviewContext"; + +type SinkNodeType = Node; + +export function SinkNode({ id, data }: NodeProps) { + const previewUrl = useDagPreview(id); + + return ( +
+
Sink
+
{data.label}
+ {previewUrl ? ( + preview + ) : null} + +
+ ); +} diff --git a/frontend/src/components/dag/SourceNode.tsx b/frontend/src/components/dag/SourceNode.tsx new file mode 100644 index 000000000..a181dcafd --- /dev/null +++ b/frontend/src/components/dag/SourceNode.tsx @@ -0,0 +1,30 @@ +import { Handle, Position } from "@xyflow/react"; +import type { NodeProps, Node } from "@xyflow/react"; +import type { FlowNodeData } from "../../lib/dagUtils"; +import { useDagPreview } from "./DagPreviewContext"; + +type SourceNodeType = Node; + +export function SourceNode({ data }: NodeProps) { + const previewUrl = useDagPreview("input"); + + return ( +
+
Source
+
{data.label}
+ {previewUrl ? ( + preview + ) : null} + +
+ ); +} diff --git a/frontend/src/components/dag/useDagPreviews.ts b/frontend/src/components/dag/useDagPreviews.ts new file mode 100644 index 000000000..923ad8090 --- /dev/null +++ b/frontend/src/components/dag/useDagPreviews.ts @@ -0,0 +1,70 @@ +import { useEffect, useRef, useState } from "react"; +import type { PreviewMap } from "./DagPreviewContext"; + +/** + * Connects to the DAG previews WebSocket and returns live preview data URLs + * keyed by node id. + */ +export function useDagPreviews(enabled: boolean): PreviewMap { + const [previews, setPreviews] = useState({}); + const wsRef = useRef(null); + const reconnectTimer = useRef | null>(null); + + useEffect(() => { + if (!enabled) { + // Close existing connection when disabled + if (wsRef.current) { + wsRef.current.close(); + wsRef.current = null; + } + setPreviews({}); + return; + } + + function connect() { + const protocol = window.location.protocol === "https:" ? "wss:" : "ws:"; + const ws = new WebSocket( + `${protocol}//${window.location.host}/api/v1/dag/previews` + ); + wsRef.current = ws; + + ws.onmessage = event => { + try { + const data = JSON.parse(event.data); + if (data.previews) { + setPreviews(data.previews); + } + } catch { + // ignore malformed messages + } + }; + + ws.onclose = () => { + wsRef.current = null; + if (enabled) { + // Auto-reconnect after 2 seconds + reconnectTimer.current = setTimeout(connect, 2000); + } + }; + + ws.onerror = () => { + ws.close(); + }; + } + + connect(); + + return () => { + if (reconnectTimer.current) { + clearTimeout(reconnectTimer.current); + reconnectTimer.current = null; + } + if (wsRef.current) { + wsRef.current.close(); + wsRef.current = null; + } + }; + }, [enabled]); + + return previews; +} diff --git a/frontend/src/hooks/usePipeline.ts b/frontend/src/hooks/usePipeline.ts index 2dc7d4b80..f326e42c5 100644 --- a/frontend/src/hooks/usePipeline.ts +++ b/frontend/src/hooks/usePipeline.ts @@ -78,7 +78,8 @@ export function usePipeline(options: 
UsePipelineOptions = {}) { const triggerLoad = useCallback( async ( pipelineIds?: string[], - loadParams?: PipelineLoadParams + loadParams?: PipelineLoadParams, + useDag: boolean = true ): Promise => { if (isLoading) { console.log("Pipeline already loading"); @@ -98,6 +99,7 @@ export function usePipeline(options: UsePipelineOptions = {}) { // Start the load request await loadPipelineRequest({ pipeline_ids: pipelineIds, + use_dag: useDag, load_params: loadParams, }); @@ -193,10 +195,11 @@ export function usePipeline(options: UsePipelineOptions = {}) { const loadPipelineAsync = useCallback( async ( pipelineIds?: string[], - loadParams?: PipelineLoadParams + loadParams?: PipelineLoadParams, + useDag: boolean = true ): Promise => { // Always trigger load - let the backend decide if reload is needed - return await triggerLoad(pipelineIds, loadParams); + return await triggerLoad(pipelineIds, loadParams, useDag); }, [triggerLoad] ); diff --git a/frontend/src/hooks/useStreamState.ts b/frontend/src/hooks/useStreamState.ts index 72ffb49e2..662270018 100644 --- a/frontend/src/hooks/useStreamState.ts +++ b/frontend/src/hooks/useStreamState.ts @@ -161,7 +161,8 @@ export function useStreamState() { const defaultPipelineId = "longlive"; // Get initial defaults (use fallback since schemas haven't loaded yet) - const initialDefaults = getFallbackDefaults("text"); + // Use video mode defaults since graphMode (default ON) uses video input + const initialDefaults = getFallbackDefaults("video"); const [settings, setSettings] = useState({ pipelineId: "longlive", @@ -177,7 +178,8 @@ export function useStreamState() { kvCacheAttentionBias: 0.3, paused: false, loraMergeStrategy: "permanent_merge", - inputMode: initialDefaults.inputMode, + inputMode: "video", + graphMode: true, }); const [promptData, setPromptData] = useState({ @@ -322,7 +324,9 @@ export function useStreamState() { // Track previous pipelineId so we only reset inputMode when the pipeline actually changes const 
prevPipelineIdRef = useRef(null); - // Update inputMode when schemas first load or pipeline changes + // Update inputMode and resolution when schemas first load or pipeline changes. + // When graphMode is ON, force video input and apply video-mode resolution from schema. + // When graphMode is OFF, apply pipeline's default mode. useEffect(() => { if (pipelineSchemas) { const schema = pipelineSchemas.pipelines[settings.pipelineId]; @@ -330,14 +334,30 @@ export function useStreamState() { schema?.default_mode && prevPipelineIdRef.current !== settings.pipelineId ) { - setSettings(prev => ({ - ...prev, - inputMode: schema.default_mode, - })); + if (settings.graphMode) { + // Graph mode: keep video input, but apply video-mode resolution from schema + const videoDefaults = getDefaults(settings.pipelineId, "video"); + setSettings(prev => ({ + ...prev, + inputMode: "video", + resolution: { + height: videoDefaults.height, + width: videoDefaults.width, + }, + denoisingSteps: videoDefaults.denoisingSteps, + noiseScale: videoDefaults.noiseScale, + noiseController: videoDefaults.noiseController, + })); + } else { + setSettings(prev => ({ + ...prev, + inputMode: schema.default_mode, + })); + } } prevPipelineIdRef.current = settings.pipelineId; } - }, [pipelineSchemas, settings.pipelineId]); + }, [pipelineSchemas, settings.pipelineId, settings.graphMode, getDefaults]); // Set recommended quantization based on pipeline schema and available VRAM useEffect(() => { diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 511a79ca1..4f3d11db4 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -40,6 +40,8 @@ export type PipelineLoadParamsGeneric = Record; export interface PipelineLoadRequest { pipeline_ids: string[]; + /** When false, use pipeline_ids only (Pipeline ID + Preprocessor + Postprocessor). When true, server may use DAG. 
*/ + use_dag?: boolean; load_params?: PipelineLoadParamsGeneric | null; } @@ -520,6 +522,9 @@ export interface PipelineSchemaInfo { recommended_quantization_vram_threshold: number | null; modified: boolean; plugin_name: string | null; + // DAG port declarations + stream_inputs: string[]; + stream_outputs: string[]; } export interface PipelineSchemasResponse { @@ -797,6 +802,81 @@ export const deleteApiKey = async ( return response.json(); }; +// DAG Configuration types and API functions + +export interface DagNode { + id: string; + type: "source" | "pipeline" | "sink"; + pipeline_id?: string | null; +} + +export interface DagEdge { + from: string; + from_port: string; + to_node: string; + to_port: string; + kind?: "stream" | "parameter"; +} + +export interface DagConfig { + nodes: DagNode[]; + edges: DagEdge[]; +} + +export interface DagResponse { + source: "api" | "input.json" | null; + dag: DagConfig | null; +} + +export const getDag = async (): Promise => { + const response = await fetch("/api/v1/dag", { + method: "GET", + headers: { "Content-Type": "application/json" }, + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error( + `Get DAG failed: ${response.status} ${response.statusText}: ${errorText}` + ); + } + + return response.json(); +}; + +export const setDag = async ( + dag: DagConfig +): Promise<{ message: string; nodes: number; edges: number }> => { + const response = await fetch("/api/v1/dag", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(dag), + }); + + if (!response.ok) { + const detail = await extractErrorDetail(response); + throw new Error(detail); + } + + return response.json(); +}; + +export const clearDag = async (): Promise<{ message: string }> => { + const response = await fetch("/api/v1/dag", { + method: "DELETE", + headers: { "Content-Type": "application/json" }, + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error( + 
`Clear DAG failed: ${response.status} ${response.statusText}: ${errorText}` + ); + } + + return response.json(); +}; + export const downloadRecording = async (sessionId: string): Promise => { if (!sessionId) { throw new Error("Session ID is required to download recording"); diff --git a/frontend/src/lib/dagUtils.ts b/frontend/src/lib/dagUtils.ts new file mode 100644 index 000000000..9a9c938bb --- /dev/null +++ b/frontend/src/lib/dagUtils.ts @@ -0,0 +1,159 @@ +import type { Node, Edge } from "@xyflow/react"; +import type { DagConfig, DagNode, DagEdge, PipelineSchemaInfo } from "./api"; + +// Layout constants +const NODE_WIDTH = 200; +const NODE_HEIGHT = 60; +const COLUMN_GAP = 300; +const ROW_GAP = 100; +const START_X = 50; +const START_Y = 50; + +export interface PortInfo { + name: string; +} + +export interface FlowNodeData { + label: string; + pipelineId?: string | null; + nodeType: "source" | "pipeline" | "sink"; + availablePipelineIds?: string[]; + /** Declared input ports for the selected pipeline */ + streamInputs?: string[]; + /** Declared output ports for the selected pipeline */ + streamOutputs?: string[]; + /** Pipeline schemas keyed by pipeline_id, for looking up ports on selection change */ + pipelinePortsMap?: Record; + [key: string]: unknown; +} + +/** + * Build a map of pipeline_id -> { inputs, outputs } from schemas. + */ +export function buildPipelinePortsMap( + schemas: Record +): Record { + const map: Record = {}; + for (const [id, schema] of Object.entries(schemas)) { + map[id] = { + inputs: schema.stream_inputs ?? ["video"], + outputs: schema.stream_outputs ?? ["video"], + }; + } + return map; +} + +/** + * Convert backend DagConfig to React Flow nodes and edges. + * Auto-layout: sources on the left, pipelines in the middle, sinks on the right. 
+ */ +export function dagConfigToFlow( + dag: DagConfig, + portsMap?: Record +): { + nodes: Node[]; + edges: Edge[]; +} { + const sources = dag.nodes.filter(n => n.type === "source"); + const pipelines = dag.nodes.filter(n => n.type === "pipeline"); + const sinks = dag.nodes.filter(n => n.type === "sink"); + + const nodes: Node[] = []; + + // Layout sources (column 0) + sources.forEach((n, i) => { + nodes.push({ + id: n.id, + type: "source", + position: { x: START_X, y: START_Y + i * (NODE_HEIGHT + ROW_GAP) }, + data: { label: n.id, nodeType: "source" }, + }); + }); + + // Layout pipelines (column 1) + pipelines.forEach((n, i) => { + const ports = n.pipeline_id && portsMap ? portsMap[n.pipeline_id] : null; + nodes.push({ + id: n.id, + type: "pipeline", + position: { + x: START_X + COLUMN_GAP, + y: START_Y + i * (NODE_HEIGHT + ROW_GAP), + }, + data: { + label: n.pipeline_id || n.id, + pipelineId: n.pipeline_id, + nodeType: "pipeline", + streamInputs: ports?.inputs ?? ["video"], + streamOutputs: ports?.outputs ?? ["video"], + }, + }); + }); + + // Layout sinks (column 2) + sinks.forEach((n, i) => { + nodes.push({ + id: n.id, + type: "sink", + position: { + x: START_X + COLUMN_GAP * 2, + y: START_Y + i * (NODE_HEIGHT + ROW_GAP), + }, + data: { label: n.id, nodeType: "sink" }, + }); + }); + + // Convert edges + const edges: Edge[] = dag.edges.map((e, i) => ({ + id: `e-${i}-${e.from}-${e.to_node}`, + source: e.from, + sourceHandle: e.from_port, + target: e.to_node, + targetHandle: e.to_port, + label: e.from_port !== "video" ? e.from_port : undefined, + animated: true, + })); + + return { nodes, edges }; +} + +/** + * Convert React Flow state back to backend DagConfig JSON. + */ +export function flowToDagConfig( + nodes: Node[], + edges: Edge[] +): DagConfig { + const dagNodes: DagNode[] = nodes.map(n => ({ + id: n.id, + type: n.data.nodeType, + pipeline_id: + n.data.nodeType === "pipeline" ? (n.data.pipelineId ?? 
null) : undefined, + })); + + const dagEdges: DagEdge[] = edges.map(e => ({ + from: e.source, + from_port: e.sourceHandle || "video", + to_node: e.target, + to_port: e.targetHandle || "video", + kind: "stream" as const, + })); + + return { nodes: dagNodes, edges: dagEdges }; +} + +/** + * Generate a unique node ID with a given prefix. + */ +export function generateNodeId( + prefix: string, + existingIds: Set +): string { + if (!existingIds.has(prefix)) return prefix; + let i = 1; + while (existingIds.has(`${prefix}_${i}`)) i++; + return `${prefix}_${i}`; +} + +// Default node dimensions for reference +export { NODE_WIDTH, NODE_HEIGHT }; diff --git a/frontend/src/pages/StreamPage.tsx b/frontend/src/pages/StreamPage.tsx index 3d7099a36..d2d0ecea5 100644 --- a/frontend/src/pages/StreamPage.tsx +++ b/frontend/src/pages/StreamPage.tsx @@ -28,7 +28,7 @@ import type { DownloadProgress, } from "../types"; import type { PromptItem, PromptTransition } from "../lib/api"; -import { getInputSourceResolution } from "../lib/api"; +import { getInputSourceResolution, getDag } from "../lib/api"; import { sendLoRAScaleUpdates } from "../utils/loraHelpers"; import { toast } from "sonner"; @@ -786,6 +786,10 @@ export function StreamPage() { }); }; + const handleGraphModeChange = (enabled: boolean) => { + updateSettings({ graphMode: enabled }); + }; + const handleSpoutSenderChange = ( spoutSender: { enabled: boolean; name: string } | undefined ) => { @@ -1000,14 +1004,43 @@ export function StreamPage() { const pipelineIdToUse = overridePipelineId || settings.pipelineId; try { - // Build pipeline chain: preprocessors + main pipeline + postprocessors - const pipelineIds: string[] = []; - if (settings.preprocessorIds && settings.preprocessorIds.length > 0) { - pipelineIds.push(...settings.preprocessorIds); - } - pipelineIds.push(pipelineIdToUse); - if (settings.postprocessorIds && settings.postprocessorIds.length > 0) { - pipelineIds.push(...settings.postprocessorIds); + // Build pipeline 
chain depending on graph mode + let pipelineIds: string[]; + + if (settings.graphMode) { + // Graph mode: fetch pipeline IDs from the DAG + try { + const dagResponse = await getDag(); + if (dagResponse.dag) { + const dagPipelineIds = dagResponse.dag.nodes + .filter(n => n.type === "pipeline" && n.pipeline_id) + .map(n => n.pipeline_id as string); + if (dagPipelineIds.length > 0) { + pipelineIds = dagPipelineIds; + } else { + console.warn( + "DAG has no pipeline nodes, falling back to settings" + ); + pipelineIds = [pipelineIdToUse]; + } + } else { + console.warn("No DAG configured, falling back to settings"); + pipelineIds = [pipelineIdToUse]; + } + } catch (err) { + console.error("Failed to fetch DAG, falling back to settings:", err); + pipelineIds = [pipelineIdToUse]; + } + } else { + // Manual mode: build chain from preprocessors + main + postprocessors + pipelineIds = []; + if (settings.preprocessorIds && settings.preprocessorIds.length > 0) { + pipelineIds.push(...settings.preprocessorIds); + } + pipelineIds.push(pipelineIdToUse); + if (settings.postprocessorIds && settings.postprocessorIds.length > 0) { + pipelineIds.push(...settings.postprocessorIds); + } } // Check if models are needed but not downloaded for all pipelines in the chain @@ -1161,7 +1194,8 @@ export function StreamPage() { const loadSuccess = await loadPipeline( pipelineIds, - loadParams || undefined + loadParams || undefined, + settings.graphMode ?? 
true ); if (!loadSuccess) { console.error("Failed to load pipeline, cannot start stream"); @@ -1203,6 +1237,7 @@ export function StreamPage() { vace_context_scale?: number; vace_enabled?: boolean; pipeline_ids?: string[]; + use_dag?: boolean; first_frame_image?: string; last_frame_image?: string; images?: string[]; @@ -1240,6 +1275,8 @@ export function StreamPage() { // Pipeline chain: preprocessors + main pipeline (already built above) initialParameters.pipeline_ids = pipelineIds; + // When graph mode is OFF, backend uses Pipeline ID + Preprocessor + Postprocessor only + initialParameters.use_dag = settings.graphMode ?? true; // VACE-specific parameters if (currentPipeline?.supportsVACE) { @@ -1699,6 +1736,8 @@ export function StreamPage() { } }} isCloudMode={isCloudMode} + graphMode={settings.graphMode ?? true} + onGraphModeChange={handleGraphModeChange} />
diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts index 97a10b799..aa30e82e5 100644 --- a/frontend/src/types/index.ts +++ b/frontend/src/types/index.ts @@ -86,6 +86,8 @@ export interface SettingsState { source_type: string; source_name: string; }; + // Graph mode: when enabled, pipeline chain comes from DAG editor instead of manual controls + graphMode?: boolean; // Dynamic schema-driven fields (key = schema field name snake_case, value = parsed value) schemaFieldOverrides?: Record; // Schema-driven overrides for preprocessor/postprocessor plugin configs diff --git a/frontend/vite.config.ts b/frontend/vite.config.ts index b6c4e9294..62083603f 100644 --- a/frontend/vite.config.ts +++ b/frontend/vite.config.ts @@ -14,6 +14,7 @@ export default defineConfig({ "/api": { target: "http://localhost:8000", changeOrigin: true, + ws: true, }, "/health": { target: "http://localhost:8000", diff --git a/input-passthrough-rife.json b/input-passthrough-rife.json new file mode 100644 index 000000000..b61d78281 --- /dev/null +++ b/input-passthrough-rife.json @@ -0,0 +1,13 @@ +{ + "nodes": [ + {"id": "input", "type": "source"}, + {"id": "passthrough", "type": "pipeline", "pipeline_id": "passthrough"}, + {"id": "rife", "type": "pipeline", "pipeline_id": "rife"}, + {"id": "output", "type": "sink"} + ], + "edges": [ + {"from": "input", "from_port": "video", "to_node": "passthrough", "to_port": "video", "kind": "stream"}, + {"from": "passthrough", "from_port": "video", "to_node": "rife", "to_port": "video", "kind": "stream"}, + {"from": "rife", "from_port": "video", "to_node": "output", "to_port": "video", "kind": "stream"} + ] +} diff --git a/input.json b/input.json new file mode 100644 index 000000000..e09ed5e68 --- /dev/null +++ b/input.json @@ -0,0 +1,16 @@ +{ + "nodes": [ + {"id": "input", "type": "source"}, + {"id": "passthrough", "type": "pipeline", "pipeline_id": "passthrough"}, + {"id": "depth", "type": "pipeline", "pipeline_id": "video-depth-anything"}, + 
{"id": "combine", "type": "pipeline", "pipeline_id": "combine_streams"}, + {"id": "output", "type": "sink"} + ], + "edges": [ + {"from": "input", "from_port": "video", "to_node": "passthrough", "to_port": "video", "kind": "stream"}, + {"from": "input", "from_port": "video", "to_node": "depth", "to_port": "video", "kind": "stream"}, + {"from": "passthrough", "from_port": "video", "to_node": "combine", "to_port": "video", "kind": "stream"}, + {"from": "depth", "from_port": "video", "to_node": "combine", "to_port": "video2", "kind": "stream"}, + {"from": "combine", "from_port": "video", "to_node": "output", "to_port": "video", "kind": "stream"} + ] +} diff --git a/pyproject.toml b/pyproject.toml index 0c006a2bb..15ad1f4aa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ "httpx>=0.28.1", "twilio>=9.8.0", "uvicorn>=0.35.0", + "websockets>=15.0", "torch==2.9.1", "torchvision==0.24.1", "easydict>=1.13", diff --git a/src/scope/core/pipelines/base_schema.py b/src/scope/core/pipelines/base_schema.py index 714e988d0..4c875bd12 100644 --- a/src/scope/core/pipelines/base_schema.py +++ b/src/scope/core/pipelines/base_schema.py @@ -245,6 +245,13 @@ class BasePipelineConfig(BaseModel): # to appear in the preprocessor dropdown. usage: ClassVar[list[UsageType]] = [] + # DAG port declaration: named inputs/outputs for graph wiring. + # stream_inputs = port names this pipeline reads from (e.g. "video", "vace_input_frames"). + # stream_outputs = port names this pipeline writes to (e.g. "video", "vace_input_masks"). + # Used by the DAG executor to connect queues and parameter forwarding. + stream_inputs: ClassVar[list[str]] = ["video"] + stream_outputs: ClassVar[list[str]] = ["video"] + # Mode configuration - keys are mode names, values are ModeDefaults with field overrides # Use default=True to mark the default mode. Only include fields that differ from base. 
modes: ClassVar[dict[str, ModeDefaults]] = {"text": ModeDefaults(default=True)} @@ -382,6 +389,8 @@ def get_schema_with_metadata(cls) -> dict[str, Any]: metadata["modified"] = cls.modified # Convert UsageType enum values to strings for JSON serialization metadata["usage"] = [usage.value for usage in cls.usage] if cls.usage else [] + metadata["stream_inputs"] = list(cls.stream_inputs) + metadata["stream_outputs"] = list(cls.stream_outputs) metadata["config_schema"] = cls.model_json_schema() # Include mode-specific defaults (excluding None values and the "default" flag) diff --git a/src/scope/core/pipelines/longlive/schema.py b/src/scope/core/pipelines/longlive/schema.py index c2598d2a9..53215f0a8 100644 --- a/src/scope/core/pipelines/longlive/schema.py +++ b/src/scope/core/pipelines/longlive/schema.py @@ -17,6 +17,9 @@ class LongLiveConfig(BasePipelineConfig): pipeline_id = "longlive" pipeline_name = "LongLive" + # DAG: accepts main video and optional VACE inputs from upstream (e.g. YOLO plugin) + stream_inputs = ["video", "vace_input_frames", "vace_input_masks"] + stream_outputs = ["video"] pipeline_description = ( "A streaming pipeline and autoregressive video diffusion model from Nvidia, MIT, HKUST, HKU and THU. " "The model is trained using Self-Forcing on Wan2.1 1.3b with modifications to support smoother prompt " diff --git a/src/scope/server/app.py b/src/scope/server/app.py index ac13a219d..147b09e68 100644 --- a/src/scope/server/app.py +++ b/src/scope/server/app.py @@ -19,7 +19,16 @@ import click import uvicorn -from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, Query, Request +from fastapi import ( + BackgroundTasks, + Depends, + FastAPI, + HTTPException, + Query, + Request, + WebSocket, + WebSocketDisconnect, +) from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, Response from fastapi.staticfiles import StaticFiles @@ -496,17 +505,50 @@ async def load_pipeline( cloud-hosted scope backend. 
""" try: - # Get pipeline IDs to load - pipeline_ids = request.pipeline_ids + # Override pipeline IDs only when use_dag is True: API DAG > input.json > request + from .dag_state import get_api_dag + from .frame_processor import get_pipeline_ids_from_input_json + + if request.use_dag: + api_dag = get_api_dag() + if api_dag is not None: + dag_pipeline_ids = [ + n.pipeline_id + for n in api_dag.nodes + if n.type == "pipeline" and n.pipeline_id is not None + ] + logger.info( + f"Overriding pipeline_ids from API DAG: {dag_pipeline_ids} " + f"(was: {request.pipeline_ids})" + ) + pipeline_ids = dag_pipeline_ids + # Pass UI load_params (resolution, seed, vae_type, etc.) so each + # pipeline in the DAG is loaded with the same params; each pipeline + # uses only the params it needs (e.g. longlive uses height/width). + load_params_dict = request.load_params + elif (dag_pipeline_ids := get_pipeline_ids_from_input_json()) is not None: + logger.info( + f"Overriding pipeline_ids from input.json: {dag_pipeline_ids} " + f"(was: {request.pipeline_ids})" + ) + pipeline_ids = dag_pipeline_ids + # Pass UI load_params so resolution and other params are applied to + # each pipeline in the DAG (e.g. longlive gets height/width/seed). 
+ load_params_dict = request.load_params + else: + pipeline_ids = request.pipeline_ids + load_params_dict = request.load_params + else: + # Graph mode OFF: use standard Pipeline ID + Preprocessor + Postprocessor + pipeline_ids = request.pipeline_ids + load_params_dict = request.load_params + if not pipeline_ids: raise HTTPException( status_code=400, detail="pipeline_ids must be provided and cannot be empty", ) - # load_params is already a dict (or None) - load_params_dict = request.load_params - # Local mode: start loading in background without blocking asyncio.create_task( pipeline_manager.load_pipelines( @@ -1701,6 +1743,133 @@ async def reload_plugin( ) from e +# ============================================================================= +# DAG Configuration Endpoints +# ============================================================================= + + +@app.post("/api/v1/dag") +async def set_dag_config(request: Request): + """Accept and store a DAG configuration. + + Validates the structure and checks that pipeline_ids exist in the registry. + """ + from scope.core.pipelines.registry import PipelineRegistry + + from .dag_schema import DagConfig + from .dag_state import set_api_dag + + try: + body = await request.json() + dag = DagConfig.model_validate(body) + + # Structural validation + errors = dag.validate_structure() + if errors: + raise HTTPException(status_code=422, detail={"errors": errors}) + + # Validate that pipeline_ids exist in the registry + available = set(PipelineRegistry.list_pipelines()) + for node in dag.nodes: + if node.type == "pipeline" and node.pipeline_id not in available: + raise HTTPException( + status_code=422, + detail={ + "errors": [ + f"Pipeline '{node.pipeline_id}' not found in registry. 
" + f"Available: {sorted(available)}" + ] + }, + ) + + set_api_dag(dag) + return { + "message": "DAG configuration saved", + "nodes": len(dag.nodes), + "edges": len(dag.edges), + } + except HTTPException: + raise + except Exception as e: + logger.error(f"Error setting DAG config: {e}") + raise HTTPException(status_code=400, detail=str(e)) from e + + +@app.get("/api/v1/dag") +async def get_dag_config(): + """Return the current DAG configuration. + + Priority: API-set DAG > input.json > none. + """ + from .dag_state import get_api_dag + from .frame_processor import _load_dag_from_input_json + + api_dag = get_api_dag() + if api_dag is not None: + return { + "source": "api", + "dag": api_dag.model_dump(by_alias=True), + } + + file_dag = _load_dag_from_input_json() + if file_dag is not None: + return { + "source": "input.json", + "dag": file_dag.model_dump(by_alias=True), + } + + return {"source": None, "dag": None} + + +@app.delete("/api/v1/dag") +async def clear_dag_config(): + """Clear the API-set DAG, reverting to fallback behavior.""" + from .dag_state import clear_api_dag + + clear_api_dag() + return {"message": "DAG configuration cleared"} + + +@app.websocket("/api/v1/dag/previews") +async def dag_previews_ws(ws: WebSocket): + """Stream live preview thumbnails from each DAG node at ~5 FPS.""" + import base64 + + await ws.accept() + + try: + while True: + # Find the active session's frame_processor + frame_processor = None + if webrtc_manager: + for session in webrtc_manager.list_sessions().values(): + vt = session.video_track + if vt and hasattr(vt, "frame_processor") and vt.frame_processor: + fp = vt.frame_processor + if fp.running: + frame_processor = fp + break + + if frame_processor is not None: + try: + previews_raw = await asyncio.get_event_loop().run_in_executor( + None, frame_processor.get_preview_frames + ) + previews = { + node_id: f"data:image/jpeg;base64,{base64.b64encode(jpeg_bytes).decode()}" + for node_id, jpeg_bytes in previews_raw.items() + } + 
await ws.send_json({"previews": previews}) + except Exception as e: + logger.debug(f"Error getting preview frames: {e}") + + await asyncio.sleep(0.2) # ~5 FPS + except WebSocketDisconnect: + pass + except Exception as e: + logger.debug(f"DAG preview WebSocket closed: {e}") + + # ============================================================================= # Cloud Integration Endpoints # ============================================================================= diff --git a/src/scope/server/dag_executor.py b/src/scope/server/dag_executor.py new file mode 100644 index 000000000..c98541c74 --- /dev/null +++ b/src/scope/server/dag_executor.py @@ -0,0 +1,211 @@ +"""DAG executor: builds and runs pipeline graphs from DagConfig. + +Wires source queues (for put()), pipeline processors (one per pipeline node), +and identifies the sink for get(). All frame ports (video, vace_input_frames, +vace_input_masks) use stream edges (queues); no separate parameter path. +""" + +from __future__ import annotations + +import logging +import queue +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +from .dag_schema import DagConfig, DagEdge, DagNode +from .pipeline_processor import PipelineProcessor + +if TYPE_CHECKING: + from .pipeline_manager import PipelineManager + +logger = logging.getLogger(__name__) + +# Default queue sizes (match pipeline_processor) +# Use larger size for inter-pipeline queues so downstream can accumulate a full chunk +DEFAULT_INPUT_QUEUE_MAXSIZE = 64 +DEFAULT_OUTPUT_QUEUE_MAXSIZE = 8 + + +@dataclass +class DagRun: + """Result of building a DAG: queues and processors to run.""" + + # When put(frame) is called, put to each of these queues (source fan-out) + source_queues: list[queue.Queue] + # The processor whose output_queue we read from for get() + sink_processor: PipelineProcessor | None + # All pipeline processors (for start/stop/update_parameters) + processors: list[PipelineProcessor] + # Pipeline IDs in graph order (for 
logging/events) + pipeline_ids: list[str] + # Node id of the sink (for clarity) + sink_node_id: str | None + # Node ids of output/sink nodes (e.g. "output") for preview mapping + output_node_ids: list[str] + + +def build_dag( + dag: DagConfig, + pipeline_manager: PipelineManager, + initial_parameters: dict[str, Any], + session_id: str | None = None, + user_id: str | None = None, + connection_id: str | None = None, + connection_info: dict | None = None, +) -> DagRun: + """Build executable DAG: create queues and processors, wire edges. + + Args: + dag: Parsed DAG config (nodes + edges). + pipeline_manager: Manager to resolve pipeline_id -> instance. + initial_parameters: Parameters passed to all pipelines. + session_id, user_id, connection_id, connection_info: For processors. + + Returns: + DagRun with source_queues, sink_processor, processors, pipeline_ids. + """ + # 1) Create one queue per edge (all edges are stream; frame-by-frame) + stream_queues: dict[tuple[str, str], queue.Queue] = {} + for e in dag.edges: + if e.kind == "stream": + stream_queues[(e.to_node, e.to_port)] = queue.Queue( + maxsize=DEFAULT_INPUT_QUEUE_MAXSIZE + ) + + # 2) Source queues: all queues that receive from a source node + source_queues: list[queue.Queue] = [] + for node_id in dag.get_source_node_ids(): + for e in dag.stream_edges_from(node_id): + q = stream_queues.get((e.to_node, e.to_port)) + if q is not None: + source_queues.append(q) + + # 3) Create a processor per pipeline node and wire input_queues per port + node_processors: dict[str, PipelineProcessor] = {} + pipeline_ids: list[str] = [] + + for node in dag.nodes: + if node.type != "pipeline" or node.pipeline_id is None: + continue + pipeline = pipeline_manager.get_pipeline_by_id(node.pipeline_id) + processor = PipelineProcessor( + pipeline=pipeline, + pipeline_id=node.id, + initial_parameters=initial_parameters.copy(), + session_id=session_id, + user_id=user_id, + connection_id=connection_id, + connection_info=connection_info, + ) 
+ node_processors[node.id] = processor + pipeline_ids.append(node.pipeline_id) + + for e in dag.edges_to(node.id): + if e.kind != "stream": + continue + q = stream_queues.get((node.id, e.to_port)) + if q is not None: + processor.input_queues[e.to_port] = q + with processor.input_queue_lock: + processor.input_queue = processor.input_queues.get("video") + + # 4) Set each producer's output_queues per port and wire consumer input to same queue + for node in dag.nodes: + if node.type != "pipeline" or node.id not in node_processors: + continue + proc = node_processors[node.id] + out_by_port: dict[str, list[queue.Queue]] = {} + for e in dag.edges_from(node.id): + if e.kind != "stream": + continue + q = stream_queues.get((e.to_node, e.to_port)) + if q is not None and q not in out_by_port.get(e.from_port, []): + out_by_port.setdefault(e.from_port, []).append(q) + # Symmetric wiring: ensure consumer reads from this queue (fixes chained pipelines) + consumer = node_processors.get(e.to_node) + if consumer is not None: + consumer.input_queues[e.to_port] = q + with consumer.input_queue_lock: + consumer.input_queue = consumer.input_queues.get("video") + for port, qlist in out_by_port.items(): + proc.output_queues[port] = qlist + + # 5) Identify sink: node that has an edge to "output" (or type sink) + sink_node_id: str | None = None + for e in dag.edges: + if e.to_node == "output" and e.kind == "stream": + sink_node_id = e.from_node + break + if sink_node_id is None: + # No explicit output node: treat last pipeline node as sink (linear) + pipeline_node_ids = dag.get_pipeline_node_ids() + if pipeline_node_ids: + sink_node_id = pipeline_node_ids[-1] + + sink_processor = node_processors.get(sink_node_id) if sink_node_id else None + if sink_node_id and sink_processor is None: + logger.warning( + "DAG sink node %s not found in processors (missing pipeline?)", + sink_node_id, + ) + elif sink_node_id: + logger.info("DAG sink for playback: node_id=%s", sink_node_id) + + # Collect 
output/sink node IDs for preview mapping + output_node_ids = [n.id for n in dag.nodes if n.type == "sink"] + + return DagRun( + source_queues=source_queues, + sink_processor=sink_processor, + processors=list(node_processors.values()), + pipeline_ids=pipeline_ids, + sink_node_id=sink_node_id, + output_node_ids=output_node_ids, + ) + + +def linear_dag_from_pipeline_ids(pipeline_ids: list[str]) -> DagConfig: + """Build a linear DAG from a list of pipeline IDs (backward compatibility). + + Produces: source -> p0 -> p1 -> ... -> sink. + """ + nodes = [ + DagNode(id="input", type="source"), + *[DagNode(id=pid, type="pipeline", pipeline_id=pid) for pid in pipeline_ids], + DagNode(id="output", type="sink"), + ] + edges: list[DagEdge] = [] + prev = "input" + for pid in pipeline_ids: + edges.append( + DagEdge( + from_node=prev, + from_port="video", + to_node=pid, + to_port="video", + kind="stream", + ) + ) + prev = pid + edges.append( + DagEdge( + from_node=prev, + from_port="video", + to_node="output", + to_port="video", + kind="stream", + ) + ) + # Vace streams: preprocessor outputs (vace_*) to next in chain (frame-by-frame queues) + for i in range(len(pipeline_ids) - 1): + for port in ("vace_input_frames", "vace_input_masks"): + edges.append( + DagEdge( + from_node=pipeline_ids[i], + from_port=port, + to_node=pipeline_ids[i + 1], + to_port=port, + kind="stream", + ) + ) + return DagConfig(nodes=nodes, edges=edges) diff --git a/src/scope/server/dag_schema.py b/src/scope/server/dag_schema.py new file mode 100644 index 000000000..c29b4c1d3 --- /dev/null +++ b/src/scope/server/dag_schema.py @@ -0,0 +1,154 @@ +"""DAG (Directed Acyclic Graph) schema for pipeline execution. + +Defines a JSON-friendly format to describe: +- Nodes: source (input), pipeline instances, sink (output) +- Edges: connections between nodes via named ports + +All frame ports (video, vace_input_frames, vace_input_masks) use stream edges +(frame-by-frame queues). 
The optional "parameter" kind is for future event-like +data only. + +Example (YOLO plugin + Longlive with shared input video): + + { + "nodes": [ + {"id": "input", "type": "source"}, + {"id": "yolo_plugin", "type": "pipeline", "pipeline_id": "yolo_plugin"}, + {"id": "longlive", "type": "pipeline", "pipeline_id": "longlive"}, + {"id": "output", "type": "sink"} + ], + "edges": [ + {"from": "input", "from_port": "video", "to_node": "yolo_plugin", "to_port": "video", "kind": "stream"}, + {"from": "input", "from_port": "video", "to_node": "longlive", "to_port": "video", "kind": "stream"}, + {"from": "yolo_plugin", "from_port": "vace_input_frames", "to_node": "longlive", "to_port": "vace_input_frames", "kind": "stream"}, + {"from": "yolo_plugin", "from_port": "vace_input_masks", "to_node": "longlive", "to_port": "vace_input_masks", "kind": "stream"}, + {"from": "longlive", "from_port": "video", "to_node": "output", "to_port": "video", "kind": "stream"} + ] + } +""" + +from __future__ import annotations + +from typing import Literal + +from pydantic import BaseModel, Field + + +class DagNode(BaseModel): + """A node in the pipeline DAG.""" + + id: str = Field( + ..., + description="Unique node id (e.g. 'input', 'yolo_plugin', 'longlive', 'output')", + ) + type: Literal["source", "pipeline", "sink"] = Field( + ..., + description="source = external input, pipeline = pipeline instance, sink = output", + ) + pipeline_id: str | None = Field( + default=None, + description="Pipeline ID (registry key) when type is 'pipeline'", + ) + + +class DagEdge(BaseModel): + """An edge connecting an output port to an input port.""" + + from_node: str = Field(..., alias="from", description="Source node id") + from_port: str = Field( + ..., description="Source port (e.g. 
'video', 'vace_input_frames')" + ) + to_node: str = Field(..., description="Target node id") + to_port: str = Field(..., description="Target port name") + kind: Literal["stream", "parameter"] = Field( + default="stream", + description="stream = queue (frame-by-frame), parameter = chunk-level pass-through", + ) + + model_config = {"populate_by_name": True} + + +class DagConfig(BaseModel): + """Root DAG configuration (graph definition).""" + + nodes: list[DagNode] = Field(..., description="DAG nodes") + edges: list[DagEdge] = Field(..., description="Connections between nodes") + + def get_pipeline_node_ids(self) -> list[str]: + """Return node ids that are pipeline nodes, in definition order.""" + return [n.id for n in self.nodes if n.type == "pipeline"] + + def get_source_node_ids(self) -> list[str]: + """Return node ids that are source nodes.""" + return [n.id for n in self.nodes if n.type == "source"] + + def get_sink_node_ids(self) -> list[str]: + """Return node ids that are sink nodes.""" + return [n.id for n in self.nodes if n.type == "sink"] + + def edges_from(self, node_id: str) -> list[DagEdge]: + """Return edges whose source is the given node.""" + return [e for e in self.edges if e.from_node == node_id] + + def edges_to(self, node_id: str) -> list[DagEdge]: + """Return edges whose target is the given node.""" + return [e for e in self.edges if e.to_node == node_id] + + def stream_edges_from(self, node_id: str) -> list[DagEdge]: + """Return stream edges whose source is the given node.""" + return [e for e in self.edges_from(node_id) if e.kind == "stream"] + + def parameter_edges_from(self, node_id: str) -> list[DagEdge]: + """Return parameter edges whose source is the given node.""" + return [e for e in self.edges_from(node_id) if e.kind == "parameter"] + + def node_by_id(self, node_id: str) -> DagNode | None: + """Return the node with the given id.""" + for n in self.nodes: + if n.id == node_id: + return n + return None + + def validate_structure(self) -> 
list[str]: + """Validate the DAG structure and return a list of error messages. + + Checks: + - No duplicate node IDs + - At least one source and one sink node + - Pipeline nodes have a pipeline_id + - All edge references point to existing nodes + """ + errors: list[str] = [] + node_ids = [n.id for n in self.nodes] + + # Check for duplicate node IDs + seen: set[str] = set() + for nid in node_ids: + if nid in seen: + errors.append(f"Duplicate node ID: '{nid}'") + seen.add(nid) + + # At least one source and one sink + if not self.get_source_node_ids(): + errors.append("DAG must have at least one source node") + if not self.get_sink_node_ids(): + errors.append("DAG must have at least one sink node") + + # Pipeline nodes must have pipeline_id + for node in self.nodes: + if node.type == "pipeline" and not node.pipeline_id: + errors.append(f"Pipeline node '{node.id}' is missing pipeline_id") + + # Edge references must point to existing nodes + node_id_set = set(node_ids) + for edge in self.edges: + if edge.from_node not in node_id_set: + errors.append( + f"Edge references non-existent source node: '{edge.from_node}'" + ) + if edge.to_node not in node_id_set: + errors.append( + f"Edge references non-existent target node: '{edge.to_node}'" + ) + + return errors diff --git a/src/scope/server/dag_state.py b/src/scope/server/dag_state.py new file mode 100644 index 000000000..8a472c5df --- /dev/null +++ b/src/scope/server/dag_state.py @@ -0,0 +1,39 @@ +"""In-memory DAG configuration store. + +Holds a DAG config set via the API that takes priority over input.json. +Thread-safe via a threading lock. 
+""" + +from __future__ import annotations + +import logging +import threading + +from .dag_schema import DagConfig + +logger = logging.getLogger(__name__) + +_lock = threading.Lock() +_dag_config: DagConfig | None = None + + +def get_api_dag() -> DagConfig | None: + """Return the API-set DAG config, or None if not set.""" + with _lock: + return _dag_config + + +def set_api_dag(dag: DagConfig) -> None: + """Store a DAG config set via the API.""" + with _lock: + global _dag_config + _dag_config = dag + logger.info(f"API DAG set with {len(dag.nodes)} nodes and {len(dag.edges)} edges") + + +def clear_api_dag() -> None: + """Clear the API-set DAG config, reverting to fallback behavior.""" + with _lock: + global _dag_config + _dag_config = None + logger.info("API DAG cleared") diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py index 9a0bfca13..d9c5a823a 100644 --- a/src/scope/server/frame_processor.py +++ b/src/scope/server/frame_processor.py @@ -1,13 +1,17 @@ +import json import logging import queue import threading import time import uuid +from pathlib import Path from typing import TYPE_CHECKING, Any import torch from aiortc.mediastreams import VideoFrame +from .dag_executor import DagRun, build_dag, linear_dag_from_pipeline_ids +from .dag_schema import DagConfig from .kafka_publisher import publish_event from .pipeline_manager import PipelineManager from .pipeline_processor import PipelineProcessor @@ -30,6 +34,37 @@ # Heartbeat interval for stream stats logging and Kafka events HEARTBEAT_INTERVAL_SECONDS = 10.0 +# Path to input.json DAG configuration file (project root) +_INPUT_JSON_PATH = Path(__file__).resolve().parents[3] / "input.json" + + +def _load_dag_from_input_json() -> DagConfig | None: + """Load DAG config from input.json if it exists. 
Returns None otherwise.""" + if not _INPUT_JSON_PATH.exists(): + return None + try: + raw = json.loads(_INPUT_JSON_PATH.read_text()) + dag = DagConfig.model_validate(raw) + logger.info( + f"Loaded DAG from {_INPUT_JSON_PATH}: {dag.get_pipeline_node_ids()}" + ) + return dag + except Exception as e: + logger.error(f"Failed to load DAG from {_INPUT_JSON_PATH}: {e}") + return None + + +def get_pipeline_ids_from_input_json() -> list[str] | None: + """Return pipeline IDs from input.json, or None if not available.""" + dag = _load_dag_from_input_json() + if dag is None: + return None + return [ + n.pipeline_id + for n in dag.nodes + if n.type == "pipeline" and n.pipeline_id is not None + ] + class FrameProcessor: """Processes video frames through pipelines or cloud relay. @@ -101,9 +136,17 @@ def __init__( # Input mode: video waits for frames, text generates immediately self._video_mode = (initial_parameters or {}).get("input_mode") == "video" - # Pipeline chaining support + # Pipeline chaining / DAG support self.pipeline_processors: list[PipelineProcessor] = [] self.pipeline_ids: list[str] = [] + # DAG run (set when using DAG executor; enables shared input and multi-port) + self._dag_run: DagRun | None = None + # When False, use pipeline_ids from initial_parameters only (Pipeline ID + Preprocessor + Postprocessor) + self._use_dag: bool = (initial_parameters or {}).get("use_dag", True) + + # Latest input frame for DAG preview + self._preview_input_frame: torch.Tensor | None = None + self._preview_input_lock = threading.Lock() # Frame counting for debug logging self._frames_in = 0 @@ -113,10 +156,38 @@ def __init__( self._playback_ready_emitted = False self._stream_start_time: float | None = None - # Store pipeline_ids from initial_parameters if provided - pipeline_ids = (initial_parameters or {}).get("pipeline_ids") - if pipeline_ids is not None: - self.pipeline_ids = pipeline_ids + # Override pipeline_ids only when use_dag: API DAG > input.json > UI params + if 
self._use_dag: + from .dag_state import get_api_dag + + api_dag = get_api_dag() + if api_dag is not None: + dag_pipeline_ids = [ + n.pipeline_id + for n in api_dag.nodes + if n.type == "pipeline" and n.pipeline_id is not None + ] + self.pipeline_ids = dag_pipeline_ids + logger.info( + f"[FRAME-PROCESSOR] Using pipeline_ids from API DAG: {dag_pipeline_ids}" + ) + elif (dag_pipeline_ids := get_pipeline_ids_from_input_json()) is not None: + self.pipeline_ids = dag_pipeline_ids + logger.info( + f"[FRAME-PROCESSOR] Using pipeline_ids from input.json: {dag_pipeline_ids}" + ) + else: + pipeline_ids = (initial_parameters or {}).get("pipeline_ids") + if pipeline_ids is not None: + self.pipeline_ids = pipeline_ids + else: + # Graph mode OFF: use standard Pipeline ID + Preprocessor + Postprocessor + pipeline_ids = (initial_parameters or {}).get("pipeline_ids") + if pipeline_ids is not None: + self.pipeline_ids = pipeline_ids + logger.info( + "[FRAME-PROCESSOR] Using pipeline_ids from settings (graph mode OFF)" + ) def start(self): if self.running: @@ -235,8 +306,9 @@ def stop(self, error_message: str = None): for processor in self.pipeline_processors: processor.stop() - # Clear pipeline processors + # Clear pipeline processors and DAG run self.pipeline_processors.clear() + self._dag_run = None # Clean up output sink self.output_sink_enabled = False @@ -365,18 +437,13 @@ def put(self, frame: VideoFrame) -> bool: return False return False - # Local mode: put into first processor's input queue + # Local mode: put into source queue(s) (DAG) or first processor's input queue (chain) if self.pipeline_processors: - first_processor = self.pipeline_processors[0] - frame_array = frame.to_ndarray(format="rgb24") if torch.cuda.is_available(): shape = frame_array.shape pinned_buffer = self._get_or_create_pinned_buffer(shape) - # Note: We reuse pinned buffers for performance. This assumes the copy_() - # operation completes before the next frame arrives. 
- # In practice, copy_() is very fast (~microseconds) and frames arrive at 60 FPS max pinned_buffer.copy_(torch.as_tensor(frame_array, dtype=torch.uint8)) frame_tensor = pinned_buffer.cuda(non_blocking=True) else: @@ -384,13 +451,27 @@ def put(self, frame: VideoFrame) -> bool: frame_tensor = frame_tensor.unsqueeze(0) - # Put frame into first processor's input queue - try: - first_processor.input_queue.put_nowait(frame_tensor) - except queue.Full: - # Queue full, drop frame (non-blocking) - logger.debug("First processor input queue full, dropping frame") - return False + # Store input frame for DAG preview + with self._preview_input_lock: + self._preview_input_frame = frame_tensor + + if self._dag_run and self._dag_run.source_queues: + # DAG: fan-out to all source queues (e.g. shared input -> yolo + longlive) + for i, q in enumerate(self._dag_run.source_queues): + try: + # Clone for 2nd+ queues so each consumer has its own tensor + tensor = frame_tensor if i == 0 else frame_tensor.clone() + q.put_nowait(tensor) + except queue.Full: + logger.debug("Source queue full, dropping frame") + return False + else: + first_processor = self.pipeline_processors[0] + try: + first_processor.input_queue.put_nowait(frame_tensor) + except queue.Full: + logger.debug("First processor input queue full, dropping frame") + return False return True @@ -409,16 +490,26 @@ def get(self) -> torch.Tensor | None: except queue.Empty: return None else: - # Local mode: get from pipeline processor + # Local mode: get from sink processor (DAG) or last in chain if not self.pipeline_processors: return None - last_processor = self.pipeline_processors[-1] - if not last_processor.output_queue: + out_proc = ( + self._dag_run.sink_processor + if self._dag_run and self._dag_run.sink_processor + else self.pipeline_processors[-1] + ) + if not out_proc: + return None + if not out_proc.output_queue: + logger.debug( + "DAG sink processor %s has no output_queue (playback will wait)", + getattr(out_proc, 
"pipeline_id", "?"), + ) return None try: - frame = last_processor.output_queue.get_nowait() + frame = out_proc.output_queue.get_nowait() # Frame is stored as [1, H, W, C], convert to [H, W, C] for output # Move to CPU here for WebRTC streaming (frames stay on GPU between pipeline processors) frame = frame.squeeze(0) @@ -496,9 +587,12 @@ def get_fps(self) -> float: if not self.pipeline_processors: return DEFAULT_FPS - # Get FPS from the last processor in the chain - last_processor = self.pipeline_processors[-1] - return last_processor.get_fps() + out_proc = ( + self._dag_run.sink_processor + if self._dag_run and self._dag_run.sink_processor + else self.pipeline_processors[-1] + ) + return out_proc.get_fps() def _log_frame_stats(self): """Log frame processing statistics and emit heartbeat event.""" @@ -579,6 +673,67 @@ def get_frame_stats(self) -> dict: return stats + def get_preview_frames(self) -> dict[str, bytes]: + """Collect preview frames from all processors and encode as JPEG thumbnails. + + Returns a dict mapping node_id -> JPEG bytes for each node that has a frame. 
+ """ + import io + + from PIL import Image + + previews: dict[str, bytes] = {} + max_width = 192 + + def encode_frame(tensor: torch.Tensor) -> bytes: + """Convert a [1, H, W, C] uint8 tensor to JPEG bytes.""" + frame = tensor.squeeze(0) + if frame.is_cuda: + frame = frame.cpu() + frame_np = frame.numpy() + h, w = frame_np.shape[:2] + if w > max_width: + scale = max_width / w + new_w = max_width + new_h = int(h * scale) + img = Image.fromarray(frame_np).resize((new_w, new_h), Image.LANCZOS) + else: + img = Image.fromarray(frame_np) + buf = io.BytesIO() + img.save(buf, format="JPEG", quality=60) + return buf.getvalue() + + # Input frame (source nodes) + with self._preview_input_lock: + input_frame = self._preview_input_frame + if input_frame is not None: + try: + previews["input"] = encode_frame(input_frame) + except Exception: + pass + + # Each pipeline processor's output frame + for processor in self.pipeline_processors: + frame = processor.get_preview_frame() + if frame is not None: + try: + previews[processor.pipeline_id] = encode_frame(frame) + except Exception: + pass + + # Sink/output nodes reuse the sink processor's preview + if self._dag_run and self._dag_run.sink_processor: + sink_frame = self._dag_run.sink_processor.get_preview_frame() + if sink_frame is not None: + try: + sink_jpeg = encode_frame(sink_frame) + for output_id in self._dag_run.output_node_ids: + previews[output_id] = sink_jpeg + except Exception: + pass + + return previews + def _get_pipeline_dimensions(self) -> tuple[int, int]: """Get current pipeline dimensions from pipeline manager.""" try: @@ -905,43 +1060,67 @@ def _input_source_receiver_loop(self): ) def _setup_pipeline_chain_sync(self): - """Create pipeline processor chain (synchronous). + """Create pipeline DAG or linear chain (synchronous). + When use_dag is False, builds a linear chain from pipeline_ids (Pipeline ID + + Preprocessor + Postprocessor). Otherwise: API DAG > input.json > parameters > + linear fallback. 
Assumes all pipelines are already loaded by the pipeline manager. """ - if not self.pipeline_ids: - logger.error("No pipeline IDs provided") - return - - # Create pipeline processors (each creates its own queues) - for pipeline_id in self.pipeline_ids: - # Get pipeline instance from manager - pipeline = self.pipeline_manager.get_pipeline_by_id(pipeline_id) - - # Create processor with its own queues - processor = PipelineProcessor( - pipeline=pipeline, - pipeline_id=pipeline_id, - initial_parameters=self.parameters.copy(), - session_id=self.session_id, - user_id=self.user_id, - connection_id=self.connection_id, - connection_info=self.connection_info, + dag_config: DagConfig + + if not self._use_dag: + # Graph mode OFF: linear chain from pipeline_ids only + if not self.pipeline_ids: + logger.error("No pipeline IDs provided (graph mode OFF)") + return + dag_config = linear_dag_from_pipeline_ids(self.pipeline_ids) + logger.info( + "[FRAME-PROCESSOR] Using linear chain from settings (graph mode OFF)" ) + else: + # Priority: API DAG > input.json > parameters > linear fallback + from .dag_state import get_api_dag + + api_dag = get_api_dag() + if api_dag is not None: + dag_config = api_dag + logger.info("[FRAME-PROCESSOR] Using DAG from API") + elif (file_dag := _load_dag_from_input_json()) is not None: + dag_config = file_dag + logger.info("[FRAME-PROCESSOR] Using DAG from input.json") + elif not self.pipeline_ids and not self.parameters.get("dag"): + logger.error("No pipeline IDs or DAG config provided") + return + else: + dag_raw = self.parameters.get("dag") + if isinstance(dag_raw, dict): + dag_config = DagConfig.model_validate(dag_raw) + elif isinstance(dag_raw, str): + dag_config = DagConfig.model_validate_json(dag_raw) + else: + dag_config = linear_dag_from_pipeline_ids(self.pipeline_ids) - self.pipeline_processors.append(processor) + self._dag_run = build_dag( + dag=dag_config, + pipeline_manager=self.pipeline_manager, + initial_parameters=self.parameters.copy(), + 
session_id=self.session_id, + user_id=self.user_id, + connection_id=self.connection_id, + connection_info=self.connection_info, + ) - for i in range(1, len(self.pipeline_processors)): - prev_processor = self.pipeline_processors[i - 1] - curr_processor = self.pipeline_processors[i] - prev_processor.set_next_processor(curr_processor) + self.pipeline_processors = self._dag_run.processors + if self._dag_run.pipeline_ids: + self.pipeline_ids = self._dag_run.pipeline_ids - # Start all processors for processor in self.pipeline_processors: processor.start() logger.info( - f"Created pipeline chain with {len(self.pipeline_processors)} processors" + f"Created pipeline DAG with {len(self.pipeline_processors)} processor(s): " + f"{self.pipeline_ids}" ) def __enter__(self): diff --git a/src/scope/server/pipeline_processor.py b/src/scope/server/pipeline_processor.py index 14559f95f..08f949ddc 100644 --- a/src/scope/server/pipeline_processor.py +++ b/src/scope/server/pipeline_processor.py @@ -61,9 +61,15 @@ def __init__( self.connection_id = connection_id self.connection_info = connection_info - # Each processor creates its own queues - self.input_queue = queue.Queue(maxsize=30) - self.output_queue = queue.Queue(maxsize=8) + # Unified port-based queues: all frame streams (video, vace_input_frames, etc.) use queues + self.input_queues: dict[str, queue.Queue] = { + "video": queue.Queue(maxsize=30), + } + self.output_queues: dict[str, list[queue.Queue]] = { + "video": [queue.Queue(maxsize=8)], + } + # Primary queue refs for backward compat (chain mode, get_fps, resize) + self.input_queue = self.input_queues["video"] # Lock to protect input_queue assignment for thread-safe reference swapping self.input_queue_lock = threading.Lock() @@ -109,35 +115,32 @@ def __init__( # the next pipeline in the chain can consume them self.throttler = PipelineThrottler() - def _resize_output_queue(self, target_size: int): - """Resize the output queue to the target size, transferring existing frames. 
+ # Latest output frame for DAG preview (read-only from preview side) + self._preview_frame: torch.Tensor | None = None + self._preview_frame_lock = threading.Lock() - Args: - target_size: The desired maximum size for the output queue - """ - if self.output_queue is None: + def _resize_output_queue(self, target_size: int): + """Resize the primary video output queue, transferring existing frames.""" + video_queues = self.output_queues.get("video") + if not video_queues: return - - if self.output_queue.maxsize < target_size: + primary = video_queues[0] + if primary.maxsize < target_size: logger.info( - f"Increasing output queue size to {target_size}, current size {self.output_queue.maxsize}" + f"Increasing output queue size to {target_size}, current size {primary.maxsize}" ) - - # Transfer frames from old queue to new queue - old_queue = self.output_queue - self.output_queue = queue.Queue(maxsize=target_size) - while not old_queue.empty(): + new_queue = queue.Queue(maxsize=target_size) + while not primary.empty(): try: - frame = old_queue.get_nowait() - self.output_queue.put_nowait(frame) + frame = primary.get_nowait() + new_queue.put_nowait(frame) except queue.Empty: break - - # Update next processor's input_queue to point to the new output_queue - # Use lock to ensure thread-safe reference swapping + self.output_queues["video"] = [new_queue] + video_queues[1:] if self.next_processor is not None: with self.next_processor.input_queue_lock: - self.next_processor.input_queue = self.output_queue + self.next_processor.input_queues["video"] = new_queue + self.next_processor.input_queue = new_queue def set_next_processor(self, next_processor: "PipelineProcessor"): """Set the next processor in the chain and update output queue size accordingly. 
@@ -161,7 +164,14 @@ def set_next_processor(self, next_processor: "PipelineProcessor"): # Update next processor's input_queue to point to this output_queue # Use lock to ensure thread-safe reference swapping with next_processor.input_queue_lock: - next_processor.input_queue = self.output_queue + next_processor.input_queues["video"] = self.output_queues["video"][0] + next_processor.input_queue = next_processor.input_queues["video"] + + @property + def output_queue(self) -> queue.Queue | None: + """Primary video output queue (for chain mode and sink get()).""" + queues = self.output_queues.get("video") + return queues[0] if queues else None def start(self): """Start the pipeline processor thread.""" @@ -199,12 +209,13 @@ def stop(self): except queue.Empty: break - if self.output_queue: - while not self.output_queue.empty(): - try: - self.output_queue.get_nowait() - except queue.Empty: - break + for queues in self.output_queues.values(): + for q in queues: + while not q.empty(): + try: + q.get_nowait() + except queue.Empty: + break logger.info(f"PipelineProcessor stopped for pipeline: {self.pipeline_id}") @@ -267,41 +278,11 @@ def prepare_chunk( self, input_queue_ref: queue.Queue, chunk_size: int ) -> list[torch.Tensor]: """ - Sample frames uniformly from the queue, convert them to tensors, and remove processed frames. - - This function implements uniform sampling across the entire queue to ensure - temporal coverage of input frames. It samples frames at evenly distributed - indices and removes all frames up to the last sampled frame to prevent - queue buildup. - - Note: - This function must be called with a queue reference obtained while holding - input_queue_lock. The caller is responsible for thread safety. 
- - Example: - With queue_size=8 and chunk_size=4: - - step = 8/4 = 2.0 - - indices = [0, 2, 4, 6] (uniformly distributed) - - Returns frames at positions 0, 2, 4, 6 - - Removes frames 0-6 from queue (7 frames total) - - Keeps frame 7 in queue - - Args: - input_queue_ref: Reference to the input queue (obtained while holding lock) - chunk_size: Number of frames to sample - - Returns: - List of tensor frames, each (1, H, W, C) for downstream preprocess_chunk + Sample frames uniformly from one queue (used when only video port is present). """ - - # Calculate uniform sampling step step = input_queue_ref.qsize() / chunk_size - # Generate indices for uniform sampling indices = [round(i * step) for i in range(chunk_size)] - # Extract VideoFrames at sampled indices video_frames = [] - - # Drop all frames up to and including the last sampled frame last_idx = indices[-1] for i in range(last_idx + 1): frame = input_queue_ref.get_nowait() @@ -309,6 +290,36 @@ def prepare_chunk( video_frames.append(frame) return video_frames + def prepare_multi_chunk( + self, + input_queues_ref: dict[str, queue.Queue], + primary_port: str, + chunk_size: int, + ) -> dict[str, list[torch.Tensor]]: + """ + Sample frames uniformly from the primary queue, then the same indices from other ports. + + Returns dict mapping port name to list of tensors (each 1,H,W,C). All ports + must have at least as many frames as we consume from the primary. 
+ """ + primary = input_queues_ref.get(primary_port) + if primary is None or primary.qsize() < chunk_size: + return {} + step = primary.qsize() / chunk_size + indices = [round(i * step) for i in range(chunk_size)] + last_idx = indices[-1] + out: dict[str, list[torch.Tensor]] = {port: [] for port in input_queues_ref} + for port, q in input_queues_ref.items(): + if q.qsize() <= last_idx: + return {} + for port in input_queues_ref: + q = input_queues_ref[port] + for i in range(last_idx + 1): + frame = q.get_nowait() + if i in indices: + out[port].append(frame) + return out + def process_chunk(self): """Process a single chunk of frames.""" # Check if there are new parameters @@ -361,16 +372,16 @@ def process_chunk(self): reset_cache = self.parameters.pop("reset_cache", None) lora_scales = self.parameters.pop("lora_scales", None) - # Handle reset_cache: clear this processor's cache + # Handle reset_cache: clear this processor's output queues if reset_cache: logger.info(f"Clearing cache for pipeline processor: {self.pipeline_id}") - # Clear output queue - if self.output_queue: - while not self.output_queue.empty(): - try: - self.output_queue.get_nowait() - except queue.Empty: - break + for queues in self.output_queues.values(): + for q in queues: + while not q.empty(): + try: + q.get_nowait() + except queue.Empty: + break requirements = None if hasattr(self.pipeline, "prepare"): @@ -380,25 +391,26 @@ def process_chunk(self): prepare_params["video"] = True requirements = self.pipeline.prepare(**prepare_params) - video_input = None + chunks: dict[str, list[torch.Tensor]] = {} input_frame_count = 0 if requirements is not None: current_chunk_size = requirements.input_size - - # Capture a local reference to input_queue while holding the lock - # This ensures thread-safe access even if input_queue is reassigned with self.input_queue_lock: - input_queue_ref = self.input_queue - - # Check if queue has enough frames before consuming them - if input_queue_ref.qsize() < 
current_chunk_size: - # Not enough frames in queue, sleep briefly and try again next iteration + input_queues_ref = dict(self.input_queues) + primary = input_queues_ref.get("video") + if primary is None or primary.qsize() < current_chunk_size: self.shutdown_event.wait(SLEEP_TIME) return - - # Use prepare_chunk to uniformly sample frames from the queue - video_input = self.prepare_chunk(input_queue_ref, current_chunk_size) - input_frame_count = len(video_input) if video_input else 0 + if len(input_queues_ref) == 1: + chunks["video"] = self.prepare_chunk(primary, current_chunk_size) + else: + chunks = self.prepare_multi_chunk( + input_queues_ref, "video", current_chunk_size + ) + if not chunks: + self.shutdown_event.wait(SLEEP_TIME) + return + input_frame_count = len(chunks.get("video") or []) try: # Pass parameters (excluding prepare-only parameters) @@ -420,33 +432,35 @@ def process_chunk(self): # Reset mouse accumulator, keep key state self.parameters["ctrl_input"]["mouse"] = [0.0, 0.0] - # Route video input based on VACE status - # Don't overwrite if preprocessor already provided vace_input_frames - if video_input is not None and "vace_input_frames" not in call_params: - if ( - self._pipeline_supports_vace - and self.vace_enabled - and self.vace_use_input_video - ): - call_params["vace_input_frames"] = video_input - else: - call_params["video"] = video_input + # Fill call_params from stream chunks + if chunks: + if "vace_input_frames" in chunks and "vace_input_masks" in chunks: + call_params["vace_input_frames"] = chunks["vace_input_frames"] + call_params["vace_input_masks"] = chunks["vace_input_masks"] + if "video" in chunks: + if ( + self._pipeline_supports_vace + and self.vace_enabled + and self.vace_use_input_video + and "vace_input_frames" not in call_params + ): + call_params["vace_input_frames"] = chunks["video"] + else: + call_params["video"] = chunks["video"] + # Pass any other stream ports (e.g. 
video2 for combine_streams) + for port, frame_list in chunks.items(): + if port in ("video", "vace_input_frames", "vace_input_masks"): + continue + call_params[port] = frame_list processing_start = time.time() output_dict = self.pipeline(**call_params) processing_time = time.time() - processing_start - # Extract video from the returned dictionary output = output_dict.get("video") if output is None: return - # Forward extra params to downstream pipeline (dual-output pattern) - # Preprocessors return {"video": frames, "vace_input_frames": ..., "vace_input_masks": ...} - extra_params = {k: v for k, v in output_dict.items() if k != "video"} - if extra_params and self.next_processor is not None: - self.next_processor.update_parameters(extra_params) - # Clear one-shot parameters after use to prevent sending them on subsequent chunks # These parameters should only be sent when explicitly provided in parameter updates one_shot_params = [ @@ -489,29 +503,62 @@ def process_chunk(self): .detach() ) - # Resize output queue to meet target max size - target_output_queue_max_size = num_frames * OUTPUT_QUEUE_MAX_SIZE_FACTOR - self._resize_output_queue(target_output_queue_max_size) - - # Put frames in output queue - # For intermediate pipelines, output goes to next pipeline's input - # For last pipeline, output goes to frame_processor's output_queue - # Output frames are [H, W, C], convert to [1, H, W, C] for consistency - for frame in output: - frame = frame.unsqueeze(0) - # Track when a frame is ready (production rate) - self._track_output_frame() - try: - self.output_queue.put_nowait(frame) - except queue.Full: - logger.info( - f"Output queue full for {self.pipeline_id}, dropping processed frame" - ) + # Resize primary video output queue to meet target max size. + # Skip for the sink (no next_processor): keep the DAG-wired queue so + # frame_processor.get() always reads from the same queue we write to. 
+ if self.next_processor is not None: + target_output_queue_max_size = num_frames * OUTPUT_QUEUE_MAX_SIZE_FACTOR + self._resize_output_queue(target_output_queue_max_size) + + # Put each output port's frames to its queues (all frame ports are streamed) + for port, value in output_dict.items(): + if value is None or not isinstance(value, torch.Tensor): + continue + queues = self.output_queues.get(port) + if not queues: continue + if value.dtype != torch.uint8: + value = ( + (value * 255.0) + .clamp(0, 255) + .to(dtype=torch.uint8) + .contiguous() + .detach() + ) + frames = [value[i].unsqueeze(0) for i in range(value.shape[0])] + for frame in frames: + if port == "video": + self._track_output_frame() + for q in queues: + try: + q.put_nowait(frame if q is queues[0] else frame.clone()) + except queue.Full: + if port == "video": + logger.info( + f"Output queue full for {self.pipeline_id}, dropping frame" + ) + break + + # Store the last video output frame for DAG preview + video_output = output_dict.get("video") + if ( + video_output is not None + and isinstance(video_output, torch.Tensor) + and video_output.shape[0] > 0 + ): + last_frame = video_output[-1].unsqueeze(0) + if last_frame.dtype != torch.uint8: + last_frame = ( + (last_frame * 255.0) + .clamp(0, 255) + .to(dtype=torch.uint8) + .contiguous() + .detach() + ) + with self._preview_frame_lock: + self._preview_frame = last_frame - # Apply throttling if this pipeline is producing faster than next can consume - # Only throttle if: (1) has video input, (2) has next processor - if video_input is not None and self.next_processor is not None: + if chunks and self.next_processor is not None: self.throttler.throttle() except Exception as e: @@ -553,6 +600,11 @@ def _calculate_output_fps(self): estimated_fps = max(MIN_FPS, min(MAX_FPS, estimated_fps)) self.current_output_fps = estimated_fps + def get_preview_frame(self) -> torch.Tensor | None: + """Get the latest output frame for preview (thread-safe).""" + with 
self._preview_frame_lock: + return self._preview_frame + def get_fps(self) -> float: """Get the current dynamically calculated pipeline FPS. diff --git a/src/scope/server/schema.py b/src/scope/server/schema.py index 843b0684b..30b29c6c0 100644 --- a/src/scope/server/schema.py +++ b/src/scope/server/schema.py @@ -450,6 +450,11 @@ class PipelineLoadRequest(BaseModel): """Pipeline load request schema.""" pipeline_ids: list[str] = Field(..., description="List of pipeline IDs to load") + use_dag: bool = Field( + default=True, + description="When False, use request pipeline_ids only (Pipeline ID + Preprocessor + Postprocessor). " + "When True, server may override with API DAG or input.json.", + ) load_params: dict[str, Any] | None = Field( default=None, description="Pipeline-specific load parameters (applies to all pipelines). " diff --git a/tests/test_dag.py b/tests/test_dag.py new file mode 100644 index 000000000..5a60f378b --- /dev/null +++ b/tests/test_dag.py @@ -0,0 +1,113 @@ +"""Tests for DAG pipeline execution (dag_schema, dag_executor, linear DAG).""" + +import queue +from unittest.mock import MagicMock + +import pytest + +from scope.server.dag_executor import build_dag, linear_dag_from_pipeline_ids +from scope.server.dag_schema import DagConfig + + +class TestLinearDagFromPipelineIds: + """Tests for linear_dag_from_pipeline_ids.""" + + def test_single_pipeline(self): + dag = linear_dag_from_pipeline_ids(["longlive"]) + assert [n.id for n in dag.nodes] == ["input", "longlive", "output"] + edges = [(e.from_node, e.to_node, e.from_port, e.kind) for e in dag.edges] + assert ("input", "longlive", "video", "stream") in edges + assert ("longlive", "output", "video", "stream") in edges + assert all(e.kind == "stream" for e in dag.edges) + + def test_two_pipelines_includes_vace_stream_edges(self): + dag = linear_dag_from_pipeline_ids(["passthrough", "longlive"]) + assert len(dag.get_pipeline_node_ids()) == 2 + vace_edges = [ + e + for e in dag.edges + if e.from_port in 
("vace_input_frames", "vace_input_masks") + ] + assert len(vace_edges) == 2 + assert all(e.kind == "stream" for e in vace_edges) + + def test_explicit_dag_config_roundtrip(self): + raw = { + "nodes": [ + {"id": "input", "type": "source"}, + {"id": "p1", "type": "pipeline", "pipeline_id": "passthrough"}, + {"id": "output", "type": "sink"}, + ], + "edges": [ + { + "from": "input", + "from_port": "video", + "to_node": "p1", + "to_port": "video", + "kind": "stream", + }, + { + "from": "p1", + "from_port": "video", + "to_node": "output", + "to_port": "video", + "kind": "stream", + }, + ], + } + config = DagConfig.model_validate(raw) + assert config.node_by_id("p1").pipeline_id == "passthrough" + assert len(config.edges) == 2 + + +class TestBuildDag: + """Tests for build_dag with a mock pipeline manager.""" + + @pytest.fixture + def mock_pipeline(self): + """Minimal pipeline mock: prepare() and __call__ return video.""" + p = MagicMock() + p.prepare.return_value = MagicMock(input_size=4) + return p + + @pytest.fixture + def mock_pipeline_manager(self, mock_pipeline): + """Pipeline manager that returns the mock pipeline for any known id.""" + mgr = MagicMock() + mgr.get_pipeline_by_id.side_effect = lambda pid: mock_pipeline + return mgr + + def test_build_linear_dag_returns_dag_run(self, mock_pipeline_manager): + dag = linear_dag_from_pipeline_ids(["passthrough"]) + run = build_dag( + dag=dag, + pipeline_manager=mock_pipeline_manager, + initial_parameters={"pipeline_ids": ["passthrough"], "input_mode": "video"}, + ) + assert run.sink_processor is not None + assert run.pipeline_ids == ["passthrough"] + assert len(run.processors) == 1 + assert len(run.source_queues) == 1 + + def test_build_dag_wires_source_queues(self, mock_pipeline_manager): + dag = linear_dag_from_pipeline_ids(["passthrough"]) + run = build_dag( + dag=dag, + pipeline_manager=mock_pipeline_manager, + initial_parameters={}, + ) + # One source node (input) -> one queue to first pipeline + assert 
len(run.source_queues) == 1 + q = run.source_queues[0] + assert isinstance(q, queue.Queue) + + def test_build_dag_sink_has_video_output_queue(self, mock_pipeline_manager): + dag = linear_dag_from_pipeline_ids(["passthrough"]) + run = build_dag( + dag=dag, + pipeline_manager=mock_pipeline_manager, + initial_parameters={}, + ) + assert run.sink_processor is not None + assert run.sink_processor.output_queue is not None + assert "video" in run.sink_processor.output_queues diff --git a/uv.lock b/uv.lock index 48eaf59e3..01dec3396 100644 --- a/uv.lock +++ b/uv.lock @@ -557,6 +557,7 @@ dependencies = [ { name = "triton-windows", marker = "sys_platform == 'win32'" }, { name = "twilio" }, { name = "uvicorn" }, + { name = "websockets" }, ] [package.optional-dependencies] @@ -611,6 +612,7 @@ requires-dist = [ { name = "triton-windows", marker = "sys_platform == 'win32'", specifier = "==3.5.1.post24" }, { name = "twilio", specifier = ">=9.8.0" }, { name = "uvicorn", specifier = ">=0.35.0" }, + { name = "websockets", specifier = ">=15.0" }, ] provides-extras = ["kafka"] @@ -2785,6 +2787,51 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/af/b5/123f13c975e9f27ab9c0770f514345bd406d0e8d3b7a0723af9d43f710af/wcwidth-0.2.14-py2.py3-none-any.whl", hash = "sha256:a7bb560c8aee30f9957e5f9895805edd20602f2d7f720186dfd906e82b4982e1", size = 37286, upload-time = "2025-09-22T16:29:51.641Z" }, ] +[[package]] +name = "websockets" +version = "16.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/04/24/4b2031d72e840ce4c1ccb255f693b15c334757fc50023e4db9537080b8c4/websockets-16.0.tar.gz", hash = "sha256:5f6261a5e56e8d5c42a4497b364ea24d94d9563e8fbd44e78ac40879c60179b5", size = 179346, upload-time = "2026-01-10T09:23:47.181Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/84/7b/bac442e6b96c9d25092695578dda82403c77936104b5682307bd4deb1ad4/websockets-16.0-cp312-cp312-macosx_10_13_universal2.whl", hash = 
"sha256:71c989cbf3254fbd5e84d3bff31e4da39c43f884e64f2551d14bb3c186230f00", size = 177365, upload-time = "2026-01-10T09:22:46.787Z" }, + { url = "https://files.pythonhosted.org/packages/b0/fe/136ccece61bd690d9c1f715baaeefd953bb2360134de73519d5df19d29ca/websockets-16.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:8b6e209ffee39ff1b6d0fa7bfef6de950c60dfb91b8fcead17da4ee539121a79", size = 175038, upload-time = "2026-01-10T09:22:47.999Z" }, + { url = "https://files.pythonhosted.org/packages/40/1e/9771421ac2286eaab95b8575b0cb701ae3663abf8b5e1f64f1fd90d0a673/websockets-16.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:86890e837d61574c92a97496d590968b23c2ef0aeb8a9bc9421d174cd378ae39", size = 175328, upload-time = "2026-01-10T09:22:49.809Z" }, + { url = "https://files.pythonhosted.org/packages/18/29/71729b4671f21e1eaa5d6573031ab810ad2936c8175f03f97f3ff164c802/websockets-16.0-cp312-cp312-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:9b5aca38b67492ef518a8ab76851862488a478602229112c4b0d58d63a7a4d5c", size = 184915, upload-time = "2026-01-10T09:22:51.071Z" }, + { url = "https://files.pythonhosted.org/packages/97/bb/21c36b7dbbafc85d2d480cd65df02a1dc93bf76d97147605a8e27ff9409d/websockets-16.0-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:e0334872c0a37b606418ac52f6ab9cfd17317ac26365f7f65e203e2d0d0d359f", size = 186152, upload-time = "2026-01-10T09:22:52.224Z" }, + { url = "https://files.pythonhosted.org/packages/4a/34/9bf8df0c0cf88fa7bfe36678dc7b02970c9a7d5e065a3099292db87b1be2/websockets-16.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:a0b31e0b424cc6b5a04b8838bbaec1688834b2383256688cf47eb97412531da1", size = 185583, upload-time = "2026-01-10T09:22:53.443Z" }, + { url = "https://files.pythonhosted.org/packages/47/88/4dd516068e1a3d6ab3c7c183288404cd424a9a02d585efbac226cb61ff2d/websockets-16.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = 
"sha256:485c49116d0af10ac698623c513c1cc01c9446c058a4e61e3bf6c19dff7335a2", size = 184880, upload-time = "2026-01-10T09:22:55.033Z" }, + { url = "https://files.pythonhosted.org/packages/91/d6/7d4553ad4bf1c0421e1ebd4b18de5d9098383b5caa1d937b63df8d04b565/websockets-16.0-cp312-cp312-win32.whl", hash = "sha256:eaded469f5e5b7294e2bdca0ab06becb6756ea86894a47806456089298813c89", size = 178261, upload-time = "2026-01-10T09:22:56.251Z" }, + { url = "https://files.pythonhosted.org/packages/c3/f0/f3a17365441ed1c27f850a80b2bc680a0fa9505d733fe152fdf5e98c1c0b/websockets-16.0-cp312-cp312-win_amd64.whl", hash = "sha256:5569417dc80977fc8c2d43a86f78e0a5a22fee17565d78621b6bb264a115d4ea", size = 178693, upload-time = "2026-01-10T09:22:57.478Z" }, + { url = "https://files.pythonhosted.org/packages/cc/9c/baa8456050d1c1b08dd0ec7346026668cbc6f145ab4e314d707bb845bf0d/websockets-16.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:878b336ac47938b474c8f982ac2f7266a540adc3fa4ad74ae96fea9823a02cc9", size = 177364, upload-time = "2026-01-10T09:22:59.333Z" }, + { url = "https://files.pythonhosted.org/packages/7e/0c/8811fc53e9bcff68fe7de2bcbe75116a8d959ac699a3200f4847a8925210/websockets-16.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:52a0fec0e6c8d9a784c2c78276a48a2bdf099e4ccc2a4cad53b27718dbfd0230", size = 175039, upload-time = "2026-01-10T09:23:01.171Z" }, + { url = "https://files.pythonhosted.org/packages/aa/82/39a5f910cb99ec0b59e482971238c845af9220d3ab9fa76dd9162cda9d62/websockets-16.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:e6578ed5b6981005df1860a56e3617f14a6c307e6a71b4fff8c48fdc50f3ed2c", size = 175323, upload-time = "2026-01-10T09:23:02.341Z" }, + { url = "https://files.pythonhosted.org/packages/bd/28/0a25ee5342eb5d5f297d992a77e56892ecb65e7854c7898fb7d35e9b33bd/websockets-16.0-cp313-cp313-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:95724e638f0f9c350bb1c2b0a7ad0e83d9cc0c9259f3ea94e40d7b02a2179ae5", size = 184975, upload-time 
= "2026-01-10T09:23:03.756Z" }, + { url = "https://files.pythonhosted.org/packages/f9/66/27ea52741752f5107c2e41fda05e8395a682a1e11c4e592a809a90c6a506/websockets-16.0-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:c0204dc62a89dc9d50d682412c10b3542d748260d743500a85c13cd1ee4bde82", size = 186203, upload-time = "2026-01-10T09:23:05.01Z" }, + { url = "https://files.pythonhosted.org/packages/37/e5/8e32857371406a757816a2b471939d51c463509be73fa538216ea52b792a/websockets-16.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:52ac480f44d32970d66763115edea932f1c5b1312de36df06d6b219f6741eed8", size = 185653, upload-time = "2026-01-10T09:23:06.301Z" }, + { url = "https://files.pythonhosted.org/packages/9b/67/f926bac29882894669368dc73f4da900fcdf47955d0a0185d60103df5737/websockets-16.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:6e5a82b677f8f6f59e8dfc34ec06ca6b5b48bc4fcda346acd093694cc2c24d8f", size = 184920, upload-time = "2026-01-10T09:23:07.492Z" }, + { url = "https://files.pythonhosted.org/packages/3c/a1/3d6ccdcd125b0a42a311bcd15a7f705d688f73b2a22d8cf1c0875d35d34a/websockets-16.0-cp313-cp313-win32.whl", hash = "sha256:abf050a199613f64c886ea10f38b47770a65154dc37181bfaff70c160f45315a", size = 178255, upload-time = "2026-01-10T09:23:09.245Z" }, + { url = "https://files.pythonhosted.org/packages/6b/ae/90366304d7c2ce80f9b826096a9e9048b4bb760e44d3b873bb272cba696b/websockets-16.0-cp313-cp313-win_amd64.whl", hash = "sha256:3425ac5cf448801335d6fdc7ae1eb22072055417a96cc6b31b3861f455fbc156", size = 178689, upload-time = "2026-01-10T09:23:10.483Z" }, + { url = "https://files.pythonhosted.org/packages/f3/1d/e88022630271f5bd349ed82417136281931e558d628dd52c4d8621b4a0b2/websockets-16.0-cp314-cp314-macosx_10_15_universal2.whl", hash = "sha256:8cc451a50f2aee53042ac52d2d053d08bf89bcb31ae799cb4487587661c038a0", size = 177406, upload-time = "2026-01-10T09:23:12.178Z" }, + { url = 
"https://files.pythonhosted.org/packages/f2/78/e63be1bf0724eeb4616efb1ae1c9044f7c3953b7957799abb5915bffd38e/websockets-16.0-cp314-cp314-macosx_10_15_x86_64.whl", hash = "sha256:daa3b6ff70a9241cf6c7fc9e949d41232d9d7d26fd3522b1ad2b4d62487e9904", size = 175085, upload-time = "2026-01-10T09:23:13.511Z" }, + { url = "https://files.pythonhosted.org/packages/bb/f4/d3c9220d818ee955ae390cf319a7c7a467beceb24f05ee7aaaa2414345ba/websockets-16.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:fd3cb4adb94a2a6e2b7c0d8d05cb94e6f1c81a0cf9dc2694fb65c7e8d94c42e4", size = 175328, upload-time = "2026-01-10T09:23:14.727Z" }, + { url = "https://files.pythonhosted.org/packages/63/bc/d3e208028de777087e6fb2b122051a6ff7bbcca0d6df9d9c2bf1dd869ae9/websockets-16.0-cp314-cp314-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:781caf5e8eee67f663126490c2f96f40906594cb86b408a703630f95550a8c3e", size = 185044, upload-time = "2026-01-10T09:23:15.939Z" }, + { url = "https://files.pythonhosted.org/packages/ad/6e/9a0927ac24bd33a0a9af834d89e0abc7cfd8e13bed17a86407a66773cc0e/websockets-16.0-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:caab51a72c51973ca21fa8a18bd8165e1a0183f1ac7066a182ff27107b71e1a4", size = 186279, upload-time = "2026-01-10T09:23:17.148Z" }, + { url = "https://files.pythonhosted.org/packages/b9/ca/bf1c68440d7a868180e11be653c85959502efd3a709323230314fda6e0b3/websockets-16.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:19c4dc84098e523fd63711e563077d39e90ec6702aff4b5d9e344a60cb3c0cb1", size = 185711, upload-time = "2026-01-10T09:23:18.372Z" }, + { url = "https://files.pythonhosted.org/packages/c4/f8/fdc34643a989561f217bb477cbc47a3a07212cbda91c0e4389c43c296ebf/websockets-16.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:a5e18a238a2b2249c9a9235466b90e96ae4795672598a58772dd806edc7ac6d3", size = 184982, upload-time = "2026-01-10T09:23:19.652Z" }, + { url = 
"https://files.pythonhosted.org/packages/dd/d1/574fa27e233764dbac9c52730d63fcf2823b16f0856b3329fc6268d6ae4f/websockets-16.0-cp314-cp314-win32.whl", hash = "sha256:a069d734c4a043182729edd3e9f247c3b2a4035415a9172fd0f1b71658a320a8", size = 177915, upload-time = "2026-01-10T09:23:21.458Z" }, + { url = "https://files.pythonhosted.org/packages/8a/f1/ae6b937bf3126b5134ce1f482365fde31a357c784ac51852978768b5eff4/websockets-16.0-cp314-cp314-win_amd64.whl", hash = "sha256:c0ee0e63f23914732c6d7e0cce24915c48f3f1512ec1d079ed01fc629dab269d", size = 178381, upload-time = "2026-01-10T09:23:22.715Z" }, + { url = "https://files.pythonhosted.org/packages/06/9b/f791d1db48403e1f0a27577a6beb37afae94254a8c6f08be4a23e4930bc0/websockets-16.0-cp314-cp314t-macosx_10_15_universal2.whl", hash = "sha256:a35539cacc3febb22b8f4d4a99cc79b104226a756aa7400adc722e83b0d03244", size = 177737, upload-time = "2026-01-10T09:23:24.523Z" }, + { url = "https://files.pythonhosted.org/packages/bd/40/53ad02341fa33b3ce489023f635367a4ac98b73570102ad2cdd770dacc9a/websockets-16.0-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:b784ca5de850f4ce93ec85d3269d24d4c82f22b7212023c974c401d4980ebc5e", size = 175268, upload-time = "2026-01-10T09:23:25.781Z" }, + { url = "https://files.pythonhosted.org/packages/74/9b/6158d4e459b984f949dcbbb0c5d270154c7618e11c01029b9bbd1bb4c4f9/websockets-16.0-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:569d01a4e7fba956c5ae4fc988f0d4e187900f5497ce46339c996dbf24f17641", size = 175486, upload-time = "2026-01-10T09:23:27.033Z" }, + { url = "https://files.pythonhosted.org/packages/e5/2d/7583b30208b639c8090206f95073646c2c9ffd66f44df967981a64f849ad/websockets-16.0-cp314-cp314t-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:50f23cdd8343b984957e4077839841146f67a3d31ab0d00e6b824e74c5b2f6e8", size = 185331, upload-time = "2026-01-10T09:23:28.259Z" }, + { url = 
"https://files.pythonhosted.org/packages/45/b0/cce3784eb519b7b5ad680d14b9673a31ab8dcb7aad8b64d81709d2430aa8/websockets-16.0-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:152284a83a00c59b759697b7f9e9cddf4e3c7861dd0d964b472b70f78f89e80e", size = 186501, upload-time = "2026-01-10T09:23:29.449Z" }, + { url = "https://files.pythonhosted.org/packages/19/60/b8ebe4c7e89fb5f6cdf080623c9d92789a53636950f7abacfc33fe2b3135/websockets-16.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:bc59589ab64b0022385f429b94697348a6a234e8ce22544e3681b2e9331b5944", size = 186062, upload-time = "2026-01-10T09:23:31.368Z" }, + { url = "https://files.pythonhosted.org/packages/88/a8/a080593f89b0138b6cba1b28f8df5673b5506f72879322288b031337c0b8/websockets-16.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:32da954ffa2814258030e5a57bc73a3635463238e797c7375dc8091327434206", size = 185356, upload-time = "2026-01-10T09:23:32.627Z" }, + { url = "https://files.pythonhosted.org/packages/c2/b6/b9afed2afadddaf5ebb2afa801abf4b0868f42f8539bfe4b071b5266c9fe/websockets-16.0-cp314-cp314t-win32.whl", hash = "sha256:5a4b4cc550cb665dd8a47f868c8d04c8230f857363ad3c9caf7a0c3bf8c61ca6", size = 178085, upload-time = "2026-01-10T09:23:33.816Z" }, + { url = "https://files.pythonhosted.org/packages/9f/3e/28135a24e384493fa804216b79a6a6759a38cc4ff59118787b9fb693df93/websockets-16.0-cp314-cp314t-win_amd64.whl", hash = "sha256:b14dc141ed6d2dde437cddb216004bcac6a1df0935d79656387bd41632ba0bbd", size = 178531, upload-time = "2026-01-10T09:23:35.016Z" }, + { url = "https://files.pythonhosted.org/packages/6f/28/258ebab549c2bf3e64d2b0217b973467394a9cea8c42f70418ca2c5d0d2e/websockets-16.0-py3-none-any.whl", hash = "sha256:1637db62fad1dc833276dded54215f2c7fa46912301a24bd94d45d46a011ceec", size = 171598, upload-time = "2026-01-10T09:23:45.395Z" }, +] + [[package]] name = "yarl" version = "1.22.0"