diff --git a/examples/merge_root/merge_root.json.tmpl b/examples/merge_root/merge_root.json.tmpl
index 7467fbb85..f9c43f090 100644
--- a/examples/merge_root/merge_root.json.tmpl
+++ b/examples/merge_root/merge_root.json.tmpl
@@ -2,8 +2,9 @@
     {% set first_file = input_files['root_files'][0] %}
     {% set file_dir = first_file | dirname %}
     {% set run_name = file_dir.split('/')[-1] %}
-    "job_id": {{ job_id }},
-    "description": "Merge ROOT files for {{ run_name }} (job {{ job_id }})",
+    {% set mass = file_dir.split('/')[-2] %}
+    "job_id": {{ job_id + 1 }},
+    "description": "Merge ROOT files for {{ mass }}/{{ run_name }} (job {{ job_id }})",
     "input_files": {
     {% for file in input_files['root_files'] -%}
         "{{ file }}": "{{ file | basename }}"{% if not loop.last %},{% endif %}
@@ -11,13 +12,16 @@
     {% endfor -%}
     },
     "output_files": {
-        "merged_{{ run_name }}_job{{ job_id }}.root": "merged_{{ run_name }}_job{{ job_id }}.root"
+        "merged_{{ run_name }}_job{{ job_id }}.root": "merged_{{ run_name }}_job{{ job_id }}.root",
+        "merged_{{ run_name }}_job{{ job_id }}_stats.json": "merged_{{ run_name }}_job{{ job_id }}_stats.json"
     },
-    "output_dir": "output/{{ run_name }}",
+    "output_dir": "output/",
+    "mass": "{{ mass }}",
     "run_name": "{{ run_name }}",
     "year": 2021,
     "num_input_files": {{ input_files['root_files'] | length }},
     "force": true,
     "compression": 6,
-    "validate": true
+    "validate": true,
+    "write_stats": true
 }
diff --git a/examples/merge_root/merge_root_example.sh b/examples/merge_root/merge_root_example.sh
index e8fd42fed..f1c48667c 100644
--- a/examples/merge_root/merge_root_example.sh
+++ b/examples/merge_root/merge_root_example.sh
@@ -8,8 +8,9 @@
 # -n: Maximum number of files per merge job (default: 20)
 # -f: File pattern to match (default: *.root)
 # -r: Run directory pattern (default: hps_*)
+# -F: Path filter - only include files whose full path contains this string

-PARENT_DIR="/sdf/data/hps/physics2021/data/recon/pass4_v8/"
+PARENT_DIR=""
 OUTPUT_PREFIX="merge_jobs"
 MAX_FILES=50

@@ -17,8 +18,10 @@ hps-mc-prepare-merge-jobs \
     $PARENT_DIR \
     -o $OUTPUT_PREFIX \
     -n $MAX_FILES \
-    -f *v0skim*root \
-    --single-list
+    -F reprocess
+#    -r ap* \
+#    -F "pass5_v9" \
+#    --single-list

 # This creates:
 # - merge_jobs_input_files.txt (single consolidated file list)
@@ -37,4 +40,6 @@ for batch_file in ${OUTPUT_PREFIX}_batch*_files.txt; do
         ${OUTPUT_PREFIX}_batch${batch_num}_jobs.json
 done

+#signal.tmpl \
+
 cat ${OUTPUT_PREFIX}_batch*_jobs.json | jq -s 'add' > ${OUTPUT_PREFIX}_jobs.json
diff --git a/examples/merge_root/run.sh b/examples/merge_root/run.sh
index 8ef141aa7..73a8e8365 100755
--- a/examples/merge_root/run.sh
+++ b/examples/merge_root/run.sh
@@ -3,5 +3,12 @@
 #SBATCH --time=24:00:00
 #SBATCH --partition=shared
 #SBATCH --job-name=examples
+#hps-mc-job run -d $PWD/scratch root_merge job.json

-hps-mc-job run -d $PWD/scratch root_merge job.json
+timestamp=`date +"%Y-%m-%d_%H-%M-%S"`
+
+hps-mc-batch slurm -o -A hps:hps-prod -m 8000 -q milano -W 6 \
+    -E /sdf/data/hps/users/mgignac/software/dev/hps-mc/examples/merge_root/setup_env.sh \
+    -d /sdf/scratch/hps/mgignac/merged/run/$timestamp \
+    -l /sdf/scratch/hps/mgignac/merged/logs/$timestamp/ \
+    root_merge merge_jobs_jobs.json
diff --git a/examples/merge_root/signal.tmpl b/examples/merge_root/signal.tmpl
new file mode 100644
index 000000000..9d5f58bc6
--- /dev/null
+++ b/examples/merge_root/signal.tmpl
@@ -0,0 +1,26 @@
+{
+    {% set first_file = input_files['root_files'][0] %}
+    {% set file_dir = first_file | dirname %}
+    {% set run_name = file_dir.split('/')[-1] %}
+    {% set mass = file_dir.split('/')[-2] %}
+    "job_id": {{ job_id }},
+    "description": "Merge ROOT files for {{ mass }}/{{ run_name }} (job {{ job_id }})",
+    "input_files": {
+    {% for file in input_files['root_files'] -%}
+        "{{ file }}": "{{ file | basename }}"{% if not loop.last %},{% endif %}
+
+    {% endfor -%}
+    },
+    "output_files": {
+        "merged_{{ run_name }}_job{{ job_id }}.root": "merged_{{ run_name }}_job{{ job_id }}.root",
+        "merged_{{ run_name }}_job{{ job_id }}_stats.json": "merged_{{ run_name }}_job{{ job_id }}_stats.json"
+    },
+    "output_dir": "/sdf/data/hps/physics2021/mc/hpstr/simp_pulser_new/pass5_v9/{{ mass }}/{{ run_name }}-merged",
+    "mass": "{{ mass }}",
+    "run_name": "{{ run_name }}",
+    "year": 2021,
+    "num_input_files": {{ input_files['root_files'] | length }},
+    "force": true,
+    "compression": 6,
+    "validate": true
+}
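For context: both templates derive `mass` and `run_name` from the directory of the first input file. A minimal sketch of that rendering, assuming plain Jinja2 with `dirname`/`basename` registered as filters (hps-mc's job machinery does the real rendering; the path below is made up):

```python
import os
from jinja2 import Environment, FileSystemLoader

env = Environment(loader=FileSystemLoader("examples/merge_root"))
env.filters["dirname"] = os.path.dirname    # assumption: matches hps-mc's filter
env.filters["basename"] = os.path.basename  # assumption: matches hps-mc's filter

tmpl = env.get_template("signal.tmpl")
# For .../pass5_v9/<mass>/<run_name>/file.root, split('/')[-2] is the mass
# point and split('/')[-1] is the run name.
print(tmpl.render(
    job_id=1,
    input_files={"root_files": ["/sdf/mc/pass5_v9/100/hps_007800/f1.root"]},
))
```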
diff --git a/python/hpsmc/collect_merge_stats.py b/python/hpsmc/collect_merge_stats.py
new file mode 100644
index 000000000..5bd77b214
--- /dev/null
+++ b/python/hpsmc/collect_merge_stats.py
@@ -0,0 +1,283 @@
+#!/usr/bin/env python3
+"""
+Collector script for MergeROOT statistics files.
+
+This script aggregates merge statistics JSON files from multiple jobs
+and produces a summary report.
+
+Usage:
+    hps-mc-collect-merge-stats <directory> [-o output.json] [-p pattern] [-q]
+"""
+
+import argparse
+import glob
+import json
+import os
+import sys
+
+
+class MergeStatsCollector:
+    """
+    Collects and aggregates MergeROOT statistics from JSON files.
+    """
+
+    def __init__(self, search_dir, pattern="**/*_stats.json"):
+        """
+        Initialize the collector.
+
+        Parameters
+        ----------
+        search_dir : str
+            Directory to search for stats files
+        pattern : str
+            Glob pattern for finding stats files (default: **/*_stats.json)
+        """
+        self.search_dir = search_dir
+        self.pattern = pattern
+        self.stats_files = []
+        self.job_stats = []
+        self.summary = {}
+
+    def find_stats_files(self):
+        """
+        Find all stats JSON files matching the pattern.
+
+        Returns
+        -------
+        list
+            List of paths to stats files found
+        """
+        search_pattern = os.path.join(self.search_dir, self.pattern)
+        self.stats_files = sorted(glob.glob(search_pattern, recursive=True))
+        return self.stats_files
+
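+    # For reference, each stats file consumed below is the JSON emitted by
+    # MergeROOT.write_stats_json() in hpsmc.tools; the keys are real, the
+    # values here are illustrative only:
+    #
+    #     {
+    #         "job_id": 1,
+    #         "output_file": "merged_hps_007800_job1.root",
+    #         "output_events": {"events": 125000},
+    #         "input_files": [{"path": "f1.root", "events": {"events": 62500}}],
+    #         "total_input_events": {"events": 125000},
+    #         "validation_passed": true,
+    #         "num_input_files": 2
+    #     }
+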
+    def collect(self):
+        """
+        Collect and aggregate statistics from all found files.
+
+        Returns
+        -------
+        dict
+            Aggregated statistics dictionary
+        """
+        if not self.stats_files:
+            self.find_stats_files()
+
+        self.job_stats = []
+        successful_jobs = 0
+        failed_jobs = 0
+        total_input_files = 0
+        tree_totals = {}
+
+        for stats_file in self.stats_files:
+            try:
+                with open(stats_file, 'r') as f:
+                    stats = json.load(f)
+
+                self.job_stats.append({
+                    "file": stats_file,
+                    "stats": stats
+                })
+
+                # Track validation results
+                if stats.get("validation_passed", False):
+                    successful_jobs += 1
+                else:
+                    failed_jobs += 1
+
+                # Count input files
+                total_input_files += stats.get("num_input_files", 0)
+
+                # Aggregate tree totals (use output events as source of truth)
+                output_events = stats.get("output_events", {})
+                for tree_name, count in output_events.items():
+                    if tree_name not in tree_totals:
+                        tree_totals[tree_name] = {"input": 0, "output": 0}
+                    tree_totals[tree_name]["output"] += count
+
+                # Sum up input events
+                total_input_events = stats.get("total_input_events", {})
+                for tree_name, count in total_input_events.items():
+                    if tree_name not in tree_totals:
+                        tree_totals[tree_name] = {"input": 0, "output": 0}
+                    tree_totals[tree_name]["input"] += count
+
+            except (json.JSONDecodeError, IOError) as e:
+                print("WARNING: Could not read stats file %s: %s" % (stats_file, e),
+                      file=sys.stderr)
+                failed_jobs += 1
+                continue
+
+        # Build summary
+        self.summary = {
+            "summary": {
+                "total_jobs": len(self.job_stats),
+                "successful_jobs": successful_jobs,
+                "failed_jobs": failed_jobs,
+                "total_input_files": total_input_files,
+                "all_validations_passed": failed_jobs == 0 and len(self.job_stats) > 0
+            },
+            "tree_totals": tree_totals,
+            "jobs": [js["stats"] for js in self.job_stats]
+        }
+
+        return self.summary
+
+    def write_summary(self, output_file):
+        """
+        Write the summary to a JSON file.
+
+        Parameters
+        ----------
+        output_file : str
+            Path to output JSON file
+        """
+        if not self.summary:
+            self.collect()
+
+        with open(output_file, 'w') as f:
+            json.dump(self.summary, f, indent=2)
+
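+    # Shape of the aggregated summary written above (illustrative values):
+    #
+    #     {
+    #         "summary": {
+    #             "total_jobs": 10,
+    #             "successful_jobs": 10,
+    #             "failed_jobs": 0,
+    #             "total_input_files": 200,
+    #             "all_validations_passed": true
+    #         },
+    #         "tree_totals": {"events": {"input": 1250000, "output": 1250000}},
+    #         "jobs": [ ...one entry per per-job stats file... ]
+    #     }
+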
+    def print_report(self, quiet=False):
+        """
+        Print a human-readable summary report.
+
+        Parameters
+        ----------
+        quiet : bool
+            If True, only print summary line
+        """
+        if not self.summary:
+            self.collect()
+
+        s = self.summary["summary"]
+
+        if quiet:
+            status = "PASS" if s["all_validations_passed"] else "FAIL"
+            print("%s: %d jobs, %d input files" % (
+                status, s["total_jobs"], s["total_input_files"]))
+            return
+
+        print()
+        print("=" * 70)
+        print("MergeROOT Statistics Summary")
+        print("=" * 70)
+        print()
+        print("Search directory: %s" % self.search_dir)
+        print("Stats files found: %d" % len(self.stats_files))
+        print()
+        print("-" * 70)
+        print("Job Summary")
+        print("-" * 70)
+        print("  Total jobs: %d" % s["total_jobs"])
+        print("  Successful jobs: %d" % s["successful_jobs"])
+        print("  Failed jobs: %d" % s["failed_jobs"])
+        print("  Total input files: %d" % s["total_input_files"])
+        print()
+
+        # Tree totals
+        tree_totals = self.summary.get("tree_totals", {})
+        if tree_totals:
+            print("-" * 70)
+            print("Event Counts by Tree")
+            print("-" * 70)
+            print("%-30s %15s %15s %10s" % ("Tree Name", "Input Events", "Output Events", "Status"))
+            print("-" * 70)
+
+            for tree_name, counts in sorted(tree_totals.items()):
+                input_count = counts["input"]
+                output_count = counts["output"]
+                if input_count == output_count:
+                    status = "PASS"
+                else:
+                    status = "FAIL"
+                print("%-30s %15d %15d %10s" % (tree_name, input_count, output_count, status))
+
+            print("-" * 70)
+            print()
+
+        # Final status
+        if s["all_validations_passed"]:
+            print("OVERALL STATUS: PASS")
+        else:
+            print("OVERALL STATUS: FAIL")
+
+        print("=" * 70)
+        print()
+
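+# Example print_report() output (illustrative numbers):
+#
+#   quiet mode:
+#       PASS: 10 jobs, 200 input files
+#
+#   full mode, one row of the event-count table:
+#       events                                 1250000         1250000       PASS
+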
+def main():
+    """Main entry point for the collector script."""
+    parser = argparse.ArgumentParser(
+        description="Collect and aggregate MergeROOT statistics from JSON files.",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+Examples:
+    # Collect stats from current directory
+    hps-mc-collect-merge-stats .
+
+    # Collect stats and write summary to file
+    hps-mc-collect-merge-stats /path/to/output -o summary.json
+
+    # Use custom pattern
+    hps-mc-collect-merge-stats . -p "**/merge_*_stats.json"
+
+    # Quiet mode (single line output)
+    hps-mc-collect-merge-stats . -q
+"""
+    )
+
+    parser.add_argument(
+        "directory",
+        help="Directory to search for stats files"
+    )
+    parser.add_argument(
+        "-o", "--output",
+        help="Output JSON file for aggregated summary"
+    )
+    parser.add_argument(
+        "-p", "--pattern",
+        default="**/*_stats.json",
+        help="Glob pattern for stats files (default: **/*_stats.json)"
+    )
+    parser.add_argument(
+        "-q", "--quiet",
+        action="store_true",
+        help="Quiet mode - only print summary line"
+    )
+
+    args = parser.parse_args()
+
+    # Validate directory
+    if not os.path.isdir(args.directory):
+        print("ERROR: Directory not found: %s" % args.directory, file=sys.stderr)
+        sys.exit(1)
+
+    # Create collector and run
+    collector = MergeStatsCollector(args.directory, args.pattern)
+    stats_files = collector.find_stats_files()
+
+    if not stats_files:
+        print("WARNING: No stats files found matching pattern '%s' in %s" % (
+            args.pattern, args.directory), file=sys.stderr)
+        sys.exit(0)
+
+    # Collect and report
+    collector.collect()
+    collector.print_report(quiet=args.quiet)
+
+    # Write output file if requested
+    if args.output:
+        collector.write_summary(args.output)
+        if not args.quiet:
+            print("Summary written to: %s" % args.output)
+
+    # Exit with appropriate code
+    if collector.summary["summary"]["all_validations_passed"]:
+        sys.exit(0)
+    else:
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/python/hpsmc/job.py b/python/hpsmc/job.py
index 9862db105..169b9c535 100644
--- a/python/hpsmc/job.py
+++ b/python/hpsmc/job.py
@@ -322,7 +322,7 @@ def parse_args(self):
         if cl.params:
             self.param_file = os.path.abspath(cl.params)
             params = {}
-            if cl.job_id:
+            if cl.job_id is not None:
                 # Load data from a job store containing multiple jobs.
                 self.job_id = cl.job_id
                 logger.debug("Loading job with ID %d from job store '%s'" % (self.job_id, self.param_file))
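The job.py change fixes a truthiness bug: a job ID of 0 is falsy, so `if cl.job_id:` silently skipped loading job 0 from a job store. A quick demonstration:

```python
job_id = 0  # a perfectly valid ID in a multi-job store

if job_id:
    print("loaded")          # never reached for job 0
if job_id is not None:
    print("loaded job 0")    # reached, as intended
```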
diff --git a/python/hpsmc/prepare_merge_jobs.py b/python/hpsmc/prepare_merge_jobs.py
index 9f08051d5..a668cbe9f 100644
--- a/python/hpsmc/prepare_merge_jobs.py
+++ b/python/hpsmc/prepare_merge_jobs.py
@@ -18,7 +18,7 @@ class MergeJobPreparation:
     """! Prepare merge jobs by scanning directories for ROOT files."""

     def __init__(self, parent_dir, output_prefix="merge_jobs", max_files_per_job=20,
-                 file_pattern="*.root", run_pattern="hps_*"):
+                 file_pattern="*.root", run_pattern="hps_*", max_depth=3, path_filter=None):
         """! Initialize the merge job preparation.

         @param parent_dir Parent directory containing run subdirectories
@@ -26,19 +26,49 @@ def __init__(self, parent_dir, output_prefix="merge_jobs", max_files_per_job=20,
         @param max_files_per_job Maximum number of ROOT files per merge job
         @param file_pattern Glob pattern for files to merge (default: *.root)
         @param run_pattern Glob pattern for run directories (default: hps_*)
+        @param max_depth Maximum depth to search for ROOT files if not found at top level (default: 3)
+        @param path_filter String that must appear somewhere in the full file path (default: None)
         """
         self.parent_dir = Path(parent_dir).resolve()
         self.output_prefix = output_prefix
         self.max_files_per_job = max_files_per_job
         self.file_pattern = file_pattern
         self.run_pattern = run_pattern
+        self.max_depth = max_depth
+        self.path_filter = path_filter

         if not self.parent_dir.is_dir():
             raise ValueError(f"Parent directory does not exist: {self.parent_dir}")

+    def _find_files_recursive(self, directory, current_depth=0):
+        """! Recursively search for files matching pattern up to max_depth.
+
+        @param directory Directory to search in
+        @param current_depth Current recursion depth
+        @return List of file paths found
+        """
+        # First, check for files at current level
+        root_files = sorted(directory.glob(self.file_pattern))
+        if root_files:
+            return root_files
+
+        # If no files found and we haven't exceeded max depth, search subdirectories
+        if current_depth < self.max_depth:
+            all_files = []
+            for subdir in sorted(directory.iterdir()):
+                if subdir.is_dir():
+                    files = self._find_files_recursive(subdir, current_depth + 1)
+                    all_files.extend(files)
+            return all_files
+
+        return []
+
     def scan_directories(self):
         """! Scan parent directory for run directories and ROOT files.

+        If no ROOT files are found directly in a run directory, searches
+        recursively up to max_depth levels deep.
+
         @return Dictionary mapping run names to lists of ROOT file paths
         """
         run_files = {}
@@ -47,10 +77,26 @@ def scan_directories(self):
         run_dirs = sorted(self.parent_dir.glob(self.run_pattern))

         if not run_dirs:
-            print(f"Warning: No directories matching '{self.run_pattern}' found in {self.parent_dir}")
-            return run_files
+            # If no directories match the pattern, try scanning deeper
+            print(f"No directories matching '{self.run_pattern}' found in {self.parent_dir}")
+            print(f"Searching recursively up to {self.max_depth} levels deep...")

-        print(f"Found {len(run_dirs)} run directories")
+            # Search the parent directory itself
+            root_files = self._find_files_recursive(self.parent_dir, current_depth=0)
+            if root_files:
+                # Group files by their parent directory name
+                files_by_parent = {}
+                for f in root_files:
+                    parent_name = f.parent.name
+                    if parent_name not in files_by_parent:
+                        files_by_parent[parent_name] = []
+                    files_by_parent[parent_name].append(str(f))
+
+                for parent_name, files in sorted(files_by_parent.items()):
+                    run_files[parent_name] = sorted(files)
+                    print(f"  {parent_name}: {len(files)} files")
+        else:
+            print(f"Found {len(run_dirs)} run directories")

         # Scan each run directory for ROOT files
         for run_dir in run_dirs:
@@ -58,11 +104,32 @@ def scan_directories(self):
                 continue

             run_name = run_dir.name
+
+            # First try direct glob, then recursive search if needed
             root_files = sorted(run_dir.glob(self.file_pattern))
+            if not root_files:
+                # Search deeper
+                root_files = self._find_files_recursive(run_dir, current_depth=0)
+                if root_files:
+                    print(f"  {run_name}: {len(root_files)} files (found in subdirectories)")
+            else:
+                print(f"  {run_name}: {len(root_files)} files")
+
             if root_files:
                 run_files[run_name] = [str(f) for f in root_files]
-                print(f"  {run_name}: {len(root_files)} files")
+
+        # Apply path filter if specified
+        if self.path_filter:
+            filtered_run_files = {}
+            total_before = sum(len(files) for files in run_files.values())
+            for run_name, files in run_files.items():
+                filtered = [f for f in files if self.path_filter in f]
+                if filtered:
+                    filtered_run_files[run_name] = filtered
+            total_after = sum(len(files) for files in filtered_run_files.values())
+            print(f"\nPath filter '{self.path_filter}': {total_before} -> {total_after} files")
+            run_files = filtered_run_files

         return run_files
@@ -195,6 +262,9 @@ def run(self, write_vars=True, write_metadata=True, separate_lists=True):
         print(f"Run pattern: {self.run_pattern}")
         print(f"File pattern: {self.file_pattern}")
         print(f"Max files per job: {self.max_files_per_job}")
+        print(f"Max search depth: {self.max_depth}")
+        if self.path_filter:
+            print(f"Path filter: {self.path_filter}")
         print()

         # Scan directories
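The same options are available programmatically; a hypothetical usage sketch of the new constructor arguments (`/path/to/runs` is a placeholder):

```python
from hpsmc.prepare_merge_jobs import MergeJobPreparation

prep = MergeJobPreparation(
    "/path/to/runs",          # placeholder parent directory
    run_pattern="ap*",
    max_depth=3,              # descend up to 3 levels when no files sit at the top
    path_filter="pass5_v9",   # keep only files whose full path contains this substring
)
run_files = prep.scan_directories()   # {run_name: [file paths, ...], ...}
```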
@@ -255,6 +325,15 @@ def main():
     # Custom file and directory patterns
     %(prog)s /path/to/runs -f "*_recon.root" -r "run_*"

+    # Search deeper for nested ROOT files (up to 5 levels)
+    %(prog)s /path/to/runs -d 5
+
+    # Search with wildcard pattern when files are in subdirectories
+    %(prog)s /path/to/runs -r "ap*" -d 3
+
+    # Filter files by path substring (e.g., only include files with "pass5" in path)
+    %(prog)s /path/to/runs -F "pass5"
+
     # Skip generating vars file (only create input file list)
     %(prog)s /path/to/runs --no-vars
 """
@@ -290,6 +369,19 @@ def main():
         help='Glob pattern for run directories (default: hps_*)'
     )

+    parser.add_argument(
+        '-d', '--max-depth',
+        type=int,
+        default=3,
+        help='Maximum depth to search for ROOT files if not found at top level (default: 3)'
+    )
+
+    parser.add_argument(
+        '-F', '--path-filter',
+        default=None,
+        help='Only include files whose full path contains this string'
+    )
+
     parser.add_argument(
         '--no-vars',
         action='store_true',
@@ -316,7 +408,9 @@ def main():
         output_prefix=args.output_prefix,
         max_files_per_job=args.max_files,
         file_pattern=args.file_pattern,
-        run_pattern=args.run_pattern
+        run_pattern=args.run_pattern,
+        max_depth=args.max_depth,
+        path_filter=args.path_filter
     )

     result = prep.run(
diff --git a/python/hpsmc/tools.py b/python/hpsmc/tools.py
index 9ffc88d1b..7c35a12ea 100644
--- a/python/hpsmc/tools.py
+++ b/python/hpsmc/tools.py
@@ -1,5 +1,6 @@
 """! Tools that can be used in HPSMC jobs."""

+import json
 import os
 import gzip
 import shutil
@@ -1697,6 +1698,10 @@ def __init__(self, **kwargs):
             Compression level for output file (0-9, default: None uses hadd default)
         validate : bool, optional
             Validate event counts after merge (default: True)
+        write_stats : bool, optional
+            Write JSON stats file after merge (default: True when validate=True)
+        job_id : int, optional
+            Job ID to include in stats output
         """

         Component.__init__(self, **kwargs)
@@ -1716,10 +1721,21 @@ def __init__(self, **kwargs):
         if not hasattr(self, "validate"):
             self.validate = True

+        # Write stats JSON (default: True when validate=True)
+        if not hasattr(self, "write_stats"):
+            self.write_stats = self.validate
+
+        # Optional job ID for stats output
+        if not hasattr(self, "job_id"):
+            self.job_id = None
+
         # Store event counts
         self.input_tree_counts = {}
         self.output_tree_counts = {}

+        # Track validation result
+        self._validation_passed = None
+
     def cmd_args(self):
         """
         Build command line arguments for hadd.
@@ -1729,6 +1745,13 @@ def cmd_args(self):
         list
             List of command arguments
         """
+        import sys
+        sys.stderr.write("MergeROOT DEBUG: cmd_args() called\n")
+        sys.stderr.write("  self.force=%s, self.compression=%s\n" % (self.force, self.compression))
+        sys.stderr.write("  self.inputs=%s\n" % self.inputs)
+        sys.stderr.write("  self.outputs=%s\n" % self.outputs)
+        sys.stderr.flush()
+
         args = []

         # Add force flag if enabled
@@ -1743,17 +1766,23 @@ def cmd_args(self):
         if self.outputs and len(self.outputs) > 0:
             args.append(self.outputs[0])
         else:
+            sys.stderr.write("MergeROOT DEBUG: ERROR - No output file specified!\n")
+            sys.stderr.flush()
             raise RuntimeError("MergeROOT: No output file specified")

         # Add input files
         if self.inputs and len(self.inputs) > 0:
             args.extend(self.inputs)
         else:
+            sys.stderr.write("MergeROOT DEBUG: ERROR - No input files specified!\n")
+            sys.stderr.flush()
             raise RuntimeError("MergeROOT: No input files specified")

+        sys.stderr.write("MergeROOT DEBUG: cmd_args() returning: %s\n" % args)
+        sys.stderr.flush()
         return args

-    def scan_root_file(self, filename):
+    def scan_root_file(self, filename, log_out=None):
         """
         Scan a ROOT file and extract TTree event counts.

         Parameters
         ----------
         filename : str
             Path to ROOT file
+        log_out : file, optional
+            Log file for output (used to report multiple key cycles)

         Returns
         -------
@@ -1775,6 +1806,7 @@ def scan_root_file(self, filename, log_out=None):
             )

         tree_counts = {}
+        tree_cycles = {}  # Track cycle numbers: {tree_name: [(cycle, entries), ...]}

         # Open ROOT file
         root_file = ROOT.TFile.Open(filename, "READ")
@@ -1788,11 +1820,30 @@ def scan_root_file(self, filename, log_out=None):
             # Check if it's a TTree
             if obj.InheritsFrom("TTree"):
                 tree_name = obj.GetName()
+                cycle = key.GetCycle()
                 num_entries = obj.GetEntries()
-                tree_counts[tree_name] = num_entries
+
+                if tree_name not in tree_cycles:
+                    tree_cycles[tree_name] = []
+                tree_cycles[tree_name].append((cycle, num_entries))

         root_file.Close()

+        # Process collected cycles - use highest cycle number for each tree
+        for tree_name, cycles in tree_cycles.items():
+            if len(cycles) > 1:
+                # Sort by cycle number (highest first)
+                cycles.sort(key=lambda x: x[0], reverse=True)
+                highest_cycle, highest_entries = cycles[0]
+                if log_out:
+                    log_out.write("  WARNING: Multiple key cycles found for tree '%s':\n" % tree_name)
+                    for cyc, ent in cycles:
+                        marker = "  <-- using" if cyc == highest_cycle else ""
+                        log_out.write("    Cycle %d: %d entries%s\n" % (cyc, ent, marker))
+                tree_counts[tree_name] = highest_entries
+            else:
+                tree_counts[tree_name] = cycles[0][1]
+
         return tree_counts

     def scan_input_files(self, log_out):
@@ -1813,7 +1864,7 @@ def scan_input_files(self, log_out):
                 raise RuntimeError("MergeROOT: Input file not found: %s" % input_file)

             log_out.write("\nScanning: %s\n" % input_file)
-            tree_counts = self.scan_root_file(input_file)
+            tree_counts = self.scan_root_file(input_file, log_out)

             if not tree_counts:
                 log_out.write("  WARNING: No TTrees found in this file\n")
@@ -1842,7 +1893,7 @@ def scan_output_file(self, log_out):
         log_out.write("=" * 70 + "\n")

         log_out.write("\nScanning: %s\n" % output_file)
-        self.output_tree_counts = self.scan_root_file(output_file)
+        self.output_tree_counts = self.scan_root_file(output_file, log_out)

         if not self.output_tree_counts:
             log_out.write("  WARNING: No TTrees found in output file\n")
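The new cycle handling in scan_root_file() matters because ROOT keeps multiple key cycles for a tree (e.g. `events;1`, `events;2`) when it autosaves during writing, and only the highest cycle holds the complete tree. A standalone PyROOT sketch of the same idea (`merged.root` is a placeholder):

```python
import ROOT

f = ROOT.TFile.Open("merged.root", "READ")  # placeholder file name
best = {}
for key in f.GetListOfKeys():
    obj = key.ReadObj()
    if obj.InheritsFrom("TTree"):
        name, cycle = key.GetName(), key.GetCycle()
        # Keep only the highest cycle seen for each tree name
        if name not in best or cycle > best[name][0]:
            best[name] = (cycle, obj.GetEntries())
f.Close()
print(best)  # {tree_name: (highest_cycle, entries)}
```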
@@ -1965,6 +2016,77 @@ def print_summary(self, log_out):
         log_out.write("=" * 70 + "\n")
         log_out.flush()

+    def get_stats_filename(self):
+        """
+        Get the stats JSON filename based on the output ROOT filename.
+
+        Returns
+        -------
+        str
+            Path to stats JSON file (e.g., 'merged_X_job1.root' -> 'merged_X_job1_stats.json')
+        """
+        if not self.outputs or len(self.outputs) == 0:
+            return None
+        output_file = self.outputs[0]
+        base, _ = os.path.splitext(output_file)
+        return base + "_stats.json"
+
+    def write_stats_json(self, log_out, validation_passed):
+        """
+        Write merge statistics to a JSON file.
+
+        Parameters
+        ----------
+        log_out : file
+            Log file for output
+        validation_passed : bool
+            Whether the validation passed
+        """
+        stats_file = self.get_stats_filename()
+        if stats_file is None:
+            log_out.write("WARNING: Cannot determine stats filename, skipping stats output\n")
+            return
+
+        log_out.write("\n" + "=" * 70 + "\n")
+        log_out.write("MergeROOT: Writing stats to %s\n" % stats_file)
+        log_out.write("=" * 70 + "\n")
+
+        # Calculate total input events per tree
+        total_input_events = {}
+        for input_file, tree_counts in self.input_tree_counts.items():
+            for tree_name, count in tree_counts.items():
+                if tree_name not in total_input_events:
+                    total_input_events[tree_name] = 0
+                total_input_events[tree_name] += count
+
+        # Build input files list with event counts
+        input_files_list = []
+        for input_file in self.inputs:
+            tree_counts = self.input_tree_counts.get(input_file, {})
+            input_files_list.append({
+                "path": input_file,
+                "events": tree_counts
+            })
+
+        # Build stats dictionary
+        stats = {
+            "job_id": self.job_id,
+            "output_file": self.outputs[0] if self.outputs else None,
+            "output_events": self.output_tree_counts,
+            "input_files": input_files_list,
+            "total_input_events": total_input_events,
+            "validation_passed": validation_passed,
+            "num_input_files": len(self.inputs)
+        }
+
+        # Write JSON file
+        with open(stats_file, 'w') as f:
+            json.dump(stats, f, indent=2)
+
+        log_out.write("Stats written successfully\n")
+        log_out.write("=" * 70 + "\n")
+        log_out.flush()
+
     def execute(self, log_out, log_err):
         """
         Execute MergeROOT component using hadd.
@@ -1981,26 +2103,58 @@ def execute(self, log_out, log_err):
         int
             Return code from hadd command
         """
+        # Debug: Entry point
+        log_out.write("\n" + "=" * 70 + "\n")
+        log_out.write("MergeROOT: DEBUG - Entering execute()\n")
+        log_out.write("=" * 70 + "\n")
+        log_out.write("DEBUG: self.command = %s\n" % self.command)
+        log_out.write("DEBUG: self.inputs = %s\n" % self.inputs)
+        log_out.write("DEBUG: self.outputs = %s\n" % self.outputs)
+        log_out.write("DEBUG: self.force = %s\n" % self.force)
+        log_out.write("DEBUG: self.compression = %s\n" % self.compression)
+        log_out.write("DEBUG: self.validate = %s\n" % self.validate)
+        log_out.flush()
+
         # Check that hadd command exists
+        log_out.write("\nDEBUG: Checking if hadd command exists...\n")
+        log_out.flush()
         if not self.cmd_exists():
             raise RuntimeError("MergeROOT: hadd command not found in PATH")
+        log_out.write("DEBUG: hadd command found\n")
+        log_out.flush()

         # Check that input files exist
+        log_out.write("\nDEBUG: Checking input files exist...\n")
+        log_out.flush()
         for input_file in self.inputs:
+            log_out.write("DEBUG: Checking: %s\n" % input_file)
+            log_out.flush()
             if not os.path.exists(input_file):
                 raise RuntimeError("MergeROOT: Input file not found: %s" % input_file)
+            log_out.write("DEBUG: -> exists (size: %d bytes)\n" % os.path.getsize(input_file))
+            log_out.flush()

         # Scan input files before merge if validation is enabled
+        log_out.write("\nDEBUG: Validation enabled = %s\n" % self.validate)
+        log_out.flush()
         if self.validate:
             try:
+                log_out.write("DEBUG: Starting input file scan...\n")
+                log_out.flush()
                 self.scan_input_files(log_out)
+                log_out.write("DEBUG: Input file scan complete\n")
+                log_out.flush()
             except Exception as e:
                 log_out.write("\nWARNING: Could not scan input files: %s\n" % str(e))
                 log_out.write("Proceeding with merge without validation.\n")
                 self.validate = False

         # Build full command
+        log_out.write("\nDEBUG: Building command arguments...\n")
+        log_out.flush()
         cmd = [self.command] + self.cmd_args()
+        log_out.write("DEBUG: cmd_args() returned: %s\n" % self.cmd_args())
+        log_out.flush()

         # Log the command
         log_out.write("\n" + "=" * 70 + "\n")
@@ -2011,39 +2165,81 @@ def execute(self, log_out, log_err):
         log_out.flush()

         # Execute hadd
+        log_out.write("DEBUG: About to call subprocess.Popen...\n")
+        log_out.flush()
         proc = subprocess.Popen(cmd, stdout=log_out, stderr=log_err)
+        log_out.write("DEBUG: Popen returned, PID = %s\n" % proc.pid)
+        log_out.flush()
+        log_out.write("DEBUG: Waiting for process to complete...\n")
+        log_out.flush()
         proc.wait()
+        log_out.write("DEBUG: Process completed, returncode = %d\n" % proc.returncode)
+        log_out.flush()

         # Check return code
         if proc.returncode != 0:
+            log_out.write("DEBUG: hadd FAILED with return code %d\n" % proc.returncode)
+            log_out.flush()
             raise RuntimeError(
                 "MergeROOT: hadd failed with return code %d" % proc.returncode
             )

         # Verify output file was created
+        log_out.write("DEBUG: Checking if output file exists: %s\n" % self.outputs[0])
+        log_out.flush()
         if not os.path.exists(self.outputs[0]):
             raise RuntimeError(
                 "MergeROOT: Output file was not created: %s" % self.outputs[0]
             )
+        log_out.write("DEBUG: Output file exists, size = %d bytes\n" % os.path.getsize(self.outputs[0]))
+        log_out.flush()

         log_out.write("\n✓ hadd completed successfully\n")
+        log_out.flush()

         # Scan output file and validate if enabled
+        log_out.write("\nDEBUG: Post-merge validation check, self.validate = %s\n" % self.validate)
+        log_out.flush()
+        validation_passed = True
         if self.validate:
             try:
+                log_out.write("DEBUG: Starting output file scan...\n")
+                log_out.flush()
                 self.scan_output_file(log_out)
+                log_out.write("DEBUG: Output file scan complete\n")
+                log_out.flush()
+                log_out.write("DEBUG: Starting merge validation...\n")
+                log_out.flush()
                 validation_passed = self.validate_merge(log_out)
+                self._validation_passed = validation_passed
+                log_out.write("DEBUG: Merge validation complete, passed = %s\n" % validation_passed)
+                log_out.flush()
                 if not validation_passed:
                     raise RuntimeError("MergeROOT: Event count validation failed!")
             except Exception as e:
                 log_out.write("\nERROR during validation: %s\n" % str(e))
+                log_out.flush()
                 raise

+        # Write stats JSON if enabled
+        log_out.write("\nDEBUG: write_stats = %s\n" % self.write_stats)
+        log_out.flush()
+        if self.write_stats:
+            try:
+                self.write_stats_json(log_out, validation_passed)
+            except Exception as e:
+                log_out.write("\nWARNING: Could not write stats JSON: %s\n" % str(e))
+                log_out.flush()
+
         # Print summary
+        log_out.write("\nDEBUG: Printing summary...\n")
+        log_out.flush()
         self.print_summary(log_out)

+        log_out.write("\nDEBUG: MergeROOT.execute() returning %d\n" % proc.returncode)
+        log_out.flush()
         return proc.returncode

     def output_files(self):
@@ -2053,9 +2249,14 @@ def output_files(self):

         Returns
         -------
         list
-            List containing the merged output ROOT file
-        """
-        return self.outputs
+            List containing the merged output ROOT file and optionally the stats JSON
+        """
+        files = list(self.outputs) if self.outputs else []
+        if self.write_stats:
+            stats_file = self.get_stats_filename()
+            if stats_file and stats_file not in files:
+                files.append(stats_file)
+        return files

     def required_config(self):
         """
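Taken together, the new defaults mean a job that validates also writes stats, and the stats file is advertised as an output. A sketch, assuming MergeROOT can be constructed standalone with the same kwargs root_merge_job.py uses:

```python
from hpsmc.tools import MergeROOT

merge = MergeROOT(inputs=["a.root", "b.root"], outputs=["merged.root"])
print(merge.write_stats)           # True - follows validate's default
print(merge.get_stats_filename())  # 'merged_stats.json'
print(merge.output_files())        # ['merged.root', 'merged_stats.json']
```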
""" + from hpsmc.tools import EvioToLcio, HPSTR -job.description = 'EVIO converter' +job.description = "EVIO converter" -cnv = EvioToLcio(steering='recon') +cnv = EvioToLcio(steering="recon") -tuple = HPSTR(run_mode=1, cfg='recon') +# tuple = HPSTR(run_mode=1, cfg='recon') -job.add([cnv, tuple]) +job.add([cnv]) diff --git a/python/jobs/root_merge_job.py b/python/jobs/root_merge_job.py index 4222ce006..6c51878b4 100644 --- a/python/jobs/root_merge_job.py +++ b/python/jobs/root_merge_job.py @@ -12,12 +12,14 @@ "input3.root": "/path/to/input3.root" }, "output_files": { - "merged.root": "merged_output.root" + "merged.root": "merged_output.root", + "merged_stats.json": "merged_stats.json" }, "output_dir": "output", "force": true, "compression": 6, - "validate": true + "validate": true, + "write_stats": true } """ @@ -33,14 +35,21 @@ # The keys of input_files dict are the local file names input_list = list(job.input_files.keys()) -# Get the output file name -# The key of output_files dict is the source (local name) -output_file = list(job.output_files.keys())[0] +# Get the output file name (first .root file in output_files) +output_file = None +for key in job.output_files.keys(): + if key.endswith('.root'): + output_file = key + break +if output_file is None: + output_file = list(job.output_files.keys())[0] # Set up optional parameters with defaults force_overwrite = job.params.get('force', True) compression_level = job.params.get('compression', None) validate_merge = job.params.get('validate', True) +write_stats = job.params.get('write_stats', True) +job_id = job.params.get('job_id', None) # Create the MergeROOT component merge = MergeROOT( @@ -49,7 +58,9 @@ outputs=[output_file], force=force_overwrite, compression=compression_level, - validate=validate_merge + validate=validate_merge, + write_stats=write_stats, + job_id=job_id ) # Add component to the job diff --git a/scripts/hps-mc-collect-merge-stats.in b/scripts/hps-mc-collect-merge-stats.in new file mode 100644 index 000000000..c85454530 --- /dev/null +++ b/scripts/hps-mc-collect-merge-stats.in @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +@PYTHON@ @CMAKE_INSTALL_PREFIX@/lib/python/hpsmc/collect_merge_stats.py $@