Skip to content

Add stim demultiplex rhs intan #1660

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Apr 1, 2025
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 162 additions & 40 deletions neo/rawio/intanrawio.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,12 @@ class IntanRawIO(BaseRawIO):
'one-file-per-channel' which have a header file called 'info.rhd' or 'info.rhs' and a series
of binary files with the '.dat' suffix

* The reader can handle three file formats 'header-attached', 'one-file-per-signal' and
'one-file-per-channel'.

* Intan files contain amplifier channels labeled 'A', 'B' 'C' or 'D'
depending on the port in which they were recorded along with the following
* Intan files contain amplifier channels labeled 'A', 'B' 'C' or 'D' for the 512 recorder
or 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H' for the 1024 recorder system
depending on the port in which they were recorded along (stored in stream_id '0') with the following
additional streams.

0: 'RHD2000' amplifier channel
0: 'RHD2000 amplifier channel'
1: 'RHD2000 auxiliary input channel',
2: 'RHD2000 supply voltage channel',
3: 'USB board ADC input channel',
Expand All @@ -87,9 +85,11 @@ class IntanRawIO(BaseRawIO):
10: 'DC Amplifier channel',
11: 'Stim channel',

* For the "header-attached" and "one-file-per-signal" formats, the structure of the digital input and output channels is
one long vector, which must be post-processed to extract individual digital channel information.
See the intantech website for more information on performing this post-processing.
* We currently implement digital data demultiplexing so that if digital streams are requested they are
returned as arrays of 1s and 0s.

* We also do stim data decoding which returns the stim data as an int16 of appropriate magnitude. Please
use `rescale_signal_raw_to_float` to obtain stim data in amperes.


