External Log Systems
Aside from the underlying table files, Paimon's changelog can also be written to or consumed from an external log system, such as Kafka. Users choose which external log system to use by setting the log.system table property.
If an external log system is used, all records written into table files are also written into the log system. Changes read by streaming queries therefore come from the log system rather than from table files.
Consistency Guarantees
By default, changes in the log system become visible to consumers only after a snapshot is committed, just like changes in table files. This guarantees exactly-once semantics: each record is seen by consumers exactly once.
However, users can also set the table property 'log.consistency' = 'eventual' so that the changelog written into the log system can be consumed immediately, without waiting for the next snapshot. This reduces changelog latency, but because of possible failures it can only guarantee at-least-once semantics; that is, consumers might see duplicate records.
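A minimal sketch of a table that trades exactly-once delivery for lower latency. The table name orders, its columns, the broker address localhost:9092, and the topic name orders_changelog are hypothetical placeholders. Leaving 'log.consistency' out keeps the default, snapshot-aligned behavior; in recent Paimon releases that default value is named 'transactional', but verify it against the configuration reference for your version.

```sql
-- Hypothetical table, broker, and topic names; replace them with your own.
CREATE TABLE orders (
    order_id BIGINT,
    amount DECIMAL(10, 2),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'log.system' = 'kafka',
    'kafka.bootstrap.servers' = 'localhost:9092',
    'kafka.topic' = 'orders_changelog',
    -- Changes become readable before the next snapshot (at-least-once).
    'log.consistency' = 'eventual'
);
```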
If 'log.consistency' = 'eventual' is set, the Paimon source in Flink automatically adds a “normalize” operator for deduplication so that results remain correct. This operator keeps the value of every key in state, which makes it very costly; avoid it where possible.
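One way to check whether a streaming query pays this cost is to print its plan before running it. The sketch below assumes a hypothetical primary-key table named orders that was created with 'log.consistency' = 'eventual'. We also assume, without confirmation from this page, that the deduplication step appears in the plan as Flink's ChangelogNormalize node; the node name may differ across versions.

```sql
-- Hypothetical table: orders was created with 'log.consistency' = 'eventual'.
-- Read the table as a stream (Flink's streaming runtime mode).
SET 'execution.runtime-mode' = 'streaming';

-- Print the execution plan instead of running the query;
-- look for an extra deduplication node (assumed to be ChangelogNormalize).
EXPLAIN SELECT * FROM orders;
```

If the node is present and snapshot-aligned latency is acceptable, creating the table without 'log.consistency' = 'eventual' avoids the per-key state.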
Supported Log Systems
Kafka
Preparing flink-sql-connector-kafka Jar File
Paimon currently supports Flink 1.17, 1.16, 1.15 and 1.14. We recommend the latest Flink version for a better experience.
Download the flink-sql-connector-kafka jar file that matches your Flink version.
Version | Jar |
---|---|
Flink 1.17 | flink-sql-connector-kafka-1.17.0.jar |
Flink 1.16 | flink-sql-connector-kafka-1.16.1.jar |
Flink 1.15 | flink-sql-connector-kafka-1.15.4.jar |
Flink 1.14 | flink-sql-connector-kafka_2.11-1.14.4.jar |
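With the Flink SQL client, the downloaded jar can be registered for the current session as sketched below; the jar path is a hypothetical placeholder. ADD JAR and SHOW JARS are Flink SQL client statements whose availability depends on your Flink version, so check the SQL client documentation for it. Copying the jar into the lib directory of your Flink distribution before starting the cluster is the more common alternative and makes the connector available to every job.

```sql
-- Hypothetical path; use the jar from the table above that matches your Flink version.
ADD JAR '/path/to/flink-sql-connector-kafka-1.17.0.jar';

-- List the jars added to this session to confirm the registration.
SHOW JARS;
```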
By specifying 'log.system' = 'kafka', users can write changes into Kafka along with table files.
```sql
CREATE TABLE T (...)
WITH (
    'log.system' = 'kafka',
    'kafka.bootstrap.servers' = '...',
    'kafka.topic' = '...'
);
```
The Kafka-related table properties are listed below.
Key | Default | Type | Description |
---|---|---|---|
kafka.bootstrap.servers | (none) | String | Required Kafka server connection string. |
kafka.topic | (none) | String | Topic of this Kafka table. |
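Putting these properties together, a fuller sketch might look like the following. The catalog name paimon_catalog, the warehouse path file:///tmp/paimon, the table word_count and its columns, the broker localhost:9092, and the topic word_count_log are all hypothetical placeholders. The streaming SELECT at the end illustrates that, once a log system is configured, streaming reads are served from Kafka rather than from table files.

```sql
-- Hypothetical Paimon catalog backed by a local warehouse directory.
CREATE CATALOG paimon_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'file:///tmp/paimon'
);

USE CATALOG paimon_catalog;

-- Records written to this table go to both the table files and the Kafka topic.
CREATE TABLE word_count (
    word STRING,
    cnt BIGINT,
    PRIMARY KEY (word) NOT ENFORCED
) WITH (
    'log.system' = 'kafka',
    'kafka.bootstrap.servers' = 'localhost:9092',
    'kafka.topic' = 'word_count_log'
);

-- A streaming read consumes changes from the Kafka log system.
SET 'execution.runtime-mode' = 'streaming';
SELECT * FROM word_count;
```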