Feature/compute trace #88
base: main
Changes from all commits
ff6bf03
0372284
1b975b4
932d194
0834ae7
67946c4
```python
@@ -4,6 +4,7 @@
from api_server.services.terminal_service import TerminalService
import app.logger
import os
import csv


class InternalRoutes:
    '''

@@ -63,6 +64,33 @@ async def get_files(request: web.Request) -> web.Response:
                key=lambda entry: -entry.stat().st_mtime
            )
            return web.json_response([entry.name for entry in sorted_files], status=200)

        @self.routes.get('/inference/badge')
        async def get_inference_badge(request):
            csv_path = "inference_trace.csv"
            if not os.path.exists(csv_path):
                return web.json_response({"badge": "N/A"})

            times = []
            gpu_name = "Unknown"

            # Read CSV with explicit fieldnames to match writing format (no header)
            with open(csv_path, newline='') as csvfile:
                reader = csv.DictReader(csvfile, fieldnames=["node_id", "wall_time_sec", "gpu_name", "gpu_driver"])
                for row in reader:
                    # Skip summary rows if present
                    try:
```
|
Comment on lines
+79
to
+82
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Field-name mismatch with writer Writer uses 🤖 Prompt for AI Agents |
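For reference, a minimal sketch of a reader aligned with the writer's schema (assuming the writer keeps `CSV_HEADER = ["node_id", "inference_time", "gpu_name", "gpu_driver"]` and keeps emitting a header row; the helper name `read_inference_times` is illustrative only):

```python
import csv

def read_inference_times(csv_path="inference_trace.csv"):
    """Sketch only: read per-node timings using the header row the writer emits."""
    times = []
    gpu_name = "Unknown"
    with open(csv_path, newline="") as csvfile:
        reader = csv.DictReader(csvfile)  # field names come from the header row
        for row in reader:
            try:
                times.append(float(row["inference_time"]))
            except (TypeError, ValueError):
                continue  # skips malformed rows; summary rows still need explicit filtering
            gpu_name = row["gpu_name"] or gpu_name
    return times, gpu_name
```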
```python
                        times.append(float(row["wall_time_sec"]))
                    except ValueError:
                        continue
                    gpu_name = row["gpu_name"]

            if not times:
                return web.json_response({"badge": "N/A"})

            avg_time = sum(times) / len(times)
            badge = f"{avg_time:.2f} s per image · {gpu_name}"
            return web.json_response({"badge": badge})
```
Comment on lines +68 to +93 (Contributor)

security (python.lang.maintainability.useless-inner-function): function flagged on these lines by static analysis. Source: opengrep
```python
    def get_app(self):
```
```python
@@ -5,6 +5,7 @@
import heapq
import time
import traceback
import os
from enum import Enum
import inspect
from typing import List, Literal, NamedTuple, Optional

@@ -17,7 +18,6 @@
from comfy_execution.graph_utils import is_link, GraphBuilder
from comfy_execution.caching import HierarchicalCache, LRUCache, DependencyAwareCache, CacheKeySetInputSignature, CacheKeySetID
from comfy_execution.validation import validate_node_input


class ExecutionResult(Enum):
    SUCCESS = 0
    FAILURE = 1

@@ -266,6 +266,18 @@ def format_value(x):
        return x
    else:
        return str(x)

def get_gpu_info():
```
Contributor

issue: get_gpu_info() is defined but never called in the new code. If you intend to record GPU info in the CSV, call this function and use its output. Otherwise, consider removing it as unused code.
```python
    try:
        import pynvml
        pynvml.nvmlInit()
        handle = pynvml.nvmlDeviceGetHandleByIndex(0)
        gpu_name = pynvml.nvmlDeviceGetName(handle).decode()
        gpu_driver = pynvml.nvmlSystemGetDriverVersion().decode()
    except Exception:
        gpu_name = "Unknown"
        gpu_driver = "Unknown"
    return gpu_name, gpu_driver
```
Comment on lines +270 to +280 (Contributor)

🛠️ Refactor suggestion: the helper returns GPU data but nothing assigns its result. Add `gpu_name, gpu_driver = get_gpu_info()` and place it immediately before the timing block so the values are available when the trace row is written.

🧰 Tools: 🪛 Ruff (0.12.2) 272-272: Trailing whitespace. Remove trailing whitespace (W291)
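A sketch of that placement inside execute(), reusing the names already present in this diff (the exact position relative to the timing block is a judgment call):

```python
# Sketch only: resolve GPU info once, just before the timed call, so the
# trace row records real values instead of relying on undefined names.
gpu_name, gpu_driver = get_gpu_info()

# TIMING START
start_time = time.perf_counter()
output_data, output_ui, has_subgraph = get_output_data(
    obj, input_data_all,
    execution_block_cb=execution_block_cb,
    pre_execute_cb=pre_execute_cb,
)
elapsed_time = time.perf_counter() - start_time

write_inference_trace(unique_id, elapsed_time, gpu_name, gpu_driver)
```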
```python
def execute(server, dynprompt, caches, current_item, extra_data, executed, prompt_id, execution_list, pending_subgraph_results):
    unique_id = current_item

@@ -275,10 +287,11 @@ def execute(server, dynprompt, caches, current_item, extra_data, executed, promp
    inputs = dynprompt.get_node(unique_id)['inputs']
    class_type = dynprompt.get_node(unique_id)['class_type']
    class_def = nodes.NODE_CLASS_MAPPINGS[class_type]

    if caches.outputs.get(unique_id) is not None:
        if server.client_id is not None:
            cached_output = caches.ui.get(unique_id) or {}
            server.send_sync("executed", { "node": unique_id, "display_node": display_node_id, "output": cached_output.get("output",None), "prompt_id": prompt_id }, server.client_id)
            server.send_sync("executed", {"node": unique_id, "display_node": display_node_id, "output": cached_output.get("output", None), "prompt_id": prompt_id}, server.client_id)
        return (ExecutionResult.SUCCESS, None, None)

    input_data_all = None

@@ -297,7 +310,6 @@ def execute(server, dynprompt, caches, current_item, extra_data, executed, promp
                            node_output = caches.outputs.get(source_node)[source_output]
                            for o in node_output:
                                resolved_output.append(o)

                        else:
                            resolved_output.append(r)
                    resolved_outputs.append(tuple(resolved_output))

@@ -308,7 +320,7 @@ def execute(server, dynprompt, caches, current_item, extra_data, executed, promp
            input_data_all, missing_keys = get_input_data(inputs, class_def, unique_id, caches.outputs, dynprompt, extra_data)
            if server.client_id is not None:
                server.last_node_id = display_node_id
                server.send_sync("executing", { "node": unique_id, "display_node": display_node_id, "prompt_id": prompt_id }, server.client_id)
                server.send_sync("executing", {"node": unique_id, "display_node": display_node_id, "prompt_id": prompt_id}, server.client_id)

            obj = caches.objects.get(unique_id)
            if obj is None:

@@ -317,10 +329,8 @@ def execute(server, dynprompt, caches, current_item, extra_data, executed, promp

            if hasattr(obj, "check_lazy_status"):
                required_inputs = _map_node_over_list(obj, input_data_all, "check_lazy_status", allow_interrupt=True)
                required_inputs = set(sum([r for r in required_inputs if isinstance(r,list)], []))
                required_inputs = [x for x in required_inputs if isinstance(x,str) and (
                    x not in input_data_all or x in missing_keys
                )]
                required_inputs = set(sum([r for r in required_inputs if isinstance(r, list)], []))
                required_inputs = [x for x in required_inputs if isinstance(x, str) and (x not in input_data_all or x in missing_keys)]
                if len(required_inputs) > 0:
                    for i in required_inputs:
                        execution_list.make_input_strong_link(unique_id, i)

@@ -333,7 +343,6 @@ def execution_block_cb(block):
                        "node_id": unique_id,
                        "node_type": class_type,
                        "executed": list(executed),

                        "exception_message": f"Execution Blocked: {block.message}",
                        "exception_type": "ExecutionBlocked",
                        "traceback": [],

@@ -344,94 +353,102 @@ def execution_block_cb(block):
                    return ExecutionBlocker(None)
                else:
                    return block

            def pre_execute_cb(call_index):
                GraphBuilder.set_default_prefix(unique_id, call_index, 0)
            output_data, output_ui, has_subgraph = get_output_data(obj, input_data_all, execution_block_cb=execution_block_cb, pre_execute_cb=pre_execute_cb)
        if len(output_ui) > 0:
            caches.ui.set(unique_id, {
                "meta": {
                    "node_id": unique_id,
                    "display_node": display_node_id,
                    "parent_node": parent_node_id,
                    "real_node_id": real_node_id,
                },
                "output": output_ui
            })
            if server.client_id is not None:
                server.send_sync("executed", { "node": unique_id, "display_node": display_node_id, "output": output_ui, "prompt_id": prompt_id }, server.client_id)
        if has_subgraph:
            cached_outputs = []
            new_node_ids = []
            new_output_ids = []
            new_output_links = []
            for i in range(len(output_data)):
                new_graph, node_outputs = output_data[i]
                if new_graph is None:
                    cached_outputs.append((False, node_outputs))
                else:
                    # Check for conflicts
                    for node_id in new_graph.keys():
                        if dynprompt.has_node(node_id):
                            raise DuplicateNodeError(f"Attempt to add duplicate node {node_id}. Ensure node ids are unique and deterministic or use graph_utils.GraphBuilder.")
                    for node_id, node_info in new_graph.items():
                        new_node_ids.append(node_id)
                        display_id = node_info.get("override_display_id", unique_id)
                        dynprompt.add_ephemeral_node(node_id, node_info, unique_id, display_id)
                        # Figure out if the newly created node is an output node
                        class_type = node_info["class_type"]
                        class_def = nodes.NODE_CLASS_MAPPINGS[class_type]
                        if hasattr(class_def, 'OUTPUT_NODE') and class_def.OUTPUT_NODE == True:
                            new_output_ids.append(node_id)
                    for i in range(len(node_outputs)):
                        if is_link(node_outputs[i]):
                            from_node_id, from_socket = node_outputs[i][0], node_outputs[i][1]
                            new_output_links.append((from_node_id, from_socket))
                    cached_outputs.append((True, node_outputs))
            new_node_ids = set(new_node_ids)
            for cache in caches.all:
                cache.ensure_subcache_for(unique_id, new_node_ids).clean_unused()
            for node_id in new_output_ids:
                execution_list.add_node(node_id)
            for link in new_output_links:
                execution_list.add_strong_link(link[0], link[1], unique_id)
            pending_subgraph_results[unique_id] = cached_outputs
            return (ExecutionResult.PENDING, None, None)
        caches.outputs.set(unique_id, output_data)
    except comfy.model_management.InterruptProcessingException as iex:
        logging.info("Processing interrupted")

        # skip formatting inputs/outputs
        error_details = {
            "node_id": real_node_id,
        }

        return (ExecutionResult.FAILURE, error_details, iex)
    except Exception as ex:
        typ, _, tb = sys.exc_info()
        exception_type = full_type_name(typ)
        input_data_formatted = {}
        if input_data_all is not None:
            input_data_formatted = {}
            for name, inputs in input_data_all.items():
                input_data_formatted[name] = [format_value(x) for x in inputs]

        logging.error(f"!!! Exception during processing !!! {ex}")
        logging.error(traceback.format_exc())

        error_details = {
            "node_id": real_node_id,
            "exception_message": str(ex),
            "exception_type": exception_type,
            "traceback": traceback.format_tb(tb),
            "current_inputs": input_data_formatted
```
```python
# TIMING START
start_time = time.perf_counter()
output_data, output_ui, has_subgraph = get_output_data(obj, input_data_all, execution_block_cb=execution_block_cb, pre_execute_cb=pre_execute_cb)
end_time = time.perf_counter()
elapsed_time = end_time - start_time

CSV_LOCK = threading.Lock()
CSV_PATH = "inference_trace.csv"
CSV_HEADER = ["node_id", "inference_time", "gpu_name", "gpu_driver"]
MAX_CSV_ROWS = 10000  # Adjust as needed

def write_inference_trace(unique_id, elapsed_time, gpu_name, gpu_driver):
    try:
        with CSV_LOCK:
            file_exists = os.path.exists(CSV_PATH)
            # Read existing rows if file exists
            rows = []
            if file_exists:
                with open(CSV_PATH, "r") as f:
                    rows = f.readlines()
            # Truncate if too many rows
            if len(rows) >= MAX_CSV_ROWS:
                rows = rows[-(MAX_CSV_ROWS - 1):]  # Keep last N-1 rows
            # Write header if file is new/empty
            with open(CSV_PATH, "w") as f:
                if not file_exists or (file_exists and len(rows) == 0):
                    f.write(",".join(CSV_HEADER) + "\n")
                for row in rows:
                    f.write(row)
                # Write new row
                f.write(f"{unique_id},{elapsed_time:.4f},{gpu_name},{gpu_driver}\n")

    except Exception as e:
        # Restore detailed exception handling and OOM detection
        logging.error(f"Exception during execution of node {unique_id}: {e}")
        tb_str = traceback.format_exc()
        error_type = type(e).__name__
        error_message = str(e)

        # Special handling for CUDA OOM errors
        is_oom = False
        if "CUDA out of memory" in error_message or "CUDNN_STATUS_NOT_SUPPORTED" in error_message:
            error_type = "OutOfMemoryError"
            is_oom = True

        error_info = {
            "error_type": error_type,
            "error_message": error_message,
            "traceback": tb_str,
            "is_oom": is_oom,
            "exception_object": repr(e),  # For debugging, but not for serialization
        }
        return (ExecutionResult.FAILURE, error_info, None)

# Usage in your execute() function:
write_inference_trace(unique_id, elapsed_time, gpu_name, gpu_driver)

# Check if this is the last node execution, then calculate variance
if execution_list.is_empty():
    import pandas as pd
    import numpy as np

    try:
        df = pd.read_csv(CSV_PATH, names=["node_id", "inference_time", "gpu_name", "gpu_driver"])
        mean = np.mean(df["inference_time"])
        stddev = np.std(df["inference_time"])
        tolerance = stddev / mean
```
Comment on lines +418 to +427 (Contributor)

issue (bug_risk): Variance calculation and CSV appending may cause file corruption due to an incorrect file handle. CSV_LOCK is a threading.Lock, not a file path, so passing it to open() will raise an exception. Use CSV_PATH in the open() call instead.
```python
        with CSV_LOCK:
            with open(CSV_LOCK, "a") as f:
                f.write(f"mean,{mean:.4f},{gpu_name},{gpu_driver}\n")
                f.write(f"stddev,{stddev:.4f},{gpu_name},{gpu_driver}\n")
                f.write(f"tolerance,{tolerance:.4f},{gpu_name},{gpu_driver}\n")
    except Exception as e:
        logging.error(f"Failed to append variance to CSV: {e}")
```
Comment on lines +422 to +434 (Contributor)

Undefined identifiers & summary write bug: the summary rows reference `gpu_name` and `gpu_driver`, which are never defined in this scope, and the file is opened with `open(CSV_LOCK, "a")` (a threading.Lock) instead of `CSV_PATH`.

🧰 Tools: 🪛 Ruff (0.12.2) 430-432: Undefined name (F821), two occurrences per line
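A sketch with both fixes applied, reusing names from this diff (calling `get_gpu_info()` here versus reusing the values captured for the per-node row is a design choice):

```python
# Sketch only: open the CSV by its path and resolve the GPU fields before writing.
gpu_name, gpu_driver = get_gpu_info()
with CSV_LOCK:
    with open(CSV_PATH, "a") as f:
        f.write(f"mean,{mean:.4f},{gpu_name},{gpu_driver}\n")
        f.write(f"stddev,{stddev:.4f},{gpu_name},{gpu_driver}\n")
        f.write(f"tolerance,{tolerance:.4f},{gpu_name},{gpu_driver}\n")
```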
```python
    # TIMING END

    except Exception as e:
        logging.error(f"Exception during execution of node {unique_id}: {e}")
        # Return a serializable error structure instead of the exception object
        error_info = {
            "error_type": type(e).__name__,
            "error_message": str(e),
            "traceback": traceback.format_exc()
        }
        if isinstance(ex, comfy.model_management.OOM_EXCEPTION):
            logging.error("Got an OOM, unloading all loaded models.")
            comfy.model_management.unload_all_models()

        return (ExecutionResult.FAILURE, error_details, ex)
        return (ExecutionResult.FAILURE, error_info, None)

    caches.outputs.set(unique_id, output_data)
    caches.ui.set(unique_id, {"output": output_ui})
    executed.add(unique_id)
    if server.client_id is not None:
        server.send_sync("executed", {"node": unique_id, "display_node": display_node_id, "output": output_ui, "prompt_id": prompt_id}, server.client_id)

    return (ExecutionResult.SUCCESS, None, None)
```
Some generated files are not rendered by default.
```tsx
@@ -0,0 +1,37 @@
import { useEffect, useState } from "react";

type BadgeResponse = {
  badge?: string;
};

const InferenceBadge = () => {
  const [badge, setBadge] = useState<string>("⏱ Loading...");

  useEffect(() => {
    const controller = new AbortController();

    fetch("/internal/inference/badge", { signal: controller.signal })
      .then((res) => {
        if (!res.ok) throw new Error(`HTTP error ${res.status}`);
        return res.json() as Promise<BadgeResponse>;
      })
      .then((data) => setBadge(data.badge ?? "N/A"))
      .catch((err) => {
        if (err.name === "AbortError") return;
        console.error("Badge fetch failed", err);
        setBadge("⚠️ Error");
      });

    return () => {
      controller.abort();
    };
  }, []);

  return (
    <div className="rounded bg-gray-100 px-3 py-1 text-sm text-gray-600 shadow-sm ml-4">
      {badge}
    </div>
  );
};

export default InferenceBadge;
```
suggestion (bug_risk): The badge endpoint averages all rows, including possible summary rows.
Explicitly filter out rows with node_id values like 'mean', 'stddev', or 'tolerance' so summary statistics are not included in the average, rather than relying on catching ValueError.
Suggested implementation:
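(A sketch only; it assumes the handler keeps reading `inference_trace.csv` and that the reader is aligned with the writer's header and `inference_time` column, as discussed in the earlier comment.)

```python
@self.routes.get('/inference/badge')
async def get_inference_badge(request):
    csv_path = "inference_trace.csv"
    if not os.path.exists(csv_path):
        return web.json_response({"badge": "N/A"})

    summary_ids = {"mean", "stddev", "tolerance"}  # skip summary statistics explicitly
    times = []
    gpu_name = "Unknown"
    with open(csv_path, newline="") as csvfile:
        reader = csv.DictReader(csvfile)  # header row supplies the field names
        for row in reader:
            if row["node_id"] in summary_ids:
                continue  # summary rows, not per-node timings
            try:
                times.append(float(row["inference_time"]))
            except (TypeError, ValueError):
                continue
            gpu_name = row["gpu_name"] or gpu_name

    if not times:
        return web.json_response({"badge": "N/A"})

    avg_time = sum(times) / len(times)
    badge = f"{avg_time:.2f} s per image · {gpu_name}"
    return web.json_response({"badge": badge})
```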