Skip to content

Commit

Permalink
#52 sys.exit when amplitude is cut-off + #54 Enable defining amplitud…
Browse files Browse the repository at this point in the history
…e or voltage per element for IGT + #55 when timing parameter at lower level is set, set higer levels equal to value #55
  • Loading branch information
MaCuinea committed Jan 31, 2025
1 parent 5f229da commit 2ef9ed9
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 48 deletions.
11 changes: 7 additions & 4 deletions fus_ds_package/fus_driving_systems/igt/igt_ds.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ def send_sequence(self, seq1, seq2=None):
n_pulse_train_rep = math.floor(seq1.pulse_train_rep_dur / seq1.pulse_train_rep_int)

# Apply ramping
if seq1.pulse_ramp_shape != config['General']['Ramp shape.rect'] and seq1.ampl > 0:
average_ampl = sum(seq1.ampl) / len(seq1.ampl)
if seq1.pulse_ramp_shape != config['General']['Ramp shape.rect'] and average_ampl > 0:
self._apply_ramping(seq1)
else:
self.gen.setPulseModulation([], 0, [], 0) # disable any modulation
Expand Down Expand Up @@ -336,8 +337,10 @@ def _define_two_seq_pulse(self, seq1, seq2):
freqs = []
ampls = []
for seq in [seq1, seq2]:

ampls = ampls + [seq.ampl] * seq.transducer.elements
if len(seq.ampl) == 1:
ampls = ampls + [seq.ampl] * seq.transducer.elements
else:
ampls = ampls + seq.ampl

oper_freq_hz = int(seq.oper_freq * 1e3)
tran_freq = [oper_freq_hz] * seq.transducer.elements
Expand Down Expand Up @@ -542,7 +545,7 @@ def _define_pulse(self, sequence):

# set same amplitude for all channels in percent (of max amplitude)
if sequence.ampl is not None:
pulse.setAmplitudes([sequence.ampl])
pulse.setAmplitudes(sequence.ampl)
else:
logger.error("Intensity parameter may be set incorrectly. Amplitude is None.")
sys.exit()
Expand Down
192 changes: 148 additions & 44 deletions fus_ds_package/fus_driving_systems/sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,8 +723,23 @@ def volt(self, volt):

# Check if pressure compensation is available for chosen equipment
if self._ds_tran_combo in self._equip_combos:
is_validated = validate_value(volt, 'Voltage [V] (volt)',
True, True, False, False)

check_list = True
if not isinstance(volt, list):
check_list = False
volt = [volt]

# Check if enough voltage entries are given
n_entries = len(volt)
if n_entries != self.driving_sys.n_elements and n_entries != 1:
sys.exit(f'Number of voltage entries ({n_entries}) does not correspond to ' +
'number of transducer elements ({self.driving_sys.n_elements}). Only' +
' enter one voltage value or n-values equal to the number of ' +
'transducer elements.')

is_validated = validate_value(volt, 'Voltage [V] (volt)', True, True, False, False,
True)

if is_validated:
self._volt = volt

Expand All @@ -733,24 +748,33 @@ def volt(self, volt):
# Convert required to amplitude
self._calc_ampl_using_volt()

# Calculate maximum pressure in free water for logging purposes
self._calc_press()

logger.info(f'New voltage value of {self._volt:.2f} [V] results in a maximum' +
f' pressure in free water of {self._press:.2f} [MPa] and an amplitude' +
f' of {self._ampl:.2f} [%].')
if not check_list:
# Calculate maximum pressure in free water for logging purposes
self._calc_press()

logger.info(f'New voltage value of {self._volt:.2f} [V] results in a maximum ' +
f'pressure in free water of {self._press:.2f} [MPa] and an ' +
f'amplitude of {self._ampl:.2f} [%].')
else:
logger.info('Pressure cannot be calculated when multiple voltages are given.')
logger.info(f'New voltage value of {self._volt:.2f} [V] results in an ' +
f'amplitude of {self._ampl:.2f} [%].')

elif self._driving_sys.manufact != config['Equipment.Manufacturer.IGT']['Name']:
logger.error('Voltage is not available as an input parameter for ' +
f'{self._driving_sys.name}.')
sys.exit()
else:
logger.error('No pressure compensation parameters available in the configuration' +
' file for chosen equipment combination. Enter amplitude [%].')
sys.exit()

@property
def ampl(self):
"""
Getter method for the amplitude.
Returns:
float: The amplitude [%] for IGT.
float: The amplitude array [%] for IGT: one value represents the value for all elements.
"""

