External Log Systems


In addition to the underlying table files, Paimon's changelog can also be stored in or consumed from an external log system, such as Kafka. By specifying the log.system table property, users can choose which external log system to use.

If an external log system is used, all records written into table files will also be written into the log system. Streaming queries will therefore read changes from the log system instead of from table files.

Consistency Guarantees

By default, changes in the log system become visible to consumers only after a snapshot is created, just like table files. This behavior guarantees exactly-once semantics: each record is seen by consumers exactly once.

However, users can also specify the table property 'log.consistency' = 'eventual' so that changelog written into the log system can be consumed immediately, without waiting for the next snapshot. This behavior reduces changelog latency, but because of possible failures it can only guarantee at-least-once semantics (that is, consumers might see duplicated records).

If 'log.consistency' = 'eventual' is set, the Paimon source in Flink automatically adds a "normalize" operator for deduplication so that results remain correct. This operator persists the values of each key in state, which makes it very costly, so it should be avoided where possible.
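
As a minimal sketch of the trade-off described above, the following Flink SQL creates a table that opts into eventual consistency. It assumes the current catalog is a Paimon catalog. The table name, columns, server address, and topic are hypothetical placeholders rather than values from this page.

```sql
-- Hypothetical table: the schema, server address, and topic name are placeholders.
CREATE TABLE orders_eventual (
    order_id BIGINT,
    amount   DECIMAL(10, 2),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'log.system' = 'kafka',
    'kafka.bootstrap.servers' = 'localhost:9092',
    'kafka.topic' = 'orders_eventual_log',
    -- Changes become consumable before the next snapshot (at-least-once delivery).
    'log.consistency' = 'eventual'
);
```

Leaving out the 'log.consistency' line keeps the default behavior, in which changes become visible only after a snapshot.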

Supported Log Systems

Kafka

Paimon currently supports Flink 1.17, 1.16, 1.15 and 1.14. We recommend the latest Flink version for a better experience.

Download the flink-sql-connector-kafka jar file for the corresponding Flink version.

Version       Jar
Flink 1.17    flink-sql-connector-kafka-1.17.0.jar
Flink 1.16    flink-sql-connector-kafka-1.16.1.jar
Flink 1.15    flink-sql-connector-kafka-1.15.4.jar
Flink 1.14    flink-sql-connector-kafka_2.11-1.14.4.jar

By specifying 'log.system' = 'kafka', users can write changes into Kafka along with table files.

CREATE TABLE T (...)
WITH (
    'log.system' = 'kafka',
    'kafka.bootstrap.servers' = '...',
    'kafka.topic' = '...'
);
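
To make the statement above more concrete, here is a hedged end-to-end sketch: it creates a Kafka-backed table and then reads it with a streaming query. It assumes a Paimon catalog is in use and that the Kafka connector jar is on the Flink classpath. The table name, schema, server address, and topic are hypothetical.

```sql
-- Hypothetical names and addresses throughout.
CREATE TABLE orders (
    order_id BIGINT,
    amount   DECIMAL(10, 2),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'log.system' = 'kafka',
    'kafka.bootstrap.servers' = 'localhost:9092',
    'kafka.topic' = 'orders_log'
);

-- Run the query in streaming mode so it keeps consuming new changes.
SET 'execution.runtime-mode' = 'streaming';

SELECT * FROM orders;
```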

The table properties for Kafka are listed below.

Key                        Default    Type      Description
kafka.bootstrap.servers    (none)     String    Required Kafka server connection string.
kafka.topic                (none)     String    Topic of this Kafka table.