diff --git a/.all-contributorsrc b/.all-contributorsrc index 09e5078cf..78194e308 100644 --- a/.all-contributorsrc +++ b/.all-contributorsrc @@ -71,7 +71,8 @@ "profile": "https://github.com/Bogdan-Wiederspan", "contributions": [ "code", - "test" + "test", + "review" ] }, { @@ -153,7 +154,8 @@ "avatar_url": "https://avatars.githubusercontent.com/u/99343616?v=4", "profile": "https://github.com/aalvesan", "contributions": [ - "code" + "code", + "review" ] }, { diff --git a/README.md b/README.md index 5c6368a59..43abbe91b 100644 --- a/README.md +++ b/README.md @@ -138,7 +138,7 @@ For a better overview of the tasks that are triggered by the commands below, che Daniel Savoiu
Daniel Savoiu

💻 👀 pkausw
pkausw

💻 👀 nprouvost
nprouvost

💻 ⚠️ - Bogdan-Wiederspan
Bogdan-Wiederspan

💻 ⚠️ + Bogdan-Wiederspan
Bogdan-Wiederspan

💻 ⚠️ 👀 Tobias Kramer
Tobias Kramer

💻 👀 @@ -151,7 +151,7 @@ For a better overview of the tasks that are triggered by the commands below, che JulesVandenbroeck
JulesVandenbroeck

💻 - Ana Andrade
Ana Andrade

💻 + Ana Andrade
Ana Andrade

💻 👀 philippgadow
philippgadow

💻 Lukas Schaller
Lukas Schaller

💻 diff --git a/analysis_templates/cms_minimal/law.cfg b/analysis_templates/cms_minimal/law.cfg index 06307d72f..d2db0c3aa 100644 --- a/analysis_templates/cms_minimal/law.cfg +++ b/analysis_templates/cms_minimal/law.cfg @@ -30,7 +30,7 @@ default_dataset: st_tchannel_t_4f_powheg calibration_modules: columnflow.calibration.cms.{jets,met,tau}, __cf_module_name__.calibration.example selection_modules: columnflow.selection.empty, columnflow.selection.cms.{json_filter,met_filters}, __cf_module_name__.selection.example reduction_modules: columnflow.reduction.default, __cf_module_name__.reduction.example -production_modules: columnflow.production.{categories,matching,normalization,processes}, columnflow.production.cms.{btag,electron,jet,matching,mc_weight,muon,pdf,pileup,scale,parton_shower,seeds}, __cf_module_name__.production.example +production_modules: columnflow.production.{categories,matching,normalization,processes}, columnflow.production.cms.{btag,electron,jet,matching,mc_weight,muon,pdf,pileup,scale,parton_shower,seeds,gen_particles}, __cf_module_name__.production.example categorization_modules: __cf_module_name__.categorization.example hist_production_modules: columnflow.histogramming.default, __cf_module_name__.histogramming.example ml_modules: columnflow.ml, __cf_module_name__.ml.example diff --git a/columnflow/calibration/cms/egamma.py b/columnflow/calibration/cms/egamma.py index fc31a289e..137735329 100644 --- a/columnflow/calibration/cms/egamma.py +++ b/columnflow/calibration/cms/egamma.py @@ -1,649 +1,245 @@ # coding: utf-8 """ -Egamma energy correction methods. -Source: https://twiki.cern.ch/twiki/bin/view/CMS/EgammSFandSSRun3#Scale_And_Smearings_Correctionli +CMS-specific calibrators applying electron and photon energy scale and smearing. + +1. Scale corrections are applied to data. +2. Resolution smearing is applied to simulation. +3. Both scale and resolution uncertainties are applied to simulation. + +Resources: + - https://twiki.cern.ch/twiki/bin/viewauth/CMS/EgammSFandSSRun3#Scale_And_Smearings_Correctionli + - https://egammapog.docs.cern.ch/Run3/SaS + - https://cms-analysis-corrections.docs.cern.ch/corrections_era/Run3-22CDSep23-Summer22-NanoAODv12/EGM/2025-10-22 """ from __future__ import annotations -import abc import functools +import dataclasses + import law -from dataclasses import dataclass, field from columnflow.calibration import Calibrator, calibrator from columnflow.calibration.util import ak_random from columnflow.util import maybe_import, load_correction_set, DotDict -from columnflow.columnar_util import set_ak_column, flat_np_view, ak_copy, optional_column +from columnflow.columnar_util import set_ak_column, full_like from columnflow.types import Any ak = maybe_import("awkward") np = maybe_import("numpy") +logger = law.logger.get_logger(__name__) + # helper set_ak_column_f32 = functools.partial(set_ak_column, value_type=np.float32) -@dataclass +@dataclasses.dataclass class EGammaCorrectionConfig: - correction_set: str - value_type: str - uncertainty_type: str - compound: bool = False - corrector_kwargs: dict[str, Any] = field(default_factory=dict) - - -class egamma_scale_corrector(Calibrator): - - with_uncertainties = True - """Switch to control whether uncertainties are calculated.""" - - @property - @abc.abstractmethod - def source_field(self) -> str: - """Fields required for the current calibrator.""" - ... 
- - @abc.abstractmethod - def get_correction_file(self, external_files: law.FileTargetCollection) -> law.LocalFileTarget: - """Function to retrieve the correction file from the external files. - - :param external_files: File target containing the files as requested - in the current config instance under ``config_inst.x.external_files`` - """ - ... - - @abc.abstractmethod - def get_scale_config(self) -> EGammaCorrectionConfig: - """Function to retrieve the configuration for the photon energy correction.""" - ... - - def call_func(self, events: ak.Array, **kwargs) -> ak.Array: - """ - Apply energy corrections to EGamma objects in the events array. There are two types of implementations: standard - and Et dependent. - For Run2 the standard implementation is used, while for Run3 the Et dependent is recommended by the EGammaPog: - https://twiki.cern.ch/twiki/bin/viewauth/CMS/EgammSFandSSRun3?rev=41 - The Et dependendent recipe follows the example given in: - https://gitlab.cern.ch/cms-nanoAOD/jsonpog-integration/-/blob/66f581d0549e8d67fc55420d8bba15c9369fff7c/examples/egmScaleAndSmearingExample.py - - Requires an external file in the config under ``electron_ss``. Example: - - .. code-block:: python - - cfg.x.external_files = DotDict.wrap({ - "electron_ss": "/afs/cern.ch/work/m/mrieger/public/mirrors/jsonpog-integration-120c4271/POG/EGM/2022_Summer22//electronSS_EtDependent.json.gz", # noqa - }) - - The pairs of correction set, value and uncertainty type names, and if a compound method is used should be configured using the :py:class:`EGammaCorrectionConfig` as an - auxiliary entry in the config: - - .. code-block:: python - - cfg.x.eec = EGammaCorrectionConfig( - correction_set="EGMScale_Compound_Ele_2022preEE", - value_type="scale", - uncertainty_type="escale", - compound=True, - ) - - Derivatives of this base class require additional member variables and functions: - - - *source_field*: The field name of the EGamma objects in the events array (i.e. `Electron` - or `Photon`). - - *get_correction_file*: Function to retrieve the correction file, e.g.from - the list, of external files in the current `config_inst`. - - *get_scale_config*: Function to retrieve the configuration for the energy correction. - This config must be an instance of - :py:class:`~columnflow.calibration.cms.egamma.EGammaCorrectionConfig`. - - If no raw pt (i.e., pt before any corrections) is available, use the nominal pt. The - correction tool only supports flat arrays, so inputs are converted to a flat numpy view - first. Corrections are always applied to the raw pt, which is important if more than one - correction is applied in a row. The final corrections must be applied to the current pt. - - If :py:attr:`with_uncertainties` is set to `True`, the scale uncertainties are calculated. - The scale uncertainties are only available for simulated data. - - :param events: The events array containing EGamma objects. - :return: The events array with applied scale corrections. - - :notes: - - Varied corrections are only applied to Monte Carlo (MC) data. - - EGamma energy correction is only applied to real data. - - Changes are applied to the views and directly propagate to the original awkward - arrays. - """ - # if no raw pt (i.e. 
pt for any corrections) is available, use the nominal pt - if "rawPt" not in events[self.source_field].fields: - events = set_ak_column_f32(events, f"{self.source_field}.rawPt", events[self.source_field].pt) - - # the correction tool only supports flat arrays, so convert inputs to flat np view first - # corrections are always applied to the raw pt - this is important if more than - # one correction is applied in a row - pt_eval = flat_np_view(events[self.source_field].rawPt, axis=1) - - # the final corrections must be applied to the current pt though - pt_application = flat_np_view(events[self.source_field].pt, axis=1) - - broadcasted_run = ak.broadcast_arrays(events[self.source_field].pt, events.run) - run = flat_np_view(broadcasted_run[1], axis=1) - gain = flat_np_view(events[self.source_field].seedGain, axis=1) - sceta = flat_np_view(events[self.source_field].superclusterEta, axis=1) - r9 = flat_np_view(events[self.source_field].r9, axis=1) - - # prepare arguments - # (energy is part of the LorentzVector behavior) - variable_map = { - "et": pt_eval, - "eta": sceta, - "gain": gain, - "r9": r9, - "run": run, - "seedGain": gain, - "pt": pt_eval, - "AbsScEta": np.abs(sceta), - "ScEta": sceta, - **self.scale_config.corrector_kwargs, - } - args = tuple( - variable_map[inp.name] for inp in self.scale_corrector.inputs - if inp.name in variable_map - ) - - # varied corrections are only applied to MC - if self.with_uncertainties and self.dataset_inst.is_mc: - scale_uncertainties = self.scale_corrector.evaluate(self.scale_config.uncertainty_type, *args) - scales_up = (1 + scale_uncertainties) - scales_down = (1 - scale_uncertainties) - - for (direction, scales) in [("up", scales_up), ("down", scales_down)]: - # copy pt and mass - pt_varied = ak_copy(events[self.source_field].pt) - pt_view = flat_np_view(pt_varied, axis=1) - - # apply the scale variation - pt_view *= scales - - # save columns - postfix = f"scale_{direction}" - events = set_ak_column_f32(events, f"{self.source_field}.pt_{postfix}", pt_varied) - - # apply the nominal correction - # note: changes are applied to the views and directly propagate to the original ak arrays - # and do not need to be inserted into the events chunk again - # EGamma energy correction is ONLY applied to DATA - if self.dataset_inst.is_data: - scales_nom = self.scale_corrector.evaluate(self.scale_config.value_type, *args) - pt_application *= scales_nom - - return events - - def init_func(self, **kwargs) -> None: - """Function to initialize the calibrator. - - Sets the required and produced columns for the calibrator. - """ - self.uses |= { - # nano columns - f"{self.source_field}.{{seedGain,pt,eta,phi,superclusterEta,r9}}", - "run", - optional_column(f"{self.source_field}.rawPt"), - } - self.produces |= { - f"{self.source_field}.pt", - optional_column(f"{self.source_field}.rawPt"), - } - - # if we do not calculate uncertainties, this module - # should only run on observed DATA - self.data_only = not self.with_uncertainties - - # add columns with unceratinties if requested - # photon scale _uncertainties_ are only available for MC - if self.with_uncertainties and self.dataset_inst.is_mc: - self.produces |= {f"{self.source_field}.pt_scale_{{up,down}}"} - - def requires_func(self, task: law.Task, reqs: dict[str, DotDict[str, Any]], **kwargs) -> None: - """Function to add necessary requirements. - - This function add the :py:class:`~columnflow.tasks.external.BundleExternalFiles` - task to the requirements. - - :param reqs: Dictionary of requirements. 
- """ - if "external_files" in reqs: - return - - from columnflow.tasks.external import BundleExternalFiles - reqs["external_files"] = BundleExternalFiles.req(task) - - def setup_func( - self, - task: law.Task, - reqs: dict[str, DotDict[str, Any]], - inputs: dict[str, Any], - reader_targets: law.util.InsertableDict, - **kwargs, - ) -> None: - """Setup function before event chunk loop. - - This function loads the correction file and sets up the correction tool. - Additionally, the *scale_config* is retrieved. - - :param reqs: Dictionary with resolved requirements. - :param inputs: Dictionary with inputs (not used). - :param reader_targets: Dictionary for optional additional columns to load - """ - self.scale_config = self.get_scale_config() - # create the egamma corrector - corr_file = self.get_correction_file(reqs["external_files"].files) - # init and extend the correction set - corr_set = load_correction_set(corr_file) - if self.scale_config.compound: - corr_set = corr_set.compound - self.scale_corrector = corr_set[self.scale_config.correction_set] - - -class egamma_resolution_corrector(Calibrator): - - with_uncertainties = True - """Switch to control whether uncertainties are calculated.""" - - # smearing of the energy resolution is only applied to MC - mc_only = True - """This calibrator is only applied to simulated data.""" - - deterministic_seed_index = -1 - """ use deterministic seeds for random smearing and - take the "index"-th random number per seed when not -1 """ + Container class to describe energy scaling and smearing configurations. Example: - @property - @abc.abstractmethod - def source_field(self) -> str: - """Fields required for the current calibrator.""" - ... - - @abc.abstractmethod - def get_correction_file(self, external_files: law.FileTargetCollection) -> law.LocalFile: - """Function to retrieve the correction file from the external files. - - :param external_files: File target containing the files as requested - in the current config instance under ``config_inst.x.external_files`` - """ - ... - - @abc.abstractmethod - def get_resolution_config(self) -> EGammaCorrectionConfig: - """Function to retrieve the configuration for the photon energy correction.""" - ... - - def call_func(self, events: ak.Array, **kwargs) -> ak.Array: - """ - Apply energy resolution corrections to EGamma objects in the events array. - - There are two types of implementations: standard and Et dependent. For Run2 the standard - implementation is used, while for Run3 the Et dependent is recommended by the EGammaPog: - https://twiki.cern.ch/twiki/bin/viewauth/CMS/EgammSFandSSRun3?rev=41 The Et dependendent - recipe follows the example given in: - https://gitlab.cern.ch/cms-nanoAOD/jsonpog-integration/-/blob/66f581d0549e8d67fc55420d8bba15c9369fff7c/examples/egmScaleAndSmearingExample.py - - Requires an external file in the config under ``electron_ss``. Example: - - .. code-block:: python - - cfg.x.external_files = DotDict.wrap({ - "electron_ss": "/afs/cern.ch/work/m/mrieger/public/mirrors/jsonpog-integration-120c4271/POG/EGM/2022_Summer22/electronSS_EtDependent.json.gz", # noqa - }) - - The pairs of correction set, value and uncertainty type names, and if a compound method is used should be configured using the :py:class:`EGammaCorrectionConfig` as an - auxiliary entry in the config: - - .. 
code-block:: python - - cfg.x.eec = EGammaCorrectionConfig( - correction_set="EGMSmearAndSyst_ElePTsplit_2022preEE", - value_type="smear", - uncertainty_type="esmear", - ) - - Derivatives of this base class require additional member variables and functions: - - - *source_field*: The field name of the EGamma objects in the events array (i.e. `Electron` or `Photon`). - - *get_correction_file*: Function to retrieve the correction file, e.g. - from the list of external files in the current `config_inst`. - - *get_resolution_config*: Function to retrieve the configuration for the energy resolution correction. - This config must be an instance of :py:class:`~columnflow.calibration.cms.egamma.EGammaCorrectionConfig`. - - If no raw pt (i.e., pt before any corrections) is available, use the nominal pt. - The correction tool only supports flat arrays, so inputs are converted to a flat numpy view first. - Corrections are always applied to the raw pt, which is important if more than one correction is applied in a - row. The final corrections must be applied to the current pt. - - If :py:attr:`with_uncertainties` is set to `True`, the resolution uncertainties are calculated. - - If :py:attr:`deterministic_seed_index` is set to a value greater than or equal to 0, deterministic seeds - are used for random smearing. The "index"-th random number per seed is taken for the nominal resolution - correction. The "index+1"-th random number per seed is taken for the up variation and the "index+2"-th random - number per seed is taken for the down variation. - - :param events: The events array containing EGamma objects. - :return: The events array with applied resolution corrections. - - :notes: - - Energy resolution correction are only to be applied to simulation. - - Changes are applied to the views and directly propagate to the original awkward arrays. - """ - - # if no raw pt (i.e. pt for any corrections) is available, use the nominal pt - if "rawPt" not in events[self.source_field].fields: - events = set_ak_column_f32(events, f"{self.source_field}.rawPt", ak_copy(events[self.source_field].pt)) - - # the correction tool only supports flat arrays, so convert inputs to flat np view first - sceta = flat_np_view(events[self.source_field].superclusterEta, axis=1) - r9 = flat_np_view(events[self.source_field].r9, axis=1) - flat_seeds = flat_np_view(events[self.source_field].deterministic_seed, axis=1) - pt = flat_np_view(events[self.source_field].rawPt, axis=1) - - # prepare arguments - variable_map = { - "AbsScEta": np.abs(sceta), - "ScEta": sceta, # 2024 version - "eta": sceta, - "r9": r9, - "pt": pt, - **self.resolution_cfg.corrector_kwargs, - } + .. code-block:: python - args = tuple( - variable_map[inp.name] - for inp in self.resolution_corrector.inputs - if inp.name in variable_map + cfg.x.ess = EGammaCorrectionConfig( + scale_correction_set="Scale", + scale_compound=True, + smear_syst_correction_set="SmearAndSyst", + systs=["scale_down", "scale_up", "smear_down", "smear_up"], ) - - # calculate the smearing scale - # as mentioned in the example above, allows us to apply them directly to the MC simulation. 
- rho = self.resolution_corrector.evaluate(self.resolution_cfg.value_type, *args) - - # varied corrections - if self.with_uncertainties and self.dataset_inst.is_mc: - rho_unc = self.resolution_corrector.evaluate(self.resolution_cfg.uncertainty_type, *args) - random_normal_number = functools.partial(ak_random, 0, 1) - smearing_func = lambda rng_array, variation: rng_array * variation + 1 - - smearing_up = ( - smearing_func( - random_normal_number(flat_seeds, rand_func=self.deterministic_normal_up), - rho + rho_unc, - ) - if self.deterministic_seed_index >= 0 - else smearing_func( - random_normal_number(rand_func=np.random.Generator(np.random.SFC64(events.event.to_list())).normal), - rho + rho_unc, - ) - ) - - smearing_down = ( - smearing_func( - random_normal_number(flat_seeds, rand_func=self.deterministic_normal_down), - rho - rho_unc, - ) - if self.deterministic_seed_index >= 0 - else smearing_func( - random_normal_number(rand_func=np.random.Generator(np.random.SFC64(events.event.to_list())).normal), - rho - rho_unc, - ) - ) - - for (direction, smear) in [("up", smearing_up), ("down", smearing_down)]: - # copy pt and mass - pt_varied = ak_copy(events[self.source_field].pt) - pt_view = flat_np_view(pt_varied, axis=1) - - # apply the scale variation - # cast ak to numpy array for convenient usage of *= - pt_view *= smear.to_numpy() - - # save columns - postfix = f"res_{direction}" - events = set_ak_column_f32(events, f"{self.source_field}.pt_{postfix}", pt_varied) - - # apply the nominal correction - # note: changes are applied to the views and directly propagate to the original ak arrays - # and do not need to be inserted into the events chunk again - # EGamma energy resolution correction is ONLY applied to MC - if self.dataset_inst.is_mc: - smearing = ( - ak_random(1, rho, flat_seeds, rand_func=self.deterministic_normal) - if self.deterministic_seed_index >= 0 - else ak_random(1, rho, rand_func=np.random.Generator( - np.random.SFC64(events.event.to_list())).normal, - ) - ) - # the final corrections must be applied to the current pt though - pt = flat_np_view(events[self.source_field].pt, axis=1) - pt *= smearing.to_numpy() - - return events - - def init_func(self, **kwargs) -> None: - """Function to initialize the calibrator. - - Sets the required and produced columns for the calibrator. - """ - self.uses |= { - # nano columns - f"{self.source_field}.{{pt,eta,phi,superclusterEta,r9}}", - optional_column(f"{self.source_field}.rawPt"), - } - self.produces |= { - f"{self.source_field}.pt", - optional_column(f"{self.source_field}.rawPt"), - } - - # add columns with unceratinties if requested - if self.with_uncertainties and self.dataset_inst.is_mc: - self.produces |= {f"{self.source_field}.pt_res_{{up,down}}"} - - def requires_func(self, task: law.Task, reqs: dict[str, DotDict[str, Any]], **kwargs) -> None: - """Function to add necessary requirements. - - This function add the :py:class:`~columnflow.tasks.external.BundleExternalFiles` - task to the requirements. - - :param reqs: Dictionary of requirements. - """ - if "external_files" in reqs: - return - - from columnflow.tasks.external import BundleExternalFiles - reqs["external_files"] = BundleExternalFiles.req(task) - - def setup_func( - self, - task: law.Task, - reqs: dict[str, DotDict[str, Any]], - inputs: dict[str, Any], - reader_targets: law.util.InsertableDict, - **kwargs, - ) -> None: - """Setup function before event chunk loop. - - This function loads the correction file and sets up the correction tool. 
- Additionally, the *resolution_config* is retrieved. - If :py:attr:`deterministic_seed_index` is set to a value greater than or equal to 0, - random generator based on object-specific random seeds are setup. - - :param reqs: Dictionary with resolved requirements. - :param inputs: Dictionary with inputs (not used). - :param reader_targets: Dictionary for optional additional columns to load - (not used). - """ - self.resolution_cfg = self.get_resolution_config() - # create the egamma corrector - corr_file = self.get_correction_file(reqs["external_files"].files) - corr_set = load_correction_set(corr_file) - if self.resolution_cfg.compound: - corr_set = corr_set.compound - self.resolution_corrector = corr_set[self.resolution_cfg.correction_set] - - # use deterministic seeds for random smearing if requested - if self.deterministic_seed_index >= 0: - idx = self.deterministic_seed_index - bit_generator = np.random.SFC64 - - def deterministic_normal(loc, scale, seed, idx_offset=0): - return np.asarray([ - np.random.Generator(bit_generator(_seed)).normal(_loc, _scale, size=idx + 1 + idx_offset)[-1] - for _loc, _scale, _seed in zip(loc, scale, seed) - ]) - self.deterministic_normal = functools.partial(deterministic_normal, idx_offset=0) - self.deterministic_normal_up = functools.partial(deterministic_normal, idx_offset=1) - self.deterministic_normal_down = functools.partial(deterministic_normal, idx_offset=2) - - -pec = egamma_scale_corrector.derive( - "pec", cls_dict={ - "source_field": "Photon", - "with_uncertainties": True, - "get_correction_file": (lambda self, external_files: external_files.photon_ss), - "get_scale_config": (lambda self: self.config_inst.x.pec), - }, -) - -per = egamma_resolution_corrector.derive( - "per", cls_dict={ - "source_field": "Photon", - "with_uncertainties": True, - # function to determine the correction file - "get_correction_file": (lambda self, external_files: external_files.photon_ss), - # function to determine the tec config - "get_resolution_config": (lambda self: self.config_inst.x.per), - }, -) + """ + scale_correction_set: str + smear_syst_correction_set: str + scale_compound: bool = False + smear_syst_compound: bool = False + systs: list[str] = dataclasses.field(default_factory=list) + corrector_kwargs: dict[str, Any] = dataclasses.field(default_factory=dict) @calibrator( - uses={per, pec}, - produces={per, pec}, + exposed=False, + # used and produced columns are defined dynamically in init function with_uncertainties=True, - get_correction_file=None, - get_scale_config=None, - get_resolution_config=None, - deterministic_seed_index=-1, + collection_name=None, # to be set in derived classes to "Electron" or "Photon" + get_scale_smear_config=None, # to be set in derived classes + get_correction_file=None, # to be set in derived classes + deterministic_seed_index=-1, # use deterministic seeds for random smearing when >=0 + store_original=False, # if original columns (pt, energyErr) should be stored as "*_uncorrected" ) -def photons(self, events: ak.Array, **kwargs) -> ak.Array: - """ - Calibrator for photons. This calibrator runs the energy scale and resolution calibrators - for photons. - - Careful! Always apply resolution before scale corrections for MC. 
- """ +def _egamma_scale_smear(self: Calibrator, events: ak.Array, **kwargs) -> ak.Array: + # gather inputs + coll = events[self.collection_name] + variable_map = { + "run": events.run, + "pt": coll.pt, + "ScEta": coll.superclusterEta, + "r9": coll.r9, + "seedGain": coll.seedGain, + **self.cfg.corrector_kwargs, + } + def get_inputs(corrector, **additional_variables): + _variable_map = variable_map | additional_variables + return (_variable_map[inp.name] for inp in corrector.inputs if inp.name in _variable_map) + + # apply scale correction to data + if self.dataset_inst.is_data: + # store uncorrected values before correcting + if self.store_original: + events = set_ak_column(events, f"{self.collection_name}.pt_scale_uncorrected", coll.pt) + events = set_ak_column(events, f"{self.collection_name}.energyErr_scale_uncorrected", coll.energyErr) + + # get scaled pt + scale = self.scale_corrector.evaluate("scale", *get_inputs(self.scale_corrector)) + pt_scaled = coll.pt * scale + + # get scaled energy error + smear = self.smear_syst_corrector.evaluate("smear", *get_inputs(self.smear_syst_corrector, pt=pt_scaled)) + energy_err_scaled = (((coll.energyErr)**2 + (coll.energy * smear)**2) * scale)**0.5 + + # store columns + events = set_ak_column_f32(events, f"{self.collection_name}.pt", pt_scaled) + events = set_ak_column_f32(events, f"{self.collection_name}.energyErr", energy_err_scaled) + + # apply smearing to MC if self.dataset_inst.is_mc: - events = self[per](events, **kwargs) - - if self.with_uncertainties or self.dataset_inst.is_data: - events = self[pec](events, **kwargs) + # store uncorrected values before correcting + if self.store_original: + events = set_ak_column(events, f"{self.collection_name}.pt_smear_uncorrected", coll.pt) + events = set_ak_column(events, f"{self.collection_name}.energyErr_smear_uncorrected", coll.energyErr) + + # helper to compute random variables in the shape of the collection + def get_rnd(syst): + args = (full_like(coll.pt, 0.0), full_like(coll.pt, 1.0)) + if self.use_deterministic_seeds: + args += (coll.deterministic_seed,) + rand_func = self.deterministic_normal[syst] + else: + # TODO: bit generator could be configurable + rand_func = np.random.Generator(np.random.SFC64((events.event + sum(map(ord, syst))).to_list())).normal + return ak_random(*args, rand_func=rand_func) + + # helper to compute smeared pt and energy error values given a syst + def apply_smearing(syst): + # get smeared pt + smear = self.smear_syst_corrector.evaluate(syst, *get_inputs(self.smear_syst_corrector)) + smear_factor = 1.0 + smear * get_rnd(syst) + pt_smeared = coll.pt * smear_factor + # get smeared energy error + energy_err_smeared = (((coll.energyErr)**2 + (coll.energy * smear)**2) * smear_factor)**0.5 + # return both + return pt_smeared, energy_err_smeared + + # compute and store columns + pt_smeared, energy_err_smeared = apply_smearing("smear") + events = set_ak_column_f32(events, f"{self.collection_name}.pt", pt_smeared) + events = set_ak_column_f32(events, f"{self.collection_name}.energyErr", energy_err_smeared) + + # apply scale and smearing uncertainties to MC + if self.with_uncertainties and self.cfg.systs: + for syst in self.cfg.systs: + # exact behavior depends on syst itself + if syst in {"scale_up", "scale_down"}: + # compute scale with smeared pt and apply muliplicatively to smeared values + scale = self.smear_syst_corrector.evaluate(syst, *get_inputs(self.smear_syst_corrector, pt=pt_smeared)) # noqa: E501 + events = set_ak_column_f32(events, 
f"{self.collection_name}.pt_{syst}", pt_smeared * scale) + events = set_ak_column_f32(events, f"{self.collection_name}.energyErr_{syst}", energy_err_smeared * scale) # noqa: E501 + + elif syst in {"smear_up", "smear_down"}: + # compute smearing variations on original variables with same method as above + pt_smeared_syst, energy_err_smeared_syst = apply_smearing(syst) + events = set_ak_column_f32(events, f"{self.collection_name}.pt_{syst}", pt_smeared_syst) + events = set_ak_column_f32(events, f"{self.collection_name}.energyErr_{syst}", energy_err_smeared_syst) # noqa: E501 + + else: + logger.error(f"{self.cls_name} calibrator received unknown systematic '{syst}', skipping") return events -@photons.pre_init -def photons_pre_init(self, **kwargs) -> None: - # forward argument to the producers - if pec not in self.deps_kwargs: - self.deps_kwargs[pec] = dict() - if per not in self.deps_kwargs: - self.deps_kwargs[per] = dict() - self.deps_kwargs[pec]["with_uncertainties"] = self.with_uncertainties - self.deps_kwargs[per]["with_uncertainties"] = self.with_uncertainties - - self.deps_kwargs[per]["deterministic_seed_index"] = self.deterministic_seed_index - if self.get_correction_file is not None: - self.deps_kwargs[pec]["get_correction_file"] = self.get_correction_file - self.deps_kwargs[per]["get_correction_file"] = self.get_correction_file - - if self.get_resolution_config is not None: - self.deps_kwargs[per]["get_resolution_config"] = self.get_resolution_config - if self.get_scale_config is not None: - self.deps_kwargs[pec]["get_scale_config"] = self.get_scale_config - - -photons_nominal = photons.derive("photons_nominal", cls_dict={"with_uncertainties": False}) - - -eer = egamma_resolution_corrector.derive( - "eer", cls_dict={ - "source_field": "Electron", - # calculation of superclusterEta for electrons requires the deltaEtaSC - "uses": {"Electron.deltaEtaSC"}, - "with_uncertainties": True, - # function to determine the correction file - "get_correction_file": (lambda self, external_files: external_files.electron_ss), - # function to determine the tec config - "get_resolution_config": (lambda self: self.config_inst.x.eer), - }, -) +@_egamma_scale_smear.init +def _egamma_scale_smear_init(self: Calibrator, **kwargs) -> None: + # store the config + self.cfg = self.get_scale_smear_config() + + # update used columns + self.uses |= {"run", f"{self.collection_name}.{{pt,eta,phi,mass,energyErr,superclusterEta,r9,seedGain}}"} + + # update produced columns + if self.dataset_inst.is_data: + self.produces |= {f"{self.collection_name}.{{pt,energyErr}}"} + if self.store_original: + self.produces |= {f"{self.collection_name}.{{pt,energyErr}}_scale_uncorrected"} + else: + self.produces |= {f"{self.collection_name}.{{pt,energyErr}}"} + if self.store_original: + self.produces |= {f"{self.collection_name}.{{pt,energyErr}}_smear_uncorrected"} + if self.with_uncertainties: + for syst in self.cfg.systs: + self.produces |= {f"{self.collection_name}.{{pt,energyErr}}_{syst}"} + + +@_egamma_scale_smear.requires +def _egamma_scale_smear_requires(self, task: law.Task, reqs: dict[str, DotDict[str, Any]], **kwargs) -> None: + if "external_files" in reqs: + return + + from columnflow.tasks.external import BundleExternalFiles + reqs["external_files"] = BundleExternalFiles.req(task) + + +@_egamma_scale_smear.setup +def _egamma_scale_smear_setup( + self, + task: law.Task, + reqs: dict[str, DotDict[str, Any]], + inputs: dict[str, Any], + reader_targets: law.util.InsertableDict, + **kwargs, +) -> None: + # get and load the 
correction file + corr_file = self.get_correction_file(reqs["external_files"].files) + corr_set = load_correction_set(corr_file) + + # setup the correctors + get_set = lambda set_name, compound: (corr_set.compound if compound else corr_set)[set_name] + self.scale_corrector = get_set(self.cfg.scale_correction_set, self.cfg.scale_compound) + self.smear_syst_corrector = get_set(self.cfg.smear_syst_correction_set, self.cfg.smear_syst_compound) + + # use deterministic seeds for random smearing if requested + self.use_deterministic_seeds = self.deterministic_seed_index >= 0 + if self.use_deterministic_seeds: + idx = self.deterministic_seed_index + bit_generator = np.random.SFC64 + + def _deterministic_normal(loc, scale, seed, idx_offset=0): + return np.asarray([ + np.random.Generator(bit_generator(_seed)).normal(_loc, _scale, size=idx + 1 + idx_offset)[-1] + for _loc, _scale, _seed in zip(loc, scale, seed) + ]) + + self.deterministic_normal = { + "smear": functools.partial(_deterministic_normal, idx_offset=0), + "smear_up": functools.partial(_deterministic_normal, idx_offset=1), + "smear_down": functools.partial(_deterministic_normal, idx_offset=2), + } + -eec = egamma_scale_corrector.derive( - "eec", cls_dict={ - "source_field": "Electron", - # calculation of superclusterEta for electrons requires the deltaEtaSC - "uses": {"Electron.deltaEtaSC"}, - "with_uncertainties": True, - "get_correction_file": (lambda self, external_files: external_files.electron_ss), - "get_scale_config": (lambda self: self.config_inst.x.eec), +electron_scale_smear = _egamma_scale_smear.derive( + "electron_scale_smear", + cls_dict={ + "collection_name": "Electron", + "get_scale_smear_config": lambda self: self.config_inst.x.ess, + "get_correction_file": lambda self, external_files: external_files.electron_ss, }, ) - -@calibrator( - uses={eer, eec}, - produces={eer, eec}, - with_uncertainties=True, - get_correction_file=None, - get_scale_config=None, - get_resolution_config=None, - deterministic_seed_index=-1, +photon_scale_smear = _egamma_scale_smear.derive( + "photon_scale_smear", + cls_dict={ + "collection_name": "Photon", + "get_scale_smear_config": lambda self: self.config_inst.x.gss, + "get_correction_file": lambda self, external_files: external_files.photon_ss, + }, ) -def electrons(self, events: ak.Array, **kwargs) -> ak.Array: - """ - Calibrator for electrons. This calibrator runs the energy scale and resolution calibrators - for electrons. - - Careful! Always apply resolution before scale corrections for MC. 
- """ - if self.dataset_inst.is_mc: - events = self[eer](events, **kwargs) - - if self.with_uncertainties or self.dataset_inst.is_data: - events = self[eec](events, **kwargs) - - return events - - -@electrons.pre_init -def electrons_pre_init(self, **kwargs) -> None: - # forward argument to the producers - if eec not in self.deps_kwargs: - self.deps_kwargs[eec] = dict() - if eer not in self.deps_kwargs: - self.deps_kwargs[eer] = dict() - self.deps_kwargs[eec]["with_uncertainties"] = self.with_uncertainties - self.deps_kwargs[eer]["with_uncertainties"] = self.with_uncertainties - - self.deps_kwargs[eer]["deterministic_seed_index"] = self.deterministic_seed_index - if self.get_correction_file is not None: - self.deps_kwargs[eec]["get_correction_file"] = self.get_correction_file - self.deps_kwargs[eer]["get_correction_file"] = self.get_correction_file - - if self.get_resolution_config is not None: - self.deps_kwargs[eer]["get_resolution_config"] = self.get_resolution_config - if self.get_scale_config is not None: - self.deps_kwargs[eec]["get_scale_config"] = self.get_scale_config - - -electrons_nominal = photons.derive("electrons_nominal", cls_dict={"with_uncertainties": False}) diff --git a/columnflow/calibration/cms/tau.py b/columnflow/calibration/cms/tau.py index 4cd4e7081..69e5a6760 100644 --- a/columnflow/calibration/cms/tau.py +++ b/columnflow/calibration/cms/tau.py @@ -263,7 +263,7 @@ def tec_setup( self.tec_corrector = load_correction_set(tau_file)[self.tec_cfg.correction_set] # check versions - assert self.tec_corrector.version in [0, 1] + assert self.tec_corrector.version in {0, 1, 2} tec_nominal = tec.derive("tec_nominal", cls_dict={"with_uncertainties": False}) diff --git a/columnflow/cms_util.py b/columnflow/cms_util.py new file mode 100644 index 000000000..2e283009f --- /dev/null +++ b/columnflow/cms_util.py @@ -0,0 +1,201 @@ +# coding: utf-8 + +""" +Collection of CMS specific helpers and utilities. +""" + +from __future__ import annotations + +__all__ = [] + +import os +import re +import copy +import pathlib +import dataclasses + +from columnflow.types import ClassVar, Generator + + +#: Default root path to CAT metadata. +cat_metadata_root = "/cvmfs/cms-griddata.cern.ch/cat/metadata" + + +@dataclasses.dataclass +class CATSnapshot: + """ + Dataclass to wrap YYYY-MM-DD stype timestamps of CAT metadata per POG stored in + "/cvmfs/cms-griddata.cern.ch/cat/metadata". No format parsing or validation is done, leaving responsibility to the + user. + """ + btv: str = "" + dc: str = "" + egm: str = "" + jme: str = "" + lum: str = "" + muo: str = "" + tau: str = "" + + def items(self) -> Generator[tuple[str, str], None, None]: + return ((k, getattr(self, k)) for k in self.__dataclass_fields__.keys()) + + +@dataclasses.dataclass +class CATInfo: + """ + Dataclass to describe and wrap information about a specific CAT-defined metadata era. + + .. 
code-block:: python + + CATInfo( + run=3, + era="22CDSep23-Summer22", + vnano=12, + snapshot=CATSnapshot( + btv="2025-08-20", + dc="2025-07-25", + egm="2025-04-15", + jme="2025-09-23", + lum="2024-01-31", + muo="2025-08-14", + tau="2025-10-01", + ), + # pog-specific settings + pog_directories={"dc": "Collisions22"}, + ) + """ + run: int + era: str + vnano: int + snapshot: CATSnapshot + # optional POG-specific overrides + pog_eras: dict[str, str] = dataclasses.field(default_factory=dict) + pog_directories: dict[str, str] = dataclasses.field(default_factory=dict) + + metadata_root: ClassVar[str] = cat_metadata_root + + def get_era_directory(self, pog: str = "") -> str: + """ + Returns the era directory name for a given *pog*. + + :param pog: The POG to get the era for. Leave empty if the common POG-unspecific directory name should be used. + """ + pog = pog.lower() + + # use specific directory if defined + if pog in self.pog_directories: + return self.pog_directories[pog] + + # build common directory name from run, era, and vnano + era = self.pog_eras.get(pog.lower(), self.era) if pog else self.era + return f"Run{self.run}-{era}-NanoAODv{self.vnano}" + + def get_file(self, pog: str, *paths: str | pathlib.Path) -> str: + """ + Returns the full path to a specific file or directory defined by *paths* in the CAT metadata structure for a + given *pog*. + """ + return os.path.join( + self.metadata_root, + pog.upper(), + self.get_era_directory(pog), + getattr(self.snapshot, pog.lower()), + *(str(p).strip("/") for p in paths), + ) + + +@dataclasses.dataclass +class CMSDatasetInfo: + """ + Container to wrap a CMS dataset given by its *key* with access to its components. The key should be in the format + ``//--/AOD``. + + .. code-block:: python + + d = CMSDatasetInfo.from_key("/TTtoLNu2Q_TuneCP5_13p6TeV_powheg-pythia8/RunIII2024Summer24MiniAODv6-150X_mcRun3_2024_realistic_v2-v2/MINIAODSIM") # noqa + print(d.name) # TTtoLNu2Q_TuneCP5_13p6TeV_powheg-pythia8 + print(d.campaign) # RunIII2024Summer24MiniAODv6 + print(d.campaign_version) # 150X_mcRun3_2024_realistic_v2 + print(d.dataset_version) # v2 + print(d.tier) # mini (lower case) + print(d.mc) # True + print(d.data) # False + print(d.kind) # mc + """ + name: str + campaign: str + campaign_version: str + dataset_version: str # this is usually the GT for MC + tier: str + mc: bool + + @classmethod + def from_key(cls, key: str) -> CMSDatasetInfo: + """ + Takes a dataset *key*, splits it into its components, and returns a new :py:class:`CMSDatasetInfo` instance. + + :param key: The dataset key: + :return: A new instance of :py:class:`CMSDatasetInfo`. 
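        For illustration, a hedged example of the round trip back to the key format and the derived ``/store`` path
        (reusing the dataset key from the class docstring above; ``key`` and ``store_path`` are the properties defined
        further below):

        .. code-block:: python

            d = CMSDatasetInfo.from_key(
                "/TTtoLNu2Q_TuneCP5_13p6TeV_powheg-pythia8"
                "/RunIII2024Summer24MiniAODv6-150X_mcRun3_2024_realistic_v2-v2"
                "/MINIAODSIM",
            )

            # the parsed components transform back into the original key
            assert d.key == (
                "/TTtoLNu2Q_TuneCP5_13p6TeV_powheg-pythia8"
                "/RunIII2024Summer24MiniAODv6-150X_mcRun3_2024_realistic_v2-v2"
                "/MINIAODSIM"
            )

            # and compose the usual /store path
            print(d.store_path)
            # /store/mc/RunIII2024Summer24MiniAODv6/TTtoLNu2Q_TuneCP5_13p6TeV_powheg-pythia8/MINIAODSIM/150X_mcRun3_2024_realistic_v2-v2  # noqa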
+ """ + # split + if not (m := re.match(r"^/([^/]+)/([^/-]+)-([^/-]+)-([^/-]+)/([^/-]+)AOD(SIM)?$", key)): + raise ValueError(f"invalid dataset key '{key}'") + + # create instance + return cls( + name=m.group(1), + campaign=m.group(2), + campaign_version=m.group(3), + dataset_version=m.group(4), + tier=m.group(5).lower(), + mc=m.group(6) == "SIM", + ) + + @property + def key(self) -> str: + # transform back to key format + return ( + f"/{self.name}" + f"/{self.campaign}-{self.campaign_version}-{self.dataset_version}" + f"/{self.tier.upper()}AOD{'SIM' if self.mc else ''}" + ) + + @property + def data(self) -> bool: + return not bool(self.mc) + + @data.setter + def data(self, value: bool) -> None: + self.mc = not bool(value) + + @property + def kind(self) -> str: + return "mc" if self.mc else "data" + + @kind.setter + def kind(self, value: str) -> None: + if (_value := str(value).lower()) not in {"mc", "data"}: + raise ValueError(f"invalid kind '{value}', expected 'mc' or 'data'") + self.mc = _value == "mc" + + @property + def store_path(self) -> str: + return ( + "/store" + f"/{self.kind}" + f"/{self.campaign}" + f"/{self.name}" + f"/{self.tier.upper()}AOD{'SIM' if self.mc else ''}" + f"/{self.campaign_version}-{self.dataset_version}" + ) + + def copy(self, **kwargs) -> CMSDatasetInfo: + """ + Creates a copy of this instance, allowing to override specific attributes via *kwargs*. + + :param kwargs: Attributes to override in the copy. + :return: A new instance of :py:class:`CMSDatasetInfo`. + """ + attrs = copy.deepcopy(self.__dict__) + attrs.update(kwargs) + return self.__class__(**attrs) diff --git a/columnflow/config_util.py b/columnflow/config_util.py index 3875ea502..0958e0ec7 100644 --- a/columnflow/config_util.py +++ b/columnflow/config_util.py @@ -333,16 +333,27 @@ def get_shift_from_configs(configs: list[od.Config], shift: str | od.Shift, sile def get_shifts_from_sources(config: od.Config, *shift_sources: Sequence[str]) -> list[od.Shift]: """ - Takes a *config* object and returns a list of shift instances for both directions given a - sequence *shift_sources*. + Takes a *config* object and returns a list of shift instances for both directions given a sequence of + *shift_sources*. Each source should be the name of a shift source (no direction suffix) or a pattern. + + :param config: :py:class:`order.Config` object from which to retrieve the shifts. + :param shift_sources: Sequence of shift source names or patterns. + :return: List of :py:class:`order.Shift` instances obtained from the given sources. 
""" - return sum( - ( - [config.get_shift(f"{s}_{od.Shift.UP}"), config.get_shift(f"{s}_{od.Shift.DOWN}")] - for s in shift_sources - ), - [], - ) + # since each passed source can be a pattern, all existing sources need to be checked + # however, the order should be preserved, so loop through each pattern and check for matching sources + existing_sources = {shift.source for shift in config.shifts} + found_sources = set() + shifts = [] + for pattern in shift_sources: + for source in existing_sources: + if source not in found_sources and law.util.multi_match(source, pattern): + found_sources.add(source) + shifts += [ + config.get_shift(f"{source}_{od.Shift.UP}"), + config.get_shift(f"{source}_{od.Shift.DOWN}"), + ] + return shifts def group_shifts( diff --git a/columnflow/hist_util.py b/columnflow/hist_util.py index f579c0af5..1a82c8617 100644 --- a/columnflow/hist_util.py +++ b/columnflow/hist_util.py @@ -14,7 +14,7 @@ from columnflow.columnar_util import flat_np_view from columnflow.util import maybe_import -from columnflow.types import TYPE_CHECKING, Any +from columnflow.types import TYPE_CHECKING, Any, Sequence np = maybe_import("numpy") ak = maybe_import("awkward") @@ -306,3 +306,37 @@ def add_missing_shifts( h.fill(*dummy_fill, weight=0) # TODO: this might skip overflow and underflow bins h[{str_axis: hist.loc(missing_shift)}] = nominal.view() + + +def sum_hists(hists: Sequence[hist.Hist]) -> hist.Hist: + """ + Sums a sequence of histograms into a new histogram. In case axis labels differ, which typically leads to errors + ("axes not mergable"), the labels of the first histogram are used. + + :param hists: The histograms to sum. + :return: The summed histogram. + """ + hists = list(hists) + if not hists: + raise ValueError("no histograms given for summation") + + # copy the first histogram + h_sum = hists[0].copy() + if len(hists) == 1: + return h_sum + + # store labels of first histogram + axis_labels = {ax.name: ax.label for ax in h_sum.axes} + + for h in hists[1:]: + # align axis labels if needed, only copy if necessary + h_aligned_labels = None + for ax in h.axes: + if ax.name not in axis_labels or ax.label == axis_labels[ax.name]: + continue + if h_aligned_labels is None: + h_aligned_labels = h.copy() + h_aligned_labels.axes[ax.name].label = axis_labels[ax.name] + h_sum = h_sum + (h if h_aligned_labels is None else h_aligned_labels) + + return h_sum diff --git a/columnflow/inference/cms/datacard.py b/columnflow/inference/cms/datacard.py index bca00f5fd..394960c6a 100644 --- a/columnflow/inference/cms/datacard.py +++ b/columnflow/inference/cms/datacard.py @@ -13,6 +13,7 @@ from columnflow import __version__ as cf_version from columnflow.inference import InferenceModel, ParameterType, ParameterTransformation, FlowStrategy +from columnflow.hist_util import sum_hists from columnflow.util import DotDict, maybe_import, real_path, ensure_dir, safe_div, maybe_int from columnflow.types import TYPE_CHECKING, Sequence, Any, Union, Hashable @@ -616,7 +617,7 @@ def fill_empty(cat_obj, h): continue # helper to sum over them for a given shift key and an optional fallback - def sum_hists(key: Hashable, fallback_key: Hashable | None = None) -> hist.Hist: + def get_hist_sum(key: Hashable, fallback_key: Hashable | None = None) -> hist.Hist: def get(hd: dict[Hashable, hist.Hist]) -> hist.Hist: if key in hd: return hd[key] @@ -625,7 +626,7 @@ def get(hd: dict[Hashable, hist.Hist]) -> hist.Hist: raise Exception( f"'{key}' shape for process '{proc_name}' in category '{cat_name}' misconfigured: {hd}", ) 
- return sum(map(get, hists[1:]), get(hists[0]).copy()) + return sum_hists(map(get, hists)) # helper to extract sum of hists, apply scale, handle flow and fill empty bins def load( @@ -634,7 +635,7 @@ def load( fallback_key: Hashable | None = None, scale: float = 1.0, ) -> hist.Hist: - h = sum_hists(hist_key, fallback_key) * scale + h = get_hist_sum(hist_key, fallback_key) * scale handle_flow(cat_obj, h, hist_name) fill_empty(cat_obj, h) return h @@ -826,7 +827,7 @@ def load( if not h_data: proc_str = ",".join(map(str, cat_obj.data_from_processes)) raise Exception(f"none of requested processes '{proc_str}' found to create fake data") - h_data = sum(h_data[1:], h_data[0].copy()) + h_data = sum_hists(h_data) data_name = data_pattern.format(category=cat_name) fill_empty(cat_obj, h_data) handle_flow(cat_obj, h_data, data_name) @@ -845,7 +846,7 @@ def load( h_data.append(proc_hists["data"][config_name]["nominal"]) # simply save the data histogram that was already built from the requested datasets - h_data = sum(h_data[1:], h_data[0].copy()) + h_data = sum_hists(h_data) data_name = data_pattern.format(category=cat_name) handle_flow(cat_obj, h_data, data_name) out_file[data_name] = h_data diff --git a/columnflow/plotting/plot_all.py b/columnflow/plotting/plot_all.py index 3a424bf30..ef93d9566 100644 --- a/columnflow/plotting/plot_all.py +++ b/columnflow/plotting/plot_all.py @@ -365,13 +365,17 @@ def plot_all( rax = None grid_spec = {"left": 0.15, "right": 0.95, "top": 0.95, "bottom": 0.1} grid_spec |= style_config.get("gridspec_cfg", {}) + + # Get figure size from style_config, with default values + subplots_cfg = style_config.get("subplots_cfg", {}) + if not skip_ratio: grid_spec = {"height_ratios": [3, 1], "hspace": 0, **grid_spec} - fig, axs = plt.subplots(2, 1, gridspec_kw=grid_spec, sharex=True) + fig, axs = plt.subplots(2, 1, gridspec_kw=grid_spec, sharex=True, **subplots_cfg) (ax, rax) = axs else: grid_spec.pop("height_ratios", None) - fig, ax = plt.subplots(gridspec_kw=grid_spec) + fig, ax = plt.subplots(gridspec_kw=grid_spec, **subplots_cfg) axs = (ax,) # invoke all plots methods diff --git a/columnflow/plotting/plot_functions_1d.py b/columnflow/plotting/plot_functions_1d.py index 69e26562e..34b6d02a7 100644 --- a/columnflow/plotting/plot_functions_1d.py +++ b/columnflow/plotting/plot_functions_1d.py @@ -30,7 +30,7 @@ remove_negative_contributions, join_labels, ) -from columnflow.hist_util import add_missing_shifts +from columnflow.hist_util import add_missing_shifts, sum_hists from columnflow.types import TYPE_CHECKING, Iterable np = maybe_import("numpy") @@ -76,7 +76,7 @@ def plot_variable_stack( if len(shift_insts) == 1: # when there is exactly one shift bin, we can remove the shift axis - hists = remove_residual_axis(hists, "shift", select_value=shift_insts[0].name) + hists = remove_residual_axis(hists, "shift") else: # remove shift axis of histograms that are not to be stacked unstacked_hists = { @@ -265,7 +265,7 @@ def plot_shifted_variable( add_missing_shifts(h, all_shifts, str_axis="shift", nominal_bin="nominal") # create the sum of histograms over all processes - h_sum = sum(list(hists.values())[1:], list(hists.values())[0].copy()) + h_sum = sum_hists(hists.values()) # setup plotting configs plot_config = {} diff --git a/columnflow/plotting/plot_functions_2d.py b/columnflow/plotting/plot_functions_2d.py index c731c4822..2009586fe 100644 --- a/columnflow/plotting/plot_functions_2d.py +++ b/columnflow/plotting/plot_functions_2d.py @@ -16,6 +16,7 @@ import order as od from 
columnflow.util import maybe_import +from columnflow.hist_util import sum_hists from columnflow.plotting.plot_util import ( remove_residual_axis, apply_variable_settings, @@ -81,7 +82,7 @@ def plot_2d( extremes = "color" # add all processes into 1 histogram - h_sum = sum(list(hists.values())[1:], list(hists.values())[0].copy()) + h_sum = sum_hists(hists.values()) if shape_norm: h_sum = h_sum / h_sum.sum().value diff --git a/columnflow/plotting/plot_util.py b/columnflow/plotting/plot_util.py index 3c892c974..c680cc46a 100644 --- a/columnflow/plotting/plot_util.py +++ b/columnflow/plotting/plot_util.py @@ -18,8 +18,8 @@ import order as od import scinum as sn -from columnflow.util import maybe_import, try_int, try_complex, UNSET -from columnflow.hist_util import copy_axis +from columnflow.util import maybe_import, try_int, try_complex, safe_div, UNSET +from columnflow.hist_util import copy_axis, sum_hists from columnflow.types import TYPE_CHECKING, Iterable, Any, Callable, Sequence, Hashable np = maybe_import("numpy") @@ -225,7 +225,7 @@ def get_stack_integral() -> float: if scale_factor == "stack": # compute the scale factor and round h_no_shift = remove_residual_axis_single(h, "shift", select_value="nominal") - scale_factor = round_dynamic(get_stack_integral() / h_no_shift.sum().value) or 1 + scale_factor = round_dynamic(safe_div(get_stack_integral(), h_no_shift.sum().value)) or 1 if try_int(scale_factor): scale_factor = int(scale_factor) hists[proc_inst] = h * scale_factor @@ -571,9 +571,9 @@ def prepare_stack_plot_config( h_data, h_mc, h_mc_stack = None, None, None if data_hists: - h_data = sum(data_hists[1:], data_hists[0].copy()) + h_data = sum_hists(data_hists) if mc_hists: - h_mc = sum(mc_hists[1:], mc_hists[0].copy()) + h_mc = sum_hists(mc_hists) h_mc_stack = hist.Stack(*mc_hists) # setup plotting configs diff --git a/columnflow/production/cms/dy.py b/columnflow/production/cms/dy.py index 46201d28d..9e618c007 100644 --- a/columnflow/production/cms/dy.py +++ b/columnflow/production/cms/dy.py @@ -6,9 +6,9 @@ from __future__ import annotations -import law +import dataclasses -from dataclasses import dataclass +import law from columnflow.production import Producer, producer from columnflow.util import maybe_import, load_correction_set @@ -21,14 +21,23 @@ logger = law.logger.get_logger(__name__) -@dataclass +@dataclasses.dataclass class DrellYanConfig: + # era, e.g. "2022preEE" era: str + # correction set name correction: str + # uncertainty correction set name unc_correction: str | None = None + # generator order order: str | None = None - njets: bool = False + # list of systematics to be considered systs: list[str] | None = None + # functions to get the number of jets and b-tagged jets from the events in case they should be used as inputs + get_njets: callable[["dy_weights", ak.Array], ak.Array] | None = None + get_nbtags: callable[["dy_weights", ak.Array], ak.Array] | None = None + # additional columns to be loaded, e.g. as needed for njets or nbtags + used_columns: set = dataclasses.field(default_factory=set) def __post_init__(self) -> None: if not self.era or not self.correction: @@ -135,7 +144,8 @@ def dy_weights(self: Producer, events: ak.Array, **kwargs) -> ak.Array: *get_dy_weight_file* can be adapted in a subclass in case it is stored differently in the external files. 
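    Complementing the configuration example below, a hedged sketch of the new ``get_njets`` hook that reproduces the
    former ``njets: bool`` behavior (the era and correction set names are placeholders):

    .. code-block:: python

        import awkward as ak

        cfg.x.dy_weight_config = DrellYanConfig(
            era="2022preEE",  # placeholder
            correction="DY_pTll_reweighting",  # placeholder
            unc_correction="DY_pTll_reweighting_N_uncertainty",  # placeholder
            order="NLO",
            # jet multiplicity as an additional corrector input, computed per event chunk
            get_njets=lambda producer, events: ak.num(events.Jet, axis=1),
            # columns read by the callable above must be declared explicitly
            used_columns={"Jet.pt"},
        )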
- The campaign era and name of the correction set (see link above) should be given as an auxiliary entry in the config: + The analysis config should contain an auxiliary entry *dy_weight_config* pointing to a :py:class:`DrellYanConfig` + object: .. code-block:: python @@ -157,8 +167,12 @@ def dy_weights(self: Producer, events: ak.Array, **kwargs) -> ak.Array: # optionals if self.dy_config.order: variable_map["order"] = self.dy_config.order - if self.dy_config.njets: - variable_map["njets"] = ak.num(events.Jet, axis=1) + if callable(self.dy_config.get_njets): + variable_map["njets"] = self.dy_config.get_njets(self, events) + if callable(self.dy_config.get_nbtags): + variable_map["nbtags"] = self.dy_config.get_nbtags(self, events) + # for compatibility + variable_map["ntags"] = variable_map["nbtags"] # initializing the list of weight variations (called syst in the dy files) systs = [("nom", "")] @@ -193,10 +207,12 @@ def dy_weights_init(self: Producer) -> None: f"campaign year {self.config_inst.campaign.x.year} is not yet supported by {self.cls_name}", ) - # declare additional used columns + # get the dy weight config self.dy_config: DrellYanConfig = self.get_dy_weight_config() - if self.dy_config.njets: - self.uses.add("Jet.pt") + + # declare additional used columns + if self.dy_config.used_columns: + self.uses.update(self.dy_config.used_columns) # declare additional produced columns if self.dy_config.unc_correction: diff --git a/columnflow/production/cms/gen_particles.py b/columnflow/production/cms/gen_particles.py new file mode 100644 index 000000000..294af1a81 --- /dev/null +++ b/columnflow/production/cms/gen_particles.py @@ -0,0 +1,359 @@ +# coding: utf-8 + +""" +Producers that determine the generator-level particles and bring them into a structured format. This is most likely +useful for generator studies and truth definitions of physics objects. 
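As an illustration, a hedged sketch (producer and column names are made up, and a ttbar-like dataset plus registered
coffea vector behaviors are assumed) of how one of these producers can be embedded in an analysis-specific producer:

.. code-block:: python

    from columnflow.production import Producer, producer
    from columnflow.production.cms.gen_particles import gen_top_lookup
    from columnflow.columnar_util import set_ak_column

    @producer(uses={gen_top_lookup}, produces={gen_top_lookup, "gen_top_pt"})
    def gen_top_pt(self: Producer, events, **kwargs):
        # attach the structured "gen_top" column first
        events = self[gen_top_lookup](events, **kwargs)
        # example downstream observable: pt of the leading (anti-)top quark
        return set_ak_column(events, "gen_top_pt", events.gen_top.t.pt[:, 0])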
+""" + +from __future__ import annotations + +import law + +from columnflow.production import Producer, producer +from columnflow.columnar_util import set_ak_column +from columnflow.util import UNSET, maybe_import + +np = maybe_import("numpy") +ak = maybe_import("awkward") + + +logger = law.logger.get_logger(__name__) + +_keep_gen_part_fields = ["pt", "eta", "phi", "mass", "pdgId"] + + +# helper to transform generator particles by dropping / adding fields +def transform_gen_part(gen_parts: ak.Array, *, depth_limit: int, optional: bool = False) -> ak.Array: + # reduce down to relevant fields + arr = {} + for f in _keep_gen_part_fields: + if optional: + if (v := getattr(gen_parts, f, UNSET)) is not UNSET: + arr[f] = v + else: + arr[f] = getattr(gen_parts, f) + arr = ak.zip(arr, depth_limit=depth_limit) + + # remove parameters and add Lorentz vector behavior + arr = ak.without_parameters(arr) + arr = ak.with_name(arr, "PtEtaPhiMLorentzVector") + + return arr + + +@producer( + uses={ + "GenPart.{genPartIdxMother,status,statusFlags}", # required by the gen particle identification + f"GenPart.{{{','.join(_keep_gen_part_fields)}}}", # additional fields that should be read and added to gen_top + }, + produces={"gen_top.*.*"}, +) +def gen_top_lookup(self: Producer, events: ak.Array, strict: bool = True, **kwargs) -> ak.Array: + """ + Creates a new ragged column "gen_top" containing information about generator-level top quarks and their decay + products in a structured array with the following fields: + + - ``t``: list of all top quarks in the event, sorted such that top quarks precede anti-top quarks + - ``b``: list of bottom quarks from top quark decays, consistent ordering w.r.t. ``t`` (note that, in rare + cases, the decay into charm or down quarks is realized, and therefore stored in this field) + - ``w``: list of W bosons from top quark decays, consistent ordering w.r.t. ``t`` + - ``w_children``: list of W boson decay products, consistent ordering w.r.t. 
``w``, the first entry is the + down-type quark or charged lepton, the second entry is the up-type quark or neutrino, and additional decay + products (e.g photons) are appended afterwards + - ``w_tau_children``: list of decay products from tau lepton decays stemming from W boson decays, however, + skipping the W boson from the tau lepton decay itself; the first entry is the tau neutrino, the second and + third entries are either the charged lepton and neutrino, or quarks or hadrons sorted by ascending absolute + pdg id; additional decay products (e.g photons) are appended afterwards + """ + # helper to extract unique values + unique_set = lambda a: set(np.unique(ak.flatten(a, axis=None))) + + # find hard top quarks + t = events.GenPart[abs(events.GenPart.pdgId) == 6] + t = t[t.hasFlags("isLastCopy")] # they are either fromHardProcess _or_ isLastCopy + + # sort them so that that top quarks come before anti-top quarks + t = t[ak.argsort(t.pdgId, axis=1, ascending=False)] + + # distinct top quark children + # (asking for isLastCopy leads to some tops that miss children, usually b's) + t_children = ak.drop_none(t.distinctChildren[t.distinctChildren.hasFlags("fromHardProcess", "isFirstCopy")]) + + # strict mode: check that there are exactly two children that are b and w + if strict: + if (tcn := unique_set(ak.num(t_children, axis=2))) != {2}: + raise Exception(f"found top quarks that have != 2 children: {tcn - {2}}") + if (tci := unique_set(abs(t_children.pdgId))) - {1, 3, 5, 24}: + raise Exception(f"found top quark children with unexpected pdgIds: {tci - {1, 3, 5, 24}}") + + # store b's (or s/d) and w's + abs_tc_ids = abs(t_children.pdgId) + b = ak.drop_none(ak.firsts(t_children[(abs_tc_ids == 1) | (abs_tc_ids == 3) | (abs_tc_ids == 5)], axis=2)) + w = ak.drop_none(ak.firsts(t_children[abs(t_children.pdgId) == 24], axis=2)) + + # distinct w children + w_children = ak.drop_none(w.distinctChildrenDeep) + + # distinguish into "hard" and additional ones + w_children_hard = w_children[(hard_mask := w_children.hasFlags("fromHardProcess"))] + w_children_rest = w_children[~hard_mask] + + # strict: check that there are exactly two hard children + if strict: + if (wcn := unique_set(ak.num(w_children_hard, axis=2))) != {2}: + raise Exception(f"found W bosons that have != 2 children: {wcn - {2}}") + + # sort them so that down-type quarks and charged leptons (odd pdgIds) come first, followed by up-type quarks and + # neutrinos (even pdgIds), then add back the remaining ones + w_children_hard = w_children_hard[ak.argsort(-(w_children_hard.pdgId % 2), axis=2)] + w_children = ak.concatenate([w_children_hard, w_children_rest], axis=2) + + # further distinguish tau decays in w_children + w_tau_children = ak.drop_none(w_children[abs(w_children.pdgId) == 15].distinctChildrenDeep) + # sort: nu tau first, photons last, rest in between sorted by ascending absolute pdgId + w_tau_nu_mask = abs(w_tau_children.pdgId) == 16 + w_tau_photon_mask = w_tau_children.pdgId == 22 + w_tau_rest = w_tau_children[~(w_tau_nu_mask | w_tau_photon_mask)] + w_tau_rest = w_tau_rest[ak.argsort(abs(w_tau_rest.pdgId), axis=3, ascending=True)] + w_tau_children = ak.concatenate( + [w_tau_children[w_tau_nu_mask], w_tau_rest, w_tau_children[w_tau_photon_mask]], + axis=3, + ) + + # zip into a single array with named fields + gen_top = ak.zip( + { + "t": transform_gen_part(t, depth_limit=2), + "b": transform_gen_part(b, depth_limit=2), + "w": transform_gen_part(w, depth_limit=2), + "w_children": transform_gen_part(w_children, depth_limit=3), + 
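+            # note: the depth_limit values mirror the nesting depth of each field, e.g. "w_tau_children" below
+            # carries one additional level (per tau decay) compared to "w_children"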
"w_tau_children": transform_gen_part(w_tau_children, depth_limit=4), + }, + depth_limit=1, + ) + + # save the column + events = set_ak_column(events, "gen_top", gen_top) + + return events + + +@producer( + uses={ + "GenPart.{genPartIdxMother,status,statusFlags}", # required by the gen particle identification + f"GenPart.{{{','.join(_keep_gen_part_fields)}}}", # additional fields that should be read and added to gen_top + }, + produces={"gen_higgs.*.*"}, +) +def gen_higgs_lookup(self: Producer, events: ak.Array, strict: bool = True, **kwargs) -> ak.Array: + """ + Creates a new ragged column "gen_higgs" containing information about generator-level Higgs bosons and their decay + products in a structured array with the following fields: + + - ``h``: list of all Higgs bosons in the event, sorted by the pdgId of their decay products such that Higgs + bosons decaying to quarks (b's) come first, followed by leptons, and then gauge bosons + - ``h_children``: list of direct Higgs boson children, consistent ordering w.r.t. ``h``, with the first entry + being the particle and the second one being the anti-particle; for Z bosons and (effective) gluons and + photons, no ordering is applied + - ``tau_children``: list of decay products from tau lepton decays coming from Higgs bosons, with the first entry + being the neutrino and the second one being the W boson + - ``tau_w_children``: list of the decay products from W boson decays from tau lepton decays, with the first + entry being the down-type quark or charged lepton, the second entry being the up-type quark or neutrino, and + additional decay products (e.g photons) are appended afterwards + - ``z_children``: not yet implemented + - ``w_children``: not yet implemented + """ + # helper to extract unique values + unique_set = lambda a: set(np.unique(ak.flatten(a, axis=None))) + + # find higgs + h = events.GenPart[events.GenPart.pdgId == 25] + h = h[h.hasFlags("fromHardProcess", "isLastCopy")] + + # sort them by increasing pdgId of their children (quarks, leptons, Z, W, effective gluons/photons) + h = h[ak.argsort(abs(ak.drop_none(ak.min(h.children.pdgId, axis=2))), axis=1, ascending=True)] + + # get distinct children + h_children = ak.drop_none(h.distinctChildren[h.distinctChildren.hasFlags("fromHardProcess", "isFirstCopy")]) + + # strict mode: check that there are exactly two children + if strict: + if (hcn := unique_set(ak.num(h_children, axis=2))) != {2}: + raise Exception(f"found Higgs bosons that have != 2 children: {hcn - {2}}") + + # sort them by decreasing pdgId + h_children = h_children[ak.argsort(h_children.pdgId, axis=2, ascending=False)] + # in strict mode, fix the children dimension to 2 + if strict: + h_children = h_children[:, :, [0, 1]] + + # further treatment of tau decays + tau_mask = h_children.pdgId[:, :, 0] == 15 + tau = ak.fill_none(h_children[ak.mask(tau_mask, tau_mask)], [], axis=1) + tau_children = tau.distinctChildrenDeep[tau.distinctChildrenDeep.hasFlags("isFirstCopy", "isTauDecayProduct")] + tau_children = ak.drop_none(tau_children) + # prepare neutrino and W boson handling + tau_nu_mask = abs(tau_children.pdgId) == 16 + tau_w_mask = abs(tau_children.pdgId) == 24 + tau_rest_mask = ~(tau_nu_mask | tau_w_mask) + tau_has_rest = ak.any(tau_rest_mask, axis=3) + # strict mode: there should always be a neutrino, and _either_ a W and nothing else _or_ no W at all + if strict: + if not ak.all(ak.any(tau_nu_mask[tau_mask], axis=3)): + raise Exception("found tau leptons without a tau neutrino among their children") + tau_has_w = 
ak.any(tau_w_mask, axis=3)
+        if not ak.all((tau_has_w ^ tau_has_rest)[tau_mask]):
+            raise Exception("found tau leptons with both W bosons and other decay products among their children")
+    # get the tau neutrino
+    tau_nu = tau_children[tau_nu_mask].sum(axis=3)
+    tau_nu = set_ak_column(tau_nu, "pdgId", ak.values_astype(16 * np.sign(tau.pdgId), np.int32))
+    # get the W boson in case it is part of the tau children, otherwise build it from the sum of children
+    tau_w = tau_children[tau_w_mask].sum(axis=3)
+    if ak.any(tau_has_rest):
+        tau_w_rest = tau_children[tau_rest_mask].sum(axis=-1)
+        tau_w = ak.where(tau_has_rest, tau_w_rest, tau_w)
+    tau_w = set_ak_column(tau_w, "pdgId", ak.values_astype(-24 * np.sign(tau.pdgId), np.int32))
+    # combine nu and w again
+    tau_nuw = ak.concatenate([tau_nu[..., None], tau_w[..., None]], axis=3)
+    # define w children
+    tau_w_children = ak.concatenate(
+        [tau_children[tau_rest_mask], ak.drop_none(ak.firsts(tau_children[tau_w_mask], axis=3).children)],
+        axis=2,
+    )
+
+    # children for decays other than taus are not yet implemented, so show a warning in case they are found
+    unhandled_ids = unique_set(abs(h_children.pdgId)) - set(range(1, 6 + 1)) - set(range(11, 16 + 1))
+    if unhandled_ids:
+        logger.warning_once(
+            f"gen_higgs_unhandled_children_{'_'.join(map(str, sorted(unhandled_ids)))}",
+            f"found Higgs boson decays in the {self.cls_name} producer with pdgIds {unhandled_ids}, for which the "
+            "lookup of children is not yet implemented",
+        )
+
+    # zip into a single array with named fields
+    gen_higgs = ak.zip(
+        {
+            "h": transform_gen_part(h, depth_limit=2),
+            "h_children": transform_gen_part(h_children, depth_limit=3),
+            "tau_children": transform_gen_part(tau_nuw, depth_limit=4),
+            "tau_w_children": transform_gen_part(tau_w_children, depth_limit=4),
+            # "z_children": None,  # not yet implemented
+            # "w_children": None,  # not yet implemented
+        },
+        depth_limit=1,
+    )
+
+    # save the column
+    events = set_ak_column(events, "gen_higgs", gen_higgs)
+
+    return events
+
+
+@producer(
+    uses={
+        "GenPart.{genPartIdxMother,status,statusFlags}",  # required by the gen particle identification
+        f"GenPart.{{{','.join(_keep_gen_part_fields)}}}",  # additional fields that should be read and added to gen_dy
+    },
+    produces={"gen_dy.*.*"},
+)
+def gen_dy_lookup(self: Producer, events: ak.Array, strict: bool = True, **kwargs) -> ak.Array:
+    """
+    Creates a new ragged column "gen_dy" containing information about generator-level Z/g bosons and their decay
+    products in a structured array with the following fields:
+
+    - ``z``: list of all Z/g bosons in the event, sorted by the pdgId of their decay products
+    - ``lep``: list of direct Z/g boson children, consistent ordering w.r.t. ``z``, with the first entry being the
+      lepton and the second one being the anti-lepton
+    - ``tau_children``: list of decay products from tau lepton decays coming from Z/g bosons, with the first entry
+      being the tau neutrino and the second one being the W boson
+    - ``tau_w_children``: list of the decay products from W boson decays from tau lepton decays, with the first
+      entry being the down-type quark or charged lepton, the second entry being the up-type quark or neutrino, and
+      additional decay products (e.g. photons) are appended afterwards
+    """
+    # note: in about 4% of DY events, the Z/g boson is missing, so this lookup starts at lepton level, see
+    # -> https://indico.cern.ch/event/1495537/contributions/6359516/attachments/3014424/5315938/HLepRare_25.02.14.pdf
+
+    # helper to extract unique values
+    unique_set = lambda a: set(np.unique(ak.flatten(a, axis=None)))
+
+    # get the e/mu and tau masks
+    abs_id = abs(events.GenPart.pdgId)
+    emu_mask = (
+        ((abs_id == 11) | (abs_id == 13)) &
+        (events.GenPart.status == 1) &
+        events.GenPart.hasFlags("fromHardProcess")
+    )
+    # taus need to have status == 2
+    tau_mask = (
+        (abs_id == 15) &
+        (events.GenPart.status == 2) &
+        events.GenPart.hasFlags("fromHardProcess")
+    )
+    lep_mask = emu_mask | tau_mask
+
+    # strict mode: there must be exactly two charged leptons per event
+    if strict:
+        if (nl := unique_set(ak.num(events.GenPart[lep_mask], axis=1))) - {2}:
+            raise Exception(f"found events that have != 2 charged leptons: {nl - {2}}")
+
+    # get the leptons and sort by decreasing pdgId (lepton before anti-lepton)
+    lep = events.GenPart[lep_mask]
+    lep = lep[ak.argsort(lep.pdgId, axis=1, ascending=False)]
+
+    # in strict mode, fix the lep dimension to 2
+    if strict:
+        lep = lep[:, [0, 1]]
+
+    # build the z from them
+    z = lep.sum(axis=-1)
+    z = set_ak_column(z, "pdgId", np.int32(23))
+
+    # further treatment of tau decays
+    tau = events.GenPart[tau_mask]
+    tau_children = tau.distinctChildren[tau.distinctChildren.hasFlags("isFirstCopy", "isTauDecayProduct")]
+    tau_children = ak.drop_none(tau_children)
+    # prepare neutrino and W boson handling
+    tau_nu_mask = abs(tau_children.pdgId) == 16
+    tau_w_mask = abs(tau_children.pdgId) == 24
+    tau_rest_mask = ~(tau_nu_mask | tau_w_mask)
+    tau_has_rest = ak.any(tau_rest_mask, axis=2)
+    # strict mode: there should always be a neutrino, and _either_ a W and nothing else _or_ no W at all
+    if strict:
+        if not ak.all(ak.any(tau_nu_mask, axis=2)):
+            raise Exception("found tau leptons without a tau neutrino among their children")
+        tau_has_w = ak.any(tau_w_mask, axis=2)
+        if not ak.all(tau_has_w ^ tau_has_rest):
+            raise Exception("found tau leptons with both W bosons and other decay products among their children")
+    # get the tau neutrino
+    tau_nu = tau_children[tau_nu_mask].sum(axis=2)
+    tau_nu = set_ak_column(tau_nu, "pdgId", ak.values_astype(16 * np.sign(tau.pdgId), np.int32))
+    # get the W boson in case it is part of the tau children, otherwise build it from the sum of children
+    tau_w = tau_children[tau_w_mask].sum(axis=2)
+    if ak.any(tau_has_rest):
+        tau_w_rest = tau_children[tau_rest_mask].sum(axis=-1)
+        tau_w = ak.where(tau_has_rest, tau_w_rest, tau_w)
+    tau_w = set_ak_column(tau_w, "pdgId", ak.values_astype(-24 * np.sign(tau.pdgId), np.int32))
+    # combine nu and w again
+    tau_nuw = ak.concatenate([tau_nu[..., None], tau_w[..., None]], axis=2)
+    # define w children
+    tau_w_children = 
ak.concatenate( + [tau_children[tau_rest_mask], ak.drop_none(ak.firsts(tau_children[tau_w_mask], axis=2).children)], + axis=1, + ) + + # zip into a single array with named fields + gen_dy = ak.zip( + { + "z": transform_gen_part(z, depth_limit=1), + "lep": transform_gen_part(lep, depth_limit=2), + "tau_children": transform_gen_part(tau_nuw, depth_limit=3), + "tau_w_children": transform_gen_part(tau_w_children, depth_limit=3), + }, + depth_limit=1, + ) + + # save the column + events = set_ak_column(events, "gen_dy", gen_dy) + + return events diff --git a/columnflow/production/cms/gen_top_decay.py b/columnflow/production/cms/gen_top_decay.py deleted file mode 100644 index 8e925aaa0..000000000 --- a/columnflow/production/cms/gen_top_decay.py +++ /dev/null @@ -1,90 +0,0 @@ -# coding: utf-8 - -""" -Producers that determine the generator-level particles related to a top quark decay. -""" - -from __future__ import annotations - -from columnflow.production import Producer, producer -from columnflow.util import maybe_import -from columnflow.columnar_util import set_ak_column - -ak = maybe_import("awkward") - - -@producer( - uses={"GenPart.{genPartIdxMother,pdgId,statusFlags}"}, - produces={"gen_top_decay"}, -) -def gen_top_decay_products(self: Producer, events: ak.Array, **kwargs) -> ak.Array: - """ - Creates a new ragged column "gen_top_decay" with one element per hard top quark. Each element is - a GenParticleArray with five or more objects in a distinct order: top quark, bottom quark, - W boson, down-type quark or charged lepton, up-type quark or neutrino, and any additional decay - produces of the W boson (if any, then most likly photon radiations). Per event, the structure - will be similar to: - - .. code-block:: python - - [ - # event 1 - [ - # top 1 - [t1, b1, W1, q1/l, q2/n(, additional_w_decay_products)], - # top 2 - [...], - ], - # event 2 - ... - ] - """ - # find hard top quarks - abs_id = abs(events.GenPart.pdgId) - t = events.GenPart[abs_id == 6] - t = t[t.hasFlags("isHardProcess")] - t = t[~ak.is_none(t, axis=1)] - - # distinct top quark children (b's and W's) - t_children = t.distinctChildrenDeep[t.distinctChildrenDeep.hasFlags("isHardProcess")] - - # get b's - b = t_children[abs(t_children.pdgId) == 5][:, :, 0] - - # get W's - w = t_children[abs(t_children.pdgId) == 24][:, :, 0] - - # distinct W children - w_children = w.distinctChildrenDeep[w.distinctChildrenDeep.hasFlags("isHardProcess")] - - # reorder the first two W children (leptons or quarks) so that the charged lepton / down-type - # quark is listed first (they have an odd pdgId) - w_children_firsttwo = w_children[:, :, :2] - w_children_firsttwo = w_children_firsttwo[(w_children_firsttwo.pdgId % 2 == 0) * 1] - w_children_rest = w_children[:, :, 2:] - - # concatenate to create the structure to return - groups = ak.concatenate( - [ - t[:, :, None], - b[:, :, None], - w[:, :, None], - w_children_firsttwo, - w_children_rest, - ], - axis=2, - ) - - # save the column - events = set_ak_column(events, "gen_top_decay", groups) - - return events - - -@gen_top_decay_products.skip -def gen_top_decay_products_skip(self: Producer, **kwargs) -> bool: - """ - Custom skip function that checks whether the dataset is a MC simulation containing top - quarks in the first place. 
- """ - return self.dataset_inst.is_data or not self.dataset_inst.has_tag("has_top") diff --git a/columnflow/production/cms/top_pt_weight.py b/columnflow/production/cms/top_pt_weight.py index bb1fb4c4e..8207414d2 100644 --- a/columnflow/production/cms/top_pt_weight.py +++ b/columnflow/production/cms/top_pt_weight.py @@ -6,13 +6,13 @@ from __future__ import annotations -from dataclasses import dataclass +import dataclasses import law from columnflow.production import Producer, producer from columnflow.util import maybe_import -from columnflow.columnar_util import set_ak_column +from columnflow.columnar_util import set_ak_column, full_like ak = maybe_import("awkward") np = maybe_import("numpy") @@ -21,134 +21,101 @@ logger = law.logger.get_logger(__name__) -@dataclass -class TopPtWeightConfig: - params: dict[str, float] - pt_max: float = 500.0 - - @classmethod - def new(cls, obj: TopPtWeightConfig | dict[str, float]) -> TopPtWeightConfig: - # backward compatibility only - if isinstance(obj, cls): - return obj - return cls(params=obj) - - -@producer( - uses={"GenPart.{pdgId,statusFlags}"}, - # requested GenPartonTop columns, passed to the *uses* and *produces* - produced_top_columns={"pt"}, - mc_only=True, - # skip the producer unless the datasets has this specified tag (no skip check performed when none) - require_dataset_tag="has_top", -) -def gen_parton_top(self: Producer, events: ak.Array, **kwargs) -> ak.Array: +@dataclasses.dataclass +class TopPtWeightFromDataConfig: """ - Produce parton-level top quarks (before showering and detector simulation). - Creates new collection named "GenPartonTop" - - *produced_top_columns* can be adapted to change the columns that will be produced - for the GenPartonTop collection. - - The function is skipped when the dataset is data or when it does not have the tag *has_top*. - - :param events: awkward array containing events to process + Container to configure the top pt reweighting parameters for the method based on fits to data. For more info, see + https://twiki.cern.ch/twiki/bin/viewauth/CMS/TopPtReweighting?rev=31#TOP_PAG_corrections_based_on_dat """ - # find parton-level top quarks - abs_id = abs(events.GenPart.pdgId) - t = events.GenPart[abs_id == 6] - t = t[t.hasFlags("isLastCopy")] - t = t[~ak.is_none(t, axis=1)] - - # save the column - events = set_ak_column(events, "GenPartonTop", t) - - return events - - -@gen_parton_top.init -def gen_parton_top_init(self: Producer, **kwargs) -> bool: - for col in self.produced_top_columns: - self.uses.add(f"GenPart.{col}") - self.produces.add(f"GenPartonTop.{col}") + params: dict[str, float] = dataclasses.field(default_factory=lambda: { + "a": 0.0615, + "a_up": 0.0615 * 1.5, + "a_down": 0.0615 * 0.5, + "b": -0.0005, + "b_up": -0.0005 * 1.5, + "b_down": -0.0005 * 0.5, + }) + pt_max: float = 500.0 -@gen_parton_top.skip -def gen_parton_top_skip(self: Producer, **kwargs) -> bool: +@dataclasses.dataclass +class TopPtWeightFromTheoryConfig: """ - Custom skip function that checks whether the dataset is a MC simulation containing top quarks in the first place - using the :py:attr:`require_dataset_tag` attribute. + Container to configure the top pt reweighting parameters for the theory-based method. 
For more info, see + https://twiki.cern.ch/twiki/bin/viewauth/CMS/TopPtReweighting?rev=31#TOP_PAG_corrections_based_on_the """ - # never skip if the tag is not set - if self.require_dataset_tag is None: - return False - - return self.dataset_inst.is_data or not self.dataset_inst.has_tag(self.require_dataset_tag) - - -def get_top_pt_weight_config(self: Producer) -> TopPtWeightConfig: - if self.config_inst.has_aux("top_pt_reweighting_params"): - logger.info_once( - "deprecated_top_pt_weight_config", - "the config aux field 'top_pt_reweighting_params' is deprecated and will be removed in " - "a future release, please use 'top_pt_weight' instead", + params: dict[str, float] = dataclasses.field(default_factory=lambda: { + "a": 0.103, + "b": -0.0118, + "c": -0.000134, + "d": 0.973, + }) + + +# for backward compatibility +class TopPtWeightConfig(TopPtWeightFromDataConfig): + + def __init__(self, *args, **kwargs): + logger.warning_once( + "TopPtWeightConfig is deprecated and will be removed in future versions, please use " + "TopPtWeightFromDataConfig instead to keep using the data-based method, or TopPtWeightFromTheoryConfig to " + "use the theory-based method", ) - params = self.config_inst.x.top_pt_reweighting_params - else: - params = self.config_inst.x.top_pt_weight - - return TopPtWeightConfig.new(params) + super().__init__(*args, **kwargs) @producer( - uses={"GenPartonTop.pt"}, + uses={"gen_top.t.pt"}, produces={"top_pt_weight{,_up,_down}"}, - get_top_pt_weight_config=get_top_pt_weight_config, - # skip the producer unless the datasets has this specified tag (no skip check performed when none) - require_dataset_tag="is_ttbar", + get_top_pt_weight_config=(lambda self: self.config_inst.x.top_pt_weight), ) def top_pt_weight(self: Producer, events: ak.Array, **kwargs) -> ak.Array: - """ - Compute SF to be used for top pt reweighting. + r""" + Compute SF to be used for top pt reweighting, either with information from a fit to data or from theory. See https://twiki.cern.ch/twiki/bin/view/CMS/TopPtReweighting?rev=31 for more information. - The *GenPartonTop.pt* column can be produced with the :py:class:`gen_parton_top` Producer. The - SF should *only be applied in ttbar MC* as an event weight and is computed based on the - gen-level top quark transverse momenta. - - The top pt reweighting parameters should be given as an auxiliary entry in the config: + The method to be used depends on the config entry obtained with *get_top_pt_config* which should either be of + type :py:class:`TopPtWeightFromDataConfig` or :py:class:`TopPtWeightFromTheoryConfig`. - .. code-block:: python + - data-based: $SF(p_T)=e^{a + b \cdot p_T}$ + - theory-based: $SF(p_T)=a \cdot e^{b \cdot p_T} + c \cdot p_T + d$ - cfg.x.top_pt_reweighting_params = { - "a": 0.0615, - "a_up": 0.0615 * 1.5, - "a_down": 0.0615 * 0.5, - "b": -0.0005, - "b_up": -0.0005 * 1.5, - "b_down": -0.0005 * 0.5, - } + The *gen_top.t.pt* column can be produced with the :py:class:`gen_top_lookup` producer. The SF should *only be + applied in ttbar MC* as an event weight and is computed based on the gen-level top quark transverse momenta. + The top pt weight configuration should be given as an auxiliary entry "top_pt_weight" in the config. *get_top_pt_config* can be adapted in a subclass in case it is stored differently in the config. 
- - :param events: awkward array containing events to process """ # check the number of gen tops - if ak.any((n_tops := ak.num(events.GenPartonTop, axis=1)) != 2): + if ak.any((n_tops := ak.num(events.gen_top.t, axis=1)) != 2): raise Exception( - f"{self.cls_name} can only run on events with two generator top quarks, but found " - f"counts of {','.join(map(str, sorted(set(n_tops))))}", + f"{self.cls_name} can only run on events with two generator top quarks, but found counts of " + f"{','.join(map(str, sorted(set(n_tops))))}", ) - # clamp top pt - top_pt = events.GenPartonTop.pt - if self.cfg.pt_max >= 0.0: + # get top pt + top_pt = events.gen_top.t.pt + if not self.theory_method and self.cfg.pt_max >= 0.0: top_pt = ak.where(top_pt > self.cfg.pt_max, self.cfg.pt_max, top_pt) - for variation in ("", "_up", "_down"): - # evaluate SF function - sf = np.exp(self.cfg.params[f"a{variation}"] + self.cfg.params[f"b{variation}"] * top_pt) + for variation in ["", "_up", "_down"]: + # evaluate SF function, implementation is method dependent + if self.theory_method: + # up variation: apply twice the effect + # down variation: no weight at all + if variation != "_down": + sf = ( + self.cfg.params["a"] * np.exp(self.cfg.params["b"] * top_pt) + + self.cfg.params["c"] * top_pt + + self.cfg.params["d"] + ) + if variation == "_up": + sf = 1.0 + 2.0 * (sf - 1.0) + elif variation == "_down": + sf = full_like(top_pt, 1.0) + else: + sf = np.exp(self.cfg.params[f"a{variation}"] + self.cfg.params[f"b{variation}"] * top_pt) # compute weight from SF product for top and anti-top weight = np.sqrt(np.prod(sf, axis=1)) @@ -163,14 +130,9 @@ def top_pt_weight(self: Producer, events: ak.Array, **kwargs) -> ak.Array: def top_pt_weight_init(self: Producer) -> None: # store the top pt weight config self.cfg = self.get_top_pt_weight_config() - - -@top_pt_weight.skip -def top_pt_weight_skip(self: Producer, **kwargs) -> bool: - """ - Skip if running on anything except ttbar MC simulation, evaluated via the :py:attr:`require_dataset_tag` attribute. - """ - if self.require_dataset_tag is None: - return self.dataset_inst.is_data - - return self.dataset_inst.is_data or not self.dataset_inst.has_tag("is_ttbar") + if not isinstance(self.cfg, (TopPtWeightFromDataConfig, TopPtWeightFromTheoryConfig)): + raise Exception( + f"{self.cls_name} expects the config entry obtained with get_top_pt_weight_config to be of type " + f"TopPtWeightFromDataConfig or TopPtWeightFromTheoryConfig, but got {type(self.cfg)}", + ) + self.theory_method = isinstance(self.cfg, TopPtWeightFromTheoryConfig) diff --git a/columnflow/production/util.py b/columnflow/production/util.py index 1df6d49f9..938876282 100644 --- a/columnflow/production/util.py +++ b/columnflow/production/util.py @@ -47,11 +47,14 @@ def attach_coffea_behavior( # general awkward array functions # -def ak_extract_fields(arr: ak.Array, fields: list[str], **kwargs): +def ak_extract_fields(arr: ak.Array, fields: list[str], optional_fields: list[str] | None = None, **kwargs): """ Build an array containing only certain `fields` of an input array `arr`, preserving behaviors. 
""" + if optional_fields is None: + optional_fields = [] + # reattach behavior if "behavior" not in kwargs: kwargs["behavior"] = arr.behavior @@ -60,6 +63,10 @@ def ak_extract_fields(arr: ak.Array, fields: list[str], **kwargs): { field: getattr(arr, field) for field in fields + } | { + field: getattr(arr, field) + for field in optional_fields + if field in arr.fields }, **kwargs, ) diff --git a/columnflow/tasks/cms/external.py b/columnflow/tasks/cms/external.py index 03eb98220..148b0bac5 100644 --- a/columnflow/tasks/cms/external.py +++ b/columnflow/tasks/cms/external.py @@ -6,6 +6,11 @@ from __future__ import annotations +__all__ = [] + +import os +import glob + import luigi import law @@ -20,6 +25,8 @@ class CreatePileupWeights(ConfigTask): + task_namespace = "cf.cms" + single_config = True data_mode = luigi.ChoiceParameter( @@ -162,3 +169,73 @@ def normalize_values(cls, values: Sequence[float]) -> list[float]: enable=["configs", "skip_configs"], attributes={"version": None}, ) + + +class CheckCATUpdates(ConfigTask, law.tasks.RunOnceTask): + """ + CMS specific task that checks for updates in the metadata managed and stored by the CAT group. See + https://cms-analysis-corrections.docs.cern.ch for more info. + + To function correctly, this task requires an auxiliary entry ``cat_info`` in the analysis config, pointing to a + :py:class:`columnflow.cms_util.CATInfo` instance that defines the era information and the current POG correction + timestamps. The task will then check in the CAT metadata structure if newer timestamps are available. + """ + + task_namespace = "cf.cms" + + version = None + + single_config = False + + def run(self): + # helpers to convert date strings to tuples for numeric comparisons + decode_date_str = lambda s: tuple(map(int, s.split("-"))) + + # loop through configs + for config_inst in self.config_insts: + with self.publish_step( + f"checking CAT metadata updates for config '{law.util.colored(config_inst.name, style='bright')}' in " + f"{config_inst.x.cat_info.metadata_root}", + ): + newest_dates = {} + updated_any = False + for pog, date_str in config_inst.x.cat_info.snapshot.items(): + if not date_str: + continue + + # get all versions in the cat directory, split by date numbers + pog_era_dir = os.path.join( + config_inst.x.cat_info.metadata_root, + pog.upper(), + config_inst.x.cat_info.get_era_directory(pog), + ) + if not os.path.isdir(pog_era_dir): + self.logger.warning(f"CAT metadata directory '{pog_era_dir}' does not exist, skipping") + continue + dates = [ + os.path.basename(path) + for path in glob.glob(os.path.join(pog_era_dir, "*-*-*")) + ] + if not dates: + raise ValueError(f"no CAT snapshots found in '{pog_era_dir}'") + + # compare with current date + latest_date_str = max(dates, key=decode_date_str) + if date_str == "latest" or decode_date_str(date_str) < decode_date_str(latest_date_str): + newest_dates[pog] = latest_date_str + updated_any = True + self.publish_message( + f"found newer {law.util.colored(pog.upper(), color='cyan')} snapshot: {date_str} -> " + f"{latest_date_str} ({os.path.join(pog_era_dir, latest_date_str)})", + ) + else: + newest_dates[pog] = date_str + + # print a new CATSnapshot line that can be copy-pasted into the config + if updated_any: + args_str = ", ".join(f"{pog}=\"{date_str}\"" for pog, date_str in newest_dates.items() if date_str) + self.publish_message( + f"{law.util.colored('new CATSnapshot line ->', style='bright')} CATSnapshot({args_str})\n", + ) + else: + self.publish_message("no updates found\n") diff --git 
a/columnflow/tasks/cms/inference.py b/columnflow/tasks/cms/inference.py index e88c41975..abf8ec2ec 100644 --- a/columnflow/tasks/cms/inference.py +++ b/columnflow/tasks/cms/inference.py @@ -130,7 +130,7 @@ def run(self): proc_objs.append(self.inference_model_inst.process_spec(name="data")) for proc_obj in proc_objs: # skip the process objects if it does not contribute to this config_inst - if config_inst.name not in proc_obj.config_data: + if config_inst.name not in proc_obj.config_data and proc_obj.name != "data": continue # get all process instances (keys in _input_hists) to be combined diff --git a/columnflow/tasks/external.py b/columnflow/tasks/external.py index 8f37ede77..33c4a4793 100644 --- a/columnflow/tasks/external.py +++ b/columnflow/tasks/external.py @@ -591,7 +591,12 @@ def fetch(src, dst): # copy local dir shutil.copytree(src, dst) else: - raise NotImplementedError(f"fetching {src} is not supported") + err = f"cannot fetch {src}" + if src.startswith("/") and os.path.isdir("/".join(src.split("/", 2)[:2])): + err += ", file or directory does not exist" + else: + err += ", resource type is not supported" + raise NotImplementedError(err) # helper function to fetch generic files def fetch_file(ext_file, counter=[0]): diff --git a/columnflow/tasks/framework/base.py b/columnflow/tasks/framework/base.py index 177ed8e84..0cf4e0d42 100644 --- a/columnflow/tasks/framework/base.py +++ b/columnflow/tasks/framework/base.py @@ -1270,6 +1270,17 @@ def resolve_param_values(cls, params: dict[str, Any]) -> dict[str, Any]: params["config_insts"] = [params["config_inst"]] else: if "config_insts" not in params and "configs" in params: + # custom pattern matching + matched_config_names = [] + for pattern in params["configs"]: + matched_config_names.extend( + config_name for config_name in analysis_inst.configs.names() + if law.util.multi_match(config_name, pattern) + ) + matched_config_names = law.util.make_unique(matched_config_names) + if matched_config_names: + params["configs"] = matched_config_names + # load config instances params["config_insts"] = list(map(analysis_inst.get_config, params["configs"])) # resolving of parameters that is required before ArrayFunctions etc. can be initialized diff --git a/columnflow/tasks/framework/mixins.py b/columnflow/tasks/framework/mixins.py index 54fd42d35..e4a4ce9e7 100644 --- a/columnflow/tasks/framework/mixins.py +++ b/columnflow/tasks/framework/mixins.py @@ -30,6 +30,7 @@ from columnflow.timing import Timer +np = maybe_import("numpy") ak = maybe_import("awkward") @@ -2634,18 +2635,25 @@ class ChunkedIOMixin(ConfigTask): @classmethod def raise_if_not_finite(cls, ak_array: ak.Array) -> None: """ - Checks whether all values in array *ak_array* are finite. + Checks whether values of all columns in *ak_array* are finite. String and bytestring types are skipped. The check is performed using the :external+numpy:py:func:`numpy.isfinite` function. - :param ak_array: Array with events to check. + :param ak_array: Array with columns to check. :raises ValueError: If any value in *ak_array* is not finite. 
""" - import numpy as np from columnflow.columnar_util import get_ak_routes for route in get_ak_routes(ak_array): - if ak.any(~np.isfinite(ak.flatten(route.apply(ak_array), axis=None))): + # flatten + flat = ak.flatten(route.apply(ak_array), axis=None) + # perform parameter dependent checks + if isinstance((params := getattr(getattr(flat, "layout", None), "parameters", None)), dict): + # skip string and bytestring arrays + if params.get("__array__") in {"string", "bytestring"}: + continue + # check finiteness + if ak.any(~np.isfinite(flat)): raise ValueError(f"found one or more non-finite values in column '{route.column}' of array {ak_array}") @classmethod diff --git a/columnflow/tasks/framework/remote.py b/columnflow/tasks/framework/remote.py index 8f3393ba4..fae1d3559 100644 --- a/columnflow/tasks/framework/remote.py +++ b/columnflow/tasks/framework/remote.py @@ -48,6 +48,10 @@ class BundleRepo(AnalysisTask, law.git.BundleGitRepository, law.tasks.TransferLo os.environ["CF_CONDA_BASE"], ] + include_files = [ + "law_user.cfg", + ] + def get_repo_path(self): # required by BundleGitRepository return os.environ["CF_REPO_BASE"] diff --git a/columnflow/tasks/histograms.py b/columnflow/tasks/histograms.py index 6fc0fc604..b78003302 100644 --- a/columnflow/tasks/histograms.py +++ b/columnflow/tasks/histograms.py @@ -21,6 +21,7 @@ from columnflow.tasks.reduction import ReducedEventsUser from columnflow.tasks.production import ProduceColumns from columnflow.tasks.ml import MLEvaluation +from columnflow.hist_util import sum_hists from columnflow.util import dev_sandbox @@ -446,7 +447,7 @@ def run(self): # merge them variable_hists = [h[variable_name] for h in hists] - merged = sum(variable_hists[1:], variable_hists[0].copy()) + merged = sum_hists(variable_hists) # post-process the merged histogram merged = self.hist_producer_inst.run_post_process_merged_hist(merged, task=self) @@ -544,7 +545,7 @@ def run(self): ] # merge and write the output - merged = sum(variable_hists[1:], variable_hists[0].copy()) + merged = sum_hists(variable_hists) outp.dump(merged, formatter="pickle") diff --git a/columnflow/tasks/plotting.py b/columnflow/tasks/plotting.py index 6dd2acf3b..ba91c0492 100644 --- a/columnflow/tasks/plotting.py +++ b/columnflow/tasks/plotting.py @@ -266,15 +266,12 @@ def run(self): for process_inst in hists.keys(): h = hists[process_inst] # determine expected shifts from the intersection of requested shifts and those known for the process - # process_shifts = ( - # process_shift_map[process_inst.name] - # if process_inst.name in process_shift_map - # else {"nominal"} - # ) - - # change Ghent: replace all expected shifts with nominal. 
- # not preffered by columnflow: https://github.com/columnflow/columnflow/pull/692 - expected_shifts = plot_shift_names # & process_shifts + process_shifts = ( + process_shift_map[process_inst.name] + if process_inst.name in process_shift_map + else {"nominal"} + ) + expected_shifts = (process_shifts & plot_shift_names) or (process_shifts & {"nominal"}) if not expected_shifts: raise Exception(f"no shifts to plot found for process {process_inst.name}") # selections diff --git a/columnflow/tasks/reduction.py b/columnflow/tasks/reduction.py index 08aadca45..5deef6bb8 100644 --- a/columnflow/tasks/reduction.py +++ b/columnflow/tasks/reduction.py @@ -213,12 +213,16 @@ def run(self): ) # invoke the reducer - if len(events): + if len(events) > 0: n_all += len(events) events = attach_coffea_behavior(events) events = self.reducer_inst(events, selection=sel, task=self) n_reduced += len(events) + # no need to proceed when no events are left + if len(events) == 0: + continue + # remove columns events = route_filter(events) diff --git a/modules/law b/modules/law index 44b98b7dc..3adec62db 160000 --- a/modules/law +++ b/modules/law @@ -1 +1 @@ -Subproject commit 44b98b7dcd434badd003fd498eaf399e14c3ee53 +Subproject commit 3adec62db42d1fe8021c792538fe66ee1ed77b91
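For reference, a minimal sketch of how an analysis might wire up the renamed producers after this change. It assumes the new file is importable as `columnflow.production.cms.gen_particles` (matching the `production_modules` entry) and that the analysis config sets `cfg.x.top_pt_weight` to one of the two new config dataclasses; the wrapper producer name `ttbar_gen_weights` is a placeholder, not part of the patch.

```python
# sketch only: chain the gen-level top lookup with the reworked top pt weight
from columnflow.production import Producer, producer
from columnflow.production.cms.gen_particles import gen_top_lookup  # assumed module path
from columnflow.production.cms.top_pt_weight import top_pt_weight
from columnflow.util import maybe_import

ak = maybe_import("awkward")

# in the analysis config (either container works):
# cfg.x.top_pt_weight = TopPtWeightFromTheoryConfig()  # or TopPtWeightFromDataConfig()


@producer(
    uses={gen_top_lookup, top_pt_weight},
    produces={top_pt_weight},
    mc_only=True,
)
def ttbar_gen_weights(self: Producer, events: ak.Array, **kwargs) -> ak.Array:
    # build the structured gen_top column first, then evaluate the pt-dependent SF from gen_top.t.pt
    events = self[gen_top_lookup](events, **kwargs)
    events = self[top_pt_weight](events, **kwargs)
    return events
```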
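The two parameterizations behind `top_pt_weight` can also be checked standalone. The sketch below uses plain numpy outside columnflow, with the default parameters from `TopPtWeightFromDataConfig` and `TopPtWeightFromTheoryConfig`, so treat it as an illustration of the formulas rather than the producer itself.

```python
# standalone check of the two top pt SF parameterizations
import numpy as np


def sf_data(pt, a=0.0615, b=-0.0005, pt_max=500.0):
    # data-based fit: SF(pt) = exp(a + b * pt), with pt clamped at pt_max
    pt = np.minimum(pt, pt_max)
    return np.exp(a + b * pt)


def sf_theory(pt, a=0.103, b=-0.0118, c=-0.000134, d=0.973):
    # theory-based fit: SF(pt) = a * exp(b * pt) + c * pt + d (no clamping)
    return a * np.exp(b * pt) + c * pt + d


# per-event weight: geometric mean of the top and anti-top scale factors
pt_t, pt_tbar = 150.0, 320.0
print(np.sqrt(sf_data(pt_t) * sf_data(pt_tbar)))      # ~0.95
print(np.sqrt(sf_theory(pt_t) * sf_theory(pt_tbar)))  # ~0.95
```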
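The new `optional_fields` argument of `ak_extract_fields` copies a field only when it is actually present on the input array, while entries in `fields` must exist. A toy example (not taken from the patch; the jet array below is made up):

```python
# toy usage of ak_extract_fields with optional fields
import awkward as ak
from columnflow.production.util import ak_extract_fields

jets = ak.Array([[{"pt": 40.0, "eta": 1.2, "hadronFlavour": 5}], []])
slim = ak_extract_fields(jets, fields=["pt", "eta"], optional_fields=["hadronFlavour", "genJetIdx"])
print(slim.fields)  # ['pt', 'eta', 'hadronFlavour'] -- genJetIdx is silently skipped
```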