diff --git a/unpackaged/vesuvio_calibration/calibrate_vesuvio_uranium_martyn_MK5.py b/unpackaged/vesuvio_calibration/calibrate_vesuvio_uranium_martyn_MK5.py index dba94119..b7317059 100644 --- a/unpackaged/vesuvio_calibration/calibrate_vesuvio_uranium_martyn_MK5.py +++ b/unpackaged/vesuvio_calibration/calibrate_vesuvio_uranium_martyn_MK5.py @@ -16,7 +16,8 @@ from mantid.simpleapi import CreateEmptyTableWorkspace, DeleteWorkspace, CropWorkspace, RebinToWorkspace, Divide,\ ReplaceSpecialValues, FindPeaks, GroupWorkspaces, mtd, Plus, LoadVesuvio, LoadRaw, ConvertToDistribution,\ FindPeakBackground, ExtractSingleSpectrum, SumSpectra, AppendSpectra, ConvertTableToMatrixWorkspace,\ - ConjoinWorkspaces, Transpose, PlotPeakByLogValue, CloneWorkspace, Fit, RenameWorkspace + ConjoinWorkspaces, Transpose, PlotPeakByLogValue, CloneWorkspace, Fit, RenameWorkspace, MaskDetectors,\ + ExtractUnmaskedSpectra, CreateWorkspace from functools import partial @@ -105,6 +106,36 @@ def calculate_r_theta(sample_mass, thetas): r_theta = (np.cos(rad_theta) + np.sqrt( (sample_mass / NEUTRON_MASS_AMU)**2 - np.sin(rad_theta)**2 )) / ((sample_mass / NEUTRON_MASS_AMU) +1) return r_theta + +#---------------------------------------------------------------------------------------- + +def identify_invalid_spectra(peak_table, peak_centres, peak_centres_errors, spec_list): + """ + Inspect fitting results, and identify the fits associated with invalid spectra. These are spectra associated with detectors + which have lost foil coverage following a recent reduction in distance from source to detectors. + + @param peak_table - name of table containing fitted parameters each spectra. + @param spec_list - spectrum range to inspect. + @return a list of invalid spectra. + """ + peak_Gaussian_FWHM = read_fitting_result_table_column(peak_table, 'f1.GaussianFWHM', spec_list) + peak_Gaussian_FWHM_errors = read_fitting_result_table_column(peak_table, 'f1.GaussianFWHM_Err', spec_list) + peak_Lorentz_FWHM = read_fitting_result_table_column(peak_table, 'f1.LorentzFWHM', spec_list) + peak_Lorentz_FWHM_errors = read_fitting_result_table_column(peak_table, 'f1.LorentzFWHM_Err', spec_list) + peak_Lorentz_Amp = read_fitting_result_table_column(peak_table, 'f1.LorentzAmp', spec_list) + peak_Lorentz_Amp_errors = read_fitting_result_table_column(peak_table, 'f1.LorentzAmp_Err', spec_list) + + invalid_spectra = np.argwhere((np.isinf(peak_Lorentz_Amp_errors)) | (np.isnan(peak_Lorentz_Amp_errors)) | \ + (np.isinf(peak_centres_errors)) | (np.isnan(peak_centres_errors)) | \ + (np.isnan(peak_Gaussian_FWHM_errors)) | (np.isinf(peak_Gaussian_FWHM_errors)) | \ + (np.isnan(peak_Lorentz_FWHM_errors)) | (np.isinf(peak_Lorentz_FWHM_errors)) | \ + (np.isnan(peak_Lorentz_Amp_errors)) | (np.isinf(peak_Lorentz_Amp_errors)) | \ + (np.absolute(peak_Gaussian_FWHM_errors) > np.absolute(peak_Gaussian_FWHM)) | \ + (np.absolute(peak_Lorentz_FWHM_errors) > np.absolute(peak_Lorentz_FWHM)) | \ + (np.absolute(peak_Lorentz_Amp_errors) > np.absolute(peak_Lorentz_Amp)) | \ + (np.absolute(peak_centres_errors) > np.absolute(peak_centres))) + + return invalid_spectra #---------------------------------------------------------------------------------------- # The IP text file load function skips the first 3 rows of the text file. @@ -253,6 +284,10 @@ def PyInit(self): doc='Choose the peak type that is being fitted.' 'Note that supplying a set of dspacings overrides the setting here') + shared_fit_type_validator = StringListValidator(["Individual", "Shared", "Both"]) + self.declareProperty('SharedParameterFitType', "Individual", doc='Calculate shared parameters using an individual and/or' + 'global fit.', validator=shared_fit_type_validator) + self.declareProperty(ITableWorkspaceProperty("InstrumentParameterWorkspace", "", Direction.Input, PropertyMode.Optional), doc='Workspace contain instrument parameters.') @@ -299,6 +334,7 @@ def _setup_class_variables_from_properties(self): self._energy_estimates = self.getProperty("Energy").value self._sample_mass = self.getProperty("Mass").value self._create_output = self.getProperty("CreateOutput").value + self._shared_parameter_fit_type = self.getProperty("SharedParameterFitType").value def _setup_spectra_list(self): self._spec_list = self.getProperty("SpectrumRange").value.tolist() @@ -420,8 +456,9 @@ def _fit_bragg_peaks(self): self._output_params_to_table(spec_number, num_estimated_peaks, selected_params, output_parameters_tbl_name) output_workspaces.append( - self._get_output_and_clean_workspaces(fit_results_unconstrained is not None, unconstrained_fit_selected, find_peaks_output_name, - fit_peaks_output_name)) + self._get_output_and_clean_workspaces(fit_results_unconstrained is not None, + (fit_results_unconstrained is not None and fit_results_unconstrained is not False), + unconstrained_fit_selected, find_peaks_output_name, fit_peaks_output_name)) if self._create_output: GroupWorkspaces(','.join(output_workspaces), OutputWorkspace=self._output_workspace_name + "_Peak_Fits") @@ -431,7 +468,7 @@ def _get_unconstrained_ws_name(ws_name): return ws_name + '_unconstrained' def _create_output_parameters_table_ws(self, output_table_name, num_estimated_peaks): - table = CreateEmptyTableWorkspace() + table = CreateEmptyTableWorkspace(OutputWorkspace=output_table_name) col_headers = self._generate_column_headers(num_estimated_peaks) @@ -466,6 +503,8 @@ def _fit_peaks_to_spectra(self, workspace_index, spec_number, peak_estimates_lis if peaks_found: return self._filter_and_fit_found_peaks(workspace_index, peak_estimates_list, find_peaks_output_name, fit_peaks_output_name, x_range, unconstrained) + else: + return False def _filter_and_fit_found_peaks(self, workspace_index, peak_estimates_list, find_peaks_output_name, fit_peaks_output_name, x_range, unconstrained): @@ -519,7 +558,7 @@ def _output_params_to_table(self, spec_num, num_estimated_peaks, params, output_ mtd[output_table_name].addRow([spec_num] + row) - def _get_output_and_clean_workspaces(self, unconstrained_fit_performed, unconstrained_fit_selected, find_peaks_output_name, + def _get_output_and_clean_workspaces(self, unconstrained_fit_performed, unconstrained_peaks_found, unconstrained_fit_selected, find_peaks_output_name, fit_peaks_output_name): find_peaks_output_name_u = self._get_unconstrained_ws_name(find_peaks_output_name) fit_peaks_output_name_u = self._get_unconstrained_ws_name(fit_peaks_output_name) @@ -530,14 +569,17 @@ def _get_output_and_clean_workspaces(self, unconstrained_fit_performed, unconstr output_workspace = fit_peaks_output_name + '_Workspace' if unconstrained_fit_performed: - DeleteWorkspace(fit_peaks_output_name_u + '_NormalisedCovarianceMatrix') - DeleteWorkspace(fit_peaks_output_name_u + '_Parameters') DeleteWorkspace(find_peaks_output_name_u) - if unconstrained_fit_selected: - output_workspace = fit_peaks_output_name_u + '_Workspace' - DeleteWorkspace(fit_peaks_output_name + '_Workspace') - else: - DeleteWorkspace(fit_peaks_output_name_u + '_Workspace') + + if unconstrained_peaks_found: + DeleteWorkspace(fit_peaks_output_name_u + '_NormalisedCovarianceMatrix') + DeleteWorkspace(fit_peaks_output_name_u + '_Parameters') + + if unconstrained_fit_selected: + output_workspace = fit_peaks_output_name_u + '_Workspace' + DeleteWorkspace(fit_peaks_output_name + '_Workspace') + else: + DeleteWorkspace(fit_peaks_output_name_u + '_Workspace') return output_workspace @@ -749,6 +791,26 @@ def _fit_peaks(self): GroupWorkspaces(self._output_parameter_tables, OutputWorkspace=self._output_workspace_name + '_Peak_Parameters') + if self._shared_parameter_fit_type != "Individual": + self._shared_parameter_fit(output_parameter_table_name, output_parameter_table_headers) + + def _shared_parameter_fit(self, output_parameter_table_name, output_parameter_table_headers): + init_Gaussian_FWHM = read_fitting_result_table_column(output_parameter_table_name, 'f1.GaussianFWHM', self._spec_list) + init_Gaussian_FWHM = np.nanmean(init_Gaussian_FWHM[init_Gaussian_FWHM != 0]) + init_Lorentz_FWHM = read_fitting_result_table_column(output_parameter_table_name, 'f1.LorentzFWHM', self._spec_list) + init_Lorentz_FWHM = np.nanmean(init_Lorentz_FWHM[init_Lorentz_FWHM != 0]) + init_Lorentz_Amp = read_fitting_result_table_column(output_parameter_table_name, 'f1.LorentzAmp', self._spec_list) + init_Lorentz_Amp = np.nanmean(init_Lorentz_Amp[init_Lorentz_Amp != 0]) + init_Lorentz_Pos = read_fitting_result_table_column(output_parameter_table_name, 'f1.LorentzPos', self._spec_list) + init_Lorentz_Pos = np.nanmean(init_Lorentz_Pos[init_Lorentz_Pos != 0]) + + initial_params = {'A0': 0.0, 'A1': 0.0, 'LorentzAmp': init_Lorentz_Amp, 'LorentzPos': init_Lorentz_Pos, 'LorentzFWHM': init_Lorentz_FWHM, 'GaussianFWHM': init_Gaussian_FWHM} + + peak_centres = read_fitting_result_table_column(output_parameter_table_name, 'f1.LorentzPos', self._spec_list) + peak_centres_errors = read_fitting_result_table_column(output_parameter_table_name, 'f1.LorentzPos_Err', self._spec_list) + invalid_spectra = identify_invalid_spectra(output_parameter_table_name, peak_centres, peak_centres_errors, self._spec_list) + self._fit_shared_parameter(invalid_spectra, initial_params, output_parameter_table_headers) + def _fit_peak(self, peak_index, spec_index, peak_position, output_parameter_table_name, output_parameter_table_headers): spec_number = self._spec_list[0] + spec_index self._prog_reporter.report("Fitting peak %d to spectrum %d" % (peak_index, spec_number)) @@ -796,7 +858,6 @@ def _find_fit_x_window(self, peak_params): def _output_fit_params_to_table_ws(self, spec_num, params, output_table_name, output_table_headers): fit_values = dict(zip(params.column(0), params.column(1))) fit_errors = dict(zip(params.column(0), params.column(2))) - row_values = [fit_values[name] for name in output_table_headers] row_errors = [fit_errors[name] for name in output_table_headers] row = [element for tupl in zip(row_values, row_errors) for element in tupl] @@ -809,6 +870,79 @@ def _del_fit_workspaces(self, ncm, fit_params, fws): if not self._create_output: DeleteWorkspace(fws) +#---------------------------------------------------------------------------------------- + + def _fit_shared_parameter(self, invalid_spectra, initial_params, param_names): + """ + Fit peaks to all detectors, with one set of fit parameters for all detectors. + """ + + shared_peak_table = self._output_workspace_name + '_shared_peak_parameters' + CreateEmptyTableWorkspace(OutputWorkspace=shared_peak_table) + + err_names = [name + '_Err' for name in param_names] + col_names = [element for tupl in zip(param_names, err_names) for element in tupl] + + mtd[shared_peak_table].addColumn('int', 'Spectrum') + for name in col_names: + mtd[shared_peak_table].addColumn('double', name) + + fit_func = self._build_function_string(initial_params) + start_str = 'composite=MultiDomainFunction, NumDeriv=1;' + sample_ws = mtd[self._sample] + MaskDetectors(Workspace=sample_ws, WorkspaceIndexList=invalid_spectra) + validSpecs = ExtractUnmaskedSpectra(InputWorkspace=sample_ws, OutputWorkspace='valid_spectra') + n_valid_specs = validSpecs.getNumberHistograms() + + fit_func = ('(composite=CompositeFunction, NumDeriv=false, $domains=i;' + fit_func[:-1] + ');') * n_valid_specs + composite_func = start_str + fit_func[:-1] + + ties = ','.join(f'f{i}.f1.{p}=f0.f1.{p}' for p in self._func_param_names.values() for i in range(1,n_valid_specs)) + func_string = composite_func + f';ties=({ties})' + + fit_output_name = '__' + self._output_workspace_name + '_Peak_0' + + xmin, xmax = None, None + + # create new workspace for each spectra + x = validSpecs.readX(0) + y = validSpecs.readY(0) + e = validSpecs.readE(0) + out_ws = self._sample + '_Spec_0' + CreateWorkspace(DataX=x, DataY=y, DataE=e, NSpec=1, OutputWorkspace=out_ws) + + other_inputs = [CreateWorkspace(DataX=validSpecs.readX(j), DataY=validSpecs.readY(j), DataE=validSpecs.readE(j), + OutputWorkspace=f'{self._sample}_Spec_{j}') + for j in range(1,n_valid_specs)] + + added_args = {f'InputWorkspace_{i + 1}': v for i,v in enumerate(other_inputs)} + + print('starting global fit') + + status, chi2, ncm, params, fws, func, cost_func = Fit(Function=func_string, InputWorkspace=out_ws, IgnoreInvalidData=True, + StartX=xmin, EndX=xmax, + CalcErrors=True, Output=fit_output_name, + Minimizer='SteepestDescent,RelError=1e-8', **added_args) + [DeleteWorkspace(f"{self._sample}_Spec_{i}") for i in range(0,n_valid_specs)] + + fit_values = dict(zip(params.column(0), params.column(1))) + fit_errors = dict(zip(params.column(0), params.column(2))) + + row_values = [fit_values['f0.' + name] for name in param_names] + row_errors = [fit_errors['f0.' + name] for name in param_names] + row = [element for tupl in zip(row_values, row_errors) for element in tupl] + + mtd[shared_peak_table].addRow([0] + row) + + DeleteWorkspace(ncm) + DeleteWorkspace(params) + if not self._create_output: + DeleteWorkspace(fws) + + DeleteWorkspace(validSpecs) + + mtd[self._output_workspace_name + '_Peak_Parameters'].add(shared_peak_table) + #---------------------------------------------------------------------------------------- def _get_find_peak_parameters(self, spec_number, peak_centre, unconstrained=False): @@ -1137,8 +1271,9 @@ def _generate_fit_workspaces(self): """ Output the fit workspace for each spectrum fitted. """ - fit_workspaces = map(list, zip(*self._peak_fit_workspaces_by_spec)) + fit_workspaces = map(list, zip(*self._peak_fit_workspaces)) group_ws_names = [] + for i, peak_fits in enumerate(fit_workspaces): ws_name = self._output_workspace_name + '_%d_Workspace' % i ExtractSingleSpectrum(InputWorkspace=self._sample, WorkspaceIndex=i, OutputWorkspace=ws_name) @@ -1226,6 +1361,10 @@ def PyInit(self): self.declareProperty('Iterations', 2, validator=IntBoundedValidator(lower=1), doc="Number of iterations to perform. Default is 2.") + shared_fit_type_validator = StringListValidator(["Individual", "Shared", "Both"]) + self.declareProperty('SharedParameterFitType', "Individual", doc='Calculate shared parameters using an individual and/or' + 'global fit.', validator=shared_fit_type_validator) + self.declareProperty('CreateOutput', False, doc="Whether to create output from fitting.") @@ -1294,27 +1433,26 @@ def PyExec(self): self._theta_peak_fits = theta_fit + '_Peak_Parameters' self._calculate_scattering_angle(self._theta_peak_fits, DETECTOR_RANGE) - #calibrate E1 for backscattering detectors and use the backscattering averaged value for all detectors + #calibrate E1 for backscattering detectors and use the backscattering averaged value for all detectors E1_fit_back = self._current_workspace + '_E1_back' self._run_calibration_fit(Samples=self._samples, Function='Voigt', Mode='SingleDifference', SpectrumRange=BACKSCATTERING_RANGE, InstrumentParameterWorkspace=self._param_table, Mass=self._sample_mass, OutputWorkspace=E1_fit_back, CreateOutput=self._create_output, - PeakType='Recoil') - - - E1_peak_fits_back = mtd[self._current_workspace + '_E1_back_Peak_Parameters'].getNames()[0] - self._calculate_final_energy(E1_peak_fits_back, BACKSCATTERING_RANGE) + PeakType='Recoil', SharedParameterFitType=self._shared_parameter_fit_type) + + E1_peak_fits_back = mtd[self._current_workspace + '_E1_back_Peak_Parameters'].getNames() + self._calculate_final_energy(E1_peak_fits_back, BACKSCATTERING_RANGE, self._shared_parameter_fit_type != "Individual") # calibrate L1 for backscattering detectors based on the averaged E1 value and calibrated theta values - self._calculate_final_flight_path(E1_peak_fits_back, BACKSCATTERING_RANGE) + self._calculate_final_flight_path(E1_peak_fits_back[0], BACKSCATTERING_RANGE) # calibrate L1 for frontscattering detectors based on the averaged E1 value and calibrated theta values E1_fit_front = self._current_workspace + '_E1_front' self._run_calibration_fit(Samples=self._samples, Function='Voigt', Mode='SingleDifference', SpectrumRange=FRONTSCATTERING_RANGE, InstrumentParameterWorkspace=self._param_table, Mass=self._sample_mass, OutputWorkspace=E1_fit_front, CreateOutput=self._create_output, - PeakType='Recoil') - - E1_peak_fits_front = mtd[self._current_workspace + '_E1_front_Peak_Parameters'].getNames()[0] - self._calculate_final_flight_path(E1_peak_fits_front, FRONTSCATTERING_RANGE) + PeakType='Recoil', SharedParameterFitType=self._shared_parameter_fit_type) + + E1_peak_fits_front = mtd[self._current_workspace + '_E1_front_Peak_Parameters'].getNames() + self._calculate_final_flight_path(E1_peak_fits_front[0], FRONTSCATTERING_RANGE) #make the fitted parameters for this iteration the input to the next iteration. @@ -1428,7 +1566,7 @@ def _calculate_final_flight_path(self, peak_table, spec_list): peak_centres = read_fitting_result_table_column(peak_table, 'f1.LorentzPos', spec_list) peak_centres_errors = read_fitting_result_table_column(peak_table, 'f1.LorentzPos_Err', spec_list) - invalid_spectra = self._identify_invalid_spectra(peak_table, peak_centres, peak_centres_errors, spec_list) + invalid_spectra = identify_invalid_spectra(peak_table, peak_centres, peak_centres_errors, spec_list) peak_centres[invalid_spectra] = np.nan print(f'Invalid Spectra Index Found and Marked NAN: {invalid_spectra.flatten()} from Spectra Index List:' @@ -1445,38 +1583,6 @@ def _calculate_final_flight_path(self, peak_table, spec_list): self._set_table_column(self._current_workspace, 'L1', L1, spec_list) self._set_table_column(self._current_workspace, 'L1_Err', L1_error, spec_list) -#---------------------------------------------------------------------------------------- - - def _identify_invalid_spectra(self, peak_table, peak_centres, peak_centres_errors, spec_list): - """ - Inspect fitting results, and identify the fits associated with invalid spectra. These are spectra associated with detectors - which have lost foil coverage following a recent reduction in distance from source to detectors. - - @param peak_table - name of table containing fitted parameters each spectra. - @param peak_centres - a list of the found peak centres - @param peak_centres_errors - a list of errors associated with the peak centres - @param spec_list - spectrum range to inspect. - @return a list of invalid spectra. - """ - peak_Gaussian_FWHM = read_fitting_result_table_column(peak_table, 'f1.GaussianFWHM', spec_list) - peak_Gaussian_FWHM_errors = read_fitting_result_table_column(peak_table, 'f1.GaussianFWHM_Err', spec_list) - peak_Lorentz_FWHM = read_fitting_result_table_column(peak_table, 'f1.LorentzFWHM', spec_list) - peak_Lorentz_FWHM_errors = read_fitting_result_table_column(peak_table, 'f1.LorentzFWHM_Err', spec_list) - peak_Lorentz_Amp = read_fitting_result_table_column(peak_table, 'f1.LorentzAmp', spec_list) - peak_Lorentz_Amp_errors = read_fitting_result_table_column(peak_table, 'f1.LorentzAmp_Err', spec_list) - - invalid_spectra = np.argwhere((np.isinf(peak_Lorentz_Amp_errors)) | (np.isnan(peak_Lorentz_Amp_errors)) | \ - (np.isinf(peak_centres_errors)) | (np.isnan(peak_centres_errors)) | \ - (np.isnan(peak_Gaussian_FWHM_errors)) | (np.isinf(peak_Gaussian_FWHM_errors)) | \ - (np.isnan(peak_Lorentz_FWHM_errors)) | (np.isinf(peak_Lorentz_FWHM_errors)) | \ - (np.isnan(peak_Lorentz_Amp_errors)) | (np.isinf(peak_Lorentz_Amp_errors)) | \ - (np.absolute(peak_Gaussian_FWHM_errors) > np.absolute(peak_Gaussian_FWHM)) | \ - (np.absolute(peak_Lorentz_FWHM_errors) > np.absolute(peak_Lorentz_FWHM)) | \ - (np.absolute(peak_Lorentz_Amp_errors) > np.absolute(peak_Lorentz_Amp)) | \ - (np.absolute(peak_centres_errors) > np.absolute(peak_centres))) - - return invalid_spectra - # ---------------------------------------------------------------------------------------- def _calculate_scattering_angle(self, table_name, spec_list): @@ -1523,9 +1629,9 @@ def _calculate_scattering_angle(self, table_name, spec_list): #---------------------------------------------------------------------------------------- - - def _calculate_final_energy(self, peak_table, spec_list): + + def _calculate_final_energy(self, peak_table, spec_list, calculate_global): """ Calculate the final energy using the fitted peak centres of a run. @@ -1536,6 +1642,8 @@ def _calculate_final_energy(self, peak_table, spec_list): spec_range = DETECTOR_RANGE[1] + 1 - DETECTOR_RANGE[0] mean_E1 = np.empty(spec_range) E1_error = np.empty(spec_range) + global_E1 = np.empty(spec_range) + global_E1_error = np.empty(spec_range) if not self._E1_value_and_error: t0 = read_table_column(self._current_workspace, 't0', spec_list) @@ -1545,10 +1653,10 @@ def _calculate_final_energy(self, peak_table, spec_list): theta = read_table_column(self._current_workspace, 'theta', spec_list) r_theta = calculate_r_theta(self._sample_mass, theta) - peak_centres = read_fitting_result_table_column(peak_table, 'f1.LorentzPos', spec_list) - peak_centres_errors = read_fitting_result_table_column(peak_table, 'f1.LorentzPos_Err', spec_list) + peak_centres = read_fitting_result_table_column(peak_table[0], 'f1.LorentzPos', spec_list) + peak_centres_errors = read_fitting_result_table_column(peak_table[0], 'f1.LorentzPos_Err', spec_list) - invalid_spectra = self._identify_invalid_spectra(peak_table, peak_centres, peak_centres_errors, spec_list) + invalid_spectra = identify_invalid_spectra(peak_table[0], peak_centres, peak_centres_errors, spec_list) peak_centres[invalid_spectra] = np.nan delta_t = (peak_centres - t0) / 1e+6 @@ -1559,16 +1667,35 @@ def _calculate_final_energy(self, peak_table, spec_list): mean_E1_val = np.nanmean(E1) E1_error_val = np.nanstd(E1) + else: mean_E1_val = self._E1_value_and_error[0] E1_error_val = self._E1_value_and_error[1] - + mean_E1.fill(mean_E1_val) E1_error.fill(E1_error_val) self._set_table_column(self._current_workspace, 'E1', mean_E1) self._set_table_column(self._current_workspace, 'E1_Err', E1_error) + if calculate_global: # This fn will need updating for the only global option + peak_centre = read_fitting_result_table_column(peak_table[1], 'f1.LorentzPos', [0]) + peak_centre = [peak_centre] * len(peak_centres) + + delta_t = (peak_centre - t0) / 1e+6 + v1 = (L0 * r_theta + L1) / delta_t + E1 = 0.5*scipy.constants.m_n*v1**2 + E1 /= MEV_CONVERSION + + global_E1_val = np.nanmean(E1) + global_E1_error_val = np.nanstd(E1) + + global_E1.fill(global_E1_val) + global_E1_error.fill(global_E1_error_val) + + self._set_table_column(self._current_workspace, 'global_E1', global_E1) + self._set_table_column(self._current_workspace, 'global_E1_Err', global_E1_error) + #---------------------------------------------------------------------------------------- def _setup(self): @@ -1581,6 +1708,7 @@ def _setup(self): self._sample_mass = self.getProperty("Mass").value self._d_spacings = self.getProperty("DSpacings").value.tolist() self._E1_value_and_error = self.getProperty("E1FixedValueAndError").value.tolist() + self._shared_parameter_fit_type = self.getProperty("SharedParameterFitType").value self._calc_L0 = self.getProperty("CalculateL0").value self._make_IP_file = self.getProperty("CreateIPFile").value self._output_workspace_name = self.getPropertyValue("OutputWorkspace") diff --git a/unpackaged/vesuvio_calibration/tests/system/test_system.py b/unpackaged/vesuvio_calibration/tests/system/test_system.py index 0f76ab49..aa52bf06 100644 --- a/unpackaged/vesuvio_calibration/tests/system/test_system.py +++ b/unpackaged/vesuvio_calibration/tests/system/test_system.py @@ -319,6 +319,41 @@ def test_copper_with_multiple_iterations(self, load_file_mock): 165, 167, 168, 169, 170, 182, 191, 192]}) self.assert_parameters_match_expected(params_table, detector_specific_r_tols) + @patch('unpackaged.vesuvio_calibration.calibrate_vesuvio_uranium_martyn_MK5.EVSCalibrationFit._load_file') + def test_copper_create_output(self, load_file_mock): + self._setup_copper_test() + self._output_workspace = "copper_analysis_test" + + load_file_mock.side_effect = self._load_file_side_effect + + self.create_evs_calibration_alg() + self._alg.setProperty("CreateOutput", True) + params_table = self.execute_evs_calibration_analysis() + + # Specify detectors tolerances set by user, then update with those to mask as invalid. + detector_specific_r_tols = {"L1": {116: 0.65, 170: 0.75}} + detector_specific_r_tols["L1"].update({k: INVALID_DETECTOR for k in [138, 141, 146, 147, 156, 158, 160, 163, 164, + 165, 167, 168, 169, 170, 182, 191, 192]}) + self.assert_parameters_match_expected(params_table, detector_specific_r_tols) + + @patch('unpackaged.vesuvio_calibration.calibrate_vesuvio_uranium_martyn_MK5.EVSCalibrationFit._load_file') + def test_copper_with_individual_and_global_fit(self, load_file_mock): + self._setup_copper_test() + self._output_workspace = "copper_analysis_test" + + load_file_mock.side_effect = self._load_file_side_effect + + self.create_evs_calibration_alg() + self._alg.setProperty("SharedParameterFitType", "Both") + params_table = self.execute_evs_calibration_analysis() + + # Specify detectors tolerances set by user, then update with those to mask as invalid. + detector_specific_r_tols = {"L1": {116: 0.65, 170: 0.75}} + detector_specific_r_tols["L1"].update({k: INVALID_DETECTOR for k in [138, 141, 146, 147, 156, 158, 160, 163, 164, + 165, 167, 168, 169, 170, 182, 191, 192]}) + self.assert_parameters_match_expected(params_table, detector_specific_r_tols) + self.assert_E1_parameters_match_expected(params_table, 3e-3, "Both") + def tearDown(self): mtd.clear() @@ -349,6 +384,15 @@ def assert_L1_parameters_match_expected(self, params_table, rel_tolerance): return _assert_allclose_excluding_bad_detectors(np.ma.masked_array(actual_L1, mask=invalid_detector_mask), np.ma.masked_array(L1, mask=invalid_detector_mask), rel_tolerance) + def assert_E1_parameters_match_expected(self, params_table, rel_tolerance, fit_type): + if fit_type != "Shared": + E1 = params_table.column('E1')[0] + self.assertAlmostEqual(E1, ENERGY_ESTIMATE, delta=ENERGY_ESTIMATE*rel_tolerance) + + if fit_type != "Individual": + global_E1 = params_table.column('global_E1')[0] + self.assertAlmostEqual(global_E1, ENERGY_ESTIMATE, delta=ENERGY_ESTIMATE*rel_tolerance) + def assert_parameters_match_expected(self, params_table, tolerances=None): rel_tol_theta, rel_tol_L1 = self._extract_tolerances(tolerances) theta_errors = self.assert_theta_parameters_match_expected(params_table, rel_tol_theta) diff --git a/unpackaged/vesuvio_calibration/tests/unit/test_calibrate_vesuvio_analysis.py b/unpackaged/vesuvio_calibration/tests/unit/test_calibrate_vesuvio_analysis.py index 6e2cb6af..eca5ce17 100644 --- a/unpackaged/vesuvio_calibration/tests/unit/test_calibrate_vesuvio_analysis.py +++ b/unpackaged/vesuvio_calibration/tests/unit/test_calibrate_vesuvio_analysis.py @@ -1,8 +1,6 @@ from unpackaged.vesuvio_calibration.calibrate_vesuvio_uranium_martyn_MK5 import EVSCalibrationAnalysis -from mock import MagicMock, patch import unittest -import numpy as np class TestVesuvioCalibrationAnalysis(unittest.TestCase): @@ -13,56 +11,8 @@ def setUpClass(cls): def setUp(self): pass - @patch('unpackaged.vesuvio_calibration.calibrate_vesuvio_uranium_martyn_MK5.mtd') - def test_identify_invalid_spectra_no_invalid(self, mock_mtd): - alg = EVSCalibrationAnalysis() - ws_mock = MagicMock() - mock_column_output_dict = {'f1.GaussianFWHM': [1, 2, 3], 'f1.GaussianFWHM_Err': [0.1, 0.2, 0.3], - 'f1.LorentzFWHM': [1.1, 2.1, 3.1], 'f1.LorentzFWHM_Err': [0.11, 0.21, 0.31], - 'f1.LorentzAmp': [1.2, 2.2, 3.2], 'f1.LorentzAmp_Err': [0.12, 0.22, 0.32]} - ws_mock.column.side_effect = lambda key: mock_column_output_dict[key] - mock_mtd.__getitem__.return_value = ws_mock - - np.testing.assert_equal(np.argwhere([]), alg._identify_invalid_spectra('peak_table', [5, 10, 20], [0.1, 0.15, 0.2], [0, 2])) - - @patch('unpackaged.vesuvio_calibration.calibrate_vesuvio_uranium_martyn_MK5.mtd') - def test_identify_invalid_spectra_nan_in_errors(self, mock_mtd): - alg = EVSCalibrationAnalysis() - ws_mock = MagicMock() - mock_column_output_dict = {'f1.GaussianFWHM': [1, 2, 3], 'f1.GaussianFWHM_Err': [np.nan, 0.2, 0.3], - 'f1.LorentzFWHM': [1.1, 2.1, 3.1], 'f1.LorentzFWHM_Err': [0.11, 0.21, 0.31], - 'f1.LorentzAmp': [1.2, 2.2, 3.2], 'f1.LorentzAmp_Err': [0.12, 0.22, np.nan]} - ws_mock.column.side_effect = lambda key: mock_column_output_dict[key] - mock_mtd.__getitem__.return_value = ws_mock - - np.testing.assert_equal(np.argwhere(np.array([True, False, True])), - alg._identify_invalid_spectra('peak_table', [5, 10, 20], [0.1, 0.15, 0.2], [0, 2])) - - @patch('unpackaged.vesuvio_calibration.calibrate_vesuvio_uranium_martyn_MK5.mtd') - def test_identify_invalid_spectra_inf_in_errors(self, mock_mtd): - alg = EVSCalibrationAnalysis() - ws_mock = MagicMock() - mock_column_output_dict = {'f1.GaussianFWHM': [1, 2, 3], 'f1.GaussianFWHM_Err': [0.1, 0.2, 0.3], - 'f1.LorentzFWHM': [1.1, 2.1, 3.1], 'f1.LorentzFWHM_Err': [np.inf, 0.21, 0.31], - 'f1.LorentzAmp': [1.2, 2.2, 3.2], 'f1.LorentzAmp_Err': [0.12, np.inf, 0.32]} - ws_mock.column.side_effect = lambda key: mock_column_output_dict[key] - mock_mtd.__getitem__.return_value = ws_mock - - np.testing.assert_equal(np.argwhere(np.array([True, True, False])), - alg._identify_invalid_spectra('peak_table', [5, 10, 20], [0.1, 0.15, 0.2], [0, 2])) - - @patch('unpackaged.vesuvio_calibration.calibrate_vesuvio_uranium_martyn_MK5.mtd') - def test_identify_invalid_spectra_error_greater_than_peak(self, mock_mtd): - alg = EVSCalibrationAnalysis() - ws_mock = MagicMock() - mock_column_output_dict = {'f1.GaussianFWHM': [1, 2, 3], 'f1.GaussianFWHM_Err': [0.1, 0.2, 0.3], - 'f1.LorentzFWHM': [1.1, 2.1, 3.1], 'f1.LorentzFWHM_Err': [0.11, 10, 0.31], - 'f1.LorentzAmp': [1.2, 2.2, 3.2], 'f1.LorentzAmp_Err': [0.12, 0.22, 10]} - ws_mock.column.side_effect = lambda key: mock_column_output_dict[key] - mock_mtd.__getitem__.return_value = ws_mock - - np.testing.assert_equal(np.argwhere(np.array([False, True, True])), - alg._identify_invalid_spectra('peak_table', [5, 10, 20], [0.1, 0.15, 0.2], [0, 2])) + def test_placeholder(self): + pass if __name__ == '__main__': diff --git a/unpackaged/vesuvio_calibration/tests/unit/test_calibrate_vesuvio_fit.py b/unpackaged/vesuvio_calibration/tests/unit/test_calibrate_vesuvio_fit.py index bc4181e2..cc3e0921 100644 --- a/unpackaged/vesuvio_calibration/tests/unit/test_calibrate_vesuvio_fit.py +++ b/unpackaged/vesuvio_calibration/tests/unit/test_calibrate_vesuvio_fit.py @@ -641,12 +641,13 @@ def test_load_file_raw(self, mock_load_vesuvio, mock_load_raw, mock_convert_to_d mock_convert_to_dist.assert_called_once_with(output_name, EnableLogging=False) @patch('unpackaged.vesuvio_calibration.calibrate_vesuvio_uranium_martyn_MK5.GroupWorkspaces') - def test_fit_peaks(self, group_workspaces_mock): + def test_fit_peaks_individual(self, group_workspaces_mock): alg = EVSCalibrationFit() alg._estimate_peak_positions = MagicMock(return_value=np.asarray([[5, 10, 15], [2.5, 7.5, 10.5]])) alg._create_parameter_table_and_output_headers = MagicMock(return_value=['a', 'b', 'c']) alg._fit_peak = MagicMock(side_effect=lambda a, b, c, d, e: f'fit_ws_{a}_{b}') alg._output_workspace_name = 'output_ws_name' + alg._shared_parameter_fit_type = 'Individual' alg._fit_peaks() alg._create_parameter_table_and_output_headers.assert_has_calls([call('output_ws_name_Peak_0_Parameters'), call('output_ws_name_Peak_1_Parameters')]) @@ -661,6 +662,54 @@ def test_fit_peaks(self, group_workspaces_mock): group_workspaces_mock.assert_called_once_with(['output_ws_name_Peak_0_Parameters', 'output_ws_name_Peak_1_Parameters'], OutputWorkspace='output_ws_name_Peak_Parameters') + @patch('unpackaged.vesuvio_calibration.calibrate_vesuvio_uranium_martyn_MK5.EVSCalibrationFit._shared_parameter_fit') + @patch('unpackaged.vesuvio_calibration.calibrate_vesuvio_uranium_martyn_MK5.GroupWorkspaces') + def test_fit_peaks_shared(self, group_workspaces_mock, shared_parameter_fit_mock): + alg = EVSCalibrationFit() + alg._estimate_peak_positions = MagicMock(return_value=np.asarray([[5, 10, 15], [2.5, 7.5, 10.5]])) + alg._create_parameter_table_and_output_headers = MagicMock(return_value=['a', 'b', 'c']) + alg._fit_peak = MagicMock(side_effect=lambda a, b, c, d, e: f'fit_ws_{a}_{b}') + alg._output_workspace_name = 'output_ws_name' + alg._shared_parameter_fit_type = 'Shared' + alg._fit_peaks() + alg._create_parameter_table_and_output_headers.assert_has_calls([call('output_ws_name_Peak_0_Parameters'), + call('output_ws_name_Peak_1_Parameters')]) + alg._fit_peak.assert_has_calls([call(0, 0, 5, 'output_ws_name_Peak_0_Parameters', ['a', 'b', 'c']), + call(0, 1, 10, 'output_ws_name_Peak_0_Parameters', ['a', 'b', 'c']), + call(0, 2, 15, 'output_ws_name_Peak_0_Parameters', ['a', 'b', 'c']), + call(1, 0, 2.5, 'output_ws_name_Peak_1_Parameters', ['a', 'b', 'c']), + call(1, 1, 7.5, 'output_ws_name_Peak_1_Parameters', ['a', 'b', 'c']), + call(1, 2, 10.5, 'output_ws_name_Peak_1_Parameters', ['a', 'b', 'c'])]) + self.assertEqual([['fit_ws_0_0', 'fit_ws_0_1', 'fit_ws_0_2'], ['fit_ws_1_0', 'fit_ws_1_1', 'fit_ws_1_2']], + alg._peak_fit_workspaces) + group_workspaces_mock.assert_called_once_with(['output_ws_name_Peak_0_Parameters', 'output_ws_name_Peak_1_Parameters'], + OutputWorkspace='output_ws_name_Peak_Parameters') + shared_parameter_fit_mock.assert_called_once_with('output_ws_name_Peak_1_Parameters', ['a', 'b', 'c']) + + @patch('unpackaged.vesuvio_calibration.calibrate_vesuvio_uranium_martyn_MK5.EVSCalibrationFit._shared_parameter_fit') + @patch('unpackaged.vesuvio_calibration.calibrate_vesuvio_uranium_martyn_MK5.GroupWorkspaces') + def test_fit_peaks_both(self, group_workspaces_mock, shared_parameter_fit_mock): + alg = EVSCalibrationFit() + alg._estimate_peak_positions = MagicMock(return_value=np.asarray([[5, 10, 15], [2.5, 7.5, 10.5]])) + alg._create_parameter_table_and_output_headers = MagicMock(return_value=['a', 'b', 'c']) + alg._fit_peak = MagicMock(side_effect=lambda a, b, c, d, e: f'fit_ws_{a}_{b}') + alg._output_workspace_name = 'output_ws_name' + alg._shared_parameter_fit_type = 'Both' + alg._fit_peaks() + alg._create_parameter_table_and_output_headers.assert_has_calls([call('output_ws_name_Peak_0_Parameters'), + call('output_ws_name_Peak_1_Parameters')]) + alg._fit_peak.assert_has_calls([call(0, 0, 5, 'output_ws_name_Peak_0_Parameters', ['a', 'b', 'c']), + call(0, 1, 10, 'output_ws_name_Peak_0_Parameters', ['a', 'b', 'c']), + call(0, 2, 15, 'output_ws_name_Peak_0_Parameters', ['a', 'b', 'c']), + call(1, 0, 2.5, 'output_ws_name_Peak_1_Parameters', ['a', 'b', 'c']), + call(1, 1, 7.5, 'output_ws_name_Peak_1_Parameters', ['a', 'b', 'c']), + call(1, 2, 10.5, 'output_ws_name_Peak_1_Parameters', ['a', 'b', 'c'])]) + self.assertEqual([['fit_ws_0_0', 'fit_ws_0_1', 'fit_ws_0_2'], ['fit_ws_1_0', 'fit_ws_1_1', 'fit_ws_1_2']], + alg._peak_fit_workspaces) + group_workspaces_mock.assert_called_once_with(['output_ws_name_Peak_0_Parameters', 'output_ws_name_Peak_1_Parameters'], + OutputWorkspace='output_ws_name_Peak_Parameters') + shared_parameter_fit_mock.assert_called_once_with('output_ws_name_Peak_1_Parameters', ['a', 'b', 'c']) + @staticmethod def _setup_alg_mocks_fit_peak(): alg = EVSCalibrationFit() @@ -907,7 +956,7 @@ def test_get_output_and_clean_workspaces_unconstrained_not_performed(self, mock_ alg = EVSCalibrationFit() find_peaks_output_name = 'test_find_output_name' fit_peaks_output_name = 'test_fit_output_name' - output_ws = alg._get_output_and_clean_workspaces(False, False, find_peaks_output_name, fit_peaks_output_name) + output_ws = alg._get_output_and_clean_workspaces(False, False, False, find_peaks_output_name, fit_peaks_output_name) mock_delete_ws.assert_has_calls([call(fit_peaks_output_name + '_NormalisedCovarianceMatrix'), call(fit_peaks_output_name + '_Parameters'), call(find_peaks_output_name)]) @@ -918,14 +967,14 @@ def test_get_output_and_clean_workspaces_unconstrained_performed(self, mock_dele alg = EVSCalibrationFit() find_peaks_output_name = 'test_find_output_name' fit_peaks_output_name = 'test_fit_output_name' - output_ws = alg._get_output_and_clean_workspaces(True, False, find_peaks_output_name, fit_peaks_output_name) + output_ws = alg._get_output_and_clean_workspaces(True, True, False, find_peaks_output_name, fit_peaks_output_name) mock_delete_ws.assert_has_calls([call(fit_peaks_output_name + '_NormalisedCovarianceMatrix'), - call(fit_peaks_output_name + '_Parameters'), - call(find_peaks_output_name), - call(fit_peaks_output_name + '_unconstrained' + '_NormalisedCovarianceMatrix'), - call(fit_peaks_output_name + '_unconstrained' + '_Parameters'), - call(find_peaks_output_name + '_unconstrained'), - call(fit_peaks_output_name + '_unconstrained' + '_Workspace')]) + call(fit_peaks_output_name + '_Parameters'), + call(find_peaks_output_name), + call(find_peaks_output_name + '_unconstrained'), + call(fit_peaks_output_name + '_unconstrained_NormalisedCovarianceMatrix'), + call(fit_peaks_output_name + '_unconstrained_Parameters'), + call(fit_peaks_output_name + '_unconstrained_Workspace')]) self.assertEqual(output_ws, fit_peaks_output_name + '_Workspace') @patch('unpackaged.vesuvio_calibration.calibrate_vesuvio_uranium_martyn_MK5.DeleteWorkspace') @@ -933,14 +982,14 @@ def test_get_output_and_clean_workspaces_unconstrained_performed_and_selected(se alg = EVSCalibrationFit() find_peaks_output_name = 'test_find_output_name' fit_peaks_output_name = 'test_fit_output_name' - output_ws = alg._get_output_and_clean_workspaces(True, True, find_peaks_output_name, fit_peaks_output_name) + output_ws = alg._get_output_and_clean_workspaces(True, True, True, find_peaks_output_name, fit_peaks_output_name) mock_delete_ws.assert_has_calls([call(fit_peaks_output_name + '_NormalisedCovarianceMatrix'), - call(fit_peaks_output_name + '_Parameters'), - call(find_peaks_output_name), - call(fit_peaks_output_name + '_unconstrained' + '_NormalisedCovarianceMatrix'), - call(fit_peaks_output_name + '_unconstrained' + '_Parameters'), - call(find_peaks_output_name + '_unconstrained'), - call(fit_peaks_output_name + '_Workspace')]) + call(fit_peaks_output_name + '_Parameters'), + call(find_peaks_output_name), + call(find_peaks_output_name + '_unconstrained'), + call(fit_peaks_output_name + '_unconstrained_NormalisedCovarianceMatrix'), + call(fit_peaks_output_name + '_unconstrained_Parameters'), + call(fit_peaks_output_name + '_Workspace')]) self.assertEqual(output_ws, fit_peaks_output_name + '_unconstrained' + '_Workspace') def test_generate_column_headers(self): @@ -1145,7 +1194,7 @@ def test_fit_peaks_to_spectra_not_unconstrained_peaks_not_found(self): alg._get_find_peak_parameters.assert_called_once_with(spec_number, peak_estimates_list, False) alg._run_find_peaks.assert_called_once_with(workspace_index, find_peaks_output_name, 'find_peaks_params', False) alg._filter_and_fit_found_peaks.assert_not_called() - self.assertEqual(result, None) + self.assertEqual(result, False) @staticmethod def _setup_fit_bragg_peaks_mocks(fit_results, selected_params, output_workspaces): @@ -1191,8 +1240,8 @@ def test_fit_bragg_peaks_success(self, group_workspaces_mock): call(2, fit_result_ret_val, None)]) alg._output_params_to_table.assert_has_calls([call(1, 2, selected_params[0], 'output_Peak_Parameters'), call(2, 2, selected_params[1], 'output_Peak_Parameters')]) - alg._get_output_and_clean_workspaces.assert_has_calls([call(False, False, 'sample_peaks_table_1', 'output_Spec_1'), - call(False, False, 'sample_peaks_table_2', 'output_Spec_2')]) + alg._get_output_and_clean_workspaces.assert_has_calls([call(False, False, False, 'sample_peaks_table_1', 'output_Spec_1'), + call(False, False, False, 'sample_peaks_table_2', 'output_Spec_2')]) group_workspaces_mock.assert_called_once_with(','.join(output_workspaces), OutputWorkspace='output_Peak_Fits') @patch('unpackaged.vesuvio_calibration.calibrate_vesuvio_uranium_martyn_MK5.GroupWorkspaces') @@ -1217,8 +1266,8 @@ def test_fit_bragg_peaks_not_success(self, group_workspaces_mock): call(2, fit_res, rit_u_res)]) alg._output_params_to_table.assert_has_calls([call(1, 2, selected_params[0], 'output_Peak_Parameters'), call(2, 2, selected_params[1], 'output_Peak_Parameters')]) - alg._get_output_and_clean_workspaces.assert_has_calls([call(True, False, 'sample_peaks_table_1', 'output_Spec_1'), - call(True, False, 'sample_peaks_table_2', 'output_Spec_2')]) + alg._get_output_and_clean_workspaces.assert_has_calls([call(True, True, False, 'sample_peaks_table_1', 'output_Spec_1'), + call(True, True, False, 'sample_peaks_table_2', 'output_Spec_2')]) group_workspaces_mock.assert_called_once_with(','.join(output_workspaces), OutputWorkspace='output_Peak_Fits') diff --git a/unpackaged/vesuvio_calibration/tests/unit/test_calibrate_vesuvio_misc.py b/unpackaged/vesuvio_calibration/tests/unit/test_calibrate_vesuvio_misc.py index 6019ab8d..08d1269c 100644 --- a/unpackaged/vesuvio_calibration/tests/unit/test_calibrate_vesuvio_misc.py +++ b/unpackaged/vesuvio_calibration/tests/unit/test_calibrate_vesuvio_misc.py @@ -1,7 +1,8 @@ -from unpackaged.vesuvio_calibration.calibrate_vesuvio_uranium_martyn_MK5 import generate_fit_function_header +from unpackaged.vesuvio_calibration.calibrate_vesuvio_uranium_martyn_MK5 import generate_fit_function_header, identify_invalid_spectra from mock import MagicMock, patch import unittest +import numpy as np class TestVesuvioCalibrationMisc(unittest.TestCase): @@ -33,6 +34,53 @@ def test_generate_header_function_invalid(self): with self.assertRaises(ValueError): generate_fit_function_header("Invalid") + @patch('unpackaged.vesuvio_calibration.calibrate_vesuvio_uranium_martyn_MK5.mtd') + def test_identify_invalid_spectra_no_invalid(self, mock_mtd): + ws_mock = MagicMock() + mock_column_output_dict = {'f1.GaussianFWHM': [1, 2, 3], 'f1.GaussianFWHM_Err': [0.1, 0.2, 0.3], + 'f1.LorentzFWHM': [1.1, 2.1, 3.1], 'f1.LorentzFWHM_Err': [0.11, 0.21, 0.31], + 'f1.LorentzAmp': [1.2, 2.2, 3.2], 'f1.LorentzAmp_Err': [0.12, 0.22, 0.32]} + ws_mock.column.side_effect = lambda key: mock_column_output_dict[key] + mock_mtd.__getitem__.return_value = ws_mock + + np.testing.assert_equal(np.argwhere([]), identify_invalid_spectra('peak_table', [5, 10, 20], [0.1, 0.15, 0.2], [0, 2])) + + @patch('unpackaged.vesuvio_calibration.calibrate_vesuvio_uranium_martyn_MK5.mtd') + def test_identify_invalid_spectra_nan_in_errors(self, mock_mtd): + ws_mock = MagicMock() + mock_column_output_dict = {'f1.GaussianFWHM': [1, 2, 3], 'f1.GaussianFWHM_Err': [np.nan, 0.2, 0.3], + 'f1.LorentzFWHM': [1.1, 2.1, 3.1], 'f1.LorentzFWHM_Err': [0.11, 0.21, 0.31], + 'f1.LorentzAmp': [1.2, 2.2, 3.2], 'f1.LorentzAmp_Err': [0.12, 0.22, np.nan]} + ws_mock.column.side_effect = lambda key: mock_column_output_dict[key] + mock_mtd.__getitem__.return_value = ws_mock + + np.testing.assert_equal(np.argwhere(np.array([True, False, True])), + identify_invalid_spectra('peak_table', [5, 10, 20], [0.1, 0.15, 0.2], [0, 2])) + + @patch('unpackaged.vesuvio_calibration.calibrate_vesuvio_uranium_martyn_MK5.mtd') + def test_identify_invalid_spectra_inf_in_errors(self, mock_mtd): + ws_mock = MagicMock() + mock_column_output_dict = {'f1.GaussianFWHM': [1, 2, 3], 'f1.GaussianFWHM_Err': [0.1, 0.2, 0.3], + 'f1.LorentzFWHM': [1.1, 2.1, 3.1], 'f1.LorentzFWHM_Err': [np.inf, 0.21, 0.31], + 'f1.LorentzAmp': [1.2, 2.2, 3.2], 'f1.LorentzAmp_Err': [0.12, np.inf, 0.32]} + ws_mock.column.side_effect = lambda key: mock_column_output_dict[key] + mock_mtd.__getitem__.return_value = ws_mock + + np.testing.assert_equal(np.argwhere(np.array([True, True, False])), + identify_invalid_spectra('peak_table', [5, 10, 20], [0.1, 0.15, 0.2], [0, 2])) + + @patch('unpackaged.vesuvio_calibration.calibrate_vesuvio_uranium_martyn_MK5.mtd') + def test_identify_invalid_spectra_error_greater_than_peak(self, mock_mtd): + ws_mock = MagicMock() + mock_column_output_dict = {'f1.GaussianFWHM': [1, 2, 3], 'f1.GaussianFWHM_Err': [0.1, 0.2, 0.3], + 'f1.LorentzFWHM': [1.1, 2.1, 3.1], 'f1.LorentzFWHM_Err': [0.11, 10, 0.31], + 'f1.LorentzAmp': [1.2, 2.2, 3.2], 'f1.LorentzAmp_Err': [0.12, 0.22, 10]} + ws_mock.column.side_effect = lambda key: mock_column_output_dict[key] + mock_mtd.__getitem__.return_value = ws_mock + + np.testing.assert_equal(np.argwhere(np.array([False, True, True])), + identify_invalid_spectra('peak_table', [5, 10, 20], [0.1, 0.15, 0.2], [0, 2])) + if __name__ == '__main__': unittest.main()