Changelog Producer

A streaming write can continuously produce the latest changes for streaming reads.

By setting the changelog-producer table property when creating a table, users can choose how changes are produced from the table files.

Enabling a changelog producer may significantly reduce compaction performance, so do not enable it unless necessary.

None

By default, no extra changelog producer is applied to the table writer. A Paimon source can only see the merged changes across snapshots, such as which keys were removed and what the new values of some keys are.

However, these merged changes cannot form a complete changelog, because the old values of the keys cannot be read directly from them. Merged changes require consumers to “remember” the value of each key and to rewrite values without seeing the old ones. Some consumers, however, need the old values to ensure correctness or efficiency.

Consider a consumer that calculates a sum over some grouping keys (which are not necessarily the primary keys). If the consumer only sees a new value 5, it cannot determine what should be added to the sum. For example, if the old value is 4, it should add 1 to the result, but if the old value is 6, it should instead subtract 1. Old values are important for these types of consumers.
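To make the summing example concrete, here is a minimal Python sketch of such a consumer. It is not Paimon or Flink code: the function name and the (kind, group, value) layout are hypothetical, and the row kinds only borrow Flink's +I / -U / +U / -D notation. It shows that a complete changelog lets the consumer adjust the sum correctly, while seeing only new values leads to double counting.

```python
from collections import defaultdict

# Row kinds in Flink's +I / -U / +U / -D notation.
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = "+I", "-U", "+U", "-D"


def group_sums(changelog):
    """Maintain SUM(value) per group from (kind, group, value) entries.

    Retractions (-U, -D) carry the old value, so the consumer can subtract
    it without remembering the previous value of every key itself.
    """
    sums = defaultdict(int)
    for kind, group, value in changelog:
        if kind in (INSERT, UPDATE_AFTER):
            sums[group] += value
        else:  # UPDATE_BEFORE or DELETE: retract the old value
            sums[group] -= value
    return dict(sums)


# A key in group "a" changes from 4 to 5. With the old value available,
# the sum is adjusted by exactly +1.
complete = [(INSERT, "a", 4), (UPDATE_BEFORE, "a", 4), (UPDATE_AFTER, "a", 5)]

# Seeing only the new value, a naive consumer double counts the key.
merged_only = [(INSERT, "a", 4), (UPDATE_AFTER, "a", 5)]

print(group_sums(complete))     # {'a': 5}
print(group_sums(merged_only))  # {'a': 9}
```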

In short, the none changelog producer is best suited for consumers such as database systems. Flink also has a built-in “normalize” operator that persists the value of each key in state in order to recover the old values. This operator is very costly and should be avoided. (You can force removal of the “normalize” operator via 'scan.remove-normalize'.)
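The cost of the “normalize” operator comes from keeping the latest value of every key. The following is a simplified Python sketch of that idea, not Flink's actual operator: the function name is hypothetical, a plain dict stands in for keyed state, and a value of None is assumed to mean a delete.

```python
def normalize(upserts):
    """Turn a stream of (key, value) upserts into a complete changelog.

    `state` plays the role of keyed state: it must hold the latest value of
    every key ever seen, which is what makes this approach expensive.
    A value of None stands for a delete (an assumption of this sketch).
    """
    state = {}
    changelog = []
    for key, value in upserts:
        old = state.get(key)
        if value is None:
            if old is not None:
                changelog.append(("-D", key, old))
                del state[key]
        elif old is None:
            changelog.append(("+I", key, value))
            state[key] = value
        else:
            changelog.append(("-U", key, old))
            changelog.append(("+U", key, value))
            state[key] = value
    return changelog


print(normalize([("k1", 4), ("k1", 5), ("k1", None)]))
```

For the three upserts above, the sketch emits +I 4, then -U 4 / +U 5, then -D 5. State grows with the number of distinct keys, which is why avoiding this operator matters for large tables.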

Input

By specifying 'changelog-producer' = 'input', Paimon writers rely on their inputs as the source of a complete changelog. All input records are saved in separate changelog files and are handed to consumers by Paimon sources.

The input changelog producer can be used when the writers' inputs are already a complete changelog, such as database CDC streams or output generated by Flink stateful computations.
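A minimal Python model of the input producer's behaviour as described above: every input record is stored unchanged as changelog, while the data files only keep the merged view. The function and its (kind, key, value) layout are hypothetical; the point is that the changelog is only as complete as the input, which is why this mode requires a complete changelog upstream.

```python
def write_with_input_producer(records):
    """Simplified model of 'changelog-producer' = 'input'.

    `records` are (kind, key, value) tuples that already form a complete
    changelog (for example from database CDC). The writer keeps a merged
    view for its data files and stores every input record, unchanged, as
    changelog for streaming readers.
    """
    data = {}       # merged view, as kept in the table's data files
    changelog = []  # what streaming readers will receive
    for kind, key, value in records:
        changelog.append((kind, key, value))
        if kind in ("+I", "+U"):
            data[key] = value
        elif kind == "-D":
            data.pop(key, None)
        # "-U" only carries the old value; the following "+U" overwrites it.
    return data, changelog


cdc = [("+I", "k1", 4), ("-U", "k1", 4), ("+U", "k1", 5),
       ("+I", "k2", 1), ("-D", "k2", 1)]
data, changelog = write_with_input_producer(cdc)
print(data)  # {'k1': 5}
```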

Lookup

If your input can’t produce a complete changelog but you still want to get rid of the costly normalize operator, you may consider using the 'lookup' changelog producer.

By specifying 'changelog-producer' = 'lookup', Paimon generates changelog through 'lookup' before committing the written data (you can also enable Async Compaction).
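Conceptually, the lookup producer finds the old value of each changed key in the data already stored in the table instead of in Flink state. The Python sketch below illustrates that idea under simplifying assumptions that are not taken from Paimon's implementation: a dict stands in for the existing table files, updates to the same key within one checkpoint are merged (last write wins) before the lookup, and None means delete. The function name is hypothetical.

```python
def lookup_changelog(committed, batch):
    """Sketch of the idea behind 'changelog-producer' = 'lookup'.

    `committed` maps key -> value for data already in the table (what the
    writer looks up). `batch` holds this checkpoint's (key, value) upserts;
    None stands for a delete. Returns the changelog to commit.
    """
    # Merge the checkpoint's upserts per key: the last write wins.
    merged = {}
    for key, value in batch:
        merged[key] = value

    changelog = []
    for key, value in merged.items():
        old = committed.get(key)  # the "lookup" against existing data
        if value is None:
            if old is not None:
                changelog.append(("-D", key, old))
        elif old is None:
            changelog.append(("+I", key, value))
        else:
            changelog.append(("-U", key, old))
            changelog.append(("+U", key, value))
    return changelog


print(lookup_changelog({"k1": 4}, [("k1", 6), ("k1", 5), ("k2", 1)]))
```

Because the old values come from the table itself, no per-key state has to be kept in the Flink job; the trade-off is the I/O of the lookup, which the cache options below help to tune.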

Lookup caches data in memory and on the local disk. You can use the following options to tune performance:

Option                       | Default   | Type       | Description
lookup.cache-file-retention  | 1 h       | Duration   | The retention time of cached files for lookup. After a file expires, it is re-read from the DFS to build an index on the local disk if it needs to be accessed again.
lookup.cache-max-disk-size   | unlimited | MemorySize | Maximum disk size for the lookup cache. Use this option to limit local disk usage.
lookup.cache-max-memory-size | 256 mb    | MemorySize | Maximum memory size for the lookup cache.

The lookup changelog producer supports 'changelog-producer.row-deduplicate' to avoid generating -U, +U changelog for the same record (that is, when the new row is identical to the old one).
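A small Python illustration of the effect 'changelog-producer.row-deduplicate' aims for, assuming it compares a row's old and new values. The function and its row_deduplicate flag are hypothetical stand-ins, not Paimon's implementation.

```python
def update_changelog(key, old, new, row_deduplicate=False):
    """Emit the changelog for one key whose old value is already known.

    With row_deduplicate=True, an update that leaves the row unchanged
    produces no -U/+U pair at all.
    """
    if old is None:
        return [("+I", key, new)]
    if row_deduplicate and old == new:
        return []
    return [("-U", key, old), ("+U", key, new)]


print(update_changelog("k1", 5, 5))                        # emits -U/+U
print(update_changelog("k1", 5, 5, row_deduplicate=True))  # []
```

Suppressing no-op updates reduces the volume of changelog that downstream consumers must process, at the cost of comparing the old and new rows.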

(Note: please increase the Flink configuration 'execution.checkpointing.max-concurrent-checkpoints'; this is very important for performance.)

Full Compaction

You can also consider using the 'full-compaction' changelog producer to generate changelog. It is more suitable for scenarios that can tolerate large latency (for example, 30 minutes).

  1. By specifying 'changelog-producer' = 'full-compaction', Paimon will compare the results between full compactions and produce the differences as changelog. The latency of changelog is affected by the frequency of full compactions.
  2. By specifying the full-compaction.delta-commits table property, a full compaction is triggered after every given number of delta commits (checkpoints). This is set to 1 by default, so every checkpoint triggers a full compaction and generates changelog.
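The two steps above can be sketched in Python under simplifying assumptions: each full-compaction result is modelled as a dict of key to value, and the changelog is the difference between two consecutive results. Both function names are hypothetical. Since this sketch compares only final values, it never emits -U/+U for an unchanged row; for the real producer, that behaviour is controlled by 'changelog-producer.row-deduplicate'.

```python
def diff_snapshots(before, after):
    """Changelog between two full-compaction results (key -> value maps)."""
    changelog = []
    for key, new in after.items():
        old = before.get(key)
        if old is None:
            changelog.append(("+I", key, new))
        elif old != new:
            changelog.append(("-U", key, old))
            changelog.append(("+U", key, new))
    for key, old in before.items():
        if key not in after:
            changelog.append(("-D", key, old))
    return changelog


def full_compaction_commits(num_commits, delta_commits=1):
    """1-based commit numbers after which a full compaction runs, assuming
    one is triggered every `delta_commits` commits (checkpoints)."""
    return [c for c in range(1, num_commits + 1) if c % delta_commits == 0]


print(diff_snapshots({"k1": 4, "k2": 1}, {"k1": 5, "k3": 7}))
print(full_compaction_commits(6, delta_commits=3))  # [3, 6]
```

With the default of 1, every checkpoint is followed by a full compaction. Larger values reduce compaction cost but increase the latency of the changelog, because changes only become visible as changelog after the next full compaction.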

Generally speaking, full compaction is expensive, so we recommend using the 'lookup' changelog producer.

The full-compaction changelog producer can produce a complete changelog for any type of source. However, it is not as efficient as the input changelog producer, and the latency to produce changelog might be high.

The full-compaction changelog producer also supports 'changelog-producer.row-deduplicate' to avoid generating -U, +U changelog for the same record.

Changelog Merging

This applies to the 'input', 'lookup', and 'full-compaction' changelog producers.

If Flink’s checkpoint interval is short (for example, 30 seconds) and the number of buckets is large, each snapshot may produce lots of small changelog files. Too many files may put a burden on the distributed storage cluster.

To compact small changelog files into larger ones, you can set the table option changelog.precommit-compact = true. The default value is false. When it is set to true, a compact coordinator and worker operator are added after the writer operator, which copy the small changelog files into larger ones.
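How the coordinator decides which files to combine is not described here. The Python sketch below only illustrates the general idea with a hypothetical greedy strategy: walk the small changelog files of a snapshot in order and close a group once its total size reaches a target. The function name, the (name, size) layout, and target_size are all assumptions of this sketch.

```python
def plan_changelog_merges(file_sizes, target_size):
    """Greedily group small changelog files so that each group reaches at
    least `target_size` bytes (the last group may be smaller).

    `file_sizes` is a list of (file_name, size_in_bytes) pairs.
    """
    groups, current, current_size = [], [], 0
    for name, size in file_sizes:
        current.append(name)
        current_size += size
        if current_size >= target_size:
            groups.append(current)
            current, current_size = [], 0
    if current:
        groups.append(current)
    return groups


files = [("b0", 3), ("b1", 4), ("b2", 2), ("b3", 5), ("b4", 1)]
print(plan_changelog_merges(files, target_size=8))
```

Each group would then be copied into a single larger file, so the number of changelog files per snapshot drops from one per bucket to roughly one per group.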

Copyright © 2024 The Apache Software Foundation. Apache Paimon, Paimon, and its feather logo are trademarks of The Apache Software Foundation.