Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
},
"runArgs": [
"--gpus=all"
],
],
// Features to add to the dev container. More info: https://containers.dev/features.
// Configure tool-specific properties.
"customizations": {
Expand All @@ -31,9 +31,10 @@
"appPort": [
"8188:8188", // ComfyUI
"8889:8889", // ComfyStream
"3000:3000" // ComfyStream UI (optional)
"3000:3000", // ComfyStream UI (optional)
"8001:8001" // ComfyStream BYOC
],
"forwardPorts": [8188, 8889, 3000],
"forwardPorts": [8188, 8889, 3000, 8001],
// Use 'forwardPorts' to make a list of ports inside the container available locally.
// Use 'mounts' to make a list of local folders available inside the container.
"workspaceFolder": "/workspace/comfystream",
Expand Down
2 changes: 1 addition & 1 deletion .devcontainer/post-create.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ cd /workspace/comfystream

# Install Comfystream in editable mode.
echo -e "\e[32mInstalling Comfystream in editable mode...\e[0m"
/workspace/miniconda3/envs/comfystream/bin/python3 -m pip install -e .[server] -c src/comfystream/scripts/constraints.txt --root-user-action=ignore > /dev/null
/workspace/miniconda3/envs/comfystream/bin/python3 -m pip install -e .[dev,server] -c src/comfystream/scripts/constraints.txt --root-user-action=ignore > /dev/null

# Install npm packages if needed
if [ ! -d "/workspace/comfystream/ui/node_modules" ]; then
Expand Down
6 changes: 4 additions & 2 deletions docker/Dockerfile.base
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ARG BASE_IMAGE=nvidia/cuda:12.8.1-cudnn-devel-ubuntu22.04 \
ARG BASE_IMAGE=nvidia/cuda:12.8.1-devel-ubuntu22.04 \
CONDA_VERSION=latest \
PYTHON_VERSION=3.12

Expand All @@ -11,7 +11,8 @@ ENV DEBIAN_FRONTEND=noninteractive \
TensorRT_ROOT=/opt/TensorRT-10.12.0.36 \
CONDA_VERSION="${CONDA_VERSION}" \
PATH="/workspace/miniconda3/bin:${PATH}" \
PYTHON_VERSION="${PYTHON_VERSION}"
PYTHON_VERSION="${PYTHON_VERSION}" \
LD_LIBRARY_PATH="/workspace/miniconda3/envs/comfystream/lib:${LD_LIBRARY_PATH}"

# System dependencies
RUN apt update && apt install -yqq --no-install-recommends \
Expand Down Expand Up @@ -48,6 +49,7 @@ conda run -n comfystream --no-capture-output pip install wheel

RUN apt-get remove --purge -y libcudnn9-cuda-12 libcudnn9-dev-cuda-12 || true && \
apt-get autoremove -y && \
rm -rf /usr/local/cuda/lib64/libcudnn* /usr/lib/x86_64-linux-gnu/libcudnn* && \
rm -rf /var/lib/apt/lists/*

# Install numpy<2.0.0 first
Expand Down
4 changes: 2 additions & 2 deletions docker/Dockerfile.opencv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ARG BASE_IMAGE=nvidia/cuda:12.8.1-cudnn-devel-ubuntu22.04 \
ARG BASE_IMAGE=nvidia/cuda:12.8.1-devel-ubuntu22.04 \
CONDA_VERSION=latest \
PYTHON_VERSION=3.12 \
CUDA_VERSION=12.8
Expand Down Expand Up @@ -112,7 +112,7 @@ RUN cd /workspace/opencv/build && \
-D HAVE_opencv_python3=ON \
-D WITH_NVCUVID=OFF \
-D WITH_NVCUVENC=OFF \
.. && \
.. && \
make -j$(nproc) && \
make install && \
ldconfig
Expand Down
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ dependencies = [
"toml",
"twilio",
"prometheus_client",
"librosa"
"librosa",
"mediapipe==0.10.15"
]

[project.optional-dependencies]
dev = ["pytest", "pytest-cov", "ruff"]
server = [
"pytrickle @ git+https://github.com/livepeer/pytrickle.git@v0.1.5"
"pytrickle @ git+https://github.com/livepeer/pytrickle.git@v0.1.7"
]

[project.urls]
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ toml
twilio
prometheus_client
librosa
mediapipe==0.10.15
93 changes: 80 additions & 13 deletions server/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@

from comfystream.pipeline import Pipeline
from comfystream.pipeline_state import PipelineState
from comfystream.utils import convert_prompt
from comfystream.utils import (
convert_prompt,
get_default_workflow,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -151,6 +154,49 @@ async def _forward_text_loop():
except Exception:
logger.warning("Failed to set up text monitoring", exc_info=True)

def _set_loading_overlay(self, enabled: bool) -> bool:
"""Toggle the StreamProcessor loading overlay if available."""
processor = self._stream_processor
if not processor:
return False
try:
processor.set_loading_overlay(enabled)
logger.debug("Set loading overlay to %s", enabled)
return True
except Exception:
logger.warning("Failed to update loading overlay state", exc_info=True)
return False

def _schedule_overlay_reset_on_ingest_enabled(self) -> None:
"""Disable the loading overlay after pipeline ingest resumes."""
if not self.pipeline:
self._set_loading_overlay(False)
return

if self.pipeline.is_ingest_enabled():
self._set_loading_overlay(False)
return

async def _wait_for_ingest_enable():
try:
while True:
if self._stop_event.is_set():
break
if not self.pipeline:
break
if self.pipeline.is_ingest_enabled():
break
await asyncio.sleep(0.05)
except asyncio.CancelledError:
raise
except Exception:
logger.debug("Loading overlay watcher error", exc_info=True)
finally:
self._set_loading_overlay(False)

task = asyncio.create_task(_wait_for_ingest_enable())
self._background_tasks.append(task)

async def _stop_text_forwarder(self) -> None:
"""Stop the background text forwarder task if running."""
task = self._text_forward_task
Expand Down Expand Up @@ -212,16 +258,27 @@ async def on_stream_start(self, params: Optional[Dict[str, Any]] = None):
logger.info("Stream starting")
self._reset_stop_event()
logger.info(f"Stream start params: {params}")
overlay_managed = False

if not self.pipeline:
logger.debug("Stream start requested before pipeline initialization")
return

stream_params = normalize_stream_params(params)
stream_width = stream_params.get("width")
stream_height = stream_params.get("height")
stream_width = int(stream_width) if stream_width is not None else None
stream_height = int(stream_height) if stream_height is not None else None
prompt_payload = stream_params.pop("prompts", None)
if prompt_payload is None:
prompt_payload = stream_params.pop("prompt", None)

if not prompt_payload and not self.pipeline.state_manager.is_initialized():
logger.info(
"No prompts provided for new stream; applying default workflow for initialization"
)
prompt_payload = get_default_workflow()

if prompt_payload:
try:
await self._apply_stream_start_prompt(prompt_payload)
Expand All @@ -240,6 +297,19 @@ async def on_stream_start(self, params: Optional[Dict[str, Any]] = None):
logger.exception("Failed to process stream start parameters")
return

overlay_managed = self._set_loading_overlay(True)

try:
await self.pipeline.ensure_warmup(stream_width, stream_height)
except Exception:
if overlay_managed:
self._set_loading_overlay(False)
logger.exception("Failed to ensure pipeline warmup during stream start")
return

if overlay_managed:
self._schedule_overlay_reset_on_ingest_enabled()

try:
if (
self.pipeline.state != PipelineState.STREAMING
Expand Down Expand Up @@ -312,6 +382,12 @@ async def process_video_async(
if not self.pipeline:
return frame

# TODO: Do we really need this here?
await self.pipeline.ensure_warmup()

if not self.pipeline.state_manager.is_initialized():
return VideoProcessingResult.WITHHELD

# If pipeline ingestion is paused, withhold frame so pytrickle renders the overlay
if not self.pipeline.is_ingest_enabled():
return VideoProcessingResult.WITHHELD
Expand All @@ -324,18 +400,9 @@ async def process_video_async(
# Process through pipeline
await self.pipeline.put_video_frame(av_frame)

# Try to get processed frame with short timeout
try:
processed_av_frame = await asyncio.wait_for(
self.pipeline.get_processed_video_frame(),
timeout=self._stream_processor.overlay_config.auto_timeout_seconds,
)
processed_frame = VideoFrame.from_av_frame_with_timing(processed_av_frame, frame)
return processed_frame

except asyncio.TimeoutError:
# No frame ready yet - return withheld sentinel to trigger overlay
return VideoProcessingResult.WITHHELD
processed_av_frame = await self.pipeline.get_processed_video_frame()
processed_frame = VideoFrame.from_av_frame_with_timing(processed_av_frame, frame)
return processed_frame

except Exception as e:
logger.error(f"Video processing failed: {e}")
Expand Down
70 changes: 70 additions & 0 deletions src/comfystream/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ def __init__(
self._initialize_lock = asyncio.Lock()
self._ingest_enabled = True
self._prompt_update_lock = asyncio.Lock()
self._warmup_lock = asyncio.Lock()
self._warmup_task: Optional[asyncio.Task] = None
self._warmup_completed = False
self._last_warmup_resolution: Optional[tuple[int, int]] = None

@property
def state(self) -> PipelineState:
Expand Down Expand Up @@ -155,6 +159,10 @@ async def warmup(
await self.state_manager.transition_to(PipelineState.ERROR)
raise
finally:
if warmup_successful:
self._warmup_completed = True
self._last_warmup_resolution = (self.width, self.height)

if transitioned and warmup_successful:
try:
await self.state_manager.transition_to(PipelineState.READY)
Expand All @@ -168,6 +176,63 @@ async def warmup(
except Exception:
logger.exception("Failed to restore STREAMING state after warmup")

async def ensure_warmup(self, width: Optional[int] = None, height: Optional[int] = None):
    """Ensure the pipeline has been warmed up for the given resolution."""
    # Adopt any explicitly-provided, positive resolution override.
    if width and width > 0:
        self.width = int(width)
    if height and height > 0:
        self.height = int(height)

    # A resolution change invalidates a previously completed warmup.
    previous = self._last_warmup_resolution
    if self._warmup_completed and previous and (self.width, self.height) != previous:
        self._warmup_completed = False

    if self._warmup_completed:
        return

    if not self.state_manager.is_initialized():
        logger.debug("Skipping warmup scheduling - pipeline not initialized")
        return

    async with self._warmup_lock:
        # Re-check under the lock: another caller may have completed,
        # invalidated, or scheduled warmup while we were waiting.
        already_running = self._warmup_task and not self._warmup_task.done()
        if (
            self._warmup_completed
            or not self.state_manager.is_initialized()
            or already_running
        ):
            return

        logger.info("Scheduling pipeline warmup for %sx%s", self.width, self.height)
        # Pause ingest while warmup runs; _run_background_warmup re-enables it.
        self.disable_ingest()
        self._warmup_task = asyncio.create_task(self._run_background_warmup())

async def _run_background_warmup(self):
    """Run pipeline warmup in the background, restoring ingest afterwards.

    Cancellation is logged and re-raised so task cancellation propagates
    normally; any other failure is logged and swallowed so a failed warmup
    does not crash the caller. Ingest is re-enabled and the task handle
    cleared unconditionally in ``finally``.
    """
    try:
        await self.warmup()
    except asyncio.CancelledError:
        logger.debug("Pipeline warmup task cancelled")
        raise
    except Exception:
        logger.exception("Pipeline warmup failed")
    finally:
        # Always resume ingest and drop the task handle, even on failure,
        # so ensure_warmup() can schedule a fresh attempt later.
        self.enable_ingest()
        self._warmup_task = None

async def _reset_warmup_state(self):
"""Reset warmup bookkeeping and cancel any in-flight warmup tasks."""
async with self._warmup_lock:
if self._warmup_task and not self._warmup_task.done():
self._warmup_task.cancel()
try:
await self._warmup_task
except asyncio.CancelledError:
pass
except Exception:
logger.debug("Warmup task raised during cancellation", exc_info=True)
self._warmup_task = None
self._warmup_completed = False
self._last_warmup_resolution = None

async def _run_warmup(
self,
*,
Expand Down Expand Up @@ -266,6 +331,8 @@ async def set_prompts(
skip_warmup: Skip automatic warmup even if auto_warmup is enabled
"""
try:
await self._reset_warmup_state()

prompt_list = prompts if isinstance(prompts, list) else [prompts]
await self.client.set_prompts(prompt_list)

Expand Down Expand Up @@ -312,6 +379,8 @@ async def update_prompts(
if was_streaming and should_warmup:
await self.state_manager.transition_to(PipelineState.READY)

await self._reset_warmup_state()

prompt_list = prompts if isinstance(prompts, list) else [prompts]
await self.client.update_prompts(prompt_list)

Expand Down Expand Up @@ -775,6 +844,7 @@ async def cleanup(self):
# Clear cached modalities and I/O capabilities since we're resetting
self._cached_modalities = None
self._cached_io_capabilities = None
await self._reset_warmup_state()

# Clear pipeline queues
await self._clear_pipeline_queues()
Expand Down
Loading