diff --git a/ComfyUI/api_server/routes/internal/internal_routes.py b/ComfyUI/api_server/routes/internal/internal_routes.py
index 613b0f7c..507f310d 100644
--- a/ComfyUI/api_server/routes/internal/internal_routes.py
+++ b/ComfyUI/api_server/routes/internal/internal_routes.py
@@ -4,6 +4,7 @@
 from api_server.services.terminal_service import TerminalService
 import app.logger
 import os
+import csv
 
 class InternalRoutes:
     '''
@@ -63,6 +64,33 @@ async def get_files(request: web.Request) -> web.Response:
                 key=lambda entry: -entry.stat().st_mtime
             )
             return web.json_response([entry.name for entry in sorted_files], status=200)
+
+        @self.routes.get('/inference/badge')
+        async def get_inference_badge(request):
+            csv_path = "inference_trace.csv"
+            if not os.path.exists(csv_path):
+                return web.json_response({"badge": "N/A"})
+
+            times = []
+            gpu_name = "Unknown"
+
+            # The trace file is written with a header row, so let DictReader pick the
+            # field names up from it; skip the mean/stddev/tolerance summary rows.
+            with open(csv_path, newline='') as csvfile:
+                reader = csv.DictReader(csvfile)
+                for row in reader:
+                    if row["node_id"] in ("mean", "stddev", "tolerance"):
+                        continue
+                    try:
+                        times.append(float(row["inference_time"]))
+                    except (TypeError, ValueError):
+                        continue
+                    gpu_name = row["gpu_name"]
+
+            if not times:
+                return web.json_response({"badge": "N/A"})
+
+            avg_time = sum(times) / len(times)
+            badge = f"{avg_time:.2f} s per image · {gpu_name}"
+            return web.json_response({"badge": badge})
 
     def get_app(self):
diff --git a/ComfyUI/execution.py b/ComfyUI/execution.py
index 15ff7567..a1d8542e 100644
--- a/ComfyUI/execution.py
+++ b/ComfyUI/execution.py
@@ -5,6 +5,7 @@
 import heapq
 import time
 import traceback
+import os
+import threading
 from enum import Enum
 import inspect
 from typing import List, Literal, NamedTuple, Optional
@@ -17,7 +18,6 @@
 from comfy_execution.graph_utils import is_link, GraphBuilder
 from comfy_execution.caching import HierarchicalCache, LRUCache, DependencyAwareCache, CacheKeySetInputSignature, CacheKeySetID
 from comfy_execution.validation import validate_node_input
-
 class ExecutionResult(Enum):
     SUCCESS = 0
     FAILURE = 1
@@ -266,6 +266,18 @@ def format_value(x):
         return x
     else:
         return str(x)
+
+def get_gpu_info():
+    try:
+        import pynvml
+        pynvml.nvmlInit()
+        handle = pynvml.nvmlDeviceGetHandleByIndex(0)
+        # pynvml returns bytes on some versions and str on others
+        name = pynvml.nvmlDeviceGetName(handle)
+        driver = pynvml.nvmlSystemGetDriverVersion()
+        gpu_name = name.decode() if isinstance(name, bytes) else name
+        gpu_driver = driver.decode() if isinstance(driver, bytes) else driver
+    except Exception:
+        gpu_name = "Unknown"
+        gpu_driver = "Unknown"
+    return gpu_name, gpu_driver
+
+# Per-node inference trace, shared by every execute() call in this process
+CSV_LOCK = threading.Lock()
+CSV_PATH = "inference_trace.csv"
+CSV_HEADER = ["node_id", "inference_time", "gpu_name", "gpu_driver"]
+MAX_CSV_ROWS = 10000  # Adjust as needed
+
+def write_inference_trace(unique_id, elapsed_time, gpu_name, gpu_driver):
+    try:
+        with CSV_LOCK:
+            rows = []
+            if os.path.exists(CSV_PATH):
+                with open(CSV_PATH, "r") as f:
+                    rows = f.readlines()
+            # Drop the old header and keep only the most recent rows
+            if rows and rows[0].startswith("node_id,"):
+                rows = rows[1:]
+            if len(rows) >= MAX_CSV_ROWS:
+                rows = rows[-(MAX_CSV_ROWS - 1):]
+            with open(CSV_PATH, "w") as f:
+                f.write(",".join(CSV_HEADER) + "\n")
+                f.writelines(rows)
+                f.write(f"{unique_id},{elapsed_time:.4f},{gpu_name},{gpu_driver}\n")
+    except Exception as e:
+        logging.error(f"Failed to write inference trace: {e}")
 
 def execute(server, dynprompt, caches, current_item, extra_data, executed, prompt_id, execution_list, pending_subgraph_results):
     unique_id = current_item
@@ -275,10 +287,11 @@
     inputs = dynprompt.get_node(unique_id)['inputs']
     class_type = dynprompt.get_node(unique_id)['class_type']
     class_def = nodes.NODE_CLASS_MAPPINGS[class_type]
+
     if caches.outputs.get(unique_id) is not None:
         if server.client_id is not None:
             cached_output = caches.ui.get(unique_id) or {}
-            server.send_sync("executed", { "node": unique_id, "display_node": display_node_id, "output": cached_output.get("output",None), "prompt_id": prompt_id }, server.client_id)
+            server.send_sync("executed", {"node": unique_id, "display_node": display_node_id, "output": cached_output.get("output", None), "prompt_id": prompt_id}, server.client_id)
         return (ExecutionResult.SUCCESS, None, None)
 
     input_data_all = None
@@ -297,7 +310,6 @@
                         node_output = caches.outputs.get(source_node)[source_output]
                         for o in node_output:
                             resolved_output.append(o)
-
                     else:
                         resolved_output.append(r)
                 resolved_outputs.append(tuple(resolved_output))
@@ -308,7 +320,7 @@
             input_data_all, missing_keys = get_input_data(inputs, class_def, unique_id, caches.outputs, dynprompt, extra_data)
             if server.client_id is not None:
                 server.last_node_id = display_node_id
-                server.send_sync("executing", { "node": unique_id, "display_node": display_node_id, "prompt_id": prompt_id }, server.client_id)
+                server.send_sync("executing", {"node": unique_id, "display_node": display_node_id, "prompt_id": prompt_id}, server.client_id)
 
             obj = caches.objects.get(unique_id)
             if obj is None:
@@ -317,10 +329,8 @@
             if hasattr(obj, "check_lazy_status"):
                 required_inputs = _map_node_over_list(obj, input_data_all, "check_lazy_status", allow_interrupt=True)
-                required_inputs = set(sum([r for r in required_inputs if isinstance(r,list)], []))
-                required_inputs = [x for x in required_inputs if isinstance(x,str) and (
-                    x not in input_data_all or x in missing_keys
-                )]
+                required_inputs = set(sum([r for r in required_inputs if isinstance(r, list)], []))
+                required_inputs = [x for x in required_inputs if isinstance(x, str) and (x not in input_data_all or x in missing_keys)]
                 if len(required_inputs) > 0:
                     for i in required_inputs:
                         execution_list.make_input_strong_link(unique_id, i)
@@ -333,7 +343,6 @@ def execution_block_cb(block):
                         "node_id": unique_id,
                         "node_type": class_type,
                         "executed": list(executed),
-
                         "exception_message": f"Execution Blocked: {block.message}",
                         "exception_type": "ExecutionBlocked",
                         "traceback": [],
@@ -344,94 +353,102 @@
                     return ExecutionBlocker(None)
                 else:
                     return block
+
             def pre_execute_cb(call_index):
                 GraphBuilder.set_default_prefix(unique_id, call_index, 0)
-            output_data, output_ui, has_subgraph = get_output_data(obj, input_data_all, execution_block_cb=execution_block_cb, pre_execute_cb=pre_execute_cb)
-        if len(output_ui) > 0:
-            caches.ui.set(unique_id, {
-                "meta": {
-                    "node_id": unique_id,
-                    "display_node": display_node_id,
-                    "parent_node": parent_node_id,
-                    "real_node_id": real_node_id,
-                },
-                "output": output_ui
-            })
-            if server.client_id is not None:
-                server.send_sync("executed", { "node": unique_id, "display_node": display_node_id, "output": output_ui, "prompt_id": prompt_id }, server.client_id)
-        if has_subgraph:
-            cached_outputs = []
-            new_node_ids = []
-            new_output_ids = []
-            new_output_links = []
-            for i in range(len(output_data)):
-                new_graph, node_outputs = output_data[i]
-                if new_graph is None:
-                    cached_outputs.append((False, node_outputs))
-                else:
-                    # Check for conflicts
-                    for node_id in new_graph.keys():
-                        if dynprompt.has_node(node_id):
-                            raise DuplicateNodeError(f"Attempt to add duplicate node {node_id}. Ensure node ids are unique and deterministic or use graph_utils.GraphBuilder.")
-                    for node_id, node_info in new_graph.items():
-                        new_node_ids.append(node_id)
-                        display_id = node_info.get("override_display_id", unique_id)
-                        dynprompt.add_ephemeral_node(node_id, node_info, unique_id, display_id)
-                        # Figure out if the newly created node is an output node
-                        class_type = node_info["class_type"]
-                        class_def = nodes.NODE_CLASS_MAPPINGS[class_type]
-                        if hasattr(class_def, 'OUTPUT_NODE') and class_def.OUTPUT_NODE == True:
-                            new_output_ids.append(node_id)
-                    for i in range(len(node_outputs)):
-                        if is_link(node_outputs[i]):
-                            from_node_id, from_socket = node_outputs[i][0], node_outputs[i][1]
-                            new_output_links.append((from_node_id, from_socket))
-                    cached_outputs.append((True, node_outputs))
-            new_node_ids = set(new_node_ids)
-            for cache in caches.all:
-                cache.ensure_subcache_for(unique_id, new_node_ids).clean_unused()
-            for node_id in new_output_ids:
-                execution_list.add_node(node_id)
-            for link in new_output_links:
-                execution_list.add_strong_link(link[0], link[1], unique_id)
-            pending_subgraph_results[unique_id] = cached_outputs
-            return (ExecutionResult.PENDING, None, None)
-        caches.outputs.set(unique_id, output_data)
-    except comfy.model_management.InterruptProcessingException as iex:
-        logging.info("Processing interrupted")
-
-        # skip formatting inputs/outputs
-        error_details = {
-            "node_id": real_node_id,
-        }
-        return (ExecutionResult.FAILURE, error_details, iex)
-    except Exception as ex:
-        typ, _, tb = sys.exc_info()
-        exception_type = full_type_name(typ)
-        input_data_formatted = {}
-        if input_data_all is not None:
-            input_data_formatted = {}
-            for name, inputs in input_data_all.items():
-                input_data_formatted[name] = [format_value(x) for x in inputs]
-
-        logging.error(f"!!! Exception during processing !!! {ex}")
-        logging.error(traceback.format_exc())
-
-        error_details = {
-            "node_id": real_node_id,
-            "exception_message": str(ex),
-            "exception_type": exception_type,
-            "traceback": traceback.format_tb(tb),
-            "current_inputs": input_data_formatted
-        }
-        if isinstance(ex, comfy.model_management.OOM_EXCEPTION):
-            logging.error("Got an OOM, unloading all loaded models.")
-            comfy.model_management.unload_all_models()
-
-        return (ExecutionResult.FAILURE, error_details, ex)
+            # TIMING START
+            start_time = time.perf_counter()
+            output_data, output_ui, has_subgraph = get_output_data(obj, input_data_all, execution_block_cb=execution_block_cb, pre_execute_cb=pre_execute_cb)
+            end_time = time.perf_counter()
+            elapsed_time = end_time - start_time
+
+            gpu_name, gpu_driver = get_gpu_info()
+            write_inference_trace(unique_id, elapsed_time, gpu_name, gpu_driver)
+
+            # An empty execution list means this was the last node, so append the
+            # mean/stddev/tolerance summary rows to the trace file
+            if execution_list.is_empty():
+                try:
+                    import pandas as pd
+                    import numpy as np
+
+                    df = pd.read_csv(CSV_PATH)
+                    df = df[~df["node_id"].isin(["mean", "stddev", "tolerance"])]
+                    times = df["inference_time"].astype(float)
+                    mean = float(np.mean(times))
+                    stddev = float(np.std(times))
+                    tolerance = stddev / mean if mean else 0.0
+
+                    with CSV_LOCK:
+                        with open(CSV_PATH, "a") as f:
+                            f.write(f"mean,{mean:.4f},{gpu_name},{gpu_driver}\n")
+                            f.write(f"stddev,{stddev:.4f},{gpu_name},{gpu_driver}\n")
+                            f.write(f"tolerance,{tolerance:.4f},{gpu_name},{gpu_driver}\n")
+                except Exception as e:
+                    logging.error(f"Failed to append variance to CSV: {e}")
+            # TIMING END
+    except Exception as e:
+        # Restore detailed exception handling and OOM detection
+        logging.error(f"Exception during execution of node {unique_id}: {e}")
+        logging.error(traceback.format_exc())
+
+        error_type = type(e).__name__
+        error_message = str(e)
+
+        # Special handling for CUDA OOM errors
+        is_oom = False
+        if "CUDA out of memory" in error_message or "CUDNN_STATUS_NOT_SUPPORTED" in error_message:
+            error_type = "OutOfMemoryError"
+            is_oom = True
+
+        # Return a serializable error structure instead of the exception object
+        error_info = {
+            "node_id": real_node_id,
+            "error_type": error_type,
+            "error_message": error_message,
+            "traceback": traceback.format_exc(),
+            "is_oom": is_oom,
+        }
+        return (ExecutionResult.FAILURE, error_info, e)
+    caches.outputs.set(unique_id, output_data)
+    caches.ui.set(unique_id, {"output": output_ui})
     executed.add(unique_id)
+    if server.client_id is not None:
+        server.send_sync("executed", {"node": unique_id, "display_node": display_node_id, "output": output_ui, "prompt_id": prompt_id}, server.client_id)
     return (ExecutionResult.SUCCESS, None, None)
diff --git a/dream_layer_frontend/package-lock.json b/dream_layer_frontend/package-lock.json
index b21a16d0..4f315808 100644
--- a/dream_layer_frontend/package-lock.json
+++ b/dream_layer_frontend/package-lock.json
@@ -64,7 +64,7 @@
         "@eslint/js": "^9.9.0",
         "@tailwindcss/typography": "^0.5.15",
         "@types/node": "^22.5.5",
-        "@types/react": "^18.3.3",
+        "@types/react": "^18.3.23",
         "@types/react-dom": "^18.3.0",
         "@vitejs/plugin-react-swc": "^3.5.0",
         "autoprefixer": "^10.4.20",
diff --git a/dream_layer_frontend/package.json b/dream_layer_frontend/package.json
index ab706875..d0e4409a 100644
--- a/dream_layer_frontend/package.json
+++ b/dream_layer_frontend/package.json
@@ -67,7 +67,7 @@
     "@eslint/js": "^9.9.0",
     "@tailwindcss/typography": "^0.5.15",
     "@types/node": "^22.5.5",
-    "@types/react": "^18.3.3",
+    "@types/react": "^18.3.23",
     "@types/react-dom": "^18.3.0",
     "@vitejs/plugin-react-swc": "^3.5.0",
     "autoprefixer": "^10.4.20",
diff --git a/dream_layer_frontend/src/components/InferenceBadge.tsx b/dream_layer_frontend/src/components/InferenceBadge.tsx
new file mode 100644
index 00000000..1ed9bea9
--- /dev/null
+++ b/dream_layer_frontend/src/components/InferenceBadge.tsx
@@ -0,0 +1,37 @@
+import { useEffect, useState } from "react";
+
+type BadgeResponse = {
+  badge?: string;
+};
+
+const InferenceBadge = () => {
+  const [badge, setBadge] = useState("⏱ Loading...");
+
+  useEffect(() => {
+    const controller = new AbortController();
+
+    fetch("/internal/inference/badge", { signal: controller.signal })
+      .then((res) => {
+        if (!res.ok) throw new Error(`HTTP error ${res.status}`);
+        return res.json() as Promise<BadgeResponse>;
+      })
+      .then((data) => setBadge(data.badge ?? "N/A"))
+      .catch((err) => {
+        if (err.name === "AbortError") return;
+        console.error("Badge fetch failed", err);
+        setBadge("⚠️ Error");
+      });
+
+    return () => {
+      controller.abort();
+    };
+  }, []);
+
+  return (
+    <div>
+      {badge}
+    </div>
+  );
+};
+
+export default InferenceBadge;
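Reviewer note, not part of the patch: a minimal sketch for sanity-checking the trace file and the badge endpoint locally. It assumes the defaults used above (inference_trace.csv in the current working directory, the internal routes mounted under /internal) plus a server base URL of http://127.0.0.1:8188; the base URL and the helper names local_average and fetch_badge are illustrative only and do not exist in the codebase.

# sanity_check_badge.py -- rough local check, adjust path/URL as needed
import csv
import json
import urllib.request

def local_average(path="inference_trace.csv"):
    """Recompute the per-node average that the badge endpoint should report."""
    times = []
    with open(path, newline="") as f:
        for row in csv.DictReader(f):
            # Skip the mean/stddev/tolerance rows appended after the last node
            if row["node_id"] in ("mean", "stddev", "tolerance"):
                continue
            times.append(float(row["inference_time"]))
    return sum(times) / len(times) if times else None

def fetch_badge(base="http://127.0.0.1:8188"):
    """Fetch the badge string that InferenceBadge.tsx renders."""
    with urllib.request.urlopen(f"{base}/internal/inference/badge") as resp:
        return json.loads(resp.read())["badge"]

if __name__ == "__main__":
    print("local average:", local_average())
    print("badge:", fetch_badge())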