Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions examples/merge_root/merge_root.json.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,26 @@
{% set first_file = input_files['root_files'][0] %}
{% set file_dir = first_file | dirname %}
{% set run_name = file_dir.split('/')[-1] %}
"job_id": {{ job_id }},
"description": "Merge ROOT files for {{ run_name }} (job {{ job_id }})",
{% set mass = file_dir.split('/')[-2] %}
"job_id": {{ job_id + 1}},
"description": "Merge ROOT files for {{ mass }}/{{ run_name }} (job {{ job_id }})",
"input_files": {
{% for file in input_files['root_files'] -%}
"{{ file }}": "{{ file | basename }}"{% if not loop.last %},{% endif %}

{% endfor -%}
},
"output_files": {
"merged_{{ run_name }}_job{{ job_id }}.root": "merged_{{ run_name }}_job{{ job_id }}.root"
"merged_{{ run_name }}_job{{ job_id }}.root": "merged_{{ run_name }}_job{{ job_id }}.root",
"merged_{{ run_name }}_job{{ job_id }}_stats.json": "merged_{{ run_name }}_job{{ job_id }}_stats.json"
},
"output_dir": "output/{{ run_name }}",
"output_dir": "output/",
"mass": "{{ mass }}",
"run_name": "{{ run_name }}",
"year": 2021,
"num_input_files": {{ input_files['root_files'] | length }},
"force": true,
"compression": 6,
"validate": true
"validate": true,
"write_stats": true
}
11 changes: 8 additions & 3 deletions examples/merge_root/merge_root_example.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,20 @@
# -n: Maximum number of files per merge job (default: 20)
# -f: File pattern to match (default: *.root)
# -r: Run directory pattern (default: hps_*)
# -F: Path filter - only include files whose full path contains this string

PARENT_DIR="/sdf/data/hps/physics2021/data/recon/pass4_v8/"
PARENT_DIR=""
OUTPUT_PREFIX="merge_jobs"
MAX_FILES=50

hps-mc-prepare-merge-jobs \
$PARENT_DIR \
-o $OUTPUT_PREFIX \
-n $MAX_FILES \
-f *v0skim*root \
--single-list
-F reprocess
# -r ap* \
# -F "pass5_v9" \
# --single-list

# This creates:
# - merge_jobs_input_files.txt (single consolidated file list)
Expand All @@ -37,4 +40,6 @@ for batch_file in ${OUTPUT_PREFIX}_batch*_files.txt; do
${OUTPUT_PREFIX}_batch${batch_num}_jobs.json
done

#signal.tmpl \

cat ${OUTPUT_PREFIX}_batch*_jobs.json | jq -s 'add' > ${OUTPUT_PREFIX}_jobs.json
9 changes: 8 additions & 1 deletion examples/merge_root/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,12 @@
#SBATCH --time=24:00:00
#SBATCH --partition=shared
#SBATCH --job-name=examples
#hps-mc-job run -d $PWD/scratch root_merge job.json

hps-mc-job run -d $PWD/scratch root_merge job.json
timestamp=`date +"%Y-%m-%d_%H-%M-%S"`

