diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 1c184f5c..ab59e36d 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -8,7 +8,7 @@ }, "runArgs": [ "--gpus=all" - ], + ], // Features to add to the dev container. More info: https://containers.dev/features. // Configure tool-specific properties. "customizations": { @@ -31,9 +31,10 @@ "appPort": [ "8188:8188", // ComfyUI "8889:8889", // ComfyStream - "3000:3000" // ComfyStream UI (optional) + "3000:3000", // ComfyStream UI (optional) + "8001:8001" // ComfyStream BYOC ], - "forwardPorts": [8188, 8889, 3000], + "forwardPorts": [8188, 8889, 3000, 8001], // Use 'forwardPorts' to make a list of ports inside the container available locally. // Use 'mounts' to make a list of local folders available inside the container. "workspaceFolder": "/workspace/comfystream", diff --git a/.devcontainer/post-create.sh b/.devcontainer/post-create.sh index 9ac5dbec..88422d85 100755 --- a/.devcontainer/post-create.sh +++ b/.devcontainer/post-create.sh @@ -5,7 +5,7 @@ cd /workspace/comfystream # Install Comfystream in editable mode. echo -e "\e[32mInstalling Comfystream in editable mode...\e[0m" -/workspace/miniconda3/envs/comfystream/bin/python3 -m pip install -e .[server] -c src/comfystream/scripts/constraints.txt --root-user-action=ignore > /dev/null +/workspace/miniconda3/envs/comfystream/bin/python3 -m pip install -e .[dev,server] -c src/comfystream/scripts/constraints.txt --root-user-action=ignore > /dev/null # Install npm packages if needed if [ ! -d "/workspace/comfystream/ui/node_modules" ]; then diff --git a/docker/Dockerfile.base b/docker/Dockerfile.base index 4172e977..5bf6fb05 100644 --- a/docker/Dockerfile.base +++ b/docker/Dockerfile.base @@ -1,4 +1,4 @@ -ARG BASE_IMAGE=nvidia/cuda:12.8.1-cudnn-devel-ubuntu22.04 \ +ARG BASE_IMAGE=nvidia/cuda:12.8.1-devel-ubuntu22.04 \ CONDA_VERSION=latest \ PYTHON_VERSION=3.12 @@ -11,7 +11,8 @@ ENV DEBIAN_FRONTEND=noninteractive \ TensorRT_ROOT=/opt/TensorRT-10.12.0.36 \ CONDA_VERSION="${CONDA_VERSION}" \ PATH="/workspace/miniconda3/bin:${PATH}" \ - PYTHON_VERSION="${PYTHON_VERSION}" + PYTHON_VERSION="${PYTHON_VERSION}" \ + LD_LIBRARY_PATH="/workspace/miniconda3/envs/comfystream/lib:${LD_LIBRARY_PATH}" # System dependencies RUN apt update && apt install -yqq --no-install-recommends \ @@ -48,6 +49,7 @@ conda run -n comfystream --no-capture-output pip install wheel RUN apt-get remove --purge -y libcudnn9-cuda-12 libcudnn9-dev-cuda-12 || true && \ apt-get autoremove -y && \ + rm -rf /usr/local/cuda/lib64/libcudnn* /usr/lib/x86_64-linux-gnu/libcudnn* && \ rm -rf /var/lib/apt/lists/* # Install numpy<2.0.0 first diff --git a/docker/Dockerfile.opencv b/docker/Dockerfile.opencv index 848db1fe..2c526859 100644 --- a/docker/Dockerfile.opencv +++ b/docker/Dockerfile.opencv @@ -1,4 +1,4 @@ -ARG BASE_IMAGE=nvidia/cuda:12.8.1-cudnn-devel-ubuntu22.04 \ +ARG BASE_IMAGE=nvidia/cuda:12.8.1-devel-ubuntu22.04 \ CONDA_VERSION=latest \ PYTHON_VERSION=3.12 \ CUDA_VERSION=12.8 @@ -112,7 +112,7 @@ RUN cd /workspace/opencv/build && \ -D HAVE_opencv_python3=ON \ -D WITH_NVCUVID=OFF \ -D WITH_NVCUVENC=OFF \ - .. && \ + .. && \ make -j$(nproc) && \ make install && \ ldconfig diff --git a/pyproject.toml b/pyproject.toml index d9dfaece..1513fc70 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,13 +16,14 @@ dependencies = [ "toml", "twilio", "prometheus_client", - "librosa" + "librosa", + "mediapipe==0.10.15" ] [project.optional-dependencies] dev = ["pytest", "pytest-cov", "ruff"] server = [ - "pytrickle @ git+https://github.com/livepeer/pytrickle.git@v0.1.5" + "pytrickle @ git+https://github.com/livepeer/pytrickle.git@v0.1.7" ] [project.urls] diff --git a/requirements.txt b/requirements.txt index 3b6bd29d..7728cffd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ toml twilio prometheus_client librosa +mediapipe==0.10.15 diff --git a/server/frame_processor.py b/server/frame_processor.py index af927935..3dcb835d 100644 --- a/server/frame_processor.py +++ b/server/frame_processor.py @@ -11,7 +11,10 @@ from comfystream.pipeline import Pipeline from comfystream.pipeline_state import PipelineState -from comfystream.utils import convert_prompt +from comfystream.utils import ( + convert_prompt, + get_default_workflow, +) logger = logging.getLogger(__name__) @@ -151,6 +154,49 @@ async def _forward_text_loop(): except Exception: logger.warning("Failed to set up text monitoring", exc_info=True) + def _set_loading_overlay(self, enabled: bool) -> bool: + """Toggle the StreamProcessor loading overlay if available.""" + processor = self._stream_processor + if not processor: + return False + try: + processor.set_loading_overlay(enabled) + logger.debug("Set loading overlay to %s", enabled) + return True + except Exception: + logger.warning("Failed to update loading overlay state", exc_info=True) + return False + + def _schedule_overlay_reset_on_ingest_enabled(self) -> None: + """Disable the loading overlay after pipeline ingest resumes.""" + if not self.pipeline: + self._set_loading_overlay(False) + return + + if self.pipeline.is_ingest_enabled(): + self._set_loading_overlay(False) + return + + async def _wait_for_ingest_enable(): + try: + while True: + if self._stop_event.is_set(): + break + if not self.pipeline: + break + if self.pipeline.is_ingest_enabled(): + break + await asyncio.sleep(0.05) + except asyncio.CancelledError: + raise + except Exception: + logger.debug("Loading overlay watcher error", exc_info=True) + finally: + self._set_loading_overlay(False) + + task = asyncio.create_task(_wait_for_ingest_enable()) + self._background_tasks.append(task) + async def _stop_text_forwarder(self) -> None: """Stop the background text forwarder task if running.""" task = self._text_forward_task @@ -212,16 +258,27 @@ async def on_stream_start(self, params: Optional[Dict[str, Any]] = None): logger.info("Stream starting") self._reset_stop_event() logger.info(f"Stream start params: {params}") + overlay_managed = False if not self.pipeline: logger.debug("Stream start requested before pipeline initialization") return stream_params = normalize_stream_params(params) + stream_width = stream_params.get("width") + stream_height = stream_params.get("height") + stream_width = int(stream_width) if stream_width is not None else None + stream_height = int(stream_height) if stream_height is not None else None prompt_payload = stream_params.pop("prompts", None) if prompt_payload is None: prompt_payload = stream_params.pop("prompt", None) + if not prompt_payload and not self.pipeline.state_manager.is_initialized(): + logger.info( + "No prompts provided for new stream; applying default workflow for initialization" + ) + prompt_payload = get_default_workflow() + if prompt_payload: try: await self._apply_stream_start_prompt(prompt_payload) @@ -240,6 +297,19 @@ async def on_stream_start(self, params: Optional[Dict[str, Any]] = None): logger.exception("Failed to process stream start parameters") return + overlay_managed = self._set_loading_overlay(True) + + try: + await self.pipeline.ensure_warmup(stream_width, stream_height) + except Exception: + if overlay_managed: + self._set_loading_overlay(False) + logger.exception("Failed to ensure pipeline warmup during stream start") + return + + if overlay_managed: + self._schedule_overlay_reset_on_ingest_enabled() + try: if ( self.pipeline.state != PipelineState.STREAMING @@ -312,6 +382,12 @@ async def process_video_async( if not self.pipeline: return frame + # TODO: Do we really need this here? + await self.pipeline.ensure_warmup() + + if not self.pipeline.state_manager.is_initialized(): + return VideoProcessingResult.WITHHELD + # If pipeline ingestion is paused, withhold frame so pytrickle renders the overlay if not self.pipeline.is_ingest_enabled(): return VideoProcessingResult.WITHHELD @@ -324,18 +400,9 @@ async def process_video_async( # Process through pipeline await self.pipeline.put_video_frame(av_frame) - # Try to get processed frame with short timeout - try: - processed_av_frame = await asyncio.wait_for( - self.pipeline.get_processed_video_frame(), - timeout=self._stream_processor.overlay_config.auto_timeout_seconds, - ) - processed_frame = VideoFrame.from_av_frame_with_timing(processed_av_frame, frame) - return processed_frame - - except asyncio.TimeoutError: - # No frame ready yet - return withheld sentinel to trigger overlay - return VideoProcessingResult.WITHHELD + processed_av_frame = await self.pipeline.get_processed_video_frame() + processed_frame = VideoFrame.from_av_frame_with_timing(processed_av_frame, frame) + return processed_frame except Exception as e: logger.error(f"Video processing failed: {e}") diff --git a/src/comfystream/pipeline.py b/src/comfystream/pipeline.py index cf2302de..9143e591 100644 --- a/src/comfystream/pipeline.py +++ b/src/comfystream/pipeline.py @@ -72,6 +72,10 @@ def __init__( self._initialize_lock = asyncio.Lock() self._ingest_enabled = True self._prompt_update_lock = asyncio.Lock() + self._warmup_lock = asyncio.Lock() + self._warmup_task: Optional[asyncio.Task] = None + self._warmup_completed = False + self._last_warmup_resolution: Optional[tuple[int, int]] = None @property def state(self) -> PipelineState: @@ -155,6 +159,10 @@ async def warmup( await self.state_manager.transition_to(PipelineState.ERROR) raise finally: + if warmup_successful: + self._warmup_completed = True + self._last_warmup_resolution = (self.width, self.height) + if transitioned and warmup_successful: try: await self.state_manager.transition_to(PipelineState.READY) @@ -168,6 +176,63 @@ async def warmup( except Exception: logger.exception("Failed to restore STREAMING state after warmup") + async def ensure_warmup(self, width: Optional[int] = None, height: Optional[int] = None): + """Ensure the pipeline has been warmed up for the given resolution.""" + if width and width > 0: + self.width = int(width) + if height and height > 0: + self.height = int(height) + + if self._warmup_completed and self._last_warmup_resolution: + if (self.width, self.height) != self._last_warmup_resolution: + self._warmup_completed = False + + if self._warmup_completed: + return + + if not self.state_manager.is_initialized(): + logger.debug("Skipping warmup scheduling - pipeline not initialized") + return + + async with self._warmup_lock: + if self._warmup_completed: + return + if not self.state_manager.is_initialized(): + return + if self._warmup_task and not self._warmup_task.done(): + return + + logger.info("Scheduling pipeline warmup for %sx%s", self.width, self.height) + self.disable_ingest() + self._warmup_task = asyncio.create_task(self._run_background_warmup()) + + async def _run_background_warmup(self): + try: + await self.warmup() + except asyncio.CancelledError: + logger.debug("Pipeline warmup task cancelled") + raise + except Exception: + logger.exception("Pipeline warmup failed") + finally: + self.enable_ingest() + self._warmup_task = None + + async def _reset_warmup_state(self): + """Reset warmup bookkeeping and cancel any in-flight warmup tasks.""" + async with self._warmup_lock: + if self._warmup_task and not self._warmup_task.done(): + self._warmup_task.cancel() + try: + await self._warmup_task + except asyncio.CancelledError: + pass + except Exception: + logger.debug("Warmup task raised during cancellation", exc_info=True) + self._warmup_task = None + self._warmup_completed = False + self._last_warmup_resolution = None + async def _run_warmup( self, *, @@ -266,6 +331,8 @@ async def set_prompts( skip_warmup: Skip automatic warmup even if auto_warmup is enabled """ try: + await self._reset_warmup_state() + prompt_list = prompts if isinstance(prompts, list) else [prompts] await self.client.set_prompts(prompt_list) @@ -312,6 +379,8 @@ async def update_prompts( if was_streaming and should_warmup: await self.state_manager.transition_to(PipelineState.READY) + await self._reset_warmup_state() + prompt_list = prompts if isinstance(prompts, list) else [prompts] await self.client.update_prompts(prompt_list) @@ -775,6 +844,7 @@ async def cleanup(self): # Clear cached modalities and I/O capabilities since we're resetting self._cached_modalities = None self._cached_io_capabilities = None + await self._reset_warmup_state() # Clear pipeline queues await self._clear_pipeline_queues()