From 2affa68361337e18c7836c71f59b2ae72ce0e9d4 Mon Sep 17 00:00:00 2001 From: Adam Newgas Date: Tue, 10 Feb 2026 14:05:50 +0000 Subject: [PATCH 1/3] Support with_read_only in LoggingStore and LatencyStore. Fixes #3699 --- src/zarr/experimental/cache_store.py | 7 +++- src/zarr/storage/_logging.py | 3 ++ src/zarr/storage/_wrapper.py | 13 ++++++- src/zarr/testing/store.py | 9 +++-- tests/test_store/test_latency.py | 57 ++++++++++++++++++++++++++++ tests/test_store/test_logging.py | 40 +++++++++++++++++++ 6 files changed, 123 insertions(+), 6 deletions(-) create mode 100644 tests/test_store/test_latency.py diff --git a/src/zarr/experimental/cache_store.py b/src/zarr/experimental/cache_store.py index 3456c94320..f1980e1444 100644 --- a/src/zarr/experimental/cache_store.py +++ b/src/zarr/experimental/cache_store.py @@ -4,7 +4,7 @@ import logging import time from collections import OrderedDict -from typing import TYPE_CHECKING, Any, Literal +from typing import TYPE_CHECKING, Any, Literal, Self from zarr.abc.store import ByteRequest, Store from zarr.storage._wrapper import WrapperStore @@ -120,6 +120,11 @@ def __init__( self._misses = 0 self._evictions = 0 + def _with_store(self, store: Store) -> Self: + # Cannot support this operation because it would share a cache, but have a new store + # So cache keys would conflict + raise NotImplementedError("CacheStore does not support this operation.") + def _is_key_fresh(self, key: str) -> bool: """Check if a cached key is still fresh based on max_age_seconds. diff --git a/src/zarr/storage/_logging.py b/src/zarr/storage/_logging.py index dd20d49ae5..98dca6b23d 100644 --- a/src/zarr/storage/_logging.py +++ b/src/zarr/storage/_logging.py @@ -77,6 +77,9 @@ def _default_handler(self) -> logging.Handler: ) return handler + def _with_store(self, store: T_Store) -> Self: + return type(self)(store=store, log_level=self.log_level, log_handler=self.log_handler) + @contextmanager def log(self, hint: Any = "") -> Generator[None, None, None]: """Context manager to log method calls diff --git a/src/zarr/storage/_wrapper.py b/src/zarr/storage/_wrapper.py index 64a5b2d83c..e8a2859abc 100644 --- a/src/zarr/storage/_wrapper.py +++ b/src/zarr/storage/_wrapper.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Generic, TypeVar +from typing import TYPE_CHECKING, Generic, TypeVar, cast if TYPE_CHECKING: from collections.abc import AsyncGenerator, AsyncIterator, Iterable @@ -31,14 +31,23 @@ class WrapperStore(Store, Generic[T_Store]): def __init__(self, store: T_Store) -> None: self._store = store + def _with_store(self, store: T_Store) -> Self: + """ + Constructs a new instance of the wrapper store with the same details but a new store. 
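+
+        Subclasses that carry extra configuration (for example ``LoggingStore``'s
+        log level and handler, or ``LatencyStore``'s latencies) should override
+        this method so that ``with_read_only`` and ``__enter__`` return a wrapper
+        that preserves that configuration.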
+ """ + return type(self)(store=store) + @classmethod async def open(cls: type[Self], store_cls: type[T_Store], *args: Any, **kwargs: Any) -> Self: store = store_cls(*args, **kwargs) await store._open() return cls(store=store) + def with_read_only(self, read_only: bool = False) -> Self: + return self._with_store(cast(T_Store, self._store.with_read_only(read_only))) + def __enter__(self) -> Self: - return type(self)(self._store.__enter__()) + return self._with_store(self._store.__enter__()) def __exit__( self, diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py index 5daf8284eb..aac730e8cf 100644 --- a/src/zarr/testing/store.py +++ b/src/zarr/testing/store.py @@ -4,7 +4,7 @@ import json import pickle from abc import abstractmethod -from typing import TYPE_CHECKING, Generic, TypeVar +from typing import TYPE_CHECKING, Generic, Self, TypeVar from zarr.storage import WrapperStore @@ -578,10 +578,13 @@ class LatencyStore(WrapperStore[Store]): get_latency: float set_latency: float - def __init__(self, cls: Store, *, get_latency: float = 0, set_latency: float = 0) -> None: + def __init__(self, store: Store, *, get_latency: float = 0, set_latency: float = 0) -> None: self.get_latency = float(get_latency) self.set_latency = float(set_latency) - self._store = cls + self._store = store + + def _with_store(self, store: Store) -> Self: + return type(self)(store, get_latency=self.get_latency, set_latency=self.set_latency) async def set(self, key: str, value: Buffer) -> None: """ diff --git a/tests/test_store/test_latency.py b/tests/test_store/test_latency.py new file mode 100644 index 0000000000..38ffb17dd6 --- /dev/null +++ b/tests/test_store/test_latency.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +import pytest + +from zarr.core.buffer import default_buffer_prototype +from zarr.storage import MemoryStore +from zarr.testing.store import LatencyStore + + +async def test_latency_store_with_read_only_round_trip() -> None: + """ + Ensure that LatencyStore.with_read_only returns another LatencyStore with + the requested read_only state, preserves latency configuration, and does + not change the original wrapper. 
+ """ + base = await MemoryStore.open() + # Start from a read-only underlying store + ro_base = base.with_read_only(read_only=True) + latency_ro = LatencyStore(ro_base, get_latency=0.01, set_latency=0.02) + + assert latency_ro.read_only + assert latency_ro.get_latency == pytest.approx(0.01) + assert latency_ro.set_latency == pytest.approx(0.02) + + buf = default_buffer_prototype().buffer.from_bytes(b"abcd") + + # Cannot write through the read-only wrapper + with pytest.raises( + ValueError, match="store was opened in read-only mode and does not support writing" + ): + await latency_ro.set("key", buf) + + # Create a writable wrapper from the read-only one + writer = latency_ro.with_read_only(read_only=False) + assert isinstance(writer, LatencyStore) + assert not writer.read_only + # Latency configuration is preserved + assert writer.get_latency == latency_ro.get_latency + assert writer.set_latency == latency_ro.set_latency + + # Writes via the writable wrapper succeed + await writer.set("key", buf) + out = await writer.get("key", prototype=default_buffer_prototype()) + assert out is not None + assert out.to_bytes() == buf.to_bytes() + + # Creating a read-only copy from the writable wrapper works and is enforced + reader = writer.with_read_only(read_only=True) + assert isinstance(reader, LatencyStore) + assert reader.read_only + with pytest.raises( + ValueError, match="store was opened in read-only mode and does not support writing" + ): + await reader.set("other", buf) + + # The original read-only wrapper remains read-only + assert latency_ro.read_only diff --git a/tests/test_store/test_logging.py b/tests/test_store/test_logging.py index fa566e45aa..96cd184938 100644 --- a/tests/test_store/test_logging.py +++ b/tests/test_store/test_logging.py @@ -86,6 +86,46 @@ def test_is_open_setter_raises(self, store: LoggingStore[LocalStore]) -> None: ): store._is_open = True + async def test_with_read_only_round_trip(self, local_store: LocalStore) -> None: + """ + Ensure that LoggingStore.with_read_only returns another LoggingStore with + the requested read_only state, preserves logging configuration, and does + not change the original store. 
+ """ + # Start from a read-only underlying store + ro_store = local_store.with_read_only(read_only=True) + wrapped_ro = LoggingStore(store=ro_store, log_level="INFO") + assert wrapped_ro.read_only + + buf = default_buffer_prototype().buffer.from_bytes(b"0123") + + # Cannot write through the read-only wrapper + with pytest.raises( + ValueError, match="store was opened in read-only mode and does not support writing" + ): + await wrapped_ro.set("foo", buf) + + # Create a writable wrapper + writer = wrapped_ro.with_read_only(read_only=False) + assert isinstance(writer, LoggingStore) + assert not writer.read_only + # logging configuration is preserved + assert writer.log_level == wrapped_ro.log_level + assert writer.log_handler == wrapped_ro.log_handler + + # Writes via the writable wrapper succeed + await writer.set("foo", buf) + out = await writer.get("foo", prototype=default_buffer_prototype()) + assert out is not None + assert out.to_bytes() == buf.to_bytes() + + # The original wrapper remains read-only + assert wrapped_ro.read_only + with pytest.raises( + ValueError, match="store was opened in read-only mode and does not support writing" + ): + await wrapped_ro.set("bar", buf) + @pytest.mark.parametrize("store", ["local", "memory", "zip"], indirect=["store"]) async def test_logging_store(store: Store, caplog: pytest.LogCaptureFixture) -> None: From 9d08e8a1f84f632dc01faf8013b3a290ab4b0163 Mon Sep 17 00:00:00 2001 From: Adam Newgas Date: Tue, 10 Feb 2026 14:34:01 +0000 Subject: [PATCH 2/3] Support CacheStore.with_read_only --- src/zarr/experimental/cache_store.py | 125 +++++++++++--------- tests/test_experimental/test_cache_store.py | 76 ++++++++++-- 2 files changed, 138 insertions(+), 63 deletions(-) diff --git a/src/zarr/experimental/cache_store.py b/src/zarr/experimental/cache_store.py index f1980e1444..956329e33e 100644 --- a/src/zarr/experimental/cache_store.py +++ b/src/zarr/experimental/cache_store.py @@ -15,6 +15,16 @@ from zarr.core.buffer.core import Buffer, BufferPrototype +class _CacheState: + _cache_order: OrderedDict[str, None] # Track access order for LRU + _current_size: int # Track current cache size + _key_sizes: dict[str, int] # Track size of each cached key + _lock: asyncio.Lock + _hits: int # Cache hit counter + _misses: int # Cache miss counter + _evictions: int # Cache eviction counter + + class CacheStore(WrapperStore[Store]): """ A dual-store caching implementation for Zarr stores. 
@@ -71,13 +81,7 @@ class CacheStore(WrapperStore[Store]): max_size: int | None key_insert_times: dict[str, float] cache_set_data: bool - _cache_order: OrderedDict[str, None] # Track access order for LRU - _current_size: int # Track current cache size - _key_sizes: dict[str, int] # Track size of each cached key - _lock: asyncio.Lock - _hits: int # Cache hit counter - _misses: int # Cache miss counter - _evictions: int # Cache eviction counter + _state: _CacheState def __init__( self, @@ -107,24 +111,39 @@ def __init__( else: self.max_age_seconds = max_age_seconds self.max_size = max_size + self.cache_set_data = cache_set_data + self._state = _CacheState() + if key_insert_times is None: self.key_insert_times = {} else: self.key_insert_times = key_insert_times - self.cache_set_data = cache_set_data - self._cache_order = OrderedDict() - self._current_size = 0 - self._key_sizes = {} - self._lock = asyncio.Lock() - self._hits = 0 - self._misses = 0 - self._evictions = 0 + self._state._cache_order = OrderedDict() + self._state._current_size = 0 + self._state._key_sizes = {} + self._state._lock = asyncio.Lock() + self._state._hits = 0 + self._state._misses = 0 + self._state._evictions = 0 def _with_store(self, store: Store) -> Self: # Cannot support this operation because it would share a cache, but have a new store # So cache keys would conflict raise NotImplementedError("CacheStore does not support this operation.") + def with_read_only(self, read_only: bool = False) -> Self: + # Create a new cache store that shares the same cache and mutable state + store = type(self)( + store=self._store.with_read_only(read_only), + cache_store=self._cache, + max_age_seconds=self.max_age_seconds, + max_size=self.max_size, + key_insert_times=self.key_insert_times, + cache_set_data=self.cache_set_data, + ) + store._state = self._state + return store + def _is_key_fresh(self, key: str) -> bool: """Check if a cached key is still fresh based on max_age_seconds. @@ -145,9 +164,9 @@ async def _accommodate_value(self, value_size: int) -> None: return # Remove least recently used items until we have enough space - while self._current_size + value_size > self.max_size and self._cache_order: + while self._state._current_size + value_size > self.max_size and self._state._cache_order: # Get the least recently used key (first in OrderedDict) - lru_key = next(iter(self._cache_order)) + lru_key = next(iter(self._state._cache_order)) await self._evict_key(lru_key) async def _evict_key(self, key: str) -> None: @@ -157,15 +176,15 @@ async def _evict_key(self, key: str) -> None: Updates size tracking atomically with deletion. 
""" try: - key_size = self._key_sizes.get(key, 0) + key_size = self._state._key_sizes.get(key, 0) # Delete from cache store await self._cache.delete(key) # Update tracking after successful deletion self._remove_from_tracking(key) - self._current_size = max(0, self._current_size - key_size) - self._evictions += 1 + self._state._current_size = max(0, self._state._current_size - key_size) + self._state._evictions += 1 logger.debug("_evict_key: evicted key %s, freed %d bytes", key, key_size) except Exception: @@ -188,39 +207,39 @@ async def _cache_value(self, key: str, value: Buffer) -> None: ) return - async with self._lock: + async with self._state._lock: # If key already exists, subtract old size first - if key in self._key_sizes: - old_size = self._key_sizes[key] - self._current_size -= old_size + if key in self._state._key_sizes: + old_size = self._state._key_sizes[key] + self._state._current_size -= old_size logger.debug("_cache_value: updating existing key %s, old size %d", key, old_size) # Make room for the new value (this calls _evict_key_locked internally) await self._accommodate_value(value_size) # Update tracking atomically - self._cache_order[key] = None # OrderedDict to track access order - self._current_size += value_size - self._key_sizes[key] = value_size + self._state._cache_order[key] = None # OrderedDict to track access order + self._state._current_size += value_size + self._state._key_sizes[key] = value_size self.key_insert_times[key] = time.monotonic() logger.debug("_cache_value: cached key %s with size %d bytes", key, value_size) async def _update_access_order(self, key: str) -> None: """Update the access order for LRU tracking.""" - if key in self._cache_order: - async with self._lock: + if key in self._state._cache_order: + async with self._state._lock: # Move to end (most recently used) - self._cache_order.move_to_end(key) + self._state._cache_order.move_to_end(key) def _remove_from_tracking(self, key: str) -> None: """Remove a key from all tracking structures. - Must be called while holding self._lock. + Must be called while holding self._state._lock. 
""" - self._cache_order.pop(key, None) + self._state._cache_order.pop(key, None) self.key_insert_times.pop(key, None) - self._key_sizes.pop(key, None) + self._state._key_sizes.pop(key, None) async def _get_try_cache( self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None @@ -229,7 +248,7 @@ async def _get_try_cache( maybe_cached_result = await self._cache.get(key, prototype, byte_range) if maybe_cached_result is not None: logger.debug("_get_try_cache: key %s found in cache (HIT)", key) - self._hits += 1 + self._state._hits += 1 # Update access order for LRU await self._update_access_order(key) return maybe_cached_result @@ -237,12 +256,12 @@ async def _get_try_cache( logger.debug( "_get_try_cache: key %s not found in cache (MISS), fetching from store", key ) - self._misses += 1 + self._state._misses += 1 maybe_fresh_result = await super().get(key, prototype, byte_range) if maybe_fresh_result is None: # Key doesn't exist in source store await self._cache.delete(key) - async with self._lock: + async with self._state._lock: self._remove_from_tracking(key) else: # Cache the newly fetched value @@ -254,12 +273,12 @@ async def _get_no_cache( self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None ) -> Buffer | None: """Get data directly from source store and update cache.""" - self._misses += 1 + self._state._misses += 1 maybe_fresh_result = await super().get(key, prototype, byte_range) if maybe_fresh_result is None: # Key doesn't exist in source, remove from cache and tracking await self._cache.delete(key) - async with self._lock: + async with self._state._lock: self._remove_from_tracking(key) else: logger.debug("_get_no_cache: key %s found in store, setting in cache", key) @@ -317,7 +336,7 @@ async def set(self, key: str, value: Buffer) -> None: else: logger.debug("set: deleting key %s from cache", key) await self._cache.delete(key) - async with self._lock: + async with self._state._lock: self._remove_from_tracking(key) async def delete(self, key: str) -> None: @@ -333,7 +352,7 @@ async def delete(self, key: str) -> None: await super().delete(key) logger.debug("delete: deleting key %s from cache", key) await self._cache.delete(key) - async with self._lock: + async with self._state._lock: self._remove_from_tracking(key) def cache_info(self) -> dict[str, Any]: @@ -344,20 +363,20 @@ def cache_info(self) -> dict[str, Any]: if self.max_age_seconds == "infinity" else self.max_age_seconds, "max_size": self.max_size, - "current_size": self._current_size, + "current_size": self._state._current_size, "cache_set_data": self.cache_set_data, "tracked_keys": len(self.key_insert_times), - "cached_keys": len(self._cache_order), + "cached_keys": len(self._state._cache_order), } def cache_stats(self) -> dict[str, Any]: """Return cache performance statistics.""" - total_requests = self._hits + self._misses - hit_rate = self._hits / total_requests if total_requests > 0 else 0.0 + total_requests = self._state._hits + self._state._misses + hit_rate = self._state._hits / total_requests if total_requests > 0 else 0.0 return { - "hits": self._hits, - "misses": self._misses, - "evictions": self._evictions, + "hits": self._state._hits, + "misses": self._state._misses, + "evictions": self._state._evictions, "total_requests": total_requests, "hit_rate": hit_rate, } @@ -369,11 +388,11 @@ async def clear_cache(self) -> None: await self._cache.clear() # Reset tracking - async with self._lock: + async with self._state._lock: self.key_insert_times.clear() - 
self._cache_order.clear() - self._key_sizes.clear() - self._current_size = 0 + self._state._cache_order.clear() + self._state._key_sizes.clear() + self._state._current_size = 0 logger.debug("clear_cache: cleared all cache data") def __repr__(self) -> str: @@ -384,6 +403,6 @@ def __repr__(self) -> str: f"cache_store={self._cache!r}, " f"max_age_seconds={self.max_age_seconds}, " f"max_size={self.max_size}, " - f"current_size={self._current_size}, " - f"cached_keys={len(self._cache_order)})" + f"current_size={self._state._current_size}, " + f"cached_keys={len(self._state._cache_order)})" ) diff --git a/tests/test_experimental/test_cache_store.py b/tests/test_experimental/test_cache_store.py index d4a45f78f1..1f2a45c816 100644 --- a/tests/test_experimental/test_cache_store.py +++ b/tests/test_experimental/test_cache_store.py @@ -32,6 +32,60 @@ def cached_store(self, source_store: Store, cache_store: Store) -> CacheStore: """Create a cached store instance.""" return CacheStore(source_store, cache_store=cache_store, key_insert_times={}) + async def test_with_read_only_round_trip(self) -> None: + """ + Ensure that CacheStore.with_read_only returns another CacheStore with + the requested read_only state, shares cache state, and does not change + the original store's read_only flag. + """ + source = MemoryStore() + cache = MemoryStore() + + # Start from a read-only underlying store + source_ro = source.with_read_only(read_only=True) + cached_ro = CacheStore(store=source_ro, cache_store=cache, key_insert_times={}) + assert cached_ro.read_only + + buf = CPUBuffer.from_bytes(b"0123") + + # Cannot write through the read-only cache store + with pytest.raises( + ValueError, match="store was opened in read-only mode and does not support writing" + ): + await cached_ro.set("foo", buf) + + # Create a writable cache store from the read-only one + writer = cached_ro.with_read_only(read_only=False) + assert isinstance(writer, CacheStore) + assert not writer.read_only + + # Cache configuration and state are shared + assert writer._cache is cached_ro._cache + assert writer._state is cached_ro._state + assert writer.key_insert_times is cached_ro.key_insert_times + + # Writes via the writable cache store succeed and are cached + await writer.set("foo", buf) + out = await writer.get("foo", default_buffer_prototype()) + assert out is not None + assert out.to_bytes() == buf.to_bytes() + + # The original cache store remains read-only + assert cached_ro.read_only + with pytest.raises( + ValueError, match="store was opened in read-only mode and does not support writing" + ): + await cached_ro.set("bar", buf) + + # Creating a read-only copy from the writable cache store works and is enforced + reader = writer.with_read_only(read_only=True) + assert isinstance(reader, CacheStore) + assert reader.read_only + with pytest.raises( + ValueError, match="store was opened in read-only mode and does not support writing" + ): + await reader.set("baz", buf) + async def test_basic_caching(self, cached_store: CacheStore, source_store: Store) -> None: """Test basic cache functionality.""" # Store some data @@ -519,7 +573,7 @@ async def test_evict_key_exception_handling(self) -> None: # Manually corrupt the tracking to trigger exception # Remove from one structure but not others to create inconsistency - del cached_store._cache_order["test_key"] + del cached_store._state._cache_order["test_key"] # Try to evict - should handle the KeyError gracefully await cached_store._evict_key("test_key") @@ -540,7 +594,7 @@ async def 
test_get_no_cache_delete_tracking(self) -> None: await cached_store._cache_value("phantom_key", test_data) # Verify it's in tracking - assert "phantom_key" in cached_store._cache_order + assert "phantom_key" in cached_store._state._cache_order assert "phantom_key" in cached_store.key_insert_times # Now try to get it - since it's not in source, should clean up tracking @@ -548,7 +602,7 @@ async def test_get_no_cache_delete_tracking(self) -> None: assert result is None # Should have cleaned up tracking - assert "phantom_key" not in cached_store._cache_order + assert "phantom_key" not in cached_store._state._cache_order assert "phantom_key" not in cached_store.key_insert_times async def test_accommodate_value_no_max_size(self) -> None: @@ -609,7 +663,9 @@ async def set_large(key: str) -> None: # Size should be consistent with tracked keys assert info["current_size"] <= 200 # Might pass # But verify actual cache store size matches tracking - total_size = sum(cached_store._key_sizes.get(k, 0) for k in cached_store._cache_order) + total_size = sum( + cached_store._state._key_sizes.get(k, 0) for k in cached_store._state._cache_order + ) assert total_size == info["current_size"] # WOULD FAIL async def test_concurrent_get_and_evict(self) -> None: @@ -638,7 +694,7 @@ async def write_key() -> None: # Verify consistency info = cached_store.cache_info() assert info["current_size"] <= 100 - assert len(cached_store._cache_order) == len(cached_store._key_sizes) + assert len(cached_store._state._cache_order) == len(cached_store._state._key_sizes) async def test_eviction_actually_deletes_from_cache_store(self) -> None: """Test that eviction removes keys from cache_store, not just tracking.""" @@ -659,8 +715,8 @@ async def test_eviction_actually_deletes_from_cache_store(self) -> None: await cached_store.set("key2", data2) # Check tracking - key1 should be removed - assert "key1" not in cached_store._cache_order - assert "key1" not in cached_store._key_sizes + assert "key1" not in cached_store._state._cache_order + assert "key1" not in cached_store._state._key_sizes # CRITICAL: key1 should also be removed from cache_store assert not await cache_store.exists("key1"), ( @@ -733,13 +789,13 @@ async def test_all_tracked_keys_exist_in_cache_store(self) -> None: await cached_store.set(f"key_{i}", data) # Every key in tracking should exist in cache_store - for key in cached_store._cache_order: + for key in cached_store._state._cache_order: assert await cache_store.exists(key), ( f"Key '{key}' is tracked but doesn't exist in cache_store" ) # Every key in _key_sizes should exist in cache_store - for key in cached_store._key_sizes: + for key in cached_store._state._key_sizes: assert await cache_store.exists(key), ( f"Key '{key}' has size tracked but doesn't exist in cache_store" ) @@ -778,7 +834,7 @@ async def failing_delete(key: str) -> None: # Attempt to evict should raise the exception with pytest.raises(RuntimeError, match="Simulated cache deletion failure"): - async with cached_store._lock: + async with cached_store._state._lock: await cached_store._evict_key("test_key") async def test_cache_stats_method(self) -> None: From d62cf8ad18557f48a97fca2db798327e1da5f3a7 Mon Sep 17 00:00:00 2001 From: Adam Newgas Date: Tue, 10 Feb 2026 15:10:58 +0000 Subject: [PATCH 3/3] Add entry to changes/ --- changes/3700.bugfix.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/3700.bugfix.md diff --git a/changes/3700.bugfix.md b/changes/3700.bugfix.md new file mode 100644 index 0000000000..86acb71d0e --- /dev/null 
+++ b/changes/3700.bugfix.md
@@ -0,0 +1 @@
+`CacheStore`, `LoggingStore`, and `LatencyStore` now support `with_read_only()`.
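
Reviewer note (not part of the patches): a minimal usage sketch of the behavior
this series enables, using only APIs exercised in the tests above. It assumes an
in-memory store; the key name and payload are illustrative only.

    import asyncio

    from zarr.core.buffer import default_buffer_prototype
    from zarr.storage import LoggingStore, MemoryStore


    async def main() -> None:
        base = await MemoryStore.open()
        logged = LoggingStore(store=base.with_read_only(read_only=True), log_level="INFO")

        # with_read_only now round-trips through the wrapper type (issue #3699).
        writer = logged.with_read_only(read_only=False)
        assert isinstance(writer, LoggingStore)
        assert not writer.read_only

        # Writes succeed through the writable view.
        data = default_buffer_prototype().buffer.from_bytes(b"abcd")
        await writer.set("key", data)

        # The original wrapper is unchanged and still read-only.
        assert logged.read_only


    asyncio.run(main())

CacheStore behaves the same way, except that the returned view shares the
original's cache and bookkeeping state rather than getting a fresh cache.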