Explicit merge / accumulation strategies and customization points #97

Closed · SimonHeybrock opened this issue Feb 21, 2024 · 0 comments · Fixed by #135

Overview

Over the past months it has become very clear that workflows need to be able to merge data in various ways: for example, concatenating runs right after loading, summing counts over runs or banks before normalization, or averaging results over runs after normalization.

It is important to realize that these are recurring requirements, spanning both file-based and stream-based workflows, and that this is a generic problem that will affect all techniques/instruments.

At the same time, the merge/accumulate operations are not always performed, and it is not clear in advance at which point in a workflow a scientist will want or need to merge. We also have to ensure that merging is easy, or almost transparent, for scientists, without requiring in-depth knowledge or complicated modifications of workflows.

I therefore propose to add customization points into workflows at which merges can be performed. I think there will typically be a handful of such points, e.g., (1) concat right after loading, (2) concat/sum before normalization, (3) average after normalization.

  • By default, no-op accumulation providers (taking only a single input, not a sciline.Series) would be part of the workflow.
  • The package would provide a set of commonly used accumulators suitable for insertion at these points. These would, e.g., take a sciline.Series as input and require replacing a param by a param table (a sketch follows this list).
  • Multiple accumulation points may be used simultaneously, e.g., to merge banks before normalization and average over runs after normalization. In this case there would be two parameter tables.
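
To illustrate the second bullet, here is a purely hypothetical sketch of what package-provided accumulators for the "before normalization" point could look like (all names are invented; the sciline.Series-based signature matches the example at the end of this issue):

import sciline
from typing import NewType

Run = NewType('Run', int)
Counts = NewType('Counts', int)
AccumulatedCounts = NewType('AccumulatedCounts', int)

# Inserting one of these replaces the default no-op provider, since
# sciline.Pipeline.insert keys providers by their return type.
def sum_over_runs(counts: sciline.Series[Run, Counts]) -> AccumulatedCounts:
    # Sum counts over all runs in the Run param table.
    return AccumulatedCounts(sum(counts.values()))

def latest_run_only(counts: sciline.Series[Run, Counts]) -> AccumulatedCounts:
    # Alternative strategy: keep only the most recent run's counts.
    return AccumulatedCounts(list(counts.values())[-1])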

Open points

My hope is that by making this explicit in the workflow, we can use the same functionality for the streaming case, without having to write complicated or custom code just for the sake of live reduction. The details require some more thought.
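
To make the streaming idea slightly more concrete, here is a purely hypothetical sketch (nothing below is existing sciline or beamlime API): a stateful accumulator that could occupy the same customization point as accum_counts in the example, fed chunk by chunk instead of via a param table:

from typing import NewType

Counts = NewType('Counts', int)
AccumulatedCounts = NewType('AccumulatedCounts', int)

class StreamingCountsAccumulator:
    """Stateful stand-in for a Series-based accumulator in a stream-based workflow."""

    def __init__(self) -> None:
        self._total = 0

    def push(self, chunk: Counts) -> None:
        # Called by the streaming framework whenever a new chunk arrives.
        self._total += chunk

    def __call__(self) -> AccumulatedCounts:
        # Provider interface: downstream steps (normalization etc.) keep
        # consuming AccumulatedCounts and need no changes.
        return AccumulatedCounts(self._total)

accum = StreamingCountsAccumulator()
for chunk in (Counts(10), Counts(32)):
    accum.push(chunk)
assert accum() == AccumulatedCounts(42)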

References

Related: #49, scipp/esspolarization#19, scipp/beamlime#131

Example

This is not complete, but I hope it helps illustrate the point:

import sciline
from typing import NewType

# 'display' is a Jupyter builtin; import it explicitly to run outside a notebook.
from IPython.display import display

# Toy domain types; ints/floats stand in for real data structures.
Bank = NewType('Bank', str)
BankId = NewType('BankId', str)
Run = NewType('Run', int)
RunId = NewType('RunId', int)
RawData = NewType('RawData', int)
RawMonitor = NewType('RawMonitor', int)
Counts = NewType('Counts', int)
AccumulatedCounts = NewType('AccumulatedCounts', int)
Norm = NewType('Norm', float)
Normalized = NewType('Normalized', float)
AverageNormalized = NewType('AverageNormalized', float)

# Providers for the basic single-run, single-bank workflow.
def load(run: RunId, bank: BankId) -> RawData:
    return RawData(42)

def load_monitor(run: RunId) -> RawMonitor:
    return RawMonitor(42)

def counts(data: RawData) -> Counts:
    return Counts(data)

# Default no-op accumulator: passes a single run's counts through unchanged.
def no_accum_counts(counts: Counts) -> AccumulatedCounts:
    return AccumulatedCounts(counts)

# Series-based accumulator: sums counts over all runs in the param table.
def accum_counts(counts: sciline.Series[Run, Counts]) -> AccumulatedCounts:
    return AccumulatedCounts(sum(counts.values()))

def norm_term(mon: RawMonitor) -> Norm:
    return Norm(mon)

def normalize(counts: AccumulatedCounts, norm: Norm) -> Normalized:
    return Normalized(counts / norm)

# Default no-op: a single normalized result is its own 'average'.
def no_average_normalized(normalized: Normalized) -> AverageNormalized:
    return AverageNormalized(normalized)

# Series-based averaging of normalized results over runs.
def average_normalized(normalized: sciline.Series[Run, Normalized]) -> AverageNormalized:
    return AverageNormalized(sum(normalized.values()) / len(normalized))

providers = (load, load_monitor, counts, norm_term, normalize)
pipeline = sciline.Pipeline(providers)
runs = sciline.ParamTable(Run, {RunId: range(3)})
banks = sciline.ParamTable(Bank, {BankId: ['A', 'B']})
pl = pipeline.copy()  # unmodified copy (unused below)

# Variant 1: single run and bank, no accumulation (no-op providers).
pipeline[RunId] = 12345
pipeline[BankId] = 'A'
pipeline.insert(no_accum_counts)
pipeline.insert(no_average_normalized)
display(pipeline.visualize(AverageNormalized, graph_attr={'rankdir': 'LR'}))

# Variant 2: accumulate counts over the three runs before normalization.
pipeline.set_param_table(runs)
pipeline.insert(accum_counts)
display(pipeline.visualize(AverageNormalized, graph_attr={'rankdir': 'LR'}))

# Variant 3: revert to per-run counts and average the normalized results over runs instead.
pipeline.insert(no_accum_counts)
pipeline.insert(average_normalized)
display(pipeline.visualize(AverageNormalized, graph_attr={'rankdir': 'LR'}))

# Variant 4: merge banks before normalization and average over runs after
# normalization, using two param tables simultaneously.
def accum_counts_from_banks(banks: sciline.Series[Bank, Counts]) -> AccumulatedCounts:
    return AccumulatedCounts(sum(banks.values()))

pipeline.set_param_table(banks)
pipeline.insert(accum_counts_from_banks)
pipeline.insert(average_normalized)
display(pipeline.visualize(AverageNormalized, graph_attr={'rankdir': 'LR'}, compact=False))
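
As a possible next step beyond visualization, the configured pipeline could also be evaluated. sciline.Pipeline.compute is the standard way to do that, though whether this particular combination of two param tables computes cleanly would need to be verified:

# Untested sketch: evaluate the final target of the toy pipeline.
result = pipeline.compute(AverageNormalized)
print(result)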

