From 43d126b2d9107b106e615d35fd3141704a96f3d1 Mon Sep 17 00:00:00 2001
From: idevasena
Date: Tue, 3 Jun 2025 08:18:57 -0700
Subject: [PATCH 1/6] initial pgvector test cases

---
 vdbbench/pgvector/pgvector_bench.py | 170 ++++++++++++++++++++++++++++
 vdbbench/pgvector/setup.md          |  37 ++++++
 2 files changed, 207 insertions(+)
 create mode 100644 vdbbench/pgvector/pgvector_bench.py
 create mode 100644 vdbbench/pgvector/setup.md

diff --git a/vdbbench/pgvector/pgvector_bench.py b/vdbbench/pgvector/pgvector_bench.py
new file mode 100644
index 0000000..96fad6d
--- /dev/null
+++ b/vdbbench/pgvector/pgvector_bench.py
@@ -0,0 +1,170 @@
+import psycopg2
+import numpy as np
+from psycopg2.extensions import register_adapter, AsIs
+from time import time
+from itertools import islice
+
+# Configuration settings
+TOTAL_INSERT = 10_000_000  # Total initial insert data volume
+UPDATE_SAMPLE = 5_000_000  # Number of data to update in Scenario 1
+DELETE_INSERT_SAMPLE = 5_000_000  # Number of delete and reinsert in Scenario 2
+BATCH_SIZE = 10000  # Batch size (adjust according to device)
+DB_CONFIG = {
+    'host': 'localhost',
+    'port': 5432,
+    'dbname': 'testdb',
+    'user': 'postgres',
+    'password': 'postgres'
+}
+
+def register_vector_adapter():
+    """Register numpy array/vector adapters"""
+    def adapt_array_to_vector(vec):
+        # pgvector's text form uses square brackets, not curly braces
+        return AsIs(f"'[{','.join(map(repr, vec))}]'")
+    # Register for both numpy arrays and regular lists
+    register_adapter(np.ndarray, adapt_array_to_vector)
+    register_adapter(list, adapt_array_to_vector)
+
+def create_table_and_index(conn):
+    """Create vector table (1536-dim, matching generate_random_vector) and IVFFlat index"""
+    with conn.cursor() as cur:
+        cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
+        cur.execute("DROP TABLE IF EXISTS vectors CASCADE;")
+        cur.execute(
+            "CREATE TABLE vectors (id SERIAL PRIMARY KEY, embedding vector(1536));"
+        )
+        cur.execute(
+            "CREATE INDEX vectors_index ON vectors "
+            "USING ivfflat (embedding vector_cosine_ops) WITH (lists = 1000);"
+        )
+        conn.commit()
+
+def chunked_batches(iterable, size):
+    """Yield items in chunks (memory optimization)"""
+    it = iter(iterable)
+    while True:
+        chunk = list(islice(it, size))
+        if not chunk:
+            return
+        yield chunk
+
+def generate_random_vector():
+    """Generates random 1536-dimensional vector"""
+    return np.random.rand(1536).tolist()
+
+def insert_update_test(conn):
+    """Scenario 1: Insert 10M then update 5M entries"""
+    with conn.cursor() as cur:
+        print("++ Scenario 1: Inserting initial data ++")
+        start = time()
+        # Insert initial data in batches
+        for i in range(0, TOTAL_INSERT, BATCH_SIZE):
+            vec_batch = [
+                generate_random_vector()
+                for _ in range(min(BATCH_SIZE, TOTAL_INSERT - i))
+            ]
+            cur.executemany(
+                "INSERT INTO vectors (embedding) VALUES (%s)",
+                [(vec,) for vec in vec_batch]
+            )
+            conn.commit()
+        insert_time = time() - start
+        print(f"Initial insert took: {insert_time:.2f}s")
+        # Prepare random update IDs (select 5M from existing data)
+        cur.execute("SELECT id FROM vectors")
+        all_ids = [row[0] for row in cur.fetchall()]
+        update_ids = np.random.choice(all_ids, UPDATE_SAMPLE, replace=False).tolist()
+        print(f"++ Starting 5M updates ++")
+        start_update = time()
+        for batch_ids in chunked_batches(update_ids, BATCH_SIZE):
+            new_vectors = [generate_random_vector() for _ in batch_ids]
+            params = [(vec, vec_id) for vec, vec_id in zip(new_vectors, batch_ids)]
+            cur.executemany(
+                "UPDATE vectors SET embedding = %s WHERE id = %s",
+                params
+            )
+            conn.commit()
+        update_time = time() - start_update
+        print(f"Update completed: {update_time:.2f}s. Total time: {(update_time + insert_time)/60:.2f} minutes")
+        return {
+            'scenario1_total': insert_time + update_time
+        }
+
+def insert_delete_reinsert_test(conn):
+    """Scenario 2: Insert 10M → Delete 5M → Reinsert 5M"""
+    with conn.cursor() as cur:
+        print("++ Scenario 2: Inserting initial data ++")
+        start = time()
+        # Insert initial data
+        for i in range(0, TOTAL_INSERT, BATCH_SIZE):
+            vec_batch = [
+                generate_random_vector()
+                for _ in range(min(BATCH_SIZE, TOTAL_INSERT - i))
+            ]
+            cur.executemany(
+                "INSERT INTO vectors (embedding) VALUES (%s)",
+                [(vec,) for vec in vec_batch]
+            )
+            conn.commit()
+        insert_time = time() - start
+        print(f"Initial insert took: {insert_time:.2f}s")
+        # Prepare IDs to delete (select 5M)
+        cur.execute("SELECT id FROM vectors")
+        all_ids = [row[0] for row in cur.fetchall()]
+        delete_ids = np.random.choice(all_ids, DELETE_INSERT_SAMPLE, replace=False).tolist()
+        # Batch delete (increasing batch size for large deletions)
+        print("++ Deleting 5M entries ++")
+        del_start = time()
+        for batch in chunked_batches(delete_ids, 50000):
+            batch_str = ",".join(map(str, batch))
+            cur.execute(f"DELETE FROM vectors WHERE id IN ({batch_str})")
+            conn.commit()
+        delete_time = time() - del_start
+        print(f"Delete completed: {delete_time:.2f}s")
+        # Reinsert 5M new entries
+        print("++ Reinserting 5M entries ++")
+        reinsert_start = time()
+        for _ in range(DELETE_INSERT_SAMPLE // BATCH_SIZE):
+            vec_batch = [generate_random_vector() for _ in range(BATCH_SIZE)]
+            cur.executemany(
+                "INSERT INTO vectors (embedding) VALUES (%s)",
+                [(vec,) for vec in vec_batch]
+            )
+            conn.commit()
+        # Process the remainder
+        remainder = DELETE_INSERT_SAMPLE % BATCH_SIZE
+        if remainder:
+            vec_batch = [generate_random_vector() for _ in range(remainder)]
+            cur.executemany(
+                "INSERT INTO vectors (embedding) VALUES (%s)",
+                [(vec,) for vec in vec_batch]
+            )
+            conn.commit()
+        reinsert_time = time() - reinsert_start
+        print(f"Reinsert completed: {reinsert_time:.2f}s. Total time: {(delete_time + reinsert_time + insert_time)/60:.2f} minutes")
+        return {
+            'scenario2_total': insert_time + delete_time + reinsert_time
+        }
+
+def main():
+    # Register vector adapters
+    register_vector_adapter()
+    results = {}
+    print("\n=== Scenario 1 begins ===")
+    with psycopg2.connect(**DB_CONFIG) as conn:
+        create_table_and_index(conn)
+        results['scenario1'] = insert_update_test(conn)
+
+    print("\n=== Scenario 2 begins ===")
+    with psycopg2.connect(**DB_CONFIG) as conn:
+        create_table_and_index(conn)
+        results['scenario2'] = insert_delete_reinsert_test(conn)
+
+    print("\nPerformance comparison:")
+    print(f"Scenario 1 total time: {results['scenario1']['scenario1_total']/60:.2f} minutes")
+    print(f"Scenario 2 total time: {results['scenario2']['scenario2_total']/60:.2f} minutes")
+    print(f"Scenario 2 is {((results['scenario1']['scenario1_total'] - results['scenario2']['scenario2_total'])/results['scenario1']['scenario1_total']*100):.1f}% faster than Scenario 1")
+
+if __name__ == "__main__":
+    main()
diff --git a/vdbbench/pgvector/setup.md b/vdbbench/pgvector/setup.md
new file mode 100644
index 0000000..b6f97d1
--- /dev/null
+++ b/vdbbench/pgvector/setup.md
@@ -0,0 +1,37 @@
+### Install postgresql-17.0 from source code
+
+#### Get from source
+The PostgreSQL source code for released versions can be obtained from the website: https://www.postgresql.org/ftp/source/. Download postgresql-17.0.tar.gz, then unpack it:
+
+```
+tar xf postgresql-17.0.tar.gz
+```
+
+This will create a directory postgresql-17.0 under the current directory with the PostgreSQL sources. Change into that directory for the rest of the installation procedure.
+ +#### Building and Installation with Autoconf and Make + +``` +cd postgresql-17.0 + +./configure + +make + +su + +make install +``` + +#### Install pgvector from source code + +``` +git clone https://github.com/pgvector/pgvector.git -b v0.8.0 + +cd pgvector + +make PG_CONFIG=/usr/local/pgsql/bin/pg_config + +make PG_CONFIG=/usr/local/pgsql/bin/pg_config install + +``` From 45e7f963d99686144421dd9a879dbb0ffc110ee9 Mon Sep 17 00:00:00 2001 From: idevasena Date: Tue, 9 Sep 2025 08:54:25 -0700 Subject: [PATCH 2/6] single node milvus cluster yaml --- milvus-cluster.yml | 251 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 251 insertions(+) create mode 100644 milvus-cluster.yml diff --git a/milvus-cluster.yml b/milvus-cluster.yml new file mode 100644 index 0000000..064fa3e --- /dev/null +++ b/milvus-cluster.yml @@ -0,0 +1,251 @@ +version: '3.5' + +services: + etcd: + container_name: milvus-etcd + image: quay.io/coreos/etcd:v3.6.4 + environment: + - ETCD_AUTO_COMPACTION_MODE=revision + - ETCD_AUTO_COMPACTION_RETENTION=1000 + - ETCD_QUOTA_BACKEND_BYTES=4294967296 + - ETCD_SNAPSHOT_COUNT=50000 + volumes: + - /mnt/vdb/etcd:/etcd + command: etcd -advertise-client-urls=http://0.0.0.0:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd + ports: + - "2379:2379" + healthcheck: + test: ["CMD", "etcdctl", "endpoint", "health"] + interval: 30s + timeout: 20s + retries: 3 + + minio: + container_name: milvus-minio + image: minio/minio:RELEASE.2024-12-18T13-15-44Z + environment: + MINIO_ACCESS_KEY: minioadmin + MINIO_SECRET_KEY: minioadmin + ports: + - "9001:9001" + - "9000:9000" + volumes: + - /mnt/vdb/minio:/minio_data + command: minio server /minio_data --console-address ":9001" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] + interval: 30s + timeout: 20s + retries: 3 + + rootcoord: + container_name: milvus-rootcoord + image: milvusdb/milvus:v2.6.0 + command: ["milvus", "run", "rootcoord"] + environment: + 
ETCD_ENDPOINTS: etcd:2379 + MINIO_ADDRESS: minio:9000 + MINIO_REGION: us-east-1 + volumes: + - /mnt/vdb/milvus:/var/lib/milvus + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:53100/healthz"] + interval: 30s + start_period: 90s + timeout: 20s + retries: 3 + ports: + - "53100:53100" + depends_on: + - "etcd" + - "minio" + + proxy: + container_name: milvus-proxy + image: milvusdb/milvus:v2.6.0 + command: ["milvus", "run", "proxy"] + environment: + ETCD_ENDPOINTS: etcd:2379 + MINIO_ADDRESS: minio:9000 + MINIO_REGION: us-east-1 + volumes: + - /mnt/vdb/milvus:/var/lib/milvus + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"] + interval: 30s + start_period: 90s + timeout: 20s + retries: 3 + ports: + - "19530:19530" + - "9091:9091" + depends_on: + - "etcd" + - "minio" + - "rootcoord" + - "querycoord" + - "datacoord" + - "indexcoord" + + querycoord: + container_name: milvus-querycoord + image: milvusdb/milvus:v2.6.0 + command: ["milvus", "run", "querycoord"] + environment: + ETCD_ENDPOINTS: etcd:2379 + MINIO_ADDRESS: minio:9000 + MINIO_REGION: us-east-1 + volumes: + - /mnt/vdb/milvus:/var/lib/milvus + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:19531/healthz"] + interval: 30s + start_period: 90s + timeout: 20s + retries: 3 + ports: + - "19531:19531" + depends_on: + - "etcd" + - "minio" + - "rootcoord" + + querynode1: + container_name: milvus-querynode1 + image: milvusdb/milvus:v2.6.0 + command: ["milvus", "run", "querynode"] + environment: + ETCD_ENDPOINTS: etcd:2379 + MINIO_ADDRESS: minio:9000 + MINIO_REGION: us-east-1 + volumes: + - /mnt/vdb/milvus:/var/lib/milvus + depends_on: + - "etcd" + - "minio" + - "querycoord" + + querynode2: + container_name: milvus-querynode2 + image: milvusdb/milvus:v2.6.0 + command: ["milvus", "run", "querynode"] + environment: + ETCD_ENDPOINTS: etcd:2379 + MINIO_ADDRESS: minio:9000 + MINIO_REGION: us-east-1 + volumes: + - /mnt/vdb/milvus:/var/lib/milvus + depends_on: + - "etcd" + - 
"minio" + - "querycoord" + + datacoord: + container_name: milvus-datacoord + image: milvusdb/milvus:v2.6.0 + command: ["milvus", "run", "datacoord"] + environment: + ETCD_ENDPOINTS: etcd:2379 + MINIO_ADDRESS: minio:9000 + MINIO_REGION: us-east-1 + volumes: + - /mnt/vdb/milvus:/var/lib/milvus + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:13333/healthz"] + interval: 30s + start_period: 90s + timeout: 20s + retries: 3 + ports: + - "13333:13333" + depends_on: + - "etcd" + - "minio" + - "rootcoord" + + datanode1: + container_name: milvus-datanode1 + image: milvusdb/milvus:v2.6.0 + command: ["milvus", "run", "datanode"] + environment: + ETCD_ENDPOINTS: etcd:2379 + MINIO_ADDRESS: minio:9000 + MINIO_REGION: us-east-1 + volumes: + - /mnt/vdb/milvus:/var/lib/milvus + depends_on: + - "etcd" + - "minio" + - "datacoord" + + datanode2: + container_name: milvus-datanode2 + image: milvusdb/milvus:v2.6.0 + command: ["milvus", "run", "datanode"] + environment: + ETCD_ENDPOINTS: etcd:2379 + MINIO_ADDRESS: minio:9000 + MINIO_REGION: us-east-1 + volumes: + - /mnt/vdb/milvus:/var/lib/milvus + depends_on: + - "etcd" + - "minio" + - "datacoord" + + indexcoord: + container_name: milvus-indexcoord + image: milvusdb/milvus:v2.6.0 + command: ["milvus", "run", "indexcoord"] + environment: + ETCD_ENDPOINTS: etcd:2379 + MINIO_ADDRESS: minio:9000 + MINIO_REGION: us-east-1 + volumes: + - /mnt/vdb/milvus:/var/lib/milvus + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:31000/healthz"] + interval: 30s + start_period: 90s + timeout: 20s + retries: 3 + ports: + - "31000:31000" + depends_on: + - "etcd" + - "minio" + - "rootcoord" + + indexnode1: + container_name: milvus-indexnode1 + image: milvusdb/milvus:v2.6.0 + command: ["milvus", "run", "indexnode"] + environment: + ETCD_ENDPOINTS: etcd:2379 + MINIO_ADDRESS: minio:9000 + MINIO_REGION: us-east-1 + volumes: + - /mnt/vdb/milvus:/var/lib/milvus + depends_on: + - "etcd" + - "minio" + - "indexcoord" + + indexnode2: + 
container_name: milvus-indexnode2 + image: milvusdb/milvus:v2.6.0 + command: ["milvus", "run", "indexnode"] + environment: + ETCD_ENDPOINTS: etcd:2379 + MINIO_ADDRESS: minio:9000 + MINIO_REGION: us-east-1 + volumes: + - /mnt/vdb/milvus:/var/lib/milvus + depends_on: + - "etcd" + - "minio" + - "indexcoord" + +networks: + default: + name: milvus From c6f9003213f4cc709db18e8048053272f6505818 Mon Sep 17 00:00:00 2001 From: idevasena Date: Fri, 12 Sep 2025 07:53:24 -0700 Subject: [PATCH 3/6] added recall stats calc between search results and ground truth results --- vdbbench/simple_bench.py | 131 ++++++- vdbbench/simple_bench_bkp.py | 668 +++++++++++++++++++++++++++++++++++ 2 files changed, 795 insertions(+), 4 deletions(-) create mode 100644 vdbbench/simple_bench_bkp.py diff --git a/vdbbench/simple_bench.py b/vdbbench/simple_bench.py index b679cd1..ac7fc6a 100644 --- a/vdbbench/simple_bench.py +++ b/vdbbench/simple_bench.py @@ -43,7 +43,10 @@ "batch_size", "batch_time_seconds", "avg_query_time_seconds", - "success" + "success", + "recall_at_10", + "recall_at_5", + "recall_at_1" ] @@ -124,6 +127,32 @@ def generate_random_vector(dim: int) -> List[float]: return (vec / np.linalg.norm(vec)).tolist() +def calculate_recall(search_results, ground_truth_results, k: int) -> float: + """ + Calculate recall@k between search results and ground truth results + + Args: + search_results: List of search result IDs from the approximate search + ground_truth_results: List of ground truth IDs from the exact search + k: Number of top results to consider + + Returns: + Recall@k as a float between 0 and 1 + """ + if not search_results or not ground_truth_results: + return 0.0 + + # Take only top k results from both sets + search_k = set(search_results[:k]) + ground_truth_k = set(ground_truth_results[:k]) + + # Calculate intersection + intersection = search_k.intersection(ground_truth_k) + + # Recall@k = |intersection| / |ground_truth_k| + return len(intersection) / len(ground_truth_k) if 
len(ground_truth_k) > 0 else 0.0 + + def connect_to_milvus(host: str, port: str) -> connections: """Establish connection to Milvus server""" try: @@ -216,12 +245,49 @@ def execute_batch_queries(process_id: int, host: str, port: str, collection_name limit=10, output_fields=["id"] ) + + # Calculate recall by performing brute-force search as ground truth + # Note: This is computationally expensive and should be used carefully + ground_truth_results = collection.search( + data=batch_vectors, + anns_field="vector", + param={"metric_type": "COSINE", "params": {"nprobe": -1}}, # Brute force search + limit=10, + output_fields=["id"] + ) + batch_end = time.time() batch_success = True + + # Calculate recall metrics for the batch + batch_recall_at_10 = [] + batch_recall_at_5 = [] + batch_recall_at_1 = [] + + for i, (search_result, ground_truth_result) in enumerate(zip(results, ground_truth_results)): + search_ids = [hit.id for hit in search_result] + ground_truth_ids = [hit.id for hit in ground_truth_result] + + recall_at_10 = calculate_recall(search_ids, ground_truth_ids, 10) + recall_at_5 = calculate_recall(search_ids, ground_truth_ids, 5) + recall_at_1 = calculate_recall(search_ids, ground_truth_ids, 1) + + batch_recall_at_10.append(recall_at_10) + batch_recall_at_5.append(recall_at_5) + batch_recall_at_1.append(recall_at_1) + + # Average recall across the batch + avg_recall_at_10 = np.mean(batch_recall_at_10) if batch_recall_at_10 else 0.0 + avg_recall_at_5 = np.mean(batch_recall_at_5) if batch_recall_at_5 else 0.0 + avg_recall_at_1 = np.mean(batch_recall_at_1) if batch_recall_at_1 else 0.0 + except Exception as e: print(f"Process {process_id}: Search error: {e}") batch_end = time.time() batch_success = False + avg_recall_at_10 = 0.0 + avg_recall_at_5 = 0.0 + avg_recall_at_1 = 0.0 # Record batch results batch_time = batch_end - batch_start @@ -236,7 +302,10 @@ def execute_batch_queries(process_id: int, host: str, port: str, collection_name "batch_size": batch_size, 
"batch_time_seconds": batch_time, "avg_query_time_seconds": batch_time / batch_size, - "success": batch_success + "success": batch_success, + "recall_at_10": avg_recall_at_10, + "recall_at_5": avg_recall_at_5, + "recall_at_1": avg_recall_at_1 } writer.writerow(batch_data) @@ -307,6 +376,22 @@ def calculate_statistics(results_dir: str) -> Dict[str, Union[str, int, float, D batch_times = np.array(batch_times_ms) total_queries = len(latencies) + # Calculate recall statistics + recall_at_10_values = [] + recall_at_5_values = [] + recall_at_1_values = [] + + for _, row in all_data.iterrows(): + if row['success'] and 'recall_at_10' in row: + # Replicate recall values for each query in the batch + recall_at_10_values.extend([row['recall_at_10']] * row['batch_size']) + recall_at_5_values.extend([row['recall_at_5']] * row['batch_size']) + recall_at_1_values.extend([row['recall_at_1']] * row['batch_size']) + + recall_at_10_array = np.array(recall_at_10_values) if recall_at_10_values else np.array([0]) + recall_at_5_array = np.array(recall_at_5_values) if recall_at_5_values else np.array([0]) + recall_at_1_array = np.array(recall_at_1_values) if recall_at_1_values else np.array([0]) + stats = { "total_queries": total_queries, "total_time_seconds": total_time_seconds, @@ -329,7 +414,23 @@ def calculate_statistics(results_dir: str) -> Dict[str, Union[str, int, float, D "p95_batch_time_ms": float(np.percentile(batch_times, 95)) if len(batch_times) > 0 else 0, "p99_batch_time_ms": float(np.percentile(batch_times, 99)) if len(batch_times) > 0 else 0, "p999_batch_time_ms": float(np.percentile(batch_times, 99.9)) if len(batch_times) > 0 else 0, - "p9999_batch_time_ms": float(np.percentile(batch_times, 99.99)) if len(batch_times) > 0 else 0 + "p9999_batch_time_ms": float(np.percentile(batch_times, 99.99)) if len(batch_times) > 0 else 0, + + # Recall statistics + "mean_recall_at_10": float(np.mean(recall_at_10_array)), + "median_recall_at_10": float(np.median(recall_at_10_array)), + 
"min_recall_at_10": float(np.min(recall_at_10_array)), + "max_recall_at_10": float(np.max(recall_at_10_array)), + + "mean_recall_at_5": float(np.mean(recall_at_5_array)), + "median_recall_at_5": float(np.median(recall_at_5_array)), + "min_recall_at_5": float(np.min(recall_at_5_array)), + "max_recall_at_5": float(np.max(recall_at_5_array)), + + "mean_recall_at_1": float(np.mean(recall_at_1_array)), + "median_recall_at_1": float(np.median(recall_at_1_array)), + "min_recall_at_1": float(np.min(recall_at_1_array)), + "max_recall_at_1": float(np.max(recall_at_1_array)) } return stats @@ -638,6 +739,28 @@ def main(): print(f"Max Batch Time: {stats.get('max_batch_time_ms', 0):.2f} ms") print(f"Batch Throughput: {1000 / stats.get('mean_batch_time_ms', float('inf')):.2f} batches/second") + # Print recall statistics + print("\nRECALL STATISTICS") + print("-" * 50) + + print(f"Recall@10:") + print(f" Mean: {stats.get('mean_recall_at_10', 0):.4f}") + print(f" Median: {stats.get('median_recall_at_10', 0):.4f}") + print(f" Min: {stats.get('min_recall_at_10', 0):.4f}") + print(f" Max: {stats.get('max_recall_at_10', 0):.4f}") + + print(f"Recall@5:") + print(f" Mean: {stats.get('mean_recall_at_5', 0):.4f}") + print(f" Median: {stats.get('median_recall_at_5', 0):.4f}") + print(f" Min: {stats.get('min_recall_at_5', 0):.4f}") + print(f" Max: {stats.get('max_recall_at_5', 0):.4f}") + + print(f"Recall@1:") + print(f" Mean: {stats.get('mean_recall_at_1', 0):.4f}") + print(f" Median: {stats.get('median_recall_at_1', 0):.4f}") + print(f" Min: {stats.get('min_recall_at_1', 0):.4f}") + print(f" Max: {stats.get('max_recall_at_1', 0):.4f}") + # Print disk I/O statistics print("\nDISK I/O DURING BENCHMARK") print("-" * 50) @@ -665,4 +788,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/vdbbench/simple_bench_bkp.py b/vdbbench/simple_bench_bkp.py new file mode 100644 index 0000000..b679cd1 --- /dev/null +++ b/vdbbench/simple_bench_bkp.py @@ 
-0,0 +1,668 @@ +#!/usr/bin/env python3 +""" +Milvus Vector Database Benchmark Script + +This script executes random vector queries against a Milvus collection using multiple processes. +It measures and reports query latency statistics. +""" + +import argparse +import multiprocessing as mp +import numpy as np +import os +import time +import json +import csv +import uuid +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Any, Optional, Tuple, Union +import signal +import sys +from tabulate import tabulate + +from vdbbench.config_loader import load_config, merge_config_with_args +from vdbbench.list_collections import get_collection_info + +try: + from pymilvus import connections, Collection, utility +except ImportError: + print("Error: pymilvus package not found. Please install it with 'pip install pymilvus'") + sys.exit(1) + +STAGGER_INTERVAL_SEC = 0.1 + +# Global flag for graceful shutdown +shutdown_flag = mp.Value('i', 0) + +# CSV header fields +csv_fields = [ + "process_id", + "batch_id", + "timestamp", + "batch_size", + "batch_time_seconds", + "avg_query_time_seconds", + "success" +] + + +def signal_handler(sig, frame): + """Handle interrupt signals to gracefully shut down worker processes""" + print("\nReceived interrupt signal. 
Shutting down workers gracefully...") + with shutdown_flag.get_lock(): + shutdown_flag.value = 1 + + +def read_disk_stats() -> Dict[str, Dict[str, int]]: + """ + Read disk I/O statistics from /proc/diskstats + + Returns: + Dictionary mapping device names to their read/write statistics + """ + stats = {} + try: + with open('/proc/diskstats', 'r') as f: + for line in f: + parts = line.strip().split() + if len(parts) >= 14: # Ensure we have enough fields + device = parts[2] + # Fields based on kernel documentation + # https://www.kernel.org/doc/Documentation/ABI/testing/procfs-diskstats + sectors_read = int(parts[5]) # sectors read + sectors_written = int(parts[9]) # sectors written + + # 1 sector = 512 bytes + bytes_read = sectors_read * 512 + bytes_written = sectors_written * 512 + + stats[device] = { + "bytes_read": bytes_read, + "bytes_written": bytes_written + } + return stats + except FileNotFoundError: + print("Warning: /proc/diskstats not available (non-Linux system)") + return {} + except Exception as e: + print(f"Error reading disk stats: {e}") + return {} + + +def format_bytes(bytes_value: int) -> str: + """Format bytes into human-readable format with appropriate units""" + units = ['B', 'KB', 'MB', 'GB', 'TB'] + unit_index = 0 + value = float(bytes_value) + + while value > 1024 and unit_index < len(units) - 1: + value /= 1024 + unit_index += 1 + + return f"{value:.2f} {units[unit_index]}" + + +def calculate_disk_io_diff(start_stats: Dict[str, Dict[str, int]], + end_stats: Dict[str, Dict[str, int]]) -> Dict[str, Dict[str, int]]: + """Calculate the difference in disk I/O between start and end measurements""" + diff_stats = {} + + for device in end_stats: + if device in start_stats: + diff_stats[device] = { + "bytes_read": end_stats[device]["bytes_read"] - start_stats[device]["bytes_read"], + "bytes_written": end_stats[device]["bytes_written"] - start_stats[device]["bytes_written"] + } + + return diff_stats + + +def generate_random_vector(dim: int) -> 
List[float]: + """Generate a random normalized vector of the specified dimension""" + vec = np.random.random(dim).astype(np.float32) + return (vec / np.linalg.norm(vec)).tolist() + + +def connect_to_milvus(host: str, port: str) -> connections: + """Establish connection to Milvus server""" + try: + connections.connect(alias="default", host=host, port=port) + return connections + except Exception as e: + print(f"Failed to connect to Milvus: {e}") + return False + + +def execute_batch_queries(process_id: int, host: str, port: str, collection_name: str, vector_dim: int, batch_size: int, + report_count: int, max_queries: Optional[int], runtime_seconds: Optional[int], output_dir: str, + shutdown_flag: mp.Value) -> None: + """ + Execute batches of vector queries and log results to disk + + Args: + process_id: ID of the current process + host: Milvus server host + port: Milvus server port + collection_name: Name of the collection to query + vector_dim: Dimension of vectors + batch_size: Number of queries to execute in each batch + max_queries: Maximum number of queries to execute (None for unlimited) + runtime_seconds: Maximum runtime in seconds (None for unlimited) + output_dir: Directory to save results + shutdown_flag: Shared value to signal process termination + """ + print(f'Process {process_id} initialized') + # Connect to Milvus + connections = connect_to_milvus(host, port) + if not connections: + print(f'Process {process_id} - No milvus connection') + return + + # Get collection + try: + collection = Collection(collection_name) + print(f'Process {process_id} - Loading collection') + collection.load() + except Exception as e: + print(f"Process {process_id}: Failed to load collection: {e}") + return + + # Prepare output file + output_file = Path(output_dir) / f"milvus_benchmark_p{process_id}.csv" + sys.stdout.write(f"Process {process_id}: Writing results to {output_file}\r\n") + # Create output directory if it doesn't exist + os.makedirs(os.path.dirname(output_file), 
exist_ok=True) + + # Track execution + start_time = time.time() + query_count = 0 + batch_count = 0 + + sys.stdout.write(f"Process {process_id}: Starting benchmark ...\r\n") + sys.stdout.flush() + + try: + with open(output_file, 'w') as f: + writer = csv.DictWriter(f, fieldnames=csv_fields) + writer.writeheader() + while True: + # Check if we should terminate + with shutdown_flag.get_lock(): + if shutdown_flag.value == 1: + break + + # Check termination conditions + current_time = time.time() + elapsed_time = current_time - start_time + + if runtime_seconds is not None and elapsed_time >= runtime_seconds: + break + + if max_queries is not None and query_count >= max_queries: + break + + # Generate batch of query vectors + batch_vectors = [generate_random_vector(vector_dim) for _ in range(batch_size)] + + # Execute batch and measure time + batch_start = time.time() + try: + search_params = {"metric_type": "COSINE", "params": {"ef": 200}} + results = collection.search( + data=batch_vectors, + anns_field="vector", + param=search_params, + limit=10, + output_fields=["id"] + ) + batch_end = time.time() + batch_success = True + except Exception as e: + print(f"Process {process_id}: Search error: {e}") + batch_end = time.time() + batch_success = False + + # Record batch results + batch_time = batch_end - batch_start + batch_count += 1 + query_count += batch_size + + # Log batch results to file + batch_data = { + "process_id": process_id, + "batch_id": batch_count, + "timestamp": current_time, + "batch_size": batch_size, + "batch_time_seconds": batch_time, + "avg_query_time_seconds": batch_time / batch_size, + "success": batch_success + } + + writer.writerow(batch_data) + f.flush() # Ensure data is written to disk immediately + + # Print progress + if batch_count % report_count == 0: + sys.stdout.write(f"Process {process_id}: Completed {query_count} queries in {elapsed_time:.2f} seconds.\r\n") + sys.stdout.flush() + + except Exception as e: + print(f"Process {process_id}: 
Error during benchmark: {e}") + + finally: + # Disconnect from Milvus + try: + connections.disconnect("default") + except: + pass + + print( + f"Process {process_id}: Finished. Executed {query_count} queries in {time.time() - start_time:.2f} seconds", flush=True) + + +def calculate_statistics(results_dir: str) -> Dict[str, Union[str, int, float, Dict[str, int]]]: + """Calculate statistics from benchmark results""" + import pandas as pd + + # Find all result files + file_paths = list(Path(results_dir).glob("milvus_benchmark_p*.csv")) + + if not file_paths: + return {"error": "No benchmark result files found"} + + # Read and concatenate all CSV files into a single DataFrame + dfs = [] + for file_path in file_paths: + try: + df = pd.read_csv(file_path) + if not df.empty: + dfs.append(df) + except Exception as e: + print(f"Error reading result file {file_path}: {e}") + + if not dfs: + return {"error": "No valid data found in benchmark result files"} + + # Concatenate all dataframes + all_data = pd.concat(dfs, ignore_index=True) + all_data.sort_values('timestamp', inplace=True) + + # Calculate start and end times + file_start_time = min(all_data['timestamp']) + file_end_time = max(all_data['timestamp'] + all_data['batch_time_seconds']) + total_time_seconds = file_end_time - file_start_time + + # Each row represents a batch, so we need to expand based on batch_size + all_latencies = [] + for _, row in all_data.iterrows(): + query_time_ms = row['avg_query_time_seconds'] * 1000 + all_latencies.extend([query_time_ms] * row['batch_size']) + + # Convert batch times to milliseconds + batch_times_ms = all_data['batch_time_seconds'] * 1000 + + # Calculate statistics + latencies = np.array(all_latencies) + batch_times = np.array(batch_times_ms) + total_queries = len(latencies) + + stats = { + "total_queries": total_queries, + "total_time_seconds": total_time_seconds, + "min_latency_ms": float(np.min(latencies)), + "max_latency_ms": float(np.max(latencies)), + "mean_latency_ms": 
float(np.mean(latencies)), + "median_latency_ms": float(np.median(latencies)), + "p95_latency_ms": float(np.percentile(latencies, 95)), + "p99_latency_ms": float(np.percentile(latencies, 99)), + "p999_latency_ms": float(np.percentile(latencies, 99.9)), + "p9999_latency_ms": float(np.percentile(latencies, 99.99)), + "throughput_qps": float(total_queries / total_time_seconds) if total_time_seconds > 0 else 0, + + # Batch time statistics + "batch_count": len(batch_times), + "min_batch_time_ms": float(np.min(batch_times)) if len(batch_times) > 0 else 0, + "max_batch_time_ms": float(np.max(batch_times)) if len(batch_times) > 0 else 0, + "mean_batch_time_ms": float(np.mean(batch_times)) if len(batch_times) > 0 else 0, + "median_batch_time_ms": float(np.median(batch_times)) if len(batch_times) > 0 else 0, + "p95_batch_time_ms": float(np.percentile(batch_times, 95)) if len(batch_times) > 0 else 0, + "p99_batch_time_ms": float(np.percentile(batch_times, 99)) if len(batch_times) > 0 else 0, + "p999_batch_time_ms": float(np.percentile(batch_times, 99.9)) if len(batch_times) > 0 else 0, + "p9999_batch_time_ms": float(np.percentile(batch_times, 99.99)) if len(batch_times) > 0 else 0 + } + + return stats + + +def load_database(host: str, port: str, collection_name: str, reload=False) -> Union[dict, None]: + print(f'Connecting to Milvus server at {host}:{port}...', flush=True) + connections = connect_to_milvus(host, port) + if not connections: + print(f'Unable to connect to Milvus server', flush=True) + return None + + # Connect to Milvus + try: + collection = Collection(collection_name) + except Exception as e: + print(f"Unable to connect to Milvus collection {collection_name}: {e}", flush=True) + return None + + try: + # Get the load state of the collection: + state = utility.load_state(collection_name) + if reload or state.name != "Loaded": + if reload: + print(f'Reloading the collection {collection_name}...') + else: + print(f'Loading the collection {collection_name}...') + 
start_load_time = time.time() + collection.load() + load_time = time.time() - start_load_time + print(f'Collection {collection_name} loaded in {load_time:.2f} seconds', flush=True) + if not reload and state.name == "Loaded": + print(f'Collection {collection_name} already reloaded and not reloading...') + + except Exception as e: + print(f'Unable to load collection {collection_name}: {e}') + return None + + print(f'Getting collection statistics...', flush=True) + collection_info = get_collection_info(collection_name, release=False) + table_data = [] + + index_types = ", ".join([idx.get("index_type", "N/A") for idx in collection_info.get("index_info", [])]) + metric_types = ", ".join([idx.get("metric_type", "N/A") for idx in collection_info.get("index_info", [])]) + + row = [ + collection_info["name"], + collection_info.get("row_count", "N/A"), + collection_info.get("dimension", "N/A"), + index_types, + metric_types, + len(collection_info.get("partitions", [])) + ] + table_data.append(row) + + headers = ["Collection Name", "Vector Count", "Dimension", "Index Types", "Metric Types", "Partitions"] + print(f'\nTabulating information...', flush=True) + tabulated_data = tabulate(table_data, headers=headers, tablefmt="grid") + print(tabulated_data, flush=True) + + return collection_info + + +def main(): + parser = argparse.ArgumentParser(description="Milvus Vector Database Benchmark") + + parser.add_argument("--config", type=str, help="Path to vdbbench config file") + + # Required parameters + parser.add_argument("--processes", type=int, help="Number of parallel processes") + parser.add_argument("--batch-size", type=int, help="Number of queries per batch") + parser.add_argument("--vector-dim", type=int, default=1536, help="Vector dimension") + parser.add_argument("--report-count", type=int, default=10, help="Number of queries between logging results") + + # Database parameters + parser.add_argument("--host", type=str, default="localhost", help="Milvus server host") + 
parser.add_argument("--port", type=str, default="19530", help="Milvus server port") + parser.add_argument("--collection-name", type=str, help="Collection name to query") + + # Termination conditions (at least one must be specified) + termination_group = parser.add_argument_group("termination conditions (at least one required)") + termination_group.add_argument("--runtime", type=int, help="Maximum runtime in seconds") + termination_group.add_argument("--queries", type=int, help="Total number of queries to execute") + + # Output directory + parser.add_argument("--output-dir", type=str, help="Directory to save benchmark results") + parser.add_argument("--json-output", action="store_true", help="Print benchmark results as JSON document") + + args = parser.parse_args() + + # Validate termination conditions + if args.runtime is None and args.queries is None: + parser.error("At least one termination condition (--runtime or --queries) must be specified") + + # Register signal handlers for graceful shutdown + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + print("") + print("=" * 50) + print("OUTPUT CONFIGURATION", flush=True) + print("=" * 50, flush=True) + + # Load config from YAML if specified + if args.config: + config = load_config(args.config) + args = merge_config_with_args(config, args) + + # Create output directory + if not args.output_dir: + output_dir = "vdbbench_results" + datetime_str = datetime.now().strftime("%Y%m%d_%H%M%S") + output_dir = os.path.join(output_dir, datetime_str) + else: + output_dir = args.output_dir + + os.makedirs(output_dir, exist_ok=True) + + # Save benchmark configuration + config = { + "timestamp": datetime.now().isoformat(), + "processes": args.processes, + "batch_size": args.batch_size, + "report_count": args.report_count, + "vector_dim": args.vector_dim, + "host": args.host, + "port": args.port, + "collection_name": args.collection_name, + "runtime_seconds": args.runtime, + 
"total_queries": args.queries + } + + print(f"Results will be saved to: {output_dir}") + print(f'Writing configuration to {output_dir}/config.json') + with open(os.path.join(output_dir, "config.json"), 'w') as f: + json.dump(config, f, indent=2) + + print("") + print("=" * 50) + print("Database Verification and Loading", flush=True) + print("=" * 50) + + connections = connect_to_milvus(args.host, args.port) + print(f'Verifing database connection and loading collection') + if collection_info := load_database(args.host, args.port, args.collection_name): + print(f"\nCOLLECTION INFORMATION: {collection_info}") + # Having an active connection in the main thread when we fork seems to cause problems + connections.disconnect("default") + else: + print("Unable to load the specified collection") + sys.exit(1) + + # Read initial disk stats + print(f'\nCollecting initial disk statistics...') + start_disk_stats = read_disk_stats() + + # Calculate queries per process if total queries specified + max_queries_per_process = None + if args.queries is not None: + max_queries_per_process = args.queries // args.processes + # Add remainder to the first process + remainder = args.queries % args.processes + + # Start worker processes + processes = [] + stagger_interval_secs = 1 / args.processes + + print("") + print("=" * 50) + print("Benchmark Execution", flush=True) + print("=" * 50) + if max_queries_per_process is not None: + print(f"Starting benchmark with {args.processes} processes and {max_queries_per_process} queries per process") + else: + print(f'Starting benchmark with {args.processes} processes and running for {args.runtime} seconds') + if args.processes > 1: + print(f"Staggering benchmark execution by {stagger_interval_secs} seconds between processes") + try: + for i in range(args.processes): + if i > 0: + time.sleep(stagger_interval_secs) + # Adjust queries for the first process if there's a remainder + process_max_queries = None + if max_queries_per_process is not None: + 
process_max_queries = max_queries_per_process + (remainder if i == 0 else 0) + + p = mp.Process( + target=execute_batch_queries, + args=( + i, + args.host, + args.port, + args.collection_name, + args.vector_dim, + args.batch_size, + args.report_count, + process_max_queries, + args.runtime, + output_dir, + shutdown_flag + ) + ) + print(f'Starting process {i}...') + p.start() + processes.append(p) + + # Wait for all processes to complete + for p in processes: + p.join() + except Exception as e: + print(f"Error during benchmark execution: {e}") + # Signal all processes to terminate + with shutdown_flag.get_lock(): + shutdown_flag.value = 1 + + # Wait for processes to terminate + for p in processes: + if p.is_alive(): + p.join(timeout=5) + if p.is_alive(): + p.terminate() + else: + print(f'Running single process benchmark...') + execute_batch_queries(0, args.host, args.port, args.collection_name, args.vector_dim, args.batch_size, + args.report_count, args.queries, args.runtime, output_dir, shutdown_flag) + + # Read final disk stats + print('Reading final disk statistics...') + end_disk_stats = read_disk_stats() + + # Calculate disk I/O during benchmark + disk_io_diff = calculate_disk_io_diff(start_disk_stats, end_disk_stats) + + # Calculate and print statistics + print("\nCalculating benchmark statistics...") + stats = calculate_statistics(output_dir) + + # Add disk I/O statistics to the stats dictionary + if disk_io_diff: + # Calculate totals across all devices + total_bytes_read = sum(dev_stats["bytes_read"] for dev_stats in disk_io_diff.values()) + total_bytes_written = sum(dev_stats["bytes_written"] for dev_stats in disk_io_diff.values()) + + # Add disk I/O totals to stats + stats["disk_io"] = { + "total_bytes_read": total_bytes_read, + "total_bytes_read_per_sec": total_bytes_read / stats["total_time_seconds"], + "total_bytes_written": total_bytes_written, + "total_read_formatted": format_bytes(total_bytes_read), + "total_write_formatted": 
format_bytes(total_bytes_written), + "devices": {} + } + + # Add per-device breakdown + for device, io_stats in disk_io_diff.items(): + bytes_read = io_stats["bytes_read"] + bytes_written = io_stats["bytes_written"] + if bytes_read > 0 or bytes_written > 0: # Only include devices with activity + stats["disk_io"]["devices"][device] = { + "bytes_read": bytes_read, + "bytes_written": bytes_written, + "read_formatted": format_bytes(bytes_read), + "write_formatted": format_bytes(bytes_written) + } + else: + stats["disk_io"] = {"error": "Disk I/O statistics not available"} + + # Save statistics to file + with open(os.path.join(output_dir, "statistics.json"), 'w') as f: + json.dump(stats, f, indent=2) + + if args.json_output: + print("\nBenchmark statistics as JSON:") + print(json.dumps(stats)) + else: + # Print summary + print("\n" + "=" * 50) + print("BENCHMARK SUMMARY") + print("=" * 50) + print(f"Total Queries: {stats.get('total_queries', 0)}") + print(f"Total Batches: {stats.get('batch_count', 0)}") + print(f'Total Runtime: {stats.get("total_time_seconds", 0):.2f} seconds') + + # Print query time statistics + print("\nQUERY STATISTICS") + print("-" * 50) + + print(f"Mean Latency: {stats.get('mean_latency_ms', 0):.2f} ms") + print(f"Median Latency: {stats.get('median_latency_ms', 0):.2f} ms") + print(f"95th Percentile: {stats.get('p95_latency_ms', 0):.2f} ms") + print(f"99th Percentile: {stats.get('p99_latency_ms', 0):.2f} ms") + print(f"99.9th Percentile: {stats.get('p999_latency_ms', 0):.2f} ms") + print(f"99.99th Percentile: {stats.get('p9999_latency_ms', 0):.2f} ms") + print(f"Throughput: {stats.get('throughput_qps', 0):.2f} queries/second") + + # Print batch time statistics + print("\nBATCH STATISTICS") + print("-" * 50) + + print(f"Mean Batch Time: {stats.get('mean_batch_time_ms', 0):.2f} ms") + print(f"Median Batch Time: {stats.get('median_batch_time_ms', 0):.2f} ms") + print(f"95th Percentile: {stats.get('p95_batch_time_ms', 0):.2f} ms") + print(f"99th 
Percentile: {stats.get('p99_batch_time_ms', 0):.2f} ms") + print(f"99.9th Percentile: {stats.get('p999_batch_time_ms', 0):.2f} ms") + print(f"99.99th Percentile: {stats.get('p9999_batch_time_ms', 0):.2f} ms") + print(f"Max Batch Time: {stats.get('max_batch_time_ms', 0):.2f} ms") + print(f"Batch Throughput: {1000 / stats.get('mean_batch_time_ms', float('inf')):.2f} batches/second") + + # Print disk I/O statistics + print("\nDISK I/O DURING BENCHMARK") + print("-" * 50) + if disk_io_diff: + # Calculate totals across all devices + total_bytes_read = sum(dev_stats["bytes_read"] for dev_stats in disk_io_diff.values()) + total_bytes_written = sum(dev_stats["bytes_written"] for dev_stats in disk_io_diff.values()) + + print(f"Total Bytes Read: {format_bytes(total_bytes_read)}") + print(f"Total Bytes Written: {format_bytes(total_bytes_written)}") + print("\nPer-Device Breakdown:") + + for device, io_stats in disk_io_diff.items(): + bytes_read = io_stats["bytes_read"] + bytes_written = io_stats["bytes_written"] + if bytes_read > 0 or bytes_written > 0: # Only show devices with activity + print(f" {device}:") + print(f" Read: {format_bytes(bytes_read)}") + print(f" Write: {format_bytes(bytes_written)}") + else: + print("Disk I/O statistics not available") + + print("\nDetailed results saved to:", output_dir) + print("=" * 50) + + +if __name__ == "__main__": + main() \ No newline at end of file From 8b03e052d3e21c603cf139ed5098be654bfe25b6 Mon Sep 17 00:00:00 2001 From: idevasena Date: Fri, 19 Sep 2025 03:07:39 -0700 Subject: [PATCH 4/6] fixed recall metric implementation with primary and ground truth search to reflect accuracy trade-off between search configs --- vdbbench/simple_bench.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/vdbbench/simple_bench.py b/vdbbench/simple_bench.py index ac7fc6a..9f9cd14 100644 --- a/vdbbench/simple_bench.py +++ b/vdbbench/simple_bench.py @@ -237,7 +237,8 @@ def execute_batch_queries(process_id: int, host: 
str, port: str, collection_name # Execute batch and measure time batch_start = time.time() try: - search_params = {"metric_type": "COSINE", "params": {"ef": 200}} + # Primary search with lower ef (what we're measuring for performance) + search_params = {"metric_type": "COSINE", "params": {"ef": 50}} # Lower ef for speed results = collection.search( data=batch_vectors, anns_field="vector", @@ -246,13 +247,13 @@ def execute_batch_queries(process_id: int, host: str, port: str, collection_name output_fields=["id"] ) - # Calculate recall by performing brute-force search as ground truth - # Note: This is computationally expensive and should be used carefully + # Ground truth search with much higher ef for better accuracy + ground_truth_params = {"metric_type": "COSINE", "params": {"ef": 1000}} # Much higher ef ground_truth_results = collection.search( data=batch_vectors, anns_field="vector", - param={"metric_type": "COSINE", "params": {"nprobe": -1}}, # Brute force search - limit=10, + param=ground_truth_params, + limit=50, # Get more results for better ground truth output_fields=["id"] ) @@ -266,7 +267,8 @@ def execute_batch_queries(process_id: int, host: str, port: str, collection_name for i, (search_result, ground_truth_result) in enumerate(zip(results, ground_truth_results)): search_ids = [hit.id for hit in search_result] - ground_truth_ids = [hit.id for hit in ground_truth_result] + # Use top 10 from ground truth with higher ef as the reference + ground_truth_ids = [hit.id for hit in ground_truth_result[:10]] recall_at_10 = calculate_recall(search_ids, ground_truth_ids, 10) recall_at_5 = calculate_recall(search_ids, ground_truth_ids, 5) From af4adb6116aa0a198de558e81c80dc95c7917bb4 Mon Sep 17 00:00:00 2001 From: idevasena Date: Fri, 19 Sep 2025 03:11:30 -0700 Subject: [PATCH 5/6] omitting pgvector support for now --- vdbbench/pgvector/pgvector_bench.py | 170 ---------------------------- vdbbench/pgvector/setup.md | 37 ------ 2 files changed, 207 deletions(-) delete 
mode 100644 vdbbench/pgvector/pgvector_bench.py delete mode 100644 vdbbench/pgvector/setup.md diff --git a/vdbbench/pgvector/pgvector_bench.py b/vdbbench/pgvector/pgvector_bench.py deleted file mode 100644 index 96fad6d..0000000 --- a/vdbbench/pgvector/pgvector_bench.py +++ /dev/null @@ -1,170 +0,0 @@ -import psycopg2 -import numpy as np -from psycopg2.extensions import register_adapter, AsIs -from time import time -from itertools import islice - -# Configuration settings -TOTAL_INSERT = 10_000_000 # Total initial insert data volume -UPDATE_SAMPLE = 5_000_000 # Number of data to update in Scenario 1 -DELETE_INSERT_SAMPLE = 5_000_000 # Number of delete and reinsert in Scenario 2 -BATCH_SIZE = 10000 # Batch size (adjust according to device) -DB_CONFIG = { - 'host': 'localhost', - 'port': 5432, - 'dbname': 'testdb', - 'user': 'postgres', - 'password': 'postgres' -} - -def register_vector_adapter(): - """Register numpy array/vector adapters""" - def adapt_array_to_vector(vec): - # Use square brackets instead of curly braces - return AsIs(f"'[{','.join(map(repr, vec))}]'") - # Register for both numpy arrays and regular lists - register_adapter(np.ndarray, adapt_array_to_vector) - register_adapter(list, adapt_array_to_vector) - -def create_table_and_index(conn): - """Create vector table and DISKANN index""" - with conn.cursor() as cur: - cur.execute("CREATE EXTENSION IF NOT EXISTS vector;") - cur.execute("DROP TABLE IF EXISTS vectors CASCADE;") - cur.execute( - "CREATE TABLE vectors (id SERIAL PRIMARY KEY, embedding vector(128));" - ) - cur.execute( - "CREATE INDEX vectors_index ON vectors " - "USING diskann (embedding vector_cosine_ops) WITH (lists = 1000);" - ) - conn.commit() - -def chunked_batches(iterable, size): - """Yield items in chunks (memory optimization)""" - it = iter(iterable) - while True: - chunk = list(islice(it, size)) - if not chunk: - return - yield chunk - -def generate_random_vector(): - """Generates random 1536-dimensional vector""" - return 
np.random.rand(1536).tolist() - -def insert_update_test(conn): - """Scenario 1: Insert 10M then update 5M entries""" - with conn.cursor() as cur: - print("++ Scenario 1: Inserting initial data ++") - start = time() - # Insert initial data in batches - for i in range(0, TOTAL_INSERT, BATCH_SIZE): - vec_batch = [ - generate_random_vector() - for _ in range(min(BATCH_SIZE, TOTAL_INSERT - i)) - ] - cur.executemany( - "INSERT INTO vectors (embedding) VALUES (%s)", - [(vec,) for vec in vec_batch] - ) - conn.commit() - insert_time = time() - start - print(f"Initial insert took: {insert_time:.2f}s") - # Prepare random update IDs (select 5M from existing data) - cur.execute("SELECT id FROM vectors") - all_ids = [row[0] for row in cur.fetchall()] - update_ids = np.random.choice(all_ids, UPDATE_SAMPLE, replace=False).tolist() - print(f"++ Starting 5M updates ++") - start_update = time() - for batch_ids in chunked_batches(update_ids, BATCH_SIZE): - new_vectors = [generate_random_vector() for _ in batch_ids] - params = [(vec, vec_id) for vec, vec_id in zip(new_vectors, batch_ids)] - cur.executemany( - "UPDATE vectors SET embedding = %s WHERE id = %s", - params - ) - conn.commit() - update_time = time() - start_update - print(f"Update completed: {update_time:.2f}s. 
Total time: {(update_time + insert_time)/60:.2f} minutes") - return { - 'scenario1_total': insert_time + update_time - } - -def insert_delete_reinsert_test(conn): - """Scenario 2: Insert 10M → Delete 5M → Reinsert 5M""" - with conn.cursor() as cur: - print("++ Scenario 2: Inserting initial data ++") - start = time() - # Insert initial data - for i in range(0, TOTAL_INSERT, BATCH_SIZE): - vec_batch = [ - generate_random_vector() - for _ in range(min(BATCH_SIZE, TOTAL_INSERT - i)) - ] - cur.executemany( - "INSERT INTO vectors (embedding) VALUES (%s)", - [(vec,) for vec in vec_batch] - ) - conn.commit() - insert_time = time() - start - print(f"Initial insert took: {insert_time:.2f}s") - # Prepare IDs to delete (select 5M) - cur.execute("SELECT id FROM vectors") - all_ids = [row[0] for row in cur.fetchall()] - delete_ids = np.random.choice(all_ids, DELETE_INSERT_SAMPLE, replace=False).tolist() - # Batch delete (increasing batch size for large deletions) - print("++ Deleting 5M entries ++") - del_start = time() - for batch in chunked_batches(delete_ids, 50000): - batch_str = ",".join(map(str, batch)) - cur.execute(f"DELETE FROM vectors WHERE id IN ({batch_str})") - conn.commit() - delete_time = time() - del_start - print(f"Delete completed: {delete_time:.2f}s") - # Reinsert 5M new entries - print("++ Reinserting 5M entries ++") - reinsert_start = time() - for _ in range(DELETE_INSERT_SAMPLE // BATCH_SIZE): - vec_batch = [generate_random_vector() for _ in range(BATCH_SIZE)] - cur.executemany( - "INSERT INTO vectors (embedding) VALUES (%s)", - [(vec,) for vec in vec_batch] - ) - conn.commit() - # Process the remainder - remainder = DELETE_INSERT_SAMPLE % BATCH_SIZE - if remainder: - vec_batch = [generate_random_vector() for _ in range(remainder)] - cur.executemany( - "INSERT INTO vectors (embedding) VALUES (%s)", - [(vec,) for vec in vec_batch] - ) - conn.commit() - reinsert_time = time() - reinsert_start - print(f"Reinsert completed: {reinsert_time:.2f}s. 
Total time: {(delete_time + reinsert_time + insert_time)/60:.2f} minutes") - return { - 'scenario2_total': insert_time + delete_time + reinsert_time - } - -def main(): - # Register vector adapters - register_vector_adapter() - results = {} - print("\n=== Scenario 1 begins ===") - with psycopg2.connect(**DB_CONFIG) as conn: - create_table_and_index(conn) - results['scenario1'] = insert_update_test(conn) - - print("\n=== Scenario 2 begins ===") - with psycopg2.connect(**DB_CONFIG) as conn: - create_table_and_index(conn) - results['scenario2'] = insert_delete_reinsert_test(conn) - - print("\nPerformance comparison:") - print(f"Scenario 1 total time: {results['scenario1']['scenario1_total']/60:.2f} minutes") - print(f"Scenario 2 total time: {results['scenario2']['scenario2_total']/60:.2f} minutes") - print(f"Scenario 2 is {(results['scenario1']['scenario1_total']/results['scenario2']['scenario2_total']*100):.1f}% faster than Scenario 1") - -if __name__ == "__main__": - main() diff --git a/vdbbench/pgvector/setup.md b/vdbbench/pgvector/setup.md deleted file mode 100644 index b6f97d1..0000000 --- a/vdbbench/pgvector/setup.md +++ /dev/null @@ -1,37 +0,0 @@ -### Install postgresql-17.0 from source code - -#### Get from source -The PostgreSQL source code for released versions can be obtained from website: https://www.postgresql.org/ftp/source/. Download the postgresql-17.0.tar.gz , then unpack it: - -``` -tar xf postgresql-17.0.tar.gz -``` - -This will create a directory postgresql-17.0 under the current directory with the PostgreSQL sources. Change into that directory for the rest of the installation procedure. 
- -#### Building and Installation with Autoconf and Make - -``` -cd postgresql-17.0 - -./configure - -make - -su - -make install -``` - -#### Install pgvector from source code - -``` -git clone https://github.com/pgvector/pgvector.git -b v0.8.0 - -cd pgvector - -make PG_CONFIG=/usr/local/pgsql/bin/pg_config - -make PG_CONFIG=/usr/local/pgsql/bin/pg_config install - -``` From e8d7587f84090d6ccaca00a8d4bcde0b703a7fdf Mon Sep 17 00:00:00 2001 From: idevasena Date: Fri, 19 Sep 2025 03:13:23 -0700 Subject: [PATCH 6/6] clean up --- vdbbench/simple_bench_bkp.py | 668 ----------------------------------- 1 file changed, 668 deletions(-) delete mode 100644 vdbbench/simple_bench_bkp.py diff --git a/vdbbench/simple_bench_bkp.py b/vdbbench/simple_bench_bkp.py deleted file mode 100644 index b679cd1..0000000 --- a/vdbbench/simple_bench_bkp.py +++ /dev/null @@ -1,668 +0,0 @@ -#!/usr/bin/env python3 -""" -Milvus Vector Database Benchmark Script - -This script executes random vector queries against a Milvus collection using multiple processes. -It measures and reports query latency statistics. -""" - -import argparse -import multiprocessing as mp -import numpy as np -import os -import time -import json -import csv -import uuid -from datetime import datetime -from pathlib import Path -from typing import Dict, List, Any, Optional, Tuple, Union -import signal -import sys -from tabulate import tabulate - -from vdbbench.config_loader import load_config, merge_config_with_args -from vdbbench.list_collections import get_collection_info - -try: - from pymilvus import connections, Collection, utility -except ImportError: - print("Error: pymilvus package not found. 
Please install it with 'pip install pymilvus'") - sys.exit(1) - -STAGGER_INTERVAL_SEC = 0.1 - -# Global flag for graceful shutdown -shutdown_flag = mp.Value('i', 0) - -# CSV header fields -csv_fields = [ - "process_id", - "batch_id", - "timestamp", - "batch_size", - "batch_time_seconds", - "avg_query_time_seconds", - "success" -] - - -def signal_handler(sig, frame): - """Handle interrupt signals to gracefully shut down worker processes""" - print("\nReceived interrupt signal. Shutting down workers gracefully...") - with shutdown_flag.get_lock(): - shutdown_flag.value = 1 - - -def read_disk_stats() -> Dict[str, Dict[str, int]]: - """ - Read disk I/O statistics from /proc/diskstats - - Returns: - Dictionary mapping device names to their read/write statistics - """ - stats = {} - try: - with open('/proc/diskstats', 'r') as f: - for line in f: - parts = line.strip().split() - if len(parts) >= 14: # Ensure we have enough fields - device = parts[2] - # Fields based on kernel documentation - # https://www.kernel.org/doc/Documentation/ABI/testing/procfs-diskstats - sectors_read = int(parts[5]) # sectors read - sectors_written = int(parts[9]) # sectors written - - # 1 sector = 512 bytes - bytes_read = sectors_read * 512 - bytes_written = sectors_written * 512 - - stats[device] = { - "bytes_read": bytes_read, - "bytes_written": bytes_written - } - return stats - except FileNotFoundError: - print("Warning: /proc/diskstats not available (non-Linux system)") - return {} - except Exception as e: - print(f"Error reading disk stats: {e}") - return {} - - -def format_bytes(bytes_value: int) -> str: - """Format bytes into human-readable format with appropriate units""" - units = ['B', 'KB', 'MB', 'GB', 'TB'] - unit_index = 0 - value = float(bytes_value) - - while value > 1024 and unit_index < len(units) - 1: - value /= 1024 - unit_index += 1 - - return f"{value:.2f} {units[unit_index]}" - - -def calculate_disk_io_diff(start_stats: Dict[str, Dict[str, int]], - end_stats: Dict[str, 
Dict[str, int]]) -> Dict[str, Dict[str, int]]: - """Calculate the difference in disk I/O between start and end measurements""" - diff_stats = {} - - for device in end_stats: - if device in start_stats: - diff_stats[device] = { - "bytes_read": end_stats[device]["bytes_read"] - start_stats[device]["bytes_read"], - "bytes_written": end_stats[device]["bytes_written"] - start_stats[device]["bytes_written"] - } - - return diff_stats - - -def generate_random_vector(dim: int) -> List[float]: - """Generate a random normalized vector of the specified dimension""" - vec = np.random.random(dim).astype(np.float32) - return (vec / np.linalg.norm(vec)).tolist() - - -def connect_to_milvus(host: str, port: str) -> connections: - """Establish connection to Milvus server""" - try: - connections.connect(alias="default", host=host, port=port) - return connections - except Exception as e: - print(f"Failed to connect to Milvus: {e}") - return False - - -def execute_batch_queries(process_id: int, host: str, port: str, collection_name: str, vector_dim: int, batch_size: int, - report_count: int, max_queries: Optional[int], runtime_seconds: Optional[int], output_dir: str, - shutdown_flag: mp.Value) -> None: - """ - Execute batches of vector queries and log results to disk - - Args: - process_id: ID of the current process - host: Milvus server host - port: Milvus server port - collection_name: Name of the collection to query - vector_dim: Dimension of vectors - batch_size: Number of queries to execute in each batch - max_queries: Maximum number of queries to execute (None for unlimited) - runtime_seconds: Maximum runtime in seconds (None for unlimited) - output_dir: Directory to save results - shutdown_flag: Shared value to signal process termination - """ - print(f'Process {process_id} initialized') - # Connect to Milvus - connections = connect_to_milvus(host, port) - if not connections: - print(f'Process {process_id} - No milvus connection') - return - - # Get collection - try: - collection 
= Collection(collection_name) - print(f'Process {process_id} - Loading collection') - collection.load() - except Exception as e: - print(f"Process {process_id}: Failed to load collection: {e}") - return - - # Prepare output file - output_file = Path(output_dir) / f"milvus_benchmark_p{process_id}.csv" - sys.stdout.write(f"Process {process_id}: Writing results to {output_file}\r\n") - # Create output directory if it doesn't exist - os.makedirs(os.path.dirname(output_file), exist_ok=True) - - # Track execution - start_time = time.time() - query_count = 0 - batch_count = 0 - - sys.stdout.write(f"Process {process_id}: Starting benchmark ...\r\n") - sys.stdout.flush() - - try: - with open(output_file, 'w') as f: - writer = csv.DictWriter(f, fieldnames=csv_fields) - writer.writeheader() - while True: - # Check if we should terminate - with shutdown_flag.get_lock(): - if shutdown_flag.value == 1: - break - - # Check termination conditions - current_time = time.time() - elapsed_time = current_time - start_time - - if runtime_seconds is not None and elapsed_time >= runtime_seconds: - break - - if max_queries is not None and query_count >= max_queries: - break - - # Generate batch of query vectors - batch_vectors = [generate_random_vector(vector_dim) for _ in range(batch_size)] - - # Execute batch and measure time - batch_start = time.time() - try: - search_params = {"metric_type": "COSINE", "params": {"ef": 200}} - results = collection.search( - data=batch_vectors, - anns_field="vector", - param=search_params, - limit=10, - output_fields=["id"] - ) - batch_end = time.time() - batch_success = True - except Exception as e: - print(f"Process {process_id}: Search error: {e}") - batch_end = time.time() - batch_success = False - - # Record batch results - batch_time = batch_end - batch_start - batch_count += 1 - query_count += batch_size - - # Log batch results to file - batch_data = { - "process_id": process_id, - "batch_id": batch_count, - "timestamp": current_time, - 
"batch_size": batch_size, - "batch_time_seconds": batch_time, - "avg_query_time_seconds": batch_time / batch_size, - "success": batch_success - } - - writer.writerow(batch_data) - f.flush() # Ensure data is written to disk immediately - - # Print progress - if batch_count % report_count == 0: - sys.stdout.write(f"Process {process_id}: Completed {query_count} queries in {elapsed_time:.2f} seconds.\r\n") - sys.stdout.flush() - - except Exception as e: - print(f"Process {process_id}: Error during benchmark: {e}") - - finally: - # Disconnect from Milvus - try: - connections.disconnect("default") - except: - pass - - print( - f"Process {process_id}: Finished. Executed {query_count} queries in {time.time() - start_time:.2f} seconds", flush=True) - - -def calculate_statistics(results_dir: str) -> Dict[str, Union[str, int, float, Dict[str, int]]]: - """Calculate statistics from benchmark results""" - import pandas as pd - - # Find all result files - file_paths = list(Path(results_dir).glob("milvus_benchmark_p*.csv")) - - if not file_paths: - return {"error": "No benchmark result files found"} - - # Read and concatenate all CSV files into a single DataFrame - dfs = [] - for file_path in file_paths: - try: - df = pd.read_csv(file_path) - if not df.empty: - dfs.append(df) - except Exception as e: - print(f"Error reading result file {file_path}: {e}") - - if not dfs: - return {"error": "No valid data found in benchmark result files"} - - # Concatenate all dataframes - all_data = pd.concat(dfs, ignore_index=True) - all_data.sort_values('timestamp', inplace=True) - - # Calculate start and end times - file_start_time = min(all_data['timestamp']) - file_end_time = max(all_data['timestamp'] + all_data['batch_time_seconds']) - total_time_seconds = file_end_time - file_start_time - - # Each row represents a batch, so we need to expand based on batch_size - all_latencies = [] - for _, row in all_data.iterrows(): - query_time_ms = row['avg_query_time_seconds'] * 1000 - 
all_latencies.extend([query_time_ms] * row['batch_size']) - - # Convert batch times to milliseconds - batch_times_ms = all_data['batch_time_seconds'] * 1000 - - # Calculate statistics - latencies = np.array(all_latencies) - batch_times = np.array(batch_times_ms) - total_queries = len(latencies) - - stats = { - "total_queries": total_queries, - "total_time_seconds": total_time_seconds, - "min_latency_ms": float(np.min(latencies)), - "max_latency_ms": float(np.max(latencies)), - "mean_latency_ms": float(np.mean(latencies)), - "median_latency_ms": float(np.median(latencies)), - "p95_latency_ms": float(np.percentile(latencies, 95)), - "p99_latency_ms": float(np.percentile(latencies, 99)), - "p999_latency_ms": float(np.percentile(latencies, 99.9)), - "p9999_latency_ms": float(np.percentile(latencies, 99.99)), - "throughput_qps": float(total_queries / total_time_seconds) if total_time_seconds > 0 else 0, - - # Batch time statistics - "batch_count": len(batch_times), - "min_batch_time_ms": float(np.min(batch_times)) if len(batch_times) > 0 else 0, - "max_batch_time_ms": float(np.max(batch_times)) if len(batch_times) > 0 else 0, - "mean_batch_time_ms": float(np.mean(batch_times)) if len(batch_times) > 0 else 0, - "median_batch_time_ms": float(np.median(batch_times)) if len(batch_times) > 0 else 0, - "p95_batch_time_ms": float(np.percentile(batch_times, 95)) if len(batch_times) > 0 else 0, - "p99_batch_time_ms": float(np.percentile(batch_times, 99)) if len(batch_times) > 0 else 0, - "p999_batch_time_ms": float(np.percentile(batch_times, 99.9)) if len(batch_times) > 0 else 0, - "p9999_batch_time_ms": float(np.percentile(batch_times, 99.99)) if len(batch_times) > 0 else 0 - } - - return stats - - -def load_database(host: str, port: str, collection_name: str, reload=False) -> Union[dict, None]: - print(f'Connecting to Milvus server at {host}:{port}...', flush=True) - connections = connect_to_milvus(host, port) - if not connections: - print(f'Unable to connect to Milvus 
def load_database(host: str, port: str, collection_name: str, reload=False) -> Union[dict, None]:
    """Ensure *collection_name* is loaded on the Milvus server and summarize it.

    Connects to the server, loads (or reloads, when *reload* is true) the
    collection into memory, prints a tabulated summary of its statistics, and
    returns the collection-info dict — or None on any connection/load failure.
    """
    print(f'Connecting to Milvus server at {host}:{port}...', flush=True)
    if not connect_to_milvus(host, port):
        print(f'Unable to connect to Milvus server', flush=True)
        return None

    # Bind a handle to the target collection; bail out if it is unreachable.
    try:
        target = Collection(collection_name)
    except Exception as e:
        print(f"Unable to connect to Milvus collection {collection_name}: {e}", flush=True)
        return None

    try:
        # Only trigger a (re)load when asked to, or when not already loaded.
        current_state = utility.load_state(collection_name)
        already_loaded = current_state.name == "Loaded"
        if reload or not already_loaded:
            message = (f'Reloading the collection {collection_name}...' if reload
                       else f'Loading the collection {collection_name}...')
            print(message)
            started_at = time.time()
            target.load()
            load_time = time.time() - started_at
            print(f'Collection {collection_name} loaded in {load_time:.2f} seconds', flush=True)
        if already_loaded and not reload:
            print(f'Collection {collection_name} already reloaded and not reloading...')
    except Exception as e:
        print(f'Unable to load collection {collection_name}: {e}')
        return None

    print(f'Getting collection statistics...', flush=True)
    collection_info = get_collection_info(collection_name, release=False)

    # Flatten per-index metadata into comma-separated display strings.
    index_entries = collection_info.get("index_info", [])
    index_types = ", ".join([idx.get("index_type", "N/A") for idx in index_entries])
    metric_types = ", ".join([idx.get("metric_type", "N/A") for idx in index_entries])

    # Single summary row for the tabulated report.
    table_data = [[
        collection_info["name"],
        collection_info.get("row_count", "N/A"),
        collection_info.get("dimension", "N/A"),
        index_types,
        metric_types,
        len(collection_info.get("partitions", [])),
    ]]

    headers = ["Collection Name", "Vector Count", "Dimension", "Index Types", "Metric Types", "Partitions"]
    print(f'\nTabulating information...', flush=True)
    print(tabulate(table_data, headers=headers, tablefmt="grid"), flush=True)

    return collection_info
def main():
    """CLI entry point: parse arguments, verify the collection, run the
    benchmark across one or more worker processes, then aggregate and report
    latency, throughput and disk-I/O statistics."""
    parser = argparse.ArgumentParser(description="Milvus Vector Database Benchmark")

    parser.add_argument("--config", type=str, help="Path to vdbbench config file")

    # Required parameters
    parser.add_argument("--processes", type=int, help="Number of parallel processes")
    parser.add_argument("--batch-size", type=int, help="Number of queries per batch")
    parser.add_argument("--vector-dim", type=int, default=1536, help="Vector dimension")
    parser.add_argument("--report-count", type=int, default=10, help="Number of queries between logging results")

    # Database parameters
    parser.add_argument("--host", type=str, default="localhost", help="Milvus server host")
    parser.add_argument("--port", type=str, default="19530", help="Milvus server port")
    parser.add_argument("--collection-name", type=str, help="Collection name to query")

    # Termination conditions (at least one must be specified)
    termination_group = parser.add_argument_group("termination conditions (at least one required)")
    termination_group.add_argument("--runtime", type=int, help="Maximum runtime in seconds")
    termination_group.add_argument("--queries", type=int, help="Total number of queries to execute")

    # Output directory
    parser.add_argument("--output-dir", type=str, help="Directory to save benchmark results")
    parser.add_argument("--json-output", action="store_true", help="Print benchmark results as JSON document")

    args = parser.parse_args()

    # Validate termination conditions
    if args.runtime is None and args.queries is None:
        parser.error("At least one termination condition (--runtime or --queries) must be specified")

    # Register signal handlers for graceful shutdown
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    print("")
    print("=" * 50)
    print("OUTPUT CONFIGURATION", flush=True)
    print("=" * 50, flush=True)

    # Load config from YAML if specified.
    # NOTE(review): config values are merged AFTER the termination-condition
    # validation above, so a runtime/queries value supplied only via --config
    # would still trigger parser.error — confirm intended ordering.
    if args.config:
        config = load_config(args.config)
        args = merge_config_with_args(config, args)

    # Create output directory (timestamped default when none was given)
    if not args.output_dir:
        output_dir = "vdbbench_results"
        datetime_str = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_dir = os.path.join(output_dir, datetime_str)
    else:
        output_dir = args.output_dir

    os.makedirs(output_dir, exist_ok=True)

    # Save benchmark configuration alongside the results for reproducibility
    config = {
        "timestamp": datetime.now().isoformat(),
        "processes": args.processes,
        "batch_size": args.batch_size,
        "report_count": args.report_count,
        "vector_dim": args.vector_dim,
        "host": args.host,
        "port": args.port,
        "collection_name": args.collection_name,
        "runtime_seconds": args.runtime,
        "total_queries": args.queries
    }

    print(f"Results will be saved to: {output_dir}")
    print(f'Writing configuration to {output_dir}/config.json')
    with open(os.path.join(output_dir, "config.json"), 'w') as f:
        json.dump(config, f, indent=2)

    print("")
    print("=" * 50)
    print("Database Verification and Loading", flush=True)
    print("=" * 50)

    connections = connect_to_milvus(args.host, args.port)
    # NOTE(review): "Verifing" is a typo in a runtime string — left unchanged here.
    print(f'Verifing database connection and loading collection')
    if collection_info := load_database(args.host, args.port, args.collection_name):
        print(f"\nCOLLECTION INFORMATION: {collection_info}")
        # Having an active connection in the main thread when we fork seems to cause problems
        connections.disconnect("default")
    else:
        print("Unable to load the specified collection")
        sys.exit(1)

    # Read initial disk stats so benchmark I/O can be measured as a delta
    print(f'\nCollecting initial disk statistics...')
    start_disk_stats = read_disk_stats()

    # Calculate queries per process if total queries specified
    max_queries_per_process = None
    if args.queries is not None:
        max_queries_per_process = args.queries // args.processes
        # Add remainder to the first process
        # NOTE(review): `remainder` is only bound inside this guard; its later
        # use is guarded by `max_queries_per_process is not None`, which keeps
        # the two in sync — fragile but currently correct.
        remainder = args.queries % args.processes

    # Start worker processes
    processes = []
    # NOTE(review): raises ZeroDivisionError/TypeError if --processes is 0 or
    # omitted (no default) — consider validating args.processes >= 1 earlier.
    stagger_interval_secs = 1 / args.processes

    print("")
    print("=" * 50)
    print("Benchmark Execution", flush=True)
    print("=" * 50)
    if max_queries_per_process is not None:
        print(f"Starting benchmark with {args.processes} processes and {max_queries_per_process} queries per process")
    else:
        print(f'Starting benchmark with {args.processes} processes and running for {args.runtime} seconds')
    if args.processes > 1:
        # Stagger worker start times so all processes don't hit the server at once
        print(f"Staggering benchmark execution by {stagger_interval_secs} seconds between processes")
        try:
            for i in range(args.processes):
                if i > 0:
                    time.sleep(stagger_interval_secs)
                # Adjust queries for the first process if there's a remainder
                process_max_queries = None
                if max_queries_per_process is not None:
                    process_max_queries = max_queries_per_process + (remainder if i == 0 else 0)

                p = mp.Process(
                    target=execute_batch_queries,
                    args=(
                        i,
                        args.host,
                        args.port,
                        args.collection_name,
                        args.vector_dim,
                        args.batch_size,
                        args.report_count,
                        process_max_queries,
                        args.runtime,
                        output_dir,
                        shutdown_flag
                    )
                )
                print(f'Starting process {i}...')
                p.start()
                processes.append(p)

            # Wait for all processes to complete
            for p in processes:
                p.join()
        except Exception as e:
            print(f"Error during benchmark execution: {e}")
            # Signal all processes to terminate
            with shutdown_flag.get_lock():
                shutdown_flag.value = 1

            # Wait for processes to terminate, escalating to terminate() after 5s
            for p in processes:
                if p.is_alive():
                    p.join(timeout=5)
                    if p.is_alive():
                        p.terminate()
    else:
        # Single-process mode runs the worker inline in this process
        print(f'Running single process benchmark...')
        execute_batch_queries(0, args.host, args.port, args.collection_name, args.vector_dim, args.batch_size,
                              args.report_count, args.queries, args.runtime, output_dir, shutdown_flag)

    # Read final disk stats
    print('Reading final disk statistics...')
    end_disk_stats = read_disk_stats()

    # Calculate disk I/O during benchmark
    disk_io_diff = calculate_disk_io_diff(start_disk_stats, end_disk_stats)

    # Calculate and print statistics
    print("\nCalculating benchmark statistics...")
    stats = calculate_statistics(output_dir)

    # Add disk I/O statistics to the stats dictionary
    if disk_io_diff:
        # Calculate totals across all devices
        total_bytes_read = sum(dev_stats["bytes_read"] for dev_stats in disk_io_diff.values())
        total_bytes_written = sum(dev_stats["bytes_written"] for dev_stats in disk_io_diff.values())

        # Add disk I/O totals to stats
        # NOTE(review): if calculate_statistics returned an {"error": ...} dict,
        # stats["total_time_seconds"] below raises KeyError — confirm whether the
        # error case should be handled before this block.
        stats["disk_io"] = {
            "total_bytes_read": total_bytes_read,
            "total_bytes_read_per_sec": total_bytes_read / stats["total_time_seconds"],
            "total_bytes_written": total_bytes_written,
            "total_read_formatted": format_bytes(total_bytes_read),
            "total_write_formatted": format_bytes(total_bytes_written),
            "devices": {}
        }

        # Add per-device breakdown
        for device, io_stats in disk_io_diff.items():
            bytes_read = io_stats["bytes_read"]
            bytes_written = io_stats["bytes_written"]
            if bytes_read > 0 or bytes_written > 0:  # Only include devices with activity
                stats["disk_io"]["devices"][device] = {
                    "bytes_read": bytes_read,
                    "bytes_written": bytes_written,
                    "read_formatted": format_bytes(bytes_read),
                    "write_formatted": format_bytes(bytes_written)
                }
    else:
        stats["disk_io"] = {"error": "Disk I/O statistics not available"}

    # Save statistics to file
    with open(os.path.join(output_dir, "statistics.json"), 'w') as f:
        json.dump(stats, f, indent=2)

    if args.json_output:
        print("\nBenchmark statistics as JSON:")
        print(json.dumps(stats))
    else:
        # Print summary
        print("\n" + "=" * 50)
        print("BENCHMARK SUMMARY")
        print("=" * 50)
        print(f"Total Queries: {stats.get('total_queries', 0)}")
        print(f"Total Batches: {stats.get('batch_count', 0)}")
        print(f'Total Runtime: {stats.get("total_time_seconds", 0):.2f} seconds')

        # Print query time statistics
        print("\nQUERY STATISTICS")
        print("-" * 50)

        print(f"Mean Latency: {stats.get('mean_latency_ms', 0):.2f} ms")
        print(f"Median Latency: {stats.get('median_latency_ms', 0):.2f} ms")
        print(f"95th Percentile: {stats.get('p95_latency_ms', 0):.2f} ms")
        print(f"99th Percentile: {stats.get('p99_latency_ms', 0):.2f} ms")
        print(f"99.9th Percentile: {stats.get('p999_latency_ms', 0):.2f} ms")
        print(f"99.99th Percentile: {stats.get('p9999_latency_ms', 0):.2f} ms")
        print(f"Throughput: {stats.get('throughput_qps', 0):.2f} queries/second")

        # Print batch time statistics
        print("\nBATCH STATISTICS")
        print("-" * 50)

        print(f"Mean Batch Time: {stats.get('mean_batch_time_ms', 0):.2f} ms")
        print(f"Median Batch Time: {stats.get('median_batch_time_ms', 0):.2f} ms")
        print(f"95th Percentile: {stats.get('p95_batch_time_ms', 0):.2f} ms")
        print(f"99th Percentile: {stats.get('p99_batch_time_ms', 0):.2f} ms")
        print(f"99.9th Percentile: {stats.get('p999_batch_time_ms', 0):.2f} ms")
        print(f"99.99th Percentile: {stats.get('p9999_batch_time_ms', 0):.2f} ms")
        print(f"Max Batch Time: {stats.get('max_batch_time_ms', 0):.2f} ms")
        # NOTE(review): divides by zero if mean_batch_time_ms is exactly 0
        print(f"Batch Throughput: {1000 / stats.get('mean_batch_time_ms', float('inf')):.2f} batches/second")

        # Print disk I/O statistics
        print("\nDISK I/O DURING BENCHMARK")
        print("-" * 50)
        if disk_io_diff:
            # Calculate totals across all devices
            total_bytes_read = sum(dev_stats["bytes_read"] for dev_stats in disk_io_diff.values())
            total_bytes_written = sum(dev_stats["bytes_written"] for dev_stats in disk_io_diff.values())

            print(f"Total Bytes Read: {format_bytes(total_bytes_read)}")
            print(f"Total Bytes Written: {format_bytes(total_bytes_written)}")
            print("\nPer-Device Breakdown:")

            for device, io_stats in disk_io_diff.items():
                bytes_read = io_stats["bytes_read"]
                bytes_written = io_stats["bytes_written"]
                if bytes_read > 0 or bytes_written > 0:  # Only show devices with activity
                    print(f"  {device}:")
                    print(f"    Read: {format_bytes(bytes_read)}")
                    print(f"    Write: {format_bytes(bytes_written)}")
        else:
            print("Disk I/O statistics not available")

        print("\nDetailed results saved to:", output_dir)
        print("=" * 50)


if __name__ == "__main__":
    main()