diff --git a/docs/components/index.md b/docs/components/index.md
index d726bbe2..bec657b0 100644
--- a/docs/components/index.md
+++ b/docs/components/index.md
@@ -3,10 +3,12 @@
 This module contains classes that abstract various Karabo devices to make
 access easier.
 
-- [Scans](scans.md)
-    - [Scantool][extra.components.Scantool]
-    - [Scan][extra.components.Scan]
+
 - [Pulse patterns](pulse-patterns.md)
     - [XrayPulses][extra.components.XrayPulses]
     - [OpticalLaserPulses][extra.components.OpticalLaserPulses]
     - [DldPulses][extra.components.DldPulses]
+- [Pulse Picker Unit](pulse-picker-unit.md)
+- [Scans](scans.md)
+    - [Scantool][extra.components.Scantool]
+    - [Scan][extra.components.Scan]
diff --git a/docs/components/pulse-picker-unit.md b/docs/components/pulse-picker-unit.md
new file mode 100644
index 00000000..f978dcc6
--- /dev/null
+++ b/docs/components/pulse-picker-unit.md
@@ -0,0 +1 @@
+::: extra.components.PPU
diff --git a/mkdocs.yml b/mkdocs.yml
index 666c58e4..72d02285 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -22,8 +22,6 @@ nav:
   - detector-geometry.md
   - Components:
     - components/index.md
-    - components/scans.md
-    - components/pulse-patterns.md
   - karabo-bridge.md
   - utilities.md
   - changelog.md
diff --git a/src/extra/components/__init__.py b/src/extra/components/__init__.py
index eea3caf1..845c394b 100644
--- a/src/extra/components/__init__.py
+++ b/src/extra/components/__init__.py
@@ -1,4 +1,5 @@
 from .scantool import Scantool  # noqa
+from .ppu import PPU
 from .pulses import XrayPulses, OpticalLaserPulses, DldPulses  # noqa
 from .scan import Scan
diff --git a/src/extra/components/ppu.py b/src/extra/components/ppu.py
new file mode 100644
index 00000000..3e78baf8
--- /dev/null
+++ b/src/extra/components/ppu.py
@@ -0,0 +1,183 @@
+import logging
+from functools import lru_cache
+from typing import List, Union
+
+import numpy as np
+import pandas as pd
+from extra_data import by_id
+from extra_data.keydata import KeyData
+from extra_data.reader import DataCollection
+from extra_data.sourcedata import SourceData
+
+log = logging.getLogger(__name__)
+
+
+def _find_ppu(run: DataCollection, device: str = None):
+    """Helper function to find a PPU device."""
+
+    # Fast path: we don't validate that the type or name matches
+    if isinstance(device, SourceData):
+        return device
+    elif isinstance(device, KeyData):
+        return run[device.source]
+    elif isinstance(device, str):
+        if device in run.control_sources:
+            return run[device]
+        elif device in run.alias:
+            return _find_ppu(run, run.alias[device])
+        # otherwise fall through to substring matching below
+    elif device is not None:
+        raise KeyError(f"ppu must be a SourceData, KeyData or str, not {type(device).__name__}")
+
+    # List all PPU devices in the run
+    available_ppus = [
+        source
+        for source in run.control_sources
+        if run[source].device_class in PPU._DEVICE_CLASSES
+    ]
+    if len(available_ppus) == 0:
+        available_ppus = [s for s in run.control_sources if "MDL/PPU" in s]
+        available_ppus += [s for s in run.control_sources if "MDL/DIPOLE_PPU" in s]
+
+    if len(available_ppus) == 0:
+        raise KeyError("Could not find a PPU device in this data")
+    elif len(available_ppus) == 1:
+        return run[available_ppus[0]]
+    else:  # len(available_ppus) > 1
+        if device:
+            # Match the given string as a unique substring of the available PPU names
+            matches = [name for name in available_ppus if device.upper() in name]
+            if len(matches) == 1:
+                return run[matches[0]]
+            elif len(matches) == 0:
+                raise KeyError(
+                    f"Couldn't identify a PPU from '{device}'; please pass a valid device name, alias, or unique substring"
+                )
+            else:
+                raise KeyError(
+                    f"Multiple PPUs found matching '{device}', please be more specific: {matches}"
+                )
+        raise KeyError(f"Multiple PPU devices found in this data: {available_ppus}")
+
+
+class PPU:
+    """Interface to a Pulse Picker Unit (PPU).
+
+    Despite its name, the PPU selects bunch trains from within the 10 Hz train
+    structure and blocks the remainder of the beam.
+
+    Technical description:
+        A motor-driven absorber rotor is rotated into the beam axis in order
+        to block the XFEL beam when triggered. The rotor is contained within a
+        UHV chamber. In terms of temporal structure, the beam pipe is blocked
+        (or opened) by the absorbing rotor for up to 9/10ths of a second,
+        synchronized to the facility clock/trigger.
+    """
+
+    _DEVICE_CLASSES = [
+        "PulsePickerTrainTrigger",  # PPU
+        "PulsePickerTrainTriggerCopy",
+        "StandardTrigger",  # DIPOLE PPU
+    ]
+
+    def __init__(
+        self, data: DataCollection, ppu: Union[KeyData, SourceData, str] = None
+    ):
+        """
+
+        Args:
+            data (DataCollection): The run data to search for a PPU device.
+            ppu (Union[KeyData, SourceData, str], optional):
+                Specify a Pulse Picker Unit device to use, necessary if a run
+                contains more than one PPU. This can be any of:
+
+                - The device name of the source.
+                - A `SourceData` or [KeyData][extra_data.KeyData] of the
+                  control source (e.g. `HED_XTD6_PPU/MDL/PPU_TRIGGER`) of a
+                  PPU.
+                - The alias name of either a `SourceData` or
+                  [KeyData][extra_data.KeyData] belonging to a PPU.
+                - A unique (case-insensitive) substring of a PPU source name.
+
+        Raises:
+            KeyError: If we can't identify a unique PPU device from the
+                arguments.
+        """
+        self.data = data
+        self.device = _find_ppu(data, ppu)
+
+    @lru_cache()
+    def number_of_trains(self, train_id: int) -> int:
+        """Number of trains picked for the sequence starting at train_id.
+
+        Args:
+            train_id (int): Train ID of the sequence start.
+        """
+
+        # Dipole PPU-like devices do not allow picking multiple trains
+        if "trainTrigger.numberOfTrains" not in self.device.keys():
+            return 1
+        n_trains = self.device["trainTrigger.numberOfTrains"]
+        return int(n_trains.select_trains(by_id[[train_id]]).ndarray()[0])
+
+    def train_ids(
+        self, offset: int = 0, labelled: bool = False
+    ) -> Union[List[int], pd.Series]:
+        """All train IDs picked by the PPU.
+
+        Args:
+            offset (int, optional):
+                Offset to add to the selected train IDs. Defaults to 0.
+            labelled (bool, optional):
+                If True, return a Pandas Series whose index gives the trigger
+                sequence each train ID belongs to. Defaults to False.
+
+        Returns:
+            Union[List[int], pd.Series]: Train IDs picked by the PPU.
+        """
+        seq_start = self.device["trainTrigger.sequenceStart"].ndarray()
+        # The trains picked are the unique values of trainTrigger.sequenceStart,
+        # minus the first one (the trigger preceding this run).
+        start_train_ids = np.unique(seq_start)[1:] + offset
+
+        train_ids = []
+        sequences = []
+        for seq, train_id in enumerate(start_train_ids):
+            span = self.number_of_trains(train_id)
+            train_ids.extend(np.arange(train_id, train_id + span).tolist())
+            sequences.extend([seq] * span)
+
+        log.info(
+            f"PPU device {self.device.source} triggered for {len(train_ids)} train(s) across {len(start_train_ids)} sequence(s)."
+        )
+
+        if labelled:
+            train_ids = pd.Series(train_ids, index=sequences)
+        return train_ids
+
+    def trains(
+        self,
+        data: Union[DataCollection, SourceData, KeyData] = None,
+        *,
+        split_sequence: bool = False,
+        offset: int = 0,
+    ) -> Union[DataCollection, List[DataCollection]]:
+        """Returns a subset of the data with only the trains selected by the PPU.
+
+        Args:
+            data: Data to filter. If None (default), the data passed at
+                initialization is used.
+            split_sequence (bool, optional): Split data per PPU trigger
+                sequence. Defaults to False.
+            offset (int, optional): Offset to apply to the train IDs to be
+                selected. Defaults to 0.
+
+        Returns:
+            Union[DataCollection, List[DataCollection]]:
+                DataCollection(s) containing only trains triggered by the PPU.
+        """
+        data = data or self.data
+
+        train_ids = self.train_ids(labelled=True, offset=offset)
+        if split_sequence:
+            return [
+                data.select_trains(by_id[seq.values])
+                for _, seq in train_ids.groupby(train_ids.index)
+            ]
+        return data.select_trains(by_id[train_ids.values])
diff --git a/tests/conftest.py b/tests/conftest.py
index dafb4b6c..7df10cde 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2,14 +2,26 @@
 from pathlib import Path
 from tempfile import TemporaryDirectory
 
+import h5py
+import numpy as np
 import pytest
-
 from extra_data import RunDirectory
 from extra_data.tests.mockdata import write_file
-from extra_data.tests.mockdata.xgm import XGM
+from extra_data.tests.mockdata.base import DeviceBase
 from extra_data.tests.mockdata.motor import Motor
+from extra_data.tests.mockdata.xgm import XGM
+
+from .mockdata.timeserver import PulsePatternDecoder, Timeserver
 
-from .mockdata.timeserver import Timeserver, PulsePatternDecoder
+
+class PPU(DeviceBase):
+    control_keys = [
+        ('trainTrigger.numberOfTrains', 'i4', ()),
+        ('trainTrigger.sequenceStart', 'i4', ()),
+    ]
+    extra_run_values = [
+        ('classId', None, 'PulsePickerTrainTrigger'),
+    ]
 
 
 @pytest.fixture(scope='session')
@@ -24,3 +36,27 @@ def mock_spb_aux_run():
     with TemporaryDirectory() as td:
         write_file(Path(td) / 'RAW-R0001-DA01-S00000.h5', sources, 100)
         yield RunDirectory(td)
+
+
+@pytest.fixture(scope='session')
+def ppu_run():
+    sources = [
+        PPU('HED_XTD6_PPU/MDL/PPU_TRIGGER'),
+        PPU('HED_DIPOLE_PPU/MDL/PPU_TRIGGER'),
+        Timeserver('HED_RR_SYS/TSYS/TIMESERVER'),
+    ]
+
+    with TemporaryDirectory() as td:
+        fpath = Path(td) / 'RAW-R0001-DA01-S00000.h5'
+        write_file(fpath, sources, 100, firsttrain=10000, format_version='1.3')
+
+        with h5py.File(fpath, 'r+') as f:
+            f['/CONTROL/HED_XTD6_PPU/MDL/PPU_TRIGGER/trainTrigger/numberOfTrains'] = np.array([10] * 100, dtype=np.int64)
+            f['/CONTROL/HED_XTD6_PPU/MDL/PPU_TRIGGER/trainTrigger/sequenceStart'] = np.repeat([9000, 10080], 50)
+            f['/CONTROL/HED_DIPOLE_PPU/MDL/PPU_TRIGGER/trainTrigger/numberOfTrains'] = np.array([1] * 100, dtype=np.int64)
+            f['/CONTROL/HED_DIPOLE_PPU/MDL/PPU_TRIGGER/trainTrigger/sequenceStart'] = np.repeat([9985, 10015, 10045, 10075], 25)
+
+        aliases = {'ppu-hed': 'HED_XTD6_PPU/MDL/PPU_TRIGGER',
+                   'ppu-dipole': 'HED_DIPOLE_PPU/MDL/PPU_TRIGGER'}
+        run = RunDirectory(td)
+        yield run.with_aliases(aliases)
diff --git a/tests/test_components_ppu.py b/tests/test_components_ppu.py
new file mode 100644
index 00000000..056c636d
--- /dev/null
+++ b/tests/test_components_ppu.py
@@ -0,0 +1,86 @@
+import pandas as pd
+import pytest
+
+from extra_data.reader import DataCollection
+from extra.components import PPU
+from extra.components.ppu import _find_ppu
+
+
+def test_find_ppu(ppu_run):
+    source = _find_ppu(ppu_run, ppu_run['HED_DIPOLE_PPU/MDL/PPU_TRIGGER'])
+    assert source.source == 'HED_DIPOLE_PPU/MDL/PPU_TRIGGER'
+
+    source = _find_ppu(ppu_run, ppu_run['HED_DIPOLE_PPU/MDL/PPU_TRIGGER', 'trainTrigger.sequenceStart'])
+    assert source.source == 'HED_DIPOLE_PPU/MDL/PPU_TRIGGER'
+
+    source = _find_ppu(ppu_run, 'HED_DIPOLE_PPU/MDL/PPU_TRIGGER')
+    assert source.source == 'HED_DIPOLE_PPU/MDL/PPU_TRIGGER'
+
+    source = _find_ppu(ppu_run, 'ppu-hed')
+    assert source.source == 'HED_XTD6_PPU/MDL/PPU_TRIGGER'
+
+    source = _find_ppu(ppu_run, 'XTD6')
+    assert source.source == 'HED_XTD6_PPU/MDL/PPU_TRIGGER'
+
+    source = _find_ppu(ppu_run.select('HED_XTD6_PPU*'))
+    assert source.source == 'HED_XTD6_PPU/MDL/PPU_TRIGGER'
+
+    # fails with multiple PPUs
+    with pytest.raises(KeyError) as excinfo:
+        _find_ppu(ppu_run)
+    assert 'Multiple PPU' in str(excinfo.value)
+
+    # fails with an invalid device type
+    with pytest.raises(KeyError) as excinfo:
+        _find_ppu(ppu_run, 1)
+    assert 'not int' in str(excinfo.value)
+
+    # fails with 0 PPUs
+    with pytest.raises(KeyError) as excinfo:
+        _find_ppu(ppu_run.select('*TIMESERVER'))
+    assert 'Could not find a PPU' in str(excinfo.value)
+
+    # too many matches
+    with pytest.raises(KeyError) as excinfo:
+        _find_ppu(ppu_run, 'PPU')
+    assert 'Multiple PPUs found matching' in str(excinfo.value)
+
+    # no match
+    with pytest.raises(KeyError) as excinfo:
+        _find_ppu(ppu_run, 'PPU2')
+    assert 'Couldn\'t identify a PPU' in str(excinfo.value)
+
+
+def test_train_ids(ppu_run):
+    # single trigger sequence
+    ppu = PPU(ppu_run, 'ppu-hed')
+    train_ids = ppu.train_ids()
+    assert isinstance(train_ids, list)
+    assert len(train_ids) == 10
+    train_ids = ppu.train_ids(labelled=True)
+    assert isinstance(train_ids, pd.Series)
+    assert train_ids.size == 10  # 10 trains in total
+    assert train_ids.index.unique().size == 1  # single trigger sequence
+
+    # multiple trigger sequences
+    ppu = PPU(ppu_run, 'ppu-dipole')
+    train_ids = ppu.train_ids()
+    assert isinstance(train_ids, list)
+    assert len(train_ids) == 3
+    train_ids = ppu.train_ids(labelled=True)
+    assert isinstance(train_ids, pd.Series)
+    assert train_ids.index.unique().size == 3  # 3 trigger sequences
+    assert train_ids.size == 3  # 1 train per sequence
+
+
+def test_trains(ppu_run):
+    ppu = PPU(ppu_run, 'ppu-dipole')
+    reduced_run = ppu.trains()
+    assert isinstance(reduced_run, DataCollection)
+    assert reduced_run.train_ids == [10015, 10045, 10075]
+
+    # split per sequence
+    reduced_run = ppu.trains(split_sequence=True)
+    assert isinstance(reduced_run, list)
+    assert len(reduced_run) == 3
+    assert reduced_run[0].train_ids == [10015]
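For reference, a minimal usage sketch of the `PPU` component added by this change, based only on the API shown above. The proposal and run numbers are placeholders, and the `'XTD6'` substring assumes a run containing the HED XTD6 PPU:

```python
from extra_data import open_run

from extra.components import PPU

# Placeholder proposal/run numbers; any run containing a PPU device will do.
run = open_run(proposal=1234, run=56)

# Identify the PPU by a unique (case-insensitive) substring of its source name.
ppu = PPU(run, 'XTD6')

# Train IDs picked by the PPU, labelled by trigger sequence.
train_ids = ppu.train_ids(labelled=True)

# Reduce the run to the picked trains, one DataCollection per trigger sequence.
for i, seq in enumerate(ppu.trains(split_sequence=True)):
    print(f'Sequence {i}: trains {seq.train_ids}')
```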