hps-mc-batch slurm -o -A hps:hps-prod -m 8000 -q milano -W 6 \
-E /sdf/data/hps/users/mgignac/software/dev/hps-mc/examples/merge_root/setup_env.sh \
-d /sdf/scratch/hps/mgignac/merged/run/$timestamp \
-l /sdf/scratch/hps/mgignac/merged/logs/$timestamp/ \
root_merge merge_jobs_jobs.json
26 changes: 26 additions & 0 deletions examples/merge_root/signal.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
{# Derive job metadata from the path of the first input file.
   Assumed layout: .../<mass>/<run_name>/<file>.root — TODO confirm against the
   file lists produced by hps-mc-prepare-merge-jobs. #}
{% set first_file = input_files['root_files'][0] %}
{% set file_dir = first_file | dirname %}
{% set run_name = file_dir.split('/')[-1] %}
{% set mass = file_dir.split('/')[-2] %}
"job_id": {{ job_id }},
"description": "Merge ROOT files for {{ mass }}/{{ run_name }} (job {{ job_id }})",
"input_files": {
{# Map each source path to its basename in the job scratch directory. #}
{% for file in input_files['root_files'] -%}
"{{ file }}": "{{ file | basename }}"{% if not loop.last %},{% endif %}

{% endfor -%}
},
{# The merged ROOT file plus the stats JSON consumed by hps-mc-collect-merge-stats. #}
"output_files": {
"merged_{{ run_name }}_job{{ job_id }}.root": "merged_{{ run_name }}_job{{ job_id }}.root",
"merged_{{ run_name }}_job{{ job_id }}_stats.json": "merged_{{ run_name }}_job{{ job_id }}_stats.json"
},
"output_dir": "/sdf/data/hps/physics2021/mc/hpstr/simp_pulser_new/pass5_v9/{{ mass }}/{{ run_name }}-merged",
"mass": "{{ mass }}",
"run_name": "{{ run_name }}",
"year": 2021,
"num_input_files": {{ input_files['root_files'] | length }},
"force": true,
"compression": 6,
"validate": true
}
283 changes: 283 additions & 0 deletions python/hpsmc/collect_merge_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
#!/usr/bin/env python3
"""
Collector script for MergeROOT statistics files.

This script aggregates merge statistics JSON files from multiple jobs
and produces a summary report.

Usage:
hps-mc-collect-merge-stats <dir> [-o output.json] [-p pattern] [-q]
"""

import argparse
import glob
import json
import os
import sys


class MergeStatsCollector:
    """
    Collects and aggregates MergeROOT statistics from JSON files.

    Typical usage::

        collector = MergeStatsCollector("/path/to/output")
        collector.collect()
        collector.print_report()
    """

    def __init__(self, search_dir, pattern="**/*_stats.json"):
        """
        Initialize the collector.

        Parameters
        ----------
        search_dir : str
            Directory to search for stats files
        pattern : str
            Glob pattern for finding stats files (default: **/*_stats.json)
        """
        self.search_dir = search_dir
        self.pattern = pattern
        # Paths found by find_stats_files()
        self.stats_files = []
        # One {"file": path, "stats": parsed JSON} entry per *readable* stats file
        self.job_stats = []
        # Aggregated result built by collect()
        self.summary = {}

    def find_stats_files(self):
        """
        Find all stats JSON files matching the pattern.

        Returns
        -------
        list
            Sorted list of paths to stats files found
        """
        search_pattern = os.path.join(self.search_dir, self.pattern)
        # recursive=True is required for '**' to match nested directories
        self.stats_files = sorted(glob.glob(search_pattern, recursive=True))
        return self.stats_files

    def collect(self):
        """
        Collect and aggregate statistics from all found files.

        A job counts as failed when its stats file cannot be read or parsed,
        or when its "validation_passed" flag is not true.

        Returns
        -------
        dict
            Aggregated statistics with keys "summary", "tree_totals" and "jobs".
        """
        if not self.stats_files:
            self.find_stats_files()

        self.job_stats = []
        successful_jobs = 0
        failed_jobs = 0
        total_input_files = 0
        tree_totals = {}

        for stats_file in self.stats_files:
            # Keep the try minimal: only the read/parse can raise these.
            try:
                with open(stats_file, 'r') as f:
                    stats = json.load(f)
            except (json.JSONDecodeError, IOError) as e:
                # Unreadable or corrupt stats files count as failed jobs.
                print("WARNING: Could not read stats file %s: %s" % (stats_file, e),
                      file=sys.stderr)
                failed_jobs += 1
                continue

            self.job_stats.append({
                "file": stats_file,
                "stats": stats
            })

            # Track validation results
            if stats.get("validation_passed", False):
                successful_jobs += 1
            else:
                failed_jobs += 1

            # Count input files
            total_input_files += stats.get("num_input_files", 0)

            # Aggregate per-tree output event counts
            for tree_name, count in stats.get("output_events", {}).items():
                tree_totals.setdefault(tree_name, {"input": 0, "output": 0})
                tree_totals[tree_name]["output"] += count

            # Aggregate per-tree input event counts
            for tree_name, count in stats.get("total_input_events", {}).items():
                tree_totals.setdefault(tree_name, {"input": 0, "output": 0})
                tree_totals[tree_name]["input"] += count

        # BUGFIX: total_jobs previously was len(self.job_stats), which excludes
        # unreadable stats files, so successful_jobs + failed_jobs could exceed
        # total_jobs.  Count every processed file instead (identical result when
        # all files are readable).
        total_jobs = successful_jobs + failed_jobs
        self.summary = {
            "summary": {
                "total_jobs": total_jobs,
                "successful_jobs": successful_jobs,
                "failed_jobs": failed_jobs,
                "total_input_files": total_input_files,
                "all_validations_passed": failed_jobs == 0 and total_jobs > 0
            },
            "tree_totals": tree_totals,
            "jobs": [js["stats"] for js in self.job_stats]
        }

        return self.summary

    def write_summary(self, output_file):
        """
        Write the summary to a JSON file.

        Parameters
        ----------
        output_file : str
            Path to output JSON file
        """
        if not self.summary:
            self.collect()

        with open(output_file, 'w') as f:
            json.dump(self.summary, f, indent=2)

    def print_report(self, quiet=False):
        """
        Print a human-readable summary report.

        Parameters
        ----------
        quiet : bool
            If True, only print a one-line PASS/FAIL summary
        """
        if not self.summary:
            self.collect()

        s = self.summary["summary"]

        if quiet:
            status = "PASS" if s["all_validations_passed"] else "FAIL"
            print("%s: %d jobs, %d input files" % (
                status, s["total_jobs"], s["total_input_files"]))
            return

        print()
        print("=" * 70)
        print("MergeROOT Statistics Summary")
        print("=" * 70)
        print()
        print("Search directory: %s" % self.search_dir)
        print("Stats files found: %d" % len(self.stats_files))
        print()
        print("-" * 70)
        print("Job Summary")
        print("-" * 70)
        print(" Total jobs: %d" % s["total_jobs"])
        print(" Successful jobs: %d" % s["successful_jobs"])
        print(" Failed jobs: %d" % s["failed_jobs"])
        print(" Total input files: %d" % s["total_input_files"])
        print()

        # Per-tree input/output event totals with a PASS/FAIL match column
        tree_totals = self.summary.get("tree_totals", {})
        if tree_totals:
            print("-" * 70)
            print("Event Counts by Tree")
            print("-" * 70)
            print("%-30s %15s %15s %10s" % ("Tree Name", "Input Events", "Output Events", "Status"))
            print("-" * 70)

            for tree_name, counts in sorted(tree_totals.items()):
                input_count = counts["input"]
                output_count = counts["output"]
                # An exact input == output match is the merge-integrity check
                if input_count == output_count:
                    status = "PASS"
                else:
                    status = "FAIL"
                print("%-30s %15d %15d %10s" % (tree_name, input_count, output_count, status))

            print("-" * 70)
            print()

        # Final status
        if s["all_validations_passed"]:
            print("OVERALL STATUS: PASS")
        else:
            print("OVERALL STATUS: FAIL")

        print("=" * 70)
        print()


def main():
    """Main entry point for the collector script.

    Parses the command line, runs a MergeStatsCollector over the requested
    directory and exits 0 when all validations passed (or nothing was found),
    1 otherwise.
    """
    arg_parser = argparse.ArgumentParser(
        description="Collect and aggregate MergeROOT statistics from JSON files.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Collect stats from current directory
  hps-mc-collect-merge-stats .

  # Collect stats and write summary to file
  hps-mc-collect-merge-stats /path/to/output -o summary.json

  # Use custom pattern
  hps-mc-collect-merge-stats . -p "**/merge_*_stats.json"

  # Quiet mode (single line output)
  hps-mc-collect-merge-stats . -q
"""
    )
    arg_parser.add_argument("directory",
                            help="Directory to search for stats files")
    arg_parser.add_argument("-o", "--output",
                            help="Output JSON file for aggregated summary")
    arg_parser.add_argument("-p", "--pattern", default="**/*_stats.json",
                            help="Glob pattern for stats files (default: **/*_stats.json)")
    arg_parser.add_argument("-q", "--quiet", action="store_true",
                            help="Quiet mode - only print summary line")
    opts = arg_parser.parse_args()

    # Guard clause: a missing search directory is a hard error.
    if not os.path.isdir(opts.directory):
        print("ERROR: Directory not found: %s" % opts.directory, file=sys.stderr)
        sys.exit(1)

    collector = MergeStatsCollector(opts.directory, opts.pattern)

    # No matches is a warning, not an error.
    if not collector.find_stats_files():
        print("WARNING: No stats files found matching pattern '%s' in %s" % (
            opts.pattern, opts.directory), file=sys.stderr)
        sys.exit(0)

    # Aggregate and report.
    collector.collect()
    collector.print_report(quiet=opts.quiet)

    # Optionally persist the aggregated summary.
    if opts.output:
        collector.write_summary(opts.output)
        if not opts.quiet:
            print("Summary written to: %s" % opts.output)

    # Exit code reflects the aggregate validation result.
    sys.exit(0 if collector.summary["summary"]["all_validations_passed"] else 1)

# Run the CLI when executed directly (e.g. via the hps-mc-collect-merge-stats script).
if __name__ == "__main__":
    main()
2 changes: 1 addition & 1 deletion python/hpsmc/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ def parse_args(self):
if cl.params:
self.param_file = os.path.abspath(cl.params)
params = {}
if cl.job_id:
if cl.job_id is not None:
# Load data from a job store containing multiple jobs.
self.job_id = cl.job_id
logger.debug("Loading job with ID %d from job store '%s'" % (self.job_id, self.param_file))
Expand Down
Loading