diff --git a/.devcontainer/post-create.sh b/.devcontainer/post-create.sh index 84787613b..bc2fdfa6a 100755 --- a/.devcontainer/post-create.sh +++ b/.devcontainer/post-create.sh @@ -5,7 +5,7 @@ cd /workspace/comfystream # Install Comfystream in editable mode. echo -e "\e[32mInstalling Comfystream in editable mode...\e[0m" -/workspace/miniconda3/envs/comfystream/bin/python3 -m pip install -e . --root-user-action=ignore > /dev/null +/workspace/miniconda3/envs/comfystream/bin/python3 -m pip install -e . -c src/comfystream/scripts/constraints.txt --root-user-action=ignore > /dev/null # Install npm packages if needed if [ ! -d "/workspace/comfystream/ui/node_modules" ]; then diff --git a/.editorconfig b/.editorconfig index c39bef288..17f476700 100644 --- a/.editorconfig +++ b/.editorconfig @@ -13,7 +13,9 @@ insert_final_newline = true insert_final_newline = unset [*.py] +indent_style = space indent_size = 4 +trim_trailing_whitespace = false [workflows/comfy*/*.json] insert_final_newline = unset diff --git a/.github/workflows/docker.yaml b/.github/workflows/docker.yaml index 608a0edc4..f231448d7 100644 --- a/.github/workflows/docker.yaml +++ b/.github/workflows/docker.yaml @@ -9,6 +9,12 @@ on: - main tags: - "v*" + workflow_dispatch: + inputs: + nodes_config: + description: "Custom nodes config filename or path for base image build" + required: false + default: "nodes.yaml" concurrency: group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} @@ -27,7 +33,7 @@ jobs: runs-on: [self-hosted, linux, gpu] steps: - name: Check out code - uses: actions/checkout@v4 + uses: actions/checkout@v6 with: fetch-depth: 0 ref: ${{ github.event.pull_request.head.sha }} @@ -78,33 +84,12 @@ jobs: file: docker/Dockerfile.base build-args: | CACHEBUST=${{ github.run_id }} + NODES_CONFIG=${{ github.event_name == 'workflow_dispatch' && github.event.inputs.nodes_config || 'nodes.yaml' }} labels: ${{ steps.meta.outputs.labels }} annotations: ${{ steps.meta.outputs.annotations }} 
cache-from: type=registry,ref=livepeer/comfyui-base:build-cache cache-to: type=registry,mode=max,ref=livepeer/comfyui-base:build-cache - trigger: - name: Trigger ai-runner workflow - needs: base - if: ${{ github.repository == 'livepeer/comfystream' }} - runs-on: ubuntu-latest - steps: - - name: Send workflow dispatch event to ai-runner - uses: actions/github-script@v7 - with: - github-token: ${{ secrets.CI_GITHUB_TOKEN }} - script: | - await github.rest.actions.createWorkflowDispatch({ - owner: context.repo.owner, - repo: "ai-runner", - workflow_id: "comfyui-trigger.yaml", - ref: "main", - inputs: { - "comfyui-base-digest": "${{ needs.base.outputs.image-digest }}", - "triggering-branch": "${{ github.head_ref || github.ref_name }}", - }, - }); - comfystream: name: comfystream image needs: base @@ -115,7 +100,7 @@ jobs: runs-on: [self-hosted, linux, amd64] steps: - name: Check out code - uses: actions/checkout@v4 + uses: actions/checkout@v6 with: fetch-depth: 0 ref: ${{ github.event.pull_request.head.sha }} diff --git a/.github/workflows/opencv-cuda-artifact.yml b/.github/workflows/opencv-cuda-artifact.yml new file mode 100644 index 000000000..df7071903 --- /dev/null +++ b/.github/workflows/opencv-cuda-artifact.yml @@ -0,0 +1,184 @@ +name: Build OpenCV CUDA Artifact + +on: + workflow_dispatch: + inputs: + python_version: + description: 'Python version to build' + required: false + default: '3.12' + type: string + cuda_version: + description: 'CUDA version to build' + required: false + default: '12.8' + type: string + +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} + cancel-in-progress: true + +env: + PYTHON_VERSION: ${{ github.event.inputs.python_version || '3.12' }} + CUDA_VERSION: ${{ github.event.inputs.cuda_version || '12.8' }} + +jobs: + build-opencv-artifact: + name: Build OpenCV CUDA Artifact + runs-on: [self-hosted, linux, gpu] + + steps: + - name: Checkout code + uses: actions/checkout@v6 + with: + fetch-depth: 0 + ref: 
${{ github.event.pull_request.head.sha || github.sha }} + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Build OpenCV CUDA Docker image + uses: docker/build-push-action@v6 + with: + context: . + file: docker/Dockerfile.opencv + build-args: | + BASE_IMAGE=nvidia/cuda:${{ env.CUDA_VERSION }}.1-cudnn-devel-ubuntu22.04 + PYTHON_VERSION=${{ env.PYTHON_VERSION }} + CUDA_VERSION=${{ env.CUDA_VERSION }} + tags: opencv-cuda-artifact:latest + load: true + cache-from: type=gha + cache-to: type=gha,mode=max + + - name: Extract OpenCV libraries from Docker container + run: | + echo "Creating temporary container..." + docker create --name opencv-extract opencv-cuda-artifact:latest + + echo "Creating workspace directory..." + mkdir -p ./opencv-artifacts + + # Try to copy from system installation + docker cp opencv-extract:/usr/local/lib/python${{ env.PYTHON_VERSION }}/site-packages/cv2 ./opencv-artifacts/cv2 || echo "cv2 not found in system site-packages" + + echo "Copying OpenCV source directories..." + # Copy opencv and opencv_contrib source directories + docker cp opencv-extract:/workspace/opencv ./opencv-artifacts/ || echo "opencv source not found" + docker cp opencv-extract:/workspace/opencv_contrib ./opencv-artifacts/ || echo "opencv_contrib source not found" + + echo "Cleaning up container..." + docker rm opencv-extract + + echo "Contents of opencv-artifacts:" + ls -la ./opencv-artifacts/ + + - name: Create tarball artifact + run: | + echo "Creating opencv-cuda-release.tar.gz..." + cd ./opencv-artifacts + tar -czf ../opencv-cuda-release.tar.gz . || echo "Failed to create tarball" + cd .. + + echo "Generating checksums..." + sha256sum opencv-cuda-release.tar.gz > opencv-cuda-release.tar.gz.sha256 + md5sum opencv-cuda-release.tar.gz > opencv-cuda-release.tar.gz.md5 + + echo "Verifying archive contents..." 
+ echo "Archive size: $(ls -lh opencv-cuda-release.tar.gz | awk '{print $5}')" + echo "First 20 files in archive:" + tar -tzf opencv-cuda-release.tar.gz | head -20 + + - name: Extract and verify tarball + run: | + echo "Testing tarball extraction..." + mkdir -p test-extract + cd test-extract + tar -xzf ../opencv-cuda-release.tar.gz + echo "Extracted contents:" + find . -maxdepth 2 -type d | sort + cd .. + rm -rf test-extract + + - name: Upload OpenCV CUDA Release Artifact + uses: actions/upload-artifact@v5 + with: + name: opencv-cuda-release-python${{ env.PYTHON_VERSION }}-cuda${{ env.CUDA_VERSION }}-${{ github.sha }} + path: | + opencv-cuda-release.tar.gz + opencv-cuda-release.tar.gz.sha256 + opencv-cuda-release.tar.gz.md5 + retention-days: 30 + + - name: Create Release Notes + run: | + cat > release-info.txt << EOF + OpenCV CUDA Release Artifact + + Build Details: + - Python Version: ${{ env.PYTHON_VERSION }} + - CUDA Version: ${{ env.CUDA_VERSION }} + - OpenCV Version: 4.11.0 + - Built on: $(date -u) + - Commit SHA: ${{ github.sha }} + + Contents: + - cv2: Python OpenCV module with CUDA support + - opencv: OpenCV source code + - opencv_contrib: OpenCV contrib modules source + - lib: Compiled OpenCV libraries + - include: OpenCV header files + + Installation: + 1. Download opencv-cuda-release.tar.gz + 2. Extract: tar -xzf opencv-cuda-release.tar.gz + 3. Copy cv2 to your Python environment's site-packages + 4. 
Ensure CUDA libraries are in your system PATH + + Checksums: + SHA256: $(cat opencv-cuda-release.tar.gz.sha256) + MD5: $(cat opencv-cuda-release.tar.gz.md5) + EOF + + - name: Upload Release Info + uses: actions/upload-artifact@v5 + with: + name: release-info-python${{ env.PYTHON_VERSION }}-cuda${{ env.CUDA_VERSION }}-${{ github.sha }} + path: release-info.txt + retention-days: 30 + + create-release-draft: + name: Create Release Draft + needs: build-opencv-artifact + runs-on: ubuntu-latest + if: github.event_name == 'workflow_dispatch' + + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Download artifacts + uses: actions/download-artifact@v6 + with: + name: opencv-cuda-release-python${{ env.PYTHON_VERSION }}-cuda${{ env.CUDA_VERSION }}-${{ github.sha }} + path: ./artifacts + + - name: Download release info + uses: actions/download-artifact@v6 + with: + name: release-info-python${{ env.PYTHON_VERSION }}-cuda${{ env.CUDA_VERSION }}-${{ github.sha }} + path: ./artifacts + + - name: Create Release Draft + uses: softprops/action-gh-release@v2 + with: + tag_name: opencv-cuda-v${{ env.PYTHON_VERSION }}-${{ env.CUDA_VERSION }}-${{ github.run_number }} + name: OpenCV CUDA Release - Python ${{ env.PYTHON_VERSION }} CUDA ${{ env.CUDA_VERSION }} + body_path: ./artifacts/release-info.txt + draft: true + files: | + ./artifacts/opencv-cuda-release.tar.gz + ./artifacts/opencv-cuda-release.tar.gz.sha256 + ./artifacts/opencv-cuda-release.tar.gz.md5 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/.github/workflows/publish-comfyui-node.yaml b/.github/workflows/publish-comfyui-node.yaml new file mode 100644 index 000000000..0b609aee3 --- /dev/null +++ b/.github/workflows/publish-comfyui-node.yaml @@ -0,0 +1,26 @@ +name: Publish ComfyUI Custom Node + +on: + workflow_dispatch: + +permissions: + contents: read + issues: write + +jobs: + publish-comfyui-node: + name: Publish Custom Node to ComfyUI 
registry + runs-on: ubuntu-latest + # Ensure this only runs on main branch + if: ${{ github.ref == 'refs/heads/main' }} + steps: + - name: Check out code + uses: actions/checkout@v6 + with: + submodules: true + + - name: Publish Custom Node + uses: Comfy-Org/publish-node-action@v1 + with: + ## Add your own personal access token to your Github Repository secrets and reference it here. + personal_access_token: ${{ secrets.REGISTRY_ACCESS_TOKEN }} diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 01900b71a..58d270929 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -12,9 +12,9 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout code - uses: actions/checkout@v4 + uses: actions/checkout@v6 - - uses: actions/setup-node@v4 + - uses: actions/setup-node@v6 with: node-version: 18 cache: npm @@ -38,7 +38,7 @@ jobs: cd - - name: Upload artifacts - uses: actions/upload-artifact@v4 + uses: actions/upload-artifact@v5 with: name: release-artifacts path: releases/ diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 98d27d262..98baa6cf3 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -19,20 +19,20 @@ jobs: runs-on: ubuntu-latest steps: - name: Check out code - uses: actions/checkout@v4 + uses: actions/checkout@v6 with: fetch-depth: 0 ref: ${{ github.event.pull_request.head.sha }} # Initializes the CodeQL tools for scanning. 
- name: Initialize CodeQL - uses: github/codeql-action/init@v3 + uses: github/codeql-action/init@v4 with: languages: typescript,javascript,python config-file: ./.github/codeql-config.yaml - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v3 + uses: github/codeql-action/analyze@v4 editorconfig: @@ -40,7 +40,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Check out code - uses: actions/checkout@v4 + uses: actions/checkout@v6 with: # Check https://github.com/livepeer/go-livepeer/pull/1891 # for ref value discussion @@ -59,14 +59,14 @@ jobs: runs-on: ubuntu-latest steps: - name: Check out code - uses: actions/checkout@v4 + uses: actions/checkout@v6 with: # Check https://github.com/livepeer/go-livepeer/pull/1891 # for ref value discussion ref: ${{ github.event.pull_request.head.sha }} - name: Set up Python - uses: actions/setup-python@v5 + uses: actions/setup-python@v6 with: python-version: '3.12' cache: pip diff --git a/.husky/pre-commit b/.husky/pre-commit new file mode 100755 index 000000000..d9a28aeb7 --- /dev/null +++ b/.husky/pre-commit @@ -0,0 +1,3 @@ +#!/bin/sh +cd ui && npx lint-staged + diff --git a/.vscode/launch.json b/.vscode/launch.json index 4d442c585..f05e02f5e 100755 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -56,7 +56,7 @@ "env": { "ORCH_URL": "https://172.17.0.1:9995", "ORCH_SECRET": "orch-secret", - "CAPABILITY_NAME": "comfystream-byoc-processor", + "CAPABILITY_NAME": "comfystream", "CAPABILITY_DESCRIPTION": "ComfyUI streaming processor for BYOC mode", "CAPABILITY_URL": "http://172.17.0.1:8000", "CAPABILITY_PRICE_PER_UNIT": "0", diff --git a/README.md b/README.md index 52c864f13..a9edf4be5 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ This repo also includes a WebRTC server and UI that uses comfystream to support Refer to [.devcontainer/README.md](.devcontainer/README.md) to setup ComfyStream in a devcontainer using a pre-configured ComfyUI docker environment. 
-For other installation options, refer to [Install ComfyUI and ComfyStream](https://pipelines.livepeer.org/docs/technical/install/local-testing) in the Livepeer pipelines documentation. +For other installation options, refer to [Install ComfyUI and ComfyStream](https://docs.comfystream.org/technical/get-started/install) in the ComfyStream documentation. For additional information, refer to the remaining sections below. @@ -35,7 +35,7 @@ For additional information, refer to the remaining sections below. You can quickly deploy ComfyStream using the docker image `livepeer/comfystream` -Refer to the documentation at [https://pipelines.livepeer.org/docs/technical/getting-started/install-comfystream](https://pipelines.livepeer.org/docs/technical/getting-started/install-comfystream) for instructions to run locally or on a remote server. +Refer to the documentation at [https://docs.comfystream.org/technical/get-started/install](https://docs.comfystream.org/technical/get-started/install) for instructions to run locally or on a remote server. 
#### RunPod diff --git a/__init__.py b/__init__.py index 1215db9b3..7782d363f 100644 --- a/__init__.py +++ b/__init__.py @@ -4,4 +4,4 @@ # Import and expose node classes from .nodes import NODE_CLASS_MAPPINGS, NODE_DISPLAY_NAME_MAPPINGS -__all__ = ['NODE_CLASS_MAPPINGS', 'NODE_DISPLAY_NAME_MAPPINGS'] +__all__ = ["NODE_CLASS_MAPPINGS", "NODE_DISPLAY_NAME_MAPPINGS"] diff --git a/benchmark.py b/benchmark.py index 359a8c237..8e096a77f 100644 --- a/benchmark.py +++ b/benchmark.py @@ -1,15 +1,16 @@ -import av +import argparse +import asyncio import json -import time -import torch import logging -import asyncio -import argparse +import time + +import av import numpy as np +import torch from comfystream.client import ComfyStreamClient -logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s') +logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger() @@ -22,14 +23,30 @@ def create_dummy_video_frame(width, height): async def main(): parser = argparse.ArgumentParser(description="Benchmark ComfyStreamClient workflow execution.") - parser.add_argument("--workflow-path", default="./workflows/comfystream/tensor-utils-example-api.json", help="Path to the workflow JSON file.") + parser.add_argument( + "--workflow-path", + default="./workflows/comfystream/tensor-utils-example-api.json", + help="Path to the workflow JSON file.", + ) parser.add_argument("--num-requests", type=int, default=100, help="Number of requests to send.") - parser.add_argument("--fps", type=float, default=None, help="Frames per second for FPS-based benchmarking.") - parser.add_argument("--cwd", default="/workspace/ComfyUI", help="Current working directory for ComfyStreamClient.") + parser.add_argument( + "--fps", type=float, default=None, help="Frames per second for FPS-based benchmarking." 
+ ) + parser.add_argument( + "--cwd", + default="/workspace/ComfyUI", + help="Current working directory for ComfyStreamClient.", + ) parser.add_argument("--width", type=int, default=512, help="Width of dummy video frames.") parser.add_argument("--height", type=int, default=512, help="Height of dummy video frames.") - parser.add_argument("--verbose", action="store_true", help="Enable verbose logging (shows progress for each request).") - parser.add_argument("--warmup-runs", type=int, default=5, help="Number of warm-up runs before benchmarking.") + parser.add_argument( + "--verbose", + action="store_true", + help="Enable verbose logging (shows progress for each request).", + ) + parser.add_argument( + "--warmup-runs", type=int, default=5, help="Number of warm-up runs before benchmarking." + ) args = parser.parse_args() @@ -44,7 +61,9 @@ async def main(): client = ComfyStreamClient(cwd=args.cwd) await client.set_prompts([prompt]) - logger.info(f"Starting benchmark with workflow: {args.workflow_path}, requests: {args.num_requests}, resolution: {args.width}x{args.height}, warmup runs: {args.warmup_runs}") + logger.info( + f"Starting benchmark with workflow: {args.workflow_path}, requests: {args.num_requests}, resolution: {args.width}x{args.height}, warmup runs: {args.warmup_runs}" + ) if args.warmup_runs > 0: logger.info(f"Running {args.warmup_runs} warm-up runs...") @@ -66,11 +85,13 @@ async def main(): await client.get_video_output() request_end_time = time.time() round_trip_times.append(request_end_time - request_start_time) - logger.debug(f"Request {i+1}/{args.num_requests} completed in {round_trip_times[-1]:.4f} seconds") + logger.debug( + f"Request {i + 1}/{args.num_requests} completed in {round_trip_times[-1]:.4f} seconds" + ) end_time = time.time() total_time = end_time - start_time - output_fps = args.num_requests / total_time if total_time > 0 else float('inf') + output_fps = args.num_requests / total_time if total_time > 0 else float("inf") # Calculate 
percentiles for sequential mode p50_rtt = np.percentile(round_trip_times, 50) @@ -79,17 +100,17 @@ async def main(): p95_rtt = np.percentile(round_trip_times, 95) p99_rtt = np.percentile(round_trip_times, 99) - print("\n" + "="*40) + print("\n" + "=" * 40) print("FPS Results:") - print("="*40) + print("=" * 40) print(f"Total requests: {args.num_requests}") print(f"Total time: {total_time:.4f} seconds") print(f"Actual Output FPS:{output_fps:.2f}") print(f"Total requests: {args.num_requests}") print(f"Total time: {total_time:.4f} seconds") - print("\n" + "="*40) + print("\n" + "=" * 40) print("Latency Results:") - print("="*40) + print("=" * 40) print(f"Average: {np.mean(round_trip_times):.4f}") print(f"Min: {np.min(round_trip_times):.4f}") print(f"Max: {np.max(round_trip_times):.4f}") @@ -99,7 +120,6 @@ async def main(): print(f"P95: {p95_rtt:.4f}") print(f"P99: {p99_rtt:.4f}") - else: # This is mainly used to stress test the ComfyUI client, gives us a good idea on how frame skipping etc is working on the client end. 
logger.info(f"Running FPS-based benchmark at {args.fps} FPS...") @@ -118,9 +138,11 @@ async def collect_outputs_task(): last_output_receive_time = time.time() received_frames_count += 1 - logger.debug(f"Received output frame {received_frames_count} at {last_output_receive_time - start_time:.4f} seconds") + logger.debug( + f"Received output frame {received_frames_count} at {last_output_receive_time - start_time:.4f} seconds" + ) except asyncio.TimeoutError: - logger.debug(f"Output collection task timed out after waiting for 5 seconds.") + logger.debug("Output collection task timed out after waiting for 5 seconds.") break except Exception as e: logger.debug(f"Output collection task finished due to exception: {e}") @@ -140,7 +162,9 @@ async def collect_outputs_task(): request_send_time = time.time() client.put_video_input(frame) - logger.debug(f"Sent request {i+1}/{args.num_requests} at {request_send_time - start_time:.4f} seconds") + logger.debug( + f"Sent request {i + 1}/{args.num_requests} at {request_send_time - start_time:.4f} seconds" + ) await output_collector_task @@ -150,11 +174,11 @@ async def collect_outputs_task(): elif received_frames_count == 0: output_fps = 0.0 else: - output_fps = float('inf') + output_fps = float("inf") - print("\n" + "="*40) + print("\n" + "=" * 40) print("FPS Results:") - print("="*40) + print("=" * 40) print(f"Target Input FPS: {args.fps:.2f}") print(f"Actual Output FPS:{output_fps:.2f} ({received_frames_count} frames received)") print(f"Total requests: {args.num_requests}") @@ -162,4 +186,4 @@ async def collect_outputs_task(): if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + asyncio.run(main()) diff --git a/configs/QUICK_REFERENCE.md b/configs/QUICK_REFERENCE.md new file mode 100644 index 000000000..a4a12481b --- /dev/null +++ b/configs/QUICK_REFERENCE.md @@ -0,0 +1,74 @@ +# Quick Reference: Model Configuration + +## Single File vs Directory Download + +### Single File (Default) +```yaml +my-model: + 
name: "My Model" + url: "https://huggingface.co/user/repo/resolve/main/file.safetensors" + path: "loras/model.safetensors" +``` + +### Directory (Add `is_directory: true`) +```yaml +my-directory: + name: "My Directory" + url: "https://huggingface.co/user/repo/tree/main/folder" + path: "models/folder" + is_directory: true # ← Add this! +``` + +## URL Patterns + +| Download Type | URL Pattern | Example | +|---------------|-------------|---------| +| **Single File** | `/resolve/` | `https://huggingface.co/h94/IP-Adapter/resolve/main/models/ip-adapter_sd15.safetensors` | +| **Directory** | `/tree/` | `https://huggingface.co/h94/IP-Adapter/tree/main/models/image_encoder` | + +## Common Model Paths + +| Model Type | Path Pattern | +|------------|--------------| +| Checkpoints | `checkpoints/SD1.5/` | +| LoRAs | `loras/SD1.5/` | +| ControlNet | `controlnet/` | +| VAE | `vae/` or `vae_approx/` | +| IP-Adapter | `ipadapter/` | +| Text Encoders | `text_encoders/CLIPText/` | +| TensorRT/ONNX | `tensorrt/` | + +## IP-Adapter Example + +```yaml +models: + # Single file - IP-Adapter model + ip-adapter-sd15: + name: "IP Adapter SD15" + url: "https://huggingface.co/h94/IP-Adapter/resolve/main/models/ip-adapter_sd15.safetensors" + path: "ipadapter/ip-adapter_sd15.safetensors" + + # Directory - CLIP image encoder + clip-image-encoder: + name: "CLIP Image Encoder" + url: "https://huggingface.co/h94/IP-Adapter/tree/main/models/image_encoder" + path: "ipadapter/models/image_encoder" + is_directory: true +``` + +## Usage + +```bash +# Use a config +python src/comfystream/scripts/setup_models.py --config my-config.yaml + +# Use default config (models.yaml) +python src/comfystream/scripts/setup_models.py +``` + +## See Also + +- [DIRECTORY_DOWNLOADS.md](../DIRECTORY_DOWNLOADS.md) - Detailed directory download guide +- [models-ipadapter.yaml](models-ipadapter.yaml) - Complete working example +- [README.md](README.md) - Full configuration reference + diff --git 
a/configs/models-ipadapter.yaml b/configs/models-ipadapter.yaml new file mode 100644 index 000000000..b6f225d2e --- /dev/null +++ b/configs/models-ipadapter.yaml @@ -0,0 +1,45 @@ +models: + # Example: IP-Adapter setup with directory download + + # Single file download (regular) + ip-adapter-plus-sd15: + name: "IP Adapter SD15" + url: "https://huggingface.co/h94/IP-Adapter/resolve/main/models/ip-adapter-plus_sd15.safetensors" + path: "ipadapter/ip-adapter-plus_sd15.safetensors" + type: "ipadapter" + extra_files: + - url: "https://huggingface.co/h94/IP-Adapter/resolve/main/models/ip-adapter-plus_sd15.bin" + path: "ipadapter/ip-adapter-plus_sd15.bin" + + clip-image-encoder: + name: "CLIP Image Encoder" + url: "https://huggingface.co/h94/IP-Adapter/resolve/main/models/image_encoder/model.safetensors" + path: "ipadapter/image_encoder/model.safetensors" + type: "image_encoder" + extra_files: + - url: "https://huggingface.co/h94/IP-Adapter/resolve/main/models/image_encoder/config.json" + path: "ipadapter/image_encoder/config.json" + + # Base model + sd-turbo: + name: "SD-Turbo" + url: "https://huggingface.co/stabilityai/sd-turbo/resolve/main/sd_turbo.safetensors" + path: "checkpoints/SD1.5/sd_turbo.safetensors" + type: "checkpoint" + + PixelArtRedmond15V-PixelArt-PIXARFK.safetensors: + name: "PixelArtRedmond15V-PixelArt-PIXARFK" + url: "https://huggingface.co/artificialguybr/pixelartredmond-1-5v-pixel-art-loras-for-sd-1-5/resolve/ab43d9e2cf8c9240189f01e9cdc4ca341362500c/PixelArtRedmond15V-PixelArt-PIXARFK.safetensors" + path: "loras/SD1.5/PixelArtRedmond15V-PixelArt-PIXARFK.safetensors" + type: "lora" + + # TAESD for fast VAE + taesd: + name: "TAESD" + url: "https://huggingface.co/madebyollin/taesd/resolve/main/taesd_decoder.safetensors" + path: "vae_approx/taesd_decoder.safetensors" + type: "vae_approx" + extra_files: + - url: "https://huggingface.co/madebyollin/taesd/resolve/main/taesd_encoder.safetensors" + path: "vae_approx/taesd_encoder.safetensors" + diff --git 
a/configs/models.yaml b/configs/models.yaml index 09149f954..bbcf0cdc7 100644 --- a/configs/models.yaml +++ b/configs/models.yaml @@ -29,19 +29,19 @@ models: # TAESD models taesd: name: "TAESD" - url: "https://raw.githubusercontent.com/madebyollin/taesd/main/taesd_decoder.pth" - path: "vae_approx/taesd_decoder.pth" + url: "https://huggingface.co/madebyollin/taesd/resolve/main/taesd_decoder.safetensors" + path: "vae_approx/taesd_decoder.safetensors" type: "vae_approx" extra_files: - - url: "https://raw.githubusercontent.com/madebyollin/taesd/main/taesd_encoder.pth" - path: "vae_approx/taesd_encoder.pth" + - url: "https://huggingface.co/madebyollin/taesd/resolve/main/taesd_encoder.safetensors" + path: "vae_approx/taesd_encoder.safetensors" # ControlNet models controlnet-depth: name: "ControlNet Depth" url: "https://huggingface.co/comfyanonymous/ControlNet-v1-1_fp16_safetensors/resolve/main/control_v11f1p_sd15_depth_fp16.safetensors" path: "controlnet/control_v11f1p_sd15_depth_fp16.safetensors" - type: "controlnet" + type: "controlnet" controlnet-mediapipe-face: name: "ControlNet MediaPipe Face" @@ -74,8 +74,82 @@ models: path: "text_encoders/CLIPText/model.fp16.safetensors" type: "text_encoder" + # JoyVASA models for ComfyUI-FasterLivePortrait + joyvasa_motion_generator: + name: "JoyVASA Motion Generator" + url: "https://huggingface.co/jdh-algo/JoyVASA/resolve/main/motion_generator/motion_generator_hubert_chinese.pt?download=true" + path: "liveportrait_onnx/joyvasa_models/motion_generator_hubert_chinese.pt" + type: "torch" + + joyvasa_audio_model: + name: "JoyVASA Hubert Chinese" + url: "https://huggingface.co/TencentGameMate/chinese-hubert-base/resolve/main/chinese-hubert-base-fairseq-ckpt.pt?download=true" + path: "liveportrait_onnx/joyvasa_models/chinese-hubert-base-fairseq-ckpt.pt" + type: "torch" + + joyvasa_motion_template: + name: "JoyVASA Motion Template" + url: 
"https://huggingface.co/jdh-algo/JoyVASA/resolve/main/motion_template/motion_template.pkl?download=true" + path: "liveportrait_onnx/joyvasa_models/motion_template.pkl" + type: "pickle" + + # LivePortrait ONNX models - only necessary to build TRT engines + warping_spade: + name: "WarpingSpadeModel" + url: "https://huggingface.co/warmshao/FasterLivePortrait/resolve/main/liveportrait_onnx/warping_spade-fix.onnx?download=true" + path: "liveportrait_onnx/warping_spade-fix.onnx" + type: "onnx" + + motion_extractor: + name: "MotionExtractorModel" + url: "https://huggingface.co/warmshao/FasterLivePortrait/resolve/main/liveportrait_onnx/motion_extractor.onnx?download=true" + path: "liveportrait_onnx/motion_extractor.onnx" + type: "onnx" + + landmark: + name: "LandmarkModel" + url: "https://huggingface.co/warmshao/FasterLivePortrait/resolve/main/liveportrait_onnx/landmark.onnx?download=true" + path: "liveportrait_onnx/landmark.onnx" + type: "onnx" + + face_analysis_retinaface: + name: "FaceAnalysisModel - RetinaFace" + url: "https://huggingface.co/warmshao/FasterLivePortrait/resolve/main/liveportrait_onnx/retinaface_det_static.onnx?download=true" + path: "liveportrait_onnx/retinaface_det_static.onnx" + type: "onnx" + + face_analysis_2dpose: + name: "FaceAnalysisModel - 2DPose" + url: "https://huggingface.co/warmshao/FasterLivePortrait/resolve/main/liveportrait_onnx/face_2dpose_106_static.onnx?download=true" + path: "liveportrait_onnx/face_2dpose_106_static.onnx" + type: "onnx" + + appearance_feature_extractor: + name: "AppearanceFeatureExtractorModel" + url: "https://huggingface.co/warmshao/FasterLivePortrait/resolve/main/liveportrait_onnx/appearance_feature_extractor.onnx?download=true" + path: "liveportrait_onnx/appearance_feature_extractor.onnx" + type: "onnx" + + stitching: + name: "StitchingModel" + url: "https://huggingface.co/warmshao/FasterLivePortrait/resolve/main/liveportrait_onnx/stitching.onnx?download=true" + path: "liveportrait_onnx/stitching.onnx" + type: 
"onnx" + + stitching_eye_retarget: + name: "StitchingModel (Eye Retargeting)" + url: "https://huggingface.co/warmshao/FasterLivePortrait/resolve/main/liveportrait_onnx/stitching_eye.onnx?download=true" + path: "liveportrait_onnx/stitching_eye.onnx" + type: "onnx" + + stitching_lip_retarget: + name: "StitchingModel (Lip Retargeting)" + url: "https://huggingface.co/warmshao/FasterLivePortrait/resolve/main/liveportrait_onnx/stitching_lip.onnx?download=true" + path: "liveportrait_onnx/stitching_lip.onnx" + type: "onnx" + sd-turbo: name: "SD-Turbo" url: "https://huggingface.co/stabilityai/sd-turbo/resolve/main/sd_turbo.safetensors" path: "checkpoints/SD1.5/sd_turbo.safetensors" - type: "checkpoint" \ No newline at end of file + type: "checkpoint" diff --git a/configs/nodes-streamdiffusion.yaml b/configs/nodes-streamdiffusion.yaml new file mode 100644 index 000000000..23cebb603 --- /dev/null +++ b/configs/nodes-streamdiffusion.yaml @@ -0,0 +1,37 @@ +nodes: + # Minimal node configuration for faster builds + comfyui-tensorrt: + name: "ComfyUI TensorRT" + url: "https://github.com/yondonfu/ComfyUI_TensorRT.git" + branch: "quantization_with_controlnet_fixes" + type: "tensorrt" + dependencies: + - "tensorrt==10.12.0.36" + + comfyui-streamdiffusion: + name: "ComfyUI StreamDiffusion" + url: "https://github.com/RUFFY-369/ComfyUI-StreamDiffusion" + branch: "main" + type: "tensorrt" + + comfyui-torch-compile: + name: "ComfyUI Torch Compile" + url: "https://github.com/yondonfu/ComfyUI-Torch-Compile" + type: "tensorrt" + + comfyui_controlnet_aux: + name: "ComfyUI ControlNet Auxiliary" + url: "https://github.com/Fannovel16/comfyui_controlnet_aux" + type: "controlnet" + + comfyui-stream-pack: + name: "ComfyUI Stream Pack" + url: "https://github.com/livepeer/ComfyUI-Stream-Pack" + branch: "main" + type: "utility" + + rgthree-comfy: + name: "rgthree Comfy" + url: "https://github.com/rgthree/rgthree-comfy.git" + type: "utility" + diff --git a/configs/nodes.yaml b/configs/nodes.yaml index 
49d422a57..fb7e77a85 100644 --- a/configs/nodes.yaml +++ b/configs/nodes.yaml @@ -5,8 +5,6 @@ nodes: url: "https://github.com/yondonfu/ComfyUI_TensorRT.git" branch: "quantization_with_controlnet_fixes" type: "tensorrt" - dependencies: - - "tensorrt==10.12.0.36" comfyui-depthanything-tensorrt: name: "ComfyUI DepthAnything TensorRT" @@ -19,6 +17,12 @@ nodes: branch: "main" type: "tensorrt" + comfyui-fasterliveportrait: + name: "ComfyUI FasterLivePortrait" + url: "https://github.com/pschroedl/ComfyUI-FasterLivePortrait.git" + branch: "main" + type: "tensorrt" + # Ryan's nodes comfyui-ryanontheinside: name: "ComfyUI RyanOnTheInside" diff --git a/docker/Dockerfile.base b/docker/Dockerfile.base index c8f6b7ff1..9bedd7185 100644 --- a/docker/Dockerfile.base +++ b/docker/Dockerfile.base @@ -1,32 +1,29 @@ ARG BASE_IMAGE=nvidia/cuda:12.8.1-cudnn-devel-ubuntu22.04 \ CONDA_VERSION=latest \ - PYTHON_VERSION=3.12 + PYTHON_VERSION=3.12 \ + NODES_CONFIG=nodes.yaml FROM "${BASE_IMAGE}" ARG CONDA_VERSION \ - PYTHON_VERSION + PYTHON_VERSION \ + NODES_CONFIG ENV DEBIAN_FRONTEND=noninteractive \ + TensorRT_ROOT=/opt/TensorRT-10.12.0.36 \ CONDA_VERSION="${CONDA_VERSION}" \ PATH="/workspace/miniconda3/bin:${PATH}" \ PYTHON_VERSION="${PYTHON_VERSION}" # System dependencies RUN apt update && apt install -yqq --no-install-recommends \ - git \ - wget \ - nano \ - socat \ - libsndfile1 \ - build-essential \ - llvm \ - tk-dev \ - libglvnd-dev \ - cmake \ - swig \ - libprotobuf-dev \ - protobuf-compiler \ + git wget nano socat \ + libsndfile1 build-essential llvm tk-dev \ + libglvnd-dev cmake swig libprotobuf-dev \ + protobuf-compiler libcairo2-dev libpango1.0-dev libgdk-pixbuf2.0-dev \ + libffi-dev libgirepository1.0-dev pkg-config libgflags-dev \ + libgoogle-glog-dev libjpeg-dev libavcodec-dev libavformat-dev \ + libavutil-dev libswscale-dev \ && rm -rf /var/lib/apt/lists/* #enable opengl support with nvidia gpu @@ -51,40 +48,68 @@ RUN mkdir -p /workspace/comfystream && \ RUN conda run -n 
comfystream --no-capture-output pip install --upgrade pip && \ conda run -n comfystream --no-capture-output pip install wheel +RUN apt-get remove --purge -y libcudnn9-cuda-12 libcudnn9-dev-cuda-12 || true && \ + apt-get autoremove -y && \ + rm -rf /var/lib/apt/lists/* + +# Install numpy<2.0.0 first +# to ensure numpy 2.0 is not installed automatically by another package +RUN conda run -n comfystream --no-capture-output pip install "numpy<2.0.0" + +# Install cuDNN 9.8 via conda to match base system version +# Caution: Mixed versions installed in environment (system/python) can cause CUDNN_STATUS_SUBLIBRARY_VERSION_MISMATCH errors +RUN conda install -n comfystream -y -c nvidia -c conda-forge cudnn=9.8 cuda-version=12.8 + # Copy only files needed for setup COPY ./src/comfystream/scripts /workspace/comfystream/src/comfystream/scripts COPY ./configs /workspace/comfystream/configs +# TensorRT SDK +WORKDIR /opt +RUN wget --progress=dot:giga \ + https://developer.nvidia.com/downloads/compute/machine-learning/tensorrt/10.12.0/tars/TensorRT-10.12.0.36.Linux.x86_64-gnu.cuda-12.9.tar.gz \ + && tar -xzf TensorRT-10.12.0.36.Linux.x86_64-gnu.cuda-12.9.tar.gz \ + && rm TensorRT-10.12.0.36.Linux.x86_64-gnu.cuda-12.9.tar.gz + +# Link libraries and update linker cache +RUN echo "${TensorRT_ROOT}/lib" > /etc/ld.so.conf.d/tensorrt.conf \ + && ldconfig + +# Install matching TensorRT Python bindings for CPython 3.12 +RUN conda run -n comfystream pip install --no-cache-dir \ + ${TensorRT_ROOT}/python/tensorrt-10.12.0.36-cp312-none-linux_x86_64.whl + # Clone ComfyUI -RUN git clone --branch v0.3.56 --depth 1 https://github.com/comfyanonymous/ComfyUI.git /workspace/ComfyUI +RUN git clone --branch v0.3.60 --depth 1 https://github.com/comfyanonymous/ComfyUI.git /workspace/ComfyUI +RUN git clone https://github.com/Comfy-Org/ComfyUI-Manager.git /workspace/ComfyUI/custom_nodes/ComfyUI-Manager # Copy ComfyStream files into ComfyUI COPY . 
/workspace/comfystream -RUN conda run -n comfystream --cwd /workspace/comfystream --no-capture-output pip install -r ./src/comfystream/scripts/constraints.txt +RUN conda run -n comfystream --cwd /workspace/comfystream --no-capture-output pip install -r src/comfystream/scripts/constraints.txt # Copy comfystream and example workflows to ComfyUI COPY ./workflows/comfyui/* /workspace/ComfyUI/user/default/workflows/ COPY ./test/example-512x512.png /workspace/ComfyUI/input +COPY ./docker/entrypoint.sh /workspace/comfystream/docker/entrypoint.sh # Install ComfyUI requirements -RUN conda run -n comfystream --no-capture-output --cwd /workspace/ComfyUI pip install -r requirements.txt --root-user-action=ignore +RUN conda run -n comfystream --no-capture-output --cwd /workspace/ComfyUI pip install -r requirements.txt --constraint /workspace/comfystream/src/comfystream/scripts/constraints.txt --root-user-action=ignore # Install ComfyStream requirements RUN ln -s /workspace/comfystream /workspace/ComfyUI/custom_nodes/comfystream -RUN conda run -n comfystream --no-capture-output --cwd /workspace/comfystream pip install -e . --root-user-action=ignore +RUN conda run -n comfystream --no-capture-output --cwd /workspace/comfystream pip install -e . 
--constraint src/comfystream/scripts/constraints.txt --root-user-action=ignore RUN conda run -n comfystream --no-capture-output --cwd /workspace/comfystream python install.py --workspace /workspace/ComfyUI # Accept a build-arg that lets CI force-invalidate setup_nodes.py ARG CACHEBUST=static ENV CACHEBUST=${CACHEBUST} -# Run setup_nodes -RUN conda run -n comfystream --no-capture-output --cwd /workspace/comfystream python src/comfystream/scripts/setup_nodes.py --workspace /workspace/ComfyUI +# Run setup_nodes with custom config if specified +RUN conda run -n comfystream --no-capture-output --cwd /workspace/comfystream python src/comfystream/scripts/setup_nodes.py --workspace /workspace/ComfyUI --config ${NODES_CONFIG} -RUN conda run -n comfystream --no-capture-output pip install "numpy<2.0.0" - -RUN conda run -n comfystream --no-capture-output pip install --no-cache-dir xformers==0.0.30 --no-deps +# Setup opencv with CUDA support +RUN conda run -n comfystream --no-capture-output --cwd /workspace/comfystream --no-capture-output docker/entrypoint.sh --opencv-cuda # Configure no environment activation by default RUN conda config --set auto_activate_base false && \ @@ -94,3 +119,8 @@ RUN conda config --set auto_activate_base false && \ RUN echo "conda activate comfystream" >> ~/.bashrc WORKDIR /workspace/comfystream + +# Run ComfyStream BYOC server from /workspace/ComfyUI within the comfystream conda env +ENTRYPOINT ["conda", "run", "--no-capture-output", "-n", "comfystream", "--cwd", "/workspace/ComfyUI", "python", "/workspace/comfystream/server/byoc.py"] +# Default args; can be overridden/appended at runtime +CMD ["--workspace", "/workspace/ComfyUI", "--host", "0.0.0.0", "--port", "8000"] diff --git a/docker/Dockerfile.opencv b/docker/Dockerfile.opencv new file mode 100644 index 000000000..848db1fe8 --- /dev/null +++ b/docker/Dockerfile.opencv @@ -0,0 +1,124 @@ +ARG BASE_IMAGE=nvidia/cuda:12.8.1-cudnn-devel-ubuntu22.04 \ + CONDA_VERSION=latest \ + PYTHON_VERSION=3.12 
\ + CUDA_VERSION=12.8 + +FROM "${BASE_IMAGE}" + +ARG CONDA_VERSION \ + PYTHON_VERSION \ + CUDA_VERSION + +ENV DEBIAN_FRONTEND=noninteractive \ + CONDA_VERSION="${CONDA_VERSION}" \ + PATH="/workspace/miniconda3/bin:${PATH}" \ + PYTHON_VERSION="${PYTHON_VERSION}" \ + CUDA_VERSION="${CUDA_VERSION}" + +# System dependencies +RUN apt update && apt install -yqq \ + git \ + wget \ + nano \ + socat \ + libsndfile1 \ + build-essential \ + llvm \ + tk-dev \ + cmake \ + libgflags-dev \ + libgoogle-glog-dev \ + libjpeg-dev \ + libavcodec-dev \ + libavformat-dev \ + libavutil-dev \ + libswscale-dev && \ + rm -rf /var/lib/apt/lists/* + +RUN mkdir -p /workspace/comfystream && \ + wget "https://repo.anaconda.com/miniconda/Miniconda3-${CONDA_VERSION}-Linux-x86_64.sh" -O /tmp/miniconda.sh && \ + bash /tmp/miniconda.sh -b -p /workspace/miniconda3 && \ + eval "$(/workspace/miniconda3/bin/conda shell.bash hook)" && \ + conda tos accept --override-channels --channel https://repo.anaconda.com/pkgs/main && \ + conda tos accept --override-channels --channel https://repo.anaconda.com/pkgs/r && \ + conda create -n comfystream python="${PYTHON_VERSION}" -c conda-forge -y && \ + rm /tmp/miniconda.sh && \ + conda run -n comfystream --no-capture-output pip install numpy==1.26.4 aiortc aiohttp requests tqdm pyyaml --root-user-action=ignore + +# Clone ComfyUI +ADD --link https://github.com/comfyanonymous/ComfyUI.git /workspace/ComfyUI + +# OpenCV with CUDA support +WORKDIR /workspace + +# Clone OpenCV repositories +RUN git clone --depth 1 --branch 4.11.0 https://github.com/opencv/opencv.git && \ + git clone --depth 1 --branch 4.11.0 https://github.com/opencv/opencv_contrib.git + +# Create build directory +RUN mkdir -p /workspace/opencv/build + +# Create a toolchain file with absolute path +RUN echo '# Custom toolchain file to exclude Conda paths\n\ +\n\ +# Set system compilers\n\ +set(CMAKE_C_COMPILER "/usr/bin/gcc")\n\ +set(CMAKE_CXX_COMPILER "/usr/bin/g++")\n\ +\n\ +# Set system root 
directories\n\ +set(CMAKE_FIND_ROOT_PATH "/usr")\n\ +set(CMAKE_FIND_ROOT_PATH_MODE_PROGRAM NEVER)\n\ +set(CMAKE_FIND_ROOT_PATH_MODE_LIBRARY ONLY)\n\ +set(CMAKE_FIND_ROOT_PATH_MODE_INCLUDE ONLY)\n\ +set(CMAKE_FIND_ROOT_PATH_MODE_PACKAGE ONLY)\n\ +\n\ +# Explicitly exclude Conda paths\n\ +list(APPEND CMAKE_IGNORE_PATH \n\ + "/workspace/miniconda3"\n\ + "/workspace/miniconda3/envs"\n\ + "/workspace/miniconda3/envs/comfystream"\n\ + "/workspace/miniconda3/envs/comfystream/lib"\n\ +)\n\ +\n\ +# Set RPATH settings\n\ +set(CMAKE_SKIP_BUILD_RPATH FALSE)\n\ +set(CMAKE_BUILD_WITH_INSTALL_RPATH FALSE)\n\ +set(CMAKE_INSTALL_RPATH "/usr/local/lib:/usr/lib/x86_64-linux-gnu")\n\ +set(PYTHON_LIBRARY "/workspace/miniconda3/envs/comfystream/lib/")\n\ +set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)' > /workspace/custom_toolchain.cmake + +# Set environment variables for OpenCV +RUN echo 'export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH' >> /root/.bashrc + +# Build and install OpenCV with CUDA support +RUN cd /workspace/opencv/build && \ + # Build OpenCV + cmake \ + -D CMAKE_TOOLCHAIN_FILE=/workspace/custom_toolchain.cmake \ + -D CMAKE_BUILD_TYPE=RELEASE \ + -D CMAKE_INSTALL_PREFIX=/usr/local \ + -D WITH_CUDA=ON \ + -D WITH_CUDNN=ON \ + -D WITH_CUBLAS=ON \ + -D WITH_TBB=ON \ + -D CUDA_ARCH_LIST="8.0+PTX" \ + -D OPENCV_DNN_CUDA=ON \ + -D OPENCV_ENABLE_NONFREE=ON \ + -D CUDA_TOOLKIT_ROOT_DIR=/usr/local/cuda \ + -D OPENCV_EXTRA_MODULES_PATH=/workspace/opencv_contrib/modules \ + -D PYTHON3_EXECUTABLE=/workspace/miniconda3/envs/comfystream/bin/python3.12 \ + -D PYTHON_INCLUDE_DIR=/workspace/miniconda3/envs/comfystream/include/python3.12 \ + -D PYTHON_LIBRARY=/workspace/miniconda3/envs/comfystream/lib/libpython3.12.so \ + -D HAVE_opencv_python3=ON \ + -D WITH_NVCUVID=OFF \ + -D WITH_NVCUVENC=OFF \ + .. 
&& \ + make -j$(nproc) && \ + make install && \ + ldconfig + +# Configure no environment activation by default +RUN conda config --set auto_activate_base false && \ + conda init bash + +WORKDIR /workspace/comfystream diff --git a/docker/README.md b/docker/README.md index ad691aceb..cabb2fd48 100644 --- a/docker/README.md +++ b/docker/README.md @@ -1,4 +1,4 @@ -# ComfyStream Docker +# ComfyStream Docker Build Configuration This folder contains the Docker files that can be used to run ComfyStream in a containerized fashion or to work on the codebase within a dev container. This README contains the general usage instructions while the [Devcontainer Readme](../.devcontainer/README.md) contains instructions on how to use Comfystream inside a dev container and get quickly started with your development journey. @@ -7,21 +7,48 @@ This folder contains the Docker files that can be used to run ComfyStream in a c - [Dockerfile](Dockerfile) - The main Dockerfile that can be used to run ComfyStream in a containerized fashion. - [Dockerfile.base](Dockerfile.base) - The base Dockerfile that can be used to build the base image for ComfyStream. -## Pre-requisites +## Building with Custom Nodes Configuration -- [Docker](https://docs.docker.com/get-docker/) -- [Nvidia Container Toolkit](https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/latest/install-guide.html) +The base Docker image supports specifying a custom nodes configuration file during build time using the `NODES_CONFIG` build argument. -## Usage +### Usage -### Build the Base Image +#### Default build (uses `nodes.yaml`) +```bash +docker build -t livepeer/comfyui-base -f docker/Dockerfile . +``` -To build the base image, run the following command: +#### Build with custom config from configs directory +```bash +docker build -f docker/Dockerfile.base \ + --build-arg NODES_CONFIG=nodes-streamdiffusion.yaml \ + -t comfyui-base:streamdiffusion . 
+``` +#### Build with config from absolute path ```bash -docker build -t livepeer/comfyui-base -f docker/Dockerfile.base . +docker build -f docker/Dockerfile.base \ + --build-arg NODES_CONFIG=/path/to/custom-nodes.yaml \ + -t comfyui-base:custom . ``` +### Available Build Arguments + +| Argument | Default | Description | +|----------|---------|-------------| +| `BASE_IMAGE` | `nvidia/cuda:12.8.1-cudnn-devel-ubuntu22.04` | Base CUDA image | +| `CONDA_VERSION` | `latest` | Miniconda version | +| `PYTHON_VERSION` | `3.12` | Python version | +| `NODES_CONFIG` | `nodes.yaml` | Nodes configuration file (filename or path) | +| `CACHEBUST` | `static` | Cache invalidation for node setup | + +### Configuration Files in configs/ + +- **`nodes.yaml`** - Full node configuration (default) +- **`nodes-streamdiffusion.yaml`** - Minimal set of nodes for faster builds + +### Examples + ### Build the Main Image To build the main image, run the following command: diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index ef55c77e2..e6d44463a 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -130,6 +130,14 @@ if [ "$1" = "--build-engines" ]; then echo "Engine for DepthAnything2 (large) already exists at ${DEPTH_ANYTHING_DIR}/${DEPTH_ANYTHING_ENGINE_LARGE}, skipping..." fi + # Build Engines for FasterLivePortrait + if [ ! -f "$FASTERLIVEPORTRAIT_DIR/warping_spade-fix.trt" ]; then + cd "$FASTERLIVEPORTRAIT_DIR" + bash /workspace/ComfyUI/custom_nodes/ComfyUI-FasterLivePortrait/scripts/build_fasterliveportrait_trt.sh "${FASTERLIVEPORTRAIT_DIR}" "${FASTERLIVEPORTRAIT_DIR}" "${FASTERLIVEPORTRAIT_DIR}" + else + echo "Engines for FasterLivePortrait already exists, skipping..." + fi + # Build Engine for StreamDiffusion if [ ! 
-f "$TENSORRT_DIR/StreamDiffusion-engines/stabilityai/sd-turbo--lcm_lora-True--tiny_vae-True--max_batch-3--min_batch-3--mode-img2img/unet.engine.opt.onnx" ]; then cd /workspace/ComfyUI/custom_nodes/ComfyUI-StreamDiffusion @@ -158,7 +166,7 @@ if [ "$1" = "--opencv-cuda" ]; then if [ ! -f "/workspace/comfystream/opencv-cuda-release.tar.gz" ]; then # Download and extract OpenCV CUDA build DOWNLOAD_NAME="opencv-cuda-release.tar.gz" - wget -q -O "$DOWNLOAD_NAME" https://github.com/JJassonn69/ComfyUI-Stream-Pack/releases/download/v2/opencv-cuda-release.tar.gz + wget -q -O "$DOWNLOAD_NAME" https://github.com/JJassonn69/ComfyUI-Stream-Pack/releases/download/v2.1/opencv-cuda-release.tar.gz tar -xzf "$DOWNLOAD_NAME" -C /workspace/comfystream/ rm "$DOWNLOAD_NAME" else @@ -166,15 +174,6 @@ if [ "$1" = "--opencv-cuda" ]; then fi # Install required libraries - apt-get update && apt-get install -y \ - libgflags-dev \ - libgoogle-glog-dev \ - libjpeg-dev \ - libavcodec-dev \ - libavformat-dev \ - libavutil-dev \ - libswscale-dev - # Remove existing cv2 package SITE_PACKAGES_DIR="/workspace/miniconda3/envs/comfystream/lib/python3.12/site-packages" rm -rf "${SITE_PACKAGES_DIR}/cv2"* diff --git a/example.py b/example.py index 6d68aac13..9f7567306 100644 --- a/example.py +++ b/example.py @@ -1,7 +1,8 @@ -import torch import asyncio import json +import torch + from comfystream.client import ComfyStreamClient diff --git a/install.py b/install.py index 385d160fd..7552ea8a9 100644 --- a/install.py +++ b/install.py @@ -1,19 +1,20 @@ -import os -import subprocess import argparse import logging +import os import pathlib +import subprocess import sys -import tarfile import tempfile import urllib.request -import toml import zipfile + +import toml from comfy_compatibility.workspace import auto_patch_workspace_and_restart -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - 
%(message)s") logger = logging.getLogger(__name__) + def get_project_version(workspace: str) -> str: """Read project version from pyproject.toml""" pyproject_path = os.path.join(workspace, "pyproject.toml") @@ -25,19 +26,25 @@ def get_project_version(workspace: str) -> str: logger.error(f"Failed to read version from pyproject.toml: {e}") return "unknown" + def download_and_extract_ui_files(version: str): """Download and extract UI files to the workspace""" output_dir = os.path.join(os.getcwd(), "nodes", "web", "static") pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True) - base_url = urllib.parse.urljoin("https://github.com/livepeer/comfystream/releases/download/", f"v{version}/comfystream-uikit.zip") - fallback_url = "https://github.com/livepeer/comfystream/releases/latest/download/comfystream-uikit.zip" - + base_url = urllib.parse.urljoin( + "https://github.com/livepeer/comfystream/releases/download/", + f"v{version}/comfystream-uikit.zip", + ) + fallback_url = ( + "https://github.com/livepeer/comfystream/releases/latest/download/comfystream-uikit.zip" + ) + # Create a temporary directory instead of a temporary file with tempfile.TemporaryDirectory() as temp_dir: # Define the path for the downloaded file download_path = os.path.join(temp_dir, "comfystream-uikit.zip") - + # Download zip file logger.info(f"Downloading {base_url}") try: @@ -53,23 +60,27 @@ def download_and_extract_ui_files(version: str): else: logger.error(f"Error downloading package: {e}") raise - + # Extract contents try: logger.info(f"Extracting files to {output_dir}") - with zipfile.ZipFile(download_path, 'r') as zip_ref: + with zipfile.ZipFile(download_path, "r") as zip_ref: zip_ref.extractall(path=output_dir) except Exception as e: logger.error(f"Error extracting files: {e}") raise + if __name__ == "__main__": parser = argparse.ArgumentParser(description="Install custom node requirements") parser.add_argument( - "--workspace", default=os.environ.get('COMFY_UI_WORKSPACE', None), 
required=False, help="Set Comfy workspace" + "--workspace", + default=os.environ.get("COMFY_UI_WORKSPACE", None), + required=False, + help="Set Comfy workspace", ) args = parser.parse_args() - + workspace = args.workspace if workspace is None: # Look up to 3 directories up for ComfyUI @@ -93,12 +104,14 @@ def download_and_extract_ui_files(version: str): subprocess.check_call([sys.executable, "-m", "pip", "install", "-e", "."]) if workspace is None: - logger.warning("No ComfyUI workspace found. Please specify a valid workspace path to fully install") - + logger.warning( + "No ComfyUI workspace found. Please specify a valid workspace path to fully install" + ) + if workspace is not None: logger.info("Patching ComfyUI workspace...") auto_patch_workspace_and_restart(workspace) - + logger.info("Downloading and extracting UI files...") version = get_project_version(os.getcwd()) download_and_extract_ui_files(version) diff --git a/nodes/__init__.py b/nodes/__init__.py index e3ca8f2ae..a590e9ccc 100644 --- a/nodes/__init__.py +++ b/nodes/__init__.py @@ -1,5 +1,7 @@ """ComfyStream nodes package""" -from comfy_compatibility.imports import ImportContext, SITE_PACKAGES, MAIN_PY + +from comfy_compatibility.imports import MAIN_PY, SITE_PACKAGES, ImportContext + with ImportContext("comfy", "comfy_extras", order=[SITE_PACKAGES, MAIN_PY]): from .audio_utils import * from .tensor_utils import * @@ -13,15 +15,16 @@ # Import and update mappings from submodules for module in [audio_utils, tensor_utils, video_stream_utils, api, web]: - if hasattr(module, 'NODE_CLASS_MAPPINGS'): + if hasattr(module, "NODE_CLASS_MAPPINGS"): NODE_CLASS_MAPPINGS.update(module.NODE_CLASS_MAPPINGS) - if hasattr(module, 'NODE_DISPLAY_NAME_MAPPINGS'): + if hasattr(module, "NODE_DISPLAY_NAME_MAPPINGS"): NODE_DISPLAY_NAME_MAPPINGS.update(module.NODE_DISPLAY_NAME_MAPPINGS) # Web directory for UI components import os + WEB_DIRECTORY = os.path.join(os.path.dirname(os.path.realpath(__file__)), "web") 
NODE_DISPLAY_NAME_MAPPINGS["ComfyStreamLauncher"] = "Launch ComfyStream 🚀" -__all__ = ['NODE_CLASS_MAPPINGS', 'NODE_DISPLAY_NAME_MAPPINGS'] +__all__ = ["NODE_CLASS_MAPPINGS", "NODE_DISPLAY_NAME_MAPPINGS"] diff --git a/nodes/api/__init__.py b/nodes/api/__init__.py index ebc480859..eb6dbdf81 100644 --- a/nodes/api/__init__.py +++ b/nodes/api/__init__.py @@ -1,17 +1,22 @@ """ComfyStream API implementation""" + +import logging import os -import webbrowser -from server import PromptServer -from aiohttp import web import pathlib -import logging +import webbrowser + import aiohttp -from ..server_manager import LocalComfyStreamServer +from aiohttp import web + +from server import PromptServer + from .. import settings_storage +from ..server_manager import LocalComfyStreamServer routes = None server_manager = None + # Middleware to add Cache-Control: no-cache for index.html @web.middleware async def cache_control_middleware(request, handler): @@ -20,19 +25,22 @@ async def cache_control_middleware(request, handler): target_path = f"{STATIC_ROUTE}/index.html" # Log comparison details if request.path == target_path: - response.headers['Cache-Control'] = 'no-cache' - logging.debug(f"[CacheMiddleware] Added Cache-Control: no-cache for {request.path}") # Kept debug log + response.headers["Cache-Control"] = "no-cache" + logging.debug( + f"[CacheMiddleware] Added Cache-Control: no-cache for {request.path}" + ) # Kept debug log return response + # Only set up routes if we're in the main ComfyUI instance -if hasattr(PromptServer.instance, 'routes') and hasattr(PromptServer.instance.routes, 'static'): +if hasattr(PromptServer.instance, "routes") and hasattr(PromptServer.instance.routes, "static"): routes = PromptServer.instance.routes - + # Get the path to the static build directory - STATIC_DIR = pathlib.Path(__file__).parent.parent.parent / "nodes" / "web" / "static" - + STATIC_DIR = pathlib.Path(__file__).parent.parent.parent / "nodes" / "web" / "static" + # Dynamically determine 
the extension name from the directory structure - extension_name = "comfystream" # Define a local default for the try/except block + extension_name = "comfystream" # Define a local default for the try/except block try: # Get the parent directory of the current file # Then navigate up to get the extension root directory @@ -41,7 +49,9 @@ async def cache_control_middleware(request, handler): extension_name = EXTENSION_ROOT.name logging.info(f"Detected extension name: {extension_name}") except Exception as e: - logging.warning(f"Failed to get extension name dynamically: {e}, using fallback '{extension_name}'") + logging.warning( + f"Failed to get extension name dynamically: {e}, using fallback '{extension_name}'" + ) # Fallback name is already set by the initial local default # Define module-level constants AFTER determination @@ -53,30 +63,34 @@ async def cache_control_middleware(request, handler): routes.static(STATIC_ROUTE, str(STATIC_DIR), append_version=False, follow_symlinks=True) # Add the cache control middleware to the app - if hasattr(PromptServer.instance, 'app'): + if hasattr(PromptServer.instance, "app"): PromptServer.instance.app.middlewares.append(cache_control_middleware) logging.info(f"Added ComfyStream cache control middleware for {STATIC_ROUTE}/index.html") else: - logging.warning("Could not add ComfyStream cache control middleware: PromptServer.instance.app not found.") + logging.warning( + "Could not add ComfyStream cache control middleware: PromptServer.instance.app not found." 
+ ) # Create server manager instance server_manager = LocalComfyStreamServer() - - @routes.get('/comfystream/extension_info') + + @routes.get("/comfystream/extension_info") async def get_extension_info(request): """Return extension information including name and paths""" try: - return web.json_response({ - "success": True, - "extension_name": EXTENSION_NAME, - "static_route": STATIC_ROUTE, - "ui_url": f"{STATIC_ROUTE}/index.html" - }) + return web.json_response( + { + "success": True, + "extension_name": EXTENSION_NAME, + "static_route": STATIC_ROUTE, + "ui_url": f"{STATIC_ROUTE}/index.html", + } + ) except Exception as e: logging.error(f"Error getting extension info: {str(e)}") return web.json_response({"success": False, "error": str(e)}, status=500) - - @routes.post('/api/offer') + + @routes.post("/api/offer") async def proxy_offer(request): """Proxy offer requests to the ComfyStream server""" try: @@ -89,64 +103,65 @@ async def proxy_offer(request): async with session.post( f"{target_url}/offer", json={"prompts": data.get("prompts"), "offer": data.get("offer")}, - headers={"Content-Type": "application/json"} + headers={"Content-Type": "application/json"}, ) as response: if not response.ok: return web.json_response( - {"error": f"Server error: {response.status}"}, - status=response.status + {"error": f"Server error: {response.status}"}, status=response.status ) return web.json_response(await response.json()) except Exception as e: logging.error(f"Error proxying offer: {str(e)}") return web.json_response({"error": str(e)}, status=500) - @routes.post('/comfystream/control') + @routes.post("/comfystream/control") async def control_server(request): """Handle server control requests""" try: data = await request.json() action = data.get("action") settings = data.get("settings", {}) - + # Extract host and port from settings if provided host = settings.get("host") if settings else None port = settings.get("port") if settings else None - + if action == "status": # Simply 
return the current server status - return web.json_response({ - "success": True, - "status": server_manager.get_status() - }) + return web.json_response({"success": True, "status": server_manager.get_status()}) elif action == "start": success = await server_manager.start(port=port, host=host) - return web.json_response({ - "success": success, - "status": server_manager.get_status() - }) + return web.json_response( + {"success": success, "status": server_manager.get_status()} + ) elif action == "stop": try: success = await server_manager.stop() - return web.json_response({ - "success": success, - "status": server_manager.get_status() - }) + return web.json_response( + {"success": success, "status": server_manager.get_status()} + ) except Exception as e: logging.error(f"Error stopping server: {str(e)}") # Force cleanup even if the normal stop fails server_manager.cleanup() - return web.json_response({ - "success": True, - "status": {"running": False, "port": None, "host": None, "pid": None, "type": "local"}, - "message": "Forced server shutdown due to error" - }) + return web.json_response( + { + "success": True, + "status": { + "running": False, + "port": None, + "host": None, + "pid": None, + "type": "local", + }, + "message": "Forced server shutdown due to error", + } + ) elif action == "restart": success = await server_manager.restart(port=port, host=host) - return web.json_response({ - "success": success, - "status": server_manager.get_status() - }) + return web.json_response( + {"success": success, "status": server_manager.get_status()} + ) else: return web.json_response({"error": "Invalid action"}, status=400) except Exception as e: @@ -155,17 +170,25 @@ async def control_server(request): if data and data.get("action") == "stop": try: server_manager.cleanup() - return web.json_response({ - "success": True, - "status": {"running": False, "port": None, "host": None, "pid": None, "type": "local"}, - "message": "Forced server shutdown due to error" - }) + return 
web.json_response( + { + "success": True, + "status": { + "running": False, + "port": None, + "host": None, + "pid": None, + "type": "local", + }, + "message": "Forced server shutdown due to error", + } + ) except Exception as cleanup_error: logging.error(f"Error during forced cleanup: {str(cleanup_error)}") - + return web.json_response({"error": str(e)}, status=500) - @routes.get('/comfystream/settings') + @routes.get("/comfystream/settings") async def get_settings(request): """Get ComfyStream settings""" try: @@ -174,63 +197,59 @@ async def get_settings(request): except Exception as e: logging.error(f"Error getting settings: {str(e)}") return web.json_response({"error": str(e)}, status=500) - - @routes.post('/comfystream/settings') + + @routes.post("/comfystream/settings") async def update_settings(request): """Update ComfyStream settings""" try: data = await request.json() success = settings_storage.update_settings(data) - return web.json_response({ - "success": success, - "settings": settings_storage.load_settings() - }) + return web.json_response( + {"success": success, "settings": settings_storage.load_settings()} + ) except Exception as e: logging.error(f"Error updating settings: {str(e)}") return web.json_response({"error": str(e)}, status=500) - - @routes.post('/comfystream/settings/configuration') + + @routes.post("/comfystream/settings/configuration") async def manage_configuration(request): """Add, remove, or select a configuration""" try: data = await request.json() action = data.get("action") - + if action == "add": name = data.get("name") host = data.get("host") port = data.get("port") if not name or not host or not port: return web.json_response({"error": "Missing required parameters"}, status=400) - + success = settings_storage.add_configuration(name, host, port) - return web.json_response({ - "success": success, - "settings": settings_storage.load_settings() - }) - + return web.json_response( + {"success": success, "settings": 
settings_storage.load_settings()} + ) + elif action == "remove": index = data.get("index") if index is None: return web.json_response({"error": "Missing index parameter"}, status=400) - + success = settings_storage.remove_configuration(index) - return web.json_response({ - "success": success, - "settings": settings_storage.load_settings() - }) - + return web.json_response( + {"success": success, "settings": settings_storage.load_settings()} + ) + elif action == "select": index = data.get("index") if index is None: return web.json_response({"error": "Missing index parameter"}, status=400) - + success = settings_storage.select_configuration(index) - return web.json_response({ - "success": success, - "settings": settings_storage.load_settings() - }) - + return web.json_response( + {"success": success, "settings": settings_storage.load_settings()} + ) + else: return web.json_response({"error": "Invalid action"}, status=400) except Exception as e: diff --git a/nodes/audio_utils/__init__.py b/nodes/audio_utils/__init__.py index bc090689a..f9c84d1f8 100644 --- a/nodes/audio_utils/__init__.py +++ b/nodes/audio_utils/__init__.py @@ -1,6 +1,6 @@ from .load_audio_tensor import LoadAudioTensor -from .save_audio_tensor import SaveAudioTensor from .pitch_shift import PitchShifter +from .save_audio_tensor import SaveAudioTensor NODE_CLASS_MAPPINGS = { "LoadAudioTensor": LoadAudioTensor, diff --git a/nodes/audio_utils/load_audio_tensor.py b/nodes/audio_utils/load_audio_tensor.py index ece7fca31..919a75991 100644 --- a/nodes/audio_utils/load_audio_tensor.py +++ b/nodes/audio_utils/load_audio_tensor.py @@ -1,71 +1,101 @@ +import queue + import numpy as np import torch from comfystream import tensor_cache +from comfystream.exceptions import ComfyStreamAudioBufferError, ComfyStreamInputTimeoutError + class LoadAudioTensor: - CATEGORY = "audio_utils" + CATEGORY = "ComfyStream/Loaders" RETURN_TYPES = ("AUDIO",) RETURN_NAMES = ("audio",) FUNCTION = "execute" - + DESCRIPTION = "Load audio 
tensor from ComfyStream input with timeout." + def __init__(self): self.audio_buffer = np.empty(0, dtype=np.int16) self.buffer_samples = None self.sample_rate = None - + self.leftover = np.empty(0, dtype=np.int16) + @classmethod - def INPUT_TYPES(s): + def INPUT_TYPES(cls): return { "required": { - "buffer_size": ("FLOAT", {"default": 500.0}), - } + "buffer_size": ( + "FLOAT", + {"default": 500.0, "tooltip": "Audio buffer size in milliseconds"}, + ), + }, + "optional": { + "timeout_seconds": ( + "FLOAT", + { + "default": 1.0, + "min": 0.1, + "max": 30.0, + "step": 0.1, + "tooltip": "Timeout in seconds", + }, + ), + }, } - + @classmethod - def IS_CHANGED(**kwargs): + def IS_CHANGED(cls, **kwargs): return float("nan") - - def execute(self, buffer_size): + + def execute(self, buffer_size: float, timeout_seconds: float = 1.0): + # Initialize if needed if self.sample_rate is None or self.buffer_samples is None: - frame = tensor_cache.audio_inputs.get(block=True) - self.sample_rate = frame.sample_rate - self.buffer_samples = int(self.sample_rate * buffer_size / 1000) - self.leftover = frame.side_data.input - - if self.leftover.shape[0] < self.buffer_samples: + try: + frame = tensor_cache.audio_inputs.get(block=True, timeout=timeout_seconds) + self.sample_rate = frame.sample_rate + self.buffer_samples = int(self.sample_rate * buffer_size / 1000) + self.leftover = frame.side_data.input + except queue.Empty: + raise ComfyStreamInputTimeoutError("audio", timeout_seconds) + + # Use leftover data if available + if self.leftover.shape[0] >= self.buffer_samples: + buffered_audio = self.leftover[: self.buffer_samples] + self.leftover = self.leftover[self.buffer_samples :] + else: + # Collect more audio chunks chunks = [self.leftover] if self.leftover.size > 0 else [] total_samples = self.leftover.shape[0] - + while total_samples < self.buffer_samples: - frame = tensor_cache.audio_inputs.get(block=True) - if frame.sample_rate != self.sample_rate: - raise ValueError("Sample rate 
mismatch") - chunks.append(frame.side_data.input) - total_samples += frame.side_data.input.shape[0] - + try: + frame = tensor_cache.audio_inputs.get(block=True, timeout=timeout_seconds) + if frame.sample_rate != self.sample_rate: + raise ValueError( + f"Sample rate mismatch: expected {self.sample_rate}Hz, got {frame.sample_rate}Hz" + ) + chunks.append(frame.side_data.input) + total_samples += frame.side_data.input.shape[0] + except queue.Empty: + raise ComfyStreamAudioBufferError( + timeout_seconds, self.buffer_samples, total_samples + ) + merged_audio = np.concatenate(chunks, dtype=np.int16) - buffered_audio = merged_audio[:self.buffer_samples] - self.leftover = merged_audio[self.buffer_samples:] - else: - buffered_audio = self.leftover[:self.buffer_samples] - self.leftover = self.leftover[self.buffer_samples:] - - # Convert numpy array to torch tensor and normalize int16 to float32 + buffered_audio = merged_audio[: self.buffer_samples] + self.leftover = ( + merged_audio[self.buffer_samples :] + if merged_audio.shape[0] > self.buffer_samples + else np.empty(0, dtype=np.int16) + ) + + # Convert to ComfyUI AUDIO format waveform_tensor = torch.from_numpy(buffered_audio.astype(np.float32) / 32768.0) - + # Ensure proper tensor shape: (batch, channels, samples) if waveform_tensor.dim() == 1: - # Mono: (samples,) -> (1, 1, samples) waveform_tensor = waveform_tensor.unsqueeze(0).unsqueeze(0) elif waveform_tensor.dim() == 2: - # Assume (channels, samples) and add batch dimension waveform_tensor = waveform_tensor.unsqueeze(0) - - # Return AUDIO dictionary format - audio_dict = { - "waveform": waveform_tensor, - "sample_rate": self.sample_rate - } - - return (audio_dict,) + + return ({"waveform": waveform_tensor, "sample_rate": self.sample_rate},) diff --git a/nodes/audio_utils/pitch_shift.py b/nodes/audio_utils/pitch_shift.py index 2fba9ee59..6e2456a2a 100644 --- a/nodes/audio_utils/pitch_shift.py +++ b/nodes/audio_utils/pitch_shift.py @@ -1,23 +1,19 @@ -import numpy as np 
import librosa +import numpy as np import torch + class PitchShifter: CATEGORY = "audio_utils" RETURN_TYPES = ("AUDIO",) FUNCTION = "execute" - + @classmethod def INPUT_TYPES(cls): return { "required": { "audio": ("AUDIO",), - "pitch_shift": ("FLOAT", { - "default": 4.0, - "min": 0.0, - "max": 12.0, - "step": 0.5 - }), + "pitch_shift": ("FLOAT", {"default": 4.0, "min": 0.0, "max": 12.0, "step": 0.5}), } } @@ -29,37 +25,36 @@ def execute(self, audio, pitch_shift): # Extract waveform and sample rate from AUDIO format waveform = audio["waveform"] sample_rate = audio["sample_rate"] - + # Convert tensor to numpy and ensure proper format for librosa if isinstance(waveform, torch.Tensor): audio_numpy = waveform.squeeze().cpu().numpy() else: audio_numpy = waveform.squeeze() - + # Ensure float32 format and proper normalization for librosa processing if audio_numpy.dtype != np.float32: audio_numpy = audio_numpy.astype(np.float32) - + # Check if data needs normalization (librosa expects [-1, 1] range) max_abs_val = np.abs(audio_numpy).max() if max_abs_val > 1.0: # Data appears to be in int16 range, normalize it audio_numpy = audio_numpy / 32768.0 - + # Apply pitch shift - shifted_audio = librosa.effects.pitch_shift(y=audio_numpy, sr=sample_rate, n_steps=pitch_shift) - + shifted_audio = librosa.effects.pitch_shift( + y=audio_numpy, sr=sample_rate, n_steps=pitch_shift + ) + # Convert back to tensor and restore original shape shifted_tensor = torch.from_numpy(shifted_audio).float() if waveform.dim() == 3: # (batch, channels, samples) shifted_tensor = shifted_tensor.unsqueeze(0).unsqueeze(0) - elif waveform.dim() == 2: # (channels, samples) + elif waveform.dim() == 2: # (channels, samples) shifted_tensor = shifted_tensor.unsqueeze(0) - + # Return AUDIO format - result_audio = { - "waveform": shifted_tensor, - "sample_rate": sample_rate - } - + result_audio = {"waveform": shifted_tensor, "sample_rate": sample_rate} + return (result_audio,) diff --git 
a/nodes/audio_utils/save_audio_tensor.py b/nodes/audio_utils/save_audio_tensor.py index 6b7b0281c..eed9bf6ad 100644 --- a/nodes/audio_utils/save_audio_tensor.py +++ b/nodes/audio_utils/save_audio_tensor.py @@ -1,20 +1,17 @@ import numpy as np + from comfystream import tensor_cache + class SaveAudioTensor: CATEGORY = "audio_utils" RETURN_TYPES = () FUNCTION = "execute" OUTPUT_NODE = True - @classmethod def INPUT_TYPES(s): - return { - "required": { - "audio": ("AUDIO",) - } - } + return {"required": {"audio": ("AUDIO",)}} @classmethod def IS_CHANGED(s): @@ -23,24 +20,24 @@ def IS_CHANGED(s): def execute(self, audio): # Extract waveform tensor from AUDIO format waveform = audio["waveform"] - + # Convert to numpy and flatten for pipeline compatibility - if hasattr(waveform, 'cpu'): + if hasattr(waveform, "cpu"): # PyTorch tensor waveform_numpy = waveform.squeeze().cpu().numpy() else: # Already numpy waveform_numpy = waveform.squeeze() - + # Ensure 1D array for pipeline buffer concatenation if waveform_numpy.ndim > 1: waveform_numpy = waveform_numpy.flatten() - + # Convert to int16 if needed (pipeline expects int16) if waveform_numpy.dtype == np.float32: waveform_numpy = (waveform_numpy * 32767).astype(np.int16) elif waveform_numpy.dtype != np.int16: waveform_numpy = waveform_numpy.astype(np.int16) - + tensor_cache.audio_outputs.put_nowait(waveform_numpy) return (audio,) diff --git a/nodes/server_manager.py b/nodes/server_manager.py index 9b55f4f57..cd29bc07f 100644 --- a/nodes/server_manager.py +++ b/nodes/server_manager.py @@ -1,88 +1,83 @@ """ComfyStream server management module""" -import os -import sys -import subprocess -import socket -import signal + +import asyncio import atexit import logging -import urllib.request +import os +import socket +import subprocess +import sys +import threading import urllib.error -from pathlib import Path -import time -import asyncio +import urllib.request from abc import ABC, abstractmethod -import threading +from pathlib import 
Path # Configure logging to output to console -logging.basicConfig( - level=logging.INFO, - format='[ComfyStream] %(message)s', - stream=sys.stdout -) +logging.basicConfig(level=logging.INFO, format="[ComfyStream] %(message)s", stream=sys.stdout) # Set up Windows specific event loop policy -if sys.platform == 'win32': - if hasattr(asyncio, 'WindowsSelectorEventLoopPolicy'): +if sys.platform == "win32": + if hasattr(asyncio, "WindowsSelectorEventLoopPolicy"): asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) - + class ComfyStreamServerBase(ABC): """Abstract base class for ComfyStream server management""" - + def __init__(self, host="0.0.0.0", port=None): self.host = host self.port = port self.is_running = False logging.info(f"Initializing {self.__class__.__name__}") - + @abstractmethod async def start(self, port=None, host=None) -> bool: """Start the ComfyStream server - + Args: port: Optional port to use. If None, implementation should choose a port. host: Optional host to use. If None, implementation should use the default host. - + Returns: bool: True if server started successfully, False otherwise """ pass - + @abstractmethod async def stop(self) -> bool: """Stop the ComfyStream server - + Returns: bool: True if server stopped successfully, False otherwise """ pass - + @abstractmethod def get_status(self) -> dict: """Get current server status - + Returns: dict: Server status information """ pass - + @abstractmethod def check_server_health(self) -> bool: """Check if server is responding to health checks - + Returns: bool: True if server is healthy, False otherwise """ pass - + async def restart(self, port=None, host=None) -> bool: """Restart the ComfyStream server - + Args: port: Optional port to use. If None, use the current port. host: Optional host to use. If None, use the current host. 
- + Returns: bool: True if server restarted successfully, False otherwise """ @@ -93,11 +88,18 @@ async def restart(self, port=None, host=None) -> bool: await self.stop() return await self.start(port=port_to_use, host=host_to_use) + class LocalComfyStreamServer(ComfyStreamServerBase): """Local ComfyStream server implementation""" - - def __init__(self, host="0.0.0.0", start_port=8889, max_port=65535, - health_check_timeout=30, health_check_interval=1): + + def __init__( + self, + host="0.0.0.0", + start_port=8889, + max_port=65535, + health_check_timeout=30, + health_check_interval=1, + ): super().__init__(host=host) self.process = None self.start_port = start_port @@ -105,7 +107,7 @@ def __init__(self, host="0.0.0.0", start_port=8889, max_port=65535, self.health_check_timeout = health_check_timeout self.health_check_interval = health_check_interval atexit.register(self.cleanup) - + def find_available_port(self): """Find an available port starting from start_port""" port = self.start_port @@ -123,7 +125,7 @@ def check_server_health(self): """Check if server is responding to health checks""" if not self.port: return False - + url = f"http://{self.host}:{self.port}" try: response = urllib.request.urlopen(url) @@ -133,7 +135,7 @@ def check_server_health(self): def log_subprocess_output(self, pipe, level): """Log the output from the subprocess to the logger.""" - for line in iter(pipe.readline, b''): + for line in iter(pipe.readline, b""): logging.log(level, line.decode().strip()) async def start(self, port=None, host=None): @@ -146,37 +148,48 @@ async def start(self, port=None, host=None): self.port = port or self.find_available_port() if host is not None: self.host = host - + # Get the path to the ComfyStream server directory and script server_dir = Path(__file__).parent.parent / "server" server_script = server_dir / "app.py" logging.info(f"Server script: {server_script}") - + # Get ComfyUI workspace path (which is where we'll run from) comfyui_workspace = 
Path(__file__).parent.parent.parent.parent logging.info(f"ComfyUI workspace: {comfyui_workspace}") - + # Use the system Python (which should have ComfyStream installed) - cmd = [sys.executable, "-u", str(server_script), - "--port", str(self.port), - "--host", str(self.host), - "--workspace", str(comfyui_workspace)] - + cmd = [ + sys.executable, + "-u", + str(server_script), + "--port", + str(self.port), + "--host", + str(self.host), + "--workspace", + str(comfyui_workspace), + ] + logging.info(f"Starting server with command: {' '.join(cmd)}") - + # Start process with output going to pipes self.process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=str(comfyui_workspace), # Run from ComfyUI root - env={**os.environ, 'PYTHONUNBUFFERED': '1'} + env={**os.environ, "PYTHONUNBUFFERED": "1"}, ) - + # Start threads to log stdout and stderr - threading.Thread(target=self.log_subprocess_output, args=(self.process.stdout, logging.INFO)).start() - threading.Thread(target=self.log_subprocess_output, args=(self.process.stderr, logging.ERROR)).start() - + threading.Thread( + target=self.log_subprocess_output, args=(self.process.stdout, logging.INFO) + ).start() + threading.Thread( + target=self.log_subprocess_output, args=(self.process.stderr, logging.ERROR) + ).start() + # Wait for server to start responding logging.info("Waiting for server to start...") for _ in range(self.health_check_timeout): @@ -185,15 +198,19 @@ async def start(self, port=None, host=None): break await asyncio.sleep(self.health_check_interval) else: - raise RuntimeError(f"Server failed to start after {self.health_check_timeout} seconds") - + raise RuntimeError( + f"Server failed to start after {self.health_check_timeout} seconds" + ) + if self.process.poll() is not None: raise RuntimeError(f"Server failed to start (exit code: {self.process.poll()})") - + self.is_running = True - logging.info(f"ComfyStream server started on port {self.port} (PID: {self.process.pid})") + 
logging.info( + f"ComfyStream server started on port {self.port} (PID: {self.process.pid})" + ) return True - + except Exception as e: logging.error(f"Error starting ComfyStream server: {str(e)}") self.cleanup() @@ -204,7 +221,7 @@ async def stop(self): if not self.is_running: logging.info("Server is not running") return False - + try: self.cleanup() logging.info("ComfyStream server stopped") @@ -220,7 +237,7 @@ def get_status(self): "port": self.port, "host": self.host, "pid": self.process.pid if self.process else None, - "type": "local" + "type": "local", } logging.info(f"Server status: {status}") return status @@ -247,4 +264,4 @@ def cleanup(self): except Exception as e: logging.error(f"Error cleaning up server process: {str(e)}") self.process = None - self.is_running = False \ No newline at end of file + self.is_running = False diff --git a/nodes/settings_storage.py b/nodes/settings_storage.py index 5d888b611..8b9ecf54b 100644 --- a/nodes/settings_storage.py +++ b/nodes/settings_storage.py @@ -1,53 +1,53 @@ """ComfyStream server-side settings storage module""" -import os + import json import logging +import os import threading from pathlib import Path # Configure logging -logging.basicConfig( - level=logging.INFO, - format='[ComfyStream Settings] %(message)s' -) +logging.basicConfig(level=logging.INFO, format="[ComfyStream Settings] %(message)s") # Default settings DEFAULT_SETTINGS = { "host": "0.0.0.0", "port": 8889, "configurations": [], - "selectedConfigIndex": -1 + "selectedConfigIndex": -1, } # Lock for thread-safe file operations settings_lock = threading.Lock() + def get_settings_file_path(): """Get the path to the settings file""" # Store settings in the extension directory extension_dir = Path(__file__).parent.parent settings_dir = extension_dir / "settings" - + # Create settings directory if it doesn't exist os.makedirs(settings_dir, exist_ok=True) - + return settings_dir / "comfystream_settings.json" + def load_settings(): """Load settings from 
file""" settings_file = get_settings_file_path() - + with settings_lock: try: if settings_file.exists(): - with open(settings_file, 'r') as f: + with open(settings_file, "r") as f: settings = json.load(f) - + # Ensure all default keys exist for key, value in DEFAULT_SETTINGS.items(): if key not in settings: settings[key] = value - + return settings else: return DEFAULT_SETTINGS.copy() @@ -55,53 +55,57 @@ def load_settings(): logging.error(f"Error loading settings: {str(e)}") return DEFAULT_SETTINGS.copy() + def save_settings(settings): """Save settings to file""" settings_file = get_settings_file_path() - + with settings_lock: try: - with open(settings_file, 'w') as f: + with open(settings_file, "w") as f: json.dump(settings, f, indent=2) return True except Exception as e: logging.error(f"Error saving settings: {str(e)}") return False + def update_settings(new_settings): """Update settings with new values""" current_settings = load_settings() - + # Update only the keys that are provided for key, value in new_settings.items(): current_settings[key] = value - + return save_settings(current_settings) + def add_configuration(name, host, port): """Add a new configuration""" settings = load_settings() - + # Create the new configuration config = {"name": name, "host": host, "port": port} - + # Add to configurations list settings["configurations"].append(config) - + # Save updated settings return save_settings(settings) + def remove_configuration(index): """Remove a configuration by index""" settings = load_settings() - + if index < 0 or index >= len(settings["configurations"]): logging.error(f"Invalid configuration index: {index}") return False - + # Remove the configuration settings["configurations"].pop(index) - + # Update selectedConfigIndex if needed if settings["selectedConfigIndex"] == index: # The selected config was deleted @@ -109,25 +113,26 @@ def remove_configuration(index): elif settings["selectedConfigIndex"] > index: # The selected config is after the 
deleted one, adjust index settings["selectedConfigIndex"] -= 1 - + # Save updated settings return save_settings(settings) + def select_configuration(index): """Select a configuration by index""" settings = load_settings() - + if index == -1 or (index >= 0 and index < len(settings["configurations"])): settings["selectedConfigIndex"] = index - + # If a valid configuration is selected, update host and port if index >= 0: config = settings["configurations"][index] settings["host"] = config["host"] settings["port"] = config["port"] - + # Save updated settings return save_settings(settings) else: logging.error(f"Invalid configuration index: {index}") - return False \ No newline at end of file + return False diff --git a/nodes/tensor_utils/load_tensor.py b/nodes/tensor_utils/load_tensor.py index c39fe8a1d..a2fb59408 100644 --- a/nodes/tensor_utils/load_tensor.py +++ b/nodes/tensor_utils/load_tensor.py @@ -1,20 +1,40 @@ +import queue + from comfystream import tensor_cache +from comfystream.exceptions import ComfyStreamInputTimeoutError class LoadTensor: - CATEGORY = "tensor_utils" + CATEGORY = "ComfyStream/Loaders" RETURN_TYPES = ("IMAGE",) FUNCTION = "execute" + DESCRIPTION = "Load image tensor from ComfyStream input with timeout." 
@classmethod - def INPUT_TYPES(s): - return {} + def INPUT_TYPES(cls): + return { + "optional": { + "timeout_seconds": ( + "FLOAT", + { + "default": 1.0, + "min": 0.1, + "max": 30.0, + "step": 0.1, + "tooltip": "Timeout in seconds", + }, + ), + } + } @classmethod - def IS_CHANGED(): + def IS_CHANGED(cls, **kwargs): return float("nan") - def execute(self): - frame = tensor_cache.image_inputs.get(block=True) - frame.side_data.skipped = False - return (frame.side_data.input,) + def execute(self, timeout_seconds: float = 1.0): + try: + frame = tensor_cache.image_inputs.get(block=True, timeout=timeout_seconds) + frame.side_data.skipped = False + return (frame.side_data.input,) + except queue.Empty: + raise ComfyStreamInputTimeoutError("video", timeout_seconds) diff --git a/nodes/tensor_utils/save_text_tensor.py b/nodes/tensor_utils/save_text_tensor.py index 098887e07..defb3aaa8 100644 --- a/nodes/tensor_utils/save_text_tensor.py +++ b/nodes/tensor_utils/save_text_tensor.py @@ -1,5 +1,6 @@ from comfystream import tensor_cache + class SaveTextTensor: CATEGORY = "text_utils" RETURN_TYPES = () @@ -13,8 +14,11 @@ def INPUT_TYPES(s): "data": ("STRING",), # Accept text string as input. 
}, "optional": { - "remove_linebreaks": ("BOOLEAN", {"default": True}), # Remove whitespace and line breaks - } + "remove_linebreaks": ( + "BOOLEAN", + {"default": True}, + ), # Remove whitespace and line breaks + }, } @classmethod @@ -23,7 +27,7 @@ def IS_CHANGED(s, **kwargs): def execute(self, data, remove_linebreaks=True): if remove_linebreaks: - result_text = data.replace('\n', '').replace('\r', '') + result_text = data.replace("\n", "").replace("\r", "") else: result_text = data tensor_cache.text_outputs.put_nowait(result_text) diff --git a/nodes/video_stream_utils/__init__.py b/nodes/video_stream_utils/__init__.py index 787f84ce9..f48631984 100644 --- a/nodes/video_stream_utils/__init__.py +++ b/nodes/video_stream_utils/__init__.py @@ -1,6 +1,6 @@ """Video stream utility nodes for ComfyStream""" -from .primary_input_load_image import PrimaryInputLoadImage +from .primary_input_load_image import PrimaryInputLoadImage NODE_CLASS_MAPPINGS = {"PrimaryInputLoadImage": PrimaryInputLoadImage} NODE_DISPLAY_NAME_MAPPINGS = {} diff --git a/nodes/web/__init__.py b/nodes/web/__init__.py index ef41b57aa..caa6b49a5 100644 --- a/nodes/web/__init__.py +++ b/nodes/web/__init__.py @@ -1,7 +1,10 @@ """ComfyStream Web UI nodes""" + import os + import folder_paths + # Define a simple Python class for the UI Preview node class ComfyStreamUIPreview: """ @@ -9,29 +12,24 @@ class ComfyStreamUIPreview: It's needed for ComfyUI to properly register and execute the node. The actual implementation is in the JavaScript file. 
""" + @classmethod def INPUT_TYPES(cls): - return { - "required": {}, - "optional": {} - } - + return {"required": {}, "optional": {}} + RETURN_TYPES = () - + FUNCTION = "execute" CATEGORY = "ComfyStream" - + def execute(self): # This function doesn't do anything as the real work is done in JavaScript # But we need to return something to satisfy the ComfyUI node execution system return ("UI Preview Node Executed",) + # Register the node class -NODE_CLASS_MAPPINGS = { - "ComfyStreamUIPreview": ComfyStreamUIPreview -} +NODE_CLASS_MAPPINGS = {"ComfyStreamUIPreview": ComfyStreamUIPreview} # Display names for the nodes -NODE_DISPLAY_NAME_MAPPINGS = { - "ComfyStreamUIPreview": "ComfyStream UI Preview" -} \ No newline at end of file +NODE_DISPLAY_NAME_MAPPINGS = {"ComfyStreamUIPreview": "ComfyStream UI Preview"} diff --git a/pyproject.toml b/pyproject.toml index 50e59935d..e22a59559 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,16 +1,16 @@ [build-system] -requires = ["setuptools>=64.0.0", "wheel"] +requires = ["setuptools>=64.0.0,<81", "wheel"] build-backend = "setuptools.build_meta" [project] name = "comfystream" description = "Build Live AI Video with ComfyUI" -version = "0.1.5" +version = "0.1.7" license = { file = "LICENSE" } dependencies = [ "asyncio", - "pytrickle @ git+https://github.com/livepeer/pytrickle.git@de37bea74679fa5db46b656a83c9b7240fc597b6", - "comfyui @ git+https://github.com/hiddenswitch/ComfyUI.git@58622c7e91cb5cc2bca985d713db55e5681ff316", + "pytrickle @ git+https://github.com/livepeer/pytrickle.git@v0.1.5", + "comfyui @ git+https://github.com/hiddenswitch/ComfyUI.git@e62df3a8811d8c652a195d4669f4fb27f6c9a9ba", "aiortc", "aiohttp", "aiohttp_cors", @@ -21,7 +21,7 @@ dependencies = [ ] [project.optional-dependencies] -dev = ["pytest", "pytest-cov"] +dev = ["pytest", "pytest-cov", "ruff"] [project.urls] Repository = "https://github.com/yondonfu/comfystream" @@ -37,3 +37,23 @@ packages = {find = {where = ["src", "nodes"]}} 
[tool.setuptools.dynamic] dependencies = {file = ["requirements.txt"]} + +[tool.ruff] +line-length = 100 +target-version = "py312" + +[tool.ruff.lint] +select = ["E", "F", "I"] +ignore = [ + "E501", # let the formatter handle long lines + "E402", # module level import not at top (required for ComfyUI) + "E722", # bare except (existing code patterns) + "F401", # imported but unused (many re-exports) + "F403", # star imports (ComfyUI pattern) + "F405", # may be undefined from star imports + "F841", # assigned but never used (temporary) +] + +[tool.ruff.format] +quote-style = "double" +indent-style = "space" diff --git a/requirements.txt b/requirements.txt index 790900bb1..da59730dc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ asyncio -pytrickle @ git+https://github.com/livepeer/pytrickle.git@de37bea74679fa5db46b656a83c9b7240fc597b6 -comfyui @ git+https://github.com/hiddenswitch/ComfyUI.git@58622c7e91cb5cc2bca985d713db55e5681ff316 +pytrickle @ git+https://github.com/livepeer/pytrickle.git@v0.1.5 +comfyui @ git+https://github.com/hiddenswitch/ComfyUI.git@e62df3a8811d8c652a195d4669f4fb27f6c9a9ba aiortc aiohttp aiohttp_cors diff --git a/scripts/monitor_pid_resources.py b/scripts/monitor_pid_resources.py index a7de85636..25ffc97a0 100644 --- a/scripts/monitor_pid_resources.py +++ b/scripts/monitor_pid_resources.py @@ -1,16 +1,17 @@ """A Python script to monitor system resources for a given PID and optionally create a py-spy profiler report.""" -import psutil -import pynvml -import time +import csv import subprocess -import click import threading -import csv +import time from pathlib import Path from typing import List +import click +import psutil +import pynvml + def is_running_inside_container(): """Detects if the script is running inside a container.""" @@ -131,30 +132,20 @@ def find_pid_by_name(name: str) -> int: for proc in psutil.process_iter(["pid", "name", "cmdline"]): if proc.info["cmdline"] and name in proc.info["cmdline"]: found_pid = 
proc.info["pid"] - click.echo( - click.style(f"Found process '{name}' with PID {found_pid}.", fg="green") - ) + click.echo(click.style(f"Found process '{name}' with PID {found_pid}.", fg="green")) return found_pid click.echo(click.style(f"Error: Process with name '{name}' not found.", fg="red")) return None @click.command() -@click.option( - "--pid", type=str, default="auto", help='Process ID or "auto" to find by name' -) -@click.option( - "--name", type=str, default="app.py", help="Process name (default: app.py)" -) +@click.option("--pid", type=str, default="auto", help='Process ID or "auto" to find by name') +@click.option("--name", type=str, default="app.py", help="Process name (default: app.py)") @click.option("--interval", type=int, default=2, help="Monitoring interval (seconds)") -@click.option( - "--duration", type=int, default=30, help="Total monitoring duration (seconds)" -) +@click.option("--duration", type=int, default=30, help="Total monitoring duration (seconds)") @click.option("--output", type=str, default=None, help="File to save logs (optional)") @click.option("--spy", is_flag=True, help="Enable py-spy profiling") -@click.option( - "--spy-output", type=str, default="pyspy_profile.svg", help="Py-Spy output file" -) +@click.option("--spy-output", type=str, default="pyspy_profile.svg", help="Py-Spy output file") @click.option( "--host-pid", type=int, @@ -213,21 +204,15 @@ def monitor_resources( click.echo(click.style(f"Error: Process with PID {pid} not found.", fg="red")) return - click.echo( - click.style(f"Monitoring PID {pid} for {duration} seconds...", fg="green") - ) + click.echo(click.style(f"Monitoring PID {pid} for {duration} seconds...", fg="green")) def run_py_spy(): """Run py-spy profiler for deep profiling.""" click.echo(click.style("Running py-spy for deep profiling...", fg="green")) spy_cmd = f"py-spy record -o {spy_output} --pid {pid} --duration {duration}" try: - subprocess.run( - spy_cmd, shell=True, check=True, capture_output=True, 
text=True - ) - click.echo( - click.style(f"Py-Spy flame graph saved to {spy_output}", fg="green") - ) + subprocess.run(spy_cmd, shell=True, check=True, capture_output=True, text=True) + click.echo(click.style(f"Py-Spy flame graph saved to {spy_output}", fg="green")) except subprocess.CalledProcessError as e: click.echo(click.style(f"Error running py-spy: {e.stderr}", fg="red")) diff --git a/scripts/requirements.txt b/scripts/requirements.txt index 6ea80ee2e..eb7cee7f7 100644 --- a/scripts/requirements.txt +++ b/scripts/requirements.txt @@ -8,4 +8,4 @@ bcrypt rich # Profiler psutil -pynvml +nvidia-ml-py diff --git a/server/app.py b/server/app.py index a3a42fc44..b93e35ee4 100644 --- a/server/app.py +++ b/server/app.py @@ -4,17 +4,13 @@ import logging import os import sys -import time -import secrets + import torch # Initialize CUDA before any other imports to prevent core dump. if torch.cuda.is_available(): torch.cuda.init() - -from aiohttp import web, MultipartWriter -from aiohttp_cors import setup as setup_cors, ResourceOptions from aiohttp import web from aiortc import ( MediaStreamTrack, @@ -23,15 +19,16 @@ RTCPeerConnection, RTCSessionDescription, ) + # Import HTTP streaming modules -from http_streaming.routes import setup_routes from aiortc.codecs import h264 from aiortc.rtcrtpsender import RTCRtpSender -from comfystream.pipeline import Pipeline from twilio.rest import Client -from comfystream.server.utils import patch_loop_datagram, add_prefix_to_app_routes, FPSMeter + +from comfystream.exceptions import ComfyStreamTimeoutFilter +from comfystream.pipeline import Pipeline from comfystream.server.metrics import MetricsManager, StreamStatsManager -import time +from comfystream.server.utils import FPSMeter, add_prefix_to_app_routes, patch_loop_datagram logger = logging.getLogger(__name__) logging.getLogger("aiortc.rtcrtpsender").setLevel(logging.WARNING) @@ -64,12 +61,10 @@ def __init__(self, track: MediaStreamTrack, pipeline: Pipeline): super().__init__() 
self.track = track self.pipeline = pipeline - self.fps_meter = FPSMeter( - metrics_manager=app["metrics_manager"], track_id=track.id - ) + self.fps_meter = FPSMeter(metrics_manager=app["metrics_manager"], track_id=track.id) self.running = True self.collect_task = asyncio.create_task(self.collect_frames()) - + # Add cleanup when track ends @track.on("ended") async def on_ended(): @@ -95,15 +90,13 @@ async def collect_frames(self): logger.error(f"Error collecting video frames: {str(e)}") self.running = False break - + # Perform cleanup outside the exception handler logger.info("Video frame collection stopped") except asyncio.CancelledError: logger.info("Frame collection task cancelled") except Exception as e: logger.error(f"Unexpected error in frame collection: {str(e)}") - finally: - await self.pipeline.cleanup() async def recv(self): """Receive a processed video frame from the pipeline, increment the frame @@ -111,15 +104,6 @@ async def recv(self): """ processed_frame = await self.pipeline.get_processed_video_frame() - # Update the frame buffer with the processed frame - try: - from frame_buffer import FrameBuffer - frame_buffer = FrameBuffer.get_instance() - frame_buffer.update_frame(processed_frame) - except Exception as e: - # Don't let frame buffer errors affect the main pipeline - print(f"Error updating frame buffer: {e}") - # Increment the frame count to calculate FPS. 
await self.fps_meter.increment_frame_count() @@ -128,6 +112,7 @@ async def recv(self): class NoopVideoStreamTrack(MediaStreamTrack): """Simple passthrough video track that bypasses pipeline processing.""" + kind = "video" def __init__(self, track: MediaStreamTrack): @@ -147,6 +132,7 @@ async def recv(self): class NoopAudioStreamTrack(MediaStreamTrack): """Simple passthrough audio track that bypasses pipeline processing.""" + kind = "audio" def __init__(self, track: MediaStreamTrack): @@ -174,7 +160,7 @@ def __init__(self, track: MediaStreamTrack, pipeline): self.running = True logger.info(f"AudioStreamTrack created for track {track.id}") self.collect_task = asyncio.create_task(self.collect_frames()) - + # Add cleanup when track ends @track.on("ended") async def on_ended(): @@ -200,19 +186,18 @@ async def collect_frames(self): logger.error(f"Error collecting audio frames: {str(e)}") self.running = False break - + # Perform cleanup outside the exception handler logger.info("Audio frame collection stopped") except asyncio.CancelledError: logger.info("Frame collection task cancelled") except Exception as e: logger.error(f"Unexpected error in audio frame collection: {str(e)}") - finally: - await self.pipeline.cleanup() async def recv(self): return await self.pipeline.get_processed_audio_frame() + def force_codec(pc, sender, forced_codec): kind = forced_codec.split("/")[0] codecs = RTCRtpSender.getCapabilities(kind).codecs @@ -258,39 +243,42 @@ async def offer(request): pcs = request.app["pcs"] params = await request.json() - + # Check if this is noop mode (no prompts provided) prompts = params.get("prompts") is_noop_mode = not prompts - - if is_noop_mode: - logger.info("[Offer] No prompts provided - entering noop passthrough mode") - else: - await pipeline.set_prompts(prompts) - logger.info("[Offer] Set workflow prompts") - - # Set resolution if provided in the offer + resolution = params.get("resolution") if resolution: pipeline.width = resolution["width"] 
pipeline.height = resolution["height"] - logger.info(f"[Offer] Set pipeline resolution to {resolution['width']}x{resolution['height']}") + logger.info( + f"[Offer] Set pipeline resolution to {resolution['width']}x{resolution['height']}" + ) + + if is_noop_mode: + logger.info("[Offer] No prompts provided - entering noop passthrough mode") + else: + await pipeline.apply_prompts( + prompts, + skip_warmup=False, + ) + await pipeline.start_streaming() + logger.info("[Offer] Set workflow prompts, warmed pipeline, and started execution") offer_params = params["offer"] offer = RTCSessionDescription(sdp=offer_params["sdp"], type=offer_params["type"]) - + ice_servers = get_ice_servers() if len(ice_servers) > 0: - pc = RTCPeerConnection( - configuration=RTCConfiguration(iceServers=get_ice_servers()) - ) + pc = RTCPeerConnection(configuration=RTCConfiguration(iceServers=get_ice_servers())) else: pc = RTCPeerConnection() pcs.add(pc) tracks = {"video": None, "audio": None} - + # Flag to track if we've received resolution update resolution_received = {"value": False} @@ -323,56 +311,140 @@ def on_datachannel(channel): @channel.on("message") async def on_message(message): - try: - params = json.loads(message) + def send_json(payload): + channel.send(json.dumps(payload)) + + def send_success_response(response_type, **extra): + payload = {"type": response_type, "success": True} + payload.update(extra) + send_json(payload) + + def send_error_response(response_type, error_message, **extra): + payload = { + "type": response_type, + "success": False, + "error": error_message, + } + payload.update(extra) + send_json(payload) + + async def handle_get_nodes(_params): + nodes_info = await pipeline.get_nodes_info() + send_json({"type": "nodes_info", "nodes": nodes_info}) + + async def handle_update_prompts(_params): + if "prompts" not in _params: + logger.warning("[Control] Missing prompt in update_prompt message") + send_error_response( + "prompts_updated", "Missing 'prompts' in control 
message" + ) + return + try: + await pipeline.update_prompts(_params["prompts"]) + except Exception as e: + logger.error(f"Error updating prompt: {str(e)}") + send_error_response("prompts_updated", str(e)) + return + send_success_response("prompts_updated") + + async def handle_update_resolution(_params): + width = _params.get("width") + height = _params.get("height") + if width is None or height is None: + logger.warning( + "[Control] Missing width or height in update_resolution message" + ) + send_error_response( + "resolution_updated", + "Missing 'width' or 'height' in control message", + ) + return + + if is_noop_mode: + logger.info( + f"[Control] Noop mode - resolution update to {width}x{height} (no pipeline involved)" + ) + else: + # Update pipeline resolution for future frames + pipeline.width = width + pipeline.height = height + logger.info(f"[Control] Updated resolution to {width}x{height}") + + # Mark that we've received resolution + resolution_received["value"] = True - if params.get("type") == "get_nodes": - nodes_info = await pipeline.get_nodes_info() - response = {"type": "nodes_info", "nodes": nodes_info} - channel.send(json.dumps(response)) - elif params.get("type") == "update_prompts": - if "prompts" not in params: - logger.warning( - "[Control] Missing prompt in update_prompt message" - ) + if is_noop_mode: + logger.info("[Control] Noop mode - no warmup needed") + else: + # Note: Video warmup now happens during offer, not here + logger.info( + "[Control] Resolution updated - warmup was already performed during offer" + ) + + send_success_response("resolution_updated") + + async def handle_pause_prompts(_params): + if is_noop_mode: + logger.info("[Control] Noop mode - no prompts to pause") + else: + try: + await pipeline.pause_prompts() + logger.info("[Control] Paused prompt execution") + except Exception as e: + logger.error(f"[Control] Error pausing prompts: {str(e)}") + send_error_response("prompts_paused", str(e)) return + 
send_success_response("prompts_paused") + + async def handle_resume_prompts(_params): + if is_noop_mode: + logger.info("[Control] Noop mode - no prompts to resume") + else: try: - await pipeline.update_prompts(params["prompts"]) + await pipeline.start_streaming() + logger.info("[Control] Resumed prompt execution") except Exception as e: - logger.error(f"Error updating prompt: {str(e)}") - response = {"type": "prompts_updated", "success": True} - channel.send(json.dumps(response)) - elif params.get("type") == "update_resolution": - if "width" not in params or "height" not in params: - logger.warning("[Control] Missing width or height in update_resolution message") + logger.error(f"[Control] Error resuming prompts: {str(e)}") + send_error_response("prompts_resumed", str(e)) return - - if is_noop_mode: - logger.info(f"[Control] Noop mode - resolution update to {params['width']}x{params['height']} (no pipeline involved)") - else: - # Update pipeline resolution for future frames - pipeline.width = params["width"] - pipeline.height = params["height"] - logger.info(f"[Control] Updated resolution to {params['width']}x{params['height']}") - - # Mark that we've received resolution - resolution_received["value"] = True - - if is_noop_mode: - logger.info("[Control] Noop mode - no warmup needed") - else: - # Note: Video warmup now happens during offer, not here - logger.info("[Control] Resolution updated - warmup was already performed during offer") - - response = { - "type": "resolution_updated", - "success": True - } - channel.send(json.dumps(response)) + send_success_response("prompts_resumed") + + async def handle_stop_prompts(_params): + if is_noop_mode: + logger.info("[Control] Noop mode - no prompts to stop") else: - logger.warning( - "[Server] Invalid message format - missing required fields" - ) + try: + await pipeline.stop_prompts(cleanup=False) + logger.info("[Control] Stopped prompt execution") + except Exception as e: + logger.error(f"[Control] Error stopping 
prompts: {str(e)}") + send_error_response("prompts_stopped", str(e)) + return + send_success_response("prompts_stopped") + + handlers = { + "get_nodes": handle_get_nodes, + "update_prompts": handle_update_prompts, + "update_resolution": handle_update_resolution, + "pause_prompts": handle_pause_prompts, + "resume_prompts": handle_resume_prompts, + "stop_prompts": handle_stop_prompts, + } + + try: + params = json.loads(message) + message_type = params.get("type") + + if not message_type: + logger.warning("[Server] Control message missing 'type'") + return + + handler = handlers.get(message_type) + if handler is None: + logger.warning(f"[Server] Unsupported control message: {message_type}") + return + + await handler(params) except json.JSONDecodeError: logger.error("[Server] Invalid JSON received") except Exception as e: @@ -381,12 +453,16 @@ async def on_message(message): elif channel.label == "data": if is_noop_mode: logger.debug("[TextChannel] Noop mode - skipping text output forwarding") + # In noop mode, just acknowledge the data channel but don't forward anything @channel.on("open") def on_data_channel_open(): - logger.debug("[TextChannel] Data channel opened in noop mode (no text forwarding)") + logger.debug( + "[TextChannel] Data channel opened in noop mode (no text forwarding)" + ) else: if pipeline.produces_text_output(): + async def forward_text(): try: while channel.readyState == "open": @@ -401,7 +477,9 @@ async def forward_text(): try: channel.send(json.dumps({"type": "text", "data": text})) except Exception as e: - logger.debug(f"[TextChannel] Send failed, stopping forwarder: {e}") + logger.debug( + f"[TextChannel] Send failed, stopping forwarder: {e}" + ) break except asyncio.CancelledError: logger.debug("[TextChannel] Forward text task cancelled") @@ -420,6 +498,7 @@ def _remove_forward_task(t): tasks = request.app.get("data_channel_tasks") if tasks is not None: tasks.discard(t) + forward_task.add_done_callback(_remove_forward_task) # Ensure 
cancellation on channel close event @@ -431,20 +510,22 @@ def on_data_channel_close(): if not t.done(): t.cancel() else: - logger.debug("[TextChannel] Workflow has no text outputs; not starting forward_text") + logger.debug( + "[TextChannel] Workflow has no text outputs; not starting forward_text" + ) @pc.on("track") def on_track(track): logger.info(f"Track received: {track.kind} (readyState: {track.readyState})") - + # Check if we already have a track of this type to avoid duplicate track errors if track.kind == "video" and tracks["video"] is not None: - logger.debug(f"Video track already exists, ignoring duplicate track event") + logger.debug("Video track already exists, ignoring duplicate track event") return elif track.kind == "audio" and tracks["audio"] is not None: - logger.debug(f"Audio track already exists, ignoring duplicate track event") + logger.debug("Audio track already exists, ignoring duplicate track event") return - + if track.kind == "video": if is_noop_mode: # Use simple passthrough track that bypasses pipeline @@ -454,7 +535,7 @@ def on_track(track): # Always use pipeline processing - it handles passthrough internally based on workflow videoTrack = VideoStreamTrack(track, pipeline) logger.info("[Pipeline] Using video processing pipeline") - + tracks["video"] = videoTrack sender = pc.addTrack(videoTrack) @@ -465,11 +546,10 @@ def on_track(track): codec = "video/H264" force_codec(pc, sender, codec) - - + elif track.kind == "audio": logger.info(f"Creating audio track for track {track.id}") - + if is_noop_mode: # Use simple passthrough track that bypasses pipeline audioTrack = NoopAudioStreamTrack(track) @@ -478,10 +558,10 @@ def on_track(track): # Always use pipeline processing - it handles passthrough internally based on workflow audioTrack = AudioStreamTrack(track, pipeline) logger.info("[Pipeline] Using audio processing pipeline") - + tracks["audio"] = audioTrack sender = pc.addTrack(audioTrack) - logger.debug(f"Audio track added to peer 
connection") + logger.debug("Audio track added to peer connection") @track.on("ended") async def on_ended(): @@ -500,6 +580,13 @@ async def on_connectionstatechange(): if not task.done(): task.cancel() request.app["data_channel_tasks"].clear() + # Cleanup pipeline once per connection (not per track) + if not is_noop_mode: + try: + await pipeline.stop_prompts(cleanup=True) + logger.info("Pipeline cleanup completed for failed connection") + except Exception as e: + logger.error(f"Error during pipeline cleanup on connection failure: {e}") elif pc.connectionState == "closed": await pc.close() pcs.discard(pc) @@ -509,6 +596,13 @@ async def on_connectionstatechange(): if not task.done(): task.cancel() request.app["data_channel_tasks"].clear() + # Cleanup pipeline once per connection (not per track) + if not is_noop_mode: + try: + await pipeline.stop_prompts(cleanup=True) + logger.info("Pipeline cleanup completed for closed connection") + except Exception as e: + logger.error(f"Error during pipeline cleanup on connection close: {e}") await pc.setRemoteDescription(offer) @@ -516,18 +610,12 @@ async def on_connectionstatechange(): transceivers = pc.getTransceivers() logger.debug(f"[Offer] After negotiation - Total transceivers: {len(transceivers)}") for i, t in enumerate(transceivers): - logger.debug(f"[Offer] Transceiver {i}: {t.kind} - direction: {t.direction} - currentDirection: {t.currentDirection}") + logger.debug( + f"[Offer] Transceiver {i}: {t.kind} - direction: {t.direction} - currentDirection: {t.currentDirection}" + ) # Warm up the pipeline based on detected modalities and SDP content (skip in noop mode) - if not is_noop_mode: - if "m=video" in pc.remoteDescription.sdp and pipeline.accepts_video_input(): - logger.info("[Offer] Warming up video pipeline") - await pipeline.warm_video() - - if "m=audio" in pc.remoteDescription.sdp and pipeline.accepts_audio_input(): - logger.info("[Offer] Warming up audio pipeline") - await pipeline.warm_audio() - else: + if 
is_noop_mode: logger.debug("[Offer] Skipping pipeline warmup in noop mode") answer = await pc.createAnswer() @@ -535,28 +623,29 @@ async def on_connectionstatechange(): return web.Response( content_type="application/json", - text=json.dumps( - {"sdp": pc.localDescription.sdp, "type": pc.localDescription.type} - ), + text=json.dumps({"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}), ) + async def cancel_collect_frames(track): track.running = False - if hasattr(track, 'collect_task') is not None and not track.collect_task.done(): + if track.collect_task and not track.collect_task.done(): try: track.collect_task.cancel() await track.collect_task - except (asyncio.CancelledError): + except asyncio.CancelledError: pass + async def set_prompt(request): pipeline = request.app["pipeline"] prompt = await request.json() - await pipeline.set_prompts(prompt) + await pipeline.apply_prompts(prompt) return web.Response(content_type="application/json", text="OK") + def health(_): return web.Response(content_type="application/json", text="OK") @@ -568,12 +657,14 @@ async def on_startup(app: web.Application): app["pipeline"] = Pipeline( width=512, height=512, - cwd=app["workspace"], - disable_cuda_malloc=True, - gpu_only=True, - preview_method='none', - comfyui_inference_log_level=app.get("comfui_inference_log_level", None), + cwd=app["workspace"], + disable_cuda_malloc=True, + gpu_only=True, + preview_method="none", + comfyui_inference_log_level=app.get("comfyui_inference_log_level", None), + blacklist_custom_nodes=["ComfyUI-Manager"], ) + await app["pipeline"].initialize() app["pcs"] = set() app["video_tracks"] = {} @@ -588,13 +679,9 @@ async def on_shutdown(app: web.Application): if __name__ == "__main__": parser = argparse.ArgumentParser(description="Run comfystream server") parser.add_argument("--port", default=8889, help="Set the signaling port") - parser.add_argument( - "--media-ports", default=None, help="Set the UDP ports for WebRTC media" - ) + 
parser.add_argument("--media-ports", default=None, help="Set the UDP ports for WebRTC media") parser.add_argument("--host", default="127.0.0.1", help="Set the host") - parser.add_argument( - "--workspace", default=None, required=True, help="Set Comfy workspace" - ) + parser.add_argument("--workspace", default=None, required=True, help="Set Comfy workspace") parser.add_argument( "--log-level", default="INFO", @@ -636,16 +723,6 @@ async def on_shutdown(app: web.Application): app = web.Application() app["media_ports"] = args.media_ports.split(",") if args.media_ports else None app["workspace"] = args.workspace - - # Setup CORS - cors = setup_cors(app, defaults={ - "*": ResourceOptions( - allow_credentials=True, - expose_headers="*", - allow_headers="*", - allow_methods=["GET", "POST", "OPTIONS"] - ) - }) app.on_startup.append(on_startup) app.on_shutdown.append(on_shutdown) @@ -656,18 +733,10 @@ async def on_shutdown(app: web.Application): # WebRTC signalling and control routes. app.router.add_post("/offer", offer) app.router.add_post("/prompt", set_prompt) - - # Setup HTTP streaming routes - setup_routes(app, cors) - - # Serve static files from the public directory - app.router.add_static("/", path=os.path.join(os.path.dirname(__file__), "public"), name="static") # Add routes for getting stream statistics. 
stream_stats_manager = StreamStatsManager(app) - app.router.add_get( - "/streams/stats", stream_stats_manager.collect_all_stream_metrics - ) + app.router.add_get("/streams/stats", stream_stats_manager.collect_all_stream_metrics) app.router.add_get( "/stream/{stream_id}/stats", stream_stats_manager.collect_stream_metrics_by_id ) @@ -694,7 +763,12 @@ def force_print(*args, **kwargs): if args.comfyui_log_level: log_level = logging._nameToLevel.get(args.comfyui_log_level.upper()) logging.getLogger("comfy").setLevel(log_level) + + # Add ComfyStream timeout filter to suppress verbose execution logging + timeout_filter = ComfyStreamTimeoutFilter() + logging.getLogger("comfy.cmd.execution").addFilter(timeout_filter) + logging.getLogger("comfystream").addFilter(timeout_filter) if args.comfyui_inference_log_level: - app["comfui_inference_log_level"] = args.comfyui_inference_log_level + app["comfyui_inference_log_level"] = args.comfyui_inference_log_level web.run_app(app, host=args.host, port=int(args.port), print=force_print) diff --git a/server/byoc.py b/server/byoc.py index 0735674b4..3f8f3470c 100644 --- a/server/byoc.py +++ b/server/byoc.py @@ -5,54 +5,35 @@ import sys import torch + # Initialize CUDA before any other imports to prevent core dump. 
if torch.cuda.is_available(): torch.cuda.init() from aiohttp import web +from frame_processor import ComfyStreamFrameProcessor +from pytrickle.frame_overlay import OverlayConfig, OverlayMode +from pytrickle.frame_skipper import FrameSkipConfig from pytrickle.stream_processor import StreamProcessor from pytrickle.utils.register import RegisterCapability -from pytrickle.frame_skipper import FrameSkipConfig -from frame_processor import ComfyStreamFrameProcessor -logger = logging.getLogger(__name__) +from comfystream.exceptions import ComfyStreamTimeoutFilter +logger = logging.getLogger(__name__) -async def register_orchestrator(orch_url=None, orch_secret=None, capability_name=None, host="127.0.0.1", port=8889): - """Register capability with orchestrator if configured.""" - try: - orch_url = orch_url or os.getenv("ORCH_URL") - orch_secret = orch_secret or os.getenv("ORCH_SECRET") - - if orch_url and orch_secret: - os.environ.update({ - "CAPABILITY_NAME": capability_name or os.getenv("CAPABILITY_NAME") or "comfystream-processor", - "CAPABILITY_DESCRIPTION": "ComfyUI streaming processor", - "CAPABILITY_URL": f"http://{host}:{port}", - "CAPABILITY_CAPACITY": "1", - "ORCH_URL": orch_url, - "ORCH_SECRET": orch_secret - }) - - # Pass through explicit capability_name to ensure CLI/env override takes effect - result = await RegisterCapability.register( - logger=logger, - capability_name=capability_name - ) - if result: - logger.info(f"Registered capability: {result.geturl()}") - except Exception as e: - logger.error(f"Orchestrator registration failed: {e}") +DEFAULT_WITHHELD_TIMEOUT_SECONDS = 0.5 def main(): parser = argparse.ArgumentParser( description="Run comfystream server in BYOC (Bring Your Own Compute) mode using pytrickle." 
) - parser.add_argument("--port", default=8889, help="Set the server port") - parser.add_argument("--host", default="127.0.0.1", help="Set the host") + parser.add_argument("--port", default=8000, help="Set the server port") + parser.add_argument("--host", default="0.0.0.0", help="Set the host") parser.add_argument( - "--workspace", default=None, required=True, help="Set Comfy workspace" + "--workspace", + default=os.getcwd() + "/../ComfyUI", + help="Set Comfy workspace (Default: ../ComfyUI)", ) parser.add_argument( "--log-level", @@ -72,21 +53,6 @@ def main(): choices=logging._nameToLevel.keys(), help="Set the logging level for ComfyUI inference", ) - parser.add_argument( - "--orch-url", - default=None, - help="Orchestrator URL for capability registration", - ) - parser.add_argument( - "--orch-secret", - default=None, - help="Orchestrator secret for capability registration", - ) - parser.add_argument( - "--capability-name", - default=None, - help="Name for this capability (default: comfystream-processor)", - ) parser.add_argument( "--disable-frame-skip", default=False, @@ -112,18 +78,28 @@ def main(): format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S", ) + logging.getLogger("comfy.model_detection").setLevel(logging.WARNING) # Allow overriding of ComfyUI log levels. 
if args.comfyui_log_level: log_level = logging._nameToLevel.get(args.comfyui_log_level.upper()) logging.getLogger("comfy").setLevel(log_level) + # Add ComfyStream timeout filter to suppress verbose execution logging + timeout_filter = ComfyStreamTimeoutFilter() + logging.getLogger("comfy.cmd.execution").addFilter(timeout_filter) + logging.getLogger("comfystream").addFilter(timeout_filter) + def force_print(*args, **kwargs): print(*args, **kwargs, flush=True) sys.stdout.flush() logger.info("Starting ComfyStream BYOC server with pytrickle StreamProcessor...") - + logger.info( + "Send initial workflow parameters (width/height/prompts/warmup) via /stream/start " + "params; runtime updates now apply incremental changes only." + ) + # Create frame processor with configuration frame_processor = ComfyStreamFrameProcessor( width=args.width, @@ -131,10 +107,12 @@ def force_print(*args, **kwargs): workspace=args.workspace, disable_cuda_malloc=True, gpu_only=True, - preview_method='none', - comfyui_inference_log_level=args.comfyui_inference_log_level + preview_method="none", + blacklist_custom_nodes=["ComfyUI-Manager"], + logging_level=args.comfyui_log_level, + comfyui_inference_log_level=args.comfyui_inference_log_level, ) - + # Create frame skip configuration only if enabled frame_skip_config = None if args.disable_frame_skip: @@ -142,67 +120,73 @@ def force_print(*args, **kwargs): else: frame_skip_config = FrameSkipConfig() logger.info("Frame skipping enabled: adaptive skipping based on queue sizes") - + # Create StreamProcessor with frame processor processor = StreamProcessor( video_processor=frame_processor.process_video_async, audio_processor=frame_processor.process_audio_async, model_loader=frame_processor.load_model, param_updater=frame_processor.update_params, + on_stream_start=frame_processor.on_stream_start, on_stream_stop=frame_processor.on_stream_stop, # Align processor name with capability for consistent logs - name=(args.capability_name or 
os.getenv("CAPABILITY_NAME") or "comfystream-processor"), + name=(os.getenv("CAPABILITY_NAME") or "comfystream"), port=int(args.port), host=args.host, frame_skip_config=frame_skip_config, + overlay_config=OverlayConfig( + mode=OverlayMode.PROGRESSBAR, + message="Loading...", + enabled=True, + auto_timeout_seconds=DEFAULT_WITHHELD_TIMEOUT_SECONDS, + frame_count_to_disable=20, + ), # Ensure server metadata reflects the desired capability name - capability_name=(args.capability_name or os.getenv("CAPABILITY_NAME") or "comfystream-processor") + capability_name=(os.getenv("CAPABILITY_NAME") or "comfystream"), + # server_kwargs... + route_prefix="/", ) # Set the stream processor reference for text data publishing frame_processor.set_stream_processor(processor) - - # Create async startup function to load model - async def load_model_on_startup(app): - await processor._frame_processor.load_model() - + + logger.info("Startup warmup runs automatically as part of on_stream_start.") + # Create async startup function for orchestrator registration async def register_orchestrator_startup(app): - await register_orchestrator( - orch_url=args.orch_url, - orch_secret=args.orch_secret, - capability_name=args.capability_name, - host=args.host, - port=args.port - ) - - # Add model loading and registration to startup hooks - processor.server.app.on_startup.append(load_model_on_startup) - processor.server.app.on_startup.append(register_orchestrator_startup) - - # Add warmup endpoint: accepts same body as prompts update - async def warmup_handler(request): try: - body = await request.json() + orch_url = os.getenv("ORCH_URL") + + if orch_url and os.getenv("ORCH_SECRET", None): + # CAPABILITY_URL always overrides host:port from args + capability_url = os.getenv("CAPABILITY_URL") or f"http://{args.host}:{args.port}" + + os.environ.update( + { + "CAPABILITY_NAME": os.getenv("CAPABILITY_NAME") or "comfystream", + "CAPABILITY_DESCRIPTION": "ComfyUI streaming processor", + "CAPABILITY_URL": 
capability_url, + "CAPABILITY_CAPACITY": "1", + "ORCH_URL": orch_url, + "ORCH_SECRET": os.getenv("ORCH_SECRET", None), + } + ) + + result = await RegisterCapability.register( + logger=logger, capability_name=os.getenv("CAPABILITY_NAME") or "comfystream" + ) + if result: + logger.info(f"Registered capability: {result.geturl()}") + # Clear ORCH_SECRET from environment after use for security + os.environ.pop("ORCH_SECRET", None) except Exception as e: - logger.error(f"Invalid JSON in warmup request: {e}") - return web.json_response({"error": "Invalid JSON"}, status=400) - try: - # Inject sentinel to trigger warmup inside update_params on the model thread - if isinstance(body, dict): - body["warmup"] = True - else: - body = {"warmup": True} - # Fire-and-forget: do not await warmup; update_params will schedule it - asyncio.get_running_loop().create_task(frame_processor.update_params(body)) - return web.json_response({"status": "accepted"}) - except Exception as e: - logger.error(f"Warmup failed: {e}") - return web.json_response({"error": str(e)}, status=500) + logger.error(f"Orchestrator registration failed: {e}") + # Clear ORCH_SECRET from environment even on error + os.environ.pop("ORCH_SECRET", None) + + # Add registration to startup hooks + processor.server.app.on_startup.append(register_orchestrator_startup) - # Mount at same API namespace as StreamProcessor defaults - processor.server.add_route("POST", "/api/stream/warmup", warmup_handler) - # Run the processor processor.run() diff --git a/server/frame_buffer.py b/server/frame_buffer.py deleted file mode 100644 index 2a16407ae..000000000 --- a/server/frame_buffer.py +++ /dev/null @@ -1,42 +0,0 @@ -import threading -import time -import numpy as np -import cv2 -import av -from typing import Optional - -class FrameBuffer: - _instance = None - - @classmethod - def get_instance(cls): - if cls._instance is None: - cls._instance = FrameBuffer() - return cls._instance - - def __init__(self): - self.current_frame = None - 
self.frame_lock = threading.Lock() - self.last_update_time = 0 - self.quality = 70 # JPEG quality (0-100) - - def update_frame(self, frame): - """Update the current frame in the buffer""" - with self.frame_lock: - # Convert frame to numpy array if it's an av.VideoFrame - if isinstance(frame, av.VideoFrame): - frame_np = frame.to_ndarray(format="rgb24") - else: - frame_np = frame - - # Store the frame as a JPEG-encoded bytes object for efficient serving - _, jpeg_frame = cv2.imencode('.jpg', cv2.cvtColor(frame_np, cv2.COLOR_RGB2BGR), - [cv2.IMWRITE_JPEG_QUALITY, self.quality]) - - self.current_frame = jpeg_frame.tobytes() - self.last_update_time = time.time() - - def get_current_frame(self) -> Optional[bytes]: - """Get the current frame from the buffer""" - with self.frame_lock: - return self.current_frame diff --git a/server/frame_processor.py b/server/frame_processor.py index bac139d4a..befb5c608 100644 --- a/server/frame_processor.py +++ b/server/frame_processor.py @@ -2,13 +2,19 @@ import json import logging import os -from typing import List +from typing import Any, Dict, List, Optional, Union -import numpy as np from pytrickle.frame_processor import FrameProcessor -from pytrickle.frames import VideoFrame, AudioFrame +from pytrickle.frames import AudioFrame, VideoFrame +from pytrickle.stream_processor import VideoProcessingResult + from comfystream.pipeline import Pipeline -from comfystream.utils import convert_prompt, ComfyStreamParamsUpdateRequest +from comfystream.pipeline_state import PipelineState +from comfystream.utils import ( + ComfyStreamParamsUpdateRequest, + convert_prompt, + normalize_stream_params, +) logger = logging.getLogger(__name__) @@ -16,17 +22,19 @@ class ComfyStreamFrameProcessor(FrameProcessor): """ Integrated ComfyStream FrameProcessor for pytrickle. - + This class wraps the ComfyStream Pipeline to work with pytrickle's streaming architecture. 
""" def __init__(self, text_poll_interval: float = 0.25, **load_params): """Initialize with load parameters for pipeline creation. - + Args: text_poll_interval: Interval in seconds to poll for text outputs (default: 0.25) **load_params: Parameters for pipeline creation """ + super().__init__() + self.pipeline = None self._load_params = load_params self._text_poll_interval = text_poll_interval @@ -35,13 +43,64 @@ def __init__(self, text_poll_interval: float = 0.25, **load_params): self._text_forward_task = None self._background_tasks = [] self._stop_event = asyncio.Event() - super().__init__() + + async def _apply_stream_start_prompt(self, prompt_value: Any) -> bool: + if not self.pipeline: + logger.debug("Cannot apply stream start prompt without pipeline") + return False + + # Parse prompt payload from various formats + prompt_dict = None + if prompt_value is None: + pass + elif isinstance(prompt_value, dict): + prompt_dict = prompt_value + elif isinstance(prompt_value, list): + for candidate in prompt_value: + if isinstance(candidate, dict): + prompt_dict = candidate + break + elif isinstance(prompt_value, str): + prompt_str = prompt_value.strip() + if prompt_str: + try: + parsed = json.loads(prompt_str) + if isinstance(parsed, dict): + prompt_dict = parsed + else: + logger.warning("Parsed prompt payload is %s, expected dict", type(parsed)) + except json.JSONDecodeError: + logger.error("Stream start prompt is not valid JSON") + else: + logger.warning("Unsupported prompt payload type: %s", type(prompt_value)) + + if not isinstance(prompt_dict, dict): + logger.warning("Skipping prompt application due to invalid payload") + return False + + try: + await self._process_prompts(prompt_dict, skip_warmup=True) + return True + except Exception: + logger.exception("Failed to apply stream start prompt") + raise + + def _workflow_has_video(self) -> bool: + """Return True if current workflow is expected to produce video output.""" + if not self.pipeline: + return False + try: 
+ capabilities = self.pipeline.get_workflow_io_capabilities() + return bool(capabilities.get("video", {}).get("output", False)) + except Exception: + logger.debug("Unable to determine workflow video capability", exc_info=True) + return False def set_stream_processor(self, stream_processor): """Set reference to StreamProcessor for data publishing.""" self._stream_processor = stream_processor logger.info("StreamProcessor reference set for text data publishing") - + def _setup_text_monitoring(self): """Set up background text forwarding from the pipeline.""" try: @@ -75,7 +134,9 @@ async def _forward_text_loop(): if self._stream_processor: success = await self._stream_processor.send_data(text) if not success: - logger.debug("Text send failed; stopping text forwarder") + logger.debug( + "Text send failed; stopping text forwarder" + ) break except asyncio.CancelledError: logger.debug("Text forwarder task cancelled") @@ -105,7 +166,7 @@ async def _stop_text_forwarder(self) -> None: except Exception: logger.debug("Error while awaiting text forwarder cancellation", exc_info=True) self._text_forward_task = None - + async def on_stream_stop(self): """Called when stream stops - cleanup background tasks.""" logger.info("Stream stopped, cleaning up background tasks") @@ -113,6 +174,14 @@ async def on_stream_stop(self): # Set stop event to signal all background tasks to stop self._stop_event.set() + # Stop the ComfyStream client's prompt execution + if self.pipeline: + logger.info("Stopping ComfyStream client prompt execution") + try: + await self.pipeline.stop_prompts(cleanup=True) + except Exception as e: + logger.error(f"Error stopping ComfyStream client: {e}") + # Stop text forwarder await self._stop_text_forwarder() @@ -136,50 +205,89 @@ async def on_stream_stop(self): self._background_tasks.clear() logger.info("All background tasks cleaned up") - + def _reset_stop_event(self): """Reset the stop event for a new stream.""" self._stop_event.clear() + async def 
on_stream_start(self, params: Optional[Dict[str, Any]] = None): + """Handle stream start lifecycle events.""" + logger.info("Stream starting") + self._reset_stop_event() + logger.info(f"Stream start params: {params}") + + if not self.pipeline: + logger.debug("Stream start requested before pipeline initialization") + return + + stream_params = normalize_stream_params(params) + prompt_payload = stream_params.pop("prompts", None) + if prompt_payload is None: + prompt_payload = stream_params.pop("prompt", None) + + if prompt_payload: + try: + await self._apply_stream_start_prompt(prompt_payload) + except Exception: + logger.exception("Failed to apply stream start prompt") + return + + if not self.pipeline.state_manager.is_initialized(): + logger.info("Pipeline not initialized; waiting for prompts before streaming") + return + + if stream_params: + try: + await self.update_params(stream_params) + except Exception: + logger.exception("Failed to process stream start parameters") + return + + try: + if ( + self.pipeline.state != PipelineState.STREAMING + and self.pipeline.state_manager.can_stream() + ): + await self.pipeline.start_streaming() + + if self.pipeline.produces_text_output(): + self._setup_text_monitoring() + else: + await self._stop_text_forwarder() + except Exception: + logger.exception("Failed during stream start", exc_info=True) + async def load_model(self, **kwargs): """Load model and initialize the pipeline.""" params = {**self._load_params, **kwargs} - + if self.pipeline is None: self.pipeline = Pipeline( - width=int(params.get('width', 512)), - height=int(params.get('height', 512)), - cwd=params.get('workspace', os.getcwd()), - disable_cuda_malloc=params.get('disable_cuda_malloc', True), - gpu_only=params.get('gpu_only', True), - preview_method=params.get('preview_method', 'none'), - comfyui_inference_log_level=params.get('comfyui_inference_log_level'), + width=int(params.get("width", 512)), + height=int(params.get("height", 512)), + 
cwd=params.get("workspace", os.getcwd()), + disable_cuda_malloc=params.get("disable_cuda_malloc", True), + gpu_only=params.get("gpu_only", True), + preview_method=params.get("preview_method", "none"), + comfyui_inference_log_level=params.get("comfyui_inference_log_level", "INFO"), + logging_level=params.get("comfyui_inference_log_level", "INFO"), + blacklist_custom_nodes=["ComfyUI-Manager"], ) + await self.pipeline.initialize() async def warmup(self): - """Public warmup method that triggers pipeline warmup.""" + """Warm up the pipeline.""" if not self.pipeline: logger.warning("Warmup requested before pipeline initialization") return - + logger.info("Running pipeline warmup...") - """Run pipeline warmup.""" try: capabilities = self.pipeline.get_workflow_io_capabilities() - logger.info(f"Detected I/O capabilities for warmup: {capabilities}") - - # Warm video if there are video inputs or outputs - if capabilities.get("video", {}).get("input") or capabilities.get("video", {}).get("output"): - logger.info("Running video warmup...") - await self.pipeline.warm_video() - logger.info("Video warmup completed") - - # Warm audio if there are audio inputs or outputs - if capabilities.get("audio", {}).get("input") or capabilities.get("audio", {}).get("output"): - logger.info("Running audio warmup...") - await self.pipeline.warm_audio() - logger.info("Audio warmup completed") - + logger.info(f"Detected I/O capabilities: {capabilities}") + + await self.pipeline.warmup() + except Exception as e: logger.error(f"Warmup failed: {e}") @@ -195,23 +303,43 @@ def _schedule_warmup(self) -> None: except Exception: logger.warning("Failed to schedule warmup", exc_info=True) - async def process_video_async(self, frame: VideoFrame) -> VideoFrame: - """Process video frame through ComfyStream Pipeline.""" + async def process_video_async( + self, frame: VideoFrame + ) -> Union[VideoFrame, VideoProcessingResult]: + """Process video frame through ComfyStream Pipeline. 
+ + Returns VideoProcessingResult.WITHHELD to trigger pytrickle's automatic overlay when + processed frames are not yet available. + """ try: - + if not self.pipeline: + return frame + + # If pipeline ingestion is paused, withhold frame so pytrickle renders the overlay + if not self.pipeline.is_ingest_enabled(): + return VideoProcessingResult.WITHHELD + # Convert pytrickle VideoFrame to av.VideoFrame av_frame = frame.to_av_frame(frame.tensor) av_frame.pts = frame.timestamp av_frame.time_base = frame.time_base - + # Process through pipeline await self.pipeline.put_video_frame(av_frame) - processed_av_frame = await self.pipeline.get_processed_video_frame() - - # Convert back to pytrickle VideoFrame - processed_frame = VideoFrame.from_av_frame_with_timing(processed_av_frame, frame) - return processed_frame - + + # Try to get processed frame with short timeout + try: + processed_av_frame = await asyncio.wait_for( + self.pipeline.get_processed_video_frame(), + timeout=self._stream_processor.overlay_config.auto_timeout_seconds, + ) + processed_frame = VideoFrame.from_av_frame_with_timing(processed_av_frame, frame) + return processed_frame + + except asyncio.TimeoutError: + # No frame ready yet - return withheld sentinel to trigger overlay + return VideoProcessingResult.WITHHELD + except Exception as e: logger.error(f"Video processing failed: {e}") return frame @@ -221,14 +349,20 @@ async def process_audio_async(self, frame: AudioFrame) -> List[AudioFrame]: try: if not self.pipeline: return [frame] - - # Audio processing needed - use pipeline + + # If pipeline ingestion is paused, passthrough audio + if not self.pipeline.is_ingest_enabled(): + frame.side_data.skipped = True + frame.side_data.passthrough = True + return [frame] + + # Audio processing - use pipeline av_frame = frame.to_av_frame() await self.pipeline.put_audio_frame(av_frame) processed_av_frame = await self.pipeline.get_processed_audio_frame() processed_frame = AudioFrame.from_av_audio(processed_av_frame) 
return [processed_frame] - + except Exception as e: logger.error(f"Audio processing failed: {e}") return [frame] @@ -237,44 +371,59 @@ async def update_params(self, params: dict): """Update processing parameters.""" if not self.pipeline: return - - # Handle list input - take first element - if isinstance(params, list) and params: - params = params[0] - + + params_payload: Dict[str, Any] = {} + if isinstance(params, list): + params = params[0] if params else {} + + if isinstance(params, dict): + params_payload = dict(params) + elif params is None: + params_payload = {} + else: + logger.warning("Unsupported params type for update_params: %s", type(params)) + return + + if not params_payload: + return + # Validate parameters using the centralized validation - validated = ComfyStreamParamsUpdateRequest(**params).model_dump() + validated = ComfyStreamParamsUpdateRequest(**params_payload).model_dump() logger.info(f"Parameter validation successful, keys: {list(validated.keys())}") - + # Process prompts if provided if "prompts" in validated and validated["prompts"]: - await self._process_prompts(validated["prompts"]) - + await self._process_prompts(validated["prompts"], skip_warmup=True) + # Update pipeline dimensions if "width" in validated: self.pipeline.width = int(validated["width"]) if "height" in validated: self.pipeline.height = int(validated["height"]) - - # Schedule warmup if requested - if validated.get("warmup", False): - self._schedule_warmup() - - async def _process_prompts(self, prompts): + async def _process_prompts(self, prompts, *, skip_warmup: bool = False): """Process and set prompts in the pipeline.""" + if not self.pipeline: + logger.warning("Prompt update requested before pipeline initialization") + return try: converted = convert_prompt(prompts, return_dict=True) - - # Set prompts in pipeline - await self.pipeline.set_prompts([converted]) - logger.info(f"Prompts set successfully: {list(prompts.keys())}") - - # Update text monitoring based on workflow 
capabilities + + await self.pipeline.apply_prompts( + [converted], + skip_warmup=skip_warmup, + ) + + if self.pipeline.state_manager.can_stream(): + await self.pipeline.start_streaming() + + logger.info(f"Prompts applied successfully: {list(prompts.keys())}") + if self.pipeline.produces_text_output(): self._setup_text_monitoring() else: await self._stop_text_forwarder() - + except Exception as e: logger.error(f"Failed to process prompts: {e}") + raise diff --git a/server/http_streaming/__init__.py b/server/http_streaming/__init__.py deleted file mode 100644 index 4fad17f79..000000000 --- a/server/http_streaming/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -""" -HTTP Streaming module for ComfyStream. - -This module contains components for token management and HTTP streaming routes. -""" diff --git a/server/http_streaming/routes.py b/server/http_streaming/routes.py deleted file mode 100644 index ac309bae5..000000000 --- a/server/http_streaming/routes.py +++ /dev/null @@ -1,69 +0,0 @@ -""" -HTTP streaming routes for ComfyStream. - -This module contains the routes for HTTP streaming. 
-""" -import asyncio -import logging -from aiohttp import web -from frame_buffer import FrameBuffer -from .tokens import cleanup_expired_sessions, validate_token, create_stream_token - -logger = logging.getLogger(__name__) - -async def stream_mjpeg(request): - """Serve an MJPEG stream with token validation""" - # Clean up expired sessions - cleanup_expired_sessions() - - stream_id = request.query.get("token") - - # Validate the stream token - is_valid, error_message = validate_token(stream_id) - if not is_valid: - return web.Response(status=403, text=error_message) - - frame_buffer = FrameBuffer.get_instance() - - # Use a fixed frame delay for 30 FPS - frame_delay = 1.0 / 30 - - response = web.StreamResponse( - status=200, - reason='OK', - headers={ - 'Content-Type': 'multipart/x-mixed-replace; boundary=frame', - 'Cache-Control': 'no-cache', - 'Connection': 'close', - } - ) - await response.prepare(request) - - try: - while True: - jpeg_frame = frame_buffer.get_current_frame() - if jpeg_frame is not None: - await response.write( - b'--frame\r\n' - b'Content-Type: image/jpeg\r\n\r\n' + jpeg_frame + b'\r\n' - ) - await asyncio.sleep(frame_delay) - except (ConnectionResetError, asyncio.CancelledError): - logger.info("MJPEG stream connection closed") - except Exception as e: - logger.error(f"Error in MJPEG stream: {e}") - finally: - return response - -def setup_routes(app, cors): - """Setup HTTP streaming routes - - Args: - app: The aiohttp web application - cors: The CORS setup object - """ - # Stream token endpoints - cors.add(app.router.add_post("/api/stream-token", create_stream_token)) - - # Stream endpoint with token validation - cors.add(app.router.add_get("/api/stream", stream_mjpeg)) diff --git a/server/http_streaming/tokens.py b/server/http_streaming/tokens.py deleted file mode 100644 index d424cf36d..000000000 --- a/server/http_streaming/tokens.py +++ /dev/null @@ -1,86 +0,0 @@ -""" -Token management system for ComfyStream HTTP streaming. 
- -This module handles the creation, validation, and management of stream tokens. -""" -import time -import secrets -import logging -from aiohttp import web - -logger = logging.getLogger(__name__) - -# Constants -SESSION_CLEANUP_INTERVAL = 60 # Clean up expired sessions every 60 seconds - -# Global token storage -active_stream_sessions = {} -last_cleanup_time = 0 - -def cleanup_expired_sessions(): - """Clean up expired stream sessions""" - global active_stream_sessions, last_cleanup_time - - current_time = time.time() - - # Only clean up if it's been at least SESSION_CLEANUP_INTERVAL since last cleanup - if current_time - last_cleanup_time < SESSION_CLEANUP_INTERVAL: - return - - # Update the last cleanup time - last_cleanup_time = current_time - - # Find expired sessions - expired_sessions = [sid for sid, expires in active_stream_sessions.items() if current_time > expires] - - # Remove expired sessions - for sid in expired_sessions: - logger.info(f"Removing expired session: {sid[:8]}...") - del active_stream_sessions[sid] - - if expired_sessions: - logger.info(f"Cleaned up {len(expired_sessions)} expired sessions. {len(active_stream_sessions)} active sessions remaining.") - -async def create_stream_token(request): - """Create a unique stream token for secure access to the stream""" - global active_stream_sessions - - # Clean up expired sessions - cleanup_expired_sessions() - - current_time = time.time() - - # Generate a new unique token - stream_id = secrets.token_urlsafe(32) - expires_at = current_time + 3600 # 1 hour from now - - # Store the new session - active_stream_sessions[stream_id] = expires_at - - logger.info(f"Generated new stream token: {stream_id[:8]}... 
({len(active_stream_sessions)} active sessions)") - - return web.json_response({ - "stream_id": stream_id, - "expires_at": int(expires_at) - }) - -def validate_token(token): - """Validate a stream token and return whether it's valid - - Args: - token: The token to validate - - Returns: - tuple: (is_valid, error_message) - """ - if not token or token not in active_stream_sessions: - return False, "Invalid stream token" - - # Check if token is expired - current_time = time.time() - if current_time > active_stream_sessions[token]: - # Remove expired token - del active_stream_sessions[token] - return False, "Stream token expired" - - return True, None diff --git a/server/public/stream.html b/server/public/stream.html deleted file mode 100644 index 536781f97..000000000 --- a/server/public/stream.html +++ /dev/null @@ -1,60 +0,0 @@ - - -
- - -