Write Performance #

The performance of Paimon writers is related to the following factors.

Parallelism #

It is recommended that the parallelism of the sink should be less than or equal to the number of buckets, preferably equal. You can control the parallelism of the sink with the sink.parallelism table property.

Option: sink.parallelism
Required: No
Default: (none)
Type: Integer
Description: Defines the parallelism of the sink operator. By default, the parallelism is determined by the framework, using the same parallelism as the upstream chained operator.
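
As a hedged illustration, the sink parallelism can also be set for a single query with a Flink SQL dynamic table options hint; my_table and my_source are hypothetical names, and the value 4 assumes the table has at least 4 buckets:

-- hypothetical query: pin the sink parallelism for this INSERT only
INSERT INTO my_table /*+ OPTIONS('sink.parallelism' = '4') */
SELECT * FROM my_source;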

Write Initialization #

During write initialization, the writer of each bucket needs to read all historical files. If this becomes a bottleneck (for example, when writing to a large number of partitions simultaneously), you can use the write-manifest-cache option to cache the manifest data being read and accelerate initialization.
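
A minimal sketch of enabling this cache through Flink SQL; my_table is a hypothetical table, and the 256 mb cache size is an assumption to be tuned for your workload:

-- cache manifest data read during write initialization (illustrative size)
ALTER TABLE my_table SET ('write-manifest-cache' = '256 mb');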

Compaction #

Number of Sorted Runs to Trigger Compaction #

Paimon uses an LSM tree, which supports a large number of updates. An LSM tree organizes files into several sorted runs. When querying records from an LSM tree, all sorted runs must be combined to produce a complete view of all records.

One can easily see that too many sorted runs will result in poor query performance. To keep the number of sorted runs in a reasonable range, Paimon writers will automatically perform compactions. The following table property determines the minimum number of sorted runs to trigger a compaction.

Option: num-sorted-run.compaction-trigger
Required: No
Default: 5
Type: Integer
Description: The number of sorted runs that triggers compaction. Includes level 0 files (one file per sorted run) and high-level runs (one level per sorted run).

Compaction will become less frequent when num-sorted-run.compaction-trigger becomes larger, thus improving writing performance. However, if this value becomes too large, more memory and CPU time will be needed when querying the table. This is a trade-off between writing and query performance.
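
For example, to trade some query performance for higher write throughput, the trigger can be raised; my_table is hypothetical and the value 10 is an arbitrary illustration, not a recommendation:

-- compaction now starts only once 10 sorted runs have accumulated
ALTER TABLE my_table SET ('num-sorted-run.compaction-trigger' = '10');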

Number of Sorted Runs to Pause Writing #

When the number of sorted runs is small, Paimon writers will perform compaction asynchronously in separate threads, so records can be continuously written into the table. However, to avoid unbounded growth of sorted runs, writers have to pause writing when the number of sorted runs hits the threshold. The following table property determines the threshold.

Option: num-sorted-run.stop-trigger
Required: No
Default: (none)
Type: Integer
Description: The number of sorted runs that triggers the stopping of writes. The default value is 'num-sorted-run.compaction-trigger' + 1.

Write stalls will become less frequent when num-sorted-run.stop-trigger becomes larger, thus improving writing performance. However, if this value becomes too large, more memory and CPU time will be needed when querying the table. This is a trade-off between writing and query performance.
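
Continuing the hypothetical example above, both triggers can be raised together so that writes also stall later; the values are illustrative assumptions:

-- allow more sorted runs before compacting, and even more before pausing writes
ALTER TABLE my_table SET (
    'num-sorted-run.compaction-trigger' = '10',
    'num-sorted-run.stop-trigger' = '15'
);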

Dedicated Compaction Job #

By default, Paimon writers will perform compaction as needed when writing records. This is sufficient for most use cases, but there are two downsides:

  • This may result in unstable write throughput because throughput might temporarily drop when performing a compaction.
  • Compaction will mark some data files as “deleted” (not really deleted, see expiring snapshots for more info). If multiple writers mark the same file, a conflict will occur when committing the changes. Paimon will automatically resolve the conflict, but this may result in job restarts.

To avoid these downsides, users can also choose to skip compactions in writers, and run a dedicated job only for compaction. As compactions are performed only by the dedicated job, writers can continuously write records without pausing and no conflicts will ever occur.

To skip compactions in writers, set the following table property to true.

Option: write-only
Required: No
Default: false
Type: Boolean
Description: If set to true, compactions and snapshot expiration will be skipped. This option is used along with dedicated compact jobs.
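
A sketch of a write-only table created with Flink SQL; the table name and schema are hypothetical:

-- compactions and snapshot expiration are skipped for this table;
-- a dedicated compaction job is expected to run alongside the writers
CREATE TABLE my_table (
    id INT,
    v STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'bucket' = '4',
    'write-only' = 'true'
);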

To run a dedicated job for compaction, follow these instructions.

Flink SQL currently does not support statements related to compactions, so we have to submit the compaction job through flink run.

Run the following command to submit a compaction job for the table.

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.4.0-incubating.jar \
    compact \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    [--partition <partition-name>] \
    [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]]

  • --catalog-conf is the configuration for the Paimon catalog. Each configuration should be specified in the format key=value. See here for a complete list of catalog configurations.

If you submit a batch job (set execution.runtime-mode: batch in Flink’s configuration), all current table files will be compacted. If you submit a streaming job (set execution.runtime-mode: streaming in Flink’s configuration), the job will continuously monitor new changes to the table and perform compactions as needed.
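
For instance, assuming a Flink version whose CLI accepts generic configuration through -D, the runtime mode can be passed on the command line instead of editing the configuration file:

<FLINK_HOME>/bin/flink run \
    -Dexecution.runtime-mode=batch \
    /path/to/paimon-flink-action-0.4.0-incubating.jar \
    compact \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name>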

If you only want to submit the compaction job and don’t want to wait until the job is done, you should submit in detached mode.
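
With the standard Flink CLI, detached mode means passing the -d (--detached) flag, for example:

<FLINK_HOME>/bin/flink run -d \
    /path/to/paimon-flink-action-0.4.0-incubating.jar \
    compact \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name>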

Example

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.4.0-incubating.jar \
    compact \
    --warehouse s3:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --partition dt=20221126,hh=08 \
    --partition dt=20221127,hh=09 \
    --catalog-conf s3.endpoint=https://****.com \
    --catalog-conf s3.access-key=***** \
    --catalog-conf s3.secret-key=*****

For more usage of the compact action, run:

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.4.0-incubating.jar \
    compact --help

Memory #

There are three main places where a Paimon writer consumes memory:

  • The writer’s memory buffer, shared and preempted by all writers of a single task. This memory value can be adjusted by the write-buffer-size table property (see the sketch after this list).
  • The memory consumed when merging several sorted runs for compaction. This can be adjusted with the num-sorted-run.compaction-trigger option, which changes the number of sorted runs to be merged. If the rows are very large, reading too many rows of data at once during compaction can consume a lot of memory; reducing the read.batch-size option can alleviate the impact of this case.
  • The memory consumed by writing columnar (ORC, Parquet, etc.) files, which is not adjustable.
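
A hedged sketch of tuning the two adjustable knobs with Flink SQL; my_table and the values are illustrative assumptions, not recommendations:

-- enlarge the shared write buffer and shrink the compaction read batch size
ALTER TABLE my_table SET (
    'write-buffer-size' = '512 mb',
    'read.batch-size' = '256'
);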