diff --git a/docs/content/append-table/incremental-clustering.md b/docs/content/append-table/incremental-clustering.md
index 0ca3462e1682..be247acb2ee2 100644
--- a/docs/content/append-table/incremental-clustering.md
+++ b/docs/content/append-table/incremental-clustering.md
@@ -39,12 +39,15 @@ gradually converges to an optimal state, significantly reducing the decision-mak
Incremental Clustering supports:
-- Support incremental clustering; minimizing write amplification as possible.
-- Support small-file compaction; during rewrites, respect target-file-size.
-- Support changing clustering keys; newly ingested data is clustered according to the latest clustering keys.
-- Provide a full mode; when selected, the entire dataset will be reclustered.
+- Support incremental clustering: minimizing write amplification as much as possible.
+- Support small-file compaction: during rewrites, respect target-file-size.
+- Support changing clustering keys: newly ingested data is clustered according to the latest clustering keys.
+- Provide a full mode: when selected, the entire dataset will be reclustered.
-**Only append unaware-bucket table supports Incremental Clustering.**
+
+**Note**: Bucketed append tables keep an ordering requirement on the data within each bucket by default. If this
+ordering is not a concern, you can set `'bucket-append-ordered'` to `false` to disable it, which allows you to enable
+Incremental Clustering for bucketed tables and can significantly improve query performance on the clustering keys.
## Enable Incremental Clustering
@@ -85,6 +88,30 @@ To enable Incremental Clustering, the following configuration needs to be set fo
+For bucketed tables, you also need to set the following configuration:
+
+| Option                | Value | Required | Type    | Description                                                                                    |
+|-----------------------|-------|----------|---------|------------------------------------------------------------------------------------------------|
+| bucket-append-ordered | false | Yes      | Boolean | Must be set to false to disable the ordering requirement for bucketed tables. Default is true. |
+
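+For example, a bucketed table ready for Incremental Clustering could be declared as follows (a minimal sketch with
+hypothetical table and column names; combine it with the clustering options listed above):
+
+```sql
+CREATE TABLE my_table (
+    order_id BIGINT,
+    user_id  BIGINT,
+    dt       STRING
+) WITH (
+    -- fixed-bucket layout
+    'bucket' = '4',
+    'bucket-key' = 'user_id',
+    -- disable the in-bucket ordering requirement so Incremental Clustering can be enabled
+    'bucket-append-ordered' = 'false'
+);
+```
+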
Once Incremental Clustering for a table is enabled, you can run Incremental Clustering in batch mode periodically
to continuously optimize the data layout of the table and deliver better query performance.
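+
+For example, one simple way to trigger such a periodic batch run from Flink SQL is the `sys.compact` procedure
+(a minimal sketch with a hypothetical table name; the clustering behaviour itself is driven by the table options above):
+
+```sql
+-- triggers incremental clustering for a table that has it enabled
+CALL sys.compact(`table` => 'my_db.my_table');
+```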
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java
index 8acbb8e7785a..7a3f3c494a79 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java
@@ -38,7 +38,6 @@
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
-import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -46,8 +45,10 @@
import java.util.Optional;
import java.util.stream.Collectors;
-import static org.apache.paimon.append.cluster.IncrementalClusterManager.constructPartitionLevels;
-import static org.apache.paimon.append.cluster.IncrementalClusterManager.logForPartitionLevel;
+import static org.apache.paimon.append.cluster.IncrementalClusterManager.constructBucketLevels;
+import static org.apache.paimon.append.cluster.IncrementalClusterManager.groupByPtAndBucket;
+import static org.apache.paimon.append.cluster.IncrementalClusterManager.logForCompactUnits;
+import static org.apache.paimon.append.cluster.IncrementalClusterManager.logForLevels;
/** Handle historical partition for full clustering. */
public class HistoryPartitionCluster {
@@ -111,23 +112,39 @@ public static HistoryPartitionCluster create(
limit);
}
- public Map<BinaryRow, CompactUnit> pickForHistoryPartitions() {
- Map<BinaryRow, List<LevelSortedRun>> partitionLevels =
+ public Map<BinaryRow, Map<Integer, CompactUnit>> createHistoryCompactUnits() {
+ Map<BinaryRow, Map<Integer, List<LevelSortedRun>>> partitionLevels =
constructLevelsForHistoryPartitions();
- logForPartitionLevel(partitionLevels, partitionComputer);
+ logForLevels(partitionLevels, partitionComputer);
- Map<BinaryRow, CompactUnit> units = new HashMap<>();
+ Map<BinaryRow, Map<Integer, CompactUnit>> units = new HashMap<>();
partitionLevels.forEach(
- (k, v) -> {
- Optional<CompactUnit> pick =
- incrementalClusterStrategy.pick(maxLevel + 1, v, true);
- pick.ifPresent(compactUnit -> units.put(k, compactUnit));
+ (partition, bucketLevels) -> {
+ Map<Integer, CompactUnit> bucketUnits = new HashMap<>();
+ bucketLevels.forEach(
+ (bucket, levels) -> {
+ Optional<CompactUnit> pick =
+ incrementalClusterStrategy.pick(maxLevel + 1, levels, true);
+ pick.ifPresent(
+ compactUnit -> {
+ bucketUnits.put(bucket, compactUnit);
+ if (LOG.isDebugEnabled()) {
+ logForCompactUnits(
+ partitionComputer.generatePartValues(
+ partition),
+ bucket,
+ compactUnit);
+ }
+ });
+ });
+ units.put(partition, bucketUnits);
});
return units;
}
@VisibleForTesting
- public Map<BinaryRow, List<LevelSortedRun>> constructLevelsForHistoryPartitions() {
+ public Map<BinaryRow, Map<Integer, List<LevelSortedRun>>>
+ constructLevelsForHistoryPartitions() {
long historyMilli =
LocalDateTime.now()
.minus(historyPartitionIdleTime)
@@ -152,23 +169,19 @@ public Map> constructLevelsForHistoryPartitions(
.read()
.dataSplits();
- Map<BinaryRow, List<DataFileMeta>> historyPartitionFiles = new HashMap<>();
- for (DataSplit dataSplit : historyDataSplits) {
- historyPartitionFiles
- .computeIfAbsent(dataSplit.partition(), k -> new ArrayList<>())
- .addAll(dataSplit.dataFiles());
- }
+ Map<BinaryRow, Map<Integer, List<DataFileMeta>>> historyPartitionFiles =
+ groupByPtAndBucket(historyDataSplits);
return filterPartitions(historyPartitionFiles).entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
- entry -> constructPartitionLevels(entry.getValue())));
+ entry -> constructBucketLevels(entry.getValue())));
}
- private Map<BinaryRow, List<DataFileMeta>> filterPartitions(
- Map<BinaryRow, List<DataFileMeta>> partitionFiles) {
- Map<BinaryRow, List<DataFileMeta>> result = new HashMap<>();
+ private Map<BinaryRow, Map<Integer, List<DataFileMeta>>> filterPartitions(
+ Map<BinaryRow, Map<Integer, List<DataFileMeta>>> partitionFiles) {
+ Map<BinaryRow, Map<Integer, List<DataFileMeta>>> result = new HashMap<>();
partitionFiles.forEach(
(part, files) -> {
if (specifiedPartitions.test(part)) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
index 1c8da3c03348..3688788b1385 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
@@ -84,8 +84,13 @@ public IncrementalClusterManager(FileStoreTable table) {
public IncrementalClusterManager(
FileStoreTable table, @Nullable PartitionPredicate specifiedPartitions) {
checkArgument(
- table.bucketMode() == BucketMode.BUCKET_UNAWARE,
- "only append unaware-bucket table support incremental clustering.");
+ table.primaryKeys().isEmpty(), "only append table support incremental clustering.");
+ if (table.bucketMode() == BucketMode.HASH_FIXED) {
+ checkArgument(
+ !table.coreOptions().bucketAppendOrdered(),
+ "%s must be false for incremental clustering table.",
+ CoreOptions.BUCKET_APPEND_ORDERED.key());
+ }
this.table = table;
CoreOptions options = table.coreOptions();
checkArgument(
@@ -123,167 +128,210 @@ public IncrementalClusterManager(
specifiedPartitions);
}
- public Map<BinaryRow, CompactUnit> createCompactUnits(boolean fullCompaction) {
- // 1. construct LSM structure for each partition
- Map<BinaryRow, List<LevelSortedRun>> partitionLevels = constructLevels();
- logForPartitionLevel(partitionLevels, partitionComputer);
+ public Map<BinaryRow, Map<Integer, CompactUnit>> createCompactUnits(boolean fullCompaction) {
+ // 1. construct LSM structure for each bucket
+ Map<BinaryRow, Map<Integer, List<LevelSortedRun>>> partitionLevels =
+ constructPartitionLevels();
+ if (LOG.isDebugEnabled()) {
+ logForLevels(partitionLevels, partitionComputer);
+ }
- // 2. pick files to be clustered for each partition
- Map<BinaryRow, CompactUnit> units = new HashMap<>();
+ // 2. pick files to be clustered for each bucket
+ Map<BinaryRow, Map<Integer, CompactUnit>> units = new HashMap<>();
partitionLevels.forEach(
- (k, v) -> {
- Optional<CompactUnit> pick =
- incrementalClusterStrategy.pick(numLevels, v, fullCompaction);
- pick.ifPresent(compactUnit -> units.put(k, compactUnit));
+ (partition, bucketLevels) -> {
+ Map<Integer, CompactUnit> bucketUnits = new HashMap<>();
+ bucketLevels.forEach(
+ (bucket, levels) -> {
+ Optional<CompactUnit> pick =
+ incrementalClusterStrategy.pick(
+ numLevels, levels, fullCompaction);
+ pick.ifPresent(
+ compactUnit -> {
+ bucketUnits.put(bucket, compactUnit);
+ if (LOG.isDebugEnabled()) {
+ logForCompactUnits(
+ partitionComputer.generatePartValues(
+ partition),
+ bucket,
+ compactUnit);
+ }
+ });
+ });
+ units.put(partition, bucketUnits);
});
if (historyPartitionCluster != null) {
- units.putAll(historyPartitionCluster.pickForHistoryPartitions());
+ units.putAll(historyPartitionCluster.createHistoryCompactUnits());
}
- if (LOG.isDebugEnabled()) {
- units.forEach(
- (partition, compactUnit) -> {
- String filesInfo =
- compactUnit.files().stream()
- .map(
- file ->
- String.format(
- "%s,%s,%s",
- file.fileName(),
- file.level(),
- file.fileSize()))
- .collect(Collectors.joining(", "));
- LOG.debug(
- "Partition {}, outputLevel:{}, clustered with {} files: [{}]",
- partitionComputer.generatePartValues(partition),
- compactUnit.outputLevel(),
- compactUnit.files().size(),
- filesInfo);
- });
- }
return units;
}
- public Map<BinaryRow, List<LevelSortedRun>> constructLevels() {
+ public Map<BinaryRow, Map<Integer, List<LevelSortedRun>>> constructPartitionLevels() {
List<DataSplit> dataSplits = snapshotReader.read().dataSplits();
- Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>();
- for (DataSplit dataSplit : dataSplits) {
- partitionFiles
- .computeIfAbsent(dataSplit.partition(), k -> new ArrayList<>())
- .addAll(dataSplit.dataFiles());
- }
- return partitionFiles.entrySet().stream()
+ Map<BinaryRow, Map<Integer, List<DataFileMeta>>> partitionLevels =
+ groupByPtAndBucket(dataSplits);
+
+ return partitionLevels.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
- entry -> constructPartitionLevels(entry.getValue())));
+ entry -> constructBucketLevels(entry.getValue())));
}
- public static List<LevelSortedRun> constructPartitionLevels(List<DataFileMeta> partitionFiles) {
- List<LevelSortedRun> partitionLevels = new ArrayList<>();
+ public static Map<Integer, List<LevelSortedRun>> constructBucketLevels(
+ Map<Integer, List<DataFileMeta>> bucketLevels) {
+ return bucketLevels.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey, entry -> constructLevels(entry.getValue())));
+ }
+
+ public static List<LevelSortedRun> constructLevels(List<DataFileMeta> files) {
+ List<LevelSortedRun> levels = new ArrayList<>();
Map<Integer, List<DataFileMeta>> levelMap =
- partitionFiles.stream().collect(Collectors.groupingBy(DataFileMeta::level));
+ files.stream().collect(Collectors.groupingBy(DataFileMeta::level));
for (Map.Entry<Integer, List<DataFileMeta>> entry : levelMap.entrySet()) {
int level = entry.getKey();
if (level == 0) {
for (DataFileMeta level0File : entry.getValue()) {
- partitionLevels.add(
- new LevelSortedRun(level, SortedRun.fromSingle(level0File)));
+ levels.add(new LevelSortedRun(level, SortedRun.fromSingle(level0File)));
}
} else {
// don't need to guarantee that the files within the same sorted run are
// non-overlapping here, so we call SortedRun.fromSorted() to avoid sorting and
// validation
- partitionLevels.add(
- new LevelSortedRun(level, SortedRun.fromSorted(entry.getValue())));
+ levels.add(new LevelSortedRun(level, SortedRun.fromSorted(entry.getValue())));
}
}
// sort by level
- partitionLevels.sort(Comparator.comparing(LevelSortedRun::level));
+ levels.sort(Comparator.comparing(LevelSortedRun::level));
+ return levels;
+ }
+
+ public static Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupByPtAndBucket(
+ List<DataSplit> dataSplits) {
+ Map<BinaryRow, Map<Integer, List<DataFileMeta>>> partitionLevels = new HashMap<>();
+ for (DataSplit dataSplit : dataSplits) {
+ BinaryRow partition = dataSplit.partition();
+ Map<Integer, List<DataFileMeta>> bucketLevels = partitionLevels.get(partition);
+ if (bucketLevels == null) {
+ bucketLevels = new HashMap<>();
+ partitionLevels.put(partition.copy(), bucketLevels);
+ }
+ bucketLevels
+ .computeIfAbsent(dataSplit.bucket(), k -> new ArrayList<>())
+ .addAll(dataSplit.dataFiles());
+ }
return partitionLevels;
}
- public Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> toSplitsAndRewriteDvFiles(
- Map<BinaryRow, CompactUnit> compactUnits) {
- Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> result = new HashMap<>();
+ public Map<BinaryRow, Map<Integer, Pair<List<DataSplit>, CommitMessage>>>
+ toSplitsAndRewriteDvFiles(
+ Map<BinaryRow, Map<Integer, CompactUnit>> compactUnits, BucketMode bucketMode) {
+ Map<BinaryRow, Map<Integer, Pair<List<DataSplit>, CommitMessage>>> result = new HashMap<>();
boolean dvEnabled = table.coreOptions().deletionVectorsEnabled();
- for (BinaryRow partition : compactUnits.keySet()) {
- CompactUnit unit = compactUnits.get(partition);
- AppendDeleteFileMaintainer dvMaintainer =
- dvEnabled
- ? BaseAppendDeleteFileMaintainer.forUnawareAppend(
- table.store().newIndexFileHandler(), snapshot, partition)
- : null;
- List<DataSplit> splits = new ArrayList<>();
-
- DataSplit.Builder builder =
- DataSplit.builder()
- .withPartition(partition)
- .withBucket(0)
- .withTotalBuckets(1)
- .isStreaming(false);
-
- SplitGenerator splitGenerator = snapshotReader.splitGenerator();
- List<SplitGenerator.SplitGroup> splitGroups =
- splitGenerator.splitForBatch(unit.files());
-
- for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
- List<DataFileMeta> dataFiles = splitGroup.files;
-
- String bucketPath =
- snapshotReader.pathFactory().bucketPath(partition, 0).toString();
- builder.withDataFiles(dataFiles)
- .rawConvertible(splitGroup.rawConvertible)
- .withBucketPath(bucketPath);
+ if (bucketMode == BucketMode.HASH_FIXED) {
+ checkArgument(
+ !dvEnabled,
+ "Clustering is not supported for fixed-bucket table enabled deletion-vector currently.");
+ }
- if (dvMaintainer != null) {
- List<DeletionFile> dataDeletionFiles = new ArrayList<>();
- for (DataFileMeta file : dataFiles) {
- DeletionFile deletionFile =
- dvMaintainer.notifyRemovedDeletionVector(file.fileName());
- dataDeletionFiles.add(deletionFile);
+ for (BinaryRow partition : compactUnits.keySet()) {
+ Map<Integer, CompactUnit> bucketUnits = compactUnits.get(partition);
+ Map<Integer, Pair<List<DataSplit>, CommitMessage>> bucketSplits = new HashMap<>();
+ for (Integer bucket : bucketUnits.keySet()) {
+ CompactUnit unit = bucketUnits.get(bucket);
+ BaseAppendDeleteFileMaintainer dvMaintainer =
+ dvEnabled ? getDvMaintainer(bucketMode, partition, bucket) : null;
+ List<DataSplit> splits = new ArrayList<>();
+
+ DataSplit.Builder builder =
+ DataSplit.builder()
+ .withPartition(partition)
+ .withBucket(bucket)
+ .withTotalBuckets(table.coreOptions().bucket())
+ .isStreaming(false);
+
+ SplitGenerator splitGenerator = snapshotReader.splitGenerator();
+ List<SplitGenerator.SplitGroup> splitGroups =
+ splitGenerator.splitForBatch(unit.files());
+
+ for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
+ List<DataFileMeta> dataFiles = splitGroup.files;
+
+ String bucketPath =
+ snapshotReader.pathFactory().bucketPath(partition, 0).toString();
+ builder.withDataFiles(dataFiles)
+ .rawConvertible(splitGroup.rawConvertible)
+ .withBucketPath(bucketPath);
+
+ if (dvMaintainer != null) {
+ List<DeletionFile> dataDeletionFiles = new ArrayList<>();
+ for (DataFileMeta file : dataFiles) {
+ DeletionFile deletionFile =
+ ((AppendDeleteFileMaintainer) dvMaintainer)
+ .notifyRemovedDeletionVector(file.fileName());
+ dataDeletionFiles.add(deletionFile);
+ }
+ builder.withDataDeletionFiles(dataDeletionFiles);
}
- builder.withDataDeletionFiles(dataDeletionFiles);
+ splits.add(builder.build());
}
- splits.add(builder.build());
- }
- // generate delete dv index meta
- CommitMessage dvCommitMessage = null;
- if (dvMaintainer != null) {
- List<IndexFileMeta> newIndexFiles = new ArrayList<>();
- List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
- List<IndexManifestEntry> indexEntries = dvMaintainer.persist();
- for (IndexManifestEntry entry : indexEntries) {
- if (entry.kind() == FileKind.ADD) {
- newIndexFiles.add(entry.indexFile());
- } else {
- deletedIndexFiles.add(entry.indexFile());
+ // generate delete dv index meta
+ CommitMessage dvCommitMessage = null;
+ if (dvMaintainer != null) {
+ List<IndexFileMeta> newIndexFiles = new ArrayList<>();
+ List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
+ List<IndexManifestEntry> indexEntries = dvMaintainer.persist();
+ for (IndexManifestEntry entry : indexEntries) {
+ if (entry.kind() == FileKind.ADD) {
+ newIndexFiles.add(entry.indexFile());
+ } else {
+ deletedIndexFiles.add(entry.indexFile());
+ }
}
+ dvCommitMessage =
+ new CommitMessageImpl(
+ dvMaintainer.getPartition(),
+ bucket,
+ table.coreOptions().bucket(),
+ DataIncrement.emptyIncrement(),
+ new CompactIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ newIndexFiles,
+ deletedIndexFiles));
}
- dvCommitMessage =
- new CommitMessageImpl(
- dvMaintainer.getPartition(),
- 0,
- table.coreOptions().bucket(),
- DataIncrement.emptyIncrement(),
- new CompactIncrement(
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyList(),
- newIndexFiles,
- deletedIndexFiles));
+
+ bucketSplits.put(bucket, Pair.of(splits, dvCommitMessage));
}
- result.put(partition, Pair.of(splits, dvCommitMessage));
+ result.put(partition, bucketSplits);
}
return result;
}
+ public BaseAppendDeleteFileMaintainer getDvMaintainer(
+ BucketMode bucketMode, BinaryRow partition, int bucket) {
+ switch (bucketMode) {
+ case HASH_FIXED:
+ // TODO: support dv for hash fixed bucket table
+ return null;
+ case BUCKET_UNAWARE:
+ return BaseAppendDeleteFileMaintainer.forUnawareAppend(
+ table.store().newIndexFileHandler(), snapshot, partition);
+ default:
+ throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
+ }
+ }
+
public static List<DataFileMeta> upgrade(
List<DataFileMeta> filesAfterCluster, int outputLevel) {
return filesAfterCluster.stream()
@@ -291,28 +339,49 @@ public static List upgrade(
.collect(Collectors.toList());
}
- public static void logForPartitionLevel(
- Map<BinaryRow, List<LevelSortedRun>> partitionLevels,
+ public static void logForLevels(
+ Map<BinaryRow, Map<Integer, List<LevelSortedRun>>> partitionLevels,
InternalRowPartitionComputer partitionComputer) {
- if (LOG.isDebugEnabled()) {
- partitionLevels.forEach(
- (partition, levelSortedRuns) -> {
- String runsInfo =
- levelSortedRuns.stream()
- .map(
- lsr ->
- String.format(
- "level-%s:%s",
- lsr.level(),
- lsr.run().files().size()))
- .collect(Collectors.joining(","));
- LOG.debug(
- "Partition {} has {} runs: [{}]",
- partitionComputer.generatePartValues(partition),
- levelSortedRuns.size(),
- runsInfo);
- });
- }
+ partitionLevels.forEach(
+ (partition, bucketLevels) -> {
+ bucketLevels.forEach(
+ (bucket, levels) -> {
+ String runsInfo =
+ levels.stream()
+ .map(
+ lsr ->
+ String.format(
+ "level-%s:%s",
+ lsr.level(),
+ lsr.run().files().size()))
+ .collect(Collectors.joining(","));
+ LOG.debug(
+ "Partition {}, bucket {} has {} runs: [{}]",
+ partitionComputer.generatePartValues(partition),
+ bucket,
+ levels.size(),
+ runsInfo);
+ });
+ });
+ }
+
+ public static void logForCompactUnits(
+ Map<String, String> partition, int bucket, CompactUnit compactUnit) {
+ String filesInfo =
+ compactUnit.files().stream()
+ .map(
+ file ->
+ String.format(
+ "%s,%s,%s",
+ file.fileName(), file.level(), file.fileSize()))
+ .collect(Collectors.joining(", "));
+ LOG.debug(
+ "Partition {}, bucket {}, outputLevel:{}, clustered with {} files: [{}]",
+ partition,
+ bucket,
+ compactUnit.outputLevel(),
+ compactUnit.files().size(),
+ filesInfo);
}
public CoreOptions.OrderType clusterCurve() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 56176f2b83dd..602923f7227f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -669,14 +669,19 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options)
private static void validateIncrementalClustering(TableSchema schema, CoreOptions options) {
if (options.clusteringIncrementalEnabled()) {
- checkArgument(
- options.bucket() == -1,
- "Cannot define %s for incremental clustering table, it only support bucket = -1",
- CoreOptions.BUCKET.key());
checkArgument(
schema.primaryKeys().isEmpty(),
"Cannot define %s for incremental clustering table.",
PRIMARY_KEY.key());
+ if (options.bucket() != -1) {
+ checkArgument(
+ !options.bucketAppendOrdered(),
+ "%s must be false for incremental clustering table.",
+ CoreOptions.BUCKET_APPEND_ORDERED.key());
+ checkArgument(
+ !options.deletionVectorsEnabled(),
+ "Cannot enable deletion-vectors for incremental clustering table which bucket is not -1.");
+ }
}
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java
index f1f93ca989b2..12d0bd7099f7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java
@@ -80,7 +80,7 @@ public void testHistoryPartitionAutoClustering() throws Exception {
RowType.of(DataTypes.INT()),
Lists.newArrayList(BinaryRow.singleColumn("pt1"))))
.historyPartitionCluster();
- Map<BinaryRow, List<LevelSortedRun>> partitionLevels =
+ Map<BinaryRow, Map<Integer, List<LevelSortedRun>>> partitionLevels =
historyPartitionCluster.constructLevelsForHistoryPartitions();
assertThat(partitionLevels.size()).isEqualTo(1);
assertThat(partitionLevels.get(BinaryRow.singleColumn("pt2"))).isNotEmpty();
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
index 516fa63b55dc..36867a35bfa9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
@@ -58,15 +58,21 @@ public class IncrementalClusterManagerTest {
@TempDir java.nio.file.Path tempDir;
@Test
- public void testNonUnAwareBucketTable() {
+ public void testCreateClusterTable() {
Map<String, String> options = new HashMap<>();
options.put(CoreOptions.BUCKET.key(), "1");
options.put(CoreOptions.BUCKET_KEY.key(), "f0");
+ assertThatThrownBy(() -> createTable(options, Collections.emptyList()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ "bucket-append-ordered must be false for incremental clustering table.");
+ options.put(CoreOptions.BUCKET_APPEND_ORDERED.key(), "false");
+ options.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
assertThatThrownBy(() -> createTable(options, Collections.emptyList()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(
- "Cannot define bucket for incremental clustering table, it only support bucket = -1");
+ "Cannot enable deletion-vectors for incremental clustering table which bucket is not -1.");
}
@Test
@@ -82,7 +88,7 @@ public void testNonClusterIncremental() throws Exception {
}
@Test
- public void testConstructPartitionLevels() throws Exception {
+ public void testConstructLevels() throws Exception {
// Create a valid table for IncrementalClusterManager
Map<String, String> options = new HashMap<>();
FileStoreTable table = createTable(options, Collections.emptyList());
@@ -108,8 +114,7 @@ public void testConstructPartitionLevels() throws Exception {
partitionFiles.add(level2File1);
// Call the method under test
- List<LevelSortedRun> result =
- incrementalClusterManager.constructPartitionLevels(partitionFiles);
+ List<LevelSortedRun> result = IncrementalClusterManager.constructLevels(partitionFiles);
// Verify the results
assertThat(result).hasSize(4); // 2 level-0 runs + 1 level-1 run + 1 level-2 run
@@ -191,7 +196,7 @@ public void testHistoryPartitionAutoClustering() throws Exception {
PartitionPredicate.fromMultiple(
RowType.of(DataTypes.INT()),
Lists.newArrayList(BinaryRow.singleColumn("pt3"))));
- Map<BinaryRow, CompactUnit> partitionLevels =
+ Map<BinaryRow, Map<Integer, CompactUnit>> partitionLevels =
incrementalClusterManager.createCompactUnits(true);
assertThat(partitionLevels.size()).isEqualTo(2);
assertThat(partitionLevels.get(BinaryRow.singleColumn("pt1"))).isNotNull();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 2e4442d9d741..ce24dd2319b7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -154,7 +154,16 @@ protected boolean buildImpl() throws Exception {
buildForAppendTableCompact(env, fileStoreTable, isStreaming);
}
} else {
- buildForBucketedTableCompact(env, fileStoreTable, isStreaming);
+ if (fileStoreTable.primaryKeys().isEmpty()
+ && fileStoreTable.bucketMode() == BucketMode.HASH_FIXED
+ && !fileStoreTable.coreOptions().bucketAppendOrdered()
+ && fileStoreTable.coreOptions().clusteringIncrementalEnabled()) {
+ new IncrementalClusterCompact(
+ env, fileStoreTable, partitionPredicate, fullCompaction)
+ .build();
+ } else {
+ buildForBucketedTableCompact(env, fileStoreTable, isStreaming);
+ }
}
return true;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
index 25fe923df57a..e9f824db2b1b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
@@ -21,12 +21,15 @@
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
+import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.flink.source.operator.ReadOperator;
import org.apache.paimon.flink.utils.JavaTypeInfo;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
@@ -42,6 +45,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -89,7 +93,7 @@ public static Pair, DataStream> buildSource(
FileStoreTable table,
Map<String, String> partitionSpec,
List<DataSplit> splits,
- @Nullable CommitMessage dvCommitMessage,
+ @Nullable List<CommitMessage> dvCommitMessages,
@Nullable Integer parallelism) {
DataStream<Split> source =
env.fromSource(
@@ -101,9 +105,13 @@ public static Pair, DataStream> buildSource(
new JavaTypeInfo<>(Split.class))
.forceNonParallel();
+ StreamPartitioner<Split> partitioner =
+ table.bucketMode() == BucketMode.HASH_FIXED
+ ? new FlinkStreamPartitioner<>(new SplitChannelComputer())
+ : new RebalancePartitioner<>();
+
PartitionTransformation<Split> partitioned =
- new PartitionTransformation<>(
- source.getTransformation(), new RebalancePartitioner<>());
+ new PartitionTransformation<>(source.getTransformation(), partitioner);
if (parallelism != null) {
partitioned.setParallelism(parallelism);
}
@@ -122,7 +130,23 @@ public static Pair, DataStream> buildSource(
.transform(
"Remove files to be clustered",
new CommittableTypeInfo(),
- new RemoveClusterBeforeFilesOperator(dvCommitMessage))
+ new RemoveClusterBeforeFilesOperator(dvCommitMessages))
.forceNonParallel());
}
+
+ private static class SplitChannelComputer implements ChannelComputer<Split> {
+
+ private transient int numChannels;
+
+ @Override
+ public void setup(int numChannels) {
+ this.numChannels = numChannels;
+ }
+
+ @Override
+ public int channel(Split record) {
+ DataSplit dataSplit = (DataSplit) record;
+ return ChannelComputer.select(dataSplit.partition(), dataSplit.bucket(), numChannels);
+ }
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java
index 83b41940bc4e..f7f1347fcff6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java
@@ -32,16 +32,17 @@
import javax.annotation.Nullable;
import java.util.Collections;
+import java.util.List;
/** Operator used with {@link IncrementalClusterSplitSource}, to remove files to be clustered. */
public class RemoveClusterBeforeFilesOperator extends BoundedOneInputOperator {
- private static final long serialVersionUID = 2L;
+ private static final long serialVersionUID = 3L;
- private final @Nullable CommitMessage dvCommitMessage;
+ private final @Nullable List<CommitMessage> dvCommitMessages;
- public RemoveClusterBeforeFilesOperator(@Nullable CommitMessage dvCommitMessage) {
- this.dvCommitMessage = dvCommitMessage;
+ public RemoveClusterBeforeFilesOperator(@Nullable List<CommitMessage> dvCommitMessages) {
+ this.dvCommitMessages = dvCommitMessages;
}
@Override
@@ -68,10 +69,15 @@ public void endInput() throws Exception {
}
private void emitDvIndexCommitMessages(long checkpointId) {
- if (dvCommitMessage != null) {
- output.collect(
- new StreamRecord<>(
- new Committable(checkpointId, Committable.Kind.FILE, dvCommitMessage)));
+ if (dvCommitMessages != null) {
+ for (CommitMessage dvCommitMessage : dvCommitMessages) {
+ if (dvCommitMessage != null) {
+ output.collect(
+ new StreamRecord<>(
+ new Committable(
+ checkpointId, Committable.Kind.FILE, dvCommitMessage)));
+ }
+ }
}
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java
index 01c2deba6466..5ec1c7b96e6e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java
@@ -27,6 +27,7 @@
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.utils.Pair;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.slf4j.Logger;
@@ -38,23 +39,21 @@
import java.util.List;
import java.util.Map;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
/** Rewrite committable for new files written after clustered. */
public class RewriteIncrementalClusterCommittableOperator
extends BoundedOneInputOperator {
protected static final Logger LOG =
LoggerFactory.getLogger(RewriteIncrementalClusterCommittableOperator.class);
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final FileStoreTable table;
- private final Map<BinaryRow, Integer> outputLevels;
+ private final Map<Pair<BinaryRow, Integer>, Integer> outputLevels;
- private transient Map<BinaryRow, List<DataFileMeta>> partitionFiles;
+ private transient Map<BinaryRow, Map<Integer, List<DataFileMeta>>> partitionFiles;
public RewriteIncrementalClusterCommittableOperator(
- FileStoreTable table, Map<BinaryRow, Integer> outputLevels) {
+ FileStoreTable table, Map<Pair<BinaryRow, Integer>, Integer> outputLevels) {
this.table = table;
this.outputLevels = outputLevels;
}
@@ -72,11 +71,18 @@ public void processElement(StreamRecord element) throws Exception {
}
CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
- checkArgument(message.bucket() == 0);
BinaryRow partition = message.partition();
- partitionFiles
- .computeIfAbsent(partition, file -> new ArrayList<>())
+ int bucket = message.bucket();
+
+ Map<Integer, List<DataFileMeta>> bucketFiles = partitionFiles.get(partition);
+ if (bucketFiles == null) {
+ bucketFiles = new HashMap<>();
+ partitionFiles.put(partition.copy(), bucketFiles);
+ }
+ bucketFiles
+ .computeIfAbsent(bucket, file -> new ArrayList<>())
.addAll(message.newFilesIncrement().newFiles());
+ partitionFiles.put(partition, bucketFiles);
}
@Override
@@ -85,31 +91,38 @@ public void endInput() throws Exception {
}
protected void emitAll(long checkpointId) {
- for (Map.Entry<BinaryRow, List<DataFileMeta>> partitionEntry : partitionFiles.entrySet()) {
+ for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> partitionEntry :
+ partitionFiles.entrySet()) {
BinaryRow partition = partitionEntry.getKey();
- // upgrade the clustered file to outputLevel
- List<DataFileMeta> clusterAfter =
- IncrementalClusterManager.upgrade(
- partitionEntry.getValue(), outputLevels.get(partition));
- LOG.info(
- "Partition {}: upgrade file level to {}",
- partition,
- outputLevels.get(partition));
- CompactIncrement compactIncrement =
- new CompactIncrement(
- Collections.emptyList(), clusterAfter, Collections.emptyList());
- CommitMessageImpl clusterMessage =
- new CommitMessageImpl(
- partition,
- // bucket 0 is bucket for unaware-bucket table
- // for compatibility with the old design
- 0,
- table.coreOptions().bucket(),
- DataIncrement.emptyIncrement(),
- compactIncrement);
- output.collect(
- new StreamRecord<>(
- new Committable(checkpointId, Committable.Kind.FILE, clusterMessage)));
+ Map<Integer, List<DataFileMeta>> bucketFiles = partitionEntry.getValue();
+
+ for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry : bucketFiles.entrySet()) {
+ int bucket = bucketEntry.getKey();
+ // upgrade the clustered file to outputLevel
+ List<DataFileMeta> clusterAfter =
+ IncrementalClusterManager.upgrade(
+ bucketEntry.getValue(),
+ outputLevels.get(Pair.of(partition, bucket)));
+ LOG.info(
+ "Partition {}, bucket {}: upgrade file level to {}",
+ partition,
+ bucket,
+ outputLevels.get(Pair.of(partition, bucket)));
+ CompactIncrement compactIncrement =
+ new CompactIncrement(
+ Collections.emptyList(), clusterAfter, Collections.emptyList());
+ CommitMessageImpl clusterMessage =
+ new CommitMessageImpl(
+ partition,
+ bucket,
+ table.coreOptions().bucket(),
+ DataIncrement.emptyIncrement(),
+ compactIncrement);
+ output.collect(
+ new StreamRecord<>(
+ new Committable(
+ checkpointId, Committable.Kind.FILE, clusterMessage)));
+ }
}
partitionFiles.clear();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
index c7fef7026689..08955fd7aea3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
@@ -22,16 +22,20 @@
import org.apache.paimon.append.cluster.IncrementalClusterManager;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource;
import org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
+import org.apache.paimon.flink.sink.FixedBucketSink;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
+import org.apache.paimon.flink.sink.FlinkWriteSink;
import org.apache.paimon.flink.sink.RowAppendTableSink;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.source.DataSplit;
@@ -46,12 +50,14 @@
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARALLELISM;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Compact for incremental clustering. */
public class IncrementalClusterCompact {
@@ -61,11 +67,14 @@ public class IncrementalClusterCompact {
protected final IncrementalClusterManager clusterManager;
protected final String commitUser;
protected final InternalRowPartitionComputer partitionComputer;
+ protected final BucketMode bucketMode;
protected final @Nullable Integer parallelism;
protected final int localSampleMagnification;
- protected final Map<BinaryRow, CompactUnit> compactUnits;
- protected final Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> compactSplits;
+ protected final Map<BinaryRow, Map<Integer, CompactUnit>> compactUnits;
+ protected final Map<BinaryRow, Map<Integer, Pair<List<DataSplit>, CommitMessage>>>
+ compactSplits;
+ protected final Map<Pair<BinaryRow, Integer>, Integer> outputLevels;
public IncrementalClusterCompact(
StreamExecutionEnvironment env,
@@ -78,6 +87,7 @@ public IncrementalClusterCompact(
Options options = new Options(table.options());
this.partitionComputer = table.store().partitionComputer();
this.commitUser = CoreOptions.createCommitUser(options);
+ this.bucketMode = table.bucketMode();
this.parallelism = options.get(SCAN_PARALLELISM);
this.localSampleMagnification = table.coreOptions().getLocalSampleMagnification();
if (localSampleMagnification < 20) {
@@ -87,10 +97,20 @@ public IncrementalClusterCompact(
CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
localSampleMagnification));
}
+
// non-full strategy as default for incremental clustering
this.compactUnits =
clusterManager.createCompactUnits(fullCompaction != null && fullCompaction);
- this.compactSplits = clusterManager.toSplitsAndRewriteDvFiles(compactUnits);
+ this.compactSplits =
+ clusterManager.toSplitsAndRewriteDvFiles(compactUnits, table.bucketMode());
+ this.outputLevels = new HashMap<>();
+ compactUnits.forEach(
+ (partition, bucketMap) -> {
+ bucketMap.forEach(
+ (bucket, unit) ->
+ outputLevels.put(
+ Pair.of(partition, bucket), unit.outputLevel()));
+ });
}
public void build() throws Exception {
@@ -100,57 +120,84 @@ public void build() throws Exception {
}
List<DataStream<Committable>> dataStreams = new ArrayList<>();
- for (Map.Entry<BinaryRow, Pair<List<DataSplit>, CommitMessage>> entry :
+ switch (bucketMode) {
+ case HASH_FIXED:
+ buildForFixedBucket(dataStreams);
+ break;
+ case BUCKET_UNAWARE:
+ buildForUnawareBucket(dataStreams);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
+ }
+
+ buildCommitOperator(dataStreams);
+ }
+
+ protected void buildForUnawareBucket(List<DataStream<Committable>> dataStreams) {
+ for (Map.Entry<BinaryRow, Map<Integer, Pair<List<DataSplit>, CommitMessage>>> entry :
compactSplits.entrySet()) {
+ BinaryRow partition = entry.getKey();
+ checkArgument(
+ entry.getValue().size() == 1,
+ "Unaware-bucket table should only have one bucket.");
+ Pair<List<DataSplit>, CommitMessage> pair = entry.getValue().values().iterator().next();
dataStreams.addAll(
buildCompactOperator(
- entry.getKey(),
- entry.getValue().getKey(),
- entry.getValue().getRight(),
+ partitionComputer.generatePartValues(partition),
+ pair.getLeft(),
+ Collections.singletonList(pair.getRight()),
parallelism));
}
+ }
- buildCommitOperator(dataStreams);
+ protected void buildForFixedBucket(List<DataStream<Committable>> dataStreams) {
+ // read data of all partitions and shuffle by partition and bucket
+ List<DataSplit> dataSplits = new ArrayList<>();
+ List<CommitMessage> dvCommitMessages = new ArrayList<>();
+ compactSplits.forEach(
+ (partition, bucketEntry) -> {
+ bucketEntry.forEach(
+ (bucket, pair) -> {
+ dataSplits.addAll(pair.getLeft());
+ dvCommitMessages.add(pair.getRight());
+ });
+ });
+
+ dataStreams.addAll(
+ buildCompactOperator(
+ new LinkedHashMap<>(), dataSplits, dvCommitMessages, parallelism));
}
/**
- * Build for one partition.
+ * Build compact operator for specified splits.
*
* @param parallelism Give the caller the opportunity to set parallelism
*/
protected List<DataStream<Committable>> buildCompactOperator(
- BinaryRow partition,
+ LinkedHashMap<String, String> partitionSpec,
List<DataSplit> splits,
- CommitMessage dvCommitMessage,
+ List<CommitMessage> dvCommitMessages,
@Nullable Integer parallelism) {
- LinkedHashMap<String, String> partitionSpec =
- partitionComputer.generatePartValues(partition);
- // 2.1 generate source for current partition
+ // 2.1 generate source for splits
Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
IncrementalClusterSplitSource.buildSource(
- env, table, partitionSpec, splits, dvCommitMessage, parallelism);
+ env, table, partitionSpec, splits, dvCommitMessages, parallelism);
- // 2.2 cluster in partition
+ // 2.2 cluster for splits
+ // --- for unaware bucket, need global sort
+ // --- for fixed bucket, just need local sort
Integer sinkParallelism = parallelism;
if (sinkParallelism == null) {
sinkParallelism = sourcePair.getLeft().getParallelism();
}
- TableSortInfo sortInfo =
- new TableSortInfo.Builder()
- .setSortColumns(clusterManager.clusterKeys())
- .setSortStrategy(clusterManager.clusterCurve())
- .setSinkParallelism(sinkParallelism)
- .setLocalSampleSize(sinkParallelism * localSampleMagnification)
- .setGlobalSampleSize(sinkParallelism * 1000)
- .setRangeNumber(sinkParallelism * 10)
- .build();
- DataStream<RowData> sorted =
- TableSorter.getSorter(env, sourcePair.getLeft(), table, sortInfo).sort();
+
+ DataStream<RowData> sorted = sortDataStream(sourcePair.getLeft(), sinkParallelism);
// 2.3 write and then reorganize the committable
// set parallelism to null, and it'll forward parallelism when doWrite()
- RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
+ FlinkWriteSink<InternalRow> sink = getSink();
boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
DataStream<Committable> written =
sink.doWrite(
@@ -167,14 +214,7 @@ protected List> buildCompactOperator(
"Rewrite cluster committable",
new CommittableTypeInfo(),
new RewriteIncrementalClusterCommittableOperator(
- table,
- compactUnits.entrySet().stream()
- .collect(
- Collectors.toMap(
- Map.Entry::getKey,
- unit ->
- unit.getValue()
- .outputLevel()))))
+ table, outputLevels))
.setParallelism(written.getParallelism());
List<DataStream<Committable>> dataStreams = new ArrayList<>();
@@ -184,11 +224,43 @@ protected List> buildCompactOperator(
}
protected void buildCommitOperator(List<DataStream<Committable>> dataStreams) {
- RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
+ FlinkWriteSink<InternalRow> sink = getSink();
DataStream<Committable> dataStream = dataStreams.get(0);
for (int i = 1; i < dataStreams.size(); i++) {
dataStream = dataStream.union(dataStreams.get(i));
}
sink.doCommit(dataStream, commitUser);
}
+
+ protected DataStream<RowData> sortDataStream(
+ DataStream<RowData> input, Integer sinkParallelism) {
+ TableSortInfo sortInfo =
+ new TableSortInfo.Builder()
+ .setSortColumns(clusterManager.clusterKeys())
+ .setSortStrategy(clusterManager.clusterCurve())
+ .setSinkParallelism(sinkParallelism)
+ .setLocalSampleSize(sinkParallelism * localSampleMagnification)
+ .setGlobalSampleSize(sinkParallelism * 1000)
+ .setRangeNumber(sinkParallelism * 10)
+ .build();
+ switch (bucketMode) {
+ case HASH_FIXED:
+ return TableSorter.getSorter(env, input, table, sortInfo).sortInLocal();
+ case BUCKET_UNAWARE:
+ return TableSorter.getSorter(env, input, table, sortInfo).sort();
+ default:
+ throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
+ }
+ }
+
+ protected FlinkWriteSink<InternalRow> getSink() {
+ switch (bucketMode) {
+ case HASH_FIXED:
+ return new FixedBucketSink(table, null, null);
+ case BUCKET_UNAWARE:
+ return new RowAppendTableSink(table, null, null, null);
+ default:
+ throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
+ }
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java
index 26c698f5ed71..7c96f018871b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java
@@ -61,7 +61,12 @@ public HilbertSorter(
@Override
public DataStream<RowData> sort() {
- return sortStreamByHilbert(origin, table);
+ return sortStreamByHilbert(origin, table, true);
+ }
+
+ @Override
+ public DataStream<RowData> sortInLocal() {
+ return sortStreamByHilbert(origin, table, false);
}
/**
@@ -71,7 +76,7 @@ public DataStream sort() {
* @return the sorted data stream
*/
private DataStream<RowData> sortStreamByHilbert(
- DataStream<RowData> inputStream, FileStoreTable table) {
+ DataStream<RowData> inputStream, FileStoreTable table, boolean isGlobalSort) {
final HilbertIndexer hilbertIndexer = new HilbertIndexer(table.rowType(), orderColNames);
return SortUtils.sortStreamByKey(
inputStream,
@@ -102,6 +107,7 @@ public byte[] apply(RowData value) {
}
},
GenericRow::of,
- tableSortInfo);
+ tableSortInfo,
+ isGlobalSort);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
index 406195b6fce6..171338160059 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
@@ -49,13 +49,23 @@ public OrderSorter(
@Override
public DataStream<RowData> sort() {
+ return sortStreamByOrder(origin, table, true);
+ }
+
+ @Override
+ public DataStream<RowData> sortInLocal() {
+ return sortStreamByOrder(origin, table, false);
+ }
+
+ private DataStream<RowData> sortStreamByOrder(
+ DataStream<RowData> inputStream, FileStoreTable table, boolean isGlobalSort) {
final RowType valueRowType = table.rowType();
final int[] keyProjectionMap = table.schema().projection(orderColNames);
final RowType keyRowType =
addKeyNamePrefix(Projection.of(keyProjectionMap).project(valueRowType));
return SortUtils.sortStreamByKey(
- origin,
+ inputStream,
table,
keyRowType,
InternalTypeInfo.fromRowType(keyRowType),
@@ -77,6 +87,7 @@ public InternalRow apply(RowData value) {
}
},
row -> row,
- tableSortInfo);
+ tableSortInfo,
+ isGlobalSort);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
index b30e14551296..330e441f00bf 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -78,6 +78,7 @@ public class SortUtils {
* @param convertor convert the `KEY` to the sort key, then we can sort in
* `BinaryExternalSortBuffer`.
* @param tableSortInfo the necessary info of table sort.
+ * @param isGlobalSort whether to do global sort
* @return the global sorted data stream
* @param <KEY> the KEY type in range shuffle
*/
@@ -89,14 +90,163 @@ public static DataStream sortStreamByKey(
final SerializableSupplier<Comparator<KEY>> shuffleKeyComparator,
final KeyAbstract<KEY> shuffleKeyAbstract,
final ShuffleKeyConvertor<KEY> convertor,
- final TableSortInfo tableSortInfo) {
+ final TableSortInfo tableSortInfo,
+ final boolean isGlobalSort) {
+ return isGlobalSort
+ ? sortStreamByKeyInGlobal(
+ inputStream,
+ table,
+ sortKeyType,
+ keyTypeInformation,
+ shuffleKeyComparator,
+ shuffleKeyAbstract,
+ convertor,
+ tableSortInfo)
+ : sortStreamByKeyInLocal(
+ inputStream,
+ table,
+ sortKeyType,
+ keyTypeInformation,
+ shuffleKeyAbstract,
+ convertor,
+ tableSortInfo);
+ }
+ public static <KEY> DataStream<RowData> sortStreamByKeyInGlobal(
+ final DataStream<RowData> inputStream,
+ final FileStoreTable table,
+ final RowType sortKeyType,
+ final TypeInformation<KEY> keyTypeInformation,
+ final SerializableSupplier<Comparator<KEY>> shuffleKeyComparator,
+ final KeyAbstract<KEY> shuffleKeyAbstract,
+ final ShuffleKeyConvertor<KEY> convertor,
+ final TableSortInfo tableSortInfo) {
+ CoreOptions options = table.coreOptions();
final RowType valueRowType = table.rowType();
+ final int sinkParallelism = tableSortInfo.getSinkParallelism();
+
+ DataStream<Tuple2<KEY, RowData>> inputWithKey =
+ generateSortKey(inputStream, keyTypeInformation, shuffleKeyAbstract);
+
+ DataStream<Tuple2<KEY, RowData>> rangeShuffleResult =
+ rangeShuffle(
+ inputWithKey,
+ keyTypeInformation,
+ shuffleKeyComparator,
+ tableSortInfo,
+ valueRowType,
+ options.sortBySize());
+
+ if (tableSortInfo.isSortInCluster()) {
+ return localSort(
+ rangeShuffleResult,
+ convertor,
+ sortKeyType,
+ valueRowType,
+ inputStream.getType(),
+ options,
+ sinkParallelism);
+ } else {
+ return rangeShuffleResult
+ .transform("REMOVE KEY", inputStream.getType(), new RemoveKeyOperator<>())
+ .setParallelism(sinkParallelism);
+ }
+ }
+
+ public static <KEY> DataStream<RowData> sortStreamByKeyInLocal(
+ final DataStream<RowData> inputStream,
+ final FileStoreTable table,
+ final RowType sortKeyType,
+ final TypeInformation<KEY> keyTypeInformation,
+ final KeyAbstract<KEY> shuffleKeyAbstract,
+ final ShuffleKeyConvertor<KEY> convertor,
+ final TableSortInfo tableSortInfo) {
CoreOptions options = table.coreOptions();
+ final RowType valueRowType = table.rowType();
+ final int sinkParallelism = tableSortInfo.getSinkParallelism();
+
+ DataStream<Tuple2<KEY, RowData>> inputWithKey =
+ generateSortKey(inputStream, keyTypeInformation, shuffleKeyAbstract);
+
+ return localSort(
+ inputWithKey,
+ convertor,
+ sortKeyType,
+ valueRowType,
+ inputStream.getType(),
+ options,
+ sinkParallelism);
+ }
+
+ public static <KEY> DataStream<Tuple2<KEY, RowData>> generateSortKey(
+ final DataStream<RowData> inputStream,
+ final TypeInformation<KEY> keyTypeInformation,
+ final KeyAbstract<KEY> shuffleKeyAbstract) {
+ // generate the KEY as the key of Pair.
+ return inputStream
+ .map(
+ new RichMapFunction<RowData, Tuple2<KEY, RowData>>() {
+
+ /**
+ * Do not annotate with @override here to maintain
+ * compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with @override here to maintain
+ * compatibility with Flink 2.0+.
+ */
+ public void open(Configuration parameters) throws Exception {
+ shuffleKeyAbstract.open();
+ }
+
+ @Override
+ public Tuple2<KEY, RowData> map(RowData value) {
+ return Tuple2.of(shuffleKeyAbstract.apply(value), value);
+ }
+ },
+ new TupleTypeInfo<>(keyTypeInformation, inputStream.getType()))
+ .setParallelism(inputStream.getParallelism());
+ }
+
+ public static <KEY> DataStream<Tuple2<KEY, RowData>> rangeShuffle(
+ final DataStream<Tuple2<KEY, RowData>> inputWithKey,
+ final TypeInformation<KEY> keyTypeInformation,
+ final SerializableSupplier<Comparator<KEY>> shuffleKeyComparator,
+ final TableSortInfo tableSortInfo,
+ final RowType valueRowType,
+ final boolean isSortBySize) {
+
final int sinkParallelism = tableSortInfo.getSinkParallelism();
final int localSampleSize = tableSortInfo.getLocalSampleSize();
final int globalSampleSize = tableSortInfo.getGlobalSampleSize();
final int rangeNum = tableSortInfo.getRangeNumber();
+
+ // range shuffle by key
+ return RangeShuffle.rangeShuffleByKey(
+ inputWithKey,
+ shuffleKeyComparator,
+ keyTypeInformation,
+ localSampleSize,
+ globalSampleSize,
+ rangeNum,
+ sinkParallelism,
+ valueRowType,
+ isSortBySize);
+ }
+
+ public static <KEY> DataStream<RowData> localSort(
+ DataStream<Tuple2<KEY, RowData>> rangeShuffleResult,
+ final ShuffleKeyConvertor<KEY> convertor,
+ final RowType sortKeyType,
+ final RowType valueRowType,
+ final TypeInformation<RowData> typeInformation,
+ final CoreOptions options,
+ final Integer sinkParallelism) {
+
int keyFieldCount = sortKeyType.getFieldCount();
int valueFieldCount = valueRowType.getFieldCount();
final int[] valueProjectionMap = new int[valueFieldCount];
@@ -114,105 +264,57 @@ public static DataStream sortStreamByKey(
final InternalTypeInfo<InternalRow> internalRowType =
InternalTypeInfo.fromRowType(longRowType);
- // generate the KEY as the key of Pair.
- DataStream<Tuple2<KEY, RowData>> inputWithKey =
- inputStream
- .map(
- new RichMapFunction<RowData, Tuple2<KEY, RowData>>() {
-
- /**
- * Do not annotate with @override here to maintain
- * compatibility with Flink 1.18-.
- */
- public void open(OpenContext openContext) throws Exception {
- open(new Configuration());
- }
-
- /**
- * Do not annotate with @override here to maintain
- * compatibility with Flink 2.0+.
- */
- public void open(Configuration parameters) throws Exception {
- shuffleKeyAbstract.open();
- }
-
- @Override
- public Tuple2<KEY, RowData> map(RowData value) {
- return Tuple2.of(shuffleKeyAbstract.apply(value), value);
- }
- },
- new TupleTypeInfo<>(keyTypeInformation, inputStream.getType()))
- .setParallelism(inputStream.getParallelism());
+ return rangeShuffleResult
+ .map(
+ a -> new JoinedRow(convertor.apply(a.f0), new FlinkRowWrapper(a.f1)),
+ internalRowType)
+ .setParallelism(sinkParallelism)
+ // sort the output locally by `SortOperator`
+ .transform(
+ "LOCAL SORT",
+ internalRowType,
+ new SortOperator(
+ sortKeyType,
+ longRowType,
+ options.writeBufferSize(),
+ options.pageSize(),
+ options.localSortMaxNumFileHandles(),
+ options.spillCompressOptions(),
+ sinkParallelism,
+ options.writeBufferSpillDiskSize(),
+ options.sequenceFieldSortOrderIsAscending()))
+ .setParallelism(sinkParallelism)
+ // remove the key column from every row
+ .map(
+ new RichMapFunction<InternalRow, InternalRow>() {
- // range shuffle by key
- DataStream<Tuple2<KEY, RowData>> rangeShuffleResult =
- RangeShuffle.rangeShuffleByKey(
- inputWithKey,
- shuffleKeyComparator,
- keyTypeInformation,
- localSampleSize,
- globalSampleSize,
- rangeNum,
- sinkParallelism,
- valueRowType,
- options.sortBySize());
- if (tableSortInfo.isSortInCluster()) {
- return rangeShuffleResult
- .map(
- a -> new JoinedRow(convertor.apply(a.f0), new FlinkRowWrapper(a.f1)),
- internalRowType)
- .setParallelism(sinkParallelism)
- // sort the output locally by `SortOperator`
- .transform(
- "LOCAL SORT",
- internalRowType,
- new SortOperator(
- sortKeyType,
- longRowType,
- options.writeBufferSize(),
- options.pageSize(),
- options.localSortMaxNumFileHandles(),
- options.spillCompressOptions(),
- sinkParallelism,
- options.writeBufferSpillDiskSize(),
- options.sequenceFieldSortOrderIsAscending()))
- .setParallelism(sinkParallelism)
- // remove the key column from every row
- .map(
- new RichMapFunction<InternalRow, InternalRow>() {
-
- private transient KeyProjectedRow keyProjectedRow;
-
- /**
- * Do not annotate with @override here to maintain
- * compatibility with Flink 1.18-.
- */
- public void open(OpenContext openContext) {
- open(new Configuration());
- }
-
- /**
- * Do not annotate with @override here to maintain
- * compatibility with Flink 2.0+.
- */
- public void open(Configuration parameters) {
- keyProjectedRow = new KeyProjectedRow(valueProjectionMap);
- }
-
- @Override
- public InternalRow map(InternalRow value) {
- return keyProjectedRow.replaceRow(value);
- }
- },
- InternalTypeInfo.fromRowType(valueRowType))
- .setParallelism(sinkParallelism)
- .map(FlinkRowData::new, inputStream.getType())
- .setParallelism(sinkParallelism);
- } else {
- return rangeShuffleResult
- .transform("REMOVE KEY", inputStream.getType(), new RemoveKeyOperator<>())
- .setParallelism(sinkParallelism);
- }
+ private transient KeyProjectedRow keyProjectedRow;
+
+ /**
+ * Do not annotate with @override here to maintain
+ * compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with @override here to maintain
+ * compatibility with Flink 2.0+.
+ */
+ public void open(Configuration parameters) {
+ keyProjectedRow = new KeyProjectedRow(valueProjectionMap);
+ }
+
+ @Override
+ public InternalRow map(InternalRow value) {
+ return keyProjectedRow.replaceRow(value);
+ }
+ },
+ InternalTypeInfo.fromRowType(valueRowType))
+ .setParallelism(sinkParallelism)
+ .map(FlinkRowData::new, typeInformation)
+ .setParallelism(sinkParallelism);
}
/** Abstract key from a row data. */
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
index 1d51ca4ebee6..c152090d4b2e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
@@ -67,6 +67,8 @@ private void checkColNames() {
public abstract DataStream<RowData> sort();
+ public abstract DataStream<RowData> sortInLocal();
+
public static TableSorter getSorter(
StreamExecutionEnvironment batchTEnv,
DataStream origin,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
index fad66f364731..9e932be38d44 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
@@ -61,7 +61,12 @@ public ZorderSorter(
@Override
public DataStream<RowData> sort() {
- return sortStreamByZOrder(origin, table);
+ return sortStreamByZOrder(origin, table, true);
+ }
+
+ @Override
+ public DataStream<RowData> sortInLocal() {
+ return sortStreamByZOrder(origin, table, false);
}
/**
@@ -71,7 +76,7 @@ public DataStream sort() {
* @return the sorted data stream
*/
private DataStream<RowData> sortStreamByZOrder(
- DataStream<RowData> inputStream, FileStoreTable table) {
+ DataStream<RowData> inputStream, FileStoreTable table, boolean isGlobalSort) {
final ZIndexer zIndexer =
new ZIndexer(table.rowType(), orderColNames, table.coreOptions().varTypeSize());
return SortUtils.sortStreamByKey(
@@ -105,6 +110,7 @@ public byte[] apply(RowData value) {
}
},
GenericRow::of,
- sortInfo);
+ sortInfo,
+ isGlobalSort);
}
}
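The isGlobalSort flag added above distinguishes the existing globally ordered z-order path (range shuffle across tasks, then sort) from a new task-local sort that keeps the current data distribution. Below is a minimal illustrative sketch, not part of this patch, of how a caller could dispatch between the two TableSorter entry points; the wrapper class, method name, and preserveBucketing flag are assumptions for illustration, and only sort() and sortInLocal() come from the change above.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.flink.sorter.TableSorter;

/** Illustrative sketch only; not part of this patch. */
class SorterDispatchSketch {
    static DataStream<RowData> cluster(TableSorter sorter, boolean preserveBucketing) {
        // A fixed-bucket table must keep its bucket assignment, so records are only
        // ordered inside each parallel task; otherwise a global range sort is used.
        return preserveBucketing ? sorter.sortInLocal() : sorter.sort();
    }
}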
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
index 6c80070528ac..1538f8c083af 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -683,6 +683,171 @@ public void testClusterWithDeletionVector() throws Exception {
assertThat(splits.get(0).deletionFiles().get().get(0)).isNull();
}
+ @Test
+ public void testClusterWithBucket() throws Exception {
+ Map<String, String> dynamicOptions = commonOptions();
+ dynamicOptions.put(CoreOptions.BUCKET.key(), "2");
+ dynamicOptions.put(CoreOptions.BUCKET_KEY.key(), "pt");
+ dynamicOptions.put(CoreOptions.BUCKET_APPEND_ORDERED.key(), "false");
+ FileStoreTable table = createTable(null, dynamicOptions);
+
+ BinaryString randomStr = BinaryString.fromString(randomString(150));
+ List<CommitMessage> messages = new ArrayList<>();
+
+ // first write
+ for (int pt = 0; pt < 2; pt++) {
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 3; j++) {
+ messages.addAll(write(GenericRow.of(i, j, randomStr, pt)));
+ }
+ }
+ }
+ commit(messages);
+ ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1, 3});
+ List<String> result1 =
+ getResult(
+ readBuilder.newRead(),
+ readBuilder.newScan().plan().splits(),
+ readBuilder.readType());
+ List<String> expected1 = new ArrayList<>();
+ for (int pt = 0; pt <= 1; pt++) {
+ expected1.add(String.format("+I[0, 0, %s]", pt));
+ expected1.add(String.format("+I[0, 1, %s]", pt));
+ expected1.add(String.format("+I[0, 2, %s]", pt));
+ expected1.add(String.format("+I[1, 0, %s]", pt));
+ expected1.add(String.format("+I[1, 1, %s]", pt));
+ expected1.add(String.format("+I[1, 2, %s]", pt));
+ expected1.add(String.format("+I[2, 0, %s]", pt));
+ expected1.add(String.format("+I[2, 1, %s]", pt));
+ expected1.add(String.format("+I[2, 2, %s]", pt));
+ }
+ assertThat(result1).containsExactlyElementsOf(expected1);
+
+ // first cluster
+ runAction(Collections.emptyList());
+ checkSnapshot(table);
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ assertThat(splits.size()).isEqualTo(2);
+ assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1);
+ assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+ List<String> result2 = getResult(readBuilder.newRead(), splits, readBuilder.readType());
+ List<String> expected2 = new ArrayList<>();
+ for (int pt = 1; pt >= 0; pt--) {
+ expected2.add(String.format("+I[0, 0, %s]", pt));
+ expected2.add(String.format("+I[0, 1, %s]", pt));
+ expected2.add(String.format("+I[1, 0, %s]", pt));
+ expected2.add(String.format("+I[1, 1, %s]", pt));
+ expected2.add(String.format("+I[0, 2, %s]", pt));
+ expected2.add(String.format("+I[1, 2, %s]", pt));
+ expected2.add(String.format("+I[2, 0, %s]", pt));
+ expected2.add(String.format("+I[2, 1, %s]", pt));
+ expected2.add(String.format("+I[2, 2, %s]", pt));
+ }
+ assertThat(result2).containsExactlyElementsOf(expected2);
+
+ // second write
+ messages.clear();
+ for (int pt = 0; pt <= 1; pt++) {
+ messages.addAll(
+ write(
+ GenericRow.of(0, 3, null, pt),
+ GenericRow.of(1, 3, null, pt),
+ GenericRow.of(2, 3, null, pt)));
+ messages.addAll(
+ write(
+ GenericRow.of(3, 0, null, pt),
+ GenericRow.of(3, 1, null, pt),
+ GenericRow.of(3, 2, null, pt),
+ GenericRow.of(3, 3, null, pt)));
+ }
+ commit(messages);
+
+ List<String> result3 =
+ getResult(
+ readBuilder.newRead(),
+ readBuilder.newScan().plan().splits(),
+ readBuilder.readType());
+ List<String> expected3 = new ArrayList<>();
+ for (int pt = 1; pt >= 0; pt--) {
+ expected3.add(String.format("+I[0, 0, %s]", pt));
+ expected3.add(String.format("+I[0, 1, %s]", pt));
+ expected3.add(String.format("+I[1, 0, %s]", pt));
+ expected3.add(String.format("+I[1, 1, %s]", pt));
+ expected3.add(String.format("+I[0, 2, %s]", pt));
+ expected3.add(String.format("+I[1, 2, %s]", pt));
+ expected3.add(String.format("+I[2, 0, %s]", pt));
+ expected3.add(String.format("+I[2, 1, %s]", pt));
+ expected3.add(String.format("+I[2, 2, %s]", pt));
+ expected3.add(String.format("+I[0, 3, %s]", pt));
+ expected3.add(String.format("+I[1, 3, %s]", pt));
+ expected3.add(String.format("+I[2, 3, %s]", pt));
+ expected3.add(String.format("+I[3, 0, %s]", pt));
+ expected3.add(String.format("+I[3, 1, %s]", pt));
+ expected3.add(String.format("+I[3, 2, %s]", pt));
+ expected3.add(String.format("+I[3, 3, %s]", pt));
+ }
+ assertThat(result3).containsExactlyElementsOf(expected3);
+
+ // second cluster(incremental)
+ runAction(Collections.emptyList());
+ checkSnapshot(table);
+ splits = readBuilder.newScan().plan().splits();
+ List<String> result4 = getResult(readBuilder.newRead(), splits, readBuilder.readType());
+ List<String> expected4 = new ArrayList<>();
+ for (int pt = 1; pt >= 0; pt--) {
+ expected4.add(String.format("+I[0, 0, %s]", pt));
+ expected4.add(String.format("+I[0, 1, %s]", pt));
+ expected4.add(String.format("+I[1, 0, %s]", pt));
+ expected4.add(String.format("+I[1, 1, %s]", pt));
+ expected4.add(String.format("+I[0, 2, %s]", pt));
+ expected4.add(String.format("+I[1, 2, %s]", pt));
+ expected4.add(String.format("+I[2, 0, %s]", pt));
+ expected4.add(String.format("+I[2, 1, %s]", pt));
+ expected4.add(String.format("+I[2, 2, %s]", pt));
+ expected4.add(String.format("+I[0, 3, %s]", pt));
+ expected4.add(String.format("+I[1, 3, %s]", pt));
+ expected4.add(String.format("+I[3, 0, %s]", pt));
+ expected4.add(String.format("+I[3, 1, %s]", pt));
+ expected4.add(String.format("+I[2, 3, %s]", pt));
+ expected4.add(String.format("+I[3, 2, %s]", pt));
+ expected4.add(String.format("+I[3, 3, %s]", pt));
+ }
+ assertThat(splits.size()).isEqualTo(2);
+ assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2);
+ assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+ assertThat(((DataSplit) splits.get(0)).dataFiles().get(1).level()).isEqualTo(4);
+ assertThat(result4).containsExactlyElementsOf(expected4);
+
+ // full cluster
+ runAction(Lists.newArrayList("--compact_strategy", "full"));
+ checkSnapshot(table);
+ splits = readBuilder.newScan().plan().splits();
+ List<String> result5 = getResult(readBuilder.newRead(), splits, readBuilder.readType());
+ List<String> expected5 = new ArrayList<>();
+ for (int pt = 1; pt >= 0; pt--) {
+ expected5.add(String.format("+I[0, 0, %s]", pt));
+ expected5.add(String.format("+I[0, 1, %s]", pt));
+ expected5.add(String.format("+I[1, 0, %s]", pt));
+ expected5.add(String.format("+I[1, 1, %s]", pt));
+ expected5.add(String.format("+I[0, 2, %s]", pt));
+ expected5.add(String.format("+I[0, 3, %s]", pt));
+ expected5.add(String.format("+I[1, 2, %s]", pt));
+ expected5.add(String.format("+I[1, 3, %s]", pt));
+ expected5.add(String.format("+I[2, 0, %s]", pt));
+ expected5.add(String.format("+I[2, 1, %s]", pt));
+ expected5.add(String.format("+I[3, 0, %s]", pt));
+ expected5.add(String.format("+I[3, 1, %s]", pt));
+ expected5.add(String.format("+I[2, 2, %s]", pt));
+ expected5.add(String.format("+I[2, 3, %s]", pt));
+ expected5.add(String.format("+I[3, 2, %s]", pt));
+ expected5.add(String.format("+I[3, 3, %s]", pt));
+ }
+ assertThat(splits.size()).isEqualTo(2);
+ assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1);
+ assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+ assertThat(result5).containsExactlyElementsOf(expected5);
+ }
+
protected FileStoreTable createTable(String partitionKeys) throws Exception {
return createTable(partitionKeys, commonOptions());
}
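The new testClusterWithBucket case above boils down to three table options layered on top of commonOptions(); the sketch below restates just that configuration for readability. It is illustrative only: the clustering-key options supplied by commonOptions() are defined elsewhere in the test class and are omitted here.

import java.util.HashMap;
import java.util.Map;

import org.apache.paimon.CoreOptions;

/** Illustrative sketch of the options exercised by testClusterWithBucket; not part of this patch. */
class BucketedClusteringOptionsSketch {
    static Map<String, String> bucketedClusteringOptions() {
        Map<String, String> options = new HashMap<>();
        // Fixed-bucket table with two buckets, bucketed by the pt column.
        options.put(CoreOptions.BUCKET.key(), "2");
        options.put(CoreOptions.BUCKET_KEY.key(), "pt");
        // Ordered append is disabled in this scenario so that clustering can run on a bucketed table.
        options.put(CoreOptions.BUCKET_APPEND_ORDERED.key(), "false");
        return options;
    }
}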
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index d3e10d68f9fd..4f780895789f 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -637,11 +637,15 @@ private void clusterIncrementalUnAwareBucketTable(
DataSourceV2Relation relation) {
IncrementalClusterManager incrementalClusterManager =
new IncrementalClusterManager(table, partitionPredicate);
- Map<BinaryRow, CompactUnit> compactUnits =
+ checkArgument(
+ table.bucketMode() != BucketMode.HASH_FIXED,
+ "Clustering for bucketed table is not supported for spark currently.");
+ Map<BinaryRow, Map<Integer, CompactUnit>> compactUnits =
incrementalClusterManager.createCompactUnits(fullCompaction);
- Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> partitionSplits =
- incrementalClusterManager.toSplitsAndRewriteDvFiles(compactUnits);
+ Map<BinaryRow, Map<Integer, Pair<List<DataSplit>, CommitMessage>>> partitionSplits =
+ incrementalClusterManager.toSplitsAndRewriteDvFiles(
+ compactUnits, table.bucketMode());
// sort in partition
TableSorter sorter =
@@ -656,7 +660,13 @@ private void clusterIncrementalUnAwareBucketTable(
Dataset<Row> datasetForWrite =
partitionSplits.values().stream()
- .map(Pair::getKey)
+ .map(
+ bucketEntry -> {
+ checkArgument(
+ bucketEntry.size() == 1,
+ "Unaware-bucket table should only have one bucket.");
+ return bucketEntry.values().iterator().next().getLeft();
+ })
.map(
splits -> {
Dataset<Row> dataset =
@@ -679,53 +689,70 @@ private void clusterIncrementalUnAwareBucketTable(
}
// re-organize the commit messages to generate the compact messages
- Map<BinaryRow, List<DataFileMeta>> partitionClustered = new HashMap<>();
+ Map<BinaryRow, Map<Integer, List<DataFileMeta>>> partitionClustered = new HashMap<>();
for (CommitMessage commitMessage : JavaConverters.seqAsJavaList(commitMessages)) {
- checkArgument(commitMessage.bucket() == 0);
- partitionClustered
- .computeIfAbsent(commitMessage.partition(), k -> new ArrayList<>())
+ BinaryRow partition = commitMessage.partition();
+ int bucket = commitMessage.bucket();
+ checkArgument(bucket == 0);
+
+ Map<Integer, List<DataFileMeta>> bucketFiles = partitionClustered.get(partition);
+ if (bucketFiles == null) {
+ bucketFiles = new HashMap<>();
+ partitionClustered.put(partition.copy(), bucketFiles);
+ }
+ bucketFiles
+ .computeIfAbsent(bucket, file -> new ArrayList<>())
.addAll(((CommitMessageImpl) commitMessage).newFilesIncrement().newFiles());
+ partitionClustered.put(partition, bucketFiles);
}
List<CommitMessage> clusterMessages = new ArrayList<>();
- for (Map.Entry<BinaryRow, List<DataFileMeta>> entry : partitionClustered.entrySet()) {
+ for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
+ partitionClustered.entrySet()) {
BinaryRow partition = entry.getKey();
- CommitMessageImpl dvCommitMessage =
- (CommitMessageImpl) partitionSplits.get(partition).getValue();
- List<DataFileMeta> clusterBefore = compactUnits.get(partition).files();
- // upgrade the clustered file to outputLevel
- List<DataFileMeta> clusterAfter =
- IncrementalClusterManager.upgrade(
- entry.getValue(), compactUnits.get(partition).outputLevel());
- LOG.info(
- "Partition {}: upgrade file level to {}",
- partition,
- compactUnits.get(partition).outputLevel());
-
- List<IndexFileMeta> newIndexFiles = new ArrayList<>();
- List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
- if (dvCommitMessage != null) {
- newIndexFiles = dvCommitMessage.compactIncrement().newIndexFiles();
- deletedIndexFiles = dvCommitMessage.compactIncrement().deletedIndexFiles();
- }
+ Map<Integer, List<DataFileMeta>> bucketEntries = entry.getValue();
+ Map<Integer, Pair<List<DataSplit>, CommitMessage>> bucketSplits =
+ partitionSplits.get(partition);
+ Map<Integer, CompactUnit> bucketUnits = compactUnits.get(partition);
+ for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+ bucketEntries.entrySet()) {
+ int bucket = bucketEntry.getKey();
+ CommitMessageImpl dvCommitMessage =
+ (CommitMessageImpl) bucketSplits.get(bucket).getValue();
+ List<DataFileMeta> clusterBefore = bucketUnits.get(bucket).files();
+ // upgrade the clustered file to outputLevel
+ List<DataFileMeta> clusterAfter =
+ IncrementalClusterManager.upgrade(
+ bucketEntry.getValue(), bucketUnits.get(bucket).outputLevel());
+ LOG.info(
+ "Partition {}, bucket {}: upgrade file level to {}",
+ partition,
+ bucket,
+ bucketUnits.get(bucket).outputLevel());
+
+ List<IndexFileMeta> newIndexFiles = new ArrayList<>();
+ List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
+ if (dvCommitMessage != null) {
+ newIndexFiles = dvCommitMessage.compactIncrement().newIndexFiles();
+ deletedIndexFiles = dvCommitMessage.compactIncrement().deletedIndexFiles();
+ }
- // get the dv index messages
- CompactIncrement compactIncrement =
- new CompactIncrement(
- clusterBefore,
- clusterAfter,
- Collections.emptyList(),
- newIndexFiles,
- deletedIndexFiles);
- clusterMessages.add(
- new CommitMessageImpl(
- partition,
- // bucket 0 is bucket for unaware-bucket table
- // for compatibility with the old design
- 0,
- table.coreOptions().bucket(),
- DataIncrement.emptyIncrement(),
- compactIncrement));
+ // get the dv index messages
+ CompactIncrement compactIncrement =
+ new CompactIncrement(
+ clusterBefore,
+ clusterAfter,
+ Collections.emptyList(),
+ newIndexFiles,
+ deletedIndexFiles);
+ clusterMessages.add(
+ new CommitMessageImpl(
+ partition,
+ bucket,
+ table.coreOptions().bucket(),
+ DataIncrement.emptyIncrement(),
+ compactIncrement));
+ }
}
if (LOG.isDebugEnabled()) {
LOG.debug("Commit messages after reorganizing:{}", clusterMessages);