Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/components/pulse-patterns.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@

::: extra.components.MachinePulses

::: extra.components.XtdfPulses

::: extra.components.DldPulses
3 changes: 2 additions & 1 deletion src/extra/components/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@

from .scantool import Scantool # noqa
from .pulses import XrayPulses, OpticalLaserPulses, MachinePulses, \
PumpProbePulses, DldPulses # noqa
PumpProbePulses, XtdfPulses, AgipdPulses, LpdPulses, DsscPulses, \
DldPulses # noqa
from .scan import Scan # noqa
from .xgm import XGM # noqa
from .dld import DelayLineDetector # noqa
Expand Down
246 changes: 245 additions & 1 deletion src/extra/components/pulses.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from euxfel_bunch_pattern import is_sase, is_laser, \
PPL_BITS, DESTINATION_TLD, DESTINATION_T4D, DESTINATION_T5D, \
PHOTON_LINE_DEFLECTION
from extra_data import SourceData, KeyData, by_id
from extra_data import SourceData, KeyData, SourceNameError, by_id

from .utils import identify_sase, _instrument_to_sase

Expand Down Expand Up @@ -1508,6 +1508,250 @@ def pulse_mask(self, labelled=True, field=None):
raise ValueError(f"{field=!r} parameter was not 'fel'/'ppl'/None")


class XtdfPulses(PulsePattern):
"""An interface to pulses from XTDF sources.

The custom fast MHz detectors used at European XFEL send their data
via the XFEL Train Data Format (XTDF), which contains information
about the pulse pattern recorded by these detectors.

This component exposes this information via the pulse pattern
interface. As XTDF detectors generally enumerate pulses
independently, it also allows to correlate its pulse ID with those
obtained from other global sources, e.g. the machine information
from [XrayPulses][extra.components.Xraypulses].

Args:
detector (SourceData): Instrument source of an XTDF source.
ppt_anchor (int or PulsePattern, optional): Constant offset to
add to XTDF pulse IDs, or PulsePattern-based component to
correlate every train individually. No correlation is done
by default.
first_lit_cell (int, optional): Which cell coincides with the
first pulse from passed ppt_anchor, ignored unless a
PulsePattern component is passed.
include_cells (bool, optional): Whether to include cell IDs
in the pulse index generated by this component, True by
default.
"""

def __init__(self, xtdf_source, ppt_anchor=None, first_lit_cell=0,
include_cells=True):
super().__init__(xtdf_source, xtdf_source['image.pulseId'])

self._ppt_anchor = ppt_anchor
self._first_lit_cell = first_lit_cell
self._cell_key = xtdf_source['image.cellId'] if include_cells else None

def __repr__(self):
return "<{} using {}>".format(type(self).__name__, self._source.source)

def _get_train_ids(self):
return self._source.train_ids

def _get_pulse_ids(self):
pulse_ids = self._key.ndarray().astype(np.int32).ravel()

if isinstance(self._ppt_anchor, int):
pulse_ids += self._ppt_anchor

elif isinstance(self._ppt_anchor, PulsePattern):
xtdf_tids = (x := self._key.data_counts()).index[x > 0]
first, last = self._key.train_index_bounds(False)

# TODO: Must be aligned!
for i, (train_id, anchor_pids) in enumerate(
self._ppt_anchor.pulse_ids().loc[xtdf_tids].groupby('trainId')
):
pulse_ids[first[i]:last[i]] += anchor_pids.iloc[0] - pulse_ids[
first[i] + self._first_lit_cell]

elif self._ppt_anchor is not None:
raise TypeError(type(self._ppt_anchor))

index_levels = {
'trainId': self._key.train_id_coordinates(),
'pulseIndex': np.concatenate([
np.arange(count, dtype=np.int32) for count
in self._key.data_counts(labelled=False)])}

if self._cell_key is not None:
index_levels['cellId'] = self._cell_key.ndarray().ravel()

import pandas as pd
index = pd.MultiIndex.from_arrays(
list(index_levels.values()), names=list(index_levels.keys()))

return pd.Series(data=pulse_ids, index=index, dtype=np.int32)


class LitFrameFinderPulses(PulsePattern):
"""An interface to pulses from LitFrameFinder.

Args:
data (extra.data.DataCollection): Data to access LitFrameFinder
data from.
source (str, optional): Source name of a LitFrameFinder device,
only needed if the data includes more than one such device
or none could not be detected automatically.
include_cells (bool, optional): Whether to include cell IDs from
actual detector sources in pulse data, False by default. May
cause less trains to be returned if detector sources are
missing data.
"""

_lff_class = 'AgipdLitFrameFinder'
_lff_pipeline_re = re.compile(r'^\w{3}\_\w+\_\w+\/REDU\/LITFRM:output$')


def __init__(self, data, source=None, include_cells=False):
if source is None:
source = self._find_lff_source(data)
elif ':' not in source:
# Append pipeline if control source was passed.
source += ':output'

lff_sd = data[source]

self._cells_key = None

if include_cells:
if (s := source.removesuffix(':output')) in data.control_sources:
det_pattern = re.compile(
data[s].run_value('detectorSourcePattern'))

for det_source in data.instrument_sources:
if det_pattern.match(det_source):
# TODO: Support JF burst mode?
self._cells_key = data[det_source, 'image.cellId']
break

