This documentation is for an unreleased version of Apache Paimon. We recommend you use the latest stable version.
Write Performance #
Paimon’s write performance is closely tied to checkpointing. If you need greater write throughput:
- Flink configuration ('flink-conf.yaml' or SET in SQL): increase the checkpoint interval ('execution.checkpointing.interval'), increase the maximum number of concurrent checkpoints to 3 ('execution.checkpointing.max-concurrent-checkpoints'), or just use batch mode.
- Increase write-buffer-size.
- Enable write-buffer-spillable.
- Rescale the bucket number if you are using Fixed-Bucket mode.
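As a rough sketch, the tuning steps in the list above could be applied from the Flink SQL client as follows. The table name my_table and all concrete values are illustrative assumptions, not recommendations; adjust them to your workload.

```sql
-- Flink job configuration (values are illustrative assumptions).
SET 'execution.checkpointing.interval' = '2 min';
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';

-- Alternatively, run the job in batch mode instead of streaming:
-- SET 'execution.runtime-mode' = 'batch';

-- Paimon table options; my_table is a hypothetical table name.
ALTER TABLE my_table SET (
    'write-buffer-size' = '512 mb',
    'write-buffer-spillable' = 'true'
);
```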
The options 'changelog-producer' = 'lookup' or 'full-compaction', and 'full-compaction.delta-commits', have a large impact on write performance. During a snapshot / full synchronization phase you can unset these options, and then enable them again in the incremental phase.
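One possible way to switch these options off and on again is with ALTER TABLE statements, sketched below. The table name my_table and the chosen values are assumptions for illustration; check that your Paimon version allows changing these options on an existing table.

```sql
-- Snapshot / full synchronization phase: remove the expensive options
-- so the table falls back to its defaults (my_table is hypothetical).
ALTER TABLE my_table RESET ('changelog-producer', 'full-compaction.delta-commits');

-- Incremental phase: enable changelog production again.
-- 'lookup' is one of the two values named in the text above.
ALTER TABLE my_table SET ('changelog-producer' = 'lookup');
```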
If the input rate of the job shows a jagged pattern under backpressure, the work nodes may be imbalanced. You can consider turning on Asynchronous Compaction and observing whether the throughput increases.
Parallelism #
It is recommended that the parallelism of the sink be less than or equal to the number of buckets, preferably equal. You can control the parallelism of the sink with the sink.parallelism table property.
Option | Required | Default | Type | Description |
---|---|---|---|---|
sink.parallelism | No | (none) | Integer | Defines the parallelism of the sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
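A minimal sketch of matching the sink parallelism to the bucket count, assuming a Paimon catalog is already the current catalog. The table name, columns, and the value 4 are hypothetical.

```sql
-- Fixed-bucket table with 4 buckets and a sink parallelism of 4,
-- so each sink subtask writes to exactly one bucket.
CREATE TABLE my_table (
    id BIGINT,
    payload STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'bucket' = '4',
    'sink.parallelism' = '4'
);
```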
Local Merging #
If your job suffers from primary key data skew (for example, you want to count the number of views for each page of a website, and some particular pages are very popular among users), you can set 'local-merge-buffer-size' so that input records are buffered and merged before they’re shuffled by bucket and written into the sink. This is particularly useful when the same primary key is updated frequently between snapshots.
The buffer is flushed when it is full. We recommend starting with 64 mb when you are faced with data skew but don’t know where to start adjusting the buffer size.
(Currently, local merging does not work for CDC ingestion.)
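For the page-view example above, enabling local merging might look like the following sketch. The table name page_views is an assumption; 64 mb is the starting value suggested in the text.

```sql
-- Buffer and merge records per primary key before the shuffle by bucket.
-- page_views is a hypothetical primary key table.
ALTER TABLE page_views SET ('local-merge-buffer-size' = '64 mb');
```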
File Format #
If you want to achieve the best possible compaction performance, you can consider using the row storage file format AVRO.
- The advantage is that you can achieve high write throughput and compaction performance.
- The disadvantage is that your analysis queries will be slow. The biggest problem with row storage is that it does not support query projection: for example, if the table has 100 columns but a query reads only a few of them, the IO cost of row storage cannot be ignored. Additionally, compression efficiency will decrease and storage costs will increase.
This is a tradeoff.
Enable row storage through the following options:
file.format = avro
metadata.stats-mode = none
Collecting statistics for row storage is somewhat expensive, so we also recommend turning off statistics collection.
If you don’t want to change all files to Avro format, you can at least consider changing the files in the first few layers to Avro format. You can use 'file.format.per.level' = '0:avro,1:avro' to specify that the files in the first two layers are in Avro format.
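The two variants described above could be declared at table creation time roughly as follows. The table names and columns are hypothetical, and a Paimon catalog is assumed to be the current catalog.

```sql
-- Variant 1: row storage for all files, with statistics collection disabled.
CREATE TABLE my_row_table (
    id BIGINT,
    payload STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'file.format' = 'avro',
    'metadata.stats-mode' = 'none'
);

-- Variant 2: Avro only for the first two layers (levels 0 and 1);
-- the remaining levels keep the table's default file format.
CREATE TABLE my_mixed_table (
    id BIGINT,
    payload STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'file.format.per.level' = '0:avro,1:avro'
);
```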
File Compression #
By default, Paimon uses zstd with level 1. You can modify the compression settings:
'file.compression.zstd-level': The default zstd level is 1. For a higher compression ratio it can be set to 9, but read and write speed will decrease significantly.
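As a small illustration, the compression level could be raised on an existing table like this (my_table is a hypothetical name; level 9 trades read and write speed for smaller files, as noted above):

```sql
-- Raise the zstd level from the default of 1 to 9.
ALTER TABLE my_table SET ('file.compression.zstd-level' = '9');
```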
Stability #
If there are too few buckets or too few resources, full compaction may cause checkpoint timeouts. Flink’s default checkpoint timeout is 10 minutes.
If you expect stability even in this case, you can increase the checkpoint timeout, for example:
execution.checkpointing.timeout = 60 min
Write Initialize #
When a write is initialized, the writer of each bucket needs to read all historical files. If this becomes a bottleneck (for example, when writing a large number of partitions simultaneously), you can use write-manifest-cache to cache the manifest data that has been read and accelerate initialization.
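A sketch of enabling the manifest cache is shown below. The table name and the cache size are assumptions chosen for illustration; size the cache according to how much manifest data your writers read.

```sql
-- Cache manifest data read during writer initialization.
-- my_table and '256 mb' are illustrative assumptions.
ALTER TABLE my_table SET ('write-manifest-cache' = '256 mb');
```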
Write Memory #
There are several main places in the Paimon writer that take up memory:
- The writer’s memory buffer, shared and preempted by all writers of a single task. This memory value can be adjusted with the write-buffer-size table property.
- Memory consumed when merging several sorted runs for compaction. It can be adjusted with the num-sorted-run.compaction-trigger option, which changes the number of sorted runs to be merged.
- If rows are very large, reading too many rows at once consumes a lot of memory during compaction. Reducing the read.batch-size option can alleviate the impact in this case.
- Memory consumed by writing columnar ORC files. Decreasing the orc.write.batch-size option can reduce memory consumption for the ORC format.
- If files are automatically compacted in the write task, dictionaries for certain large columns can consume a significant amount of memory during compaction.
  - To disable dictionary encoding for all fields in Parquet format, set 'parquet.enable.dictionary' = 'false'.
  - To disable dictionary encoding for all fields in ORC format, set orc.dictionary.key.threshold='0'. Additionally, set orc.column.encoding.direct='field1,field2' to disable dictionary encoding for specific columns.
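The memory-related options listed above could be combined in a single statement, as in the sketch below. The table name and every value are illustrative assumptions; in practice you would change only the options that match the memory pressure you observe.

```sql
ALTER TABLE my_table SET (
    -- Shared writer buffer of a single task.
    'write-buffer-size' = '256 mb',
    -- Fewer sorted runs merged per compaction.
    'num-sorted-run.compaction-trigger' = '4',
    -- Smaller read batches for tables with very large rows.
    'read.batch-size' = '512',
    -- Smaller ORC write batches (only relevant for the ORC format).
    'orc.write.batch-size' = '512',
    -- Disable dictionary encoding for all Parquet fields.
    'parquet.enable.dictionary' = 'false'
);
```

Note that merging fewer sorted runs at a time usually means compaction is triggered more often, so this setting trades memory for compaction frequency.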
If your Flink job does not rely on state, please avoid using managed memory, which you can control with the following Flink parameter:
taskmanager.memory.managed.size=1m
Alternatively, you can use Flink managed memory for the write buffer to avoid OOM by setting the following table property:
sink.use-managed-memory-allocator=true
Commit Memory #
The Committer node may use a lot of memory if the amount of data written to the table is particularly large, and OOM may occur if the memory is too small. In this case you need to increase the Committer heap memory, but you may not want to increase the memory of all Flink TaskManagers uniformly, since that can waste memory.
You can use Flink’s fine-grained resource management to increase only the Committer heap memory:
- Set the Flink configuration cluster.fine-grained-resource-management.enabled: true. (This is the default after Flink 1.18.)
- Set the Paimon table option sink.committer-memory, for example to 300 MB, depending on your TaskManager. (sink.committer-cpu is also supported.)
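The table-side half of this setup might look like the following sketch. The table name is hypothetical and 300 mb is the example value from the text; the Flink option cluster.fine-grained-resource-management.enabled still has to be set in the cluster configuration where it is not already the default.

```sql
-- Give only the Committer operator extra heap memory.
-- my_table is a hypothetical table name.
ALTER TABLE my_table SET ('sink.committer-memory' = '300 mb');
```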
Changelog Compaction #
If Flink’s checkpoint interval is short (for example, 30 seconds) and the number of buckets is large, each snapshot may produce many small changelog files. Too many files can put a burden on the distributed storage cluster.
To compact small changelog files into larger ones, you can set the table option changelog.precommit-compact = true.
The default value of this option is false. When it is set to true, a compact coordinator and a worker operator are added after the writer operator to copy small changelog files into larger ones.
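Enabling changelog compaction on an existing table could be sketched as follows (my_table is a hypothetical table name):

```sql
-- Compact small changelog files into larger ones before commit.
ALTER TABLE my_table SET ('changelog.precommit-compact' = 'true');
```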