Skip to content

Commit

Permalink
Unpin zarr & xarray (#555)
Browse files Browse the repository at this point in the history
* Unpin zarr & xarray

* Sync with latest python api

* Update icechunk-python/python/icechunk/store.py

Co-authored-by: Deepak Cherian <[email protected]>

* Add tests for other range request types

* Change icechunk internal ByteRange to match python

* Lint

* Better byterequest impl

* last -> until

* Fix ranging

---------

Co-authored-by: Matthew Iannucci <[email protected]>
  • Loading branch information
dcherian and mpiannucci authored Jan 9, 2025
1 parent b850bdb commit e4b6f00
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 40 deletions.
4 changes: 2 additions & 2 deletions icechunk-python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ classifiers = [
license = { text = "Apache-2.0" }
dynamic = ["version"]

dependencies = ["zarr==3.0.0rc2"]
dependencies = ["zarr>=3"]

[tool.poetry]
name = "icechunk"
Expand All @@ -39,7 +39,7 @@ test = [
"ruff",
"dask>=2024.11.0",
"distributed>=2024.11.0",
"xarray@git+https://github.com/pydata/xarray.git@main",
"xarray>=2025.01.1",
"hypothesis",
"pandas-stubs",
"boto3-stubs[s3]",
Expand Down
39 changes: 33 additions & 6 deletions icechunk-python/python/icechunk/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@
from typing import TYPE_CHECKING, Any

from icechunk._icechunk_python import PyStore
from zarr.abc.store import ByteRangeRequest, Store
from zarr.abc.store import (
ByteRequest,
OffsetByteRequest,
RangeByteRequest,
Store,
SuffixByteRequest,
)
from zarr.core.buffer import Buffer, BufferPrototype
from zarr.core.common import BytesLike
from zarr.core.sync import SyncMixin
Expand All @@ -12,6 +18,20 @@
from icechunk import Session


def _byte_request_to_tuple(
byte_request: ByteRequest | None,
) -> tuple[int | None, int | None]:
match byte_request:
case None:
return (None, None)
case RangeByteRequest(start, end):
return (start, end)
case OffsetByteRequest(offset):
return (offset, None)
case SuffixByteRequest(suffix):
return (None, suffix)


class IcechunkStore(Store, SyncMixin):
_store: PyStore

Expand Down Expand Up @@ -93,22 +113,28 @@ async def get(
self,
key: str,
prototype: BufferPrototype,
byte_range: tuple[int | None, int | None] | None = None,
byte_range: ByteRequest | None = None,
) -> Buffer | None:
"""Retrieve the value associated with a given key.
Parameters
----------
key : str
byte_range : tuple[int, Optional[int]], optional
byte_range : ByteRequest, optional
ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.
- RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
- OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.
- SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.
Returns
-------
Buffer
"""

try:
result = await self._store.get(key, byte_range)
result = await self._store.get(key, _byte_request_to_tuple(byte_range))
except KeyError as _e:
# Zarr python expects None to be returned if the key does not exist
# but an IcechunkStore returns an error if the key does not exist
Expand All @@ -119,7 +145,7 @@ async def get(
async def get_partial_values(
self,
prototype: BufferPrototype,
key_ranges: Iterable[tuple[str, ByteRangeRequest]],
key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]:
"""Retrieve possibly partial values from given key_ranges.
Expand All @@ -134,7 +160,8 @@ async def get_partial_values(
"""
# NOTE: pyo3 has not implicit conversion from an Iterable to a rust iterable. So we convert it
# to a list here first. Possible opportunity for optimization.
result = await self._store.get_partial_values(list(key_ranges))
ranges = [(k[0], _byte_request_to_tuple(k[1])) for k in key_ranges]
result = await self._store.get_partial_values(list(ranges))
return [prototype.buffer.from_bytes(r) for r in result]

async def exists(self, key: str) -> bool:
Expand Down
3 changes: 2 additions & 1 deletion icechunk-python/tests/test_zarr/test_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
from zarr.core.group import GroupMetadata
from zarr.core.sync import sync
from zarr.errors import ContainsArrayError, ContainsGroupError
from zarr.storage import StorePath, make_store_path
from zarr.storage import StorePath
from zarr.storage._common import make_store_path


@pytest.fixture(params=["memory"])
Expand Down
2 changes: 1 addition & 1 deletion icechunk-python/tests/test_zarr/test_store/test_core.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from icechunk import IcechunkStore
from tests.conftest import parse_repo
from zarr.storage import make_store_path
from zarr.storage._common import make_store_path


async def test_make_store_path() -> None:
Expand Down
15 changes: 13 additions & 2 deletions icechunk-python/tests/test_zarr/test_store/test_icechunk_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from icechunk import IcechunkStore, local_filesystem_storage
from icechunk.repository import Repository
from zarr.abc.store import OffsetByteRequest, RangeByteRequest, SuffixByteRequest
from zarr.core.buffer import Buffer, cpu, default_buffer_prototype
from zarr.core.sync import collect_aiterator
from zarr.testing.store import StoreTests
Expand Down Expand Up @@ -226,15 +227,25 @@ async def test_get_partial_values(
values = await store.get_partial_values(
default_buffer_prototype(),
[
("zarr.json", (0, 5)),
("zarr.json", RangeByteRequest(0, 5)),
("zarr.json", SuffixByteRequest(5)),
("zarr.json", OffsetByteRequest(10)),
],
)

assert len(values) == 1
assert len(values) == 3
data = values[0].to_bytes()
assert len(data) == 5
assert data == DEFAULT_GROUP_METADATA[:5]

data = values[1].to_bytes()
assert len(data) == len(DEFAULT_GROUP_METADATA) - 5
assert data == DEFAULT_GROUP_METADATA[:-5]

data = values[2].to_bytes()
assert len(data) == len(DEFAULT_GROUP_METADATA) - 10
assert data == DEFAULT_GROUP_METADATA[10:]

async def test_set(self, store: IcechunkStore) -> None:
await store.set("zarr.json", self.buffer_cls.from_bytes(DEFAULT_GROUP_METADATA))
assert await store.exists("zarr.json")
Expand Down
10 changes: 7 additions & 3 deletions icechunk/src/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,10 @@ pub enum ByteRange {
Bounded(Range<ChunkOffset>),
/// All bytes from the given offset (included) to the end of the object
From(ChunkOffset),
/// Last n bytes in the object
/// The last n bytes in the object
Last(ChunkLength),
/// All bytes up to the last n bytes in the object
Until(ChunkOffset),
}

impl From<Range<ChunkOffset>> for ByteRange {
Expand Down Expand Up @@ -185,7 +187,8 @@ impl ByteRange {
bytes.slice(range.start as usize..range.end as usize)
}
ByteRange::From(from) => bytes.slice(*from as usize..),
ByteRange::Last(n) => bytes.slice(bytes.len() - *n as usize..bytes.len()),
ByteRange::Last(n) => bytes.slice(bytes.len() - *n as usize..),
ByteRange::Until(n) => bytes.slice(0usize..bytes.len() - *n as usize),
}
}
}
Expand All @@ -195,7 +198,8 @@ impl From<(Option<ChunkOffset>, Option<ChunkOffset>)> for ByteRange {
match (start, end) {
(Some(start), Some(end)) => Self::Bounded(start..end),
(Some(start), None) => Self::From(start),
(None, Some(end)) => Self::Bounded(0..end),
// NOTE: This is relied upon by zarr python
(None, Some(end)) => Self::Until(end),
(None, None) => Self::ALL,
}
}
Expand Down
4 changes: 2 additions & 2 deletions icechunk/src/refs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,10 @@ pub async fn list_branches(
Ok(branches)
}

async fn branch_history<'a, 'b>(
async fn branch_history<'a>(
storage: &'a (dyn Storage + Send + Sync),
storage_settings: &storage::Settings,
branch: &'b str,
branch: &str,
) -> RefResult<impl Stream<Item = RefResult<BranchVersion>> + 'a> {
let key = branch_root(branch)?;
let all = storage.ref_versions(storage_settings, key.as_str()).await?;
Expand Down
13 changes: 9 additions & 4 deletions icechunk/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1014,9 +1014,9 @@ async fn updated_nodes<'a>(
.chain(change_set.new_nodes_iterator(manifest_id)))
}

async fn get_node<'a>(
async fn get_node(
asset_manager: &AssetManager,
change_set: &'a ChangeSet,
change_set: &ChangeSet,
snapshot_id: &SnapshotId,
path: &Path,
) -> SessionResult<NodeSnapshot> {
Expand All @@ -1037,9 +1037,9 @@ async fn get_node<'a>(
}
}

