Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ publish:
whoami: asf-site

github:
description: "Apache GeaFlow: A Streaming Graph Computing Engine."
description: "Apache GeaFlow (Incubating): A Streaming Graph Computing Engine."
homepage: https://geaflow.apache.org/
features:
issues: true
Expand Down
89 changes: 41 additions & 48 deletions blog/27.md

Large diffs are not rendered by default.

73 changes: 35 additions & 38 deletions blog/28.md
Original file line number Diff line number Diff line change
@@ -1,49 +1,49 @@
---
title: 流图计算之增量match原理与应用
title: Principles and Applications of Incremental Match in Streaming Graph Computing
date: 2025-6-3
---

![](https://intranetproxy.alipay.com/skylark/lark/0/2025/png/23857192/1743162676746-973d8e75-11b5-43d7-8832-724e7332b964.png)

## 问题背景

在流式计算中,数据往往不是全部一批到来,而会源源不断地进行输入和计算,在图计算/图查询领域,也存在类似的场景,图的点边不断地从数据源读取,进行构图,从而形成增量图。在增量图查询中,图随时发生着变化,在不同的图版本中,进行图查询的结果也会有所不同。对于某一次新增的点边,构成了一个新的版本的图,如果重新对全图(即当前所有点边)进行图遍历,开销较大,并且也会和历史数据有重复。由于历史的数据已经计算过一遍,理想情况下,只需要对增量所影响的部分进行计算/查询,而不需要对全图重新进行查询。
## Problem Background
In streaming computing, data rarely arrives all at once but is continuously input and processed. Similarly, in graph computing/graph querying scenarios, vertices and edges are constantly read from data sources to construct graphs incrementally. In incremental graph queries, the graph evolves continuously, leading to different query results across graph versions. When new vertices/edges form an updated graph version, recomputing through the entire graph incurs high overhead and duplicates historical computations. Since historical data has already been processed, ideally only the delta-affected portions should be computed/queried without full-graph re-execution.

<!-- truncate -->

<font style="color:rgb(51, 51, 51);">GQLGraph Query Language</font><font style="color:rgb(0, 0, 0);">是国际标准化组织(ISO)为标准化图查询语言所制定的一个标准,</font><font style="color:rgb(51, 51, 51);">用于在图上执行查询的语言。Geaflow 是蚂蚁图计算团队开源的流图计算引擎,专注于处理动态变化的图数据,支持大规模、高并发的实时图计算场景。</font>本文将介绍在 Geaflow 引擎中,对增量图使用 GQL 进行增量 Match 的方法,目的尽可能地只对增量的数据进行查询,避免冗余的全量计算。
<font style="color:rgb(51, 51, 51);">GQL (Graph Query Language)</font> <font style="color:rgb(0, 0, 0);">is an international standard developed by ISO for graph query languages,</font> <font style="color:rgb(51, 51, 51);">used to execute queries on graphs. Geaflow is an open-source streaming graph engine by Ant Group’s graph computing team, specializing in dynamically changing graph data and supporting large-scale, high-concurrency real-time graph computing scenarios.</font> This article introduces Geaflow’s approach to incremental GQL-based Match queries on dynamic graphs, aiming to execute queries solely on delta data while avoiding redundant full computations.

![画板](https://intranetproxy.alipay.com/skylark/lark/0/2025/jpeg/23857192/1741574572676-ff7e2c56-14d0-470c-b21d-604f928c6ec9.jpeg)

## 当前问题

<font style="color:rgb(0, 0, 0);">Geaflow 引擎基于点中心框架(vertex center),通过迭代的方式,每一轮迭代中,每个点向其他点发送消息,并在下一轮收到消息时进行处理、分析。</font>在 Geaflow 的框架中,GQL 的查询需要从前往后进行 Traversal 遍历走图,即从起始节点开始出发,进行扩散,依次进行点边匹配,直到匹配到所需要的查询 pattern。在动态图里场景,如果只使用当前批次新增的点边触发计算,增量的结果会有缺失,例如下面例子所示。

<div style="text-align: center;">
<img src="https://intranetproxy.alipay.com/skylark/lark/0/2025/jpeg/23857192/1741576149930-b169b7da-0600-4fca-b6ad-5eadcfdbff5b.jpeg" alt='画板' height="281" width="486"></div>

如上问题关键在于如果只考虑增量的部分,则点 A1 无法触发计算,但是点 A1 实际包含于增量结果中。所以需要设法让点 A1 参与计算,我们考虑一种从新增点扩充子图的方法,将 a 触发。将整个查询分为 2 个阶段,Evolve 扩展阶段和 Traversal 阶段。在 Evolve 阶段中,从起始点开始,向邻居发送 EvolveMessage,后续的 iteration 中,收到 EvolveMessage 的点加入到 EvolveVertices 集合中。而后的 Traversal 阶段则会使用 EvolveVertices 里的点触发遍历,即表示当前窗口的触发点。
## Current Challenges
<font style="color:rgb(0, 0, 0);">The Geaflow engine adopts a vertex-centric framework, where each vertex sends messages iteratively. Vertices process received messages in subsequent iterations.</font> For GQL queries, traversal starts from initial vertices for pattern matching (e.g., from node `A` to `B` to `C`). In dynamic graphs, if only newly added vertices/edges trigger computation, results may be incomplete, as illustrated below:

## 方案步骤
<div style="text-align: center;">
<img src="https://intranetproxy.alipay.com/skylark/lark/0/2025/jpeg/23857192/1741576149930-b169b7da-0600-4fca-b6ad-5eadcfdbff5b.jpeg" alt='画板' height="281" width="486">
</div>

整体流程示例图如下:
The key issue is that **Vertex A1 cannot trigger computation if only the delta is considered**, yet it belongs to the incremental results. To resolve this, we propose a subgraph expansion method from new vertices. The query is divided into two phases:
1. **Evolve Phase**: Propagate `EvolveMessage` from new vertices to neighbors, adding recipients to the `EvolveVertices` set.
2. **Traversal Phase**: Use `EvolveVertices` as traversal triggers for the current window.

## Solution Workflow
Overall process:
![](https://intranetproxy.alipay.com/skylark/lark/0/2025/png/23857192/1741599519420-37fd1d9f-6623-44b3-87e4-5ac5275b876f.png)

1. 首先得到 query 的计划的迭代次数 N,需向外扩充 N-1 度(<font style="color:#000000;">maxEvolveIteration=N-1)</font>,即可覆盖当前 query。框架的最大迭代数将设置为 N + maxEvolveIteration(N>2)
**Steps:**
1. Determine the query’s iteration count `N`. Expand `N-1` hops outward (`maxEvolveIteration = N-1`) to cover the query. The max iteration becomes `N + maxEvolveIteration` (when `N>2`).

```sql
例如
match(a)迭代数为1,此时不需要Evolve逻辑
match(a)-[e]->(b)迭代数为2,此时不需要Evolve逻辑
match(a)-[e]->(b)->[e2]->(c)迭代数为3 最大迭代数5
For example
match(a) iteration count is 1, no Evolve logic needed at this time
match(a)-[e]->(b) iteration count is 2, no Evolve logic needed at this time
match(a)-[e]->(b)->[e2]->(c) iteration count is 3, maximum iteration count is 5
```

2. 由于当迭代数较大时,扩充子图可能可能扩充到全图,设置一个阈值 T, 当 N<=T 才执行这个增量逻辑。
3. 在每个 window 数据加入图中后,对于新增的点边,每个点会向邻居发送 EvolveVertexMessage,执行 N-1 次迭代,将 N-1 度子图扩充进来。<font style="color:#000000;">即当前迭代小于 maxEvolveIteration(N-1)时,发送 EvolveVertexMessage。</font>
4. 每个点在向邻居点发送 EvolveMessage 时,需要将自己的 id 放在消息中,收到消息的点记录其发送点的 id, 添加到 targetIdList,在后续 traversal 阶段中使用。此步骤作用是下游节点将增量信息反向传递给上游,上游点在进行遍历时可以得知下游的增量影响部分,从而只遍历这些含有动态信息的下游点,而不需要再遍历所有邻居点。
2. Set threshold `T`: Only execute incremental logic when `N <= T` (to avoid expanding to the full graph).
3. After new window data is added, vertices send `EvolveVertexMessage` to neighbors for `N-1` iterations.
4. When sending `EvolveMessage`, vertices include their ID. Receiving vertices store these IDs in `targetIdList` for later traversal. This propagates delta information upstream, allowing vertices to traverse only neighbors affected by changes.

反向扩展的主要逻辑在 GeaFlowDynamicVCTraversalFunction 中,GeaFlowDynamicVCTraversalFunction 继承自 IncVertexCentricFunction,在 Geaflow 中 IncVertexCentricFunction 是一个表示增量 VC 方法(点中心)的接口,在每次迭代中,都会对当前收到消息的点进行触发,执行 compute 方法中的逻辑。
Core logic in `GeaFlowDynamicVCTraversalFunction`:

```java
@Override
Expand Down Expand Up @@ -72,20 +72,17 @@ public void compute(Object vertexId, Iterator<MessageBox> messageIterator) {
}
```

具体示例如下:

**Visualization:**
![画板](https://intranetproxy.alipay.com/skylark/lark/0/2024/jpeg/23857192/1734590557540-5f3f4528-fa07-4208-8425-bc514ea5e06b.jpeg)

总结进行 Evolve 扩展的条件:

1. query 的迭代次数>2:当 match 小于两跳时不需要 Evolve。
2. query 的迭代次数<=Threshold:如果迭代数太多可能扩展到全图。
3. windowId>1:第一次构图不需要进行 Evolve 阶段。
4. GQL 语句中没有起始点:如果有起始点,则只需使用起始点计算,不需要扩展子图,例如查询语句 Match(a:person where a.id = 1))return a.name。

## Demo 示例
**Evolve Conditions:**
- Query iterations `>2` (no Evolve needed for ≤2 hops).
- Query iterations `≤ Threshold`.
- `windowId >1` (skip initial graph construction).
- No starting vertex filter in GQL (e.g., `Match(a:person where a.id=1)` excludes Evolve).

在 Geaflow 中,通过设置点表或边表的 windowSize 来默认实现增量逻辑,即每一批读入 windowSize 大小的点边数据,来构建增量图。
## Demo
In Geaflow, configure incremental graphs via `windowSize` for vertex/edge tables:

```sql
CREATE GRAPH modern (
Expand Down Expand Up @@ -159,8 +156,8 @@ INSERT INTO tbl_result
;
```

在 Demo 中,设置点 windowSize 为 20,边 windowSize 为 3,即构图时每个 window 导入 20 个点,3 条边。并执行 3 跳的查询语句。**示例 Demo 在 IncrMatchTest.java 中, 可直接运行执行 Demo。**
In this demo, vertex window size is 20, and edge window size is 3, meaning each window loads 20 vertices and 3 edges. A 3-hop query is executed. The demo is available in IncrMatchTest.java and can be run directly.

## 总结和展望
## Conclusion and Outlook

<font style="color:rgb(0, 0, 0);">在动态图/流图的场景中,图的点边是在实时变化的,在进行图查询时,对于不同窗口数据的图,我们往往可以根据一些历史信息,只对增量的部分触发计算,来进行增量地计算,避免触发全图的遍历。Geaflow 使用了一种基于子图扩展的增量 match 方法,应用于点中心分布式图计算框架,在动态图场景下进行增量的查询,未来期望实现更多更复杂场景下的增量匹配逻辑。</font>
In dynamic/streaming graph scenarios, graph nodes and edges change in real time. When querying such graphs, we can often trigger computation only on the incremental part using historical information, avoiding full graph traversal. Geaflow uses a subgraph expansion-based incremental match method, applied within a vertex-centric distributed graph computing framework, to support incremental querying in dynamic graph scenarios. In the future, we aim to implement more complex incremental matching logic for advanced use cases.
Loading