diff --git a/milvus-cluster.yml b/milvus-cluster.yml
new file mode 100644
index 0000000..064fa3e
--- /dev/null
+++ b/milvus-cluster.yml
@@ -0,0 +1,251 @@
+version: '3.5'
+
+services:
+  etcd:
+    container_name: milvus-etcd
+    image: quay.io/coreos/etcd:v3.6.4
+    environment:
+      - ETCD_AUTO_COMPACTION_MODE=revision
+      - ETCD_AUTO_COMPACTION_RETENTION=1000
+      - ETCD_QUOTA_BACKEND_BYTES=4294967296
+      - ETCD_SNAPSHOT_COUNT=50000
+    volumes:
+      - /mnt/vdb/etcd:/etcd
+    command: etcd --advertise-client-urls=http://0.0.0.0:2379 --listen-client-urls=http://0.0.0.0:2379 --data-dir=/etcd
+    ports:
+      - "2379:2379"
+    healthcheck:
+      test: ["CMD", "etcdctl", "endpoint", "health"]
+      interval: 30s
+      timeout: 20s
+      retries: 3
+
+  minio:
+    container_name: milvus-minio
+    image: minio/minio:RELEASE.2024-12-18T13-15-44Z
+    environment:
+      MINIO_ROOT_USER: minioadmin
+      MINIO_ROOT_PASSWORD: minioadmin
+    ports:
+      - "9001:9001"
+      - "9000:9000"
+    volumes:
+      - /mnt/vdb/minio:/minio_data
+    command: minio server /minio_data --console-address ":9001"
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
+      interval: 30s
+      timeout: 20s
+      retries: 3
+
+  rootcoord:
+    container_name: milvus-rootcoord
+    image: milvusdb/milvus:v2.6.0
+    command: ["milvus", "run", "rootcoord"]
+    environment:
+      ETCD_ENDPOINTS: etcd:2379
+      MINIO_ADDRESS: minio:9000
+      MINIO_REGION: us-east-1
+    volumes:
+      - /mnt/vdb/milvus:/var/lib/milvus
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:53100/healthz"]
+      interval: 30s
+      start_period: 90s
+      timeout: 20s
+      retries: 3
+    ports:
+      - "53100:53100"
+    depends_on:
+      - "etcd"
+      - "minio"
+
+  proxy:
+    container_name: milvus-proxy
+    image: milvusdb/milvus:v2.6.0
+    command: ["milvus", "run", "proxy"]
+    environment:
+      ETCD_ENDPOINTS: etcd:2379
+      MINIO_ADDRESS: minio:9000
+      MINIO_REGION: us-east-1
+    volumes:
+      - /mnt/vdb/milvus:/var/lib/milvus
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"]
+      interval: 30s
+      start_period: 90s
+      timeout: 20s
+      retries: 3
+    ports:
+      - "19530:19530"
+      - "9091:9091"
+    depends_on:
+      - "etcd"
+      - "minio"
+      - "rootcoord"
+      - "querycoord"
+      - "datacoord"
+      - "indexcoord"
+
+  querycoord:
+    container_name: milvus-querycoord
+    image: milvusdb/milvus:v2.6.0
+    command: ["milvus", "run", "querycoord"]
+    environment:
+      ETCD_ENDPOINTS: etcd:2379
+      MINIO_ADDRESS: minio:9000
+      MINIO_REGION: us-east-1
+    volumes:
+      - /mnt/vdb/milvus:/var/lib/milvus
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:19531/healthz"]
+      interval: 30s
+      start_period: 90s
+      timeout: 20s
+      retries: 3
+    ports:
+      - "19531:19531"
+    depends_on:
+      - "etcd"
+      - "minio"
+      - "rootcoord"
+
+  querynode1:
+    container_name: milvus-querynode1
+    image: milvusdb/milvus:v2.6.0
+    command: ["milvus", "run", "querynode"]
+    environment:
+      ETCD_ENDPOINTS: etcd:2379
+      MINIO_ADDRESS: minio:9000
+      MINIO_REGION: us-east-1
+    volumes:
+      - /mnt/vdb/milvus:/var/lib/milvus
+    depends_on:
+      - "etcd"
+      - "minio"
+      - "querycoord"
+
+  querynode2:
+    container_name: milvus-querynode2
+    image: milvusdb/milvus:v2.6.0
+    command: ["milvus", "run", "querynode"]
+    environment:
+      ETCD_ENDPOINTS: etcd:2379
+      MINIO_ADDRESS: minio:9000
+      MINIO_REGION: us-east-1
+    volumes:
+      - /mnt/vdb/milvus:/var/lib/milvus
+    depends_on:
+      - "etcd"
+      - "minio"
+      - "querycoord"
+
+  datacoord:
+    container_name: milvus-datacoord
+    image: milvusdb/milvus:v2.6.0
+    command: ["milvus", "run", "datacoord"]
+    environment:
+      ETCD_ENDPOINTS: etcd:2379
+      MINIO_ADDRESS: minio:9000
+      MINIO_REGION: us-east-1
+    volumes:
+      - /mnt/vdb/milvus:/var/lib/milvus
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:13333/healthz"]
+      interval: 30s
+      start_period: 90s
+      timeout: 20s
+      retries: 3
+    ports:
+      - "13333:13333"
+    depends_on:
+      - "etcd"
+      - "minio"
+      - "rootcoord"
+
+  datanode1:
+    container_name: milvus-datanode1
+    image: milvusdb/milvus:v2.6.0
+    command: ["milvus", "run", "datanode"]
+    environment:
+      ETCD_ENDPOINTS: etcd:2379
+      MINIO_ADDRESS: minio:9000
+      MINIO_REGION: us-east-1
+    volumes:
+      - /mnt/vdb/milvus:/var/lib/milvus
+    depends_on:
+      - "etcd"
+      - "minio"
+      - "datacoord"
+
+  datanode2:
+    container_name: milvus-datanode2
+    image: milvusdb/milvus:v2.6.0
+    command: ["milvus", "run", "datanode"]
+    environment:
+      ETCD_ENDPOINTS: etcd:2379
+      MINIO_ADDRESS: minio:9000
+      MINIO_REGION: us-east-1
+    volumes:
+      - /mnt/vdb/milvus:/var/lib/milvus
+    depends_on:
+      - "etcd"
+      - "minio"
+      - "datacoord"
+
+  indexcoord:
+    container_name: milvus-indexcoord
+    image: milvusdb/milvus:v2.6.0
+    command: ["milvus", "run", "indexcoord"]
+    environment:
+      ETCD_ENDPOINTS: etcd:2379
+      MINIO_ADDRESS: minio:9000
+      MINIO_REGION: us-east-1
+    volumes:
+      - /mnt/vdb/milvus:/var/lib/milvus
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:31000/healthz"]
+      interval: 30s
+      start_period: 90s
+      timeout: 20s
+      retries: 3
+    ports:
+      - "31000:31000"
+    depends_on:
+      - "etcd"
+      - "minio"
+      - "rootcoord"
+
+  indexnode1:
+    container_name: milvus-indexnode1
+    image: milvusdb/milvus:v2.6.0
+    command: ["milvus", "run", "indexnode"]
+    environment:
+      ETCD_ENDPOINTS: etcd:2379
+      MINIO_ADDRESS: minio:9000
+      MINIO_REGION: us-east-1
+    volumes:
+      - /mnt/vdb/milvus:/var/lib/milvus
+    depends_on:
+      - "etcd"
+      - "minio"
+      - "indexcoord"
+
+  indexnode2:
+    container_name: milvus-indexnode2
+    image: milvusdb/milvus:v2.6.0
+    command: ["milvus", "run", "indexnode"]
+    environment:
+      ETCD_ENDPOINTS: etcd:2379
+      MINIO_ADDRESS: minio:9000
+      MINIO_REGION: us-east-1
+    volumes:
+      - /mnt/vdb/milvus:/var/lib/milvus
+    depends_on:
+      - "etcd"
+      - "minio"
+      - "indexcoord"
+
+networks:
+  default:
+    name: milvus
diff --git a/vdbbench/simple_bench.py b/vdbbench/simple_bench.py
index b679cd1..9f9cd14 100644
--- a/vdbbench/simple_bench.py
+++ b/vdbbench/simple_bench.py
@@ -43,7 +43,10 @@
     "batch_size",
     "batch_time_seconds",
     "avg_query_time_seconds",
