Draft: Introduce REDUCTION and ERRATA file sections #473

Closed
29 changes: 27 additions & 2 deletions extra_data/file_access.py
@@ -146,6 +146,8 @@ def __init__(self, filename, _cache_info=None):
self.train_ids = _cache_info['train_ids']
self.control_sources = _cache_info['control_sources']
self.instrument_sources = _cache_info['instrument_sources']
self.reduction_data = _cache_info['reduction_data']
Contributor

This should either not be cached at all or use conditional access like validity_flag further below. Since it's only auxiliary information and may often be comparatively small, it may be fine to always generate it on demand. If this changes in the future, we can add machinery to augment existing indices, or simply re-create them.
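
A minimal sketch of the conditional-access variant (my illustration only, mirroring how validity_flag falls back via _cache_info.get('flag', None); the key names here are assumptions):

```python
# Sketch only: read the optional sections from a cache entry, tolerating older
# cache files that do not contain the new keys.
def optional_sections(cache_info):
    return (frozenset(cache_info.get('reduction_data', ())),
            frozenset(cache_info.get('errata', ())))
```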

Member Author

Makes sense. I'll remove it from caching.

self.errata = _cache_info['errata']
self.validity_flag = _cache_info.get('flag', None)
else:
try:
@@ -155,7 +157,8 @@ def __init__(self, filename, _cache_info=None):

self.train_ids = tid_data[tid_data != 0]

self.control_sources, self.instrument_sources = self._read_data_sources()
(self.control_sources, self.instrument_sources,
self.reduction_data, self.errata) = self._read_data_sources()

self.validity_flag = None

@@ -295,6 +298,7 @@ def format_version(self):

def _read_data_sources(self):
control_sources, instrument_sources = set(), set()
reduction_info, errata = set(), set()

# The list of data sources moved in file format 1.0
if self.format_version == '0.5':
@@ -320,14 +324,19 @@ def _read_data_sources(self):
# TODO: Do something with groups?
elif category == 'CONTROL':
control_sources.add(h5_source)
elif category == 'REDUCTION':
reduction_info.add(h5_source)
elif category == 'ERRATA':
errata.add(h5_source)
elif category == 'Karabo_TimerServer':
# Ignore virtual data source used only in file format
# version 1.1 / pclayer-1.10.3-2.10.5.
pass
else:
raise ValueError("Unknown data category %r" % category)
Contributor
@philsmt, Dec 19, 2023

So this bit is probably the big sore point for this proposal - pretty much all prior versions of EXtra-data (or even karabo_data for that matter) will break with these new files.

Since this only affects the METADATA/dataSources entry but not the actual existence of the root groups, one way around this would be to not list these sources here and either:

  1. Not list them at all and instead discover them by iterating over the root group (roughly sketched below the list)
  2. List them in a distinct dataset only for these virtual metadata sources
  3. List both the real and virtual sources in a new dataset while preserving the old one for compatibility
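
A rough sketch of what option 1 could look like (my illustration only; it assumes h5py, the root-group layout of the example file, and pipeline-style source names containing ':'):

```python
import h5py

# Sketch of option 1: discover REDUCTION/ERRATA sources by walking the root
# groups directly, leaving METADATA/dataSources untouched for old readers.
# Heuristic: treat any group whose last path component contains ':' as a
# source name, matching the example file
# (e.g. PULSE_REDUCTION/PCLAYER_CI/DAQ/DET_DATA_TEST_1:xtdf).
def discover_extra_sources(path):
    found = {'REDUCTION': set(), 'ERRATA': set()}
    with h5py.File(path, 'r') as f:
        for section, sources in found.items():
            if section not in f:
                continue

            def collect(name, obj, sources=sources):
                if isinstance(obj, h5py.Group) and ':' in name.rsplit('/', 1)[-1]:
                    sources.add(name)

            f[section].visititems(collect)
    return found
```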

Member Author

Yes, I don't see any way around it though, except if we add these sections below METADATA after all. Generally, this check should possibly be made less restrictive in my view, as it could hit us again in the future.
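
For instance, the category handling could hypothetically be relaxed along these lines (my own sketch, not part of this PR; the helper name is made up):

```python
import warnings

# Sketch: skip unknown categories with a warning rather than raising
# ValueError, so a future file section does not break older readers outright.
def classify_source(category, h5_source, sections):
    if category in sections:
        sections[category].add(h5_source)
    elif category == 'Karabo_TimerServer':
        pass  # virtual source used only in file format version 1.1
    else:
        warnings.warn(f"Ignoring unknown data category {category!r}")
```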


return frozenset(control_sources), frozenset(instrument_sources)
return (frozenset(control_sources), frozenset(instrument_sources),
frozenset(reduction_info), frozenset(errata))

def _guess_valid_trains(self):
# File format version 1.0 includes a flag which is 0 if a train ID
@@ -418,6 +427,10 @@ def index_groups(self, source):
return {''}
elif source in self.instrument_sources:
return set(self.file[f'/INDEX/{source}'].keys())
elif source in self.reduction_data:
return set(self.file[f'/INDEX/{source}'].keys())
elif source in self.errata:
Contributor

I understand from our example that REDUCTION sources are always prefixed with some category (e.g. REDUCTION/PULSE_SLICING/<XTDF-source>/...). What would ERRATA sources look like? All the reader machinery assumes that source names are unique across all root sections.

Member Author

There's nothing concrete implemented in the DAQ, but some form of source identification is required. Think of a train that came outside the buffer range and couldn't be stored. You'd want to know what the data was, so it would be prefixed with the source name in the path.

Also, technically all this additional data is FAST data for the DAQ - i.e. it will be stored only if a train contains it.

Contributor

Certainly, my question was more concretely about how you would pick the source name. Say SA3_XTD10_XGM/XGM/DOOCS:output has a hiccup and sends a late train; under what source name (i.e. the path below ERRATA) would you save this? It must be different from SA3_XTD10_XGM/XGM/DOOCS:output by at least a prefix or suffix.

Furthermore, I had a look at your example file for data reduction. There are INDEX entries for

  • PCLAYER_CI/DAQ/DET_DATA_TEST_1:xtdf
  • PCLAYER_CI/LFF/DATA_TEST_1:reductionOutput
  • PULSE_REDUCTION/PCLAYER_CI/DAQ/DET_DATA_TEST_1:xtdf

with a corresponding entry in INSTRUMENT for the first and in REDUCTION for the latter two. Is PCLAYER_CI/LFF/DATA_TEST_1 here a real device used as input for the data aggregator, or also some virtual source used for metadata tracking? Ideally, sources in REDUCTION or ERRATA would never be identical to actual sources potentially occurring in CONTROL or INSTRUMENT.

Member Author

So in the reduction case, I prefix all source names with PULSE_REDUCTION to make them distinct from the "real" sources; one could do the same for ERRATA, e.g. ERRATA/LATE_TRAINS/source/name/and/data/path/ (a small illustration follows the list below).

Hence, for what you observe in the index section:

  • PCLAYER_CI/DAQ/DET_DATA_TEST_1:xtdf is the real source
  • PCLAYER_CI/LFF/DATA_TEST_1:reductionOutput is the real source
  • PULSE_REDUCTION/PCLAYER_CI/DAQ/DET_DATA_TEST_1:xtdf is injected by the pulse dim reducer, i.e. it is an artificial source.
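
As a purely hypothetical illustration of that naming scheme (the prefixes and the helper are my own examples, not a fixed convention):

```python
# Hypothetical helper: map a prefixed REDUCTION/ERRATA source name back to the
# underlying "real" source, assuming prefixes in the style discussed above.
KNOWN_PREFIXES = ('PULSE_REDUCTION/', 'LATE_TRAINS/')

def original_source(prefixed_name):
    for prefix in KNOWN_PREFIXES:
        if prefixed_name.startswith(prefix):
            return prefixed_name[len(prefix):]
    return prefixed_name

assert original_source(
    'PULSE_REDUCTION/PCLAYER_CI/DAQ/DET_DATA_TEST_1:xtdf'
) == 'PCLAYER_CI/DAQ/DET_DATA_TEST_1:xtdf'
```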

return set(self.file[f'/INDEX/{source}'].keys())
else:
raise SourceNameError(source)

@@ -454,6 +467,10 @@ def get_keys(self, source):
group = '/CONTROL/' + source
elif source in self.instrument_sources:
group = '/INSTRUMENT/' + source
elif source in self.reduction_data:
group = '/REDUCTION/' + source
elif source in self.errata:
group = '/ERRATA/' + source
else:
raise SourceNameError(source)

@@ -478,6 +495,10 @@ def get_one_key(self, source):
group = '/CONTROL/' + source
elif source in self.instrument_sources:
group = '/INSTRUMENT/' + source
elif source in self.reduction_data:
group = '/REDUCTION/' + source
elif source in self.errata:
group = '/ERRATA/' + source
else:
raise SourceNameError(source)

@@ -527,6 +548,10 @@ def has_source_key(self, source, key):
path = '/CONTROL/{}/{}'.format(source, key.replace('.', '/'))
elif source in self.instrument_sources:
path = '/INSTRUMENT/{}/{}'.format(source, key.replace('.', '/'))
elif source in self.reduction_data:
path = '/REDUCTION/{}/{}'.format(source, key.replace('.', '/'))
elif source in self.errata:
path = '/ERRATA/{}/{}'.format(source, key.replace('.', '/'))
else:
raise SourceNameError(source)

49 changes: 31 additions & 18 deletions extra_data/reader.py
@@ -101,6 +101,10 @@ def __init__(
files_by_sources[source, 'CONTROL'].append(f)
for source in f.instrument_sources:
files_by_sources[source, 'INSTRUMENT'].append(f)
for source in f.reduction_data:
files_by_sources[source, 'REDUCTION'].append(f)
for source in f.errata:
files_by_sources[source, 'ERRATA'].append(f)
sources_data = {
src: SourceData(src,
sel_keys=None,
@@ -127,6 +131,14 @@ def __init__(
name for (name, sd) in self._sources_data.items()
if sd.section == 'INSTRUMENT'
})
self.reduction_data = frozenset({
name for (name, sd) in self._sources_data.items()
if sd.section == 'REDUCTION'
})
self.errata = frozenset({
name for (name, sd) in self._sources_data.items()
if sd.section == 'ERRATA'
})

@staticmethod
def _open_file(path, cache_info=None):
@@ -401,26 +413,27 @@ def train_from_id(
path = '/CONTROL/{}/{}'.format(source, key.replace('.', '/'))
source_data[key] = file.file[path][first]

for source in self.instrument_sources:
source_data = res[source] = {
'metadata': {'source': source, 'timestamp.tid': train_id}
}
file, pos = self._find_data(source, train_id)
if file is None:
continue

for key in self.keys_for_source(source):
group = key.partition('.')[0]
firsts, counts = file.get_index(source, group)
first, count = firsts[pos], counts[pos]
if not count:
for prefix, category in (("INSTRUMENT", self.instrument_sources), ("REDUCTION", self.reduction_data), ("ERRATA", self.errata)):
for source in category:
source_data = res[source] = {
'metadata': {'source': source, 'timestamp.tid': train_id}
}
file, pos = self._find_data(source, train_id)
if file is None:
continue

path = '/INSTRUMENT/{}/{}'.format(source, key.replace('.', '/'))
if count == 1 and not keep_dims:
source_data[key] = file.file[path][first]
else:
source_data[key] = file.file[path][first : first + count]
for key in self.keys_for_source(source):
group = key.partition('.')[0]
firsts, counts = file.get_index(source, group)
first, count = firsts[pos], counts[pos]
if not count:
continue

path = '/{}/{}/{}'.format(prefix, source, key.replace('.', '/'))
if count == 1 and not keep_dims:
source_data[key] = file.file[path][first]
else:
source_data[key] = file.file[path][first : first + count]

if flat_keys:
# {src: {key: data}} -> {(src, key): data}
6 changes: 5 additions & 1 deletion extra_data/run_files_map.py
@@ -132,7 +132,9 @@ def get(self, path):
res = {
'train_ids': np.array(d['train_ids'], dtype=np.uint64),
'control_sources': frozenset(d['control_sources']),
'instrument_sources': frozenset(d['instrument_sources'])
'instrument_sources': frozenset(d['instrument_sources']),
'reduction_data': frozenset(d.get('reduction_data', set())),
'errata': frozenset(d.get('errata', set()))
}
# Older cache files don't contain info on 'suspect' trains.
if 'suspect_train_indices' in d:
@@ -182,6 +184,8 @@ def save(self, files):
'train_ids': [int(t) for t in file_access.train_ids],
'control_sources': sorted(file_access.control_sources),
'instrument_sources': sorted(file_access.instrument_sources),
'reduction_data': sorted(file_access.reduction_data),
'errata': sorted(file_access.errata),
'suspect_train_indices': [
int(i) for i in (~file_access.validity_flag).nonzero()[0]
],