diff --git a/examples/models/qwen3/launch_distributed_worker.py b/examples/models/qwen3/launch_distributed_worker.py new file mode 100644 index 0000000..5648e23 --- /dev/null +++ b/examples/models/qwen3/launch_distributed_worker.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python3 +"""Wrapper that launches torchrun under scalene. + +Called by ``test.sh --profile``, not directly. Scalene initializes in this +process and ``--profile-all`` causes ``redirect_python`` to replace +``python`` on PATH. When torchrun spawns workers via subprocess, they pick +up the redirected python and each gets its own dormant scalene instance +(``--off``). Only rank 0's KernelProfiler calls +``scalene_profiler.start()``/``stop()`` to activate CPU profiling. +""" + +import sys +import torch.distributed.run as torchrun_main + +if __name__ == "__main__": + sys.exit(torchrun_main.main()) diff --git a/examples/models/qwen3/profile.py b/examples/models/qwen3/profile.py new file mode 100644 index 0000000..2017264 --- /dev/null +++ b/examples/models/qwen3/profile.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python3 +"""Profile Qwen3-30B with device tracing in a distributed (TP) setting. + +Run via test.sh:: + + bash test.sh --profile +""" + +import argparse +import os +import sys + +import torch.distributed as dist + +# Add model dir to path so we can import qwen3 modules +_MODEL_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, _MODEL_DIR) +os.chdir(_MODEL_DIR) + +from nkipy.tools.profiler import KernelProfiler +from qwen3 import Qwen3Model, load_model + + +def main(): + parser = argparse.ArgumentParser(description="Profile Qwen3 distributed") + parser.add_argument("-n", "--max-new-tokens", type=int, default=16) + parser.add_argument("prompt", nargs="?", default="The capital of France is") + parser.add_argument("--checkpoint", default="./tmp_qwen3-30b-a3b") + parser.add_argument("--model", default="Qwen/Qwen3-30B-A3B") + parser.add_argument( + "--profile-all-ranks", + action="store_true", + help="Profile all ranks (default: rank 0 only)", + ) + parser.add_argument( + "--output-dir", default=None, help="Output directory for profiles" + ) + parser.add_argument( + "--no-scalene", + action="store_true", + help="Disable scalene CPU profiling", + ) + args = parser.parse_args() + + output_dir = args.output_dir + if output_dir is None: + output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__))) + + # load_model handles dist init, weight loading, kernel compilation, warmup + model, input_ids, tokenizer = load_model(args) + rank = dist.get_rank() + world_size = dist.get_world_size() + + target_ranks = list(range(world_size)) if args.profile_all_ranks else [0] + output_path = os.path.join(output_dir, "kernel_profile.json") + + # Scalene CPU profiling only on rank 0 (requires process to be launched + # under `scalene run --off`). Other ranks skip scalene entirely. 
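+    # Every torchrun worker carries a dormant scalene instance (see
+    # launch_distributed_worker.py); only rank 0 activates it from KernelProfiler.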
+ use_scalene = (not args.no_scalene) and (rank == 0) + + dist.barrier() + + # Each rank sees its core as local core 0 via NEURON_RT_VISIBLE_CORES + with KernelProfiler( + core_id=0, + scalene=use_scalene, + output_path=output_path, + target_ranks=target_ranks, + ): + t = 0 + for token_id in model.generate(input_ids): + t += 1 + output_id = token_id[0].tolist() + if output_id[-1] in [151643, 151645]: + break + if rank == 0: + print(tokenizer.decode(output_id), end="", flush=True) + + if rank == 0: + print(f"\nGenerated {t} tokens") + + +if __name__ == "__main__": + main() diff --git a/examples/models/qwen3/test.sh b/examples/models/qwen3/test.sh index 1e5951b..6d8df50 100755 --- a/examples/models/qwen3/test.sh +++ b/examples/models/qwen3/test.sh @@ -1,9 +1,21 @@ #!/bin/bash # Test script for Qwen3-30B-A3B on Trainium -# Usage: bash test.sh +# Usage: bash test.sh [--profile [--no-scalene]] set -e +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_DIR="$(cd "$SCRIPT_DIR/../../.." && pwd)" + +PROFILE="" +NO_SCALENE="" +for arg in "$@"; do + case $arg in + --profile) PROFILE=1 ;; + --no-scalene) NO_SCALENE=1 ;; + esac +done + echo "==========================================" echo "Qwen3-30B-A3B Test Script" echo "==========================================" @@ -29,15 +41,62 @@ else echo "✓ Weights found at $WEIGHTS_PATH" fi -# Step 3: Run example +# Ensure NRT inspect is disabled (conflicts with SystemTraceSession) +unset NEURON_RT_INSPECT_ENABLE NEURON_RT_INSPECT_OUTPUT_DIR 2>/dev/null || true + +# Step 3: Run echo "" -echo "[3/3] Running Qwen3 inference..." -echo "==========================================" +if [ -n "$PROFILE" ]; then + echo "[3/3] Running profiled inference (TP=$TP_DEGREE)..." + echo "==========================================" + + if [ -z "$NO_SCALENE" ]; then + # Scalene on rank 0 + device tracing + scalene run --off --profile-all --cpu-only \ + -o "$SCRIPT_DIR/scalene_profile.json" --- \ + "$SCRIPT_DIR/launch_distributed_worker.py" \ + --nproc-per-node "$TP_DEGREE" \ + "$SCRIPT_DIR/profile.py" \ + --output-dir "$SCRIPT_DIR" \ + --checkpoint "$WEIGHTS_PATH" --model Qwen/Qwen3-30B-A3B || true -# Enable async to improve performance -export NEURON_RT_ASYNC_EXEC_MAX_INFLIGHT_REQUESTS=16 -# export NEURON_LOGICAL_NC_CONFIG=1 -torchrun --nproc-per-node "$TP_DEGREE" qwen3.py -n 500 --checkpoint "$WEIGHTS_PATH" --model Qwen/Qwen3-30B-A3B + # Merge CPU + device profiles + cd "$REPO_DIR" + if [ -f "$SCRIPT_DIR/scalene_profile.json" ]; then + uv run python -m nkipy.tools.profiler \ + "$SCRIPT_DIR/scalene_profile.json" \ + "$SCRIPT_DIR/kernel_profile.json" \ + "$SCRIPT_DIR/merged_profile.json" + else + uv run python -m nkipy.tools.profiler --kernel-only \ + "$SCRIPT_DIR/kernel_profile.json" \ + "$SCRIPT_DIR/merged_profile.json" + fi + echo "" + echo "View: scalene view $SCRIPT_DIR/merged_profile.json" + else + # Device tracing only, no scalene + export NEURON_RT_ASYNC_EXEC_MAX_INFLIGHT_REQUESTS=16 + torchrun --nproc-per-node "$TP_DEGREE" \ + "$SCRIPT_DIR/profile.py" \ + --output-dir "$SCRIPT_DIR" --no-scalene \ + --checkpoint "$WEIGHTS_PATH" --model Qwen/Qwen3-30B-A3B + + cd "$REPO_DIR" + uv run python -m nkipy.tools.profiler --kernel-only \ + "$SCRIPT_DIR/kernel_profile.json" \ + "$SCRIPT_DIR/merged_profile.json" + echo "" + echo "View: scalene view $SCRIPT_DIR/merged_profile.json" + fi +else + echo "[3/3] Running Qwen3 inference..." 
+ echo "==========================================" + + # Enable async to improve performance + export NEURON_RT_ASYNC_EXEC_MAX_INFLIGHT_REQUESTS=16 + torchrun --nproc-per-node "$TP_DEGREE" qwen3.py -n 500 --checkpoint "$WEIGHTS_PATH" --model Qwen/Qwen3-30B-A3B +fi echo "" echo "==========================================" diff --git a/examples/models/qwen3_embedding/profile.py b/examples/models/qwen3_embedding/profile.py new file mode 100644 index 0000000..b2e8bf1 --- /dev/null +++ b/examples/models/qwen3_embedding/profile.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python3 +"""Profile Qwen3-Embedding with scalene CPU profiling + device tracing. + +Run via test.sh:: + + bash test.sh --profile +""" + +import argparse +import logging +import os +import sys +from pathlib import Path + +import numpy as np + +# Save original CWD for output path resolution, then chdir to model dir +# (config uses relative paths for weights_dir and build_dir) +_ORIG_CWD = os.getcwd() +_MODEL_DIR = str(Path(__file__).resolve().parent) +sys.path.insert(0, _MODEL_DIR) +os.chdir(_MODEL_DIR) + +from config import get_config +from embedding_utils import get_detailed_instruct +from model import Qwen3EmbeddingModel +from nkipy.tools.profiler import KernelProfiler +from prepare_weights import load_qwen3_weights +from transformers import AutoTokenizer + + +def main(): + # Resolve output path before chdir happens at import time + parser = argparse.ArgumentParser(description="Profile Qwen3-Embedding") + parser.add_argument( + "--model-size", + choices=["0.6b", "8b"], + default="0.6b", + help="Model size (default: 0.6b)", + ) + parser.add_argument("--lnc", type=int, choices=[1, 2], default=2) + parser.add_argument("--seq-len", type=int, default=None) + parser.add_argument( + "--output", + default="kernel_profile.json", + help="Kernel profile output path (default: kernel_profile.json)", + ) + parser.add_argument("--num-warmup", type=int, default=3) + parser.add_argument( + "--num-iterations", + type=int, + default=1, + help="Forward passes inside profiled region (default: 1)", + ) + parser.add_argument( + "--no-scalene", + action="store_true", + help="Disable scalene integration", + ) + args = parser.parse_args() + + # Resolve output path relative to original CWD (not the model dir) + output_path = Path(args.output) + if not output_path.is_absolute(): + output_path = Path(_ORIG_CWD) / output_path + + # Suppress verbose logging + logging.getLogger().setLevel(logging.ERROR) + + overrides = {} + if args.seq_len is not None: + overrides["max_model_len"] = args.seq_len + + config = get_config(args.model_size, **overrides) + + print(f"Model: {config.model_name}") + print(f"Sequence length: {config.max_model_len}") + print(f"LNC: {args.lnc}") + + # Load tokenizer and model + print("\nLoading tokenizer...") + tokenizer = AutoTokenizer.from_pretrained(config.model_name) + + print("Loading model and compiling kernels...") + weights = load_qwen3_weights(config.weights_path) + model = Qwen3EmbeddingModel(weights, config, lnc=args.lnc) + + # Prepare input + task = "Given a web search query, retrieve relevant passages that answer the query" + sample_text = get_detailed_instruct(task, "What is the capital of China?") + + batch_dict = tokenizer( + [sample_text], + padding="max_length", + truncation=True, + max_length=config.max_model_len, + return_tensors="np", + ) + input_ids = batch_dict["input_ids"].astype(np.uint32) + attention_mask = batch_dict["attention_mask"].astype(np.float32) + + # Warmup (outside profiler) + print(f"\nWarmup 
({args.num_warmup} iterations)...") + for _ in range(args.num_warmup): + model.forward(input_ids, attention_mask) + + # Profiled forward pass(es) + n = args.num_iterations + print(f"\nRunning profiled forward pass ({n} iteration(s))...") + with KernelProfiler( + core_id=0, + scalene=not args.no_scalene, + output_path=output_path, + ) as profiler: + for _ in range(n): + model.forward(input_ids, attention_mask) + + result = profiler.result + print(f"\nDone. Captured {len(result.kernel_calls)} kernel calls.") + print(f"Kernel profile saved to: {output_path}") + + +if __name__ == "__main__": + main() diff --git a/examples/models/qwen3_embedding/test.sh b/examples/models/qwen3_embedding/test.sh old mode 100644 new mode 100755 index 6f497f7..742f9bb --- a/examples/models/qwen3_embedding/test.sh +++ b/examples/models/qwen3_embedding/test.sh @@ -1,10 +1,23 @@ #!/bin/bash # Test script for Qwen3-Embedding on Trainium -# Usage: bash test.sh [0.6b|8b] +# Usage: bash test.sh [--model-size 0.6b|8b] [--profile [--no-scalene]] set -e -MODEL_SIZE="${1:-0.6b}" +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_DIR="$(cd "$SCRIPT_DIR/../../.." && pwd)" + +MODEL_SIZE="0.6b" +PROFILE="" +NO_SCALENE="" +while [[ $# -gt 0 ]]; do + case $1 in + --model-size) MODEL_SIZE="$2"; shift 2 ;; + --profile) PROFILE=1; shift ;; + --no-scalene) NO_SCALENE=1; shift ;; + *) shift ;; + esac +done echo "==========================================" echo "Qwen3 Embedding Test Script" @@ -35,11 +48,51 @@ else echo "✓ Weights found at $WEIGHTS_PATH" fi -# Step 3: Run example +# Ensure NRT inspect is disabled (conflicts with SystemTraceSession) +unset NEURON_RT_INSPECT_ENABLE NEURON_RT_INSPECT_OUTPUT_DIR 2>/dev/null || true + +# Step 3: Run echo "" -echo "[3/3] Running retrieval example..." -echo "==========================================" -python example_retrieval.py --model-size "$MODEL_SIZE" --compare +if [ -n "$PROFILE" ]; then + echo "[3/3] Running profiled inference..." + echo "==========================================" + + PROFILE_ARGS="--model-size $MODEL_SIZE --num-iterations 100" + + if [ -z "$NO_SCALENE" ]; then + # Scalene CPU + device tracing + cd "$SCRIPT_DIR" + scalene run --off --profile-all --profile-only model.py --cpu-only \ + -o "$SCRIPT_DIR/scalene_profile.json" \ + "$SCRIPT_DIR/profile.py" --- $PROFILE_ARGS + + # scalene_profile.json may land in the model dir due to chdir + if [ -f "$SCRIPT_DIR/scalene_profile.json" ]; then + cd "$REPO_DIR" + uv run python -m nkipy.tools.profiler \ + "$SCRIPT_DIR/scalene_profile.json" \ + "$SCRIPT_DIR/kernel_profile.json" \ + "$SCRIPT_DIR/merged_profile.json" + fi + echo "" + echo "View: scalene view $SCRIPT_DIR/merged_profile.json" + else + # Device tracing only + cd "$SCRIPT_DIR" + python profile.py --no-scalene $PROFILE_ARGS + + cd "$REPO_DIR" + uv run python -m nkipy.tools.profiler --kernel-only \ + "$SCRIPT_DIR/kernel_profile.json" \ + "$SCRIPT_DIR/merged_profile.json" + echo "" + echo "View: scalene view $SCRIPT_DIR/merged_profile.json" + fi +else + echo "[3/3] Running retrieval example..." 
+ echo "==========================================" + python example_retrieval.py --model-size "$MODEL_SIZE" --compare +fi echo "" echo "==========================================" diff --git a/nkipy/src/nkipy/tools/profiler/__init__.py b/nkipy/src/nkipy/tools/profiler/__init__.py new file mode 100644 index 0000000..06357a3 --- /dev/null +++ b/nkipy/src/nkipy/tools/profiler/__init__.py @@ -0,0 +1,14 @@ +"""NKIPy profiling tools: kernel profiling and profile merging.""" + +from .kernel_profiler import KernelExecution, KernelProfiler, KernelProfileResult +from .merge_profiles import merge_kernel_only, merge_scalene_and_kernel_profiles +from .trace_timeline import TraceTimeline + +__all__ = [ + "KernelProfiler", + "KernelProfileResult", + "KernelExecution", + "TraceTimeline", + "merge_scalene_and_kernel_profiles", + "merge_kernel_only", +] diff --git a/nkipy/src/nkipy/tools/profiler/__main__.py b/nkipy/src/nkipy/tools/profiler/__main__.py new file mode 100644 index 0000000..a6fea2b --- /dev/null +++ b/nkipy/src/nkipy/tools/profiler/__main__.py @@ -0,0 +1,28 @@ +"""CLI entry point: python -m nkipy.tools.profiler ...""" + +import sys + +from .merge_profiles import merge_kernel_only, merge_scalene_and_kernel_profiles + +USAGE = ( + "Usage: python -m nkipy.tools.profiler " + " \n" + " python -m nkipy.tools.profiler --kernel-only " + " [more...] " +) + +args = sys.argv[1:] + +if "--kernel-only" in args: + args.remove("--kernel-only") + if len(args) < 2: + print(USAGE) + sys.exit(1) + output = args[-1] + kernel_paths = args[:-1] + merge_kernel_only(kernel_paths, output) +elif len(args) == 3: + merge_scalene_and_kernel_profiles(args[0], args[1], args[2]) +else: + print(USAGE) + sys.exit(1) diff --git a/nkipy/src/nkipy/tools/profiler/kernel_profiler.py b/nkipy/src/nkipy/tools/profiler/kernel_profiler.py new file mode 100644 index 0000000..38e7339 --- /dev/null +++ b/nkipy/src/nkipy/tools/profiler/kernel_profiler.py @@ -0,0 +1,268 @@ +"""Kernel profiler for correlating device traces with Python source lines.""" + +import json +import logging +import os +import sys +import time +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional + +from spike import SpikeModel +from spike._spike import SystemTraceSession + +from .trace_timeline import _parse_trace_timeline + +logger = logging.getLogger(__name__) + + +# Paths within the framework that should be skipped when walking frames +_SKIP_PATH_FRAGMENTS = ( + os.sep + os.path.join("spike", "src", "spike") + os.sep, + os.sep + os.path.join("nkipy", "src", "nkipy", "runtime") + os.sep, + os.sep + os.path.join("nkipy", "src", "nkipy", "core") + os.sep, + os.sep + os.path.join("nkipy", "src", "nkipy", "tools", "profiler") + os.sep, +) + + +def _find_user_frame(): + """Walk the call stack to find the first user-code frame. + + Skips frames inside spike/ and nkipy/ internals so that source lines + point to the actual model code (e.g. model.py) rather than framework + wrappers. The merge step handles creating entries for files that scalene + didn't profile. + + Uses sys._getframe() for speed (avoids reading source lines unlike + inspect.stack()). 
+ """ + frame = sys._getframe(1) + while frame is not None: + filename = frame.f_code.co_filename + if not any(frag in filename for frag in _SKIP_PATH_FRAGMENTS): + return filename, frame.f_lineno + frame = frame.f_back + return "", 0 + + +@dataclass +class KernelExecution: + """A single kernel execution with its source location.""" + + filename: str + lineno: int + kernel_name: str + call_index: int + + +@dataclass +class KernelProfileResult: + """Result of a kernel profiling session.""" + + kernel_calls: list[KernelExecution] = field(default_factory=list) + events_json: str = "" + wall_start_ns: int = 0 + wall_stop_ns: int = 0 + + def save(self, path: str | Path) -> None: + """Save the profile result to a JSON file.""" + data = { + "kernel_calls": [ + { + "filename": kc.filename, + "lineno": kc.lineno, + "kernel_name": kc.kernel_name, + "call_index": kc.call_index, + } + for kc in self.kernel_calls + ], + "events_json": self.events_json, + "wall_start_ns": self.wall_start_ns, + "wall_stop_ns": self.wall_stop_ns, + } + Path(path).write_text(json.dumps(data, indent=2)) + + @classmethod + def load(cls, path: str | Path) -> "KernelProfileResult": + """Load a profile result from a JSON file.""" + data = json.loads(Path(path).read_text()) + kernel_calls = [KernelExecution(**kc) for kc in data["kernel_calls"]] + return cls( + kernel_calls=kernel_calls, + events_json=data["events_json"], + wall_start_ns=data.get("wall_start_ns", 0), + wall_stop_ns=data.get("wall_stop_ns", 0), + ) + + +def _get_current_rank() -> int: + """Get the current distributed rank, or 0 if not in a distributed context.""" + try: + import torch.distributed as dist + + if dist.is_initialized(): + return dist.get_rank() + except ImportError: + pass + return 0 + + +class KernelProfiler: + """Context manager that profiles kernel executions with device traces. + + Simultaneously: + - Starts a SystemTraceSession to capture device timing + - Monkey-patches SpikeModel.__call__ to record source locations + - Optionally starts/stops scalene CPU profiling + + Usage:: + + with KernelProfiler(core_id=0, output_path="kernel_profile.json"): + model.forward(input_ids, attention_mask) + + For distributed runs, use ``target_ranks`` to control which ranks profile:: + + with KernelProfiler(core_id=0, target_ranks=[0]): + model.forward(input_ids, attention_mask) + + When run under ``scalene --off``, the profiler controls exactly which + region gets CPU-profiled, keeping compilation/warmup out of the profile. 
+ """ + + def __init__( + self, + core_id: Optional[int] = None, + scalene: bool = True, + output_path: str | Path = "kernel_profile.json", + target_ranks: Optional[list[int]] = None, + ): + self._core_id = core_id + self._scalene_enabled = scalene + self._output_path = Path(output_path) + self._target_ranks = target_ranks + self._result: Optional[KernelProfileResult] = None + self._trace: Optional[SystemTraceSession] = None + self._original_call: Optional[object] = None + self._kernel_calls: list[KernelExecution] = [] + self._call_index = 0 + self._scalene_profiler = None + self._wall_start_ns = 0 + self._wall_stop_ns = 0 + self._active = True # Whether this rank is actively profiling + + def __enter__(self): + # Check if current rank should be profiled + if self._target_ranks is not None: + rank = _get_current_rank() + if rank not in self._target_ranks: + self._active = False + return self + + self._kernel_calls = [] + self._call_index = 0 + + # Monkey-patch SpikeModel.__call__ + self._original_call = SpikeModel.__call__ + profiler_self = self + + original_call = self._original_call + + def patched_call(model_self, *args, **kwargs): + filename, lineno = _find_user_frame() + profiler_self._kernel_calls.append( + KernelExecution( + filename=filename, + lineno=lineno, + kernel_name=model_self.name, + call_index=profiler_self._call_index, + ) + ) + profiler_self._call_index += 1 + return original_call(model_self, *args, **kwargs) + + SpikeModel.__call__ = patched_call + + # Start device trace + self._trace = SystemTraceSession(self._core_id) + self._trace.__enter__() + + # Start scalene profiling if enabled + if self._scalene_enabled: + try: + from scalene import scalene_profiler + + self._scalene_profiler = scalene_profiler + scalene_profiler.start() + except (ImportError, SystemExit, Exception) as e: + logger.debug(f"Scalene not available, skipping: {e}") + self._scalene_profiler = None + + # Record wall clock start + self._wall_start_ns = time.time_ns() + + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if not self._active: + return False + + # Record wall clock stop + self._wall_stop_ns = time.time_ns() + + # Stop scalene profiling first (before any cleanup overhead) + if self._scalene_profiler is not None: + try: + self._scalene_profiler.stop() + except Exception as e: + logger.debug(f"Scalene stop failed: {e}") + + # Restore original __call__ + if self._original_call is not None: + SpikeModel.__call__ = self._original_call + self._original_call = None + + # Fetch trace events and stop session + events_json = "" + if self._trace is not None: + events_json = self._trace.fetch_events_json() + self._trace.__exit__(exc_type, exc_val, exc_tb) + self._trace = None + + # Determine output path (include rank suffix for multi-rank) + output_path = self._output_path + if self._target_ranks is not None and len(self._target_ranks) > 1: + rank = _get_current_rank() + stem = output_path.stem + output_path = output_path.with_name( + f"{stem}_rank{rank}{output_path.suffix}" + ) + + # Build result + self._result = KernelProfileResult( + kernel_calls=list(self._kernel_calls), + events_json=events_json, + wall_start_ns=self._wall_start_ns, + wall_stop_ns=self._wall_stop_ns, + ) + + # Save to disk + self._result.save(output_path) + + # Print one-line summary + timeline = _parse_trace_timeline(events_json) + total_nc_ms = sum(timeline.nc_durations_ms) + wall_ms = (self._wall_stop_ns - self._wall_start_ns) / 1_000_000.0 + util_pct = (total_nc_ms / wall_ms * 100) if wall_ms > 0 else 0.0 + 
n_calls = len(self._kernel_calls) + print( + f"Kernel profile: {n_calls} calls, {total_nc_ms:.1f}ms NC, " + f"{util_pct:.1f}% utilization -> {output_path}" + ) + + return False + + @property + def result(self) -> Optional[KernelProfileResult]: + """Access the profile result after the context manager exits.""" + return self._result diff --git a/nkipy/src/nkipy/tools/profiler/merge_profiles.py b/nkipy/src/nkipy/tools/profiler/merge_profiles.py new file mode 100644 index 0000000..70c57dd --- /dev/null +++ b/nkipy/src/nkipy/tools/profiler/merge_profiles.py @@ -0,0 +1,554 @@ +"""Merge scalene CPU profiles with kernel device profiles. + +Usage:: + + python -m nkipy.tools.profiler scalene.json kernel_profile.json merged.json + python -m nkipy.tools.profiler --kernel-only kernel_profile.json output.json + python -m nkipy.tools.profiler --kernel-only kp_rank0.json kp_rank1.json output.json +""" + +import copy +import json +import linecache +import os +import sys +from collections import defaultdict +from pathlib import Path + +from .kernel_profiler import KernelProfileResult +from .trace_timeline import _parse_trace_timeline + +# Default values for scalene LineData fields the GUI accesses unconditionally. +_LINE_DEFAULTS = { + "line": "", + "n_cpu_percent_python": 0.0, + "n_cpu_percent_c": 0.0, + "n_sys_percent": 0.0, + "n_core_utilization": 0.0, + "n_peak_mb": 0.0, + "n_avg_mb": 0.0, + "n_python_fraction": 0.0, + "n_copy_mb_s": 0.0, + "n_copy_mb": 0.0, + "n_gpu_percent": 0.0, + "n_gpu_peak_memory_mb": 0.0, + "n_gpu_avg_memory_mb": 0.0, + "n_usage_fraction": 0.0, + "n_malloc_mb": 0.0, + "n_mallocs": 0, + "n_growth_mb": 0.0, + "memory_samples": [], + "start_region_line": 0, + "end_region_line": 0, + "start_function_line": 0, + "end_function_line": 0, + "start_outermost_loop": 0, + "end_outermost_loop": 0, + "cpu_samples_list": [], + "async_task_names": [], + "is_coroutine": False, + "n_async_await_percent": 0.0, + "n_async_concurrency_mean": 0.0, + "n_async_concurrency_peak": 0.0, +} + +_FILE_DEFAULTS = { + "functions": [], + "imports": [], + "percent_cpu_time": 0.0, +} + + +def _load_scalene_json(scalene_path: str | Path) -> dict: + """Load scalene JSON, handling both raw JSON and HTML-embedded JSON.""" + content = Path(scalene_path).read_text().strip() + if "" in content: + marker = "const profile = " + json_start = content.find(marker) + len(marker) + if json_start > len(marker) - 1: + brace_count = 0 + json_end = json_start + for i, char in enumerate(content[json_start:]): + if char == "{": + brace_count += 1 + elif char == "}": + brace_count -= 1 + if brace_count == 0: + json_end = json_start + i + 1 + break + content = content[json_start:json_end] + return json.loads(content) + + +def _normalize_path(p: str) -> str: + """Normalize a file path for comparison.""" + try: + return os.path.realpath(p) + except (OSError, ValueError): + return p + + +def _match_filename(kernel_filename: str, scalene_filenames: list[str]) -> str | None: + """Match a kernel filename to one of the scalene profile filenames. + + Tries realpath match first, then falls back to basename match. 
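+
+    Returns ``None`` when neither strategy matches; callers then create a new
+    file entry keyed by the kernel filename.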
+ """ + norm_kernel = _normalize_path(kernel_filename) + for sf in scalene_filenames: + if _normalize_path(sf) == norm_kernel: + return sf + + # Fallback: basename match + kernel_base = os.path.basename(kernel_filename) + for sf in scalene_filenames: + if os.path.basename(sf) == kernel_base: + return sf + + return None + + +def _calculate_cpu_sample_overlap( + cpu_samples_list: list[float], + nc_intervals_sec: list[tuple[float, float]], + start_time_absolute: float, + start_time_perf: float, +) -> float: + """Calculate what percent of CPU samples overlap with NC execution intervals. + + Args: + cpu_samples_list: CPU sample timestamps (perf_counter values). + nc_intervals_sec: (start, stop) in absolute seconds for nc_exec_running. + start_time_absolute: Scalene's absolute start time (Unix timestamp seconds). + start_time_perf: Scalene's perf_counter start time (seconds). + + Returns: + Overlap percentage (0-100). + """ + if not cpu_samples_list or not nc_intervals_sec: + return 0.0 + + overlap_count = 0 + for sample_perf in cpu_samples_list: + sample_abs = start_time_absolute + (sample_perf - start_time_perf) + for start, end in nc_intervals_sec: + if start <= sample_abs <= end: + overlap_count += 1 + break + + return (overlap_count / len(cpu_samples_list) * 100) if cpu_samples_list else 0.0 + + +def _print_utilization_summary( + profile: KernelProfileResult, + timeline, + total_nc_time: float, + total_nrt_time: float, + line_nc_durations: dict, + line_nrt_durations: dict, + line_counts: dict, + line_kernel_names: dict, +) -> None: + """Print utilization summary to stdout.""" + wall_ms = (profile.wall_stop_ns - profile.wall_start_ns) / 1_000_000.0 + idle_ms = wall_ms - total_nc_time if wall_ms > total_nc_time else 0.0 + util_pct = (total_nc_time / wall_ms * 100) if wall_ms > 0 else 0.0 + + print("\n=== Device Profiling Summary ===") + print(f"Wall time: {wall_ms:.1f} ms") + print(f"NRT execution: {total_nrt_time:.1f} ms (host-side)") + print(f"NC execution: {total_nc_time:.1f} ms ({util_pct:.1f}% of wall)") + if total_nrt_time > 0: + print( + f"NC/NRT ratio: {total_nc_time / total_nrt_time:.2f} " + f"({'async' if total_nc_time > total_nrt_time else 'sync'})" + ) + print(f"Device idle: {idle_ms:.1f} ms ({100 - util_pct:.1f}%)") + + if timeline.gap_breakdown_ms and idle_ms > 0: + sorted_gaps = sorted( + timeline.gap_breakdown_ms.items(), key=lambda x: x[1], reverse=True + ) + category_labels = { + "d2h_transfer": "D2H transfers", + "h2d_transfer": "H2D transfers", + "memory_mgmt": "Memory mgmt", + "exec_overhead": "Exec overhead", + "sync_wait": "Sync waits", + } + for category, ms in sorted_gaps: + label = category_labels.get(category, category) + pct_of_idle = (ms / idle_ms * 100) if idle_ms > 0 else 0.0 + print(f" {label + ':':18s} {ms:6.1f} ms ({pct_of_idle:4.1f}% of idle)") + + # Per-kernel-name aggregation + kernel_agg: dict[str, dict] = {} + for kc in profile.kernel_calls: + key = (kc.filename, kc.lineno) + nc_dur = line_nc_durations.get(key, 0.0) / line_counts.get(key, 1) + nrt_dur = line_nrt_durations.get(key, 0.0) / line_counts.get(key, 1) + name = kc.kernel_name + if name not in kernel_agg: + kernel_agg[name] = {"nc_time_ms": 0.0, "nrt_time_ms": 0.0, "count": 0} + kernel_agg[name]["nc_time_ms"] += nc_dur + kernel_agg[name]["nrt_time_ms"] += nrt_dur + kernel_agg[name]["count"] += 1 + + if kernel_agg: + print("\nTop kernels:") + sorted_kernels = sorted( + kernel_agg.items(), key=lambda x: x[1]["nc_time_ms"], reverse=True + ) + for name, agg in sorted_kernels: + nc_pct = ( + 
(agg["nc_time_ms"] / total_nc_time * 100) if total_nc_time > 0 else 0.0 + ) + nc_avg = agg["nc_time_ms"] / agg["count"] if agg["count"] > 0 else 0.0 + nrt_avg = agg["nrt_time_ms"] / agg["count"] if agg["count"] > 0 else 0.0 + print( + f" {name:30s} {agg['nc_time_ms']:8.1f} ms ({nc_pct:5.1f}%) " + f"{agg['count']:5d} calls nc={nc_avg:.3f} nrt={nrt_avg:.3f} ms/call" + ) + + +def merge_scalene_and_kernel_profiles( + scalene_json_path: str | Path, + kernel_profile_path: str | Path, + output_path: str | Path, +) -> None: + """Merge a scalene CPU profile with a kernel device profile. + + The merged output adds per-line device timing fields (``nrt_time_ms``, + ``nc_time_ms``, etc.) to the scalene JSON so the scalene GUI can render + NeuronCore time bars alongside CPU time. + + Args: + scalene_json_path: Path to scalene output JSON (or HTML). + kernel_profile_path: Path to KernelProfileResult JSON. + output_path: Where to write the merged JSON. + """ + scalene_data = _load_scalene_json(scalene_json_path) + profile = KernelProfileResult.load(kernel_profile_path) + + # Parse full timeline for utilization analysis + timeline = _parse_trace_timeline(profile.events_json) + nc_durations = timeline.nc_durations_ms + nrt_durations = timeline.nrt_durations_ms + + # Correlate by call index: kernel_calls[i] -> nc/nrt durations[i] + # Aggregate per (filename, lineno) + line_nc_durations: dict[tuple[str, int], float] = defaultdict(float) + line_nrt_durations: dict[tuple[str, int], float] = defaultdict(float) + line_counts: dict[tuple[str, int], int] = defaultdict(int) + line_kernel_names: dict[tuple[str, int], str] = {} + + for i, kc in enumerate(profile.kernel_calls): + nc_dur = nc_durations[i] if i < len(nc_durations) else 0.0 + nrt_dur = nrt_durations[i] if i < len(nrt_durations) else 0.0 + key = (kc.filename, kc.lineno) + line_nc_durations[key] += nc_dur + line_nrt_durations[key] += nrt_dur + line_counts[key] += 1 + if key not in line_kernel_names: + line_kernel_names[key] = kc.kernel_name + + total_nc_time = sum(nc_durations) if nc_durations else 0.0 + total_nrt_time = sum(nrt_durations) if nrt_durations else 0.0 + + # CPU sample overlap: convert NC host intervals to seconds + nc_intervals_sec: list[tuple[float, float]] = [] + for start_ns, stop_ns in timeline.nc_host_intervals_ns: + nc_intervals_sec.append((start_ns / 1e9, stop_ns / 1e9)) + + start_time_absolute = scalene_data.get("start_time_absolute", 0.0) + start_time_perf = scalene_data.get("start_time_perf", 0.0) + + # Inject into scalene JSON, creating missing file/line entries as needed. 
+ if "files" not in scalene_data: + scalene_data["files"] = {} + + scalene_filenames = list(scalene_data["files"].keys()) + + # Process CPU sample overlap for existing lines + if nc_intervals_sec and start_time_absolute and start_time_perf: + for filename, file_data in scalene_data["files"].items(): + for line_data in file_data.get("lines", []): + cpu_samples = line_data.get("cpu_samples_list", []) + if cpu_samples: + overlap_pct = _calculate_cpu_sample_overlap( + cpu_samples, + nc_intervals_sec, + start_time_absolute, + start_time_perf, + ) + line_data["cpu_samples_nc_overlap_percent"] = overlap_pct + + for kernel_filename, lineno in line_nc_durations: + matched_file = _match_filename(kernel_filename, scalene_filenames) + + if matched_file is None: + matched_file = kernel_filename + scalene_data["files"][matched_file] = { + "lines": [], + **copy.deepcopy(_FILE_DEFAULTS), + } + scalene_filenames.append(matched_file) + + file_data = scalene_data["files"][matched_file] + if "lines" not in file_data: + file_data["lines"] = [] + # Ensure file-level defaults exist + for k, v in _FILE_DEFAULTS.items(): + if k not in file_data: + file_data[k] = copy.deepcopy(v) + + # Find existing line entry or create one with GUI-required defaults + line_data = None + for ld in file_data["lines"]: + if ld.get("lineno") == lineno: + line_data = ld + break + if line_data is None: + source_line = linecache.getline(kernel_filename, lineno).rstrip() + line_data = { + "lineno": lineno, + **copy.deepcopy(_LINE_DEFAULTS), + "line": source_line, + } + file_data["lines"].append(line_data) + + key = (kernel_filename, lineno) + nc_ms = line_nc_durations[key] + nrt_ms = line_nrt_durations.get(key, 0.0) + nc_percent = (nc_ms / total_nc_time * 100) if total_nc_time > 0 else 0.0 + nrt_percent = (nrt_ms / total_nrt_time * 100) if total_nrt_time > 0 else 0.0 + + line_data["nc_time_ms"] = nc_ms + line_data["nc_percent"] = nc_percent + line_data["nc_execute_count"] = line_counts[key] + line_data["nrt_time_ms"] = nrt_ms + line_data["nrt_percent"] = nrt_percent + if nrt_ms > 0: + line_data["nc_nrt_ratio"] = nc_ms / nrt_ms + + # Per-kernel-name aggregation -> function entries in scalene JSON + kernel_agg: dict[str, dict] = defaultdict( + lambda: {"nc_time_ms": 0.0, "nrt_time_ms": 0.0, "count": 0, "filename": ""} + ) + for i, kc in enumerate(profile.kernel_calls): + nc_dur = nc_durations[i] if i < len(nc_durations) else 0.0 + nrt_dur = nrt_durations[i] if i < len(nrt_durations) else 0.0 + agg = kernel_agg[kc.kernel_name] + agg["nc_time_ms"] += nc_dur + agg["nrt_time_ms"] += nrt_dur + agg["count"] += 1 + if not agg["filename"]: + agg["filename"] = kc.filename + + for kernel_name, agg in kernel_agg.items(): + filename = agg["filename"] + matched_file = _match_filename(filename, scalene_filenames) + if matched_file is None: + matched_file = filename + if matched_file not in scalene_data["files"]: + scalene_data["files"][matched_file] = { + "lines": [], + **copy.deepcopy(_FILE_DEFAULTS), + } + scalene_filenames.append(matched_file) + + file_data = scalene_data["files"][matched_file] + if "functions" not in file_data: + file_data["functions"] = [] + + nc_pct = (agg["nc_time_ms"] / total_nc_time * 100) if total_nc_time > 0 else 0.0 + nrt_pct = ( + (agg["nrt_time_ms"] / total_nrt_time * 100) if total_nrt_time > 0 else 0.0 + ) + func_entry = { + **copy.deepcopy(_LINE_DEFAULTS), + "line": kernel_name, + "nc_time_ms": agg["nc_time_ms"], + "nc_percent": nc_pct, + "nc_execute_count": agg["count"], + "nrt_time_ms": agg["nrt_time_ms"], + 
"nrt_percent": nrt_pct, + } + if agg["nrt_time_ms"] > 0: + func_entry["nc_nrt_ratio"] = agg["nc_time_ms"] / agg["nrt_time_ms"] + file_data["functions"].append(func_entry) + + # Top-level metadata + scalene_data["neuron_total_nc_time_ms"] = total_nc_time + scalene_data["neuron_total_time_ms"] = total_nrt_time + scalene_data["neuron_nc_event_count"] = len(nc_durations) + scalene_data["neuron_event_count"] = len(nrt_durations) + + Path(output_path).write_text(json.dumps(scalene_data, indent=2)) + print(f"Merged profile written to {output_path}") + print( + f"Total NRT time: {total_nrt_time:.2f}ms, NC time: {total_nc_time:.2f}ms " + f"({len(nc_durations)} events)" + ) + + # Print utilization summary + _print_utilization_summary( + profile, + timeline, + total_nc_time, + total_nrt_time, + line_nc_durations, + line_nrt_durations, + line_counts, + line_kernel_names, + ) + + +def merge_kernel_only( + kernel_profile_paths: list[str | Path], + output_path: str | Path, +) -> None: + """Merge one or more kernel profiles without a scalene CPU profile. + + Creates a scalene-compatible JSON with device timing only. + + Args: + kernel_profile_paths: Paths to KernelProfileResult JSON files. + output_path: Where to write the merged JSON. + """ + scalene_data: dict = {"files": {}} + all_summaries: list[dict] = [] + + for rank_idx, kp_path in enumerate(kernel_profile_paths): + profile = KernelProfileResult.load(kp_path) + timeline = _parse_trace_timeline(profile.events_json) + nc_durations = timeline.nc_durations_ms + nrt_durations = timeline.nrt_durations_ms + + total_nc_time = sum(nc_durations) if nc_durations else 0.0 + total_nrt_time = sum(nrt_durations) if nrt_durations else 0.0 + wall_ms = (profile.wall_stop_ns - profile.wall_start_ns) / 1_000_000.0 + util_pct = (total_nc_time / wall_ms * 100) if wall_ms > 0 else 0.0 + + all_summaries.append( + { + "rank": rank_idx, + "total_nc_ms": total_nc_time, + "total_nrt_ms": total_nrt_time, + "wall_ms": wall_ms, + "util_pct": util_pct, + "n_events": len(nc_durations), + "timeline": timeline, + "profile": profile, + } + ) + + # Aggregate per (filename, lineno) + line_nc_durations: dict[tuple[str, int], float] = defaultdict(float) + line_nrt_durations: dict[tuple[str, int], float] = defaultdict(float) + line_counts: dict[tuple[str, int], int] = defaultdict(int) + line_kernel_names: dict[tuple[str, int], str] = {} + + for i, kc in enumerate(profile.kernel_calls): + nc_dur = nc_durations[i] if i < len(nc_durations) else 0.0 + nrt_dur = nrt_durations[i] if i < len(nrt_durations) else 0.0 + key = (kc.filename, kc.lineno) + line_nc_durations[key] += nc_dur + line_nrt_durations[key] += nrt_dur + line_counts[key] += 1 + if key not in line_kernel_names: + line_kernel_names[key] = kc.kernel_name + + scalene_filenames = list(scalene_data["files"].keys()) + + for kernel_filename, lineno in line_nc_durations: + matched_file = _match_filename(kernel_filename, scalene_filenames) + + if matched_file is None: + matched_file = kernel_filename + if matched_file not in scalene_data["files"]: + scalene_data["files"][matched_file] = { + "lines": [], + **copy.deepcopy(_FILE_DEFAULTS), + } + scalene_filenames.append(matched_file) + + file_data = scalene_data["files"][matched_file] + if "lines" not in file_data: + file_data["lines"] = [] + for k, v in _FILE_DEFAULTS.items(): + if k not in file_data: + file_data[k] = copy.deepcopy(v) + + line_data = None + for ld in file_data["lines"]: + if ld.get("lineno") == lineno: + line_data = ld + break + if line_data is None: + source_line = 
linecache.getline(kernel_filename, lineno).rstrip() + line_data = { + "lineno": lineno, + **copy.deepcopy(_LINE_DEFAULTS), + "line": source_line, + } + file_data["lines"].append(line_data) + + key = (kernel_filename, lineno) + nc_ms = line_nc_durations[key] + nrt_ms = line_nrt_durations.get(key, 0.0) + nc_percent = (nc_ms / total_nc_time * 100) if total_nc_time > 0 else 0.0 + nrt_percent = (nrt_ms / total_nrt_time * 100) if total_nrt_time > 0 else 0.0 + + # Accumulate across ranks + line_data["nc_time_ms"] = line_data.get("nc_time_ms", 0.0) + nc_ms + line_data["nc_percent"] = nc_percent + line_data["nc_execute_count"] = ( + line_data.get("nc_execute_count", 0) + line_counts[key] + ) + line_data["nrt_time_ms"] = line_data.get("nrt_time_ms", 0.0) + nrt_ms + line_data["nrt_percent"] = nrt_percent + if nrt_ms > 0: + line_data["nc_nrt_ratio"] = nc_ms / nrt_ms + + # Global metadata + total_nc_all = sum(s["total_nc_ms"] for s in all_summaries) + total_nrt_all = sum(s["total_nrt_ms"] for s in all_summaries) + total_events_all = sum(s["n_events"] for s in all_summaries) + max_wall_ms = max(s["wall_ms"] for s in all_summaries) if all_summaries else 0.0 + scalene_data["neuron_total_nc_time_ms"] = total_nc_all + scalene_data["neuron_total_time_ms"] = total_nrt_all + scalene_data["neuron_nc_event_count"] = total_events_all + + # Scalene GUI required top-level fields + scalene_data.setdefault("elapsed_time_sec", max_wall_ms / 1000.0) + scalene_data.setdefault("gpu", False) + scalene_data.setdefault("gpu_device", "") + scalene_data.setdefault("memory", False) + scalene_data.setdefault("max_footprint_mb", 0) + scalene_data.setdefault("samples", []) + scalene_data.setdefault("growth_rate", 0.0) + + Path(output_path).write_text(json.dumps(scalene_data, indent=2)) + + n_ranks = len(all_summaries) + if n_ranks > 1: + print(f"\n=== Device Profiling Summary ({n_ranks} ranks) ===") + for s in all_summaries: + print( + f"Rank {s['rank']}: {s['total_nc_ms']:.1f}ms NC " + f"({s['util_pct']:.1f}% util)" + ) + else: + s = all_summaries[0] + _print_utilization_summary( + s["profile"], + s["timeline"], + s["total_nc_ms"], + s["total_nrt_ms"], + line_nc_durations, + line_nrt_durations, + line_counts, + line_kernel_names, + ) + + print(f"\nMerged profile written to {output_path}") diff --git a/nkipy/src/nkipy/tools/profiler/trace_timeline.py b/nkipy/src/nkipy/tools/profiler/trace_timeline.py new file mode 100644 index 0000000..94f218a --- /dev/null +++ b/nkipy/src/nkipy/tools/profiler/trace_timeline.py @@ -0,0 +1,196 @@ +"""Trace timeline parsing for NRT system trace events.""" + +import json +from dataclasses import dataclass, field + + +@dataclass +class TraceTimeline: + """Parsed timeline from a device trace, including gap analysis. + + Attributes: + nc_durations_ms: Device-clock durations for each nc_exec_running event. + nc_host_intervals_ns: Host-clock (start_ns, stop_ns) for nc_exec_running. + nrt_durations_ms: Host-clock durations for each nrt_execute event. + nrt_host_intervals_ns: Host-clock (start_ns, stop_ns) for nrt_execute. + wall_span_ns: (first_event_ts, last_event_ts) across all events. + gap_breakdown_ms: Category -> total ms during NC-idle gaps. 
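+
+    Host-clock intervals are kept alongside the device-clock durations so that
+    NC activity can be compared against host-side events (e.g. scalene CPU
+    samples in the merge step).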
+ """ + + nc_durations_ms: list[float] = field(default_factory=list) + nc_host_intervals_ns: list[tuple[int, int]] = field(default_factory=list) + nrt_durations_ms: list[float] = field(default_factory=list) + nrt_host_intervals_ns: list[tuple[int, int]] = field(default_factory=list) + wall_span_ns: tuple[int, int] = (0, 0) + gap_breakdown_ms: dict[str, float] = field(default_factory=dict) + + +# Mapping from event_type to human-readable gap category +_GAP_CATEGORIES: dict[str, str] = { + # D2H transfers + "nrt_tensor_read": "d2h_transfer", + "dmem_buf_copyout": "d2h_transfer", + # H2D transfers + "nrt_tensor_write": "h2d_transfer", + "dmem_buf_copyin": "h2d_transfer", + # Memory management + "nrt_tensor_allocate": "memory_mgmt", + "nrt_tensor_free": "memory_mgmt", + "nrt_dma_mem_alloc": "memory_mgmt", + "nrt_dma_mem_dealloc": "memory_mgmt", + # Exec overhead + "nrt_execute": "exec_overhead", + "nrt_model_submit": "exec_overhead", + "kbl_exec_pre": "exec_overhead", + "kbl_exec_post": "exec_overhead", + # Sync waits + "nrt_async_sema_wait": "sync_wait", + "tensor_block_while_exec": "sync_wait", +} + + +def _parse_trace_timeline(events_json: str) -> TraceTimeline: + """Parse a full trace timeline with gap analysis from NRT sys trace JSON. + + Parses nc_exec_running events for device durations and host intervals, + then analyzes gaps between consecutive NC executions to determine what + the device was doing while idle (transfers, memory ops, sync waits, etc.). + + Returns: + TraceTimeline with durations, host intervals, wall span, and gap breakdown. + """ + if not events_json: + return TraceTimeline() + + root = json.loads(events_json) + events = root.get("events", []) + + if not events: + return TraceTimeline() + + # Collect start/stop pairs for all event types using tracking_id + timestamp_ns + # Key: (event_type, tracking_id) -> start timestamp_ns (host clock) + starts: dict[tuple[str, int], int] = {} + # Intervals per event type: event_type -> list of (start_ns, stop_ns) + type_intervals: dict[str, list[tuple[int, int]]] = {} + + # NC-specific: device clock durations + nc_device_starts: dict[int, int] = {} # tracking_id -> nc_timestamp_ns + nc_durations_ms: list[float] = [] + nc_host_intervals_ns: list[tuple[int, int]] = [] + + # NRT-specific: host clock durations for nrt_execute + nrt_durations_ms: list[float] = [] + nrt_host_intervals_ns: list[tuple[int, int]] = [] + + # Track wall span + min_ts = float("inf") + max_ts = float("-inf") + + for event in events: + event_type = event.get("event_type") + phase = event.get("phase") + tracking_id = event.get("tracking_id") + ts_ns = event.get("timestamp_ns") + data = event.get("data", {}) + + if tracking_id is None: + continue + + # Track wall span across all events with host timestamps + if ts_ns is not None: + if ts_ns < min_ts: + min_ts = ts_ns + if ts_ns > max_ts: + max_ts = ts_ns + + # NC exec running: collect both device-clock durations and host intervals + if event_type == "nc_exec_running": + nc_ts = data.get("nc_timestamp_ns") + if phase == "start": + if nc_ts is not None: + nc_device_starts[tracking_id] = nc_ts + if ts_ns is not None: + starts[("nc_exec_running", tracking_id)] = ts_ns + elif phase == "stop": + # Device clock duration + if nc_ts is not None: + start_nc = nc_device_starts.pop(tracking_id, None) + if start_nc is not None: + nc_durations_ms.append((nc_ts - start_nc) / 1_000_000.0) + # Host clock interval + if ts_ns is not None: + start_host = starts.pop(("nc_exec_running", tracking_id), None) + if start_host is not None: 
+ nc_host_intervals_ns.append((start_host, ts_ns)) + continue + + # nrt_execute: collect per-call host-clock durations AND feed into + # gap analysis (exec_overhead category). Does not have device clock. + if event_type == "nrt_execute": + if phase == "start" and ts_ns is not None: + starts[("nrt_execute", tracking_id)] = ts_ns + elif phase == "stop" and ts_ns is not None: + start_ts = starts.pop(("nrt_execute", tracking_id), None) + if start_ts is not None: + nrt_durations_ms.append((ts_ns - start_ts) / 1_000_000.0) + nrt_host_intervals_ns.append((start_ts, ts_ns)) + # Also record for gap analysis + type_intervals.setdefault("nrt_execute", []).append( + (start_ts, ts_ns) + ) + continue + + # All other categorized event types: collect host-clock intervals + if event_type in _GAP_CATEGORIES: + if phase == "start" and ts_ns is not None: + starts[(event_type, tracking_id)] = ts_ns + elif phase == "stop" and ts_ns is not None: + start_ts = starts.pop((event_type, tracking_id), None) + if start_ts is not None: + type_intervals.setdefault(event_type, []).append((start_ts, ts_ns)) + + # Compute wall span + if min_ts == float("inf"): + wall_span_ns = (0, 0) + else: + wall_span_ns = (int(min_ts), int(max_ts)) + + # Sort NC host intervals by start time + nc_host_intervals_ns.sort(key=lambda x: x[0]) + + # Compute gaps between consecutive NC executions + gap_breakdown_ms: dict[str, float] = {} + if len(nc_host_intervals_ns) >= 2: + gaps: list[tuple[int, int]] = [] + for i in range(len(nc_host_intervals_ns) - 1): + gap_start = nc_host_intervals_ns[i][1] + gap_end = nc_host_intervals_ns[i + 1][0] + if gap_end > gap_start: + gaps.append((gap_start, gap_end)) + + # For each gap, check which other event intervals overlap and attribute time + for gap_start, gap_end in gaps: + for event_type, intervals in type_intervals.items(): + category = _GAP_CATEGORIES[event_type] + for iv_start, iv_end in intervals: + # Compute overlap with this gap + overlap_start = max(gap_start, iv_start) + overlap_end = min(gap_end, iv_end) + if overlap_end > overlap_start: + overlap_ms = (overlap_end - overlap_start) / 1_000_000.0 + gap_breakdown_ms[category] = ( + gap_breakdown_ms.get(category, 0.0) + overlap_ms + ) + + # Sort NRT host intervals by start time + nrt_host_intervals_ns.sort(key=lambda x: x[0]) + + return TraceTimeline( + nc_durations_ms=nc_durations_ms, + nc_host_intervals_ns=nc_host_intervals_ns, + nrt_durations_ms=nrt_durations_ms, + nrt_host_intervals_ns=nrt_host_intervals_ns, + wall_span_ns=wall_span_ns, + gap_breakdown_ms=gap_breakdown_ms, + ) diff --git a/tests/unit/test_kernel_profiler.py b/tests/unit/test_kernel_profiler.py new file mode 100644 index 0000000..77152a2 --- /dev/null +++ b/tests/unit/test_kernel_profiler.py @@ -0,0 +1,1005 @@ +"""Tests for KernelProfiler and merge_profiles.""" + +import json +import tempfile +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from nkipy.tools.profiler.kernel_profiler import ( + KernelExecution, + KernelProfiler, + KernelProfileResult, + _find_user_frame, + _get_current_rank, +) +from nkipy.tools.profiler.merge_profiles import ( + _calculate_cpu_sample_overlap, + merge_kernel_only, + merge_scalene_and_kernel_profiles, +) +from nkipy.tools.profiler.trace_timeline import TraceTimeline, _parse_trace_timeline + + +# --- Helpers for building synthetic trace events --- + + +def _make_nc_exec_events( + durations_ms: list[float], + base_host_ns: int = 1_000_000_000, + base_device_ns: int = 1_000_000_000, + gap_ns: int = 1_000_000, 
+) -> list[dict]: + """Build synthetic nc_exec_running start/stop event pairs.""" + events = [] + host_ts = base_host_ns + device_ts = base_device_ns + for tid, dur_ms in enumerate(durations_ms): + dur_ns = int(dur_ms * 1_000_000) + events.append( + { + "event_type": "nc_exec_running", + "phase": "start", + "tracking_id": tid, + "timestamp_ns": host_ts, + "data": {"nc_timestamp_ns": device_ts}, + } + ) + events.append( + { + "event_type": "nc_exec_running", + "phase": "stop", + "tracking_id": tid, + "timestamp_ns": host_ts + dur_ns, + "data": {"nc_timestamp_ns": device_ts + dur_ns}, + } + ) + host_ts += dur_ns + gap_ns + device_ts += dur_ns + gap_ns + return events + + +def _make_nrt_execute_events( + durations_ms: list[float], + base_host_ns: int = 1_000_000_000, + gap_ns: int = 1_000_000, + base_tracking_id: int = 5000, +) -> list[dict]: + """Build synthetic nrt_execute start/stop event pairs (host clock only).""" + events = [] + host_ts = base_host_ns + for i, dur_ms in enumerate(durations_ms): + tid = base_tracking_id + i + dur_ns = int(dur_ms * 1_000_000) + events.append( + { + "event_type": "nrt_execute", + "phase": "start", + "tracking_id": tid, + "timestamp_ns": host_ts, + "data": {}, + } + ) + events.append( + { + "event_type": "nrt_execute", + "phase": "stop", + "tracking_id": tid, + "timestamp_ns": host_ts + dur_ns, + "data": {}, + } + ) + host_ts += dur_ns + gap_ns + return events + + +def _make_categorized_events( + event_type: str, + intervals_ns: list[tuple[int, int]], + base_tracking_id: int = 1000, +) -> list[dict]: + """Build synthetic start/stop pairs for a categorized event type.""" + events = [] + for i, (start_ns, stop_ns) in enumerate(intervals_ns): + tid = base_tracking_id + i + events.append( + { + "event_type": event_type, + "phase": "start", + "tracking_id": tid, + "timestamp_ns": start_ns, + "data": {}, + } + ) + events.append( + { + "event_type": event_type, + "phase": "stop", + "tracking_id": tid, + "timestamp_ns": stop_ns, + "data": {}, + } + ) + return events + + +def _events_json(events: list[dict]) -> str: + return json.dumps({"events": events}) + + +# --- TraceTimeline parsing --- + + +class TestParseTraceTimeline: + def test_empty_input(self): + tl = _parse_trace_timeline("") + assert tl.nc_durations_ms == [] + assert tl.nc_host_intervals_ns == [] + assert tl.nrt_durations_ms == [] + assert tl.nrt_host_intervals_ns == [] + assert tl.wall_span_ns == (0, 0) + assert tl.gap_breakdown_ms == {} + + def test_empty_events(self): + tl = _parse_trace_timeline('{"events": []}') + assert tl.nc_durations_ms == [] + assert tl.nrt_durations_ms == [] + + def test_nc_durations_and_host_intervals(self): + events = _make_nc_exec_events([5.0, 3.0]) + tl = _parse_trace_timeline(_events_json(events)) + + assert len(tl.nc_durations_ms) == 2 + assert tl.nc_durations_ms[0] == pytest.approx(5.0) + assert tl.nc_durations_ms[1] == pytest.approx(3.0) + + assert len(tl.nc_host_intervals_ns) == 2 + # Intervals should be sorted by start + assert tl.nc_host_intervals_ns[0][0] < tl.nc_host_intervals_ns[1][0] + + def test_nrt_execute_parsed_separately(self): + """nrt_execute events produce separate durations from nc_exec_running.""" + nc_events = _make_nc_exec_events([5.0, 3.0]) + # nrt_execute is shorter (host-side async submit) + nrt_events = _make_nrt_execute_events([0.1, 0.08]) + tl = _parse_trace_timeline(_events_json(nc_events + nrt_events)) + + assert len(tl.nc_durations_ms) == 2 + assert len(tl.nrt_durations_ms) == 2 + assert tl.nrt_durations_ms[0] == pytest.approx(0.1) + assert 
tl.nrt_durations_ms[1] == pytest.approx(0.08) + assert len(tl.nrt_host_intervals_ns) == 2 + + def test_nrt_execute_in_gap_analysis(self): + """nrt_execute events also feed into gap breakdown as exec_overhead.""" + # NC: 0-5ms, gap 5-10ms, NC: 10-15ms + nc_events = _make_nc_exec_events( + [5.0, 5.0], + base_host_ns=0, + gap_ns=5_000_000, + ) + # nrt_execute during the gap: 6-7ms + nrt_events = _make_nrt_execute_events( + [1.0], + base_host_ns=6_000_000, + base_tracking_id=9000, + ) + tl = _parse_trace_timeline(_events_json(nc_events + nrt_events)) + assert "exec_overhead" in tl.gap_breakdown_ms + assert tl.gap_breakdown_ms["exec_overhead"] == pytest.approx(1.0) + + def test_wall_span(self): + events = _make_nc_exec_events([10.0], base_host_ns=500) + tl = _parse_trace_timeline(_events_json(events)) + + assert tl.wall_span_ns[0] == 500 + assert tl.wall_span_ns[1] == 500 + 10_000_000 # 10ms in ns + + def test_gap_breakdown_with_transfer(self): + """Transfer event during NC gap gets attributed correctly.""" + # NC exec: 0-5ms, then 10-15ms (gap from 5ms to 10ms) + nc_events = _make_nc_exec_events( + [5.0, 5.0], + base_host_ns=0, + gap_ns=5_000_000, # 5ms gap + ) + + # D2H transfer during the gap: 5.5ms to 8ms (host clock) + d2h_events = _make_categorized_events( + "nrt_tensor_read", + [(5_500_000, 8_000_000)], # 2.5ms within the 5ms gap + ) + + tl = _parse_trace_timeline(_events_json(nc_events + d2h_events)) + + assert "d2h_transfer" in tl.gap_breakdown_ms + assert tl.gap_breakdown_ms["d2h_transfer"] == pytest.approx(2.5) + + def test_gap_breakdown_multiple_categories(self): + """Multiple event types in a single gap.""" + # NC: 0-2ms, gap 2-7ms, NC: 7-9ms + nc_events = _make_nc_exec_events( + [2.0, 2.0], + base_host_ns=0, + gap_ns=5_000_000, # 5ms gap + ) + + # H2D during gap: 2.5-4ms (1.5ms) + h2d = _make_categorized_events( + "nrt_tensor_write", + [(2_500_000, 4_000_000)], + base_tracking_id=100, + ) + # Sync wait during gap: 4-6ms (2ms) + sync = _make_categorized_events( + "nrt_async_sema_wait", + [(4_000_000, 6_000_000)], + base_tracking_id=200, + ) + + tl = _parse_trace_timeline(_events_json(nc_events + h2d + sync)) + + assert tl.gap_breakdown_ms.get("h2d_transfer", 0.0) == pytest.approx(1.5) + assert tl.gap_breakdown_ms.get("sync_wait", 0.0) == pytest.approx(2.0) + + def test_event_outside_gap_not_attributed(self): + """Events that don't overlap NC gaps are not counted.""" + # NC: 0-10ms (single long exec, no gap) + nc_events = _make_nc_exec_events([10.0], base_host_ns=0) + + # D2H happening during NC exec (not a gap) + d2h = _make_categorized_events( + "nrt_tensor_read", + [(2_000_000, 5_000_000)], + ) + + tl = _parse_trace_timeline(_events_json(nc_events + d2h)) + + # No gaps exist (single NC execution), so no breakdown + assert tl.gap_breakdown_ms == {} + + def test_partial_overlap_with_gap(self): + """Event that partially overlaps a gap only counts the overlapping portion.""" + # NC: 0-5ms, gap 5-10ms, NC: 10-15ms + nc_events = _make_nc_exec_events( + [5.0, 5.0], + base_host_ns=0, + gap_ns=5_000_000, + ) + + # Memory event spans 3-7ms (overlaps gap from 5-7ms = 2ms) + mem = _make_categorized_events( + "nrt_tensor_allocate", + [(3_000_000, 7_000_000)], + ) + + tl = _parse_trace_timeline(_events_json(nc_events + mem)) + assert tl.gap_breakdown_ms.get("memory_mgmt", 0.0) == pytest.approx(2.0) + + +# --- KernelProfileResult save/load roundtrip --- + + +class TestKernelProfileResult: + def test_save_load_roundtrip(self, tmp_path): + original = KernelProfileResult( + kernel_calls=[ + 
KernelExecution( + filename="/home/user/model.py", + lineno=42, + kernel_name="attention", + call_index=0, + ), + KernelExecution( + filename="/home/user/model.py", + lineno=55, + kernel_name="ffn", + call_index=1, + ), + ], + events_json='{"events": []}', + wall_start_ns=1000000000, + wall_stop_ns=2000000000, + ) + + path = tmp_path / "profile.json" + original.save(path) + + loaded = KernelProfileResult.load(path) + assert len(loaded.kernel_calls) == 2 + assert loaded.kernel_calls[0].filename == "/home/user/model.py" + assert loaded.kernel_calls[0].lineno == 42 + assert loaded.kernel_calls[0].kernel_name == "attention" + assert loaded.kernel_calls[0].call_index == 0 + assert loaded.kernel_calls[1].lineno == 55 + assert loaded.kernel_calls[1].kernel_name == "ffn" + assert loaded.events_json == '{"events": []}' + assert loaded.wall_start_ns == 1000000000 + assert loaded.wall_stop_ns == 2000000000 + + def test_save_load_empty(self, tmp_path): + original = KernelProfileResult() + path = tmp_path / "empty.json" + original.save(path) + + loaded = KernelProfileResult.load(path) + assert loaded.kernel_calls == [] + assert loaded.events_json == "" + assert loaded.wall_start_ns == 0 + assert loaded.wall_stop_ns == 0 + + def test_load_legacy_format(self, tmp_path): + """Loading a file without wall_start_ns/wall_stop_ns defaults to 0.""" + data = { + "kernel_calls": [], + "events_json": "", + } + path = tmp_path / "legacy.json" + path.write_text(json.dumps(data)) + + loaded = KernelProfileResult.load(path) + assert loaded.wall_start_ns == 0 + assert loaded.wall_stop_ns == 0 + + +# --- KernelProfiler monkey-patching --- + + +class TestKernelProfiler: + def test_monkey_patch_records_calls(self, tmp_path): + """Verify monkey-patching records kernel name and restores on exit.""" + from spike.spike_model import SpikeModel + + original_call = SpikeModel.__call__ + + output_path = tmp_path / "kp.json" + + # Mock SystemTraceSession to avoid needing hardware + mock_trace = MagicMock() + mock_trace.__enter__ = MagicMock(return_value=mock_trace) + mock_trace.__exit__ = MagicMock(return_value=False) + mock_trace.fetch_events_json = MagicMock(return_value='{"events": []}') + + # Also mock the original __call__ so we don't need a real model + with ( + patch( + "nkipy.tools.profiler.kernel_profiler.SystemTraceSession", + return_value=mock_trace, + ), + patch.object(SpikeModel, "__call__", return_value=None) as mock_call, + ): + profiler = KernelProfiler(core_id=0, scalene=False, output_path=output_path) + with profiler: + # SpikeModel.__call__ should be patched (not the mock we set) + assert SpikeModel.__call__ is not mock_call + + # Simulate kernel calls via the monkey-patched __call__ + mock_model = MagicMock() + mock_model.name = "test_kernel" + SpikeModel.__call__(mock_model, inputs={}, outputs={}) + + # After exit, __call__ should be restored to the mock + assert SpikeModel.__call__ is mock_call + + # Check recorded calls + result = profiler.result + assert len(result.kernel_calls) == 1 + assert result.kernel_calls[0].kernel_name == "test_kernel" + assert result.kernel_calls[0].call_index == 0 + # Source location should point to this test file + assert "test_kernel_profiler.py" in result.kernel_calls[0].filename + + def test_output_saved_to_disk(self, tmp_path): + """Verify output JSON file is written on exit.""" + output_path = tmp_path / "kp.json" + + mock_trace = MagicMock() + mock_trace.__enter__ = MagicMock(return_value=mock_trace) + mock_trace.__exit__ = MagicMock(return_value=False) + 
+        mock_trace.fetch_events_json = MagicMock(return_value='{"events": []}')
+
+        with patch(
+            "nkipy.tools.profiler.kernel_profiler.SystemTraceSession",
+            return_value=mock_trace,
+        ):
+            with KernelProfiler(core_id=0, scalene=False, output_path=output_path):
+                pass
+
+        assert output_path.exists()
+        data = json.loads(output_path.read_text())
+        assert "kernel_calls" in data
+        assert "events_json" in data
+        assert "wall_start_ns" in data
+        assert "wall_stop_ns" in data
+        assert data["wall_start_ns"] > 0
+        assert data["wall_stop_ns"] >= data["wall_start_ns"]
+
+    def test_scalene_import_failure_is_silent(self, tmp_path):
+        """When scalene is not installed, profiler should still work."""
+        output_path = tmp_path / "kp.json"
+
+        mock_trace = MagicMock()
+        mock_trace.__enter__ = MagicMock(return_value=mock_trace)
+        mock_trace.__exit__ = MagicMock(return_value=False)
+        mock_trace.fetch_events_json = MagicMock(return_value='{"events": []}')
+
+        with patch(
+            "nkipy.tools.profiler.kernel_profiler.SystemTraceSession",
+            return_value=mock_trace,
+        ):
+            # scalene=True; if the scalene import fails here it must be silent
+            with KernelProfiler(core_id=0, scalene=True, output_path=output_path):
+                pass
+
+        assert output_path.exists()
+
+
+# --- Rank filtering ---
+
+
+class TestRankFiltering:
+    def test_get_current_rank_no_dist(self):
+        """Without torch.distributed, rank is 0."""
+        assert _get_current_rank() == 0
+
+    def test_non_target_rank_is_noop(self, tmp_path):
+        """Profiler is a no-op on non-target ranks."""
+        output_path = tmp_path / "kp.json"
+
+        with patch(
+            "nkipy.tools.profiler.kernel_profiler._get_current_rank", return_value=1
+        ):
+            profiler = KernelProfiler(
+                core_id=0,
+                scalene=False,
+                output_path=output_path,
+                target_ranks=[0],
+            )
+            with profiler:
+                pass
+
+        # Non-target rank should not write output
+        assert not output_path.exists()
+        assert profiler.result is None
+
+    def test_target_rank_profiles_normally(self, tmp_path):
+        """Profiler works normally on a target rank."""
+        output_path = tmp_path / "kp.json"
+
+        mock_trace = MagicMock()
+        mock_trace.__enter__ = MagicMock(return_value=mock_trace)
+        mock_trace.__exit__ = MagicMock(return_value=False)
+        mock_trace.fetch_events_json = MagicMock(return_value='{"events": []}')
+
+        with (
+            patch(
+                "nkipy.tools.profiler.kernel_profiler._get_current_rank", return_value=0
+            ),
+            patch(
+                "nkipy.tools.profiler.kernel_profiler.SystemTraceSession",
+                return_value=mock_trace,
+            ),
+        ):
+            profiler = KernelProfiler(
+                core_id=0,
+                scalene=False,
+                output_path=output_path,
+                target_ranks=[0],
+            )
+            with profiler:
+                pass
+
+        assert output_path.exists()
+        assert profiler.result is not None
+
+    def test_multi_rank_output_path(self, tmp_path):
+        """With multiple target ranks, output includes rank suffix."""
+        output_path = tmp_path / "kernel_profile.json"
+
+        mock_trace = MagicMock()
+        mock_trace.__enter__ = MagicMock(return_value=mock_trace)
+        mock_trace.__exit__ = MagicMock(return_value=False)
+        mock_trace.fetch_events_json = MagicMock(return_value='{"events": []}')
+
+        with (
+            patch(
+                "nkipy.tools.profiler.kernel_profiler._get_current_rank", return_value=2
+            ),
+            patch(
+                "nkipy.tools.profiler.kernel_profiler.SystemTraceSession",
+                return_value=mock_trace,
+            ),
+        ):
+            with KernelProfiler(
+                core_id=2,
+                scalene=False,
+                output_path=output_path,
+                target_ranks=[0, 1, 2, 3],
+            ):
+                pass
+
+        expected = tmp_path / "kernel_profile_rank2.json"
+        assert expected.exists()
+
+    def test_no_target_ranks_profiles_all(self, tmp_path):
+        """target_ranks=None means profile on any rank (default behavior)."""
+        output_path = tmp_path / "kp.json"
+
+        mock_trace = MagicMock()
+        mock_trace.__enter__ = MagicMock(return_value=mock_trace)
+        mock_trace.__exit__ = MagicMock(return_value=False)
+        mock_trace.fetch_events_json = MagicMock(return_value='{"events": []}')
+
+        with (
+            patch(
+                "nkipy.tools.profiler.kernel_profiler._get_current_rank", return_value=5
+            ),
+            patch(
+                "nkipy.tools.profiler.kernel_profiler.SystemTraceSession",
+                return_value=mock_trace,
+            ),
+        ):
+            profiler = KernelProfiler(
+                core_id=5,
+                scalene=False,
+                output_path=output_path,
+                target_ranks=None,
+            )
+            with profiler:
+                pass
+
+        assert output_path.exists()
+
+
+# --- CPU sample overlap ---
+
+
+class TestCPUSampleOverlap:
+    def test_full_overlap(self):
+        """All samples fall within NC intervals."""
+        samples = [10.0, 11.0, 12.0]
+        # NC interval covers 9-13 seconds absolute
+        nc_intervals = [(9.0, 13.0)]
+        pct = _calculate_cpu_sample_overlap(
+            samples,
+            nc_intervals,
+            start_time_absolute=0.0,
+            start_time_perf=0.0,
+        )
+        assert pct == pytest.approx(100.0)
+
+    def test_no_overlap(self):
+        """No samples overlap NC intervals."""
+        samples = [1.0, 2.0, 3.0]
+        nc_intervals = [(10.0, 20.0)]
+        pct = _calculate_cpu_sample_overlap(
+            samples,
+            nc_intervals,
+            start_time_absolute=0.0,
+            start_time_perf=0.0,
+        )
+        assert pct == pytest.approx(0.0)
+
+    def test_partial_overlap(self):
+        """Some samples overlap, some don't."""
+        samples = [1.0, 5.0, 15.0, 25.0]
+        nc_intervals = [(4.0, 6.0), (14.0, 16.0)]
+        pct = _calculate_cpu_sample_overlap(
+            samples,
+            nc_intervals,
+            start_time_absolute=0.0,
+            start_time_perf=0.0,
+        )
+        assert pct == pytest.approx(50.0)
+
+    def test_perf_to_absolute_conversion(self):
+        """Verify perf_counter -> absolute time conversion."""
+        # perf starts at 100, absolute starts at 1000
+        samples = [105.0, 110.0]  # absolute: 1005, 1010
+        nc_intervals = [(1004.0, 1006.0)]  # covers sample at 1005
+        pct = _calculate_cpu_sample_overlap(
+            samples,
+            nc_intervals,
+            start_time_absolute=1000.0,
+            start_time_perf=100.0,
+        )
+        assert pct == pytest.approx(50.0)
+
+    def test_empty_samples(self):
+        pct = _calculate_cpu_sample_overlap([], [(1.0, 2.0)], 0.0, 0.0)
+        assert pct == pytest.approx(0.0)
+
+    def test_empty_intervals(self):
+        pct = _calculate_cpu_sample_overlap([1.0, 2.0], [], 0.0, 0.0)
+        assert pct == pytest.approx(0.0)
+
+
+# --- merge_profiles ---
+
+
+class TestMergeProfiles:
+    def _make_scalene_json(
+        self, path: Path, lines: list[dict], filename: str = "model.py"
+    ):
+        """Create a minimal scalene JSON file."""
+        data = {
+            "files": {
+                filename: {
+                    "lines": lines,
+                }
+            }
+        }
+        path.write_text(json.dumps(data))
+
+    def _make_kernel_profile(
+        self,
+        path: Path,
+        calls: list[tuple[str, int, str]],
+        durations_ms: list[float],
+        nrt_durations_ms: list[float] | None = None,
+        wall_start_ns: int = 0,
+        wall_stop_ns: int = 0,
+    ):
+        """Create a kernel profile with matching trace events.
+
+        Args:
+            nrt_durations_ms: Host-side nrt_execute durations. If None,
+                defaults to 20% of each nc duration (simulating async exec).
+        """
+        kernel_calls = [
+            KernelExecution(
+                filename=filename, lineno=lineno, kernel_name=name, call_index=i
+            )
+            for i, (filename, lineno, name) in enumerate(calls)
+        ]
+
+        if nrt_durations_ms is None:
+            nrt_durations_ms = [d * 0.2 for d in durations_ms]
+
+        # Build synthetic nc_exec_running + nrt_execute events
+        nc_events = _make_nc_exec_events(durations_ms)
+        nrt_events = _make_nrt_execute_events(nrt_durations_ms)
+        events_json = json.dumps({"events": nc_events + nrt_events})
+
+        result = KernelProfileResult(
+            kernel_calls=kernel_calls,
+            events_json=events_json,
+            wall_start_ns=wall_start_ns,
+            wall_stop_ns=wall_stop_ns,
+        )
+        result.save(path)
+
+    def test_basic_merge(self, tmp_path):
+        """Verify merge injects nc_time_ms and nrt_time_ms into scalene JSON."""
+        scalene_path = tmp_path / "scalene.json"
+        kernel_path = tmp_path / "kernel.json"
+        output_path = tmp_path / "merged.json"
+
+        # Scalene reports three lines; kernel calls land on lines 10 and 20 only
+        self._make_scalene_json(
+            scalene_path,
+            [
+                {"lineno": 10, "n_cpu_percent_python": 50.0},
+                {"lineno": 20, "n_cpu_percent_python": 30.0},
+                {"lineno": 30, "n_cpu_percent_python": 20.0},
+            ],
+        )
+
+        # Two kernel calls at lines 10 and 20
+        # nc=5.0,3.0 nrt=1.0,0.6 (default 20%)
+        self._make_kernel_profile(
+            kernel_path,
+            calls=[
+                ("model.py", 10, "attention"),
+                ("model.py", 20, "ffn"),
+            ],
+            durations_ms=[5.0, 3.0],
+        )
+
+        merge_scalene_and_kernel_profiles(scalene_path, kernel_path, output_path)
+
+        merged = json.loads(output_path.read_text())
+
+        # Check line 10: nc and nrt are separate
+        line10 = merged["files"]["model.py"]["lines"][0]
+        assert line10["nc_time_ms"] == pytest.approx(5.0)
+        assert line10["nrt_time_ms"] == pytest.approx(1.0)
+        assert line10["nc_execute_count"] == 1
+        assert line10["nc_nrt_ratio"] == pytest.approx(5.0)
+
+        # Check line 20
+        line20 = merged["files"]["model.py"]["lines"][1]
+        assert line20["nc_time_ms"] == pytest.approx(3.0)
+        assert line20["nrt_time_ms"] == pytest.approx(0.6)
+
+        # Check line 30 has no nc_time_ms
+        line30 = merged["files"]["model.py"]["lines"][2]
+        assert "nc_time_ms" not in line30
+
+        # Check metadata
+        assert merged["neuron_total_nc_time_ms"] == pytest.approx(8.0)
+        assert merged["neuron_total_time_ms"] == pytest.approx(1.6)
+        assert merged["neuron_nc_event_count"] == 2
+        assert merged["neuron_event_count"] == 2
+
+    def test_aggregation_same_line(self, tmp_path):
+        """Multiple kernel calls on the same line aggregate durations."""
+        scalene_path = tmp_path / "scalene.json"
+        kernel_path = tmp_path / "kernel.json"
+        output_path = tmp_path / "merged.json"
+
+        self._make_scalene_json(
+            scalene_path,
+            [{"lineno": 10, "n_cpu_percent_python": 100.0}],
+        )
+
+        # Three kernel calls all on line 10
+        self._make_kernel_profile(
+            kernel_path,
+            calls=[
+                ("model.py", 10, "layer_kernel"),
+                ("model.py", 10, "layer_kernel"),
+                ("model.py", 10, "layer_kernel"),
+            ],
+            durations_ms=[2.0, 3.0, 1.5],
+        )
+
+        merge_scalene_and_kernel_profiles(scalene_path, kernel_path, output_path)
+
+        merged = json.loads(output_path.read_text())
+        line10 = merged["files"]["model.py"]["lines"][0]
+        assert line10["nc_time_ms"] == pytest.approx(6.5)
+        assert line10["nc_execute_count"] == 3
+        assert line10["nc_percent"] == pytest.approx(100.0)
+
+    def test_html_embedded_scalene(self, tmp_path):
+        """Merge handles scalene HTML output with embedded JSON."""
+        scalene_path = tmp_path / "scalene.html"
+        kernel_path = tmp_path / "kernel.json"
+        output_path = tmp_path / "merged.json"
+
+        scalene_json = json.dumps(
+            {
+                "files": {
+                    "model.py": {
+                        "lines": [{"lineno": 5, "n_cpu_percent_python": 100.0}]
+                    }
+                }
+            }
+        )
+        html = f"const profile = {scalene_json};"
+        scalene_path.write_text(html)
+
+        self._make_kernel_profile(
+            kernel_path,
+            calls=[("model.py", 5, "kernel")],
+            durations_ms=[10.0],
+        )
+
+        merge_scalene_and_kernel_profiles(scalene_path, kernel_path, output_path)
+
+        merged = json.loads(output_path.read_text())
+        assert merged["files"]["model.py"]["lines"][0]["nc_time_ms"] == 10.0
+
+    def test_path_matching_by_basename(self, tmp_path):
+        """Kernel paths with different prefixes match by basename."""
+        scalene_path = tmp_path / "scalene.json"
+        kernel_path = tmp_path / "kernel.json"
+        output_path = tmp_path / "merged.json"
+
+        # Scalene uses a different absolute path
+        self._make_scalene_json(
+            scalene_path,
+            [{"lineno": 10, "n_cpu_percent_python": 100.0}],
+            filename="/opt/project/model.py",
+        )
+
+        # Kernel profile uses yet another path
+        self._make_kernel_profile(
+            kernel_path,
+            calls=[("/home/user/project/model.py", 10, "kernel")],
+            durations_ms=[7.0],
+        )
+
+        merge_scalene_and_kernel_profiles(scalene_path, kernel_path, output_path)
+
+        merged = json.loads(output_path.read_text())
+        line10 = merged["files"]["/opt/project/model.py"]["lines"][0]
+        assert line10["nc_time_ms"] == 7.0
+
+    def test_per_kernel_function_entries(self, tmp_path):
+        """Per-kernel-name aggregation creates function entries with nrt/nc."""
+        scalene_path = tmp_path / "scalene.json"
+        kernel_path = tmp_path / "kernel.json"
+        output_path = tmp_path / "merged.json"
+
+        self._make_scalene_json(
+            scalene_path,
+            [{"lineno": 10}, {"lineno": 20}],
+        )
+
+        self._make_kernel_profile(
+            kernel_path,
+            calls=[
+                ("model.py", 10, "attention"),
+                ("model.py", 10, "attention"),
+                ("model.py", 20, "ffn"),
+            ],
+            durations_ms=[3.0, 4.0, 5.0],
+        )
+
+        merge_scalene_and_kernel_profiles(scalene_path, kernel_path, output_path)
+
+        merged = json.loads(output_path.read_text())
+        functions = merged["files"]["model.py"]["functions"]
+        assert len(functions) == 2
+
+        func_by_name = {f["line"]: f for f in functions}
+        assert "attention" in func_by_name
+        assert "ffn" in func_by_name
+
+        attn = func_by_name["attention"]
+        assert attn["nc_time_ms"] == pytest.approx(7.0)
+        assert attn["nc_execute_count"] == 2
+        # nrt is 20% of nc by default
+        assert attn["nrt_time_ms"] == pytest.approx(1.4)
+        assert "nc_nrt_ratio" in attn
+
+        ffn = func_by_name["ffn"]
+        assert ffn["nc_time_ms"] == pytest.approx(5.0)
+        assert ffn["nc_execute_count"] == 1
+
+    def test_cpu_overlap_injected(self, tmp_path):
+        """CPU sample overlap is computed and injected into line data."""
+        scalene_path = tmp_path / "scalene.json"
+        kernel_path = tmp_path / "kernel.json"
+        output_path = tmp_path / "merged.json"
+
+        # Scalene with start times and cpu_samples_list
+        data = {
+            "start_time_absolute": 1000.0,
+            "start_time_perf": 100.0,
+            "files": {
+                "model.py": {
+                    "lines": [
+                        {
+                            "lineno": 10,
+                            "cpu_samples_list": [105.0, 106.0, 107.0, 108.0],
+                        },
+                    ],
+                }
+            },
+        }
+        scalene_path.write_text(json.dumps(data))
+
+        # The cpu_samples_list values are perf_counter seconds; with
+        # start_time_perf=100 and start_time_absolute=1000 they map to
+        # absolute times 1005-1008s. Build a single NC host interval (in ns)
+        # covering 1004.5s-1006.5s so exactly two of the four samples overlap.
+        events = []
+        # NC interval from 1004.5s to 1006.5s in ns
+        events.append(
+            {
+                "event_type": "nc_exec_running",
+                "phase": "start",
+                "tracking_id": 0,
+                "timestamp_ns": 1_004_500_000_000,  # 1004.5s
+                "data": {"nc_timestamp_ns": 1000},
+            }
+        )
+        events.append(
+            {
+                "event_type": "nc_exec_running",
+                "phase": "stop",
+                "tracking_id": 0,
+                "timestamp_ns": 1_006_500_000_000,  # 1006.5s
+                "data": {"nc_timestamp_ns": 3_000_000},  # ~3ms device time
+            }
+        )
+
+        result = KernelProfileResult(
+            kernel_calls=[
+                KernelExecution("model.py", 10, "kernel", 0),
+            ],
+            events_json=json.dumps({"events": events}),
+            wall_start_ns=1_004_000_000_000,
+            wall_stop_ns=1_007_000_000_000,
+        )
+        result.save(kernel_path)
+
+        merge_scalene_and_kernel_profiles(scalene_path, kernel_path, output_path)
+
+        merged = json.loads(output_path.read_text())
+        line10 = merged["files"]["model.py"]["lines"][0]
+        # Samples at abs 1005, 1006 overlap NC [1004.5, 1006.5]
+        # Samples at abs 1007, 1008 don't
+        assert line10["cpu_samples_nc_overlap_percent"] == pytest.approx(50.0)
+
+
+# --- merge_kernel_only ---
+
+
+class TestMergeKernelOnly:
+    def _make_kernel_profile(
+        self,
+        path: Path,
+        calls: list[tuple[str, int, str]],
+        durations_ms: list[float],
+        nrt_durations_ms: list[float] | None = None,
+        wall_start_ns: int = 0,
+        wall_stop_ns: int = 100_000_000,
+    ):
+        kernel_calls = [
+            KernelExecution(
+                filename=filename, lineno=lineno, kernel_name=name, call_index=i
+            )
+            for i, (filename, lineno, name) in enumerate(calls)
+        ]
+        if nrt_durations_ms is None:
+            nrt_durations_ms = [d * 0.2 for d in durations_ms]
+        nc_events = _make_nc_exec_events(durations_ms)
+        nrt_events = _make_nrt_execute_events(nrt_durations_ms)
+        result = KernelProfileResult(
+            kernel_calls=kernel_calls,
+            events_json=json.dumps({"events": nc_events + nrt_events}),
+            wall_start_ns=wall_start_ns,
+            wall_stop_ns=wall_stop_ns,
+        )
+        result.save(path)
+
+    def test_single_rank_kernel_only(self, tmp_path):
+        """kernel-only merge with a single profile."""
+        kp = tmp_path / "kernel.json"
+        out = tmp_path / "merged.json"
+
+        self._make_kernel_profile(
+            kp,
+            calls=[("model.py", 10, "attn"), ("model.py", 20, "ffn")],
+            durations_ms=[5.0, 3.0],
+        )
+
+        merge_kernel_only([kp], out)
+
+        merged = json.loads(out.read_text())
+        assert merged["neuron_total_nc_time_ms"] == pytest.approx(8.0)
+        assert merged["neuron_total_time_ms"] == pytest.approx(1.6)
+        assert merged["neuron_nc_event_count"] == 2
+        assert "model.py" in merged["files"]
+
+        # nrt and nc should be separate
+        line10 = merged["files"]["model.py"]["lines"][0]
+        assert line10["nc_time_ms"] == pytest.approx(5.0)
+        assert line10["nrt_time_ms"] == pytest.approx(1.0)
+
+    def test_multi_rank_kernel_only(self, tmp_path):
+        """kernel-only merge with multiple rank profiles."""
+        kp0 = tmp_path / "kernel_rank0.json"
+        kp1 = tmp_path / "kernel_rank1.json"
+        out = tmp_path / "merged.json"
+
+        self._make_kernel_profile(
+            kp0,
+            calls=[("model.py", 10, "attn")],
+            durations_ms=[5.0],
+        )
+        self._make_kernel_profile(
+            kp1,
+            calls=[("model.py", 10, "attn")],
+            durations_ms=[6.0],
+        )
+
+        merge_kernel_only([kp0, kp1], out)
+
+        merged = json.loads(out.read_text())
+        # Both ranks contribute
+        assert merged["neuron_total_nc_time_ms"] == pytest.approx(11.0)
+        assert merged["neuron_nc_event_count"] == 2
+
+        # Line data accumulates across ranks
+        line10 = merged["files"]["model.py"]["lines"][0]
+        assert line10["nc_time_ms"] == pytest.approx(11.0)
+        assert line10["nc_execute_count"] == 2
+        # nrt also accumulates
+        assert line10["nrt_time_ms"] == pytest.approx(2.2)  # 1.0 + 1.2
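+
+
+# --- Illustrative sketch (reviewer note, not part of the original suite) ---
+# A hedged, minimal example of how the pieces above compose end to end: build
+# one KernelProfileResult by hand, run the kernel-only merge, and read back
+# the injected metadata. The function name and the literal durations are
+# hypothetical; the constructors, merge_kernel_only, and the output keys
+# mirror the assertions in TestMergeKernelOnly.
+def _demo_kernel_only_merge(out_dir: Path) -> dict:
+    # One kernel call attributed to model.py:10
+    calls = [
+        KernelExecution(
+            filename="model.py", lineno=10, kernel_name="attn", call_index=0
+        )
+    ]
+    # One 5ms device execution paired with a 1ms host-side nrt_execute
+    events = _make_nc_exec_events([5.0]) + _make_nrt_execute_events([1.0])
+    profile = KernelProfileResult(
+        kernel_calls=calls,
+        events_json=json.dumps({"events": events}),
+        wall_start_ns=0,
+        wall_stop_ns=100_000_000,
+    )
+    profile.save(out_dir / "kernel.json")
+    merge_kernel_only([out_dir / "kernel.json"], out_dir / "merged.json")
+    # merged["files"]["model.py"]["lines"][0] carries nc_time_ms / nrt_time_ms
+    return json.loads((out_dir / "merged.json").read_text())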