Incremental Clustering #
Paimon currently supports ordering append tables with an SFC (Space-Filling Curve); see sort compact for more info. The resulting data layout typically delivers better performance for queries that filter on the clustering keys. However, with the current SortCompaction, each run rewrites the entire dataset, even when neither the data nor the clustering keys have changed, which is extremely costly.
To address this, Paimon introduced a more flexible, incremental clustering mechanism: Incremental Clustering. On each run, it selects only a specific subset of files to cluster, avoiding a full rewrite. This enables low-cost, sort-based optimization of the data layout and improves query performance. In addition, with Incremental Clustering you can change the clustering keys without rewriting existing data: the layout evolves dynamically as clustering runs accumulate and gradually converges to an optimal state, significantly reducing the decision-making complexity around data layout.
Incremental Clustering supports:
- Incremental clustering: only a subset of files is rewritten on each run, minimizing write amplification as much as possible.
- Small-file compaction: rewrites respect target-file-size.
- Changing clustering keys: newly ingested data is clustered according to the latest clustering keys.
- A full mode: when selected, the entire dataset is reclustered.
Only append tables in unaware-bucket mode support Incremental Clustering.
Enable Incremental Clustering #
To enable Incremental Clustering, the following configuration needs to be set for the table:
| Option | Value | Required | Type | Description |
|---|---|---|---|---|
| clustering.incremental | true | Yes | Boolean | Must be set to true to enable incremental clustering. Default is false. |
| clustering.columns | 'clustering-columns' | Yes | String | The clustering columns, in the format 'columnName1,columnName2'. It is not recommended to use partition keys as clustering keys. |
| clustering.strategy | 'zorder', 'hilbert', or 'order' | No | String | The ordering algorithm used for clustering. If not set, it is decided by the number of clustering columns: 'order' is used for 1 column, 'zorder' for fewer than 5 columns, and 'hilbert' for 5 or more columns. |
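For example, these options can be set when creating the table. A minimal Spark SQL sketch, with hypothetical table and column names:
-- append table (no primary key) with incremental clustering enabled
CREATE TABLE T (
    order_id BIGINT,
    user_id BIGINT,
    dt STRING
) TBLPROPERTIES (
    'clustering.incremental' = 'true',
    'clustering.columns' = 'user_id,order_id'
);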
Once Incremental Clustering is enabled for a table, you can run it in batch mode periodically to continuously optimize the table's data layout and deliver better query performance.
Note: Since common compaction also rewrites files, it may disrupt the ordered data layout built by Incremental Clustering. Therefore, when Incremental Clustering is enabled, the table no longer supports write-time compaction or dedicated compaction; clustering and small-file merging must be performed exclusively via Incremental Clustering runs.
Run Incremental Clustering #
Running Incremental Clustering is only supported in batch mode.
To run an Incremental Clustering job, follow these instructions.
You don't need to specify any clustering-related parameters when running Incremental Clustering; these options are already defined as table options. If you need to change clustering settings, update the corresponding table options.
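For example, a Spark SQL sketch of changing the clustering keys via table options (column names are hypothetical); existing files stay in place, and newly ingested data is clustered by the new keys:
-- change clustering keys without rewriting existing data
ALTER TABLE T SET TBLPROPERTIES (
    'clustering.columns' = 'user_id,dt'
);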
In Spark, run the following SQL:
-- Set the write parallelism; if it is too large, a large number of small files may be generated.
SET spark.sql.shuffle.partitions=10;
-- run incremental clustering
CALL sys.compact(table => 'T');
-- run incremental clustering in full mode, which reclusters all data
CALL sys.compact(table => 'T', compact_strategy => 'full');
With the Flink action jar, run the following command to submit an incremental clustering job for the table.
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-1.3.0.jar \
compact \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
[--compact_strategy <minor / full>] \
[--table_conf <table_conf>] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
Example: run incremental clustering
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-1.3.0.jar \
compact \
--warehouse s3:///path/to/warehouse \
--database test_db \
--table test_table \
--table_conf sink.parallelism=2 \
--compact_strategy minor \
--catalog_conf s3.endpoint=https://****.com \
--catalog_conf s3.access-key=***** \
--catalog_conf s3.secret-key=*****
--compact_strategy determines how to pick the files to be clustered; the default is minor.
- full: all files are selected for clustering.
- minor: picks the set of files that need to be clustered based on specified conditions.
Note: the write parallelism is set by sink.parallelism; if it is too large, a large number of small files may be generated.
You can use -D execution.runtime-mode=batch or -yD execution.runtime-mode=batch (for the ON-YARN scenario) to use batch mode.
Implementation #
To balance write amplification and sorting effectiveness, Paimon leverages the LSM Tree notion of levels to stratify data files and uses the Universal Compaction strategy to select files for clustering.
- Newly written data lands in level-0; files in level-0 are unclustered.
- All files in level-i (i ≥ 1) are produced by sorting within the same sorting set.
- By analogy with Universal Compaction: in level-0, each file is a sorted run; in level-i, all files together constitute a single sorted run. During clustering, the sorted run is the basic unit of work.
By introducing more levels, we can control the amount of data processed in each clustering run. Data at higher levels is more stably clustered and less likely to be rewritten, thereby mitigating write amplification while maintaining good sorting effectiveness.
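Operationally, the two strategies map onto this level structure. A hedged Spark SQL sketch (table name T is hypothetical): periodic minor runs cluster only the sorted runs selected by the Universal-Compaction-style conditions, mostly newly written level-0 files, while an occasional full run reclusters all levels:
-- periodic low-cost run: clusters only the selected sorted runs
CALL sys.compact(table => 'T', compact_strategy => 'minor');
-- occasional full run: reclusters the entire dataset
CALL sys.compact(table => 'T', compact_strategy => 'full');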