-    "success"
+    "success",
+    "recall_at_10",
+    "recall_at_5",
+    "recall_at_1"
 ]
 
 
@@ -124,6 +127,32 @@ def generate_random_vector(dim: int) -> List[float]:
     return (vec / np.linalg.norm(vec)).tolist()
 
 
+def calculate_recall(search_results, ground_truth_results, k: int) -> float:
+    """
+    Calculate recall@k between search results and ground truth results
+
+    Args:
+        search_results: List of search result IDs from the approximate search
+        ground_truth_results: List of ground truth IDs from the exact search
+        k: Number of top results to consider
+
+    Returns:
+        Recall@k as a float between 0 and 1
+    """
+    if not search_results or not ground_truth_results:
+        return 0.0
+
+    # Take only top k results from both sets
+    search_k = set(search_results[:k])
+    ground_truth_k = set(ground_truth_results[:k])
+
+    # Calculate intersection
+    intersection = search_k.intersection(ground_truth_k)
+
+    # Recall@k = |intersection| / |ground_truth_k|
+    return len(intersection) / len(ground_truth_k) if len(ground_truth_k) > 0 else 0.0
+
+
 def connect_to_milvus(host: str, port: str) -> connections:
     """Establish connection to Milvus server"""
     try:
@@ -208,7 +237,8 @@ def execute_batch_queries(process_id: int, host: str, port: str, collection_name
         # Execute batch and measure time
         batch_start = time.time()
         try:
-            search_params = {"metric_type": "COSINE", "params": {"ef": 200}}
+            # Primary search with lower ef (what we're measuring for performance)
+            search_params = {"metric_type": "COSINE", "params": {"ef": 50}}  # Lower ef for speed
             results = collection.search(
                 data=batch_vectors,
                 anns_field="vector",
@@ -216,12 +246,50 @@
                 limit=10,
                 output_fields=["id"]
             )
+
+            # Ground truth search with much higher ef for better accuracy
+            ground_truth_params = {"metric_type": "COSINE", "params": {"ef": 1000}}  # Much higher ef
+            ground_truth_results = collection.search(
+                data=batch_vectors,
+                anns_field="vector",
+                param=ground_truth_params,
+                limit=50,  # Get more results for better ground truth
+                output_fields=["id"]
+            )
+
             batch_end = time.time()
             batch_success = True
+
+            # Calculate recall metrics for the batch
+            batch_recall_at_10 = []
+            batch_recall_at_5 = []
+            batch_recall_at_1 = []
+
+            for search_result, ground_truth_result in zip(results, ground_truth_results):
+                search_ids = [hit.id for hit in search_result]
+                # Use top 10 from ground truth with higher ef as the reference
+                ground_truth_ids = [hit.id for hit in ground_truth_result[:10]]
+
+                recall_at_10 = calculate_recall(search_ids, ground_truth_ids, 10)
+                recall_at_5 = calculate_recall(search_ids, ground_truth_ids, 5)
+                recall_at_1 = calculate_recall(search_ids, ground_truth_ids, 1)
+
+                batch_recall_at_10.append(recall_at_10)
+                batch_recall_at_5.append(recall_at_5)
+                batch_recall_at_1.append(recall_at_1)
+
+            # Average recall across the batch
+            avg_recall_at_10 = np.mean(batch_recall_at_10) if batch_recall_at_10 else 0.0
+            avg_recall_at_5 = np.mean(batch_recall_at_5) if batch_recall_at_5 else 0.0
+            avg_recall_at_1 = np.mean(batch_recall_at_1) if batch_recall_at_1 else 0.0
+
         except Exception as e:
             print(f"Process {process_id}: Search error: {e}")
             batch_end = time.time()
             batch_success = False
+            avg_recall_at_10 = 0.0
+            avg_recall_at_5 = 0.0
+            avg_recall_at_1 = 0.0
 
         # Record batch results
         batch_time = batch_end - batch_start
@@ -236,7 +304,10 @@ def execute_batch_queries(process_id: int, host: str, port: str, collection_name
             "batch_size": batch_size,
             "batch_time_seconds": batch_time,
             "avg_query_time_seconds": batch_time / batch_size,
-            "success": batch_success
+            "success": batch_success,
+            "recall_at_10": avg_recall_at_10,
+            "recall_at_5": avg_recall_at_5,
+            "recall_at_1": avg_recall_at_1
         }
 
         writer.writerow(batch_data)
@@ -307,6 +378,22 @@ def calculate_statistics(results_dir: str) -> Dict[str, Union[str, int, float, D
     batch_times = np.array(batch_times_ms)
     total_queries = len(latencies)
 
+    # Calculate recall statistics
+    recall_at_10_values = []
+    recall_at_5_values = []
+    recall_at_1_values = []
+
+    for _, row in all_data.iterrows():
+        if row['success'] and 'recall_at_10' in row:
+            # Replicate recall values for each query in the batch
+            recall_at_10_values.extend([row['recall_at_10']] * row['batch_size'])
+            recall_at_5_values.extend([row['recall_at_5']] * row['batch_size'])
+            recall_at_1_values.extend([row['recall_at_1']] * row['batch_size'])
+
+    recall_at_10_array = np.array(recall_at_10_values) if recall_at_10_values else np.array([0])
+    recall_at_5_array = np.array(recall_at_5_values) if recall_at_5_values else np.array([0])
+    recall_at_1_array = np.array(recall_at_1_values) if recall_at_1_values else np.array([0])
+
     stats = {
         "total_queries": total_queries,
         "total_time_seconds": total_time_seconds,
@@ -329,7 +416,23 @@ def calculate_statistics(results_dir: str) -> Dict[str, Union[str, int, float, D
         "p95_batch_time_ms": float(np.percentile(batch_times, 95)) if len(batch_times) > 0 else 0,
         "p99_batch_time_ms": float(np.percentile(batch_times, 99)) if len(batch_times) > 0 else 0,
         "p999_batch_time_ms": float(np.percentile(batch_times, 99.9)) if len(batch_times) > 0 else 0,
-        "p9999_batch_time_ms": float(np.percentile(batch_times, 99.99)) if len(batch_times) > 0 else 0
+        "p9999_batch_time_ms": float(np.percentile(batch_times, 99.99)) if len(batch_times) > 0 else 0,
+
+        # Recall statistics
+        "mean_recall_at_10": float(np.mean(recall_at_10_array)),
+        "median_recall_at_10": float(np.median(recall_at_10_array)),
+        "min_recall_at_10": float(np.min(recall_at_10_array)),
+        "max_recall_at_10": float(np.max(recall_at_10_array)),
+
+        "mean_recall_at_5": float(np.mean(recall_at_5_array)),
+        "median_recall_at_5": float(np.median(recall_at_5_array)),
+        "min_recall_at_5": float(np.min(recall_at_5_array)),
+        "max_recall_at_5": float(np.max(recall_at_5_array)),
+
+        "mean_recall_at_1": float(np.mean(recall_at_1_array)),
+        "median_recall_at_1": float(np.median(recall_at_1_array)),
+        "min_recall_at_1": float(np.min(recall_at_1_array)),
+        "max_recall_at_1": float(np.max(recall_at_1_array))
     }
 
     return stats
@@ -638,6 +741,28 @@
     print(f"Max Batch Time: {stats.get('max_batch_time_ms', 0):.2f} ms")
     print(f"Batch Throughput: {1000 / stats.get('mean_batch_time_ms', float('inf')):.2f} batches/second")
 
+    # Print recall statistics
+    print("\nRECALL STATISTICS")
+    print("-" * 50)
+
+    print("Recall@10:")
+    print(f"  Mean: {stats.get('mean_recall_at_10', 0):.4f}")
+    print(f"  Median: {stats.get('median_recall_at_10', 0):.4f}")
+    print(f"  Min: {stats.get('min_recall_at_10', 0):.4f}")
+    print(f"  Max: {stats.get('max_recall_at_10', 0):.4f}")
+
+    print("Recall@5:")
+    print(f"  Mean: {stats.get('mean_recall_at_5', 0):.4f}")
+    print(f"  Median: {stats.get('median_recall_at_5', 0):.4f}")
+    print(f"  Min: {stats.get('min_recall_at_5', 0):.4f}")
+    print(f"  Max: {stats.get('max_recall_at_5', 0):.4f}")
+
+    print("Recall@1:")
+    print(f"  Mean: {stats.get('mean_recall_at_1', 0):.4f}")
+    print(f"  Median: {stats.get('median_recall_at_1', 0):.4f}")
+    print(f"  Min: {stats.get('min_recall_at_1', 0):.4f}")
+    print(f"  Max: {stats.get('max_recall_at_1', 0):.4f}")
+
     # Print disk I/O statistics
     print("\nDISK I/O DURING BENCHMARK")
     print("-" * 50)
@@ -665,4 +790,4 @@
 
 
 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    main()