5 changes: 3 additions & 2 deletions .pre-commit-config.yaml
@@ -2,7 +2,7 @@ ci:
   autoupdate_commit_msg: "chore: update pre-commit hooks"
   autoupdate_schedule: "monthly"
   autofix_prs: false
-  skip: []  # pre-commit.ci only checks for updates, prek runs hooks locally
+  skip: []  # pre-commit.ci only checks for updates, prek runs hooks locally
 
 default_stages: [pre-commit, pre-push]
 
@@ -41,7 +41,8 @@ repos:
           - numpy==2.1  # until https://github.com/numpy/numpy/issues/28034 is resolved
           - typing_extensions
           - universal-pathlib
-          - obstore>=0.5.1
+          - obstore>=0.7.0
+          - obspec>=0.1.0
           # Tests
           - pytest
           - hypothesis
7 changes: 5 additions & 2 deletions pyproject.toml
@@ -64,7 +64,8 @@ keywords = ["Python", "compressed", "ndimensional-arrays", "zarr"]
 # User extras
 remote = [
     "fsspec>=2023.10.0",
-    "obstore>=0.5.1",
+    "obstore>=0.7.0",
+    "obspec>=0.1.0",
 ]
 gpu = [
     "cupy-cuda12x",
@@ -218,6 +219,7 @@ dependencies = [
     'typing_extensions @ git+https://github.com/python/typing_extensions',
     'donfig @ git+https://github.com/pytroll/donfig',
     'obstore @ git+https://github.com/developmentseed/obstore@main#subdirectory=obstore',
+    'obspec @ git+https://github.com/developmentseed/obspec@main',
     # test deps
     'zarr[test]',
 ]
@@ -244,7 +246,8 @@ dependencies = [
     'universal_pathlib==0.0.22',
     'typing_extensions==4.12.*',
     'donfig==0.8.*',
-    'obstore==0.5.*',
+    'obstore==0.7.*',
+    'obspec==0.1.*',
     # test deps
     'zarr[test]',
     'zarr[remote_tests]',
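Why both packages: obstore stays as the concrete store implementation, while obspec supplies the vendor-neutral protocols that the type hints below now target. A minimal sketch of hypothetical downstream code (the function and key names are illustrative, not part of this PR), using only methods the patched module itself calls:

```python
from obspec import GetAsync


async def read_object(store: GetAsync, path: str) -> bytes:
    # Works with any obspec-conforming store (e.g. an obstore S3Store),
    # not just a concrete obstore class.
    resp = await store.get_async(path)
    return bytes(await resp.buffer_async())
```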
162 changes: 98 additions & 64 deletions src/zarr/storage/_obstore.py
@@ -1,10 +1,24 @@
 from __future__ import annotations
 
 import asyncio
-import contextlib
 import pickle
 from collections import defaultdict
-from typing import TYPE_CHECKING, Generic, Self, TypedDict, TypeVar
+from collections.abc import Sequence
+from typing import TYPE_CHECKING, Generic, Protocol, Self, TypedDict, TypeVar
+
+from obspec import (
+    DeleteAsync,
+    GetAsync,
+    GetRangeAsync,
+    GetRangesAsync,
+    HeadAsync,
+    ListAsync,
+    ListWithDelimiterAsync,
+    OffsetRange,
+    PutAsync,
+    SuffixRange,
+)
+from obspec.exceptions import AlreadyExistsError, NotSupportedError, map_exception
 
 from zarr.abc.store import (
     ByteRequest,
@@ -20,8 +34,7 @@
     from collections.abc import AsyncGenerator, Coroutine, Iterable, Sequence
     from typing import Any
 
-    from obstore import ListResult, ListStream, ObjectMeta, OffsetRange, SuffixRange
-    from obstore.store import ObjectStore as _UpstreamObjectStore
+    from obspec import ListResult, ObjectMeta
 
     from zarr.core.buffer import Buffer, BufferPrototype
 
@@ -34,7 +47,21 @@
 )
 
 
-T_Store = TypeVar("T_Store", bound="_UpstreamObjectStore")
+class ObspecInput(
+    DeleteAsync,
+    GetAsync,
+    GetRangeAsync,
+    GetRangesAsync,
+    HeadAsync,
+    ListAsync,
+    ListWithDelimiterAsync,
+    PutAsync,
+    Protocol,
+):
+    pass
+
+
+T_Store = TypeVar("T_Store", bound=ObspecInput)
 
 
 class ObjectStore(Store, Generic[T_Store]):
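`ObspecInput` is an intersection protocol: it names every obspec capability the adapter calls, so any object that structurally implements all of them satisfies the `T_Store` bound, with no obstore inheritance required. A sketch of what this permits (assuming obstore's `MemoryStore`, which implements these methods as of obstore 0.7):

```python
from obstore.store import MemoryStore

from zarr.storage import ObjectStore

# MemoryStore type-checks as T_Store because it structurally provides every
# method named in ObspecInput; no obstore-specific base class is involved.
store = ObjectStore(MemoryStore())
```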
@@ -98,42 +125,44 @@ async def get(
         self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None
     ) -> Buffer | None:
         # docstring inherited
-        import obstore as obs
-
         try:
             if byte_range is None:
-                resp = await obs.get_async(self.store, key)
-                return prototype.buffer.from_bytes(await resp.bytes_async())  # type: ignore[arg-type]
+                resp = await self.store.get_async(key)
+                return prototype.buffer.from_bytes(await resp.buffer_async())  # type: ignore[arg-type]
             elif isinstance(byte_range, RangeByteRequest):
-                bytes = await obs.get_range_async(
-                    self.store, key, start=byte_range.start, end=byte_range.end
+                bytes = await self.store.get_range_async(
+                    key, start=byte_range.start, end=byte_range.end
                 )
                 return prototype.buffer.from_bytes(bytes)  # type: ignore[arg-type]
             elif isinstance(byte_range, OffsetByteRequest):
-                resp = await obs.get_async(
-                    self.store, key, options={"range": {"offset": byte_range.offset}}
+                resp = await self.store.get_async(
+                    key, options={"range": {"offset": byte_range.offset}}
                 )
-                return prototype.buffer.from_bytes(await resp.bytes_async())  # type: ignore[arg-type]
+                return prototype.buffer.from_bytes(await resp.buffer_async())  # type: ignore[arg-type]
             elif isinstance(byte_range, SuffixByteRequest):
                 # some object stores (Azure) don't support suffix requests. In this
                 # case, our workaround is to first get the length of the object and then
                 # manually request the byte range at the end.
                 try:
-                    resp = await obs.get_async(
-                        self.store, key, options={"range": {"suffix": byte_range.suffix}}
+                    resp = await self.store.get_async(
+                        key, options={"range": {"suffix": byte_range.suffix}}
                     )
-                    return prototype.buffer.from_bytes(await resp.bytes_async())  # type: ignore[arg-type]
-                except obs.exceptions.NotSupportedError:
-                    head_resp = await obs.head_async(self.store, key)
-                    file_size = head_resp["size"]
-                    suffix_len = byte_range.suffix
-                    buffer = await obs.get_range_async(
-                        self.store,
-                        key,
-                        start=file_size - suffix_len,
-                        length=suffix_len,
-                    )
-                    return prototype.buffer.from_bytes(buffer)  # type: ignore[arg-type]
+                    return prototype.buffer.from_bytes(await resp.buffer_async())  # type: ignore[arg-type]
+                except Exception as e:
+                    if isinstance(map_exception(e), NotSupportedError):
+                        head_resp = await self.store.head_async(key)
+                        file_size = head_resp["size"]
+                        suffix_len = byte_range.suffix
+                        buffer = await self.store.get_range_async(
+                            key,
+                            start=file_size - suffix_len,
+                            length=suffix_len,
+                        )
+                        return prototype.buffer.from_bytes(buffer)  # type: ignore[arg-type]
+                    else:
+                        raise e from None
+
             else:
                 raise ValueError(f"Unexpected byte_range, got {byte_range}")
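For reference, the byte-range variants this method dispatches on are zarr's own `ByteRequest` dataclasses. A hypothetical usage sketch (the chunk key `"c/0/0"` is illustrative):

```python
from zarr.abc.store import OffsetByteRequest, RangeByteRequest, SuffixByteRequest
from zarr.core.buffer import default_buffer_prototype
from zarr.storage import ObjectStore


async def read_slices(store: ObjectStore) -> None:
    proto = default_buffer_prototype()
    head = await store.get("c/0/0", proto, RangeByteRequest(start=0, end=1024))  # leading bytes
    rest = await store.get("c/0/0", proto, OffsetByteRequest(offset=1024))  # from offset onward
    tail = await store.get("c/0/0", proto, SuffixByteRequest(suffix=16))  # final 16 bytes
```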
except _ALLOWED_EXCEPTIONS:
Expand All @@ -149,10 +178,9 @@ async def get_partial_values(

async def exists(self, key: str) -> bool:
# docstring inherited
import obstore as obs

try:
await obs.head_async(self.store, key)
await self.store.head_async(key)
except FileNotFoundError:
return False
else:
Expand All @@ -165,21 +193,26 @@ def supports_writes(self) -> bool:

async def set(self, key: str, value: Buffer) -> None:
# docstring inherited
import obstore as obs

self._check_writable()

buf = value.as_buffer_like()
await obs.put_async(self.store, key, buf)
await self.store.put_async(key, buf)

async def set_if_not_exists(self, key: str, value: Buffer) -> None:
# docstring inherited
import obstore as obs

self._check_writable()
buf = value.as_buffer_like()
with contextlib.suppress(obs.exceptions.AlreadyExistsError):
await obs.put_async(self.store, key, buf, mode="create")
try:
await self.store.put_async(key, buf, mode="create")
# Suppress an AlreadyExistsError
except Exception as e:
mapped_exc = map_exception(e)
if isinstance(mapped_exc, AlreadyExistsError):
pass
else:
raise mapped_exc from None

@property
def supports_deletes(self) -> bool:
@@ -188,26 +221,32 @@ def supports_deletes(self) -> bool:
 
     async def delete(self, key: str) -> None:
         # docstring inherited
-        import obstore as obs
-
         self._check_writable()
 
         # Some obstore stores such as local filesystems, GCP and Azure raise an error
         # when deleting a non-existent key, while others such as S3 and in-memory do
        # not. We suppress the error to make the behavior consistent across all obstore
         # stores. This is also in line with the behavior of the other Zarr store adapters.
-        with contextlib.suppress(FileNotFoundError):
-            await obs.delete_async(self.store, key)
+        try:
+            await self.store.delete_async(key)
+        except Exception as e:
+            mapped_exc = map_exception(e)
+            # The obspec NotFoundError subclasses the built-in FileNotFoundError:
+            # https://developmentseed.org/obspec/latest/api/exceptions/#obspec.exceptions.NotFoundError
+            if isinstance(mapped_exc, FileNotFoundError):
+                pass
+            else:
+                raise mapped_exc from None
 
     async def delete_dir(self, prefix: str) -> None:
         # docstring inherited
-        import obstore as obs
-
         self._check_writable()
         if prefix != "" and not prefix.endswith("/"):
             prefix += "/"
 
-        metas = await obs.list(self.store, prefix).collect_async()
+        metas = [obj async for batch in self.store.list_async(prefix) for obj in batch]
         keys = [(m["path"],) for m in metas]
         await concurrent_map(keys, self.delete, limit=config.get("async.concurrency"))
 
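The same catch, map, filter shape now appears in `set_if_not_exists`, `delete`, and `_make_suffix_request` below. If it keeps spreading, it could be factored into a helper along these lines (a hypothetical sketch, not part of this PR; it uses only `map_exception`, which the patch already imports):

```python
from collections.abc import Awaitable, Callable

from obspec.exceptions import map_exception


async def _ignoring(exc_type: type[BaseException], op: Callable[[], Awaitable[object]]) -> None:
    """Run ``op()``, suppressing errors that map_exception() resolves to ``exc_type``."""
    try:
        await op()
    except Exception as e:
        mapped = map_exception(e)
        if not isinstance(mapped, exc_type):
            raise mapped from None


# Usage: await _ignoring(FileNotFoundError, lambda: store.delete_async(key))
```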
@@ -217,9 +256,7 @@ def supports_listing(self) -> bool:
         return True
 
     async def _list(self, prefix: str | None = None) -> AsyncGenerator[ObjectMeta, None]:
-        import obstore as obs
-
-        objects: ListStream[Sequence[ObjectMeta]] = obs.list(self.store, prefix=prefix)
+        objects = self.store.list_async(prefix=prefix)
         async for batch in objects:
             for item in batch:
                 yield item
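Note that `list_async` yields *batches* (sequences of `ObjectMeta` dicts) rather than individual objects; both `_list` above and the comprehension in `delete_dir` flatten them. The same consumption pattern, as a small hypothetical helper:

```python
async def count_objects(store: ObspecInput, prefix: str | None = None) -> int:
    # Each iteration yields a Sequence[ObjectMeta]; len(batch) counts objects.
    total = 0
    async for batch in store.list_async(prefix=prefix):
        total += len(batch)
    return total
```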
@@ -234,16 +271,14 @@ def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
 
     def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
         # docstring inherited
-        import obstore as obs
-
-        coroutine = obs.list_with_delimiter_async(self.store, prefix=prefix)
+        coroutine = self.store.list_with_delimiter_async(prefix=prefix)
         return _transform_list_dir(coroutine, prefix)
 
     async def getsize(self, key: str) -> int:
         # docstring inherited
-        import obstore as obs
-
-        resp = await obs.head_async(self.store, key)
+        resp = await self.store.head_async(key)
         return resp["size"]
 
     async def getsize_prefix(self, prefix: str) -> int:
@@ -333,7 +368,7 @@ class _Response(TypedDict):
 
 
 async def _make_bounded_requests(
-    store: _UpstreamObjectStore,
+    store: ObspecInput,
     path: str,
     requests: list[_BoundedRequest],
     prototype: BufferPrototype,
@@ -345,12 +380,11 @@ async def _make_bounded_requests(
     within a single file, and will e.g. merge concurrent requests. This only uses one
     single Python coroutine.
     """
-    import obstore as obs
-
     starts = [r["start"] for r in requests]
     ends = [r["end"] for r in requests]
     async with semaphore:
-        responses = await obs.get_ranges_async(store, path=path, starts=starts, ends=ends)
+        responses = await store.get_ranges_async(path=path, starts=starts, ends=ends)
 
     buffer_responses: list[_Response] = []
     for request, response in zip(requests, responses, strict=True):
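`get_ranges_async` takes parallel `starts`/`ends` lists against a single path, which is what lets the backend merge nearby requests. The call shape, as a hedged sketch (the key is hypothetical):

```python
async def fetch_three_ranges(store: ObspecInput) -> None:
    # Three byte ranges of one object in a single call; results come back in
    # request order, which the strict zip() above depends on.
    buffers = await store.get_ranges_async(
        path="c/0/0",
        starts=[0, 4096, 10_000],
        ends=[1024, 8192, 10_100],
    )
    for buf in buffers:
        print(len(memoryview(buf)))
```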
@@ -365,7 +399,7 @@
 
 
 async def _make_other_request(
-    store: _UpstreamObjectStore,
+    store: ObspecInput,
     request: _OtherRequest,
     prototype: BufferPrototype,
     semaphore: asyncio.Semaphore,
@@ -375,14 +409,13 @@
     We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all
     futures can be gathered together.
     """
-    import obstore as obs
-
     async with semaphore:
         if request["range"] is None:
-            resp = await obs.get_async(store, request["path"])
+            resp = await store.get_async(request["path"])
         else:
-            resp = await obs.get_async(store, request["path"], options={"range": request["range"]})
-        buffer = await resp.bytes_async()
+            resp = await store.get_async(request["path"], options={"range": request["range"]})
+        buffer = await resp.buffer_async()
 
     return [
         {
@@ -393,7 +426,7 @@
 
 
 async def _make_suffix_request(
-    store: _UpstreamObjectStore,
+    store: ObspecInput,
     request: _SuffixRequest,
     prototype: BufferPrototype,
     semaphore: asyncio.Semaphore,
@@ -407,18 +440,19 @@
     We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all
     futures can be gathered together.
     """
-    import obstore as obs
-
     async with semaphore:
         try:
-            resp = await obs.get_async(store, request["path"], options={"range": request["range"]})
-            buffer = await resp.bytes_async()
-        except obs.exceptions.NotSupportedError:
-            head_resp = await obs.head_async(store, request["path"])
+            resp = await store.get_async(request["path"], options={"range": request["range"]})
+            buffer = await resp.buffer_async()
+        except Exception as e:
+            mapped_exc = map_exception(e)
+            if not isinstance(mapped_exc, NotSupportedError):
+                raise mapped_exc from None
+
+            head_resp = await store.head_async(request["path"])
             file_size = head_resp["size"]
             suffix_len = request["range"]["suffix"]
-            buffer = await obs.get_range_async(
-                store,
+            buffer = await store.get_range_async(
                 request["path"],
                 start=file_size - suffix_len,
                 length=suffix_len,
@@ -433,7 +467,7 @@
 
 
 async def _get_partial_values(
-    store: _UpstreamObjectStore,
+    store: ObspecInput,
    prototype: BufferPrototype,
     key_ranges: Iterable[tuple[str, ByteRequest | None]],
 ) -> list[Buffer | None]: