Skip to content

Conversation

@platinumhamburg
Copy link
Contributor

Purpose

Linked issue: close #2254

Brief change log

Tests

API and Format

Documentation

@platinumhamburg platinumhamburg changed the title basic aggregate merge engine support Implement basic Aggregate Merge Engine Dec 25, 2025
@platinumhamburg platinumhamburg force-pushed the base-agg branch 21 times, most recently from 907559e to e7d0af7 Compare December 26, 2025 06:45
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements the Aggregation Merge Engine for Fluss, which allows field-level aggregation of rows with the same primary key. The implementation includes 12 aggregate functions (sum, product, max, min, last_value, last_value_ignore_nulls, first_value, first_value_ignore_nulls, listagg, string_agg, bool_and, bool_or) with comprehensive schema evolution support.

Key changes:

  • Core aggregation engine with field-level aggregators and caching
  • Schema API enhancements to support aggregation function configuration
  • Comprehensive documentation with examples for all supported functions
  • Utility classes for string concatenation and value comparison

Reviewed changes

Copilot reviewed 56 out of 56 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md Comprehensive documentation for the Aggregation Merge Engine with examples
fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/AggregateRowMerger.java Main row merger implementation with partial update support
fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/*.java Aggregation context, caching, and field processing logic
fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/*.java Field aggregator implementations for all functions
fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/*.java Factory classes for creating aggregators via SPI
fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java Enum defining all supported aggregation functions
fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java Schema enhancements to support aggregation functions
fluss-common/src/main/java/org/apache/fluss/utils/*.java Utility classes for string operations and comparisons
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java New configuration options for aggregation
Test files Comprehensive test coverage for all components

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Member

@wuchong wuchong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @platinumhamburg , I didn't complete the review. I gave some feedbacks first after reading the documentation.

@platinumhamburg
Copy link
Contributor Author

Thanks @platinumhamburg , I didn't complete the review. I gave some feedbacks first after reading the documentation.

@wuchong I've refactored both the code and documentation based on the feedback. Thanks for review again.

Copy link
Member

@wuchong wuchong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Certainly! Here's a more polished, professional, and clear version of your message:


Thanks @platinumhamburg for the excellent work!

The code is well-designed, thoroughly tested, and clearly documented.

I’ve left some comments. One additional suggestion: could we add sanity checks for the parameters of AggregateFunctions when creating an aggregate merge table?

Currently, there’s no validation of the parameters being passed. Since these are persisted to ZooKeeper, malformed or incorrect inputs (e.g., typos in parameter keys) could lead to subtle runtime issues during data writes.

Adding upfront validation would improve robustness and user experience.

*/
public static AggFunction LISTAGG(String delimiter) {
Map<String, String> params = new HashMap<>();
params.put("delimiter", delimiter);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add delimiter as a public static constant to enable easy and consistent referencing across the codebase.

*
* @since 0.9
*/
AGGREGATE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AGGREGATION

}

@Test
void testSchemaWithAggregationFunctions() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you have added SCHEMA_WITH_AGG into createObjects(), this json serialization and deserialization test has already been verified by parent org.apache.fluss.utils.json.JsonSerdeTestBase#testJsonSerde. So I think we can remove this test method.

}

@Test
void testSchemaWithoutAggregationFunctions() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto. we can remove it.

Schema.newBuilder()
.column("product_id", DataTypes.BIGINT().copy(false))
.column("total_sales", DataTypes.BIGINT(), AggFunctions.SUM())
.column("max_price", DataTypes.DECIMAL(10, 2), AggFunctions.MAX())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add aggregate functions that has parameters, like LISTAGG, to verify the json format of such agg functions.

.primaryKey("id")
.build();

TableConfig tableConfig = new TableConfig(new Configuration());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can be removed.

.primaryKey("id1", "id2")
.build();

TableConfig tableConfig = new TableConfig(new Configuration());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

.build();

// No configuration at all
TableConfig tableConfig = new TableConfig(new Configuration());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

.primaryKey("id")
.build();

TableConfig tableConfig = new TableConfig(new Configuration());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

.build();

Configuration conf = new Configuration();
conf.setString("table.merge-engine.aggregate.value.listagg-delimiter", "|");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replace to define delimiter in schema

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[server] Implement basic Aggregate Merge Engine

2 participants