return self._ampl
Expand All @@ -761,22 +785,37 @@ def ampl(self, ampl):
Setter method for the amplitude.
Parameters:
ampl (float): The amplitude [%] for IGT.
ampl (list(float)): The amplitude array [%] for IGT: one value represents the value
for all elements.
"""

# set other parameters determine the intensity to None
# set other parameters that determine the intensity to None
self._global_power = 0
self._ampl = 0

if self._driving_sys.manufact == config['Equipment.Manufacturer.IGT']['Name']:
self._chosen_power = config['General']['Power option.ampl']

check_list = True
if not isinstance(ampl, list):
check_list = False
ampl = [ampl]

# Check if enough amplitude entries are given
n_entries = len(ampl)
if n_entries != self.driving_sys.n_elements and n_entries != 1:
sys.exit(f'Number of amplitude entries ({n_entries}) does not correspond to ' +
'number of transducer elements ({self.driving_sys.n_elements}). Only' +
' enter one amplitude value or n-values equal to the number of ' +
'transducer elements.')

is_validated = validate_value(ampl, 'Amplitude [%] (ampl)',
True, True, False, False)
True, True, False, False, check_list)

if is_validated:
self._ampl = ampl

if self._ds_tran_combo in self._equip_combos:
if not check_list and self._ds_tran_combo in self._equip_combos:
# Convert amplitude to voltage for logging
self._calc_volt()

Expand All @@ -789,9 +828,9 @@ def ampl(self, ampl):

else:
# Equipment is not part a combination, so only set amplitude
logger.info('Chosen transducer - driving system combination ' +
'is not apart of configured combinations. ' +
'Pressure and voltage cannot be calculated.')
logger.info('Chosen transducer - driving system combination is not apart of ' +
'configured combinations or amplitude array is given. Pressure and ' +
'voltage cannot be calculated.')
else:
# Chosen system is not IGT.
if ampl > 0:
Expand Down Expand Up @@ -1353,6 +1392,13 @@ def pulse_dur(self, pulse_dur):
if is_validated:
self._timing_param['pulse_dur'] = pulse_dur

# Set other timing levels equal to new parameter to prevent higher levels being shorter
# than the lower levels when they are not set
self.pulse_rep_int = pulse_dur
self.pulse_train_dur = pulse_dur
self.pulse_train_rep_int = pulse_dur
self.pulse_train_rep_dur = pulse_dur * 1e3 # convert from ms to s

@property
def pulse_rep_int(self):
"""
Expand All @@ -1379,6 +1425,12 @@ def pulse_rep_int(self, pulse_rep_int):
if is_validated:
self._timing_param['pulse_rep_int'] = pulse_rep_int

# Set other timing levels equal to new parameter to prevent higher levels being shorter
# than the lower levels when they are not set
self.pulse_train_dur = pulse_rep_int
self.pulse_train_rep_int = pulse_rep_int
self.pulse_train_rep_dur = pulse_rep_int * 1e3 # convert from ms to s

def get_ramp_shapes(self):
"""
Returns a list of available ramp shapes for pulse modulation.
Expand Down Expand Up @@ -1467,11 +1519,11 @@ def pulse_train_dur(self, pulse_train_dur):
if is_validated:
self._timing_param['pulse_train_dur'] = pulse_train_dur

if self._driving_sys.manufact == config['Equipment.Manufacturer.SC']['Name']:
# SC doesn't have a pulse train repetition definition so set to None
# Set other timing levels equal to new parameter to prevent higher levels being shorter
# than the lower levels when they are not set
self.pulse_train_rep_int = pulse_train_dur
self.pulse_train_rep_dur = pulse_train_dur * 1e3 # convert from ms to s

self._timing_param['pulse_train_rep_int'] = None
self._timing_param['pulse_train_rep_dur'] = None
@property
def pulse_train_rep_int(self):
"""
Expand All @@ -1498,6 +1550,10 @@ def pulse_train_rep_int(self, pulse_train_rep_int):
if is_validated:
self._timing_param['pulse_train_rep_int'] = pulse_train_rep_int

# Set other timing levels equal to new parameter to prevent higher levels being shorter
# than the lower levels when they are not set
self.pulse_train_rep_dur = pulse_train_rep_int * 1e3 # convert from ms to s

@property
def pulse_train_rep_dur(self):
"""
Expand Down Expand Up @@ -1609,11 +1665,15 @@ def _calc_volt(self):
updated.
"""

# Prevent division by zero
if self.V2A_a == 0:
self._volt = 0
else:
self._volt = (self._ampl - self.V2A_b) / self.V2A_a
volt = []
for ampl in self._ampl:
# Prevent division by zero
if self.V2A_a == 0:
volt.append(0)
else:
volt.append((ampl - self.V2A_b) / self.V2A_a)

self._volt = volt

def _calc_ampl(self):
"""
Expand All @@ -1622,35 +1682,44 @@ def _calc_ampl(self):
"""