async fn get_existing_node<'a>(
async fn get_existing_node(
asset_manager: &AssetManager,
change_set: &'a ChangeSet,
change_set: &ChangeSet,
snapshot_id: &SnapshotId,
path: &Path,
) -> SessionResult<NodeSnapshot> {
Expand Down Expand Up @@ -1121,6 +1121,11 @@ pub fn construct_valid_byte_range(
let new_start = min(chunk_offset + n, chunk_offset + chunk_length - 1);
new_start..chunk_offset + chunk_length
}
ByteRange::Until(n) => {
let new_end = chunk_offset + chunk_length;
let new_start = new_end - n;
new_start..new_end
}
ByteRange::Last(n) => {
let new_end = chunk_offset + chunk_length;
let new_start = new_end - n;
Expand Down
22 changes: 3 additions & 19 deletions icechunk/src/storage/object_store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use crate::{
format::{
ByteRange, ChunkId, ChunkOffset, FileTypeTag, ManifestId, ObjectId, SnapshotId,
},
format::{ChunkId, ChunkOffset, FileTypeTag, ManifestId, ObjectId, SnapshotId},
private,
};
use async_trait::async_trait;
Expand All @@ -12,8 +10,8 @@ use futures::{
};
use object_store::{
local::LocalFileSystem, parse_url_opts, path::Path as ObjectPath, Attribute,
AttributeValue, Attributes, GetOptions, GetRange, ObjectMeta, ObjectStore, PutMode,
PutOptions, PutPayload, UpdateVersion,
AttributeValue, Attributes, GetOptions, ObjectMeta, ObjectStore, PutMode, PutOptions,
PutPayload, UpdateVersion,
};
use serde::{Deserialize, Serialize};
use std::{
Expand All @@ -37,20 +35,6 @@ use super::{
SNAPSHOT_PREFIX, TRANSACTION_PREFIX,
};

// Get Range is object_store specific, keep it with this module
impl From<&ByteRange> for Option<GetRange> {
fn from(value: &ByteRange) -> Self {
match value {
ByteRange::Bounded(Range { start, end }) => {
Some(GetRange::Bounded(*start as usize..*end as usize))
}
ByteRange::From(start) if *start == 0u64 => None,
ByteRange::From(start) => Some(GetRange::Offset(*start as usize)),
ByteRange::Last(n) => Some(GetRange::Suffix(*n as usize)),
}
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ObjectStorageConfig {
pub url: String,
Expand Down

0 comments on commit e4b6f00

Please sign in to comment.