37 changes: 32 additions & 5 deletions docs/content/append-table/incremental-clustering.md
@@ -39,12 +39,15 @@ gradually converges to an optimal state, significantly reducing the decision-mak


Incremental Clustering supports:
- Support incremental clustering; minimizing write amplification as possible.
- Support small-file compaction; during rewrites, respect target-file-size.
- Support changing clustering keys; newly ingested data is clustered according to the latest clustering keys.
- Provide a full mode; when selected, the entire dataset will be reclustered.
- Support incremental clustering: minimize write amplification as much as possible.
- Support small-file compaction: during rewrites, respect the target-file-size.
- Support changing clustering keys: newly ingested data is clustered according to the latest clustering keys.
- Provide a full mode: when full mode is selected, the entire dataset is reclustered.

**Only append unaware-bucket tables support Incremental Clustering.**

**Note**: If data ordering within bucketed tables is not a concern, you can set `'bucket-append-ordered'` to `false`
to disable this ordering requirement. This allows you to enable Incremental Clustering for bucketed tables,
which can significantly improve query performance for specific query keys.

## Enable Incremental Clustering

@@ -85,6 +88,30 @@ To enable Incremental Clustering, the following configuration needs to be set fo

</table>

For bucketed tables, you also need to set the following configuration:
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Option</th>
<th class="text-left" style="width: 10%">Value</th>
<th class="text-left" style="width: 5%">Required</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>bucket-append-ordered</h5></td>
<td>false</td>
<td style="word-wrap: break-word;">Yes</td>
<td>Boolean</td>
<td>Must be set to false to disable the ordering requirement for bucketed tables. Default is true.</td>
</tr>
</tbody>

</table>
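
For illustration, a minimal Flink SQL sketch of applying this option to an existing bucketed append table (the table name `my_bucketed_table` is a placeholder; only `bucket-append-ordered` is taken from the table above):

```sql
-- Disable the ordering requirement so Incremental Clustering can be enabled
-- on this bucketed append table ('bucket-append-ordered' defaults to true).
ALTER TABLE my_bucketed_table SET ('bucket-append-ordered' = 'false');
```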


Once Incremental Clustering is enabled for a table, you can run it in batch mode periodically
to continuously optimize the table's data layout and deliver better query performance.
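
As a hedged sketch of such a periodic batch run, assuming the clustering rewrite is triggered through Paimon's Flink `sys.compact` procedure (the database and table names are placeholders, and the exact procedure and arguments used for Incremental Clustering may differ in your Paimon version):

```sql
-- Run from a scheduled Flink batch job, e.g. once per hour or per day.
SET 'execution.runtime-mode' = 'batch';

-- Trigger a compaction pass on the table; with Incremental Clustering enabled,
-- the rewrite is expected to follow the configured clustering keys.
CALL sys.compact(`table` => 'my_db.my_bucketed_table');
```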

@@ -38,16 +38,17 @@
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.paimon.append.cluster.IncrementalClusterManager.constructPartitionLevels;
import static org.apache.paimon.append.cluster.IncrementalClusterManager.logForPartitionLevel;
import static org.apache.paimon.append.cluster.IncrementalClusterManager.constructBucketLevels;
import static org.apache.paimon.append.cluster.IncrementalClusterManager.groupByPtAndBucket;
import static org.apache.paimon.append.cluster.IncrementalClusterManager.logForCompactUnits;
import static org.apache.paimon.append.cluster.IncrementalClusterManager.logForLevels;

/** Handles historical partitions for full clustering. */
public class HistoryPartitionCluster {
@@ -111,23 +112,39 @@ public static HistoryPartitionCluster create(
limit);
}

public Map<BinaryRow, CompactUnit> pickForHistoryPartitions() {
Map<BinaryRow, List<LevelSortedRun>> partitionLevels =
public Map<BinaryRow, Map<Integer, CompactUnit>> createHistoryCompactUnits() {
Map<BinaryRow, Map<Integer, List<LevelSortedRun>>> partitionLevels =
constructLevelsForHistoryPartitions();
logForPartitionLevel(partitionLevels, partitionComputer);
logForLevels(partitionLevels, partitionComputer);

Map<BinaryRow, CompactUnit> units = new HashMap<>();
Map<BinaryRow, Map<Integer, CompactUnit>> units = new HashMap<>();
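// For each history partition, let the strategy pick at most one compact unit per bucket.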
partitionLevels.forEach(
(k, v) -> {
Optional<CompactUnit> pick =
incrementalClusterStrategy.pick(maxLevel + 1, v, true);
pick.ifPresent(compactUnit -> units.put(k, compactUnit));
(partition, bucketLevels) -> {
Map<Integer, CompactUnit> bucketUnits = new HashMap<>();
bucketLevels.forEach(
(bucket, levels) -> {
Optional<CompactUnit> pick =
incrementalClusterStrategy.pick(maxLevel + 1, levels, true);
pick.ifPresent(
compactUnit -> {
bucketUnits.put(bucket, compactUnit);
if (LOG.isDebugEnabled()) {
logForCompactUnits(
partitionComputer.generatePartValues(
partition),
bucket,
compactUnit);
}
});
});
units.put(partition, bucketUnits);
});
return units;
}

@VisibleForTesting
public Map<BinaryRow, List<LevelSortedRun>> constructLevelsForHistoryPartitions() {
public Map<BinaryRow, Map<Integer, List<LevelSortedRun>>>
constructLevelsForHistoryPartitions() {
long historyMilli =
LocalDateTime.now()
.minus(historyPartitionIdleTime)
@@ -152,23 +169,19 @@ public Map<BinaryRow, List<LevelSortedRun>> constructLevelsForHistoryPartitions(
.read()
.dataSplits();

Map<BinaryRow, List<DataFileMeta>> historyPartitionFiles = new HashMap<>();
for (DataSplit dataSplit : historyDataSplits) {
historyPartitionFiles
.computeIfAbsent(dataSplit.partition(), k -> new ArrayList<>())
.addAll(dataSplit.dataFiles());
}
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> historyPartitionFiles =
groupByPtAndBucket(historyDataSplits);

return filterPartitions(historyPartitionFiles).entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> constructPartitionLevels(entry.getValue())));
entry -> constructBucketLevels(entry.getValue())));
}

private Map<BinaryRow, List<DataFileMeta>> filterPartitions(
Map<BinaryRow, List<DataFileMeta>> partitionFiles) {
Map<BinaryRow, List<DataFileMeta>> result = new HashMap<>();
private Map<BinaryRow, Map<Integer, List<DataFileMeta>>> filterPartitions(
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> partitionFiles) {
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> result = new HashMap<>();
partitionFiles.forEach(
(part, files) -> {
if (specifiedPartitions.test(part)) {