Skip to content

Commit

Permalink
sharding with empty inner chunk and index location start (#2336)
Browse files Browse the repository at this point in the history
* test_sharding_with_empty_inner_chunk

* only update non-empty chunks offset

* format

* Update test_sharding.py

* format

---------

Co-authored-by: Norman Rzepka <[email protected]>
  • Loading branch information
brokkoli71 and normanrz authored Oct 18, 2024
1 parent ee112b9 commit 5f3a512
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 2 deletions.
1 change: 0 additions & 1 deletion src/zarr/codecs/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ async def decode_batch(
) -> Iterable[NDBuffer | None]:
chunk_bytes_batch: Iterable[Buffer | None]
chunk_bytes_batch, chunk_specs = _unzip2(chunk_bytes_and_specs)

(
aa_codecs_with_spec,
ab_codec_with_spec,
Expand Down
3 changes: 2 additions & 1 deletion src/zarr/codecs/sharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,8 @@ async def finalize(
) -> Buffer:
index_bytes = await index_encoder(self.index)
if index_location == ShardingCodecIndexLocation.start:
self.index.offsets_and_lengths[..., 0] += len(index_bytes)
empty_chunks_mask = self.index.offsets_and_lengths[..., 0] == MAX_UINT_64
self.index.offsets_and_lengths[~empty_chunks_mask, 0] += len(index_bytes)
index_bytes = await index_encoder(self.index) # encode again with corrected offsets
out_buf = index_bytes + self.buf
else:
Expand Down
27 changes: 27 additions & 0 deletions tests/test_codecs/test_sharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,30 @@ async def test_delete_empty_shards(store: Store) -> None:
def test_pickle() -> None:
codec = ShardingCodec(chunk_shape=(8, 8))
assert pickle.loads(pickle.dumps(codec)) == codec


@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"])
@pytest.mark.parametrize(
"index_location", [ShardingCodecIndexLocation.start, ShardingCodecIndexLocation.end]
)
async def test_sharding_with_empty_inner_chunk(
store: Store, index_location: ShardingCodecIndexLocation
) -> None:
data = np.arange(0, 16 * 16, dtype="uint32").reshape((16, 16))
fill_value = 1

path = f"sharding_with_empty_inner_chunk_{index_location}"
spath = StorePath(store, path)
a = await AsyncArray.create(
spath,
shape=(16, 16),
chunk_shape=(8, 8),
dtype="uint32",
fill_value=fill_value,
codecs=[ShardingCodec(chunk_shape=(4, 4), index_location=index_location)],
)
data[:4, :4] = fill_value
await a.setitem(..., data)
print("read data")
data_read = await a.getitem(...)
assert np.array_equal(data_read, data)

0 comments on commit 5f3a512

Please sign in to comment.