diff --git a/docs/components/pulse-patterns.md b/docs/components/pulse-patterns.md index b40ce18a..af7d04bf 100644 --- a/docs/components/pulse-patterns.md +++ b/docs/components/pulse-patterns.md @@ -6,4 +6,6 @@ ::: extra.components.MachinePulses +::: extra.components.XtdfPulses + ::: extra.components.DldPulses diff --git a/src/extra/components/__init__.py b/src/extra/components/__init__.py index 0989e485..7839ab4a 100644 --- a/src/extra/components/__init__.py +++ b/src/extra/components/__init__.py @@ -1,7 +1,8 @@ from .scantool import Scantool # noqa from .pulses import XrayPulses, OpticalLaserPulses, MachinePulses, \ - PumpProbePulses, DldPulses # noqa + PumpProbePulses, XtdfPulses, AgipdPulses, LpdPulses, DsscPulses, \ + DldPulses # noqa from .scan import Scan # noqa from .xgm import XGM # noqa from .dld import DelayLineDetector # noqa diff --git a/src/extra/components/pulses.py b/src/extra/components/pulses.py index 0bacb958..65c87e8a 100644 --- a/src/extra/components/pulses.py +++ b/src/extra/components/pulses.py @@ -14,7 +14,7 @@ from euxfel_bunch_pattern import is_sase, is_laser, \ PPL_BITS, DESTINATION_TLD, DESTINATION_T4D, DESTINATION_T5D, \ PHOTON_LINE_DEFLECTION -from extra_data import SourceData, KeyData, by_id +from extra_data import SourceData, KeyData, SourceNameError, by_id from .utils import identify_sase, _instrument_to_sase @@ -1508,6 +1508,250 @@ def pulse_mask(self, labelled=True, field=None): raise ValueError(f"{field=!r} parameter was not 'fel'/'ppl'/None") +class XtdfPulses(PulsePattern): + """An interface to pulses from XTDF sources. + + The custom fast MHz detectors used at European XFEL send their data + via the XFEL Train Data Format (XTDF), which contains information + about the pulse pattern recorded by these detectors. + + This component exposes this information via the pulse pattern + interface. As XTDF detectors generally enumerate pulses + independently, it also allows to correlate its pulse ID with those + obtained from other global sources, e.g. the machine information + from [XrayPulses][extra.components.Xraypulses]. + + Args: + detector (SourceData): Instrument source of an XTDF source. + ppt_anchor (int or PulsePattern, optional): Constant offset to + add to XTDF pulse IDs, or PulsePattern-based component to + correlate every train individually. No correlation is done + by default. + first_lit_cell (int, optional): Which cell coincides with the + first pulse from passed ppt_anchor, ignored unless a + PulsePattern component is passed. + include_cells (bool, optional): Whether to include cell IDs + in the pulse index generated by this component, True by + default. + """ + + def __init__(self, xtdf_source, ppt_anchor=None, first_lit_cell=0, + include_cells=True): + super().__init__(xtdf_source, xtdf_source['image.pulseId']) + + self._ppt_anchor = ppt_anchor + self._first_lit_cell = first_lit_cell + self._cell_key = xtdf_source['image.cellId'] if include_cells else None + + def __repr__(self): + return "<{} using {}>".format(type(self).__name__, self._source.source) + + def _get_train_ids(self): + return self._source.train_ids + + def _get_pulse_ids(self): + pulse_ids = self._key.ndarray().astype(np.int32).ravel() + + if isinstance(self._ppt_anchor, int): + pulse_ids += self._ppt_anchor + + elif isinstance(self._ppt_anchor, PulsePattern): + xtdf_tids = (x := self._key.data_counts()).index[x > 0] + first, last = self._key.train_index_bounds(False) + + # TODO: Must be aligned! + for i, (train_id, anchor_pids) in enumerate( + self._ppt_anchor.pulse_ids().loc[xtdf_tids].groupby('trainId') + ): + pulse_ids[first[i]:last[i]] += anchor_pids.iloc[0] - pulse_ids[ + first[i] + self._first_lit_cell] + + elif self._ppt_anchor is not None: + raise TypeError(type(self._ppt_anchor)) + + index_levels = { + 'trainId': self._key.train_id_coordinates(), + 'pulseIndex': np.concatenate([ + np.arange(count, dtype=np.int32) for count + in self._key.data_counts(labelled=False)])} + + if self._cell_key is not None: + index_levels['cellId'] = self._cell_key.ndarray().ravel() + + import pandas as pd + index = pd.MultiIndex.from_arrays( + list(index_levels.values()), names=list(index_levels.keys())) + + return pd.Series(data=pulse_ids, index=index, dtype=np.int32) + + +class LitFrameFinderPulses(PulsePattern): + """An interface to pulses from LitFrameFinder. + + Args: + data (extra.data.DataCollection): Data to access LitFrameFinder + data from. + source (str, optional): Source name of a LitFrameFinder device, + only needed if the data includes more than one such device + or none could not be detected automatically. + include_cells (bool, optional): Whether to include cell IDs from + actual detector sources in pulse data, False by default. May + cause less trains to be returned if detector sources are + missing data. + """ + + _lff_class = 'AgipdLitFrameFinder' + _lff_pipeline_re = re.compile(r'^\w{3}\_\w+\_\w+\/REDU\/LITFRM:output$') + + + def __init__(self, data, source=None, include_cells=False): + if source is None: + source = self._find_lff_source(data) + elif ':' not in source: + # Append pipeline if control source was passed. + source += ':output' + + lff_sd = data[source] + + self._cells_key = None + + if include_cells: + if (s := source.removesuffix(':output')) in data.control_sources: + det_pattern = re.compile( + data[s].run_value('detectorSourcePattern')) + + for det_source in data.instrument_sources: + if det_pattern.match(det_source): + # TODO: Support JF burst mode? + self._cells_key = data[det_source, 'image.cellId'] + break + + if self._cells_key is None: + raise ValueError('detector source required in data to ' + 'include memory cells') + + lff_tids = (x := lff_sd.data_counts()).index[x > 0] + cells_tids = (x := self._cells_key.data_counts()).index[x > 0] + + if (dt := np.setdiff1d(lff_tids, cells_tids)).any(): + # Cells data is missing trains present in LFF data, + # remove those trains from LFF. + lff_sd = lff_sd[by_id[np.setdiff1d(lff_tids, dt)]] + + if (dt := np.setdiff1d(cells_tids, lff_tids)).any(): + # LFF data is missing trains present in cells data, + # remove those from cells data. + self._cells_key = self._cells_key[by_id[ + np.setdiff1d(cells_tids, dt)]] + + super().__init__(lff_sd) + + def __repr__(self): + cells_str = f' and {self._cells_key.source}' \ + if self._cells_key is not None else '' + return "<{} using {}{}>".format(type(self).__name__, + self._source.source, cells_str) + + @classmethod + def _find_lff_source(cls, data): + """Try to find a LitFrameFinder source.""" + + # Try to go by device class first. + candidates = { + f'{source}:output' + for source in data.control_sources + if (data[source].device_class == cls._lff_class and + f'{source}:output' in data.instrument_sources) + } + + if len(candidates) > 1: + raise ValueError('multiple LFF sources found via device ' + 'class, please pass one explicitly:\n' + + ', '.join(sorted(candidates))) + elif candidates: + return candidates.pop() + + # Next check for explicit instrument data. + for source in data.instrument_sources: + m = cls._lff_pipline_re.match(source) + if m is not None: + candidates.add(m[0]) + + if len(candidates) > 1: + raise ValueError('multiple LFF instrument sources found, ' + 'please pass one explicitly:\n' + ', '.join( + sorted(candidates))) + elif candidates: + return candidates.pop() + + raise ValueError('no LFF found, please pass one explicitly') + + def _get_train_ids(self): + return self._source.train_ids + + def _get_pulse_ids(self): + pids_by_train = [] + indices_by_train = [] + lit_frames_by_train = [] + + counts = self._source['data.nFrame'].ndarray() + + for real_pulse_ids, det_pulse_ids, pulses_per_frame, num_frames in zip( + self._source['data.masterPulseId'].ndarray(), + self._source['data.detectorPulseId'].ndarray(), + self._source['data.nPulsePerFrame'].ndarray(), + counts + ): + first_lit_frame = pulses_per_frame.argmax() + ppt_offset = real_pulse_ids[0] - det_pulse_ids[first_lit_frame] + + pids_by_train.append(det_pulse_ids[:num_frames] + ppt_offset) + indices_by_train.append(np.arange(num_frames)) + lit_frames_by_train.append(pulses_per_frame[:num_frames] > 0) + + import pandas as pd + + if not pids_by_train: + # Immediately return an empty series if there is no data. + return pd.Series([], dtype=np.int32) + + index_levels = { + 'trainId': np.repeat(self._source.train_id_coordinates(), counts), + 'pulseIndex': np.concatenate(indices_by_train), + } + + if self._cells_key is not None: + index_levels['cellId'] = self._cells_key.ndarray()[:, 0] + + index_levels['lit'] = np.concatenate(lit_frames_by_train) + + index = pd.MultiIndex.from_arrays( + list(index_levels.values()), names=list(index_levels.keys())) + + return pd.Series(data=np.concatenate(pids_by_train), + index=index, dtype=np.int32) + + def select_trains(self, train_sel): + res = super().select_trains(train_sel) + + if self._cells_key is not None: + res._cells_key = self._cells_key[train_sel] + + return res + + +class AgipdPulses: + pass + + +class LpdPulses: + pass + + +class DsscPulses: + pass + + class DldPulses(PulsePattern): """An interface to pulses from DLD reconstruction.