Overview #

Paimon supports several ways to ingest data into Paimon tables with schema evolution: columns added at the source are synchronized to the Paimon table in real time, and the synchronization job does not need to be restarted for this.

We currently support the following synchronization methods:

  1. MySQL Synchronizing Table: synchronize one or multiple tables from MySQL into one Paimon table.
  2. MySQL Synchronizing Database: synchronize the whole MySQL database into one Paimon database.
  3. Program API Sync: synchronize your custom DataStream input into one Paimon table.
  4. Kafka Synchronizing Table: synchronize one Kafka topic’s table into one Paimon table.
  5. Kafka Synchronizing Database: synchronize one Kafka topic containing multiple tables or multiple topics containing one table each into one Paimon database.
  6. MongoDB Synchronizing Collection: synchronize one Collection from MongoDB into one Paimon table.
  7. MongoDB Synchronizing Database: synchronize the whole MongoDB database into one Paimon database.
  8. Pulsar Synchronizing Table: synchronize one Pulsar topic’s table into one Paimon table.
  9. Pulsar Synchronizing Database: synchronize one Pulsar topic containing multiple tables or multiple topics containing one table each into one Paimon database.

What is Schema Evolution #

Suppose we have a MySQL table named tableA with three fields: field_1, field_2 and field_3. To load this MySQL table into Paimon, we can use either Flink SQL or MySqlSyncTableAction.

Flink SQL:

With Flink SQL, if the schema of the MySQL table changes after ingestion has started, the change is not synchronized to Paimon.

MySqlSyncTableAction:

With MySqlSyncTableAction, if the schema of the MySQL table changes after ingestion has started, the change is synchronized to Paimon. For example, if a column field_4 is added to the MySQL table, both the new column and its data are synchronized to Paimon.
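
The difference can be pictured with a toy model. The sketch below is not Paimon or Flink code: `FixedSchemaSink` and `EvolvingSink` are hypothetical names that only mimic the two behaviors described above, using plain dictionaries as rows.

```python
class FixedSchemaSink:
    """Toy stand-in for the Flink SQL job: the column list is frozen at job start."""

    def __init__(self, columns):
        self.columns = list(columns)
        self.rows = []

    def write(self, record):
        # Fields that are not part of the frozen schema are dropped.
        self.rows.append({c: record.get(c) for c in self.columns})


class EvolvingSink(FixedSchemaSink):
    """Toy stand-in for MySqlSyncTableAction: unseen fields become new columns."""

    def write(self, record):
        for name in record:
            if name not in self.columns:
                self.columns.append(name)  # the schema evolves while the job runs
        super().write(record)


source_columns = ["field_1", "field_2", "field_3"]
fixed = FixedSchemaSink(source_columns)
evolving = EvolvingSink(source_columns)

# A row produced after field_4 was added to the MySQL table.
new_row = {"field_1": 1, "field_2": "a", "field_3": "b", "field_4": "c"}
fixed.write(new_row)
evolving.write(new_row)
```

In this model the fixed sink never sees field_4, while the evolving sink appends it to its column list and keeps the value.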

Schema Change Evolution #

CDC ingestion supports a limited set of schema changes. The framework currently cannot rename tables or drop columns, so RENAME TABLE and DROP COLUMN are ignored, and RENAME COLUMN adds a new column instead. The currently supported schema changes are:

  • Adding columns.

  • Altering column types. More specifically, the following alterations are supported:

    • from a string type (char, varchar, text) to another string type with a longer length,
    • from a binary type (binary, varbinary, blob) to another binary type with a longer length,
    • from an integer type (tinyint, smallint, int, bigint) to another integer type with a wider range,
    • from a floating-point type (float, double) to another floating-point type with a wider range.
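
The widening rules above can be written down as a small check. This is only an illustrative sketch, not the logic Paimon actually runs: `is_supported_alter` and the `(name, length)` tuple representation are assumptions made for the example, and string and binary types are expected to carry an explicit length.

```python
# Type families from the list above. The numeric ones are ordered narrowest to widest.
INTEGER_TYPES = ["tinyint", "smallint", "int", "bigint"]
FLOAT_TYPES = ["float", "double"]
STRING_TYPES = {"char", "varchar", "text"}
BINARY_TYPES = {"binary", "varbinary", "blob"}


def is_supported_alter(old, new):
    """Return True if changing a column from `old` to `new` widens it.

    Each type is a (name, length) tuple; length is None for numeric types.
    """
    (old_name, old_len), (new_name, new_len) = old, new
    # Numeric types: the new type must come later in the same family.
    for family in (INTEGER_TYPES, FLOAT_TYPES):
        if old_name in family and new_name in family:
            return family.index(new_name) > family.index(old_name)
    # String and binary types: same family, strictly longer length.
    for family in (STRING_TYPES, BINARY_TYPES):
        if old_name in family and new_name in family:
            return new_len > old_len
    # Anything else (for example int to varchar) is not in the supported list.
    return False
```

For instance, `is_supported_alter(("int", None), ("bigint", None))` is accepted, while the reverse direction and any change across families are rejected.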

Computed Functions #

--computed_column defines computed columns. The argument of each function is a field name from the source table. Supported expressions are:

  • year(date-column): Extract the year from a DATE, DATETIME or TIMESTAMP (or its corresponding string format). Output is an INT value representing the year.
  • month(date-column): Extract the month of the year from a DATE, DATETIME or TIMESTAMP (or its corresponding string format). Output is an INT value representing the month of the year.
  • day(date-column): Extract the day of the month from a DATE, DATETIME or TIMESTAMP (or its corresponding string format). Output is an INT value representing the day of the month.
  • hour(date-column): Extract the hour from a DATE, DATETIME or TIMESTAMP (or its corresponding string format). Output is an INT value representing the hour.
  • minute(date-column): Extract the minute from a DATE, DATETIME or TIMESTAMP (or its corresponding string format). Output is an INT value representing the minute.
  • second(date-column): Extract the second from a DATE, DATETIME or TIMESTAMP (or its corresponding string format). Output is an INT value representing the second.
  • date_format(date-column,format): Format a DATE, DATETIME or TIMESTAMP (or its corresponding string format) as a string. 'format' is a pattern compatible with Java's DateTimeFormatter (for example, 'yyyy-MM-dd'). Output is a STRING in the requested format.
  • substring(column,beginInclusive): Get column.substring(beginInclusive). Output is a STRING.
  • substring(column,beginInclusive,endExclusive): Get column.substring(beginInclusive,endExclusive). Output is a STRING.
  • truncate(column,width): Truncate column by width. The output type is the same as the column type.

    • If the column is a STRING, the string is truncated to width characters, namely `value.substring(0, width)`.
    • If the column is an INT or LONG, the number is truncated with the algorithm `v - (((v % W) + W) % W)`. The seemingly redundant `+ W) % W` step keeps the remainder non-negative, so negative values are also rounded down to a multiple of W.
    • If the column is a DECIMAL, the decimal is truncated with the algorithm: let `scaled_W = decimal(W, scale(v))`, then return `v - (v % scaled_W)`.
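
The truncate formulas are easier to follow with concrete numbers. The sketch below re-implements them in Python purely for illustration; it is not Paimon code. The helper `java_rem` is an assumption introduced here to reproduce Java's `%` operator, whose result takes the sign of the dividend (Python's built-in `%` behaves differently for negative integers).

```python
from decimal import Decimal


def java_rem(a, b):
    """Integer remainder with Java semantics: the sign follows the dividend."""
    r = abs(a) % abs(b)
    return -r if a < 0 else r


def truncate_string(value, width):
    # Mirrors value.substring(0, width); unlike Java, Python slicing
    # does not fail when width exceeds the string length.
    return value[:width]


def truncate_int(v, w):
    # v - (((v % W) + W) % W), with % evaluated as in Java.
    return v - java_rem(java_rem(v, w) + w, w)


def truncate_decimal(v, w):
    # scaled_W = decimal(W, scale(v)): W as the unscaled value at v's scale,
    # e.g. W = 50 and a scale of 2 give 0.50.
    scaled_w = Decimal(w).scaleb(v.as_tuple().exponent)
    return v - (v % scaled_w)


naive = -1 - java_rem(-1, 10)        # plain v - (v % W) rounds -1 up to 0
corrected = truncate_int(-1, 10)     # the full formula rounds -1 down to -10
```

With a width of 10, the value 17 truncates to 10 under both variants; only negative inputs such as -1 show why the extra `+ W) % W` step is there.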

Special Data Type Mapping #

  1. MySQL TINYINT(1) is mapped to BOOLEAN by default. If you want to store numbers (-128~127) in it, as MySQL does, specify the type mapping option tinyint1-not-bool (use --type_mapping); the column is then mapped to TINYINT in the Paimon table.
  2. Use the type mapping option to-nullable (use --type_mapping) to ignore all NOT NULL constraints (except those on primary keys).
  3. Use the type mapping option to-string (use --type_mapping) to map all MySQL data types to STRING.
  4. Use the type mapping option char-to-string (use --type_mapping) to map MySQL CHAR(length)/VARCHAR(length) types to STRING.
  5. Use the type mapping option longtext-to-bytes (use --type_mapping) to map the MySQL LONGTEXT type to BYTES.
  6. MySQL BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL and SERIAL are mapped to DECIMAL(20, 0) by default. You can use the type mapping option bigint-unsigned-to-bigint (use --type_mapping) to map these types to Paimon BIGINT, but this risks data overflow: BIGINT UNSIGNED can store integers of up to 20 digits, whereas Paimon BIGINT can only store integers of up to 19 digits. Make sure that no overflow can occur before using this option.
  7. MySQL BIT(1) is mapped to BOOLEAN.
  8. When using a Hive catalog, the MySQL TIME type is mapped to STRING.
  9. MySQL BINARY is mapped to Paimon VARBINARY. The binary value is passed as bytes in the binlog, so it should be mapped to a byte type (BYTES or VARBINARY); VARBINARY is chosen because it retains the length information.
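
The overflow risk of bigint-unsigned-to-bigint in item 6 comes down to the two value ranges. A minimal sketch, assuming the usual 64-bit ranges for both types; `fits_in_bigint` is a hypothetical helper for checking source values before enabling the option, not part of Paimon.

```python
UNSIGNED_BIGINT_MAX = 2**64 - 1  # 18446744073709551615, 20 digits
SIGNED_BIGINT_MAX = 2**63 - 1    # 9223372036854775807, 19 digits


def fits_in_bigint(value):
    """Return True if an unsigned MySQL value survives the mapping to a signed BIGINT."""
    return 0 <= value <= SIGNED_BIGINT_MAX


# The largest value currently stored in the column decides whether the option is safe.
safe = fits_in_bigint(SIGNED_BIGINT_MAX)
overflow = not fits_in_bigint(SIGNED_BIGINT_MAX + 1)
```

Any value above 9223372036854775807 needs the default DECIMAL(20, 0) mapping.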

Custom Job Settings #

Checkpointing #

Use -Dexecution.checkpointing.interval=<interval> to enable checkpointing and set the interval. For versions 0.7 and later, if you haven’t enabled checkpointing, Paimon enables it by default and sets the checkpoint interval to 180 seconds.

Job Name #

Use -Dpipeline.name=<job-name> to set a custom name for the synchronization job.
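
As a rough illustration of where these settings go, the sketch below assembles a `flink run` argument list with both options. `flink_run_command` is a hypothetical helper, the jar path and `<FLINK_HOME>` are placeholders, and placing the `-D` options directly after `run` is an assumption based on how the Flink CLI takes generic options. Further action arguments are left out.

```python
def flink_run_command(jar, action_args, checkpoint_interval=None, job_name=None):
    """Build the argument list for a synchronization job (illustrative only)."""
    cmd = ["<FLINK_HOME>/bin/flink", "run"]
    if checkpoint_interval is not None:
        cmd.append(f"-Dexecution.checkpointing.interval={checkpoint_interval}")
    if job_name is not None:
        cmd.append(f"-Dpipeline.name={job_name}")
    cmd.append(jar)           # placeholder path to the Paimon action jar
    cmd.extend(action_args)   # action name followed by its own arguments
    return cmd


command = flink_run_command(
    "/path/to/paimon-flink-action.jar",
    ["mysql_sync_table"],
    checkpoint_interval="10s",
    job_name="mysql-to-paimon",
)
```

Omitting `checkpoint_interval` leaves the option out, in which case the 180-second default described above applies for 0.7 and later.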