diff --git a/kv_cache_benchmark/MLperf v3 KV cache proposal.md b/kv_cache_benchmark/docs/MLperf v3 KV cache proposal.md similarity index 100% rename from kv_cache_benchmark/MLperf v3 KV cache proposal.md rename to kv_cache_benchmark/docs/MLperf v3 KV cache proposal.md diff --git a/kv_cache_benchmark/sources.md b/kv_cache_benchmark/docs/sources.md similarity index 100% rename from kv_cache_benchmark/sources.md rename to kv_cache_benchmark/docs/sources.md diff --git a/kv_cache_benchmark/kv_cache/backends.py b/kv_cache_benchmark/kv_cache/backends.py index 06f660cf..cd133e59 100755 --- a/kv_cache_benchmark/kv_cache/backends.py +++ b/kv_cache_benchmark/kv_cache/backends.py @@ -316,10 +316,8 @@ def read(self, key: str) -> Tuple[np.ndarray, StorageBackend.IOTiming]: def delete(self, key: str): path = self._get_path(key) - if path.exists(): - path.unlink() - if key in self.metadata: - del self.metadata[key] + path.unlink(missing_ok=True) + self.metadata.pop(key, None) def clear(self): """Deletes all .npy files from the cache directory.""" diff --git a/kv_cache_benchmark/kv_cache/benchmark.py b/kv_cache_benchmark/kv_cache/benchmark.py index f80ad070..08328591 100755 --- a/kv_cache_benchmark/kv_cache/benchmark.py +++ b/kv_cache_benchmark/kv_cache/benchmark.py @@ -792,6 +792,7 @@ def _run_preconditioning(self): state = {'written_bytes': 0, 'seq': 0, 'last_report': 0} def worker(): + consecutive_failures = 0 while True: with lock: if state['written_bytes'] >= target_bytes: @@ -803,6 +804,7 @@ def worker(): success, tier, latency = self.cache.allocate_cache(key, tokens_per_entry) if success: + consecutive_failures = 0 entry = self.cache.cache_entries.get(key) if entry: with lock: @@ -811,6 +813,13 @@ def worker(): if gb_written - state['last_report'] >= 10: print(f" Preconditioning progress: {gb_written:.1f} / {target_gb:.1f} GB") state['last_report'] = gb_written + else: + consecutive_failures += 1 + if consecutive_failures > 50: + with lock: + print(f" WARNING: Preconditioning stalled at {state['written_bytes']/1024**3:.1f} GB — filesystem full. Continuing.") + return + time.sleep(0.1) with ThreadPoolExecutor(max_workers=num_threads) as executor: futures = [executor.submit(worker) for _ in range(num_threads)] diff --git a/kv_cache_benchmark/kv_cache/cache.py b/kv_cache_benchmark/kv_cache/cache.py index 94ab686a..28bfd121 100755 --- a/kv_cache_benchmark/kv_cache/cache.py +++ b/kv_cache_benchmark/kv_cache/cache.py @@ -5,9 +5,9 @@ and MultiTierCache (3-tier LRU cache with waterfall eviction). """ +import os import time import hashlib -import shutil import logging import threading from typing import Dict, List, Optional, Tuple @@ -137,7 +137,8 @@ def __init__(self, else: try: nvme_base = self.backends['nvme'].base_path - self.nvme_memory_limit = float(shutil.disk_usage(nvme_base).free) + st = os.statvfs(str(nvme_base)) + self.nvme_memory_limit = float(st.f_bavail * st.f_frsize) * 0.95 except Exception: self.nvme_memory_limit = float('inf') @@ -322,88 +323,190 @@ def _ensure_space_in_tier(self, tier: str, required_bytes: int, recursion_depth: if next_tier is None and tier != 'nvme': return False + # When NVMe is the terminal tier (no tier after it), the entry MUST + # be written here — relax capacity guards and evict to full limit. 
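+        # Illustrative numbers (with the defaults target_usage_ratio=0.8,
+        # large_entry_limit_ratio=0.95): a 100 GB non-terminal tier evicts
+        # down toward 80 GB and rejects single entries larger than 95 GB;
+        # as the terminal tier it evicts down to the full 100 GB and
+        # accepts entries of any size.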
+ is_last_tier = (next_tier is None) + limit = self._get_tier_limit(tier) target_usage_ratio = cfg('eviction', 'target_usage_ratio', default=0.8) target_usage = limit * target_usage_ratio large_entry_limit_ratio = cfg('eviction', 'large_entry_limit_ratio', default=0.95) - if required_bytes > limit * large_entry_limit_ratio: + # Only reject oversized entries on non-terminal tiers (they can cascade). + # On the last tier, we must accommodate the entry regardless of size. + if not is_last_tier and required_bytes > limit * large_entry_limit_ratio: return False - entries_in_tier = len(self._get_lru_entries_in_tier(tier)) + # On the last tier, evict to full capacity (not 80%) since there's + # no next tier that needs a buffer for cascading entries. + effective_target = limit if is_last_tier else target_usage + + # ──────────────────────────────────────────────────────────────── + # SNAPSHOT-BASED LRU EVICTION + # + # Performance context: + # _get_lru_entries_in_tier() copies every entry in cache_entries + # that belongs to this tier, then sorts by last_access time. + # At 15 TB with 60k entries, that's ~60k dict copies + sort. + # + # Old behavior (O(n²)): + # The while loop called _get_lru_entries_in_tier() on EVERY + # iteration, but only used lru_entries[0] — the single oldest + # entry. Evicting 100 entries meant 100 full scans. + # + # New behavior (O(n)): + # Take ONE sorted snapshot before the loop. Walk through it + # with an index. Each entry is either: + # - Still valid → evict it (delete or demote) + # - Already gone (another thread got it) → skip, advance index + # If we exhaust the snapshot without freeing enough space, + # refresh it ONCE (new entries may have been written since the + # snapshot). Worst case: 2 scans instead of thousands. + # + # Why stale snapshots are safe: + # - DELETE path: the existence check under metadata_lock already + # skips entries that another thread evicted. A stale snapshot + # just means we hit more skips — no double-decrement. + # - DEMOTE path: _demote_entry() checks that the entry still + # exists in from_tier before moving it. If it's gone, it + # returns False and we advance to the next entry. + # - New entries added after the snapshot are NEWER than + # everything in it (higher last_access time), so LRU order + # says evict them last. Not including them is correct. + # + # Impact on MLPerf metrics: + # Storage device latencies (write_device_p50, read_device_p50) + # are timed INSIDE the backend — after eviction has already + # freed space. This optimization only reduces the untimed CPU + # overhead between I/O operations. Throughput (req/s) improves + # because the benchmark can push I/O faster; device-level + # numbers stay the same. + # ──────────────────────────────────────────────────────────────── + + lru_entries = self._get_lru_entries_in_tier(tier) + lru_idx = 0 + max_evictions_hard_cap = cfg('eviction', 'max_evictions_hard_cap', default=5000) max_evictions_min = cfg('eviction', 'max_evictions_min', default=1000) - max_evictions_per_call = min(max_evictions_hard_cap, max(max_evictions_min, entries_in_tier + 100)) + max_evictions_per_call = min(max_evictions_hard_cap, max(max_evictions_min, len(lru_entries) + 100)) eviction_count = 0 while eviction_count < max_evictions_per_call: + # ── Check 1: Is there already enough space? 
── with self.memory_lock: current_usage = self._get_tier_usage(tier) - if current_usage + required_bytes <= target_usage: + if current_usage + required_bytes <= effective_target: self._update_tier_usage(tier, required_bytes) return True - if current_usage < limit * 0.05 and required_bytes <= limit * large_entry_limit_ratio: + # Near-empty tier: usage tracking may have drifted from + # accumulated rounding. Trust it and allow the write. + if current_usage < limit * 0.05: self._update_tier_usage(tier, required_bytes) return True - lru_entries = self._get_lru_entries_in_tier(tier) - - if not lru_entries: - with self.metadata_lock: - actual_usage = sum( - entry['size'] for entry in self.cache_entries.values() - if entry['location'] == tier - ) - with self.memory_lock: - if tier == 'gpu': - self.gpu_memory_used = actual_usage - elif tier == 'cpu': - self.cpu_memory_used = actual_usage - elif tier == 'nvme': - self.nvme_memory_used = actual_usage + # ── Check 2: Advance through the LRU snapshot ── + # If we've walked past the end of the snapshot, try one + # refresh — concurrent threads may have evicted most of our + # snapshot, or new entries may have landed in this tier. + if lru_idx >= len(lru_entries): + lru_entries = self._get_lru_entries_in_tier(tier) + lru_idx = 0 + + if not lru_entries: + # Tier is truly empty. Recount actual usage from + # cache_entries to correct any drift, then decide. + with self.metadata_lock: + actual_usage = sum( + entry['size'] for entry in self.cache_entries.values() + if entry['location'] == tier + ) + with self.memory_lock: + if tier == 'gpu': + self.gpu_memory_used = actual_usage + elif tier == 'cpu': + self.cpu_memory_used = actual_usage + elif tier == 'nvme': + self.nvme_memory_used = actual_usage - with self.memory_lock: - current_usage = self._get_tier_usage(tier) - if current_usage + required_bytes <= target_usage: - self._update_tier_usage(tier, required_bytes) + with self.memory_lock: + current_usage = self._get_tier_usage(tier) + if current_usage + required_bytes <= effective_target: + self._update_tier_usage(tier, required_bytes) + return True + + # Last tier with nothing left to evict — allow the + # write and let the OS enforce disk space. + if is_last_tier: + with self.memory_lock: + self._update_tier_usage(tier, required_bytes) return True - return False + return False - total_size_in_tier = sum(e['size'] for _, e in lru_entries) - if total_size_in_tier < limit * 0.2 and required_bytes > target_usage * 0.5: - return False + # On non-terminal tiers, bail out if there's little data to + # evict relative to what we need. On the last tier, keep + # going — there's nowhere else to send the entry. + # (Only check on first pass through the snapshot to avoid + # re-summing on every iteration.) + if lru_idx == 0 and not is_last_tier: + total_size_in_tier = sum(e['size'] for _, e in lru_entries) + if total_size_in_tier < limit * 0.2 and required_bytes > target_usage * 0.5: + return False - lru_key, lru_entry = lru_entries[0] + # ── Pick the next LRU entry from the snapshot ── + lru_key, lru_entry = lru_entries[lru_idx] lru_size = lru_entry['size'] + lru_idx += 1 + # ── Evict: DELETE (terminal tier) or DEMOTE (non-terminal) ── if next_tier is None and tier == 'nvme': + # Terminal tier: delete the .npy file from disk. + # The existence check prevents double-decrementing when + # multiple threads race on the same stale snapshot entry. 
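+                # Lock order below: per-entry lock → metadata_lock (re-check
+                # location, read the size, remove the entry) → backend delete
+                # → memory_lock (decrement by the size read under
+                # metadata_lock, not by the possibly stale snapshot size).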
entry_lock = self._get_entry_lock(lru_key) with entry_lock: + with self.metadata_lock: + existing = self.cache_entries.get(lru_key) + if existing is None or existing['location'] != 'nvme': + # Another thread already evicted this entry. + # Safe to skip — just advance to the next one. + eviction_count += 1 + continue + actual_size = existing['size'] + del self.cache_entries[lru_key] + self.entry_locks.pop(lru_key, None) try: self.backends['nvme'].delete(lru_key) except Exception as e: logger.warning(f"Failed to delete NVMe entry {lru_key}: {e}") - with self.metadata_lock: - self.cache_entries.pop(lru_key, None) with self.memory_lock: - self.nvme_memory_used = max(0, self.nvme_memory_used - lru_size) + self.nvme_memory_used = max(0, self.nvme_memory_used - actual_size) with self.stats_lock: self.stats['evictions'] += 1 else: + # Non-terminal tier: demote entry to the next tier down. + # Recursively ensure space in next_tier first. if not self._ensure_space_in_tier(next_tier, lru_size, recursion_depth + 1): logger.warning(f"Could not make space in {next_tier} for demotion") return False success, _ = self._demote_entry(lru_key, tier, next_tier) if not success: - # Entry may have been deleted/moved by another thread; skip to next + # Entry was deleted/moved by another thread between + # the snapshot and now. Skip to the next one. eviction_count += 1 continue eviction_count += 1 + # Exhausted eviction budget. On the last tier, allow the write + # anyway — we've freed as much as we can. + if is_last_tier: + with self.memory_lock: + self._update_tier_usage(tier, required_bytes) + return True + return False def allocate_cache(self, key: str, num_tokens: int, phase: InferencePhase = InferencePhase.PREFILL) -> Tuple[bool, str, float]: @@ -451,6 +554,8 @@ def _allocate_cache_inner(self, key: str, num_tokens: int, phase: InferencePhase if allocated_tier is None: logger.warning("All tiers full — eviction could not free space, forcing write to NVMe") allocated_tier = 'nvme' + with self.memory_lock: + self._update_tier_usage('nvme', size_bytes) try: if allocated_tier == 'gpu': diff --git a/kv_cache_benchmark/tests/test_kv_cache.py b/kv_cache_benchmark/tests/test_kv_cache.py index f5d44759..479a1dad 100644 --- a/kv_cache_benchmark/tests/test_kv_cache.py +++ b/kv_cache_benchmark/tests/test_kv_cache.py @@ -431,12 +431,11 @@ def test_kv_cache_size_formula(self, llama8b_config): 2 * llama8b_config.bytes_per_element) assert llama8b_config.kv_cache_size_per_token == expected - def test_all_five_model_configs_exist(self): - assert len(MODEL_CONFIGS) == 5 + def test_all_nine_model_configs_exist(self): + assert len(MODEL_CONFIGS) == 9 @pytest.mark.parametrize("model_name", [ - 'tiny-1b', 'mistral-7b', 'llama2-7b', 'llama3.1-8b', 'llama3.1-70b-instruct' - ]) + 'tiny-1b', 'mistral-7b', 'llama2-7b', 'llama3.1-8b', 'llama3.1-70b-instruct', 'deepseek-v3', 'qwen3-32b', 'gpt-oss-120b', 'gpt-oss-20b']) def test_model_config_exists(self, model_name): assert model_name in MODEL_CONFIGS @@ -2268,9 +2267,1367 @@ def test_eviction_lifecycle(self): # ============================================================================= -# Test: Bottleneck Profiling +# Test: 3-Tier Eviction Cascade (GPU → CPU → NVMe → Delete) # ============================================================================= +class TestThreeTierEvictionCascade: + """ + Tests that eviction cascades correctly through all three tiers: + GPU → CPU → NVMe → delete + + Since we have no real GPU, we inject a CPUMemoryBackend as a fake GPU + backend. 
This exercises the full _ensure_space_in_tier recursive path: + depth 0: GPU is full → demote LRU to CPU + depth 1: CPU is full → demote LRU to NVMe + depth 2: NVMe is full → delete LRU from disk + """ + + @pytest.fixture + def tiny_model_config(self): + return MODEL_CONFIGS['tiny-1b'] + + def test_full_cascade_gpu_to_cpu_to_nvme_to_delete(self, tiny_model_config): + """ + Fill all three tiers, then allocate one more entry. + Expect the cascade: + 1. GPU evicts its LRU to CPU (demote) + 2. CPU is full, so CPU evicts its LRU to NVMe (demote) + 3. NVMe is full, so NVMe deletes its LRU from disk (delete) + 4. New entry lands on GPU + """ + # --- Setup --- + # Tiny-1b: ~24KB per token, 10 tokens ≈ 240KB per entry. + # GPU: 2 MB → fits ~8 entries + # CPU: 2 MB → fits ~8 entries + # NVMe: 2 MB → fits ~8 entries + # Total across all tiers: ~24 entries before disk deletes start. + gpu_mb = 2 + cpu_mb = 2 + nvme_mb = 2 + tokens_per_entry = 10 + + cache = MultiTierCache( + model_config=tiny_model_config, + gpu_memory_gb=0, # we'll fake the GPU below + cpu_memory_gb=cpu_mb / 1024, + seed=42, + storage_capacity_gb=nvme_mb / 1024, + ) + + # Inject a fake GPU backend (CPUMemoryBackend in disguise) + cache.backends['gpu'] = CPUMemoryBackend() + cache.gpu_memory_limit = gpu_mb * 1024 * 1024 # 2 MB + + # --- Phase 1: Fill GPU --- + print("\n === Phase 1: Filling GPU ===") + gpu_keys = [] + for i in range(50): + key = f"gpu_fill_{i}" + success, tier, _ = cache.allocate_cache(key, num_tokens=tokens_per_entry) + assert success, f"Allocation {i} should succeed" + if tier == 'gpu': + gpu_keys.append(key) + print(f" [{i:3d}] key={key:<20s} → tier={tier} " + f"(GPU={cache.gpu_memory_used/1024:.0f}KB " + f"CPU={cache.cpu_memory_used/1024:.0f}KB " + f"NVMe={cache.nvme_memory_used/1024:.0f}KB)") + + # --- Phase 2: Verify entries exist on all three tiers --- + gpu_entries = [k for k, v in cache.cache_entries.items() if v['location'] == 'gpu'] + cpu_entries = [k for k, v in cache.cache_entries.items() if v['location'] == 'cpu'] + nvme_entries = [k for k, v in cache.cache_entries.items() if v['location'] == 'nvme'] + + print(f"\n === Phase 2: Tier distribution ===") + print(f" GPU entries: {len(gpu_entries)}") + print(f" CPU entries: {len(cpu_entries)}") + print(f" NVMe entries: {len(nvme_entries)}") + print(f" Total in cache_entries: {len(cache.cache_entries)}") + print(f" Evictions: {cache.stats['evictions']}") + print(f" Offloads to CPU: {cache.stats['offloads_cpu']}") + print(f" Offloads to storage: {cache.stats['offloads_storage']}") + + # With 50 entries and ~24 capacity, evictions must have happened + assert cache.stats['evictions'] > 0, \ + "Evictions should have occurred with 50 entries across 6MB total" + + # CPU demotion must have occurred (GPU → CPU) + assert cache.stats['offloads_cpu'] > 0, \ + "At least one GPU → CPU demotion should have occurred" + + # NVMe demotion must have occurred (CPU → NVMe) + assert cache.stats['offloads_storage'] > 0, \ + "At least one CPU → NVMe demotion should have occurred" + + # --- Phase 3: Verify early keys were deleted from all tiers --- + # With 50 entries and ~24 capacity, about half should be gone + total_entries = len(cache.cache_entries) + deleted_count = 50 - total_entries + print(f"\n === Phase 3: Deletion check ===") + print(f" Entries remaining: {total_entries}") + print(f" Entries deleted: {deleted_count}") + + assert deleted_count > 0, \ + f"Some entries should have been deleted from NVMe. 
" \ + f"Total remaining: {total_entries}/50" + + # --- Phase 4: Verify .npy files are actually deleted from disk --- + nvme_dir = cache.backends['nvme'].base_path + npy_files = list(nvme_dir.glob("*.npy")) + print(f"\n === Phase 4: Disk file check ===") + print(f" .npy files on disk: {len(npy_files)}") + print(f" NVMe entries in metadata: {len(nvme_entries)}") + + # Files on disk should roughly match entries in cache_entries with location='nvme' + # Some tolerance for timing, but there shouldn't be orphaned files + assert len(npy_files) <= len(nvme_entries) + 2, \ + f"Orphaned .npy files: {len(npy_files)} on disk vs {len(nvme_entries)} tracked" + + # --- Phase 5: Allocate one more and verify it still works --- + print(f"\n === Phase 5: Post-cascade allocation ===") + success, tier, _ = cache.allocate_cache("final_entry", num_tokens=tokens_per_entry) + print(f" final_entry → tier={tier}, success={success}") + assert success, "Allocation after full cascade should still succeed" + + def test_demote_path_preserves_data(self, tiny_model_config): + """ + Verify that data survives the full demotion chain: + GPU → CPU → NVMe + Read the entry back from NVMe and confirm it's the same data. + + Note: access_cache() returns (location, latency), not data. + To verify data integrity, we read directly from the backend. + """ + cache = MultiTierCache( + model_config=tiny_model_config, + gpu_memory_gb=0, + cpu_memory_gb=0.5 / 1024, # 0.5 MB CPU + seed=42, + storage_capacity_gb=10.0 / 1024, # 10 MB NVMe (plenty of room) + ) + + # Inject fake GPU: 0.5 MB + cache.backends['gpu'] = CPUMemoryBackend() + cache.gpu_memory_limit = int(0.5 * 1024 * 1024) + + # Write one entry to GPU + key = "preserve_test" + success, tier, _ = cache.allocate_cache(key, num_tokens=10) + assert success + print(f"\n Initial allocation: tier={tier}") + + # Read raw data from the backend while it's on the initial tier + original_data, _ = cache.backends[tier].read(key) + print(f" Original data shape: {original_data.shape}, sum: {np.sum(original_data):.4f}") + + # Fill GPU to force demotion to CPU + print(" Filling GPU to force demotion...") + for i in range(20): + cache.allocate_cache(f"push_{i}", num_tokens=10) + + # Check where our key ended up + entry = cache.cache_entries.get(key) + if entry: + print(f" After GPU fill: key is on tier={entry['location']}") + + # Fill CPU to force demotion to NVMe + print(" Filling CPU to force demotion to NVMe...") + for i in range(40): + cache.allocate_cache(f"push_more_{i}", num_tokens=10) + + entry = cache.cache_entries.get(key) + if entry: + current_tier = entry['location'] + print(f" After CPU fill: key is on tier={current_tier}") + + # Read raw data back from whichever backend it landed on + read_data, _ = cache.backends[current_tier].read(key) + print(f" Re-read data shape: {read_data.shape}, sum: {np.sum(read_data):.4f}") + + assert original_data.shape == read_data.shape, \ + f"Shape mismatch: {original_data.shape} vs {read_data.shape}" + assert np.allclose(original_data, read_data, atol=1e-3), \ + f"Data mismatch after demotion through tiers" + print(" Data integrity verified after demotion chain!") + else: + # Key was evicted entirely — that's also valid if NVMe was tiny + print(" Key was evicted (deleted). Skipping data comparison.") + + def test_tier_order_includes_fake_gpu(self, tiny_model_config): + """ + Confirm that injecting a GPU backend adds 'gpu' to the tier order, + giving us the full 3-tier cascade path. 
+ """ + cache = MultiTierCache( + model_config=tiny_model_config, + gpu_memory_gb=0, + cpu_memory_gb=0.001, + seed=42, + ) + + # Without fake GPU, tier order is ['cpu', 'nvme'] + tier_order_before = cache._get_tier_order() + print(f"\n Tier order without GPU: {tier_order_before}") + assert 'gpu' not in tier_order_before + + # Inject fake GPU + cache.backends['gpu'] = CPUMemoryBackend() + cache.gpu_memory_limit = 1 * 1024 * 1024 # 1 MB + + tier_order_after = cache._get_tier_order() + print(f" Tier order with fake GPU: {tier_order_after}") + assert tier_order_after == ['gpu', 'cpu', 'nvme'], \ + f"Expected ['gpu', 'cpu', 'nvme'], got {tier_order_after}" + + +# ============================================================================= +# Test: NVMe-Only Mode (cpu=0, gpu=0) — Eviction and File Deletion +# ============================================================================= + +class TestNVMeOnlyEviction: + """ + Tests the cpu=0, gpu=0 configuration where NVMe is the ONLY tier. + + This is the exact configuration that triggered the three bugs: + 1. Double-decrement race in nvme_memory_used + 2. Eviction guards rejecting entries on the terminal tier + 3. Preconditioning spinning forever + + These tests verify that: + - Entries are allocated on NVMe (the only tier) + - When NVMe fills up, LRU entries are deleted (not demoted) + - .npy files are actually removed from disk after eviction + - nvme_memory_used tracking stays sane (no negative drift) + - The "second pass" works: new allocations succeed after eviction + """ + + @pytest.fixture + def tiny_model_config(self): + return MODEL_CONFIGS['tiny-1b'] + + def test_nvme_only_basic_allocation(self, tiny_model_config): + """ + With cpu=0 gpu=0, all entries should land on NVMe. + Verify tier='nvme' for every allocation. + """ + cache = MultiTierCache( + model_config=tiny_model_config, + gpu_memory_gb=0, + cpu_memory_gb=0, # ZERO CPU + seed=42, + storage_capacity_gb=0.01 # 10 MB NVMe + ) + + print(f"\n NVMe limit: {cache.nvme_memory_limit / 1024:.0f} KB") + print(f" CPU limit: {cache.cpu_memory_limit / 1024:.0f} KB") + print(f" Tier order: {cache._get_tier_order()}") + + for i in range(5): + key = f"nvme_only_{i}" + success, tier, _ = cache.allocate_cache(key, num_tokens=10) + print(f" [{i}] key={key} → tier={tier}, success={success}") + assert success, f"Allocation {i} should succeed" + # CPU has 0 capacity — entry should skip CPU and go to NVMe + assert tier == 'nvme' or tier == 'cpu', \ + f"Expected 'nvme' (or 'cpu' if zero-cap is treated as available), got '{tier}'" + + def test_nvme_only_eviction_deletes_files(self, tiny_model_config): + """ + Fill NVMe past capacity with cpu=0, gpu=0. + Verify that: + 1. Eviction counter increments + 2. Early keys are removed from cache_entries + 3. .npy files are actually deleted from disk + 4. 
Later allocations still succeed (the "second loop") + """ + nvme_mb = 2 # 2 MB NVMe + tokens_per_entry = 10 # ~240 KB per entry with tiny-1b + # 2 MB / 240 KB ≈ 8 entries before eviction starts + + cache = MultiTierCache( + model_config=tiny_model_config, + gpu_memory_gb=0, + cpu_memory_gb=0, + seed=42, + storage_capacity_gb=nvme_mb / 1024, + ) + + nvme_dir = cache.backends['nvme'].base_path + print(f"\n NVMe dir: {nvme_dir}") + print(f" NVMe limit: {cache.nvme_memory_limit / 1024:.0f} KB") + print(f" Tier order: {cache._get_tier_order()}") + + # --- Pass 1: Fill NVMe to trigger eviction --- + print("\n --- Pass 1: Fill and overflow ---") + all_keys = [] + for i in range(30): + key = f"pass1_{i}" + success, tier, _ = cache.allocate_cache(key, num_tokens=tokens_per_entry) + all_keys.append(key) + + npy_count = len(list(nvme_dir.glob("*.npy"))) + entry_count = len(cache.cache_entries) + + print(f" [{i:2d}] success={success} tier={tier:<5s} " + f"entries={entry_count:3d} .npy={npy_count:3d} " + f"nvme_used={cache.nvme_memory_used/1024:.0f}KB " + f"evictions={cache.stats['evictions']}") + + assert success, f"Allocation {i} should succeed even after eviction" + + # --- Verify eviction occurred --- + evictions = cache.stats['evictions'] + print(f"\n Evictions after pass 1: {evictions}") + assert evictions > 0, \ + "Evictions should have occurred with 30 entries in 2 MB" + + # --- Verify early keys were deleted --- + early_keys_present = sum(1 for k in all_keys[:10] if k in cache.cache_entries) + late_keys_present = sum(1 for k in all_keys[-5:] if k in cache.cache_entries) + print(f" Early keys (0-9) still in cache: {early_keys_present}/10") + print(f" Late keys (25-29) still in cache: {late_keys_present}/5") + + assert early_keys_present < 10, \ + f"Some early keys should have been evicted, but {early_keys_present}/10 remain" + assert late_keys_present > 0, \ + "Recent keys should still be in cache" + + # --- Verify .npy files match cache_entries --- + npy_files = set(f.stem for f in nvme_dir.glob("*.npy")) + nvme_entries = set(k for k, v in cache.cache_entries.items() if v['location'] == 'nvme') + orphaned = npy_files - nvme_entries + missing = nvme_entries - npy_files + + print(f"\n .npy files on disk: {len(npy_files)}") + print(f" NVMe entries tracked: {len(nvme_entries)}") + print(f" Orphaned files (on disk, not tracked): {len(orphaned)}") + print(f" Missing files (tracked, not on disk): {len(missing)}") + + assert len(orphaned) == 0, \ + f"Orphaned .npy files found: {orphaned}" + + # --- Pass 2: "Second loop" — new allocations after eviction --- + print("\n --- Pass 2: Second loop (allocate after eviction) ---") + pass2_success = 0 + for i in range(20): + key = f"pass2_{i}" + success, tier, _ = cache.allocate_cache(key, num_tokens=tokens_per_entry) + if success: + pass2_success += 1 + + if i < 5 or i >= 15: + print(f" [{i:2d}] success={success} tier={tier:<5s} " + f"nvme_used={cache.nvme_memory_used/1024:.0f}KB " + f"evictions={cache.stats['evictions']}") + + print(f"\n Pass 2 successes: {pass2_success}/20") + assert pass2_success == 20, \ + f"All pass-2 allocations should succeed, got {pass2_success}/20" + + # --- Verify nvme_memory_used didn't go negative --- + print(f" Final nvme_memory_used: {cache.nvme_memory_used/1024:.0f} KB") + assert cache.nvme_memory_used >= 0, \ + f"nvme_memory_used drifted negative: {cache.nvme_memory_used}" + + def test_nvme_only_memory_tracking_no_negative_drift(self, tiny_model_config): + """ + Rapid allocation/eviction cycles with cpu=0, gpu=0. 
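+        Every eviction here is a terminal-tier DELETE, so the usage counter
+        should be decremented exactly once per removed .npy file.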
+ The double-decrement bug caused nvme_memory_used to drift to ~0 + while the disk was full. This test verifies tracking stays accurate. + """ + cache = MultiTierCache( + model_config=tiny_model_config, + gpu_memory_gb=0, + cpu_memory_gb=0, + seed=42, + storage_capacity_gb=1.0 / 1024, # 1 MB — very tight + ) + + print(f"\n NVMe limit: {cache.nvme_memory_limit / 1024:.0f} KB") + + # Rapid-fire 100 allocations into 1 MB — heavy eviction pressure + for i in range(100): + cache.allocate_cache(f"stress_{i}", num_tokens=10) + + # Recount actual usage from cache_entries + actual_nvme = sum( + e['size'] for e in cache.cache_entries.values() + if e['location'] == 'nvme' + ) + + tracked = cache.nvme_memory_used + print(f" Tracked nvme_memory_used: {tracked / 1024:.0f} KB") + print(f" Actual from cache_entries: {actual_nvme / 1024:.0f} KB") + print(f" Evictions: {cache.stats['evictions']}") + + assert tracked >= 0, \ + f"nvme_memory_used went negative: {tracked}" + + # Tracked should be >= actual (it can overcount due to forced writes, + # but should never undercount after our fix) + assert tracked >= actual_nvme * 0.5, \ + f"Tracked usage ({tracked/1024:.0f}KB) is suspiciously low vs " \ + f"actual ({actual_nvme/1024:.0f}KB) — possible double-decrement" + + def test_nvme_only_concurrent_allocation(self, tiny_model_config): + """ + Multiple threads allocating simultaneously with cpu=0, gpu=0. + This is the exact scenario that triggers the double-decrement race + (Bug 1 from the fix). Verify no crash and no negative drift. + """ + cache = MultiTierCache( + model_config=tiny_model_config, + gpu_memory_gb=0, + cpu_memory_gb=0, + seed=42, + storage_capacity_gb=2.0 / 1024, # 2 MB + ) + + results = {'success': 0, 'fail': 0} + lock = threading.Lock() + + def worker(thread_id, count): + local_success = 0 + local_fail = 0 + for i in range(count): + key = f"t{thread_id}_entry_{i}" + success, tier, _ = cache.allocate_cache(key, num_tokens=10) + if success: + local_success += 1 + else: + local_fail += 1 + with lock: + results['success'] += local_success + results['fail'] += local_fail + + # 4 threads, 25 allocations each = 100 total + threads = [] + for t in range(4): + th = threading.Thread(target=worker, args=(t, 25)) + threads.append(th) + + print(f"\n Starting 4 threads, 25 allocations each...") + for th in threads: + th.start() + for th in threads: + th.join() + + print(f" Successes: {results['success']}") + print(f" Failures: {results['fail']}") + print(f" Evictions: {cache.stats['evictions']}") + print(f" nvme_memory_used: {cache.nvme_memory_used / 1024:.0f} KB") + print(f" Entries in cache: {len(cache.cache_entries)}") + + assert results['success'] > 0, "At least some allocations should succeed" + assert cache.nvme_memory_used >= 0, \ + f"nvme_memory_used went negative after concurrent access: {cache.nvme_memory_used}" + + +# ============================================================================= +# Test: Visualize User Request Flow +# +# Run with: pytest tests/test_kv_cache.py::TestVisualizeUserRequestFlow -v -s --log-cli-level=DEBUG +# +# This test walks through the entire benchmark pipeline step-by-step, +# printing and logging every decision so you can see exactly what happens +# when a user request enters the system. +# ============================================================================= + +class TestVisualizeUserRequestFlow: + """ + Educational test that traces a user request through the full benchmark + pipeline. 
Enable debug logging to see every internal decision: + + pytest -k TestVisualizeUserRequestFlow -v -s --log-cli-level=DEBUG + + The test covers: + 1. How users are generated (UserSimulator, QoS distribution) + 2. How context tokens map to KV cache bytes (ModelConfig math) + 3. How the 4 latency components are produced + (end-to-end, storage I/O, generation, prefill/decode) + 4. Waterfall LRU eviction with 3 tiers (GPU → CPU → NVMe → delete) + 5. Waterfall LRU eviction with 1 tier (NVMe-only, cpu=0 gpu=0) + """ + + @pytest.fixture + def tiny_model(self): + return MODEL_CONFIGS['tiny-1b'] + + # ------------------------------------------------------------------ + # Part 1: User selection and request creation + # ------------------------------------------------------------------ + + def test_part1_user_selection_and_request_creation(self, tiny_model): + """ + Shows how UserSimulator picks users and how InferenceRequest + is built from a UserProfile. + + Key flow: + UserSimulator.generate_mixed_users(N) + → for each user, pick random type (chatbot/coding/document) + → sample context_length from type's range + → sample generation_length from type's range + → roll QoS level (15% interactive, 35% responsive, 50% batch) + → return UserProfile + + InferenceRequest is created from a UserProfile: + → context_tokens = user.context_length (how many tokens to prefill) + → generate_tokens = user.generation_length (how many tokens to decode) + → cache_key = "{user_id}_ctx" (or conversation-based) + → submit_time = time.perf_counter() (latency clock starts here) + """ + import random as rng + rng.seed(42) + + print("\n" + "=" * 72) + print(" PART 1: USER SELECTION AND REQUEST CREATION") + print("=" * 72) + + # --- Step 1: Generate users --- + print("\n --- Step 1: UserSimulator generates 6 users ---") + print(" Each user gets a random type (chatbot/coding/document)") + print(" and a QoS level (interactive/responsive/batch).\n") + print(" Templates:") + for utype, tmpl in UserSimulator.DEFAULT_USER_TEMPLATES.items(): + print(f" {utype:10s} context={tmpl['context_range']} " + f"gen={tmpl['generation_range']} think={tmpl['think_time_range']}") + + users = UserSimulator.generate_mixed_users(6) + + print(f"\n Generated {len(users)} users:") + print(f" {'ID':<12s} {'QoS':<14s} {'Pri':>3s} {'Context':>8s} {'GenLen':>7s} {'Think':>6s}") + print(f" {'-'*12} {'-'*14} {'-'*3} {'-'*8} {'-'*7} {'-'*6}") + for u in users: + print(f" {u.user_id:<12s} {u.qos_level.value:<14s} {u.priority:>3d} " + f"{u.context_length:>8,d} {u.generation_length:>7d} {u.think_time:>6.2f}s") + + # --- Step 2: Create an InferenceRequest --- + print("\n --- Step 2: Build an InferenceRequest from first user ---") + user = users[0] + req = InferenceRequest( + user_id=user.user_id, + request_id=f"{user.user_id}_req_0", + timestamp=datetime.now(), + context_tokens=user.context_length, + generate_tokens=user.generation_length, + priority=user.priority, + phase=InferencePhase.PREFILL_DECODE, + qos_level=user.qos_level, + ) + + print(f" Request fields:") + print(f" user_id = {req.user_id}") + print(f" request_id = {req.request_id}") + print(f" context_tokens = {req.context_tokens:,d}") + print(f" generate_tokens = {req.generate_tokens}") + print(f" phase = {req.phase.value}") + print(f" qos_level = {req.qos_level.value}") + print(f" priority = {req.priority}") + print(f" cache_key = {req.cache_key}") + print(f" submit_time = {req.submit_time:.6f} (perf_counter)") + + assert req.cache_key == f"{user.user_id}_ctx", \ + "Default cache_key should be 
'{user_id}_ctx'" + assert req.context_tokens > 0 + assert req.generate_tokens > 0 + + # ------------------------------------------------------------------ + # Part 2: KV cache size calculation + # ------------------------------------------------------------------ + + def test_part2_kv_cache_size_calculation(self, tiny_model): + """ + Shows how context_tokens is converted to bytes. + + Formula (MHA/GQA): + bytes_per_token = num_layers × kv_heads × kv_dim_per_head × 2 × dtype_bytes + + Total cache size: + cache_bytes = context_tokens × bytes_per_token + + For tiny-1b (12 layers, 4 KV heads, dim=128, float16): + bytes_per_token = 12 × 4 × 128 × 2 × 2 = 24,576 bytes = 24 KB/token + """ + print("\n" + "=" * 72) + print(" PART 2: KV CACHE SIZE CALCULATION") + print("=" * 72) + + m = tiny_model + bpt = m.kv_cache_size_per_token + + print(f"\n Model: {m.name}") + print(f" num_layers = {m.num_layers}") + print(f" kv_heads = {m.kv_heads}") + print(f" kv_dim_per_head = {m.kv_dim_per_head}") + print(f" dtype = {m.dtype} ({m.bytes_per_element} bytes/element)") + print(f" attention_type = {m.attention_type}") + print(f"\n Formula: num_layers × kv_heads × kv_dim_per_head × 2(K+V) × dtype_bytes") + print(f" {m.num_layers} × {m.kv_heads} × {m.kv_dim_per_head} × 2 × {m.bytes_per_element}") + print(f" = {bpt:,d} bytes/token ({bpt / 1024:.1f} KB/token)") + + expected = m.num_layers * m.kv_heads * m.kv_dim_per_head * 2 * m.bytes_per_element + assert bpt == expected, f"Formula mismatch: {bpt} != {expected}" + + # Show how different context sizes scale + print(f"\n Context size → cache bytes:") + for tokens in [100, 512, 2048, 8192, 16384]: + total = tokens * bpt + print(f" {tokens:>6,d} tokens × {bpt/1024:.0f} KB/tok = {total / 1024**2:>8.2f} MB") + + # Compare with a larger model + print(f"\n Comparison across models:") + for model_key in ['tiny-1b', 'mistral-7b', 'llama3.1-8b', 'llama3.1-70b-instruct']: + mc = MODEL_CONFIGS[model_key] + bpt2 = mc.kv_cache_size_per_token + size_2k = 2048 * bpt2 + print(f" {model_key:<25s} {bpt2/1024:>6.0f} KB/tok " + f" 2048 ctx = {size_2k / 1024**2:>7.1f} MB") + + # Show MLA (DeepSeek) is different + if 'deepseek-v3' in MODEL_CONFIGS: + ds = MODEL_CONFIGS['deepseek-v3'] + ds_bpt = ds.kv_cache_size_per_token + print(f"\n MLA model (DeepSeek V3): different formula") + print(f" num_layers × (kv_lora_rank + qk_rope_head_dim) × dtype_bytes") + print(f" {ds.num_layers} × ({ds.kv_lora_rank} + {ds.qk_rope_head_dim}) × {ds.bytes_per_element}") + print(f" = {ds_bpt:,d} bytes/token ({ds_bpt / 1024:.1f} KB/token)") + + # ------------------------------------------------------------------ + # Part 3: The 4 latency levels (nested hierarchy) + # ------------------------------------------------------------------ + + def test_part3_four_latency_levels(self, tiny_model): + """ + Traces a single request and shows how the 4 latency levels nest: + + ┌───────────────────────────────────────────────────────────────────┐ + │ L1: END-TO-END LATENCY │ + │ submit_time → complete_time │ + │ = Queue Wait + Storage I/O + Token Generation │ + │ │ + │ ┌────────────────────────────────────────────────────────────┐ │ + │ │ L2: PER-REQUEST STORAGE LATENCY │ │ + │ │ Total I/O time for ONE request (multiple ops) │ │ + │ │ = 1× Prefill Write + N× Decode Reads │ │ + │ │ │ │ + │ │ ┌──────────────────────────────────────────────────────┐ │ │ + │ │ │ L3: PER-TIER TOTAL LATENCY │ │ │ + │ │ │ Time for ONE file I/O op on ONE tier │ │ │ + │ │ │ = Host + Device │ │ │ + │ │ │ │ │ │ + │ │ │ 
┌────────────────────────────────────────────────┐ │ │ │ + │ │ │ │ L4: HOST vs DEVICE BREAKDOWN │ │ │ │ + │ │ │ │ Write: Host=np.save() | Device=fsync() │ │ │ │ + │ │ │ │ Read: Host=fadvise+copy | Device=np.load │ │ │ │ + │ │ │ └────────────────────────────────────────────────┘ │ │ │ + │ │ └──────────────────────────────────────────────────────┘ │ │ + │ └────────────────────────────────────────────────────────────┘ │ + └───────────────────────────────────────────────────────────────────┘ + """ + print("\n" + "=" * 72) + print(" PART 3: THE 4 LATENCY LEVELS (NESTED HIERARCHY)") + print("=" * 72) + + # Force NVMe so we get real host/device splits (CPU backend + # doesn't have a meaningful host vs device distinction) + cache = MultiTierCache( + model_config=tiny_model, + gpu_memory_gb=0, + cpu_memory_gb=0, # zero → everything hits NVMe + seed=42, + storage_capacity_gb=0.1, # 100 MB + ) + + context_tokens = 512 + generate_tokens = 100 + bpt = tiny_model.kv_cache_size_per_token + cache_bytes = context_tokens * bpt + + print(f"\n Request: {context_tokens} context tokens, {generate_tokens} gen tokens") + print(f" Cache entry: {context_tokens} × {bpt:,d} = {cache_bytes:,d} bytes ({cache_bytes/1024:.0f} KB)") + print(f" Generation mode: NONE (0 ms/tok) — real benchmark uses FAST or REALISTIC") + + # ═══════════════════════════════════════════════════════════════ + # The clock starts when the request is submitted + # ═══════════════════════════════════════════════════════════════ + submit_time = time.perf_counter() + + # ───────────────────────────────────────────────────────────── + # L3/L4: PREFILL WRITE — one I/O operation + # NVMeBackend.write() measures: + # host_time = time for np.save() (serialize + buffered write) + # device_time = time for f.flush() + os.fsync() (commit to disk) + # total = host_time + device_time + # ───────────────────────────────────────────────────────────── + print(f"\n ──── PREFILL: allocate_cache('{context_tokens} tokens') ────") + + cache.stats['storage_write_latencies'].clear() + cache.stats['storage_write_device_latencies'].clear() + cache.stats['storage_write_host_latencies'].clear() + + success, tier, write_total = cache.allocate_cache( + "user_0000_ctx", num_tokens=context_tokens, phase=InferencePhase.PREFILL, + ) + + # Pull L4 breakdown from stats (cache records it during allocate_cache) + w_host = cache.stats['storage_write_host_latencies'][-1] if cache.stats['storage_write_host_latencies'] else 0 + w_device = cache.stats['storage_write_device_latencies'][-1] if cache.stats['storage_write_device_latencies'] else 0 + w_total = cache.stats['storage_write_latencies'][-1] if cache.stats['storage_write_latencies'] else write_total + + print(f" tier = {tier}, success = {success}") + print(f" L3 write total : {w_total * 1000:>10.3f} ms (one np.save + fsync)") + print(f" L4 host : {w_host * 1000:>10.3f} ms (np.save — serialize to page cache)") + print(f" L4 device : {w_device * 1000:>10.3f} ms (fsync — flush to NVMe controller)") + + prefill_latency = write_total + storage_latency = write_total + + # ───────────────────────────────────────────────────────────── + # L3/L4: DECODE READ — one I/O operation + # NVMeBackend.read() measures: + # device_time = time for np.load() (read from disk) + # host_time = time for fadvise + np.array(copy) + # total = host_time + device_time + # ───────────────────────────────────────────────────────────── + print(f"\n ──── DECODE: access_cache('{tier}') ────") + + cache.stats['storage_read_latencies'].clear() + 
cache.stats['storage_read_device_latencies'].clear() + cache.stats['storage_read_host_latencies'].clear() + + location, read_total = cache.access_cache( + "user_0000_ctx", phase=InferencePhase.DECODE, + ) + + r_host = cache.stats['storage_read_host_latencies'][-1] if cache.stats['storage_read_host_latencies'] else 0 + r_device = cache.stats['storage_read_device_latencies'][-1] if cache.stats['storage_read_device_latencies'] else 0 + r_total = cache.stats['storage_read_latencies'][-1] if cache.stats['storage_read_latencies'] else read_total + + print(f" location = {location}") + print(f" L3 read total : {r_total * 1000:>10.3f} ms (one fadvise + np.load + copy)") + print(f" L4 host : {r_host * 1000:>10.3f} ms (posix_fadvise + np.array copy)") + print(f" L4 device : {r_device * 1000:>10.3f} ms (np.load — read from NVMe)") + + decode_latency = read_total + storage_latency += read_total + + # ───────────────────────────────────────────────────────────── + # L2: BATCHED DECODE READS + # The benchmark does ceil(generate_tokens / batch_size) extra reads + # to simulate incremental KV access during token generation. + # ───────────────────────────────────────────────────────────── + decode_batch_size = 32 + num_batched = max(1, (generate_tokens + decode_batch_size - 1) // decode_batch_size) + + print(f"\n ──── BATCHED DECODE READS ────") + print(f" ceil({generate_tokens} gen_tokens / {decode_batch_size} batch_size) = {num_batched} extra reads") + + for i in range(num_batched): + _, batch_lat = cache.access_cache("user_0000_ctx", InferencePhase.DECODE) + storage_latency += batch_lat + + print(f" Batched read total: {(storage_latency - write_total - read_total) * 1000:.3f} ms") + + # ───────────────────────────────────────────────────────────── + # GENERATION LATENCY + # Simulates GPU token generation: sleep(tokens × per_token_time) + # ───────────────────────────────────────────────────────────── + gen_mode = GenerationMode.NONE # use NONE for test speed + generation_latency = generate_tokens * GENERATION_TIMING[gen_mode] + + print(f"\n ──── GENERATION ────") + print(f" Mode: {gen_mode.value}") + for mode, per_tok in GENERATION_TIMING.items(): + marker = " ←" if mode == gen_mode else "" + print(f" {mode.value:>10s}: {per_tok*1000:>5.0f} ms/tok × {generate_tokens} tok " + f"= {generate_tokens * per_tok * 1000:>7.0f} ms{marker}") + + complete_time = time.perf_counter() + end_to_end = (complete_time - submit_time) * 1000 + + # ═══════════════════════════════════════════════════════════════ + # FULL HIERARCHY — with real numbers + # ═══════════════════════════════════════════════════════════════ + print(f"\n {'═' * 68}") + print(f" LATENCY HIERARCHY (real measurements from this request)") + print(f" {'═' * 68}") + print(f"") + print(f" L1: END-TO-END {end_to_end:>10.3f} ms") + print(f" │ (submit_time → complete_time = storage + generation + overhead)") + print(f" │") + print(f" ├─ L2: STORAGE I/O (this request) {storage_latency * 1000:>10.3f} ms") + print(f" │ │ (1 prefill write + 1 decode read + {num_batched} batched reads)") + print(f" │ │") + print(f" │ ├─ PREFILL WRITE {write_total * 1000:>10.3f} ms") + print(f" │ │ └─ L3: tier total {w_total * 1000:>10.3f} ms") + print(f" │ │ ├─ L4 host (np.save) {w_host * 1000:>10.3f} ms") + print(f" │ │ └─ L4 device (fsync) {w_device * 1000:>10.3f} ms") + print(f" │ │") + print(f" │ ├─ DECODE READ {read_total * 1000:>10.3f} ms") + print(f" │ │ └─ L3: tier total {r_total * 1000:>10.3f} ms") + print(f" │ │ ├─ L4 host (fadvise+cp) {r_host * 1000:>10.3f} ms") + 
print(f" │ │ └─ L4 device (np.load) {r_device * 1000:>10.3f} ms") + print(f" │ │") + print(f" │ └─ BATCHED READS ×{num_batched:<3d} " + f"{(storage_latency - write_total - read_total) * 1000:>10.3f} ms") + print(f" │") + print(f" └─ GENERATION {generation_latency * 1000:>10.3f} ms") + print(f" ({generate_tokens} tokens × {GENERATION_TIMING[gen_mode]*1000:.0f} ms/tok [{gen_mode.value}])") + print(f"") + print(f" Overhead (locks, data gen, etc): " + f"{end_to_end - storage_latency * 1000 - generation_latency * 1000:>10.3f} ms") + + # ═══════════════════════════════════════════════════════════════ + # Where each level is recorded in the benchmark results JSON + # ═══════════════════════════════════════════════════════════════ + print(f"\n {'─' * 68}") + print(f" WHERE EACH LEVEL APPEARS IN BENCHMARK OUTPUT") + print(f" {'─' * 68}") + print(f" L1 → results['end_to_end_latencies']") + print(f" L2 → results['storage_latencies'] (per-request sum)") + print(f" results['prefill_latencies'] (write ops only)") + print(f" results['decode_latencies'] (read ops only)") + print(f" L3 → stats['storage_write_p50_ms'] thru stats['storage_write_p9999_ms']") + print(f" stats['storage_read_p50_ms'] thru stats['storage_read_p9999_ms']") + print(f" L4 → stats['storage_write_device_p50_ms'] (fsync only)") + print(f" stats['storage_write_host_p50_ms'] (np.save only)") + print(f" stats['storage_read_device_p50_ms'] (np.load only)") + print(f" stats['storage_read_host_p50_ms'] (fadvise+copy)") + + assert success + assert storage_latency >= 0 + assert w_host >= 0 and w_device >= 0 + assert r_host >= 0 and r_device >= 0 + assert write_total > 0, "Write to NVMe should have measurable latency" + assert read_total > 0, "Read from NVMe should have measurable latency" + + # ------------------------------------------------------------------ + # Part 3b: How requests become .npy files on disk + # ------------------------------------------------------------------ + + def test_part3b_request_to_npy_file_mapping(self, tiny_model): + """ + Shows the exact path from a user request to a .npy file on disk. 
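+        (Example from the test body: cache_key "user_0001_ctx" maps to
+        <nvme base_path>/user_0001_ctx.npy.)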
+ + Flow: + InferenceRequest.cache_key + → NVMeBackend._get_path(cache_key) = base_path / "{cache_key}.npy" + → NVMeBackend.write(): + open("{cache_key}.npy", 'wb') + np.save(f, kv_data) ← host time (serialize to page cache) + f.flush(); os.fsync(f.fileno()) ← device time (commit to NVMe) + → NVMeBackend.read(): + posix_fadvise(DONTNEED) ← drop page cache for honest benchmark + np.load("{cache_key}.npy") ← device time (read from NVMe) + np.array(data) ← host time (copy to writable buffer) + + The .npy file is a standard NumPy binary format: + - 10-byte magic header ("\\x93NUMPY") + - Version, header length, dtype/shape metadata + - Raw float16/float32 tensor data + + File size on disk ≈ data.nbytes + ~128 bytes header overhead + """ + print("\n" + "=" * 72) + print(" PART 3b: HOW REQUESTS BECOME .npy FILES ON DISK") + print("=" * 72) + + cache = MultiTierCache( + model_config=tiny_model, + gpu_memory_gb=0, + cpu_memory_gb=0, + seed=42, + storage_capacity_gb=0.1, + ) + + nvme_dir = cache.backends['nvme'].base_path + bpt = tiny_model.kv_cache_size_per_token + + print(f"\n NVMe base path: {nvme_dir}") + print(f" Model: {tiny_model.name} ({bpt:,d} bytes/token)") + + # --- Single-turn request: cache_key = "{user_id}_ctx" --- + print(f"\n ──── Single-turn request ────") + req = InferenceRequest( + user_id="user_0001", request_id="req_0", + timestamp=datetime.now(), + context_tokens=100, generate_tokens=50, priority=1, + ) + print(f" cache_key = {req.cache_key}") + print(f" Expected file: {nvme_dir / (req.cache_key + '.npy')}") + + success, tier, _ = cache.allocate_cache( + req.cache_key, num_tokens=req.context_tokens + ) + + file_path = nvme_dir / f"{req.cache_key}.npy" + expected_data_bytes = req.context_tokens * bpt + file_size = file_path.stat().st_size if file_path.exists() else 0 + header_overhead = file_size - expected_data_bytes + + print(f"\n allocate_cache() wrote to tier: {tier}") + print(f" File exists: {file_path.exists()}") + print(f" File path: {file_path}") + print(f" File size: {file_size:,d} bytes") + print(f" data: {expected_data_bytes:,d} bytes ({req.context_tokens} tok × {bpt:,d} B/tok)") + print(f" header: {header_overhead:,d} bytes (.npy magic + dtype + shape)") + + # Show file structure + print(f"\n .npy file internal structure:") + print(f" ┌──────────────────────────────────────────────┐") + print(f" │ \\x93NUMPY magic (6 bytes) │") + print(f" │ version 1.0 (2 bytes) │") + print(f" │ header_len (2 bytes) │") + print(f" │ {{'descr': '10,d} bytes ({sz/1024:.1f} KB)") + + assert file_path.exists(), f"Expected .npy file at {file_path}" + assert file_size > expected_data_bytes, "File should include .npy header" + assert len(npy_files) >= len(keys), "Each cache_key should produce one .npy file" + + # ------------------------------------------------------------------ + # Part 3c: Multi-turn conversations and file I/O + # ------------------------------------------------------------------ + + def test_part3c_multi_turn_prefill_decode_file_io(self, tiny_model): + """ + Shows how a multi-turn conversation creates and reads .npy files. + + Conversation with 4 turns: + + Turn 1 (no previous context): + cache_key = "conv_XXX_turn_1" + PREFILL: allocate_cache() → WRITE conv_XXX_turn_1.npy (new file) + DECODE: access_cache() → READ conv_XXX_turn_1.npy + + Turn 2 (has previous turn): + cache_key = "conv_XXX_turn_2" + MULTI-TURN READ: access_cache(turn_1) → READ conv_XXX_turn_1.npy ← reuse! 
+ PREFILL: allocate_cache() → WRITE conv_XXX_turn_2.npy (new file) + DECODE: access_cache() → READ conv_XXX_turn_2.npy + + Turn 3: + MULTI-TURN READ: access_cache(turn_2) → READ conv_XXX_turn_2.npy ← reuse! + PREFILL: WRITE conv_XXX_turn_3.npy + DECODE: READ conv_XXX_turn_3.npy + + Each turn: + - Reads the PREVIOUS turn's .npy (multi-turn cache reuse) + - Writes a NEW .npy for this turn's KV cache + - Reads the NEW .npy during decode + - File count grows by 1 per turn (until eviction cleans old ones) + + This is the exact flow from benchmark.py process_requests() steps 2, 3, 5. + """ + print("\n" + "=" * 72) + print(" PART 3c: MULTI-TURN CONVERSATION FILE I/O") + print("=" * 72) + + cache = MultiTierCache( + model_config=tiny_model, + gpu_memory_gb=0, + cpu_memory_gb=0, + seed=42, + storage_capacity_gb=0.5, # plenty of room so no eviction + ) + + nvme_dir = cache.backends['nvme'].base_path + bpt = tiny_model.kv_cache_size_per_token + conv_mgr = ConversationManager(max_conversations=10) + + # Start a conversation + conv_id = conv_mgr.start_conversation("alice") + print(f"\n Conversation started: {conv_id}") + print(f" NVMe dir: {nvme_dir}") + + num_turns = 4 + context_per_turn = 200 # tokens + + print(f"\n Simulating {num_turns} turns, {context_per_turn} context tokens each") + print(f" Entry size per turn: {context_per_turn} × {bpt:,d} = " + f"{context_per_turn * bpt / 1024:.0f} KB") + + for turn in range(1, num_turns + 1): + print(f"\n {'━' * 64}") + print(f" TURN {turn}") + print(f" {'━' * 64}") + + # ConversationManager creates the cache_key + turn_num, cache_key = conv_mgr.add_turn(conv_id, context_per_turn, 50) + + print(f" cache_key = {cache_key}") + print(f" file = {cache_key}.npy") + + storage_latency = 0.0 + file_ops = [] + + # ── Step 2: Multi-turn read (previous turn's cache) ── + if turn > 1: + prev_key = f"{conv_id}_turn_{turn - 1}" + prev_file = nvme_dir / f"{prev_key}.npy" + + print(f"\n Step 2: MULTI-TURN READ (reuse previous turn)") + print(f" Read: {prev_key}.npy") + print(f" Exists: {prev_file.exists()}") + + location, read_lat = cache.access_cache( + prev_key, InferencePhase.DECODE, 'multi_turn' + ) + storage_latency += read_lat + file_ops.append(f"READ {prev_key}.npy ({read_lat*1000:.3f} ms) [multi-turn reuse]") + + if location: + print(f" Hit: location={location}, latency={read_lat*1000:.3f} ms") + else: + print(f" Miss: previous turn not in cache") + else: + print(f"\n Step 2: MULTI-TURN READ — skipped (turn 1, no history)") + + # ── Step 3: Prefill write (this turn's new KV cache) ── + this_file = nvme_dir / f"{cache_key}.npy" + + print(f"\n Step 3: PREFILL WRITE (new KV cache for this turn)") + print(f" Write: {cache_key}.npy") + + success, tier, write_lat = cache.allocate_cache( + cache_key, num_tokens=context_per_turn, phase=InferencePhase.PREFILL + ) + storage_latency += write_lat + file_ops.append(f"WRITE {cache_key}.npy ({write_lat*1000:.3f} ms) [prefill]") + + file_size = this_file.stat().st_size if this_file.exists() else 0 + print(f" tier={tier}, success={success}, latency={write_lat*1000:.3f} ms") + print(f" File created: {this_file.exists()}, size: {file_size:,d} bytes") + + # ── Step 5: Decode read (read back this turn's cache) ── + print(f"\n Step 5: DECODE READ (read back this turn's KV cache)") + print(f" Read: {cache_key}.npy") + + location, read_lat = cache.access_cache( + cache_key, InferencePhase.DECODE + ) + storage_latency += read_lat + file_ops.append(f"READ {cache_key}.npy ({read_lat*1000:.3f} ms) [decode]") + + print(f" location={location}, 
latency={read_lat*1000:.3f} ms") + + # ── Summary for this turn ── + npy_files = sorted(nvme_dir.glob("*.npy")) + print(f"\n Turn {turn} I/O summary:") + for op in file_ops: + print(f" {op}") + print(f" Total storage latency this turn: {storage_latency*1000:.3f} ms") + print(f" .npy files on disk after turn {turn}: {len(npy_files)}") + for f in npy_files: + marker = " ← NEW" if f.stem == cache_key else "" + print(f" {f.name}{marker}") + + # ── Final summary ── + all_npy = sorted(nvme_dir.glob("*.npy")) + all_entries = {k: v for k, v in cache.cache_entries.items() + if v['location'] == 'nvme'} + + print(f"\n {'═' * 64}") + print(f" MULTI-TURN FILE I/O SUMMARY") + print(f" {'═' * 64}") + print(f" Turns completed: {num_turns}") + print(f" .npy files on disk: {len(all_npy)}") + print(f" NVMe cache entries: {len(all_entries)}") + print(f" Total writes: {cache.stats['prefill_writes']}") + print(f" Total reads: {cache.stats['decode_reads']}") + print(f" Total write bytes: {cache.stats['total_write_bytes']/1024:.0f} KB") + print(f" Total read bytes: {cache.stats['total_read_bytes']/1024:.0f} KB") + + print(f"\n File-per-turn pattern:") + print(f" Turn 1: WRITE turn_1.npy + READ turn_1.npy") + print(f" Turn 2: READ turn_1.npy + WRITE turn_2.npy + READ turn_2.npy") + print(f" Turn 3: READ turn_2.npy + WRITE turn_3.npy + READ turn_3.npy") + print(f" Turn N: READ turn_(N-1).npy + WRITE turn_N.npy + READ turn_N.npy") + print(f"") + print(f" I/O per turn:") + print(f" Turn 1: 1 write + 1 read = 2 I/O ops") + print(f" Turn 2+: 1 write + 2 reads = 3 I/O ops (extra read = multi-turn reuse)") + print(f"") + print(f" Write amplification over {num_turns} turns:") + total_data = num_turns * context_per_turn * bpt + total_written = cache.stats['total_write_bytes'] + print(f" Unique KV data: {total_data/1024:.0f} KB " + f"({num_turns} turns × {context_per_turn} tok × {bpt:,d} B)") + print(f" Bytes written: {total_written/1024:.0f} KB") + print(f" Ratio: {total_written / total_data:.2f}x") + + # Assertions + assert len(all_npy) == num_turns, \ + f"Should have {num_turns} .npy files (one per turn), got {len(all_npy)}" + assert cache.stats['prefill_writes'] == num_turns, \ + f"Should have {num_turns} prefill writes" + # decode_reads: turn 1 has 1, turns 2-4 have 2 each (multi-turn + decode) + expected_reads = 1 + (num_turns - 1) * 2 + assert cache.stats['decode_reads'] == expected_reads, \ + f"Expected {expected_reads} decode reads, got {cache.stats['decode_reads']}" + + # ------------------------------------------------------------------ + # Part 4: 3-tier waterfall LRU eviction + # ------------------------------------------------------------------ + + def test_part4_three_tier_waterfall_eviction(self, tiny_model): + """ + Demonstrates the full 3-tier waterfall LRU eviction cascade: + + GPU (fastest) → CPU (mid) → NVMe (slowest) → DELETE + + When the benchmark calls allocate_cache(): + 1. Try GPU: _ensure_space_in_tier('gpu', size) + - If GPU is full, pick LRU entry in GPU + - Recursively call _ensure_space_in_tier('cpu', lru_size) ← makes room + - _demote_entry(lru_key, 'gpu', 'cpu') ← move data + - Now GPU has space → write new entry + + 2. If GPU has no capacity (limit=0), skip to CPU. + 3. If CPU is full, same cascade: CPU LRU → NVMe + 4. If NVMe is full (terminal tier): DELETE the LRU .npy file + + This test uses a fake GPU backend (CPUMemoryBackend injected as + backends['gpu']) since we have no real GPU. 
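+
+        Counter bookkeeping used by the event log and final assertions:
+          GPU→CPU demotes  = stats['offloads_cpu']
+          CPU→NVMe demotes = stats['offloads_storage']
+          NVMe deletes     = stats['evictions'] - offloads_cpu - offloads_storage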
+ """ + print("\n" + "=" * 72) + print(" PART 4: 3-TIER WATERFALL LRU EVICTION") + print("=" * 72) + + bpt = tiny_model.kv_cache_size_per_token + tokens = 10 + entry_kb = (tokens * bpt) / 1024 + + gpu_mb, cpu_mb, nvme_mb = 1, 1, 1 + + cache = MultiTierCache( + model_config=tiny_model, + gpu_memory_gb=0, + cpu_memory_gb=cpu_mb / 1024, + seed=42, + storage_capacity_gb=nvme_mb / 1024, + ) + + # Inject fake GPU + cache.backends['gpu'] = CPUMemoryBackend() + cache.gpu_memory_limit = gpu_mb * 1024 * 1024 + + tier_order = cache._get_tier_order() + entries_per_tier = int((gpu_mb * 1024) / entry_kb) + + print(f"\n Tier order: {tier_order}") + print(f" Entry size: {tokens} tokens × {bpt:,d} B/tok = {entry_kb:.0f} KB") + print(f" Tier capacity: GPU={gpu_mb}MB, CPU={cpu_mb}MB, NVMe={nvme_mb}MB") + print(f" Entries per tier: ~{entries_per_tier}") + print(f"\n Writing 30 entries (much more than total 3-tier capacity)...") + + print(f"\n {'#':>4s} {'Key':<14s} {'Tier':<6s} {'GPU KB':>7s} {'CPU KB':>7s} " + f"{'NVMe KB':>8s} {'Evict':>5s} {'→CPU':>4s} {'→NVMe':>5s} {'Event'}") + print(f" {'─'*4} {'─'*14} {'─'*6} {'─'*7} {'─'*7} {'─'*8} {'─'*5} {'─'*4} {'─'*5} {'─'*30}") + + prev_evictions = 0 + prev_cpu_offloads = 0 + prev_nvme_offloads = 0 + + for i in range(30): + key = f"req_{i}" + success, tier, lat = cache.allocate_cache(key, num_tokens=tokens) + + evictions = cache.stats['evictions'] + cpu_off = cache.stats['offloads_cpu'] + nvme_off = cache.stats['offloads_storage'] + + # Detect what happened this iteration + events = [] + new_evictions = evictions - prev_evictions + new_cpu = cpu_off - prev_cpu_offloads + new_nvme = nvme_off - prev_nvme_offloads + if new_cpu > 0: + events.append(f"GPU→CPU demote ×{new_cpu}") + if new_nvme > 0: + events.append(f"CPU→NVMe demote ×{new_nvme}") + if new_evictions > new_cpu + new_nvme: + deletes = new_evictions - new_cpu - new_nvme + events.append(f"NVMe DELETE ×{deletes}") + event_str = ", ".join(events) if events else "—" + + print(f" {i:>4d} {key:<14s} {tier:<6s} " + f"{cache.gpu_memory_used/1024:>7.0f} " + f"{cache.cpu_memory_used/1024:>7.0f} " + f"{cache.nvme_memory_used/1024:>8.0f} " + f"{evictions:>5d} {cpu_off:>4d} {nvme_off:>5d} {event_str}") + + prev_evictions = evictions + prev_cpu_offloads = cpu_off + prev_nvme_offloads = nvme_off + + gpu_entries = sum(1 for v in cache.cache_entries.values() if v['location'] == 'gpu') + cpu_entries = sum(1 for v in cache.cache_entries.values() if v['location'] == 'cpu') + nvme_entries = sum(1 for v in cache.cache_entries.values() if v['location'] == 'nvme') + + print(f"\n Final state:") + print(f" GPU entries: {gpu_entries}") + print(f" CPU entries: {cpu_entries}") + print(f" NVMe entries: {nvme_entries}") + print(f" Total alive: {len(cache.cache_entries)}") + print(f" Total evictions: {cache.stats['evictions']}") + print(f" GPU→CPU demotes: {cache.stats['offloads_cpu']}") + print(f" CPU→NVMe demotes: {cache.stats['offloads_storage']}") + print(f" NVMe deletes: {cache.stats['evictions'] - cache.stats['offloads_cpu'] - cache.stats['offloads_storage']}") + + npy_files = list(cache.backends['nvme'].base_path.glob("*.npy")) + print(f" .npy files on disk: {len(npy_files)} (should ≈ {nvme_entries})") + + print(f"\n Eviction flow summary:") + print(f" GPU full → demote LRU to CPU (_demote_entry, data moves)") + print(f" CPU full → demote LRU to NVMe (_demote_entry, data moves)") + print(f" NVMe full → DELETE LRU from disk (file unlinked, entry gone)") + print(f" New entry always lands on GPU (fastest available tier)") + + assert 
cache.stats['offloads_cpu'] > 0, "GPU→CPU demotions should have occurred" + assert cache.stats['offloads_storage'] > 0, "CPU→NVMe demotions should have occurred" + nvme_deletes = cache.stats['evictions'] - cache.stats['offloads_cpu'] - cache.stats['offloads_storage'] + assert nvme_deletes > 0, "NVMe deletes should have occurred" + + # ------------------------------------------------------------------ + # Part 5: 1-tier (NVMe-only) waterfall eviction + # ------------------------------------------------------------------ + + def test_part5_one_tier_nvme_only_eviction(self, tiny_model): + """ + Demonstrates NVMe-only mode (cpu=0, gpu=0). + + This is the configuration that exposed 3 bugs: + 1. Double-decrement race on nvme_memory_used + 2. Eviction guards rejecting entries on the terminal tier + 3. Preconditioning spinning forever + + With only NVMe available: + - Every allocate_cache() goes directly to NVMe + - _ensure_space_in_tier('nvme') sees next_tier=None → is_last_tier=True + - Eviction = DELETE (unlink .npy file), not demote + - Capacity guards are relaxed: + • Skip 95% size cap (entry has nowhere else to go) + • Use 100% target (no cascade buffer needed) + • Skip low-data bail (keep evicting until space is free) + """ + print("\n" + "=" * 72) + print(" PART 5: 1-TIER NVMe-ONLY EVICTION (cpu=0, gpu=0)") + print("=" * 72) + + bpt = tiny_model.kv_cache_size_per_token + tokens = 10 + entry_kb = (tokens * bpt) / 1024 + nvme_mb = 1 + + cache = MultiTierCache( + model_config=tiny_model, + gpu_memory_gb=0, + cpu_memory_gb=0, # ZERO + seed=42, + storage_capacity_gb=nvme_mb / 1024, + ) + + nvme_dir = cache.backends['nvme'].base_path + tier_order = cache._get_tier_order() + entries_fit = int((nvme_mb * 1024) / entry_kb) + + print(f"\n Tier order: {tier_order}") + print(f" CPU limit: {cache.cpu_memory_limit} bytes (zero → skipped)") + print(f" NVMe limit: {cache.nvme_memory_limit / 1024:.0f} KB") + print(f" Entry size: {entry_kb:.0f} KB") + print(f" Entries that fit: ~{entries_fit}") + print(f" NVMe dir: {nvme_dir}") + + print(f"\n is_last_tier behavior:") + print(f" next_tier = None (nothing after NVMe)") + print(f" is_last_tier = True") + print(f" → Skip 95% size cap (can't send entry elsewhere)") + print(f" → effective_target = 100% (no cascade buffer)") + print(f" → Skip low-data bailout (keep evicting)") + print(f" → Eviction = DELETE file (not demote)") + + print(f"\n Writing 20 entries into {nvme_mb} MB NVMe...") + print(f"\n {'#':>4s} {'Key':<12s} {'Tier':<6s} {'NVMe KB':>8s} " + f"{'Files':>5s} {'Evict':>5s} {'Event'}") + print(f" {'─'*4} {'─'*12} {'─'*6} {'─'*8} {'─'*5} {'─'*5} {'─'*20}") + + prev_evictions = 0 + + for i in range(20): + key = f"req_{i}" + success, tier, lat = cache.allocate_cache(key, num_tokens=tokens) + + npy_count = len(list(nvme_dir.glob("*.npy"))) + evictions = cache.stats['evictions'] + new_ev = evictions - prev_evictions + + event = f"DELETE ×{new_ev}" if new_ev > 0 else "—" + + print(f" {i:>4d} {key:<12s} {tier:<6s} " + f"{cache.nvme_memory_used/1024:>8.0f} " + f"{npy_count:>5d} {evictions:>5d} {event}") + + prev_evictions = evictions + assert success, f"Allocation {i} must succeed on terminal tier" + + entries_alive = len(cache.cache_entries) + npy_final = len(list(nvme_dir.glob("*.npy"))) + + print(f"\n Final state:") + print(f" Entries in cache: {entries_alive}") + print(f" .npy on disk: {npy_final}") + print(f" Total evictions: {cache.stats['evictions']} (all were DELETEs)") + print(f" nvme_memory_used: {cache.nvme_memory_used / 1024:.0f} KB") + print(f" 
Offloads to CPU: {cache.stats['offloads_cpu']} (0 — no CPU tier)") + print(f" Offloads to NVMe: {cache.stats['offloads_storage']} (= every allocation, since NVMe is the only tier)") + + print(f"\n Note on 'offloads_storage':") + print(f" This counter increments for EVERY entry written to NVMe,") + print(f" whether by direct allocation or by demotion from CPU.") + print(f" In NVMe-only mode: offloads_storage = total allocations (20)") + print(f" In 3-tier mode: offloads_storage = CPU→NVMe demotions only") + + print(f"\n Key difference from 3-tier:") + print(f" 3-tier: eviction = DEMOTE to next tier (data preserved)") + print(f" 1-tier: eviction = DELETE from disk (data destroyed)") + print(f" Both use LRU ordering (oldest access first)") + + assert cache.stats['evictions'] > 0, "Evictions should have occurred" + assert cache.stats['offloads_cpu'] == 0, "No CPU demotions with cpu=0" + assert cache.nvme_memory_used >= 0, "No negative drift" + assert npy_final == entries_alive, \ + f"Disk files ({npy_final}) should match alive entries ({entries_alive})" + class TestBottleneckProfiling: """Profile bottleneck detection in the KV cache benchmark."""