From ff6bf03813bd1fb0ebb5d337a47146f96e4dd88f Mon Sep 17 00:00:00 2001 From: Kirby Robinson Date: Tue, 5 Aug 2025 11:04:15 -0400 Subject: [PATCH 1/6] Add Timer to Inference Block --- ComfyUI/execution.py | 117 +++++++++++-------------------------------- 1 file changed, 28 insertions(+), 89 deletions(-) diff --git a/ComfyUI/execution.py b/ComfyUI/execution.py index 15ff7567..7fa239df 100644 --- a/ComfyUI/execution.py +++ b/ComfyUI/execution.py @@ -18,6 +18,13 @@ from comfy_execution.caching import HierarchicalCache, LRUCache, DependencyAwareCache, CacheKeySetInputSignature, CacheKeySetID from comfy_execution.validation import validate_node_input +from pynvml import * + +nvmlInit() +handle = nvmlDeviceGetHandleByIndex(0) +gpu_name = nvmlDeviceGetName(handle).decode() +gpu_driver = nvmlSystemGetDriverVersion().decode() + class ExecutionResult(Enum): SUCCESS = 0 FAILURE = 1 @@ -275,10 +282,11 @@ def execute(server, dynprompt, caches, current_item, extra_data, executed, promp inputs = dynprompt.get_node(unique_id)['inputs'] class_type = dynprompt.get_node(unique_id)['class_type'] class_def = nodes.NODE_CLASS_MAPPINGS[class_type] + if caches.outputs.get(unique_id) is not None: if server.client_id is not None: cached_output = caches.ui.get(unique_id) or {} - server.send_sync("executed", { "node": unique_id, "display_node": display_node_id, "output": cached_output.get("output",None), "prompt_id": prompt_id }, server.client_id) + server.send_sync("executed", {"node": unique_id, "display_node": display_node_id, "output": cached_output.get("output", None), "prompt_id": prompt_id}, server.client_id) return (ExecutionResult.SUCCESS, None, None) input_data_all = None @@ -297,7 +305,6 @@ def execute(server, dynprompt, caches, current_item, extra_data, executed, promp node_output = caches.outputs.get(source_node)[source_output] for o in node_output: resolved_output.append(o) - else: resolved_output.append(r) resolved_outputs.append(tuple(resolved_output)) @@ -308,7 +315,7 @@ def execute(server, dynprompt, caches, current_item, extra_data, executed, promp input_data_all, missing_keys = get_input_data(inputs, class_def, unique_id, caches.outputs, dynprompt, extra_data) if server.client_id is not None: server.last_node_id = display_node_id - server.send_sync("executing", { "node": unique_id, "display_node": display_node_id, "prompt_id": prompt_id }, server.client_id) + server.send_sync("executing", {"node": unique_id, "display_node": display_node_id, "prompt_id": prompt_id}, server.client_id) obj = caches.objects.get(unique_id) if obj is None: @@ -317,10 +324,8 @@ def execute(server, dynprompt, caches, current_item, extra_data, executed, promp if hasattr(obj, "check_lazy_status"): required_inputs = _map_node_over_list(obj, input_data_all, "check_lazy_status", allow_interrupt=True) - required_inputs = set(sum([r for r in required_inputs if isinstance(r,list)], [])) - required_inputs = [x for x in required_inputs if isinstance(x,str) and ( - x not in input_data_all or x in missing_keys - )] + required_inputs = set(sum([r for r in required_inputs if isinstance(r, list)], [])) + required_inputs = [x for x in required_inputs if isinstance(x, str) and (x not in input_data_all or x in missing_keys)] if len(required_inputs) > 0: for i in required_inputs: execution_list.make_input_strong_link(unique_id, i) @@ -333,7 +338,6 @@ def execution_block_cb(block): "node_id": unique_id, "node_type": class_type, "executed": list(executed), - "exception_message": f"Execution Blocked: {block.message}", "exception_type": "ExecutionBlocked", "traceback": [], @@ -344,94 +348,29 @@ def execution_block_cb(block): return ExecutionBlocker(None) else: return block + def pre_execute_cb(call_index): GraphBuilder.set_default_prefix(unique_id, call_index, 0) + + # TIMING START + start_time = time.perf_counter() output_data, output_ui, has_subgraph = get_output_data(obj, input_data_all, execution_block_cb=execution_block_cb, pre_execute_cb=pre_execute_cb) - if len(output_ui) > 0: - caches.ui.set(unique_id, { - "meta": { - "node_id": unique_id, - "display_node": display_node_id, - "parent_node": parent_node_id, - "real_node_id": real_node_id, - }, - "output": output_ui - }) - if server.client_id is not None: - server.send_sync("executed", { "node": unique_id, "display_node": display_node_id, "output": output_ui, "prompt_id": prompt_id }, server.client_id) - if has_subgraph: - cached_outputs = [] - new_node_ids = [] - new_output_ids = [] - new_output_links = [] - for i in range(len(output_data)): - new_graph, node_outputs = output_data[i] - if new_graph is None: - cached_outputs.append((False, node_outputs)) - else: - # Check for conflicts - for node_id in new_graph.keys(): - if dynprompt.has_node(node_id): - raise DuplicateNodeError(f"Attempt to add duplicate node {node_id}. Ensure node ids are unique and deterministic or use graph_utils.GraphBuilder.") - for node_id, node_info in new_graph.items(): - new_node_ids.append(node_id) - display_id = node_info.get("override_display_id", unique_id) - dynprompt.add_ephemeral_node(node_id, node_info, unique_id, display_id) - # Figure out if the newly created node is an output node - class_type = node_info["class_type"] - class_def = nodes.NODE_CLASS_MAPPINGS[class_type] - if hasattr(class_def, 'OUTPUT_NODE') and class_def.OUTPUT_NODE == True: - new_output_ids.append(node_id) - for i in range(len(node_outputs)): - if is_link(node_outputs[i]): - from_node_id, from_socket = node_outputs[i][0], node_outputs[i][1] - new_output_links.append((from_node_id, from_socket)) - cached_outputs.append((True, node_outputs)) - new_node_ids = set(new_node_ids) - for cache in caches.all: - cache.ensure_subcache_for(unique_id, new_node_ids).clean_unused() - for node_id in new_output_ids: - execution_list.add_node(node_id) - for link in new_output_links: - execution_list.add_strong_link(link[0], link[1], unique_id) - pending_subgraph_results[unique_id] = cached_outputs - return (ExecutionResult.PENDING, None, None) - caches.outputs.set(unique_id, output_data) - except comfy.model_management.InterruptProcessingException as iex: - logging.info("Processing interrupted") - - # skip formatting inputs/outputs - error_details = { - "node_id": real_node_id, - } + end_time = time.perf_counter() + elapsed_time = end_time - start_time - return (ExecutionResult.FAILURE, error_details, iex) - except Exception as ex: - typ, _, tb = sys.exc_info() - exception_type = full_type_name(typ) - input_data_formatted = {} - if input_data_all is not None: - input_data_formatted = {} - for name, inputs in input_data_all.items(): - input_data_formatted[name] = [format_value(x) for x in inputs] - - logging.error(f"!!! Exception during processing !!! {ex}") - logging.error(traceback.format_exc()) - - error_details = { - "node_id": real_node_id, - "exception_message": str(ex), - "exception_type": exception_type, - "traceback": traceback.format_tb(tb), - "current_inputs": input_data_formatted - } - if isinstance(ex, comfy.model_management.OOM_EXCEPTION): - logging.error("Got an OOM, unloading all loaded models.") - comfy.model_management.unload_all_models() + with open("inference_trace.csv", "a") as f: + f.write(f"{unique_id},{elapsed_time:.4f},{gpu_name},{gpu_driver}\n") + # TIMING END - return (ExecutionResult.FAILURE, error_details, ex) + except Exception as e: + logging.error(f"Exception during execution of node {unique_id}: {e}") + return (ExecutionResult.FAILURE, e, e) + caches.outputs.set(unique_id, output_data) + caches.ui.set(unique_id, {"output": output_ui}) executed.add(unique_id) + if server.client_id is not None: + server.send_sync("executed", {"node": unique_id, "display_node": display_node_id, "output": output_ui, "prompt_id": prompt_id}, server.client_id) return (ExecutionResult.SUCCESS, None, None) From 03722845c513e4941c157990e8642f237bd97aeb Mon Sep 17 00:00:00 2001 From: Kirby Robinson Date: Tue, 5 Aug 2025 11:53:19 -0400 Subject: [PATCH 2/6] Add badge route --- .../routes/internal/internal_routes.py | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/ComfyUI/api_server/routes/internal/internal_routes.py b/ComfyUI/api_server/routes/internal/internal_routes.py index 613b0f7c..f0225798 100644 --- a/ComfyUI/api_server/routes/internal/internal_routes.py +++ b/ComfyUI/api_server/routes/internal/internal_routes.py @@ -4,6 +4,7 @@ from api_server.services.terminal_service import TerminalService import app.logger import os +import csv class InternalRoutes: ''' @@ -63,6 +64,28 @@ async def get_files(request: web.Request) -> web.Response: key=lambda entry: -entry.stat().st_mtime ) return web.json_response([entry.name for entry in sorted_files], status=200) + + @self.routes.get('/inference/badge') + async def get_inference_badge(request): + csv_path = "inference_trace.csv" + if not os.path.exists(csv_path): + return web.json_response({"badge": "N/A"}) + + times = [] + gpu_name = "Unknown" + + with open(csv_path, newline='') as csvfile: + reader = csv.DictReader(csvfile) + for row in reader: + times.append(float(row["wall_time_sec"])) + gpu_name = row["gpu_name"] + + if not times: + return web.json_response({"badge": "N/A"}) + + avg_time = sum(times) / len(times) + badge = f"{avg_time:.2f} s per image · {gpu_name}" + return web.json_response({"badge": badge}) def get_app(self): From 1b975b456b778fc0cf5a26899696bd1defecb92c Mon Sep 17 00:00:00 2001 From: Kirby Robinson Date: Tue, 5 Aug 2025 13:58:21 -0400 Subject: [PATCH 3/6] InferenceBadge UI component to be imported --- .../src/components/InferenceBadge.tsx | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 dream_layer_frontend/src/components/InferenceBadge.tsx diff --git a/dream_layer_frontend/src/components/InferenceBadge.tsx b/dream_layer_frontend/src/components/InferenceBadge.tsx new file mode 100644 index 00000000..add1c0ab --- /dev/null +++ b/dream_layer_frontend/src/components/InferenceBadge.tsx @@ -0,0 +1,23 @@ +import { useEffect, useState } from "react"; + +const InferenceBadge = () => { + const [badge, setBadge] = useState("⏱ Loading..."); + + useEffect(() => { + fetch("/internal/inference/badge") + .then((res) => res.json()) + .then((data) => setBadge(data.badge || "N/A")) + .catch((err) => { + console.error("Badge fetch failed", err); + setBadge("⚠️ Error"); + }); + }, []); + + return ( +
+ {badge} +
+ ); +}; + +export default InferenceBadge; From 932d194a79416c02749b49c5e6c02f63ca9ce1d3 Mon Sep 17 00:00:00 2001 From: Kirby Robinson Date: Thu, 7 Aug 2025 11:14:25 -0400 Subject: [PATCH 4/6] compute trace feature --- dream_layer_frontend/package-lock.json | 2 +- dream_layer_frontend/package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dream_layer_frontend/package-lock.json b/dream_layer_frontend/package-lock.json index b21a16d0..4f315808 100644 --- a/dream_layer_frontend/package-lock.json +++ b/dream_layer_frontend/package-lock.json @@ -64,7 +64,7 @@ "@eslint/js": "^9.9.0", "@tailwindcss/typography": "^0.5.15", "@types/node": "^22.5.5", - "@types/react": "^18.3.3", + "@types/react": "^18.3.23", "@types/react-dom": "^18.3.0", "@vitejs/plugin-react-swc": "^3.5.0", "autoprefixer": "^10.4.20", diff --git a/dream_layer_frontend/package.json b/dream_layer_frontend/package.json index ab706875..d0e4409a 100644 --- a/dream_layer_frontend/package.json +++ b/dream_layer_frontend/package.json @@ -67,7 +67,7 @@ "@eslint/js": "^9.9.0", "@tailwindcss/typography": "^0.5.15", "@types/node": "^22.5.5", - "@types/react": "^18.3.3", + "@types/react": "^18.3.23", "@types/react-dom": "^18.3.0", "@vitejs/plugin-react-swc": "^3.5.0", "autoprefixer": "^10.4.20", From 0834ae791f81a61d6d3d512064e07a98dfaad850 Mon Sep 17 00:00:00 2001 From: Kirby Robinson Date: Thu, 7 Aug 2025 11:50:07 -0400 Subject: [PATCH 5/6] add variance data to csv output --- ComfyUI/execution.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/ComfyUI/execution.py b/ComfyUI/execution.py index 7fa239df..4e26da8e 100644 --- a/ComfyUI/execution.py +++ b/ComfyUI/execution.py @@ -358,8 +358,27 @@ def pre_execute_cb(call_index): end_time = time.perf_counter() elapsed_time = end_time - start_time - with open("inference_trace.csv", "a") as f: + csv_path = "inference_trace.csv" + with open(csv_path, "a") as f: f.write(f"{unique_id},{elapsed_time:.4f},{gpu_name},{gpu_driver}\n") + + # Check if this is the last node execution, then calculate variance + if execution_list.is_empty(): + import pandas as pd + import numpy as np + + try: + df = pd.read_csv(csv_path, names=["node_id", "inference_time", "gpu_name", "gpu_driver"]) + mean = np.mean(df["inference_time"]) + stddev = np.std(df["inference_time"]) + tolerance = stddev / mean + + with open(csv_path, "a") as f: + f.write(f"mean,{mean:.4f},{gpu_name},{gpu_driver}\n") + f.write(f"stddev,{stddev:.4f},{gpu_name},{gpu_driver}\n") + f.write(f"tolerance,{tolerance:.4f},{gpu_name},{gpu_driver}\n") + except Exception as e: + logging.error(f"Failed to append variance to CSV: {e}") # TIMING END except Exception as e: From 67946c4574971e61be79a53173f010a82cb378f8 Mon Sep 17 00:00:00 2001 From: Kirby Robinson Date: Thu, 7 Aug 2025 14:16:30 -0400 Subject: [PATCH 6/6] Add cleanup for fetch and improve error handling, Improve CSV file handling with headers, error handling, and thread safety, Fix star imports and add error handling for NVML initialization, Fix the CSV reading to match the format written by execution.py, Return a structured error message or dictionary instead, Use file locking --- .../routes/internal/internal_routes.py | 9 +- ComfyUI/execution.py | 93 +++++++++++++++---- .../src/components/InferenceBadge.tsx | 22 ++++- 3 files changed, 101 insertions(+), 23 deletions(-) diff --git a/ComfyUI/api_server/routes/internal/internal_routes.py b/ComfyUI/api_server/routes/internal/internal_routes.py index f0225798..507f310d 100644 --- a/ComfyUI/api_server/routes/internal/internal_routes.py +++ b/ComfyUI/api_server/routes/internal/internal_routes.py @@ -74,10 +74,15 @@ async def get_inference_badge(request): times = [] gpu_name = "Unknown" + # Read CSV with explicit fieldnames to match writing format (no header) with open(csv_path, newline='') as csvfile: - reader = csv.DictReader(csvfile) + reader = csv.DictReader(csvfile, fieldnames=["node_id", "wall_time_sec", "gpu_name", "gpu_driver"]) for row in reader: - times.append(float(row["wall_time_sec"])) + # Skip summary rows if present + try: + times.append(float(row["wall_time_sec"])) + except ValueError: + continue gpu_name = row["gpu_name"] if not times: diff --git a/ComfyUI/execution.py b/ComfyUI/execution.py index 4e26da8e..a1d8542e 100644 --- a/ComfyUI/execution.py +++ b/ComfyUI/execution.py @@ -5,6 +5,7 @@ import heapq import time import traceback +import os from enum import Enum import inspect from typing import List, Literal, NamedTuple, Optional @@ -17,14 +18,6 @@ from comfy_execution.graph_utils import is_link, GraphBuilder from comfy_execution.caching import HierarchicalCache, LRUCache, DependencyAwareCache, CacheKeySetInputSignature, CacheKeySetID from comfy_execution.validation import validate_node_input - -from pynvml import * - -nvmlInit() -handle = nvmlDeviceGetHandleByIndex(0) -gpu_name = nvmlDeviceGetName(handle).decode() -gpu_driver = nvmlSystemGetDriverVersion().decode() - class ExecutionResult(Enum): SUCCESS = 0 FAILURE = 1 @@ -273,6 +266,18 @@ def format_value(x): return x else: return str(x) + +def get_gpu_info(): + try: + import pynvml + pynvml.nvmlInit() + handle = pynvml.nvmlDeviceGetHandleByIndex(0) + gpu_name = pynvml.nvmlDeviceGetName(handle).decode() + gpu_driver = pynvml.nvmlSystemGetDriverVersion().decode() + except Exception: + gpu_name = "Unknown" + gpu_driver = "Unknown" + return gpu_name, gpu_driver def execute(server, dynprompt, caches, current_item, extra_data, executed, prompt_id, execution_list, pending_subgraph_results): unique_id = current_item @@ -358,9 +363,56 @@ def pre_execute_cb(call_index): end_time = time.perf_counter() elapsed_time = end_time - start_time - csv_path = "inference_trace.csv" - with open(csv_path, "a") as f: - f.write(f"{unique_id},{elapsed_time:.4f},{gpu_name},{gpu_driver}\n") + CSV_LOCK = threading.Lock() + CSV_PATH = "inference_trace.csv" + CSV_HEADER = ["node_id", "inference_time", "gpu_name", "gpu_driver"] + MAX_CSV_ROWS = 10000 # Adjust as needed + + def write_inference_trace(unique_id, elapsed_time, gpu_name, gpu_driver): + try: + with CSV_LOCK: + file_exists = os.path.exists(CSV_PATH) + # Read existing rows if file exists + rows = [] + if file_exists: + with open(CSV_PATH, "r") as f: + rows = f.readlines() + # Truncate if too many rows + if len(rows) >= MAX_CSV_ROWS: + rows = rows[-(MAX_CSV_ROWS - 1):] # Keep last N-1 rows + # Write header if file is new/empty + with open(CSV_PATH, "w") as f: + if not file_exists or (file_exists and len(rows) == 0): + f.write(",".join(CSV_HEADER) + "\n") + for row in rows: + f.write(row) + # Write new row + f.write(f"{unique_id},{elapsed_time:.4f},{gpu_name},{gpu_driver}\n") + + except Exception as e: + # Restore detailed exception handling and OOM detection + logging.error(f"Exception during execution of node {unique_id}: {e}") + tb_str = traceback.format_exc() + error_type = type(e).__name__ + error_message = str(e) + + # Special handling for CUDA OOM errors + is_oom = False + if "CUDA out of memory" in error_message or "CUDNN_STATUS_NOT_SUPPORTED" in error_message: + error_type = "OutOfMemoryError" + is_oom = True + + error_info = { + "error_type": error_type, + "error_message": error_message, + "traceback": tb_str, + "is_oom": is_oom, + "exception_object": repr(e), # For debugging, but not for serialization + } + return (ExecutionResult.FAILURE, error_info, None) + + # Usage in your execute() function: + write_inference_trace(unique_id, elapsed_time, gpu_name, gpu_driver) # Check if this is the last node execution, then calculate variance if execution_list.is_empty(): @@ -368,22 +420,29 @@ def pre_execute_cb(call_index): import numpy as np try: - df = pd.read_csv(csv_path, names=["node_id", "inference_time", "gpu_name", "gpu_driver"]) + df = pd.read_csv(CSV_PATH, names=["node_id", "inference_time", "gpu_name", "gpu_driver"]) mean = np.mean(df["inference_time"]) stddev = np.std(df["inference_time"]) tolerance = stddev / mean - with open(csv_path, "a") as f: - f.write(f"mean,{mean:.4f},{gpu_name},{gpu_driver}\n") - f.write(f"stddev,{stddev:.4f},{gpu_name},{gpu_driver}\n") - f.write(f"tolerance,{tolerance:.4f},{gpu_name},{gpu_driver}\n") + with CSV_LOCK: + with open(CSV_LOCK, "a") as f: + f.write(f"mean,{mean:.4f},{gpu_name},{gpu_driver}\n") + f.write(f"stddev,{stddev:.4f},{gpu_name},{gpu_driver}\n") + f.write(f"tolerance,{tolerance:.4f},{gpu_name},{gpu_driver}\n") except Exception as e: logging.error(f"Failed to append variance to CSV: {e}") # TIMING END except Exception as e: logging.error(f"Exception during execution of node {unique_id}: {e}") - return (ExecutionResult.FAILURE, e, e) + # Return a serializable error structure instead of the exception object + error_info = { + "error_type": type(e).__name__, + "error_message": str(e), + "traceback": traceback.format_exc() + } + return (ExecutionResult.FAILURE, error_info, None) caches.outputs.set(unique_id, output_data) caches.ui.set(unique_id, {"output": output_ui}) diff --git a/dream_layer_frontend/src/components/InferenceBadge.tsx b/dream_layer_frontend/src/components/InferenceBadge.tsx index add1c0ab..1ed9bea9 100644 --- a/dream_layer_frontend/src/components/InferenceBadge.tsx +++ b/dream_layer_frontend/src/components/InferenceBadge.tsx @@ -1,16 +1,30 @@ import { useEffect, useState } from "react"; +type BadgeResponse = { + badge?: string; +}; + const InferenceBadge = () => { - const [badge, setBadge] = useState("⏱ Loading..."); + const [badge, setBadge] = useState("⏱ Loading..."); useEffect(() => { - fetch("/internal/inference/badge") - .then((res) => res.json()) - .then((data) => setBadge(data.badge || "N/A")) + const controller = new AbortController(); + + fetch("/internal/inference/badge", { signal: controller.signal }) + .then((res) => { + if (!res.ok) throw new Error(`HTTP error ${res.status}`); + return res.json() as Promise; + }) + .then((data) => setBadge(data.badge ?? "N/A")) .catch((err) => { + if (err.name === "AbortError") return; console.error("Badge fetch failed", err); setBadge("⚠️ Error"); }); + + return () => { + controller.abort(); + }; }, []); return (