if self._cells_key is None:
raise ValueError('detector source required in data to '
'include memory cells')

lff_tids = (x := lff_sd.data_counts()).index[x > 0]
cells_tids = (x := self._cells_key.data_counts()).index[x > 0]

if (dt := np.setdiff1d(lff_tids, cells_tids)).any():
# Cells data is missing trains present in LFF data,
# remove those trains from LFF.
lff_sd = lff_sd[by_id[np.setdiff1d(lff_tids, dt)]]

if (dt := np.setdiff1d(cells_tids, lff_tids)).any():
# LFF data is missing trains present in cells data,
# remove those from cells data.
self._cells_key = self._cells_key[by_id[
np.setdiff1d(cells_tids, dt)]]

super().__init__(lff_sd)

def __repr__(self):
cells_str = f' and {self._cells_key.source}' \
if self._cells_key is not None else ''
return "<{} using {}{}>".format(type(self).__name__,
self._source.source, cells_str)

@classmethod
def _find_lff_source(cls, data):
"""Try to find a LitFrameFinder source."""

# Try to go by device class first.
candidates = {
f'{source}:output'
for source in data.control_sources
if (data[source].device_class == cls._lff_class and
f'{source}:output' in data.instrument_sources)
}

if len(candidates) > 1:
raise ValueError('multiple LFF sources found via device '
'class, please pass one explicitly:\n' +
', '.join(sorted(candidates)))
elif candidates:
return candidates.pop()

# Next check for explicit instrument data.
for source in data.instrument_sources:
m = cls._lff_pipline_re.match(source)
if m is not None:
candidates.add(m[0])

if len(candidates) > 1:
raise ValueError('multiple LFF instrument sources found, '
'please pass one explicitly:\n' + ', '.join(
sorted(candidates)))
elif candidates:
return candidates.pop()

raise ValueError('no LFF found, please pass one explicitly')

def _get_train_ids(self):
return self._source.train_ids

def _get_pulse_ids(self):
pids_by_train = []
indices_by_train = []
lit_frames_by_train = []

counts = self._source['data.nFrame'].ndarray()

for real_pulse_ids, det_pulse_ids, pulses_per_frame, num_frames in zip(
self._source['data.masterPulseId'].ndarray(),
self._source['data.detectorPulseId'].ndarray(),
self._source['data.nPulsePerFrame'].ndarray(),
counts
):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shell we also add XGM energy in every frame: data.energyPerFrame and data.energySigma?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer one would rather use the XGM component and combine the data. Of course it will have to be made PulsePattern aware for this to work...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, there is one rare case... There was at least one experiment, when multiple pulses were exposed in single JF frame...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, that mostly tells me that JF is a weird case we should exclude for now. I don't expect multiple pulses per frame for an XTDF detector?

Copy link
Member

@egorsobolev egorsobolev Dec 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potentially it is possible... If the detector is able to expose longer then span between pulses, then it is possible. Not for AGIPD v1, it always works at 4MHz

first_lit_frame = pulses_per_frame.argmax()
ppt_offset = real_pulse_ids[0] - det_pulse_ids[first_lit_frame]

pids_by_train.append(det_pulse_ids[:num_frames] + ppt_offset)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the detector exposes pulses in the dark, then pulse ids refer the slots in the BunchPatternTable with no x-ray pulse. Is this intended?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.
This is definitely somewhat of a debatable point, but I'd consider it more useful to expose the entire pulse pattern (which is actually a frame pattern) seen by the detector. If you really want to get the frames exposed to X-rays, you can do p.pulse_ids().xs(True, level='lit') or p.pulse_ids().loc[:, :, True].

There are other components doing this already, e.g. PumpProbePattern showing PPL-only pulses in places where there is no FEL pulse or MaschinePulses exposing whatever is in the bunch pattern table.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: current implementation of LFF for JF just enumerate frames in detector pulse id.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Figured as much.
The cell ID mechanism also doesn't support JF burst mode yet, I have to double-check how it exposes memory cells. Not sure it's very useful anyway.

indices_by_train.append(np.arange(num_frames))
lit_frames_by_train.append(pulses_per_frame[:num_frames] > 0)

import pandas as pd

if not pids_by_train:
# Immediately return an empty series if there is no data.
return pd.Series([], dtype=np.int32)

index_levels = {
'trainId': np.repeat(self._source.train_id_coordinates(), counts),
'pulseIndex': np.concatenate(indices_by_train),
}

if self._cells_key is not None:
index_levels['cellId'] = self._cells_key.ndarray()[:, 0]

index_levels['lit'] = np.concatenate(lit_frames_by_train)

index = pd.MultiIndex.from_arrays(
list(index_levels.values()), names=list(index_levels.keys()))

return pd.Series(data=np.concatenate(pids_by_train),
index=index, dtype=np.int32)

def select_trains(self, train_sel):
res = super().select_trains(train_sel)

if self._cells_key is not None:
res._cells_key = self._cells_key[train_sel]

return res


class AgipdPulses:
pass


class LpdPulses:
pass


class DsscPulses:
pass


class DldPulses(PulsePattern):
"""An interface to pulses from DLD reconstruction.

Expand Down
Loading