diff --git a/docs/api/batch.md b/docs/api/batch.md
new file mode 100644
index 000000000..7b8ba3d10
--- /dev/null
+++ b/docs/api/batch.md
@@ -0,0 +1,102 @@
+# Batch Endpoint
+
+Batch video generation via HTTP. Unlike the WebRTC streaming path (real-time, interactive), the batch endpoint produces a complete video in one request, processing it chunk-by-chunk with SSE progress events.
+
+Primary consumer: ComfyUI custom nodes (`comfyui-scope`).
+
+## Endpoints
+
+| Endpoint | Method | Purpose |
+|---|---|---|
+| `/api/v1/batch` | POST | Generate video (SSE stream) |
+| `/api/v1/batch/cancel` | POST | Cancel after current chunk |
+| `/api/v1/batch/upload` | POST | Upload input video for v2v |
+| `/api/v1/batch/upload-data` | POST | Upload binary data blob (VACE, per-chunk video) |
+| `/api/v1/batch/download` | GET | Download output video |
+
+Only one generation can run at a time (409 if busy).
+
+## Flow
+
+```
+1. [optional] POST /batch/upload → input_path
+2. [optional] POST /batch/upload-data → data_blob_path
+3. POST /batch (JSON body, references paths from steps 1-2)
+   ← SSE: event: progress {chunk, total_chunks, frames, latency, fps}
+   ← SSE: event: complete {output_path, video_shape, num_frames, ...}
+4. GET /batch/download?path=<output_path>
+   ← binary video data
+```
+
+## Binary Protocol
+
+### Video Upload (`/batch/upload`)
+
+**Request**: Raw uint8 bytes in THWC order (frames × height × width × channels).
+
+**Headers** (required):
+- `X-Video-Frames`: T
+- `X-Video-Height`: H
+- `X-Video-Width`: W
+- `X-Video-Channels`: C (default 3)
+
+**Stored format**: a variable-length header (4 bytes for `ndim` plus 4 bytes per dimension — 20 bytes for the usual 4-D THWC shape) followed by the raw data.
+```
+[4 bytes: ndim (little-endian u32)]
+[4 bytes × ndim: shape dimensions (little-endian u32 each)]
+[raw uint8 video bytes]
+```
+
+### Data Blob Upload (`/batch/upload-data`)
+
+**Request**: Raw binary blob containing packed arrays. Max size: 2 GB.
+
+The blob is an opaque byte buffer. 
`ChunkSpec` entries in the batch request reference regions of this blob by offset: + +```json +{ + "chunk": 0, + "vace_frames_offset": 0, + "vace_frames_shape": [1, 3, 12, 320, 576], + "vace_masks_offset": 26542080, + "vace_masks_shape": [1, 1, 12, 320, 576] +} +``` + +Arrays are packed as contiguous float32 (VACE frames/masks) or uint8 (input video). The client is responsible for computing offsets when packing the blob. + +### Video Download (`/batch/download`) + +**Response**: Same binary format as upload (20-byte header + raw uint8 THWC data). + +**Response headers**: +- `X-Video-Frames`, `X-Video-Height`, `X-Video-Width`, `X-Video-Channels` + +## BatchRequest + +```json +{ + "pipeline_id": "longlive", + "prompt": "a cat walking", + "num_frames": 48, + "seed": 42, + "noise_scale": 0.7, + "input_path": "", + "data_blob_path": "", + "chunk_specs": [ + { + "chunk": 0, + "text": "override prompt for chunk 0", + "lora_scales": {"path/to/lora.safetensors": 0.5}, + "vace_frames_offset": 0, + "vace_frames_shape": [1, 3, 12, 320, 576] + } + ], + "pre_processor_id": null, + "post_processor_id": null +} +``` + +Request-level fields are global defaults. `chunk_specs` entries override any field for a specific chunk index. Only fields that change need to be specified — prompts are sticky (last-set persists). + +See `schema.py` for the full `GenerateRequest` and `ChunkSpec` field definitions. diff --git a/scripts/test_generate_endpoint.py b/scripts/test_generate_endpoint.py new file mode 100644 index 000000000..753df0e8e --- /dev/null +++ b/scripts/test_generate_endpoint.py @@ -0,0 +1,401 @@ +"""Test script for the /api/v1/batch endpoint. 
+ +Usage: + python test_generate_endpoint.py + python test_generate_endpoint.py --list +""" + +import json +import sys +import time + +import numpy as np +import requests +from diffusers.utils import export_to_video + +from scope.core.pipelines.video import load_video +from scope.server.schema import ( + GenerateRequest, + LoRAConfig, + LoRAMergeMode, + PipelineLoadRequest, + PipelineStatusResponse, +) + +# ============================================================================= +# Configuration +# ============================================================================= + +SERVER_URL = "http://localhost:8000" +DEFAULT_PIPELINE = "longlive" + +# Asset paths (tests skip gracefully if missing) +LORA = r"C:\Users\ryanf\.daydream-scope\models\lora\lora\output\model_245889_dissolve_imgvid\dissolve-000064.safetensors" +TEST_VIDEO = r"frontend\public\assets\test.mp4" +VACE_CONDITIONING_VIDEO = r"controlnet_test\control_frames_depth.mp4" +MASK_VIDEO = r"src\scope\core\pipelines\longlive\vace_tests\static_mask_half_white_half_black.mp4" + +# ============================================================================= +# Test Definitions +# ============================================================================= + +TESTS = { + "lora": { + "description": "LoRA strength ramping over chunks", + "pipeline": "longlive", + "resolution": (576, 320), + "num_frames": 96, + "prompt": "a woman dissolving into particles, ethereal, magical transformation", + "lora": LORA, + "lora_ramp": [0.0, 0.15, 0.3, 0.45, 0.6, 0.75, 0.9, 1.0], + "manage_cache": False, + }, + "v2v": { + "description": "Video-to-video transformation", + "resolution": (512, 512), + "num_frames": 48, + "prompt": "A 3D animated scene. 
A **panda** sitting in the grass, looking around.", + "input_video": TEST_VIDEO, + "noise_scale": 0.6, + }, + "v2v_lora": { + "description": "Video-to-video with LoRA ramp (0 -> 1.5 -> 0)", + "resolution": (512, 512), + "num_frames": 120, + "prompt": "a woman made of ral-dissolve, dissolving into particles", + "input_video": TEST_VIDEO, + "noise_scale": 0.7, + "lora": LORA, + "lora_ramp": [0.0, 0.3, 0.6, 1.0, 1.5, 1.5, 1.0, 0.6, 0.3, 0.0], + }, + "vace_conditioning": { + "description": "VACE structural conditioning (depth, pose, etc.)", + "resolution": (576, 320), + "num_frames": 48, + "prompt": "a cat walking towards the camera", + "vace_frames": VACE_CONDITIONING_VIDEO, + "vace_context_scale": 1.5, + }, + "inpainting": { + "description": "VACE inpainting with mask", + "resolution": (512, 512), + "num_frames": 48, + "prompt": "fireball doom flames", + "vace_frames": TEST_VIDEO, + "vace_masks": MASK_VIDEO, + }, +} + +# ============================================================================= +# Helpers +# ============================================================================= + + +def upload_video_for_v2v(path: str, height: int, width: int) -> str: + """Load and upload video for video-to-video mode. 
Returns input_path.""" + tensor = load_video(path, resize_hw=(height, width), normalize=False) + arr = tensor.permute(1, 2, 3, 0).numpy().astype(np.uint8) + num_frames, h, w, c = arr.shape + + response = requests.post( + f"{SERVER_URL}/api/v1/batch/upload", + data=arr.tobytes(), + headers={ + "Content-Type": "application/octet-stream", + "X-Video-Frames": str(num_frames), + "X-Video-Height": str(h), + "X-Video-Width": str(w), + "X-Video-Channels": str(c), + }, + timeout=300, + ) + response.raise_for_status() + return response.json()["input_path"] + + +def upload_vace_data( + vace_frames_path: str | None, + vace_masks_path: str | None, + height: int, + width: int, + num_frames: int, + chunk_size: int, + vace_context_scale: float = 1.0, +) -> tuple[str, list[dict]]: + """Load VACE frames/masks, pack into blob, upload, return (data_blob_path, chunk_specs).""" + blob = bytearray() + num_chunks = (num_frames + chunk_size - 1) // chunk_size + chunk_specs = [] + + # Load tensors + vace_frames_tensor = None + vace_masks_tensor = None + if vace_frames_path: + vace_frames_tensor = load_video(vace_frames_path, resize_hw=(height, width)) + vace_frames_tensor = vace_frames_tensor.unsqueeze(0).numpy().astype(np.float32) + if vace_masks_path: + masks_tensor = load_video(vace_masks_path, resize_hw=(height, width)) + vace_masks_tensor = (masks_tensor[0:1].unsqueeze(0).numpy() > 0.0).astype( + np.float32 + ) + + for chunk_idx in range(num_chunks): + spec = {"chunk": chunk_idx, "vace_temporally_locked": True} + start = chunk_idx * chunk_size + end = start + chunk_size + + if vace_frames_tensor is not None: + sliced = vace_frames_tensor[:, :, start:end, :, :] + spec["vace_frames_offset"] = len(blob) + spec["vace_frames_shape"] = list(sliced.shape) + blob.extend(sliced.tobytes()) + + if vace_masks_tensor is not None: + sliced_masks = vace_masks_tensor[:, :, start:end, :, :] + spec["vace_masks_offset"] = len(blob) + spec["vace_masks_shape"] = list(sliced_masks.shape) + 
blob.extend(sliced_masks.tobytes()) + + if vace_context_scale != 1.0: + spec["vace_context_scale"] = vace_context_scale + + chunk_specs.append(spec) + + # Upload blob + response = requests.post( + f"{SERVER_URL}/api/v1/batch/upload-data", + data=bytes(blob), + headers={"Content-Type": "application/octet-stream"}, + timeout=300, + ) + response.raise_for_status() + data_blob_path = response.json()["data_blob_path"] + + return data_blob_path, chunk_specs + + +def parse_sse_events(response): + """Parse SSE events using iter_content (handles large payloads).""" + buffer = "" + event_type = None + data_lines = [] + + for chunk in response.iter_content(chunk_size=None, decode_unicode=True): + buffer += chunk + while "\n" in buffer: + line, buffer = buffer.split("\n", 1) + line = line.rstrip("\r") + + if line.startswith("event:"): + event_type = line[6:].strip() + elif line.startswith("data:"): + data_lines.append(line[5:].strip()) + elif line == "": + if data_lines: + yield (event_type or "message", json.loads("\n".join(data_lines))) + event_type = None + data_lines = [] + + +def wait_for_pipeline(timeout: int = 300): + """Wait for pipeline to finish loading.""" + start = time.time() + while time.time() - start < timeout: + resp = requests.get(f"{SERVER_URL}/api/v1/pipeline/status") + status = PipelineStatusResponse.model_validate(resp.json()) + if status.status.value == "loaded": + return time.time() - start + if status.status.value == "error": + raise RuntimeError(f"Pipeline failed: {status.error}") + time.sleep(1) + raise TimeoutError(f"Pipeline did not load within {timeout}s") + + +def download_video(output_path: str) -> np.ndarray: + """Download generated video from server.""" + response = requests.get( + f"{SERVER_URL}/api/v1/batch/download", + params={"path": output_path}, + timeout=300, + ) + response.raise_for_status() + + num_frames = int(response.headers.get("X-Video-Frames", 0)) + height = int(response.headers.get("X-Video-Height", 0)) + width = 
int(response.headers.get("X-Video-Width", 0)) + channels = int(response.headers.get("X-Video-Channels", 3)) + + # Skip header (ndim + shape) + content = response.content + header_size = 4 + 4 * 4 + video_bytes = content[header_size:] + + return np.frombuffer(video_bytes, dtype=np.uint8).reshape( + (num_frames, height, width, channels) + ) + + +# ============================================================================= +# Test Runner +# ============================================================================= + + +def run_test(name: str): + """Run a single test by name.""" + if name not in TESTS: + print(f"Unknown test: {name}") + print(f"Available: {', '.join(TESTS.keys())}") + return + + cfg = TESTS[name] + width, height = cfg.get("resolution", (576, 320)) + pipeline_id = cfg.get("pipeline", DEFAULT_PIPELINE) + + print(f"\n{'=' * 60}") + print(f"Test: {name}") + print(f"Description: {cfg['description']}") + print(f"{'=' * 60}") + + # Build LoRA config if specified + loras = None + lora_scales = None + if "lora" in cfg: + loras = [ + LoRAConfig( + path=cfg["lora"], scale=0.0, merge_mode=LoRAMergeMode.RUNTIME_PEFT + ) + ] + if "lora_ramp" in cfg: + lora_scales = cfg["lora_ramp"] + print(f"LoRA ramp: {lora_scales}") + + # Load pipeline + print(f"Loading pipeline '{pipeline_id}' at {width}x{height}...") + load_params = {"height": height, "width": width} + if loras: + load_params["loras"] = [lora.model_dump() for lora in loras] + load_params["lora_merge_mode"] = "runtime_peft" + request = PipelineLoadRequest(pipeline_ids=[pipeline_id], load_params=load_params) + requests.post( + f"{SERVER_URL}/api/v1/pipeline/load", json=request.model_dump(mode="json") + ).raise_for_status() + load_time = wait_for_pipeline() + print(f"Pipeline loaded in {load_time:.1f}s") + + # Build request kwargs + request_kwargs = { + "pipeline_id": pipeline_id, + "prompt": cfg["prompt"], + "num_frames": cfg["num_frames"], + "noise_scale": cfg.get("noise_scale", 0.7), + "vace_context_scale": 
cfg.get("vace_context_scale", 1.0), + "manage_cache": cfg.get("manage_cache", True), + } + + # Upload input video if specified + if "input_video" in cfg: + input_path = upload_video_for_v2v(cfg["input_video"], height, width) + request_kwargs["input_path"] = input_path + print(f"Input video uploaded: {input_path}") + + # Build chunk_specs for LoRA ramp + chunk_specs = [] + if lora_scales and "lora" in cfg: + for i, scale in enumerate(lora_scales): + chunk_specs.append( + { + "chunk": i, + "lora_scales": {cfg["lora"]: scale}, + } + ) + + # Handle VACE data + if "vace_frames" in cfg or "vace_masks" in cfg: + # Assume chunk_size=12 (default for longlive) + chunk_size = 12 + data_blob_path, vace_specs = upload_vace_data( + vace_frames_path=cfg.get("vace_frames"), + vace_masks_path=cfg.get("vace_masks"), + height=height, + width=width, + num_frames=cfg["num_frames"], + chunk_size=chunk_size, + vace_context_scale=cfg.get("vace_context_scale", 1.0), + ) + request_kwargs["data_blob_path"] = data_blob_path + # Merge VACE specs into chunk_specs + existing_chunks = {s["chunk"] for s in chunk_specs} + for vs in vace_specs: + if vs["chunk"] in existing_chunks: + # Merge into existing spec + for cs in chunk_specs: + if cs["chunk"] == vs["chunk"]: + cs.update(vs) + break + else: + chunk_specs.append(vs) + print(f"VACE data uploaded: {data_blob_path}") + + if chunk_specs: + chunk_specs.sort(key=lambda s: s["chunk"]) + request_kwargs["chunk_specs"] = chunk_specs + + gen_request = GenerateRequest(**request_kwargs) + + print(f"Generating {cfg['num_frames']} frames...") + start = time.time() + + with requests.post( + f"{SERVER_URL}/api/v1/batch", + json=gen_request.model_dump(exclude_none=True), + stream=True, + headers={"Accept": "text/event-stream"}, + ) as resp: + resp.raise_for_status() + result = None + for event_type, data in parse_sse_events(resp): + if event_type == "progress": + print( + f" Chunk {data['chunk']}/{data['total_chunks']}: {data['fps']:.1f} fps" + ) + elif 
event_type == "complete": + result = data + break + elif event_type == "error": + raise RuntimeError(f"Generation failed: {data['error']}") + + if result is None: + raise RuntimeError("No complete event received") + + # Download and save + if "output_path" in result: + video = download_video(result["output_path"]) + video_float = video.astype(np.float32) / 255.0 + else: + raise RuntimeError("No output_path in result") + + output_path = f"test_{name}.mp4" + export_to_video(video_float, output_path, fps=16) + + print(f"\nComplete in {time.time() - start:.1f}s") + print(f"Output: {output_path} ({result['video_shape']})") + + +def main(): + if len(sys.argv) < 2 or sys.argv[1] == "--list": + print("Available tests:") + for name, cfg in TESTS.items(): + print(f" {name:20} - {cfg['description']}") + print("\nUsage: python test_generate_endpoint.py ") + print(" python test_generate_endpoint.py all") + return + + if sys.argv[1] == "all": + for name in TESTS: + run_test(name) + else: + run_test(sys.argv[1]) + + +if __name__ == "__main__": + main() diff --git a/src/scope/server/app.py b/src/scope/server/app.py index ac8080f58..272852f84 100644 --- a/src/scope/server/app.py +++ b/src/scope/server/app.py @@ -78,6 +78,7 @@ AssetsResponse, CloudConnectRequest, CloudStatusResponse, + GenerateRequest, HardwareInfoResponse, HealthResponse, IceCandidateRequest, @@ -1126,6 +1127,233 @@ def download_in_background(): raise HTTPException(status_code=500, detail=str(e)) from e +@app.post("/api/v1/batch") +async def batch_video( + request: "GenerateRequest", + pipeline_manager: "PipelineManager" = Depends(get_pipeline_manager), +): + """Generate video frames in batch mode with SSE progress streaming.""" + from .batch import batch_video_stream, is_batch_active + + if is_batch_active(): + raise HTTPException( + status_code=409, + detail="A generation is already in progress. 
Cancel it first or wait for completion.", + ) + + status_info = await pipeline_manager.get_status_info_async() + if status_info["status"] != "loaded": + raise HTTPException( + status_code=400, + detail="Pipeline not loaded. Please load pipeline first.", + ) + + return StreamingResponse( + batch_video_stream(request, pipeline_manager, status_info, logger), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + +@app.post("/api/v1/batch/cancel") +async def cancel_batch(): + """Cancel the current video generation after the current chunk completes.""" + from .batch import cancel_batch as _cancel_batch + + _cancel_batch() + return {"status": "cancelling"} + + +@app.post("/api/v1/batch/upload") +async def upload_video_for_batch(request: Request): + """Upload a video for batch generation (file-based transfer for large videos). + + Accepts raw binary video data with metadata headers: + - X-Video-Frames: number of frames (T) + - X-Video-Height: frame height (H) + - X-Video-Width: frame width (W) + - X-Video-Channels: number of channels (C), typically 3 for RGB + + Video data should be raw uint8 bytes in THWC order. + + Returns input_path to use in the batch request. 
+ """ + from .recording import TEMP_FILE_PREFIXES, RecordingManager + from .schema import VideoUploadResponse + + try: + # Get video dimensions from headers + num_frames = int(request.headers.get("X-Video-Frames", 0)) + height = int(request.headers.get("X-Video-Height", 0)) + width = int(request.headers.get("X-Video-Width", 0)) + channels = int(request.headers.get("X-Video-Channels", 3)) + + if not all([num_frames, height, width]): + raise HTTPException( + status_code=400, + detail="Missing required headers: X-Video-Frames, X-Video-Height, X-Video-Width", + ) + + expected_size = num_frames * height * width * channels + shape = (num_frames, height, width, channels) + + # Create temp file (reuse recording pattern) + file_path = RecordingManager._create_temp_file( + ".bin", TEMP_FILE_PREFIXES["batch_input"] + ) + + # Stream body to file + with open(file_path, "wb") as f: + # Write header: ndim (4 bytes) + shape (ndim * 4 bytes) + f.write(len(shape).to_bytes(4, "little")) + for dim in shape: + f.write(dim.to_bytes(4, "little")) + + # Stream video data + bytes_written = 0 + async for chunk in request.stream(): + f.write(chunk) + bytes_written += len(chunk) + + if bytes_written != expected_size: + Path(file_path).unlink(missing_ok=True) + raise HTTPException( + status_code=400, + detail=f"Video data size mismatch: expected {expected_size}, got {bytes_written}", + ) + + logger.info(f"Uploaded video: {file_path} (shape: {shape})") + + return VideoUploadResponse( + input_path=file_path, + num_frames=num_frames, + shape=list(shape), + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error uploading video: {e}") + raise HTTPException(status_code=500, detail=str(e)) from e + + +@app.post("/api/v1/batch/upload-data") +async def upload_data_blob(request: Request): + """Upload binary data blob for batch generation. 
+ + Accepts raw binary data containing VACE frames/masks, input video, or other + array data referenced by ChunkSpec offsets in the generate request. + + Returns data_blob_path to use in the batch request. + """ + + from .recording import TEMP_FILE_PREFIXES, RecordingManager + from .schema import DataUploadResponse + + try: + # Create temp file + file_path = RecordingManager._create_temp_file( + ".bin", TEMP_FILE_PREFIXES["batch_data"] + ) + + from .batch import MAX_DATA_BLOB_BYTES + + # Stream body to file with size limit + bytes_written = 0 + with open(file_path, "wb") as f: + async for chunk in request.stream(): + bytes_written += len(chunk) + if bytes_written > MAX_DATA_BLOB_BYTES: + f.close() + Path(file_path).unlink(missing_ok=True) + raise HTTPException( + status_code=413, + detail=f"Data blob exceeds maximum size of {MAX_DATA_BLOB_BYTES} bytes", + ) + f.write(chunk) + + if bytes_written == 0: + Path(file_path).unlink(missing_ok=True) + raise HTTPException(status_code=400, detail="Empty request body") + + logger.info(f"Uploaded data blob: {file_path} ({bytes_written} bytes)") + + return DataUploadResponse( + data_blob_path=file_path, + size_bytes=bytes_written, + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error uploading data blob: {e}") + raise HTTPException(status_code=500, detail=str(e)) from e + + +@app.get("/api/v1/batch/download") +async def download_generated_video( + path: str = Query(..., description="Path to output video file"), + background_tasks: BackgroundTasks = None, +): + """Download a generated video by path. + + Returns raw binary video data with metadata headers: + - X-Video-Frames: number of frames (T) + - X-Video-Height: frame height (H) + - X-Video-Width: frame width (W) + - X-Video-Channels: number of channels (C) + + Video data is raw uint8 bytes in THWC order. 
+ """ + import tempfile + + from .recording import TEMP_FILE_PREFIXES, cleanup_temp_file + + try: + file_path = Path(path) + + # Security: only allow files in temp dir with our prefix + temp_dir = Path(tempfile.gettempdir()) + if not file_path.is_relative_to(temp_dir): + raise HTTPException(status_code=403, detail="Invalid file path") + if not file_path.name.startswith(TEMP_FILE_PREFIXES["batch_output"]): + raise HTTPException(status_code=403, detail="Invalid file path") + + if not file_path.exists(): + raise HTTPException(status_code=404, detail="Output video not found") + + # Read header to get shape + with open(file_path, "rb") as f: + ndim = int.from_bytes(f.read(4), "little") + shape = tuple(int.from_bytes(f.read(4), "little") for _ in range(ndim)) + + # Schedule cleanup after download + if background_tasks: + background_tasks.add_task(cleanup_temp_file, str(file_path)) + + # Return file with metadata headers + return FileResponse( + file_path, + media_type="application/octet-stream", + headers={ + "X-Video-Frames": str(shape[0]), + "X-Video-Height": str(shape[1]), + "X-Video-Width": str(shape[2]), + "X-Video-Channels": str(shape[3]) if len(shape) > 3 else "3", + }, + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error downloading generated video: {e}") + raise HTTPException(status_code=500, detail=str(e)) from e + + def is_spout_available() -> bool: """Check if Spout is available (native Windows only, not WSL).""" return sys.platform == "win32" diff --git a/src/scope/server/batch.py b/src/scope/server/batch.py new file mode 100644 index 000000000..a5cfbd802 --- /dev/null +++ b/src/scope/server/batch.py @@ -0,0 +1,780 @@ +"""Video generation service for batch mode with chunked processing.""" + +import concurrent.futures +import json +import queue +import threading +import time +from collections.abc import Iterator +from dataclasses import dataclass, field +from pathlib import Path +from typing import IO, TYPE_CHECKING + +import 
numpy as np +import torch + +# Cancellation support (single-client, so one event suffices) +_cancel_event = threading.Event() + +# Generation lock (single-client: only one generation at a time) +_batch_lock = threading.Lock() + +# Max data blob upload size (2 GB) +MAX_DATA_BLOB_BYTES = 2 * 1024 * 1024 * 1024 + + +def cancel_batch(): + """Signal the current generation to stop after the current chunk.""" + _cancel_event.set() + + +def is_batch_cancelled() -> bool: + """Check if cancellation has been requested.""" + return _cancel_event.is_set() + + +def is_batch_active() -> bool: + """Check if a generation is currently in progress.""" + return _batch_lock.locked() + + +# Defaults +DEFAULT_HEIGHT = 320 +DEFAULT_WIDTH = 576 +DEFAULT_CHUNK_SIZE = 12 +DEFAULT_SEED = 42 +DEFAULT_NOISE_SCALE = 0.7 +PROMPT_WEIGHT = 100 + +if TYPE_CHECKING: + from logging import Logger + + from .pipeline_manager import PipelineManager + from .schema import ChunkSpec, GenerateRequest + + +# --------------------------------------------------------------------------- +# Array utilities +# --------------------------------------------------------------------------- + + +def loop_to_length(arr: np.ndarray, target: int, axis: int) -> np.ndarray: + """Tile array along axis to reach target length.""" + current = arr.shape[axis] + if current >= target: + return arr + repeats = (target + current - 1) // current + tiled = np.concatenate([arr] * repeats, axis=axis) + slices = [slice(None)] * arr.ndim + slices[axis] = slice(0, target) + return tiled[tuple(slices)] + + +def pad_chunk(arr: np.ndarray, target_size: int, axis: int) -> np.ndarray: + """Pad array with last frame along axis to reach target size.""" + current = arr.shape[axis] + if current >= target_size: + return arr + slices = [slice(None)] * arr.ndim + slices[axis] = slice(-1, None) + last_frame = arr[tuple(slices)] + padding = np.repeat(last_frame, target_size - current, axis=axis) + return np.concatenate([arr, padding], axis=axis) + + +# 
--------------------------------------------------------------------------- +# SSE helpers +# --------------------------------------------------------------------------- + + +def sse_event(event_type: str, data: dict) -> str: + """Format a server-sent event.""" + return f"event: {event_type}\ndata: {json.dumps(data)}\n\n" + + +# --------------------------------------------------------------------------- +# Dataclasses +# --------------------------------------------------------------------------- + + +@dataclass +class DecodedInputs: + """Decoded and preprocessed inputs for generation.""" + + input_video: np.ndarray | None = None + first_frames: dict[int, str] = field(default_factory=dict) + last_frames: dict[int, str] = field(default_factory=dict) + ref_images: dict[int, list[str]] = field(default_factory=dict) + prompts: dict[int, list[dict]] = field(default_factory=dict) + transitions: dict[int, dict] = field(default_factory=dict) + vace_chunk_specs: dict[int, dict] = field(default_factory=dict) + input_video_chunks: dict[int, np.ndarray] = field(default_factory=dict) + chunk_specs_map: "dict[int, ChunkSpec]" = field(default_factory=dict) + + +@dataclass +class BatchState: + """Mutable state accumulated during chunk-by-chunk generation.""" + + output_file: IO[bytes] + num_chunks: int + logger: "Logger" + total_frames: int = 0 + height: int | None = None + width: int | None = None + channels: int | None = None + latencies: list[float] = field(default_factory=list) + fps_measures: list[float] = field(default_factory=list) + + def build_chunk_sse(self, chunk_idx: int, chunk_latency: float) -> str: + """Build SSE progress event (call from main thread before write).""" + return sse_event( + "progress", + { + "chunk": chunk_idx + 1, + "total_chunks": self.num_chunks, + "latency": round(chunk_latency, 3), + }, + ) + + def write_chunk(self, result: dict, chunk_idx: int, chunk_latency: float) -> None: + """Write chunk output to file (safe to call from background 
thread).""" + chunk_output = result["video"] + num_output_frames = chunk_output.shape[0] + chunk_fps = num_output_frames / chunk_latency + + self.latencies.append(chunk_latency) + self.fps_measures.append(chunk_fps) + + self.logger.info( + f"Chunk {chunk_idx + 1}/{self.num_chunks}: " + f"{num_output_frames} frames, latency={chunk_latency:.2f}s, fps={chunk_fps:.2f}" + ) + + chunk_np = chunk_output.detach().cpu().numpy() + chunk_uint8 = (chunk_np * 255).clip(0, 255).astype(np.uint8) + self.output_file.write(chunk_uint8.tobytes()) + + self.total_frames += num_output_frames + if self.height is None: + self.height = chunk_np.shape[1] + self.width = chunk_np.shape[2] + self.channels = chunk_np.shape[3] + + @property + def output_shape(self) -> list[int]: + return [self.total_frames, self.height, self.width, self.channels] + + def log_summary(self): + """Log performance summary.""" + if not self.latencies: + return + avg_lat = sum(self.latencies) / len(self.latencies) + avg_fps = sum(self.fps_measures) / len(self.fps_measures) + self.logger.info( + f"=== Performance Summary ({self.num_chunks} chunks) ===\n" + f" Latency - Avg: {avg_lat:.2f}s, " + f"Max: {max(self.latencies):.2f}s, Min: {min(self.latencies):.2f}s\n" + f" FPS - Avg: {avg_fps:.2f}, " + f"Max: {max(self.fps_measures):.2f}, Min: {min(self.fps_measures):.2f}" + ) + + +# --------------------------------------------------------------------------- +# Input decoding +# --------------------------------------------------------------------------- + + +def load_video_from_file(file_path: str) -> np.ndarray: + """Load video from temp file with header (ndim + shape + raw uint8).""" + with open(file_path, "rb") as f: + ndim = int.from_bytes(f.read(4), "little") + shape = tuple(int.from_bytes(f.read(4), "little") for _ in range(ndim)) + data = np.frombuffer(f.read(), dtype=np.uint8).reshape(shape) + return data + + +def _read_blob_array( + blob: bytes, offset: int, shape: list[int], dtype=np.float32 +) -> np.ndarray: + 
"""Read a contiguous array from a binary blob at a given offset.""" + count = 1 + for d in shape: + count *= d + return np.frombuffer(blob, dtype=dtype, count=count, offset=offset).reshape(shape) + + +def decode_inputs( + request: "GenerateRequest", num_frames: int, logger: "Logger" +) -> DecodedInputs: + """Decode all inputs from request using unified ChunkSpec.""" + inputs = DecodedInputs() + + # Input video from file path + if request.input_path: + logger.info(f"Loading input video from file: {request.input_path}") + inputs.input_video = load_video_from_file(request.input_path) + inputs.input_video = loop_to_length(inputs.input_video, num_frames, axis=0) + + # Default prompt + if isinstance(request.prompt, str): + inputs.prompts = {0: [{"text": request.prompt, "weight": PROMPT_WEIGHT}]} + else: + inputs.prompts = { + 0: [{"text": p.text, "weight": p.weight} for p in request.prompt] + } + + # Load binary blob if provided + blob: bytes | None = None + if request.data_blob_path: + import tempfile + + from .recording import TEMP_FILE_PREFIXES + + blob_path = Path(request.data_blob_path) + temp_dir = Path(tempfile.gettempdir()) + if not blob_path.is_relative_to(temp_dir) or not blob_path.name.startswith( + TEMP_FILE_PREFIXES["batch_data"] + ): + raise ValueError( + f"Invalid data_blob_path: must be a temp file with prefix {TEMP_FILE_PREFIXES['batch_data']}" + ) + with open(blob_path, "rb") as f: + blob = f.read() + logger.info( + f"decode_inputs: Loaded data blob from {request.data_blob_path} ({len(blob)} bytes)" + ) + + # Process chunk specs — single loop, single source of truth + for spec in request.chunk_specs or []: + inputs.chunk_specs_map[spec.chunk] = spec + + # Prompts + if spec.prompts: + inputs.prompts[spec.chunk] = [ + {"text": p.text, "weight": p.weight} for p in spec.prompts + ] + elif spec.text: + inputs.prompts[spec.chunk] = [{"text": spec.text, "weight": PROMPT_WEIGHT}] + + # Transitions + if spec.transition_target_prompts: + 
inputs.transitions[spec.chunk] = { + "target_prompts": [ + {"text": p.text, "weight": p.weight} + for p in spec.transition_target_prompts + ], + "num_steps": spec.transition_num_steps or 4, + "temporal_interpolation_method": spec.transition_method or "linear", + } + + # Keyframes + if spec.first_frame_image: + inputs.first_frames[spec.chunk] = spec.first_frame_image + if spec.last_frame_image: + inputs.last_frames[spec.chunk] = spec.last_frame_image + if spec.vace_ref_images: + inputs.ref_images[spec.chunk] = spec.vace_ref_images + + # VACE from blob + if blob is not None and spec.vace_frames_offset is not None: + decoded: dict = {"vace_temporally_locked": spec.vace_temporally_locked} + if spec.vace_frames_shape and spec.vace_frames_offset is not None: + arr = _read_blob_array( + blob, spec.vace_frames_offset, spec.vace_frames_shape + ) + decoded["frames"] = arr + logger.info( + f"decode_inputs: chunk {spec.chunk} VACE frames shape={arr.shape}" + ) + if spec.vace_masks_shape and spec.vace_masks_offset is not None: + arr = _read_blob_array( + blob, spec.vace_masks_offset, spec.vace_masks_shape + ) + decoded["masks"] = arr + logger.info( + f"decode_inputs: chunk {spec.chunk} VACE masks shape={arr.shape}" + ) + if spec.vace_context_scale is not None: + decoded["context_scale"] = spec.vace_context_scale + inputs.vace_chunk_specs[spec.chunk] = decoded + + # Input video from blob (per-chunk video-to-video) + if ( + blob is not None + and spec.input_video_offset is not None + and spec.input_video_shape is not None + ): + inputs.input_video_chunks[spec.chunk] = _read_blob_array( + blob, spec.input_video_offset, spec.input_video_shape, dtype=np.uint8 + ) + + logger.info( + f"decode_inputs: prompts={list(inputs.prompts.keys())}, " + f"transitions={list(inputs.transitions.keys())}, " + f"vace_specs={list(inputs.vace_chunk_specs.keys())}, " + f"input_video_chunks={list(inputs.input_video_chunks.keys())}, " + f"first_frames={list(inputs.first_frames.keys())}, " + 
f"last_frames={list(inputs.last_frames.keys())}" + ) + + return inputs + + +# --------------------------------------------------------------------------- +# Chunk kwargs builder +# --------------------------------------------------------------------------- + + +def _resolve(spec, attr: str, request, fallback=None): + """Return per-chunk spec value if set, else request-level value, else fallback.""" + if spec is not None: + val = getattr(spec, attr, None) + if val is not None: + return val + val = getattr(request, attr, None) + return val if val is not None else fallback + + +def build_chunk_kwargs( + request: "GenerateRequest", + inputs: DecodedInputs, + chunk_idx: int, + chunk_size: int, + start_frame: int, + end_frame: int, + status_info: dict, + device: torch.device, + dtype: torch.dtype, + logger: "Logger", +) -> dict: + """Build pipeline kwargs for a single chunk. + + Per-chunk ChunkSpec values override request-level globals. + """ + spec = inputs.chunk_specs_map.get(chunk_idx) + load_params = status_info.get("load_params", {}) + + kwargs: dict = { + "height": request.height + if request.height is not None + else load_params.get("height", DEFAULT_HEIGHT), + "width": request.width + if request.width is not None + else load_params.get("width", DEFAULT_WIDTH), + "base_seed": _resolve(spec, "seed", request, DEFAULT_SEED), + "init_cache": chunk_idx == 0 or (spec is not None and spec.reset_cache), + "manage_cache": _resolve(spec, "manage_cache", request, True), + } + + # Prompt (sticky — only send when it changes) + if chunk_idx in inputs.prompts: + kwargs["prompts"] = inputs.prompts[chunk_idx] + + # Temporal transition + if chunk_idx in inputs.transitions: + kwargs["transition"] = inputs.transitions[chunk_idx] + + if request.denoising_steps: + kwargs["denoising_step_list"] = request.denoising_steps + + # Video-to-video: per-chunk input video takes priority over global + if chunk_idx in inputs.input_video_chunks: + chunk_frames = pad_chunk( + 
def build_chunk_kwargs(
    request: "GenerateRequest",
    inputs: DecodedInputs,
    chunk_idx: int,
    chunk_size: int,
    start_frame: int,
    end_frame: int,
    status_info: dict,
    device: torch.device,
    dtype: torch.dtype,
    logger: "Logger",
) -> dict:
    """Assemble the pipeline kwargs for one chunk.

    Resolution order for every tunable is: per-chunk ChunkSpec value →
    request-level global → built-in default. Keys are only added to the
    returned dict when the pipeline should actually receive them (prompts
    are sticky on the pipeline side, so they are sent only when they change).
    """
    spec = inputs.chunk_specs_map.get(chunk_idx)
    load_params = status_info.get("load_params", {})

    # Resolution falls back to the pipeline's loaded dimensions, then defaults.
    height = request.height
    if height is None:
        height = load_params.get("height", DEFAULT_HEIGHT)
    width = request.width
    if width is None:
        width = load_params.get("width", DEFAULT_WIDTH)

    kwargs: dict = {
        "height": height,
        "width": width,
        "base_seed": _resolve(spec, "seed", request, DEFAULT_SEED),
        # Cache always (re)initializes on chunk 0; later chunks reset only
        # when the spec explicitly requests it.
        "init_cache": chunk_idx == 0 or (spec is not None and spec.reset_cache),
        "manage_cache": _resolve(spec, "manage_cache", request, True),
    }

    # Prompt (sticky — only send when it changes)
    if chunk_idx in inputs.prompts:
        kwargs["prompts"] = inputs.prompts[chunk_idx]

    # Temporal transition
    if chunk_idx in inputs.transitions:
        kwargs["transition"] = inputs.transitions[chunk_idx]

    if request.denoising_steps:
        kwargs["denoising_step_list"] = request.denoising_steps

    # Video-to-video: per-chunk input video takes priority over the global one
    per_chunk_video = inputs.input_video_chunks.get(chunk_idx)
    if per_chunk_video is not None:
        chunk_frames = pad_chunk(per_chunk_video, chunk_size, axis=0)
        kwargs["video"] = [torch.from_numpy(f).unsqueeze(0) for f in chunk_frames]
        kwargs["noise_scale"] = _resolve(
            spec, "noise_scale", request, DEFAULT_NOISE_SCALE
        )
        logger.info(
            f"Chunk {chunk_idx}: Using per-chunk input video ({chunk_frames.shape[0]} frames)"
        )
    elif inputs.input_video is not None:
        chunk_frames = pad_chunk(
            inputs.input_video[start_frame:end_frame], chunk_size, axis=0
        )
        kwargs["video"] = [torch.from_numpy(f).unsqueeze(0) for f in chunk_frames]
        kwargs["noise_scale"] = _resolve(
            spec, "noise_scale", request, DEFAULT_NOISE_SCALE
        )
    else:
        # Text-to-video: no conditioning frames, just a frame count.
        kwargs["num_frames"] = chunk_size

    kwargs["vace_context_scale"] = _resolve(spec, "vace_context_scale", request, 1.0)
    kwargs["prompt_interpolation_method"] = _resolve(
        spec, "prompt_interpolation_method", request, "linear"
    )

    # Optional overrides (only included when resolved to a non-None value)
    for name in ("noise_controller", "kv_cache_attention_bias"):
        value = _resolve(spec, name, request)
        if value is not None:
            kwargs[name] = value

    if request.vace_use_input_video is not None:
        kwargs["vace_use_input_video"] = request.vace_use_input_video

    # LoRA scales: per-chunk spec overrides the global mapping
    lora_scales = request.lora_scales
    if spec and spec.lora_scales:
        lora_scales = spec.lora_scales
    if lora_scales:
        kwargs["lora_scales"] = [
            {"path": p, "scale": s} for p, s in lora_scales.items()
        ]
        for p, s in lora_scales.items():
            logger.info(f"Chunk {chunk_idx}: LoRA scale={s:.3f} for {Path(p).name}")

    # Keyframes: extension_mode encodes which anchors are present
    has_first = chunk_idx in inputs.first_frames
    has_last = chunk_idx in inputs.last_frames
    if has_first:
        kwargs["first_frame_image"] = inputs.first_frames[chunk_idx]
        kwargs["extension_mode"] = "firstlastframe" if has_last else "firstframe"
    if has_last:
        kwargs["last_frame_image"] = inputs.last_frames[chunk_idx]
        if not has_first:
            kwargs["extension_mode"] = "lastframe"
    if chunk_idx in inputs.ref_images:
        kwargs["vace_ref_images"] = inputs.ref_images[chunk_idx]

    # VACE conditioning decoded from the binary blob
    vace_spec = inputs.vace_chunk_specs.get(chunk_idx)
    if vace_spec is not None:
        # Frames/masks are [B, C, T, H, W]; padding happens along T (axis 2).
        if "frames" in vace_spec:
            frames = pad_chunk(vace_spec["frames"], chunk_size, axis=2)
            kwargs["vace_input_frames"] = torch.from_numpy(frames).to(device, dtype)
        if "masks" in vace_spec:
            masks = pad_chunk(vace_spec["masks"], chunk_size, axis=2)
            kwargs["vace_input_masks"] = torch.from_numpy(masks).to(device, dtype)
        if "context_scale" in vace_spec:
            kwargs["vace_context_scale"] = vace_spec["context_scale"]

    return kwargs
inputs.last_frames[chunk_idx] + if chunk_idx not in inputs.first_frames: + kwargs["extension_mode"] = "lastframe" + if chunk_idx in inputs.ref_images: + kwargs["vace_ref_images"] = inputs.ref_images[chunk_idx] + + # VACE conditioning from blob + if chunk_idx in inputs.vace_chunk_specs: + vace_spec = inputs.vace_chunk_specs[chunk_idx] + if "frames" in vace_spec: + frames = pad_chunk(vace_spec["frames"], chunk_size, axis=2) + kwargs["vace_input_frames"] = torch.from_numpy(frames).to(device, dtype) + if "masks" in vace_spec: + masks = pad_chunk(vace_spec["masks"], chunk_size, axis=2) + kwargs["vace_input_masks"] = torch.from_numpy(masks).to(device, dtype) + if "context_scale" in vace_spec: + kwargs["vace_context_scale"] = vace_spec["context_scale"] + + return kwargs + + +# --------------------------------------------------------------------------- +# Chunk logging +# --------------------------------------------------------------------------- + +# (key, format_string) — format_string uses {v} for the value +_CHUNK_LOG_ENTRIES = [ + ("init_cache", "Resetting cache (init_cache=True)", lambda v: v), + ("extension_mode", "Extension mode: {v}", None), + ("vace_context_scale", "VACE context scale: {v}", lambda v: v != 1.0), + ("vace_use_input_video", "VACE use input video: {v}", None), + ("denoising_step_list", "Denoising steps: {v}", None), + ("noise_controller", "Using noise controller: {v}", None), + ("kv_cache_attention_bias", "KV cache attention bias: {v}", None), +] + + +def _log_chunk_info(kwargs: dict, chunk_idx: int, num_chunks: int, logger: "Logger"): + """Log detailed chunk information.""" + prefix = f"generate: Chunk {chunk_idx}" + logger.info(f"generate: Starting chunk {chunk_idx + 1}/{num_chunks}") + + # Structured entries + if "prompts" in kwargs: + logger.info(f"{prefix}: Prompt → {[p['text'] for p in kwargs['prompts']]}") + if "transition" in kwargs: + t = kwargs["transition"] + logger.info( + f"{prefix}: Transition → {[p['text'] for p in 
def _batch_chunks(
    request: "GenerateRequest",
    pipeline,
    pipeline_manager: "PipelineManager",
    inputs: DecodedInputs,
    num_chunks: int,
    chunk_size: int,
    status_info: dict,
    device: torch.device,
    dtype: torch.dtype,
    state: BatchState,
    logger: "Logger",
) -> Iterator[str]:
    """Process chunks through a processor chain, yielding SSE events.

    Always uses PipelineProcessor — when there are no pre/post processors
    the chain is just [main_pipeline].

    Yields SSE-formatted strings: one progress event per completed chunk,
    or a single "cancelled" event if the cancel flag is observed. Chunk
    results are written to disk via `state.write_chunk` on a background
    thread so disk I/O for chunk N overlaps pipeline work on chunk N+1;
    the matching progress SSE is held back until the write finishes.
    """
    from .pipeline_processor import _SENTINEL, PipelineProcessor

    # Build processor chain: [pre?] → main → [post?]
    processors: list[PipelineProcessor] = []

    if request.pre_processor_id:
        pre_pipeline = pipeline_manager.get_pipeline_by_id(request.pre_processor_id)
        processors.append(
            PipelineProcessor(
                pipeline=pre_pipeline,
                pipeline_id=request.pre_processor_id,
                batch_mode=True,
            )
        )
        logger.info(f"Pre-processor: {request.pre_processor_id}")

    processors.append(
        PipelineProcessor(
            pipeline=pipeline, pipeline_id=request.pipeline_id, batch_mode=True
        )
    )

    if request.post_processor_id:
        post_pipeline = pipeline_manager.get_pipeline_by_id(request.post_processor_id)
        processors.append(
            PipelineProcessor(
                pipeline=post_pipeline,
                pipeline_id=request.post_processor_id,
                batch_mode=True,
            )
        )
        logger.info(f"Post-processor: {request.post_processor_id}")

    # Chain and start: each processor forwards its output to the next
    for i in range(len(processors) - 1):
        processors[i].set_next_processor(processors[i + 1])
    for proc in processors:
        proc.start()

    first_proc = processors[0]
    last_proc = processors[-1]

    # Single worker so writes stay ordered; write_future/pending_sse track
    # the one in-flight write and its not-yet-emitted progress event.
    write_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    write_future: concurrent.futures.Future | None = None
    pending_sse: str | None = None

    try:
        for chunk_idx in range(num_chunks):
            # Cancellation check between chunks (cancel is "after current chunk")
            if _cancel_event.is_set():
                logger.info("Generation cancelled by user")
                yield sse_event(
                    "cancelled",
                    {
                        "chunk": chunk_idx,
                        "total_chunks": num_chunks,
                        "frames_completed": state.total_frames,
                    },
                )
                return

            start_frame = chunk_idx * chunk_size
            end_frame = min(start_frame + chunk_size, request.num_frames)

            kwargs = build_chunk_kwargs(
                request,
                inputs,
                chunk_idx,
                chunk_size,
                start_frame,
                end_frame,
                status_info,
                device,
                dtype,
                logger,
            )
            _log_chunk_info(kwargs, chunk_idx, num_chunks, logger)

            chunk_start = time.time()

            first_proc.input_queue.put(kwargs)

            # Collect result from last processor
            while True:
                try:
                    result = last_proc.output_queue.get(timeout=1.0)
                    break
                except queue.Empty:
                    # Short timeout so cancellation is noticed while the
                    # pipeline is still busy with this chunk.
                    if _cancel_event.is_set():
                        return
                    continue

            chunk_latency = time.time() - chunk_start

            # Wait for previous async write before starting a new one
            if write_future is not None:
                write_future.result()
                if pending_sse is not None:
                    yield pending_sse

            # Offload CPU transfer + disk I/O to background thread
            pending_sse = state.build_chunk_sse(chunk_idx, chunk_latency)
            write_future = write_executor.submit(
                state.write_chunk, result, chunk_idx, chunk_latency
            )

        # Wait for final write
        if write_future is not None:
            write_future.result()
            if pending_sse is not None:
                yield pending_sse

        # Signal end of input
        first_proc.input_queue.put(_SENTINEL)

    finally:
        # shutdown(wait=True) ensures the last write is flushed even on error;
        # stop() tears down every processor thread regardless of exit path.
        write_executor.shutdown(wait=True)
        for proc in processors:
            proc.stop()
+ """ + if not _batch_lock.acquire(blocking=False): + yield sse_event("error", {"error": "A generation is already in progress"}) + return + + _cancel_event.clear() + output_file_path = None + completed = False + + try: + pipeline = pipeline_manager.get_pipeline_by_id(request.pipeline_id) + + # Determine chunk size from pipeline + has_video = request.input_path is not None + requirements = pipeline.prepare(video=[] if has_video else None) + chunk_size = requirements.input_size if requirements else DEFAULT_CHUNK_SIZE + num_chunks = (request.num_frames + chunk_size - 1) // chunk_size + + inputs = decode_inputs(request, request.num_frames, logger) + + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + dtype = torch.bfloat16 + + # Create output file with placeholder header + from .recording import TEMP_FILE_PREFIXES, RecordingManager + + output_file_path = RecordingManager._create_temp_file( + ".bin", TEMP_FILE_PREFIXES["batch_output"] + ) + output_file = open(output_file_path, "wb") + + # Header: ndim (4 bytes) + shape (4 * ndim bytes) = 20 bytes for [T, H, W, C] + header_size = 4 + 4 * 4 + output_file.write(b"\x00" * header_size) + + state = BatchState( + output_file=output_file, num_chunks=num_chunks, logger=logger + ) + + try: + yield from _batch_chunks( + request, + pipeline, + pipeline_manager, + inputs, + num_chunks, + chunk_size, + status_info, + device, + dtype, + state, + logger, + ) + + # Update header with actual shape + output_file.seek(0) + shape = tuple(state.output_shape) + output_file.write(len(shape).to_bytes(4, "little")) + for dim in shape: + output_file.write(dim.to_bytes(4, "little")) + + finally: + output_file.close() + + logger.info(f"Output video saved: {output_file_path}") + state.log_summary() + + yield sse_event( + "complete", + { + "output_path": output_file_path, + "video_shape": state.output_shape, + "num_frames": state.total_frames, + "num_chunks": num_chunks, + "chunk_size": chunk_size, + }, + ) + completed = True + 
+ except Exception as e: + logger.exception("Error generating video") + yield sse_event("error", {"error": str(e)}) + + finally: + # Clean up uploaded files + for path_attr in ("data_blob_path", "input_path"): + path = getattr(request, path_attr, None) + if path: + try: + Path(path).unlink(missing_ok=True) + logger.info(f"Cleaned up {path_attr}: {path}") + except Exception as e: + logger.warning(f"Failed to clean up {path_attr}: {e}") + + # Clean up output file if generation didn't complete + if not completed and output_file_path: + try: + Path(output_file_path).unlink(missing_ok=True) + logger.info(f"Cleaned up orphaned output file: {output_file_path}") + except Exception as e: + logger.warning(f"Failed to clean up output file: {e}") + + _batch_lock.release() diff --git a/src/scope/server/pipeline_processor.py b/src/scope/server/pipeline_processor.py index b11638996..1ac243a91 100644 --- a/src/scope/server/pipeline_processor.py +++ b/src/scope/server/pipeline_processor.py @@ -23,6 +23,9 @@ SLEEP_TIME = 0.01 +# Sentinel value to signal end of batch input +_SENTINEL = object() + # FPS calculation constants MIN_FPS = 1.0 # Minimum FPS to prevent division by zero MAX_FPS = 60.0 # Maximum FPS cap @@ -42,6 +45,7 @@ def __init__( user_id: str | None = None, connection_id: str | None = None, connection_info: dict | None = None, + batch_mode: bool = False, ): """Initialize a pipeline processor. 
    def _worker_loop_batch(self):
        """Batch-mode worker loop: processes chunk kwargs dicts from queue.

        Runs until _SENTINEL arrives (propagated downstream before exiting),
        the running/shutdown flags flip, or an unrecoverable error occurs.
        """
        while self.running and not self.shutdown_event.is_set():
            try:
                item = self.input_queue.get(timeout=1.0)
            except queue.Empty:
                # Timeout lets the loop re-check running/shutdown flags.
                continue
            if item is _SENTINEL:
                if self.next_processor:
                    self.next_processor.input_queue.put(_SENTINEL)
                break
            try:
                self.process_chunk_batch(item)
            except Exception as e:
                logger.error(
                    f"Error in batch processing for {self.pipeline_id}: {e}",
                    exc_info=True,
                )
                # Recoverable errors drop the chunk but keep the worker alive.
                if not self._is_recoverable(e):
                    break
        logger.info(f"Batch worker thread stopped for pipeline: {self.pipeline_id}")

    def process_chunk_batch(self, chunk_kwargs: dict):
        """Process a single chunk in batch mode.

        Runs the pipeline under bf16 autocast, then either forwards the
        output to the next processor (re-packaged as per-frame uint8 kwargs)
        or, for the last processor in the chain, puts the raw result dict on
        output_queue for collection by the batch driver.

        Args:
            chunk_kwargs: Pre-built kwargs dict for the pipeline call.
        """
        dtype = torch.bfloat16
        with torch.amp.autocast("cuda", dtype=dtype):
            result = self.pipeline(**chunk_kwargs)

        # Forward extra params to downstream processor
        extra_params = {k: v for k, v in result.items() if k != "video"}
        if extra_params and self.next_processor is not None:
            self.next_processor.update_parameters(extra_params)

        if self.next_processor is not None:
            # Convert video output to list-of-frames format for next pipeline.
            # Pipeline __call__ expects video as list of [1, H, W, C] uint8 tensors
            # (same format as real-time path: process_chunk converts to uint8
            # before putting on output queue, and preprocess_chunk expects [0, 255]).
            video = result.get("video")
            if video is not None:
                # assumes pipeline output video is float in [0, 1] — TODO confirm
                video_uint8 = (
                    (video * 255.0)
                    .clamp(0, 255)
                    .to(dtype=torch.uint8)
                    .contiguous()
                    .detach()
                )
                next_kwargs = dict(chunk_kwargs)
                next_kwargs["video"] = [f.unsqueeze(0) for f in video_uint8]
                # Remove keys that are only valid for the original pipeline
                for key in ("init_cache", "num_frames"):
                    next_kwargs.pop(key, None)
                self.output_queue.put(next_kwargs)
            else:
                # No video produced: pass the original kwargs through unchanged.
                self.output_queue.put(chunk_kwargs)
        else:
            # Last processor: put raw result for collection
            self.output_queue.put(result)
