import ConfigTable from '@site/src/components/ConfigTable';
import mongodbSyncTableHtml from '@site/generated/mongodb_sync_table.html';
import mongodbOperatorHtml from '@site/generated/mongodb_operator.html';
import mongodbFunctionsHtml from '@site/generated/mongodb_functions.html';
import mongodbPathExampleHtml from '@site/generated/mongodb_path_example.html';
import mongodbSyncDatabaseHtml from '@site/generated/mongodb_sync_database.html';
Mongo CDC
Prepare MongoDB Bundled Jar
| Version | Bundled Jar |
|---|---|
| 3.5.0 |
Only CDC 3.5.0 or above is supported.
Synchronizing Tables
By using `MongoDBSyncTableAction` in a Flink DataStream job or directly through `flink run`, users can synchronize one collection from MongoDB into one Paimon table.
To use this feature through `flink run`, run the following shell command.
```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-1.5-SNAPSHOT.jar \
    mongodb_sync_table \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    [--partition_keys <partition_keys>] \
    [--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
    [--mongodb_conf <mongodb-cdc-source-conf> [--mongodb_conf <mongodb-cdc-source-conf> ...]] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
    [--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
```
<ConfigTable html={mongodbSyncTableHtml} />
Here are a few points to take note of:
- The `mongodb_conf` option introduces the `schema.start.mode` parameter on top of the MongoDB CDC source configuration. `schema.start.mode` provides two modes: `dynamic` (default) and `specified`. In `dynamic` mode, only the top level of each MongoDB document is parsed, and these top-level fields form the basis for schema evolution. In `specified` mode, synchronization follows explicitly configured criteria: `field.name` lists the fields to synchronize and `parser.path` gives the JSON path from which each field is parsed. The difference is that `specified` mode requires you to identify the fields explicitly and build a mapping table from them, whereas `dynamic` mode keeps the top-level fields of Paimon and MongoDB consistent at all times, so you do not have to track individual fields. In `dynamic` mode, values inside nested fields require further processing of the table before they can be used.
- The `mongodb_conf` option also introduces the `default.id.generation` parameter as an enhancement to the MongoDB CDC source configuration. When `default.id.generation` is `true`, the MongoDB CDC source follows the default `_id` generation strategy and strips the outer `$oid` nesting, producing a simpler, more direct identifier. When it is `false`, the source keeps the original `_id` structure without additional processing, preserving nested elements such as `$oid`. Choose `true` for a cleaner, simplified `_id`, or `false` to work with the raw `_id` exactly as MongoDB provides it.
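To make the `default.id.generation` behavior concrete, the Bash function below is a hypothetical sketch (not part of Paimon or the MongoDB CDC connector) of the transformation described for `true`: it unwraps an `_id` serialized as `{"$oid": "<hex>"}` down to the bare hex ObjectId. The exact text form of a raw `_id` is an assumption made for illustration; with `false`, the value would simply be kept unchanged.

```bash
#!/usr/bin/env bash
# Hypothetical helper, for illustration only: mimics default.id.generation=true
# by unwrapping an _id serialized as {"$oid": "<hex>"}.
# The serialization format of the raw _id is an assumption, not taken from the connector.
strip_oid() {
  local raw="$1"
  # [{], [}] and [$] match the literal characters without escaping pitfalls.
  local re='^[{][[:space:]]*"[$]oid"[[:space:]]*:[[:space:]]*"([0-9a-fA-F]+)"[[:space:]]*[}]$'
  if [[ $raw =~ $re ]]; then
    printf '%s\n' "${BASH_REMATCH[1]}"   # keep only the hex ObjectId
  else
    printf '%s\n' "$raw"                 # not an $oid wrapper: leave it unchanged
  fi
}

# Prints 64f0c2a1b2c3d4e5f6a7b8c9
strip_oid '{"$oid": "64f0c2a1b2c3d4e5f6a7b8c9"}'
```

The regex only accepts the exact `$oid` wrapper shape, so any other `_id` value (for example a user-assigned string) passes through untouched.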
<ConfigTable html={mongodbOperatorHtml} />
Functions can be invoked at the tail end of a path: the input to a function is the output of the path expression, and the function output is dictated by the function itself.
<ConfigTable html={mongodbFunctionsHtml} />
Path Examples
```json
{
  "store": {
    "book": [
      {
        "category": "reference",
        "author": "Nigel Rees",
        "title": "Sayings of the Century",
        "price": 8.95
      },
      {
        "category": "fiction",
        "author": "Evelyn Waugh",
        "title": "Sword of Honour",
        "price": 12.99
      },
      {
        "category": "fiction",
        "author": "Herman Melville",
        "title": "Moby Dick",
        "isbn": "0-553-21311-3",
        "price": 8.99
      },
      {
        "category": "fiction",
        "author": "J. R. R. Tolkien",
        "title": "The Lord of the Rings",
        "isbn": "0-395-19395-8",
        "price": 22.99
      }
    ],
    "bicycle": {
      "color": "red",
      "price": 19.95
    }
  },
  "expensive": 10
}
```
<ConfigTable html={mongodbPathExampleHtml} />
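As a hedged illustration of how such path expressions could feed `specified` mode, suppose a collection held documents shaped like the sample above, each with its own `_id` (an assumption, since the sample has none). The Bash array below collects the corresponding `--mongodb_conf` arguments; the field names `bicycle_color` and `first_title` are hypothetical, and pairing `field.name` with `parser.path` by position is assumed. Single quotes keep `$` and `[0]` away from the shell.

```bash
#!/usr/bin/env bash
# Hypothetical mapping for documents shaped like the sample above (plus an _id).
# Under standard JSONPath semantics, $.store.bicycle.color would select "red"
# and $.store.book[0].title would select "Sayings of the Century".
mongodb_conf_args=(
  --mongodb_conf schema.start.mode=specified
  --mongodb_conf 'field.name=_id,bicycle_color,first_title'
  --mongodb_conf 'parser.path=$._id,$.store.bicycle.color,$.store.book[0].title'
)

# Show the arguments exactly as the shell would pass them.
printf '%s\n' "${mongodb_conf_args[@]}"
```

The array can be appended to a `mongodb_sync_table` command line as `"${mongodb_conf_args[@]}"` together with the other options shown in the examples.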
- The synchronized table must use `_id` as its primary key. MongoDB change events do not record the state of a document before an update, so they can only be converted into a Flink UPSERT changelog stream, and an upsert stream requires a unique key. Other columns cannot serve as the primary key because delete events carry only the `_id` and the shard key, not the other fields and their values.
- MongoDB Change Streams return plain JSON documents without any data type definitions. MongoDB is a document-oriented database whose core features include a dynamic schema: documents in the same collection can contain different fields, and the type of a field can vary. Change Streams omit type definitions to preserve this flexibility and extensibility. Because the data types cannot be obtained from the stream, all fields synchronized from MongoDB to Paimon are typed as STRING.
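The `_id` requirement can be pictured with a small Bash sketch (hypothetical, not Paimon code): rows live in an associative array keyed by `_id`, an upsert overwrites whatever is stored under its key without needing a before-image, and a delete needs nothing but the key. Values are kept as opaque strings, mirroring the STRING typing described above. The sketch needs Bash 4 or later for `declare -A`.

```bash
#!/usr/bin/env bash
# Hypothetical sketch of applying an UPSERT changelog to a table keyed by _id.
# Every value is an opaque string, mirroring the all-STRING column typing.
declare -A table=()

# apply_event <upsert|delete> <_id> [document]
apply_event() {
  local op="$1" id="$2" doc="${3-}"
  case "$op" in
    upsert) table["$id"]="$doc" ;;  # insert, or replace the existing row for this _id
    delete) unset "table[$id]" ;;   # a delete event only needs the key
    *) echo "unknown op: $op" >&2; return 1 ;;
  esac
}

apply_event upsert a1 '{"name":"x","age":"30"}'
apply_event upsert a1 '{"name":"x","age":"31"}'   # no before-image needed: just overwrite
apply_event upsert b2 '{"name":"y","age":"25"}'
apply_event delete b2
printf '%s => %s\n' a1 "${table[a1]}"   # prints: a1 => {"name":"x","age":"31"}
```

Keying on any column other than `_id` would break the `delete` branch, because a delete event does not say which value that other column held.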
If the Paimon table you specify does not exist, this action automatically creates it, deriving its schema from the MongoDB collection.
Example 1: Synchronize a collection into one Paimon table
```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-1.5-SNAPSHOT.jar \
    mongodb_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --partition_keys pt \
    --computed_column '_year=year(age)' \
    --mongodb_conf hosts=127.0.0.1:27017 \
    --mongodb_conf username=root \
    --mongodb_conf password=123456 \
    --mongodb_conf database=source_db \
    --mongodb_conf collection=source_table1 \
    --catalog_conf metastore=hive \
    --catalog_conf uri=thrift://hive-metastore:9083 \
    --table_conf bucket=4 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=4
```
Example 2: Synchronize a collection into a Paimon table according to a specified field mapping.
```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-1.5-SNAPSHOT.jar \
    mongodb_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --partition_keys pt \
    --mongodb_conf hosts=127.0.0.1:27017 \
    --mongodb_conf username=root \
    --mongodb_conf password=123456 \
    --mongodb_conf database=source_db \
    --mongodb_conf collection=source_table1 \
    --mongodb_conf schema.start.mode=specified \
    --mongodb_conf field.name=_id,name,description \
    --mongodb_conf parser.path=$._id,$.name,$.description \
    --catalog_conf metastore=hive \
    --catalog_conf uri=thrift://hive-metastore:9083 \
    --table_conf bucket=4 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=4
```
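In the field-mapping example, the `field.name` and `parser.path` lists line up entry by entry (`_id` with `$._id`, `name` with `$.name`, and so on). Assuming the two comma-separated lists are paired by position, a small Bash helper can check that their lengths match before the job is submitted and emit the matching `--mongodb_conf` arguments. `specified_mode_args` is a hypothetical name used only for illustration; it is not part of Paimon.

```bash
#!/usr/bin/env bash
# Hypothetical helper: validate a specified-mode mapping and print the
# corresponding --mongodb_conf arguments, one per line.
# Assumes field.name and parser.path are comma-separated and paired by position.
specified_mode_args() {
  local fields="$1" paths="$2"
  local -a f p
  IFS=',' read -r -a f <<< "$fields"
  IFS=',' read -r -a p <<< "$paths"
  if (( ${#f[@]} == 0 || ${#f[@]} != ${#p[@]} )); then
    echo "field.name has ${#f[@]} entries but parser.path has ${#p[@]}" >&2
    return 1
  fi
  printf '%s\n' \
    --mongodb_conf schema.start.mode=specified \
    --mongodb_conf "field.name=${fields}" \
    --mongodb_conf "parser.path=${paths}"
}

# Collect the arguments into an array for use on a flink run command line.
mapfile -t conf_args < <(specified_mode_args '_id,name,description' '$._id,$.name,$.description')
printf '%s\n' "${conf_args[@]}"
```

The resulting array can stand in for the three corresponding `--mongodb_conf` lines as `"${conf_args[@]}"`; a mismatched pair of lists makes the helper fail with a message instead of producing a broken mapping.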
Synchronizing Databases
By using `MongoDBSyncDatabaseAction` in a Flink DataStream job or directly through `flink run`, users can synchronize the whole MongoDB database into one Paimon database.
To use this feature through `flink run`, run the following shell command.
```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-1.5-SNAPSHOT.jar \
    mongodb_sync_database \
    --warehouse <warehouse-path> \
    --database <database-name> \
    [--table_prefix <paimon-table-prefix>] \
    [--table_suffix <paimon-table-suffix>] \
    [--including_tables <mongodb-table-name|name-regular-expr>] \
    [--excluding_tables <mongodb-table-name|name-regular-expr>] \
    [--partition_keys <partition_keys>] \
    [--primary_keys <primary-keys>] \
    [--mongodb_conf <mongodb-cdc-source-conf> [--mongodb_conf <mongodb-cdc-source-conf> ...]] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
    [--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
```
<ConfigTable html={mongodbSyncDatabaseHtml} />
All collections to be synchronized must use `_id` as the primary key. For each MongoDB collection to be synchronized, if the corresponding Paimon table does not exist, this action automatically creates it, deriving its schema from that MongoDB collection. If the Paimon table already exists, its schema is compared against the schema of the MongoDB collection. MongoDB collections created after the job starts are automatically included.
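The following is a rough, hypothetical Bash sketch of how the collection filters and naming options might combine for one collection. It assumes that `--including_tables` and `--excluding_tables` are regular expressions matched against the whole collection name, that including matches everything when omitted, and that the Paimon table name is the prefix, collection name, and suffix concatenated. `target_table` is an illustrative name, not a Paimon function.

```bash
#!/usr/bin/env bash
# Hypothetical sketch: decide whether a collection is synchronized and, if so,
# print the Paimon table name it would map to.
# Assumes whole-name regex matching and name = prefix + collection + suffix.
# Usage: target_table <collection> [including-regex] [excluding-regex] [prefix] [suffix]
target_table() {
  local collection="$1" including="${2:-.*}" excluding="${3-}"
  local prefix="${4-}" suffix="${5-}"
  local inc_re="^(${including})\$" exc_re="^(${excluding})\$"
  [[ $collection =~ $inc_re ]] || return 1
  if [[ -n $excluding && $collection =~ $exc_re ]]; then
    return 1
  fi
  printf '%s%s%s\n' "$prefix" "$collection" "$suffix"
}

for c in product user users order; do
  if name="$(target_table "$c" 'product|user|address|order|custom' 'order' 'ods_' '')"; then
    echo "$c -> $name"
  else
    echo "$c skipped"
  fi
done
# One line each: product -> ods_product, user -> ods_user, users skipped, order skipped
```

Anchoring the pattern with `^(...)$` is what keeps `users` from slipping through on the `user` alternative; under the whole-name assumption, the same pattern passed to `--including_tables` behaves this way.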
Example 1: Synchronize the entire database
```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-1.5-SNAPSHOT.jar \
    mongodb_sync_database \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --mongodb_conf hosts=127.0.0.1:27017 \
    --mongodb_conf username=root \
    --mongodb_conf password=123456 \
    --mongodb_conf database=source_db \
    --catalog_conf metastore=hive \
    --catalog_conf uri=thrift://hive-metastore:9083 \
    --table_conf bucket=4 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=4
```
Example 2: Synchronize the specified tables.
```bash
<FLINK_HOME>/bin/flink run \
    --fromSavepoint savepointPath \
    /path/to/paimon-flink-action-1.5-SNAPSHOT.jar \
    mongodb_sync_database \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --mongodb_conf hosts=127.0.0.1:27017 \
    --mongodb_conf username=root \
    --mongodb_conf password=123456 \
    --mongodb_conf database=source_db \
    --catalog_conf metastore=hive \
    --catalog_conf uri=thrift://hive-metastore:9083 \
    --table_conf bucket=4 \
    --including_tables 'product|user|address|order|custom'
```