Dedicated Compaction

Paimon’s snapshot management supports writing with multiple writers.

For S3-like object stores, 'RENAME' is not atomic. In that case, you need to configure a Hive metastore and enable the 'lock.enabled' option for the catalog.

By default, Paimon supports concurrent writing to different partitions. A recommended pattern is for a streaming job to write records to Paimon’s latest partition while a batch job (overwrite) simultaneously writes records to historical partitions.

This works well as long as the writers target different partitions. Having multiple writers write records to the same partition is more complicated, for example when you do not want to use UNION ALL and instead have multiple streaming jobs writing records to a 'partial-update' table. In that case, refer to the 'Dedicated Compaction Job' section below.

Dedicated Compaction Job #

By default, Paimon writers perform compaction as needed while writing records. This is sufficient for most use cases.

Compaction will mark some data files as “deleted” (not really deleted, see expiring snapshots for more info). If multiple writers mark the same file, a conflict will occur when committing the changes. Paimon will automatically resolve the conflict, but this may result in job restarts.

To avoid these downsides, users can also choose to skip compactions in writers, and run a dedicated job only for compaction. As compactions are performed only by the dedicated job, writers can continuously write records without pausing and no conflicts will ever occur.

To skip compactions in writers, set the following table property to true.

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| write-only | No | false | Boolean | If set to true, compactions and snapshot expiration will be skipped. This option is used along with dedicated compact jobs. |
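
As a rough illustration, the property could be set with a Flink SQL ALTER TABLE statement. The sketch below only writes the statements to a file and does not contact any cluster. The catalog name my_catalog, the table test_db.test_table, the warehouse path, and the file location are placeholders, not values from this page.

```shell
#!/usr/bin/env bash
# Sketch: generate a Flink SQL script that turns on 'write-only' for one table.
# Placeholders (assumptions): my_catalog, file:/tmp/paimon, test_db.test_table.
SQL_FILE="${SQL_FILE:-/tmp/paimon-write-only.sql}"

cat > "$SQL_FILE" <<'EOF'
CREATE CATALOG my_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'file:/tmp/paimon'
);
USE CATALOG my_catalog;
ALTER TABLE test_db.test_table SET ('write-only' = 'true');
EOF

echo "Wrote $SQL_FILE"
```

The generated script could then be run with Flink's SQL client, for example <FLINK_HOME>/bin/sql-client.sh -f /tmp/paimon-write-only.sql, assuming the Paimon jar is available to Flink.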

To run a dedicated job for compaction, follow these instructions.

Flink SQL currently does not support statements related to compactions, so we have to submit the compaction job through flink run.

Run the following command to submit a compaction job for the table.

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.8.2.jar \
    compact \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    [--partition <partition-name>] \
    [--table_conf <table_conf>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]

Example: compact table

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.8.2.jar \
    compact \
    --warehouse s3:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --partition dt=20221126,hh=08 \
    --partition dt=20221127,hh=09 \
    --table_conf sink.parallelism=10 \
    --catalog_conf s3.endpoint=https://****.com \
    --catalog_conf s3.access-key=***** \
    --catalog_conf s3.secret-key=*****

You can use -D execution.runtime-mode=batch or -yD execution.runtime-mode=batch (for the ON-YARN scenario) to control batch or streaming mode. If you submit a batch job, all current table files will be compacted. If you submit a streaming job, the job will continuously monitor new changes to the table and perform compactions as needed.
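
The choice between the two modes can be wrapped in a small helper. This is only a sketch: the function name run_compaction, the DRY_RUN switch, and the /opt/flink default for FLINK_HOME are assumptions made for illustration, and the warehouse, database, and table are the placeholder values from the example above. By default it prints the command instead of submitting it.

```shell
#!/usr/bin/env bash
# Sketch: submit a table compaction job in batch or streaming mode.
# Usage: run_compaction <batch|streaming>
run_compaction() {
    compaction_mode="$1"
    case "$compaction_mode" in
        batch|streaming) ;;
        *) echo "mode must be 'batch' or 'streaming'" >&2; return 1 ;;
    esac
    # Rebuild the positional parameters as the full command line,
    # so every argument stays a separate word.
    set -- "${FLINK_HOME:-/opt/flink}/bin/flink" run \
        -D "execution.runtime-mode=${compaction_mode}" \
        /path/to/paimon-flink-action-0.8.2.jar \
        compact \
        --warehouse s3:///path/to/warehouse \
        --database test_db \
        --table test_table
    if [ "${DRY_RUN:-1}" = "1" ]; then
        echo "$*"      # dry run (default): only show the command
    else
        "$@"           # DRY_RUN=0: actually submit the job
    fi
}

run_compaction batch
```

With DRY_RUN=0, run_compaction batch would compact all current files once, while run_compaction streaming would start a long-running job.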

For more usage of the compact action, see

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.8.2.jar \
    compact --help

Compaction is synchronous by default, which may cause checkpoint timeouts. You can use --table_conf to enable Asynchronous Compaction.

Database Compaction Job #

You can run the following command to submit a compaction job for multiple databases.

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.8.2.jar \
    compact_database \
    --warehouse <warehouse-path> \
    --including_databases <database-name|name-regular-expr> \
    [--including_tables <paimon-table-name|name-regular-expr>] \
    [--excluding_tables <paimon-table-name|name-regular-expr>] \
    [--mode <compact-mode>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
    [--table_conf <paimon-table_conf> [--table_conf <paimon-table_conf> ...]]
  • --including_databases specifies which databases to compact. The compact action takes a single database name, whereas compact_database accepts multiple databases, and regular expressions are supported.
  • --including_tables specifies which source tables to compact. Use '|' to separate multiple tables, each in the format databaseName.tableName; regular expressions are supported. For example, specifying --including_tables 'db1.t1|db2.+' (quoted so that the shell does not treat '|' as a pipe) compacts table 'db1.t1' and all tables in the db2 database.
  • --excluding_tables specifies which source tables not to compact. Its usage is the same as --including_tables. If both are specified, --excluding_tables takes priority over --including_tables.
  • --mode specifies the compaction mode. Possible values:
    • "divided" (the default): starts a sink for each table; compacting a newly added table requires restarting the job.
    • "combined": starts a single combined sink for all tables; newly added tables are compacted automatically.
  • --catalog_conf is the configuration for the Paimon catalog. Each configuration should be specified in the format key=value. See the catalog configuration reference for a complete list of catalog configurations.
  • --table_conf is the configuration for compaction. Each configuration should be specified in the format key=value. The key configurations are listed below:
| Key | Default | Type | Description |
|---|---|---|---|
| continuous.discovery-interval | 10 s | Duration | The discovery interval of continuous reading. |
| sink.parallelism | (none) | Integer | Defines a custom parallelism for the sink. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. |
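
The precedence between --including_tables and --excluding_tables described above can be modelled roughly as follows. This is not Paimon's implementation: the helper should_compact is hypothetical, it assumes a pattern must match the whole databaseName.tableName string, and it uses POSIX extended regular expressions via grep, which may differ in detail from the regular expression dialect Paimon uses.

```shell
#!/usr/bin/env bash
# Sketch of the precedence between --including_tables and --excluding_tables.
# Usage: should_compact <databaseName.tableName> <including-pattern> [<excluding-pattern>]
# Returns 0 if the table would be compacted, 1 otherwise.
should_compact() {
    full_name="$1"
    including="$2"
    excluding="${3:-}"
    # An excluding match always wins, even if the table is also included.
    if [ -n "$excluding" ] && printf '%s\n' "$full_name" | grep -Eqx "($excluding)"; then
        return 1
    fi
    # Otherwise the table is compacted only if the including pattern matches.
    printf '%s\n' "$full_name" | grep -Eqx "($including)"
}

for t in db1.t1 db1.t2 db2.orders db2.tmp; do
    if should_compact "$t" 'db1.t1|db2.+' 'db2.tmp'; then
        echo "$t: compact"
    else
        echo "$t: skip"
    fi
done
```

In this model, db1.t1 and db2.orders are compacted, db1.t2 matches neither including pattern, and db2.tmp is skipped because the excluding pattern takes priority.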

You can use -D execution.runtime-mode=batch to control batch or streaming mode. If you submit a batch job, all current table files will be compacted. If you submit a streaming job, the job will continuously monitor new changes to the table and perform compactions as needed.

If you only want to submit the compaction job without waiting for it to finish, submit it in detached mode.

You can set --mode combined to compact newly added tables without restarting the job.
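
Detached submission uses the Flink CLI's -d (--detached) option of flink run. A minimal sketch of a wrapper follows; the name submit_detached and the /opt/flink default for FLINK_HOME are assumptions, and the example call is commented out so that the snippet has no side effects.

```shell
#!/usr/bin/env bash
# Sketch: submit any Paimon action and return as soon as the job is accepted.
# All arguments are passed through unchanged after "flink run -d".
submit_detached() {
    "${FLINK_HOME:-/opt/flink}/bin/flink" run -d "$@"
}

# Example call (placeholder paths and names):
# submit_detached /path/to/paimon-flink-action-0.8.2.jar \
#     compact_database \
#     --warehouse s3:///path/to/warehouse \
#     --including_databases test_db \
#     --mode combined
```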

Example 1: compact database

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.8.2.jar \
    compact_database \
    --warehouse s3:///path/to/warehouse \
    --including_databases test_db \
    --catalog_conf s3.endpoint=https://****.com \
    --catalog_conf s3.access-key=***** \
    --catalog_conf s3.secret-key=*****

Example 2: compact database in combined mode

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.8.2.jar \
    compact_database \
    --warehouse s3:///path/to/warehouse \
    --including_databases test_db \
    --mode combined \
    --catalog_conf s3.endpoint=https://****.com \
    --catalog_conf s3.access-key=***** \
    --catalog_conf s3.secret-key=***** \
    --table_conf continuous.discovery-interval=*****

For more usage of the compact_database action, see

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.8.2.jar \
    compact_database --help

Sort Compact #

If your table is a dynamic bucket primary key table or an append table, you can trigger a compaction that sorts data by the specified columns to speed up queries.

<FLINK_HOME>/bin/flink run \
    -D execution.runtime-mode=batch \
    /path/to/paimon-flink-action-0.8.2.jar \
    compact \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    --order_strategy <orderType> \
    --order_by <col1,col2,...> \
    [--partition <partition-name>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
    [--table_conf <paimon-table-dynamic-conf> [--table_conf <paimon-table-dynamic-conf> ...]]

Sort Compact introduces two new options:

| Configuration | Description |
|---|---|
| --order_strategy | The order strategy. Supported values are "zorder", "hilbert", and "order". For example: --order_strategy zorder |
| --order_by | The columns to order by, separated by commas without spaces. For example: --order_by col0,col1 |

The sort parallelism is the same as the sink parallelism. You can specify it dynamically by adding --table_conf sink.parallelism=<value>.
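
To make the sort options concrete, here is a small sketch that assembles them into one argument string. The helper sort_compact_args is hypothetical; it only checks the strategy against the three values named above and joins the columns with commas so that they form a single argument.

```shell
#!/usr/bin/env bash
# Sketch: build the sort-compaction arguments for a "flink run ... compact" call.
# Usage: sort_compact_args <strategy> <parallelism> <column>...
sort_compact_args() {
    if [ "$#" -lt 3 ]; then
        echo "usage: sort_compact_args <strategy> <parallelism> <column>..." >&2
        return 1
    fi
    strategy="$1"
    parallelism="$2"
    shift 2
    case "$strategy" in
        zorder|hilbert|order) ;;
        *) echo "unsupported order strategy: $strategy" >&2; return 1 ;;
    esac
    # Join the remaining arguments with commas and no spaces.
    columns=$(IFS=,; echo "$*")
    echo "--order_strategy $strategy --order_by $columns --table_conf sink.parallelism=$parallelism"
}

sort_compact_args zorder 8 col0 col1
# prints: --order_strategy zorder --order_by col0,col1 --table_conf sink.parallelism=8
```

Because none of the generated values contain spaces, the output can be appended unquoted to the compact command, for example ... --table test_table $(sort_compact_args zorder 8 col0 col1).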
