From 063b14dad4f080566d20f5fe2cdcbfed603fd88e Mon Sep 17 00:00:00 2001 From: Christophe Guillon Date: Fri, 27 Feb 2026 05:20:07 +0000 Subject: [PATCH] cli/explore: add resumable output/run manifest and CSV callback tests Implement resume/append output handling and run manifest metadata for explore CLI. Add CSVCallback unit tests for resume dedup, append behavior, and default overwrite behavior (normal mode). Incorporate PR feedback and keep changes squashed into a single update commit. --- docs/tutorials/cli_tools.md | 14 ++ src/xtc/cli/explore.py | 149 +++++++++++++++++- .../unit/cli/test_explore_csv_callback.py | 120 ++++++++++++++ 3 files changed, 276 insertions(+), 7 deletions(-) create mode 100644 tests/pytest/unit/cli/test_explore_csv_callback.py diff --git a/docs/tutorials/cli_tools.md b/docs/tutorials/cli_tools.md index 8a8309953..bdbb0f46d 100644 --- a/docs/tutorials/cli_tools.md +++ b/docs/tutorials/cli_tools.md @@ -30,6 +30,20 @@ Use exhaustive search on a tiling strategy limited to tile4d + only vectorized t 450/450 [22:30<00:00, 3.00s/it] real 1352.37 +Resume interrupted exploration while keeping reproducibility metadata: + + # Start a run + loop-explore --search random --trials 200 --output results.random.csv + + # Resume the same output (skips already recorded samples) + loop-explore --search random --trials 200 --output results.random.csv --resume + + # Append regardless of duplicates + loop-explore --search random --trials 200 --output results.random.csv --append + +Each run also writes `results.random.csv.meta.json` with the command arguments, +Python/runtime information, and git commit hash to simplify result provenance. + Test a single tiling: # Dumps and execute MLIR tiling diff --git a/src/xtc/cli/explore.py b/src/xtc/cli/explore.py index 88faf6f9b..829893717 100644 --- a/src/xtc/cli/explore.py +++ b/src/xtc/cli/explore.py @@ -22,7 +22,9 @@ import logging import itertools import csv +import json import random +from datetime import datetime, timezone import numpy as np import numpy.typing from tqdm import tqdm @@ -574,17 +576,113 @@ def peak_time(args: NS) -> float: return time +def _args_to_metadata(args: NS) -> dict[str, Any]: + metadata_args: dict[str, Any] = {} + for key, value in vars(args).items(): + if key == "eval_parameters": + continue + if isinstance(value, Path): + metadata_args[key] = str(value) + elif isinstance(value, (str, int, float, bool, list, dict)) or value is None: + metadata_args[key] = value + else: + metadata_args[key] = str(value) + return metadata_args + + +def _git_commit_hash() -> str | None: + try: + proc = subprocess.run( + ["git", "rev-parse", "HEAD"], + check=True, + capture_output=True, + text=True, + ) + return proc.stdout.strip() + except (subprocess.CalledProcessError, FileNotFoundError): + return None + + +def write_run_manifest(args: NS, strategy: Strategy) -> None: + output_path = Path(args.output) + metadata_path = Path(f"{args.output}.meta.json") + payload = { + "generatedAt": datetime.now(timezone.utc).isoformat(), + "output": str(output_path), + "resume": args.resume, + "append": args.append, + "sampleNames": strategy.sample_names, + "gitCommit": _git_commit_hash(), + "python": { + "version": sys.version, + "executable": sys.executable, + }, + "platform": { + "platform": sys.platform, + "cwd": str(Path.cwd()), + }, + "args": _args_to_metadata(args), + } + with open(metadata_path, "w", encoding="utf-8") as metadata_file: + json.dump(payload, metadata_file, indent=2, sort_keys=True) + metadata_file.write("\n") + + class CSVCallback: - def __init__(self, fname: str, peak_time: float, sample_names: list[str]) -> None: + def __init__( + self, + fname: str, + peak_time: float, + sample_names: list[str], + *, + resume: bool = False, + append: bool = False, + ) -> None: self._fname = fname self._peak_time = peak_time - self._outf = open(fname, "w", newline="") self._sample_names = sample_names self._header = sample_names + ["X", "time", "peak", "backend"] + self._results: list[Sequence] = [] + self._rows: list[Sequence] = [] + self._seen_keys: set[tuple[str, tuple[int, ...]]] = set() + self._resume = resume + self._append = append + + out_path = Path(fname) + has_existing_file = out_path.exists() and out_path.stat().st_size > 0 + if resume: + self._load_existing_rows() + mode = "a" + elif append: + mode = "a" + else: + mode = "w" + + self._outf = open(fname, mode, newline="") self._writer = csv.writer(self._outf, delimiter=",") - self._write_header() - self._results = [] - self._rows = [] + + should_write_header = (not has_existing_file) or (mode == "w") + if should_write_header: + self._write_header() + + def _load_existing_rows(self) -> None: + in_path = Path(self._fname) + if not in_path.exists() or in_path.stat().st_size == 0: + return + with open(in_path, newline="") as infile: + reader = csv.DictReader(infile, delimiter=",") + for row in reader: + backend = row.get("backend") + if backend is None: + continue + try: + sample = tuple(int(row[name]) for name in self._sample_names) + except (TypeError, ValueError, KeyError): + continue + self._seen_keys.add((backend, sample)) + + def _sample_key(self, x: Sample, backend: str) -> tuple[str, tuple[int, ...]]: + return backend, tuple(int(v) for v in x) def _write_header(self) -> None: self._writer.writerow(self._header) @@ -594,6 +692,10 @@ def _write_row(self, row: Sequence) -> None: self._rows.append(row) self._writer.writerow(row) self._outf.flush() + try: + os.fsync(self._outf.fileno()) + except OSError: + logger.debug("Unable to fsync output file %s", self._fname) def _write_result(self, result: Sequence) -> None: self._results.append(result) @@ -601,12 +703,17 @@ def _write_result(self, result: Sequence) -> None: if error != 0: logger.debug(f"Skip recording error for: {backend}: {x}") return + key = self._sample_key(x, backend) + if self._resume and key in self._seen_keys: + logger.debug("Skip already recorded sample for resume mode: %s", key) + return peak = self._peak_time / time s = str(x).replace(",", ";") row = [s, time, peak, backend] row = x + row logger.debug(f"Record row: {row}") self._write_row(row) + self._seen_keys.add(key) def __call__(self, result: Sequence) -> None: self._write_result(result) @@ -627,7 +734,13 @@ def search_some(strategy: Strategy, graph: Graph, args: NS): ) ptime = peak_time(args) sample_names = strategy.sample_names - result_callback = CSVCallback(args.output, ptime, sample_names) + result_callback = CSVCallback( + args.output, + ptime, + sample_names, + resume=args.resume, + append=args.append, + ) callbacks = { "result": result_callback, "search": search_callback, @@ -665,6 +778,7 @@ def optimize(args: NS): op_args = (*dims, dtype) graph = OPERATORS[args.operator]["operation"](*op_args, name=args.func_name) strategy = get_strategy(graph, args) + write_run_manifest(args, strategy) if args.test or args.opt_level in [0, 1, 2, 3]: schedule = args.test if not schedule: @@ -679,7 +793,13 @@ def optimize(args: NS): ) ptime = peak_time(args) sample_names = strategy.sample_names - result_callback = CSVCallback(args.output, ptime, sample_names) + result_callback = CSVCallback( + args.output, + ptime, + sample_names, + resume=args.resume, + append=args.append, + ) callbacks = { "result": result_callback, "search": search_callback, @@ -959,6 +1079,18 @@ def main(): parser.add_argument( "--output", type=str, default="results.csv", help="output csv file for search" ) + parser.add_argument( + "--resume", + action=argparse.BooleanOptionalAction, + default=False, + help="resume from an existing output file and skip already recorded samples", + ) + parser.add_argument( + "--append", + action=argparse.BooleanOptionalAction, + default=False, + help="append new results to output file without deduplication", + ) parser.add_argument( "--eval", type=str, choices=["eval"], default="eval", help="evaluation method" ) @@ -1040,6 +1172,9 @@ def main(): ) args = parser.parse_args() + if args.resume and args.append: + parser.error("--resume and --append cannot be used together") + logging.basicConfig() logger.setLevel(logging.INFO) if args.debug: diff --git a/tests/pytest/unit/cli/test_explore_csv_callback.py b/tests/pytest/unit/cli/test_explore_csv_callback.py new file mode 100644 index 000000000..ef914d485 --- /dev/null +++ b/tests/pytest/unit/cli/test_explore_csv_callback.py @@ -0,0 +1,120 @@ +import csv +from pathlib import Path + +from xtc.cli.explore import CSVCallback + + +def _read_rows(path: Path) -> list[list[str]]: + with path.open(newline="") as infile: + return list(csv.reader(infile, delimiter=",")) + + +def test_csv_callback_resume_dedup_skips_existing_and_keeps_new(tmp_path: Path): + output = tmp_path / "results.csv" + sample_names = ["M", "N"] + + # Existing output already has sample [8, 16] for backend mlir. + with output.open("w", newline="") as out: + writer = csv.writer(out, delimiter=",") + writer.writerow(sample_names + ["X", "time", "peak", "backend"]) + writer.writerow([8, 16, "[8; 16]", 0.2, 5.0, "mlir"]) + + cb = CSVCallback( + str(output), + peak_time=1.0, + sample_names=sample_names, + resume=True, + ) + + # Duplicate for same backend must be skipped in resume mode. + cb(([8, 16], 0, 0.2, "mlir")) + # Different backend for same sample must be recorded. + cb(([8, 16], 0, 0.25, "tvm")) + # Different sample must be recorded. + cb(([8, 32], 0, 0.3, "mlir")) + del cb + + rows = _read_rows(output) + + # Header + original row + two new rows. + assert len(rows) == 4 + assert rows[0] == ["M", "N", "X", "time", "peak", "backend"] + + # Check appended rows (order matters). + assert rows[2][0:2] == ["8", "16"] + assert rows[2][-1] == "tvm" + assert rows[3][0:2] == ["8", "32"] + assert rows[3][-1] == "mlir" + + +def test_csv_callback_default_mode_overwrites_existing_file(tmp_path: Path): + output = tmp_path / "results.csv" + sample_names = ["M", "N"] + + with output.open("w", newline="") as out: + writer = csv.writer(out, delimiter=",") + writer.writerow(sample_names + ["X", "time", "peak", "backend"]) + writer.writerow([9, 9, "[9; 9]", 0.9, 1.1, "mlir"]) + + cb = CSVCallback( + str(output), + peak_time=2.0, + sample_names=sample_names, + ) + + # Default mode is neither --resume nor --append, so file is rewritten. + cb(([2, 3], 0, 0.5, "mlir")) + del cb + + rows = _read_rows(output) + assert rows[0] == ["M", "N", "X", "time", "peak", "backend"] + assert len(rows) == 2 + assert rows[1][0:2] == ["2", "3"] + assert rows[1][-1] == "mlir" + + +def test_csv_callback_append_mode_allows_duplicates(tmp_path: Path): + output = tmp_path / "results.csv" + sample_names = ["M", "N"] + + with output.open("w", newline="") as out: + writer = csv.writer(out, delimiter=",") + writer.writerow(sample_names + ["X", "time", "peak", "backend"]) + writer.writerow([4, 4, "[4; 4]", 0.1, 10.0, "mlir"]) + + cb = CSVCallback( + str(output), + peak_time=1.0, + sample_names=sample_names, + append=True, + ) + + # Same sample/backend is allowed in append mode. + cb(([4, 4], 0, 0.12, "mlir")) + del cb + + rows = _read_rows(output) + + # Header + original row + duplicate appended row. + assert len(rows) == 3 + assert rows[1][0:2] == ["4", "4"] + assert rows[2][0:2] == ["4", "4"] + assert rows[1][-1] == "mlir" + assert rows[2][-1] == "mlir" + + +def test_csv_callback_append_writes_header_for_new_file(tmp_path: Path): + output = tmp_path / "results.csv" + + cb = CSVCallback( + str(output), + peak_time=2.0, + sample_names=["M", "N"], + append=True, + ) + cb(([2, 3], 0, 0.5, "mlir")) + del cb + + rows = _read_rows(output) + assert rows[0] == ["M", "N", "X", "time", "peak", "backend"] + assert rows[1][0:2] == ["2", "3"]