From b99d64545758d04d92cf5dba306bc01461b1f54c Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Tue, 16 Dec 2025 20:21:49 +0800 Subject: [PATCH 1/8] extract rangeShuffle and localSort --- .../apache/paimon/flink/sorter/SortUtils.java | 210 +++++++++++------- 1 file changed, 126 insertions(+), 84 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java index b30e14551296..db2e23a172e3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java @@ -91,28 +91,49 @@ public static DataStream sortStreamByKey( final ShuffleKeyConvertor convertor, final TableSortInfo tableSortInfo) { - final RowType valueRowType = table.rowType(); CoreOptions options = table.coreOptions(); + final RowType valueRowType = table.rowType(); final int sinkParallelism = tableSortInfo.getSinkParallelism(); - final int localSampleSize = tableSortInfo.getLocalSampleSize(); - final int globalSampleSize = tableSortInfo.getGlobalSampleSize(); - final int rangeNum = tableSortInfo.getRangeNumber(); - int keyFieldCount = sortKeyType.getFieldCount(); - int valueFieldCount = valueRowType.getFieldCount(); - final int[] valueProjectionMap = new int[valueFieldCount]; - for (int i = 0; i < valueFieldCount; i++) { - valueProjectionMap[i] = i + keyFieldCount; + + DataStream> rangeShuffleResult = + rangeShuffle( + inputStream, + shuffleKeyAbstract, + keyTypeInformation, + shuffleKeyComparator, + tableSortInfo, + valueRowType, + options.sortBySize()); + + if (tableSortInfo.isSortInCluster()) { + return localSort( + rangeShuffleResult, + convertor, + sortKeyType, + valueRowType, + inputStream.getType(), + options, + sinkParallelism); + } else { + return rangeShuffleResult + .transform("REMOVE KEY", inputStream.getType(), new RemoveKeyOperator<>()) + .setParallelism(sinkParallelism); } + } - List keyFields = sortKeyType.getFields(); - List dataFields = valueRowType.getFields(); + public static DataStream> rangeShuffle( + final DataStream inputStream, + final KeyAbstract shuffleKeyAbstract, + final TypeInformation keyTypeInformation, + final SerializableSupplier> shuffleKeyComparator, + final TableSortInfo tableSortInfo, + final RowType valueRowType, + final boolean isSortBySize) { - List fields = new ArrayList<>(); - fields.addAll(keyFields); - fields.addAll(dataFields); - final RowType longRowType = new RowType(fields); - final InternalTypeInfo internalRowType = - InternalTypeInfo.fromRowType(longRowType); + final int sinkParallelism = tableSortInfo.getSinkParallelism(); + final int localSampleSize = tableSortInfo.getLocalSampleSize(); + final int globalSampleSize = tableSortInfo.getGlobalSampleSize(); + final int rangeNum = tableSortInfo.getRangeNumber(); // generate the KEY as the key of Pair. 
DataStream> inputWithKey = @@ -145,74 +166,95 @@ public Tuple2 map(RowData value) { .setParallelism(inputStream.getParallelism()); // range shuffle by key - DataStream> rangeShuffleResult = - RangeShuffle.rangeShuffleByKey( - inputWithKey, - shuffleKeyComparator, - keyTypeInformation, - localSampleSize, - globalSampleSize, - rangeNum, - sinkParallelism, - valueRowType, - options.sortBySize()); - if (tableSortInfo.isSortInCluster()) { - return rangeShuffleResult - .map( - a -> new JoinedRow(convertor.apply(a.f0), new FlinkRowWrapper(a.f1)), - internalRowType) - .setParallelism(sinkParallelism) - // sort the output locally by `SortOperator` - .transform( - "LOCAL SORT", - internalRowType, - new SortOperator( - sortKeyType, - longRowType, - options.writeBufferSize(), - options.pageSize(), - options.localSortMaxNumFileHandles(), - options.spillCompressOptions(), - sinkParallelism, - options.writeBufferSpillDiskSize(), - options.sequenceFieldSortOrderIsAscending())) - .setParallelism(sinkParallelism) - // remove the key column from every row - .map( - new RichMapFunction() { - - private transient KeyProjectedRow keyProjectedRow; - - /** - * Do not annotate with @override here to maintain - * compatibility with Flink 1.18-. - */ - public void open(OpenContext openContext) { - open(new Configuration()); - } - - /** - * Do not annotate with @override here to maintain - * compatibility with Flink 2.0+. - */ - public void open(Configuration parameters) { - keyProjectedRow = new KeyProjectedRow(valueProjectionMap); - } - - @Override - public InternalRow map(InternalRow value) { - return keyProjectedRow.replaceRow(value); - } - }, - InternalTypeInfo.fromRowType(valueRowType)) - .setParallelism(sinkParallelism) - .map(FlinkRowData::new, inputStream.getType()) - .setParallelism(sinkParallelism); - } else { - return rangeShuffleResult - .transform("REMOVE KEY", inputStream.getType(), new RemoveKeyOperator<>()) - .setParallelism(sinkParallelism); + return RangeShuffle.rangeShuffleByKey( + inputWithKey, + shuffleKeyComparator, + keyTypeInformation, + localSampleSize, + globalSampleSize, + rangeNum, + sinkParallelism, + valueRowType, + isSortBySize); + } + + public static DataStream localSort( + DataStream> rangeShuffleResult, + final ShuffleKeyConvertor convertor, + final RowType sortKeyType, + final RowType valueRowType, + final TypeInformation typeInformation, + final CoreOptions options, + final Integer sinkParallelism) { + + int keyFieldCount = sortKeyType.getFieldCount(); + int valueFieldCount = valueRowType.getFieldCount(); + final int[] valueProjectionMap = new int[valueFieldCount]; + for (int i = 0; i < valueFieldCount; i++) { + valueProjectionMap[i] = i + keyFieldCount; } + + List keyFields = sortKeyType.getFields(); + List dataFields = valueRowType.getFields(); + + List fields = new ArrayList<>(); + fields.addAll(keyFields); + fields.addAll(dataFields); + final RowType longRowType = new RowType(fields); + final InternalTypeInfo internalRowType = + InternalTypeInfo.fromRowType(longRowType); + + return rangeShuffleResult + .map( + a -> new JoinedRow(convertor.apply(a.f0), new FlinkRowWrapper(a.f1)), + internalRowType) + .setParallelism(sinkParallelism) + // sort the output locally by `SortOperator` + .transform( + "LOCAL SORT", + internalRowType, + new SortOperator( + sortKeyType, + longRowType, + options.writeBufferSize(), + options.pageSize(), + options.localSortMaxNumFileHandles(), + options.spillCompressOptions(), + sinkParallelism, + options.writeBufferSpillDiskSize(), + 
options.sequenceFieldSortOrderIsAscending())) + .setParallelism(sinkParallelism) + // remove the key column from every row + .map( + new RichMapFunction() { + + private transient KeyProjectedRow keyProjectedRow; + + /** + * Do not annotate with @override here to maintain + * compatibility with Flink 1.18-. + */ + public void open(OpenContext openContext) { + open(new Configuration()); + } + + /** + * Do not annotate with @override here to maintain + * compatibility with Flink 2.0+. + */ + public void open(Configuration parameters) { + keyProjectedRow = new KeyProjectedRow(valueProjectionMap); + } + + @Override + public InternalRow map(InternalRow value) { + return keyProjectedRow.replaceRow(value); + } + }, + InternalTypeInfo.fromRowType(valueRowType)) + .setParallelism(sinkParallelism) + .map(FlinkRowData::new, typeInformation) + .setParallelism(sinkParallelism); } /** Abstract key from a row data. */ From 9904b30736c8e0a90899efc69003c6bce9f31dac Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Thu, 18 Dec 2025 10:53:54 +0800 Subject: [PATCH 2/8] support cluster for bucket table --- .../cluster/HistoryPartitionCluster.java | 57 +-- .../cluster/IncrementalClusterManager.java | 332 +++++++++++------- .../cluster/HistoryPartitionClusterTest.java | 2 +- .../IncrementalClusterManagerTest.java | 7 +- .../IncrementalClusterSplitSource.java | 30 +- .../RemoveClusterBeforeFilesOperator.java | 7 +- ...IncrementalClusterCommittableOperator.java | 79 +++-- .../compact/IncrementalClusterCompact.java | 146 ++++++-- .../paimon/flink/sorter/HilbertSorter.java | 12 +- .../paimon/flink/sorter/OrderSorter.java | 15 +- .../apache/paimon/flink/sorter/SortUtils.java | 128 +++++-- .../paimon/flink/sorter/TableSorter.java | 2 + .../paimon/flink/sorter/ZorderSorter.java | 12 +- 13 files changed, 549 insertions(+), 280 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java index 8acbb8e7785a..7a3f3c494a79 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java @@ -38,7 +38,6 @@ import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneId; -import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -46,8 +45,10 @@ import java.util.Optional; import java.util.stream.Collectors; -import static org.apache.paimon.append.cluster.IncrementalClusterManager.constructPartitionLevels; -import static org.apache.paimon.append.cluster.IncrementalClusterManager.logForPartitionLevel; +import static org.apache.paimon.append.cluster.IncrementalClusterManager.constructBucketLevels; +import static org.apache.paimon.append.cluster.IncrementalClusterManager.groupByPtAndBucket; +import static org.apache.paimon.append.cluster.IncrementalClusterManager.logForCompactUnits; +import static org.apache.paimon.append.cluster.IncrementalClusterManager.logForLevels; /** Handle historical partition for full clustering. 
*/ public class HistoryPartitionCluster { @@ -111,23 +112,39 @@ public static HistoryPartitionCluster create( limit); } - public Map pickForHistoryPartitions() { - Map> partitionLevels = + public Map> createHistoryCompactUnits() { + Map>> partitionLevels = constructLevelsForHistoryPartitions(); - logForPartitionLevel(partitionLevels, partitionComputer); + logForLevels(partitionLevels, partitionComputer); - Map units = new HashMap<>(); + Map> units = new HashMap<>(); partitionLevels.forEach( - (k, v) -> { - Optional pick = - incrementalClusterStrategy.pick(maxLevel + 1, v, true); - pick.ifPresent(compactUnit -> units.put(k, compactUnit)); + (partition, bucketLevels) -> { + Map bucketUnits = new HashMap<>(); + bucketLevels.forEach( + (bucket, levels) -> { + Optional pick = + incrementalClusterStrategy.pick(maxLevel + 1, levels, true); + pick.ifPresent( + compactUnit -> { + bucketUnits.put(bucket, compactUnit); + if (LOG.isDebugEnabled()) { + logForCompactUnits( + partitionComputer.generatePartValues( + partition), + bucket, + compactUnit); + } + }); + }); + units.put(partition, bucketUnits); }); return units; } @VisibleForTesting - public Map> constructLevelsForHistoryPartitions() { + public Map>> + constructLevelsForHistoryPartitions() { long historyMilli = LocalDateTime.now() .minus(historyPartitionIdleTime) @@ -152,23 +169,19 @@ public Map> constructLevelsForHistoryPartitions( .read() .dataSplits(); - Map> historyPartitionFiles = new HashMap<>(); - for (DataSplit dataSplit : historyDataSplits) { - historyPartitionFiles - .computeIfAbsent(dataSplit.partition(), k -> new ArrayList<>()) - .addAll(dataSplit.dataFiles()); - } + Map>> historyPartitionFiles = + groupByPtAndBucket(historyDataSplits); return filterPartitions(historyPartitionFiles).entrySet().stream() .collect( Collectors.toMap( Map.Entry::getKey, - entry -> constructPartitionLevels(entry.getValue()))); + entry -> constructBucketLevels(entry.getValue()))); } - private Map> filterPartitions( - Map> partitionFiles) { - Map> result = new HashMap<>(); + private Map>> filterPartitions( + Map>> partitionFiles) { + Map>> result = new HashMap<>(); partitionFiles.forEach( (part, files) -> { if (specifiedPartitions.test(part)) { diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java index 1c8da3c03348..f4d49bbcd779 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java @@ -123,167 +123,210 @@ public IncrementalClusterManager( specifiedPartitions); } - public Map createCompactUnits(boolean fullCompaction) { - // 1. construct LSM structure for each partition - Map> partitionLevels = constructLevels(); - logForPartitionLevel(partitionLevels, partitionComputer); + public Map> createCompactUnits(boolean fullCompaction) { + // 1. construct LSM structure for each bucket + Map>> partitionLevels = + constructPartitionLevels(); + if (LOG.isDebugEnabled()) { + logForLevels(partitionLevels, partitionComputer); + } - // 2. pick files to be clustered for each partition - Map units = new HashMap<>(); + // 2. 
pick files to be clustered for each bucket + Map> units = new HashMap<>(); partitionLevels.forEach( - (k, v) -> { - Optional pick = - incrementalClusterStrategy.pick(numLevels, v, fullCompaction); - pick.ifPresent(compactUnit -> units.put(k, compactUnit)); + (partition, bucketLevels) -> { + Map bucketUnits = new HashMap<>(); + bucketLevels.forEach( + (bucket, levels) -> { + Optional pick = + incrementalClusterStrategy.pick( + numLevels, levels, fullCompaction); + pick.ifPresent( + compactUnit -> { + bucketUnits.put(bucket, compactUnit); + if (LOG.isDebugEnabled()) { + logForCompactUnits( + partitionComputer.generatePartValues( + partition), + bucket, + compactUnit); + } + }); + }); + units.put(partition, bucketUnits); }); if (historyPartitionCluster != null) { - units.putAll(historyPartitionCluster.pickForHistoryPartitions()); + units.putAll(historyPartitionCluster.createHistoryCompactUnits()); } - if (LOG.isDebugEnabled()) { - units.forEach( - (partition, compactUnit) -> { - String filesInfo = - compactUnit.files().stream() - .map( - file -> - String.format( - "%s,%s,%s", - file.fileName(), - file.level(), - file.fileSize())) - .collect(Collectors.joining(", ")); - LOG.debug( - "Partition {}, outputLevel:{}, clustered with {} files: [{}]", - partitionComputer.generatePartValues(partition), - compactUnit.outputLevel(), - compactUnit.files().size(), - filesInfo); - }); - } return units; } - public Map> constructLevels() { + public Map>> constructPartitionLevels() { List dataSplits = snapshotReader.read().dataSplits(); - Map> partitionFiles = new HashMap<>(); - for (DataSplit dataSplit : dataSplits) { - partitionFiles - .computeIfAbsent(dataSplit.partition(), k -> new ArrayList<>()) - .addAll(dataSplit.dataFiles()); - } - return partitionFiles.entrySet().stream() + Map>> partitionLevels = + groupByPtAndBucket(dataSplits); + + return partitionLevels.entrySet().stream() .collect( Collectors.toMap( Map.Entry::getKey, - entry -> constructPartitionLevels(entry.getValue()))); + entry -> constructBucketLevels(entry.getValue()))); } - public static List constructPartitionLevels(List partitionFiles) { - List partitionLevels = new ArrayList<>(); + public static Map> constructBucketLevels( + Map> bucketLevels) { + return bucketLevels.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, entry -> constructLevels(entry.getValue()))); + } + + public static List constructLevels(List files) { + List levels = new ArrayList<>(); Map> levelMap = - partitionFiles.stream().collect(Collectors.groupingBy(DataFileMeta::level)); + files.stream().collect(Collectors.groupingBy(DataFileMeta::level)); for (Map.Entry> entry : levelMap.entrySet()) { int level = entry.getKey(); if (level == 0) { for (DataFileMeta level0File : entry.getValue()) { - partitionLevels.add( - new LevelSortedRun(level, SortedRun.fromSingle(level0File))); + levels.add(new LevelSortedRun(level, SortedRun.fromSingle(level0File))); } } else { // don't need to guarantee that the files within the same sorted run are // non-overlapping here, so we call SortedRun.fromSorted() to avoid sorting and // validation - partitionLevels.add( - new LevelSortedRun(level, SortedRun.fromSorted(entry.getValue()))); + levels.add(new LevelSortedRun(level, SortedRun.fromSorted(entry.getValue()))); } } // sort by level - partitionLevels.sort(Comparator.comparing(LevelSortedRun::level)); + levels.sort(Comparator.comparing(LevelSortedRun::level)); + return levels; + } + + public static Map>> groupByPtAndBucket( + List dataSplits) { + Map>> partitionLevels 
= new HashMap<>(); + for (DataSplit dataSplit : dataSplits) { + BinaryRow partition = dataSplit.partition(); + Map> bucketLevels = partitionLevels.get(partition); + if (bucketLevels == null) { + bucketLevels = new HashMap<>(); + partitionLevels.put(partition.copy(), bucketLevels); + } + bucketLevels + .computeIfAbsent(dataSplit.bucket(), k -> new ArrayList<>()) + .addAll(dataSplit.dataFiles()); + } return partitionLevels; } - public Map, CommitMessage>> toSplitsAndRewriteDvFiles( - Map compactUnits) { - Map, CommitMessage>> result = new HashMap<>(); + public Map, CommitMessage>>> + toSplitsAndRewriteDvFiles( + Map> compactUnits, BucketMode bucketMode) { + Map, CommitMessage>>> result = new HashMap<>(); boolean dvEnabled = table.coreOptions().deletionVectorsEnabled(); - for (BinaryRow partition : compactUnits.keySet()) { - CompactUnit unit = compactUnits.get(partition); - AppendDeleteFileMaintainer dvMaintainer = - dvEnabled - ? BaseAppendDeleteFileMaintainer.forUnawareAppend( - table.store().newIndexFileHandler(), snapshot, partition) - : null; - List splits = new ArrayList<>(); - - DataSplit.Builder builder = - DataSplit.builder() - .withPartition(partition) - .withBucket(0) - .withTotalBuckets(1) - .isStreaming(false); - - SplitGenerator splitGenerator = snapshotReader.splitGenerator(); - List splitGroups = - splitGenerator.splitForBatch(unit.files()); - - for (SplitGenerator.SplitGroup splitGroup : splitGroups) { - List dataFiles = splitGroup.files; - - String bucketPath = - snapshotReader.pathFactory().bucketPath(partition, 0).toString(); - builder.withDataFiles(dataFiles) - .rawConvertible(splitGroup.rawConvertible) - .withBucketPath(bucketPath); + if (bucketMode == BucketMode.HASH_FIXED) { + checkArgument( + !dvEnabled, + "Deletion vector is not supported for clustering hash fixed bucket table"); + } - if (dvMaintainer != null) { - List dataDeletionFiles = new ArrayList<>(); - for (DataFileMeta file : dataFiles) { - DeletionFile deletionFile = - dvMaintainer.notifyRemovedDeletionVector(file.fileName()); - dataDeletionFiles.add(deletionFile); + for (BinaryRow partition : compactUnits.keySet()) { + Map bucketUnits = compactUnits.get(partition); + Map, CommitMessage>> bucketSplits = new HashMap<>(); + for (Integer bucket : bucketUnits.keySet()) { + CompactUnit unit = bucketUnits.get(bucket); + BaseAppendDeleteFileMaintainer dvMaintainer = + getDvMaintainer(bucketMode, partition, bucket); + List splits = new ArrayList<>(); + + DataSplit.Builder builder = + DataSplit.builder() + .withPartition(partition) + .withBucket(bucket) + .withTotalBuckets(table.coreOptions().bucket()) + .isStreaming(false); + + SplitGenerator splitGenerator = snapshotReader.splitGenerator(); + List splitGroups = + splitGenerator.splitForBatch(unit.files()); + + for (SplitGenerator.SplitGroup splitGroup : splitGroups) { + List dataFiles = splitGroup.files; + + String bucketPath = + snapshotReader.pathFactory().bucketPath(partition, 0).toString(); + builder.withDataFiles(dataFiles) + .rawConvertible(splitGroup.rawConvertible) + .withBucketPath(bucketPath); + + if (dvMaintainer != null) { + List dataDeletionFiles = new ArrayList<>(); + for (DataFileMeta file : dataFiles) { + DeletionFile deletionFile = + ((AppendDeleteFileMaintainer) dvMaintainer) + .notifyRemovedDeletionVector(file.fileName()); + dataDeletionFiles.add(deletionFile); + } + builder.withDataDeletionFiles(dataDeletionFiles); } - builder.withDataDeletionFiles(dataDeletionFiles); + splits.add(builder.build()); } - splits.add(builder.build()); - } - // 
generate delete dv index meta - CommitMessage dvCommitMessage = null; - if (dvMaintainer != null) { - List newIndexFiles = new ArrayList<>(); - List deletedIndexFiles = new ArrayList<>(); - List indexEntries = dvMaintainer.persist(); - for (IndexManifestEntry entry : indexEntries) { - if (entry.kind() == FileKind.ADD) { - newIndexFiles.add(entry.indexFile()); - } else { - deletedIndexFiles.add(entry.indexFile()); + // generate delete dv index meta + CommitMessage dvCommitMessage = null; + if (dvMaintainer != null) { + List newIndexFiles = new ArrayList<>(); + List deletedIndexFiles = new ArrayList<>(); + List indexEntries = dvMaintainer.persist(); + for (IndexManifestEntry entry : indexEntries) { + if (entry.kind() == FileKind.ADD) { + newIndexFiles.add(entry.indexFile()); + } else { + deletedIndexFiles.add(entry.indexFile()); + } } + dvCommitMessage = + new CommitMessageImpl( + dvMaintainer.getPartition(), + bucket, + table.coreOptions().bucket(), + DataIncrement.emptyIncrement(), + new CompactIncrement( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + newIndexFiles, + deletedIndexFiles)); } - dvCommitMessage = - new CommitMessageImpl( - dvMaintainer.getPartition(), - 0, - table.coreOptions().bucket(), - DataIncrement.emptyIncrement(), - new CompactIncrement( - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - newIndexFiles, - deletedIndexFiles)); + + bucketSplits.put(bucket, Pair.of(splits, dvCommitMessage)); } - result.put(partition, Pair.of(splits, dvCommitMessage)); + result.put(partition, bucketSplits); } return result; } + public BaseAppendDeleteFileMaintainer getDvMaintainer( + BucketMode bucketMode, BinaryRow partition, int bucket) { + switch (bucketMode) { + case HASH_FIXED: + // TODO: support dv for hash fixed bucket table + return null; + case BUCKET_UNAWARE: + return BaseAppendDeleteFileMaintainer.forUnawareAppend( + table.store().newIndexFileHandler(), snapshot, partition); + default: + throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode); + } + } + public static List upgrade( List filesAfterCluster, int outputLevel) { return filesAfterCluster.stream() @@ -291,28 +334,49 @@ public static List upgrade( .collect(Collectors.toList()); } - public static void logForPartitionLevel( - Map> partitionLevels, + public static void logForLevels( + Map>> partitionLevels, InternalRowPartitionComputer partitionComputer) { - if (LOG.isDebugEnabled()) { - partitionLevels.forEach( - (partition, levelSortedRuns) -> { - String runsInfo = - levelSortedRuns.stream() - .map( - lsr -> - String.format( - "level-%s:%s", - lsr.level(), - lsr.run().files().size())) - .collect(Collectors.joining(",")); - LOG.debug( - "Partition {} has {} runs: [{}]", - partitionComputer.generatePartValues(partition), - levelSortedRuns.size(), - runsInfo); - }); - } + partitionLevels.forEach( + (partition, bucketLevels) -> { + bucketLevels.forEach( + (bucket, levels) -> { + String runsInfo = + levels.stream() + .map( + lsr -> + String.format( + "level-%s:%s", + lsr.level(), + lsr.run().files().size())) + .collect(Collectors.joining(",")); + LOG.debug( + "Partition {}, bucket {} has {} runs: [{}]", + partitionComputer.generatePartValues(partition), + bucket, + levels.size(), + runsInfo); + }); + }); + } + + public static void logForCompactUnits( + Map partition, int bucket, CompactUnit compactUnit) { + String filesInfo = + compactUnit.files().stream() + .map( + file -> + String.format( + "%s,%s,%s", + file.fileName(), 
file.level(), file.fileSize())) + .collect(Collectors.joining(", ")); + LOG.debug( + "Partition {}, bucket {}, outputLevel:{}, clustered with {} files: [{}]", + partition, + bucket, + compactUnit.outputLevel(), + compactUnit.files().size(), + filesInfo); } public CoreOptions.OrderType clusterCurve() { diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java index f1f93ca989b2..12d0bd7099f7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java @@ -80,7 +80,7 @@ public void testHistoryPartitionAutoClustering() throws Exception { RowType.of(DataTypes.INT()), Lists.newArrayList(BinaryRow.singleColumn("pt1")))) .historyPartitionCluster(); - Map> partitionLevels = + Map>> partitionLevels = historyPartitionCluster.constructLevelsForHistoryPartitions(); assertThat(partitionLevels.size()).isEqualTo(1); assertThat(partitionLevels.get(BinaryRow.singleColumn("pt2"))).isNotEmpty(); diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java index 516fa63b55dc..635dbfe57c6c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java @@ -82,7 +82,7 @@ public void testNonClusterIncremental() throws Exception { } @Test - public void testConstructPartitionLevels() throws Exception { + public void testConstructLevels() throws Exception { // Create a valid table for IncrementalClusterManager Map options = new HashMap<>(); FileStoreTable table = createTable(options, Collections.emptyList()); @@ -108,8 +108,7 @@ public void testConstructPartitionLevels() throws Exception { partitionFiles.add(level2File1); // Call the method under test - List result = - incrementalClusterManager.constructPartitionLevels(partitionFiles); + List result = IncrementalClusterManager.constructLevels(partitionFiles); // Verify the results assertThat(result).hasSize(4); // 2 level-0 runs + 1 level-1 run + 1 level-2 run @@ -191,7 +190,7 @@ public void testHistoryPartitionAutoClustering() throws Exception { PartitionPredicate.fromMultiple( RowType.of(DataTypes.INT()), Lists.newArrayList(BinaryRow.singleColumn("pt3")))); - Map partitionLevels = + Map> partitionLevels = incrementalClusterManager.createCompactUnits(true); assertThat(partitionLevels.size()).isEqualTo(2); assertThat(partitionLevels.get(BinaryRow.singleColumn("pt1"))).isNotNull(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java index 25fe923df57a..3b362c8c4da4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java @@ -21,12 +21,15 @@ import org.apache.paimon.flink.LogicalTypeConversion; import org.apache.paimon.flink.sink.Committable; import org.apache.paimon.flink.sink.CommittableTypeInfo; +import 
org.apache.paimon.flink.sink.FlinkStreamPartitioner; import org.apache.paimon.flink.source.AbstractNonCoordinatedSource; import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader; import org.apache.paimon.flink.source.SimpleSourceSplit; import org.apache.paimon.flink.source.operator.ReadOperator; import org.apache.paimon.flink.utils.JavaTypeInfo; +import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.ChannelComputer; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.Split; @@ -42,6 +45,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; +import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; @@ -89,7 +93,7 @@ public static Pair, DataStream> buildSource( FileStoreTable table, Map partitionSpec, List splits, - @Nullable CommitMessage dvCommitMessage, + @Nullable List dvCommitMessage, @Nullable Integer parallelism) { DataStream source = env.fromSource( @@ -101,9 +105,13 @@ public static Pair, DataStream> buildSource( new JavaTypeInfo<>(Split.class)) .forceNonParallel(); + StreamPartitioner partitioner = + table.bucketMode() == BucketMode.HASH_FIXED + ? new FlinkStreamPartitioner<>(new SplitChannelComputer()) + : new RebalancePartitioner<>(); + PartitionTransformation partitioned = - new PartitionTransformation<>( - source.getTransformation(), new RebalancePartitioner<>()); + new PartitionTransformation<>(source.getTransformation(), partitioner); if (parallelism != null) { partitioned.setParallelism(parallelism); } @@ -125,4 +133,20 @@ public static Pair, DataStream> buildSource( new RemoveClusterBeforeFilesOperator(dvCommitMessage)) .forceNonParallel()); } + + private static class SplitChannelComputer implements ChannelComputer { + + private transient int numChannels; + + @Override + public void setup(int numChannels) { + this.numChannels = numChannels; + } + + @Override + public int channel(Split record) { + DataSplit dataSplit = (DataSplit) record; + return ChannelComputer.select(dataSplit.partition(), dataSplit.bucket(), numChannels); + } + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java index 83b41940bc4e..0b8beddf299b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java @@ -32,15 +32,16 @@ import javax.annotation.Nullable; import java.util.Collections; +import java.util.List; /** Operator used with {@link IncrementalClusterSplitSource}, to remove files to be clustered. 
*/ public class RemoveClusterBeforeFilesOperator extends BoundedOneInputOperator { - private static final long serialVersionUID = 2L; + private static final long serialVersionUID = 3L; - private final @Nullable CommitMessage dvCommitMessage; + private final @Nullable List dvCommitMessage; - public RemoveClusterBeforeFilesOperator(@Nullable CommitMessage dvCommitMessage) { + public RemoveClusterBeforeFilesOperator(@Nullable List dvCommitMessage) { this.dvCommitMessage = dvCommitMessage; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java index 01c2deba6466..5ec1c7b96e6e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java @@ -27,6 +27,7 @@ import org.apache.paimon.io.DataIncrement; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.utils.Pair; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.slf4j.Logger; @@ -38,23 +39,21 @@ import java.util.List; import java.util.Map; -import static org.apache.paimon.utils.Preconditions.checkArgument; - /** Rewrite committable for new files written after clustered. */ public class RewriteIncrementalClusterCommittableOperator extends BoundedOneInputOperator { protected static final Logger LOG = LoggerFactory.getLogger(RewriteIncrementalClusterCommittableOperator.class); - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; private final FileStoreTable table; - private final Map outputLevels; + private final Map, Integer> outputLevels; - private transient Map> partitionFiles; + private transient Map>> partitionFiles; public RewriteIncrementalClusterCommittableOperator( - FileStoreTable table, Map outputLevels) { + FileStoreTable table, Map, Integer> outputLevels) { this.table = table; this.outputLevels = outputLevels; } @@ -72,11 +71,18 @@ public void processElement(StreamRecord element) throws Exception { } CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable(); - checkArgument(message.bucket() == 0); BinaryRow partition = message.partition(); - partitionFiles - .computeIfAbsent(partition, file -> new ArrayList<>()) + int bucket = message.bucket(); + + Map> bucketFiles = partitionFiles.get(partition); + if (bucketFiles == null) { + bucketFiles = new HashMap<>(); + partitionFiles.put(partition.copy(), bucketFiles); + } + bucketFiles + .computeIfAbsent(bucket, file -> new ArrayList<>()) .addAll(message.newFilesIncrement().newFiles()); + partitionFiles.put(partition, bucketFiles); } @Override @@ -85,31 +91,38 @@ public void endInput() throws Exception { } protected void emitAll(long checkpointId) { - for (Map.Entry> partitionEntry : partitionFiles.entrySet()) { + for (Map.Entry>> partitionEntry : + partitionFiles.entrySet()) { BinaryRow partition = partitionEntry.getKey(); - // upgrade the clustered file to outputLevel - List clusterAfter = - IncrementalClusterManager.upgrade( - partitionEntry.getValue(), outputLevels.get(partition)); - LOG.info( - "Partition {}: upgrade file level to {}", - partition, - outputLevels.get(partition)); - 
CompactIncrement compactIncrement = - new CompactIncrement( - Collections.emptyList(), clusterAfter, Collections.emptyList()); - CommitMessageImpl clusterMessage = - new CommitMessageImpl( - partition, - // bucket 0 is bucket for unaware-bucket table - // for compatibility with the old design - 0, - table.coreOptions().bucket(), - DataIncrement.emptyIncrement(), - compactIncrement); - output.collect( - new StreamRecord<>( - new Committable(checkpointId, Committable.Kind.FILE, clusterMessage))); + Map> bucketFiles = partitionEntry.getValue(); + + for (Map.Entry> bucketEntry : bucketFiles.entrySet()) { + int bucket = bucketEntry.getKey(); + // upgrade the clustered file to outputLevel + List clusterAfter = + IncrementalClusterManager.upgrade( + bucketEntry.getValue(), + outputLevels.get(Pair.of(partition, bucket))); + LOG.info( + "Partition {}, bucket {}: upgrade file level to {}", + partition, + bucket, + outputLevels.get(Pair.of(partition, bucket))); + CompactIncrement compactIncrement = + new CompactIncrement( + Collections.emptyList(), clusterAfter, Collections.emptyList()); + CommitMessageImpl clusterMessage = + new CommitMessageImpl( + partition, + bucket, + table.coreOptions().bucket(), + DataIncrement.emptyIncrement(), + compactIncrement); + output.collect( + new StreamRecord<>( + new Committable( + checkpointId, Committable.Kind.FILE, clusterMessage))); + } } partitionFiles.clear(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java index c7fef7026689..a3348809c416 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java @@ -22,16 +22,20 @@ import org.apache.paimon.append.cluster.IncrementalClusterManager; import org.apache.paimon.compact.CompactUnit; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource; import org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator; import org.apache.paimon.flink.sink.Committable; import org.apache.paimon.flink.sink.CommittableTypeInfo; +import org.apache.paimon.flink.sink.FixedBucketSink; import org.apache.paimon.flink.sink.FlinkSinkBuilder; +import org.apache.paimon.flink.sink.FlinkWriteSink; import org.apache.paimon.flink.sink.RowAppendTableSink; import org.apache.paimon.flink.sorter.TableSortInfo; import org.apache.paimon.flink.sorter.TableSorter; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.source.DataSplit; @@ -46,12 +50,14 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARALLELISM; +import static org.apache.paimon.utils.Preconditions.checkArgument; /** Compact for incremental clustering. 
*/ public class IncrementalClusterCompact { @@ -61,11 +67,14 @@ public class IncrementalClusterCompact { protected final IncrementalClusterManager clusterManager; protected final String commitUser; protected final InternalRowPartitionComputer partitionComputer; + protected final BucketMode bucketMode; protected final @Nullable Integer parallelism; protected final int localSampleMagnification; - protected final Map compactUnits; - protected final Map, CommitMessage>> compactSplits; + protected final Map> compactUnits; + protected final Map, CommitMessage>>> + compactSplits; + protected final Map, Integer> outputLevels; public IncrementalClusterCompact( StreamExecutionEnvironment env, @@ -78,6 +87,7 @@ public IncrementalClusterCompact( Options options = new Options(table.options()); this.partitionComputer = table.store().partitionComputer(); this.commitUser = CoreOptions.createCommitUser(options); + this.bucketMode = table.bucketMode(); this.parallelism = options.get(SCAN_PARALLELISM); this.localSampleMagnification = table.coreOptions().getLocalSampleMagnification(); if (localSampleMagnification < 20) { @@ -87,10 +97,20 @@ public IncrementalClusterCompact( CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(), localSampleMagnification)); } + // non-full strategy as default for incremental clustering this.compactUnits = clusterManager.createCompactUnits(fullCompaction != null && fullCompaction); - this.compactSplits = clusterManager.toSplitsAndRewriteDvFiles(compactUnits); + this.compactSplits = + clusterManager.toSplitsAndRewriteDvFiles(compactUnits, table.bucketMode()); + this.outputLevels = new HashMap<>(); + compactUnits.forEach( + (partition, bucketMap) -> { + bucketMap.forEach( + (bucket, unit) -> + outputLevels.put( + Pair.of(partition, bucket), unit.outputLevel())); + }); } public void build() throws Exception { @@ -100,57 +120,82 @@ public void build() throws Exception { } List> dataStreams = new ArrayList<>(); - for (Map.Entry, CommitMessage>> entry : + switch (bucketMode) { + case HASH_FIXED: + buildForFixedBucket(dataStreams); + break; + case BUCKET_UNAWARE: + buildForUnawareBucket(dataStreams); + break; + default: + throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode); + } + + buildCommitOperator(dataStreams); + } + + protected void buildForUnawareBucket(List> dataStreams) { + for (Map.Entry, CommitMessage>>> entry : compactSplits.entrySet()) { + BinaryRow partition = entry.getKey(); + checkArgument( + entry.getValue().size() == 1, + "Unaware-bucket table should only have one bucket."); + Pair, CommitMessage> pair = entry.getValue().values().iterator().next(); dataStreams.addAll( buildCompactOperator( - entry.getKey(), - entry.getValue().getKey(), - entry.getValue().getRight(), + partitionComputer.generatePartValues(partition), + pair.getLeft(), + Collections.singletonList(pair.getRight()), parallelism)); } + } - buildCommitOperator(dataStreams); + protected void buildForFixedBucket(List> dataStreams) { + // read data of all partitions and shuffle by partition and bucket + List dataSplits = new ArrayList<>(); + List dvCommitMessages = new ArrayList<>(); + compactSplits.forEach( + (partition, bucketEntry) -> { + bucketEntry.forEach( + (bucket, pair) -> { + dataSplits.addAll(pair.getLeft()); + dvCommitMessages.add(pair.getRight()); + }); + }); + + buildCompactOperator(new LinkedHashMap<>(), dataSplits, dvCommitMessages, parallelism); } /** - * Build for one partition. + * Build compact operator for specified splits. 
* * @param parallelism Give the caller the opportunity to set parallelism */ protected List> buildCompactOperator( - BinaryRow partition, + LinkedHashMap partitionSpec, List splits, - CommitMessage dvCommitMessage, + List dvCommitMessages, @Nullable Integer parallelism) { - LinkedHashMap partitionSpec = - partitionComputer.generatePartValues(partition); - // 2.1 generate source for current partition + // 2.1 generate source for splits Pair, DataStream> sourcePair = IncrementalClusterSplitSource.buildSource( - env, table, partitionSpec, splits, dvCommitMessage, parallelism); + env, table, partitionSpec, splits, dvCommitMessages, parallelism); - // 2.2 cluster in partition + // 2.2 cluster for splits + // --- for unaware bucket, need global sort + // --- for fixed bucket, just need local sort Integer sinkParallelism = parallelism; if (sinkParallelism == null) { sinkParallelism = sourcePair.getLeft().getParallelism(); } - TableSortInfo sortInfo = - new TableSortInfo.Builder() - .setSortColumns(clusterManager.clusterKeys()) - .setSortStrategy(clusterManager.clusterCurve()) - .setSinkParallelism(sinkParallelism) - .setLocalSampleSize(sinkParallelism * localSampleMagnification) - .setGlobalSampleSize(sinkParallelism * 1000) - .setRangeNumber(sinkParallelism * 10) - .build(); - DataStream sorted = - TableSorter.getSorter(env, sourcePair.getLeft(), table, sortInfo).sort(); + + DataStream sorted = sortDataStream(sourcePair.getLeft(), sinkParallelism); // 2.3 write and then reorganize the committable // set parallelism to null, and it'll forward parallelism when doWrite() - RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null); + FlinkWriteSink sink = getSink(); boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor(); DataStream written = sink.doWrite( @@ -167,14 +212,7 @@ protected List> buildCompactOperator( "Rewrite cluster committable", new CommittableTypeInfo(), new RewriteIncrementalClusterCommittableOperator( - table, - compactUnits.entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - unit -> - unit.getValue() - .outputLevel())))) + table, outputLevels)) .setParallelism(written.getParallelism()); List> dataStreams = new ArrayList<>(); @@ -184,11 +222,43 @@ protected List> buildCompactOperator( } protected void buildCommitOperator(List> dataStreams) { - RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null); + FlinkWriteSink sink = getSink(); DataStream dataStream = dataStreams.get(0); for (int i = 1; i < dataStreams.size(); i++) { dataStream = dataStream.union(dataStreams.get(i)); } sink.doCommit(dataStream, commitUser); } + + protected DataStream sortDataStream( + DataStream input, Integer sinkParallelism) { + TableSortInfo sortInfo = + new TableSortInfo.Builder() + .setSortColumns(clusterManager.clusterKeys()) + .setSortStrategy(clusterManager.clusterCurve()) + .setSinkParallelism(sinkParallelism) + .setLocalSampleSize(sinkParallelism * localSampleMagnification) + .setGlobalSampleSize(sinkParallelism * 1000) + .setRangeNumber(sinkParallelism * 10) + .build(); + switch (bucketMode) { + case HASH_FIXED: + return TableSorter.getSorter(env, input, table, sortInfo).sortInLocal(); + case BUCKET_UNAWARE: + return TableSorter.getSorter(env, input, table, sortInfo).sort(); + default: + throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode); + } + } + + protected FlinkWriteSink getSink() { + switch (bucketMode) { + case HASH_FIXED: + return new FixedBucketSink(table, null, null); + case BUCKET_UNAWARE: + 
return new RowAppendTableSink(table, null, null, null); + default: + throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode); + } + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java index 26c698f5ed71..7c96f018871b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java @@ -61,7 +61,12 @@ public HilbertSorter( @Override public DataStream sort() { - return sortStreamByHilbert(origin, table); + return sortStreamByHilbert(origin, table, true); + } + + @Override + public DataStream sortInLocal() { + return sortStreamByHilbert(origin, table, false); } /** @@ -71,7 +76,7 @@ public DataStream sort() { * @return the sorted data stream */ private DataStream sortStreamByHilbert( - DataStream inputStream, FileStoreTable table) { + DataStream inputStream, FileStoreTable table, boolean isGlobalSort) { final HilbertIndexer hilbertIndexer = new HilbertIndexer(table.rowType(), orderColNames); return SortUtils.sortStreamByKey( inputStream, @@ -102,6 +107,7 @@ public byte[] apply(RowData value) { } }, GenericRow::of, - tableSortInfo); + tableSortInfo, + isGlobalSort); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java index 406195b6fce6..171338160059 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java @@ -49,13 +49,23 @@ public OrderSorter( @Override public DataStream sort() { + return sortStreamByOrder(origin, table, true); + } + + @Override + public DataStream sortInLocal() { + return sortStreamByOrder(origin, table, false); + } + + private DataStream sortStreamByOrder( + DataStream inputStream, FileStoreTable table, boolean isGlobalSort) { final RowType valueRowType = table.rowType(); final int[] keyProjectionMap = table.schema().projection(orderColNames); final RowType keyRowType = addKeyNamePrefix(Projection.of(keyProjectionMap).project(valueRowType)); return SortUtils.sortStreamByKey( - origin, + inputStream, table, keyRowType, InternalTypeInfo.fromRowType(keyRowType), @@ -77,6 +87,7 @@ public InternalRow apply(RowData value) { } }, row -> row, - tableSortInfo); + tableSortInfo, + isGlobalSort); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java index db2e23a172e3..330e441f00bf 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java @@ -78,6 +78,7 @@ public class SortUtils { * @param convertor convert the `KEY` to the sort key, then we can sort in * `BinaryExternalSortBuffer`. * @param tableSortInfo the necessary info of table sort. 
+ * @param isGlobalSort whether to do global sort * @return the global sorted data stream * @param the KEY type in range shuffle */ @@ -89,16 +90,47 @@ public static DataStream sortStreamByKey( final SerializableSupplier> shuffleKeyComparator, final KeyAbstract shuffleKeyAbstract, final ShuffleKeyConvertor convertor, - final TableSortInfo tableSortInfo) { + final TableSortInfo tableSortInfo, + final boolean isGlobalSort) { + return isGlobalSort + ? sortStreamByKeyInGlobal( + inputStream, + table, + sortKeyType, + keyTypeInformation, + shuffleKeyComparator, + shuffleKeyAbstract, + convertor, + tableSortInfo) + : sortStreamByKeyInLocal( + inputStream, + table, + sortKeyType, + keyTypeInformation, + shuffleKeyAbstract, + convertor, + tableSortInfo); + } + public static DataStream sortStreamByKeyInGlobal( + final DataStream inputStream, + final FileStoreTable table, + final RowType sortKeyType, + final TypeInformation keyTypeInformation, + final SerializableSupplier> shuffleKeyComparator, + final KeyAbstract shuffleKeyAbstract, + final ShuffleKeyConvertor convertor, + final TableSortInfo tableSortInfo) { CoreOptions options = table.coreOptions(); final RowType valueRowType = table.rowType(); final int sinkParallelism = tableSortInfo.getSinkParallelism(); + DataStream> inputWithKey = + generateSortKey(inputStream, keyTypeInformation, shuffleKeyAbstract); + DataStream> rangeShuffleResult = rangeShuffle( - inputStream, - shuffleKeyAbstract, + inputWithKey, keyTypeInformation, shuffleKeyComparator, tableSortInfo, @@ -121,9 +153,67 @@ public static DataStream sortStreamByKey( } } - public static DataStream> rangeShuffle( + public static DataStream sortStreamByKeyInLocal( final DataStream inputStream, + final FileStoreTable table, + final RowType sortKeyType, + final TypeInformation keyTypeInformation, final KeyAbstract shuffleKeyAbstract, + final ShuffleKeyConvertor convertor, + final TableSortInfo tableSortInfo) { + CoreOptions options = table.coreOptions(); + final RowType valueRowType = table.rowType(); + final int sinkParallelism = tableSortInfo.getSinkParallelism(); + + DataStream> inputWithKey = + generateSortKey(inputStream, keyTypeInformation, shuffleKeyAbstract); + + return localSort( + inputWithKey, + convertor, + sortKeyType, + valueRowType, + inputStream.getType(), + options, + sinkParallelism); + } + + public static DataStream> generateSortKey( + final DataStream inputStream, + final TypeInformation keyTypeInformation, + final KeyAbstract shuffleKeyAbstract) { + // generate the KEY as the key of Pair. + return inputStream + .map( + new RichMapFunction>() { + + /** + * Do not annotate with @override here to maintain + * compatibility with Flink 1.18-. + */ + public void open(OpenContext openContext) throws Exception { + open(new Configuration()); + } + + /** + * Do not annotate with @override here to maintain + * compatibility with Flink 2.0+. 
+ */ + public void open(Configuration parameters) throws Exception { + shuffleKeyAbstract.open(); + } + + @Override + public Tuple2 map(RowData value) { + return Tuple2.of(shuffleKeyAbstract.apply(value), value); + } + }, + new TupleTypeInfo<>(keyTypeInformation, inputStream.getType())) + .setParallelism(inputStream.getParallelism()); + } + + public static DataStream> rangeShuffle( + final DataStream> inputWithKey, final TypeInformation keyTypeInformation, final SerializableSupplier> shuffleKeyComparator, final TableSortInfo tableSortInfo, @@ -135,36 +225,6 @@ public static DataStream> rangeShuffle( final int globalSampleSize = tableSortInfo.getGlobalSampleSize(); final int rangeNum = tableSortInfo.getRangeNumber(); - // generate the KEY as the key of Pair. - DataStream> inputWithKey = - inputStream - .map( - new RichMapFunction>() { - - /** - * Do not annotate with @override here to maintain - * compatibility with Flink 1.18-. - */ - public void open(OpenContext openContext) throws Exception { - open(new Configuration()); - } - - /** - * Do not annotate with @override here to maintain - * compatibility with Flink 2.0+. - */ - public void open(Configuration parameters) throws Exception { - shuffleKeyAbstract.open(); - } - - @Override - public Tuple2 map(RowData value) { - return Tuple2.of(shuffleKeyAbstract.apply(value), value); - } - }, - new TupleTypeInfo<>(keyTypeInformation, inputStream.getType())) - .setParallelism(inputStream.getParallelism()); - // range shuffle by key return RangeShuffle.rangeShuffleByKey( inputWithKey, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java index 1d51ca4ebee6..c152090d4b2e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java @@ -67,6 +67,8 @@ private void checkColNames() { public abstract DataStream sort(); + public abstract DataStream sortInLocal(); + public static TableSorter getSorter( StreamExecutionEnvironment batchTEnv, DataStream origin, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java index fad66f364731..9e932be38d44 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java @@ -61,7 +61,12 @@ public ZorderSorter( @Override public DataStream sort() { - return sortStreamByZOrder(origin, table); + return sortStreamByZOrder(origin, table, true); + } + + @Override + public DataStream sortInLocal() { + return sortStreamByZOrder(origin, table, false); } /** @@ -71,7 +76,7 @@ public DataStream sort() { * @return the sorted data stream */ private DataStream sortStreamByZOrder( - DataStream inputStream, FileStoreTable table) { + DataStream inputStream, FileStoreTable table, boolean isGlobalSort) { final ZIndexer zIndexer = new ZIndexer(table.rowType(), orderColNames, table.coreOptions().varTypeSize()); return SortUtils.sortStreamByKey( @@ -105,6 +110,7 @@ public byte[] apply(RowData value) { } }, GenericRow::of, - sortInfo); + sortInfo, + isGlobalSort); } } From d2632a02af5927d5a433fac2ac16066cbfa49b07 Mon Sep 17 00:00:00 2001 From: LsomeYeah 
Date: Mon, 22 Dec 2025 19:02:33 +0800 Subject: [PATCH 3/8] minor fix --- .../cluster/IncrementalClusterManager.java | 7 +++---- .../paimon/schema/SchemaValidation.java | 9 +++++---- .../paimon/flink/action/CompactAction.java | 11 ++++++++++- .../IncrementalClusterSplitSource.java | 4 ++-- .../RemoveClusterBeforeFilesOperator.java | 19 ++++++++++++------- .../compact/IncrementalClusterCompact.java | 4 +++- 6 files changed, 35 insertions(+), 19 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java index f4d49bbcd779..e7e2f4b3e62b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java @@ -84,8 +84,7 @@ public IncrementalClusterManager(FileStoreTable table) { public IncrementalClusterManager( FileStoreTable table, @Nullable PartitionPredicate specifiedPartitions) { checkArgument( - table.bucketMode() == BucketMode.BUCKET_UNAWARE, - "only append unaware-bucket table support incremental clustering."); + table.primaryKeys().isEmpty(), "only append tables support incremental clustering."); this.table = table; CoreOptions options = table.coreOptions(); checkArgument( @@ -232,7 +231,7 @@ public static Map>> groupByPtAndBucke if (bucketMode == BucketMode.HASH_FIXED) { checkArgument( !dvEnabled, - "Deletion vector is not supported for clustering hash fixed bucket table"); + "Clustering is currently not supported for fixed-bucket tables with deletion vectors enabled."); } for (BinaryRow partition : compactUnits.keySet()) { Map bucketUnits = compactUnits.get(partition); Map, CommitMessage>> bucketSplits = new HashMap<>(); for (Integer bucket : bucketUnits.keySet()) { CompactUnit unit = bucketUnits.get(bucket); BaseAppendDeleteFileMaintainer dvMaintainer = - getDvMaintainer(bucketMode, partition, bucket); + dvEnabled ? 
getDvMaintainer(bucketMode, partition, bucket) : null; List splits = new ArrayList<>(); DataSplit.Builder builder = diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 56176f2b83dd..37e9723008ea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -669,14 +669,15 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options) private static void validateIncrementalClustering(TableSchema schema, CoreOptions options) { if (options.clusteringIncrementalEnabled()) { - checkArgument( - options.bucket() == -1, - "Cannot define %s for incremental clustering table, it only support bucket = -1", - CoreOptions.BUCKET.key()); checkArgument( schema.primaryKeys().isEmpty(), "Cannot define %s for incremental clustering table.", PRIMARY_KEY.key()); + if (options.bucket() != -1) { + checkArgument( + !options.deletionVectorsEnabled(), + "Cannot enable deletion-vectors for an incremental clustering table whose bucket is not -1."); + } } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index 2e4442d9d741..ce24dd2319b7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -154,7 +154,16 @@ protected boolean buildImpl() throws Exception { buildForAppendTableCompact(env, fileStoreTable, isStreaming); } } else { - buildForBucketedTableCompact(env, fileStoreTable, isStreaming); + if (fileStoreTable.primaryKeys().isEmpty() + && fileStoreTable.bucketMode() == BucketMode.HASH_FIXED + && !fileStoreTable.coreOptions().bucketAppendOrdered() + && fileStoreTable.coreOptions().clusteringIncrementalEnabled()) { + new IncrementalClusterCompact( + env, fileStoreTable, partitionPredicate, fullCompaction) + .build(); + } else { + buildForBucketedTableCompact(env, fileStoreTable, isStreaming); + } } return true; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java index 3b362c8c4da4..e9f824db2b1b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java @@ -93,7 +93,7 @@ public static Pair, DataStream> buildSource( FileStoreTable table, Map partitionSpec, List splits, - @Nullable List dvCommitMessage, + @Nullable List dvCommitMessages, @Nullable Integer parallelism) { DataStream source = env.fromSource( @@ -130,7 +130,7 @@ public static Pair, DataStream> buildSource( .transform( "Remove files to be clustered", new CommittableTypeInfo(), - new RemoveClusterBeforeFilesOperator(dvCommitMessage)) + new RemoveClusterBeforeFilesOperator(dvCommitMessages)) .forceNonParallel()); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java index 0b8beddf299b..f7f1347fcff6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java @@ -39,10 +39,10 @@ public class RemoveClusterBeforeFilesOperator extends BoundedOneInputOperator dvCommitMessage; + private final @Nullable List dvCommitMessages; - public RemoveClusterBeforeFilesOperator(@Nullable List dvCommitMessage) { - this.dvCommitMessage = dvCommitMessage; + public RemoveClusterBeforeFilesOperator(@Nullable List dvCommitMessages) { + this.dvCommitMessages = dvCommitMessages; } @Override @@ -69,10 +69,15 @@ public void endInput() throws Exception { } private void emitDvIndexCommitMessages(long checkpointId) { - if (dvCommitMessage != null) { - output.collect( - new StreamRecord<>( - new Committable(checkpointId, Committable.Kind.FILE, dvCommitMessage))); + if (dvCommitMessages != null) { + for (CommitMessage dvCommitMessage : dvCommitMessages) { + if (dvCommitMessage != null) { + output.collect( + new StreamRecord<>( + new Committable( + checkpointId, Committable.Kind.FILE, dvCommitMessage))); + } + } } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java index a3348809c416..08955fd7aea3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java @@ -164,7 +164,9 @@ protected void buildForFixedBucket(List> dataStreams) { }); }); - buildCompactOperator(new LinkedHashMap<>(), dataSplits, dvCommitMessages, parallelism); + dataStreams.addAll( + buildCompactOperator( + new LinkedHashMap<>(), dataSplits, dvCommitMessages, parallelism)); } /** From f6c43b3ad789814ad30242ce853a2689c53d2c16 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Mon, 22 Dec 2025 21:00:45 +0800 Subject: [PATCH 4/8] add flink test --- .../IncrementalClusterActionITCase.java | 165 ++++++++++++++++++ 1 file changed, 165 insertions(+) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java index 6c80070528ac..1538f8c083af 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java @@ -683,6 +683,171 @@ public void testClusterWithDeletionVector() throws Exception { assertThat(splits.get(0).deletionFiles().get().get(0)).isNull(); } + @Test + public void testClusterWithBucket() throws Exception { + Map dynamicOptions = commonOptions(); + dynamicOptions.put(CoreOptions.BUCKET.key(), "2"); + dynamicOptions.put(CoreOptions.BUCKET_KEY.key(), "pt"); + dynamicOptions.put(CoreOptions.BUCKET_APPEND_ORDERED.key(), "false"); + FileStoreTable table = createTable(null, dynamicOptions); + + BinaryString randomStr = BinaryString.fromString(randomString(150)); + List messages = new ArrayList<>(); + + 
// first write + for (int pt = 0; pt < 2; pt++) { + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + messages.addAll(write(GenericRow.of(i, j, randomStr, pt))); + } + } + } + commit(messages); + ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1, 3}); + List result1 = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + readBuilder.readType()); + List expected1 = new ArrayList<>(); + for (int pt = 0; pt <= 1; pt++) { + expected1.add(String.format("+I[0, 0, %s]", pt)); + expected1.add(String.format("+I[0, 1, %s]", pt)); + expected1.add(String.format("+I[0, 2, %s]", pt)); + expected1.add(String.format("+I[1, 0, %s]", pt)); + expected1.add(String.format("+I[1, 1, %s]", pt)); + expected1.add(String.format("+I[1, 2, %s]", pt)); + expected1.add(String.format("+I[2, 0, %s]", pt)); + expected1.add(String.format("+I[2, 1, %s]", pt)); + expected1.add(String.format("+I[2, 2, %s]", pt)); + } + assertThat(result1).containsExactlyElementsOf(expected1); + + // first cluster + runAction(Collections.emptyList()); + checkSnapshot(table); + List splits = readBuilder.newScan().plan().splits(); + assertThat(splits.size()).isEqualTo(2); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + List result2 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List expected2 = new ArrayList<>(); + for (int pt = 1; pt >= 0; pt--) { + expected2.add(String.format("+I[0, 0, %s]", pt)); + expected2.add(String.format("+I[0, 1, %s]", pt)); + expected2.add(String.format("+I[1, 0, %s]", pt)); + expected2.add(String.format("+I[1, 1, %s]", pt)); + expected2.add(String.format("+I[0, 2, %s]", pt)); + expected2.add(String.format("+I[1, 2, %s]", pt)); + expected2.add(String.format("+I[2, 0, %s]", pt)); + expected2.add(String.format("+I[2, 1, %s]", pt)); + expected2.add(String.format("+I[2, 2, %s]", pt)); + } + assertThat(result2).containsExactlyElementsOf(expected2); + + // second write + messages.clear(); + for (int pt = 0; pt <= 1; pt++) { + messages.addAll( + write( + GenericRow.of(0, 3, null, pt), + GenericRow.of(1, 3, null, pt), + GenericRow.of(2, 3, null, pt))); + messages.addAll( + write( + GenericRow.of(3, 0, null, pt), + GenericRow.of(3, 1, null, pt), + GenericRow.of(3, 2, null, pt), + GenericRow.of(3, 3, null, pt))); + } + commit(messages); + + List result3 = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + readBuilder.readType()); + List expected3 = new ArrayList<>(); + for (int pt = 1; pt >= 0; pt--) { + expected3.add(String.format("+I[0, 0, %s]", pt)); + expected3.add(String.format("+I[0, 1, %s]", pt)); + expected3.add(String.format("+I[1, 0, %s]", pt)); + expected3.add(String.format("+I[1, 1, %s]", pt)); + expected3.add(String.format("+I[0, 2, %s]", pt)); + expected3.add(String.format("+I[1, 2, %s]", pt)); + expected3.add(String.format("+I[2, 0, %s]", pt)); + expected3.add(String.format("+I[2, 1, %s]", pt)); + expected3.add(String.format("+I[2, 2, %s]", pt)); + expected3.add(String.format("+I[0, 3, %s]", pt)); + expected3.add(String.format("+I[1, 3, %s]", pt)); + expected3.add(String.format("+I[2, 3, %s]", pt)); + expected3.add(String.format("+I[3, 0, %s]", pt)); + expected3.add(String.format("+I[3, 1, %s]", pt)); + expected3.add(String.format("+I[3, 2, %s]", pt)); + expected3.add(String.format("+I[3, 3, %s]", pt)); + } + assertThat(result3).containsExactlyElementsOf(expected3); + + // second 
cluster(incremental) + runAction(Collections.emptyList()); + checkSnapshot(table); + splits = readBuilder.newScan().plan().splits(); + List result4 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List expected4 = new ArrayList<>(); + for (int pt = 1; pt >= 0; pt--) { + expected4.add(String.format("+I[0, 0, %s]", pt)); + expected4.add(String.format("+I[0, 1, %s]", pt)); + expected4.add(String.format("+I[1, 0, %s]", pt)); + expected4.add(String.format("+I[1, 1, %s]", pt)); + expected4.add(String.format("+I[0, 2, %s]", pt)); + expected4.add(String.format("+I[1, 2, %s]", pt)); + expected4.add(String.format("+I[2, 0, %s]", pt)); + expected4.add(String.format("+I[2, 1, %s]", pt)); + expected4.add(String.format("+I[2, 2, %s]", pt)); + expected4.add(String.format("+I[0, 3, %s]", pt)); + expected4.add(String.format("+I[1, 3, %s]", pt)); + expected4.add(String.format("+I[3, 0, %s]", pt)); + expected4.add(String.format("+I[3, 1, %s]", pt)); + expected4.add(String.format("+I[2, 3, %s]", pt)); + expected4.add(String.format("+I[3, 2, %s]", pt)); + expected4.add(String.format("+I[3, 3, %s]", pt)); + } + assertThat(splits.size()).isEqualTo(2); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(1).level()).isEqualTo(4); + assertThat(result4).containsExactlyElementsOf(expected4); + + // full cluster + runAction(Lists.newArrayList("--compact_strategy", "full")); + checkSnapshot(table); + splits = readBuilder.newScan().plan().splits(); + List result5 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List expected5 = new ArrayList<>(); + for (int pt = 1; pt >= 0; pt--) { + expected5.add(String.format("+I[0, 0, %s]", pt)); + expected5.add(String.format("+I[0, 1, %s]", pt)); + expected5.add(String.format("+I[1, 0, %s]", pt)); + expected5.add(String.format("+I[1, 1, %s]", pt)); + expected5.add(String.format("+I[0, 2, %s]", pt)); + expected5.add(String.format("+I[0, 3, %s]", pt)); + expected5.add(String.format("+I[1, 2, %s]", pt)); + expected5.add(String.format("+I[1, 3, %s]", pt)); + expected5.add(String.format("+I[2, 0, %s]", pt)); + expected5.add(String.format("+I[2, 1, %s]", pt)); + expected5.add(String.format("+I[3, 0, %s]", pt)); + expected5.add(String.format("+I[3, 1, %s]", pt)); + expected5.add(String.format("+I[2, 2, %s]", pt)); + expected5.add(String.format("+I[2, 3, %s]", pt)); + expected5.add(String.format("+I[3, 2, %s]", pt)); + expected5.add(String.format("+I[3, 3, %s]", pt)); + } + assertThat(splits.size()).isEqualTo(2); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + assertThat(result5).containsExactlyElementsOf(expected5); + } + protected FileStoreTable createTable(String partitionKeys) throws Exception { return createTable(partitionKeys, commonOptions()); } From a6d3ec5b1b52795b87f0856a6ac6890b2fd53dcb Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Tue, 23 Dec 2025 17:09:35 +0800 Subject: [PATCH 5/8] change for spark --- .../spark/procedure/CompactProcedure.java | 115 +++++++++++------- 1 file changed, 71 insertions(+), 44 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java index d3e10d68f9fd..4f780895789f 
100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java @@ -637,11 +637,15 @@ private void clusterIncrementalUnAwareBucketTable( DataSourceV2Relation relation) { IncrementalClusterManager incrementalClusterManager = new IncrementalClusterManager(table, partitionPredicate); - Map compactUnits = + checkArgument( + table.bucketMode() != BucketMode.HASH_FIXED, + "Clustering for bucketed table is not supported for spark currently."); + Map> compactUnits = incrementalClusterManager.createCompactUnits(fullCompaction); - Map, CommitMessage>> partitionSplits = - incrementalClusterManager.toSplitsAndRewriteDvFiles(compactUnits); + Map, CommitMessage>>> partitionSplits = + incrementalClusterManager.toSplitsAndRewriteDvFiles( + compactUnits, table.bucketMode()); // sort in partition TableSorter sorter = @@ -656,7 +660,13 @@ private void clusterIncrementalUnAwareBucketTable( Dataset datasetForWrite = partitionSplits.values().stream() - .map(Pair::getKey) + .map( + bucketEntry -> { + checkArgument( + bucketEntry.size() == 1, + "Unaware-bucket table should only have one bucket."); + return bucketEntry.values().iterator().next().getLeft(); + }) .map( splits -> { Dataset dataset = @@ -679,53 +689,70 @@ private void clusterIncrementalUnAwareBucketTable( } // re-organize the commit messages to generate the compact messages - Map> partitionClustered = new HashMap<>(); + Map>> partitionClustered = new HashMap<>(); for (CommitMessage commitMessage : JavaConverters.seqAsJavaList(commitMessages)) { - checkArgument(commitMessage.bucket() == 0); - partitionClustered - .computeIfAbsent(commitMessage.partition(), k -> new ArrayList<>()) + BinaryRow partition = commitMessage.partition(); + int bucket = commitMessage.bucket(); + checkArgument(bucket == 0); + + Map> bucketFiles = partitionClustered.get(partition); + if (bucketFiles == null) { + bucketFiles = new HashMap<>(); + partitionClustered.put(partition.copy(), bucketFiles); + } + bucketFiles + .computeIfAbsent(bucket, file -> new ArrayList<>()) .addAll(((CommitMessageImpl) commitMessage).newFilesIncrement().newFiles()); + partitionClustered.put(partition, bucketFiles); } List clusterMessages = new ArrayList<>(); - for (Map.Entry> entry : partitionClustered.entrySet()) { + for (Map.Entry>> entry : + partitionClustered.entrySet()) { BinaryRow partition = entry.getKey(); - CommitMessageImpl dvCommitMessage = - (CommitMessageImpl) partitionSplits.get(partition).getValue(); - List clusterBefore = compactUnits.get(partition).files(); - // upgrade the clustered file to outputLevel - List clusterAfter = - IncrementalClusterManager.upgrade( - entry.getValue(), compactUnits.get(partition).outputLevel()); - LOG.info( - "Partition {}: upgrade file level to {}", - partition, - compactUnits.get(partition).outputLevel()); - - List newIndexFiles = new ArrayList<>(); - List deletedIndexFiles = new ArrayList<>(); - if (dvCommitMessage != null) { - newIndexFiles = dvCommitMessage.compactIncrement().newIndexFiles(); - deletedIndexFiles = dvCommitMessage.compactIncrement().deletedIndexFiles(); - } + Map> bucketEntries = entry.getValue(); + Map, CommitMessage>> bucketSplits = + partitionSplits.get(partition); + Map bucketUnits = compactUnits.get(partition); + for (Map.Entry> bucketEntry : + bucketEntries.entrySet()) { + int bucket = bucketEntry.getKey(); + CommitMessageImpl dvCommitMessage = + (CommitMessageImpl) 
bucketSplits.get(bucket).getValue(); + List clusterBefore = bucketUnits.get(bucket).files(); + // upgrade the clustered file to outputLevel + List clusterAfter = + IncrementalClusterManager.upgrade( + bucketEntry.getValue(), bucketUnits.get(bucket).outputLevel()); + LOG.info( + "Partition {}, bucket {}: upgrade file level to {}", + partition, + bucket, + bucketUnits.get(bucket).outputLevel()); + + List newIndexFiles = new ArrayList<>(); + List deletedIndexFiles = new ArrayList<>(); + if (dvCommitMessage != null) { + newIndexFiles = dvCommitMessage.compactIncrement().newIndexFiles(); + deletedIndexFiles = dvCommitMessage.compactIncrement().deletedIndexFiles(); + } - // get the dv index messages - CompactIncrement compactIncrement = - new CompactIncrement( - clusterBefore, - clusterAfter, - Collections.emptyList(), - newIndexFiles, - deletedIndexFiles); - clusterMessages.add( - new CommitMessageImpl( - partition, - // bucket 0 is bucket for unaware-bucket table - // for compatibility with the old design - 0, - table.coreOptions().bucket(), - DataIncrement.emptyIncrement(), - compactIncrement)); + // get the dv index messages + CompactIncrement compactIncrement = + new CompactIncrement( + clusterBefore, + clusterAfter, + Collections.emptyList(), + newIndexFiles, + deletedIndexFiles); + clusterMessages.add( + new CommitMessageImpl( + partition, + bucket, + table.coreOptions().bucket(), + DataIncrement.emptyIncrement(), + compactIncrement)); + } } if (LOG.isDebugEnabled()) { LOG.debug("Commit messages after reorganizing:{}", clusterMessages); From 119ef8edd7164a97a8ef51d40ecefa6bd4efdb56 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Tue, 23 Dec 2025 17:19:21 +0800 Subject: [PATCH 6/8] fix --- .../paimon/append/cluster/IncrementalClusterManagerTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java index 635dbfe57c6c..0fd3e5256749 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java @@ -58,15 +58,16 @@ public class IncrementalClusterManagerTest { @TempDir java.nio.file.Path tempDir; @Test - public void testNonUnAwareBucketTable() { + public void testCreateClusterTable() { Map options = new HashMap<>(); options.put(CoreOptions.BUCKET.key(), "1"); options.put(CoreOptions.BUCKET_KEY.key(), "f0"); + options.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true"); assertThatThrownBy(() -> createTable(options, Collections.emptyList())) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining( - "Cannot define bucket for incremental clustering table, it only support bucket = -1"); + "Cannot enable deletion-vectors for incremental clustering table which bucket is not -1."); } @Test From 2e82e2a30fab590a620219c13792c50f1adbb416 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Tue, 23 Dec 2025 17:33:25 +0800 Subject: [PATCH 7/8] fix for bucket-append-ordered --- .../paimon/append/cluster/IncrementalClusterManager.java | 6 ++++++ .../java/org/apache/paimon/schema/SchemaValidation.java | 4 ++++ .../append/cluster/IncrementalClusterManagerTest.java | 7 ++++++- 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java 
b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java index e7e2f4b3e62b..3688788b1385 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java @@ -85,6 +85,12 @@ public IncrementalClusterManager( FileStoreTable table, @Nullable PartitionPredicate specifiedPartitions) { checkArgument( table.primaryKeys().isEmpty(), "only append table support incremental clustering."); + if (table.bucketMode() == BucketMode.HASH_FIXED) { + checkArgument( + !table.coreOptions().bucketAppendOrdered(), + "%s must be false for incremental clustering table.", + CoreOptions.BUCKET_APPEND_ORDERED.key()); + } this.table = table; CoreOptions options = table.coreOptions(); checkArgument( diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 37e9723008ea..602923f7227f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -674,6 +674,10 @@ private static void validateIncrementalClustering(TableSchema schema, CoreOption "Cannot define %s for incremental clustering table.", PRIMARY_KEY.key()); if (options.bucket() != -1) { + checkArgument( + !options.bucketAppendOrdered(), + "%s must be false for incremental clustering table.", + CoreOptions.BUCKET_APPEND_ORDERED.key()); checkArgument( !options.deletionVectorsEnabled(), "Cannot enable deletion-vectors for incremental clustering table which bucket is not -1."); diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java index 0fd3e5256749..36867a35bfa9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java @@ -62,8 +62,13 @@ public void testCreateClusterTable() { Map options = new HashMap<>(); options.put(CoreOptions.BUCKET.key(), "1"); options.put(CoreOptions.BUCKET_KEY.key(), "f0"); - options.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true"); + assertThatThrownBy(() -> createTable(options, Collections.emptyList())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "bucket-append-ordered must be false for incremental clustering table."); + options.put(CoreOptions.BUCKET_APPEND_ORDERED.key(), "false"); + options.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true"); assertThatThrownBy(() -> createTable(options, Collections.emptyList())) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining( From 692346a90c0c13c55010a1b079a5220c61d3da0c Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Tue, 23 Dec 2025 19:58:47 +0800 Subject: [PATCH 8/8] add docs --- .../append-table/incremental-clustering.md | 37 ++++++++++++++++--- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/docs/content/append-table/incremental-clustering.md b/docs/content/append-table/incremental-clustering.md index 0ca3462e1682..be247acb2ee2 100644 --- a/docs/content/append-table/incremental-clustering.md +++ b/docs/content/append-table/incremental-clustering.md @@ -39,12 +39,15 @@ gradually converges to an optimal state, significantly reducing the decision-mak 
Incremental Clustering supports:

-- Support incremental clustering; minimizing write amplification as possible.
-- Support small-file compaction; during rewrites, respect target-file-size.
-- Support changing clustering keys; newly ingested data is clustered according to the latest clustering keys.
-- Provide a full mode; when selected, the entire dataset will be reclustered.
+- Support incremental clustering: minimizing write amplification as much as possible.
+- Support small-file compaction: during rewrites, respect target-file-size.
+- Support changing clustering keys: newly ingested data is clustered according to the latest clustering keys.
+- Provide a full mode: in full mode, the entire dataset will be reclustered.

-**Only append unaware-bucket table supports Incremental Clustering.**
+
+**Note**: If data ordering within bucketed tables is not a concern, you can set `'bucket-append-ordered'` to `false`
+to disable this ordering requirement. This allows you to enable Incremental Clustering for bucketed tables,
+which can significantly improve query performance for specific query keys.

## Enable Incremental Clustering

@@ -85,6 +88,30 @@ To enable Incremental Clustering, the following configuration needs to be set fo
+For bucketed tables, you also need to set the following configuration:
+
+| Option                | Value | Required | Type    | Description                                                                                     |
+|-----------------------|-------|----------|---------|-------------------------------------------------------------------------------------------------|
+| bucket-append-ordered | false | Yes      | Boolean | Must be set to false to disable the ordering requirement for bucketed tables. Default is true.  |
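
As an illustration, a bucketed append table prepared for Incremental Clustering could look like the sketch below (Flink SQL). The `bucket`, `bucket-key` and `bucket-append-ordered` keys come from the tables above; the two clustering-related keys are assumptions and should be replaced with the keys listed in the configuration section earlier on this page. Deletion vectors must stay disabled for such a table, as enforced by the validation added in this patch series.

```sql
-- Minimal sketch of a bucketed append table with Incremental Clustering.
-- The two clustering-related keys below are assumptions; use the keys from
-- the configuration section above. Do not enable 'deletion-vectors.enabled'
-- on a bucketed incremental clustering table.
CREATE TABLE orders (
    f0 INT,
    f1 INT,
    dt STRING
) WITH (
    'bucket' = '2',
    'bucket-key' = 'f0',
    'bucket-append-ordered' = 'false',  -- required for bucketed tables
    'clustering.incremental' = 'true',  -- assumed key: enable incremental clustering
    'clustering.columns' = 'f0,f1'      -- assumed key: clustering columns
);
```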
Once Incremental Clustering for a table is enabled, you can run Incremental Clustering in batch mode periodically to continuously optimize the data layout of the table and deliver better query performance.
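
For reference, the sketch below shows how such a periodic run could be triggered from Flink SQL. The `sys.compact` procedure is the usual entry point for compaction; the `compact_strategy` argument mirrors the `--compact_strategy full` flag exercised by the tests in this patch series, and its exact name should be treated as an assumption.

```sql
-- Default run: clusters incrementally according to the table's clustering configuration.
CALL sys.compact(`table` => 'default.orders');

-- Full re-clustering of the entire dataset; the argument name mirrors the
-- action's --compact_strategy flag and is an assumption here.
CALL sys.compact(`table` => 'default.orders', compact_strategy => 'full');
```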