diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8b6c41901c..cfc0bdf975 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,7 +2,7 @@ ci: autoupdate_commit_msg: "chore: update pre-commit hooks" autoupdate_schedule: "monthly" autofix_prs: false - skip: [] # pre-commit.ci only checks for updates, prek runs hooks locally + skip: [] # pre-commit.ci only checks for updates, prek runs hooks locally default_stages: [pre-commit, pre-push] @@ -41,7 +41,8 @@ repos: - numpy==2.1 # until https://github.com/numpy/numpy/issues/28034 is resolved - typing_extensions - universal-pathlib - - obstore>=0.5.1 + - obstore>=0.7.0 + - obspec>=0.1.0 # Tests - pytest - hypothesis diff --git a/pyproject.toml b/pyproject.toml index 068caa1f0d..0eb7a1b019 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,7 +64,8 @@ keywords = ["Python", "compressed", "ndimensional-arrays", "zarr"] # User extras remote = [ "fsspec>=2023.10.0", - "obstore>=0.5.1", + "obstore>=0.7.0", + "obspec>=0.1.0", ] gpu = [ "cupy-cuda12x", @@ -218,6 +219,7 @@ dependencies = [ 'typing_extensions @ git+https://github.com/python/typing_extensions', 'donfig @ git+https://github.com/pytroll/donfig', 'obstore @ git+https://github.com/developmentseed/obstore@main#subdirectory=obstore', + 'obspec @ git+https://github.com/developmentseed/obspec@main', # test deps 'zarr[test]', ] @@ -244,7 +246,8 @@ dependencies = [ 'universal_pathlib==0.0.22', 'typing_extensions==4.12.*', 'donfig==0.8.*', - 'obstore==0.5.*', + 'obstore==0.7.*', + 'obspec==0.1.*', # test deps 'zarr[test]', 'zarr[remote_tests]', diff --git a/src/zarr/storage/_obstore.py b/src/zarr/storage/_obstore.py index 5c2197ecf6..cc76f6e4dc 100644 --- a/src/zarr/storage/_obstore.py +++ b/src/zarr/storage/_obstore.py @@ -1,10 +1,24 @@ from __future__ import annotations import asyncio -import contextlib import pickle from collections import defaultdict -from typing import TYPE_CHECKING, Generic, Self, TypedDict, TypeVar +from collections.abc import Sequence +from typing import TYPE_CHECKING, Generic, Protocol, Self, TypedDict, TypeVar + +from obspec import ( + DeleteAsync, + GetAsync, + GetRangeAsync, + GetRangesAsync, + HeadAsync, + ListAsync, + ListWithDelimiterAsync, + OffsetRange, + PutAsync, + SuffixRange, +) +from obspec.exceptions import AlreadyExistsError, NotSupportedError, map_exception from zarr.abc.store import ( ByteRequest, @@ -20,8 +34,7 @@ from collections.abc import AsyncGenerator, Coroutine, Iterable, Sequence from typing import Any - from obstore import ListResult, ListStream, ObjectMeta, OffsetRange, SuffixRange - from obstore.store import ObjectStore as _UpstreamObjectStore + from obspec import ListResult, ObjectMeta from zarr.core.buffer import Buffer, BufferPrototype @@ -34,7 +47,21 @@ ) -T_Store = TypeVar("T_Store", bound="_UpstreamObjectStore") +class ObspecInput( + DeleteAsync, + GetAsync, + GetRangeAsync, + GetRangesAsync, + HeadAsync, + ListAsync, + ListWithDelimiterAsync, + PutAsync, + Protocol, +): + pass + + +T_Store = TypeVar("T_Store", bound=ObspecInput) class ObjectStore(Store, Generic[T_Store]): @@ -98,42 +125,44 @@ async def get( self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None ) -> Buffer | None: # docstring inherited - import obstore as obs try: if byte_range is None: - resp = await obs.get_async(self.store, key) - return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type] + resp = await self.store.get_async(key) + return prototype.buffer.from_bytes(await resp.buffer_async()) # type: ignore[arg-type] elif isinstance(byte_range, RangeByteRequest): - bytes = await obs.get_range_async( - self.store, key, start=byte_range.start, end=byte_range.end + bytes = await self.store.get_range_async( + key, start=byte_range.start, end=byte_range.end ) return prototype.buffer.from_bytes(bytes) # type: ignore[arg-type] elif isinstance(byte_range, OffsetByteRequest): - resp = await obs.get_async( - self.store, key, options={"range": {"offset": byte_range.offset}} + resp = await self.store.get_async( + key, options={"range": {"offset": byte_range.offset}} ) - return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type] + return prototype.buffer.from_bytes(await resp.buffer_async()) # type: ignore[arg-type] elif isinstance(byte_range, SuffixByteRequest): # some object stores (Azure) don't support suffix requests. In this # case, our workaround is to first get the length of the object and then # manually request the byte range at the end. try: - resp = await obs.get_async( - self.store, key, options={"range": {"suffix": byte_range.suffix}} + resp = await self.store.get_async( + key, options={"range": {"suffix": byte_range.suffix}} ) - return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type] - except obs.exceptions.NotSupportedError: - head_resp = await obs.head_async(self.store, key) - file_size = head_resp["size"] - suffix_len = byte_range.suffix - buffer = await obs.get_range_async( - self.store, - key, - start=file_size - suffix_len, - length=suffix_len, - ) - return prototype.buffer.from_bytes(buffer) # type: ignore[arg-type] + return prototype.buffer.from_bytes(await resp.buffer_async()) # type: ignore[arg-type] + except Exception as e: + if isinstance(map_exception(e), NotSupportedError): + head_resp = await self.store.head_async(key) + file_size = head_resp["size"] + suffix_len = byte_range.suffix + buffer = await self.store.get_range_async( + key, + start=file_size - suffix_len, + length=suffix_len, + ) + return prototype.buffer.from_bytes(buffer) # type: ignore[arg-type] + else: + raise e from None + else: raise ValueError(f"Unexpected byte_range, got {byte_range}") except _ALLOWED_EXCEPTIONS: @@ -149,10 +178,9 @@ async def get_partial_values( async def exists(self, key: str) -> bool: # docstring inherited - import obstore as obs try: - await obs.head_async(self.store, key) + await self.store.head_async(key) except FileNotFoundError: return False else: @@ -165,21 +193,26 @@ def supports_writes(self) -> bool: async def set(self, key: str, value: Buffer) -> None: # docstring inherited - import obstore as obs self._check_writable() buf = value.as_buffer_like() - await obs.put_async(self.store, key, buf) + await self.store.put_async(key, buf) async def set_if_not_exists(self, key: str, value: Buffer) -> None: # docstring inherited - import obstore as obs self._check_writable() buf = value.as_buffer_like() - with contextlib.suppress(obs.exceptions.AlreadyExistsError): - await obs.put_async(self.store, key, buf, mode="create") + try: + await self.store.put_async(key, buf, mode="create") + # Suppress an AlreadyExistsError + except Exception as e: + mapped_exc = map_exception(e) + if isinstance(mapped_exc, AlreadyExistsError): + pass + else: + raise mapped_exc from None @property def supports_deletes(self) -> bool: @@ -188,7 +221,6 @@ def supports_deletes(self) -> bool: async def delete(self, key: str) -> None: # docstring inherited - import obstore as obs self._check_writable() @@ -196,18 +228,25 @@ async def delete(self, key: str) -> None: # when deleting a non-existent key, while others such as S3 and in-memory do # not. We suppress the error to make the behavior consistent across all obstore # stores. This is also in line with the behavior of the other Zarr store adapters. - with contextlib.suppress(FileNotFoundError): - await obs.delete_async(self.store, key) + try: + await self.store.delete_async(key) + except Exception as e: + mapped_exc = map_exception(e) + # The obspec NotFoundError subclasses from the global FileNotFoundError + # https://developmentseed.org/obspec/latest/api/exceptions/#obspec.exceptions.NotFoundError + if isinstance(mapped_exc, FileNotFoundError): + pass + else: + raise mapped_exc from None async def delete_dir(self, prefix: str) -> None: # docstring inherited - import obstore as obs self._check_writable() if prefix != "" and not prefix.endswith("/"): prefix += "/" - metas = await obs.list(self.store, prefix).collect_async() + metas = [obj async for batch in self.store.list_async(prefix) for obj in batch] keys = [(m["path"],) for m in metas] await concurrent_map(keys, self.delete, limit=config.get("async.concurrency")) @@ -217,9 +256,7 @@ def supports_listing(self) -> bool: return True async def _list(self, prefix: str | None = None) -> AsyncGenerator[ObjectMeta, None]: - import obstore as obs - - objects: ListStream[Sequence[ObjectMeta]] = obs.list(self.store, prefix=prefix) + objects = self.store.list_async(prefix=prefix) async for batch in objects: for item in batch: yield item @@ -234,16 +271,14 @@ def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: # docstring inherited - import obstore as obs - coroutine = obs.list_with_delimiter_async(self.store, prefix=prefix) + coroutine = self.store.list_with_delimiter_async(prefix=prefix) return _transform_list_dir(coroutine, prefix) async def getsize(self, key: str) -> int: # docstring inherited - import obstore as obs - resp = await obs.head_async(self.store, key) + resp = await self.store.head_async(key) return resp["size"] async def getsize_prefix(self, prefix: str) -> int: @@ -333,7 +368,7 @@ class _Response(TypedDict): async def _make_bounded_requests( - store: _UpstreamObjectStore, + store: ObspecInput, path: str, requests: list[_BoundedRequest], prototype: BufferPrototype, @@ -345,12 +380,11 @@ async def _make_bounded_requests( within a single file, and will e.g. merge concurrent requests. This only uses one single Python coroutine. """ - import obstore as obs starts = [r["start"] for r in requests] ends = [r["end"] for r in requests] async with semaphore: - responses = await obs.get_ranges_async(store, path=path, starts=starts, ends=ends) + responses = await store.get_ranges_async(path=path, starts=starts, ends=ends) buffer_responses: list[_Response] = [] for request, response in zip(requests, responses, strict=True): @@ -365,7 +399,7 @@ async def _make_bounded_requests( async def _make_other_request( - store: _UpstreamObjectStore, + store: ObspecInput, request: _OtherRequest, prototype: BufferPrototype, semaphore: asyncio.Semaphore, @@ -375,14 +409,13 @@ async def _make_other_request( We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all futures can be gathered together. """ - import obstore as obs async with semaphore: if request["range"] is None: - resp = await obs.get_async(store, request["path"]) + resp = await store.get_async(request["path"]) else: - resp = await obs.get_async(store, request["path"], options={"range": request["range"]}) - buffer = await resp.bytes_async() + resp = await store.get_async(request["path"], options={"range": request["range"]}) + buffer = await resp.buffer_async() return [ { @@ -393,7 +426,7 @@ async def _make_other_request( async def _make_suffix_request( - store: _UpstreamObjectStore, + store: ObspecInput, request: _SuffixRequest, prototype: BufferPrototype, semaphore: asyncio.Semaphore, @@ -407,18 +440,19 @@ async def _make_suffix_request( We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all futures can be gathered together. """ - import obstore as obs - async with semaphore: try: - resp = await obs.get_async(store, request["path"], options={"range": request["range"]}) - buffer = await resp.bytes_async() - except obs.exceptions.NotSupportedError: - head_resp = await obs.head_async(store, request["path"]) + resp = await store.get_async(request["path"], options={"range": request["range"]}) + buffer = await resp.buffer_async() + except Exception as e: + mapped_exc = map_exception(e) + if not isinstance(mapped_exc, NotSupportedError): + raise mapped_exc from None + + head_resp = await store.head_async(request["path"]) file_size = head_resp["size"] suffix_len = request["range"]["suffix"] - buffer = await obs.get_range_async( - store, + buffer = await store.get_range_async( request["path"], start=file_size - suffix_len, length=suffix_len, @@ -433,7 +467,7 @@ async def _make_suffix_request( async def _get_partial_values( - store: _UpstreamObjectStore, + store: ObspecInput, prototype: BufferPrototype, key_ranges: Iterable[tuple[str, ByteRequest | None]], ) -> list[Buffer | None]: