From 8b2b6d9a55268121d30dfa28c65faef1faa633e0 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Fri, 14 Mar 2025 00:48:40 +0000 Subject: [PATCH 01/36] initial bump comfui-latest --- .devcontainer/devcontainer.json | 75 +++++++++++++--------- runner/docker/Dockerfile.live-base-comfyui | 3 +- 2 files changed, 46 insertions(+), 32 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 902d66cac..b9f8c5cdc 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -1,34 +1,49 @@ // For format details, see https://aka.ms/devcontainer.json. For config options, see the // README at: https://github.com/devcontainers/templates/tree/main/src/ubuntu { - "name": "ai-runner", - // Image to use for the dev container. More info: https://containers.dev/guide/dockerfile. - "build": { - "dockerfile": "../Dockerfile", - // "dockerfile": "../docker/Dockerfile.text_to_speech", - "context": ".." - }, - "runArgs": [ - "--gpus=all" - ], - // Features to add to the dev container. More info: https://containers.dev/features. - // Configure tool-specific properties. - "customizations": { - "vscode": { - "settings": {}, - "extensions": [ - "ms-python.python", - "ms-python.black-formatter" - ] - } - }, - // Use 'forwardPorts' to make a list of ports inside the container available locally. - "forwardPorts": [ - 8000 - ], - // Use 'mounts' to make a list of local folders available inside the container. - "mounts": [ - // "source=${localWorkspaceFolder}/models,target=/models,type=bind" - "source=${localEnv:HOME}/.lpData/models,target=/models,type=bind" - ] + "name": "ai-runner", + "initializeCommand": "ls", + // Image to use for the dev container. More info: https://containers.dev/guide/dockerfile. + "build": { + "dockerfile": "../runner/docker/Dockerfile.live-app__PIPELINE__", + "args": { + "PIPELINE": "comfyui" + }, + // "dockerfile": "../Dockerfile", + // "dockerfile": "../docker/Dockerfile.text_to_speech", + "context": "../runner" + }, + "runArgs": [ + "--gpus=all" + ], + // Features to add to the dev container. More info: https://containers.dev/features. + // Configure tool-specific properties. + "customizations": { + "vscode": { + "settings": { + "python.defaultInterpreterPath": "/workspace/miniconda3/envs/comfystream/bin/python", + "python.venvPath": "/workspace/miniconda3/envs", + "python.terminal.activateEnvInCurrentTerminal": false, + "python.terminal.activateEnvironment": true, + "terminal.integrated.shellIntegration.enabled": true + }, + "extensions": [ + "ms-python.python", + "ms-python.black-formatter" + ] + } + }, + // Use 'forwardPorts' to make a list of ports inside the container available locally. + "forwardPorts": [ + 8000 + ], + "appPort": [ + "8000:8000" + ], + // Use 'mounts' to make a list of local folders available inside the container. + "mounts": [ + // "source=${localWorkspaceFolder}/models,target=/models,type=bind" + "source=${localEnv:HOME}/models/ComfyUI--models/,target=/workspace/ComfyUI/models,type=bind", + "source=${localEnv:HOME}/models/ComfyUI--output/,target=/workspace/ComfyUI/output,type=bind" + ] } diff --git a/runner/docker/Dockerfile.live-base-comfyui b/runner/docker/Dockerfile.live-base-comfyui index 874aa6995..2ef0237c3 100644 --- a/runner/docker/Dockerfile.live-base-comfyui +++ b/runner/docker/Dockerfile.live-base-comfyui @@ -1,4 +1,4 @@ -ARG BASE_IMAGE=livepeer/comfyui-base:ai-runner-9166aa9 +ARG BASE_IMAGE=livepeer/comfyui-base:latest FROM ${BASE_IMAGE} # ----------------------------------------------------------------------------- @@ -72,7 +72,6 @@ RUN apt-get clean && rm -rf /var/lib/apt/lists/* # Set up ComfyUI workspace ENV COMFY_UI_WORKSPACE="/workspace/ComfyUI" -ENV PYTHONPATH=/workspace/ComfyUI:${PYTHONPATH:-} RUN mkdir -p /workspace/ComfyUI RUN rm -rf /workspace/ComfyUI/models && ln -s /models/ComfyUI--models /workspace/ComfyUI/models RUN rm -rf /workspace/ComfyUI/output && ln -s /models/ComfyUI--output /workspace/ComfyUI/output From aab5fae72a24d8c390386c6d597a14bed72b0a87 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Tue, 18 Mar 2025 04:24:35 +0000 Subject: [PATCH 02/36] loosen requirements.txt and make customizable per pipeline, update set_prompts. (WIP for async changes) --- runner/app/live/pipelines/comfyui.py | 2 +- runner/docker/Dockerfile.live-app__PIPELINE__ | 14 +++++++++++--- runner/docker/Dockerfile.live-base-comfyui | 8 +------- runner/requirements.live.comfyui.txt | 16 ++++++++++++++++ 4 files changed, 29 insertions(+), 11 deletions(-) create mode 100644 runner/requirements.live.comfyui.txt diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index 72d141439..573673dfa 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -297,7 +297,7 @@ def process_frame(self, image: Image.Image) -> Image.Image: def update_params(self, **params): new_params = ComfyUIParams(**params) logging.info(f"ComfyUI Pipeline Prompt: {new_params.prompt}") - self.client.set_prompt(new_params.prompt) + self.client.set_prompts([new_params.prompt]) self.params = new_params #TODO: This is a hack to stop the ComfyStreamClient. Use the comfystream api to stop the client in 0.0.2 diff --git a/runner/docker/Dockerfile.live-app__PIPELINE__ b/runner/docker/Dockerfile.live-app__PIPELINE__ index 3a71093de..60d266e08 100644 --- a/runner/docker/Dockerfile.live-app__PIPELINE__ +++ b/runner/docker/Dockerfile.live-app__PIPELINE__ @@ -1,6 +1,9 @@ ARG PIPELINE=comfyui ARG BASE_IMAGE=livepeer/ai-runner:live-base-${PIPELINE} FROM ${BASE_IMAGE} +#TODO: Set environment variable from build argument +# ENV PIPELINE=${PIPELINE} +ENV PIPELINE=comfyui # Install latest stable Go version and system dependencies RUN apt-get update && apt-get install -y \ @@ -11,9 +14,14 @@ RUN apt-get update && apt-get install -y \ && apt-get clean && rm -rf /var/lib/apt/lists/* # Install any additional Python packages - -COPY requirements.live-ai.txt /app/requirements.txt -RUN pip install --no-cache-dir -r /app/requirements.txt +#TODO: Update the other files to use this method to find requirements.txt files, renaming the existing ones to requirements.pipeline.txt + +COPY requirements.live.${PIPELINE}.txt /app/requirements.txt +RUN if [ "$PIPELINE" = "comfyui" ]; then \ + conda run -n comfystream pip install --no-cache-dir -r /app/requirements.txt; \ +else \ + pip install --no-cache-dir -r /app/requirements.txt; \ +fi # TODO: Figure out a way to have this in requirements file RUN pip install --no-cache-dir triton==3.1.0 RUN pip uninstall -y onnx onnxruntime onnxruntime-gpu diff --git a/runner/docker/Dockerfile.live-base-comfyui b/runner/docker/Dockerfile.live-base-comfyui index 2ef0237c3..9a1d25f43 100644 --- a/runner/docker/Dockerfile.live-base-comfyui +++ b/runner/docker/Dockerfile.live-base-comfyui @@ -8,6 +8,7 @@ FROM ${BASE_IMAGE} ENV DEBIAN_FRONTEND=noninteractive ENV NVIDIA_VISIBLE_DEVICES=all ENV NVIDIA_DRIVER_CAPABILITIES=compute,utility,video +ENV PATH="/workspace/miniconda3/bin:${PATH}" # Install prerequisites RUN apt-get update && \ @@ -75,11 +76,4 @@ ENV COMFY_UI_WORKSPACE="/workspace/ComfyUI" RUN mkdir -p /workspace/ComfyUI RUN rm -rf /workspace/ComfyUI/models && ln -s /models/ComfyUI--models /workspace/ComfyUI/models RUN rm -rf /workspace/ComfyUI/output && ln -s /models/ComfyUI--output /workspace/ComfyUI/output - -# Ensure all the next RUN commands are run in the comfystream conda environment -RUN echo "source /workspace/miniconda3/etc/profile.d/conda.sh && conda activate comfystream" >> /conda_activate.sh && \ - chmod +x /conda_activate.sh -ENV BASH_ENV=/conda_activate.sh -SHELL ["/bin/bash", "-c"] -# Ensure the app run from CMD is also in the comfystream conda environment (just wrap in bash and it will run the .bashrc above) ENTRYPOINT ["bash", "-c", "exec /opt/nvidia/nvidia_entrypoint.sh \"$@\"", "--"] diff --git a/runner/requirements.live.comfyui.txt b/runner/requirements.live.comfyui.txt new file mode 100644 index 000000000..2dc468d6f --- /dev/null +++ b/runner/requirements.live.comfyui.txt @@ -0,0 +1,16 @@ +accelerate==0.30.1 +transformers==4.43.3 +fastapi==0.111.0 +pydantic>=2.7.2 +Pillow==10.3.0 +python-multipart==0.0.9 +uvicorn==0.34.0 +triton>=2.1.0 +peft>=0.11.1 +deepcache>=0.1.1 +safetensors>=0.4.3 +scipy>=1.13.0 +numpy>=1.26.4 +--no-binary=av +#av<=14.0.0 +pyzmq==26.2.0 From 213b18d5bafe6e8e306b6e8069732f0fb71de605 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Tue, 18 Mar 2025 21:01:46 +0000 Subject: [PATCH 03/36] add frame processing (wip) --- runner/app/live/pipelines/comfyui.py | 140 +++++++++++++++++++++++---- runner/app/live/streamer/process.py | 3 +- 2 files changed, 123 insertions(+), 20 deletions(-) diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index 573673dfa..3808d0f0f 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -1,10 +1,11 @@ import os import json import torch -from PIL import Image -import asyncio import numpy as np -from typing import Union +import asyncio +import av +from PIL import Image +from typing import Union, Dict, List, Any from pydantic import BaseModel, field_validator from .interface import Pipeline @@ -13,6 +14,7 @@ import logging COMFY_UI_WORKSPACE_ENV = "COMFY_UI_WORKSPACE" +WARMUP_RUNS = 1 DEFAULT_WORKFLOW_JSON = json.loads(""" { "1": { @@ -267,42 +269,142 @@ def __init__(self, **params): super().__init__(**params) comfy_ui_workspace = os.getenv(COMFY_UI_WORKSPACE_ENV) - self.client = ComfyStreamClient(cwd=comfy_ui_workspace) + self.client = ComfyStreamClient(cwd=comfy_ui_workspace, **params) self.params: ComfyUIParams + + self.video_incoming_frames = asyncio.Queue() + self.audio_incoming_frames = asyncio.Queue() + self.processed_audio_buffer = np.array([], dtype=np.int16) - self.update_params(**params) + # Use asyncio.run to handle update_params since it's now async + asyncio.run(self.update_params(**params)) + async def warm_video(self): # Comfy will cache nodes that only need to be run once (i.e. a node that loads model weights) # We can run the prompt once before actual inputs come in to "warmup" - warmup_input = torch.randn(1, 512, 512, 3) - asyncio.get_event_loop().run_until_complete(self.client.queue_prompt(warmup_input)) + dummy_frame = av.VideoFrame() + dummy_frame.side_data.input = torch.randn(1, 512, 512, 3) + + for _ in range(WARMUP_RUNS): + self.client.put_video_input(dummy_frame) + await self.client.get_video_output() + + async def warm_audio(self): + dummy_frame = av.AudioFrame() + dummy_frame.side_data.input = np.random.randint(-32768, 32767, int(48000 * 0.5), dtype=np.int16) + dummy_frame.sample_rate = 48000 + + for _ in range(WARMUP_RUNS): + self.client.put_audio_input(dummy_frame) + await self.client.get_audio_output() + + def video_preprocess(self, frame: av.VideoFrame) -> Union[torch.Tensor, np.ndarray]: + frame_np = frame.to_ndarray(format="rgb24").astype(np.float32) / 255.0 + return torch.from_numpy(frame_np).unsqueeze(0) + + def audio_preprocess(self, frame: av.AudioFrame) -> Union[torch.Tensor, np.ndarray]: + return frame.to_ndarray().ravel().reshape(-1, 2).mean(axis=1).astype(np.int16) + + def video_postprocess(self, output: Union[torch.Tensor, np.ndarray]) -> av.VideoFrame: + return av.VideoFrame.from_ndarray( + (output * 255.0).clamp(0, 255).to(dtype=torch.uint8).squeeze(0).cpu().numpy() + ) + + def audio_postprocess(self, output: Union[torch.Tensor, np.ndarray]) -> av.AudioFrame: + return av.AudioFrame.from_ndarray(np.repeat(output, 2).reshape(1, -1)) + + async def put_video_frame(self, frame: av.VideoFrame): + frame.side_data.input = self.video_preprocess(frame) + frame.side_data.skipped = True + self.client.put_video_input(frame) + await self.video_incoming_frames.put(frame) + async def put_audio_frame(self, frame: av.AudioFrame): + frame.side_data.input = self.audio_preprocess(frame) + frame.side_data.skipped = True + self.client.put_audio_input(frame) + await self.audio_incoming_frames.put(frame) + + async def get_processed_video_frame(self): + out_tensor = await self.client.get_video_output() + frame = await self.video_incoming_frames.get() + while frame.side_data.skipped: + frame = await self.video_incoming_frames.get() + + processed_frame = self.video_postprocess(out_tensor) + processed_frame.pts = frame.pts + processed_frame.time_base = frame.time_base + + return processed_frame + + async def get_processed_audio_frame(self): + frame = await self.audio_incoming_frames.get() + if frame.samples > len(self.processed_audio_buffer): + out_tensor = await self.client.get_audio_output() + self.processed_audio_buffer = np.concatenate([self.processed_audio_buffer, out_tensor]) + out_data = self.processed_audio_buffer[:frame.samples] + self.processed_audio_buffer = self.processed_audio_buffer[frame.samples:] + + processed_frame = self.audio_postprocess(out_data) + processed_frame.pts = frame.pts + processed_frame.time_base = frame.time_base + processed_frame.sample_rate = frame.sample_rate + + return processed_frame + + # For backward compatibility def process_frame(self, image: Image.Image) -> Image.Image: # Normalize by dividing by 255 to ensure the tensor values are between 0 and 1 - image_np = np.array(image.convert("RGB")).astype(np.float32) / 255.0 + # image_np = np.array(image.convert("RGB")).astype(np.float32) / 255.0 + + #image_np = self.video_preprocess(image_np) + # Convert from numpy to torch.Tensor # Initially, the torch.Tensor will have shape HWC but we want BHWC # unsqueeze(0) will add a batch dimension at the beginning of 1 which means we just have 1 image - image_tensor = torch.tensor(image_np).unsqueeze(0) + #image_tensor = torch.tensor(image_np).unsqueeze(0) # Process using ComfyUI pipeline - result_tensor = asyncio.get_event_loop().run_until_complete(self.client.queue_prompt(image_tensor)) + result_tensor = asyncio.get_event_loop().run_until_complete(self.put_video_frame(image)) + video_postprocess = self.video_postprocess(result_tensor) +# result_tensor = asyncio.get_event_loop().run_until_complete(self.client.run_(image_tensor)) + + result_image = self.video_postprocess(video_postprocess) - # Convert back from Tensor to PIL.Image - result_tensor = result_tensor.squeeze(0) - result_image_np = (result_tensor * 255).byte() - result_image = Image.fromarray(result_image_np.cpu().numpy()) + # # Convert back from Tensor to PIL.Image + # result_tensor = result_tensor.squeeze(0) + # result_image_np = (result_tensor * 255).byte() + # result_image = Image.fromarray(result_image_np.cpu().numpy()) return result_image - def update_params(self, **params): + async def update_params(self, **params): new_params = ComfyUIParams(**params) logging.info(f"ComfyUI Pipeline Prompt: {new_params.prompt}") - self.client.set_prompts([new_params.prompt]) + await self.set_prompts(new_params.prompt) self.params = new_params - #TODO: This is a hack to stop the ComfyStreamClient. Use the comfystream api to stop the client in 0.0.2 + async def set_prompts(self, prompts: Union[Dict[Any, Any], List[Dict[Any, Any]]]): + if isinstance(prompts, list): + await self.client.set_prompts(prompts) + else: + await self.client.set_prompts([prompts]) + + async def update_prompts(self, prompts: Union[Dict[Any, Any], List[Dict[Any, Any]]]): + if isinstance(prompts, list): + await self.client.update_prompts(prompts) + else: + await self.client.update_prompts([prompts]) + + async def get_nodes_info(self) -> Dict[str, Any]: + """Get information about all nodes in the current prompt including metadata.""" + nodes_info = await self.client.get_available_nodes() + return nodes_info + + # Modified stop method to use cleanup async def stop(self): logging.info("Stopping ComfyUI pipeline") - if self.client.comfy_client.is_running: - await self.client.comfy_client.__aexit__(None, None, None) + await self.cleanup() logging.info("ComfyUI pipeline stopped") + + async def cleanup(self): + await self.client.cleanup() diff --git a/runner/app/live/streamer/process.py b/runner/app/live/streamer/process.py index 4e302ecd9..e8d3d1825 100644 --- a/runner/app/live/streamer/process.py +++ b/runner/app/live/streamer/process.py @@ -161,6 +161,7 @@ def _handle_logging_params(params: dict) -> dict: raise while not self.is_done(): + logging.debug("PipelineProcess: Entering main loop") while not self.param_update_queue.empty(): params = self.param_update_queue.get_nowait() try: @@ -182,7 +183,7 @@ def _handle_logging_params(params: dict) -> dict: try: if isinstance(input_frame, VideoFrame): input_frame.log_timestamps["pre_process_frame"] = time.time() - output_image = pipeline.process_frame(input_frame.image) + output_image = pipeline.process_frame(input_frame) input_frame.log_timestamps["post_process_frame"] = time.time() output_frame = VideoOutput(input_frame.replace_image(output_image)) self.output_queue.put(output_frame) From 193fce7ac0776ab36a274a88eed08d4931dbd9a1 Mon Sep 17 00:00:00 2001 From: Peter Schroedl Date: Tue, 18 Mar 2025 23:35:35 +0000 Subject: [PATCH 04/36] replace asyncio.run with new event loop --- runner/app/live/pipelines/comfyui.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index 3808d0f0f..e6d5651c4 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -276,8 +276,12 @@ def __init__(self, **params): self.audio_incoming_frames = asyncio.Queue() self.processed_audio_buffer = np.array([], dtype=np.int16) - # Use asyncio.run to handle update_params since it's now async - asyncio.run(self.update_params(**params)) + self.loop = asyncio.new_event_loop() + self.loop.run_until_complete(self._init_async(**params)) + + async def _init_async(self, **params): + await self.update_params(**params) + await self.warm_video() async def warm_video(self): # Comfy will cache nodes that only need to be run once (i.e. a node that loads model weights) @@ -287,7 +291,9 @@ async def warm_video(self): for _ in range(WARMUP_RUNS): self.client.put_video_input(dummy_frame) - await self.client.get_video_output() + _ = await self.client.get_video_output() + logging.info("Warmup complete") + async def warm_audio(self): dummy_frame = av.AudioFrame() From 9ae59dfecd21b552d639e60dd691a7379beb0b5f Mon Sep 17 00:00:00 2001 From: Varshith B Date: Wed, 19 Mar 2025 17:40:01 +0530 Subject: [PATCH 05/36] feat: temp workaround --- runner/app/live/pipelines/comfyui.py | 160 ++++++------------------- runner/app/live/pipelines/interface.py | 9 +- runner/app/live/pipelines/noop.py | 5 +- runner/app/live/streamer/process.py | 7 +- runner/app/live/trickle/frame.py | 15 ++- 5 files changed, 61 insertions(+), 135 deletions(-) diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index e6d5651c4..260db2c32 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -1,15 +1,15 @@ import os import json import torch -import numpy as np import asyncio -import av +import numpy as np from PIL import Image -from typing import Union, Dict, List, Any +from typing import Union from pydantic import BaseModel, field_validator from .interface import Pipeline from comfystream.client import ComfyStreamClient +from trickle import VideoFrame, VideoOutput import logging @@ -273,144 +273,58 @@ def __init__(self, **params): self.params: ComfyUIParams self.video_incoming_frames = asyncio.Queue() - self.audio_incoming_frames = asyncio.Queue() - self.processed_audio_buffer = np.array([], dtype=np.int16) self.loop = asyncio.new_event_loop() - self.loop.run_until_complete(self._init_async(**params)) - - async def _init_async(self, **params): - await self.update_params(**params) - await self.warm_video() - - async def warm_video(self): - # Comfy will cache nodes that only need to be run once (i.e. a node that loads model weights) - # We can run the prompt once before actual inputs come in to "warmup" - dummy_frame = av.VideoFrame() - dummy_frame.side_data.input = torch.randn(1, 512, 512, 3) + self.set_params(**params) + self.warm_video() - for _ in range(WARMUP_RUNS): - self.client.put_video_input(dummy_frame) - _ = await self.client.get_video_output() - logging.info("Warmup complete") - - - async def warm_audio(self): - dummy_frame = av.AudioFrame() - dummy_frame.side_data.input = np.random.randint(-32768, 32767, int(48000 * 0.5), dtype=np.int16) - dummy_frame.sample_rate = 48000 + def warm_video(self): + dummy_frame = VideoFrame(None, 0, 0) + dummy_frame.side_data.processed_input = torch.randn(1, 512, 512, 3) for _ in range(WARMUP_RUNS): - self.client.put_audio_input(dummy_frame) - await self.client.get_audio_output() - - def video_preprocess(self, frame: av.VideoFrame) -> Union[torch.Tensor, np.ndarray]: - frame_np = frame.to_ndarray(format="rgb24").astype(np.float32) / 255.0 - return torch.from_numpy(frame_np).unsqueeze(0) - - def audio_preprocess(self, frame: av.AudioFrame) -> Union[torch.Tensor, np.ndarray]: - return frame.to_ndarray().ravel().reshape(-1, 2).mean(axis=1).astype(np.int16) - - def video_postprocess(self, output: Union[torch.Tensor, np.ndarray]) -> av.VideoFrame: - return av.VideoFrame.from_ndarray( - (output * 255.0).clamp(0, 255).to(dtype=torch.uint8).squeeze(0).cpu().numpy() - ) + self.client.put_video_input(dummy_frame) + _ = self.loop.run_until_complete(self.client.get_video_output()) + logging.info("Video frame warmup done") - def audio_postprocess(self, output: Union[torch.Tensor, np.ndarray]) -> av.AudioFrame: - return av.AudioFrame.from_ndarray(np.repeat(output, 2).reshape(1, -1)) - - async def put_video_frame(self, frame: av.VideoFrame): - frame.side_data.input = self.video_preprocess(frame) + async def put_video_frame(self, frame: VideoFrame): + image_np = np.array(frame.image.convert("RGB")).astype(np.float32) / 255.0 + frame.side_data.processed_input = torch.tensor(image_np).unsqueeze(0) frame.side_data.skipped = True self.client.put_video_input(frame) await self.video_incoming_frames.put(frame) - async def put_audio_frame(self, frame: av.AudioFrame): - frame.side_data.input = self.audio_preprocess(frame) - frame.side_data.skipped = True - self.client.put_audio_input(frame) - await self.audio_incoming_frames.put(frame) - async def get_processed_video_frame(self): - out_tensor = await self.client.get_video_output() + result_tensor = await self.client.get_video_output() frame = await self.video_incoming_frames.get() while frame.side_data.skipped: frame = await self.video_incoming_frames.get() - processed_frame = self.video_postprocess(out_tensor) - processed_frame.pts = frame.pts - processed_frame.time_base = frame.time_base - - return processed_frame - - async def get_processed_audio_frame(self): - frame = await self.audio_incoming_frames.get() - if frame.samples > len(self.processed_audio_buffer): - out_tensor = await self.client.get_audio_output() - self.processed_audio_buffer = np.concatenate([self.processed_audio_buffer, out_tensor]) - out_data = self.processed_audio_buffer[:frame.samples] - self.processed_audio_buffer = self.processed_audio_buffer[frame.samples:] - - processed_frame = self.audio_postprocess(out_data) - processed_frame.pts = frame.pts - processed_frame.time_base = frame.time_base - processed_frame.sample_rate = frame.sample_rate - - return processed_frame - - # For backward compatibility - def process_frame(self, image: Image.Image) -> Image.Image: - # Normalize by dividing by 255 to ensure the tensor values are between 0 and 1 - # image_np = np.array(image.convert("RGB")).astype(np.float32) / 255.0 - - #image_np = self.video_preprocess(image_np) - - # Convert from numpy to torch.Tensor - # Initially, the torch.Tensor will have shape HWC but we want BHWC - # unsqueeze(0) will add a batch dimension at the beginning of 1 which means we just have 1 image - #image_tensor = torch.tensor(image_np).unsqueeze(0) - - # Process using ComfyUI pipeline - result_tensor = asyncio.get_event_loop().run_until_complete(self.put_video_frame(image)) - video_postprocess = self.video_postprocess(result_tensor) -# result_tensor = asyncio.get_event_loop().run_until_complete(self.client.run_(image_tensor)) - - result_image = self.video_postprocess(video_postprocess) - - # # Convert back from Tensor to PIL.Image - # result_tensor = result_tensor.squeeze(0) - # result_image_np = (result_tensor * 255).byte() - # result_image = Image.fromarray(result_image_np.cpu().numpy()) - return result_image - - async def update_params(self, **params): + result_tensor = result_tensor.squeeze(0) + result_image_np = (result_tensor * 255).byte() + result_image = Image.fromarray(result_image_np.cpu().numpy()) + return VideoOutput(frame.replace_image(result_image)) + + def process_frame(self, frame: VideoFrame) -> VideoOutput: + #TODO: this is a temporary solution, need to make this async throughout the pipeline + self.loop.run_until_complete(self.put_video_frame(frame)) + return self.loop.run_until_complete(self.get_processed_video_frame()) + + def set_params(self, **params): new_params = ComfyUIParams(**params) - logging.info(f"ComfyUI Pipeline Prompt: {new_params.prompt}") - await self.set_prompts(new_params.prompt) + logging.info(f"Setting ComfyUI Pipeline Prompt: {new_params.prompt}") + # TODO: currently its a single prompt, but need to support multiple prompts + self.loop.run_until_complete(self.client.set_prompts([new_params.prompt])) self.params = new_params - async def set_prompts(self, prompts: Union[Dict[Any, Any], List[Dict[Any, Any]]]): - if isinstance(prompts, list): - await self.client.set_prompts(prompts) - else: - await self.client.set_prompts([prompts]) - - async def update_prompts(self, prompts: Union[Dict[Any, Any], List[Dict[Any, Any]]]): - if isinstance(prompts, list): - await self.client.update_prompts(prompts) - else: - await self.client.update_prompts([prompts]) - - async def get_nodes_info(self) -> Dict[str, Any]: - """Get information about all nodes in the current prompt including metadata.""" - nodes_info = await self.client.get_available_nodes() - return nodes_info + def update_params(self, **params): + new_params = ComfyUIParams(**params) + logging.info(f"Updating ComfyUI Pipeline Prompt: {new_params.prompt}") + # TODO: currently its a single prompt, but need to support multiple prompts + self.loop.run_until_complete(self.client.update_prompts([new_params.prompt])) + self.params = new_params - # Modified stop method to use cleanup - async def stop(self): + def stop(self): logging.info("Stopping ComfyUI pipeline") - await self.cleanup() + self.loop.run_until_complete(self.client.stop()) logging.info("ComfyUI pipeline stopped") - - async def cleanup(self): - await self.client.cleanup() diff --git a/runner/app/live/pipelines/interface.py b/runner/app/live/pipelines/interface.py index 5832a2f4e..4e19e90ad 100644 --- a/runner/app/live/pipelines/interface.py +++ b/runner/app/live/pipelines/interface.py @@ -1,5 +1,6 @@ -from abc import ABC, abstractmethod from PIL import Image +from abc import ABC, abstractmethod +from trickle import VideoFrame, VideoOutput class Pipeline(ABC): """Abstract base class for image processing pipelines. @@ -22,16 +23,16 @@ def __init__(self, **params): pass @abstractmethod - def process_frame(self, frame: Image.Image) -> Image.Image: + def process_frame(self, frame: VideoFrame) -> VideoOutput: """Process a single frame through the pipeline. Called sequentially with each frame from the stream. Args: - frame: Input PIL Image + frame: Input VideoFrame Returns: - Processed PIL Image + Processed VideoFrame """ pass diff --git a/runner/app/live/pipelines/noop.py b/runner/app/live/pipelines/noop.py index 620ad6710..27cf22eba 100644 --- a/runner/app/live/pipelines/noop.py +++ b/runner/app/live/pipelines/noop.py @@ -2,13 +2,14 @@ import logging from .interface import Pipeline +from trickle import VideoFrame, VideoOutput class Noop(Pipeline): def __init__(self, **params): super().__init__(**params) - def process_frame(self, image: Image.Image) -> Image.Image: - return image.convert("RGB") + def process_frame(self, frame: VideoFrame) -> VideoOutput: + return VideoOutput(frame.replace_image(frame.image.convert("RGB"))) def update_params(self, **params): logging.info(f"Updating params: {params}") diff --git a/runner/app/live/streamer/process.py b/runner/app/live/streamer/process.py index e8d3d1825..d3f1a1100 100644 --- a/runner/app/live/streamer/process.py +++ b/runner/app/live/streamer/process.py @@ -183,13 +183,12 @@ def _handle_logging_params(params: dict) -> dict: try: if isinstance(input_frame, VideoFrame): input_frame.log_timestamps["pre_process_frame"] = time.time() - output_image = pipeline.process_frame(input_frame) - input_frame.log_timestamps["post_process_frame"] = time.time() - output_frame = VideoOutput(input_frame.replace_image(output_image)) + output_frame = pipeline.process_frame(input_frame) + output_frame.log_timestamps["post_process_frame"] = time.time() self.output_queue.put(output_frame) elif isinstance(input_frame, AudioFrame): self.output_queue.put(AudioOutput([input_frame])) - # TODO wire in a proper pipeline here + # TODO: wire in a proper pipeline here else: report_error(f"Unsupported input frame type {type(input_frame)}") except Exception as e: diff --git a/runner/app/live/trickle/frame.py b/runner/app/live/trickle/frame.py index 822c51bbb..5e3ccd353 100644 --- a/runner/app/live/trickle/frame.py +++ b/runner/app/live/trickle/frame.py @@ -1,8 +1,16 @@ import av from PIL import Image -from typing import List +from typing import List, Union import numpy as np +class SideData: + """ + Base class for side data, needed to keep it consistent with av frame side_data + """ + skipped: bool = True + # TODO: update input to be processed_input in comfystream + processed_input: Union[Image.Image, np.ndarray] = None + class InputFrame: """ Base class for a frame (either audio or video). @@ -13,6 +21,7 @@ class InputFrame: timestamp: int time_base: int log_timestamps: dict[str, float] = {} + side_data: SideData = SideData() def __init__(self): self.timestamp = av.AV_NOPTS_VALUE @@ -36,7 +45,9 @@ def __init__(self, image: Image.Image, timestamp: int, time_base: int, log_times self.log_timestamps = log_timestamps # Returns a copy of an existing VideoFrame with its image replaced def replace_image(self, image: Image.Image): - return VideoFrame(image, self.timestamp, self.time_base, self.log_timestamps) + new_frame = VideoFrame(image, self.timestamp, self.time_base, self.log_timestamps) + new_frame.side_data = self.side_data + return new_frame class AudioFrame(InputFrame): samples: np.ndarray From 252194f71b46f0fcc75ea3cf4f76355d261a9a2c Mon Sep 17 00:00:00 2001 From: Varshith B Date: Tue, 25 Mar 2025 00:52:11 +0530 Subject: [PATCH 06/36] fix: temp async --- runner/app/live/pipelines/comfyui.py | 5 - runner/app/live/pipelines/noop.py | 17 +++- runner/app/live/streamer/process.py | 146 ++++++++++++++------------- 3 files changed, 92 insertions(+), 76 deletions(-) diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index 260db2c32..56b7b19d9 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -305,11 +305,6 @@ async def get_processed_video_frame(self): result_image = Image.fromarray(result_image_np.cpu().numpy()) return VideoOutput(frame.replace_image(result_image)) - def process_frame(self, frame: VideoFrame) -> VideoOutput: - #TODO: this is a temporary solution, need to make this async throughout the pipeline - self.loop.run_until_complete(self.put_video_frame(frame)) - return self.loop.run_until_complete(self.get_processed_video_frame()) - def set_params(self, **params): new_params = ComfyUIParams(**params) logging.info(f"Setting ComfyUI Pipeline Prompt: {new_params.prompt}") diff --git a/runner/app/live/pipelines/noop.py b/runner/app/live/pipelines/noop.py index 27cf22eba..48835e151 100644 --- a/runner/app/live/pipelines/noop.py +++ b/runner/app/live/pipelines/noop.py @@ -1,5 +1,7 @@ -from PIL import Image import logging +import asyncio +from PIL import Image + from .interface import Pipeline from trickle import VideoFrame, VideoOutput @@ -7,9 +9,18 @@ class Noop(Pipeline): def __init__(self, **params): super().__init__(**params) + self.frame_queue = asyncio.Queue() - def process_frame(self, frame: VideoFrame) -> VideoOutput: - return VideoOutput(frame.replace_image(frame.image.convert("RGB"))) + async def put_video_frame(self, frame: VideoFrame): + await self.frame_queue.put(frame) + + async def get_video_frame(self) -> VideoOutput: + frame = await self.frame_queue.get() + processed_frame = frame.image.convert("RGB") + return VideoOutput(frame.replace_image(processed_frame)) def update_params(self, **params): logging.info(f"Updating params: {params}") + + async def stop(self): + logging.info("Stopping pipeline") diff --git a/runner/app/live/streamer/process.py b/runner/app/live/streamer/process.py index d3f1a1100..8baf03992 100644 --- a/runner/app/live/streamer/process.py +++ b/runner/app/live/streamer/process.py @@ -12,6 +12,7 @@ from log import config_logging, config_logging_fields, log_timing from trickle import InputFrame, AudioFrame, VideoFrame, OutputFrame, VideoOutput, AudioOutput +#TODO: figure out pro vs con for asyncio.run() vs asyncio.get_event_loop().run_until_complete() class PipelineProcess: @staticmethod @@ -74,6 +75,8 @@ def reset_stream(self, request_id: str, stream_id: str): # We internally use the param update queue to reset the logging configs self.param_update_queue.put({"request_id": request_id, "stream_id": stream_id}) + # TODO: Once audio is implemented, combined send_input with input_loop + # We don't need additional queueing as comfystream already maintains a queue def send_input(self, frame: InputFrame): self._queue_put_fifo(self.input_queue, frame) @@ -117,91 +120,98 @@ def process_loop(self): # ( load_gpu_models is calling logging.info() for every frame ) logging.getLogger("comfy").setLevel(logging.WARNING) - def report_error(error_msg: str): - error_event = { - "message": error_msg, - "timestamp": time.time() - } - logging.error(error_msg) - self._queue_put_fifo(self.error_queue, error_event) - - def _handle_logging_params(params: dict) -> dict: - if isinstance(params, dict) and "request_id" in params and "stream_id" in params: - logging.info(f"PipelineProcess: Resetting logging fields with request_id={params['request_id']}, stream_id={params['stream_id']}") - self._reset_logging_fields( - params["request_id"], params["stream_id"] - ) - return {} - return params + try: + pipeline = self._initialize_pipeline() + asyncio.run(self._run_pipeline_loops(pipeline)) + except Exception as e: + self._report_error(f"Error in process run method: {e}") + finally: + self._cleanup_pipeline(pipeline) + def _handle_logging_params(self, params: dict) -> dict: + if isinstance(params, dict) and "request_id" in params and "stream_id" in params: + logging.info(f"PipelineProcess: Resetting logging fields with request_id={params['request_id']}, stream_id={params['stream_id']}") + self._reset_logging_fields( + params["request_id"], params["stream_id"] + ) + return False + return True + + def _initialize_pipeline(self): try: params = {} try: params = self.param_update_queue.get_nowait() - params = _handle_logging_params(params) + if self._handle_logging_params(params): + with log_timing(f"PipelineProcess: Pipeline loading with {params}"): + return load_pipeline(self.pipeline_name, **params) + except queue.Empty: + with log_timing("PipelineProcess: Pipeline loading with default params"): + return load_pipeline(self.pipeline_name) + except Exception as e: + self._report_error(f"Error loading pipeline: {e}") + raise + + async def _run_pipeline_loops(self, pipeline): + input_task = asyncio.create_task(self._input_loop(pipeline)) + output_task = asyncio.create_task(self._output_loop(pipeline)) + param_task = asyncio.create_task(self._param_update_loop(pipeline)) + + try: + await asyncio.gather(input_task, output_task, param_task) + except Exception as e: + self._report_error(f"Error in pipeline loops: {e}") + raise + + async def _input_loop(self, pipeline): + while not self.is_done(): + try: + input_frame = await asyncio.to_thread(self.input_queue.get, timeout=0.1) + if isinstance(input_frame, VideoFrame): + input_frame.log_timestamps["pre_process_frame"] = time.time() + await pipeline.put_video_frame(input_frame) + elif isinstance(input_frame, AudioFrame): + self.output_queue.put(AudioOutput([input_frame])) except queue.Empty: logging.info("PipelineProcess: No params found in param_update_queue, loading with default params") + continue except Exception as e: - report_error(f"Error getting params: {e}") + self._report_error(f"Error processing input frame: {e}") + async def _output_loop(self, pipeline): + while not self.is_done(): try: - with log_timing(f"PipelineProcess: Pipeline loading with {params}"): - pipeline = load_pipeline(self.pipeline_name, **params) + output_frame = await pipeline.get_processed_video_frame() + output_frame.log_timestamps["post_process_frame"] = time.time() + await asyncio.to_thread(self.output_queue.put, output_frame) except Exception as e: - report_error(f"Error loading pipeline: {e}") - if not params: - report_error(f"Pipeline failed to load with default params") - raise - - try: - with log_timing(f"PipelineProcess: Pipeline loading with default params due to error with params: {params}"): - pipeline = load_pipeline(self.pipeline_name) - except Exception as e: - report_error(f"Error loading pipeline with default params: {e}") - raise - - while not self.is_done(): - logging.debug("PipelineProcess: Entering main loop") - while not self.param_update_queue.empty(): - params = self.param_update_queue.get_nowait() - try: - logging.info(f"PipelineProcess: Processing parameter update from queue: {params}") - if _handle_logging_params(params): - logging.info(f"PipelineProcess: Updating pipeline parameters") - pipeline.update_params(**params) - logging.info(f"PipelineProcess: Successfully applied params to pipeline: {params}") - except Exception as e: - error_msg = f"Error updating params: {str(e)}" - logging.error(error_msg, exc_info=True) - report_error(error_msg) + self._report_error(f"Error processing output frame: {e}") + await asyncio.sleep(0.01) - try: - input_frame = self.input_queue.get(timeout=0.1) - except queue.Empty: - continue + async def _param_update_loop(self, pipeline): + while not self.is_done(): + try: + params = self.param_update_queue.get_nowait() + if self._handle_logging_params(params): + logging.info(f"PipelineProcess: Updating pipeline parameters: {params}") + pipeline.update_params(**params) + except queue.Empty: + await asyncio.sleep(0.1) + except Exception as e: + self._report_error(f"Error updating params: {e}") - try: - if isinstance(input_frame, VideoFrame): - input_frame.log_timestamps["pre_process_frame"] = time.time() - output_frame = pipeline.process_frame(input_frame) - output_frame.log_timestamps["post_process_frame"] = time.time() - self.output_queue.put(output_frame) - elif isinstance(input_frame, AudioFrame): - self.output_queue.put(AudioOutput([input_frame])) - # TODO: wire in a proper pipeline here - else: - report_error(f"Unsupported input frame type {type(input_frame)}") - except Exception as e: - report_error(f"Error processing frame: {e}") - except Exception as e: - report_error(f"Error in process run method: {e}") - finally: - self._cleanup_pipeline(pipeline) + def _report_error(self, error_msg: str): + error_event = { + "message": error_msg, + "timestamp": time.time() + } + logging.error(error_msg) + self._queue_put_fifo(self.error_queue, error_event) def _cleanup_pipeline(self, pipeline): if pipeline is not None: try: - asyncio.get_event_loop().run_until_complete(pipeline.stop()) + asyncio.run(pipeline.stop()) except Exception as e: logging.error(f"Error stopping pipeline: {e}") From fb9fd08975a9df58e7c72bde7a71c8d5ead49a1f Mon Sep 17 00:00:00 2001 From: Varshith B Date: Tue, 25 Mar 2025 19:17:21 +0530 Subject: [PATCH 07/36] fix: noop test --- runner/app/live/pipelines/comfyui.py | 20 +++++++++----------- runner/app/live/pipelines/interface.py | 26 +++++++++++++++++++++----- runner/app/live/pipelines/noop.py | 10 ++++++++-- runner/app/live/streamer/process.py | 3 ++- runner/run-lv2v.sh | 24 +++++++++--------------- 5 files changed, 49 insertions(+), 34 deletions(-) diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index 56b7b19d9..bbf9ccd2f 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -274,17 +274,15 @@ def __init__(self, **params): self.video_incoming_frames = asyncio.Queue() - self.loop = asyncio.new_event_loop() - self.set_params(**params) - self.warm_video() + asyncio.run(self.set_params(**params)) - def warm_video(self): + async def warm_video(self): dummy_frame = VideoFrame(None, 0, 0) dummy_frame.side_data.processed_input = torch.randn(1, 512, 512, 3) for _ in range(WARMUP_RUNS): self.client.put_video_input(dummy_frame) - _ = self.loop.run_until_complete(self.client.get_video_output()) + _ = await self.client.get_video_output() logging.info("Video frame warmup done") async def put_video_frame(self, frame: VideoFrame): @@ -305,21 +303,21 @@ async def get_processed_video_frame(self): result_image = Image.fromarray(result_image_np.cpu().numpy()) return VideoOutput(frame.replace_image(result_image)) - def set_params(self, **params): + async def set_params(self, **params): new_params = ComfyUIParams(**params) logging.info(f"Setting ComfyUI Pipeline Prompt: {new_params.prompt}") # TODO: currently its a single prompt, but need to support multiple prompts - self.loop.run_until_complete(self.client.set_prompts([new_params.prompt])) + await self.client.set_prompts([new_params.prompt]) self.params = new_params - def update_params(self, **params): + async def update_params(self, **params): new_params = ComfyUIParams(**params) logging.info(f"Updating ComfyUI Pipeline Prompt: {new_params.prompt}") # TODO: currently its a single prompt, but need to support multiple prompts - self.loop.run_until_complete(self.client.update_prompts([new_params.prompt])) + await self.client.update_prompts([new_params.prompt]) self.params = new_params - def stop(self): + async def stop(self): logging.info("Stopping ComfyUI pipeline") - self.loop.run_until_complete(self.client.stop()) + await self.client.stop() logging.info("ComfyUI pipeline stopped") diff --git a/runner/app/live/pipelines/interface.py b/runner/app/live/pipelines/interface.py index 4e19e90ad..906610e43 100644 --- a/runner/app/live/pipelines/interface.py +++ b/runner/app/live/pipelines/interface.py @@ -23,13 +23,17 @@ def __init__(self, **params): pass @abstractmethod - def process_frame(self, frame: VideoFrame) -> VideoOutput: - """Process a single frame through the pipeline. - - Called sequentially with each frame from the stream. + async def put_video_frame(self, frame: VideoFrame): + """Put a frame into the pipeline. Args: frame: Input VideoFrame + """ + pass + + @abstractmethod + async def get_processed_video_frame(self) -> VideoOutput: + """Get a processed frame from the pipeline. Returns: Processed VideoFrame @@ -37,7 +41,19 @@ def process_frame(self, frame: VideoFrame) -> VideoOutput: pass @abstractmethod - def update_params(self, **params): + async def set_params(self, **params): + """Set pipeline parameters initally. + + Must maintain valid state on success or restore previous state on failure. + set_params starts the prompt loops in comfystream. + + Args: + **params: Implementation-specific parameters + """ + pass + + @abstractmethod + async def update_params(self, **params): """Update pipeline parameters. Must maintain valid state on success or restore previous state on failure. diff --git a/runner/app/live/pipelines/noop.py b/runner/app/live/pipelines/noop.py index 48835e151..77659f63f 100644 --- a/runner/app/live/pipelines/noop.py +++ b/runner/app/live/pipelines/noop.py @@ -14,12 +14,18 @@ def __init__(self, **params): async def put_video_frame(self, frame: VideoFrame): await self.frame_queue.put(frame) - async def get_video_frame(self) -> VideoOutput: + async def get_processed_video_frame(self) -> VideoOutput: frame = await self.frame_queue.get() processed_frame = frame.image.convert("RGB") return VideoOutput(frame.replace_image(processed_frame)) + + async def warm_video(self): + logging.info("Warming video") - def update_params(self, **params): + async def set_params(self, **params): + logging.info(f"Setting params: {params}") + + async def update_params(self, **params): logging.info(f"Updating params: {params}") async def stop(self): diff --git a/runner/app/live/streamer/process.py b/runner/app/live/streamer/process.py index 8baf03992..20278fd50 100644 --- a/runner/app/live/streamer/process.py +++ b/runner/app/live/streamer/process.py @@ -153,6 +153,7 @@ def _initialize_pipeline(self): raise async def _run_pipeline_loops(self, pipeline): + await pipeline.warm_video() input_task = asyncio.create_task(self._input_loop(pipeline)) output_task = asyncio.create_task(self._output_loop(pipeline)) param_task = asyncio.create_task(self._param_update_loop(pipeline)) @@ -194,7 +195,7 @@ async def _param_update_loop(self, pipeline): params = self.param_update_queue.get_nowait() if self._handle_logging_params(params): logging.info(f"PipelineProcess: Updating pipeline parameters: {params}") - pipeline.update_params(**params) + await pipeline.update_params(**params) except queue.Empty: await asyncio.sleep(0.1) except Exception as e: diff --git a/runner/run-lv2v.sh b/runner/run-lv2v.sh index e69a54720..6217681e2 100755 --- a/runner/run-lv2v.sh +++ b/runner/run-lv2v.sh @@ -1,15 +1,8 @@ #!/bin/bash set -ex -if [ $# -lt 2 ]; then - echo "Usage: $0 " - exit 1 -fi - -INPUT_ROOM=$1 -OUTPUT_ROOM=$2 -PIPELINE=${3:-comfyui} -PORT=${4:-9000} +PIPELINE=noop +PORT=8900 # Build images, this will be quick if everything is cached docker build -t livepeer/ai-runner:live-base -f docker/Dockerfile.live-base . @@ -24,7 +17,6 @@ CONTAINER_NAME=live-video-to-video-${PIPELINE} docker run -it --rm --name ${CONTAINER_NAME} \ -e PIPELINE=live-video-to-video \ -e MODEL_ID=${PIPELINE} \ - --gpus all \ -p ${PORT}:8000 \ -v ./models:/models \ livepeer/ai-runner:live-app-${PIPELINE} 2>&1 | tee ./run-lv2v.log & @@ -39,14 +31,16 @@ echo "Waiting for server to start..." while ! grep -aq "Uvicorn running" ./run-lv2v.log; do sleep 1 done -sleep 2 -echo "Starting pipeline from ${INPUT_ROOM} to ${OUTPUT_ROOM}..." +sleep 5 set -x -curl -vvv http://localhost:${PORT}/live-video-to-video/ \ - -H 'Content-Type: application/json' \ - -d "{\"publish_url\":\"https://wwgcyxykwg9dys.transfix.ai/trickle/${OUTPUT_ROOM}\",\"subscribe_url\":\"https://wwgcyxykwg9dys.transfix.ai/trickle/${INPUT_ROOM}\"}" +curl --location 'http://127.0.0.1:8900/live-video-to-video/' \ +--header 'Content-Type: application/json' \ +--data '{ + "subscribe_url": "http://172.17.0.1:3389/sample", + "publish_url": "http://172.17.0.1:3389/sample-out" +}' # let docker container take over wait $DOCKER_PID From c4d6945c7baf9d3c11e319640e7a03df8db7b808 Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Tue, 25 Mar 2025 18:19:25 +0000 Subject: [PATCH 08/36] fix: noop works --- runner/app/live/streamer/process.py | 18 +++++++++--------- runner/app/live/streamer/streamer.py | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/runner/app/live/streamer/process.py b/runner/app/live/streamer/process.py index 20278fd50..1bcc5289e 100644 --- a/runner/app/live/streamer/process.py +++ b/runner/app/live/streamer/process.py @@ -12,7 +12,6 @@ from log import config_logging, config_logging_fields, log_timing from trickle import InputFrame, AudioFrame, VideoFrame, OutputFrame, VideoOutput, AudioOutput -#TODO: figure out pro vs con for asyncio.run() vs asyncio.get_event_loop().run_until_complete() class PipelineProcess: @staticmethod @@ -134,20 +133,22 @@ def _handle_logging_params(self, params: dict) -> dict: self._reset_logging_fields( params["request_id"], params["stream_id"] ) - return False - return True + return {} + return params def _initialize_pipeline(self): try: params = {} try: params = self.param_update_queue.get_nowait() - if self._handle_logging_params(params): - with log_timing(f"PipelineProcess: Pipeline loading with {params}"): - return load_pipeline(self.pipeline_name, **params) + logging.info(f"PipelineProcess: Got params from param_update_queue {params}") + params = self._handle_logging_params(params) except queue.Empty: - with log_timing("PipelineProcess: Pipeline loading with default params"): - return load_pipeline(self.pipeline_name) + logging.info("PipelineProcess: No params found in param_update_queue, loading with default params") + + with log_timing(f"PipelineProcess: Pipeline loading with {params}"): + return load_pipeline(self.pipeline_name, **params) + except Exception as e: self._report_error(f"Error loading pipeline: {e}") raise @@ -187,7 +188,6 @@ async def _output_loop(self, pipeline): await asyncio.to_thread(self.output_queue.put, output_frame) except Exception as e: self._report_error(f"Error processing output frame: {e}") - await asyncio.sleep(0.01) async def _param_update_loop(self, pipeline): while not self.is_done(): diff --git a/runner/app/live/streamer/streamer.py b/runner/app/live/streamer/streamer.py index 822354815..d5ca91d7b 100644 --- a/runner/app/live/streamer/streamer.py +++ b/runner/app/live/streamer/streamer.py @@ -55,10 +55,10 @@ async def start(self, params: dict): run_in_background("ingress_loop", self.run_ingress_loop()), run_in_background("egress_loop", self.run_egress_loop()), run_in_background("report_status_loop", self.report_status_loop()), - run_in_background("control_loop", self.run_control_loop()), ] # auxiliary tasks that are not critical to the supervisor, but which we want to run self.auxiliary_tasks = [ + run_in_background("control_loop", self.run_control_loop()), ] self.tasks_supervisor_task = run_in_background( "tasks_supervisor", self.tasks_supervisor() From 93d5542c03b8f7b6c5bc7cd46bb35bd1d082c26e Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Tue, 25 Mar 2025 15:26:37 +0000 Subject: [PATCH 09/36] use dedicated base image for integration work --- runner/docker/Dockerfile.live-base-comfyui | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/docker/Dockerfile.live-base-comfyui b/runner/docker/Dockerfile.live-base-comfyui index 9a1d25f43..17bff3a48 100644 --- a/runner/docker/Dockerfile.live-base-comfyui +++ b/runner/docker/Dockerfile.live-base-comfyui @@ -1,4 +1,4 @@ -ARG BASE_IMAGE=livepeer/comfyui-base:latest +ARG BASE_IMAGE=livepeer/comfyui-base:v0.0.4-runner FROM ${BASE_IMAGE} # ----------------------------------------------------------------------------- From 4a8502dec077176b3fd961878324126ac1abcfa2 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Tue, 25 Mar 2025 22:21:52 +0000 Subject: [PATCH 10/36] defer async initialization --- runner/app/live/pipelines/comfyui.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index bbf9ccd2f..0025af1cd 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -274,7 +274,16 @@ def __init__(self, **params): self.video_incoming_frames = asyncio.Queue() - asyncio.run(self.set_params(**params)) + # Defer async initialization + loop = asyncio.get_event_loop() + if loop.is_running(): + self._async_init_task = asyncio.create_task(self._async_init(params)) + else: + loop.run_until_complete(self._async_init(params)) + + async def _async_init(self, params): + """Perform async initialization.""" + await self.set_params(**params) async def warm_video(self): dummy_frame = VideoFrame(None, 0, 0) From 2dd12ff47833ff8ff7e6b4db2a8733c55d60206c Mon Sep 17 00:00:00 2001 From: Varshith B Date: Wed, 26 Mar 2025 17:45:26 +0000 Subject: [PATCH 11/36] fix: event loop bug --- runner/app/live/pipelines/comfyui.py | 18 ++------------- runner/app/live/pipelines/loader.py | 6 ++--- runner/app/live/pipelines/noop.py | 3 +-- runner/app/live/streamer/process.py | 33 ++++++++++++++++++---------- 4 files changed, 27 insertions(+), 33 deletions(-) diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index 0025af1cd..6b8245321 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -265,26 +265,12 @@ def validate_prompt(cls, v) -> dict: class ComfyUI(Pipeline): - def __init__(self, **params): - super().__init__(**params) - + def __init__(self): comfy_ui_workspace = os.getenv(COMFY_UI_WORKSPACE_ENV) - self.client = ComfyStreamClient(cwd=comfy_ui_workspace, **params) + self.client = ComfyStreamClient(cwd=comfy_ui_workspace) self.params: ComfyUIParams - self.video_incoming_frames = asyncio.Queue() - # Defer async initialization - loop = asyncio.get_event_loop() - if loop.is_running(): - self._async_init_task = asyncio.create_task(self._async_init(params)) - else: - loop.run_until_complete(self._async_init(params)) - - async def _async_init(self, params): - """Perform async initialization.""" - await self.set_params(**params) - async def warm_video(self): dummy_frame = VideoFrame(None, 0, 0) dummy_frame.side_data.processed_input = torch.randn(1, 512, 512, 3) diff --git a/runner/app/live/pipelines/loader.py b/runner/app/live/pipelines/loader.py index 53b523777..41b2f2617 100644 --- a/runner/app/live/pipelines/loader.py +++ b/runner/app/live/pipelines/loader.py @@ -1,10 +1,10 @@ from .interface import Pipeline -def load_pipeline(name: str, **params) -> Pipeline: +def load_pipeline(name: str) -> Pipeline: if name == "comfyui": from .comfyui import ComfyUI - return ComfyUI(**params) + return ComfyUI() elif name == "noop": from .noop import Noop - return Noop(**params) + return Noop() raise ValueError(f"Unknown pipeline: {name}") diff --git a/runner/app/live/pipelines/noop.py b/runner/app/live/pipelines/noop.py index 77659f63f..3f3605b63 100644 --- a/runner/app/live/pipelines/noop.py +++ b/runner/app/live/pipelines/noop.py @@ -7,8 +7,7 @@ from trickle import VideoFrame, VideoOutput class Noop(Pipeline): - def __init__(self, **params): - super().__init__(**params) + def __init__(self): self.frame_queue = asyncio.Queue() async def put_video_frame(self, frame: VideoFrame): diff --git a/runner/app/live/streamer/process.py b/runner/app/live/streamer/process.py index 1bcc5289e..77e389353 100644 --- a/runner/app/live/streamer/process.py +++ b/runner/app/live/streamer/process.py @@ -120,12 +120,10 @@ def process_loop(self): logging.getLogger("comfy").setLevel(logging.WARNING) try: - pipeline = self._initialize_pipeline() - asyncio.run(self._run_pipeline_loops(pipeline)) + asyncio.run(self._run_pipeline_loops()) except Exception as e: self._report_error(f"Error in process run method: {e}") - finally: - self._cleanup_pipeline(pipeline) + def _handle_logging_params(self, params: dict) -> dict: if isinstance(params, dict) and "request_id" in params and "stream_id" in params: @@ -136,7 +134,7 @@ def _handle_logging_params(self, params: dict) -> dict: return {} return params - def _initialize_pipeline(self): + async def _initialize_pipeline(self): try: params = {} try: @@ -147,13 +145,23 @@ def _initialize_pipeline(self): logging.info("PipelineProcess: No params found in param_update_queue, loading with default params") with log_timing(f"PipelineProcess: Pipeline loading with {params}"): - return load_pipeline(self.pipeline_name, **params) - + pipeline = load_pipeline(self.pipeline_name) + await pipeline.set_params(**params) + return pipeline except Exception as e: self._report_error(f"Error loading pipeline: {e}") - raise - - async def _run_pipeline_loops(self, pipeline): + if params: + try: + with log_timing(f"PipelineProcess: Pipeline loading with default params due to error with params: {params}"): + pipeline = load_pipeline(self.pipeline_name) + await pipeline.set_params() + return pipeline + except Exception as e: + self._report_error(f"Error loading pipeline with default params: {e}") + raise + + async def _run_pipeline_loops(self): + pipeline = await self._initialize_pipeline() await pipeline.warm_video() input_task = asyncio.create_task(self._input_loop(pipeline)) output_task = asyncio.create_task(self._output_loop(pipeline)) @@ -163,6 +171,7 @@ async def _run_pipeline_loops(self, pipeline): await asyncio.gather(input_task, output_task, param_task) except Exception as e: self._report_error(f"Error in pipeline loops: {e}") + await self._cleanup_pipeline(pipeline) raise async def _input_loop(self, pipeline): @@ -209,10 +218,10 @@ def _report_error(self, error_msg: str): logging.error(error_msg) self._queue_put_fifo(self.error_queue, error_event) - def _cleanup_pipeline(self, pipeline): + async def _cleanup_pipeline(self, pipeline): if pipeline is not None: try: - asyncio.run(pipeline.stop()) + await pipeline.stop() except Exception as e: logging.error(f"Error stopping pipeline: {e}") From 07a369d55595be2d8b2b9766f1a4ea255cf44365 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Wed, 26 Mar 2025 14:31:24 -0400 Subject: [PATCH 12/36] remove extra log --- runner/app/live/streamer/process.py | 1 - 1 file changed, 1 deletion(-) diff --git a/runner/app/live/streamer/process.py b/runner/app/live/streamer/process.py index 77e389353..0563b6c0a 100644 --- a/runner/app/live/streamer/process.py +++ b/runner/app/live/streamer/process.py @@ -184,7 +184,6 @@ async def _input_loop(self, pipeline): elif isinstance(input_frame, AudioFrame): self.output_queue.put(AudioOutput([input_frame])) except queue.Empty: - logging.info("PipelineProcess: No params found in param_update_queue, loading with default params") continue except Exception as e: self._report_error(f"Error processing input frame: {e}") From 1660b4d6772fbfd70f757ba0fa350d22a445fe61 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Thu, 27 Mar 2025 14:37:42 +0000 Subject: [PATCH 13/36] fix requirements install --- .devcontainer/devcontainer.json | 17 ++++++++++------- runner/docker/Dockerfile.live-app__PIPELINE__ | 12 +++++++----- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index b9f8c5cdc..bf3abf84b 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -4,6 +4,9 @@ "name": "ai-runner", "initializeCommand": "ls", // Image to use for the dev container. More info: https://containers.dev/guide/dockerfile. + "containerEnv": { + "PIPELINE": "comfyui" + }, "build": { "dockerfile": "../runner/docker/Dockerfile.live-app__PIPELINE__", "args": { @@ -20,13 +23,13 @@ // Configure tool-specific properties. "customizations": { "vscode": { - "settings": { - "python.defaultInterpreterPath": "/workspace/miniconda3/envs/comfystream/bin/python", - "python.venvPath": "/workspace/miniconda3/envs", - "python.terminal.activateEnvInCurrentTerminal": false, - "python.terminal.activateEnvironment": true, - "terminal.integrated.shellIntegration.enabled": true - }, + "settings": { + "python.defaultInterpreterPath": "/workspace/miniconda3/envs/comfystream/bin/python", + "python.venvPath": "/workspace/miniconda3/envs", + "python.terminal.activateEnvInCurrentTerminal": false, + "python.terminal.activateEnvironment": true, + "terminal.integrated.shellIntegration.enabled": true + }, "extensions": [ "ms-python.python", "ms-python.black-formatter" diff --git a/runner/docker/Dockerfile.live-app__PIPELINE__ b/runner/docker/Dockerfile.live-app__PIPELINE__ index 60d266e08..5e442d831 100644 --- a/runner/docker/Dockerfile.live-app__PIPELINE__ +++ b/runner/docker/Dockerfile.live-app__PIPELINE__ @@ -1,9 +1,8 @@ ARG PIPELINE=comfyui ARG BASE_IMAGE=livepeer/ai-runner:live-base-${PIPELINE} FROM ${BASE_IMAGE} -#TODO: Set environment variable from build argument -# ENV PIPELINE=${PIPELINE} -ENV PIPELINE=comfyui +ARG PIPELINE +ENV PIPELINE=${PIPELINE} # Install latest stable Go version and system dependencies RUN apt-get update && apt-get install -y \ @@ -14,9 +13,12 @@ RUN apt-get update && apt-get install -y \ && apt-get clean && rm -rf /var/lib/apt/lists/* # Install any additional Python packages -#TODO: Update the other files to use this method to find requirements.txt files, renaming the existing ones to requirements.pipeline.txt - COPY requirements.live.${PIPELINE}.txt /app/requirements.txt + +# Install requirements in normal env +RUN pip install --no-cache-dir -r /app/requirements.txt; + +# Also install in conda env if needed RUN if [ "$PIPELINE" = "comfyui" ]; then \ conda run -n comfystream pip install --no-cache-dir -r /app/requirements.txt; \ else \ From a00e4c011e82b3a7817207345259d93016f048c1 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Thu, 27 Mar 2025 14:38:30 +0000 Subject: [PATCH 14/36] revert whitespace in devcontainer.json --- .devcontainer/devcontainer.json | 90 ++++++++++++++++----------------- 1 file changed, 45 insertions(+), 45 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index bf3abf84b..dc5a5eb03 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -1,52 +1,52 @@ // For format details, see https://aka.ms/devcontainer.json. For config options, see the // README at: https://github.com/devcontainers/templates/tree/main/src/ubuntu { - "name": "ai-runner", - "initializeCommand": "ls", - // Image to use for the dev container. More info: https://containers.dev/guide/dockerfile. - "containerEnv": { - "PIPELINE": "comfyui" - }, - "build": { - "dockerfile": "../runner/docker/Dockerfile.live-app__PIPELINE__", - "args": { + "name": "ai-runner", + "initializeCommand": "ls", + // Image to use for the dev container. More info: https://containers.dev/guide/dockerfile. + "containerEnv": { "PIPELINE": "comfyui" }, - // "dockerfile": "../Dockerfile", - // "dockerfile": "../docker/Dockerfile.text_to_speech", - "context": "../runner" - }, - "runArgs": [ - "--gpus=all" - ], - // Features to add to the dev container. More info: https://containers.dev/features. - // Configure tool-specific properties. - "customizations": { - "vscode": { - "settings": { - "python.defaultInterpreterPath": "/workspace/miniconda3/envs/comfystream/bin/python", - "python.venvPath": "/workspace/miniconda3/envs", - "python.terminal.activateEnvInCurrentTerminal": false, - "python.terminal.activateEnvironment": true, - "terminal.integrated.shellIntegration.enabled": true + "build": { + "dockerfile": "../runner/docker/Dockerfile.live-app__PIPELINE__", + "args": { + "PIPELINE": "comfyui" }, - "extensions": [ - "ms-python.python", - "ms-python.black-formatter" - ] - } - }, - // Use 'forwardPorts' to make a list of ports inside the container available locally. - "forwardPorts": [ - 8000 - ], - "appPort": [ - "8000:8000" - ], - // Use 'mounts' to make a list of local folders available inside the container. - "mounts": [ - // "source=${localWorkspaceFolder}/models,target=/models,type=bind" - "source=${localEnv:HOME}/models/ComfyUI--models/,target=/workspace/ComfyUI/models,type=bind", - "source=${localEnv:HOME}/models/ComfyUI--output/,target=/workspace/ComfyUI/output,type=bind" - ] + // "dockerfile": "../Dockerfile", + // "dockerfile": "../docker/Dockerfile.text_to_speech", + "context": "../runner" + }, + "runArgs": [ + "--gpus=all" + ], + // Features to add to the dev container. More info: https://containers.dev/features. + // Configure tool-specific properties. + "customizations": { + "vscode": { + "settings": { + "python.defaultInterpreterPath": "/workspace/miniconda3/envs/comfystream/bin/python", + "python.venvPath": "/workspace/miniconda3/envs", + "python.terminal.activateEnvInCurrentTerminal": false, + "python.terminal.activateEnvironment": true, + "terminal.integrated.shellIntegration.enabled": true + }, + "extensions": [ + "ms-python.python", + "ms-python.black-formatter" + ] + } + }, + // Use 'forwardPorts' to make a list of ports inside the container available locally. + "forwardPorts": [ + 8000 + ], + "appPort": [ + "8000:8000" + ], + // Use 'mounts' to make a list of local folders available inside the container. + "mounts": [ + // "source=${localWorkspaceFolder}/models,target=/models,type=bind" + "source=${localEnv:HOME}/models/ComfyUI--models/,target=/workspace/ComfyUI/models,type=bind", + "source=${localEnv:HOME}/models/ComfyUI--output/,target=/workspace/ComfyUI/output,type=bind" + ] } From d2509eabb1867274da0cce63bb9bf061914574a2 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Thu, 27 Mar 2025 16:24:58 +0000 Subject: [PATCH 15/36] fix requirements and env activation, revert processed_input change for load_tensor --- runner/app/live/pipelines/comfyui.py | 4 ++-- runner/app/live/trickle/frame.py | 3 +-- runner/docker/Dockerfile.live-app__PIPELINE__ | 16 +++++++++++----- runner/docker/Dockerfile.live-base-comfyui | 2 +- runner/requirements.live.comfyui.txt | 2 -- 5 files changed, 15 insertions(+), 12 deletions(-) diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index 6b8245321..cf592aec3 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -273,7 +273,7 @@ def __init__(self): async def warm_video(self): dummy_frame = VideoFrame(None, 0, 0) - dummy_frame.side_data.processed_input = torch.randn(1, 512, 512, 3) + dummy_frame.side_data.input = torch.randn(1, 512, 512, 3) for _ in range(WARMUP_RUNS): self.client.put_video_input(dummy_frame) @@ -282,7 +282,7 @@ async def warm_video(self): async def put_video_frame(self, frame: VideoFrame): image_np = np.array(frame.image.convert("RGB")).astype(np.float32) / 255.0 - frame.side_data.processed_input = torch.tensor(image_np).unsqueeze(0) + frame.side_data.input = torch.tensor(image_np).unsqueeze(0) frame.side_data.skipped = True self.client.put_video_input(frame) await self.video_incoming_frames.put(frame) diff --git a/runner/app/live/trickle/frame.py b/runner/app/live/trickle/frame.py index 5e3ccd353..4d2936130 100644 --- a/runner/app/live/trickle/frame.py +++ b/runner/app/live/trickle/frame.py @@ -8,8 +8,7 @@ class SideData: Base class for side data, needed to keep it consistent with av frame side_data """ skipped: bool = True - # TODO: update input to be processed_input in comfystream - processed_input: Union[Image.Image, np.ndarray] = None + input: Union[Image.Image, np.ndarray] = None class InputFrame: """ diff --git a/runner/docker/Dockerfile.live-app__PIPELINE__ b/runner/docker/Dockerfile.live-app__PIPELINE__ index 5e442d831..55f08cf9e 100644 --- a/runner/docker/Dockerfile.live-app__PIPELINE__ +++ b/runner/docker/Dockerfile.live-app__PIPELINE__ @@ -13,17 +13,17 @@ RUN apt-get update && apt-get install -y \ && apt-get clean && rm -rf /var/lib/apt/lists/* # Install any additional Python packages -COPY requirements.live.${PIPELINE}.txt /app/requirements.txt +COPY requirements.live-ai.txt /app/requirements.txt +COPY requirements.live.comfyui.txt /app/requirements.comfyui.txt # Install requirements in normal env RUN pip install --no-cache-dir -r /app/requirements.txt; # Also install in conda env if needed RUN if [ "$PIPELINE" = "comfyui" ]; then \ - conda run -n comfystream pip install --no-cache-dir -r /app/requirements.txt; \ -else \ - pip install --no-cache-dir -r /app/requirements.txt; \ + conda run -n comfystream pip install -r /app/requirements.comfyui.txt; \ fi + # TODO: Figure out a way to have this in requirements file RUN pip install --no-cache-dir triton==3.1.0 RUN pip uninstall -y onnx onnxruntime onnxruntime-gpu @@ -45,4 +45,10 @@ WORKDIR /app ARG GIT_SHA ENV GIT_SHA=$GIT_SHA -CMD ["uvicorn", "app.main:app", "--log-config", "app/cfg/uvicorn_logging_config.json", "--host", "", "--port", "8000"] +#TODO if this dockerfile will support other pipelines, we need to not always activate comfystream +RUN echo "source /workspace/miniconda3/etc/profile.d/conda.sh && conda activate comfystream" >> /conda_activate.sh && \ + chmod +x /conda_activate.sh +ENV BASH_ENV=/conda_activate.sh +SHELL ["/bin/bash", "-c"] + +CMD ["bash", "-c", "uvicorn app.main:app --log-config app/cfg/uvicorn_logging_config.json --host '' --port 8000"] diff --git a/runner/docker/Dockerfile.live-base-comfyui b/runner/docker/Dockerfile.live-base-comfyui index 17bff3a48..3a637358e 100644 --- a/runner/docker/Dockerfile.live-base-comfyui +++ b/runner/docker/Dockerfile.live-base-comfyui @@ -1,4 +1,4 @@ -ARG BASE_IMAGE=livepeer/comfyui-base:v0.0.4-runner +ARG BASE_IMAGE=livepeer/comfyui-base:feat-runner-integration FROM ${BASE_IMAGE} # ----------------------------------------------------------------------------- diff --git a/runner/requirements.live.comfyui.txt b/runner/requirements.live.comfyui.txt index 2dc468d6f..4c87b0b58 100644 --- a/runner/requirements.live.comfyui.txt +++ b/runner/requirements.live.comfyui.txt @@ -1,5 +1,3 @@ -accelerate==0.30.1 -transformers==4.43.3 fastapi==0.111.0 pydantic>=2.7.2 Pillow==10.3.0 From d118b711d4efc622426fd00354cc9c8d76a5db49 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Thu, 27 Mar 2025 13:57:17 -0400 Subject: [PATCH 16/36] remove unnecessary shell cmd --- runner/docker/Dockerfile.live-app__PIPELINE__ | 1 - 1 file changed, 1 deletion(-) diff --git a/runner/docker/Dockerfile.live-app__PIPELINE__ b/runner/docker/Dockerfile.live-app__PIPELINE__ index 55f08cf9e..70534534b 100644 --- a/runner/docker/Dockerfile.live-app__PIPELINE__ +++ b/runner/docker/Dockerfile.live-app__PIPELINE__ @@ -49,6 +49,5 @@ ENV GIT_SHA=$GIT_SHA RUN echo "source /workspace/miniconda3/etc/profile.d/conda.sh && conda activate comfystream" >> /conda_activate.sh && \ chmod +x /conda_activate.sh ENV BASH_ENV=/conda_activate.sh -SHELL ["/bin/bash", "-c"] CMD ["bash", "-c", "uvicorn app.main:app --log-config app/cfg/uvicorn_logging_config.json --host '' --port 8000"] From e3d458f83ea304316e2c6c7ad16c5d5bc1ddf35b Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Thu, 27 Mar 2025 18:53:36 +0000 Subject: [PATCH 17/36] attempt to overcome CI caching images --- runner/docker/Dockerfile.live-base-comfyui | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/docker/Dockerfile.live-base-comfyui b/runner/docker/Dockerfile.live-base-comfyui index 3a637358e..3bfc4c5c8 100644 --- a/runner/docker/Dockerfile.live-base-comfyui +++ b/runner/docker/Dockerfile.live-base-comfyui @@ -1,4 +1,4 @@ -ARG BASE_IMAGE=livepeer/comfyui-base:feat-runner-integration +ARG BASE_IMAGE=livepeer/comfyui-base:sha-9b663d0 FROM ${BASE_IMAGE} # ----------------------------------------------------------------------------- From e35f57325bdd658b222eed55f7c05f701b84e6b8 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Thu, 27 Mar 2025 16:44:47 -0400 Subject: [PATCH 18/36] update comfyui-base image --- runner/docker/Dockerfile.live-base-comfyui | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/docker/Dockerfile.live-base-comfyui b/runner/docker/Dockerfile.live-base-comfyui index 3bfc4c5c8..ea7389e3e 100644 --- a/runner/docker/Dockerfile.live-base-comfyui +++ b/runner/docker/Dockerfile.live-base-comfyui @@ -1,4 +1,4 @@ -ARG BASE_IMAGE=livepeer/comfyui-base:sha-9b663d0 +ARG BASE_IMAGE=livepeer/comfyui-base:sha-83b4dcb FROM ${BASE_IMAGE} # ----------------------------------------------------------------------------- From 39473e4c7e1d45f525d724e91216e97393097211 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Fri, 28 Mar 2025 10:27:22 -0400 Subject: [PATCH 19/36] update comfyui-base image --- runner/docker/Dockerfile.live-base-comfyui | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/docker/Dockerfile.live-base-comfyui b/runner/docker/Dockerfile.live-base-comfyui index ea7389e3e..2ddeea022 100644 --- a/runner/docker/Dockerfile.live-base-comfyui +++ b/runner/docker/Dockerfile.live-base-comfyui @@ -1,4 +1,4 @@ -ARG BASE_IMAGE=livepeer/comfyui-base:sha-83b4dcb +ARG BASE_IMAGE=livepeer/comfyui-base:sha-c73cc63 FROM ${BASE_IMAGE} # ----------------------------------------------------------------------------- From f38876905c10f230d549ad0d7fc88812c46a610c Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Fri, 28 Mar 2025 12:34:38 -0400 Subject: [PATCH 20/36] pin live-base for comfyui --- runner/docker/Dockerfile.live-app__PIPELINE__ | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/docker/Dockerfile.live-app__PIPELINE__ b/runner/docker/Dockerfile.live-app__PIPELINE__ index 70534534b..add62ec65 100644 --- a/runner/docker/Dockerfile.live-app__PIPELINE__ +++ b/runner/docker/Dockerfile.live-app__PIPELINE__ @@ -1,5 +1,5 @@ ARG PIPELINE=comfyui -ARG BASE_IMAGE=livepeer/ai-runner:live-base-${PIPELINE} +ARG BASE_IMAGE=livepeer/ai-runner:live-base-comfyui-runner FROM ${BASE_IMAGE} ARG PIPELINE ENV PIPELINE=${PIPELINE} From 455129a4233e695d1f1ac4f51d51c29d2c78215d Mon Sep 17 00:00:00 2001 From: PSchroedl Date: Fri, 28 Mar 2025 09:56:40 -0700 Subject: [PATCH 21/36] Update ai-runner-live-pipelines-docker.yaml MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Trigger “Build common live base image” when any of the runner/docker/Dockerfile.live-base* files are edited --- .github/workflows/ai-runner-live-pipelines-docker.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ai-runner-live-pipelines-docker.yaml b/.github/workflows/ai-runner-live-pipelines-docker.yaml index 4482bbe07..a24f210e5 100644 --- a/.github/workflows/ai-runner-live-pipelines-docker.yaml +++ b/.github/workflows/ai-runner-live-pipelines-docker.yaml @@ -39,7 +39,7 @@ jobs: uses: tj-actions/changed-files@823fcebdb31bb35fdf2229d9f769b400309430d0 # v46.0.3 with: files: | - runner/docker/Dockerfile.live-base + runner/docker/Dockerfile.live-base* - name: Check if build needed id: check_build From 7199dce0ef96a542ae85a54c1ca80d2b42a9f6ca Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Fri, 28 Mar 2025 13:13:05 -0400 Subject: [PATCH 22/36] Revert "pin live-base for comfyui" This reverts commit 25c5183575dec349cd42168915a2505090da7608. --- runner/docker/Dockerfile.live-app__PIPELINE__ | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/docker/Dockerfile.live-app__PIPELINE__ b/runner/docker/Dockerfile.live-app__PIPELINE__ index add62ec65..70534534b 100644 --- a/runner/docker/Dockerfile.live-app__PIPELINE__ +++ b/runner/docker/Dockerfile.live-app__PIPELINE__ @@ -1,5 +1,5 @@ ARG PIPELINE=comfyui -ARG BASE_IMAGE=livepeer/ai-runner:live-base-comfyui-runner +ARG BASE_IMAGE=livepeer/ai-runner:live-base-${PIPELINE} FROM ${BASE_IMAGE} ARG PIPELINE ENV PIPELINE=${PIPELINE} From d3bb50e9bf99e42ff86979f1dc06e6bc70fc37d3 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Mon, 31 Mar 2025 14:47:47 -0400 Subject: [PATCH 23/36] use comfyui-base:latest --- runner/docker/Dockerfile.live-base-comfyui | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/docker/Dockerfile.live-base-comfyui b/runner/docker/Dockerfile.live-base-comfyui index 2ddeea022..9a1d25f43 100644 --- a/runner/docker/Dockerfile.live-base-comfyui +++ b/runner/docker/Dockerfile.live-base-comfyui @@ -1,4 +1,4 @@ -ARG BASE_IMAGE=livepeer/comfyui-base:sha-c73cc63 +ARG BASE_IMAGE=livepeer/comfyui-base:latest FROM ${BASE_IMAGE} # ----------------------------------------------------------------------------- From 1176f9dc95a9a1ad36c49848bfaa53a75c8bd0c1 Mon Sep 17 00:00:00 2001 From: Elite Date: Tue, 1 Apr 2025 16:35:40 +0000 Subject: [PATCH 24/36] cleanup dockerfile --- runner/docker/Dockerfile.live-app__PIPELINE__ | 13 ++----------- runner/docker/Dockerfile.live-base-comfyui | 2 ++ runner/requirements.live-ai.txt | 2 +- 3 files changed, 5 insertions(+), 12 deletions(-) diff --git a/runner/docker/Dockerfile.live-app__PIPELINE__ b/runner/docker/Dockerfile.live-app__PIPELINE__ index 70534534b..f57ac8d6d 100644 --- a/runner/docker/Dockerfile.live-app__PIPELINE__ +++ b/runner/docker/Dockerfile.live-app__PIPELINE__ @@ -24,11 +24,6 @@ RUN if [ "$PIPELINE" = "comfyui" ]; then \ conda run -n comfystream pip install -r /app/requirements.comfyui.txt; \ fi -# TODO: Figure out a way to have this in requirements file -RUN pip install --no-cache-dir triton==3.1.0 -RUN pip uninstall -y onnx onnxruntime onnxruntime-gpu -RUN pip install onnx==1.17.0 onnxruntime-gpu==1.17.0 - # Set environment variables ENV MAX_WORKERS=1 ENV HUGGINGFACE_HUB_CACHE=/models @@ -45,9 +40,5 @@ WORKDIR /app ARG GIT_SHA ENV GIT_SHA=$GIT_SHA -#TODO if this dockerfile will support other pipelines, we need to not always activate comfystream -RUN echo "source /workspace/miniconda3/etc/profile.d/conda.sh && conda activate comfystream" >> /conda_activate.sh && \ - chmod +x /conda_activate.sh -ENV BASH_ENV=/conda_activate.sh - -CMD ["bash", "-c", "uvicorn app.main:app --log-config app/cfg/uvicorn_logging_config.json --host '' --port 8000"] +RUN echo "source /workspace/miniconda3/etc/profile.d/conda.sh && conda activate comfystream" >> ~/.bashrc +CMD ["uvicorn", "app.main:app", "--log-config", "app/cfg/uvicorn_logging_config.json", "--host", "", "--port", "8000"] \ No newline at end of file diff --git a/runner/docker/Dockerfile.live-base-comfyui b/runner/docker/Dockerfile.live-base-comfyui index 9a1d25f43..317f3e905 100644 --- a/runner/docker/Dockerfile.live-base-comfyui +++ b/runner/docker/Dockerfile.live-base-comfyui @@ -76,4 +76,6 @@ ENV COMFY_UI_WORKSPACE="/workspace/ComfyUI" RUN mkdir -p /workspace/ComfyUI RUN rm -rf /workspace/ComfyUI/models && ln -s /models/ComfyUI--models /workspace/ComfyUI/models RUN rm -rf /workspace/ComfyUI/output && ln -s /models/ComfyUI--output /workspace/ComfyUI/output + +ENV BASH_ENV=/conda_activate.sh ENTRYPOINT ["bash", "-c", "exec /opt/nvidia/nvidia_entrypoint.sh \"$@\"", "--"] diff --git a/runner/requirements.live-ai.txt b/runner/requirements.live-ai.txt index 70c1ac400..1e63b08af 100644 --- a/runner/requirements.live-ai.txt +++ b/runner/requirements.live-ai.txt @@ -6,7 +6,7 @@ Pillow==10.3.0 python-multipart==0.0.9 uvicorn==0.34.0 huggingface_hub==0.23.2 -triton>=2.1.0 +triton>=3.1.0 peft==0.11.1 deepcache==0.1.1 safetensors==0.4.3 From 34facd6c8e0b99aea0086441e3468b8e29b1042d Mon Sep 17 00:00:00 2001 From: Elite Date: Tue, 1 Apr 2025 16:35:59 +0000 Subject: [PATCH 25/36] temporarily pin livepeer/comfyui-base:feat-runner-integration --- runner/docker/Dockerfile.live-base-comfyui | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/docker/Dockerfile.live-base-comfyui b/runner/docker/Dockerfile.live-base-comfyui index 317f3e905..a96030869 100644 --- a/runner/docker/Dockerfile.live-base-comfyui +++ b/runner/docker/Dockerfile.live-base-comfyui @@ -1,4 +1,4 @@ -ARG BASE_IMAGE=livepeer/comfyui-base:latest +ARG BASE_IMAGE=livepeer/comfyui-base:feat-runner-integration FROM ${BASE_IMAGE} # ----------------------------------------------------------------------------- From 5520c1bc01116d3930ce5c0b04ae880fd3d011b1 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Tue, 1 Apr 2025 11:43:48 -0400 Subject: [PATCH 26/36] restore gpus flag --- runner/run-lv2v.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/runner/run-lv2v.sh b/runner/run-lv2v.sh index 6217681e2..d8ec102e2 100755 --- a/runner/run-lv2v.sh +++ b/runner/run-lv2v.sh @@ -17,6 +17,7 @@ CONTAINER_NAME=live-video-to-video-${PIPELINE} docker run -it --rm --name ${CONTAINER_NAME} \ -e PIPELINE=live-video-to-video \ -e MODEL_ID=${PIPELINE} \ + --gpus all \ -p ${PORT}:8000 \ -v ./models:/models \ livepeer/ai-runner:live-app-${PIPELINE} 2>&1 | tee ./run-lv2v.log & From 5f2a579fb0009cd00e93adedfa3ded455df005f8 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Tue, 1 Apr 2025 21:40:02 -0400 Subject: [PATCH 27/36] revert conda env change --- runner/docker/Dockerfile.live-app__PIPELINE__ | 1 - runner/docker/Dockerfile.live-base-comfyui | 4 ++++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/runner/docker/Dockerfile.live-app__PIPELINE__ b/runner/docker/Dockerfile.live-app__PIPELINE__ index f57ac8d6d..7dcef9415 100644 --- a/runner/docker/Dockerfile.live-app__PIPELINE__ +++ b/runner/docker/Dockerfile.live-app__PIPELINE__ @@ -40,5 +40,4 @@ WORKDIR /app ARG GIT_SHA ENV GIT_SHA=$GIT_SHA -RUN echo "source /workspace/miniconda3/etc/profile.d/conda.sh && conda activate comfystream" >> ~/.bashrc CMD ["uvicorn", "app.main:app", "--log-config", "app/cfg/uvicorn_logging_config.json", "--host", "", "--port", "8000"] \ No newline at end of file diff --git a/runner/docker/Dockerfile.live-base-comfyui b/runner/docker/Dockerfile.live-base-comfyui index a96030869..d35922107 100644 --- a/runner/docker/Dockerfile.live-base-comfyui +++ b/runner/docker/Dockerfile.live-base-comfyui @@ -77,5 +77,9 @@ RUN mkdir -p /workspace/ComfyUI RUN rm -rf /workspace/ComfyUI/models && ln -s /models/ComfyUI--models /workspace/ComfyUI/models RUN rm -rf /workspace/ComfyUI/output && ln -s /models/ComfyUI--output /workspace/ComfyUI/output +# Ensure all the next RUN commands are run in the comfystream conda environment +RUN echo "source /workspace/miniconda3/etc/profile.d/conda.sh && conda activate comfystream" >> /conda_activate.sh && \ + chmod +x /conda_activate.sh ENV BASH_ENV=/conda_activate.sh +# Ensure the app run from CMD is also in the comfystream conda environment (just wrap in bash and it will run the .bashrc above) ENTRYPOINT ["bash", "-c", "exec /opt/nvidia/nvidia_entrypoint.sh \"$@\"", "--"] From 4ac7aefebaa2676a70999ea834b5072fc7b8a598 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Wed, 2 Apr 2025 10:28:20 -0400 Subject: [PATCH 28/36] update base image name --- runner/docker/Dockerfile.live-base-comfyui | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/docker/Dockerfile.live-base-comfyui b/runner/docker/Dockerfile.live-base-comfyui index d35922107..84eab9b5d 100644 --- a/runner/docker/Dockerfile.live-base-comfyui +++ b/runner/docker/Dockerfile.live-base-comfyui @@ -1,4 +1,4 @@ -ARG BASE_IMAGE=livepeer/comfyui-base:feat-runner-integration +ARG BASE_IMAGE=livepeer/comfyui-base:latest FROM ${BASE_IMAGE} # ----------------------------------------------------------------------------- From 106553b99f4117b52bd93a128a11bdb5ed7f0387 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Fri, 4 Apr 2025 15:13:53 -0300 Subject: [PATCH 29/36] .github: Add mypy step on test CI --- .github/workflows/ai-runner-test.yaml | 5 +++++ runner/requirements-dev.txt | 1 + 2 files changed, 6 insertions(+) diff --git a/.github/workflows/ai-runner-test.yaml b/.github/workflows/ai-runner-test.yaml index 12040fe8e..b0de37771 100644 --- a/.github/workflows/ai-runner-test.yaml +++ b/.github/workflows/ai-runner-test.yaml @@ -29,6 +29,11 @@ jobs: python -m pip install --upgrade pip pip install -r runner/requirements-dev.txt pip install -r runner/requirements.txt + pip install -r runner/requirements.live-ai.txt + + - name: Run mypy type checking + working-directory: runner + run: mypy runner/ - name: Run tests working-directory: runner diff --git a/runner/requirements-dev.txt b/runner/requirements-dev.txt index 121364943..372b02414 100644 --- a/runner/requirements-dev.txt +++ b/runner/requirements-dev.txt @@ -1,2 +1,3 @@ pytest pytest-mock +mypy From 42c7a0ddaf58d49605350c03c43bde404beb5c3f Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Fri, 4 Apr 2025 15:14:18 -0300 Subject: [PATCH 30/36] live/protocol: Fix typing on last_value_cache --- runner/app/live/streamer/protocol/last_value_cache.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/runner/app/live/streamer/protocol/last_value_cache.py b/runner/app/live/streamer/protocol/last_value_cache.py index c5d7444d2..d8bb75a6a 100644 --- a/runner/app/live/streamer/protocol/last_value_cache.py +++ b/runner/app/live/streamer/protocol/last_value_cache.py @@ -1,5 +1,6 @@ import threading -from typing import Optional, TypeVar, Generic, Callable +import logging +from typing import Optional, TypeVar, Generic, Callable, cast T = TypeVar('T') @@ -41,6 +42,7 @@ def value_is_set(): logging.warning(f"Timed out waiting for value (timeout={timeout}s)") return None - return self._value + # we know the value is set, so we can cast it to T + return cast(T, self._value) From c2643ec73d9646f2841872f4fddaf421bf5f590a Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Fri, 4 Apr 2025 15:15:53 -0300 Subject: [PATCH 31/36] Add mypy.ini file --- runner/mypy.ini | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 runner/mypy.ini diff --git a/runner/mypy.ini b/runner/mypy.ini new file mode 100644 index 000000000..050e614b7 --- /dev/null +++ b/runner/mypy.ini @@ -0,0 +1,23 @@ +[mypy] +python_version = 3.11 +warn_return_any = True +warn_unused_configs = True +disallow_untyped_defs = True +disallow_incomplete_defs = True +check_untyped_defs = True +disallow_untyped_decorators = True +no_implicit_optional = True +ignore_missing_imports = True + +[mypy.plugins.numpy.*] +follow_imports = skip + +[mypy.plugins.pandas.*] +follow_imports = skip + +# Per-module options: +[mypy.app.*] +disallow_untyped_defs = True + +[mypy.tests.*] +disallow_untyped_defs = False From e4b24d0e6cf08bc6aeb16f90c9a32cac525cdc86 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Fri, 4 Apr 2025 15:18:46 -0300 Subject: [PATCH 32/36] Remove live-ai requirements from mypy ci --- .github/workflows/ai-runner-test.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/ai-runner-test.yaml b/.github/workflows/ai-runner-test.yaml index b0de37771..b0d1a4ee6 100644 --- a/.github/workflows/ai-runner-test.yaml +++ b/.github/workflows/ai-runner-test.yaml @@ -29,7 +29,6 @@ jobs: python -m pip install --upgrade pip pip install -r runner/requirements-dev.txt pip install -r runner/requirements.txt - pip install -r runner/requirements.live-ai.txt - name: Run mypy type checking working-directory: runner From 0f17edacec2c175ae3f88f73ca40577b71b3b699 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Fri, 4 Apr 2025 15:33:49 -0300 Subject: [PATCH 33/36] Fix mypy folder --- .github/workflows/ai-runner-test.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ai-runner-test.yaml b/.github/workflows/ai-runner-test.yaml index b0d1a4ee6..350e92907 100644 --- a/.github/workflows/ai-runner-test.yaml +++ b/.github/workflows/ai-runner-test.yaml @@ -32,7 +32,7 @@ jobs: - name: Run mypy type checking working-directory: runner - run: mypy runner/ + run: mypy app/ - name: Run tests working-directory: runner From 2be150841800f313bf283332d28ed11a53bc6e33 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Fri, 4 Apr 2025 15:56:00 -0300 Subject: [PATCH 34/36] Make mypy config less strict --- runner/mypy.ini | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/runner/mypy.ini b/runner/mypy.ini index 050e614b7..f964e4215 100644 --- a/runner/mypy.ini +++ b/runner/mypy.ini @@ -2,12 +2,16 @@ python_version = 3.11 warn_return_any = True warn_unused_configs = True -disallow_untyped_defs = True -disallow_incomplete_defs = True +disallow_untyped_defs = False +disallow_incomplete_defs = False check_untyped_defs = True disallow_untyped_decorators = True no_implicit_optional = True ignore_missing_imports = True +implicit_reexport = True +warn_no_return = False +disallow_untyped_args = True +disallow_untyped_calls = True [mypy.plugins.numpy.*] follow_imports = skip @@ -17,7 +21,9 @@ follow_imports = skip # Per-module options: [mypy.app.*] -disallow_untyped_defs = True +disallow_untyped_defs = False +disallow_untyped_args = True [mypy.tests.*] disallow_untyped_defs = False +disallow_untyped_args = False From 4334d18bf010a995feeba2ff3fe768772df35c0e Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Fri, 4 Apr 2025 16:01:59 -0300 Subject: [PATCH 35/36] mypy: Remove hallucinated config --- runner/mypy.ini | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/runner/mypy.ini b/runner/mypy.ini index f964e4215..7b63d7a54 100644 --- a/runner/mypy.ini +++ b/runner/mypy.ini @@ -10,7 +10,6 @@ no_implicit_optional = True ignore_missing_imports = True implicit_reexport = True warn_no_return = False -disallow_untyped_args = True disallow_untyped_calls = True [mypy.plugins.numpy.*] @@ -21,9 +20,7 @@ follow_imports = skip # Per-module options: [mypy.app.*] -disallow_untyped_defs = False -disallow_untyped_args = True +disallow_untyped_defs = True [mypy.tests.*] disallow_untyped_defs = False -disallow_untyped_args = False From 0fb4a831dead70312f00996d68bed8bf28f778d6 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Sun, 6 Apr 2025 12:51:55 -0300 Subject: [PATCH 36/36] WIP --- runner/app/main.py | 15 ++++++++++----- runner/app/pipelines/live_video_to_video.py | 2 +- runner/app/pipelines/llm.py | 8 ++++---- runner/mypy.ini | 14 +++++++++++++- 4 files changed, 28 insertions(+), 11 deletions(-) diff --git a/runner/app/main.py b/runner/app/main.py index 3efb3738d..0e25ec050 100644 --- a/runner/app/main.py +++ b/runner/app/main.py @@ -3,20 +3,25 @@ from contextlib import asynccontextmanager from app.routes import health, hardware -from fastapi import FastAPI +from fastapi import FastAPI, APIRouter from fastapi.routing import APIRoute from app.utils.hardware import HardwareInfo +from app.pipelines.base import Pipeline from app.live.log import config_logging from prometheus_client import Gauge, generate_latest, CONTENT_TYPE_LATEST from starlette.responses import Response +class RunnerFastAPI(FastAPI): + hardware_info_service: HardwareInfo + pipeline: Pipeline + config_logging(log_level=logging.DEBUG if os.getenv("VERBOSE_LOGGING")=="1" else logging.INFO) logger = logging.getLogger(__name__) VERSION = Gauge('version', 'Runner version', ['app', 'version']) @asynccontextmanager -async def lifespan(app: FastAPI): +async def lifespan(app: RunnerFastAPI): # Create application wide hardware info service. app.hardware_info_service = HardwareInfo() @@ -39,7 +44,7 @@ async def lifespan(app: FastAPI): logger.info("Shutting down") -def load_pipeline(pipeline: str, model_id: str) -> any: +def load_pipeline(pipeline: str, model_id: str) -> Pipeline: match pipeline: case "text-to-image": from app.pipelines.text_to_image import TextToImagePipeline @@ -89,7 +94,7 @@ def load_pipeline(pipeline: str, model_id: str) -> any: ) -def load_route(pipeline: str) -> any: +def load_route(pipeline: str) -> APIRouter: match pipeline: case "text-to-image": from app.routes import text_to_image @@ -143,7 +148,7 @@ def use_route_names_as_operation_ids(app: FastAPI) -> None: route.operation_id = route.name -app = FastAPI(lifespan=lifespan) +app = RunnerFastAPI(lifespan=lifespan) @app.get("/metrics", include_in_schema=False) async def metrics(): diff --git a/runner/app/pipelines/live_video_to_video.py b/runner/app/pipelines/live_video_to_video.py index 2c736faa7..dfd7054da 100644 --- a/runner/app/pipelines/live_video_to_video.py +++ b/runner/app/pipelines/live_video_to_video.py @@ -252,7 +252,7 @@ def read_proc_as_map(path: str) -> dict | str: with open(path, "r") as f: return f.read() - os_proc_info = {} + os_proc_info: dict[str, dict | str] = {} for proc_file in ["status", "wchan", "io"]: try: path = f"/proc/{pid}/{proc_file}" diff --git a/runner/app/pipelines/llm.py b/runner/app/pipelines/llm.py index b60fec53e..53c4a7d1c 100644 --- a/runner/app/pipelines/llm.py +++ b/runner/app/pipelines/llm.py @@ -172,7 +172,7 @@ async def generate( self, messages: List[Dict[str, str]], generation_config: Optional[GenerationConfig] = None, - ) -> AsyncGenerator[Dict[str, Any], None]: + ) -> AsyncGenerator[LLMResponse, None]: """Internal generation method""" start_time = time.time() config = generation_config or GenerationConfig() @@ -248,7 +248,7 @@ async def generate( duration = end_time - start_time logger.info(f"Generation completed in {duration:.2f}s") logger.info( - f" Time to first token: {(first_token_time - start_time):.2f} seconds") + f" Time to first token: {((first_token_time or 0) - start_time):.2f} seconds") logger.info(f" Total tokens: {total_tokens}") logger.info(f" Prompt tokens: {input_tokens}") logger.info(f" Generated tokens: {total_tokens}") @@ -288,9 +288,9 @@ async def generate( async def __call__( self, - messages: List[Dict[str, str]], + messages: List[Dict[str, str]] = [], **kwargs - ) -> AsyncGenerator[Union[str, Dict[str, Any]], None]: + ) -> AsyncGenerator[LLMResponse, None]: """ Generate responses for messages. diff --git a/runner/mypy.ini b/runner/mypy.ini index 7b63d7a54..f5e14b82d 100644 --- a/runner/mypy.ini +++ b/runner/mypy.ini @@ -10,7 +10,7 @@ no_implicit_optional = True ignore_missing_imports = True implicit_reexport = True warn_no_return = False -disallow_untyped_calls = True +disallow_untyped_calls = False [mypy.plugins.numpy.*] follow_imports = skip @@ -22,5 +22,17 @@ follow_imports = skip [mypy.app.*] disallow_untyped_defs = True +# Stricter typing specifically for the live folder +[mypy.app.live.*] +mypy_path = $MYPY_CONFIG_FILE_DIR/app/live +namespace_packages = True +disallow_untyped_defs = True +disallow_any_generics = True +disallow_incomplete_defs = True +disallow_untyped_calls = True +warn_redundant_casts = True +warn_return_any = True +warn_unreachable = True + [mypy.tests.*] disallow_untyped_defs = False