Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
8b2b6d9
initial bump comfui-latest
eliteprox Mar 14, 2025
aab5fae
loosen requirements.txt and make customizable per pipeline, update se…
eliteprox Mar 18, 2025
213b18d
add frame processing (wip)
eliteprox Mar 18, 2025
193fce7
replace asyncio.run with new event loop
pschroedl Mar 18, 2025
9ae59df
feat: temp workaround
varshith15 Mar 19, 2025
252194f
fix: temp async
varshith15 Mar 24, 2025
fb9fd08
fix: noop test
varshith15 Mar 25, 2025
c4d6945
fix: noop works
leszko Mar 25, 2025
93d5542
use dedicated base image for integration work
eliteprox Mar 25, 2025
4a8502d
defer async initialization
eliteprox Mar 25, 2025
2dd12ff
fix: event loop bug
varshith15 Mar 26, 2025
07a369d
remove extra log
eliteprox Mar 26, 2025
1660b4d
fix requirements install
eliteprox Mar 27, 2025
a00e4c0
revert whitespace in devcontainer.json
eliteprox Mar 27, 2025
d2509ea
fix requirements and env activation, revert processed_input change fo…
eliteprox Mar 27, 2025
d118b71
remove unnecessary shell cmd
eliteprox Mar 27, 2025
e3d458f
attempt to overcome CI caching images
eliteprox Mar 27, 2025
e35f573
update comfyui-base image
eliteprox Mar 27, 2025
39473e4
update comfyui-base image
eliteprox Mar 28, 2025
f388769
pin live-base for comfyui
eliteprox Mar 28, 2025
455129a
Update ai-runner-live-pipelines-docker.yaml
pschroedl Mar 28, 2025
7199dce
Revert "pin live-base for comfyui"
eliteprox Mar 28, 2025
d3bb50e
use comfyui-base:latest
eliteprox Mar 31, 2025
1176f9d
cleanup dockerfile
eliteprox Apr 1, 2025
34facd6
temporarily pin livepeer/comfyui-base:feat-runner-integration
eliteprox Apr 1, 2025
5520c1b
restore gpus flag
eliteprox Apr 1, 2025
5f2a579
revert conda env change
eliteprox Apr 2, 2025
4ac7aef
update base image name
eliteprox Apr 2, 2025
b0e1afa
Merge branch 'main' into feat/upgrade-comfystream
victorges Apr 4, 2025
106553b
.github: Add mypy step on test CI
victorges Apr 4, 2025
42c7a0d
live/protocol: Fix typing on last_value_cache
victorges Apr 4, 2025
c2643ec
Add mypy.ini file
victorges Apr 4, 2025
e4b24d0
Remove live-ai requirements from mypy ci
victorges Apr 4, 2025
0f17eda
Fix mypy folder
victorges Apr 4, 2025
2be1508
Make mypy config less strict
victorges Apr 4, 2025
4334d18
mypy: Remove hallucinated config
victorges Apr 4, 2025
0fb4a83
WIP
victorges Apr 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 48 additions & 30 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -1,34 +1,52 @@
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/ubuntu
{
"name": "ai-runner",
// Image to use for the dev container. More info: https://containers.dev/guide/dockerfile.
"build": {
"dockerfile": "../Dockerfile",
// "dockerfile": "../docker/Dockerfile.text_to_speech",
"context": ".."
},
"runArgs": [
"--gpus=all"
],
// Features to add to the dev container. More info: https://containers.dev/features.
// Configure tool-specific properties.
"customizations": {
"vscode": {
"settings": {},
"extensions": [
"ms-python.python",
"ms-python.black-formatter"
]
}
},
// Use 'forwardPorts' to make a list of ports inside the container available locally.
"forwardPorts": [
8000
],
// Use 'mounts' to make a list of local folders available inside the container.
"mounts": [
// "source=${localWorkspaceFolder}/models,target=/models,type=bind"
"source=${localEnv:HOME}/.lpData/models,target=/models,type=bind"
]
"name": "ai-runner",
"initializeCommand": "ls",
// Image to use for the dev container. More info: https://containers.dev/guide/dockerfile.
"containerEnv": {
"PIPELINE": "comfyui"
},
"build": {
"dockerfile": "../runner/docker/Dockerfile.live-app__PIPELINE__",
"args": {
"PIPELINE": "comfyui"
},
// "dockerfile": "../Dockerfile",
// "dockerfile": "../docker/Dockerfile.text_to_speech",
"context": "../runner"
},
"runArgs": [
"--gpus=all"
],
// Features to add to the dev container. More info: https://containers.dev/features.
// Configure tool-specific properties.
"customizations": {
"vscode": {
"settings": {
"python.defaultInterpreterPath": "/workspace/miniconda3/envs/comfystream/bin/python",
"python.venvPath": "/workspace/miniconda3/envs",
"python.terminal.activateEnvInCurrentTerminal": false,
"python.terminal.activateEnvironment": true,
"terminal.integrated.shellIntegration.enabled": true
},
"extensions": [
"ms-python.python",
"ms-python.black-formatter"
]
}
},
// Use 'forwardPorts' to make a list of ports inside the container available locally.
"forwardPorts": [
8000
],
"appPort": [
"8000:8000"
],
// Use 'mounts' to make a list of local folders available inside the container.
"mounts": [
// "source=${localWorkspaceFolder}/models,target=/models,type=bind"
"source=${localEnv:HOME}/models/ComfyUI--models/,target=/workspace/ComfyUI/models,type=bind",
"source=${localEnv:HOME}/models/ComfyUI--output/,target=/workspace/ComfyUI/output,type=bind"
]
}
2 changes: 1 addition & 1 deletion .github/workflows/ai-runner-live-pipelines-docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
uses: tj-actions/changed-files@823fcebdb31bb35fdf2229d9f769b400309430d0 # v46.0.3
with:
files: |
runner/docker/Dockerfile.live-base
runner/docker/Dockerfile.live-base*