f"{TEMP_FILE_PREFIXES['batch_output']}*.bin", + f"{TEMP_FILE_PREFIXES['batch_data']}*.bin", ] deleted_count = 0 diff --git a/src/scope/server/schema.py b/src/scope/server/schema.py index c50127aab..2c8b8848b 100644 --- a/src/scope/server/schema.py +++ b/src/scope/server/schema.py @@ -816,3 +816,234 @@ class ApiKeySetResponse(BaseModel): class ApiKeyDeleteResponse(BaseModel): success: bool message: str + + +class ChunkSpec(BaseModel): + """Unified per-chunk specification. All fields optional — only set what changes.""" + + chunk: int = Field(..., ge=0, description="Chunk index (required)") + + # Prompt + text: str | None = Field( + default=None, + description="Simple prompt text (mutually exclusive with prompts)", + ) + prompts: list[PromptItem] | None = Field( + default=None, + description="Weighted prompt list for spatial blending (mutually exclusive with text)", + ) + prompt_interpolation_method: Literal["linear", "slerp"] | None = Field( + default=None, + description="Spatial interpolation method override for this chunk", + ) + + # Temporal transition + transition_target_prompts: list[PromptItem] | None = Field( + default=None, + description="Target prompt blend to interpolate to", + ) + transition_num_steps: int | None = Field( + default=None, + ge=0, + description="Number of generation calls to transition over (0 = instant)", + ) + transition_method: Literal["linear", "slerp"] | None = Field( + default=None, + description="Method for temporal interpolation between blends", + ) + + # Keyframe images (paths) + first_frame_image: str | None = Field( + default=None, description="Path to first frame reference image" + ) + last_frame_image: str | None = Field( + default=None, description="Path to last frame reference image" + ) + vace_ref_images: list[str] | None = Field( + default=None, description="List of reference image paths for VACE conditioning" + ) + + # Generation parameters + seed: int | None = Field(default=None, description="Random seed override") + 
noise_scale: float | None = Field(default=None, description="Noise scale override") + kv_cache_attention_bias: float | None = Field( + default=None, description="KV cache attention bias override" + ) + reset_cache: bool = Field( + default=False, description="Force cache reset at this chunk" + ) + noise_controller: bool | None = Field( + default=None, description="Noise controller override" + ) + manage_cache: bool | None = Field( + default=None, description="Cache management override" + ) + + # LoRA scales: {path: scale} + lora_scales: dict[str, float] | None = Field( + default=None, description="LoRA scales by path for this chunk" + ) + + # VACE conditioning (offsets into binary blob) + vace_context_scale: float | None = Field( + default=None, description="VACE context scale override" + ) + vace_temporally_locked: bool = Field( + default=True, + description="When True, frames/masks are sliced temporally. When False, used as-is.", + ) + vace_frames_shape: list[int] | None = Field( + default=None, description="Shape of VACE frames ([1, C, T, H, W] float32)" + ) + vace_frames_offset: int | None = Field( + default=None, description="Byte offset into blob for VACE frames" + ) + vace_masks_shape: list[int] | None = Field( + default=None, description="Shape of VACE masks ([1, 1, T, H, W] float32)" + ) + vace_masks_offset: int | None = Field( + default=None, description="Byte offset into blob for VACE masks" + ) + + # Input video for this chunk (offset into binary blob) + input_video_shape: list[int] | None = Field( + default=None, description="Shape of per-chunk input video [T, H, W, C] uint8" + ) + input_video_offset: int | None = Field( + default=None, description="Byte offset into blob for per-chunk input video" + ) + + +class VideoUploadResponse(BaseModel): + """Response after uploading a video for generation.""" + + input_path: str = Field( + ..., description="Path to uploaded video file for generate request" + ) + num_frames: int = Field(..., description="Number of 
frames in uploaded video") + shape: list[int] = Field(..., description="Video shape [T, H, W, C]") + + +class GenerateRequest(BaseModel): + """Request for batch video generation.""" + + pipeline_id: str = Field(..., description="Pipeline ID to use for generation") + prompt: str | list[PromptItem] = Field( + ..., + description="Text prompt for generation (sent on chunk 0). Can be a simple string or a list of weighted prompts for spatial blending.", + ) + num_frames: int = Field( + default=64, + ge=1, + le=10000, + description="Total number of frames to generate", + ) + height: int | None = Field( + default=None, + ge=64, + le=2048, + description="Output height (defaults to pipeline's native resolution)", + ) + width: int | None = Field( + default=None, + ge=64, + le=2048, + description="Output width (defaults to pipeline's native resolution)", + ) + + # Per-chunk specs (replaces all scattered per-chunk lists) + chunk_specs: list[ChunkSpec] | None = Field( + default=None, + description="Unified per-chunk specifications. Each entry can override prompt, transition, " + "keyframes, generation parameters, LoRA scales, and VACE conditioning for a specific chunk.", + ) + + # Binary blob path (from /generate/upload-data) + data_blob_path: str | None = Field( + default=None, + description="Path to uploaded binary data blob (from /generate/upload-data). 
" + "Contains raw arrays referenced by chunk_specs offsets (VACE frames/masks, input video).", + ) + + # Global defaults (applied to chunks without per-chunk override) + seed: int = Field( + default=42, + description="Random seed (default for all chunks).", + ) + noise_scale: float = Field( + default=0.7, + description="Noise scale for video-to-video mode (default for all chunks).", + ) + manage_cache: bool = Field( + default=True, + description="Enable automatic cache management.", + ) + noise_controller: bool | None = Field( + default=None, + description="Enable automatic noise scale adjustment based on motion detection.", + ) + kv_cache_attention_bias: float | None = Field( + default=None, + description="Controls reliance on past frames in cache. Lower values mitigate error accumulation.", + ) + prompt_interpolation_method: Literal["linear", "slerp"] = Field( + default="linear", + description="Spatial interpolation method for blending multiple prompts.", + ) + vace_context_scale: float = Field( + default=1.0, + description="VACE context scale (default for all chunks).", + ) + vace_use_input_video: bool | None = Field( + default=None, + description="When enabled in video-to-video mode, input video is used for VACE conditioning.", + ) + denoising_steps: list[int] | None = Field( + default=None, + description="Denoising timesteps (e.g., [1000, 750, 500, 250])", + ) + lora_scales: dict[str, float] | None = Field( + default=None, + description="Global LoRA scales by path (default for all chunks).", + ) + + # Video-to-video input (file-based upload) + input_path: str | None = Field( + default=None, + description="Path to uploaded video file (from /generate/upload).", + ) + + # Processors + pre_processor_id: str | None = Field( + default=None, + description="Pipeline ID for pre-processing each chunk before the main pipeline.", + ) + post_processor_id: str | None = Field( + default=None, + description="Pipeline ID for post-processing each chunk after the main 
pipeline.", + ) + + +class DataUploadResponse(BaseModel): + """Response after uploading binary data blob for generate request.""" + + data_blob_path: str = Field( + ..., description="Path to uploaded data blob file for generate request" + ) + size_bytes: int = Field(..., description="Size of the uploaded blob in bytes") + + +class GenerateResponse(BaseModel): + """Response from batch video generation.""" + + output_path: str = Field( + ..., + description="Path to output video file for download via /generate/download.", + ) + video_shape: list[int] = Field( + ..., + description="Shape of output video [T, H, W, C]", + ) + num_frames: int = Field(..., description="Number of frames generated") + num_chunks: int = Field(..., description="Number of chunks processed") + chunk_size: int = Field(..., description="Frames per chunk")