Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ROIFilter #173

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 87 additions & 11 deletions src/ess/reduce/live/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
flatten dimensions of the data.
"""

from __future__ import annotations

from collections.abc import Callable, Sequence
from dataclasses import dataclass, field
from math import ceil
Expand All @@ -38,6 +40,8 @@
)
from ess.reduce.nexus.workflow import GenericNeXusWorkflow

from . import roi

CalibratedPositionWithNoisyReplicas = NewType(
'CalibratedPositionWithNoisyReplicas', sc.Variable
)
Expand Down Expand Up @@ -73,10 +77,14 @@ def __init__(
self._coords = coords
self._edges = edges

@property
def replicas(self) -> int:
    """Number of noisy position replicas used when histogramming."""
    return self._replicas

@staticmethod
def from_coords(
coords: ProjectedCoords, resolution: DetectorViewResolution
) -> 'Histogrammer':
) -> Histogrammer:
"""
Create a histogrammer from coordinates and resolution.

Expand All @@ -102,7 +110,20 @@ def from_coords(
def __call__(self, da: sc.DataArray) -> sc.DataArray:
    """Histogram the data using the next replica of projected coordinates."""
    self._current += 1
    replica = self._current % self._replicas
    projected = sc.DataArray(
        da.data, coords=self._coords[self._replica_dim, replica]
    )
    # Multi-dim input dims cannot be preserved by the histogram; flatten first.
    return projected.flatten(to='_').hist(self._edges)

def input_indices(self) -> sc.DataArray:
    """Return an array with input indices corresponding to each histogram bin.

    The result is binned data over the histogram edges; each bin holds the
    flat input (detector) indices that project into that bin. Replica copies
    are mapped back to the same input index via the modulo below.
    """
    dim = 'detector_number'
    # For some projections one of the coords is a scalar, convert to flat table.
    coords = self._coords.broadcast(sizes=self._coords.sizes).flatten(to=dim).copy()
    # Number of real detector pixels; the flat table contains `replicas` copies.
    ndet = sc.index(coords.sizes[dim] // self._replicas)
    da = sc.DataArray(
        # `% ndet` folds every replica back onto its original detector index.
        sc.arange(dim, coords.sizes[dim], dtype='int64', unit=None) % ndet,
        coords=coords,
    )
    return sc.DataArray(da.bin(self._edges).bins.data, coords=self._edges)


@dataclass
Expand Down Expand Up @@ -149,20 +170,27 @@ def __init__(self, detector_number: sc.Variable):
sc.zeros(sizes=detector_number.sizes, unit='counts', dtype='int32'),
coords={'detector_number': detector_number},
)
self._detector_number = detector_number
self._flat_detector_number = detector_number.flatten(to='event_id')
self._start = int(self._flat_detector_number[0].value)
self._stop = int(self._flat_detector_number[-1].value)
self._size = int(self._flat_detector_number.size)
if not sc.issorted(self._flat_detector_number, dim='event_id'):
raise ValueError("Detector numbers must be sorted.")
if self._stop - self._start + 1 != self._size:
raise ValueError("Detector numbers must be consecutive.")
self._sorted = sc.issorted(self._flat_detector_number, dim='event_id')
self._consecutive = self._stop - self._start + 1 == self._size

@property
def detector_number(self) -> sc.Variable:
    """Detector numbers in the original (possibly multi-dim) layout."""
    return self._detector_number

@property
def data(self) -> sc.DataArray:
    """Accumulated counts with a 'detector_number' coordinate."""
    return self._data

def bincount(self, data: Sequence[int]) -> sc.DataArray:
if not self._sorted:
raise ValueError("Detector numbers must be sorted to use `bincount`.")
if not self._consecutive:
raise ValueError("Detector numbers must be consecutive to use `bincount`.")
offset = np.asarray(data, dtype=np.int32) - self._start
# Ignore events with detector numbers outside the range of the detector. This
# should not happen in valid files but for now it is useful until we are sure
Expand Down Expand Up @@ -215,8 +243,16 @@ def __init__(
self._current = 0
self._history: sc.DataArray | None = None
self._cache: sc.DataArray | None = None
self.clear_counts()

counts = self.bincount([])
def clear_counts(self) -> None:
"""
Clear counts.

Overrides Detector.clear_counts, to properly clear sliding window history and
cache.
"""
counts = sc.zeros_like(self.data)
if self._projection is not None:
counts = self._projection(counts)
self._history = (
Expand All @@ -226,12 +262,25 @@ def __init__(
)
self._cache = self._history.sum('window')

def make_roi_filter(self) -> roi.ROIFilter:
    """Return a ROI filter operating via the projection plane of the view."""
    projection = self._projection
    if isinstance(projection, Histogrammer):
        # Histogrammed projection: each output bin records the input indices
        # that fell into it; replicas inflate counts, so normalize by them.
        return roi.ROIFilter(
            indices=projection.input_indices(), norm=projection.replicas
        )
    # Identity or logical-view projection: one index per input element.
    ones = sc.ones(sizes=self.data.sizes, dtype='int32', unit=None)
    indices = sc.cumsum(ones, mode='exclusive')
    if isinstance(projection, LogicalView):
        indices = projection(indices)
    return roi.ROIFilter(indices=indices, norm=1.0)

@staticmethod
def from_detector_and_histogrammer(
detector: CalibratedDetector[SampleRun],
window: RollingDetectorViewWindow,
projection: Histogrammer,
) -> 'RollingDetectorView':
) -> RollingDetectorView:
"""Helper for constructing via a Sciline workflow."""
return RollingDetectorView(
detector_number=detector.coords['detector_number'],
Expand All @@ -244,7 +293,7 @@ def from_detector_and_logical_view(
detector: CalibratedDetector[SampleRun],
window: RollingDetectorViewWindow,
projection: LogicalView,
) -> 'RollingDetectorView':
) -> RollingDetectorView:
"""Helper for constructing via a Sciline workflow."""
return RollingDetectorView(
detector_number=detector.coords['detector_number'],
Expand All @@ -261,7 +310,7 @@ def from_nexus(
projection: Literal['xy_plane', 'cylinder_mantle_z'] | LogicalView,
resolution: dict[str, int] | None = None,
pixel_noise: Literal['cylindrical'] | sc.Variable | None = None,
) -> 'RollingDetectorView':
) -> RollingDetectorView:
"""
Create a rolling detector view from a NeXus file using GenericNeXusWorkflow.

Expand Down Expand Up @@ -343,8 +392,32 @@ def get(self, window: int | None = None) -> sc.DataArray:
data += self._history['window', 0 : self._current].sum('window')
return data

def add_events(self, data: sc.DataArray) -> None:
    """
    Add counts in the form of events grouped by pixel ID.

    Parameters
    ----------
    data:
        Events grouped by pixel ID, given by binned data.
    """
    # Per-pixel event counts; int32 with 'counts' unit to match the history.
    sizes = data.bins.size()
    counts = sizes.to(dtype='int32', copy=False)
    counts.unit = 'counts'
    self._add_counts(counts)

def add_counts(self, data: Sequence[int]) -> None:
    """
    Add counts in the form of a sequence of pixel IDs.

    Parameters
    ----------
    data:
        List of pixel IDs.
    """
    # NOTE(review): per PR discussion this path is kept for backwards
    # compatibility; new callers use `add_events` instead.
    self._add_counts(self.bincount(data))

def _add_counts(self, counts: sc.Variable) -> None:
if self._projection is not None:
counts = self._projection(counts)
self._cache -= self._history['window', self._current]
Expand Down Expand Up @@ -455,9 +528,12 @@ def position_noise_for_cylindrical_pixel(


def gaussian_position_noise(sigma: PositionNoiseSigma) -> PositionNoise:
    """
    Create a table of normally distributed 3-D position offsets.

    Parameters
    ----------
    sigma:
        Standard deviation of the noise; converted to meters.
    """
    size = _noise_size
    sigma_m = sigma.to(unit='m', copy=False)
    noise = sc.empty(sizes={'position': size}, unit='m', dtype=sc.DType.vector3)
    # Fixed seed keeps the noise pattern reproducible between runs.
    rng = np.random.default_rng(seed=1234)
    noise.values = rng.normal(0, sigma_m.value, size=(size, 3))
    return PositionNoise(noise)


Expand Down
114 changes: 114 additions & 0 deletions src/ess/reduce/live/roi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp)
"""Utilities for region of interest (ROI) selection."""

from __future__ import annotations

from typing import TypeVar

import numpy as np
import scipp as sc


def select_indices_in_intervals(
    intervals: sc.DataGroup[tuple[int, int] | tuple[sc.Variable, sc.Variable]],
    indices: sc.Variable | sc.DataArray,
) -> sc.Variable:
    """
    Return subset of indices that fall within the intervals.

    Parameters
    ----------
    intervals:
        DataGroup with dimension names as keys and tuples of low and high values.
        This can be used to define a band or a rectangle to be selected. When low
        and high are scipp.Variable, the selection is done using label-based
        indexing. In this case `indices` must be a DataArray with corresponding
        coordinates. Bounds given in reverse order (low > high) are swapped.
    indices:
        Variable or DataArray with indices to select from. If binned data the
        selected indices will be returned concatenated into a dense array.
    """
    out_dim = 'index'
    for dim, (low, high) in intervals.items():
        # GUIs may report rectangle bounds in drag order, so the first bound can
        # exceed the second; swap to avoid silently selecting an empty range.
        reversed_bounds = high < low
        if isinstance(reversed_bounds, sc.Variable):
            reversed_bounds = bool(reversed_bounds.value)
        if reversed_bounds:
            low, high = high, low
        indices = indices[dim, low:high]
    indices = indices.flatten(to=out_dim)
    if indices.bins is None:
        return indices
    indices = indices.bins.concat().value
    return indices.rename_dims({indices.dim: out_dim})


T = TypeVar('T', sc.DataArray, sc.Variable)


def apply_selection(
    data: T, *, selection: sc.Variable, norm: float = 1.0
) -> tuple[T, sc.Variable]:
    """
    Apply selection to data.

    Parameters
    ----------
    data:
        Data to filter.
    selection:
        Variable with indices to select.
    norm:
        Normalization factor to apply to the selected data. This is used for cases
        where indices may be selected multiple times.

    Returns
    -------
    :
        Filtered data and scale factor.
    """
    if data.ndim != 1:
        data = data.flatten(to='detector_number')
    # An index may appear multiple times in the selection; weight it accordingly.
    unique_indices, multiplicity = np.unique(selection.values, return_counts=True)
    scale = sc.array(dims=[data.dim], values=multiplicity) / norm
    return data[unique_indices], scale


class ROIFilter:
    """Filter for selecting a region of interest (ROI)."""

    def __init__(self, indices: sc.Variable | sc.DataArray, norm: float = 1.0) -> None:
        """
        Create a new ROI filter.

        Parameters
        ----------
        indices:
            Variable with indices to filter. The indices facilitate selecting a 2-D
            ROI in a projection of a 3-D dataset. Typically the indices are given by
            a 2-D array. Each element in the array may correspond to a single index
            (when there is no projection) or a list of indices that were projected
            into an output pixel.
        norm:
            Normalization factor forwarded to :func:`apply_selection`, accounting
            for indices that can be selected multiple times (e.g. position-noise
            replicas).
        """
        self._indices = indices
        # Use an integer dtype so the (initially empty) selection is always valid
        # for positional indexing; sc.array(values=[]) would default to float64.
        self._selection = sc.array(dims=['index'], values=[], dtype='int64')
        self._norm = norm

    def set_roi_from_intervals(self, intervals: sc.DataGroup) -> None:
        """Set the ROI from (typically 1 or 2) intervals."""
        self._selection = select_indices_in_intervals(intervals, self._indices)

    def apply(self, data: T) -> tuple[T, sc.Variable]:
        """
        Apply the ROI filter to data.

        The returned scale factor can be used to handle filtering via a projection,
        to take into account that fractions of source data point contribute to a
        data point in the projection.

        Parameters
        ----------
        data:
            Data to filter.

        Returns
        -------
        :
            Filtered data and scale factor.
        """
        return apply_selection(data, selection=self._selection, norm=self._norm)
Loading