- name: Check if build needed
id: check_build
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/ai-runner-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ jobs:
pip install -r runner/requirements-dev.txt
pip install -r runner/requirements.txt

- name: Run mypy type checking
working-directory: runner
run: mypy app/

- name: Run tests
working-directory: runner
run: pytest --verbose --showlocals
62 changes: 36 additions & 26 deletions runner/app/live/pipelines/comfyui.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
import os
import json
import torch
from PIL import Image
import asyncio
import numpy as np
from PIL import Image
from typing import Union
from pydantic import BaseModel, field_validator

from .interface import Pipeline
from comfystream.client import ComfyStreamClient
from trickle import VideoFrame, VideoOutput

import logging

COMFY_UI_WORKSPACE_ENV = "COMFY_UI_WORKSPACE"
WARMUP_RUNS = 1
DEFAULT_WORKFLOW_JSON = json.loads("""
{
"1": {
Expand Down Expand Up @@ -263,46 +265,54 @@ def validate_prompt(cls, v) -> dict:


class ComfyUI(Pipeline):
def __init__(self, **params):
super().__init__(**params)

def __init__(self):
comfy_ui_workspace = os.getenv(COMFY_UI_WORKSPACE_ENV)
self.client = ComfyStreamClient(cwd=comfy_ui_workspace)
self.params: ComfyUIParams
self.video_incoming_frames = asyncio.Queue()

self.update_params(**params)
async def warm_video(self):
dummy_frame = VideoFrame(None, 0, 0)
dummy_frame.side_data.input = torch.randn(1, 512, 512, 3)

# Comfy will cache nodes that only need to be run once (i.e. a node that loads model weights)
# We can run the prompt once before actual inputs come in to "warmup"
warmup_input = torch.randn(1, 512, 512, 3)
asyncio.get_event_loop().run_until_complete(self.client.queue_prompt(warmup_input))
for _ in range(WARMUP_RUNS):
self.client.put_video_input(dummy_frame)
_ = await self.client.get_video_output()
logging.info("Video frame warmup done")

def process_frame(self, image: Image.Image) -> Image.Image:
# Normalize by dividing by 255 to ensure the tensor values are between 0 and 1
image_np = np.array(image.convert("RGB")).astype(np.float32) / 255.0
# Convert from numpy to torch.Tensor
# Initially, the torch.Tensor will have shape HWC but we want BHWC
# unsqueeze(0) will add a batch dimension at the beginning of 1 which means we just have 1 image
image_tensor = torch.tensor(image_np).unsqueeze(0)
async def put_video_frame(self, frame: VideoFrame):
image_np = np.array(frame.image.convert("RGB")).astype(np.float32) / 255.0
frame.side_data.input = torch.tensor(image_np).unsqueeze(0)
frame.side_data.skipped = True
self.client.put_video_input(frame)
await self.video_incoming_frames.put(frame)

# Process using ComfyUI pipeline
result_tensor = asyncio.get_event_loop().run_until_complete(self.client.queue_prompt(image_tensor))
async def get_processed_video_frame(self):
result_tensor = await self.client.get_video_output()
frame = await self.video_incoming_frames.get()
while frame.side_data.skipped:
frame = await self.video_incoming_frames.get()

# Convert back from Tensor to PIL.Image
result_tensor = result_tensor.squeeze(0)
result_image_np = (result_tensor * 255).byte()
result_image = Image.fromarray(result_image_np.cpu().numpy())
return result_image
return frame.replace_image(result_image)

async def set_params(self, **params):
new_params = ComfyUIParams(**params)
logging.info(f"Setting ComfyUI Pipeline Prompt: {new_params.prompt}")
# TODO: currently its a single prompt, but need to support multiple prompts
await self.client.set_prompts([new_params.prompt])
self.params = new_params

def update_params(self, **params):
async def update_params(self, **params):
new_params = ComfyUIParams(**params)
logging.info(f"ComfyUI Pipeline Prompt: {new_params.prompt}")
self.client.set_prompt(new_params.prompt)
logging.info(f"Updating ComfyUI Pipeline Prompt: {new_params.prompt}")
# TODO: currently its a single prompt, but need to support multiple prompts
await self.client.update_prompts([new_params.prompt])
self.params = new_params

#TODO: This is a hack to stop the ComfyStreamClient. Use the comfystream api to stop the client in 0.0.2
async def stop(self):
logging.info("Stopping ComfyUI pipeline")
if self.client.comfy_client.is_running:
await self.client.comfy_client.__aexit__(None, None, None)
await self.client.stop()
logging.info("ComfyUI pipeline stopped")
33 changes: 25 additions & 8 deletions runner/app/live/pipelines/interface.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from abc import ABC, abstractmethod
from PIL import Image
from abc import ABC, abstractmethod
from trickle import VideoFrame, VideoOutput

class Pipeline(ABC):
"""Abstract base class for image processing pipelines.
Expand All @@ -22,21 +23,37 @@ def __init__(self, **params):
pass

@abstractmethod
def process_frame(self, frame: Image.Image) -> Image.Image:
"""Process a single frame through the pipeline.

Called sequentially with each frame from the stream.
async def put_video_frame(self, frame: VideoFrame):
"""Put a frame into the pipeline.

Args:
frame: Input PIL Image
frame: Input VideoFrame
"""
pass

@abstractmethod
async def get_processed_video_frame(self) -> VideoFrame:
"""Get a processed frame from the pipeline.

Returns:
Processed PIL Image
Processed VideoFrame
"""
pass

@abstractmethod
async def set_params(self, **params):
"""Set pipeline parameters initally.

Must maintain valid state on success or restore previous state on failure.
set_params starts the prompt loops in comfystream.

Args:
**params: Implementation-specific parameters
"""
pass

@abstractmethod
def update_params(self, **params):
async def update_params(self, **params):
"""Update pipeline parameters.

Must maintain valid state on success or restore previous state on failure.
Expand Down
6 changes: 3 additions & 3 deletions runner/app/live/pipelines/loader.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from .interface import Pipeline

def load_pipeline(name: str, **params) -> Pipeline:
def load_pipeline(name: str) -> Pipeline:
if name == "comfyui":
from .comfyui import ComfyUI
return ComfyUI(**params)
return ComfyUI()
elif name == "noop":
from .noop import Noop
return Noop(**params)
return Noop()
raise ValueError(f"Unknown pipeline: {name}")
29 changes: 23 additions & 6 deletions runner/app/live/pipelines/noop.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,31 @@
from PIL import Image
import logging
import asyncio
from PIL import Image


from .interface import Pipeline
from trickle import VideoFrame, VideoOutput

class Noop(Pipeline):
def __init__(self, **params):
super().__init__(**params)
def __init__(self):
self.frame_queue = asyncio.Queue()

async def put_video_frame(self, frame: VideoFrame):
await self.frame_queue.put(frame)

def process_frame(self, image: Image.Image) -> Image.Image:
return image.convert("RGB")
async def get_processed_video_frame(self) -> VideoFrame:
frame = await self.frame_queue.get()
processed_frame = frame.image.convert("RGB")
return frame.replace_image(processed_frame)

def update_params(self, **params):
async def warm_video(self):
logging.info("Warming video")

async def set_params(self, **params):
logging.info(f"Setting params: {params}")

async def update_params(self, **params):
logging.info(f"Updating params: {params}")

async def stop(self):
logging.info("Stopping pipeline")
Loading
Loading