From 527346e191f52e7335c090dc8ae75e25fba05e0d Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Fri, 11 Apr 2025 02:57:58 +0000 Subject: [PATCH 1/2] refactor comfystream to export server and pipeline --- nodes/server_manager.py | 2 +- src/comfystream/__init__.py | 17 +++ {server => src/comfystream}/pipeline.py | 105 +++++++++++++++--- .../comfystream/server}/__init__.py | 0 {server => src/comfystream/server}/app.py | 13 +-- .../comfystream/server}/metrics/__init__.py | 0 .../server}/metrics/prometheus_metrics.py | 0 .../server}/metrics/stream_stats.py | 0 .../comfystream/server}/utils/__init__.py | 0 .../comfystream/server}/utils/fps_meter.py | 2 +- .../comfystream/server}/utils/utils.py | 0 11 files changed, 114 insertions(+), 25 deletions(-) rename {server => src/comfystream}/pipeline.py (61%) rename {server => src/comfystream/server}/__init__.py (100%) rename {server => src/comfystream/server}/app.py (98%) rename {server => src/comfystream/server}/metrics/__init__.py (100%) rename {server => src/comfystream/server}/metrics/prometheus_metrics.py (100%) rename {server => src/comfystream/server}/metrics/stream_stats.py (100%) rename {server => src/comfystream/server}/utils/__init__.py (100%) rename {server => src/comfystream/server}/utils/fps_meter.py (98%) rename {server => src/comfystream/server}/utils/utils.py (100%) diff --git a/nodes/server_manager.py b/nodes/server_manager.py index a7436d33..454b1dc0 100644 --- a/nodes/server_manager.py +++ b/nodes/server_manager.py @@ -148,7 +148,7 @@ async def start(self, port=None, host=None): self.host = host # Get the path to the ComfyStream server directory and script - server_dir = Path(__file__).parent.parent / "server" + server_dir = Path(__file__).parent.parent / "src" / "comfystream" / "server" server_script = server_dir / "app.py" logging.info(f"Server script: {server_script}") diff --git a/src/comfystream/__init__.py b/src/comfystream/__init__.py index e69de29b..5e1b9fac 100644 --- 
a/src/comfystream/__init__.py +++ b/src/comfystream/__init__.py @@ -0,0 +1,17 @@ +from .client import ComfyStreamClient +from .pipeline import Pipeline +from .server.utils import temporary_log_level +from .server.app import VideoStreamTrack, AudioStreamTrack +from .server.utils import FPSMeter +from .server.metrics import MetricsManager, StreamStatsManager + +__all__ = [ + 'ComfyStreamClient', + 'Pipeline', + 'temporary_log_level', + 'VideoStreamTrack', + 'AudioStreamTrack', + 'FPSMeter', + 'MetricsManager', + 'StreamStatsManager' +] diff --git a/server/pipeline.py b/src/comfystream/pipeline.py similarity index 61% rename from server/pipeline.py rename to src/comfystream/pipeline.py index d781639e..a5776dfc 100644 --- a/server/pipeline.py +++ b/src/comfystream/pipeline.py @@ -3,10 +3,10 @@ import numpy as np import asyncio import logging +from typing import Any, Dict, Union, List, Optional -from typing import Any, Dict, Union, List from comfystream.client import ComfyStreamClient -from utils import temporary_log_level +from comfystream.server.utils import temporary_log_level WARMUP_RUNS = 5 @@ -14,16 +14,27 @@ class Pipeline: - def __init__(self, width=512, height=512, comfyui_inference_log_level: int = None, **kwargs): + """A pipeline for processing video and audio frames using ComfyUI. + + This class provides a high-level interface for processing video and audio frames + through a ComfyUI-based processing pipeline. It handles frame preprocessing, + postprocessing, and queue management. + """ + + def __init__(self, width: int = 512, height: int = 512, + comfyui_inference_log_level: Optional[int] = None, **kwargs): """Initialize the pipeline with the given configuration. + Args: + width: Width of the video frames (default: 512) + height: Height of the video frames (default: 512) comfyui_inference_log_level: The logging level for ComfyUI inference. Defaults to None, using the global ComfyUI log level. 
**kwargs: Additional arguments to pass to the ComfyStreamClient """ self.client = ComfyStreamClient(**kwargs) - self.width = kwargs.get("width", 512) - self.height = kwargs.get("height", 512) + self.width = width + self.height = height self.video_incoming_frames = asyncio.Queue() self.audio_incoming_frames = asyncio.Queue() @@ -33,7 +44,8 @@ def __init__(self, width=512, height=512, comfyui_inference_log_level: int = Non self._comfyui_inference_log_level = comfyui_inference_log_level async def warm_video(self): - # Create dummy frame with the CURRENT resolution settings (which might have been updated via control channel) + """Warm up the video processing pipeline with dummy frames.""" + # Create dummy frame with the CURRENT resolution settings dummy_frame = av.VideoFrame() dummy_frame.side_data.input = torch.randn(1, self.height, self.width, 3) @@ -44,6 +56,7 @@ async def warm_video(self): await self.client.get_video_output() async def warm_audio(self): + """Warm up the audio processing pipeline with dummy frames.""" dummy_frame = av.AudioFrame() dummy_frame.side_data.input = np.random.randint(-32768, 32767, int(48000 * 0.5), dtype=np.int16) # TODO: adds a lot of delay if it doesn't match the buffer size, is warmup needed? dummy_frame.sample_rate = 48000 @@ -53,60 +66,121 @@ async def warm_audio(self): await self.client.get_audio_output() async def set_prompts(self, prompts: Union[Dict[Any, Any], List[Dict[Any, Any]]]): + """Set the processing prompts for the pipeline. + + Args: + prompts: Either a single prompt dictionary or a list of prompt dictionaries + """ if isinstance(prompts, list): await self.client.set_prompts(prompts) else: await self.client.set_prompts([prompts]) async def update_prompts(self, prompts: Union[Dict[Any, Any], List[Dict[Any, Any]]]): + """Update the existing processing prompts. 
+ + Args: + prompts: Either a single prompt dictionary or a list of prompt dictionaries + """ if isinstance(prompts, list): await self.client.update_prompts(prompts) else: await self.client.update_prompts([prompts]) async def put_video_frame(self, frame: av.VideoFrame): + """Queue a video frame for processing. + + Args: + frame: The video frame to process + """ frame.side_data.input = self.video_preprocess(frame) frame.side_data.skipped = True self.client.put_video_input(frame) await self.video_incoming_frames.put(frame) async def put_audio_frame(self, frame: av.AudioFrame): + """Queue an audio frame for processing. + + Args: + frame: The audio frame to process + """ frame.side_data.input = self.audio_preprocess(frame) frame.side_data.skipped = True self.client.put_audio_input(frame) await self.audio_incoming_frames.put(frame) def video_preprocess(self, frame: av.VideoFrame) -> Union[torch.Tensor, np.ndarray]: + """Preprocess a video frame before processing. + + Args: + frame: The video frame to preprocess + + Returns: + The preprocessed frame as a tensor or numpy array + """ frame_np = frame.to_ndarray(format="rgb24").astype(np.float32) / 255.0 return torch.from_numpy(frame_np).unsqueeze(0) def audio_preprocess(self, frame: av.AudioFrame) -> Union[torch.Tensor, np.ndarray]: + """Preprocess an audio frame before processing. + + Args: + frame: The audio frame to preprocess + + Returns: + The preprocessed frame as a tensor or numpy array + """ return frame.to_ndarray().ravel().reshape(-1, 2).mean(axis=1).astype(np.int16) def video_postprocess(self, output: Union[torch.Tensor, np.ndarray]) -> av.VideoFrame: + """Postprocess a video frame after processing. 
+ + Args: + output: The processed output tensor or numpy array + + Returns: + The postprocessed video frame + """ return av.VideoFrame.from_ndarray( (output * 255.0).clamp(0, 255).to(dtype=torch.uint8).squeeze(0).cpu().numpy() ) def audio_postprocess(self, output: Union[torch.Tensor, np.ndarray]) -> av.AudioFrame: + """Postprocess an audio frame after processing. + + Args: + output: The processed output tensor or numpy array + + Returns: + The postprocessed audio frame + """ return av.AudioFrame.from_ndarray(np.repeat(output, 2).reshape(1, -1)) - async def get_processed_video_frame(self): - # TODO: make it generic to support purely generative video cases + # TODO: make it generic to support purely generative video cases + async def get_processed_video_frame(self) -> av.VideoFrame: + """Get the next processed video frame. + + Returns: + The processed video frame + """ async with temporary_log_level("comfy", self._comfyui_inference_log_level): out_tensor = await self.client.get_video_output() frame = await self.video_incoming_frames.get() while frame.side_data.skipped: frame = await self.video_incoming_frames.get() - processed_frame = self.video_postprocess(out_tensor) + processed_frame = self.video_postprocess(out_tensor) processed_frame.pts = frame.pts processed_frame.time_base = frame.time_base return processed_frame - async def get_processed_audio_frame(self): - # TODO: make it generic to support purely generative audio cases and also add frame skipping + async def get_processed_audio_frame(self) -> av.AudioFrame: + """Get the next processed audio frame. 
+ + Returns: + The processed audio frame + """ frame = await self.audio_incoming_frames.get() if frame.samples > len(self.processed_audio_buffer): async with temporary_log_level("comfy", self._comfyui_inference_log_level): @@ -123,9 +197,14 @@ async def get_processed_audio_frame(self): return processed_frame async def get_nodes_info(self) -> Dict[str, Any]: - """Get information about all nodes in the current prompt including metadata.""" + """Get information about all nodes in the current prompt including metadata. + + Returns: + Dictionary containing node information + """ nodes_info = await self.client.get_available_nodes() return nodes_info async def cleanup(self): - await self.client.cleanup() + """Clean up resources used by the pipeline.""" + await self.client.cleanup() \ No newline at end of file diff --git a/server/__init__.py b/src/comfystream/server/__init__.py similarity index 100% rename from server/__init__.py rename to src/comfystream/server/__init__.py diff --git a/server/app.py b/src/comfystream/server/app.py similarity index 98% rename from server/app.py rename to src/comfystream/server/app.py index 83bc943f..676b3b74 100644 --- a/server/app.py +++ b/src/comfystream/server/app.py @@ -5,13 +5,6 @@ import os import sys -import torch - -# Initialize CUDA before any other imports to prevent core dump. 
-if torch.cuda.is_available(): - torch.cuda.init() - - from aiohttp import web from aiortc import ( MediaStreamTrack, @@ -22,10 +15,10 @@ ) from aiortc.codecs import h264 from aiortc.rtcrtpsender import RTCRtpSender -from pipeline import Pipeline +from comfystream.pipeline import Pipeline from twilio.rest import Client -from utils import patch_loop_datagram, add_prefix_to_app_routes, FPSMeter -from metrics import MetricsManager, StreamStatsManager +from comfystream.server.utils import patch_loop_datagram, add_prefix_to_app_routes, FPSMeter +from comfystream.server.metrics import MetricsManager, StreamStatsManager import time logger = logging.getLogger(__name__) diff --git a/server/metrics/__init__.py b/src/comfystream/server/metrics/__init__.py similarity index 100% rename from server/metrics/__init__.py rename to src/comfystream/server/metrics/__init__.py diff --git a/server/metrics/prometheus_metrics.py b/src/comfystream/server/metrics/prometheus_metrics.py similarity index 100% rename from server/metrics/prometheus_metrics.py rename to src/comfystream/server/metrics/prometheus_metrics.py diff --git a/server/metrics/stream_stats.py b/src/comfystream/server/metrics/stream_stats.py similarity index 100% rename from server/metrics/stream_stats.py rename to src/comfystream/server/metrics/stream_stats.py diff --git a/server/utils/__init__.py b/src/comfystream/server/utils/__init__.py similarity index 100% rename from server/utils/__init__.py rename to src/comfystream/server/utils/__init__.py diff --git a/server/utils/fps_meter.py b/src/comfystream/server/utils/fps_meter.py similarity index 98% rename from server/utils/fps_meter.py rename to src/comfystream/server/utils/fps_meter.py index ce94317b..87e75d46 100644 --- a/server/utils/fps_meter.py +++ b/src/comfystream/server/utils/fps_meter.py @@ -4,7 +4,7 @@ import logging import time from collections import deque -from metrics import MetricsManager +from comfystream.server.metrics import MetricsManager logger = 
logging.getLogger(__name__) diff --git a/server/utils/utils.py b/src/comfystream/server/utils/utils.py similarity index 100% rename from server/utils/utils.py rename to src/comfystream/server/utils/utils.py From a397d457298f78adf56b2e6d8600756058607c8a Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Tue, 15 Apr 2025 11:33:43 -0400 Subject: [PATCH 2/2] rename temporary_log_level to set_temporary_log_level --- src/comfystream/__init__.py | 4 ++-- src/comfystream/pipeline.py | 6 +++--- src/comfystream/server/utils/__init__.py | 2 +- src/comfystream/server/utils/utils.py | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/comfystream/__init__.py b/src/comfystream/__init__.py index 5e1b9fac..cd3528e7 100644 --- a/src/comfystream/__init__.py +++ b/src/comfystream/__init__.py @@ -1,6 +1,6 @@ from .client import ComfyStreamClient from .pipeline import Pipeline -from .server.utils import temporary_log_level +from .server.utils import set_temporary_log_level from .server.app import VideoStreamTrack, AudioStreamTrack from .server.utils import FPSMeter from .server.metrics import MetricsManager, StreamStatsManager @@ -8,7 +8,7 @@ __all__ = [ 'ComfyStreamClient', 'Pipeline', - 'temporary_log_level', + 'set_temporary_log_level', 'VideoStreamTrack', 'AudioStreamTrack', 'FPSMeter', diff --git a/src/comfystream/pipeline.py b/src/comfystream/pipeline.py index a5776dfc..cd6d93fe 100644 --- a/src/comfystream/pipeline.py +++ b/src/comfystream/pipeline.py @@ -6,7 +6,7 @@ from typing import Any, Dict, Union, List, Optional from comfystream.client import ComfyStreamClient -from comfystream.server.utils import temporary_log_level +from comfystream.server.utils import set_temporary_log_level WARMUP_RUNS = 5 @@ -163,7 +163,7 @@ async def get_processed_video_frame(self) -> av.VideoFrame: Returns: The processed video frame """ - async with temporary_log_level("comfy", self._comfyui_inference_log_level): + async with set_temporary_log_level("comfy", self._comfyui_inference_log_level): out_tensor = await self.client.get_video_output() frame = await self.video_incoming_frames.get() while frame.side_data.skipped: @@ -183,7 +183,7 @@ async
def get_processed_audio_frame(self) -> av.AudioFrame: """ frame = await self.audio_incoming_frames.get() if frame.samples > len(self.processed_audio_buffer): - async with temporary_log_level("comfy", self._comfyui_inference_log_level): + async with set_temporary_log_level("comfy", self._comfyui_inference_log_level): out_tensor = await self.client.get_audio_output() self.processed_audio_buffer = np.concatenate([self.processed_audio_buffer, out_tensor]) out_data = self.processed_audio_buffer[:frame.samples] diff --git a/src/comfystream/server/utils/__init__.py b/src/comfystream/server/utils/__init__.py index daa71bb1..f28eca91 100644 --- a/src/comfystream/server/utils/__init__.py +++ b/src/comfystream/server/utils/__init__.py @@ -1,2 +1,2 @@ -from .utils import patch_loop_datagram, add_prefix_to_app_routes, temporary_log_level +from .utils import patch_loop_datagram, add_prefix_to_app_routes, set_temporary_log_level from .fps_meter import FPSMeter diff --git a/src/comfystream/server/utils/utils.py b/src/comfystream/server/utils/utils.py index c7a7ac30..f21a8329 100644 --- a/src/comfystream/server/utils/utils.py +++ b/src/comfystream/server/utils/utils.py @@ -67,7 +67,7 @@ def add_prefix_to_app_routes(app: web.Application, prefix: str): @asynccontextmanager -async def temporary_log_level(logger_name: str, level: int): +async def set_temporary_log_level(logger_name: str, level: int): """Temporarily set the log level of a logger. Args: