Mongo CDC
Prepare MongoDB Bundled Jar
flink-sql-connector-mongodb-cdc-*.jar
Only CDC 3.1+ is supported.
Synchronizing Tables
By using MongoDBSyncTableAction in a Flink DataStream job or directly through flink run, users can synchronize one collection from MongoDB into one Paimon table.
To use this feature through flink run, run the following shell command.
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.9.0.jar \
mongodb_sync_table \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
[--partition_keys <partition_keys>] \
[--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
[--mongodb_conf <mongodb-cdc-source-conf> [--mongodb_conf <mongodb-cdc-source-conf> ...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
Configuration | Description |
---|---|
--warehouse | The path to Paimon warehouse. |
--database | The database name in Paimon catalog. |
--table | The Paimon table name. |
--partition_keys | The partition keys for the Paimon table. Separate multiple partition keys with commas, for example "dt,hh,mm". |
--computed_column | The definitions of computed columns. The argument field is a field name from the MongoDB collection. See here for a complete list of configurations. |
--mongodb_conf | The configuration for Flink CDC MongoDB sources. Each configuration should be specified in the format "key=value". hosts, username, password, database and collection are required configurations, others are optional. See its document for a complete list of configurations. |
--catalog_conf | The configuration for Paimon catalog. Each configuration should be specified in the format "key=value". See here for a complete list of catalog configurations. |
--table_conf | The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See here for a complete list of table configurations. |
Here are a few points to take note of:
- The mongodb_conf introduces the schema.start.mode parameter on top of the MongoDB CDC source configuration. schema.start.mode provides two modes: dynamic (default) and specified. In dynamic mode, MongoDB schema information is parsed at one level, which forms the basis for schema evolution. In specified mode, synchronization takes place according to specified criteria: configure field.name to specify the synchronization fields and parser.path to specify the JSON parsing path for those fields. The difference between the two is that specified mode requires the user to explicitly identify the fields to be used and creates a mapping table based on those fields. Dynamic mode, on the other hand, keeps the top-level fields of Paimon and MongoDB consistent, so there is no need to focus on specific fields. Values from nested fields require further processing of the data table.
- The mongodb_conf introduces the default.id.generation parameter as an enhancement to the MongoDB CDC source configuration. It accepts true or false. When default.id.generation is set to true, the MongoDB CDC source adheres to the default _id generation strategy, which strips the outer $oid nesting to provide a more straightforward identifier. When it is set to false, the source retains the original _id structure without any additional processing, preserving nested elements such as $oid. Choose true for a cleaner, simplified _id and false for a direct representation of MongoDB’s _id structure.
Operator | Description |
---|---|
$ | The root element to query. This starts all path expressions. |
@ | The current node being processed by a filter predicate. |
* | Wildcard. Available anywhere a name or numeric are required. |
.. | Deep scan. Available anywhere a name is required. |
. | Dot-notated child. |
['{name}' (, '{name}')] | Bracket-notated child or children. |
[{number} (, {number})] | Bracket-notated child or children. |
[start:end] | Array index or indexes. |
[?({expression})] | Filter expression. Expression must evaluate to a boolean value. |
Functions can be invoked at the tail end of a path - the input to a function is the output of the path expression. The function output is dictated by the function itself.
Function | Description | Output type |
---|---|---|
min() | Provides the min value of an array of numbers. | Double |
max() | Provides the max value of an array of numbers. | Double |
avg() | Provides the average value of an array of numbers. | Double |
stddev() | Provides the standard deviation value of an array of numbers. | Double |
length() | Provides the length of an array. | Integer |
sum() | Provides the sum value of an array of numbers. | Double |
keys() | Provides the property keys (an alternative for terminal tilde ~). | Set |
concat(X) | Provides a concatenated version of the path output with a new item. | Like input |
append(X) | Adds an item to the JSON path output array. | Like input |
first() | Provides the first item of an array. | Depends on the array |
last() | Provides the last item of an array. | Depends on the array |
index(X) | Provides the item of an array at index X; if X is negative, counts from the end. | Depends on the array |
Path Examples
{
  "store": {
    "book": [
      {
        "category": "reference",
        "author": "Nigel Rees",
        "title": "Sayings of the Century",
        "price": 8.95
      },
      {
        "category": "fiction",
        "author": "Evelyn Waugh",
        "title": "Sword of Honour",
        "price": 12.99
      },
      {
        "category": "fiction",
        "author": "Herman Melville",
        "title": "Moby Dick",
        "isbn": "0-553-21311-3",
        "price": 8.99
      },
      {
        "category": "fiction",
        "author": "J. R. R. Tolkien",
        "title": "The Lord of the Rings",
        "isbn": "0-395-19395-8",
        "price": 22.99
      }
    ],
    "bicycle": {
      "color": "red",
      "price": 19.95
    }
  },
  "expensive": 10
}
JsonPath | Result |
---|---|
$.store.book[*].author | The authors of all books. |
$..author | All authors. |
$.store.* | All things, both books and bicycles. |
$.store..price | The price of everything. |
$..book[2] | The third book. |
$..book[-2] | The second to last book. |
$..book[0,1] | The first two books. |
$..book[:2] | All books from index 0 (inclusive) until index 2 (exclusive). |
$..book[1:2] | All books from index 1 (inclusive) until index 2 (exclusive). |
$..book[-2:] | The last two books. |
$..book[2:] | All books from index 2 (inclusive) to last. |
$..book[?(@.isbn)] | All books with an ISBN number. |
$.store.book[?(@.price < 10)] | All books in store cheaper than 10. |
$..book[?(@.price <= $['expensive'])] | All books in store that are not "expensive". |
$..book[?(@.author =~ /.*REES/i)] | All books matching regex (ignore case). |
$..* | Everything. |
$..book.length() | The number of books. |
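To make a few rows of the table concrete, the sketch below reproduces them with plain Python list operations over the sample document. It is not a JsonPath engine and does not call any Paimon or JsonPath API; the variable names are illustrative only, and the slice and filter results are projected to titles purely for readability.

```python
# Plain-Python equivalents of a few JsonPath rows (illustration only).
doc = {
    "store": {
        "book": [
            {"category": "reference", "author": "Nigel Rees",
             "title": "Sayings of the Century", "price": 8.95},
            {"category": "fiction", "author": "Evelyn Waugh",
             "title": "Sword of Honour", "price": 12.99},
            {"category": "fiction", "author": "Herman Melville",
             "title": "Moby Dick", "isbn": "0-553-21311-3", "price": 8.99},
            {"category": "fiction", "author": "J. R. R. Tolkien",
             "title": "The Lord of the Rings", "isbn": "0-395-19395-8",
             "price": 22.99},
        ],
        "bicycle": {"color": "red", "price": 19.95},
    },
    "expensive": 10,
}
books = doc["store"]["book"]

# $.store.book[*].author : the authors of all books
authors = [b["author"] for b in books]

# $..book[-2:] : the last two books (shown here by title)
last_two_titles = [b["title"] for b in books[-2:]]

# $..book[?(@.isbn)] : all books that have an "isbn" field
with_isbn = [b["title"] for b in books if "isbn" in b]

# $..book[?(@.price <= $['expensive'])] : books that are not "expensive"
not_expensive = [b["title"] for b in books if b["price"] <= doc["expensive"]]

# $..book.length() : the number of books
book_count = len(books)

print(authors)
print(not_expensive)
```

With this document, the filter on `$['expensive']` keeps the two books priced at 8.95 and 8.99.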
- The synchronized table is required to have its primary key set as _id. This is because MongoDB change events do not include the document image from before an update, so they can only be converted into Flink’s UPSERT changelog stream. The upsert stream demands a unique key, which is why _id must be declared as the primary key. Declaring other columns as primary keys is not feasible, as delete operations only carry the _id and sharding key, excluding other keys and values.
- MongoDB Change Streams return simple JSON documents without any data type definitions. MongoDB is a document-oriented database with a dynamic schema: documents can contain different fields, and the data types of fields can be flexible. Because data types cannot be obtained from Change Streams, all fields synchronized from MongoDB to Paimon are typed as String.
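The points above about top-level parsing, the `default.id.generation` switch, and String typing can be pictured with the following minimal sketch. It is an illustration under stated assumptions, not Paimon's implementation: `to_paimon_row` is a hypothetical helper, and the exact string form Paimon uses for nested or numeric values is assumed here to be compact JSON.

```python
import json


def to_paimon_row(document, default_id_generation=True):
    """Hypothetical helper: map one MongoDB document to top-level String columns."""
    row = {}
    for name, value in document.items():
        is_oid = isinstance(value, dict) and "$oid" in value
        if name == "_id" and default_id_generation and is_oid:
            # default.id.generation=true: strip the outer $oid nesting
            row[name] = value["$oid"]
        elif isinstance(value, str):
            row[name] = value
        else:
            # Assumption: nested and non-string values are kept as JSON text,
            # since every column is typed as String.
            row[name] = json.dumps(value, separators=(",", ":"))
    return row


event = {
    "_id": {"$oid": "64a7c1f2e4b0a1b2c3d4e5f6"},
    "name": "widget",
    "price": 9.5,
    "tags": {"color": "red"},
}

simplified = to_paimon_row(event)                           # _id without $oid
raw = to_paimon_row(event, default_id_generation=False)     # _id kept as-is

print(simplified["_id"])
print(raw["_id"])
```

Only the top-level keys become columns; the nested `tags` object stays a single String column, which is why values from nested fields need further processing downstream.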
If the Paimon table you specify does not exist, this action will automatically create the table. Its schema will be derived from the MongoDB collection.
Example 1: synchronize collection into one Paimon table
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.9.0.jar \
mongodb_sync_table \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--table test_table \
--partition_keys pt \
--computed_column '_year=year(age)' \
--mongodb_conf hosts=127.0.0.1:27017 \
--mongodb_conf username=root \
--mongodb_conf password=123456 \
--mongodb_conf database=source_db \
--mongodb_conf collection=source_table1 \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hive-metastore:9083 \
--table_conf bucket=4 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4
Example 2: Synchronize collection into a Paimon table according to the specified field mapping.
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.9.0.jar \
mongodb_sync_table \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--table test_table \
--partition_keys pt \
--mongodb_conf hosts=127.0.0.1:27017 \
--mongodb_conf username=root \
--mongodb_conf password=123456 \
--mongodb_conf database=source_db \
--mongodb_conf collection=source_table1 \
--mongodb_conf schema.start.mode=specified \
--mongodb_conf field.name=_id,name,description \
--mongodb_conf parser.path=$._id,$.name,$.description \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hive-metastore:9083 \
--table_conf bucket=4 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4
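How the `field.name` and `parser.path` values in Example 2 relate to each other can be sketched as follows. This assumes the two comma-separated lists are paired by position, and it evaluates only plain dotted paths such as `$.name`; `parse_mapping` and `eval_simple_path` are hypothetical helpers, not Paimon functions.

```python
def parse_mapping(field_name, parser_path):
    """Pair column names with JSON paths by position (assumed behavior)."""
    names = field_name.split(",")
    paths = parser_path.split(",")
    if len(names) != len(paths):
        raise ValueError("field.name and parser.path must have the same length")
    return dict(zip(names, paths))


def eval_simple_path(document, path):
    """Evaluate a plain dotted path like $.a.b; return None if it is absent."""
    if not path.startswith("$."):
        raise ValueError("only paths of the form $.a.b are handled in this sketch")
    node = document
    for key in path[2:].split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


mapping = parse_mapping("_id,name,description", "$._id,$.name,$.description")

document = {
    "_id": "1001",
    "name": "scooter",
    "description": "Small 2-wheel scooter",
    "weight": 3.14,  # not listed in field.name, so it is not synchronized
}
row = {column: eval_simple_path(document, path) for column, path in mapping.items()}
print(row)
```

Splitting on commas is a simplification: a path that itself contains a comma, such as a bracket-notated list of children, would need a more careful parser.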
Synchronizing Databases
By using MongoDBSyncDatabaseAction in a Flink DataStream job or directly through flink run, users can synchronize the whole MongoDB database into one Paimon database.
To use this feature through flink run, run the following shell command.
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.9.0.jar \
mongodb_sync_database \
--warehouse <warehouse-path> \
--database <database-name> \
[--table_prefix <paimon-table-prefix>] \
[--table_suffix <paimon-table-suffix>] \
[--including_tables <mongodb-table-name|name-regular-expr>] \
[--excluding_tables <mongodb-table-name|name-regular-expr>] \
[--partition_keys <partition_keys>] \
[--primary_keys <primary-keys>] \
[--mongodb_conf <mongodb-cdc-source-conf> [--mongodb_conf <mongodb-cdc-source-conf> ...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
Configuration | Description |
---|---|
--warehouse | The path to Paimon warehouse. |
--database | The database name in Paimon catalog. |
--table_prefix | The prefix of all Paimon tables to be synchronized. For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table_prefix ods_". |
--table_suffix | The suffix of all Paimon tables to be synchronized. The usage is the same as "--table_prefix". |
--including_tables | Specifies which source tables are to be synchronized. Separate multiple tables with the vertical bar character. Because the vertical bar is special to the shell, quote the whole value, as in Example 2 below. Regular expressions are supported: a value that lists test and paimon.* synchronizes table 'test' and all tables whose names start with 'paimon'. |
--excluding_tables | Specifies which source tables are not to be synchronized. The usage is the same as "--including_tables". "--excluding_tables" has higher priority than "--including_tables" if you specify both. |
--partition_keys | The partition keys for the Paimon table. Separate multiple partition keys with commas, for example "dt,hh,mm". If the keys are not in the source table, the sink table won't set partition keys. |
--primary_keys | The primary keys for the Paimon table. Separate multiple primary keys with commas, for example "buyer_id,seller_id". If the keys are not in the source table, but the source table has primary keys, the sink table will use the source table's primary keys. Otherwise, the sink table won't set primary keys. |
--mongodb_conf | The configuration for Flink CDC MongoDB sources. Each configuration should be specified in the format "key=value". hosts, username, password and database are required configurations, others are optional. See its document for a complete list of configurations. |
--catalog_conf | The configuration for Paimon catalog. Each configuration should be specified in the format "key=value". See here for a complete list of catalog configurations. |
--table_conf | The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See here for a complete list of table configurations. |
All collections to be synchronized need to set _id as the primary key. For each MongoDB collection to be synchronized, if the corresponding Paimon table does not exist, this action will automatically create the table. Its schema will be derived from all specified MongoDB collections. If the Paimon table already exists, its schema will be compared against the schema of all specified MongoDB collections. Any MongoDB collections created after the task starts will automatically be included.
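The interaction of --including_tables, --excluding_tables, --table_prefix and --table_suffix can be pictured with the sketch below. It assumes each pattern must match the whole collection name and that the prefix and suffix are simply concatenated around it; `should_sync` and `paimon_table_name` are hypothetical helpers, not part of Paimon.

```python
import re


def should_sync(name, including=".*", excluding=None):
    """Decide whether a collection is synchronized; exclusion takes priority."""
    if excluding is not None and re.fullmatch(excluding, name):
        return False
    return re.fullmatch(including, name) is not None


def paimon_table_name(name, prefix="", suffix=""):
    """Build the target table name from the collection name (assumed rule)."""
    return f"{prefix}{name}{suffix}"


collections = ["test", "paimon_1", "paimon_tmp", "orders"]

# Equivalent in spirit to:
#   --including_tables 'test|paimon.*' --excluding_tables 'paimon_tmp' --table_prefix ods_
selected = [c for c in collections if should_sync(c, "test|paimon.*", "paimon_tmp")]
targets = [paimon_table_name(c, prefix="ods_") for c in selected]

print(selected)
print(targets)
```

Here `paimon_tmp` matches the including pattern but is still dropped, because the excluding pattern is checked first.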
Example 1: synchronize entire database
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.9.0.jar \
mongodb_sync_database \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--mongodb_conf hosts=127.0.0.1:27017 \
--mongodb_conf username=root \
--mongodb_conf password=123456 \
--mongodb_conf database=source_db \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hive-metastore:9083 \
--table_conf bucket=4 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4
Example 2: Synchronize the specified tables.
<FLINK_HOME>/bin/flink run \
--fromSavepoint savepointPath \
/path/to/paimon-flink-action-0.9.0.jar \
mongodb_sync_database \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--mongodb_conf hosts=127.0.0.1:27017 \
--mongodb_conf username=root \
--mongodb_conf password=123456 \
--mongodb_conf database=source_db \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hive-metastore:9083 \
--table_conf bucket=4 \
--including_tables 'product|user|address|order|custom'