-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ROIFilter #173
base: main
Are you sure you want to change the base?
Add ROIFilter #173
Changes from all commits
0a380f0
7575790
b2152ce
4ee1073
a2e6350
2036552
1452406
ea98f31
eca8cf8
721ed0f
a83cfcd
0e038c8
5061193
b4f248b
e9473d8
f21e22b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,8 @@ | |
flatten dimensions of the data. | ||
""" | ||
|
||
from __future__ import annotations | ||
|
||
from collections.abc import Callable, Sequence | ||
from dataclasses import dataclass, field | ||
from math import ceil | ||
|
@@ -38,6 +40,8 @@ | |
) | ||
from ess.reduce.nexus.workflow import GenericNeXusWorkflow | ||
|
||
from . import roi | ||
|
||
CalibratedPositionWithNoisyReplicas = NewType( | ||
'CalibratedPositionWithNoisyReplicas', sc.Variable | ||
) | ||
|
@@ -73,10 +77,14 @@ def __init__( | |
self._coords = coords | ||
self._edges = edges | ||
|
||
@property | ||
def replicas(self) -> int: | ||
return self._replicas | ||
|
||
@staticmethod | ||
def from_coords( | ||
coords: ProjectedCoords, resolution: DetectorViewResolution | ||
) -> 'Histogrammer': | ||
) -> Histogrammer: | ||
""" | ||
Create a histogrammer from coordinates and resolution. | ||
|
||
|
@@ -102,7 +110,20 @@ def from_coords( | |
def __call__(self, da: sc.DataArray) -> sc.DataArray: | ||
self._current += 1 | ||
coords = self._coords[self._replica_dim, self._current % self._replicas] | ||
return sc.DataArray(da.data, coords=coords).hist(self._edges) | ||
# If input is multi-dim we need to flatten since those dims cannot be preserved. | ||
return sc.DataArray(da.data, coords=coords).flatten(to='_').hist(self._edges) | ||
|
||
def input_indices(self) -> sc.DataArray: | ||
"""Return an array with input indices corresponding to each histogram bin.""" | ||
dim = 'detector_number' | ||
# For some projections one of the coords is a scalar, convert to flat table. | ||
coords = self._coords.broadcast(sizes=self._coords.sizes).flatten(to=dim).copy() | ||
ndet = sc.index(coords.sizes[dim] // self._replicas) | ||
da = sc.DataArray( | ||
sc.arange(dim, coords.sizes[dim], dtype='int64', unit=None) % ndet, | ||
coords=coords, | ||
) | ||
return sc.DataArray(da.bin(self._edges).bins.data, coords=self._edges) | ||
|
||
|
||
@dataclass | ||
|
@@ -149,20 +170,27 @@ def __init__(self, detector_number: sc.Variable): | |
sc.zeros(sizes=detector_number.sizes, unit='counts', dtype='int32'), | ||
coords={'detector_number': detector_number}, | ||
) | ||
self._detector_number = detector_number | ||
self._flat_detector_number = detector_number.flatten(to='event_id') | ||
self._start = int(self._flat_detector_number[0].value) | ||
self._stop = int(self._flat_detector_number[-1].value) | ||
self._size = int(self._flat_detector_number.size) | ||
if not sc.issorted(self._flat_detector_number, dim='event_id'): | ||
raise ValueError("Detector numbers must be sorted.") | ||
if self._stop - self._start + 1 != self._size: | ||
raise ValueError("Detector numbers must be consecutive.") | ||
self._sorted = sc.issorted(self._flat_detector_number, dim='event_id') | ||
self._consecutive = self._stop - self._start + 1 == self._size | ||
|
||
@property | ||
def detector_number(self) -> sc.Variable: | ||
return self._detector_number | ||
|
||
@property | ||
def data(self) -> sc.DataArray: | ||
return self._data | ||
|
||
def bincount(self, data: Sequence[int]) -> sc.DataArray: | ||
if not self._sorted: | ||
raise ValueError("Detector numbers must be sorted to use `bincount`.") | ||
if not self._consecutive: | ||
raise ValueError("Detector numbers must be consecutive to use `bincount`.") | ||
offset = np.asarray(data, dtype=np.int32) - self._start | ||
# Ignore events with detector numbers outside the range of the detector. This | ||
# should not happen in valid files but for now it is useful until we are sure | ||
|
@@ -215,8 +243,16 @@ def __init__( | |
self._current = 0 | ||
self._history: sc.DataArray | None = None | ||
self._cache: sc.DataArray | None = None | ||
self.clear_counts() | ||
|
||
counts = self.bincount([]) | ||
def clear_counts(self) -> None: | ||
""" | ||
Clear counts. | ||
|
||
Overrides Detector.clear_counts, to properly clear sliding window history and | ||
cache. | ||
""" | ||
counts = sc.zeros_like(self.data) | ||
if self._projection is not None: | ||
counts = self._projection(counts) | ||
self._history = ( | ||
|
@@ -226,12 +262,25 @@ def __init__( | |
) | ||
self._cache = self._history.sum('window') | ||
|
||
def make_roi_filter(self) -> roi.ROIFilter: | ||
"""Return a ROI filter operating via the projection plane of the view.""" | ||
norm = 1.0 | ||
if isinstance(self._projection, Histogrammer): | ||
indices = self._projection.input_indices() | ||
norm = self._projection.replicas | ||
else: | ||
indices = sc.ones(sizes=self.data.sizes, dtype='int32', unit=None) | ||
indices = sc.cumsum(indices, mode='exclusive') | ||
if isinstance(self._projection, LogicalView): | ||
indices = self._projection(indices) | ||
return roi.ROIFilter(indices=indices, norm=norm) | ||
|
||
@staticmethod | ||
def from_detector_and_histogrammer( | ||
detector: CalibratedDetector[SampleRun], | ||
window: RollingDetectorViewWindow, | ||
projection: Histogrammer, | ||
) -> 'RollingDetectorView': | ||
) -> RollingDetectorView: | ||
"""Helper for constructing via a Sciline workflow.""" | ||
return RollingDetectorView( | ||
detector_number=detector.coords['detector_number'], | ||
|
@@ -244,7 +293,7 @@ def from_detector_and_logical_view( | |
detector: CalibratedDetector[SampleRun], | ||
window: RollingDetectorViewWindow, | ||
projection: LogicalView, | ||
) -> 'RollingDetectorView': | ||
) -> RollingDetectorView: | ||
"""Helper for constructing via a Sciline workflow.""" | ||
return RollingDetectorView( | ||
detector_number=detector.coords['detector_number'], | ||
|
@@ -261,7 +310,7 @@ def from_nexus( | |
projection: Literal['xy_plane', 'cylinder_mantle_z'] | LogicalView, | ||
resolution: dict[str, int] | None = None, | ||
pixel_noise: Literal['cylindrical'] | sc.Variable | None = None, | ||
) -> 'RollingDetectorView': | ||
) -> RollingDetectorView: | ||
""" | ||
Create a rolling detector view from a NeXus file using GenericNeXusWorkflow. | ||
|
||
|
@@ -343,8 +392,32 @@ def get(self, window: int | None = None) -> sc.DataArray: | |
data += self._history['window', 0 : self._current].sum('window') | ||
return data | ||
|
||
def add_events(self, data: sc.DataArray) -> None: | ||
""" | ||
Add counts in the form of events grouped by pixel ID. | ||
|
||
Parameters | ||
---------- | ||
data: | ||
Events grouped by pixel ID, given by binned data. | ||
""" | ||
counts = data.bins.size().to(dtype='int32', copy=False) | ||
counts.unit = 'counts' | ||
self._add_counts(counts) | ||
|
||
def add_counts(self, data: Sequence[int]) -> None: | ||
""" | ||
Add counts in the form of a sequence of pixel IDs. | ||
|
||
Parameters | ||
---------- | ||
data: | ||
List of pixel IDs. | ||
""" | ||
counts = self.bincount(data) | ||
self._add_counts(counts) | ||
|
||
def _add_counts(self, counts: sc.Variable) -> None: | ||
if self._projection is not None: | ||
counts = self._projection(counts) | ||
self._cache -= self._history['window', self._current] | ||
|
@@ -455,9 +528,12 @@ def position_noise_for_cylindrical_pixel( | |
|
||
|
||
def gaussian_position_noise(sigma: PositionNoiseSigma) -> PositionNoise: | ||
sigma = sigma.to(unit='m', copy=False) | ||
size = _noise_size | ||
position = sc.empty(sizes={'position': size}, unit='m', dtype=sc.DType.vector3) | ||
position.values = np.random.default_rng().normal(0, sigma.value, size=(size, 3)) | ||
position.values = np.random.default_rng(seed=1234).normal( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this to turn the centers of the pixel positions into small gaussian clouds of points? But maybe using a normal distribution is better as it makes the clouds spherical, and they then look the same from any line-of-sight? Using uniform also suggests that the pixels are square in shape, but this is not really true for tubes... Is the sigma approximately equal to half the width of the pixel? |
||
0, sigma.value, size=(size, 3) | ||
) | ||
return PositionNoise(position) | ||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
# SPDX-License-Identifier: BSD-3-Clause | ||
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) | ||
"""Utilities for region of interest (ROI) selection.""" | ||
|
||
from __future__ import annotations | ||
|
||
from typing import TypeVar | ||
|
||
import numpy as np | ||
import scipp as sc | ||
|
||
|
||
def select_indices_in_intervals( | ||
intervals: sc.DataGroup[tuple[int, int] | tuple[sc.Variable, sc.Variable]], | ||
indices: sc.Variable | sc.DataArray, | ||
) -> sc.Variable: | ||
""" | ||
Return subset of indices that fall within the intervals. | ||
|
||
Parameters | ||
---------- | ||
intervals: | ||
DataGroup with dimension names as keys and tuples of low and high values. This | ||
can be used to define a band or a rectangle to selected. When low and high are | ||
scipp.Variable, the selection is done using label-based indexing. In this case | ||
`indices` must be a DataArray with corresponding coordinates. | ||
indices: | ||
Variable or DataArray with indices to select from. If binned data the selected | ||
indices will be returned concatenated into a dense array. | ||
""" | ||
out_dim = 'index' | ||
for dim, (low, high) in intervals.items(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At some point, we may be hooking this up to a graphical interface where a user will draw a rectangle by hand. If they started from the left and dragged towards the right, you get We could end up in a situation where some of the tuples contain 2 values but the second one is lower than the first. The slicing below would then select nothing. I recommend we add something like |
||
indices = indices[dim, low:high] | ||
indices = indices.flatten(to=out_dim) | ||
if indices.bins is None: | ||
return indices | ||
indices = indices.bins.concat().value | ||
return indices.rename_dims({indices.dim: out_dim}) | ||
|
||
|
||
T = TypeVar('T', sc.DataArray, sc.Variable) | ||
|
||
|
||
def apply_selection( | ||
data: T, *, selection: sc.Variable, norm: float = 1.0 | ||
) -> tuple[T, sc.Variable]: | ||
""" | ||
Apply selection to data. | ||
|
||
Parameters | ||
---------- | ||
data: | ||
Data to filter. | ||
selection: | ||
Variable with indices to select. | ||
norm: | ||
Normalization factor to apply to the selected data. This is used for cases where | ||
indices may be selected multiple times. | ||
|
||
Returns | ||
------- | ||
: | ||
Filtered data and scale factor. | ||
""" | ||
indices, counts = np.unique(selection.values, return_counts=True) | ||
if data.ndim != 1: | ||
data = data.flatten(to='detector_number') | ||
scale = sc.array(dims=[data.dim], values=counts) / norm | ||
return data[indices], scale | ||
|
||
|
||
class ROIFilter: | ||
"""Filter for selecting a region of interest (ROI).""" | ||
|
||
def __init__(self, indices: sc.Variable | sc.DataArray, norm: float = 1.0) -> None: | ||
""" | ||
Create a new ROI filter. | ||
|
||
Parameters | ||
---------- | ||
indices: | ||
Variable with indices to filter. The indices facilitate selecting a 2-D | ||
ROI in a projection of a 3-D dataset. Typically the indices are given by a | ||
2-D array. Each element in the array may correspond to a single index (when | ||
there is no projection) or a list of indices that were projected into an | ||
output pixel. | ||
""" | ||
self._indices = indices | ||
self._selection = sc.array(dims=['index'], values=[]) | ||
self._norm = norm | ||
|
||
def set_roi_from_intervals(self, intervals: sc.DataGroup) -> None: | ||
"""Set the ROI from (typically 1 or 2) intervals.""" | ||
self._selection = select_indices_in_intervals(intervals, self._indices) | ||
|
||
def apply(self, data: T) -> tuple[T, sc.Variable]: | ||
""" | ||
Apply the ROI filter to data. | ||
|
||
The returned scale factor can be used to handle filtering via a projection, to | ||
take into account that fractions of source data point contribute to a data point | ||
in the projection. | ||
|
||
Parameters | ||
---------- | ||
data: | ||
Data to filter. | ||
|
||
Returns | ||
------- | ||
: | ||
Filtered data and scale factor. | ||
""" | ||
return apply_selection(data, selection=self._selection, norm=self._norm) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will not be used by Beamlime anymore, keeping it for backwards compatibility.