Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 42 additions & 31 deletions src/crab/cli/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))

from crab.core.engine import Engine
from crab.log import get_logger, LogLevel

def load_environment_config(preset_arg: str) -> Dict[str, Any]:
presets_filename = "presets.json"
print(f"Info: Loading preset '{preset_arg}' from {presets_filename}", flush=True)
# Logging happens at the call site; this function stays pure
try:
with open(presets_filename, 'r') as f:
all_presets = json.load(f)
Expand Down Expand Up @@ -66,103 +67,113 @@ def prepare_execution_environment(env_dict: Dict[str, str]) -> Dict[str, str]:
final_env[key] = os.path.expandvars(value)
return final_env

def _parse_log_level(raw: str) -> LogLevel:
"""Convert a CLI string to a LogLevel, defaulting to INFO."""
mapping = {"DEBUG": LogLevel.DEBUG, "INFO": LogLevel.INFO,
"WARNING": LogLevel.WARNING, "ERROR": LogLevel.ERROR,
"CRITICAL": LogLevel.CRITICAL}
return mapping.get(raw.upper().strip(), LogLevel.INFO)


def run_from_cli():
# --- WORKER MODE ---
if "--worker" in sys.argv:
# Workers read CRAB_LOG_LEVEL from the environment (set by presets)
logger = get_logger()

try:
workdir_index = sys.argv.index("--workdir") + 1
work_dir = sys.argv[workdir_index]

config_file = os.path.join(work_dir, 'config.json')
env_file = os.path.join(work_dir, 'environment.json')

print(f"--- [WORKER MODE DETECTED] Work dir: {work_dir} ---", flush=True)
logger.info(f"Worker mode detected workdir={work_dir}")

with open(config_file, 'r') as f:
benchmark_config = json.load(f)

# NOTA: environment.json ora contiene solo le ENV vars processate dall'orchestrator

with open(env_file, 'r') as f:
execution_env = json.load(f)

print(f"--- [WORKER] Environment loaded. Starting engine. ---", flush=True)

logger.info("Environment loaded, starting engine")

# Time execution
start = time.time()

engine = Engine(log_callback=print)
engine = Engine(logger=logger)
engine.run(
config=benchmark_config,
config=benchmark_config,
environment=execution_env,
is_worker=True,
output_dir=work_dir
output_dir=work_dir,
)

elapsed_time = time.time() - start
total = timedelta(seconds=int(elapsed_time))

print(f"--- [WORKER] Engine run finished. ---", flush=True)
print(f"--- Took: " + str(total) + " ---", flush=True)
logger.info(f"Engine run finished elapsed={total}")

except Exception as e:
print(f"[WORKER FATAL ERROR] {e}", file=sys.stderr, flush=True)
logger.critical(f"Worker fatal error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)

# --- ORCHESTRATOR MODE ---
else:
parser = argparse.ArgumentParser(description="CRAB Benchmarking Orchestrator.")
parser.add_argument("-c", "--config", dest="app_config_file", required=True, help="Path to the JSON benchmark config.")
parser.add_argument("-c", "--config", dest="app_config_file", required=True,
help="Path to the JSON benchmark config.")
parser.add_argument("-p", "--preset", help="Name of the preset to use.")
parser.add_argument("--log-level", dest="log_level", default=None,
help="Log verbosity (DEBUG, INFO, WARNING, ERROR, CRITICAL).")
parser.add_argument("--worker", action="store_true", help=argparse.SUPPRESS)
args = parser.parse_args()

# CLI flag overrides the env var
level = _parse_log_level(args.log_level) if args.log_level else None
logger = get_logger(level=level)

try:
selected_preset = args.preset or os.environ.get("CRAB_PRESET")
if os.path.exists(".env") and not selected_preset:
with open(".env", "r") as f:
selected_preset = f.read().strip()

if not selected_preset:
selected_preset = "local"

logger.info(f"Loading preset '{selected_preset}'")

# 1. Carica la configurazione strutturata (Env, Sbatch, Header)
preset_config = load_environment_config(selected_preset)

# 2. Prepara le variabili d'ambiente (risolve __CWD__ etc)
execution_env = prepare_execution_environment(preset_config["env"])

# 3. Carica il config dell'esperimento
with open(args.app_config_file, 'r') as f:
benchmark_config = json.load(f)

# 4. Inietta le configurazioni di sistema nelle global_options del benchmark
# Questo permette all'Engine di accedere a sbatch/header di sistema
if "global_options" not in benchmark_config:
benchmark_config["global_options"] = {}

benchmark_config["global_options"]["system_sbatch"] = preset_config["sbatch"]
benchmark_config["global_options"]["system_header"] = preset_config["header"]

print("-" * 50)
print(f"Avvio del motore con il preset '{selected_preset}'...")
print("-" * 50)
logger.info(f"Starting engine with preset '{selected_preset}'")

engine = Engine(log_callback=print)
engine = Engine(logger=logger)
engine.run(
config=benchmark_config,
config=benchmark_config,
environment=execution_env,
is_worker=False
is_worker=False,
)

print("-" * 50)
print("Orchestration complete. Job submitted to SLURM.")
print("-" * 50)
logger.info("Orchestration complete — job submitted to SLURM")

except Exception as e:
print(f"[ORCHESTRATOR FATAL ERROR] {e}", file=sys.stderr)
logger.critical(f"Orchestrator fatal error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
Loading