diff --git a/.gitignore b/.gitignore index 8ff9bb7ec..9b3ccce32 100644 --- a/.gitignore +++ b/.gitignore @@ -29,6 +29,7 @@ custom_data test_input_data *.zip last_bootstrax_exception.txt +bootstrax_exceptions # cProfile output *.prof diff --git a/bin/bootstrax b/bin/bootstrax index 0b0715806..584f218e6 100755 --- a/bin/bootstrax +++ b/bin/bootstrax @@ -15,13 +15,14 @@ How to use For more info, see the documentation: https://straxen.readthedocs.io/en/latest/bootstrax.html """ -__version__ = '2.0.0' +__version__ = '3.0.0' import argparse import typing from datetime import datetime, timedelta, timezone import logging import multiprocessing +import multiprocess import npshmex import os import os.path as osp @@ -39,6 +40,7 @@ import straxen import threading import pandas as pd import typing as ty +from immutabledict import immutabledict import daqnt import fnmatch from glob import glob @@ -86,7 +88,12 @@ parser.add_argument( parser.add_argument( '--max_messages', type=int, default=10, help="number of max mailbox messages") - +parser.add_argument( + '--software_veto_overwrite', default=None, + help="Class name of veto plugin to apply (i.e.: RadialVeto). It's overwritten by rundoc value.") +parser.add_argument( + '--test_input_folder', default=None, + help="Add to storage") actions = parser.add_mutually_exclusive_group() actions.add_argument( @@ -109,8 +116,8 @@ args = parser.parse_args() # The folder that can be used for testing bootstrax (i.e. non production # mode). It will be written to: -test_data_folder = ('/data/test_processed/' if - os.path.exists('/data/test_processed/') +test_data_folder = ('/data/test_pre_processed/' if + os.path.exists('/data/test_pre_processed/') else './bootstrax/') # Timeouts in seconds @@ -270,6 +277,9 @@ if os.access(output_folder, os.W_OK) is not True: raise IOError(message) + + + def new_context(cores=args.cores, max_messages=args.max_messages, timeout=500, @@ -293,8 +303,11 @@ def new_context(cores=args.cores, # all other storage frontends except fo the test folder. context.storage = [context.storage[0], strax.DataDirectory(output_folder)] + if args.test_input_folder is not None: + context.storage += [strax.DataDirectory(args.test_input_folder, readonly=True)] context.storage[0].readonly = True context.storage[0].local_only = True + return context @@ -316,6 +329,7 @@ if not args.undying: def main(): + if args.cores == -1: # Use all of the available cores on this machine args.cores = multiprocessing.cpu_count() @@ -505,7 +519,7 @@ def keep_target(targets, compare_with, n_fails): return kept_targets -def infer_target(rd: dict) -> dict: +def infer_target(rd: dict, software_veto_on: bool = False) -> dict: """ Check if the target should be overridden based on the mode of the DAQ for this run :param rd: rundoc @@ -514,6 +528,20 @@ def infer_target(rd: dict) -> dict: targets = args.targets.copy() post_process = args.post_process.copy() + if software_veto_on: + # TODO study which targets_sv is better to have in targets or post + # TODO THIS is bad!! 
+        _targets = list(targets)
+        _targets.append('raw_records_sv')
+        _targets.append('peaklets_sv')
+
+        _post = list(post_process)
+        _post.append('event_info_sv')
+
+        targets = tuple(_targets)
+        post_process = tuple(_post)
+
     if args.fix_target:
         return {'targets': strax.to_str_tuple(targets),
                 'post_processing': strax.to_str_tuple(post_process)}
@@ -579,6 +607,8 @@ def infer_target(rd: dict) -> dict:
                 f'processing up to {targets} and postprocessing '
                 f'to {post_process}')
 
+
+
     if targets is None or not len(targets):
         targets = 'raw_records'
     if post_process is None or not len(post_process):
@@ -1169,7 +1199,7 @@ def manual_fail(*, mongo_id=None, number=None, reason=''):
 def run_strax(run_id, input_dir, targets, readout_threads, compressor,
               run_start_time, samples_per_record, cores, max_messages, timeout,
               daq_chunk_duration, daq_overlap_chunk_duration, post_processing,
-              records_compressor, debug=False):
+              records_compressor, software_veto_name=None, debug=False):
     # Check mongo connection
     ping_dbs()
     # Clear the swap memory used by npshmmex
@@ -1193,9 +1223,19 @@ def run_strax(run_id, input_dir, targets, readout_threads, compressor,
         timeout=timeout,
     )
 
+    if software_veto_name is not None:
+        # If the software veto is on, register the extra plugins it needs
+        st = software_veto_register_plugins(st, software_veto_name)
+
     for t in ('raw_records', 'records', 'records_nv', 'hitlets_nv'):
         # Set the (raw)records processor to the inferred one
         st._plugin_class_registry[t].compressor = records_compressor
+        try:
+            st._plugin_class_registry[t + '_sv'].compressor = records_compressor
+        except KeyError:  # no _sv copy registered for this data type
+            pass
+
+
     # Make a function for running strax, call the function to process the run
     # This way, it can also be run inside a wrapper to profile strax
@@ -1307,6 +1347,9 @@ def process_run(rd, send_heartbeats=args.production):
             # or use the test-dir:
             if not osp.exists(loc):
                 loc = os.path.join('/live_data/xenonnt_bootstrax_test/', run_id)
+            # or maybe it's in the /data dir:
+            if not osp.exists(loc):
+                loc = os.path.join('/data/xenonnt_bootstrax_test/', run_id)
     else:
         for dd in rd['data']:
@@ -1369,10 +1412,14 @@ def process_run(rd, send_heartbeats=args.production):
     except Exception as e:
         fail(f"Could not find start in datetime.datetime object: {str(e)}")
 
-    run_strax_config.update(infer_target(rd))
+    software_veto_on = software_veto_get_name(rd, args)['software_veto_name'] is not None
+    run_strax_config.update(infer_target(rd, software_veto_on))
     run_strax_config.update(infer_mode(rd))
+    run_strax_config.update(software_veto_get_name(rd, args))
     run_strax_config['debug'] = args.debug
-    strax_proc = multiprocessing.Process(
+
+    log.info("Processing run!")
+    strax_proc = multiprocess.Process(
         target=run_strax,
         kwargs=run_strax_config)
@@ -1642,7 +1689,52 @@ def cleanup_db():
                 abandon(mongo_id=rd['_id'])
 
 
+###########################
+# Software veto functions #
+###########################
+
+
+def software_veto_get_name(rd, args=None):
+    """Decide if software veto is on based on arguments or rundoc"""
+    if args is not None:
+        if args.software_veto_overwrite is not None:
+            return {'software_veto_name': args.software_veto_overwrite}
+
+    return {'software_veto_name': rd.get('software_veto', None)}
+
+
+def software_veto_register_plugins(st, veto_name):
+    """Register the requested veto plugin (selected via argument or,
+    if set, overridden by the rundoc), then register the
+    _sv copies of all the standard plugins.
+ """ + + import straxen.plugins.raw_records_sv.software_veto as software_veto + raw_records_sv_plugin = getattr(software_veto, veto_name) + st.register(raw_records_sv_plugin) + + import straxen.plugins.raw_records_sv._software_veto_copies as _software_veto_copies + st.register_all(_software_veto_copies) + + return st + + + + +########################### +# __MAIN__ # +########################### + if __name__ == '__main__': + + # to avoid warnings pymongo fork + multiprocessing.set_start_method('spawn') + multiprocess.set_start_method('spawn') + if not args.undying: main() else: @@ -1653,7 +1745,7 @@ if __name__ == '__main__': raise except Exception as fatal_error: log.error(f'Fatal warning:\tran into {fatal_error}. Try ' - f'logging error and restart bootstrax') + f'logging error and restart bootstrax') try: log_warning(f'Fatal warning:\tran into {fatal_error}', priority='error') diff --git a/bin/restrax b/bin/restrax index fb482d077..edb243853 100644 --- a/bin/restrax +++ b/bin/restrax @@ -33,6 +33,7 @@ import typing as ty from straxen import daq_core from memory_profiler import memory_usage import glob +import json from ast import literal_eval from straxen.daq_core import now @@ -57,6 +58,9 @@ def parse_args(): help='Only bother with doing the recompression if there are more than this many chunks') parser.add_argument('--bypass_mode', action='store_true', help='Stop recompression and just rename folders. Use with care!') + parser.add_argument('--test_remove_existing', action='store_true', + help='Test software veto overwriting info from rundoc') + actions = parser.add_mutually_exclusive_group() actions.add_argument('--undying', action='store_true', help="Except any error and ignore it") @@ -129,6 +133,13 @@ class ReStrax(daq_core.DataBases): 'raw_records', 'raw_records_nv', 'raw_records_mv', 'raw_records_he', 'records', 'records_nv', 'records_mv') + # TODO do better here + _raw_record_types = raw_record_types + for t in raw_record_types: + _raw_record_types += (t+'_sv',) + raw_record_types = _raw_record_types + + exclude_modes: ty.Iterable = ( 'pmtgain', 'pmtap', 'exttrig', 'noise', 'nVeto_LASER_calibration', 'mv_diffuserballs', 'mv_fibres', 'mv_darkrate') @@ -184,6 +195,7 @@ class ReStrax(daq_core.DataBases): self.recompress_min_chunks = args.recompress_min_chunks self.bypass_mode = args.bypass_mode self.overwrite_settings() + self.test_remove_existing = args.test_remove_existing def infinite_loop(self, close=False) -> None: """Core of restrax, recompress the data followed by several validation steps""" @@ -234,8 +246,10 @@ class ReStrax(daq_core.DataBases): data_docs = [] for folder in folders: if os.path.exists(os.path.join(self.write_to, folder)): - # Don't do work twice - continue + if not self.test_remove_existing: + # Don't do the work again + continue + if len(split := folder.split('-')) and len(split[0]) == 6: run_id, data_type, lineage = split @@ -243,16 +257,24 @@ class ReStrax(daq_core.DataBases): first_run = run_id if run_id != first_run: continue + self.log.info(f'Do {folder}') + if self.test_remove_existing: + if os.path.exists(os.path.join(self.write_to, folder)): + # We want to test again, we do the work again + self.log.info(f'Data exists, removing {os.path.join(self.write_to, folder)} to do it again') + shutil.rmtree(os.path.join(self.write_to, folder)) + data_docs.append({'host': self.hostname, 'location': os.path.join(self.read_from, folder), 'type': data_type, - 'linage_hash': lineage, + 'meta': {'lineage_hash': lineage}, }) if not len(data_docs): return None 
         run_doc = self.run_coll.find_one({'number': int(first_run)}, projection=projection)
         run_doc['data'] = data_docs
+
         return run_doc
 
     def _find_production_work(self, projection) -> ty.Optional[dict]:
@@ -336,6 +358,10 @@ class ReStrax(daq_core.DataBases):
     def _bypass_for_data_doc(self, data_doc: dict) -> None:
         source = data_doc['location']
         dest = self.renamed_path(source)
+        if self._check_before_moving(source, dest):
+            self._move_dir(source, dest)
+
+    def _check_before_moving(self, source, dest) -> bool:
         if os.path.exists(dest):
             # Honestly don't know how this could happen, but we have to be carefull here
             # We are dealing with single copies, so this is a tricky operation.
@@ -351,15 +377,18 @@ class ReStrax(daq_core.DataBases):
             message = f'Trying to move {source}->{dest} but {source} does not exist?!'
             self.log.error(message)
             self.log_warning(message)
-            return
-        self._move_dir(source, dest)
+            return False
+        return True
 
     def handle_run(self, run_doc: dict) -> None:
         """For a given batch of data_docs of a given run, do all the rechunking steps"""
         self.log.debug('start handle_run')
         data_docs = self._get_data_docs(run_doc)
         self.log.info(f'{run_doc["number"]} -> doing {len(data_docs)}')
-        # self.run_software_veto(run_doc)
+
+        # Software veto
+        data_docs, original_docs = self.software_veto_docs(data_docs)
+
         # Split the work in files that we will compress and files that will skip compression
         compress_docs = [
@@ -369,6 +398,7 @@ class ReStrax(daq_core.DataBases):
             d for d in data_docs if self.should_skip_compression(run_doc, d)
         ]
+
         self.log.debug(f'Compressing {len(compress_docs)} docs and skipping (i.e. move) {len(skip_docs)} docs')
         assert len(compress_docs) + len(skip_docs) == len(data_docs), "one and one is three?! "
@@ -382,11 +412,19 @@ class ReStrax(daq_core.DataBases):
         for move_doc in skip_docs:
             self._bypass_for_data_doc(move_doc)
 
+        # Software veto
+        data_docs = self.software_veto_hack_copies_to_originals(run_doc, data_docs, original_docs)
+
         self.finalize_execute(data_docs)
         # Only remove the data that we rechunked (there are now two copies),
         # the moved data is always just a single copy
         self.remove_old_docs(compress_docs)
+
+        # Software veto
+        self.remove_old_docs(original_docs)
+        self._delete_data_from_rundb(run_doc, original_docs)
+
         self.log.info(f'{run_doc["number"]} succes')
 
     def _get_data_docs(self, run_doc: dict):
@@ -405,9 +443,138 @@ class ReStrax(daq_core.DataBases):
         data_docs = sorted(data_docs, key=size, reverse=True)
         return data_docs
 
-    def run_software_veto(self, run_doc: dict):
-        """This is where we can add a software veto for specific runs"""
-        raise NotImplementedError
+
+    def software_veto_docs(self, data_docs: ty.List[dict]) -> ty.Tuple[ty.List[dict], ty.List[dict]]:
+        """Replace the entire docs with only the docs of software veto data"""
+
+        # TODO put the suffix _sv somewhere common, not hardcoded everywhere
+        def _filter_docs(docs, ts):
+            return [d for d in docs if d['type'] in ts]
+
+        all_targets = set([d['type'] for d in data_docs])
+
+        # Software veto is not on (or broken)
+        if not all(x in all_targets for x in ['raw_records_sv', 'raw_records_aqmon_sv']):
+            normal_targets = {t for t in all_targets if not t.endswith('_sv')}
+            return _filter_docs(data_docs, normal_targets), _filter_docs(data_docs, [])
+
+        # Software veto is working
+        else:
+            sv_targets = {t + '_sv' for t in all_targets if t + '_sv' in all_targets}
+            rr_to_pass = {t for t in all_targets
+                          if t.startswith('raw_records') and t + '_sv' not in all_targets}
+
+            to_delete = all_targets - sv_targets - rr_to_pass
+            to_process = all_targets - to_delete
+
+            for t in to_delete:
+                self.log.debug(f'Detected SV for {t} (or missing SV copy), so I will delete it')
+
+            for t in to_process:
+                self.log.debug(f'Will process {t}')
+
+            return _filter_docs(data_docs, to_process), _filter_docs(data_docs, to_delete)
+
+    def software_veto_hack_copies_to_originals(self, run_doc: dict,
+                                               data_docs: ty.List[dict],
+                                               original_docs: ty.List[dict]) -> ty.List[dict]:
+        """
+        Here we do difficult software veto stuff. We need to fake that the now recompressed
+        data has the same lineage as the original data.
+        We need to update the directory names, the chunk file names and the
+        stored metadata accordingly.
+        """
+        new_data_docs = data_docs.copy()
+
+        # TODO style of this function is terrible
+        for _i, d in enumerate(data_docs):
+
+            if d['type'].endswith('_sv'):
+
+                type_in = d['type']
+                type_orig = d['type'].replace('_sv', '')
+
+                # TODO check if there is more than one
+                d_orig = [d for d in original_docs if d['type'] == type_orig]
+                assert len(d_orig) == 1, f"two copies of the same data?! {type_orig}"
+                d_orig = d_orig[0]
+
+                lineage_in = d['meta']['lineage_hash']
+                lineage_orig = d_orig['meta']['lineage_hash']
+
+                dir_in = d['location']
+                dir_orig = d_orig['location']
+                dir_out = self.renamed_path(dir_in)
+
+                backend = strax.FileSytemBackend()
+                md_orig = backend.get_metadata(dir_orig)
+                md_in = backend.get_metadata(dir_in)
+                md_out = backend.get_metadata(dir_out)
+
+                split = dir_out.split('-')
+                split[-2] = split[-2].replace('_sv', '')
+                dir_out_hack = '-'.join(split[:-1]) + '-' + lineage_orig
+
+                if self._check_before_moving(source=dir_out, dest=dir_out_hack):
+                    self.log.debug(f'from {dir_out} to {dir_out_hack}')
+                    os.rename(dir_out, dir_out_hack)
+
+                dir_out = dir_out_hack
+
+                # Keep track of plugins used for veto
+                md_out['software_veto_lineage'] = md_out['lineage']
+
+                # Change filenames in chunk info
+                for i_chunk in range(len(md_out['chunks'])):
+                    chunk_name = md_out['chunks'][i_chunk]['filename']
+                    data_type, lineage, chunk_number = chunk_name.split('-')
+                    data_type = data_type.replace('_sv', '')
+                    new_chunk_name = '-'.join([data_type, lineage_orig, chunk_number])
+                    md_out['chunks'][i_chunk]['filename'] = new_chunk_name
+                    os.rename(os.path.join(dir_out, chunk_name), os.path.join(dir_out, new_chunk_name))
+
+                # Change lineage, kind and type to original raw_records
+                for key in ['data_kind', 'data_type', 'lineage', 'lineage_hash']:
+                    md_out[key] = md_orig[key]
+
+                # TODO source from FileSaver
+                # write the new metadata file
+                json_options = dict(sort_keys=True, indent=4)
+                new_md_json_name = '-'.join([type_orig, lineage_orig, 'metadata.json'])
+                with open(os.path.join(dir_out, new_md_json_name), mode='w') as f:
+                    f.write(json.dumps(md_out, **json_options))
+
+                # remove the old metadata file
+                old_md_json_name = '-'.join([type_in, lineage_in, 'metadata.json'])
+                os.remove(os.path.join(dir_out, old_md_json_name))
+
+                new_data_docs[_i]['location'] = dir_out
+
+        return new_data_docs
+
+    def _delete_data_from_rundb(self, run_doc, data_docs, reason='Software veto applied'):
+        """
+        Delete data and update the rundoc. Used for software veto.
+ :param run_doc: rundoc + :param data_docs: docs of data to be deleted + """ + # Remove the data location from the rundoc and append it to the 'deleted_data' entries + + self.log.info('Deleting original data from rundb') + for ddoc in data_docs: + self.log.info(f"Deleting {ddoc['location']}") + for k in ddoc.copy().keys(): + if k in ['location', 'meta', 'protocol']: + ddoc.pop(k) + + ddoc.update({'at': now(), 'by': f'restrax.{self.hostname}', 'reason': reason}) + if self.production: + run_coll.update_one({'_id': run_doc['_id']}, + {"$addToSet": {'deleted_data': ddoc}, + "$pull": {"data": + {"type": run_doc['type'], + "host": {'$in': ['daq', self.hostname]}}}}) + else: + self.log.info(f'Would update ddoc. But we are testing, so no db stuff!') + def rechunk_docs(self, run_doc: dict, data_docs: ty.List[dict]) -> None: """ @@ -530,6 +697,7 @@ class ReStrax(daq_core.DataBases): # no need to recompress data if it's only one chunk self.log.debug(f'Skip {data_type} -> only {n_chunks} chunks') return True + self.log.debug(f'Compress {data_type}') return False def do_checks(self, data_docs: ty.List[dict]) -> None: @@ -672,6 +840,16 @@ class ReStrax(daq_core.DataBases): """ # Maybe could merge this with do checks? -> Avoid opening metadate twice? # Then again, that is SO minor in the grand scheme of things, that I just leave it like this for the moment + + if not self.production: + self.log.debug(f'Will now update rundb (no, just testing)') + for data_doc in data_docs: + dir_in = data_doc.get('location', '') + dir_out = self.renamed_path(dir_in) + storage_backend = strax.FileSytemBackend() + new_metadata = storage_backend.get_metadata(dir_out) + self.log.debug(f'{dir_in} --> {dir_out}') + if not self.production or not len(data_docs): return storage_backend = strax.FileSytemBackend() @@ -713,10 +891,12 @@ class ReStrax(daq_core.DataBases): self.log.info('Rundoc updated') def remove_old_docs(self, done_data_docs: ty.List[dict]): + """Remove old docs, only if in production mode""" for data_doc in done_data_docs: loc = data_doc.get('location', '??') - assert 'pre_processed' in loc - self._remove_dir(loc) + if self.production: + assert 'pre_processed' in loc + self._remove_dir(loc) def take_a_nap(self, dt: ty.Optional[int] = None): time.sleep(dt if dt is not None else self.nap_time) @@ -743,12 +923,14 @@ class ReStrax(daq_core.DataBases): self.log.info(f'Move {source} -> {dest}') if self.production: os.rename(source, dest) + else: + # if we are testing just copy the files + shutil.copytree(source, dest, dirs_exist_ok=True) def _remove_dir(self, directory: str) -> None: """Remove directory (when in production mode)""" self.log.info(f'Remove {directory}') - if self.production: - shutil.rmtree(directory) + shutil.rmtree(directory) def log_warning(self, message: str, **kw) -> None: self.log.warning(message) diff --git a/bin/straxer b/bin/straxer index f796b17fd..cd6bf9943 100644 --- a/bin/straxer +++ b/bin/straxer @@ -11,6 +11,7 @@ import json import importlib import sys +sys.path.insert(0, '/daq_common/carlo/test_software/straxen_software_veto') def parse_args(): parser = argparse.ArgumentParser( @@ -114,10 +115,18 @@ def parse_args(): '--add_folder', default='', help='Also add folder to st.storage') + parser.add_argument( + '--add_input_folder', + default='', + help='Also add folder to st.storage in readonly mode') parser.add_argument( '--print_alive', default=300, help='Print that straxer is still running every this many [seconds]') + parser.add_argument( + '--software_veto', default=False, + help="Class 
name of veto plugin to apply (i.e.: RadialVeto). It's overwritten by rundoc value.") + return parser.parse_args() @@ -133,6 +142,14 @@ def setup_context(args): logging.info(f'set context kwargs {args.context_kwargs}') st = getattr(context_module, args.context)(**args.context_kwargs) + if args.software_veto is not False: + import straxen.plugins.raw_records_sv.software_veto as software_veto + veto_plugin = getattr(straxen.plugins.raw_records_sv.software_veto, args.software_veto) + st.register(veto_plugin) + + import straxen.plugins.raw_records_sv._software_veto_copies as _software_veto_copies + st.register_all(_software_veto_copies) + if args.config_kwargs: logging.info(f'set context options to {args.config_kwargs}') st.set_config(to_dict_tuple(args.config_kwargs)) @@ -175,6 +192,10 @@ def setup_context(args): st.storage += [ strax.DataDirectory('./strax_data')] + if args.add_input_folder != '': + if os.path.exists(args.add_input_folder): + st.storage += [strax.DataDirectory(args.add_input_folder, readonly=True)] + if args.add_folder != '': for sf in st.storage: # Set all others to read only @@ -214,7 +235,8 @@ def main(args): # Reactivate after https://github.com/XENONnT/straxen/issues/586 logging.info(f'Checking availabilty') - logging.info(f'Available\n{str(st.available_for_run(args.run_id))}') + # Deactivate temporarily for software_veto + # logging.info(f'Available\n{str(st.available_for_run(args.run_id))}') logging.info('Infer start/end') try: diff --git a/straxen/plugins/__init__.py b/straxen/plugins/__init__.py index 5a15d7c18..425fa9413 100644 --- a/straxen/plugins/__init__.py +++ b/straxen/plugins/__init__.py @@ -83,3 +83,9 @@ from . import led_cal from .led_cal import * + + +#Software veto +from .raw_records_sv import * +from . import raw_records_sv + diff --git a/straxen/plugins/aqmon_hits/aqmon_hits.py b/straxen/plugins/aqmon_hits/aqmon_hits.py index 10dd4a116..69c7fa04b 100644 --- a/straxen/plugins/aqmon_hits/aqmon_hits.py +++ b/straxen/plugins/aqmon_hits/aqmon_hits.py @@ -3,7 +3,7 @@ import strax import straxen -from straxen.plugins.raw_records.daqreader import ARTIFICIAL_DEADTIME_CHANNEL +from straxen.plugins.raw_records.daqreader import ARTIFICIAL_DEADTIME_CHANNEL, SOFTWARE_VETO_CHANNEL export, __all__ = strax.exporter() @@ -17,6 +17,7 @@ class AqmonChannels(IntEnum): MV_TRIGGER = 797 GPS_SYNC = 798 ARTIFICIAL_DEADTIME = ARTIFICIAL_DEADTIME_CHANNEL + SOFTWARE_VETO = SOFTWARE_VETO_CHANNEL # Analogue sum waveform SUM_WF = 800 # GPS sync acquisition monitor @@ -63,6 +64,7 @@ class AqmonHits(strax.Plugin): # Fake signals, 0 meaning that we won't find hits using # strax but just look for starts and stops (0, (int(AqmonChannels.ARTIFICIAL_DEADTIME),)), + (0, (int(AqmonChannels.SOFTWARE_VETO),)), ), track=True, help='Minimum hit threshold in ADC*counts above baseline. 
Specified ' @@ -117,7 +119,10 @@ def find_aqmon_hits_per_channel(self, records): aqmon_thresholds[np.array(channels)] = hit_threshold # Split the artificial deadtime ones and do those separately if there are any - is_artificial = records['channel'] == AqmonChannels.ARTIFICIAL_DEADTIME + # here we also add the software veto because it's treated in the exact same way + is_artificial = (records['channel'] == AqmonChannels.ARTIFICIAL_DEADTIME) + is_artificial |= (records['channel'] == AqmonChannels.SOFTWARE_VETO) + aqmon_hits = strax.find_hits(records[~is_artificial], min_amplitude=aqmon_thresholds) diff --git a/straxen/plugins/events/event_area_per_channel.py b/straxen/plugins/events/event_area_per_channel.py index fd1593127..56c1a9e66 100644 --- a/straxen/plugins/events/event_area_per_channel.py +++ b/straxen/plugins/events/event_area_per_channel.py @@ -22,7 +22,8 @@ class EventAreaPerChannel(strax.LoopPlugin): def infer_dtype(self): # setting data type from peak dtype - pfields_=self.deps['peaks'].dtype_for('peaks').fields + _dtype_for = self.depends_on[1] + pfields_=self.deps[_dtype_for].dtype_for(_dtype_for).fields ## Populating data type infoline = {'s1': 'main S1', 's2': 'main S2', diff --git a/straxen/plugins/events/event_basics.py b/straxen/plugins/events/event_basics.py index 179d104c1..16ee3e7cf 100644 --- a/straxen/plugins/events/event_basics.py +++ b/straxen/plugins/events/event_basics.py @@ -146,7 +146,9 @@ def _set_posrec_save(self): parse x_mlp et cetera if needed to get the algorithms used and set required class attributes """ - posrec_fields = self.deps['peak_positions'].dtype_for('peak_positions').names + _dtype_for = self.depends_on[2] + + posrec_fields = self.deps[_dtype_for].dtype_for(_dtype_for).names posrec_names = [d.split('_')[-1] for d in posrec_fields if 'x_' in d] # Preserve order. "set" is not ordered and dtypes should always be ordered diff --git a/straxen/plugins/events/veto_proximity.py b/straxen/plugins/events/veto_proximity.py index ce194a7a6..5dc5e6aed 100644 --- a/straxen/plugins/events/veto_proximity.py +++ b/straxen/plugins/events/veto_proximity.py @@ -40,7 +40,7 @@ class VetoProximity(strax.OverlapWindowPlugin): 'such that one will never cut events that are < YY ns.' 
) - veto_names = ['busy', 'busy_he', 'hev', 'straxen_deadtime'] + veto_names = ['busy', 'busy_he', 'hev', 'straxen_deadtime', 'software'] def infer_dtype(self): dtype = [] @@ -88,16 +88,19 @@ def set_result_for_veto(self, result_buffer[f'time_to_previous_{veto_name}'] = self.time_no_aqmon_veto_found result_buffer[f'time_to_next_{veto_name}'] = self.time_no_aqmon_veto_found + selected_intervals = veto_intervals[veto_intervals['veto_type'] == f'{veto_name}_veto'] if not len(selected_intervals): return vetos_during_event = strax.touching_windows(selected_intervals, event_window) + # Figure out the vetos *during* an event for event_i, veto_window in enumerate(vetos_during_event): if veto_window[1] - veto_window[0]: + vetos_in_window = selected_intervals[veto_window[0]: veto_window[1]].copy() starts = np.clip(vetos_in_window['time'], diff --git a/straxen/plugins/merged_s2s/merged_s2s.py b/straxen/plugins/merged_s2s/merged_s2s.py index 1e35460d8..3df4aff25 100644 --- a/straxen/plugins/merged_s2s/merged_s2s.py +++ b/straxen/plugins/merged_s2s/merged_s2s.py @@ -64,7 +64,8 @@ def setup(self): self.to_pe = self.gain_model def infer_dtype(self): - return strax.unpack_dtype(self.deps['peaklets'].dtype_for('peaklets')) + _dtype_for = self.depends_on[0] + return strax.unpack_dtype(self.deps[_dtype_for].dtype_for(_dtype_for)) def get_window_size(self): return 5 * (int(self.s2_merge_gap_thresholds[0][1]) diff --git a/straxen/plugins/merged_s2s_he/merged_s2s_he.py b/straxen/plugins/merged_s2s_he/merged_s2s_he.py index 99d4b8966..3bc12393a 100644 --- a/straxen/plugins/merged_s2s_he/merged_s2s_he.py +++ b/straxen/plugins/merged_s2s_he/merged_s2s_he.py @@ -25,7 +25,9 @@ def n_tpc_pmts(self): return self.n_he_pmts def infer_dtype(self): - return strax.unpack_dtype(self.deps['peaklets_he'].dtype_for('peaklets_he')) + _dtype_for = self.depends_on[0] # raw_records + + return strax.unpack_dtype(self.deps[_dtype_for].dtype_for(_dtype_for)) def compute(self, peaklets_he): # There are not any lone hits for the high energy channel, diff --git a/straxen/plugins/peaklets/peaklets.py b/straxen/plugins/peaklets/peaklets.py index 639b0e8d6..dd3ff9f33 100644 --- a/straxen/plugins/peaklets/peaklets.py +++ b/straxen/plugins/peaklets/peaklets.py @@ -311,7 +311,7 @@ def compute(self, records, start, end): # Drop the data_top field if n_top_pmts_if_digitize_top <= 0: - peaklets = drop_data_top_field(peaklets, self.dtype_for('peaklets')) + peaklets = drop_data_top_field(peaklets, self.dtype_for(self.provides[0])) return dict(peaklets=peaklets, lone_hits=lone_hits) diff --git a/straxen/plugins/peaks/peaks.py b/straxen/plugins/peaks/peaks.py index e544a701b..a2a084ae7 100644 --- a/straxen/plugins/peaks/peaks.py +++ b/straxen/plugins/peaks/peaks.py @@ -32,7 +32,8 @@ class Peaks(strax.Plugin): "It's now possible for a S1 to be inside a S2 post merging") def infer_dtype(self): - return self.deps['peaklets'].dtype_for('peaklets') + _dtype = self.depends_on[0] # peaklets + return self.deps[_dtype].dtype_for(_dtype) def compute(self, peaklets, merged_s2s): # Remove fake merged S2s from dirty hack, see above diff --git a/straxen/plugins/peaks_he/peaks_he.py b/straxen/plugins/peaks_he/peaks_he.py index 67bf25d44..af9f918f5 100644 --- a/straxen/plugins/peaks_he/peaks_he.py +++ b/straxen/plugins/peaks_he/peaks_he.py @@ -21,7 +21,8 @@ class PeaksHighEnergy(Peaks): child_ends_with = '_he' def infer_dtype(self): - return self.deps['peaklets_he'].dtype_for('peaklets') + _dtype_for = self.depends_on[0] + return 
self.deps[_dtype_for].dtype_for(_dtype_for) def compute(self, peaklets_he, merged_s2s_he): return super().compute(peaklets_he, merged_s2s_he) diff --git a/straxen/plugins/raw_records/daqreader.py b/straxen/plugins/raw_records/daqreader.py index 3f34e78c7..427b23c68 100644 --- a/straxen/plugins/raw_records/daqreader.py +++ b/straxen/plugins/raw_records/daqreader.py @@ -10,10 +10,12 @@ export, __all__ = strax.exporter() __all__ += ['ARTIFICIAL_DEADTIME_CHANNEL'] +__all__ += ['SOFTWARE_VETO_CHANNEL'] # Just below the TPC acquisition monitor, see # https://xe1t-wiki.lngs.infn.it/doku.php?id=xenon:xenonnt:dsg:daq:channel_groups ARTIFICIAL_DEADTIME_CHANNEL = 799 +SOFTWARE_VETO_CHANNEL = 792 class ArtificialDeadtimeInserted(UserWarning): @@ -188,11 +190,14 @@ def is_ready(self, chunk_i): return False def _load_chunk(self, path, start, end, kind='central'): + + _dtype_for = self.provides[0] # raw_records + records = [ strax.load_file( fn, compressor=self.config["daq_compressor"], - dtype=self.dtype_for('raw_records')) + dtype=self.dtype_for(_dtype_for)) for fn in sorted(glob.glob(f'{path}/*'))] records = np.concatenate(records) records = strax.sort_by_time(records) @@ -269,12 +274,16 @@ def _load_chunk(self, path, start, end, kind='central'): return result, break_time def _artificial_dead_time(self, start, end, dt): + + _dtype_for = self.provides[0] # raw_records + + return strax.dict_to_rec( dict(time=[start], length=[(end - start) // dt], dt=[dt], channel=[ARTIFICIAL_DEADTIME_CHANNEL]), - self.dtype_for('raw_records')) + self.dtype_for(_dtype_for)) def compute(self, chunk_i): dt_central = self.config['daq_chunk_duration'] diff --git a/straxen/plugins/raw_records_coin_nv/nveto_recorder.py b/straxen/plugins/raw_records_coin_nv/nveto_recorder.py index 3bc54e075..27a3c3dbd 100644 --- a/straxen/plugins/raw_records_coin_nv/nveto_recorder.py +++ b/straxen/plugins/raw_records_coin_nv/nveto_recorder.py @@ -90,7 +90,7 @@ def setup(self): def infer_dtype(self): self.record_length = strax.record_length_from_dtype( - self.deps['raw_records_nv'].dtype_for('raw_records_nv')) + self.deps[self.depends_on[0]].dtype_for(self.depends_on[0])) channel_range = self.channel_map['nveto'] n_channel = (channel_range[1] - channel_range[0]) + 1 diff --git a/straxen/plugins/raw_records_sv/__init__.py b/straxen/plugins/raw_records_sv/__init__.py new file mode 100644 index 000000000..c7026a248 --- /dev/null +++ b/straxen/plugins/raw_records_sv/__init__.py @@ -0,0 +1 @@ +# we don't want to import software veto plugins by default \ No newline at end of file diff --git a/straxen/plugins/raw_records_sv/_build_copies.py b/straxen/plugins/raw_records_sv/_build_copies.py new file mode 100644 index 000000000..4ded24a07 --- /dev/null +++ b/straxen/plugins/raw_records_sv/_build_copies.py @@ -0,0 +1,167 @@ +import sys, os +import strax +import straxen +from immutabledict import immutabledict + +_dir = os.path.dirname(os.path.abspath(__file__)) +print(_dir) + +st = straxen.contexts.xenonnt_online(output_folder='') +registry = st._plugin_class_registry.copy().items() + +save_when_replacers = {"'":'', + '<':'', + '>':'', + '{':'', + '}':'', + '0':'', + '1':'', + '2':'', + '3':'', + '4':'', + ': ':'', + 'Save':'=strax.Save', + } + +rechunk_replacers = {"'":'', + '<':'', + '>':'', + '{':'', + '}':'', + ':':'=' + } + + +tofile = """ +from immutabledict import immutabledict +import numba +import numpy as np +import inspect +import strax +import straxen + +""" + + +_plugins = [] + +for name, pl in registry: + if pl == straxen.DAQReader: + pass + 
elif pl == straxen.Fake1TDAQReader: + pass + elif name.endswith('_sv'): # ext_timings_nv_sv_sv ?? + pass + elif pl in _plugins: + pass + else: + + print(name) + + # I need initialisation to save myself + init_pl = st.get_single_plugin('000000', name) + + provides = [prov.replace('_sv','')+'_sv' for prov in strax.to_str_tuple(pl.provides)] + depends_on = [deps.replace('_sv','')+'_sv' for deps in strax.to_str_tuple(pl.depends_on)] + + if isinstance(init_pl.data_kind, dict): + data_kind = {t.replace('_sv','')+'_sv':init_pl.data_kind[t]+'_sv' for t in init_pl.data_kind} + else: + data_kind = "'"+str(init_pl.data_kind+'_sv')+"'" + + if isinstance(pl.save_when, immutabledict): + save_when = str(immutabledict({t.replace('_sv','')+'_sv':init_pl.save_when[t] for t in pl.save_when})) + for k, v in save_when_replacers.items(): + save_when = save_when.replace(k, v) + save_when = f""" + save_when = {save_when} + """ + else: + save_when = '' + + if isinstance(pl.rechunk_on_save, immutabledict): + rechunk_on_save = str(immutabledict({t.replace('_sv','')+'_sv':init_pl.rechunk_on_save[t] for t in pl.rechunk_on_save})) + for k, v in rechunk_replacers.items(): + rechunk_on_save = rechunk_on_save.replace(k, v) + rechunk_on_save = f""" + rechunk_on_save = {rechunk_on_save} + """ + else: + rechunk_on_save = '' + + if init_pl.multi_output: + dtype = str({prov+'_sv': init_pl.dtype_for(prov) for prov in pl.provides}) + dtype = dtype.replace('dtype(', '').replace(')])', ')]') + else: + dtype = init_pl.dtype_for(name) + + compute_takes_chunk_i = init_pl.compute_takes_chunk_i + compute_takes_start_end = init_pl.compute_takes_start_end + + if init_pl.multi_output: + output = """ + p_mapping = {v: k for k, v in zip(strax.to_str_tuple(self.provides), + strax.to_str_tuple(super().provides))} + return {p_mapping[k]: v for k,v in result.items()} +""" + else: + output = """ + return result +""" + + + + + + classtofile = f""" + +class {pl.__name__}SV(straxen.{pl.__name__}): + depends_on = {depends_on} + provides = {provides} + dtype = {dtype} + data_kind = {data_kind} + {save_when} + {rechunk_on_save} + + def __init__(self): + super().__init__() + self.compute_takes_chunk_i = {compute_takes_chunk_i} + self.compute_takes_start_end = {compute_takes_start_end} + + def infer_dtype(self): + super().infer_dtype() + return self.dtype + + def compute(self, **kwargs): + + _kwargs = {{}} + for k,v in kwargs.items(): + if k not in ['chunk_i', 'end', 'start']: + _kwargs[k.replace('_sv', '')] = v + else: + _kwargs[k] = v + + result = super().compute(**_kwargs) + + {output} + +""" + + if pl == straxen.PulseProcessing: + classtofile += """ + allow_sloppy_chunking = True +""" + + tofile += classtofile + + + + + _plugins.append(pl) + + +with open(os.path.join(_dir, '_software_veto_copies.py'), "w") as text_file: + text_file.write(tofile) + + +print('Finished') \ No newline at end of file diff --git a/straxen/plugins/raw_records_sv/_software_veto_base.py b/straxen/plugins/raw_records_sv/_software_veto_base.py new file mode 100644 index 000000000..633ce8a5f --- /dev/null +++ b/straxen/plugins/raw_records_sv/_software_veto_base.py @@ -0,0 +1,123 @@ +import numpy as np +import strax +import straxen +from immutabledict import immutabledict + +export, __all__ = strax.exporter() + +from straxen.plugins.raw_records.daqreader import SOFTWARE_VETO_CHANNEL + +@export +@strax.takes_config( + # DAQ settings -- should match settings given to redax + strax.Option('record_length', default=110, track=False, type=int, + help="Number of samples per 
raw_record"), + ) +class RawRecordsSoftwareVetoBase(strax.Plugin): + + """ + Software veto for raw records - yes, we throw them away forever! + + contact: Carlo Fuselli (cfuselli@nikhef.nl) + """ + + __version__ = '0.0.5' + + # if extra dependency on i.e. peaks_proximity is needed, + # redefine the depends_on in the software_vet0.py plugin (see ExamplePeakLevel) + # keeping the order raw_records, raw_records_aqmon, peaks, events + depends_on = ('raw_records', 'raw_records_aqmon', 'event_info') + + provides = ( + 'raw_records_sv', + 'raw_records_aqmon_sv', + ) + + data_kind = immutabledict(zip(provides, provides)) + + rechunk_on_save = immutabledict( + raw_records_sv=False, + raw_records_aqmon_sv=True, + ) + + parallel = 'process' + chunk_target_size_mb = 50 + compressor = 'lz4' + input_timeout = 300 + + software_veto_touching_window = straxen.URLConfig( + default=int(0), infer_type=False, + help='Strax touching window for container and thing (raw_records and events).') + + software_veto_pre_scaling = straxen.URLConfig( + default=int(0), infer_type=False, + help='This sets the pre_scaling factor (keep a fracion of the events we want to delete)' + ' 0 to delete all non-wanted raw_records' + ' 0.5 to keep half of the non-wanted raw_records' + ' 1 the software veto is basically deactivated') + + def infer_dtype(self): + return { + d: strax.raw_record_dtype( + samples_per_record=self.config["record_length"]) + for d in self.provides} + + def software_veto_mask(self, objects): + + return NotImplementedError(""" + This is a base plugin, + please build a plugin with this function""") + + def compute(self, raw_records, raw_records_aqmon, events): + + result = dict() + + # define events of which to delete raw_records + objects_to_delete = events[self.software_veto_mask(events)] + + # apply pre-scaling and update objects to delete + r = np.random.random(len(objects_to_delete)) + pre_scaling_mask = (r>self.software_veto_pre_scaling) + objects_to_delete = objects_to_delete[pre_scaling_mask] + + # get mask of raw_records to delete + veto_mask = self.get_touching_mask(raw_records, objects_to_delete) + + # Result: raw_records to keep + result[self.provides[0]] = raw_records[veto_mask] + + # Result: aqmon to add + result[self.provides[1]] = strax.sort_by_time( + np.concatenate([ + raw_records_aqmon, + self._software_veto_time( + start=objects_to_delete['time'], + end=strax.endtime(objects_to_delete), + dt=10 # TODO TODO TODO TODO bad + )])) + + return result + + def get_touching_mask(self, things, containers): + + # things = raw_records + # containers = i.e. 
events + + # start with keep everything + mask = np.full(len(things), True) + + # throw away things inside every container + for i0, i1 in strax.touching_windows(things, containers, window=self.software_veto_touching_window): + mask[i0:i1] = False + + # return only the things outside the containers + return mask + + def _software_veto_time(self, start, end, dt): + + return strax.dict_to_rec( + dict(time=start, + length=(end - start) // dt, + dt=np.repeat(dt, len(start)), + channel=np.repeat(SOFTWARE_VETO_CHANNEL, len(start))), + self.dtype_for(self.provides[0])) diff --git a/straxen/plugins/raw_records_sv/_software_veto_copies.py b/straxen/plugins/raw_records_sv/_software_veto_copies.py new file mode 100644 index 000000000..c1a7c50a6 --- /dev/null +++ b/straxen/plugins/raw_records_sv/_software_veto_copies.py @@ -0,0 +1,2099 @@ + +from immutabledict import immutabledict +import numba +import numpy as np +import inspect +import strax +import straxen + + + +class AqmonHitsSV(straxen.AqmonHits): + depends_on = ['raw_records_aqmon_sv'] + provides = ['aqmon_hits_sv'] + dtype = [(('Start time since unix epoch [ns]', 'time'), ' 0 + + return m + +@export +class RadialVeto(RawRecordsSoftwareVetoBase): + """ + Radial sofrtare veto + Deletes raw records of events outside certain r + """ + + __version__ = 'radial-veto-0.0.1' + + def software_veto_mask(self, e): + + m = (e['x']**2 + e['y']**2) > 50**2 + + return m + +@export +class HighEnergyVeto(RawRecordsSoftwareVetoBase): + """ + High energy sofrtare veto + Deletes raw records for events with high s1 and s2 area + """ + + __version__ = 'high-energy-veto-0.0.1' + + def software_veto_mask(self, e): + + m = (e['s1_area'] > 1e4) & (e['s2_area'] > 1e5) + + return m + +@export +class ExamplePeakLevel(RawRecordsSoftwareVetoBase): + """ + Example veto on peak level, needs to specify veto_mask_on + """ + + __version__ = 'example-peak-level-0.0.2' + depends_on = ('raw_records', 'raw_records_aqmon', 'peak_basics') + + def compute(self, raw_records, raw_records_aqmon, peaks): + # base class written to work on events, but we just care about the time intervals + return super().compute(raw_records, raw_records_aqmon, events=peaks) + + def software_veto_mask(self, p): + + m = (p['type'] == 2) & (p['area'] > 100000) + + return m \ No newline at end of file diff --git a/straxen/plugins/records/records.py b/straxen/plugins/records/records.py index b1359421d..815f6c4db 100644 --- a/straxen/plugins/records/records.py +++ b/straxen/plugins/records/records.py @@ -121,7 +121,7 @@ class PulseProcessing(strax.Plugin): def infer_dtype(self): # Get record_length from the plugin making raw_records self.record_length = strax.record_length_from_dtype( - self.deps['raw_records'].dtype_for('raw_records')) + self.deps[self.depends_on[0]].dtype_for(self.depends_on[0])) dtype = dict() for p in self.provides: diff --git a/straxen/plugins/records_nv/records_nv.py b/straxen/plugins/records_nv/records_nv.py index 8676acba0..629a69f9c 100644 --- a/straxen/plugins/records_nv/records_nv.py +++ b/straxen/plugins/records_nv/records_nv.py @@ -53,7 +53,7 @@ def setup(self): def infer_dtype(self): record_length = strax.record_length_from_dtype( - self.deps['raw_records_coin_nv'].dtype_for('raw_records_coin_nv')) + self.deps[self.depends_on[0]].dtype_for(self.depends_on[0])) dtype = strax.record_dtype(record_length) return dtype diff --git a/straxen/plugins/veto_intervals/veto_intervals.py b/straxen/plugins/veto_intervals/veto_intervals.py index f8827d456..b812ef1da 100644 --- 
a/straxen/plugins/veto_intervals/veto_intervals.py +++ b/straxen/plugins/veto_intervals/veto_intervals.py @@ -29,8 +29,10 @@ class VetoIntervals(strax.OverlapWindowPlugin): - hev_* <= DDC10 hardware high energy veto - straxen_deadtime <= special case of deadtime introduced by the DAQReader-plugin + - software_veto <= raw_records are removed by software veto at event level + """ - __version__ = '1.1.1' + __version__ = '1.1.2' depends_on = 'aqmon_hits' provides = 'veto_intervals' data_kind = 'veto_intervals' @@ -90,8 +92,8 @@ def compute(self, aqmon_hits, start, end): # Straxen deadtime is special, it's a start and stop with no data # but already an interval so easily used here - artificial_deadtime = aqmon_hits[(aqmon_hits['channel'] == - AqmonChannels.ARTIFICIAL_DEADTIME)] + is_artificial = (aqmon_hits['channel'] == AqmonChannels.ARTIFICIAL_DEADTIME) + artificial_deadtime = aqmon_hits[is_artificial] n_artificial = len(artificial_deadtime) if n_artificial: @@ -100,6 +102,18 @@ def compute(self, aqmon_hits, start, end): result[vetos_seen:n_artificial]['veto_type'] = 'straxen_deadtime_veto' vetos_seen += n_artificial + # Software veto is also special, same as artificial deadtime + # so with no data but just time interval. Treated in the same way + is_software_veto = (aqmon_hits['channel'] == AqmonChannels.SOFTWARE_VETO) + software_veto = aqmon_hits[is_software_veto] + n_software_veto = len(software_veto) + + if n_software_veto: + result[vetos_seen:vetos_seen+n_software_veto]['time'] = software_veto['time'] + result[vetos_seen:vetos_seen+n_software_veto]['endtime'] = strax.endtime(software_veto) + result[vetos_seen:vetos_seen+n_software_veto]['veto_type'] = 'software_veto' + vetos_seen += n_software_veto + result = result[:vetos_seen] result['veto_interval'] = result['endtime'] - result['time'] sort = np.argsort(result['time'])
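
###############################
# Notes on the veto mechanics #
###############################

The heart of RawRecordsSoftwareVetoBase.compute is the touching-windows mask:
every raw record whose interval overlaps a vetoed event is dropped. A minimal,
self-contained sketch of that logic follows; the toy dtype and the interval
values are illustrative stand-ins for real strax records and events.

    import numpy as np
    import strax

    interval_dtype = [('time', np.int64), ('length', np.int32), ('dt', np.int16)]

    things = np.zeros(5, dtype=interval_dtype)       # stand-in for raw_records
    things['time'] = [0, 10, 20, 30, 40]
    things['length'] = 1
    things['dt'] = 2                                 # each record spans 2 ns

    containers = np.zeros(1, dtype=interval_dtype)   # stand-in for vetoed events
    containers['time'] = 9
    containers['length'] = 1
    containers['dt'] = 13                            # the event spans [9, 22)

    # Same logic as get_touching_mask: start from "keep everything",
    # then drop every thing that touches a vetoed container
    mask = np.full(len(things), True)
    for i0, i1 in strax.touching_windows(things, containers, window=0):
        mask[i0:i1] = False

    print(things['time'][mask])  # [ 0 30 40]: the records at 10 and 20 are vetoed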
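software_veto_pre_scaling keeps a random fraction of the events that would
otherwise be vetoed, which allows validating the veto before committing to it.
A sketch of the selection done in compute (the seeded generator is only there
to make the example reproducible; the plugin itself calls np.random.random):

    import numpy as np

    pre_scaling = 0.5                      # 0: veto every flagged event, 1: veto none
    rng = np.random.default_rng(seed=0)
    flagged_events = np.arange(10)         # stand-in for events failing the mask
    still_vetoed = rng.random(len(flagged_events)) > pre_scaling
    objects_to_delete = flagged_events[still_vetoed]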
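Defining a new veto only requires subclassing the base plugin and overriding
software_veto_mask, as RadialVeto and HighEnergyVeto do. A hypothetical example
(the class name and the 60 cm radius are made up for illustration; the import
path assumes the module layout introduced in this diff):

    from straxen.plugins.raw_records_sv._software_veto_base import RawRecordsSoftwareVetoBase

    class FiducialVeto(RawRecordsSoftwareVetoBase):
        """Hypothetical veto: drop raw_records of events reconstructed outside r = 60 cm."""
        __version__ = 'fiducial-veto-0.0.1'

        def software_veto_mask(self, e):
            # True = the event is vetoed and its raw_records are deleted
            return (e['x']**2 + e['y']**2) > 60**2

Such a plugin would then be activated the same way software_veto_register_plugins
does it: st.register(FiducialVeto), followed by registering the _sv plugin copies
with st.register_all(_software_veto_copies).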
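Note that, despite the help text of --software_veto_overwrite ("It's overwritten
by rundoc value"), software_veto_get_name gives the command-line flag precedence
over the rundoc. A small demonstration, assuming the function from bootstrax
above is in scope, with a toy rundoc and a stand-in for the parsed argparse
namespace:

    rd = {'number': 35000, 'software_veto': 'RadialVeto'}   # toy rundoc

    class _Args:
        software_veto_overwrite = 'HighEnergyVeto'          # as if passed on the CLI

    assert software_veto_get_name(rd, _Args()) == {'software_veto_name': 'HighEnergyVeto'}
    assert software_veto_get_name(rd, None) == {'software_veto_name': 'RadialVeto'}
    assert software_veto_get_name({}, None) == {'software_veto_name': None}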
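Finally, software_veto_hack_copies_to_originals disguises the recompressed *_sv
data as the original data type: the output directory, every chunk file and the
metadata are renamed to the original type and lineage hash, so downstream tools
see ordinary raw_records. The filename surgery on one chunk, with made-up hashes
for illustration:

    chunk_name = 'raw_records_sv-abc123def-000042'   # '<type>-<lineage>-<chunk>'
    lineage_orig = '987fedcba'                       # lineage of the original data

    data_type, _lineage_sv, chunk_number = chunk_name.split('-')
    new_chunk_name = '-'.join([data_type.replace('_sv', ''), lineage_orig, chunk_number])
    assert new_chunk_name == 'raw_records-987fedcba-000042'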