From 5df652e89db59d165eb7a0d4d2a7d4d89f531c15 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 15:59:55 +0000 Subject: [PATCH 1/2] feat: add unified context-aware logging system New src/crab/log/ package with: - CrabLogger: hierarchical context nesting (worker > experiment > run > app) - RichFormatter: ANSI-colored tree-style output - PlainFormatter: grep-friendly bracketed output - StreamHandler: stdout (-> slurm_output.log under SLURM) - TUIHandler: routes records to Textual RichLog widget - Live subprocess output streaming via background threads - Thread-safe concurrent logging with write locks - CRAB_LOG_LEVEL env var + --log-level CLI flag Integration: - engine.py: CrabLogger replaces log_callback, context nesting per experiment/run/app - orchestrator.py: all print() replaced, --log-level flag added - slurm.py/mpi.py: debug prints removed (wl_managers stay logger-agnostic) - controller.py: TUIHandler wired to RichLog widget - wrappers: stray prints removed from microbench_common, miniFE, ib_send_lat Co-Authored-By: Matteo Marcelletti --- src/crab/cli/orchestrator.py | 73 +++++++------ src/crab/core/engine.py | 170 ++++++++++++++++++------------ src/crab/core/wl_manager/mpi.py | 1 - src/crab/core/wl_manager/slurm.py | 3 +- src/crab/log/__init__.py | 74 +++++++++++++ src/crab/log/formatters.py | 109 +++++++++++++++++++ src/crab/log/handlers.py | 102 ++++++++++++++++++ src/crab/log/logger.py | 159 ++++++++++++++++++++++++++++ src/crab/tui/controller.py | 30 +++--- wrappers/ib_send_lat.py | 1 - wrappers/microbench_common.py | 1 - wrappers/miniFE.py | 3 +- 12 files changed, 605 insertions(+), 121 deletions(-) create mode 100644 src/crab/log/__init__.py create mode 100644 src/crab/log/formatters.py create mode 100644 src/crab/log/handlers.py create mode 100644 src/crab/log/logger.py diff --git a/src/crab/cli/orchestrator.py b/src/crab/cli/orchestrator.py index 5e450743..2f7cdcef 100644 --- a/src/crab/cli/orchestrator.py +++ b/src/crab/cli/orchestrator.py @@ -10,10 +10,11 @@ sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))) from crab.core.engine import Engine +from crab.log import get_logger, LogLevel def load_environment_config(preset_arg: str) -> Dict[str, Any]: presets_filename = "presets.json" - print(f"Info: Loading preset '{preset_arg}' from {presets_filename}", flush=True) + # Logging happens at the call site; this function stays pure try: with open(presets_filename, 'r') as f: all_presets = json.load(f) @@ -66,9 +67,20 @@ def prepare_execution_environment(env_dict: Dict[str, str]) -> Dict[str, str]: final_env[key] = os.path.expandvars(value) return final_env +def _parse_log_level(raw: str) -> LogLevel: + """Convert a CLI string to a LogLevel, defaulting to INFO.""" + mapping = {"DEBUG": LogLevel.DEBUG, "INFO": LogLevel.INFO, + "WARNING": LogLevel.WARNING, "ERROR": LogLevel.ERROR, + "CRITICAL": LogLevel.CRITICAL} + return mapping.get(raw.upper().strip(), LogLevel.INFO) + + def run_from_cli(): # --- WORKER MODE --- if "--worker" in sys.argv: + # Workers read CRAB_LOG_LEVEL from the environment (set by presets) + logger = get_logger() + try: workdir_index = sys.argv.index("--workdir") + 1 work_dir = sys.argv[workdir_index] @@ -76,93 +88,92 @@ def run_from_cli(): config_file = os.path.join(work_dir, 'config.json') env_file = os.path.join(work_dir, 'environment.json') - print(f"--- [WORKER MODE DETECTED] Work dir: {work_dir} ---", flush=True) + logger.info(f"Worker mode detected workdir={work_dir}") with open(config_file, 'r') as f: benchmark_config = json.load(f) - - # NOTA: environment.json ora contiene solo le ENV vars processate dall'orchestrator + with open(env_file, 'r') as f: execution_env = json.load(f) - - print(f"--- [WORKER] Environment loaded. Starting engine. ---", flush=True) + logger.info("Environment loaded, starting engine") - # Time execution start = time.time() - engine = Engine(log_callback=print) + engine = Engine(logger=logger) engine.run( - config=benchmark_config, + config=benchmark_config, environment=execution_env, is_worker=True, - output_dir=work_dir + output_dir=work_dir, ) elapsed_time = time.time() - start total = timedelta(seconds=int(elapsed_time)) - - print(f"--- [WORKER] Engine run finished. ---", flush=True) - print(f"--- Took: " + str(total) + " ---", flush=True) + logger.info(f"Engine run finished elapsed={total}") except Exception as e: - print(f"[WORKER FATAL ERROR] {e}", file=sys.stderr, flush=True) + logger.critical(f"Worker fatal error: {e}") import traceback traceback.print_exc() sys.exit(1) - + # --- ORCHESTRATOR MODE --- else: parser = argparse.ArgumentParser(description="CRAB Benchmarking Orchestrator.") - parser.add_argument("-c", "--config", dest="app_config_file", required=True, help="Path to the JSON benchmark config.") + parser.add_argument("-c", "--config", dest="app_config_file", required=True, + help="Path to the JSON benchmark config.") parser.add_argument("-p", "--preset", help="Name of the preset to use.") + parser.add_argument("--log-level", dest="log_level", default=None, + help="Log verbosity (DEBUG, INFO, WARNING, ERROR, CRITICAL).") parser.add_argument("--worker", action="store_true", help=argparse.SUPPRESS) args = parser.parse_args() + # CLI flag overrides the env var + level = _parse_log_level(args.log_level) if args.log_level else None + logger = get_logger(level=level) + try: selected_preset = args.preset or os.environ.get("CRAB_PRESET") if os.path.exists(".env") and not selected_preset: with open(".env", "r") as f: selected_preset = f.read().strip() - + if not selected_preset: selected_preset = "local" + logger.info(f"Loading preset '{selected_preset}'") + # 1. Carica la configurazione strutturata (Env, Sbatch, Header) preset_config = load_environment_config(selected_preset) - + # 2. Prepara le variabili d'ambiente (risolve __CWD__ etc) execution_env = prepare_execution_environment(preset_config["env"]) - + # 3. Carica il config dell'esperimento with open(args.app_config_file, 'r') as f: benchmark_config = json.load(f) # 4. Inietta le configurazioni di sistema nelle global_options del benchmark - # Questo permette all'Engine di accedere a sbatch/header di sistema if "global_options" not in benchmark_config: benchmark_config["global_options"] = {} - + benchmark_config["global_options"]["system_sbatch"] = preset_config["sbatch"] benchmark_config["global_options"]["system_header"] = preset_config["header"] - print("-" * 50) - print(f"Avvio del motore con il preset '{selected_preset}'...") - print("-" * 50) + logger.info(f"Starting engine with preset '{selected_preset}'") - engine = Engine(log_callback=print) + engine = Engine(logger=logger) engine.run( - config=benchmark_config, + config=benchmark_config, environment=execution_env, - is_worker=False + is_worker=False, ) - print("-" * 50) - print("Orchestration complete. Job submitted to SLURM.") - print("-" * 50) + logger.info("Orchestration complete — job submitted to SLURM") except Exception as e: - print(f"[ORCHESTRATOR FATAL ERROR] {e}", file=sys.stderr) + logger.critical(f"Orchestrator fatal error: {e}") import traceback traceback.print_exc() sys.exit(1) diff --git a/src/crab/core/engine.py b/src/crab/core/engine.py index 76c40958..bdc9bf42 100644 --- a/src/crab/core/engine.py +++ b/src/crab/core/engine.py @@ -12,7 +12,9 @@ import shlex import json import shutil -from typing import List, Dict, Any, Callable, Optional, Union +from typing import List, Dict, Any, Optional, Union + +from crab.log import CrabLogger, LogLevel, LogSource # ============================================================================= # 1. DATA CONTAINERS & UTILITIES @@ -63,12 +65,18 @@ def check_CI(container_list: List[DataContainer], alpha: float, beta: float, con check = check and container.converged return check -def run_job(job, wlmanager, ppn: int, pre_commands: List[str] = None): - """launches an application process via the workload manager.""" +def run_job(job, wlmanager, ppn: int, logger: CrabLogger, + pre_commands: List[str] = None, live_stream: bool = False): + """ + Launch an application process via the workload manager. + + When live_stream=True, stdout is read line-by-line in a background + thread and forwarded through the logger in real time. The reader + thread is stored on job._stream_thread for later joining. + """ if not job.node_list: raise Exception(f"Application {job.id_num} has 0 allocated nodes.") - # Passa pre_commands al workload manager cmd_string = wlmanager.run_job(job.node_list, ppn, job.run_app(), pre_commands=pre_commands) if not cmd_string: @@ -76,24 +84,41 @@ def run_job(job, wlmanager, ppn: int, pre_commands: List[str] = None): raise Exception cmd = shlex.split(cmd_string) - process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False) - job.set_process(process) -def end_job(job): + if live_stream: + # Stdout piped line-by-line to the logger thread; + # stderr still fully captured for post-run inspection + process = subprocess.Popen(cmd, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, shell=False) + job.set_process(process) + app_label = f"App {job.id_num}" + job._stream_thread = logger.stream_process(process, app_label) + else: + process = subprocess.Popen(cmd, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, shell=False) + job.set_process(process) + job._stream_thread = None + + logger.info(f"Launched App {job.id_num} PID={process.pid} nodes={job.node_list}") + +def end_job(job, logger: CrabLogger): """Forcefully terminates a job and retrieves output.""" if hasattr(job, 'process') and job.process: job.process.kill() + if hasattr(job, '_stream_thread') and job._stream_thread: + job._stream_thread.join(timeout=2.0) out, err = job.process.communicate() job.set_output(out, err) + logger.debug(f"Killed App {job.id_num}") -def wait_timed(job, timeout_sec: float) -> bool: +def wait_timed(job, timeout_sec: float, logger: CrabLogger) -> bool: """Waits for a job with a timeout. Returns True if timed out.""" try: out, err = job.process.communicate(timeout=timeout_sec) job.set_output(out, err) return False except subprocess.TimeoutExpired: - end_job(job) + end_job(job, logger) return True def log_data(out_format: str, path_prefix: str, data_containers: List[DataContainer]): @@ -264,12 +289,12 @@ class ExperimentRunner: Isolates setup, execution, and teardown. """ def __init__(self, exp_name: str, config: Dict[str, Any], global_options: Dict[str, Any], - node_list: List[str], output_dir: str, log_fn: Callable): + node_list: List[str], output_dir: str, logger: CrabLogger): self.name = exp_name self.config = config self.global_opts = global_options self.node_list = node_list - self.log = log_fn + self.log = logger.enter(exp_name) # Paths self.exp_dir = os.path.join(output_dir, self.name) @@ -283,7 +308,7 @@ def __init__(self, exp_name: str, config: Dict[str, Any], global_options: Dict[s def setup(self): """Loads apps, workload manager, and calculates node layout.""" - self.log(f"[{self.name}] Setting up...") + self.log.info("Setting up...") # 1. Load Applications self.apps = [] @@ -319,7 +344,7 @@ def load_module(path): path = os.path.join(os.environ["CRAB_WRAPPERS_PATH"], path) if not os.path.exists(path): - self.log(f"[ERROR] Wrapper not found at: {path}") + self.log.error(f"Wrapper not found at: {path}") raise FileNotFoundError(f"Wrapper not found: {path}") # Load App Class @@ -373,7 +398,7 @@ def load_module(path): def execute(self): """Main execution loop (Setup -> Run -> Wait -> Converge).""" - self.log(f"[{self.name}] Execution started.") + self.log.info("Execution started") # Params min_runs = int(self.global_opts.get('minruns', 10)) @@ -419,7 +444,8 @@ def execute(self): if runs >= max_runs or (runs >= min_runs and converged) or elapsed >= timeout: break - self.log(f"[{self.name}] Run {runs+1}...") + run_log = self.log.enter(f"Run {runs + 1}") + run_log.info("Started") run_start = time.time() # Reset ephemeral schedule for this run @@ -437,11 +463,15 @@ def execute(self): aid, action, _ = curr_schedule.pop(0) if action == 's': if aid not in running: - run_job(self.apps[aid], self.wlmanager, self.ppn, pre_commands=system_header) + app_log = run_log.enter(f"App {aid}") + concurrent = len(static_schedule) > 1 or len(dependency_map) > 0 + run_job(self.apps[aid], self.wlmanager, self.ppn, + logger=app_log, pre_commands=system_header, + live_stream=concurrent) running.add(aid) elif action == 'k': if aid in running: - end_job(self.apps[aid]) + end_job(self.apps[aid], run_log) running.remove(aid) finished.add(aid) @@ -449,45 +479,42 @@ def execute(self): for aid in list(running): proc = self.apps[aid].process if proc.poll() is not None: - # Il processo è terminato + app_log = run_log.enter(f"App {aid}") try: + # Join live-stream thread if one exists + if hasattr(self.apps[aid], '_stream_thread') and self.apps[aid]._stream_thread: + self.apps[aid]._stream_thread.join(timeout=2.0) + out, err = proc.communicate() self.apps[aid].set_output(out, err) - - # --- INIZIO MODIFICA: Error Logging --- - if proc.returncode != 0: - # Costruiamo il messaggio di errore - error_msg = ( - f"\n[CRAB ERROR] Experiment '{self.name}' - App {aid} failed!\n" - f"Return Code: {proc.returncode}\n" - ) - - # Decodifica STDERR (byte -> string) per sicurezza + + exit_code = proc.returncode + if exit_code != 0: + app_log.error(f"FAILED exit={exit_code}") + # Forward stderr through the logger if err: - decoded_err = err.decode('utf-8', errors='replace') if isinstance(err, bytes) else err - error_msg += f"--- STDERR ---\n{decoded_err}\n" - - # Decodifica STDOUT (spesso MPI stampa errori qui) - if out: - decoded_out = out.decode('utf-8', errors='replace') if isinstance(out, bytes) else out - error_msg += f"--- STDOUT TAIL ---\n{decoded_out[-2000:]}\n" # Ultimi 2000 caratteri - - error_msg += "------------------------------------------------\n" - - # 1. Stampa su sys.stderr (finisce in slurm_error.log) - print(error_msg, file=sys.stderr, flush=True) - - # 2. Salva un file di log dedicato nella cartella dell'esperimento + stderr_text = err.decode('utf-8', errors='replace') if isinstance(err, bytes) else err + app_log.app_output("", stderr_text) + # Write detailed error log to experiment dir try: - log_path = os.path.join(self.exp_dir, f"error_app_{aid}.log") - with open(log_path, "w") as f: - f.write(error_msg) - except Exception as e: - print(f"[CRAB WARNING] Could not write error log file: {e}", file=sys.stderr) - # --- FINE MODIFICA --- + err_path = os.path.join(self.exp_dir, f"error_app_{aid}.log") + with open(err_path, "w") as f: + f.write(f"App {aid} exit={exit_code}\n") + if err: + decoded = err.decode('utf-8', errors='replace') if isinstance(err, bytes) else err + f.write(decoded) + except Exception: + app_log.warning("Could not write error log file") + else: + app_log.info(f"FINISHED exit=0") + # Forward stdout (post-run) if not already live-streamed + if not (hasattr(self.apps[aid], '_stream_thread') and self.apps[aid]._stream_thread): + if out: + stdout_text = out.decode('utf-8', errors='replace') if isinstance(out, bytes) else out + app_log.app_output(stdout_text) except Exception as e: - self.log(f"[INTERNAL ERROR] Failed reading output for app {aid}: {e}") + app_log.error(f"Failed reading output: {e}") running.remove(aid) finished.add(aid) @@ -496,7 +523,10 @@ def execute(self): started_deps = [] for waiter, target in curr_deps.items(): if target in finished: - run_job(self.apps[waiter], self.wlmanager, self.ppn, pre_commands=system_header) + dep_log = run_log.enter(f"App {waiter}") + run_job(self.apps[waiter], self.wlmanager, self.ppn, + logger=dep_log, pre_commands=system_header, + live_stream=True) running.add(waiter) if waiter in rel_durations: curr_schedule.append((waiter, 'k', now + rel_durations[waiter])) @@ -522,6 +552,8 @@ def execute(self): runs += 1 if runs >= min_runs: converged = check_CI(self.data_containers, alpha, beta, converge_all, runs) + if converged: + self.log.info(f"Converged at run {runs}") finally: self.teardown() @@ -540,7 +572,7 @@ def save_results(self): out_fmt = self.global_opts.get('outformat', 'csv') prefix = os.path.join(self.exp_dir, 'data') log_data(out_fmt, prefix, self.data_containers) - self.log(f"[{self.name}] Data saved to {self.exp_dir}") + self.log.info(f"Data saved to {self.exp_dir}") # ============================================================================= # 4. ENGINE (Orchestrator & Worker Entry Point) @@ -549,8 +581,8 @@ def save_results(self): # ... (Imports e classi precedenti rimangono uguali) ... class Engine: - def __init__(self, log_callback: Callable[[str], None] = print): - self.log = log_callback + def __init__(self, logger: CrabLogger): + self.log = logger def run(self, config: Dict[str, Any], environment: Dict[str, Any], is_worker: bool = False, output_dir: str = None): if is_worker: @@ -614,7 +646,7 @@ def _generate_sbatch_header(self, global_opts: Dict[str, Any], data_directory: s # A. Security Check (Newline Injection) if '\n' in directive or '\r' in directive: - self.log(f"[SECURITY WARN] Skipping directive containing newlines: {directive}") + self.log.warning(f"Skipping directive containing newlines: {directive}") continue # B. Estrazione Chiave (Key Extraction) @@ -629,11 +661,11 @@ def _generate_sbatch_header(self, global_opts: Dict[str, Any], data_directory: s # C. Conflict Resolution if key in protected_defaults: - self.log(f"[CONFIG WARN] User directive '{directive}' ignored. '{key}' is managed by Crab to ensure stability.") + self.log.warning(f"User directive '{directive}' ignored. '{key}' is managed by CRAB.") continue if key in ['output', 'error', 'o', 'e']: - self.log(f"[CONFIG WARN] User overrode log path with '{directive}'. Standard logging might be lost.") + self.log.warning(f"User overrode log path with '{directive}'. Standard logging might be lost.") # D. Apply (Last write wins for user defaults, except protected) directives_map[key] = directive @@ -643,7 +675,7 @@ def _generate_sbatch_header(self, global_opts: Dict[str, Any], data_directory: s return [f"#SBATCH {v}" for v in directives_map.values()] def _run_orchestrator(self, config: Dict[str, Any], environment: Dict[str, Any]): - self.log("Engine running in ORCHESTRATOR mode.") + self.log.info("Engine running in ORCHESTRATOR mode") if "experiments" not in config: if "applications" in config: @@ -720,14 +752,14 @@ def _run_orchestrator(self, config: Dict[str, Any], environment: Dict[str, Any]) f.write(f"\n{cmd}\n") - self.log(f"Submitting: sbatch {script_path}") + self.log.info(f"Submitting: sbatch {script_path}") out = subprocess.check_output(['sbatch', script_path], text=True) - self.log(out.strip()) + self.log.info(out.strip()) def _run_worker(self, config: Dict[str, Any], environment: Dict[str, Any], output_dir: str): # ... (Il worker rimane identico a prima) ... # (Incolla qui il codice di _run_worker che hai già) - self.log("--- [WORKER] Started ---") + self.log.info("Worker started") orig_env = os.environ.copy() os.environ.update(environment) @@ -738,36 +770,38 @@ def _run_worker(self, config: Dict[str, Any], environment: Dict[str, Any], outpu subprocess.call(["scontrol", "show", "hostnames", os.environ.get('SLURM_NODELIST')], stdout=f) nodes_df = pandas.read_csv(node_file, header=None) full_node_list = nodes_df.iloc[:, 0].tolist() - + self.log.info(f"Allocated {len(full_node_list)} node(s)") + global_opts = config.get('global_options', {}) experiments = config.get('experiments', {}) sorted_exp_ids = sorted(experiments.keys()) + total_exps = len(sorted_exp_ids) - for exp_id in sorted_exp_ids: + for idx, exp_id in enumerate(sorted_exp_ids, 1): exp_config = experiments[exp_id] - self.log(f"\n=== Starting Experiment: {exp_id} ===") - + self.log.info(f"Starting experiment [{idx}/{total_exps}]: {exp_id}") + runner = ExperimentRunner( exp_name=exp_id, config=exp_config, global_options=global_opts, node_list=full_node_list, output_dir=output_dir, - log_fn=self.log + logger=self.log, ) try: runner.setup() runner.execute() runner.save_results() except Exception as e: - self.log(f"[ERROR] Experiment {exp_id} failed: {e}") + self.log.error(f"Experiment {exp_id} failed: {e}") import traceback traceback.print_exc() finally: runner.teardown() time.sleep(2) - - self.log("--- [WORKER] All experiments finished ---") + + self.log.info("All experiments finished") finally: os.environ.clear() diff --git a/src/crab/core/wl_manager/mpi.py b/src/crab/core/wl_manager/mpi.py index 6399ee04..c0d92b48 100644 --- a/src/crab/core/wl_manager/mpi.py +++ b/src/crab/core/wl_manager/mpi.py @@ -22,5 +22,4 @@ def run_job(self, node_list, ppn, cmd): os.environ["CRAB_PINNING_FLAGS"] + " " + \ os.environ["CRAB_MPIRUN_HOSTNAMES_FLAG"] + " " + node_list_string + " " + \ "-np " + str(ppn*num_nodes) + " " + cmd - print("[DEBUG]: MPI command is: " + job_cmd) return job_cmd diff --git a/src/crab/core/wl_manager/slurm.py b/src/crab/core/wl_manager/slurm.py index 5c055b00..f1215cf8 100644 --- a/src/crab/core/wl_manager/slurm.py +++ b/src/crab/core/wl_manager/slurm.py @@ -53,5 +53,6 @@ def run_job(self, node_list: List[str], ppn: int, cmd: str, pre_commands: Option final_cmd # Usiamo il comando calcolato (wrapped o raw) ).strip() - print("[DEBUG]: SLURM command is: " + slurm_string) + # Debug output routed through the CRAB logger by the caller; + # the wl_manager itself stays logger-agnostic. return slurm_string diff --git a/src/crab/log/__init__.py b/src/crab/log/__init__.py new file mode 100644 index 00000000..f314aa5e --- /dev/null +++ b/src/crab/log/__init__.py @@ -0,0 +1,74 @@ +""" +CRAB Logging Package — public API. + +Usage: + from crab.log import get_logger, LogLevel + + logger = get_logger() # root logger + exp_log = logger.enter("exp_baseline") # experiment context + run_log = exp_log.enter("run_1") # run context + app_log = run_log.enter("app_0::a2a_b") # app context + + app_log.info("Launched on 4 nodes") + app_log.app_output(stdout_text) + +Log level is controlled by the CRAB_LOG_LEVEL environment variable +(DEBUG, INFO, WARNING, ERROR, CRITICAL). Defaults to INFO. +""" + +import os + +from .logger import CrabLogger, LogLevel, LogSource, LogRecord +from .formatters import RichFormatter, PlainFormatter +from .handlers import StreamHandler, TUIHandler + +__all__ = [ + "get_logger", + "CrabLogger", + "LogLevel", + "LogSource", + "LogRecord", + "RichFormatter", + "PlainFormatter", + "StreamHandler", + "TUIHandler", +] + +# Canonical level names accepted by CRAB_LOG_LEVEL +_LEVEL_MAP = { + "DEBUG": LogLevel.DEBUG, + "INFO": LogLevel.INFO, + "WARNING": LogLevel.WARNING, + "ERROR": LogLevel.ERROR, + "CRITICAL": LogLevel.CRITICAL, +} + + +def _resolve_level() -> LogLevel: + """Read CRAB_LOG_LEVEL from the environment, default to INFO.""" + raw = os.environ.get("CRAB_LOG_LEVEL", "INFO").upper().strip() + return _LEVEL_MAP.get(raw, LogLevel.INFO) + + +def get_logger(use_rich: bool = True, level: LogLevel = None) -> CrabLogger: + """ + Create a fresh root CrabLogger wired to stdout. + + Parameters + ---------- + use_rich : bool + If True (default), use the colorized RichFormatter. + If False, use PlainFormatter for grep-friendly output. + level : LogLevel, optional + Override the log level. If None, reads CRAB_LOG_LEVEL env var. + + Returns a ready-to-use CrabLogger instance. + """ + if level is None: + level = _resolve_level() + + formatter = RichFormatter() if use_rich else PlainFormatter() + handler = StreamHandler(formatter) + + logger = CrabLogger(level=level, handlers=[handler]) + return logger diff --git a/src/crab/log/formatters.py b/src/crab/log/formatters.py new file mode 100644 index 00000000..563235c2 --- /dev/null +++ b/src/crab/log/formatters.py @@ -0,0 +1,109 @@ +""" +Output formatters for CRAB log records. + +Two formatters are provided: +- RichFormatter: colorized tree-style output using ANSI codes via Rich +- PlainFormatter: simple bracketed text for grep-friendly log files +""" + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .logger import LogRecord + +# Level labels, padded to 5 chars for alignment +_LEVEL_LABELS = { + 10: "DEBUG", + 20: "INFO ", + 30: "WARN ", + 40: "ERROR", + 50: "FATAL", +} + +# ANSI color codes (256-color safe) +_RESET = "\033[0m" +_DIM = "\033[2m" +_BOLD = "\033[1m" + +_LEVEL_COLORS = { + 10: "\033[36m", # cyan for DEBUG + 20: "\033[32m", # green for INFO + 30: "\033[33m", # yellow for WARN + 40: "\033[31m", # red for ERROR + 50: "\033[1;31m", # bold red for FATAL +} + +_SOURCE_COLORS = { + "CRAB": "\033[34m", # blue + "APP": "\033[35m", # magenta +} + +# Tree-drawing characters +_TREE_PIPE = "\033[2m\u2502\033[0m" # dimmed | +_TREE_TEE = "\033[2m\u251c\u2500\033[0m" # dimmed |- +_TREE_INDENT = " " + + +class RichFormatter: + """ + Produces colorized, tree-structured log lines with ANSI escape codes. + + Output example: + [14:32:01] INFO CRAB | exp_baseline | Run 1 Launched App 0 + [14:32:04] INFO APP | exp_baseline | Run 1 | app_0::a2a_b 0.123, 0.120 + """ + + def format(self, record: "LogRecord") -> str: + level_color = _LEVEL_COLORS.get(record.level, "") + source_color = _SOURCE_COLORS.get(record.source, "") + level_label = _LEVEL_LABELS.get(record.level, "?????") + + # Timestamp + ts = f"{_DIM}[{record.timestamp}]{_RESET}" + + # Level tag + level_tag = f"{level_color}{level_label}{_RESET}" + + # Source tag + source_tag = f"{source_color}{record.source:4s}{_RESET}" + + # Context breadcrumb with tree separators + if record.context_stack: + ctx_parts = [] + for part in record.context_stack: + ctx_parts.append(f"{_DIM}{part}{_RESET}") + context_str = f" {_DIM}\u2502{_RESET} ".join(ctx_parts) + context_str = f" {_DIM}\u2502{_RESET} {context_str}" + else: + context_str = "" + + # Message color: APP source gets dimmed slightly + if record.source == "APP": + msg = f"{source_color}{record.message}{_RESET}" + else: + msg = record.message + + return f"{ts} {level_tag} {source_tag}{context_str} {msg}" + + +class PlainFormatter: + """ + Produces plain-text log lines with no ANSI codes. + + Output example: + [14:32:01] [INFO ] [CRAB] [exp_baseline] [Run 1] Launched App 0 + [14:32:04] [INFO ] [APP ] [exp_baseline] [Run 1] [app_0::a2a_b] 0.123 + """ + + def format(self, record: "LogRecord") -> str: + level_label = _LEVEL_LABELS.get(record.level, "?????") + source_label = f"{record.source:4s}" + + # Context breadcrumb + if record.context_stack: + context_str = " ".join(f"[{c}]" for c in record.context_stack) + context_str = f" {context_str}" + else: + context_str = "" + + return f"[{record.timestamp}] [{level_label}] [{source_label}]{context_str} {record.message}" diff --git a/src/crab/log/handlers.py b/src/crab/log/handlers.py new file mode 100644 index 00000000..efd19658 --- /dev/null +++ b/src/crab/log/handlers.py @@ -0,0 +1,102 @@ +""" +Output handlers for CRAB log records. + +Handlers receive formatted log records and write them to a destination. +Each handler owns a formatter that determines the visual style. +""" + +import sys +from typing import Union, TYPE_CHECKING + +if TYPE_CHECKING: + from .logger import LogRecord + from .formatters import RichFormatter, PlainFormatter + + +class BaseHandler: + """Abstract base — subclasses must implement `emit()`.""" + + def __init__(self, formatter: Union["RichFormatter", "PlainFormatter"]): + self.formatter = formatter + + def emit(self, record: "LogRecord") -> None: + raise NotImplementedError + + +class StreamHandler(BaseHandler): + """ + Writes formatted log lines to a stream (stdout by default). + + Under SLURM, stdout is redirected to slurm_output.log, so writing + here is equivalent to writing to the log file. + """ + + def __init__(self, formatter: Union["RichFormatter", "PlainFormatter"], + stream=None): + super().__init__(formatter) + self.stream = stream or sys.stdout + + def emit(self, record: "LogRecord") -> None: + line = self.formatter.format(record) + self.stream.write(line + "\n") + self.stream.flush() + + +class TUIHandler(BaseHandler): + """ + Routes log records to a Textual RichLog widget via a callback. + + The callback is typically `BenchmarkApp.log_to_tui`, which uses + `call_from_thread` to safely write from any thread. + """ + + def __init__(self, callback): + # TUI handler uses its own inline formatting (Rich markup) + super().__init__(formatter=None) + self._callback = callback + + def emit(self, record: "LogRecord") -> None: + line = self._format_for_tui(record) + self._callback(line) + + @staticmethod + def _format_for_tui(record: "LogRecord") -> str: + """Produce Rich markup suitable for Textual's RichLog widget.""" + from .logger import LogLevel, LogSource + + # Level styling + level_styles = { + LogLevel.DEBUG: ("dim", "DEBUG"), + LogLevel.INFO: ("bold green", "INFO "), + LogLevel.WARNING: ("bold yellow", "WARN "), + LogLevel.ERROR: ("bold red", "ERROR"), + LogLevel.CRITICAL: ("bold white on red", "FATAL"), + } + style, label = level_styles.get(record.level, ("", "?????")) + + # Source styling + if record.source == LogSource.APP: + source_markup = "[magenta]APP [/]" + else: + source_markup = "[blue]CRAB[/]" + + # Context breadcrumb + if record.context_stack: + ctx = " [dim]|[/] ".join( + f"[dim]{c}[/]" for c in record.context_stack + ) + ctx = f" [dim]|[/] {ctx}" + else: + ctx = "" + + # Message — APP output gets a distinct color + if record.source == LogSource.APP: + msg = f"[magenta]{record.message}[/]" + else: + msg = record.message + + return ( + f"[dim]\\[{record.timestamp}][/] " + f"[{style}]{label}[/] " + f"{source_markup}{ctx} {msg}" + ) diff --git a/src/crab/log/logger.py b/src/crab/log/logger.py new file mode 100644 index 00000000..1eef10b8 --- /dev/null +++ b/src/crab/log/logger.py @@ -0,0 +1,159 @@ +""" +Core context-aware logger for CRAB. + +Provides a hierarchical logger that tracks execution context +(worker -> experiment -> run -> app) and routes messages through +pluggable handlers. +""" + +import threading +from enum import IntEnum +from typing import List, Optional, TYPE_CHECKING + +if TYPE_CHECKING: + from .handlers import BaseHandler + + +class LogLevel(IntEnum): + """Standard log levels, ordered by severity.""" + DEBUG = 10 + INFO = 20 + WARNING = 30 + ERROR = 40 + CRITICAL = 50 + + +class LogSource: + """Identifies the origin of a log message.""" + CRAB = "CRAB" + APP = "APP" + + +class LogRecord: + """Single log entry carrying all context needed for formatting.""" + + __slots__ = ("level", "source", "message", "context_stack", "timestamp") + + def __init__(self, level: LogLevel, source: str, message: str, + context_stack: List[str], timestamp: str): + self.level = level + self.source = source + self.message = message + self.context_stack = context_stack + self.timestamp = timestamp + + +class CrabLogger: + """ + Context-aware logger that supports hierarchical nesting. + + Each call to `enter()` returns a child logger with an extended + context stack. All children share the same handlers and write lock, + so output from concurrent apps never interleaves mid-line. + """ + + def __init__(self, level: LogLevel = LogLevel.INFO, + handlers: Optional[List["BaseHandler"]] = None, + _context: Optional[List[str]] = None, + _lock: Optional[threading.Lock] = None): + self.level = level + self._handlers: List["BaseHandler"] = handlers or [] + self._context: List[str] = _context or [] + self._lock = _lock or threading.Lock() + + # -- Context management -- + + def enter(self, context_name: str) -> "CrabLogger": + """ + Create a child logger with an additional context level. + + The child shares handlers and lock with the parent, so all + output is serialized regardless of which thread writes. + """ + return CrabLogger( + level=self.level, + handlers=self._handlers, + _context=self._context + [context_name], + _lock=self._lock, + ) + + # -- Handler management -- + + def add_handler(self, handler: "BaseHandler") -> None: + self._handlers.append(handler) + + # -- Core emit -- + + def _emit(self, level: LogLevel, source: str, message: str) -> None: + """Build a record and dispatch to all handlers under the lock.""" + if level < self.level: + return + + import datetime + ts = datetime.datetime.now().strftime("%H:%M:%S") + record = LogRecord( + level=level, + source=source, + message=message, + context_stack=list(self._context), + timestamp=ts, + ) + + with self._lock: + for handler in self._handlers: + handler.emit(record) + + # -- Convenience methods (CRAB source) -- + + def debug(self, message: str) -> None: + self._emit(LogLevel.DEBUG, LogSource.CRAB, message) + + def info(self, message: str) -> None: + self._emit(LogLevel.INFO, LogSource.CRAB, message) + + def warning(self, message: str) -> None: + self._emit(LogLevel.WARNING, LogSource.CRAB, message) + + def error(self, message: str) -> None: + self._emit(LogLevel.ERROR, LogSource.CRAB, message) + + def critical(self, message: str) -> None: + self._emit(LogLevel.CRITICAL, LogSource.CRAB, message) + + # -- App output forwarding -- + + def app_output(self, stdout: str, stderr: str = "") -> None: + """ + Forward captured application output through the logger. + Uses the APP source so formatters can visually distinguish it. + """ + if stdout and stdout.strip(): + self._emit(LogLevel.INFO, LogSource.APP, stdout.strip()) + if stderr and stderr.strip(): + self._emit(LogLevel.WARNING, LogSource.APP, stderr.strip()) + + # -- Live output streaming -- + + def stream_process(self, process, app_label: str) -> threading.Thread: + """ + Start a background thread that reads stdout from a subprocess + line-by-line and forwards each line through the logger in real time. + + Returns the reader thread (caller may join it after process ends). + Stderr is captured post-run since it's typically small. + """ + child = self.enter(app_label) + + def _reader(): + try: + for raw_line in iter(process.stdout.readline, b""): + line = raw_line.decode("utf-8", errors="replace").rstrip() + if line: + child._emit(LogLevel.INFO, LogSource.APP, line) + except (ValueError, OSError): + # Pipe closed or process killed + pass + + thread = threading.Thread(target=_reader, daemon=True) + thread.start() + return thread diff --git a/src/crab/tui/controller.py b/src/crab/tui/controller.py index bb6edd72..3b8ae851 100644 --- a/src/crab/tui/controller.py +++ b/src/crab/tui/controller.py @@ -2,15 +2,16 @@ import threading from typing import Callable, Dict -# Importa il motore e i modelli dalla nuova posizione from ..core.engine import Engine -from ..core.models import BenchmarkState +from ..log import get_logger, TUIHandler, CrabLogger -LogCallback = Callable[[str], None] class TUIController: - def __init__(self, log_callback: LogCallback): - self.log = log_callback + def __init__(self, log_callback: Callable[[str], None]): + # Build a logger that routes records to the TUI widget + self.logger = get_logger() + tui_handler = TUIHandler(callback=log_callback) + self.logger.add_handler(tui_handler) def _prepare_environment(self, tui_settings: Dict[str, str], selected_preset: str) -> Dict[str, str]: execution_env = os.environ.copy() @@ -31,25 +32,22 @@ def _prepare_environment(self, tui_settings: Dict[str, str], selected_preset: st return execution_env def _execute_benchmark_logic(self, benchmark_config: dict, tui_settings: Dict[str, str], selected_preset: str): - self.log("[bold blue]Preparing to run benchmark...[/]") - + self.logger.info("Preparing to run benchmark...") + try: - # 1. Prepara l'ambiente execution_env = self._prepare_environment(tui_settings, selected_preset) - self.log("Environment prepared.") + self.logger.info("Environment prepared") - # 2. Istanzia ed esegui il motore - self.log("[bold red]Starting benchmark engine...[/]") - engine = Engine(log_callback=self.log) - # NOTA: il motore non ha più bisogno di un file di config, passiamo il dizionario + self.logger.info("Starting benchmark engine") + engine = Engine(logger=self.logger) engine.run( - config=benchmark_config, + config=benchmark_config, environment=execution_env, ) - self.log(f"\n[bold green]Benchmark finished successfully.[/]") + self.logger.info("Benchmark finished successfully") except Exception as e: - self.log(f"[bold red]An error occurred in the benchmark engine: {e}[/]") + self.logger.error(f"Benchmark engine error: {e}") def run_in_thread(self, benchmark_config: dict, tui_settings: Dict[str, str], selected_preset: str): thread = threading.Thread( diff --git a/wrappers/ib_send_lat.py b/wrappers/ib_send_lat.py index dfd1b526..b90f73a9 100644 --- a/wrappers/ib_send_lat.py +++ b/wrappers/ib_send_lat.py @@ -20,7 +20,6 @@ def read_data(self): # return list (size num_metrics) of variable size lists files += [file] if len(files) == 0: # cannot find the json file created by ib_send_lat - print('No output files found.') return [[] for _ in range(len(self.metadata))] samples = [] for path in files: diff --git a/wrappers/microbench_common.py b/wrappers/microbench_common.py index b1a2405d..b4c7333d 100644 --- a/wrappers/microbench_common.py +++ b/wrappers/microbench_common.py @@ -24,7 +24,6 @@ def get_path(self, name): def read_data(self): out_string = self.stdout tmp_list = [] - print(out_string.splitlines()[-1]) for line in out_string.splitlines()[2:-1]: tmp_list += [[float(x) for x in line.split(',')]] data_list = [list(x) for x in zip(*tmp_list)] diff --git a/wrappers/miniFE.py b/wrappers/miniFE.py index f3daef2e..ff32aec7 100644 --- a/wrappers/miniFE.py +++ b/wrappers/miniFE.py @@ -34,7 +34,6 @@ def read_data(self): # return list (size num_metrics) of variable size lists break if path is None: # cannot find a file yaml file created by miniFE - print('No yaml file found.') return [[] for _ in range(8)] with open(path, 'r') as file: lines = file.readlines() @@ -49,4 +48,4 @@ def get_bench_name(self): return "MiniFE" def get_bench_input(self): - return "" \ No newline at end of file + return "" From d4262befddaa554d4d2dd18d7822a3330b92f505 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 16:02:54 +0000 Subject: [PATCH 2/2] cleanup: remove unused imports, move datetime import to top-level Co-Authored-By: Matteo Marcelletti --- src/crab/core/engine.py | 2 +- src/crab/log/logger.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/crab/core/engine.py b/src/crab/core/engine.py index bdc9bf42..fcd9ae94 100644 --- a/src/crab/core/engine.py +++ b/src/crab/core/engine.py @@ -14,7 +14,7 @@ import shutil from typing import List, Dict, Any, Optional, Union -from crab.log import CrabLogger, LogLevel, LogSource +from crab.log import CrabLogger # ============================================================================= # 1. DATA CONTAINERS & UTILITIES diff --git a/src/crab/log/logger.py b/src/crab/log/logger.py index 1eef10b8..14e93b7e 100644 --- a/src/crab/log/logger.py +++ b/src/crab/log/logger.py @@ -6,6 +6,7 @@ pluggable handlers. """ +import datetime import threading from enum import IntEnum from typing import List, Optional, TYPE_CHECKING @@ -89,7 +90,6 @@ def _emit(self, level: LogLevel, source: str, message: str) -> None: if level < self.level: return - import datetime ts = datetime.datetime.now().strftime("%H:%M:%S") record = LogRecord( level=level,