From 5b700b4176405e7b6965ddabe82dba866448f716 Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Mon, 14 Apr 2025 16:00:26 +0000 Subject: [PATCH 01/11] update comfyui.py to use comfystream pipeline --- runner/app/live/pipelines/comfyui.py | 38 ++++++++-------------- runner/docker/Dockerfile.live-base-comfyui | 2 +- 2 files changed, 15 insertions(+), 25 deletions(-) diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index c06c2e0e3..cb9f58d1c 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -1,7 +1,6 @@ import os import json import torch -import asyncio import numpy as np from PIL import Image from typing import Union @@ -9,7 +8,7 @@ import pathlib from .interface import Pipeline -from comfystream.client import ComfyStreamClient +from comfystream.pipeline import Pipeline as ComfyStreamPipeline from trickle import VideoFrame, VideoOutput import logging @@ -52,52 +51,43 @@ def validate_prompt(cls, v) -> dict: class ComfyUI(Pipeline): def __init__(self): comfy_ui_workspace = os.getenv(COMFY_UI_WORKSPACE_ENV) - self.client = ComfyStreamClient(cwd=comfy_ui_workspace) + self.pipeline = ComfyStreamPipeline(width=512, height=512, cwd=comfy_ui_workspace) self.params: ComfyUIParams - self.video_incoming_frames: asyncio.Queue[VideoOutput] = asyncio.Queue() async def initialize(self, **params): new_params = ComfyUIParams(**params) logging.info(f"Initializing ComfyUI Pipeline with prompt: {new_params.prompt}") - # TODO: currently its a single prompt, but need to support multiple prompts - await self.client.set_prompts([new_params.prompt]) + await self.pipeline.set_prompts([new_params.prompt]) self.params = new_params # Warm up the pipeline - dummy_frame = VideoFrame(None, 0, 0) - dummy_frame.side_data.input = torch.randn(1, 512, 512, 3) - - for _ in range(WARMUP_RUNS): - self.client.put_video_input(dummy_frame) - _ = await self.client.get_video_output() + await self.pipeline.warm_video() logging.info("Pipeline initialization and warmup complete") async def put_video_frame(self, frame: VideoFrame, request_id: str): + # Convert VideoFrame to format expected by comfystream image_np = np.array(frame.image.convert("RGB")).astype(np.float32) / 255.0 frame.side_data.input = torch.tensor(image_np).unsqueeze(0) frame.side_data.skipped = True - self.client.put_video_input(frame) - await self.video_incoming_frames.put(VideoOutput(frame, request_id)) - - async def get_processed_video_frame(self): - result_tensor = await self.client.get_video_output() - out = await self.video_incoming_frames.get() - while out.frame.side_data.skipped: - out = await self.video_incoming_frames.get() + frame.side_data.request_id = request_id + await self.pipeline.put_video_frame(frame) + async def get_processed_video_frame(self) -> VideoOutput: + processed_frame = await self.pipeline.get_processed_video_frame() + # Convert back to VideoOutput format + result_tensor = processed_frame.side_data.input result_tensor = result_tensor.squeeze(0) result_image_np = (result_tensor * 255).byte() result_image = Image.fromarray(result_image_np.cpu().numpy()) - return out.replace_image(result_image) + return VideoOutput(processed_frame, processed_frame.side_data.request_id).replace_image(result_image) async def update_params(self, **params): new_params = ComfyUIParams(**params) logging.info(f"Updating ComfyUI Pipeline Prompt: {new_params.prompt}") - # TODO: currently its a single prompt, but need to support multiple prompts - await self.client.update_prompts([new_params.prompt]) + await self.pipeline.update_prompts([new_params.prompt]) self.params = new_params async def stop(self): logging.info("Stopping ComfyUI pipeline") - await self.client.cleanup() + await self.pipeline.cleanup() logging.info("ComfyUI pipeline stopped") diff --git a/runner/docker/Dockerfile.live-base-comfyui b/runner/docker/Dockerfile.live-base-comfyui index 74b1b2811..ab9c874e4 100644 --- a/runner/docker/Dockerfile.live-base-comfyui +++ b/runner/docker/Dockerfile.live-base-comfyui @@ -1,4 +1,4 @@ -ARG BASE_IMAGE=livepeer/comfyui-base@sha256:4435bad85c3a2fce2b491135bee49eedb8edbd8bdf5d124cb0a95a1d4ecb6856 +ARG BASE_IMAGE=livepeer/comfyui-base:feat-refactor-package-export FROM ${BASE_IMAGE} # ----------------------------------------------------------------------------- From 53a0beb1ccaac8282e9079b2bab94ea96143533f Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Mon, 14 Apr 2025 21:09:48 +0000 Subject: [PATCH 02/11] poc comfystream.pipeline implementation --- runner/app/live/pipelines/comfyui.py | 52 ++++++++++++++++++-------- runner/app/live/pipelines/interface.py | 4 +- runner/app/live/pipelines/noop.py | 4 +- runner/app/live/streamer/process.py | 8 ++-- runner/app/live/streamer/streamer.py | 7 ++-- 5 files changed, 48 insertions(+), 27 deletions(-) diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index cb9f58d1c..29a99c4a4 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -6,10 +6,11 @@ from typing import Union from pydantic import BaseModel, field_validator import pathlib +import av from .interface import Pipeline from comfystream.pipeline import Pipeline as ComfyStreamPipeline -from trickle import VideoFrame, VideoOutput +from trickle import VideoFrame, VideoOutput, AudioFrame, AudioOutput import logging @@ -64,22 +65,22 @@ async def initialize(self, **params): await self.pipeline.warm_video() logging.info("Pipeline initialization and warmup complete") + async def put_video_frame(self, frame: VideoFrame, request_id: str): - # Convert VideoFrame to format expected by comfystream - image_np = np.array(frame.image.convert("RGB")).astype(np.float32) / 255.0 - frame.side_data.input = torch.tensor(image_np).unsqueeze(0) - frame.side_data.skipped = True - frame.side_data.request_id = request_id - await self.pipeline.put_video_frame(frame) - - async def get_processed_video_frame(self) -> VideoOutput: - processed_frame = await self.pipeline.get_processed_video_frame() - # Convert back to VideoOutput format - result_tensor = processed_frame.side_data.input - result_tensor = result_tensor.squeeze(0) - result_image_np = (result_tensor * 255).byte() - result_image = Image.fromarray(result_image_np.cpu().numpy()) - return VideoOutput(processed_frame, processed_frame.side_data.request_id).replace_image(result_image) + await self.pipeline.put_video_frame(self._convert_to_av_frame(frame)) + + async def put_audio_frame(self, frame: AudioFrame, request_id: str): + await self.pipeline.put_audio_frame(self._convert_to_av_frame(frame)) + + async def get_processed_video_frame(self, request_id: str) -> VideoOutput: + av_frame = await self.pipeline.get_processed_video_frame() + video_frame = VideoFrame.from_av_video(av_frame) + video_frame.side_data.request_id = request_id + return VideoOutput(video_frame).replace_image(av_frame.to_image()) + + async def get_processed_audio_frame(self, request_id: str) -> AudioOutput: + av_frame = await self.pipeline.get_processed_audio_frame() + return AudioOutput(av_frame, request_id) async def update_params(self, **params): new_params = ComfyUIParams(**params) @@ -91,3 +92,22 @@ async def stop(self): logging.info("Stopping ComfyUI pipeline") await self.pipeline.cleanup() logging.info("ComfyUI pipeline stopped") + + def _convert_to_av_frame(self, frame: Union[VideoFrame, AudioFrame]) -> Union[av.VideoFrame, av.AudioFrame]: + """Convert trickle frame to av frame""" + if isinstance(frame, VideoFrame): + av_frame = av.VideoFrame.from_ndarray( + np.array(frame.image.convert("RGB")), + format='rgb24' + ) + elif isinstance(frame, AudioFrame): + av_frame = av.AudioFrame.from_ndarray( + frame.samples.reshape(-1, 1), + layout='mono', + rate=frame.rate + ) + + # Common frame properties + av_frame.pts = frame.timestamp + av_frame.time_base = frame.time_base + return av_frame diff --git a/runner/app/live/pipelines/interface.py b/runner/app/live/pipelines/interface.py index eebc88aba..ca678a5a7 100644 --- a/runner/app/live/pipelines/interface.py +++ b/runner/app/live/pipelines/interface.py @@ -23,7 +23,7 @@ def __init__(self, **params): pass @abstractmethod - async def put_video_frame(self, frame: VideoFrame, request_id: str): + async def put_video_frame(self, frame: VideoFrame): """Put a frame into the pipeline. Args: @@ -32,7 +32,7 @@ async def put_video_frame(self, frame: VideoFrame, request_id: str): pass @abstractmethod - async def get_processed_video_frame(self) -> VideoOutput: + async def get_processed_video_frame(self, request_id: str) -> VideoOutput: """Get a processed frame from the pipeline. Returns: diff --git a/runner/app/live/pipelines/noop.py b/runner/app/live/pipelines/noop.py index a2b603d5c..c0e280c00 100644 --- a/runner/app/live/pipelines/noop.py +++ b/runner/app/live/pipelines/noop.py @@ -13,10 +13,10 @@ def __init__(self): async def put_video_frame(self, frame: VideoFrame, request_id: str): await self.frame_queue.put(VideoOutput(frame, request_id)) - async def get_processed_video_frame(self) -> VideoOutput: + async def get_processed_video_frame(self, request_id: str) -> VideoOutput: out = await self.frame_queue.get() processed_frame = out.image.convert("RGB") - return out.replace_image(processed_frame) + return VideoOutput(out.frame.replace_image(processed_frame), request_id) async def initialize(self, **params): logging.info(f"Initializing Noop pipeline with params: {params}") diff --git a/runner/app/live/streamer/process.py b/runner/app/live/streamer/process.py index dcdca0b4e..bebad6cd7 100644 --- a/runner/app/live/streamer/process.py +++ b/runner/app/live/streamer/process.py @@ -113,7 +113,7 @@ def get_recent_logs(self, n=None) -> list[str]: def process_loop(self): self._setup_logging() - pipeline = None +# pipeline = None # Ensure CUDA environment is available inside the subprocess. # Multiprocessing (spawn mode) does not inherit environment variables by default, @@ -150,7 +150,7 @@ async def _initialize_pipeline(self): params = {} try: params = self.param_update_queue.get_nowait() - logging.info(f"PipelineProcess: Got params from param_update_queue {params}") + logging.info(f"PipelineProcess: Got params from param_update_queue {params}") params = self._handle_logging_params(params) except queue.Empty: logging.info("PipelineProcess: No params found in param_update_queue, loading with default params") @@ -210,8 +210,8 @@ async def _input_loop(self, pipeline: Pipeline): async def _output_loop(self, pipeline: Pipeline): while not self.is_done(): try: - output_frame = await pipeline.get_processed_video_frame() - output_frame.log_timestamps["post_process_frame"] = time.time() + output_frame = await pipeline.get_processed_video_frame(self.request_id) + #output_frame.log_timestamps["post_process_frame"] = time.time() await asyncio.to_thread(self.output_queue.put, output_frame) except Exception as e: self._report_error(f"Error processing output frame: {e}") diff --git a/runner/app/live/streamer/streamer.py b/runner/app/live/streamer/streamer.py index ae6f902af..8651885c4 100644 --- a/runner/app/live/streamer/streamer.py +++ b/runner/app/live/streamer/streamer.py @@ -54,12 +54,13 @@ async def start(self, params: dict): self.main_tasks = [ run_in_background("ingress_loop", self.run_ingress_loop()), run_in_background("egress_loop", self.run_egress_loop()), - run_in_background("report_status_loop", self.report_status_loop()), - run_in_background("control_loop", self.run_control_loop()), + run_in_background("report_status_loop", self.report_status_loop()) ] # auxiliary tasks that are not critical to the supervisor, but which we want to run # TODO: maybe remove this since we had to move the control loop to main tasks - self.auxiliary_tasks: list[asyncio.Task] = [] + self.auxiliary_tasks = [ + run_in_background("control_loop", self.run_control_loop()) + ] self.tasks_supervisor_task = run_in_background( "tasks_supervisor", self.tasks_supervisor() ) From c1a8f63d1b720be23f25057bc778aaef865bd9fe Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Tue, 15 Apr 2025 12:27:41 -0400 Subject: [PATCH 03/11] revert control loop and move pipeline = None --- runner/app/live/streamer/process.py | 3 ++- runner/app/live/streamer/streamer.py | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/runner/app/live/streamer/process.py b/runner/app/live/streamer/process.py index bebad6cd7..523e18024 100644 --- a/runner/app/live/streamer/process.py +++ b/runner/app/live/streamer/process.py @@ -113,7 +113,6 @@ def get_recent_logs(self, n=None) -> list[str]: def process_loop(self): self._setup_logging() -# pipeline = None # Ensure CUDA environment is available inside the subprocess. # Multiprocessing (spawn mode) does not inherit environment variables by default, @@ -146,6 +145,7 @@ def _handle_logging_params(self, params: dict) -> dict: async def _initialize_pipeline(self): try: + pipeline = None stream_id = "" params = {} try: @@ -177,6 +177,7 @@ async def _run_pipeline_loops(self): output_task = asyncio.create_task(self._output_loop(pipeline)) param_task = asyncio.create_task(self._param_update_loop(pipeline)) + async def wait_for_stop(): while not self.is_done(): await asyncio.sleep(0.1) diff --git a/runner/app/live/streamer/streamer.py b/runner/app/live/streamer/streamer.py index 8651885c4..b857754d7 100644 --- a/runner/app/live/streamer/streamer.py +++ b/runner/app/live/streamer/streamer.py @@ -54,12 +54,13 @@ async def start(self, params: dict): self.main_tasks = [ run_in_background("ingress_loop", self.run_ingress_loop()), run_in_background("egress_loop", self.run_egress_loop()), - run_in_background("report_status_loop", self.report_status_loop()) + run_in_background("report_status_loop", self.report_status_loop()), + run_in_background("control_loop", self.run_control_loop()) ] # auxiliary tasks that are not critical to the supervisor, but which we want to run # TODO: maybe remove this since we had to move the control loop to main tasks self.auxiliary_tasks = [ - run_in_background("control_loop", self.run_control_loop()) + ] self.tasks_supervisor_task = run_in_background( "tasks_supervisor", self.tasks_supervisor() From 979e6e75bff7cdf2e3a6de22bebbbb0de6d41c97 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Sat, 3 May 2025 21:04:05 +0000 Subject: [PATCH 04/11] pass request id to frames --- runner/app/live/pipelines/comfyui.py | 28 ++++++++++++-------------- runner/app/live/pipelines/interface.py | 4 ++-- runner/app/live/streamer/process.py | 7 +++---- runner/app/live/trickle/frame.py | 1 + 4 files changed, 19 insertions(+), 21 deletions(-) diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index 29a99c4a4..b115f05c7 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -52,45 +52,43 @@ def validate_prompt(cls, v) -> dict: class ComfyUI(Pipeline): def __init__(self): comfy_ui_workspace = os.getenv(COMFY_UI_WORKSPACE_ENV) - self.pipeline = ComfyStreamPipeline(width=512, height=512, cwd=comfy_ui_workspace) + self.comfystream = ComfyStreamPipeline(width=512, height=512, cwd=comfy_ui_workspace) self.params: ComfyUIParams async def initialize(self, **params): new_params = ComfyUIParams(**params) logging.info(f"Initializing ComfyUI Pipeline with prompt: {new_params.prompt}") - await self.pipeline.set_prompts([new_params.prompt]) + await self.comfystream.set_prompts([new_params.prompt]) self.params = new_params # Warm up the pipeline - await self.pipeline.warm_video() + await self.comfystream.warm_video() logging.info("Pipeline initialization and warmup complete") - async def put_video_frame(self, frame: VideoFrame, request_id: str): - await self.pipeline.put_video_frame(self._convert_to_av_frame(frame)) + await self.comfystream.put_video_frame(self._convert_to_av_frame(frame), request_id) async def put_audio_frame(self, frame: AudioFrame, request_id: str): - await self.pipeline.put_audio_frame(self._convert_to_av_frame(frame)) + await self.comfystream.put_audio_frame(self._convert_to_av_frame(frame), request_id) - async def get_processed_video_frame(self, request_id: str) -> VideoOutput: - av_frame = await self.pipeline.get_processed_video_frame() + async def get_processed_video_frame(self) -> VideoOutput: + av_frame = await self.comfystream.get_processed_video_frame() video_frame = VideoFrame.from_av_video(av_frame) - video_frame.side_data.request_id = request_id - return VideoOutput(video_frame).replace_image(av_frame.to_image()) + return VideoOutput(video_frame, av_frame.side_data.request_id).replace_image(av_frame.to_image()) - async def get_processed_audio_frame(self, request_id: str) -> AudioOutput: - av_frame = await self.pipeline.get_processed_audio_frame() - return AudioOutput(av_frame, request_id) + async def get_processed_audio_frame(self) -> AudioOutput: + av_frame = await self.comfystream.get_processed_audio_frame() + return AudioOutput(av_frame) async def update_params(self, **params): new_params = ComfyUIParams(**params) logging.info(f"Updating ComfyUI Pipeline Prompt: {new_params.prompt}") - await self.pipeline.update_prompts([new_params.prompt]) + await self.comfystream.update_prompts([new_params.prompt]) self.params = new_params async def stop(self): logging.info("Stopping ComfyUI pipeline") - await self.pipeline.cleanup() + await self.comfystream.cleanup() logging.info("ComfyUI pipeline stopped") def _convert_to_av_frame(self, frame: Union[VideoFrame, AudioFrame]) -> Union[av.VideoFrame, av.AudioFrame]: diff --git a/runner/app/live/pipelines/interface.py b/runner/app/live/pipelines/interface.py index ca678a5a7..eebc88aba 100644 --- a/runner/app/live/pipelines/interface.py +++ b/runner/app/live/pipelines/interface.py @@ -23,7 +23,7 @@ def __init__(self, **params): pass @abstractmethod - async def put_video_frame(self, frame: VideoFrame): + async def put_video_frame(self, frame: VideoFrame, request_id: str): """Put a frame into the pipeline. Args: @@ -32,7 +32,7 @@ async def put_video_frame(self, frame: VideoFrame): pass @abstractmethod - async def get_processed_video_frame(self, request_id: str) -> VideoOutput: + async def get_processed_video_frame(self) -> VideoOutput: """Get a processed frame from the pipeline. Returns: diff --git a/runner/app/live/streamer/process.py b/runner/app/live/streamer/process.py index 523e18024..7ffe3b40c 100644 --- a/runner/app/live/streamer/process.py +++ b/runner/app/live/streamer/process.py @@ -150,7 +150,7 @@ async def _initialize_pipeline(self): params = {} try: params = self.param_update_queue.get_nowait() - logging.info(f"PipelineProcess: Got params from param_update_queue {params}") + logging.info(f"PipelineProcess: Got params from param_update_queue {params}") params = self._handle_logging_params(params) except queue.Empty: logging.info("PipelineProcess: No params found in param_update_queue, loading with default params") @@ -177,7 +177,6 @@ async def _run_pipeline_loops(self): output_task = asyncio.create_task(self._output_loop(pipeline)) param_task = asyncio.create_task(self._param_update_loop(pipeline)) - async def wait_for_stop(): while not self.is_done(): await asyncio.sleep(0.1) @@ -211,8 +210,8 @@ async def _input_loop(self, pipeline: Pipeline): async def _output_loop(self, pipeline: Pipeline): while not self.is_done(): try: - output_frame = await pipeline.get_processed_video_frame(self.request_id) - #output_frame.log_timestamps["post_process_frame"] = time.time() + output_frame = await pipeline.get_processed_video_frame() + output_frame.log_timestamps["post_process_frame"] = time.time() await asyncio.to_thread(self.output_queue.put, output_frame) except Exception as e: self._report_error(f"Error processing output frame: {e}") diff --git a/runner/app/live/trickle/frame.py b/runner/app/live/trickle/frame.py index 9101a8e4d..963d1585a 100644 --- a/runner/app/live/trickle/frame.py +++ b/runner/app/live/trickle/frame.py @@ -10,6 +10,7 @@ class SideData: """ skipped: bool = True input: Image.Image | np.ndarray | None + request_id: str = '' class InputFrame: """ From cb4f5afec36e7ffdd6c87a4b4164dcb9db2dd73f Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Sat, 3 May 2025 21:04:55 +0000 Subject: [PATCH 05/11] bump comfyui-base tag --- runner/docker/Dockerfile.live-base-comfyui | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/docker/Dockerfile.live-base-comfyui b/runner/docker/Dockerfile.live-base-comfyui index ab9c874e4..a2d500192 100644 --- a/runner/docker/Dockerfile.live-base-comfyui +++ b/runner/docker/Dockerfile.live-base-comfyui @@ -1,4 +1,4 @@ -ARG BASE_IMAGE=livepeer/comfyui-base:feat-refactor-package-export +ARG BASE_IMAGE=livepeer/comfyui-base:v0.1.0 FROM ${BASE_IMAGE} # ----------------------------------------------------------------------------- From 8568ae3072f9aaf5c4ef3df3eb3e568efae9659e Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Tue, 13 May 2025 21:34:16 +0000 Subject: [PATCH 06/11] update openapi spec --- runner/gateway.openapi.yaml | 13 +++++-------- runner/openapi.yaml | 13 +++++-------- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/runner/gateway.openapi.yaml b/runner/gateway.openapi.yaml index 2a84a1d03..1e7ddac8d 100644 --- a/runner/gateway.openapi.yaml +++ b/runner/gateway.openapi.yaml @@ -525,8 +525,7 @@ components: AudioResponse: properties: audio: - allOf: - - $ref: '#/components/schemas/MediaURL' + $ref: '#/components/schemas/MediaURL' description: The generated audio. type: object required: @@ -827,8 +826,7 @@ components: HTTPError: properties: detail: - allOf: - - $ref: '#/components/schemas/APIError' + $ref: '#/components/schemas/APIError' description: Detailed error information. type: object required: @@ -878,11 +876,9 @@ components: title: Finish Reason default: '' delta: - allOf: - - $ref: '#/components/schemas/LLMMessage' + $ref: '#/components/schemas/LLMMessage' message: - allOf: - - $ref: '#/components/schemas/LLMMessage' + $ref: '#/components/schemas/LLMMessage' type: object required: - index @@ -1010,6 +1006,7 @@ components: argument. default: '' params: + additionalProperties: true type: object title: Params description: Initial parameters for the pipeline. diff --git a/runner/openapi.yaml b/runner/openapi.yaml index fc24c15fc..466ff79c4 100644 --- a/runner/openapi.yaml +++ b/runner/openapi.yaml @@ -569,8 +569,7 @@ components: AudioResponse: properties: audio: - allOf: - - $ref: '#/components/schemas/MediaURL' + $ref: '#/components/schemas/MediaURL' description: The generated audio. type: object required: @@ -930,8 +929,7 @@ components: HTTPError: properties: detail: - allOf: - - $ref: '#/components/schemas/APIError' + $ref: '#/components/schemas/APIError' description: Detailed error information. type: object required: @@ -1035,11 +1033,9 @@ components: title: Finish Reason default: '' delta: - allOf: - - $ref: '#/components/schemas/LLMMessage' + $ref: '#/components/schemas/LLMMessage' message: - allOf: - - $ref: '#/components/schemas/LLMMessage' + $ref: '#/components/schemas/LLMMessage' type: object required: - index @@ -1167,6 +1163,7 @@ components: argument. default: '' params: + additionalProperties: true type: object title: Params description: Initial parameters for the pipeline. From b2cbb69677c570e0c85c6c5fc93f74dffa7da526 Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Tue, 13 May 2025 21:34:27 +0000 Subject: [PATCH 07/11] bump comfyui-base --- runner/docker/Dockerfile.live-base-comfyui | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/docker/Dockerfile.live-base-comfyui b/runner/docker/Dockerfile.live-base-comfyui index a2d500192..b37320187 100644 --- a/runner/docker/Dockerfile.live-base-comfyui +++ b/runner/docker/Dockerfile.live-base-comfyui @@ -1,4 +1,4 @@ -ARG BASE_IMAGE=livepeer/comfyui-base:v0.1.0 +ARG BASE_IMAGE=livepeer/comfyui-base@sha256:3cc56a54ce5972bde1747210ba122a100733ec32127278e932010bcafd701889 FROM ${BASE_IMAGE} # ----------------------------------------------------------------------------- From 45ded00b929c228eb5478275c7894c0a96af4d3b Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Tue, 13 May 2025 22:08:21 -0400 Subject: [PATCH 08/11] Revert "update openapi spec" This reverts commit 999d268e7ef9a6be159d724b0ffabc29d44df144. --- runner/gateway.openapi.yaml | 13 ++++++++----- runner/openapi.yaml | 13 ++++++++----- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/runner/gateway.openapi.yaml b/runner/gateway.openapi.yaml index 1e7ddac8d..2a84a1d03 100644 --- a/runner/gateway.openapi.yaml +++ b/runner/gateway.openapi.yaml @@ -525,7 +525,8 @@ components: AudioResponse: properties: audio: - $ref: '#/components/schemas/MediaURL' + allOf: + - $ref: '#/components/schemas/MediaURL' description: The generated audio. type: object required: @@ -826,7 +827,8 @@ components: HTTPError: properties: detail: - $ref: '#/components/schemas/APIError' + allOf: + - $ref: '#/components/schemas/APIError' description: Detailed error information. type: object required: @@ -876,9 +878,11 @@ components: title: Finish Reason default: '' delta: - $ref: '#/components/schemas/LLMMessage' + allOf: + - $ref: '#/components/schemas/LLMMessage' message: - $ref: '#/components/schemas/LLMMessage' + allOf: + - $ref: '#/components/schemas/LLMMessage' type: object required: - index @@ -1006,7 +1010,6 @@ components: argument. default: '' params: - additionalProperties: true type: object title: Params description: Initial parameters for the pipeline. diff --git a/runner/openapi.yaml b/runner/openapi.yaml index 466ff79c4..fc24c15fc 100644 --- a/runner/openapi.yaml +++ b/runner/openapi.yaml @@ -569,7 +569,8 @@ components: AudioResponse: properties: audio: - $ref: '#/components/schemas/MediaURL' + allOf: + - $ref: '#/components/schemas/MediaURL' description: The generated audio. type: object required: @@ -929,7 +930,8 @@ components: HTTPError: properties: detail: - $ref: '#/components/schemas/APIError' + allOf: + - $ref: '#/components/schemas/APIError' description: Detailed error information. type: object required: @@ -1033,9 +1035,11 @@ components: title: Finish Reason default: '' delta: - $ref: '#/components/schemas/LLMMessage' + allOf: + - $ref: '#/components/schemas/LLMMessage' message: - $ref: '#/components/schemas/LLMMessage' + allOf: + - $ref: '#/components/schemas/LLMMessage' type: object required: - index @@ -1163,7 +1167,6 @@ components: argument. default: '' params: - additionalProperties: true type: object title: Params description: Initial parameters for the pipeline. From 131c2ce818d02daf63bb8c0acecec66d14293248 Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Wed, 14 May 2025 20:47:22 +0000 Subject: [PATCH 09/11] fix docker tag --- runner/docker/Dockerfile.live-base-comfyui | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/docker/Dockerfile.live-base-comfyui b/runner/docker/Dockerfile.live-base-comfyui index b37320187..31733ad05 100644 --- a/runner/docker/Dockerfile.live-base-comfyui +++ b/runner/docker/Dockerfile.live-base-comfyui @@ -1,4 +1,4 @@ -ARG BASE_IMAGE=livepeer/comfyui-base@sha256:3cc56a54ce5972bde1747210ba122a100733ec32127278e932010bcafd701889 +ARG BASE_IMAGE=livepeer/comfyui-base@sha256:9364f249297ea9aebc053f361a41efa78ce0fbbb15b73b4a73215f3edb8ada2f FROM ${BASE_IMAGE} # ----------------------------------------------------------------------------- From c65ed9bc615ddd9f9b92b25eb3df0bed4104db86 Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Wed, 14 May 2025 22:40:07 +0000 Subject: [PATCH 10/11] fix noop --- runner/app/live/pipelines/noop.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runner/app/live/pipelines/noop.py b/runner/app/live/pipelines/noop.py index c0e280c00..35d6a8ca6 100644 --- a/runner/app/live/pipelines/noop.py +++ b/runner/app/live/pipelines/noop.py @@ -13,10 +13,10 @@ def __init__(self): async def put_video_frame(self, frame: VideoFrame, request_id: str): await self.frame_queue.put(VideoOutput(frame, request_id)) - async def get_processed_video_frame(self, request_id: str) -> VideoOutput: + async def get_processed_video_frame(self) -> VideoOutput: out = await self.frame_queue.get() processed_frame = out.image.convert("RGB") - return VideoOutput(out.frame.replace_image(processed_frame), request_id) + return VideoOutput(out.frame.replace_image(processed_frame), out.request_id) async def initialize(self, **params): logging.info(f"Initializing Noop pipeline with params: {params}") From dd1844103d88e2063e5cc7d2b0b88dc8b61296e2 Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Thu, 15 May 2025 01:08:34 +0000 Subject: [PATCH 11/11] request_id cleanup --- runner/app/live/pipelines/comfyui.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index b115f05c7..942ab0f10 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -74,11 +74,12 @@ async def put_audio_frame(self, frame: AudioFrame, request_id: str): async def get_processed_video_frame(self) -> VideoOutput: av_frame = await self.comfystream.get_processed_video_frame() video_frame = VideoFrame.from_av_video(av_frame) - return VideoOutput(video_frame, av_frame.side_data.request_id).replace_image(av_frame.to_image()) + return VideoOutput(video_frame, av_frame.side_data.request_id) async def get_processed_audio_frame(self) -> AudioOutput: av_frame = await self.comfystream.get_processed_audio_frame() - return AudioOutput(av_frame) + audio_frame = AudioFrame.from_av_audio(av_frame) + return AudioOutput(audio_frame, av_frame.side_data.request_id) async def update_params(self, **params): new_params = ComfyUIParams(**params)