press_pa = self._press * 1e6 # convert to Pa
self._ampl = self.P2A_a * (press_pa * self._eq_factor) + self.P2A_b
if self._ampl > 100:
logger.warning(('Calculated amplitude exceeds 100%, so cut off the amplitude at 100% ' +
'and recalculate the pressure.'))
self._ampl = 100
calc_ampl = self.P2A_a * (press_pa * self._eq_factor) + self.P2A_b
if calc_ampl > 100:
self._ampl = [100]
self._calc_press()
self._calc_volt()
elif self._ampl < 0:

sys.exit('Calculated amplitude exceeds 100%. A pressure of {self._press:.2f} [MPa] ' +
'and/or a voltage of {self._volt:.2f} [V] will result in an amplitude of ' +
'100%.')
elif calc_ampl < 0:
logger.warning(('Calculated amplitude below 0%, so cut off the amplitude at 0% and ' +
'recalculate the pressure.'))
self._ampl = 0
self._ampl = [0]
self._calc_press()
self._calc_volt()

else:
self._ampl = [calc_ampl]

def _calc_ampl_using_volt(self):
"""
Calculate voltage [V] vs. amplitude [%] equation (A = a*V + b) when voltage is
updated.
"""

self._ampl = self.V2A_a * self._volt + self.V2A_b
ampl = []
for volt in self._volt:
ampl.append(self.V2A_a * self._volt + self.V2A_b)

self._ampl = ampl

def _calc_press(self):
"""
Calculate pressure [Pa] vs. amplitude [%] equation (P = (A - b)/(a*EQF)) when amplitude is
updated.
"""

press_pa = (self._ampl - self.P2A_b) / (self.P2A_a * self._eq_factor)
press_pa = (self._ampl[0] - self.P2A_b) / (self.P2A_a * self._eq_factor)
press_mpa = press_pa * 1e-6 # convert to MPa

max_press = float(config['General']['Maximum pressure allowed in free water [MPa]'])
Expand All @@ -1663,7 +1732,8 @@ def _calc_press(self):
self._press = press_mpa # convert to MPa


def validate_value(value, input_param, check_num, check_pos, check_nonzero, check_bool):
def validate_value(value, input_param, check_num, check_pos, check_nonzero, check_bool,
check_list=False):
"""
Validates `value` based on specified checks, logs errors if conditions are not met, and exits
if validation fails.
Expand All @@ -1675,26 +1745,60 @@ def validate_value(value, input_param, check_num, check_pos, check_nonzero, chec
check_pos (bool): Ensures value is non-negative.
check_nonzero (bool): Ensures value is not zero.
check_bool (bool): Checks if value is a boolean.
check_list (bool): Checks if value is a list.
Returns:
bool: True if all checks pass; otherwise, logs errors and exits.
"""

val_messages = []

if check_nonzero and value == 0:
val_messages.append(f'{input_param} is not allowed to be zero.')
if check_num and not isinstance(value, (int, float)):
val_messages.append(f'{input_param} should be a number.')
if check_pos and value < 0:
val_messages.append(f'{input_param} is not allowed to be negative.')
if check_list:
if isinstance(value, list):
for item in value:
input_name = 'Items of ' + input_param
val_messages = _check_parameter(val_messages, item, input_name, check_nonzero,
check_num, check_pos, check_bool)

if check_bool and not isinstance(value, bool):
val_messages.append(f'{input_param} should be a boolean.')
else:
val_messages.append(f'{input_param} should be a list.')
else:
val_messages = _check_parameter(val_messages, value, input_name, check_nonzero, check_num,
check_pos, check_bool)

if val_messages:
for message in val_messages:
logger.error(message)
sys.exit()

return True


def _check_parameter(val_messages, value, input_name, check_nonzero, check_num, check_pos,
check_bool):
"""
Checks a single value against specified conditions and appends error messages if any checks
fail.
Parameters:
val_messages (list): List to append error messages to.
value (any): The value to check.
input_name (str): Name of the parameter, used in error messages.
check_nonzero (bool): Ensures value is not zero.
check_num (bool): Checks if value is a number.
check_pos (bool): Ensures value is non-negative.
check_bool (bool): Checks if value is a boolean.
Returns:
list: The updated list of error messages.
"""

if check_nonzero and value == 0:
val_messages.append(f'{input_name} is not allowed to be zero.')
if check_num and not isinstance(value, (int, float)):
val_messages.append(f'{input_name} should be a number.')
if check_pos and value < 0:
val_messages.append(f'{input_name} is not allowed to be negative.')
if check_bool and not isinstance(value, bool):
val_messages.append(f'{input_name} should be a boolean.')
return val_messages

0 comments on commit 2ef9ed9

Please sign in to comment.