Examples
Expand Down Expand Up @@ -442,6 +442,7 @@ def _get_analogsignal_chunk_header_attached(self, i_start, i_stop, stream_index,

stream_name = self.header["signal_streams"][stream_index]["name"][:]
stream_is_digital = stream_name in digital_stream_names
stream_is_stim = stream_name == "Stim channel"

field_name = stream_name if stream_is_digital else channel_ids[0]

Expand All @@ -460,17 +461,22 @@ def _get_analogsignal_chunk_header_attached(self, i_start, i_stop, stream_index,
sl0 = i_start % block_size
sl1 = sl0 + (i_stop - i_start)

# For all streams raw_data is a structured memmap with a field for each channel_id
if not stream_is_digital:
# For all streams raw_data is a structured memmap with a field for each channel_id
sigs_chunk = np.zeros((i_stop - i_start, len(channel_ids)), dtype=dtype)

for chunk_index, channel_id in enumerate(channel_ids):
data_chan = self._raw_data[channel_id]

if multiple_samples_per_block:
sigs_chunk[:, chunk_index] = data_chan[block_start:block_stop].flatten()[sl0:sl1]
else:
sigs_chunk[:, chunk_index] = data_chan[i_start:i_stop]
else: # For digital data the channels come interleaved in a single field so we need to demultiplex

if stream_is_stim:
sigs_chunk = self._decode_current_from_stim_data(sigs_chunk, 0, sigs_chunk.shape[0])

else:
# For digital data the channels come interleaved in a single field so we need to demultiplex
digital_raw_data = self._raw_data[field_name].flatten()
sigs_chunk = self._demultiplex_digital_data(digital_raw_data, channel_ids, i_start, i_stop)
return sigs_chunk
Expand All @@ -479,6 +485,8 @@ def _get_analogsignal_chunk_one_file_per_channel(self, i_start, i_stop, stream_i

stream_name = self.header["signal_streams"][stream_index]["name"][:]
signal_data_memmap_list = self._raw_data[stream_name]
stream_is_stim = stream_name == "Stim channel"

channel_indexes_are_slice = isinstance(channel_indexes, slice)
if channel_indexes_are_slice:
num_channels = len(signal_data_memmap_list)
Expand All @@ -496,6 +504,10 @@ def _get_analogsignal_chunk_one_file_per_channel(self, i_start, i_stop, stream_i
channel_memmap = signal_data_memmap_list[channel_index]
sigs_chunk[:, chunk_index] = channel_memmap[i_start:i_stop]

# If this is stim data, we need to decode the current values
if stream_is_stim:
sigs_chunk = self._decode_current_from_stim_data(sigs_chunk, 0, sigs_chunk.shape[0])

return sigs_chunk

def _get_analogsignal_chunk_one_file_per_signal(self, i_start, i_stop, stream_index, channel_indexes):
Expand All @@ -504,21 +516,57 @@ def _get_analogsignal_chunk_one_file_per_signal(self, i_start, i_stop, stream_in
raw_data = self._raw_data[stream_name]

stream_is_digital = stream_name in digital_stream_names
stream_is_stim = stream_name == "Stim channel"

if stream_is_digital:
stream_id = self.header["signal_streams"][stream_index]["id"]
mask = self.header["signal_channels"]["stream_id"] == stream_id
signal_channels = self.header["signal_channels"][mask]
channel_ids = signal_channels["id"][channel_indexes]

output = self._demultiplex_digital_data(raw_data, channel_ids, i_start, i_stop)

elif stream_is_stim:
output = self._decode_current_from_stim_data(raw_data, i_start, i_stop)
output = output[:, channel_indexes]
else:
output = raw_data[i_start:i_stop, channel_indexes]

return output

def _demultiplex_digital_data(self, raw_digital_data, channel_ids, i_start, i_stop):
"""
Demultiplex digital data by extracting individual channel values from packed 16-bit format.

According to the Intan format, digital input/output data is stored with all 16 channels
encoded bit-by-bit in each 16-bit word. This method extracts the specified digital channels
from the packed format into separate uint16 arrays of 0 and 1.

Parameters
----------
raw_digital_data : ndarray
Raw digital data in packed 16-bit format where each bit represents a different channel.
channel_ids : list or array
List of channel identifiers to extract. Each channel_id must correspond to a digital
input or output channel.
i_start : int
Starting sample index for demultiplexing.
i_stop : int
Ending sample index for demultiplexing (exclusive).

Returns
-------
ndarray
Demultiplexed digital data with shape (i_stop-i_start, len(channel_ids)),
containing 0 or 1 values for each requested channel.

Notes
-----
In the Intan format, digital channels are packed into 16-bit words where each bit position
corresponds to a specific channel number. For example, with digital inputs 0, 4, and 5
set high and the rest low, the 16-bit word would be 2^0 + 2^4 + 2^5 = 1 + 16 + 32 = 49.

The native_order property for each channel corresponds to its bit position in the packed word.
"""
dtype = np.uint16 # We fix this to match the memmap dtype
output = np.zeros((i_stop - i_start, len(channel_ids)), dtype=dtype)

Expand All @@ -530,6 +578,65 @@ def _demultiplex_digital_data(self, raw_digital_data, channel_ids, i_start, i_st

return output

def _decode_current_from_stim_data(self, raw_stim_data, i_start, i_stop):
"""
Demultiplex stimulation data by extracting current values from packed 16-bit format.

According to the Intan RHS data format, stimulation current is stored in the lower 9 bits
of each 16-bit word: 8 bits for magnitude and 1 bit for sign. The upper bits contain
flags for compliance limit, charge recovery, and amplifier settle.

Parameters
----------
raw_stim_data : ndarray
Raw stimulation data in packed 16-bit format.
i_start : int
Starting sample index for demultiplexing.
i_stop : int
Ending sample index for demultiplexing (exclusive).

Returns
-------
ndarray
Demultiplexed stimulation current values in amperes, preserving the original
array dimensions. The output values need to be multiplied by the stim_step_size
parameter (from header) to obtain the actual current in amperes.

Notes
-----
Bit structure of each 16-bit stimulation word:
- Bits 0-7: Current magnitude
- Bit 8: Sign bit (1 = negative current)
- Bits 9-13: Unused (always zero)
- Bit 14: Amplifier settle flag (1 = activated)
- Bit 15: Charge recovery flag (1 = activated)
- Bit 16 (MSB): Compliance limit flag (1 = limit reached)

The actual current value in amperes is obtained by multiplying the
output by the 'stim_step_size' parameter from the file header. These scaled values can be
obtained with the `rescale_signal_raw_to_float` function.
"""
# Get the relevant portion of the data
data = raw_stim_data[i_start:i_stop]

# Extract current value (bits 0-8)
magnitude = np.bitwise_and(data, 0xFF)
sign_bit = np.bitwise_and(np.right_shift(data, 8), 0x01) # Shift right by 8 bits to get the sign bit

# Apply sign to current values
# We need to cast to int16 to handle negative values correctly
# The max value of 8 bits is 255 so the casting is safe as there are non-negative values
magnitude = magnitude.astype(np.int16)
current = np.where(sign_bit == 1, -magnitude, magnitude)

# Note: If needed, other flag bits could be extracted as follows:
# compliance_flag = np.bitwise_and(np.right_shift(data, 15), 0x01).astype(bool) # Bit 16 (MSB)
# charge_recovery_flag = np.bitwise_and(np.right_shift(data, 14), 0x01).astype(bool) # Bit 15
# amp_settle_flag = np.bitwise_and(np.right_shift(data, 13), 0x01).astype(bool) # Bit 14
# These could be returned as a structured array or dictionary if needed

return current

def get_intan_timestamps(self, i_start=None, i_stop=None):
"""
Retrieves the sample indices from the Intan raw data within a specified range.
Expand Down Expand Up @@ -793,8 +900,18 @@ def read_rhs(filename, file_format: str):
channel_number_dict = {name: len(stream_name_to_channel_info_list[name]) for name in names_to_count}

# Both DC Amplifier and Stim streams have the same number of channels as the amplifier stream
# if using the `header-attached` or `one-file-per-signal` formats
# the amplifier data is stored in the same place in the header so no matter what the DC amp
# uses the same info as the RHS amp.
channel_number_dict["DC Amplifier channel"] = channel_number_dict["RHS2000 amplifier channel"]
channel_number_dict["Stim channel"] = channel_number_dict["RHS2000 amplifier channel"]
if file_format != "one-file-per-channel":
channel_number_dict["Stim channel"] = channel_number_dict["RHS2000 amplifier channel"]
raw_file_paths_dict = create_one_file_per_signal_dict_rhs(dirname=filename.parent)
else:
raw_file_paths_dict = create_one_file_per_channel_dict_rhs(dirname=filename.parent)
channel_number_dict["Stim channel"] = len(raw_file_paths_dict["Stim channel"])
# but the user can shut off the normal amplifier and only save dc amplifier
channel_number_dict["RHS2000 amplifier channel"] = len(raw_file_paths_dict["RHS2000 amplifier channel"])
Comment on lines +911 to +914
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@h-mayorquin does this make sense or can you think of a better way of doing this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the context. Why are we doing this? I re-read

        # if using the `header-attached` or `one-file-per-signal` formats
        # the amplifier data is stored in the same place in the header so no matter what the DC amp
        # uses the same info as the RHS amp

There is something strange about it and makes me confused.


header_size = f.tell()

Expand All @@ -808,24 +925,25 @@ def read_rhs(filename, file_format: str):
memmap_data_dtype["timestamp"] = "int32"
channel_number_dict["timestamp"] = 1

for chan_info in stream_name_to_channel_info_list["RHS2000 amplifier channel"]:
chan_info["sampling_rate"] = sr
chan_info["units"] = "uV"
chan_info["gain"] = 0.195
if file_format == "header-attached":
chan_info["offset"] = -32768 * 0.195
else:
chan_info["offset"] = 0.0
if file_format == "header-attached":
chan_info["dtype"] = "uint16"
else:
chan_info["dtype"] = "int16"
ordered_channel_info.append(chan_info)
if file_format == "header-attached":
name = chan_info["native_channel_name"]
memmap_data_dtype += [(name, "uint16", BLOCK_SIZE)]
else:
memmap_data_dtype["RHS2000 amplifier channel"] = "int16"
if file_format != "one-file-per-channel" or channel_number_dict["RHS2000 amplifier channel"] > 0:
for chan_info in stream_name_to_channel_info_list["RHS2000 amplifier channel"]:
Comment on lines +928 to +929
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only change here (the diff is wrong). Basically if the user shut off saving of files in the file-per-channel we need to skip this section. Does this make sense.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@h-mayorquin you want to push your last change with our patch here? Then I merged. We can add the rhs channel test either now or in a new PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merge it like this, my comments are a mess.

chan_info["sampling_rate"] = sr
chan_info["units"] = "uV"
chan_info["gain"] = 0.195
if file_format == "header-attached":
chan_info["offset"] = -32768 * 0.195
else:
chan_info["offset"] = 0.0
if file_format == "header-attached":
chan_info["dtype"] = "uint16"
else:
chan_info["dtype"] = "int16"
ordered_channel_info.append(chan_info)
if file_format == "header-attached":
name = chan_info["native_channel_name"]
memmap_data_dtype += [(name, "uint16", BLOCK_SIZE)]
else:
memmap_data_dtype["RHS2000 amplifier channel"] = "int16"

if bool(global_info["dc_amplifier_data_saved"]):
# if we have dc amp we need to grab the correct number of channels
Expand All @@ -847,27 +965,31 @@ def read_rhs(filename, file_format: str):
memmap_data_dtype["DC Amplifier channel"] = "uint16"

# I can't seem to get stim files to generate for one-file-per-channel
# so let's skip for now and can be given on request
if file_format != "one-file-per-channel":
for chan_info in stream_name_to_channel_info_list["RHS2000 amplifier channel"]:
# so ideally at some point we need test data to confirm this is true
# based on what Heberto and I read in the docs
for chan_info in stream_name_to_channel_info_list["RHS2000 amplifier channel"]:
# we see which stim were activated
if file_format != "one-file-per-channel" or any(
[chan_info["native_channel_name"] in stim_file.stem for stim_file in raw_file_paths_dict["Stim channel"]]
):
Comment on lines +972 to +974
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this we check if these exist and if they do we analyze. This was my hacky way of doing this (since the user can choose individual channels to stim (ie stim files only exist for a subset of amp channels). Check this for me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also fine if you want to do your reorg for this if statement before we merge :)

Copy link
Contributor Author

@h-mayorquin h-mayorquin Apr 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Na, same thing, this is working, I am afraid of breaking something and I want to be sure we have some sort of support on the next release.

chan_info_stim = dict(chan_info)
name = chan_info["native_channel_name"]
chan_info_stim["native_channel_name"] = name + "_STIM"
chan_info_stim["sampling_rate"] = sr
# stim channel are complicated because they are coded
# with bits, they do not fit the gain/offset rawio strategy
chan_info_stim["units"] = ""
chan_info_stim["gain"] = 1.0
chan_info_stim["units"] = "A" # Amps
chan_info_stim["gain"] = global_info["stim_step_size"]
chan_info_stim["offset"] = 0.0
chan_info_stim["signal_type"] = 11 # put it in another group
chan_info_stim["dtype"] = "uint16"
chan_info_stim["dtype"] = "int16" # this change is due to bit decoding see note below
ordered_channel_info.append(chan_info_stim)
# Note that the data on disk is uint16 but the data is
# then decoded as int16 so the chan_info is int16
if file_format == "header-attached":
memmap_data_dtype += [(name + "_STIM", "uint16", BLOCK_SIZE)]
else:
memmap_data_dtype["Stim channel"] = "uint16"
else:
warnings.warn("Stim not implemented for `one-file-per-channel` due to lack of test files")

# No supply or aux for rhs files (ie no stream_id 1 and 2)
# We have an error above that requests test files to help if the spec is changed
Expand Down
3 changes: 3 additions & 0 deletions neo/test/iotest/test_intanio.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ class TestIntanIO(
"intan/intan_fpc_rhs_test_240329_091637/info.rhs", # Format one-file-per-channel
"intan/intan_fps_rhs_test_240329_091536/info.rhs", # Format one-file-per-signal
"intan/rhd_fpc_multistim_240514_082044/info.rhd", # Multiple digital channels one-file-per-channel rhd
"intan/rhs_stim_data_single_file_format/intanTestFile.rhs", # header-attached rhs data with stimulus current
"intan/test_fcs_dc_250327_154333/info.rhs", # this is an example of only having dc amp rather than amp files
#"intan/test_fpc_stim_250327_151617/info.rhs", # wrong files Heberto will fix
]


Expand Down
53 changes: 53 additions & 0 deletions neo/test/rawiotest/test_intanrawio.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ class TestIntanRawIO(
"intan/intan_fpc_rhs_test_240329_091637/info.rhs", # Format one-file-per-channel
"intan/intan_fps_rhs_test_240329_091536/info.rhs", # Format one-file-per-signal
"intan/rhd_fpc_multistim_240514_082044/info.rhd", # Multiple digital channels one-file-per-channel rhd
"intan/rhs_stim_data_single_file_format/intanTestFile.rhs", # header-attached rhs data with stimulus current
"intan/test_fcs_dc_250327_154333/info.rhs", # this is an example of only having dc amp rather than amp files
#"intan/test_fpc_stim_250327_151617/info.rhs", # wrong files Heberto will fix
]

def test_annotations(self):
Expand Down Expand Up @@ -87,5 +90,55 @@ def test_correct_reading_one_file_per_channel(self):
np.testing.assert_allclose(data_raw, data_from_neo)


def test_correct_decoding_of_stimulus_current(self):
# See https://github.com/NeuralEnsemble/python-neo/pull/1660 for discussion
# See https://gin.g-node.org/NeuralEnsemble/ephy_testing_data/src/master/intan/README.md#rhs_stim_data_single_file_format
# For a description of the data

file_path = Path(self.get_local_path("intan/rhs_stim_data_single_file_format/intanTestFile.rhs"))
intan_reader = IntanRawIO(filename=file_path)
intan_reader.parse_header()

signal_streams = intan_reader.header['signal_streams']
stream_ids = signal_streams['id'].tolist()
stream_index = stream_ids.index('11')
sampling_rate = intan_reader.get_signal_sampling_rate(stream_index=stream_index)
sig_chunk = intan_reader.get_analogsignal_chunk(stream_index=stream_index, channel_ids=["D-016_STIM"])
final_stim = intan_reader.rescale_signal_raw_to_float(sig_chunk, stream_index=stream_index, channel_ids=["D-016_STIM"])

# This contains only the first pulse and I got this by visual inspection
data_to_test = final_stim[200:250]

positive_pulse_size = np.max(data_to_test).item()
negative_pulse_size = np.min(data_to_test).item()

expected_value = 60 * 1e-6# 60 microamperes

# Assert is close float
assert np.isclose(positive_pulse_size, expected_value)
assert np.isclose(negative_pulse_size, -expected_value)

# Check that negative pulse is leading
argmin = np.argmin(data_to_test)
argmax = np.argmax(data_to_test)
assert argmin < argmax

# Check that the negative pulse is 200 us long
negative_pulse_frames = np.where(data_to_test > 0)[0]
number_of_negative_frames = negative_pulse_frames.size
duration_of_negative_pulse = number_of_negative_frames / sampling_rate

expected_duration = 200 * 1e-6 # 400 microseconds / 2
assert np.isclose(duration_of_negative_pulse, expected_duration)

# Check that the positive pulse is 200 us long
positive_pulse_frames = np.where(data_to_test > 0)[0]
number_of_positive_frames = positive_pulse_frames.size
duration_of_positive_pulse = number_of_positive_frames / sampling_rate
expected_duration = 200 * 1e-6 # 400 microseconds / 2

assert np.isclose(duration_of_positive_pulse, expected_duration)


if __name__ == "__main__":
unittest.main()
Loading