Skip to content

Commit

Permalink
feature(store): V3 ZipStore (#2078)
Browse files Browse the repository at this point in the history
* feature(store): add basic implementation of a zip store

* add zip store to array/group/sharding tests

* fix sharding and skip tests that require delete

* store context managers

* fix test typing

* add buffer_cls to store test

* clean up test failures

* class docstring

* remove commented out check against zipstore.delete

* add api integration test

---------

Co-authored-by: Davis Bennett <[email protected]>
  • Loading branch information
jhamman and d-v-b authored Sep 13, 2024
1 parent 534e0cd commit f1bd703
Show file tree
Hide file tree
Showing 11 changed files with 413 additions and 58 deletions.
17 changes: 16 additions & 1 deletion src/zarr/abc/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@


class AccessMode(NamedTuple):
str: AccessModeLiteral
readonly: bool
overwrite: bool
create: bool
Expand All @@ -20,6 +21,7 @@ class AccessMode(NamedTuple):
def from_literal(cls, mode: AccessModeLiteral) -> Self:
if mode in ("r", "r+", "a", "w", "w-"):
return cls(
str=mode,
readonly=mode == "r",
overwrite=mode == "w",
create=mode in ("a", "w", "w-"),
Expand All @@ -42,6 +44,14 @@ async def open(cls, *args: Any, **kwargs: Any) -> Self:
await store._open()
return store

def __enter__(self) -> Self:
"""Enter a context manager that will close the store upon exiting.

This returns the store itself so it can be bound with ``as``; it does
not open the store.
"""
return self

def __exit__(self, *args: Any) -> None:
"""Close the store when leaving the ``with`` block.

The exception details passed in ``args`` are ignored and ``None`` is
returned, so an exception raised inside the block still propagates.
"""
self.close()

async def _open(self) -> None:
if self._is_open:
raise ValueError("store is already open")
Expand Down Expand Up @@ -143,6 +153,12 @@ async def set(self, key: str, value: Buffer) -> None:
"""
...

@property
@abstractmethod
def supports_deletes(self) -> bool:
"""Does the store support deletes?"""
...

@abstractmethod
async def delete(self, key: str) -> None:
"""Remove a key from the store
Expand Down Expand Up @@ -221,7 +237,6 @@ def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
def close(self) -> None:
"""Close the store."""
self._is_open = False
pass


@runtime_checkable
Expand Down
11 changes: 10 additions & 1 deletion src/zarr/store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,14 @@
from zarr.store.local import LocalStore
from zarr.store.memory import MemoryStore
from zarr.store.remote import RemoteStore
from zarr.store.zip import ZipStore

__all__ = ["StorePath", "StoreLike", "make_store_path", "RemoteStore", "LocalStore", "MemoryStore"]
__all__ = [
"StorePath",
"StoreLike",
"make_store_path",
"RemoteStore",
"LocalStore",
"MemoryStore",
"ZipStore",
]
1 change: 1 addition & 0 deletions src/zarr/store/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def _put(

class LocalStore(Store):
supports_writes: bool = True
supports_deletes: bool = True
supports_partial_writes: bool = True
supports_listing: bool = True

Expand Down
1 change: 1 addition & 0 deletions src/zarr/store/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
# When that is done, the `MemoryStore` will just be a store that wraps a dict.
class MemoryStore(Store):
supports_writes: bool = True
supports_deletes: bool = True
supports_partial_writes: bool = True
supports_listing: bool = True

Expand Down
1 change: 1 addition & 0 deletions src/zarr/store/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
class RemoteStore(Store):
# based on FSSpec
supports_writes: bool = True
supports_deletes: bool = True
supports_partial_writes: bool = False
supports_listing: bool = True

Expand Down
223 changes: 223 additions & 0 deletions src/zarr/store/zip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
from __future__ import annotations

import os
import threading
import time
import zipfile
from pathlib import Path
from typing import TYPE_CHECKING, Literal

from zarr.abc.store import Store
from zarr.core.buffer import Buffer, BufferPrototype

if TYPE_CHECKING:
from collections.abc import AsyncGenerator

ZipStoreAccessModeLiteral = Literal["r", "w", "a"]


class ZipStore(Store):
    """
    Storage class using a ZIP file.

    Parameters
    ----------
    path : string or pathlib.Path
        Location of file.
    mode : string, optional
        One of 'r' to read an existing file, 'w' to truncate and write a new
        file, or 'a' to append to an existing file.
    compression : integer, optional
        Compression method to use when writing to the archive.
    allowZip64 : bool, optional
        If True (the default) will create ZIP files that use the ZIP64
        extensions when the zipfile is larger than 2 GiB. If False
        will raise an exception when the ZIP file would require ZIP64
        extensions.

    Notes
    -----
    Deleting keys and partial writes are not implemented by this store
    (``delete`` and ``set_partial_values`` raise ``NotImplementedError``).
    Access to the underlying ``zipfile.ZipFile`` is serialized with a
    re-entrant lock that is created when the store is opened.
    """

    supports_writes: bool = True
    supports_deletes: bool = False
    supports_partial_writes: bool = False
    supports_listing: bool = True

    path: Path
    compression: int
    allowZip64: bool

    _zf: zipfile.ZipFile
    _lock: threading.RLock

    def __init__(
        self,
        path: Path | str,
        *,
        mode: ZipStoreAccessModeLiteral = "r",
        compression: int = zipfile.ZIP_STORED,
        allowZip64: bool = True,
    ) -> None:
        super().__init__(mode=mode)

        # Normalise to a Path up front instead of asserting on the type
        # (asserts are stripped under ``python -O``).
        self.path = Path(path)

        self._zmode = mode
        self.compression = compression
        self.allowZip64 = allowZip64

    async def _open(self) -> None:
        """Create the lock and open the ZIP archive with the configured mode.

        Raises
        ------
        ValueError
            If the store is already open.
        """
        if self._is_open:
            raise ValueError("store is already open")

        self._lock = threading.RLock()

        self._zf = zipfile.ZipFile(
            self.path,
            mode=self._zmode,
            compression=self.compression,
            allowZip64=self.allowZip64,
        )

        self._is_open = True

    def close(self) -> None:
        """Close the store and the underlying ZIP archive."""
        super().close()
        # The lock and archive only exist once ``_open`` has run; closing a
        # store that was never opened (e.g. leaving a ``with`` block early)
        # must not fail with AttributeError.
        lock = getattr(self, "_lock", None)
        if lock is None:
            return
        with lock:
            self._zf.close()

    async def clear(self) -> None:
        """Remove the archive from disk and replace it with a new, empty one."""
        with self._lock:
            self._check_writable()
            self._zf.close()
            os.remove(self.path)
            # Always reopen in "w" mode: the file was just removed, so there
            # is nothing to append to.
            self._zf = zipfile.ZipFile(
                self.path, mode="w", compression=self.compression, allowZip64=self.allowZip64
            )

    async def empty(self) -> bool:
        """Return True when the archive contains no members."""
        with self._lock:
            return not self._zf.namelist()

    def __str__(self) -> str:
        return f"zip://{self.path}"

    def __repr__(self) -> str:
        return f"ZipStore({str(self)!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, type(self)) and self.path == other.path

    def _get(
        self,
        key: str,
        prototype: BufferPrototype,
        byte_range: tuple[int | None, int | None] | None = None,
    ) -> Buffer | None:
        """Read ``key`` from the archive; callers are expected to hold the lock.

        ``byte_range`` is ``(start, length)``. A negative ``start`` is
        interpreted relative to the end of the member; a ``length`` of
        ``None`` reads through to the end. Returns ``None`` when ``key`` is
        not present in the archive.
        """
        try:
            member = self._zf.open(key)
        except KeyError:
            return None

        with member as f:
            if byte_range is None:
                return prototype.buffer.from_bytes(f.read())

            start, length = byte_range
            if start:
                # Negative offsets count back from the end of the member.
                f.seek(start, os.SEEK_END if start < 0 else os.SEEK_SET)

            # Test against None rather than truthiness so that an explicit
            # length of 0 yields an empty buffer instead of the whole tail.
            data = f.read() if length is None else f.read(length)
            return prototype.buffer.from_bytes(data)

    async def get(
        self,
        key: str,
        prototype: BufferPrototype,
        byte_range: tuple[int | None, int | None] | None = None,
    ) -> Buffer | None:
        """Return the (optionally ranged) value stored under ``key``, or None."""
        assert isinstance(key, str)

        with self._lock:
            return self._get(key, prototype=prototype, byte_range=byte_range)

    async def get_partial_values(
        self,
        prototype: BufferPrototype,
        key_ranges: list[tuple[str, tuple[int | None, int | None]]],
    ) -> list[Buffer | None]:
        """Return one buffer (or None for a missing key) per ``(key, byte_range)`` pair."""
        with self._lock:
            return [
                self._get(key, prototype=prototype, byte_range=byte_range)
                for key, byte_range in key_ranges
            ]

    def _set(self, key: str, value: Buffer) -> None:
        """Write ``value`` as archive member ``key``.

        Generally, this should be called inside a lock.
        """
        keyinfo = zipfile.ZipInfo(filename=key, date_time=time.localtime()[:6])
        keyinfo.compress_type = self.compression
        # ZIP member names always use "/" as the separator (ZipInfo converts
        # os.sep to "/"), so test for that rather than the platform's os.sep.
        if keyinfo.filename.endswith("/"):
            keyinfo.external_attr = 0o40775 << 16  # drwxrwxr-x
            keyinfo.external_attr |= 0x10  # MS-DOS directory flag
        else:
            keyinfo.external_attr = 0o644 << 16  # ?rw-r--r--
        self._zf.writestr(keyinfo, value.to_bytes())

    async def set(self, key: str, value: Buffer) -> None:
        """Store ``value`` under ``key``.

        Raises
        ------
        TypeError
            If ``value`` is not a ``Buffer`` instance.
        """
        self._check_writable()
        assert isinstance(key, str)
        if not isinstance(value, Buffer):
            raise TypeError("ZipStore.set(): `value` must be a Buffer instance")
        with self._lock:
            self._set(key, value)

    async def set_partial_values(self, key_start_values: list[tuple[str, int, bytes]]) -> None:
        """Not supported by this store; always raises NotImplementedError."""
        raise NotImplementedError

    async def delete(self, key: str) -> None:
        """Not supported by this store; always raises NotImplementedError."""
        raise NotImplementedError

    async def exists(self, key: str) -> bool:
        """Return True when ``key`` is a member of the archive."""
        with self._lock:
            try:
                self._zf.getinfo(key)
            except KeyError:
                return False
            return True

    async def list(self) -> AsyncGenerator[str, None]:
        """Yield every member name in the archive."""
        # Snapshot the names under the lock and release it before yielding,
        # so the lock is never held while the consumer is suspended.
        with self._lock:
            keys = self._zf.namelist()
        for key in keys:
            yield key

    async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
        """Yield every member name that starts with ``prefix``."""
        async for key in self.list():
            if key.startswith(prefix):
                yield key

    async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
        """Yield the distinct immediate child names found under ``prefix``.

        A trailing "/" on ``prefix`` is ignored; an empty prefix lists the
        top level of the archive. Each child name is yielded once, in the
        order it is first encountered in the archive.
        """
        if prefix.endswith("/"):
            prefix = prefix[:-1]
        # Members directly under the archive root have no leading separator.
        parent = prefix + "/" if prefix else ""

        with self._lock:
            keys = self._zf.namelist()

        seen: set[str] = set()
        for key in keys:
            if not key.startswith(parent):
                continue
            child = key[len(parent) :].split("/")[0]
            # An explicit directory entry equal to ``parent`` leaves an empty
            # child name; that is the directory itself, not one of its children.
            if child and child not in seen:
                seen.add(child)
                yield child
15 changes: 11 additions & 4 deletions tests/v3/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from hypothesis import HealthCheck, Verbosity, settings

from zarr import AsyncGroup, config
from zarr.store import LocalStore, MemoryStore, StorePath
from zarr.store import LocalStore, MemoryStore, StorePath, ZipStore
from zarr.store.remote import RemoteStore

if TYPE_CHECKING:
Expand All @@ -25,14 +25,16 @@


async def parse_store(
store: Literal["local", "memory", "remote"], path: str
) -> LocalStore | MemoryStore | RemoteStore:
store: Literal["local", "memory", "remote", "zip"], path: str
) -> LocalStore | MemoryStore | RemoteStore | ZipStore:
if store == "local":
return await LocalStore.open(path, mode="w")
if store == "memory":
return await MemoryStore.open(mode="w")
if store == "remote":
return await RemoteStore.open(url=path, mode="w")
if store == "zip":
return await ZipStore.open(path + "/zarr.zip", mode="w")
raise AssertionError


Expand Down Expand Up @@ -64,6 +66,11 @@ async def memory_store() -> MemoryStore:
return await MemoryStore.open(mode="w")


@pytest.fixture(scope="function")
async def zip_store(tmpdir: LEGACY_PATH) -> ZipStore:
"""Function-scoped fixture: a ZipStore opened in "w" mode at ``tmpdir / "zarr.zip"``."""
return await ZipStore.open(str(tmpdir / "zarr.zip"), mode="w")


@pytest.fixture(scope="function")
async def store(request: pytest.FixtureRequest, tmpdir: LEGACY_PATH) -> Store:
param = request.param
Expand All @@ -73,7 +80,7 @@ async def store(request: pytest.FixtureRequest, tmpdir: LEGACY_PATH) -> Store:
@dataclass
class AsyncGroupRequest:
zarr_format: ZarrFormat
store: Literal["local", "remote", "memory"]
store: Literal["local", "remote", "memory", "zip"]
attributes: dict[str, Any] = field(default_factory=dict)


Expand Down
6 changes: 3 additions & 3 deletions tests/v3/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from zarr.store.common import StorePath


@pytest.mark.parametrize("store", ("local", "memory"), indirect=["store"])
@pytest.mark.parametrize("store", ("local", "memory", "zip"), indirect=["store"])
@pytest.mark.parametrize("zarr_format", (2, 3))
@pytest.mark.parametrize("exists_ok", [True, False])
@pytest.mark.parametrize("extant_node", ["array", "group"])
Expand Down Expand Up @@ -59,7 +59,7 @@ def test_array_creation_existing_node(
)


@pytest.mark.parametrize("store", ("local", "memory"), indirect=["store"])
@pytest.mark.parametrize("store", ("local", "memory", "zip"), indirect=["store"])
@pytest.mark.parametrize("zarr_format", (2, 3))
def test_array_name_properties_no_group(
store: LocalStore | MemoryStore, zarr_format: ZarrFormat
Expand All @@ -70,7 +70,7 @@ def test_array_name_properties_no_group(
assert arr.basename is None


@pytest.mark.parametrize("store", ("local", "memory"), indirect=["store"])
@pytest.mark.parametrize("store", ("local", "memory", "zip"), indirect=["store"])
@pytest.mark.parametrize("zarr_format", (2, 3))
def test_array_name_properties_with_group(
store: LocalStore | MemoryStore, zarr_format: ZarrFormat
Expand Down
Loading

0 comments on commit f1bd703

Please sign in to comment.