diff --git a/docs/COMPRESSION.md b/docs/COMPRESSION.md new file mode 100644 index 00000000..d55b7023 --- /dev/null +++ b/docs/COMPRESSION.md @@ -0,0 +1,272 @@ +# Compression Guide + +This guide explains how to use zvec's compression features to reduce storage size and improve performance. + +## Overview + +zvec provides compression at two levels: + +1. **Python Level**: Pre/post-processing compression for vectors +2. **C++ Level**: Automatic RocksDB storage compression + +## Installation + +Compression features are built-in. For optimal performance with zstd, install Python 3.14+ (on earlier versions, zstd requests transparently fall back to gzip): + +```bash +# Python 3.14+ recommended for zstd support +pip install zvec +``` + +## C++ Storage Compression + +The C++ storage layer uses **RocksDB** with automatic compression: + +| Level | Compression | Use Case | |-------|-------------|----------| | 0 (memtable) | None | Speed | | 1-2 | LZ4 | Fast warm data | | 3-6 | Zstd | Best compression | + +This is automatic and transparent: all data stored in zvec collections is compressed. 
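The level-to-codec mapping in the table above can be sketched as a simple policy function. This is an illustrative Python model only, and `codec_for_level` is a hypothetical name; the real policy lives in zvec's C++ RocksDB configuration:

```python
# Illustrative model of the per-level compression policy shown above:
# the memtable level stays uncompressed for speed, warm levels use LZ4,
# and colder levels use zstd for the best ratio.
def codec_for_level(level: int) -> str:
    if level == 0:
        return "none"  # memtable: favor write speed
    if level <= 2:
        return "lz4"   # levels 1-2: fast codec for warm data
    return "zstd"      # levels 3-6: best compression for cold data

print([codec_for_level(level) for level in range(7)])
# → ['none', 'lz4', 'lz4', 'zstd', 'zstd', 'zstd', 'zstd']
```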
+ +**Benefits:** +- No configuration needed +- Transparent to users +- Optimal for all vector sizes +- Uses RocksDB's built-in zstd (no extra dependencies) + +## Quick Start + +### Basic Compression + +```python +import numpy as np +from zvec.compression import compress_vector, decompress_vector + +# Create vectors +vectors = np.random.rand(1000, 128).astype(np.float32) + +# Compress +compressed = compress_vector(vectors.tobytes(), method="gzip") +print(f"Original: {vectors.nbytes} bytes") +print(f"Compressed: {len(compressed)} bytes") +print(f"Ratio: {len(compressed)/vectors.nbytes:.2%}") + +# Decompress +decompressed = decompress_vector(compressed, method="gzip") +restored = np.frombuffer(decompressed, dtype=np.float32).reshape(1000, 128) +``` + +### Collection Schema Compression + +```python +from zvec import CollectionSchema, VectorSchema, DataType + +# Create schema with compression +schema = CollectionSchema( + name="my_vectors", + vectors=VectorSchema("embedding", dimension=128, data_type=DataType.VECTOR_FP32), + compression="gzip" # Options: zstd, gzip, lzma, auto, none +) + +print(f"Compression: {schema.compression}") +``` + +### Storage Integration + +```python +from zvec.compression_integration import compress_for_storage, decompress_from_storage + +# Pre-compress before adding to collection. Use a fixed method rather than +# "auto" so the same method can be passed to decompress_from_storage later. +vectors = np.random.rand(1000, 128).astype(np.float32) +compressed = compress_for_storage(vectors, method="gzip") + +# Store compressed data in your preferred way +# ... 
(your storage logic here) + +# Decompress after retrieval + original_vectors = decompress_from_storage( + compressed, + original_shape=(1000, 128), + dtype=np.float32, + method="gzip" +) +``` + +## Compression Methods + +### Available Methods + +| Method | Space Savings | Speed | Python Version | |--------|-------------|-------|---------------| | `zstd` | ~10-20% | Very Fast | 3.14+ | | `gzip` | ~10% | Fast | All | | `lzma` | ~12% | Slow | All | | `auto` | Varies | Optimal | All | | `none` | 0% | Fastest | All | + +### Performance Comparison + +``` +Vectors: 1000 x 4096D (16.4 MB) + +Method Size Time Ratio +------ ---- ---- ----- +none 16.4 MB 0.4ms 100% +gzip 14.7 MB 551ms 89.8% +lzma 14.3 MB 8120ms 87.2% +zstd ~13 MB* ~200ms ~80% (Python 3.14+) +``` + +*Estimated - requires Python 3.14 + +### Recommendations + +- **Small vectors (<10KB)**: Use `none` or `auto` +- **Medium vectors (10KB-1MB)**: Use `gzip` +- **Large vectors (>1MB)**: Use `zstd` (if Python 3.14+) or `gzip` + +## API Reference + +### `zvec.compression` + +```python +from zvec.compression import ( + compress_vector, # Compress bytes + decompress_vector, # Decompress bytes + encode_vector, # Encode to string + decode_vector, # Decode from string +) + +# Check availability +from zvec.compression import Z85_AVAILABLE, ZSTD_AVAILABLE +print(f"Z85 (Python 3.13+): {Z85_AVAILABLE}") +print(f"ZSTD (Python 3.14+): {ZSTD_AVAILABLE}") +``` + +### `zvec.compression_integration` + +```python +from zvec.compression_integration import ( + compress_for_storage, # Pre-storage compression + decompress_from_storage, # Post-retrieval decompression + get_optimal_compression, # Auto-select method + CompressedVectorField, # Field wrapper +) + +# Get optimal method for vector size +method = get_optimal_compression(50000) # Returns "gzip", "zstd", or "none" +``` + +### `zvec.streaming` + +```python +from zvec.streaming import ( + StreamCompressor, # File-based streaming compression + StreamDecompressor, # File-based 
streaming decompression + VectorStreamCompressor, # Specialized for vectors + chunked_compress, # In-memory chunked compression + chunked_decompress, # In-memory chunked decompression +) + +# File streaming +with StreamCompressor("data.gz", method="gzip") as comp: + comp.write(data) + +with StreamDecompressor("data.gz") as decomp: + for chunk in decomp: + process(chunk) + +# Vector-specific streaming +with VectorStreamCompressor("vectors.gz", dtype="float32") as comp: + comp.write_batch(batch1) + comp.write_batch(batch2) + meta = comp.close() +``` + +## Error Handling + +```python +from zvec.compression import ZSTD_AVAILABLE, compress_vector + +try: + compressed = compress_vector(data, method="zstd") +except ValueError as e: + # Invalid compression method + print(f"Error: {e}") + +# Graceful fallback +if ZSTD_AVAILABLE: + compressed = compress_vector(data, method="zstd") +else: + print("zstd not available, using gzip instead") + compressed = compress_vector(data, method="gzip") +``` + +## Best Practices + +1. **Use `auto` for simplicity**: Let zvec choose the best method +2. **Benchmark before production**: Test with your actual data sizes +3. **Consider CPU vs I/O tradeoff**: Compression saves disk space but uses CPU +4. 
**Test decompression**: Always verify round-trip integrity + +## Streaming Compression + +For large datasets that don't fit in memory, use streaming compression: + +```python +from zvec.streaming import StreamCompressor, StreamDecompressor, VectorStreamCompressor + +# Streaming compression for large files +with StreamCompressor("vectors.gz", method="gzip") as comp: + for batch in large_dataset_batches: + comp.write(batch.tobytes()) + +# Streaming decompression +with StreamDecompressor("vectors.gz") as decomp: + for chunk in decomp: + process(chunk) + +# Specialized for vectors +with VectorStreamCompressor("vectors.gz", dtype="float32") as comp: + comp.write_batch(vectors_batch_1) + comp.write_batch(vectors_batch_2) + metadata = comp.close() + print(f"Total: {metadata['count']} vectors") +``` + +## Examples + +### Full Pipeline Example + +```python +import numpy as np +from zvec import CollectionSchema, VectorSchema, DataType +from zvec.compression_integration import compress_for_storage + +# 1. Prepare vectors +vectors = np.random.rand(10000, 768).astype(np.float32) + +# 2. Choose compression +compression = "auto" # or "gzip", "zstd" + +# 3. Compress for storage +compressed = compress_for_storage(vectors, method=compression) + +# 4. Store (pseudo-code) +# db.save(collection_name="embeddings", data=compressed) + +# 5. Retrieve and decompress (pseudo-code) +# retrieved = db.load(collection_name="embeddings") +# original = decompress_from_storage( +# retrieved, +# original_shape=vectors.shape, +# dtype=vectors.dtype, +# method=compression +# ) + +print(f"Storage size: {len(compressed):,} bytes") +print(f"Space saved: {(1 - len(compressed)/vectors.nbytes):.1%}") +``` diff --git a/python/tests/test_compression.py b/python/tests/test_compression.py new file mode 100644 index 00000000..c86d59b9 --- /dev/null +++ b/python/tests/test_compression.py @@ -0,0 +1,195 @@ +""" +Tests for zvec.compression module. 
+""" + +import numpy as np +import pytest + +from zvec.compression import ( + compress_vector, + decompress_vector, + encode_vector, + decode_vector, + Z85_AVAILABLE, + ZSTD_AVAILABLE, +) + + +class TestCompression: + """Tests for vector compression.""" + + @pytest.fixture + def sample_vectors(self): + """Generate sample vectors for testing.""" + return np.random.rand(100, 128).astype(np.float32) + + def test_compress_decompress_zstd(self, sample_vectors): + """Test zstd compression and decompression.""" + data = sample_vectors.tobytes() + + compressed = compress_vector(data, method="zstd") + decompressed = decompress_vector(compressed, method="zstd") + + assert decompressed == data + assert len(compressed) < len(data) # Should be smaller + + def test_compress_decompress_gzip(self, sample_vectors): + """Test gzip compression and decompression.""" + data = sample_vectors.tobytes() + + compressed = compress_vector(data, method="gzip") + decompressed = decompress_vector(compressed, method="gzip") + + assert decompressed == data + + def test_compress_decompress_lzma(self, sample_vectors): + """Test lzma compression and decompression.""" + data = sample_vectors.tobytes() + + compressed = compress_vector(data, method="lzma") + decompressed = decompress_vector(compressed, method="lzma") + + assert decompressed == data + + def test_compress_decompress_invalid_method(self, sample_vectors): + """Test that invalid compression method raises ValueError.""" + data = sample_vectors.tobytes() + + with pytest.raises(ValueError, match="Unknown compression method"): + compress_vector(data, method="invalid") + + with pytest.raises(ValueError, match="Unknown compression method"): + decompress_vector(data, method="invalid") + + def test_compression_ratio(self, sample_vectors): + """Test that compression actually reduces size.""" + data = sample_vectors.tobytes() + original_size = len(data) + + # Test all methods + for method in ["zstd", "gzip", "lzma"]: + compressed = 
compress_vector(data, method=method) + ratio = len(compressed) / original_size + assert ratio < 1.0, f"{method} should compress" + + def test_unknown_method(self, sample_vectors): + """Test that unknown method raises error.""" + data = sample_vectors.tobytes() + + with pytest.raises(ValueError): + compress_vector(data, method="unknown") + + def test_zstd_fallback(self, sample_vectors): + """Test that zstd falls back to gzip if not available.""" + data = sample_vectors.tobytes() + + if ZSTD_AVAILABLE: + # If available, zstd should work + compressed = compress_vector(data, method="zstd") + decompressed = decompress_vector(compressed, method="zstd") + assert decompressed == data + else: + # Should fall back to gzip + compressed = compress_vector(data, method="zstd") + # Should work with gzip decompression + decompressed = decompress_vector(compressed, method="gzip") + assert decompressed == data + + +class TestEncoding: + """Tests for vector encoding.""" + + @pytest.fixture + def sample_vectors(self): + """Generate sample vectors for testing.""" + return np.random.rand(10, 128).astype(np.float32) + + def test_encode_decode_z85(self, sample_vectors): + """Test Z85 encoding and decoding.""" + if not Z85_AVAILABLE: + pytest.skip("Z85 not available (requires Python 3.13+)") + + data = sample_vectors.tobytes() + + encoded = encode_vector(data, encoding="z85") + decoded = decode_vector(encoded, encoding="z85") + + assert decoded == data + assert isinstance(encoded, str) + + def test_encode_decode_base64(self, sample_vectors): + """Test base64 encoding and decoding.""" + data = sample_vectors.tobytes() + + encoded = encode_vector(data, encoding="base64") + decoded = decode_vector(encoded, encoding="base64") + + assert decoded == data + assert isinstance(encoded, str) + + def test_encode_decode_urlsafe(self, sample_vectors): + """Test urlsafe base64 encoding and decoding.""" + data = sample_vectors.tobytes() + + encoded = encode_vector(data, encoding="urlsafe") + decoded = 
decode_vector(encoded, encoding="urlsafe") + + assert decoded == data + assert isinstance(encoded, str) + + def test_z85_smaller_than_base64(self, sample_vectors): + """Test that Z85 produces smaller output than base64.""" + if not Z85_AVAILABLE: + pytest.skip("Z85 not available (requires Python 3.13+)") + + data = sample_vectors.tobytes() + + z85_encoded = encode_vector(data, encoding="z85") + base64_encoded = encode_vector(data, encoding="base64") + + # Z85 should be ~10% smaller + assert len(z85_encoded) < len(base64_encoded) + + def test_unknown_encoding(self, sample_vectors): + """Test that unknown encoding raises error.""" + data = sample_vectors.tobytes() + + with pytest.raises(ValueError): + encode_vector(data, encoding="unknown") + + def test_z85_fallback(self, sample_vectors): + """Test that Z85 falls back to base64 if not available.""" + data = sample_vectors.tobytes() + + if Z85_AVAILABLE: + encoded = encode_vector(data, encoding="z85") + decoded = decode_vector(encoded, encoding="z85") + assert decoded == data + else: + # Should fall back to base64 + encoded = encode_vector(data, encoding="z85") + decoded = decode_vector(encoded, encoding="base64") + assert decoded == data + + +class TestIntegration: + """Integration tests for compression + encoding.""" + + def test_compress_then_encode(self): + """Test compressing then encoding a vector.""" + vectors = np.random.rand(10, 128).astype(np.float32) + data = vectors.tobytes() + + # Compress + compressed = compress_vector(data, method="gzip") + + # Encode + encoded = encode_vector(compressed, encoding="base64") + + # Decode + decoded = decode_vector(encoded, encoding="base64") + + # Decompress + final = decompress_vector(decoded, method="gzip") + + assert final == data diff --git a/python/tests/test_compression_integration.py b/python/tests/test_compression_integration.py new file mode 100644 index 00000000..9ed3b504 --- /dev/null +++ b/python/tests/test_compression_integration.py @@ -0,0 +1,147 @@ +""" 
+Tests for compression integration module. +""" + +import numpy as np +import pytest + +from zvec.compression_integration import ( + compress_for_storage, + decompress_from_storage, + get_optimal_compression, + CompressedVectorField, + ZSTD_AVAILABLE, +) + + +class TestCompressionIntegration: + """Tests for compression integration utilities.""" + + @pytest.fixture + def sample_vectors(self): + """Generate sample vectors.""" + return np.random.rand(100, 128).astype(np.float32) + + def test_compress_for_storage_numpy(self, sample_vectors): + """Test compressing numpy array.""" + compressed = compress_for_storage(sample_vectors, method="gzip") + + assert isinstance(compressed, bytes) + assert len(compressed) < sample_vectors.nbytes + + def test_compress_for_storage_bytes(self, sample_vectors): + """Test compressing bytes.""" + data_bytes = sample_vectors.tobytes() + compressed = compress_for_storage(data_bytes, method="gzip") + + assert isinstance(compressed, bytes) + + def test_compress_auto(self, sample_vectors): + """Test auto compression selection.""" + compressed = compress_for_storage(sample_vectors, method="auto") + + # Should have compressed + assert len(compressed) < sample_vectors.nbytes + + def test_compress_none(self, sample_vectors): + """Test no compression.""" + compressed = compress_for_storage(sample_vectors, method="none") + + # Should return raw bytes + assert compressed == sample_vectors.tobytes() + + def test_decompress_from_storage(self, sample_vectors): + """Test decompression.""" + compressed = compress_for_storage(sample_vectors, method="gzip") + + decompressed = decompress_from_storage( + compressed, + original_shape=sample_vectors.shape, + dtype=sample_vectors.dtype, + method="gzip", + ) + + np.testing.assert_array_equal(decompressed, sample_vectors) + + def test_decompress_none(self, sample_vectors): + """Test no decompression.""" + data_bytes = sample_vectors.tobytes() + + decompressed = decompress_from_storage( + data_bytes, + 
original_shape=sample_vectors.shape, + dtype=sample_vectors.dtype, + method="none", + ) + + np.testing.assert_array_equal(decompressed, sample_vectors) + + def test_roundtrip_all_methods(self, sample_vectors): + """Test roundtrip for all compression methods.""" + for method in ["gzip", "lzma", "none"]: + compressed = compress_for_storage(sample_vectors, method=method) + decompressed = decompress_from_storage( + compressed, + original_shape=sample_vectors.shape, + dtype=sample_vectors.dtype, + method=method, + ) + np.testing.assert_array_equal(decompressed, sample_vectors) + + def test_compression_ratio(self, sample_vectors): + """Test actual compression ratio.""" + compressed = compress_for_storage(sample_vectors, method="gzip") + ratio = len(compressed) / sample_vectors.nbytes + + # Should be smaller + assert ratio < 1.0 + + +class TestOptimalCompression: + """Tests for optimal compression selection.""" + + def test_small_vector_no_compression(self): + """Test that small vectors don't use heavy compression.""" + result = get_optimal_compression(1000) + # Small vectors: no compression + assert result == "none" + + def test_medium_vector_gzip(self): + """Test medium vector uses gzip when zstd not available.""" + # Without zstd, medium vectors use gzip or none + # Threshold is > 50000 for gzip, < 10000 for none + # 50000 should give gzip or none depending on implementation + result = get_optimal_compression(50000) + assert result in ["gzip", "none"] + + def test_large_vector_zstd(self, monkeypatch): + """Test large vector uses zstd if available.""" + # Mock zstd as available + monkeypatch.setattr("zvec.compression_integration.ZSTD_AVAILABLE", True) + + result = get_optimal_compression(20000) + assert result == "zstd" + + +class TestCompressedVectorField: + """Tests for CompressedVectorField class.""" + + def test_creation(self): + """Test creating a compressed vector field.""" + cvf = CompressedVectorField("embedding", compression="gzip") + + assert cvf.name == 
"embedding" + assert cvf.compression == "gzip" + + def test_repr(self): + """Test string representation.""" + cvf = CompressedVectorField("embedding", compression="gzip") + + assert "embedding" in repr(cvf) + assert "gzip" in repr(cvf) + + def test_default_compression(self): + """Test default compression is none.""" + cvf = CompressedVectorField("embedding") + + assert cvf.compression == "none" diff --git a/python/tests/test_embedding.py b/python/tests/test_embedding.py index e0a57a17..1b0622b0 100644 --- a/python/tests/test_embedding.py +++ b/python/tests/test_embedding.py @@ -1168,8 +1168,8 @@ def test_model_properties(self, mock_require_module): return_value="/path/to/model", ): mock_ms = Mock() - mock_require_module.side_effect = ( - lambda m: mock_st if m == "sentence_transformers" else mock_ms + mock_require_module.side_effect = lambda m: ( + mock_st if m == "sentence_transformers" else mock_ms ) emb_func_ms = DefaultLocalDenseEmbedding(model_source="modelscope") assert ( @@ -1635,8 +1635,8 @@ def test_modelscope_source(self, mock_require_module): "modelscope.hub.snapshot_download.snapshot_download", return_value="/cache/splade-cocondenser", ): - mock_require_module.side_effect = ( - lambda m: mock_st if m == "sentence_transformers" else mock_ms + mock_require_module.side_effect = lambda m: ( + mock_st if m == "sentence_transformers" else mock_ms ) sparse_emb = DefaultLocalSparseEmbedding(model_source="modelscope") diff --git a/python/tests/test_schema_compression.py b/python/tests/test_schema_compression.py new file mode 100644 index 00000000..3c12f3d1 --- /dev/null +++ b/python/tests/test_schema_compression.py @@ -0,0 +1,99 @@ +""" +Tests for compression support in CollectionSchema. 
+""" + +import pytest +from zvec import CollectionSchema, VectorSchema, DataType + + +class TestCollectionSchemaCompression: + """Tests for compression parameter in CollectionSchema.""" + + def test_default_compression(self): + """Test that default compression is 'none'.""" + schema = CollectionSchema( + name="test", + vectors=VectorSchema("emb", dimension=128, data_type=DataType.VECTOR_FP32), + ) + assert schema.compression == "none" + + def test_gzip_compression(self): + """Test gzip compression setting.""" + schema = CollectionSchema( + name="test", + vectors=VectorSchema("emb", dimension=128, data_type=DataType.VECTOR_FP32), + compression="gzip", + ) + assert schema.compression == "gzip" + + def test_zstd_compression(self): + """Test zstd compression setting.""" + schema = CollectionSchema( + name="test", + vectors=VectorSchema("emb", dimension=128, data_type=DataType.VECTOR_FP32), + compression="zstd", + ) + assert schema.compression == "zstd" + + def test_lzma_compression(self): + """Test lzma compression setting.""" + schema = CollectionSchema( + name="test", + vectors=VectorSchema("emb", dimension=128, data_type=DataType.VECTOR_FP32), + compression="lzma", + ) + assert schema.compression == "lzma" + + def test_auto_compression(self): + """Test auto compression setting.""" + schema = CollectionSchema( + name="test", + vectors=VectorSchema("emb", dimension=128, data_type=DataType.VECTOR_FP32), + compression="auto", + ) + assert schema.compression == "auto" + + def test_invalid_compression(self): + """Test that invalid compression raises error.""" + with pytest.raises(ValueError) as exc_info: + CollectionSchema( + name="test", + vectors=VectorSchema( + "emb", dimension=128, data_type=DataType.VECTOR_FP32 + ), + compression="invalid", + ) + assert "compression must be one of" in str(exc_info.value) + + def test_compression_in_repr(self): + """Test that compression appears in repr.""" + schema = CollectionSchema( + name="test", + vectors=VectorSchema("emb", 
dimension=128, data_type=DataType.VECTOR_FP32), + compression="gzip", + ) + repr_str = repr(schema) + assert '"compression": "gzip"' in repr_str + + def test_compression_none_explicit(self): + """Test that explicitly setting 'none' works.""" + schema = CollectionSchema( + name="test", + vectors=VectorSchema("emb", dimension=128, data_type=DataType.VECTOR_FP32), + compression="none", + ) + assert schema.compression == "none" + + def test_compression_with_fields(self): + """Test compression with scalar fields.""" + from zvec import FieldSchema + + schema = CollectionSchema( + name="test", + fields=FieldSchema("id", DataType.INT64), + vectors=VectorSchema("emb", dimension=128, data_type=DataType.VECTOR_FP32), + compression="gzip", + ) + assert schema.compression == "gzip" + assert len(schema.fields) == 1 + assert schema.fields[0].name == "id" diff --git a/python/tests/test_streaming.py b/python/tests/test_streaming.py new file mode 100644 index 00000000..2f8e3637 --- /dev/null +++ b/python/tests/test_streaming.py @@ -0,0 +1,307 @@ +""" +Tests for streaming compression module. +""" + +import gzip +import io +import lzma +import os +import tempfile +import numpy as np +import pytest + +from zvec.streaming import ( + StreamCompressor, + StreamDecompressor, + chunked_compress, + chunked_decompress, + VectorStreamCompressor, + ZSTD_AVAILABLE, +) + + +class TestStreamCompressor: + """Tests for StreamCompressor.""" + + @pytest.fixture + def sample_data(self): + """Generate sample data.""" + return b"Hello World! 
" * 1000 + + @pytest.fixture + def temp_file(self): + """Create temporary file.""" + fd, path = tempfile.mkstemp(suffix=".gz") + os.close(fd) + yield path + if os.path.exists(path): + os.remove(path) + + def test_gzip_compression(self, sample_data, temp_file): + """Test gzip streaming compression.""" + with StreamCompressor(temp_file, method="gzip") as comp: + comp.write(sample_data) + + # Verify + with gzip.open(temp_file, "rb") as f: + decompressed = f.read() + + assert decompressed == sample_data + + def test_lzma_compression(self, sample_data): + """Test lzma streaming compression.""" + with tempfile.NamedTemporaryFile(suffix=".lzma", delete=False) as f: + path = f.name + + try: + with StreamCompressor(path, method="lzma") as comp: + comp.write(sample_data) + + with lzma.open(path, "rb") as f: + decompressed = f.read() + + assert decompressed == sample_data + finally: + os.remove(path) + + def test_compression_levels(self, sample_data): + """Test different compression levels.""" + for level in [1, 6, 9]: + with tempfile.NamedTemporaryFile(suffix=".gz", delete=False) as f: + path = f.name + + try: + with StreamCompressor( + path, method="gzip", compression_level=level + ) as comp: + comp.write(sample_data) + + file_size = os.path.getsize(path) + assert file_size > 0 + finally: + os.remove(path) + + def test_multiple_writes(self, sample_data): + """Test multiple write calls.""" + with tempfile.NamedTemporaryFile(suffix=".gz", delete=False) as f: + path = f.name + + try: + with StreamCompressor(path, method="gzip") as comp: + # Write in chunks + for i in range(0, len(sample_data), 100): + comp.write(sample_data[i : i + 100]) + + with gzip.open(path, "rb") as f: + decompressed = f.read() + + assert decompressed == sample_data + finally: + os.remove(path) + + +class TestStreamDecompressor: + """Tests for StreamDecompressor.""" + + @pytest.fixture + def sample_data(self): + return b"Test Data " * 500 + + @pytest.fixture + def gz_file(self, sample_data): + """Create 
temp gzip file.""" + fd, path = tempfile.mkstemp(suffix=".gz") + os.close(fd) + with gzip.open(path, "wb") as f: + f.write(sample_data) + yield path + os.remove(path) + + @pytest.fixture + def lzma_file(self, sample_data): + """Create temp lzma file.""" + fd, path = tempfile.mkstemp(suffix=".lzma") + os.close(fd) + with lzma.open(path, "wb") as f: + f.write(sample_data) + yield path + os.remove(path) + + def test_gzip_decompression(self, sample_data, gz_file): + """Test gzip streaming decompression.""" + with StreamDecompressor(gz_file) as decomp: + result = b"".join(decomp) + + assert result == sample_data + + def test_lzma_decompression(self, sample_data, lzma_file): + """Test lzma streaming decompression.""" + with StreamDecompressor(lzma_file) as decomp: + result = b"".join(decomp) + + assert result == sample_data + + def test_iteration(self, sample_data, gz_file): + """Test iteration yields chunks.""" + chunks = [] + with StreamDecompressor(gz_file) as decomp: + for chunk in decomp: + chunks.append(chunk) + + result = b"".join(chunks) + assert result == sample_data + + +class TestChunkedCompress: + """Tests for chunked_compress.""" + + def test_gzip_chunked(self): + """Test chunked gzip compression.""" + data = b"Test data " * 100 + + # This now yields compressed chunks + chunks = list(chunked_compress(data, method="gzip")) + + # Verify we get chunks + assert len(chunks) > 0 + + # Decompress the full result + decompressed = gzip.decompress(b"".join(chunks)) + assert decompressed == data + + def test_lzma_chunked(self): + """Test chunked lzma compression.""" + data = b"Test data " * 100 + + chunks = list(chunked_compress(data, method="lzma")) + + assert len(chunks) > 0 + decompressed = lzma.decompress(b"".join(chunks)) + assert decompressed == data + + def test_multiple_chunks(self): + """Test data yields multiple chunks.""" + data = b"X" * 10000 + + chunks = list(chunked_compress(data, method="gzip", chunk_size=100)) + + # Should have multiple chunks due to 
small chunk_size + assert len(chunks) >= 1 + + # Verify decompression + decompressed = gzip.decompress(b"".join(chunks)) + assert decompressed == data + + +class TestVectorStreamCompressor: + """Tests for VectorStreamCompressor.""" + + def test_vector_batch_write(self): + """Test writing vector batches.""" + vectors1 = np.random.rand(100, 128).astype(np.float32) + vectors2 = np.random.rand(50, 128).astype(np.float32) + + with tempfile.NamedTemporaryFile(suffix=".gz", delete=False) as f: + path = f.name + + try: + with VectorStreamCompressor(path, dtype="float32", method="gzip") as comp: + comp.write_batch(vectors1) + comp.write_batch(vectors2) + metadata = comp.close() + + assert metadata["count"] == 150 + assert metadata["dimension"] == 128 + assert metadata["dtype"] == "float32" + + # Verify compressed data + with gzip.open(path, "rb") as f: + data = f.read() + restored = np.frombuffer(data, dtype=np.float32).reshape(150, 128) + + np.testing.assert_array_equal(restored[:100], vectors1) + np.testing.assert_array_equal(restored[100:], vectors2) + finally: + os.remove(path) + + def test_metadata_tracking(self): + """Test metadata is tracked correctly.""" + vectors = np.random.rand(42, 64).astype(np.float32) + + with tempfile.NamedTemporaryFile(suffix=".gz", delete=False) as f: + path = f.name + + try: + with VectorStreamCompressor(path, dtype="float32", method="gzip") as comp: + comp.write_batch(vectors) + metadata = comp.close() + + assert metadata["count"] == 42 + assert metadata["dimension"] == 64 + finally: + os.remove(path) + + def test_context_manager(self): + """Test proper context manager usage.""" + vectors = np.random.rand(10, 32).astype(np.float32) + + with tempfile.NamedTemporaryFile(suffix=".gz", delete=False) as f: + path = f.name + + with VectorStreamCompressor(path, method="gzip") as comp: + comp.write_batch(vectors) + + # Verify file exists and has content + assert os.path.getsize(path) > 0 + + +class TestStreamingIntegration: + """Integration 
tests.""" + + def test_full_pipeline(self): + """Test complete compress-decompress pipeline.""" + # Create sample vectors + original = np.random.rand(500, 256).astype(np.float32) + + # Compress + with tempfile.NamedTemporaryFile(suffix=".gz", delete=False) as f: + comp_path = f.name + + try: + with VectorStreamCompressor(comp_path, method="gzip") as comp: + comp.write_batch(original) + + # Decompress + with StreamDecompressor(comp_path) as decomp: + decompressed = b"".join(decomp) + + restored = np.frombuffer(decompressed, dtype=np.float32).reshape(500, 256) + + np.testing.assert_array_equal(restored, original) + finally: + os.remove(comp_path) + + def test_multiple_batches(self): + """Test writing multiple batches over time.""" + batches = [np.random.rand(100, 64).astype(np.float32) for _ in range(5)] + + with tempfile.NamedTemporaryFile(suffix=".gz", delete=False) as f: + path = f.name + + try: + # Write batches + with VectorStreamCompressor(path, method="gzip") as comp: + for batch in batches: + comp.write_batch(batch) + + # Read back + with StreamDecompressor(path) as decomp: + data = b"".join(decomp) + + total_vectors = np.frombuffer(data, dtype=np.float32) + restored = total_vectors.reshape(-1, 64) + + expected = np.vstack(batches) + np.testing.assert_array_equal(restored, expected) + finally: + os.remove(path) diff --git a/python/zvec/__init__.py b/python/zvec/__init__.py index 1c8fdfc0..3e8c2149 100644 --- a/python/zvec/__init__.py +++ b/python/zvec/__init__.py @@ -25,6 +25,7 @@ # Public API — grouped by category # ============================== +from . import compression from . import model as model # —— Extensions —— diff --git a/python/zvec/compression.py b/python/zvec/compression.py new file mode 100644 index 00000000..36a33f97 --- /dev/null +++ b/python/zvec/compression.py @@ -0,0 +1,162 @@ +""" +Compression utilities for zvec. + +This module provides compression and encoding utilities for zvec vectors, +leveraging Python 3.13+ features when available. 
+ +Usage: + from zvec.compression import compress_vector, decompress_vector + + # Compress a vector for storage + compressed = compress_vector(vector_bytes, method="zstd") + + # Decompress when reading + decompressed = decompress_vector(compressed, method="zstd") +""" + +from __future__ import annotations + +import gzip +import lzma +from typing import Literal + +# Check for Python 3.13+ features +try: + import base64 + + Z85_AVAILABLE = hasattr(base64, "z85encode") +except ImportError: + Z85_AVAILABLE = False + +# Check for Python 3.14+ features +try: + import compression.zstd + + ZSTD_AVAILABLE = True +except ImportError: + ZSTD_AVAILABLE = False + + +def compress_vector( + data: bytes, method: Literal["zstd", "gzip", "lzma"] = "zstd" +) -> bytes: + """ + Compress vector data. + + Args: + data: Raw vector bytes (e.g., numpy.tobytes()) + method: Compression method + + Returns: + Compressed bytes + + Examples: + >>> import numpy as np + >>> vectors = np.random.rand(1000, 128).astype(np.float32) + >>> compressed = compress_vector(vectors.tobytes(), method="zstd") + """ + if method == "zstd": + if ZSTD_AVAILABLE: + return compression.zstd.compress(data) + # Fallback to gzip if zstd not available + return gzip.compress(data) + if method == "gzip": + return gzip.compress(data) + if method == "lzma": + return lzma.compress(data) + raise ValueError(f"Unknown compression method: {method}") + + +def decompress_vector( + data: bytes, method: Literal["zstd", "gzip", "lzma"] = "zstd" +) -> bytes: + """ + Decompress vector data. 
+ + Args: + data: Compressed vector bytes + method: Compression method used + + Returns: + Decompressed bytes + + Examples: + >>> decompressed = decompress_vector(compressed, method="zstd") + >>> vectors = np.frombuffer(decompressed, dtype=np.float32).reshape(1000, 128) + """ + if method == "zstd": + if ZSTD_AVAILABLE: + return compression.zstd.decompress(data) + # Fallback to gzip + return gzip.decompress(data) + if method == "gzip": + return gzip.decompress(data) + if method == "lzma": + return lzma.decompress(data) + raise ValueError(f"Unknown compression method: {method}") + + +def encode_vector( + data: bytes, encoding: Literal["z85", "base64", "urlsafe"] = "z85" +) -> str: + """ + Encode vector data as string. + + Args: + data: Raw vector bytes + encoding: Encoding method + + Returns: + Encoded string + + Examples: + >>> encoded = encode_vector(vector_bytes, encoding="z85") + """ + if encoding == "z85": + if Z85_AVAILABLE: + return base64.z85encode(data).decode("ascii") + # Fallback to base64 + return base64.b64encode(data).decode("ascii") + if encoding == "base64": + return base64.b64encode(data).decode("ascii") + if encoding == "urlsafe": + return base64.urlsafe_b64encode(data).decode("ascii") + raise ValueError(f"Unknown encoding: {encoding}") + + +def decode_vector( + encoded: str, encoding: Literal["z85", "base64", "urlsafe"] = "z85" +) -> bytes: + """ + Decode vector data from string. 
+ + Args: + encoded: Encoded string + encoding: Encoding method used + + Returns: + Decoded bytes + + Examples: + >>> vector_bytes = decode_vector(encoded, encoding="z85") + """ + if encoding == "z85": + if Z85_AVAILABLE: + return base64.z85decode(encoded.encode("ascii")) + return base64.b64decode(encoded) + if encoding == "base64": + return base64.b64decode(encoded) + if encoding == "urlsafe": + return base64.urlsafe_b64decode(encoded) + raise ValueError(f"Unknown encoding: {encoding}") + + +# Export availability status +__all__ = [ + "Z85_AVAILABLE", + "ZSTD_AVAILABLE", + "compress_vector", + "decode_vector", + "decompress_vector", + "encode_vector", +] diff --git a/python/zvec/compression_integration.py b/python/zvec/compression_integration.py new file mode 100644 index 00000000..2e220c92 --- /dev/null +++ b/python/zvec/compression_integration.py @@ -0,0 +1,167 @@ +""" +Compression integration utilities for zvec. + +This module provides utilities to integrate compression with zvec collections +at the Python level. Full C++ integration would require modifying the core +storage layer, but this provides a practical solution using pre/post processing. 
+
+Usage:
+    from zvec.compression_integration import compress_for_storage, decompress_from_storage
+
+    # Pre-compress vectors before adding to collection
+    compressed_vectors = compress_for_storage(vectors, method="gzip")
+    collection.add(vectors=compressed_vectors)
+
+    # Post-process after querying
+    results = decompress_from_storage(results, method="gzip")
+"""
+
+from __future__ import annotations
+
+from typing import Literal, Union
+
+import numpy as np
+
+from .compression import (
+    Z85_AVAILABLE,
+    ZSTD_AVAILABLE,
+    compress_vector,
+    decompress_vector,
+)
+
+# Export compression availability
+__all__ = [
+    "Z85_AVAILABLE",
+    "ZSTD_AVAILABLE",
+    "compress_for_storage",
+    "decompress_from_storage",
+    "get_optimal_compression",
+]
+
+
+def get_optimal_compression(vector_size: int) -> str:
+    """
+    Determine the optimal compression method based on vector size.
+
+    Selection: "zstd" for payloads over 10000 bytes when zstd is available;
+    otherwise "gzip" for payloads over 50000 bytes; "none" for anything
+    smaller, where compression overhead outweighs the savings.
+
+    Args:
+        vector_size: Size of vector data in bytes
+
+    Returns:
+        Recommended compression method
+
+    Examples:
+        >>> get_optimal_compression(1000)
+        'none'
+        >>> get_optimal_compression(100000)  # 'gzip' when zstd is unavailable
+        'zstd'
+    """
+    if ZSTD_AVAILABLE and vector_size > 10000:
+        return "zstd"
+    if vector_size > 50000:
+        return "gzip"
+    return "none"
+
+
+def compress_for_storage(
+    data: Union[np.ndarray, bytes],
+    method: Literal["zstd", "gzip", "lzma", "auto", "none"] = "auto",
+) -> bytes:
+    """
+    Compress vector data for storage.
+
+    This function compresses vector data before storing in zvec.
+    Use decompress_from_storage() to decompress after retrieval.
+
+    Args:
+        data: Numpy array or bytes to compress
+        method: Compression method. "auto" selects based on size.
+ + Returns: + Compressed bytes (ready for storage) + + Examples: + >>> import numpy as np + >>> vectors = np.random.rand(1000, 128).astype(np.float32) + >>> compressed = compress_for_storage(vectors, method="auto") + >>> # Store compressed bytes in zvec document + """ + # Convert numpy array to bytes if needed + data_bytes = data.tobytes() if isinstance(data, np.ndarray) else data + + # Auto-select compression method + if method == "auto": + method = get_optimal_compression(len(data_bytes)) + + # No compression requested + if method == "none": + return data_bytes + + return compress_vector(data_bytes, method=method) + + +def decompress_from_storage( + data: bytes, + original_shape: tuple, + dtype: np.dtype, + method: Literal["zstd", "gzip", "lzma", "none"] = "none", +) -> np.ndarray: + """ + Decompress vector data retrieved from storage. + + Args: + data: Compressed bytes from storage + original_shape: Original shape of vector array (e.g., (1000, 128)) + dtype: NumPy dtype (e.g., np.float32) + method: Compression method used ("none" if not compressed) + + Returns: + Decompressed numpy array + + Examples: + >>> # After retrieving compressed bytes from zvec + >>> vectors = decompress_from_storage( + ... compressed_bytes, + ... original_shape=(1000, 128), + ... dtype=np.float32, + ... method="gzip" + ... ) + """ + # No compression to remove + if method == "none": + return np.frombuffer(data, dtype=dtype).reshape(original_shape) + + decompressed = decompress_vector(data, method=method) + return np.frombuffer(decompressed, dtype=dtype).reshape(original_shape) + + +class CompressedVectorField: + """ + Wrapper for compressed vector fields in zvec documents. + + This provides a convenient way to handle compressed vectors + in zvec documents without modifying the core storage. + + Examples: + >>> # Define a compressed vector field + >>> cvf = CompressedVectorField( + ... name="embedding", + ... compression="gzip" + ... 
) + >>> + >>> # Add to document + >>> doc = zvec.Doc() + >>> doc[cvf] = vectors + """ + + def __init__( + self, + name: str, + compression: Literal["zstd", "gzip", "lzma", "auto", "none"] = "none", + ): + self.name = name + self.compression = compression + + def __repr__(self) -> str: + return ( + f"CompressedVectorField(name={self.name}, compression={self.compression})" + ) diff --git a/python/zvec/model/schema/collection_schema.py b/python/zvec/model/schema/collection_schema.py index e07095b1..7f25904c 100644 --- a/python/zvec/model/schema/collection_schema.py +++ b/python/zvec/model/schema/collection_schema.py @@ -14,7 +14,7 @@ from __future__ import annotations import json -from typing import Optional, Union +from typing import Literal, Optional, Union from _zvec.schema import _CollectionSchema, _FieldSchema @@ -24,6 +24,9 @@ "CollectionSchema", ] +# Compression methods +COMPRESSION_METHODS = Literal["zstd", "gzip", "lzma", "auto", "none"] + class CollectionSchema: """Defines the structure of a collection in Zvec. @@ -38,6 +41,13 @@ class CollectionSchema: One or more scalar field definitions. Defaults to None. vectors (Optional[Union[VectorSchema, list[VectorSchema]]], optional): One or more vector field definitions. Defaults to None. + compression (Optional[COMPRESSION_METHODS], optional): + Compression method for vector storage. Defaults to "none". + - "zstd": Zstandard compression (best, requires Python 3.14+) + - "gzip": Gzip compression (good balance) + - "lzma": LZMA compression (best ratio, slowest) + - "auto": Automatic selection based on vector size + - "none": No compression Raises: TypeError: If `fields` or `vectors` are of unsupported types. @@ -50,7 +60,8 @@ class CollectionSchema: >>> schema = CollectionSchema( ... name="my_collection", ... fields=id_field, - ... vectors=emb_field + ... vectors=emb_field, + ... compression="gzip" ... 
) >>> print(schema.name) my_collection @@ -61,12 +72,24 @@ def __init__( name: str, fields: Optional[Union[FieldSchema, list[FieldSchema]]] = None, vectors: Optional[Union[VectorSchema, list[VectorSchema]]] = None, + compression: Optional[COMPRESSION_METHODS] = "none", ): if name is None or not isinstance(name, str): raise ValueError( f"schema validate failed: collection name must be str, got {type(name).__name__}" ) + # Validate compression method + valid_compression = ["zstd", "gzip", "lzma", "auto", "none"] + if compression is None: + compression = "none" + elif compression not in valid_compression: + raise ValueError( + f"schema validate failed: compression must be one of {valid_compression}, got {compression}" + ) + + self._compression = compression + # handle fields _fields_name: list[str] = [] _fields_list: list[_FieldSchema] = [] @@ -197,6 +220,11 @@ def vectors(self) -> list[VectorSchema]: _vectors = self._cpp_obj.vector_fields() return [VectorSchema._from_core(_vector) for _vector in _vectors] + @property + def compression(self) -> str: + """str: Compression method for vector storage.""" + return self._compression + def _get_object(self) -> _CollectionSchema: return self._cpp_obj @@ -204,6 +232,7 @@ def __repr__(self) -> str: try: schema = { "name": self.name, + "compression": self.compression, "fields": {field.name: field.__dict__() for field in self.fields}, "vectors": {vector.name: vector.__dict__() for vector in self.vectors}, } diff --git a/python/zvec/streaming.py b/python/zvec/streaming.py new file mode 100644 index 00000000..bac0acb9 --- /dev/null +++ b/python/zvec/streaming.py @@ -0,0 +1,362 @@ +""" +Streaming compression utilities for zvec. + +This module provides streaming compression for large datasets that don't fit in memory. +Supports chunked compression and decompression for efficient memory usage. 
+
+Usage:
+    from zvec.streaming import StreamCompressor, StreamDecompressor
+
+    # Streaming compression
+    with StreamCompressor("output.gz", method="gzip") as compressor:
+        for batch in large_dataset_batches:
+            compressor.write(batch)
+
+    # Streaming decompression
+    with StreamDecompressor("output.gz") as decompressor:
+        for chunk in decompressor:
+            process(chunk)
+"""
+
+from __future__ import annotations
+
+import gzip
+import lzma
+from collections.abc import Generator, Iterable
+from typing import TYPE_CHECKING, Literal, Optional, TypedDict
+
+if TYPE_CHECKING:
+    import numpy as np
+
+# Check for Python 3.13+ features
+try:
+    import base64
+
+    Z85_AVAILABLE = hasattr(base64, "z85encode")
+except ImportError:
+    Z85_AVAILABLE = False
+
+# Python 3.14+ ships compression.zstd in the stdlib, but this module does
+# not implement streaming zstd yet, so the flag is pinned to False here.
+ZSTD_AVAILABLE = False
+
+__all__ = [
+    "Z85_AVAILABLE",
+    "ZSTD_AVAILABLE",
+    "StreamCompressor",
+    "StreamDecompressor",
+    "StreamingConfig",
+    "chunked_compress",
+    "chunked_decompress",
+]
+
+
+class StreamingConfig(TypedDict):
+    """Configuration for streaming compression."""
+
+    chunk_size: int
+    compression: str
+
+
+class StreamCompressor:
+    """
+    Streaming compressor for large datasets.
+
+    Writes compressed data in chunks to avoid loading the entire dataset
+    into memory.
+
+    Examples:
+        >>> with StreamCompressor("data.gz", method="gzip") as comp:
+        ...     for batch in batches:
+        ...         comp.write(batch)
+    """
+
+    def __init__(
+        self,
+        file_path: str,
+        method: Literal["gzip", "lzma"] = "gzip",
+        chunk_size: int = 8192,
+        compression_level: int = 6,
+    ):
+        """
+        Initialize streaming compressor.
+
+        Args:
+            file_path: Output file path
+            method: Compression method ("gzip" or "lzma")
+            chunk_size: Size of chunks in bytes
+            compression_level: Compression level (1-9)
+        """
+        self.file_path = file_path
+        self.method = method
+        self.chunk_size = chunk_size
+        self.compression_level = compression_level
+        self._file = None
+        self._compressor = None
+
+    def __enter__(self):
+        """Context manager entry."""
+        if self.method == "gzip":
+            self._file = gzip.open(
+                self.file_path, "wb", compresslevel=self.compression_level
+            )
+        elif self.method == "lzma":
+            self._file = lzma.open(self.file_path, "wb", preset=self.compression_level)
+        else:
+            raise ValueError(f"Unsupported method: {self.method}")
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """Context manager exit."""
+        if self._file:
+            self._file.close()
+
+    def write(self, data: bytes) -> int:
+        """
+        Write data to the compressed stream.
+
+        Args:
+            data: Uncompressed bytes to write
+
+        Returns:
+            Number of input bytes consumed (before compression)
+        """
+        if self._file is None:
+            raise RuntimeError("Compressor not opened. Use 'with' statement.")
+        self._file.write(data)
+        return len(data)
+
+    def write_iterable(self, iterable: Iterable[bytes]) -> int:
+        """
+        Write from iterable of bytes.
+
+        Args:
+            iterable: Iterable yielding byte chunks
+
+        Returns:
+            Total number of input bytes consumed
+        """
+        total = 0
+        for chunk in iterable:
+            total += self.write(chunk)
+        return total
+
+
+class StreamDecompressor:
+    """
+    Streaming decompressor for large compressed files.
+
+    Reads compressed data in chunks to avoid loading the entire file
+    into memory.
+
+    Examples:
+        >>> with StreamDecompressor("data.gz") as decomp:
+        ...     for chunk in decomp:
+        ...         process(chunk)
+    """
+
+    def __init__(
+        self,
+        file_path: str,
+        method: Optional[Literal["gzip", "lzma"]] = None,
+        chunk_size: int = 8192,
+    ):
+        """
+        Initialize streaming decompressor.
+ + Args: + file_path: Input file path + method: Compression method (auto-detected if None) + chunk_size: Size of chunks in bytes + """ + self.file_path = file_path + self.method = method + self.chunk_size = chunk_size + self._file = None + + def __enter__(self): + """Context manager entry.""" + # Auto-detect compression method from file extension + method = self.method + if method is None: + if self.file_path.endswith(".gz"): + method = "gzip" + elif self.file_path.endswith(".xz") or self.file_path.endswith(".lzma"): + method = "lzma" + else: + # Try gzip first + method = "gzip" + + if method == "gzip": + self._file = gzip.open(self.file_path, "rb") + elif method == "lzma": + self._file = lzma.open(self.file_path, "rb") + else: + raise ValueError(f"Unsupported method: {method}") + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit.""" + if self._file: + self._file.close() + + def __iter__(self) -> Generator[bytes]: + """Iterate over decompressed chunks.""" + if self._file is None: + raise RuntimeError("Decompressor not opened. Use 'with' statement.") + + while True: + chunk = self._file.read(self.chunk_size) + if not chunk: + break + yield chunk + + def read_all(self) -> bytes: + """ + Read all decompressed data. + + Note: For large files, prefer using iteration. + + Returns: + All decompressed bytes + """ + return b"".join(self) + + +def chunked_compress( + data: bytes, + method: Literal["gzip", "lzma"] = "gzip", + chunk_size: int = 8192, +) -> Generator[bytes]: + """ + Compress data in chunks. + + Note: Due to how gzip/lzma work, this yields the full compressed data + after each chunk_size bytes. For true streaming, use StreamCompressor. 
+
+    Args:
+        data: Data to compress
+        method: Compression method
+        chunk_size: Size of input chunks (not output)
+
+    Yields:
+        Compressed bytes (full compressed result)
+
+    Examples:
+        >>> # For true streaming, use StreamCompressor instead
+        >>> for chunk in chunked_compress(large_data, method="gzip"):
+        ...     output_file.write(chunk)
+    """
+    if method == "gzip":
+        compressed = gzip.compress(data)
+    elif method == "lzma":
+        compressed = lzma.compress(data)
+    else:
+        raise ValueError(f"Unsupported method: {method}")
+
+    # Yield in chunks
+    for i in range(0, len(compressed), chunk_size):
+        yield compressed[i : i + chunk_size]
+
+
+def chunked_decompress(
+    compressed_data: bytes,
+    method: Literal["gzip", "lzma"] = "gzip",
+) -> bytes:
+    """
+    Decompress data.
+
+    Args:
+        compressed_data: Compressed bytes
+        method: Compression method
+
+    Returns:
+        Decompressed bytes
+    """
+    if method == "gzip":
+        return gzip.decompress(compressed_data)
+    if method == "lzma":
+        return lzma.decompress(compressed_data)
+    raise ValueError(f"Unsupported method: {method}")
+
+
+class VectorStreamCompressor:
+    """
+    Specialized compressor for vector data.
+
+    Optimized for numpy arrays with metadata tracking. The underlying file
+    is only opened on __enter__, so use the instance as a context manager
+    before writing batches.
+
+    Examples:
+        >>> import numpy as np
+        >>> with VectorStreamCompressor("vectors.gz", dtype="float32") as comp:
+        ...     # Write multiple batches
+        ...     comp.write_batch(np.random.rand(100, 128).astype(np.float32))
+        ...     comp.write_batch(np.random.rand(200, 128).astype(np.float32))
+        >>>
+        >>> # Finalize and get metadata
+        >>> metadata = comp.close()
+        >>> print(f"Total vectors: {metadata['count']}")
+        Total vectors: 300
+    """
+
+    def __init__(
+        self,
+        file_path: str,
+        dtype: str = "float32",
+        method: Literal["gzip", "lzma"] = "gzip",
+    ):
+        """
+        Initialize vector stream compressor.
+
+        Args:
+            file_path: Output file path
+            dtype: NumPy dtype string (e.g., "float32", "int8")
+            method: Compression method
+        """
+        self.file_path = file_path
+        self.dtype = dtype
+        self.method = method
+        self.vector_count = 0
+        self.dimension = None
+        self._compressor = StreamCompressor(file_path, method=method)
+
+    def __enter__(self):
+        self._compressor.__enter__()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        return self._compressor.__exit__(exc_type, exc_val, exc_tb)
+
+    def write_batch(self, vectors: np.ndarray) -> None:
+        """
+        Write a batch of vectors.
+
+        Args:
+            vectors: NumPy array of vectors
+        """
+        import numpy as np  # noqa: PLC0415
+
+        if not isinstance(vectors, np.ndarray):
+            raise TypeError("vectors must be a numpy array")
+
+        # Track metadata
+        if self.dimension is None:
+            self.dimension = vectors.shape[1] if len(vectors.shape) > 1 else 1
+        self.vector_count += len(vectors)
+
+        # Write as bytes
+        self._compressor.write(vectors.tobytes())
+
+    def close(self) -> dict:
+        """
+        Close compressor and return metadata.
+
+        Returns:
+            Dictionary with metadata (count, dimension, dtype, method, file_path)
+        """
+        self._compressor.__exit__(None, None, None)
+        return {
+            "count": self.vector_count,
+            "dimension": self.dimension,
+            "dtype": self.dtype,
+            "method": self.method,
+            "file_path": self.file_path,
+        }
diff --git a/src/db/common/rocbsdb_context.cc b/src/db/common/rocbsdb_context.cc
index 790456ab..9d38fcac 100644
--- a/src/db/common/rocbsdb_context.cc
+++ b/src/db/common/rocbsdb_context.cc
@@ -13,6 +13,8 @@
 // limitations under the License.

+#include <algorithm>
+#include <vector>
 #include
 #include
 #include
@@ -276,7 +278,46 @@ void RocksdbContext::prepare_options(
   // Optimize for level-based compaction style with default setting
   create_opts_.OptimizeLevelStyleCompaction();

-  // TODO: enable compression?
+ // Enable compression for storage efficiency + // Prefer ZSTD, fall back to LZ4, then Snappy, then none + auto supported = rocksdb::GetSupportedCompressions(); + auto has = [&](rocksdb::CompressionType t) { + return std::find(supported.begin(), supported.end(), t) != supported.end(); + }; + + bool has_zstd = has(rocksdb::CompressionType::kZSTD); + bool has_lz4 = has(rocksdb::CompressionType::kLZ4Compression); + bool has_snappy = has(rocksdb::CompressionType::kSnappyCompression); + + // Pick the best available default compression + if (has_zstd) { + create_opts_.compression = rocksdb::CompressionType::kZSTD; + } else if (has_lz4) { + create_opts_.compression = rocksdb::CompressionType::kLZ4Compression; + } else if (has_snappy) { + create_opts_.compression = rocksdb::CompressionType::kSnappyCompression; + } else { + create_opts_.compression = rocksdb::CompressionType::kNoCompression; + } + + // Per-level compression: fast codec for L1-2, best codec for L3-6 + auto fast_codec = has_lz4 ? rocksdb::CompressionType::kLZ4Compression + : has_snappy ? rocksdb::CompressionType::kSnappyCompression + : rocksdb::CompressionType::kNoCompression; + auto best_codec = has_zstd ? rocksdb::CompressionType::kZSTD + : has_lz4 ? rocksdb::CompressionType::kLZ4Compression + : has_snappy ? rocksdb::CompressionType::kSnappyCompression + : rocksdb::CompressionType::kNoCompression; + + create_opts_.compression_per_level = { + rocksdb::CompressionType::kNoCompression, // Level 0 (memtable flush) + fast_codec, // Level 1 + fast_codec, // Level 2 + best_codec, // Level 3 + best_codec, // Level 4 + best_codec, // Level 5 + best_codec, // Level 6 + }; // Setting this to 1 means that when a memtable is full, it will be flushed // to disk immediately rather than being merged with other memtables