diff --git a/src/tavi/data/nxentry.py b/src/tavi/data/nxentry.py index 6eba4903..4b2208e7 100644 --- a/src/tavi/data/nxentry.py +++ b/src/tavi/data/nxentry.py @@ -143,6 +143,8 @@ def _read_recursively(nexus_entry, items=None): value = str(value.asstr()[...]) except TypeError: # arrays instead value = value[...] + if not np.shape(value): # single element + value = value.tolist() if not attr_dict: # empty attributes items.update({key: {"dataset": value}}) diff --git a/src/tavi/data/scan.py b/src/tavi/data/scan.py index fb1e0d64..90c9f963 100644 --- a/src/tavi/data/scan.py +++ b/src/tavi/data/scan.py @@ -7,6 +7,7 @@ from tavi.data.nxentry import NexusEntry from tavi.data.plotter import Plot1D +from tavi.data.scan_data import ScanData1D from tavi.sample.xtal import Xtal from tavi.utilities import spice_to_mantid @@ -166,110 +167,25 @@ def get_data(self) -> dict: data_dict.update({name: self._nexus_dict.get(name)}) return data_dict - def _rebin_tol( - self, - x_raw: np.ndarray, - y_raw: np.ndarray, - y_str: str, - rebin_params: tuple, - norm_channel: Literal["time", "monitor", "mcu", None], - norm_val: float, - ): - """Rebin with tolerance""" - - rebin_min, rebin_max, rebin_step = rebin_params - if rebin_min is None: - rebin_min = np.min(x_raw) - if rebin_max is None: - rebin_max = np.max(x_raw) - - x_grid = np.arange(rebin_min + rebin_step / 2, rebin_max + rebin_step / 2, rebin_step) - x = np.zeros_like(x_grid) - y = np.zeros_like(x_grid) - counts = np.zeros_like(x_grid) - weights = np.zeros_like(x_grid) - yerr = None - - if norm_channel is None: # rebin, no renorm - weight_channel = self.scan_info.preset_channel - weight = self.data[weight_channel] - for i, x0 in enumerate(x_raw): - idx = np.nanargmax(x_grid + rebin_step / 2 >= x0) - y[idx] += y_raw[i] - x[idx] += x_raw[i] * weight[i] - weights[idx] += weight[i] - counts[idx] += 1 - - # errror bars for detector only - if "detector" in y_str: - yerr = np.sqrt(y) / counts - y = y / counts - x = x / weights - return (x, y, yerr) - - # rebin and renorm - norm = self.data[norm_channel] - for i, x0 in enumerate(x_raw): - idx = np.nanargmax(x_grid + rebin_step / 2 >= x0) - y[idx] += y_raw[i] - x[idx] += x_raw[i] * norm[i] - counts[idx] += norm[i] - - # errror bars for detector only - if "detector" in y_str: - yerr = np.sqrt(y) / counts * norm_val - y = y / counts * norm_val - x = x / counts - return (x, y, yerr) - - def _rebin_grid( - self, - x_raw: np.ndarray, - y_raw: np.ndarray, - y_str: str, - rebin_params: tuple, - norm_channel: Literal["time", "monitor", "mcu", None], - norm_val: float, - ): - """Rebin with a regular grid""" - - rebin_min, rebin_max, rebin_step = rebin_params - if rebin_min is None: - rebin_min = np.min(x_raw) - if rebin_max is None: - rebin_max = np.max(x_raw) - - x = np.arange(rebin_min + rebin_step / 2, rebin_max + rebin_step / 2, rebin_step) - y = np.zeros_like(x) - cts = np.zeros_like(x) - yerr = None - # rebin, no renorm - if norm_channel is None: - for i, x0 in enumerate(x_raw): - idx = np.nanargmax(x + rebin_step / 2 >= x0) - y[idx] += y_raw[i] - cts[idx] += 1 - - # errror bars for detector only - if "detector" in y_str: - yerr = np.sqrt(y) / cts - y = y / cts - return (x, y, yerr) - - # rebin and renorm - norm = self.data[norm_channel] - for i, x0 in enumerate(x_raw): - idx = np.nanargmax(x + rebin_step / 2 >= x0) - y[idx] += y_raw[i] - cts[idx] += norm[i] - - # errror bars for detector only - if "detector" in y_str: - yerr = np.sqrt(y) / cts * norm_val - y = y / cts * norm_val - return (x, y, yerr) - - def generate_curve( + @staticmethod + def validate_rebin_params(rebin_params): + if isinstance(rebin_params, tuple): + if len(rebin_params) != 3: + raise ValueError("Rebin parameters should have the form (min, max, step)") + rebin_min, rebin_max, rebin_step = rebin_params + if (rebin_min >= rebin_max) or (rebin_step < 0): + raise ValueError(f"Nonsensical rebin parameters {rebin_params}") + + elif isinstance(rebin_params, float | int): + if rebin_params < 0: + raise ValueError("Rebin step needs to be greater than zero.") + rebin_params = (None, None, float(rebin_params)) + + else: + raise ValueError(f"Unrecogonized rebin parameters {rebin_params}") + return rebin_params + + def get_plot_data( self, x_str: Optional[str] = None, y_str: Optional[str] = None, @@ -291,63 +207,51 @@ def generate_curve( take as (min, max, step) if a tuple of size 3 is given """ - if x_str is None: - x_str = self.scan_info.def_x - - if y_str is None: - y_str = self.scan_info.def_y + x_str = self.scan_info.def_x if x_str is None else x_str + y_str = self.scan_info.def_y if y_str is None else y_str - x_raw = self.data[x_str] - y_raw = self.data[y_str] - - yerr = None + scan_data = ScanData1D(x=self.data[x_str], y=self.data[y_str]) if rebin_type is None: # no rebin - x = x_raw - y = y_raw - # errror bars for detector only - if "detector" in y_str: - yerr = np.sqrt(y) - # normalize y-axis without rebining along x-axis - if norm_channel is not None: - norm = self.data[norm_channel] / norm_val - y = y / norm - if yerr is not None: - yerr = yerr / norm - - plot1d = Plot1D(x=x, y=y, yerr=yerr) + if norm_channel is not None: # normalize y-axis without rebining along x-axis + scan_data.renorm(norm_col=self.data[norm_channel] / norm_val) + + plot1d = Plot1D(x=scan_data.x, y=scan_data.y, yerr=scan_data.err) plot1d.make_labels(x_str, y_str, norm_channel, norm_val, self.scan_info) return plot1d - # validate rebin params - if isinstance(rebin_params, tuple): - if len(rebin_params) != 3: - raise ValueError("Rebin parameters should have the form (min, max, step)") - rebin_min, rebin_max, rebin_step = rebin_params - if (rebin_min >= rebin_max) or (rebin_step < 0): - raise ValueError(f"Nonsensical rebin parameters {rebin_params}") - - elif isinstance(rebin_params, float | int): - if rebin_params < 0: - raise ValueError("Rebin step needs to be greater than zero.") - rebin_params = (None, float(rebin_params), None) - else: - raise ValueError(f"Unrecogonized rebin parameters {rebin_params}") + # Rebin, first validate rebin params + rebin_params_tuple = Scan.validate_rebin_params(rebin_params) match rebin_type: - case "tol": # x weighted by normalization channel - x, y, yerr = self._rebin_tol(x_raw, y_raw, y_str, rebin_params, norm_channel, norm_val) + case "tol": + if norm_channel is None: # x weighted by preset channel + weight_channel = self.scan_info.preset_channel + scan_data.rebin_tol(rebin_params_tuple, weight_col=self.data[weight_channel]) + else: # x weighted by normalization channel + scan_data.rebin_tol_renorm( + rebin_params_tuple, + norm_col=self.data[norm_channel], + norm_val=norm_val, + ) case "grid": - x, y, yerr = self._rebin_grid(x_raw, y_raw, y_str, rebin_params, norm_channel, norm_val) + if norm_channel is None: + scan_data.rebin_grid(rebin_params_tuple) + else: + scan_data.rebin_grid_renorm( + rebin_params_tuple, + norm_col=self.data[norm_channel], + norm_val=norm_val, + ) case _: raise ValueError('Unrecogonized rebin type. Needs to be "tol" or "grid".') - plot1d = Plot1D(x=x, y=y, yerr=yerr) + plot1d = Plot1D(x=scan_data.x, y=scan_data.y, yerr=scan_data.err) plot1d.make_labels(x_str, y_str, norm_channel, norm_val, self.scan_info) return plot1d - def plot_curve( + def plot( self, x_str: Optional[str] = None, y_str: Optional[str] = None, @@ -358,7 +262,7 @@ def plot_curve( ): """Plot a 1D curve gnerated from a singal scan in a new window""" - plot1d = self.generate_curve(x_str, y_str, norm_channel, norm_val, rebin_type, rebin_step) + plot1d = self.get_plot_data(x_str, y_str, norm_channel, norm_val, rebin_type, rebin_step) fig, ax = plt.subplots() plot1d.plot_curve(ax) diff --git a/src/tavi/data/scan_data.py b/src/tavi/data/scan_data.py new file mode 100644 index 00000000..d4368b11 --- /dev/null +++ b/src/tavi/data/scan_data.py @@ -0,0 +1,110 @@ +# -*- coding: utf-8 -*- + + +import numpy as np + + +class ScanData1D(object): + + ZERO = 1e-6 + + def __init__(self, x: np.ndarray, y: np.ndarray) -> None: + + self.ind = np.argsort(x) + self.x = x[self.ind] + self.y = y[self.ind] + self.err = np.sqrt(y) + + def renorm(self, norm_col: np.ndarray, norm_val: float = 1.0): + """Renormalized to norm_val""" + norm_col = norm_col[self.ind] + self.y = self.y / norm_col * norm_val + self.err = self.err / norm_col * norm_val + + def rebin_tol(self, rebin_params: tuple, weight_col: np.ndarray): + """Rebin with tolerance""" + + rebin_min, rebin_max, rebin_step = rebin_params + rebin_min = np.min(self.x) if rebin_min is None else rebin_min + rebin_max = np.max(self.x) if rebin_max is None else rebin_max + + x_grid = np.arange(rebin_min + rebin_step / 2, rebin_max + rebin_step / 2, rebin_step) + x = np.zeros_like(x_grid) + y = np.zeros_like(x_grid) + counts = np.zeros_like(x_grid) + weights = np.zeros_like(x_grid) + + for i, x0 in enumerate(self.x): + idx = np.nanargmax(x_grid + rebin_step / 2 + ScanData1D.ZERO >= x0) + y[idx] += self.y[i] + x[idx] += self.x[i] * weight_col[i] + weights[idx] += weight_col[i] + counts[idx] += 1 + + self.err = np.sqrt(y) / counts + self.y = y / counts + self.x = x / weights + + def rebin_tol_renorm(self, rebin_params: tuple, norm_col: np.ndarray, norm_val: float = 1.0): + """Rebin with tolerance and renormalize""" + rebin_min, rebin_max, rebin_step = rebin_params + rebin_min = np.min(self.x) if rebin_min is None else rebin_min + rebin_max = np.max(self.x) if rebin_max is None else rebin_max + + x_grid = np.arange(rebin_min + rebin_step / 2, rebin_max + rebin_step / 2, rebin_step) + x = np.zeros_like(x_grid) + y = np.zeros_like(x_grid) + counts = np.zeros_like(x_grid) + + norm_col = norm_col[self.ind] + + for i, x0 in enumerate(self.x): + idx = np.nanargmax(x_grid + rebin_step / 2 + ScanData1D.ZERO >= x0) + y[idx] += self.y[i] + x[idx] += self.x[i] * norm_col[i] + counts[idx] += norm_col[i] + + self.err = np.sqrt(y) / counts * norm_val + self.y = y / counts * norm_val + self.x = x / counts + + def rebin_grid(self, rebin_params: tuple): + """Rebin with a regular grid""" + rebin_min, rebin_max, rebin_step = rebin_params + rebin_min = np.min(self.x) if rebin_min is None else rebin_min + rebin_max = np.max(self.x) if rebin_max is None else rebin_max + + x = np.arange(rebin_min + rebin_step / 2, rebin_max + rebin_step / 2, rebin_step) + y = np.zeros_like(x) + counts = np.zeros_like(x) + + for i, x0 in enumerate(self.x): + idx = np.nanargmax(x + rebin_step / 2 + ScanData1D.ZERO >= x0) + y[idx] += self.y[i] + counts[idx] += 1 + + self.x = x + self.err = np.sqrt(y) / counts + self.y = y / counts + + def rebin_grid_renorm(self, rebin_params: tuple, norm_col: np.ndarray, norm_val: float = 1.0): + """Rebin with a regular grid and renormalize""" + + rebin_min, rebin_max, rebin_step = rebin_params + rebin_min = np.min(self.x) if rebin_min is None else rebin_min + rebin_max = np.max(self.x) if rebin_max is None else rebin_max + + x = np.arange(rebin_min + rebin_step / 2, rebin_max + rebin_step / 2, rebin_step) + y = np.zeros_like(x) + counts = np.zeros_like(x) + + norm_col = norm_col[self.ind] + + for i, x0 in enumerate(self.x): # plus ZERO helps improve precision + idx = np.nanargmax(x + rebin_step / 2 + ScanData1D.ZERO >= x0) + y[idx] += self.y[i] + counts[idx] += norm_col[i] + + self.x = x + self.err = np.sqrt(y) / counts * norm_val + self.y = y / counts * norm_val diff --git a/src/tavi/data/scan_group.py b/src/tavi/data/scan_group.py index cb3ed9ab..25bfd8a1 100644 --- a/src/tavi/data/scan_group.py +++ b/src/tavi/data/scan_group.py @@ -1,5 +1,3 @@ -from typing import Optional - import matplotlib.pyplot as plt import numpy as np @@ -10,43 +8,21 @@ class ScanGroup(object): Atributes: name (string): Name of combined scans - signals (list of Scan objects): - backgrounds (list of Scan objects): - signal_axes (list): Can be ["s1", "s2", "detector"], - Or ["s1", "s2",["det_1", "det_2", "det_3"]]. - Default is (None, None, None) - background_axes (list): Default is (None, None, None) Methods: - generate_curve - plot_curve - generate_contour + get_plot_data plot_contour """ def __init__( self, - signals: list[int], - backgrounds=Optional[list[int]], - signal_axes=(None, None, None), - background_axes=(None, None, None), ): - self.signals = signals - self.backgrounds = backgrounds - self.signal_axes = list(signal_axes) - self.background_axes = list(background_axes) - self.name = "" - - def generate_curve(self): - pass - def plot_curve(self): - pass + self.name = "" - # TODO background subtraction # TODO non-orthogonal axes for constant E contours - def generate_contour( + def get_plot_data( self, norm_channel=None, norm_val=1, @@ -129,7 +105,7 @@ def generate_contour( return (xv, yv, z, x_step, y_step, xlabel, ylabel, zlabel, title) - def plot_contour(self, contour_plot, cmap="turbo", vmax=100, vmin=0, ylim=None, xlim=None): + def plot(self, contour_plot, cmap="turbo", vmax=100, vmin=0, ylim=None, xlim=None): """Plot contour""" x, y, z, _, _, xlabel, ylabel, zlabel, title = contour_plot @@ -148,108 +124,3 @@ def plot_contour(self, contour_plot, cmap="turbo", vmax=100, vmin=0, ylim=None, ax.set_ylim(bottom=ylim[0], top=ylim[1]) fig.show() - - # def plot_waterfall(self, contour_plot, shifts=None, ylim=None, xlim=None, fmt="o"): - # """Plot waterfall plot. - - # Note: - # Horizontal is Y-axis, vertical is Z-axis. Stacked along X-axis. - # """ - - # x, y, z, _, _, xlabel, ylabel, zlabel, title = contour_plot - - # num = len(x[0]) - - # if shifts is not None: - # if np.size(shifts) == 1: - # shifts = (shifts,) * num - # else: - # shifts = (0,) * num - - # fig, ax = plt.subplots() - # shift = 0 - # for i in range(num): - # if np.isnan(z[:, i]).all(): # all nan - # continue - # else: - # p = ax.errorbar( - # x=y[:, i], - # y=z[:, i] + shift, - # fmt=fmt, - # label=f"{xlabel}={np.round(x[0,i],3)}, shift={shift}", - # ) - # shift += shifts[i] - - # ax.set_title(title) - # ax.set_xlabel(ylabel) - # ax.set_ylabel(zlabel) - # ax.grid(alpha=0.6) - # ax.legend() - # if xlim is not None: - # ax.set_xlim(left=xlim[0], right=xlim[1]) - # if ylim is not None: - # ax.set_ylim(bottom=ylim[0], top=ylim[1]) - # fig.show() - - # def generate_waterfall_scans( - # self, - # rebin_type=None, - # rebin_step=0, - # norm_channel=None, - # norm_val=1, - # ): - # curves = [] - # num_scans = np.size(self.signals) - # signal_x, signal_y, _ = self.signal_axes - - # if np.size(signal_x) == 1: - # signal_x = [signal_x] * num_scans - # xlabel = signal_x[0] - # if np.size(signal_y) == 1: - # signal_y = [signal_y] * num_scans - # ylabel = signal_y[0] - # if norm_channel is not None: - # ylabel += f" / {norm_val} " + norm_channel - - # title = self.name - - # for i, signal in enumerate(self.signals): - # x, y, _, yerr, _, _, _, label = signal.generate_curve( - # x_str=signal_x[i], - # y_str=signal_y[i], - # norm_channel=norm_channel, - # norm_val=norm_val, - # rebin_type=rebin_type, - # rebin_step=rebin_step, - # ) - # curve = (x, y, yerr, label) - # curves.append(curve) - # waterfall = curves, xlabel, ylabel, title - # return waterfall - - # def plot_waterfall_scans(self, waterfall, shifts=None, ylim=None, xlim=None, fmt="o"): - - # curves, xlabel, ylabel, title = waterfall - # if shifts is not None: - # if np.size(shifts) == 1: - # shifts = (shifts,) * len(curves) - # else: - # shifts = (0,) * len(curves) - - # fig, ax = plt.subplots() - # shift = 0 - # for i, curve in enumerate(curves): - # shift += shifts[i] - # x, y, yerr, label = curve - # ax.errorbar(x, y + shift, yerr=yerr, label=label, fmt=fmt) - - # ax.set_title(title) - # ax.set_xlabel(xlabel) - # ax.set_ylabel(ylabel) - # ax.legend() - # if xlim is not None: - # ax.set_xlim(left=xlim[0], right=xlim[1]) - # if ylim is not None: - # ax.set_ylim(bottom=ylim[0], top=ylim[1]) - - # fig.show() diff --git a/src/tavi/data/scan_group_backup.py b/src/tavi/data/scan_group_backup.py new file mode 100644 index 00000000..cb3ed9ab --- /dev/null +++ b/src/tavi/data/scan_group_backup.py @@ -0,0 +1,255 @@ +from typing import Optional + +import matplotlib.pyplot as plt +import numpy as np + + +class ScanGroup(object): + """ + Manage combined scans + + Atributes: + name (string): Name of combined scans + signals (list of Scan objects): + backgrounds (list of Scan objects): + signal_axes (list): Can be ["s1", "s2", "detector"], + Or ["s1", "s2",["det_1", "det_2", "det_3"]]. + Default is (None, None, None) + background_axes (list): Default is (None, None, None) + + Methods: + generate_curve + plot_curve + generate_contour + plot_contour + """ + + def __init__( + self, + signals: list[int], + backgrounds=Optional[list[int]], + signal_axes=(None, None, None), + background_axes=(None, None, None), + ): + self.signals = signals + self.backgrounds = backgrounds + self.signal_axes = list(signal_axes) + self.background_axes = list(background_axes) + self.name = "" + + def generate_curve(self): + pass + + def plot_curve(self): + pass + + # TODO background subtraction + # TODO non-orthogonal axes for constant E contours + + def generate_contour( + self, + norm_channel=None, + norm_val=1, + rebin_steps=(None, None), + ): + """Generate a 2D contour plot""" + + num_scans = np.size(self.signals) + + signal_x, signal_y, signal_z = self.signal_axes + + if np.size(signal_x) == 1: + signal_x = [signal_x] * num_scans + xlabel = signal_x[0] + if np.size(signal_y) == 1: + signal_y = [signal_y] * num_scans + ylabel = signal_y[0] + if np.size(signal_z) == 1: + signal_z = [signal_z] * num_scans + zlabel = signal_z[0] + + # shape = (num_scans, num_pts) + # x_array = [scan.data[signal_x[i]] for i, scan in enumerate(self.signals)] + # y_array = [scan.data[signal_y[i]] for i, scan in enumerate(self.signals)] + + x_array = [getattr(scan.data, signal_x[i]) for i, scan in enumerate(self.signals)] + y_array = [getattr(scan.data, signal_y[i]) for i, scan in enumerate(self.signals)] + + x_min = np.min([np.min(np.round(x, 3)) for x in x_array]) + x_max = np.max([np.max(np.round(x, 3)) for x in x_array]) + y_min = np.min([np.min(np.round(y, 3)) for y in y_array]) + y_max = np.max([np.max(np.round(y, 3)) for y in y_array]) + + # TODO problem if irregular size + x_step, y_step = rebin_steps + if x_step is None: + x_precision = 1 + x_unique = np.unique(np.concatenate([np.unique(np.round(x, x_precision)) for x in x_array])) + x_diff = np.unique(np.round(np.diff(x_unique), x_precision)) + x_diff = x_diff[x_diff > 0] + x_step = x_diff[0] + + if y_step is None: + y_precision = 5 + y_unique = np.unique(np.concatenate([np.unique(np.round(y, y_precision)) for y in y_array])) + y_diff = np.unique(np.round(np.diff(y_unique), y_precision)) + y_diff = y_diff[y_diff > 0] + y_step = y_diff[0] + + x_list = np.round(np.arange(x_min, x_max + x_step / 2, x_step), 3) + y_list = np.round(np.arange(y_min, y_max + y_step / 2, y_step), 3) + # shape = (num_pts, num_scans) + xv, yv = np.meshgrid(x_list, y_list) + + # finding bin boxes + cts = np.zeros_like(xv) + z = np.zeros_like(xv) + for i in range(num_scans): + scan = self.signals[i] + scan_len = np.size(getattr(scan.data, signal_z[i])) + for j in range(scan_len): + # if SCAN_ALONG_Y: + x0 = getattr(scan.data, signal_x[i])[j] + y0 = getattr(scan.data, signal_y[i])[j] + z0 = getattr(scan.data, signal_z[i])[j] + idx = np.nanargmax(x_list + x_step / 2 >= x0) + idy = np.nanargmax(y_list + y_step / 2 >= y0) + z[idy, idx] += z0 + if norm_channel is None: + cts[idy, idx] += 1 + else: + cts[idy, idx] += getattr(scan.data, norm_channel)[j] / norm_val + + z = z / cts + + title = self.name + if norm_channel is not None: + zlabel += f" / {norm_val} " + norm_channel + title += f" nomralized by {norm_val} " + norm_channel + + return (xv, yv, z, x_step, y_step, xlabel, ylabel, zlabel, title) + + def plot_contour(self, contour_plot, cmap="turbo", vmax=100, vmin=0, ylim=None, xlim=None): + """Plot contour""" + + x, y, z, _, _, xlabel, ylabel, zlabel, title = contour_plot + + fig, ax = plt.subplots() + p = ax.pcolormesh(x, y, z, shading="auto", cmap=cmap, vmax=vmax, vmin=vmin) + fig.colorbar(p, ax=ax) + ax.set_title(title) + ax.set_xlabel(xlabel) + ax.set_ylabel(ylabel) + ax.grid(alpha=0.6) + + if xlim is not None: + ax.set_xlim(left=xlim[0], right=xlim[1]) + if ylim is not None: + ax.set_ylim(bottom=ylim[0], top=ylim[1]) + + fig.show() + + # def plot_waterfall(self, contour_plot, shifts=None, ylim=None, xlim=None, fmt="o"): + # """Plot waterfall plot. + + # Note: + # Horizontal is Y-axis, vertical is Z-axis. Stacked along X-axis. + # """ + + # x, y, z, _, _, xlabel, ylabel, zlabel, title = contour_plot + + # num = len(x[0]) + + # if shifts is not None: + # if np.size(shifts) == 1: + # shifts = (shifts,) * num + # else: + # shifts = (0,) * num + + # fig, ax = plt.subplots() + # shift = 0 + # for i in range(num): + # if np.isnan(z[:, i]).all(): # all nan + # continue + # else: + # p = ax.errorbar( + # x=y[:, i], + # y=z[:, i] + shift, + # fmt=fmt, + # label=f"{xlabel}={np.round(x[0,i],3)}, shift={shift}", + # ) + # shift += shifts[i] + + # ax.set_title(title) + # ax.set_xlabel(ylabel) + # ax.set_ylabel(zlabel) + # ax.grid(alpha=0.6) + # ax.legend() + # if xlim is not None: + # ax.set_xlim(left=xlim[0], right=xlim[1]) + # if ylim is not None: + # ax.set_ylim(bottom=ylim[0], top=ylim[1]) + # fig.show() + + # def generate_waterfall_scans( + # self, + # rebin_type=None, + # rebin_step=0, + # norm_channel=None, + # norm_val=1, + # ): + # curves = [] + # num_scans = np.size(self.signals) + # signal_x, signal_y, _ = self.signal_axes + + # if np.size(signal_x) == 1: + # signal_x = [signal_x] * num_scans + # xlabel = signal_x[0] + # if np.size(signal_y) == 1: + # signal_y = [signal_y] * num_scans + # ylabel = signal_y[0] + # if norm_channel is not None: + # ylabel += f" / {norm_val} " + norm_channel + + # title = self.name + + # for i, signal in enumerate(self.signals): + # x, y, _, yerr, _, _, _, label = signal.generate_curve( + # x_str=signal_x[i], + # y_str=signal_y[i], + # norm_channel=norm_channel, + # norm_val=norm_val, + # rebin_type=rebin_type, + # rebin_step=rebin_step, + # ) + # curve = (x, y, yerr, label) + # curves.append(curve) + # waterfall = curves, xlabel, ylabel, title + # return waterfall + + # def plot_waterfall_scans(self, waterfall, shifts=None, ylim=None, xlim=None, fmt="o"): + + # curves, xlabel, ylabel, title = waterfall + # if shifts is not None: + # if np.size(shifts) == 1: + # shifts = (shifts,) * len(curves) + # else: + # shifts = (0,) * len(curves) + + # fig, ax = plt.subplots() + # shift = 0 + # for i, curve in enumerate(curves): + # shift += shifts[i] + # x, y, yerr, label = curve + # ax.errorbar(x, y + shift, yerr=yerr, label=label, fmt=fmt) + + # ax.set_title(title) + # ax.set_xlabel(xlabel) + # ax.set_ylabel(ylabel) + # ax.legend() + # if xlim is not None: + # ax.set_xlim(left=xlim[0], right=xlim[1]) + # if ylim is not None: + # ax.set_ylim(bottom=ylim[0], top=ylim[1]) + + # fig.show() diff --git a/test_data/scan_to_nexus_test.h5 b/test_data/scan_to_nexus_test.h5 index 777570fd..a7d01e77 100644 Binary files a/test_data/scan_to_nexus_test.h5 and b/test_data/scan_to_nexus_test.h5 differ diff --git a/test_data/spice_to_nxdict_test_all.h5 b/test_data/spice_to_nxdict_test_all.h5 index 3c699028..a516779e 100644 Binary files a/test_data/spice_to_nxdict_test_all.h5 and b/test_data/spice_to_nxdict_test_all.h5 differ diff --git a/test_data/spice_to_nxdict_test_empty.h5 b/test_data/spice_to_nxdict_test_empty.h5 index 99e8b372..59ca3adf 100644 Binary files a/test_data/spice_to_nxdict_test_empty.h5 and b/test_data/spice_to_nxdict_test_empty.h5 differ diff --git a/test_data/spice_to_nxdict_test_scan0034.h5 b/test_data/spice_to_nxdict_test_scan0034.h5 index 5f94bdeb..80044015 100644 Binary files a/test_data/spice_to_nxdict_test_scan0034.h5 and b/test_data/spice_to_nxdict_test_scan0034.h5 differ diff --git a/test_data/tavi_test_exp424.h5 b/test_data/tavi_test_exp424.h5 index d4822b9f..3a13fc87 100644 Binary files a/test_data/tavi_test_exp424.h5 and b/test_data/tavi_test_exp424.h5 differ diff --git a/tests/test_scan.py b/tests/test_scan.py index 9e5803e9..51b8a854 100644 --- a/tests/test_scan.py +++ b/tests/test_scan.py @@ -41,7 +41,7 @@ def test_scan_from_nexus(): def test_generate_curve(): nexus_file_name = "./test_data/IPTS32124_CG4C_exp0424/scan0042.h5" scan = Scan.from_nexus(nexus_file_name) - plot = scan.generate_curve() + plot = scan.get_plot_data() x_data = np.arange(0.1, 4.1, 0.1) y_data = np.array( @@ -68,7 +68,7 @@ def test_generate_curve(): def test_generate_curve_norm(): nexus_file_name = "./test_data/IPTS32124_CG4C_exp0424/scan0042.h5" scan = Scan.from_nexus(nexus_file_name) - plot = scan.generate_curve(norm_channel="mcu", norm_val=5) + plot = scan.get_plot_data(norm_channel="mcu", norm_val=5) x_data = np.arange(0.1, 4.1, 0.1) y_data = np.array( @@ -85,7 +85,7 @@ def test_generate_curve_norm(): def test_generate_curve_rebin_grid(): nexus_file_name = "./test_data/IPTS32124_CG4C_exp0424/scan0042.h5" scan = Scan.from_nexus(nexus_file_name) - plot = scan.generate_curve(rebin_type="grid", rebin_params=0.25) + plot = scan.get_plot_data(rebin_type="grid", rebin_params=0.25) x_data = np.arange(0.225, 4.1, 0.25) y_data = np.array( @@ -111,7 +111,7 @@ def test_generate_curve_rebin_grid(): def test_generate_curve_rebin_grid_renorm(): nexus_file_name = "./test_data/IPTS32124_CG4C_exp0424/scan0042.h5" scan = Scan.from_nexus(nexus_file_name) - plot = scan.generate_curve( + plot = scan.get_plot_data( rebin_type="grid", rebin_params=(0.1, 5, 0.25), norm_channel="time", @@ -142,7 +142,7 @@ def test_generate_curve_rebin_grid_renorm(): def test_plot_scan_from_nexus(): nexus_file_name = "./test_data/IPTS32124_CG4C_exp0424/scan0042.h5" s1 = Scan.from_nexus(nexus_file_name) - plot1d = s1.generate_curve(norm_channel="mcu", norm_val=30) + plot1d = s1.get_plot_data(norm_channel="mcu", norm_val=30) assert plot1d.label == "scan 42" fig, ax = plt.subplots() plot1d.plot_curve(ax) diff --git a/tests/test_scan_data.py b/tests/test_scan_data.py new file mode 100644 index 00000000..4386b5ab --- /dev/null +++ b/tests/test_scan_data.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +import numpy as np +import pytest + +from tavi.data.scan_data import ScanData1D + + +@pytest.fixture +def scans(): + scan0001 = ScanData1D(x=np.array([0, 1, 2]), y=np.array([1, 2, 3])) + scan0002 = ScanData1D( + x=np.array([15, 15.1, 15.2, 15.3, 15.4, 15.1, 15.2, 15.3, 15.4, 15.5]), + y=np.array([10, 12, 15, 42, 90, 31, 34, 105, 230, 3]), + ) + + return ( + scan0001, + scan0002, + ) + + +def test_scan_data_1d_renorm(scans): + scan0001, *_ = scans + scan0001.renorm(norm_col=np.array([2, 2, 3]), norm_val=2) + assert np.allclose(scan0001.y, [1, 2, 2], atol=1e-3) + assert np.allclose(scan0001.err, [1, np.sqrt(2), np.sqrt(3) / 3 * 2], atol=1e-3) + + +def test_rebin_grid_renorm(scans): + _, scan0002, *_ = scans + + scan0002.rebin_grid_renorm( + rebin_params=(15.0, 15.5, 0.2), + norm_col=np.array([2, 2, 2, 2, 2, 5, 5, 5, 5, 5]), + norm_val=4.0, + ) + assert np.allclose( + scan0002.x, + [15.1, 15.3, 15.5], + atol=1e-3, + ) + assert np.allclose( + scan0002.y, + [ + (10 + 12 + 15 + 31 + 34) / (2 + 2 + 2 + 5 + 5) * 4, + (42 + 90 + 105 + 230) / (2 + 5 + 2 + 5) * 4, + (3) / (5) * 4, + ], + atol=1e-3, + ) + assert np.allclose( + scan0002.err, + [ + np.sqrt(10 + 12 + 15 + 31 + 34) / (2 + 2 + 2 + 5 + 5) * 4, + np.sqrt(42 + 90 + 105 + 230) / (2 + 5 + 2 + 5) * 4, + np.sqrt(3) / (5) * 4, + ], + atol=1e-3, + ) + + +def test_rebin_tol_renorm(scans): + _, scan0002, *_ = scans + + scan0002.rebin_tol_renorm( + rebin_params=(15.0, 15.5, 0.2), + norm_col=np.array([2, 2, 2, 2, 2, 5, 5, 5, 5, 5]), + norm_val=4.0, + ) + assert np.allclose( + scan0002.x, + [ + (2 * 15 + 2 * 15.1 + 5 * 15.1 + 2 * 15.2 + 5 * 15.2) / (2 + 2 + 5 + 2 + 5), + (2 * 15.3 + 5 * 15.3 + 2 * 15.4 + 5 * 15.4) / (2 + 2 + 5 + 5), + (5 * 15.5) / (5), + ], + atol=1e-3, + )