This documentation is for an unreleased version of Apache Paimon. We recommend you use the latest stable version.
Debezium BSON Format #
The debezium-bson format is one of the formats supported by Kafka CDC. It is the format produced when Debezium captures changes from MongoDB, and it is similar to the debezium-json format. However, MongoDB has no fixed schema and field types can differ from document to document, so the before/after fields of each event are JSON strings, whereas the debezium-json format requires them to be JSON objects.
Prepare MongoDB BSON Jar #
The jar can be downloaded from the Maven repository.
Introduction #
The debezium-bson format requires that insert/update/delete event messages include the full document, and that they include a field representing the state of the document before the change. This requires setting Debezium's capture.mode to change_streams_update_full_with_pre_image and capture.mode.full.update.type to post_image. MongoDB versions before 6.0 cannot provide the 'Update Before' information, so in that case the id field in the Kafka key is used as the 'Update Before' information.
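The two capture settings above could be placed in a connector registration payload along the following lines. This is a minimal sketch: the connector name, connection string, topic prefix, and collection list are hypothetical placeholders, and the surrounding property names are assumed from the Debezium 2.x MongoDB connector, so they should be checked against the connector version in use.

```python
import json

# Only the two capture.mode entries come from the text above.
# Everything else is a hypothetical placeholder for illustration.
connector = {
    "name": "mongodb-customers-connector",  # hypothetical name
    "config": {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
        "mongodb.connection.string": "mongodb://localhost:27017/?replicaSet=rs0",  # hypothetical
        "topic.prefix": "demo",  # hypothetical
        "collection.include.list": "inventory.customers",  # hypothetical
        # Emit the full document after the change plus its pre-image.
        "capture.mode": "change_streams_update_full_with_pre_image",
        "capture.mode.full.update.type": "post_image",
    },
}

# Serialize the payload, for example to submit it to a Kafka Connect REST endpoint.
print(json.dumps(connector, indent=2))
```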
Here is a simple example of an update operation captured from a MongoDB customers collection, in JSON format:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": true,
        "name": "",
        "version": 1,
        "field": "before"
      },
      {
        "type": "string",
        "optional": true,
        "name": "",
        "version": 1,
        "field": "after"
      }
    ]
  },
  "payload": {
    "before": "{\"_id\": {\"$oid\" : \"596e275826f08b2730779e1f\"}, \"name\" : \"Anne\", \"create_time\" : {\"$numberLong\" : \"1558965506000\"}, \"tags\":[\"success\"]}",
    "after": "{\"_id\": {\"$oid\" : \"596e275826f08b2730779e1f\"}, \"name\" : \"Anne\", \"create_time\" : {\"$numberLong\" : \"1558965506000\"}, \"tags\":[\"passion\",\"success\"]}",
    "source": {
      "db": "inventory",
      "rs": "rs0",
      "collection": "customers"
    },
    "op": "u",
    "ts_ms": 1558965515240,
    "ts_us": 1558965515240142,
    "ts_ns": 1558965515240142879
  }
}
This document from the MongoDB collection customers has four fields: _id is a BSON ObjectId, name is a string, create_time is a long, and tags is an array of strings. The debezium-bson format processes it as follows:
Document Schema:
Field Name | Field Type | Key |
_id | STRING | Primary Key |
name | STRING | |
create_time | STRING | |
tags | STRING | |
Records:
RowKind | _id | name | create_time | tags |
-U | 596e275826f08b2730779e1f | Anne | 1558965506000 | ["success"] |
+U | 596e275826f08b2730779e1f | Anne | 1558965506000 | ["passion","success"] |
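The key point of the example above is that before and after arrive as JSON strings and need a second decoding step. The following sketch illustrates that step and how an update event yields the -U/+U pair shown in the table. The function name to_changelog and the op-to-row-kind mapping are assumptions made for illustration, not Paimon's actual implementation.

```python
import json

# Assumed mapping from Debezium "op" codes to changelog row kinds.
ROW_KINDS = {
    "c": [("+I", "after")],
    "u": [("-U", "before"), ("+U", "after")],
    "d": [("-D", "before")],
}

def to_changelog(event):
    """Return (row_kind, document) pairs for one debezium-bson style event."""
    payload = event["payload"]
    rows = []
    for kind, field in ROW_KINDS.get(payload["op"], []):
        raw = payload.get(field)
        if raw is not None:
            # before/after are JSON strings, not objects, so decode them here.
            rows.append((kind, json.loads(raw)))
    return rows

# A trimmed copy of the update event above (schema and source omitted).
event = {
    "payload": {
        "before": '{"_id": {"$oid": "596e275826f08b2730779e1f"}, "name": "Anne", '
                  '"create_time": {"$numberLong": "1558965506000"}, "tags": ["success"]}',
        "after": '{"_id": {"$oid": "596e275826f08b2730779e1f"}, "name": "Anne", '
                 '"create_time": {"$numberLong": "1558965506000"}, "tags": ["passion", "success"]}',
        "op": "u",
    }
}

for kind, doc in to_changelog(event):
    print(kind, doc["_id"]["$oid"], doc["tags"])
```

At this stage the documents still contain Extended JSON wrappers such as $oid and $numberLong; flattening them to strings is a separate step.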
How it works #
Because the schema field of the event message carries no information about the document's fields, the debezium-bson format does not require event messages to include schema information. The processing steps are as follows:
- Parse the before/after fields of the event message into a BsonDocument.
- Recursively traverse all fields of the BsonDocument and convert each BsonValue to a Java object.
- Convert all top-level fields of before/after to the string type, with _id fixed as the primary key.
- If a top-level field of before/after is a basic type (such as Integer or Long), convert it directly to a string; otherwise, convert it to a JSON string.
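The steps above can be sketched in a few lines. This is an illustrative approximation in Python using only the standard library, not the Java code Paimon runs: it handles only the $oid, $numberLong, $numberInt, and integer $date wrappers, and the helper names unwrap and top_level_to_strings are hypothetical.

```python
import json

def unwrap(value):
    """Recursively replace a small subset of Extended JSON wrappers with plain values."""
    if isinstance(value, list):
        return [unwrap(item) for item in value]
    if isinstance(value, dict):
        if len(value) == 1:
            key, inner = next(iter(value.items()))
            if key == "$oid":
                return inner
            if key in ("$numberLong", "$numberInt"):
                return int(inner)
            if key == "$date" and isinstance(inner, int):
                return inner
        return {key: unwrap(inner) for key, inner in value.items()}
    return value

def top_level_to_strings(raw_document):
    """Parse a before/after string and turn every top-level field into a string."""
    document = unwrap(json.loads(raw_document))
    result = {}
    for name, value in document.items():
        if value is None or isinstance(value, str):
            # Nulls stay null and strings are kept as they are.
            result[name] = value
        else:
            # Numbers become their text form; arrays and nested
            # documents become compact JSON strings.
            result[name] = json.dumps(value, separators=(",", ":"))
    return result

after = ('{"_id": {"$oid": "596e275826f08b2730779e1f"}, "name": "Anne", '
         '"create_time": {"$numberLong": "1558965506000"}, "tags": ["passion", "success"]}')
row = top_level_to_strings(after)
print(row)
```

Applied to the after image of the earlier update event, every value in row is a string, with tags serialized as a compact JSON array.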
Below is a list of top-level field BsonValue conversion examples:
BsonValue Type | Json Value | Conversion Result String |
BsonString | "hello" | "hello" |
BsonInt32 | 123 | "123" |
BsonInt64 | | "1735934393769" |
BsonDouble | | |
BsonBoolean | | |
BsonArray | [1,2,{"$numberLong": "1735934393769"}] | "[1,2,1735934393769]" |
BsonObjectId | {"$oid": "596e275826f08b2730779e1f"} | "596e275826f08b2730779e1f" |
BsonDateTime | {"$date": 1735934393769 } | "1735934393769" |
BsonNull | null | null |
BsonUndefined | {"$undefined": true} | null |
BsonBinary | {"$binary": "uE2/4v5MSVOiJZkOo3APKQ==", "$type": "0"} | "uE2/4v5MSVOiJZkOo3APKQ==" |
BsonBinary(type=UUID) | {"$binary": "uE2/4v5MSVOiJZkOo3APKQ==", "$type": "4"} | "b84dbfe2-fe4c-4953-a225-990ea3700f29" |
BsonDecimal128 | | |
BsonRegularExpression | {"$regularExpression": {"pattern": "^pass$", "options": "i"}} | "/^pass$/i" |
BsonSymbol | {"$symbol": "symbol"} | "symbol" |
BsonTimestamp | {"$timestamp": {"t": 1736997330, "i": 2}} | "1736997330" |
BsonMinKey | {"$minKey": 1} | "BsonMinKey" |
BsonMaxKey | {"$maxKey": 1} | "BsonMaxKey" |
BsonJavaScript | {"$code": "function(){}"} | "function(){}" |
BsonJavaScriptWithScope | {"$code": "function(){}", "$scope": {"name": "Anne"}} | '{"$code": "function(){}", "$scope": {"name": "Anne"}}' |
BsonDocument | { "decimalPi": {"$numberDecimal": "3.14"}, "doublePi": {"$numberDouble": "3.14"}, "doubleNaN": {"$numberDouble": "NaN"}, "decimalNaN": {"$numberDecimal": "NaN"}, "long": {"$numberLong": "100"}, "bool": true, "array": [ {"$numberInt": "1"}, {"$numberLong": "2"} ] } | '{ "decimalPi":3.14, "doublePi":3.14, "doubleNaN":"NaN", "decimalNaN":"NaN", "long":100, "bool":true, "array":[1,2] }' |
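A few of the rows above are less obvious than the others, in particular the UUID binary, regular expression, and timestamp conversions. The sketch below reproduces just those three rows with the Python standard library so the results can be checked by hand. The function names are hypothetical, and the logic is inferred from the table rather than taken from Paimon's source.

```python
import base64
import uuid

def binary_to_string(ext):
    """Binary subtype 4 is rendered as a UUID; other subtypes keep the base64 text."""
    if ext["$type"] == "4":
        return str(uuid.UUID(bytes=base64.b64decode(ext["$binary"])))
    return ext["$binary"]

def regex_to_string(ext):
    """Render a regular expression as /pattern/options."""
    regex = ext["$regularExpression"]
    return "/" + regex["pattern"] + "/" + regex["options"]

def timestamp_to_string(ext):
    """Keep only the seconds part (t) of a BSON timestamp and drop the increment (i)."""
    return str(ext["$timestamp"]["t"])

print(binary_to_string({"$binary": "uE2/4v5MSVOiJZkOo3APKQ==", "$type": "4"}))
print(binary_to_string({"$binary": "uE2/4v5MSVOiJZkOo3APKQ==", "$type": "0"}))
print(regex_to_string({"$regularExpression": {"pattern": "^pass$", "options": "i"}}))
print(timestamp_to_string({"$timestamp": {"t": 1736997330, "i": 2}}))
```

The same 16 bytes produce either the base64 text or the UUID form depending only on the $type subtype, which is why the two BsonBinary rows share an input payload but differ in their results.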
How to use #
To use debezium-bson, add value.format=debezium-bson as a kafka_conf parameter. The following example uses table synchronization:
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-1.1-SNAPSHOT.jar \
kafka_sync_table \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--table ods_mongodb_customers \
--primary_keys _id \
--kafka_conf properties.bootstrap.servers= \
--kafka_conf topic=customers \
--kafka_conf \
--kafka_conf value.format=debezium-bson \
--catalog_conf metastore=filesystem \
--table_conf bucket=4 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4