Query Engine

This document explains HugeGraph Store's query processing capabilities, covering query pushdown, multi-partition queries, and the gRPC API reference.

Query Processing Overview

HugeGraph Store implements advanced query processing to minimize network traffic and improve performance.

Query Execution Flow

1. [Server] Receives Graph Query (e.g., g.V().has('age', 30))
   ↓
2. [Server] Translates to Store Query
   - Determine table (vertices, edges, indexes)
   - Extract filters (age == 30)
   - Identify partitions (all or specific)
   ↓
3. [Server] Sends Query to Store Nodes (parallel)
   - Query includes filters and aggregations
   - One request per partition
   ↓
4. [Store] Executes Query Locally
   - Apply filters at RocksDB scan level
   - Compute aggregations if requested
   - Stream results back to Server
   ↓
5. [Server] Merges Results
   - Deduplicate if needed
   - Apply final ordering/limiting
   ↓
6. [Server] Returns to Client
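
The six steps above can be sketched as a minimal, self-contained program. This is an illustration, not the actual Server code: `scanPartition` stands in for a Store node's local scan-and-filter (step 4), and `execute` plays the Server's fan-out, merge, order, and limit role (steps 3-6). All class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;

public class QueryFlowSketch {

    // Step 4: each partition scans and filters locally, so only
    // matching rows would ever cross the network.
    static List<String> scanPartition(List<String> partitionData,
                                      Predicate<String> filter) {
        List<String> out = new ArrayList<>();
        for (String row : partitionData) {
            if (filter.test(row)) {
                out.add(row);
            }
        }
        return out;
    }

    // Steps 3-6: send the same filter to every partition, merge the
    // partial results, then apply the final ordering and limit.
    static List<String> execute(List<List<String>> partitions,
                                Predicate<String> filter, int limit) {
        List<String> merged = new ArrayList<>();
        for (List<String> partition : partitions) {
            merged.addAll(scanPartition(partition, filter));
        }
        merged.sort(Comparator.naturalOrder());   // final ordering at Server
        return merged.subList(0, Math.min(limit, merged.size()));
    }
}
```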

Key Capabilities

  • Filter Pushdown: Filters applied at Store nodes (not at Server)
  • Aggregation Pushdown: COUNT, SUM, MIN, MAX, AVG computed at Store
  • Index Pushdown: Index scans executed at Store
  • Streaming: Large result sets streamed (no full materialization)
  • Parallel Execution: Queries across multiple partitions run concurrently

Query Pushdown

Filter Pushdown

Supported Filters:

  • Equality: key == value
  • Range: key > value, key >= value, key < value, key <= value
  • Prefix: key.startsWith(prefix)
  • IN: key in [value1, value2, ...]

Example: Query vertices with age = 30

Without Pushdown (inefficient):

1. Store scans all vertices
2. Sends all vertices to Server
3. Server filters age == 30
Network: Transfers all vertices

With Pushdown (efficient):

1. Server sends filter: age == 30
2. Store scans and filters locally
3. Store sends only matching vertices
Network: Transfers only matching vertices

Implementation (proto definition):

File: hg-store-grpc/src/main/proto/query.proto

message Condition {
  string key = 1;                // Property key
  ConditionType type = 2;        // EQ, GT, LT, GTE, LTE, IN, PREFIX
  bytes value = 3;               // Value to compare
  repeated bytes values = 4;     // For IN operator
}

enum ConditionType {
  EQ = 0;        // Equals
  GT = 1;        // Greater than
  LT = 2;        // Less than
  GTE = 3;       // Greater than or equal
  LTE = 4;       // Less than or equal
  IN = 5;        // In list
  PREFIX = 6;    // Prefix match
}

Code Example (using client API):

import org.apache.hugegraph.store.client.HgStoreQuery;
import org.apache.hugegraph.store.client.HgStoreQuery.Condition;

// Build query with filter
HgStoreQuery query = HgStoreQuery.builder()
    .table("VERTEX")
    .filter(Condition.eq("age", 30))
    .build();

// Execute (filter applied at Store)
HgStoreResultSet results = session.query(query);
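
To show how the other supported condition types behave, here is a hedged sketch of the predicate a Store node could evaluate during a local scan. The real evaluator works on `bytes` values from the protobuf `Condition` message; this illustration uses strings, and the `ConditionSketch` class is hypothetical.

```java
import java.util.List;

public class ConditionSketch {
    enum Type { EQ, GT, LT, GTE, LTE, IN, PREFIX }

    // Evaluate one pushed-down condition against a property value.
    // `target` is the comparison value; `targets` is only used for IN.
    static boolean matches(Type type, String value, String target,
                           List<String> targets) {
        switch (type) {
            case EQ:     return value.equals(target);
            case GT:     return value.compareTo(target) > 0;
            case LT:     return value.compareTo(target) < 0;
            case GTE:    return value.compareTo(target) >= 0;
            case LTE:    return value.compareTo(target) <= 0;
            case IN:     return targets.contains(value);
            case PREFIX: return value.startsWith(target);
            default:     return false;
        }
    }
}
```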

Aggregation Pushdown

Supported Aggregations:

  • COUNT: Count matching rows
  • SUM: Sum numeric property
  • MIN: Minimum value
  • MAX: Maximum value
  • AVG: Average value

Example: Count vertices with label "person"

Without Pushdown:

1. Store sends all person vertices to Server
2. Server counts vertices
Network: Transfers millions of vertices

With Pushdown:

1. Server sends COUNT query
2. Store counts locally
3. Store sends count (single number)
Network: Transfers 8 bytes

Proto Definition:

message QueryRequest {
  string table = 1;
  bytes start_key = 2;
  bytes end_key = 3;
  repeated Condition conditions = 4;
  AggregationType aggregation = 5;    // COUNT, SUM, MIN, MAX, AVG
  string aggregation_key = 6;          // Property to aggregate
  int64 limit = 7;
}

enum AggregationType {
  NONE = 0;
  COUNT = 1;
  SUM = 2;
  MIN = 3;
  MAX = 4;
  AVG = 5;
}

Code Example:

// Count query
HgStoreQuery query = HgStoreQuery.builder()
    .table("VERTEX")
    .prefix("person:")
    .aggregation(HgStoreQuery.Aggregation.COUNT)
    .build();

long count = session.aggregate(query);
System.out.println("Person count: " + count);

// Sum query
HgStoreQuery sumQuery = HgStoreQuery.builder()
    .table("VERTEX")
    .prefix("person:")
    .aggregation(HgStoreQuery.Aggregation.SUM)
    .aggregationKey("age")
    .build();

long totalAge = session.aggregate(sumQuery);
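
Conceptually, the Store folds values into a single number while it scans, which is why only that number crosses the network. The following is a simplified sketch under the assumption that property values decode to doubles; the `AggregationSketch` class and its `aggregate` method are illustrative, not the actual Store implementation.

```java
public class AggregationSketch {

    // One pass over the scanned values computes every aggregate;
    // only the requested one is returned to the caller.
    static double aggregate(String type, double[] values) {
        double count = 0, sum = 0;
        double min = Double.MAX_VALUE, max = -Double.MAX_VALUE;
        for (double v : values) {
            count++;
            sum += v;
            min = Math.min(min, v);
            max = Math.max(max, v);
        }
        switch (type) {
            case "COUNT": return count;
            case "SUM":   return sum;
            case "MIN":   return min;
            case "MAX":   return max;
            case "AVG":   return count == 0 ? 0 : sum / count;
            default: throw new IllegalArgumentException("unknown: " + type);
        }
    }
}
```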

Index Pushdown

HugeGraph Server materializes each index as a separate table in Store, so index scans can be executed directly at the Store nodes.

Index Types:

  1. Secondary Index: Index on property values
  2. Range Index: Index for range queries
  3. Search Index: Full-text search index (if enabled)

Example: Query by indexed property

// Server creates index (one-time setup)
// schema.indexLabel("personByName").onV("person").by("name").secondary().create()

// Query using index
// Server translates to index table scan
HgStoreQuery query = HgStoreQuery.builder()
    .table("INDEX_personByName")       // Index table
    .filter(Condition.eq("name", "Alice"))
    .build();

// Store scans index table directly (fast)
HgStoreResultSet results = session.query(query);

Index Table Structure:

Key: <index_value>:<vertex_id>
Value: <vertex_id>

Example:
Key: "Alice:V1001" → Value: "V1001"
Key: "Bob:V1002"   → Value: "V1002"
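
The composite-key layout above can be sketched in a few lines. The actual on-disk encoding is binary and may include escaping; this simplified string version only illustrates why a prefix scan on the index value finds every matching vertex ID. The `IndexKeySketch` class is hypothetical.

```java
public class IndexKeySketch {

    // Concatenate the indexed value and the vertex ID into one key,
    // e.g. encode("Alice", "V1001") -> "Alice:V1001".
    static String encode(String indexValue, String vertexId) {
        return indexValue + ":" + vertexId;
    }

    // Recover the vertex ID from the composite key.
    static String vertexIdOf(String indexKey) {
        return indexKey.substring(indexKey.lastIndexOf(':') + 1);
    }
}
```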

Multi-Partition Queries

Partition-Aware Routing

Store distributes data across partitions using hash-based partitioning. Queries may target:

  1. Single Partition: When key or hash is known
  2. Multiple Partitions: When scanning by label or property

Example: Single-partition query

// Get vertex by ID (single partition)
String vertexId = "person:1001";
int hash = MurmurHash3.hash32(vertexId);
// Math.floorMod avoids a negative partition ID when the hash is negative
int partitionId = Math.floorMod(hash, totalPartitions);

// Server routes to specific partition
HgStoreQuery query = HgStoreQuery.builder()
    .table("VERTEX")
    .partitionId(partitionId)
    .key(vertexId.getBytes())
    .build();

byte[] value = session.get(query);


Example: Multi-partition query

// Scan all person vertices (all partitions)
HgStoreQuery query = HgStoreQuery.builder()
    .table("VERTEX")
    .prefix("person:")
    .build();

// Server sends query to ALL partitions in parallel
HgStoreResultSet results = session.queryAll(query);

Multi-Partition Iterator

Implementation: hg-store-core/src/main/java/.../business/MultiPartitionIterator.java

Purpose: Merge results from multiple partitions

Deduplication Modes:

  1. NONE: No deduplication (fastest, may have duplicates)
  2. DEDUP: Basic deduplication using hash set
  3. LIMIT_DEDUP: Deduplicate up to limit (for top-K queries)
  4. PRECISE_DEDUP: Full deduplication with sorted merge

Code Example:

import org.apache.hugegraph.store.client.DeduplicationMode;

// Query with deduplication
HgStoreQuery query = HgStoreQuery.builder()
    .table("VERTEX")
    .prefix("person:")
    .limit(100)
    .deduplicationMode(DeduplicationMode.LIMIT_DEDUP)
    .build();

HgStoreResultSet results = session.queryAll(query);

Algorithm (LIMIT_DEDUP):

1. Iterate partition 1, add results to set (up to limit)
2. Iterate partition 2, skip duplicates, add new results
3. Continue until limit reached or all partitions exhausted
4. Return deduplicated results
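
The four steps above can be sketched with an insertion-ordered set. Note this is an illustration: the real `MultiPartitionIterator` streams results lazily rather than materializing per-partition lists, and the `LimitDedupSketch` class is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class LimitDedupSketch {

    // Walk each partition's results in turn, skip keys already seen,
    // and stop as soon as `limit` distinct keys have been collected.
    static List<String> merge(List<List<String>> partitionResults, int limit) {
        Set<String> seen = new LinkedHashSet<>();
        for (List<String> partition : partitionResults) {
            for (String key : partition) {
                if (seen.add(key) && seen.size() >= limit) {
                    return new ArrayList<>(seen);   // limit reached early
                }
            }
        }
        return new ArrayList<>(seen);   // all partitions exhausted
    }
}
```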

Parallel Query Execution

Flow:

Server                      Store 1        Store 2        Store 3
  |                            |              |              |
  |--Query(partitions 1-4)---->|              |              |
  |--Query(partitions 5-8)------------------->|              |
  |--Query(partitions 9-12)--------------------------------->|
  |                            |              |              |
  |<--Results(partitions 1-4)--|              |              |
  |<--Results(partitions 5-8)-----------------|              |
  |<--Results(partitions 9-12)--------------------------------|
  |                            |              |              |
  |--Merge results             |              |              |
  |                            |              |              |

Performance:

  • 3 Store nodes: 3x parallelism
  • 12 partitions: Up to 12x parallelism (if evenly distributed)
  • Network: 3 concurrent gRPC streams
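
The fan-out above can be sketched with one asynchronous task per Store node. This is a hedged illustration: the real transport is a gRPC stream per node, which is mocked here by a local function, and the `ParallelQuerySketch` class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelQuerySketch {

    // Stand-in for a gRPC call that scans the partitions on one node.
    static List<String> queryNode(List<String> nodeData) {
        return new ArrayList<>(nodeData);
    }

    // Issue one request per node concurrently, then merge as each
    // future completes. Futures complete independently, matching the
    // out-of-order arrivals in the diagram above.
    static List<String> fanOut(List<List<String>> nodes) {
        ExecutorService pool = Executors.newFixedThreadPool(nodes.size());
        try {
            List<CompletableFuture<List<String>>> futures = new ArrayList<>();
            for (List<String> node : nodes) {
                futures.add(CompletableFuture.supplyAsync(() -> queryNode(node), pool));
            }
            List<String> merged = new ArrayList<>();
            for (CompletableFuture<List<String>> f : futures) {
                merged.addAll(f.join());
            }
            return merged;
        } finally {
            pool.shutdown();
        }
    }
}
```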

gRPC API Reference

HgStoreSession Service

File: hg-store-grpc/src/main/proto/store_session.proto

service HgStoreSession {
  // Get single key
  rpc Get(GetRequest) returns (GetResponse);

  // Batch get
  rpc BatchGet(BatchGetRequest) returns (stream BatchGetResponse);

  // Put/Delete/Batch operations
  rpc Batch(stream BatchRequest) returns (BatchResponse);

  // Scan range
  rpc ScanTable(ScanTableRequest) returns (stream ScanResponse);

  // Table operations
  rpc Table(TableRequest) returns (TableResponse);

  // Clean/Truncate
  rpc Clean(CleanRequest) returns (CleanResponse);
}

GetRequest:

message GetRequest {
  Header header = 1;              // Graph name, table name
  bytes key = 2;                  // Key to get
}

ScanTableRequest:

message ScanTableRequest {
  Header header = 1;
  bytes start_key = 2;            // Start of range
  bytes end_key = 3;              // End of range (exclusive)
  int64 limit = 4;                // Max results
  ScanMethod scan_method = 5;     // ALL, PREFIX, RANGE
  bytes prefix = 6;               // For PREFIX scan
}

enum ScanMethod {
  ALL = 0;       // Scan all keys
  PREFIX = 1;    // Scan keys with prefix
  RANGE = 2;     // Scan key range
}

QueryService

File: hg-store-grpc/src/main/proto/query.proto

service QueryService {
  // Execute query with filters
  rpc Query(QueryRequest) returns (stream QueryResponse);

  // Execute query (alternative API)
  rpc Query0(QueryRequest) returns (stream QueryResponse);

  // Aggregate query (COUNT, SUM, etc.)
  rpc Count(QueryRequest) returns (CountResponse);
}

QueryRequest:

message QueryRequest {
  Header header = 1;               // Graph name, partition ID
  string table = 2;                // Table name
  bytes start_key = 3;             // Scan start
  bytes end_key = 4;               // Scan end
  repeated Condition conditions = 5;  // Filters
  AggregationType aggregation = 6;    // COUNT, SUM, MIN, MAX, AVG
  string aggregation_key = 7;         // Property to aggregate
  int64 limit = 8;                    // Max results
  ScanType scan_type = 9;             // TABLE_SCAN, PRIMARY_SCAN, INDEX_SCAN
}

enum ScanType {
  TABLE_SCAN = 0;      // Full table scan
  PRIMARY_SCAN = 1;    // Primary key lookup
  INDEX_SCAN = 2;      // Index scan
}

QueryResponse:

message QueryResponse {
  repeated KV data = 1;            // Key-value pairs
  bool has_more = 2;               // More results available
  bytes continuation_token = 3;    // For pagination
}

CountResponse:

message CountResponse {
  int64 count = 1;                 // Aggregation result
}
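
A client drives the `has_more` / `continuation_token` pair in `QueryResponse` by re-issuing the query with the last token until the Store reports no more pages. The sketch below mocks the page source locally and represents the token as an integer offset; the real `continuation_token` is opaque bytes, and the `PaginationSketch` class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class PaginationSketch {

    // Stand-in for QueryResponse: one page plus pagination state.
    static class Page {
        final List<String> data;
        final int nextToken;
        final boolean hasMore;
        Page(List<String> data, int nextToken, boolean hasMore) {
            this.data = data; this.nextToken = nextToken; this.hasMore = hasMore;
        }
    }

    // Mock server: serves `all` in fixed-size pages keyed by an offset token.
    static Page fetchPage(List<String> all, int token, int pageSize) {
        int end = Math.min(token + pageSize, all.size());
        return new Page(all.subList(token, end), end, end < all.size());
    }

    // Client loop: follow continuation tokens until has_more is false.
    static List<String> fetchAll(List<String> all, int pageSize) {
        List<String> out = new ArrayList<>();
        int token = 0;
        while (true) {
            Page page = fetchPage(all, token, pageSize);
            out.addAll(page.data);
            if (!page.hasMore) break;    // has_more == false: done
            token = page.nextToken;      // resume from the token
        }
        return out;
    }
}
```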

GraphStore Service

File: hg-store-grpc/src/main/proto/graphpb.proto

service GraphStore {
  // Scan partition (graph-specific API)
  rpc ScanPartition(ScanPartitionRequest) returns (stream ScanPartitionResponse);
}

ScanPartitionRequest:

message ScanPartitionRequest {
  Header header = 1;
  int32 partition_id = 2;          // Partition to scan
  ScanType scan_type = 3;          // SCAN_VERTEX, SCAN_EDGE
  bytes start_key = 4;
  bytes end_key = 5;
  int64 limit = 6;
}

enum ScanType {
  SCAN_VERTEX = 0;
  SCAN_EDGE = 1;
  SCAN_ALL = 2;
}

ScanPartitionResponse:

message ScanPartitionResponse {
  repeated Vertex vertices = 1;    // Vertex results
  repeated Edge edges = 2;         // Edge results
}

message Vertex {
  bytes id = 1;                    // Vertex ID
  string label = 2;                // Vertex label
  repeated Property properties = 3;
}

message Edge {
  bytes id = 1;
  string label = 2;
  bytes source_id = 3;             // Source vertex ID
  bytes target_id = 4;             // Target vertex ID
  repeated Property properties = 5;
}

message Property {
  string key = 1;                  // Property name
  Variant value = 2;               // Property value
}

message Variant {
  oneof value {
    int32 int_value = 1;
    int64 long_value = 2;
    float float_value = 3;
    double double_value = 4;
    string string_value = 5;
    bytes bytes_value = 6;
    bool bool_value = 7;
  }
}

HgStoreState Service

File: hg-store-grpc/src/main/proto/store_state.proto

service HgStoreState {
  // Subscribe to state changes
  rpc SubState(SubStateRequest) returns (stream StateResponse);

  // Unsubscribe
  rpc UnsubState(UnsubStateRequest) returns (UnsubStateResponse);

  // Get partition scan state
  rpc GetScanState(GetScanStateRequest) returns (GetScanStateResponse);

  // Get Raft peers
  rpc GetPeers(GetPeersRequest) returns (GetPeersResponse);
}

StateResponse:

message StateResponse {
  NodeState node_state = 1;        // Store node state
  repeated PartitionState partition_states = 2;
}

message NodeState {
  int64 store_id = 1;
  string address = 2;
  NodeStateType state = 3;         // STARTING, STANDBY, ONLINE, PAUSE, STOPPING, HALTED, ERROR
}

enum NodeStateType {
  STARTING = 0;
  STANDBY = 1;
  ONLINE = 2;
  PAUSE = 3;
  STOPPING = 4;
  HALTED = 5;
  ERROR = 6;
}

message PartitionState {
  int32 partition_id = 1;
  int64 leader_term = 2;
  string leader_address = 3;
  PartitionStateType state = 4;
}

Query Optimization

Best Practices

1. Use Indexes for Selective Queries

Inefficient (full scan):

// g.V().has('name', 'Alice')  -> scans all vertices

Efficient (index scan):

// Create index first
schema.indexLabel("personByName").onV("person").by("name").secondary().create();

// Query uses index
// g.V().has('name', 'Alice')  -> scans the index only

2. Limit Results Early

Inefficient:

// Fetches all, then limits at Server
HgStoreQuery query = HgStoreQuery.builder()
    .table("VERTEX")
    .prefix("person:")
    .build();

HgStoreResultSet results = session.query(query);
// Server limits to 100 after fetching all

Efficient:

// Limits at Store
HgStoreQuery query = HgStoreQuery.builder()
    .table("VERTEX")
    .prefix("person:")
    .limit(100)  // Store limits before sending
    .build();

HgStoreResultSet results = session.query(query);

3. Use Aggregations for Counts

Inefficient:

// Fetch all, count at Server
HgStoreResultSet results = session.query(query);
int count = 0;
while (results.hasNext()) {
    results.next();
    count++;
}

Efficient:

// Count at Store
HgStoreQuery query = HgStoreQuery.builder()
    .table("VERTEX")
    .prefix("person:")
    .aggregation(HgStoreQuery.Aggregation.COUNT)
    .build();

long count = session.aggregate(query);

4. Batch Reads

Inefficient (N sequential requests):

for (String id : vertexIds) {
    byte[] value = session.get(table, id.getBytes());
}

Efficient (single batch request):

List<byte[]> keys = vertexIds.stream()
    .map(String::getBytes)
    .collect(Collectors.toList());

Map<byte[], byte[]> results = session.batchGet(table, keys);

Query Performance Metrics

Typical Latencies (production cluster, 3 Store nodes):

Operation               Single Partition   Multi-Partition (12 partitions)
Get by key              1-2 ms             N/A
Scan 100 rows           5-10 ms            15-30 ms
Scan 10,000 rows        50-100 ms          100-200 ms
Count (no filter)       10-20 ms           30-60 ms
Count (with filter)     20-50 ms           50-150 ms
Index scan (100 rows)   3-8 ms             10-25 ms

Factors Affecting Performance:

  • Network latency between Server and Store
  • RocksDB read amplification (depends on compaction)
  • Number of partitions (more partitions = more parallel work but more overhead)
  • Filter selectivity (fewer matches = faster)

For operational monitoring of query performance, see Operations Guide.

For RocksDB tuning to improve query speed, see Best Practices.