Dedicated Compaction

Paimon's snapshot management supports writing with multiple writers.

info

For S3-like object stores, 'RENAME' does not have atomic semantics. You need to configure a Hive metastore and enable the 'lock.enabled' option for the catalog.

By default, Paimon supports concurrent writes to different partitions. A recommended pattern is for a streaming job to write records to Paimon's latest partition while a batch job (overwrite) simultaneously writes records to a historical partition.
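As a rough sketch of this pattern, assuming Flink SQL, the two jobs might look like the following. The table and source names (orders, orders_stream, orders_corrected) and the partition column dt are hypothetical and chosen only for illustration; each job would be submitted separately.

```sql
-- Job 1 (streaming): continuously write to the latest partition.
-- Assumes the hypothetical source rows carry the current date in dt.
SET 'execution.runtime-mode' = 'streaming';
INSERT INTO orders
SELECT order_id, amount, dt FROM orders_stream;

-- Job 2 (batch): overwrite a single historical partition.
-- It touches a different partition than Job 1, so the two writers do not conflict.
SET 'execution.runtime-mode' = 'batch';
INSERT OVERWRITE orders PARTITION (dt = '2024-01-01')
SELECT order_id, amount FROM orders_corrected WHERE dt = '2024-01-01';
```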

This works well so far, but it becomes more complicated if you need multiple writers to write records to the same partition. For example, instead of using UNION ALL, you may have multiple streaming jobs writing records to a 'partial-update' table. In that case, refer to the 'Dedicated Compaction Job' section below.
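To make the multi-writer case concrete, here is a minimal sketch, assuming Flink SQL, of two streaming jobs that each fill in part of a row in the same 'partial-update' table. The table user_profile and the sources login_events and order_events are hypothetical.

```sql
-- Hypothetical table: two jobs each contribute some columns for the same key.
CREATE TABLE user_profile (
    user_id BIGINT,
    last_login TIMESTAMP(3),
    last_order_id BIGINT,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'merge-engine' = 'partial-update'
);

-- Job A writes only last_login; the NULL does not overwrite last_order_id.
INSERT INTO user_profile
SELECT user_id, login_time, CAST(NULL AS BIGINT) FROM login_events;

-- Job B writes only last_order_id; the NULL does not overwrite last_login.
INSERT INTO user_profile
SELECT user_id, CAST(NULL AS TIMESTAMP(3)), order_id FROM order_events;
```

Both jobs write to the same partition and bucket space, which is the situation where writer-side compaction can produce commit conflicts.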

Dedicated Compaction Job

By default, Paimon writers perform compaction as needed while writing records. This is sufficient for most use cases.

Compaction will mark some data files as "deleted" (not really deleted, see expiring snapshots for more info). If multiple writers mark the same file, a conflict will occur when committing the changes. Paimon will automatically resolve the conflict, but this may result in job restarts.

To avoid these downsides, users can also choose to skip compactions in writers, and run a dedicated job only for compaction. As compactions are performed only by the dedicated job, writers can continuously write records without pausing and no conflicts will ever occur.

To skip compactions in writers, set the following table property to true.

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| write-only | No | false | Boolean | If set to true, compactions and snapshot expiration will be skipped. This option is used along with dedicated compact jobs. |
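One way to set this property on an existing table, assuming Flink SQL and a table named T in the current database, is sketched below. The same key can also be given in the WITH clause when the table is created.

```sql
-- Assumes Flink SQL; T is an existing Paimon table in the current database.
-- After this, writers to T skip compaction and snapshot expiration.
ALTER TABLE T SET ('write-only' = 'true');
```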

To run a dedicated job for compaction, follow these instructions.

Run the following SQL:

CALL sys.compact(
`table` => 'default.T',
partitions => 'p=0',
options => 'sink.parallelism=4',
`where` => 'dt>10 and h<20'
);

info

By default, compaction is synchronous, which may cause checkpoint timeouts. You can configure table_conf to use Asynchronous Compaction.
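As a hedged illustration, the call below passes asynchronous-compaction settings through the options argument of the procedure. It assumes that options accepts comma-separated key=value pairs and that the three keys shown are the ones Paimon's documentation lists for asynchronous compaction; verify both against the version you run.

```sql
-- Assumption: options is a comma-separated list of key=value table options.
-- Assumption: these three keys are the asynchronous-compaction settings
-- for your Paimon version.
CALL sys.compact(
  `table` => 'default.T',
  options => 'num-sorted-run.stop-trigger=2147483647,sort-spill-threshold=10,lookup-wait=false'
);
```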

Database Compaction Job

You can run the following command to submit a compaction job for multiple databases.

Run the following SQL:

CALL sys.compact_database(
including_databases => 'includingDatabases',
mode => 'mode',
including_tables => 'includingTables',
excluding_tables => 'excludingTables',
table_options => 'tableOptions'
);

-- example
CALL sys.compact_database(
including_databases => 'db1|db2',
mode => 'combined',
including_tables => 'table_.*',
excluding_tables => 'ignore',
table_options => 'sink.parallelism=4'
);

Sort Compact

If your table is a dynamic bucket primary key table or an append table, you can trigger a compaction that sorts by the specified columns to speed up queries.

Run the following SQL:

-- sort compact table
CALL sys.compact(`table` => 'default.T', order_strategy => 'zorder', order_by => 'a,b');

info

Sort Compact currently supports only bucket=-1 and batch mode.
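Because of the batch-mode restriction, a session would typically switch the runtime mode before calling the procedure. The sketch below assumes Flink SQL and reuses the table and columns from the example above.

```sql
-- Assumes Flink SQL: switch the session to batch execution first.
SET 'execution.runtime-mode' = 'batch';

-- Then trigger the sort compaction on a bucket=-1 table.
CALL sys.compact(`table` => 'default.T', order_strategy => 'zorder', order_by => 'a,b');
```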

Historical Partition Compact

You can run the following command to submit a compaction job for partitions that have not received any new data for a period of time. Small files in those partitions will be fully compacted.

info

This feature currently works only in batch mode.

For Table

This is for one table.

Run the following SQL:

-- history partition compact table
CALL sys.compact(`table` => 'default.T', partition_idle_time => '1 d');

For Databases

This is for multiple tables in different databases.

Run the following SQL:

-- history partition compact for databases
CALL sys.compact_database(
including_databases => 'includingDatabases',
mode => 'mode',
including_tables => 'includingTables',
excluding_tables => 'excludingTables',
table_options => 'tableOptions',
partition_idle_time => 'partition_idle_time'
);

Example: compact historical partitions for tables in a database

-- history partition compact for databases
CALL sys.compact_database(
including_databases => 'test_db',
mode => 'combined',
partition_idle_time => '1 d'
);