diff --git a/FF_calculation/FF_QCD.py b/FF_calculation/FF_QCD.py
index 9d681a6..1f208a7 100644
--- a/FF_calculation/FF_QCD.py
+++ b/FF_calculation/FF_QCD.py
@@ -12,10 +12,12 @@ import ROOT
 
 import helper.ff_functions as ff_func
+import helper.logging_helper as logging_helper
 import helper.plotting as plotting
 from helper.functions import RuntimeVariables
 
 
+@logging_helper.grouped_logs(lambda args: f"{args[6]}")
 def calculation_QCD_FFs(
     args: Tuple[Any, ...],
 ) -> Dict[str, Union[str, Dict[str, str]]]:
@@ -46,7 +48,7 @@ def calculation_QCD_FFs(
         *_,  # SRlike_hists, ARlike_hists only used in ttbar calculation
     ) = args
 
-    log = logging.getLogger(logger)
+    log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
     # init histogram dict for FF measurement
     SRlike_hists = dict()
@@ -188,6 +190,7 @@ def calculation_QCD_FFs(
     return ff_func.fill_corrlib_expression(corrlib_exp, splitting.variables, splitting.split)
 
 
+@logging_helper.grouped_logs(lambda args: f"{args[7]}")
 def non_closure_correction(
     args: Tuple[Any, ...],
 ) -> Dict[str, np.ndarray]:
@@ -224,7 +227,7 @@ def non_closure_correction(
         for_DRtoSR,
     ) = args
 
-    log = logging.getLogger(logger)
+    log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
     # init histogram dict for FF measurement
     SRlike_hists = dict()
@@ -410,6 +413,7 @@ def non_closure_correction(
     return correction_dict
 
 
+@logging_helper.grouped_logs(lambda args: f"{args[6]}")
 def DR_SR_correction(
     args: Tuple[Any, ...],
 ) -> Dict[str, np.ndarray]:
@@ -442,7 +446,7 @@ def DR_SR_correction(
         corr_evaluators,
     ) = args
 
-    log = logging.getLogger(logger)
+    log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
     # init histogram dict for FF measurement
     SRlike_hists = dict()
diff --git a/FF_calculation/FF_Wjets.py b/FF_calculation/FF_Wjets.py
index ec15e78..84caab8 100644
--- a/FF_calculation/FF_Wjets.py
+++ b/FF_calculation/FF_Wjets.py
@@ -12,10 +12,12 @@ import ROOT
 
 import helper.ff_functions as ff_func
+import helper.logging_helper as logging_helper
 import helper.plotting as plotting
 from helper.functions import RuntimeVariables
 
 
+@logging_helper.grouped_logs(lambda args: f"{args[6]}")
 def calculation_Wjets_FFs(
     args: Tuple[Any, ...],
 ) -> Dict[str, Union[Dict[str, str], Dict[str, Dict[str, str]]]]:
@@ -46,7 +48,7 @@ def calculation_Wjets_FFs(
         *_,  # SRlike_hists, ARlike_hists only used in ttbar calculation
     ) = args
 
-    log = logging.getLogger(logger)
+    log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
     # init histogram dict for FF measurement
     SRlike_hists = dict()
@@ -245,6 +247,7 @@ def calculation_Wjets_FFs(
     return ff_func.fill_corrlib_expression(corrlib_exp, splitting.variables, splitting.split)
 
 
+@logging_helper.grouped_logs(lambda args: f"{args[7]}")
 def non_closure_correction(
     args: Tuple[Any, ...],
 ) -> Dict[str, np.ndarray]:
@@ -281,7 +284,7 @@ def non_closure_correction(
         for_DRtoSR,
     ) = args
 
-    log = logging.getLogger(logger)
+    log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
     # init histogram dict for FF measurement
     SRlike_hists = dict()
@@ -520,6 +523,7 @@ def non_closure_correction(
     return correction_dict
 
 
+@logging_helper.grouped_logs(lambda args: f"{args[6]}")
 def DR_SR_correction(
     args: Tuple[Any, ...],
 ) -> Dict[str, np.ndarray]:
@@ -553,7 +557,7 @@ def DR_SR_correction(
         corr_evaluators,
     ) = args
 
-    log = logging.getLogger(logger)
+    log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
     # init histogram dict for FF measurement
     SRlike_hists = dict()
diff --git a/FF_calculation/FF_ttbar.py b/FF_calculation/FF_ttbar.py
index 8ad473a..e3bf67f 100644
--- a/FF_calculation/FF_ttbar.py
+++ b/FF_calculation/FF_ttbar.py
@@ -11,10 +11,12 @@ import ROOT
 
 import helper.ff_functions as ff_func
+import helper.logging_helper as logging_helper
 import helper.plotting as plotting
 from helper.functions import RuntimeVariables
 
 
+@logging_helper.grouped_logs(lambda args: f"{args[6]}")
 def calculation_ttbar_FFs(
     args: Tuple[Any, ...],
 ) -> Dict[str, Union[str, Dict[str, str]]]:
@@ -49,7 +51,7 @@ def calculation_ttbar_FFs(
         ARlike_hists,  # ARlike_hists: Dict[str, ROOT.TH1D],
     ) = args
 
-    log = logging.getLogger(logger)
+    log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
     # init histogram dict for FF measurement from MC
     SR_hists = dict()
@@ -176,6 +178,7 @@ def calculation_ttbar_FFs(
     return ff_func.fill_corrlib_expression(corrlib_exp, splitting.variables, splitting.split)
 
 
+@logging_helper.grouped_logs(lambda *args, **kwargs: args[4])
 def calculation_FF_data_scaling_factor(
     config: Dict[str, Union[str, Dict, List]],
     process_conf: Dict[str, Union[str, Dict, List]],
@@ -198,7 +201,7 @@ def calculation_FF_data_scaling_factor(
         Tuple of dictionaries containing the histograms for the signal-like and
         application-like regions
     """
-    log = logging.getLogger(logger)
+    log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
     # init histogram dict for FF data correction
     SRlike_hists = dict()
@@ -311,6 +314,7 @@ def calculation_FF_data_scaling_factor(
     return SRlike_hists, ARlike_hists
 
 
+@logging_helper.grouped_logs(lambda args: f"{args[7]}")
 def non_closure_correction(
     args: Tuple[Any, ...],
 ) -> Dict[str, np.ndarray]:
@@ -351,7 +355,7 @@ def non_closure_correction(
         *_,  # for_DRtoSR not needed for ttbar
     ) = args
 
-    log = logging.getLogger(logger)
+    log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
     # init histogram dict for FF measurement
     SR_hists = dict()
diff --git a/FF_calculation/fractions.py b/FF_calculation/fractions.py
index 7b250f9..524fa02 100644
--- a/FF_calculation/fractions.py
+++ b/FF_calculation/fractions.py
@@ -10,10 +10,12 @@ import ROOT
 
 import helper.ff_functions as ff_func
+import helper.logging_helper as logging_helper
 import helper.plotting as plotting
 from helper.functions import RuntimeVariables
 
 
+@logging_helper.grouped_logs(lambda args: f"{args[6]}")
 def fraction_calculation(
     args: Tuple[Any, ...],
 ) -> Dict[str, Dict[str, Dict[str, List[float]]]]:
@@ -44,7 +46,7 @@ def fraction_calculation(
         *_,  # SRlike_hists, ARlike_hists only needed for ttbar
     ) = args
 
-    log = logging.getLogger(logger)
+    log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
     AR_hists = dict()
     SR_hists = dict()
diff --git a/ff_calculation.py b/ff_calculation.py
index 8e5e000..aef03a6 100644
--- a/ff_calculation.py
+++ b/ff_calculation.py
@@ -13,6 +13,7 @@
 import helper.correctionlib_json as corrlib
 import helper.ff_functions as ff_func
 import helper.functions as func
+import helper.logging_helper as logging_helper
 from FF_calculation.fractions import fraction_calculation
 from helper.hooks_and_patches import Histo1DPatchedRDataFrame, PassThroughWrapper
 
@@ -37,6 +38,11 @@
 Flag to use intermediary filtered ROOT RDataFrames even if cached versions are available.
 """,
 )
+parser.add_argument(
+    "--log-level",
+    default="INFO",
+    help="Logging level to use. (default: INFO)",
+)
(default: INFO)", +) FF_CALCULATION_FUNCTIONS = { "QCD": FF_QCD.calculation_QCD_FFs, @@ -62,6 +68,7 @@ } +@logging_helper.grouped_logs def FF_calculation( config: Dict[str, Union[str, Dict, List]], sample_paths: List[str], @@ -134,6 +141,7 @@ def FF_calculation( return ff_func.fill_corrlib_expression(results, split_collections.split_variables) +@logging_helper.grouped_logs(lambda args: f"ff_calculation.{args[0]}") def run_ff_calculation( args: Tuple[str, Dict[str, Union[Dict, List, str]], List[str], str] ) -> Tuple[Tuple, Dict]: @@ -147,7 +155,7 @@ def run_ff_calculation( Depending on the "process" either a dictionary with fake factor function expressions or a dictionary with process fraction values """ process, config, sample_paths, output_path = args - log = logging.getLogger(f"ff_calculation.{process}") + log = logging_helper.setup_logging(logger=logging.getLogger(f"ff_calculation.{process}")) log.info(f"Calculating fake factors for the {process} process.") log.info("-" * 50) @@ -192,12 +200,9 @@ def run_ff_calculation( if "process_fractions_subleading" in config: subcategories = subcategories + ["process_fractions_subleading"] - func.setup_logger( - log_file=save_path_plots + "/ff_calculation.log", - log_name="ff_calculation", - log_level=logging.INFO, - subcategories=subcategories, - ) + logging_helper.LOG_FILENAME = save_path_plots + "/ff_calculation.log" + logging_helper.LOG_LEVEL = getattr(logging, args.log_level.upper(), logging.INFO) + log = logging_helper.setup_logging(logger=logging.getLogger("ff_calculation"), level=logging_helper.LOG_LEVEL) # getting all the ntuple input files sample_paths = func.get_samples(config=config) @@ -257,3 +262,5 @@ def run_ff_calculation( with open(os.path.join(save_path_plots, "done"), "w") as done_file: done_file.write("") + + log.info("Fake factor calculation finished successfully.") diff --git a/ff_corrections.py b/ff_corrections.py index 0109218..9595355 100644 --- a/ff_corrections.py +++ b/ff_corrections.py @@ -22,6 +22,7 @@ from ff_calculation import FF_calculation from helper.ff_evaluators import FakeFactorCorrectionEvaluator, FakeFactorEvaluator, DRSRCorrectionEvaluator from helper.hooks_and_patches import Histo1DPatchedRDataFrame, PassThroughWrapper +import helper.logging_helper as logging_helper parser = argparse.ArgumentParser() @@ -46,6 +47,11 @@ correction calculations. """, ) +parser.add_argument( + "--log-level", + default="INFO", + help="Logging level to use. 
(default: INFO)", +) NON_CLOSURE_CORRECTION_FUNCTIONS = { "QCD": FF_QCD.non_closure_correction, @@ -62,6 +68,7 @@ } +@logging_helper.grouped_logs def non_closure_correction( config: Dict[str, Union[str, Dict, List]], corr_config: Dict[str, Union[str, Dict]], @@ -128,6 +135,7 @@ def non_closure_correction( return ff_func.fill_corrlib_expression(results, split_collections.split_variables) +@logging_helper.grouped_logs(lambda *args, **kwargs: args[8]) def run_non_closure_correction( config: Dict[str, Union[str, Dict, List]], corr_config: Dict[str, Union[str, Dict]], @@ -160,7 +168,7 @@ def run_non_closure_correction( Dictionary with the process name as key and a dictionary with the corrections """ - log = logging.getLogger(logger) + log = logging_helper.setup_logging(logger=logging.getLogger(logger)) corrections = {process: {}} _chained_DR_SR_process_config = None if for_DRtoSR: @@ -295,6 +303,7 @@ def run_non_closure_correction( return corrections +@logging_helper.grouped_logs(lambda args: f"ff_corrections.{args[0]}") def run_ff_calculation_for_DRtoSR( args: Tuple[ str, @@ -314,7 +323,7 @@ def run_ff_calculation_for_DRtoSR( If a DR to SR correction is defined for the "process" a dictionary with fake factor function expressions is returned, otherwise None is returned """ process, config, corr_config, sample_paths, output_path = args - log = logging.getLogger(f"ff_corrections.{process}") + log = logging_helper.setup_logging(logger=logging.getLogger(f"ff_corrections.{process}")) if "DR_SR" in corr_config["target_processes"][process]: ff_config = copy.deepcopy(config) @@ -340,6 +349,7 @@ def run_ff_calculation_for_DRtoSR( return args, result +@logging_helper.grouped_logs(lambda args: f"ff_corrections.{args[0]}") def run_non_closure_correction_for_DRtoSR( args: Tuple[ str, @@ -366,7 +376,7 @@ def run_non_closure_correction_for_DRtoSR( """ process, config, corr_config, sample_paths, output_path = args - log = logging.getLogger(f"ff_corrections.{process}") + log = logging_helper.setup_logging(logger=logging.getLogger(f"ff_corrections.{process}")) corrections = {process: dict()} process_config = deepcopy(corr_config["target_processes"][process]) @@ -401,6 +411,7 @@ def run_non_closure_correction_for_DRtoSR( return args, corrections +@logging_helper.grouped_logs(lambda args: f"ff_corrections.{args[0]}") def run_correction( args, ) -> Dict[str, Dict[str, Any]]: @@ -426,7 +437,7 @@ def run_correction( save_path, ) = args - log = logging.getLogger(f"ff_corrections.{process}") + log = logging_helper.setup_logging(logger=logging.getLogger(f"ff_corrections.{process}")) corrections = {process: dict()} var_dependences = [config["target_processes"][process]["var_dependence"]] + list(config["target_processes"][process]["split_categories"].keys()) @@ -590,12 +601,9 @@ def run_correction( func.configured_yaml.dump(corr_config, config_file) # start output logging - func.setup_logger( - log_file=save_path + "/ff_corrections.log", - log_name="ff_corrections", - log_level=logging.INFO, - subcategories=corr_config["target_processes"].keys(), - ) + logging_helper.LOG_FILENAME = save_path + "/ff_corrections.log" + logging_helper.LOG_LEVEL = getattr(logging, args.log_level.upper(), logging.INFO) + log = logging_helper.setup_logging(logger=logging.getLogger("ff_corrections"), level=logging_helper.LOG_LEVEL) # getting all the input files sample_paths = func.get_samples(config=config) @@ -738,3 +746,5 @@ def run_correction( with open(os.path.join(save_path, "done"), "w") as done_file: done_file.write("") + + log.info("Fake 
diff --git a/helper/ff_evaluators.py b/helper/ff_evaluators.py
index 99e75da..860357f 100644
--- a/helper/ff_evaluators.py
+++ b/helper/ff_evaluators.py
@@ -1,11 +1,13 @@
 import logging
 import os
-from typing import Any, Dict, List, Union, Tuple
+from typing import Any, Dict, List, Tuple, Union
 
 import correctionlib
 import correctionlib.schemav2 as cs
 import ROOT
 
+import helper.logging_helper as logging_helper
+
 
 class FakeFactorEvaluator:
     """
@@ -21,7 +23,7 @@ def loading_from_file(
         for_DRtoSR: bool,
         logger: str,
     ) -> "FakeFactorEvaluator":
-        log = logging.getLogger(logger)
+        log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
         directories = ["workdir", config["workdir_name"], config["era"]]
         if not for_DRtoSR:
@@ -54,7 +56,7 @@ def loading_from_CorrectionSet(
         for_DRtoSR: bool,
         logger: str,
     ) -> "FakeFactorEvaluator":
-        log = logging.getLogger(logger)
+        log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
         assert isinstance(fake_factors, cs.CorrectionSet), "face_factors must be of type correctionlib.schemav2.CorrectionSet"
         literal = fake_factors.json().replace('"', r'\"')
@@ -128,7 +130,7 @@ def loading_from_file(
         for_DRtoSR: bool,
         logger: str,
     ) -> "FakeFactorCorrectionEvaluator":
-        log = logging.getLogger(logger)
+        log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
         _for_DRtoSR = "for_DRtoSR" if for_DRtoSR else ""
         directories = ["workdir", config["workdir_name"], config["era"]]
@@ -164,7 +166,7 @@ def loading_from_CorrectionSet(
         for_DRtoSR: bool,
         logger: str,
     ) -> "FakeFactorCorrectionEvaluator":
-        log = logging.getLogger(logger)
+        log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
         assert isinstance(correction, cs.CorrectionSet), "Correction must be of type correctionlib.schemav2.CorrectionSet"
@@ -260,7 +262,7 @@ def loading_from_file(
         Returns:
             An instance of the DRSRCorrectionEvaluator class.
         """
-        log = logging.getLogger(logger)
+        log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
         directories = ["workdir", config["workdir_name"], config["era"]]
         path = os.path.join(*directories, f"FF_corrections_{config['channel']}.json")
@@ -296,7 +298,7 @@ def loading_from_CorrectionSet(
         Returns:
             An instance of the DRSRCorrectionEvaluator class.
         """
-        log = logging.getLogger(logger)
+        log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
         assert isinstance(correction, cs.CorrectionSet), "Correction must be of type correctionlib.schemav2.CorrectionSet"
diff --git a/helper/ff_functions.py b/helper/ff_functions.py
index 26d6cd1..36eb3b1 100644
--- a/helper/ff_functions.py
+++ b/helper/ff_functions.py
@@ -21,6 +21,7 @@
 import configs.general_definitions as gd
 import helper.fitting_helper as fitting_helper
 import helper.functions as func
+import helper.logging_helper as logging_helper
 import helper.weights as weights
 from configs.general_definitions import random_seed
 from helper.hooks_and_patches import (_EXTRA_PARAM_COUNTS, _EXTRA_PARAM_FLAG,
@@ -53,7 +54,8 @@ def cache_rdf_snapshot(cache_dir: str = "./.RDF_CACHE") -> Callable:
     def decorator(function: Callable) -> Callable:
         @functools.wraps(function)
         def wrapper(*args: Any, **kwargs: Any) -> ROOT.RDataFrame:
-            log = logging.getLogger(kwargs.get("logger") or function.__module__ + '.' + function.__name__)
+            log_name = kwargs.get("logger") or function.__module__ + '.' + function.__name__
+            log = logging_helper.setup_logging(logger=logging.getLogger(log_name))
 
             tree_name = "ntuple"
 
             if "rdf" in kwargs:
@@ -777,13 +779,13 @@ def apply_region_filters(
         rdf = weights.apply_btag_weight(rdf=rdf)
         rdf = rdf.Filter(f"({sum_cuts['bb_selection']})", "cut on bb pair")
 
-    log = logging.getLogger(logger)
+    log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
     # redirecting C++ stdout for Report() to python stdout
     out = StringIO()
     with pipes(stdout=out, stderr=STDOUT):
         rdf.Report().Print()
-    log.info(out.getvalue())
-    log.info("-" * 50)
+    log.debug(out.getvalue())
+    log.debug("-" * 50)
 
     return rdf
diff --git a/helper/fitting_helper.py b/helper/fitting_helper.py
index d7196df..e416e90 100644
--- a/helper/fitting_helper.py
+++ b/helper/fitting_helper.py
@@ -9,6 +9,8 @@
 import ROOT
 from wurlitzer import STDOUT, pipes
 
+import helper.logging_helper as logging_helper
+
 
 def _get_gradients(
     x: Union[List[float], str],
@@ -338,7 +340,7 @@ def get_wrapped_functions_from_fits(
             - "mc_up": The best fit function including the upper error from MC subtraction.
             - "mc_down": The best fit function including the lower error from MC subtraction
     """
-    log = logging.getLogger(logger)
+    log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
     a, b = bounds
     TF1s, Fits = dict(), dict()
@@ -545,7 +547,7 @@ def get_wrapped_hists(
             - "mc_up": measured histogram including the upper error from MC subtraction.
             - "mc_down": measured histogram including the lower error from MC subtraction
     """
-    log = logging.getLogger(logger)
+    log = logging_helper.setup_logging(logger=logging.getLogger(logger))
     if verbose:
         log.info("Measured histograms directly instead of a fit.")
         log.info("-" * 50)
diff --git a/helper/functions.py b/helper/functions.py
index 3709073..d7acffd 100644
--- a/helper/functions.py
+++ b/helper/functions.py
@@ -21,6 +21,8 @@
 from ruamel.yaml.scalarstring import DoubleQuotedScalarString
 from XRootD import client
 
+import helper.logging_helper as logging_helper
+
 
 class CachingKeyHelper:
     @staticmethod
@@ -375,12 +377,15 @@
 
     """
 
+    log = logging_helper.setup_logging(logger=logging.getLogger(__name__))
+
     if len(args_list) == 1 or not RuntimeVariables.USE_MULTIPROCESSING:
         results = [function(args) for args in args_list]
     else:
         n = max_workers if max_workers is not None else len(args_list)
-        with concurrent.futures.ProcessPoolExecutor(max_workers=n) as executor:
-            results = list(executor.map(function, args_list))
+        with logging_helper.LogContext(log).parallel_session() as pool_config:
+            with concurrent.futures.ProcessPoolExecutor(max_workers=n, **pool_config) as executor:
+                results = list(executor.map(function, args_list))
 
     return results
@@ -453,49 +458,6 @@ def check_path(path: str) -> None:
     os.makedirs(path, exist_ok=True)
 
 
-def setup_logger(
-    log_file: str, log_name: str, log_level: int, subcategories: Union[List[str], None] = None
-) -> None:
-    """
-    Setting up all relevant loggers and handlers.
-
-    Args:
-        log_file: Name of the file the logging information will be stored in
-        log_name: General name of the logger
-        log_level: Level of the logger, e.g. logging.INFO, logging.DEBUG, etc.
-        subcategories: List of different sub logger names e.g. can be used to differentiate between processes (default: None)
-
-    Return:
-        None
-    """
-    # create file handler
-    fh = logging.FileHandler(log_file)
-    fh.setLevel(log_level)
-    # create console handler with a higher log level
-    ch = logging.StreamHandler()
-    ch.setLevel(log_level)
-    # create formatter and add it to the handlers
-    formatter = logging.Formatter(
-        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
-    )
-    ch.setFormatter(formatter)
-    fh.setFormatter(formatter)
-
-    if subcategories is not None:
-        for cat in subcategories:
-            log = logging.getLogger(f"{log_name}.{cat}")
-            log.setLevel(log_level)
-            # add the handlers to logger
-            log.addHandler(ch)
-            log.addHandler(fh)
-    else:
-        log = logging.getLogger(f"{log_name}")
-        log.setLevel(log_level)
-        # add the handlers to logger
-        log.addHandler(ch)
-        log.addHandler(fh)
-
-
 def get_ntuples(config: Dict, process: str, sample: str) -> List[str]:
     """
     This function generates a list of paths of all ntuples for a specific sample of a process.
@@ -508,7 +470,7 @@ def get_ntuples(config: Dict, process: str, sample: str) -> List[str]:
     Return:
         List of file paths
     """
-    log = logging.getLogger(f"preselection.{process}")
+    log = logging_helper.setup_logging(logger=logging.getLogger(f"preselection.{process}"))
     sample_path = os.path.join(
         config["ntuple_path"], config["era"], sample, config["channel"]
     )
@@ -540,7 +502,7 @@ def check_inputfiles(path: str, process: str, tree: str) -> List[str]:
     Return:
         List of file paths with not empty files
     """
-    log = logging.getLogger(f"preselection.{process}")
+    log = logging_helper.setup_logging(logger=logging.getLogger(f"preselection.{process}"))
 
     fsname = "root://cmsdcache-kit-disk.gridka.de/"
     xrdclient = client.FileSystem(fsname)
@@ -707,7 +669,7 @@ def get_samples(config: Dict[str, Union[str, Dict, List]]) -> List[str]:
     Return:
         List of all paths to the relevant samples
     """
-    log = logging.getLogger("ff_calc")
+    log = logging_helper.setup_logging(logger=logging.getLogger("ff_calc"))
 
     general_sample_path = os.path.join(
         config["file_path"], "preselection", config["era"], config["channel"], "*.root"
diff --git a/helper/logging_helper.py b/helper/logging_helper.py
new file mode 100644
index 0000000..f52a7f5
--- /dev/null
+++ b/helper/logging_helper.py
@@ -0,0 +1,500 @@
+import inspect
+import io
+import logging
+import logging.handlers
+import multiprocessing
+import os
+from contextlib import contextmanager, redirect_stderr, redirect_stdout
+from datetime import datetime
+from logging import LogRecord
+from time import localtime, strftime
+from typing import Generator, List, Optional, Type, Union
+
+from rich.console import Console, ConsoleRenderable
+from rich.live import Live
+from rich.logging import RichHandler
+from rich.text import Text
+from rich.traceback import Traceback
+from tqdm import tqdm
+import functools
+
+
+LOG_FILENAME = "routine_output.log"
+LOG_LEVEL = logging.INFO
+CONSOLE = Console()
+
+GRAY = "\x1b[38;21m"
+WHITE = "\x1b[38;5;15m"
+YELLOW = "\x1b[38;5;226m"
+RED = "\x1b[38;5;196m"
+BOLD_RED = "\x1b[31;1m"
+RESET = "\x1b[0m"
+
+
+def is_in_ipython():
+    try:
+        from IPython import get_ipython
+        if get_ipython() is not None:
+            return True
+    except ImportError:
+        pass
+    return False
+
+
+def capture_rich_renderable_as_string(renderable, width: int = 200) -> str:
+    string_io = io.StringIO()
+    capture_console = Console(file=string_io, record=True, width=width)
+    capture_console.print(renderable)
+    return string_io.getvalue()
+
+
+def worker_init(log_queue: multiprocessing.Queue, level: int = logging.INFO):
+    root = logging.getLogger()
+    if root.hasHandlers():
+        root.handlers.clear()
+    root.setLevel(level)
+    handler = logging.handlers.QueueHandler(log_queue)
+    root.addHandler(handler)
+
+
+def grouped_logs(arg=None):
+    def _apply_logging(func, extractor):
+        @functools.wraps(func)
+        def wrapper(*args, **kwargs):
+            try:
+                if extractor:
+                    target = extractor(*args, **kwargs)
+                    if isinstance(target, logging.Logger):
+                        logger = target
+                        name = target.name
+                    else:
+                        name = str(target)
+                        logger = logging.getLogger(name)
+                else:
+                    name = func.__name__
+                    logger = logging.getLogger(name)
+            except Exception:
+                name = func.__name__
+                logger = logging.getLogger(name)
+
+            with LogContext(logger).grouped_logs(name):
+                return func(*args, **kwargs)
+
+        return wrapper
+
+    # bare usage (@grouped_logs) passes the decorated function itself;
+    # lambdas are treated as extractors for @grouped_logs(lambda ...)
+    if callable(arg) and getattr(arg, "__name__", "") != "<lambda>":
+        return _apply_logging(arg, None)
+
+    extractor = arg
+
+    def decorator(func):
+        return _apply_logging(func, extractor)
+
+    return decorator
+
+
+class BufferedWorkerHandler(logging.Handler):
+    def __init__(self):
+        super().__init__()
+        self.records = []
+
+    def emit(self, record):
+        self.records.append(record)
+
+
+class RealtimeInjector(logging.Filter):
+    def filter(self, record: logging.LogRecord) -> bool:
+        if not getattr(record, 'summary', False):
+            record.realtime = True
+        return True
+
+
+class ConsoleDisplayFilter(logging.Filter):
+    def filter(self, record: logging.LogRecord) -> bool:
+        return not getattr(record, 'summary', False)
+
+
+class FileDisplayFilter(logging.Filter):
+    def filter(self, record: logging.LogRecord) -> bool:
+        return not getattr(record, 'realtime', False)
+
+
+class NoFileOnlyFilter(logging.Filter):
+    def filter(self, record: logging.LogRecord) -> bool:
+        return not getattr(record, 'file_only', False)
+
+
+class CustomRichHandler(RichHandler):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+    def render_message(self, record: LogRecord, message: str) -> ConsoleRenderable:
+        if getattr(record, 'summary', False):
+            return Text.from_markup(message) if self.markup else Text(message)
+        message_renderable = super().render_message(record, message)
+        return Text.assemble(
+            Text(f"{record.name} ", style="bold cyan"),
+            message_renderable,
+        )
+
+
+class MarkupStrippingRichHandler(CustomRichHandler):
+    def render_message(self, record: LogRecord, message: str) -> ConsoleRenderable:
+        """Strips markup from the message before passing it to the parent renderer."""
+        plain_message = Text.from_markup(message).plain
+        return super().render_message(record, plain_message)
+
+    def emit(self, record: LogRecord):
+        if getattr(record, 'summary', False):
+            try:
+                msg = self.format(record)
+                plain_msg = Text.from_markup(msg).plain
+                self.console.file.write(plain_msg + "\n")
+                self.console.file.flush()
+            except Exception:
+                self.handleError(record)
+            return
+        super().emit(record)
+
+    def render(self, *, record: LogRecord, traceback: Optional[Traceback], message_renderable: ConsoleRenderable) -> ConsoleRenderable:
+        if getattr(record, 'summary', False):
+            time_format = self._log_render.time_format
+            time_str = datetime.fromtimestamp(record.created).strftime(time_format)
+
+            level_text = self.get_level_text(record)
+            level_str = level_text.plain.ljust(8)
+            plain_str = str(message_renderable)
+            lines = plain_str.split('\n')
+            indented_lines = ['    ' + line for line in lines]
+            indented_str = '\n'.join(indented_lines)
+            indented_message = Text(indented_str)
+            return Text.assemble(Text(f"{time_str} {level_str}\n"), indented_message)
+        else:
+            return super().render(record=record, traceback=traceback, message_renderable=message_renderable)
+
+
+class _DuplicateFilter:
+    def __init__(self) -> None:
+        self.msgs = set()
+
+    def filter(self, record: logging.LogRecord) -> bool:
+        if record.msg in self.msgs:
+            return False
+        self.msgs.add(record.msg)
+        return True
+
+
+def setup_logging(
+    output_file: Union[str, None] = None,
+    logger: logging.Logger = logging.getLogger(""),
+    level: Union[int, None] = None,
+    console_markup: bool = False,
+    queue: Union[multiprocessing.Queue, None] = None,
+) -> logging.Logger:
+
+    if queue is not None:
+        if logger.hasHandlers():
+            logger.handlers.clear()
+        logger.setLevel(level or LOG_LEVEL)
+        handler = logging.handlers.QueueHandler(queue)
+        logger.addHandler(handler)
+        logger.propagate = False
+        return logger
+
+    root = logging.getLogger()
+    if any(isinstance(h, logging.handlers.QueueHandler) for h in root.handlers):
+        logger.setLevel(level or LOG_LEVEL)
+        return logger
+
+    if output_file is None:
+        output_file = LOG_FILENAME
+    if level is None:
+        level = LOG_LEVEL
+
+    if logger.hasHandlers():
+        logger.handlers.clear()
+
+    logger.setLevel(level)
+
+    console_handler = CustomRichHandler(
+        console=CONSOLE,
+        rich_tracebacks=True,
+        show_time=True,
+        show_level=True,
+        show_path=True,
+        log_time_format="[%Y-%m-%d %H:%M:%S]",
+        markup=console_markup,
+    )
+    console_handler.addFilter(NoFileOnlyFilter())
+    console_handler.addFilter(ConsoleDisplayFilter())
+    logger.addHandler(console_handler)
+
+    log_file = open(output_file, "a")
+    file_console = Console(file=log_file, record=True, width=172)
+    file_handler = MarkupStrippingRichHandler(
+        console=file_console,
+        show_time=True,
+        show_level=True,
+        show_path=True,
+        log_time_format="[%Y-%m-%d %H:%M:%S.%f]",
+        rich_tracebacks=False,
+        markup=False,
+    )
+    file_handler.addFilter(FileDisplayFilter())
+    logger.addHandler(file_handler)
+
+    # Install the duplicate filter permanently if not already present.
+    if not any(isinstance(f, _DuplicateFilter) for f in logger.filters):
+        logger.addFilter(_DuplicateFilter())
+
+    return logger
+
+
+class _UnclosableStream:
+    def __init__(self, stream):
+        self._stream = stream
+
+    def write(self, *args, **kwargs):
+        return self._stream.write(*args, **kwargs)
+
+    def flush(self, *args, **kwargs):
+        return self._stream.flush(*args, **kwargs)
+
+    def close(self):
+        pass
+
+    def __getattr__(self, name):
+        return getattr(self._stream, name)
+
+
+class TqdmRichLiveIO(io.StringIO):
+    def __init__(self, live_instance: Live, handler: RichHandler, logger_name: str):
+        super().__init__()
+        self.live = live_instance
+        self.handler = handler
+        self.logger_name = logger_name
+        self.last_text = ""
+        self._find_caller()
+
+    def _find_caller(self):
+        self.caller_filename = ""
+        self.caller_lineno = 0
+        for frame_info in inspect.stack():
+            if 'tqdm' not in frame_info.filename and __file__ not in frame_info.filename:
+                self.caller_filename = frame_info.filename
+                self.caller_lineno = frame_info.lineno
+                break
+
+    def write(self, s: str):
+        text = s.strip()
+        if text and text != self.last_text:
+            self.last_text = text
+            record = logging.LogRecord(
+                name=self.logger_name,
+                level=logging.INFO,
+                pathname=self.caller_filename,
+                lineno=self.caller_lineno,
+                msg=text,
+                args=(),
+                exc_info=None,
+            )
+            message_renderable = self.handler.render_message(record, text)
+            full_renderable = self.handler.render(
+                record=record,
+                traceback=None,
+                message_renderable=message_renderable
+            )
+            self.live.update(full_renderable)
+
+    def flush(self):
+        pass
+
+
+class LogContext:
+
+    def __init__(self, logger: logging.Logger) -> None:
+        self.logger = logger
+
+    @contextmanager
+    def grouped_logs(self, worker_name: str) -> Generator[None, None, None]:
+        class SimpleBuffer(logging.Handler):
+            def __init__(self):
+                super().__init__()
+                self.buffer = []
+
+            def emit(self, record):
+                msg_text = self.format(record)
+
+                filename = os.path.basename(record.pathname)
+                path_info = f"{filename}:{record.lineno}"
+
+                formatted_lines, lines, target_width, n_indent = [], msg_text.split("\n"), 172, 4
+
+                for i, line in enumerate(lines):
+                    indented_line = " " * n_indent + line
+                    if i == 0:
+                        current_len = len(indented_line)
+                        needed_padding = target_width - current_len - len(path_info)
+                        if needed_padding < 2:
+                            needed_padding = 2
+                        full_line = f"{indented_line}{' ' * needed_padding}{path_info}"
+                    else:
+                        full_line = indented_line
+                    formatted_lines.append(full_line)
+
+                self.buffer.append("\n".join(formatted_lines))
+
+        buffer = SimpleBuffer()
+        buffer.setFormatter(logging.Formatter(
+            "[%(asctime)s.%(msecs)03d] %(levelname)-8s %(name)s: %(message)s",
+            datefmt="%H:%M:%S"
+        ))
+        injector = RealtimeInjector()
+        self.logger.addHandler(buffer)
+        self.logger.addFilter(injector)
+        try:
+            yield
+        finally:
+            self.logger.removeHandler(buffer)
+            self.logger.removeFilter(injector)
+            if buffer.buffer:
+                block = "\n".join(buffer.buffer)
+                header = f"[bold cyan]{'=' * 30} START WORKER: {worker_name} {'=' * 30}[/]"
+                footer = f"[bold cyan]{'=' * 31} END WORKER: {worker_name} {'=' * 31}[/]"
+                self.logger.info(f"{header}\n{block}\n{footer}", extra={'summary': True})
+
+    @contextmanager
+    def parallel_session(self) -> Generator[dict, None, None]:
+        manager = multiprocessing.Manager()
+        log_queue = manager.Queue()
+
+        listener = logging.handlers.QueueListener(
+            log_queue,
+            *self.logger.handlers,
+            respect_handler_level=True
+        )
+        listener.start()
+        pool_config = {
+            "initializer": worker_init,
+            "initargs": (log_queue, self.logger.level)
+        }
+
+        try:
+            yield pool_config
+        finally:
+            listener.stop()
+            manager.shutdown()
+
+    @contextmanager
+    def redirect_tqdm(self) -> Generator[None, None, None]:
+        handler = None
+        for h in self.logger.handlers:
+            if isinstance(h, RichHandler) and hasattr(h, 'console'):
+                handler = h
+                break
+
+        if not handler:
+            raise ValueError("A RichHandler with a console attribute must be attached to the logger.")
+
+        console = handler.console
+        original_init = tqdm.__init__
+
+        with Live(console=console, transient=True, refresh_per_second=20) as live:
+            tqdm_io = TqdmRichLiveIO(live, handler, self.logger.name)
+
+            def patched_init(self, *args, **kwargs):
+                if 'file' not in kwargs:
+                    kwargs['file'] = _UnclosableStream(tqdm_io)
+                if 'disable' not in kwargs:
+                    kwargs['disable'] = False
+
+                original_init(self, *args, **kwargs)
+
+            tqdm.__init__ = patched_init
+            try:
+                yield
+            finally:
+                tqdm.__init__ = original_init
+
+            if tqdm_io.last_text:
+                self.logger.info(tqdm_io.last_text, stacklevel=3)
+
+    @contextmanager
+    def suppress_console_logging(self) -> Generator[None, None, None]:
+        original_handlers = list(self.logger.handlers)  # Make a copy
+        console_handlers_to_remove: List[logging.Handler] = []
+
+        for handler in original_handlers:
+            if isinstance(handler, logging.StreamHandler) and not isinstance(handler, logging.FileHandler):
+                console_handlers_to_remove.append(handler)
+                self.logger.removeHandler(handler)
+
+        try:
+            yield
+        finally:
+            for handler in console_handlers_to_remove:
+                if handler not in self.logger.handlers:  # Avoid adding duplicates if somehow re-added
+                    self.logger.addHandler(handler)
+
+            current_handlers = list(self.logger.handlers)
+            for handler in original_handlers:
+                if handler not in current_handlers and handler not in console_handlers_to_remove:
+                    self.logger.addHandler(handler)
+
+    @contextmanager
+    def duplicate_filter(self) -> Generator[None, None, None]:
+        if any(isinstance(f, _DuplicateFilter) for f in self.logger.filters):
+            yield
+        else:
+            dup_filter = _DuplicateFilter()
+            self.logger.addFilter(dup_filter)
+            try:
+                yield
+            finally:
+                self.logger.removeFilter(dup_filter)
+
+    @contextmanager
+    def logging_raised_Error(self) -> Generator[None, None, None]:
+        try:
+            yield
+        except Exception as e:
+            self.logger.error(e)
+            raise
+
+    @contextmanager
+    def set_logging_level(self, level: int) -> Generator[None, None, None]:
+        _old_level = self.logger.level
+        self.logger.setLevel(level)
+        try:
+            yield
+        finally:
+            self.logger.setLevel(_old_level)
+
+    @contextmanager
+    def suppress_logging(self) -> Generator[None, None, None]:
+        original_level = self.logger.level
+        self.logger.setLevel(logging.CRITICAL + 1)
+        try:
+            yield
+        finally:
+            self.logger.setLevel(original_level)
+
+    @contextmanager
+    def log_and_suppress(self, *exceptions: Type[Exception], msg: str = "An exception was suppressed"):
+        try:
+            yield
+        except exceptions or (Exception,) as e:
+            self.logger.error(f"{msg}: {type(e).__name__} - {e}", exc_info=True)
+
+    @contextmanager
+    def suppress_terminal_print(self) -> Generator[None, None, None]:
+        if is_in_ipython():
+            from IPython.display import display
+            from ipywidgets import Output
+
+            out = Output()
+            with out:
+                yield
+        else:
+            with open(os.devnull, 'w') as f, redirect_stdout(f), redirect_stderr(f):
+                yield
diff --git a/helper/plotting.py b/helper/plotting.py
index 5ec5d4c..9fb707d 100644
--- a/helper/plotting.py
+++ b/helper/plotting.py
@@ -18,6 +18,7 @@
 
 import configs.general_definitions as gd
 import helper.ff_functions as func
+import helper.logging_helper as logging_helper
 
 hep.style.use(hep.style.CMS)
@@ -361,7 +362,7 @@ def plot_FFs(
     Return:
         None
     """
-    log = logging.getLogger(logger)
+    log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
     ff_ratio = deepcopy(ff_ratio)
     uncertainties = deepcopy(uncertainties)
@@ -514,7 +515,7 @@ def plot_data_mc_ratio(
     Return:
         None
     """
-    log = logging.getLogger(logger)
+    log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
     hists = {k: v.Clone() for k, v in hists.items()}
 
     # clone method accounts for center-of-mass x values
@@ -728,7 +729,7 @@ def plot_fractions(
     Return:
         None
     """
-    log = logging.getLogger(logger)
+    log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
     hists = {k: v.Clone() for k, v in hists.items()}
@@ -848,7 +849,7 @@ def plot_correction(
     Return:
         None
     """
-    log = logging.getLogger(logger)
+    log = logging_helper.setup_logging(logger=logging.getLogger(logger))
 
     corr_hist = deepcopy(corr_hist)
     corr_graph = deepcopy(corr_graph)
diff --git a/preselection.py b/preselection.py
index ce8be61..73b223a 100644
--- a/preselection.py
+++ b/preselection.py
@@ -14,6 +14,7 @@
 
 import helper.filters as filters
 import helper.functions as func
+import helper.logging_helper as logging_helper
 import helper.weights as weights
 
 parser = argparse.ArgumentParser()
@@ -38,8 +39,14 @@
     action="store_true",
     help="Flag to disable multiprocessing for debugging purposes.",
 )
+parser.add_argument(
+    "--log-level",
+    default="INFO",
+    help="Logging level to use. (default: INFO)",
+)
 
 
+@logging_helper.grouped_logs(lambda args: f"preselection.{args[0]}")
 def run_sample_preselection(args: Tuple[str, Dict[str, Union[Dict, List, str]], int, str, str]) -> Tuple[str, str]:
     """
     This function can be used for multiprocessing. It runs the preselection step for a specified process.
@@ -52,7 +59,7 @@ def run_sample_preselection(args: Tuple[str, Dict[str, Union[Dict, List, str]],
         A tuple with the tau gen. level mode and the name of the output file
     """
     process, config, output_path, ncores, sample, tau_gen_mode = args
-    log = logging.getLogger(f"preselection.{process}")
+    log = logging_helper.setup_logging(logger=logging.getLogger(f"preselection.{process}"))
 
     ROOT.EnableImplicitMT(ncores)
 
     # loading ntuple files
@@ -75,7 +82,7 @@ def run_sample_preselection(args: Tuple[str, Dict[str, Union[Dict, List, str]],
             chain.AddFriend(fchain)
 
     rdf = ROOT.RDataFrame(chain)
-
+
     if func.rdf_is_empty(rdf=rdf):
         log.info(f"WARNING: Sample {sample} is empty. Skipping...")
         return ()
@@ -186,6 +193,7 @@ def run_sample_preselection(args: Tuple[str, Dict[str, Union[Dict, List, str]],
     return (tau_gen_mode, tmp_file_name)
 
 
+@logging_helper.grouped_logs(lambda args: f"preselection.{args[0]}")
 def run_preselection(args: Tuple[str, Dict[str, Union[Dict, List, str]], str, int]) -> None:
     """
     This function can be used for multiprocessing. It runs the preselection step for a specified process.
@@ -198,7 +206,7 @@ def run_preselection(args: Tuple[str, Dict[str, Union[Dict, List, str]], str, in
         None
     """
     process, config, output_path, ncores = args
-    log = logging.getLogger(f"preselection.{process}")
+    log = logging_helper.setup_logging(logger=logging.getLogger(f"preselection.{process}"))
     log.info(f"Processing process: {process}")
 
     # bookkeeping of samples files due to splitting based on the tau origin (genuine, jet fake, lepton fake)
@@ -270,12 +278,9 @@
     )
     func.check_path(path=output_path)
 
-    func.setup_logger(
-        log_file=output_path + "/preselection.log",
-        log_name="preselection",
-        log_level=logging.INFO,
-        subcategories=config["processes"],
-    )
+    logging_helper.LOG_FILENAME = output_path + "/preselection.log"
+    logging_helper.LOG_LEVEL = getattr(logging, args.log_level.upper(), logging.INFO)
+    log = logging_helper.setup_logging(logger=logging.getLogger(__name__), level=logging_helper.LOG_LEVEL)
 
     # get needed features for fake factor calculation
     output_features = config["output_features"]
@@ -291,6 +296,8 @@
     for wp in config["tau_vs_jet_wgt_wps"]:
         output_features.append("id_wgt_tau_vsJet_" + wp + "_1")
 
+    log.info(f"Used output features: {output_features}")
+
     # going through all wanted processes and run the preselection function with a pool of 8 workers
     args_list = [(process, config, output_path, int(args.ncores)) for process in config["processes"]]
@@ -302,3 +309,5 @@
     # dumping config to output directory for documentation
     with open(output_path + "/config.yaml", "w") as config_file:
         func.configured_yaml.dump(config, config_file)
+
+    log.info("Preselection finished.")
diff --git a/preselection_boosted.py b/preselection_boosted.py
index 6b38216..554d292 100644
--- a/preselection_boosted.py
+++ b/preselection_boosted.py
@@ -14,6 +14,7 @@
 
 import helper.filters as filters
 import helper.functions as func
+import helper.logging_helper as logging_helper
 import helper.weights as weights
 
 parser = argparse.ArgumentParser()
@@ -38,8 +39,13 @@
     action="store_true",
     help="Flag to disable multiprocessing for debugging purposes.",
 )
+parser.add_argument(
+    "--log-level",
+    default="INFO",
+    help="Logging level to use. (default: INFO)",
+)
 
-
+@logging_helper.grouped_logs(lambda args: f"preselection.{args[0]}")
 def run_sample_preselection(args: Tuple[str, Dict[str, Union[Dict, List, str]], int, str, str]) -> Tuple[str, str]:
     """
     This function can be used for multiprocessing. It runs the preselection step for a specified process.
@@ -52,7 +58,7 @@ def run_sample_preselection(args: Tuple[str, Dict[str, Union[Dict, List, str]],
         A tuple with the tau gen. level mode and the name of the output file
     """
     process, config, output_path, ncores, sample, tau_gen_mode = args
-    log = logging.getLogger(f"preselection.{process}")
+    log = logging_helper.setup_logging(logger=logging.getLogger(f"preselection.{process}"))
 
     ROOT.EnableImplicitMT(ncores)
 
     # loading ntuple files
@@ -191,6 +197,7 @@ def run_sample_preselection(args: Tuple[str, Dict[str, Union[Dict, List, str]],
     return (tau_gen_mode, tmp_file_name)
 
 
+@logging_helper.grouped_logs(lambda args: f"preselection.{args[0]}")
 def run_preselection(args: Tuple[str, Dict[str, Union[Dict, List, str]], str, int]) -> None:
     """
     This function can be used for multiprocessing. It runs the preselection step for a specified process.
@@ -203,7 +210,7 @@ def run_preselection(args: Tuple[str, Dict[str, Union[Dict, List, str]], str, in
         None
     """
     process, config, output_path, ncores = args
-    log = logging.getLogger(f"preselection.{process}")
+    log = logging_helper.setup_logging(logger=logging.getLogger(f"preselection.{process}"))
     log.info(f"Processing process: {process}")
 
     # bookkeeping of samples files due to splitting based on the tau origin (genuine, jet fake, lepton fake)
@@ -275,12 +282,9 @@
     )
     func.check_path(path=output_path)
 
-    func.setup_logger(
-        log_file=output_path + "/preselection.log",
-        log_name="preselection",
-        log_level=logging.INFO,
-        subcategories=config["processes"],
-    )
+    logging_helper.LOG_FILENAME = output_path + "/preselection.log"
+    logging_helper.LOG_LEVEL = getattr(logging, args.log_level.upper(), logging.INFO)
+    log = logging_helper.setup_logging(logger=logging.getLogger(__name__), level=logging_helper.LOG_LEVEL)
 
     # get needed features for fake factor calculation
     output_features = config["output_features"]
@@ -296,6 +300,8 @@
     for wp in config["tau_iso_wgt_wps"]:
         output_features.append("id_wgt_boostedtau_iso_" + wp + "_1")
 
+    log.info(f"Used output features: {output_features}")
+
     # going through all wanted processes and run the preselection function with a pool of 8 workers
     args_list = [(process, config, output_path, int(args.ncores)) for process in config["processes"]]
@@ -307,3 +313,5 @@
     # dumping config to output directory for documentation
     with open(output_path + "/config.yaml", "w") as config_file:
         func.configured_yaml.dump(config, config_file)
+
+    log.info("Preselection finished.")
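---

Note (not part of the patch): a minimal usage sketch of how the new helpers in helper/logging_helper.py compose, based only on the API added above. The "my_tool" logger names, the log-file path, and the worker argument layout are illustrative assumptions, not code from this repository.

    import concurrent.futures
    import logging

    import helper.logging_helper as logging_helper

    # Module globals configure the shared log file and level once, before the
    # first setup_logging() call -- the same pattern ff_calculation.py uses.
    logging_helper.LOG_FILENAME = "workdir/my_tool.log"  # hypothetical path
    logging_helper.LOG_LEVEL = logging.INFO

    # The extractor receives the worker's argument tuple and returns the logger
    # name; everything logged inside the call is buffered and re-emitted as one
    # "START WORKER ... END WORKER" block when the function returns.
    @logging_helper.grouped_logs(lambda args: f"my_tool.{args[0]}")
    def worker(args):
        process, payload = args
        log = logging_helper.setup_logging(logger=logging.getLogger(f"my_tool.{process}"))
        log.info(f"processing {payload}")
        return payload * 2

    log = logging_helper.setup_logging(logger=logging.getLogger("my_tool"))

    # parallel_session() yields initializer/initargs for the pool: each child
    # process gets a QueueHandler on its root logger (worker_init), and a
    # QueueListener in the parent forwards the records to the real handlers.
    with logging_helper.LogContext(log).parallel_session() as pool_config:
        with concurrent.futures.ProcessPoolExecutor(max_workers=2, **pool_config) as executor:
            results = list(executor.map(worker, [("QCD", 1), ("Wjets", 2)]))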