
Debezium BSON Format #

The debezium-bson format is one of the formats supported by Kafka CDC. It is the format produced when Debezium captures changes from MongoDB, and it is similar to the debezium-json format. However, MongoDB has no fixed schema, and field types can differ from document to document, so the before/after fields in the message are JSON-encoded strings, whereas the debezium-json format requires them to be JSON objects.

Prepare MongoDB BSON Jar #

The MongoDB BSON jar can be downloaded from the Maven repository:

bson-*.jar

Introduction #

The debezium-bson format requires that insert/update/delete event messages include the full document, as well as a field that represents the state of the document before the change. This requires setting Debezium's capture.mode to change_streams_update_full_with_pre_image and capture.mode.full.update.type to post_image. MongoDB versions before 6.0 cannot provide 'Update Before' information, so in that case the id field in the Kafka key is used as the 'Update Before' information.
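
As an illustration of where these two settings go, here is a minimal sketch of a connector registration payload built in Python. Only the capture.mode and capture.mode.full.update.type values come from the text above. The connector name, connection string, topic prefix, and collection list are hypothetical placeholders, and the other property names are assumed from the Debezium MongoDB connector and may differ between Debezium versions.

```python
import json

# Hypothetical connector registration payload for the Kafka Connect REST API.
# Only the two capture.mode* settings are taken from this page; every other
# value is a placeholder chosen for illustration.
connector = {
    "name": "mongodb-customers-connector",  # placeholder name
    "config": {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
        "mongodb.connection.string": "mongodb://mongo:27017/?replicaSet=rs0",  # placeholder
        "topic.prefix": "mongo",  # placeholder
        "collection.include.list": "inventory.customers",  # placeholder
        # Emit the full document plus its pre-image on every change.
        "capture.mode": "change_streams_update_full_with_pre_image",
        "capture.mode.full.update.type": "post_image",
    },
}

# Serialize to the JSON body that would be submitted to Kafka Connect.
payload = json.dumps(connector, indent=2)
print(payload)
```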

Here is a simple example of an update operation captured from a MongoDB customers collection, in JSON format:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": true,
        "name": "io.debezium.data.Json",
        "version": 1,
        "field": "before"
      },
      {
        "type": "string",
        "optional": true,
        "name": "io.debezium.data.Json",
        "version": 1,
        "field": "after"
      },
      ...
    ]
  },
  "payload": {
    "before": "{\"_id\": {\"$oid\" : \"596e275826f08b2730779e1f\"}, \"name\" : \"Anne\", \"create_time\" : {\"$numberLong\" : \"1558965506000\"}, \"tags\":[\"success\"]}",
    "after": "{\"_id\": {\"$oid\" : \"596e275826f08b2730779e1f\"}, \"name\" : \"Anne\", \"create_time\" : {\"$numberLong\" : \"1558965506000\"}, \"tags\":[\"passion\",\"success\"]}",
    "source": {
      "db": "inventory",
      "rs": "rs0",
      "collection": "customers",
      ...
    },
    "op": "u",
    "ts_ms": 1558965515240,
    "ts_us": 1558965515240142,
    "ts_ns": 1558965515240142879
  }
}

This document from the MongoDB collection customers has four fields: _id is a BSON ObjectId, name is a string, create_time is a long, and tags is an array of strings. The debezium-bson format processes it into the following result:

Document Schema:

| Field Name  | Field Type | Key         |
|-------------|------------|-------------|
| _id         | STRING     | Primary Key |
| name        | STRING     |             |
| create_time | STRING     |             |
| tags        | STRING     |             |

Records:

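| RowKind | _id                      | name | create_time   | tags                  |
|---------|--------------------------|------|---------------|-----------------------|
| -U      | 596e275826f08b2730779e1f | Anne | 1558965506000 | ["success"]           |
| +U      | 596e275826f08b2730779e1f | Anne | 1558965506000 | ["passion","success"] |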
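
The way one update event yields two records can be sketched as follows. This is a simplified illustration, not Paimon's implementation: the function name event_to_rows is hypothetical, the documents are trimmed versions of the example above, and only the "u" mapping is taken from this page. The mappings for "c"/"r" and "d" are assumptions based on Flink's RowKind short names.

```python
import json


def event_to_rows(event):
    """Split a Debezium envelope into (row kind, parsed document) pairs."""
    payload = event["payload"]
    # before/after arrive as JSON-encoded strings, not JSON objects.
    before = json.loads(payload["before"]) if payload.get("before") else None
    after = json.loads(payload["after"]) if payload.get("after") else None
    op = payload["op"]
    if op == "u":
        # Shown in the example above: an update becomes -U followed by +U.
        return [("-U", before), ("+U", after)]
    if op in ("c", "r"):
        return [("+I", after)]  # assumption: create/snapshot read is an insert
    if op == "d":
        return [("-D", before)]  # assumption: delete retracts the old row
    raise ValueError(f"unsupported op: {op!r}")


event = {
    "payload": {
        "before": '{"_id": {"$oid": "596e275826f08b2730779e1f"}, "tags": ["success"]}',
        "after": '{"_id": {"$oid": "596e275826f08b2730779e1f"}, "tags": ["passion", "success"]}',
        "op": "u",
    }
}

for kind, document in event_to_rows(event):
    print(kind, document["tags"])
```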

How it works #

Because the schema field of the event message carries no information about the document's fields, the debezium-bson format does not require event messages to include schema information. The processing steps are as follows:

  • Parse the before/after fields of the event message into a BsonDocument.
  • Recursively traverse all fields of the BsonDocument and convert each BsonValue to a Java object.
  • Convert all top-level fields of before/after to the string type, with _id fixed as the primary key.
  • If a top-level field of before/after is a basic type (such as Integer or Long), convert it directly to a string; otherwise, convert it to a JSON string.

Below is a list of top-level field BsonValue conversion examples:

| BsonValue Type | Json Value | Conversion Result String |
|---|---|---|
| BsonString | "hello" | "hello" |
| BsonInt32 | 123 | "123" |
| BsonInt64 | 1735934393769 | "1735934393769" |
| BsonInt64 | {"$numberLong": "1735934393769"} | "1735934393769" |
| BsonDouble | {"$numberDouble": "3.14"} | "3.14" |
| BsonDouble | {"$numberDouble": "NaN"} | "NaN" |
| BsonDouble | {"$numberDouble": "Infinity"} | "Infinity" |
| BsonDouble | {"$numberDouble": "-Infinity"} | "-Infinity" |
| BsonBoolean | true | "true" |
| BsonBoolean | false | "false" |
| BsonArray | [1,2,{"$numberLong": "1735934393769"}] | "[1,2,1735934393769]" |
| BsonObjectId | {"$oid": "596e275826f08b2730779e1f"} | "596e275826f08b2730779e1f" |
| BsonDateTime | {"$date": 1735934393769 } | "1735934393769" |
| BsonNull | null | null |
| BsonUndefined | {"$undefined": true} | null |
| BsonBinary | {"$binary": "uE2/4v5MSVOiJZkOo3APKQ==", "$type": "0"} | "uE2/4v5MSVOiJZkOo3APKQ==" |
| BsonBinary(type=UUID) | {"$binary": "uE2/4v5MSVOiJZkOo3APKQ==", "$type": "4"} | "b84dbfe2-fe4c-4953-a225-990ea3700f29" |
| BsonDecimal128 | {"$numberDecimal": "3.14"} | "3.14" |
| BsonDecimal128 | {"$numberDecimal": "NaN"} | "NaN" |
| BsonRegularExpression | {"$regularExpression": {"pattern": "^pass$", "options": "i"}} | "/^pass$/i" |
| BsonSymbol | {"$symbol": "symbol"} | "symbol" |
| BsonTimestamp | {"$timestamp": {"t": 1736997330, "i": 2}} | "1736997330" |
| BsonMinKey | {"$minKey": 1} | "BsonMinKey" |
| BsonMaxKey | {"$maxKey": 1} | "BsonMaxKey" |
| BsonJavaScript | {"$code": "function(){}"} | "function(){}" |
| BsonJavaScriptWithScope | {"$code": "function(){}", "$scope": {"name": "Anne"}} | '{"$code": "function(){}", "$scope": {"name": "Anne"}}' |
| BsonDocument | {"decimalPi": {"$numberDecimal": "3.14"}, "doublePi": {"$numberDouble": "3.14"}, "doubleNaN": {"$numberDouble": "NaN"}, "decimalNaN": {"$numberDecimal": "NaN"}, "long": {"$numberLong": "100"}, "bool": true, "array": [{"$numberInt": "1"}, {"$numberLong": "2"}]} | '{"decimalPi":3.14,"doublePi":3.14,"doubleNaN":"NaN","decimalNaN":"NaN","long":100,"bool":true,"array":[1,2]}' |
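
The conversion rules above can be approximated with a short Python sketch that uses only the standard library. It is an illustration of the rules for a subset of the types (ObjectId, numeric wrappers, dates, timestamps, undefined, legacy-form binary, arrays, and nested documents), not Paimon's Java implementation. The function names to_plain, to_string, and convert_document are hypothetical, and using float() for Decimal128 is a simplification that can lose precision.

```python
import base64
import json
import uuid

NON_FINITE = {"NaN", "Infinity", "-Infinity"}


def to_plain(value):
    """Recursively replace Extended JSON wrappers with plain Python values."""
    if isinstance(value, list):
        return [to_plain(item) for item in value]
    if not isinstance(value, dict):
        return value  # str, int, float, bool, or None
    keys = set(value)
    if keys == {"$oid"}:
        return value["$oid"]
    if keys in ({"$numberInt"}, {"$numberLong"}):
        return int(next(iter(value.values())))
    if keys in ({"$numberDouble"}, {"$numberDecimal"}):
        text = next(iter(value.values()))
        # Non-finite values stay strings; finite ones become numbers.
        return text if text in NON_FINITE else float(text)
    if keys == {"$date"}:
        return to_plain(value["$date"])  # epoch milliseconds
    if keys == {"$timestamp"}:
        return value["$timestamp"]["t"]  # keep only the seconds part
    if keys == {"$undefined"}:
        return None
    if keys == {"$binary", "$type"}:
        if int(value["$type"], 16) == 4:  # subtype 4 is a UUID
            return str(uuid.UUID(bytes=base64.b64decode(value["$binary"])))
        return value["$binary"]  # other subtypes keep the base64 text
    # Anything else is treated as an ordinary nested document.
    return {name: to_plain(item) for name, item in value.items()}


def to_string(value):
    """Convert one top-level field to its string form (or None)."""
    plain = to_plain(value)
    if plain is None:
        return None
    if isinstance(plain, bool):  # check bool before int: bool is an int subclass
        return "true" if plain else "false"
    if isinstance(plain, (str, int, float)):
        return str(plain)
    # Arrays and nested documents become compact JSON strings.
    return json.dumps(plain, separators=(",", ":"))


def convert_document(document_json):
    """Parse a before/after string and stringify every top-level field."""
    document = json.loads(document_json)
    return {name: to_string(value) for name, value in document.items()}


after = (
    '{"_id": {"$oid": "596e275826f08b2730779e1f"}, "name": "Anne", '
    '"create_time": {"$numberLong": "1558965506000"}, '
    '"tags": ["passion", "success"]}'
)
print(convert_document(after))
```

Keeping to_plain separate from to_string mirrors the distinction in the steps above: nested values are first reduced to plain objects, and only the top-level fields are turned into strings.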

How to use #

To use debezium-bson, add the kafka_conf parameter value.format=debezium-bson. Let's take table synchronization as an example:

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-1.1-SNAPSHOT.jar \
    kafka_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table ods_mongodb_customers \
    --primary_keys _id \
    --kafka_conf properties.bootstrap.servers=127.0.0.1:9020 \
    --kafka_conf topic=customers \
    --kafka_conf properties.group.id=123456 \
    --kafka_conf value.format=debezium-bson \
    --catalog_conf metastore=filesystem \
    --table_conf bucket=4 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=4