@calderast wanted to take a SpikeInterfaceRecordingDataChunkIterator from NeuroConv and modify its output to return data multiplied by a conversion factor and converted back to int16 before writing:

```python
# Convert to uV without loading the whole thing at once
def traces_in_microvolts_iterator(traces_as_iterator, conversion_factor_uv):
    for chunk in traces_as_iterator:
        yield (chunk * conversion_factor_uv).astype("int16")

# Wrap iterator in DataChunkIterator for H5DataIO
data_iterator = DataChunkIterator(
    traces_in_microvolts_iterator(traces_as_iterator, channel_conversion_factor_uv),
    buffer_size=1,  # number of chunks to keep in memory
    maxshape=(num_samples, num_channels),
    dtype=np.dtype("int16"),
)
data_data_io = H5DataIO(
    data=data_iterator,  # formerly traces_as_iterator
    chunks=(min(num_samples, 81920), min(num_channels, 64)),
    compression="gzip",
)
```
This resulted in an error.
The issue is that `DataChunkIterator` assumes data are read in a very particular manner: it returns one element along the iteration dimension at a time. That is, the iterator is expected to return chunks that are one dimension lower than the array itself. For example, when iterating over the first dimension of a dataset with shape (1000, 10, 10), the iterator would return 1000 chunks of shape (10, 10), one chunk at a time.

The solution was to create a new subclass of `GenericDataChunkIterator` or `SpikeInterfaceRecordingDataChunkIterator` that wraps the original `SpikeInterfaceRecordingDataChunkIterator` and overrides the `_get_data` method to get the data from the wrapped iterator and modify it before returning:

```python
class MicrovoltsSpikeInterfaceRecordingDataChunkIterator(SpikeInterfaceRecordingDataChunkIterator):
    def __init__(self, iterator: SpikeInterfaceRecordingDataChunkIterator, conversion_factor_uv):
        self.iterator = iterator
        self.conversion_factor_uv = conversion_factor_uv
        super().__init__(iterator.recording)

    def _get_default_chunk_shape(self, chunk_mb: float = 10.0) -> tuple[int, int]:
        return self.iterator._get_default_chunk_shape(chunk_mb)

    def _get_data(self, selection: tuple[slice]):
        data = self.iterator._get_data(selection)
        return (data * self.conversion_factor_uv).astype("int16")

    def _get_dtype(self):
        return np.dtype("int16")

    def _get_maxshape(self):
        return self.iterator._get_maxshape()


uv_traces_as_iterator = MicrovoltsSpikeInterfaceRecordingDataChunkIterator(
    traces_as_iterator, channel_conversion_factor_uv
)
data_data_io = H5DataIO(
    data=uv_traces_as_iterator,
    chunks=(min(num_samples, 81920), min(num_channels, 64)),
    compression="gzip",
)
```
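The shape mismatch behind the original error can be illustrated with a toy, NumPy-only sketch (shapes here are made up, and this is not the real SpikeInterface iterator): a plain-generator-backed `DataChunkIterator` expects each yielded item to be one element along the iteration dimension, while the wrapped iterator yields full 2-D chunks.

```python
import numpy as np

# Full recording: 1000 samples x 10 channels (toy shapes)
traces = np.arange(1000 * 10, dtype="int16").reshape(1000, 10)

# What DataChunkIterator expects from a plain generator: one element along
# the iteration dimension at a time, i.e. shape (10,) per yielded item
expected_items = (row for row in traces)
first_expected = next(expected_items)
assert first_expected.shape == (10,)

# What the wrapped chunk iterator actually yields: full 2-D chunks,
# e.g. 100 samples x 10 channels -- one dimension too many
def chunked(traces, chunk_len=100):
    for start in range(0, traces.shape[0], chunk_len):
        yield traces[start:start + chunk_len]

first_actual = next(chunked(traces))
assert first_actual.shape == (100, 10)
```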
An alternative approach is to apply the scaling before the iterator using SpikeInterface's lazy preprocessing tools. This avoids having to subclass or wrap the chunk iterator entirely.

**With SpikeInterface directly**

```python
from spikeinterface.preprocessing import scale, astype
from neuroconv.tools.spikeinterface import SpikeInterfaceRecordingDataChunkIterator

# Both operations are lazy (no data copied in memory)
scaled_recording = scale(recording, gain=conversion_factor)
scaled_recording = astype(scaled_recording, dtype="float32")

# The iterator sees already-transformed data
iterator = SpikeInterfaceRecordingDataChunkIterator(recording=scaled_recording)
```

If the goal is specifically to convert to microvolts:

```python
from spikeinterface.preprocessing import scale_to_uV

recording_uV = scale_to_uV(recording)
iterator = SpikeInterfaceRecordingDataChunkIterator(recording=recording_uV)
```

SpikeInterface preprocessors are lazy wrappers, so they apply the transform on each chunk as it is read.

**Within NeuroConv**

You can swap the preprocessed recording on the data interface before running the conversion:

```python
from spikeinterface.preprocessing import scale

interface.recording = scale(interface.recording, gain=conversion_factor)
converter.run_conversion(nwbfile_path="output.nwb")
```
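To see why the lazy-wrapper pattern costs no extra memory, here is a rough sketch of the idea using hypothetical `ToyRecording`/`ToyScaled` classes (not SpikeInterface's actual API): the gain is stored at wrap time and applied only when a chunk is read.

```python
import numpy as np

class ToyRecording:
    """Stands in for a recording object: holds raw int16 traces."""
    def __init__(self, traces):
        self._traces = traces

    def get_traces(self, start, end):
        return self._traces[start:end]

class ToyScaled:
    """Lazy wrapper: stores the gain, applies it only per requested chunk."""
    def __init__(self, parent, gain):
        self._parent = parent
        self._gain = gain

    def get_traces(self, start, end):
        # No full copy is ever made; only this chunk is transformed
        return self._parent.get_traces(start, end) * self._gain

raw = ToyRecording(np.ones((1000, 4), dtype="int16"))
scaled = ToyScaled(raw, gain=0.195)  # e.g. 0.195 uV per bit (illustrative)
chunk = scaled.get_traces(0, 100)
assert chunk.shape == (100, 4)
```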