diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 902d66cac..dc5a5eb03 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -1,34 +1,52 @@ // For format details, see https://aka.ms/devcontainer.json. For config options, see the // README at: https://github.com/devcontainers/templates/tree/main/src/ubuntu { - "name": "ai-runner", - // Image to use for the dev container. More info: https://containers.dev/guide/dockerfile. - "build": { - "dockerfile": "../Dockerfile", - // "dockerfile": "../docker/Dockerfile.text_to_speech", - "context": ".." - }, - "runArgs": [ - "--gpus=all" - ], - // Features to add to the dev container. More info: https://containers.dev/features. - // Configure tool-specific properties. - "customizations": { - "vscode": { - "settings": {}, - "extensions": [ - "ms-python.python", - "ms-python.black-formatter" - ] - } - }, - // Use 'forwardPorts' to make a list of ports inside the container available locally. - "forwardPorts": [ - 8000 - ], - // Use 'mounts' to make a list of local folders available inside the container. - "mounts": [ - // "source=${localWorkspaceFolder}/models,target=/models,type=bind" - "source=${localEnv:HOME}/.lpData/models,target=/models,type=bind" - ] + "name": "ai-runner", + "initializeCommand": "ls", + // Image to use for the dev container. More info: https://containers.dev/guide/dockerfile. + "containerEnv": { + "PIPELINE": "comfyui" + }, + "build": { + "dockerfile": "../runner/docker/Dockerfile.live-app__PIPELINE__", + "args": { + "PIPELINE": "comfyui" + }, + // "dockerfile": "../Dockerfile", + // "dockerfile": "../docker/Dockerfile.text_to_speech", + "context": "../runner" + }, + "runArgs": [ + "--gpus=all" + ], + // Features to add to the dev container. More info: https://containers.dev/features. + // Configure tool-specific properties. 
+ "customizations": { + "vscode": { + "settings": { + "python.defaultInterpreterPath": "/workspace/miniconda3/envs/comfystream/bin/python", + "python.venvPath": "/workspace/miniconda3/envs", + "python.terminal.activateEnvInCurrentTerminal": false, + "python.terminal.activateEnvironment": true, + "terminal.integrated.shellIntegration.enabled": true + }, + "extensions": [ + "ms-python.python", + "ms-python.black-formatter" + ] + } + }, + // Use 'forwardPorts' to make a list of ports inside the container available locally. + "forwardPorts": [ + 8000 + ], + "appPort": [ + "8000:8000" + ], + // Use 'mounts' to make a list of local folders available inside the container. + "mounts": [ + // "source=${localWorkspaceFolder}/models,target=/models,type=bind" + "source=${localEnv:HOME}/models/ComfyUI--models/,target=/workspace/ComfyUI/models,type=bind", + "source=${localEnv:HOME}/models/ComfyUI--output/,target=/workspace/ComfyUI/output,type=bind" + ] } diff --git a/.github/workflows/ai-runner-live-pipelines-docker.yaml b/.github/workflows/ai-runner-live-pipelines-docker.yaml index 4482bbe07..a24f210e5 100644 --- a/.github/workflows/ai-runner-live-pipelines-docker.yaml +++ b/.github/workflows/ai-runner-live-pipelines-docker.yaml @@ -39,7 +39,7 @@ jobs: uses: tj-actions/changed-files@823fcebdb31bb35fdf2229d9f769b400309430d0 # v46.0.3 with: files: | - runner/docker/Dockerfile.live-base + runner/docker/Dockerfile.live-base* - name: Check if build needed id: check_build diff --git a/.github/workflows/ai-runner-test.yaml b/.github/workflows/ai-runner-test.yaml index 12040fe8e..350e92907 100644 --- a/.github/workflows/ai-runner-test.yaml +++ b/.github/workflows/ai-runner-test.yaml @@ -30,6 +30,10 @@ jobs: pip install -r runner/requirements-dev.txt pip install -r runner/requirements.txt + - name: Run mypy type checking + working-directory: runner + run: mypy app/ + - name: Run tests working-directory: runner run: pytest --verbose --showlocals diff --git 
a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index 72d141439..0f1fc9df9 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -1,18 +1,20 @@ import os import json import torch -from PIL import Image import asyncio import numpy as np +from PIL import Image from typing import Union from pydantic import BaseModel, field_validator from .interface import Pipeline from comfystream.client import ComfyStreamClient +from trickle import VideoFrame, VideoOutput import logging COMFY_UI_WORKSPACE_ENV = "COMFY_UI_WORKSPACE" +WARMUP_RUNS = 1 DEFAULT_WORKFLOW_JSON = json.loads(""" { "1": { @@ -263,46 +265,54 @@ def validate_prompt(cls, v) -> dict: class ComfyUI(Pipeline): - def __init__(self, **params): - super().__init__(**params) - + def __init__(self): comfy_ui_workspace = os.getenv(COMFY_UI_WORKSPACE_ENV) self.client = ComfyStreamClient(cwd=comfy_ui_workspace) self.params: ComfyUIParams + self.video_incoming_frames = asyncio.Queue() - self.update_params(**params) + async def warm_video(self): + dummy_frame = VideoFrame(None, 0, 0) + dummy_frame.side_data.input = torch.randn(1, 512, 512, 3) - # Comfy will cache nodes that only need to be run once (i.e. 
a node that loads model weights) - # We can run the prompt once before actual inputs come in to "warmup" - warmup_input = torch.randn(1, 512, 512, 3) - asyncio.get_event_loop().run_until_complete(self.client.queue_prompt(warmup_input)) + for _ in range(WARMUP_RUNS): + self.client.put_video_input(dummy_frame) + _ = await self.client.get_video_output() + logging.info("Video frame warmup done") - def process_frame(self, image: Image.Image) -> Image.Image: - # Normalize by dividing by 255 to ensure the tensor values are between 0 and 1 - image_np = np.array(image.convert("RGB")).astype(np.float32) / 255.0 - # Convert from numpy to torch.Tensor - # Initially, the torch.Tensor will have shape HWC but we want BHWC - # unsqueeze(0) will add a batch dimension at the beginning of 1 which means we just have 1 image - image_tensor = torch.tensor(image_np).unsqueeze(0) + async def put_video_frame(self, frame: VideoFrame): + image_np = np.array(frame.image.convert("RGB")).astype(np.float32) / 255.0 + frame.side_data.input = torch.tensor(image_np).unsqueeze(0) + frame.side_data.skipped = True + self.client.put_video_input(frame) + await self.video_incoming_frames.put(frame) - # Process using ComfyUI pipeline - result_tensor = asyncio.get_event_loop().run_until_complete(self.client.queue_prompt(image_tensor)) + async def get_processed_video_frame(self): + result_tensor = await self.client.get_video_output() + frame = await self.video_incoming_frames.get() + while frame.side_data.skipped: + frame = await self.video_incoming_frames.get() - # Convert back from Tensor to PIL.Image result_tensor = result_tensor.squeeze(0) result_image_np = (result_tensor * 255).byte() result_image = Image.fromarray(result_image_np.cpu().numpy()) - return result_image + return frame.replace_image(result_image) + + async def set_params(self, **params): + new_params = ComfyUIParams(**params) + logging.info(f"Setting ComfyUI Pipeline Prompt: {new_params.prompt}") + # TODO: currently it's a single prompt, 
but need to support multiple prompts + await self.client.set_prompts([new_params.prompt]) + self.params = new_params - def update_params(self, **params): + async def update_params(self, **params): new_params = ComfyUIParams(**params) - logging.info(f"ComfyUI Pipeline Prompt: {new_params.prompt}") - self.client.set_prompt(new_params.prompt) + logging.info(f"Updating ComfyUI Pipeline Prompt: {new_params.prompt}") + # TODO: currently it's a single prompt, but need to support multiple prompts + await self.client.update_prompts([new_params.prompt]) self.params = new_params - #TODO: This is a hack to stop the ComfyStreamClient. Use the comfystream api to stop the client in 0.0.2 async def stop(self): logging.info("Stopping ComfyUI pipeline") - if self.client.comfy_client.is_running: - await self.client.comfy_client.__aexit__(None, None, None) + await self.client.stop() logging.info("ComfyUI pipeline stopped") diff --git a/runner/app/live/pipelines/interface.py b/runner/app/live/pipelines/interface.py index 5832a2f4e..fe3cc2d28 100644 --- a/runner/app/live/pipelines/interface.py +++ b/runner/app/live/pipelines/interface.py @@ -1,5 +1,6 @@ -from abc import ABC, abstractmethod from PIL import Image +from abc import ABC, abstractmethod +from trickle import VideoFrame, VideoOutput class Pipeline(ABC): """Abstract base class for image processing pipelines. @@ -22,21 +23,37 @@ def __init__(self, **params): pass @abstractmethod - def process_frame(self, frame: Image.Image) -> Image.Image: - """Process a single frame through the pipeline. - - Called sequentially with each frame from the stream. + async def put_video_frame(self, frame: VideoFrame): + """Put a frame into the pipeline. Args: - frame: Input PIL Image + frame: Input VideoFrame + """ + pass + + @abstractmethod + async def get_processed_video_frame(self) -> VideoFrame: + """Get a processed frame from the pipeline. 
Returns: - Processed PIL Image + Processed VideoFrame + """ + pass + + @abstractmethod + async def set_params(self, **params): + """Set pipeline parameters initially. + + Must maintain valid state on success or restore previous state on failure. + set_params starts the prompt loops in comfystream. + + Args: + **params: Implementation-specific parameters """ pass @abstractmethod - def update_params(self, **params): + async def update_params(self, **params): """Update pipeline parameters. Must maintain valid state on success or restore previous state on failure. diff --git a/runner/app/live/pipelines/loader.py b/runner/app/live/pipelines/loader.py index 53b523777..41b2f2617 100644 --- a/runner/app/live/pipelines/loader.py +++ b/runner/app/live/pipelines/loader.py @@ -1,10 +1,10 @@ from .interface import Pipeline -def load_pipeline(name: str, **params) -> Pipeline: +def load_pipeline(name: str) -> Pipeline: if name == "comfyui": from .comfyui import ComfyUI - return ComfyUI(**params) + return ComfyUI() elif name == "noop": from .noop import Noop - return Noop(**params) + return Noop() raise ValueError(f"Unknown pipeline: {name}") diff --git a/runner/app/live/pipelines/noop.py b/runner/app/live/pipelines/noop.py index 620ad6710..a69719de0 100644 --- a/runner/app/live/pipelines/noop.py +++ b/runner/app/live/pipelines/noop.py @@ -1,14 +1,31 @@ -from PIL import Image import logging +import asyncio +from PIL import Image + from .interface import Pipeline +from trickle import VideoFrame, VideoOutput class Noop(Pipeline): - def __init__(self, **params): - super().__init__(**params) + def __init__(self): + self.frame_queue = asyncio.Queue() + + async def put_video_frame(self, frame: VideoFrame): + await self.frame_queue.put(frame) - def process_frame(self, image: Image.Image) -> Image.Image: - return image.convert("RGB") + async def get_processed_video_frame(self) -> VideoFrame: + frame = await self.frame_queue.get() + processed_frame = frame.image.convert("RGB") + return 
frame.replace_image(processed_frame) - def update_params(self, **params): + async def warm_video(self): + logging.info("Warming video") + + async def set_params(self, **params): + logging.info(f"Setting params: {params}") + + async def update_params(self, **params): logging.info(f"Updating params: {params}") + + async def stop(self): + logging.info("Stopping pipeline") diff --git a/runner/app/live/streamer/process.py b/runner/app/live/streamer/process.py index 164f4700f..98964f820 100644 --- a/runner/app/live/streamer/process.py +++ b/runner/app/live/streamer/process.py @@ -8,7 +8,7 @@ from typing import Any import torch -from pipelines import load_pipeline +from pipelines import Pipeline, load_pipeline from log import config_logging, config_logging_fields, log_timing from trickle import InputFrame, AudioFrame, VideoFrame, OutputFrame, VideoOutput, AudioOutput @@ -79,6 +79,8 @@ def reset_stream(self, request_id: str, stream_id: str): clear_queue(self.log_queue) self.param_update_queue.put({"request_id": request_id, "stream_id": stream_id}) + # TODO: Once audio is implemented, combine send_input with input_loop + # We don't need additional queueing as comfystream already maintains a queue def send_input(self, frame: InputFrame): self._queue_put_fifo(self.input_queue, frame) @@ -122,93 +124,113 @@ def process_loop(self): # ( load_gpu_models is calling logging.info() for every frame ) logging.getLogger("comfy").setLevel(logging.WARNING) - def report_error(error_msg: str): - error_event = { - "message": error_msg, - "timestamp": time.time() - } - logging.error(error_msg) - self._queue_put_fifo(self.error_queue, error_event) - - def _handle_logging_params(params: dict) -> bool: - if isinstance(params, dict) and "request_id" in params and "stream_id" in params: - logging.info(f"PipelineProcess: Resetting logging fields with request_id={params['request_id']}, stream_id={params['stream_id']}") - self.request_id = params["request_id"] - self._reset_logging_fields( - 
params["request_id"], params["stream_id"] - ) - return True - return False + try: + asyncio.run(self._run_pipeline_loops()) + except Exception as e: + self._report_error(f"Error in process run method: {e}") + + + def _handle_logging_params(self, params: dict) -> bool: + if isinstance(params, dict) and "request_id" in params and "stream_id" in params: + logging.info(f"PipelineProcess: Resetting logging fields with request_id={params['request_id']}, stream_id={params['stream_id']}") + self.request_id = params["request_id"] + self._reset_logging_fields( + params["request_id"], params["stream_id"] + ) + return True + return False + async def _initialize_pipeline(self): try: - stream_id = "" params = {} try: params = self.param_update_queue.get_nowait() - params = {} if _handle_logging_params(params) else params + logging.info(f"PipelineProcess: Got params from param_update_queue {params}") + params = {} if self._handle_logging_params(params) else params except queue.Empty: logging.info("PipelineProcess: No params found in param_update_queue, loading with default params") + + with log_timing(f"PipelineProcess: Pipeline loading with {params}"): + pipeline = load_pipeline(self.pipeline_name) + await pipeline.set_params(**params) + return pipeline + except Exception as e: + self._report_error(f"Error loading pipeline: {e}") + if not params: + self._report_error(f"Pipeline failed to load with default params") + raise + + try: + with log_timing(f"PipelineProcess: Pipeline loading with default params due to error with params: {params}"): + pipeline = load_pipeline(self.pipeline_name) + await pipeline.set_params() + return pipeline except Exception as e: - report_error(f"Error getting params: {e}") + self._report_error(f"Error loading pipeline with default params: {e}") + raise + async def _run_pipeline_loops(self): + pipeline = await self._initialize_pipeline() + await pipeline.warm_video() + input_task = asyncio.create_task(self._input_loop(pipeline)) + output_task = 
asyncio.create_task(self._output_loop(pipeline)) + param_task = asyncio.create_task(self._param_update_loop(pipeline)) + + try: + await asyncio.gather(input_task, output_task, param_task) + except Exception as e: + self._report_error(f"Error in pipeline loops: {e}") + await self._cleanup_pipeline(pipeline) + raise + + async def _input_loop(self, pipeline: Pipeline): + while not self.is_done(): try: - with log_timing(f"PipelineProcess: Pipeline loading with {params}"): - pipeline = load_pipeline(self.pipeline_name, **params) + input_frame = await asyncio.to_thread(self.input_queue.get, timeout=0.1) + if isinstance(input_frame, VideoFrame): + input_frame.log_timestamps["pre_process_frame"] = time.time() + await pipeline.put_video_frame(input_frame) + elif isinstance(input_frame, AudioFrame): + self.output_queue.put(AudioOutput([input_frame], self.request_id)) + # TODO wire in a proper pipeline here + except queue.Empty: + continue except Exception as e: - report_error(f"Error loading pipeline: {e}") - if not params: - report_error(f"Pipeline failed to load with default params") - raise + self._report_error(f"Error processing input frame: {e}") - try: - with log_timing(f"PipelineProcess: Pipeline loading with default params due to error with params: {params}"): - pipeline = load_pipeline(self.pipeline_name) - except Exception as e: - report_error(f"Error loading pipeline with default params: {e}") - raise - - while not self.is_done(): - while not self.param_update_queue.empty(): - params = self.param_update_queue.get_nowait() - try: - logging.info(f"PipelineProcess: Processing parameter update from queue: {params}") - if not _handle_logging_params(params): - logging.info(f"PipelineProcess: Updating pipeline parameters") - pipeline.update_params(**params) - logging.info(f"PipelineProcess: Successfully applied params to pipeline: {params}") - except Exception as e: - error_msg = f"Error updating params: {str(e)}" - logging.error(error_msg, exc_info=True) - 
report_error(error_msg) + async def _output_loop(self, pipeline: Pipeline): + while not self.is_done(): + try: + output_frame = await pipeline.get_processed_video_frame() + output_frame.log_timestamps["post_process_frame"] = time.time() + await asyncio.to_thread(self.output_queue.put, VideoOutput(output_frame, self.request_id)) + except Exception as e: + self._report_error(f"Error processing output frame: {e}") - try: - input_frame = self.input_queue.get(timeout=0.1) - except queue.Empty: - continue + async def _param_update_loop(self, pipeline: Pipeline): + while not self.is_done(): + try: + params = self.param_update_queue.get_nowait() + if not self._handle_logging_params(params): + logging.info(f"PipelineProcess: Updating pipeline parameters: {params}") + await pipeline.update_params(**params) + except queue.Empty: + await asyncio.sleep(0.1) + except Exception as e: + self._report_error(f"Error updating params: {e}") - try: - if isinstance(input_frame, VideoFrame): - input_frame.log_timestamps["pre_process_frame"] = time.time() - output_image = pipeline.process_frame(input_frame.image) - input_frame.log_timestamps["post_process_frame"] = time.time() - output_frame = VideoOutput(input_frame.replace_image(output_image), self.request_id) - self.output_queue.put(output_frame) - elif isinstance(input_frame, AudioFrame): - self.output_queue.put(AudioOutput([input_frame], self.request_id)) - # TODO wire in a proper pipeline here - else: - report_error(f"Unsupported input frame type {type(input_frame)}") - except Exception as e: - report_error(f"Error processing frame: {e}") - except Exception as e: - report_error(f"Error in process run method: {e}") - finally: - self._cleanup_pipeline(pipeline) + def _report_error(self, error_msg: str): + error_event = { + "message": error_msg, + "timestamp": time.time() + } + logging.error(error_msg) + self._queue_put_fifo(self.error_queue, error_event) - def _cleanup_pipeline(self, pipeline): + async def _cleanup_pipeline(self, 
pipeline): if pipeline is not None: try: - asyncio.get_event_loop().run_until_complete(pipeline.stop()) + await pipeline.stop() except Exception as e: logging.error(f"Error stopping pipeline: {e}") diff --git a/runner/app/live/streamer/protocol/last_value_cache.py b/runner/app/live/streamer/protocol/last_value_cache.py index c5d7444d2..d8bb75a6a 100644 --- a/runner/app/live/streamer/protocol/last_value_cache.py +++ b/runner/app/live/streamer/protocol/last_value_cache.py @@ -1,5 +1,6 @@ import threading -from typing import Optional, TypeVar, Generic, Callable +import logging +from typing import Optional, TypeVar, Generic, Callable, cast T = TypeVar('T') @@ -41,6 +42,7 @@ def value_is_set(): logging.warning(f"Timed out waiting for value (timeout={timeout}s)") return None - return self._value + # we know the value is set, so we can cast it to T + return cast(T, self._value) diff --git a/runner/app/live/streamer/streamer.py b/runner/app/live/streamer/streamer.py index 35782d6f1..1dbc56925 100644 --- a/runner/app/live/streamer/streamer.py +++ b/runner/app/live/streamer/streamer.py @@ -55,10 +55,10 @@ async def start(self, params: dict): run_in_background("ingress_loop", self.run_ingress_loop()), run_in_background("egress_loop", self.run_egress_loop()), run_in_background("report_status_loop", self.report_status_loop()), - run_in_background("control_loop", self.run_control_loop()), ] # auxiliary tasks that are not critical to the supervisor, but which we want to run self.auxiliary_tasks = [ + run_in_background("control_loop", self.run_control_loop()), ] self.tasks_supervisor_task = run_in_background( "tasks_supervisor", self.tasks_supervisor() diff --git a/runner/app/live/trickle/frame.py b/runner/app/live/trickle/frame.py index 0832e0777..126fce055 100644 --- a/runner/app/live/trickle/frame.py +++ b/runner/app/live/trickle/frame.py @@ -1,8 +1,15 @@ import av from PIL import Image -from typing import List +from typing import List, Union import numpy as np +class SideData: 
+ """ + Base class for side data, needed to keep it consistent with av frame side_data + """ + skipped: bool = True + input: Union[Image.Image, np.ndarray] = None + class InputFrame: """ Base class for a frame (either audio or video). @@ -13,6 +20,7 @@ class InputFrame: timestamp: int time_base: int log_timestamps: dict[str, float] = {} + side_data: SideData = SideData() def __init__(self): self.timestamp = av.AV_NOPTS_VALUE @@ -36,7 +44,9 @@ def __init__(self, image: Image.Image, timestamp: int, time_base: int, log_times self.log_timestamps = log_timestamps # Returns a copy of an existing VideoFrame with its image replaced def replace_image(self, image: Image.Image): - return VideoFrame(image, self.timestamp, self.time_base, self.log_timestamps) + new_frame = VideoFrame(image, self.timestamp, self.time_base, self.log_timestamps) + new_frame.side_data = self.side_data + return new_frame class AudioFrame(InputFrame): samples: np.ndarray diff --git a/runner/app/main.py b/runner/app/main.py index 3efb3738d..0e25ec050 100644 --- a/runner/app/main.py +++ b/runner/app/main.py @@ -3,20 +3,25 @@ from contextlib import asynccontextmanager from app.routes import health, hardware -from fastapi import FastAPI +from fastapi import FastAPI, APIRouter from fastapi.routing import APIRoute from app.utils.hardware import HardwareInfo +from app.pipelines.base import Pipeline from app.live.log import config_logging from prometheus_client import Gauge, generate_latest, CONTENT_TYPE_LATEST from starlette.responses import Response +class RunnerFastAPI(FastAPI): + hardware_info_service: HardwareInfo + pipeline: Pipeline + config_logging(log_level=logging.DEBUG if os.getenv("VERBOSE_LOGGING")=="1" else logging.INFO) logger = logging.getLogger(__name__) VERSION = Gauge('version', 'Runner version', ['app', 'version']) @asynccontextmanager -async def lifespan(app: FastAPI): +async def lifespan(app: RunnerFastAPI): # Create application wide hardware info service. 
app.hardware_info_service = HardwareInfo() @@ -39,7 +44,7 @@ async def lifespan(app: FastAPI): logger.info("Shutting down") -def load_pipeline(pipeline: str, model_id: str) -> any: +def load_pipeline(pipeline: str, model_id: str) -> Pipeline: match pipeline: case "text-to-image": from app.pipelines.text_to_image import TextToImagePipeline @@ -89,7 +94,7 @@ def load_pipeline(pipeline: str, model_id: str) -> any: ) -def load_route(pipeline: str) -> any: +def load_route(pipeline: str) -> APIRouter: match pipeline: case "text-to-image": from app.routes import text_to_image @@ -143,7 +148,7 @@ def use_route_names_as_operation_ids(app: FastAPI) -> None: route.operation_id = route.name -app = FastAPI(lifespan=lifespan) +app = RunnerFastAPI(lifespan=lifespan) @app.get("/metrics", include_in_schema=False) async def metrics(): diff --git a/runner/app/pipelines/live_video_to_video.py b/runner/app/pipelines/live_video_to_video.py index 2c736faa7..dfd7054da 100644 --- a/runner/app/pipelines/live_video_to_video.py +++ b/runner/app/pipelines/live_video_to_video.py @@ -252,7 +252,7 @@ def read_proc_as_map(path: str) -> dict | str: with open(path, "r") as f: return f.read() - os_proc_info = {} + os_proc_info: dict[str, dict | str] = {} for proc_file in ["status", "wchan", "io"]: try: path = f"/proc/{pid}/{proc_file}" diff --git a/runner/app/pipelines/llm.py b/runner/app/pipelines/llm.py index b60fec53e..53c4a7d1c 100644 --- a/runner/app/pipelines/llm.py +++ b/runner/app/pipelines/llm.py @@ -172,7 +172,7 @@ async def generate( self, messages: List[Dict[str, str]], generation_config: Optional[GenerationConfig] = None, - ) -> AsyncGenerator[Dict[str, Any], None]: + ) -> AsyncGenerator[LLMResponse, None]: """Internal generation method""" start_time = time.time() config = generation_config or GenerationConfig() @@ -248,7 +248,7 @@ async def generate( duration = end_time - start_time logger.info(f"Generation completed in {duration:.2f}s") logger.info( - f" Time to first token: 
{(first_token_time - start_time):.2f} seconds") + f" Time to first token: {((first_token_time or 0) - start_time):.2f} seconds") logger.info(f" Total tokens: {total_tokens}") logger.info(f" Prompt tokens: {input_tokens}") logger.info(f" Generated tokens: {total_tokens}") @@ -288,9 +288,9 @@ async def generate( async def __call__( self, - messages: List[Dict[str, str]], + messages: List[Dict[str, str]] = [], **kwargs - ) -> AsyncGenerator[Union[str, Dict[str, Any]], None]: + ) -> AsyncGenerator[LLMResponse, None]: """ Generate responses for messages. diff --git a/runner/docker/Dockerfile.live-app__PIPELINE__ b/runner/docker/Dockerfile.live-app__PIPELINE__ index 3a71093de..7dcef9415 100644 --- a/runner/docker/Dockerfile.live-app__PIPELINE__ +++ b/runner/docker/Dockerfile.live-app__PIPELINE__ @@ -1,6 +1,8 @@ ARG PIPELINE=comfyui ARG BASE_IMAGE=livepeer/ai-runner:live-base-${PIPELINE} FROM ${BASE_IMAGE} +ARG PIPELINE +ENV PIPELINE=${PIPELINE} # Install latest stable Go version and system dependencies RUN apt-get update && apt-get install -y \ @@ -11,13 +13,16 @@ RUN apt-get update && apt-get install -y \ && apt-get clean && rm -rf /var/lib/apt/lists/* # Install any additional Python packages - COPY requirements.live-ai.txt /app/requirements.txt -RUN pip install --no-cache-dir -r /app/requirements.txt -# TODO: Figure out a way to have this in requirements file -RUN pip install --no-cache-dir triton==3.1.0 -RUN pip uninstall -y onnx onnxruntime onnxruntime-gpu -RUN pip install onnx==1.17.0 onnxruntime-gpu==1.17.0 +COPY requirements.live.comfyui.txt /app/requirements.comfyui.txt + +# Install requirements in normal env +RUN pip install --no-cache-dir -r /app/requirements.txt; + +# Also install in conda env if needed +RUN if [ "$PIPELINE" = "comfyui" ]; then \ + conda run -n comfystream pip install -r /app/requirements.comfyui.txt; \ +fi # Set environment variables ENV MAX_WORKERS=1 @@ -35,4 +40,4 @@ WORKDIR /app ARG GIT_SHA ENV GIT_SHA=$GIT_SHA -CMD ["uvicorn", 
"app.main:app", "--log-config", "app/cfg/uvicorn_logging_config.json", "--host", "", "--port", "8000"] +CMD ["uvicorn", "app.main:app", "--log-config", "app/cfg/uvicorn_logging_config.json", "--host", "", "--port", "8000"] \ No newline at end of file diff --git a/runner/docker/Dockerfile.live-base-comfyui b/runner/docker/Dockerfile.live-base-comfyui index 874aa6995..84eab9b5d 100644 --- a/runner/docker/Dockerfile.live-base-comfyui +++ b/runner/docker/Dockerfile.live-base-comfyui @@ -1,4 +1,4 @@ -ARG BASE_IMAGE=livepeer/comfyui-base:ai-runner-9166aa9 +ARG BASE_IMAGE=livepeer/comfyui-base:latest FROM ${BASE_IMAGE} # ----------------------------------------------------------------------------- @@ -8,6 +8,7 @@ FROM ${BASE_IMAGE} ENV DEBIAN_FRONTEND=noninteractive ENV NVIDIA_VISIBLE_DEVICES=all ENV NVIDIA_DRIVER_CAPABILITIES=compute,utility,video +ENV PATH="/workspace/miniconda3/bin:${PATH}" # Install prerequisites RUN apt-get update && \ @@ -72,7 +73,6 @@ RUN apt-get clean && rm -rf /var/lib/apt/lists/* # Set up ComfyUI workspace ENV COMFY_UI_WORKSPACE="/workspace/ComfyUI" -ENV PYTHONPATH=/workspace/ComfyUI:${PYTHONPATH:-} RUN mkdir -p /workspace/ComfyUI RUN rm -rf /workspace/ComfyUI/models && ln -s /models/ComfyUI--models /workspace/ComfyUI/models RUN rm -rf /workspace/ComfyUI/output && ln -s /models/ComfyUI--output /workspace/ComfyUI/output @@ -81,6 +81,5 @@ RUN rm -rf /workspace/ComfyUI/output && ln -s /models/ComfyUI--output /workspace RUN echo "source /workspace/miniconda3/etc/profile.d/conda.sh && conda activate comfystream" >> /conda_activate.sh && \ chmod +x /conda_activate.sh ENV BASH_ENV=/conda_activate.sh -SHELL ["/bin/bash", "-c"] # Ensure the app run from CMD is also in the comfystream conda environment (just wrap in bash and it will run the .bashrc above) ENTRYPOINT ["bash", "-c", "exec /opt/nvidia/nvidia_entrypoint.sh \"$@\"", "--"] diff --git a/runner/mypy.ini b/runner/mypy.ini new file mode 100644 index 000000000..f5e14b82d --- /dev/null +++ 
b/runner/mypy.ini @@ -0,0 +1,38 @@ +[mypy] +python_version = 3.11 +warn_return_any = True +warn_unused_configs = True +disallow_untyped_defs = False +disallow_incomplete_defs = False +check_untyped_defs = True +disallow_untyped_decorators = True +no_implicit_optional = True +ignore_missing_imports = True +implicit_reexport = True +warn_no_return = False +disallow_untyped_calls = False + +[mypy.plugins.numpy.*] +follow_imports = skip + +[mypy.plugins.pandas.*] +follow_imports = skip + +# Per-module options: +[mypy.app.*] +disallow_untyped_defs = True + +# Stricter typing specifically for the live folder +[mypy.app.live.*] +mypy_path = $MYPY_CONFIG_FILE_DIR/app/live +namespace_packages = True +disallow_untyped_defs = True +disallow_any_generics = True +disallow_incomplete_defs = True +disallow_untyped_calls = True +warn_redundant_casts = True +warn_return_any = True +warn_unreachable = True + +[mypy.tests.*] +disallow_untyped_defs = False diff --git a/runner/requirements-dev.txt b/runner/requirements-dev.txt index 121364943..372b02414 100644 --- a/runner/requirements-dev.txt +++ b/runner/requirements-dev.txt @@ -1,2 +1,3 @@ pytest pytest-mock +mypy diff --git a/runner/requirements.live-ai.txt b/runner/requirements.live-ai.txt index 70c1ac400..1e63b08af 100644 --- a/runner/requirements.live-ai.txt +++ b/runner/requirements.live-ai.txt @@ -6,7 +6,7 @@ Pillow==10.3.0 python-multipart==0.0.9 uvicorn==0.34.0 huggingface_hub==0.23.2 -triton>=2.1.0 +triton>=3.1.0 peft==0.11.1 deepcache==0.1.1 safetensors==0.4.3 diff --git a/runner/requirements.live.comfyui.txt b/runner/requirements.live.comfyui.txt new file mode 100644 index 000000000..4c87b0b58 --- /dev/null +++ b/runner/requirements.live.comfyui.txt @@ -0,0 +1,14 @@ +fastapi==0.111.0 +pydantic>=2.7.2 +Pillow==10.3.0 +python-multipart==0.0.9 +uvicorn==0.34.0 +triton>=2.1.0 +peft>=0.11.1 +deepcache>=0.1.1 +safetensors>=0.4.3 +scipy>=1.13.0 +numpy>=1.26.4 +--no-binary=av +#av<=14.0.0 +pyzmq==26.2.0 diff --git 
a/runner/run-lv2v.sh b/runner/run-lv2v.sh index e69a54720..d8ec102e2 100755 --- a/runner/run-lv2v.sh +++ b/runner/run-lv2v.sh @@ -1,15 +1,8 @@ #!/bin/bash set -ex -if [ $# -lt 2 ]; then - echo "Usage: $0 " - exit 1 -fi - -INPUT_ROOM=$1 -OUTPUT_ROOM=$2 -PIPELINE=${3:-comfyui} -PORT=${4:-9000} +PIPELINE=noop +PORT=8900 # Build images, this will be quick if everything is cached docker build -t livepeer/ai-runner:live-base -f docker/Dockerfile.live-base . @@ -39,14 +32,16 @@ echo "Waiting for server to start..." while ! grep -aq "Uvicorn running" ./run-lv2v.log; do sleep 1 done -sleep 2 -echo "Starting pipeline from ${INPUT_ROOM} to ${OUTPUT_ROOM}..." +sleep 5 set -x -curl -vvv http://localhost:${PORT}/live-video-to-video/ \ - -H 'Content-Type: application/json' \ - -d "{\"publish_url\":\"https://wwgcyxykwg9dys.transfix.ai/trickle/${OUTPUT_ROOM}\",\"subscribe_url\":\"https://wwgcyxykwg9dys.transfix.ai/trickle/${INPUT_ROOM}\"}" +curl --location 'http://127.0.0.1:8900/live-video-to-video/' \ +--header 'Content-Type: application/json' \ +--data '{ + "subscribe_url": "http://172.17.0.1:3389/sample", + "publish_url": "http://172.17.0.1:3389/sample-out" +}' # let docker container take over wait $DOCKER_PID