Skip to content

Add stim demultiplex rhs intan #1660

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Apr 1, 2025
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 119 additions & 8 deletions neo/rawio/intanrawio.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ def _get_analogsignal_chunk_header_attached(self, i_start, i_stop, stream_index,

stream_name = self.header["signal_streams"][stream_index]["name"][:]
stream_is_digital = stream_name in digital_stream_names
stream_is_stim = stream_name == "Stim channel"

field_name = stream_name if stream_is_digital else channel_ids[0]

Expand All @@ -460,17 +461,22 @@ def _get_analogsignal_chunk_header_attached(self, i_start, i_stop, stream_index,
sl0 = i_start % block_size
sl1 = sl0 + (i_stop - i_start)

# For all streams raw_data is a structured memmap with a field for each channel_id
if not stream_is_digital:
sigs_chunk = np.zeros((i_stop - i_start, len(channel_ids)), dtype=dtype)

# For all streams raw_data is a structured memmap with a field for each channel_id
sigs_chunk = np.zeros((i_stop - i_start, len(channel_ids)), dtype=dtype)
for chunk_index, channel_id in enumerate(channel_ids):
data_chan = self._raw_data[channel_id]

if multiple_samples_per_block:
sigs_chunk[:, chunk_index] = data_chan[block_start:block_stop].flatten()[sl0:sl1]
else:
sigs_chunk[:, chunk_index] = data_chan[i_start:i_stop]
else: # For digital data the channels come interleaved in a single field so we need to demultiplex

if stream_is_stim:
sigs_chunk = self._decode_current_from_stim_data(sigs_chunk, 0, sigs_chunk.shape[0])

else:
# For digital data the channels come interleaved in a single field so we need to demultiplex
digital_raw_data = self._raw_data[field_name].flatten()
sigs_chunk = self._demultiplex_digital_data(digital_raw_data, channel_ids, i_start, i_stop)
return sigs_chunk
Expand All @@ -479,6 +485,8 @@ def _get_analogsignal_chunk_one_file_per_channel(self, i_start, i_stop, stream_i

stream_name = self.header["signal_streams"][stream_index]["name"][:]
signal_data_memmap_list = self._raw_data[stream_name]
stream_is_stim = stream_name == "Stim channel"

channel_indexes_are_slice = isinstance(channel_indexes, slice)
if channel_indexes_are_slice:
num_channels = len(signal_data_memmap_list)
Expand All @@ -496,6 +504,10 @@ def _get_analogsignal_chunk_one_file_per_channel(self, i_start, i_stop, stream_i
channel_memmap = signal_data_memmap_list[channel_index]
sigs_chunk[:, chunk_index] = channel_memmap[i_start:i_stop]

# If this is stim data, we need to decode the current values
if stream_is_stim:
sigs_chunk = self._decode_current_from_stim_data(sigs_chunk, 0, sigs_chunk.shape[0])

return sigs_chunk

def _get_analogsignal_chunk_one_file_per_signal(self, i_start, i_stop, stream_index, channel_indexes):
Expand All @@ -504,21 +516,57 @@ def _get_analogsignal_chunk_one_file_per_signal(self, i_start, i_stop, stream_in
raw_data = self._raw_data[stream_name]

stream_is_digital = stream_name in digital_stream_names
stream_is_stim = stream_name == "Stim channel"

if stream_is_digital:
stream_id = self.header["signal_streams"][stream_index]["id"]
mask = self.header["signal_channels"]["stream_id"] == stream_id
signal_channels = self.header["signal_channels"][mask]
channel_ids = signal_channels["id"][channel_indexes]

output = self._demultiplex_digital_data(raw_data, channel_ids, i_start, i_stop)

elif stream_is_stim:
output = self._decode_current_from_stim_data(raw_data, i_start, i_stop)
output = output[:, channel_indexes]
else:
output = raw_data[i_start:i_stop, channel_indexes]

return output

def _demultiplex_digital_data(self, raw_digital_data, channel_ids, i_start, i_stop):
"""
Demultiplex digital data by extracting individual channel values from packed 16-bit format.

According to the Intan format, digital input/output data is stored with all 16 channels
encoded bit-by-bit in each 16-bit word. This method extracts the specified digital channels
from the packed format into separate uint16 arrays of 0 and 1.

Parameters
----------
raw_digital_data : ndarray
Raw digital data in packed 16-bit format where each bit represents a different channel.
channel_ids : list or array
List of channel identifiers to extract. Each channel_id must correspond to a digital
input or output channel.
i_start : int
Starting sample index for demultiplexing.
i_stop : int
Ending sample index for demultiplexing (exclusive).

Returns
-------
ndarray
Demultiplexed digital data with shape (i_stop-i_start, len(channel_ids)),
containing 0 or 1 values for each requested channel.

Notes
-----
In the Intan format, digital channels are packed into 16-bit words where each bit position
corresponds to a specific channel number. For example, with digital inputs 0, 4, and 5
set high and the rest low, the 16-bit word would be 2^0 + 2^4 + 2^5 = 1 + 16 + 32 = 49.

The native_order property for each channel corresponds to its bit position in the packed word.
"""
dtype = np.uint16 # We fix this to match the memmap dtype
output = np.zeros((i_stop - i_start, len(channel_ids)), dtype=dtype)

Expand All @@ -530,6 +578,65 @@ def _demultiplex_digital_data(self, raw_digital_data, channel_ids, i_start, i_st

return output

def _decode_current_from_stim_data(self, raw_stim_data, i_start, i_stop):
"""
Demultiplex stimulation data by extracting current values from packed 16-bit format.

According to the Intan RHS data format, stimulation current is stored in the lower 9 bits
of each 16-bit word: 8 bits for magnitude and 1 bit for sign. The upper bits contain
flags for compliance limit, charge recovery, and amplifier settle.

Parameters
----------
raw_stim_data : ndarray
Raw stimulation data in packed 16-bit format.
i_start : int
Starting sample index for demultiplexing.
i_stop : int
Ending sample index for demultiplexing (exclusive).

Returns
-------
ndarray
Demultiplexed stimulation current values in amperes, preserving the original
array dimensions. The output values need to be multiplied by the stim_step_size
parameter (from header) to obtain the actual current in amperes.

Notes
-----
Bit structure of each 16-bit stimulation word:
- Bits 0-7: Current magnitude
- Bit 8: Sign bit (1 = negative current)
- Bits 9-13: Unused (always zero)
- Bit 14: Amplifier settle flag (1 = activated)
- Bit 15: Charge recovery flag (1 = activated)
- Bit 16 (MSB): Compliance limit flag (1 = limit reached)

The actual current value in amperes is obtained by multiplying the
output by the 'stim_step_size' parameter from the file header. These scaled values can be
obtained with the `rescale_signal_raw_to_float` function.
"""
# Get the relevant portion of the data
data = raw_stim_data[i_start:i_stop]

# Extract current value (bits 0-8)
magnitude = np.bitwise_and(data, 0xFF)
sign_bit = np.bitwise_and(np.right_shift(data, 8), 0x01) # Shift right by 8 bits to get the sign bit

# Apply sign to current values
# We need to cast to int16 to handle negative values correctly
# The max value of 8 bits is 255 so the casting is safe as there are non-negative values
magnitude = magnitude.astype(np.int16)
current = np.where(sign_bit == 1, -magnitude, magnitude)

# Note: If needed, other flag bits could be extracted as follows:
# compliance_flag = np.bitwise_and(np.right_shift(data, 15), 0x01).astype(bool) # Bit 16 (MSB)
# charge_recovery_flag = np.bitwise_and(np.right_shift(data, 14), 0x01).astype(bool) # Bit 15
# amp_settle_flag = np.bitwise_and(np.right_shift(data, 13), 0x01).astype(bool) # Bit 14
# These could be returned as a structured array or dictionary if needed

return current

def get_intan_timestamps(self, i_start=None, i_stop=None):
"""
Retrieves the sample indices from the Intan raw data within a specified range.
Expand Down Expand Up @@ -856,12 +963,16 @@ def read_rhs(filename, file_format: str):
chan_info_stim["sampling_rate"] = sr
# stim channel are complicated because they are coded
# with bits, they do not fit the gain/offset rawio strategy
chan_info_stim["units"] = ""
chan_info_stim["gain"] = 1.0
chan_info_stim["units"] = "A" # Amps
chan_info_stim["gain"] = global_info["stim_step_size"]
chan_info_stim["offset"] = 0.0
chan_info_stim["signal_type"] = 11 # put it in another group
chan_info_stim["dtype"] = "uint16"
chan_info_stim["dtype"] = "int16"
ordered_channel_info.append(chan_info_stim)

# Note that the data on disk is uint16 but the data is
# then decoded as int16 so the chan_info is int16
memmap_dtype = "uint16"
if file_format == "header-attached":
memmap_data_dtype += [(name + "_STIM", "uint16", BLOCK_SIZE)]
else:
Expand Down
1 change: 1 addition & 0 deletions neo/test/iotest/test_intanio.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class TestIntanIO(
"intan/intan_fpc_rhs_test_240329_091637/info.rhs", # Format one-file-per-channel
"intan/intan_fps_rhs_test_240329_091536/info.rhs", # Format one-file-per-signal
"intan/rhd_fpc_multistim_240514_082044/info.rhd", # Multiple digital channels one-file-per-channel rhd
"intan/rhs_stim_data_single_file_format/intanTestFile.rhs", # header-attached rhs data with stimulus current
]


Expand Down
51 changes: 51 additions & 0 deletions neo/test/rawiotest/test_intanrawio.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class TestIntanRawIO(
"intan/intan_fpc_rhs_test_240329_091637/info.rhs", # Format one-file-per-channel
"intan/intan_fps_rhs_test_240329_091536/info.rhs", # Format one-file-per-signal
"intan/rhd_fpc_multistim_240514_082044/info.rhd", # Multiple digital channels one-file-per-channel rhd
"intan/rhs_stim_data_single_file_format/intanTestFile.rhs", # header-attached rhs data with stimulus current
]

def test_annotations(self):
Expand Down Expand Up @@ -87,5 +88,55 @@ def test_correct_reading_one_file_per_channel(self):
np.testing.assert_allclose(data_raw, data_from_neo)


def test_correct_decoding_of_stimulus_current(self):
# See https://github.com/NeuralEnsemble/python-neo/pull/1660 for discussion
# See https://gin.g-node.org/NeuralEnsemble/ephy_testing_data/src/master/intan/README.md#rhs_stim_data_single_file_format
# For a description of the data

file_path = Path(self.get_local_path("intan/rhs_stim_data_single_file_format/intanTestFile.rhs"))
intan_reader = IntanRawIO(filename=file_path)
intan_reader.parse_header()

signal_streams = intan_reader.header['signal_streams']
stream_ids = signal_streams['id'].tolist()
stream_index = stream_ids.index('11')
sampling_rate = intan_reader.get_signal_sampling_rate(stream_index=stream_index)
sig_chunk = intan_reader.get_analogsignal_chunk(stream_index=stream_index, channel_ids=["D-016_STIM"])
final_stim = intan_reader.rescale_signal_raw_to_float(sig_chunk, stream_index=stream_index, channel_ids=["D-016_STIM"])

# This contains only the first pulse and I got this by visual inspection
data_to_test = final_stim[200:250]

positive_pulse_size = np.max(data_to_test).item()
negative_pulse_size = np.min(data_to_test).item()

expected_value = 60 * 1e-6# 60 microamperes

# Assert is close float
assert np.isclose(positive_pulse_size, expected_value)
assert np.isclose(negative_pulse_size, -expected_value)

# Check that negative pulse is leading
argmin = np.argmin(data_to_test)
argmax = np.argmax(data_to_test)
assert argmin < argmax

# Check that the negative pulse is 200 us long
negative_pulse_frames = np.where(data_to_test > 0)[0]
number_of_negative_frames = negative_pulse_frames.size
duration_of_negative_pulse = number_of_negative_frames / sampling_rate

expected_duration = 200 * 1e-6 # 400 microseconds / 2
assert np.isclose(duration_of_negative_pulse, expected_duration)

# Check that the positive pulse is 200 us long
positive_pulse_frames = np.where(data_to_test > 0)[0]
number_of_positive_frames = positive_pulse_frames.size
duration_of_positive_pulse = number_of_positive_frames / sampling_rate
expected_duration = 200 * 1e-6 # 400 microseconds / 2

assert np.isclose(duration_of_positive_pulse, expected_duration)


if __name__ == "__main__":
unittest.main()
Loading