Overview #
Paimon supports a variety of ways to ingest data into Paimon tables with schema evolution. This means that added columns are synchronized to the Paimon table in real time, and the synchronization job does not need to be restarted for this purpose.
We currently support the following sync ways:
- MySQL Synchronizing Table: synchronize one or multiple tables from MySQL into one Paimon table.
- MySQL Synchronizing Database: synchronize the whole MySQL database into one Paimon database.
- Program API Sync: synchronize your custom DataStream input into one Paimon table.
- Kafka Synchronizing Table: synchronize one Kafka topic’s table into one Paimon table.
- Kafka Synchronizing Database: synchronize one Kafka topic containing multiple tables or multiple topics containing one table each into one Paimon database.
- MongoDB Synchronizing Collection: synchronize one Collection from MongoDB into one Paimon table.
- MongoDB Synchronizing Database: synchronize the whole MongoDB database into one Paimon database.
- Pulsar Synchronizing Table: synchronize one Pulsar topic’s table into one Paimon table.
- Pulsar Synchronizing Database: synchronize one Pulsar topic containing multiple tables or multiple topics containing one table each into one Paimon database.
What is Schema Evolution #
Suppose we have a MySQL table named `tableA` with three fields: `field_1`, `field_2`, `field_3`. When we want to load this MySQL table into Paimon, we can do it in Flink SQL, or use MySqlSyncTableAction.
Flink SQL:
In Flink SQL, if we change the table schema of the MySQL table after ingestion has started, the table schema change will not be synchronized to Paimon.
MySqlSyncTableAction:
In MySqlSyncTableAction, if we change the table schema of the MySQL table after ingestion has started, the table schema change will be synchronized to Paimon, and the data of the newly added `field_4` will be synchronized to Paimon as well.
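As a reference, here is a minimal sketch of submitting the action through the Flink action jar. The warehouse path, database and table names, and connection values are placeholders, and the exact jar file name and flag spellings depend on your Paimon version:

```bash
# Sketch: synchronize MySQL tableA into a Paimon table with schema evolution.
# All paths, host names and credentials below are placeholders.
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action.jar \
    mysql_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table table_a \
    --primary_keys id \
    --mysql_conf hostname=localhost \
    --mysql_conf port=3306 \
    --mysql_conf username=root \
    --mysql_conf password=xxxx \
    --mysql_conf database-name=source_db \
    --mysql_conf table-name=tableA
```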
Schema Change Evolution #
CDC ingestion supports a limited number of schema changes. Currently, the framework cannot rename tables or drop columns, so the behaviors of `RENAME TABLE` and `DROP COLUMN` will be ignored, and `RENAME COLUMN` will add a new column. Currently supported schema changes include:
- Adding columns.
- Altering column types. More specifically, the following alterations are supported:
  - altering from a string type (char, varchar, text) to another string type with longer length,
  - altering from a binary type (binary, varbinary, blob) to another binary type with longer length,
  - altering from an integer type (tinyint, smallint, int, bigint) to another integer type with wider range,
  - altering from a floating-point type (float, double) to another floating-point type with wider range.
Computed Functions #
`--computed_column` arguments are the definitions of computed columns. The argument field must be a field name of the source table.
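As an illustrative excerpt (the source column `uid` and the new column name `uid_prefix` are made up for illustration; check the exact `'new-column=expr(args)'` spelling for your version), a computed column is added to the sync command like this, and the option can be repeated for multiple computed columns:

```bash
# Excerpt: computed-column definition added to the sync command shown earlier.
# 'uid' is an assumed source column; the new column is named 'uid_prefix'.
    --computed_column 'uid_prefix=substring(uid,0,4)' \
```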
Temporal Functions #
Temporal functions can convert date and epoch time to another form. A common use case is to generate partition values.
| Function | Description |
| --- | --- |
| `year(temporal-column [, precision])` | Extract the year from the input. Output is an INT value representing the year. |
| `month(temporal-column [, precision])` | Extract the month of year from the input. Output is an INT value representing the month of year. |
| `day(temporal-column [, precision])` | Extract the day of month from the input. Output is an INT value representing the day of month. |
| `hour(temporal-column [, precision])` | Extract the hour from the input. Output is an INT value representing the hour. |
| `minute(temporal-column [, precision])` | Extract the minute from the input. Output is an INT value representing the minute. |
| `second(temporal-column [, precision])` | Extract the second from the input. Output is an INT value representing the second. |
| `date_format(temporal-column, format-string [, precision])` | Convert the input to a string with the desired format. Output type is STRING. |
The data type of the temporal-column can be one of the following:
- DATE, DATETIME or TIMESTAMP.
- Any integer numeric type (such as INT and BIGINT). In this case, the data will be treated as epoch time counted from 1970-01-01 00:00:00, and you should set the precision of the value (the default is 0).
- STRING. In this case, if you don't set the precision, the data will be treated as a formatted string of a DATE, DATETIME or TIMESTAMP value; otherwise, the data will be treated as the string value of an epoch time, so you must set the precision in the latter case.
The precision represents the unit of the epoch time. Currently, there are four valid precisions: 0 (for epoch seconds), 3 (for epoch milliseconds), 6 (for epoch microseconds) and 9 (for epoch nanoseconds). Taking the time point 1970-01-01 00:00:00.123456789 as an example, the epoch seconds are 0, the epoch milliseconds are 123, the epoch microseconds are 123456, and the epoch nanoseconds are 123456789. The precision should match the input values. You can set the precision in this way: `date_format(epoch_col, yyyy-MM-dd, 0)`.
`date_format` is a flexible function which can convert the temporal value to various formats with different format strings. The most common format string is `yyyy-MM-dd HH:mm:ss.SSS`. Another example is `yyyy-ww`, which extracts the year and the week-of-the-year from the input. Note that the output is affected by the locale. For example, in some regions the first day of a week is Monday while in others it is Sunday, so if you use `date_format(date_col, yyyy-ww)` and the input of date_col is 2024-01-07 (a Sunday), the output may be 2024-01 (if the first day of a week is Monday) or 2024-02 (if the first day of a week is Sunday).
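For example, to generate a partition value from an epoch-milliseconds column (a common use case), the precision argument tells the function how to interpret the number. In this sketch the `update_millis` column is an assumption, and the `--partition_keys` option spelling should be checked for your version:

```bash
# Excerpt: partition column derived from an epoch-milliseconds source column.
# 'update_millis' is an assumed BIGINT column storing epoch milliseconds.
    --computed_column 'dt=date_format(update_millis,yyyy-MM-dd,3)' \
    --partition_keys dt \
```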
Other Functions #
| Function | Description |
| --- | --- |
| `substring(column, beginInclusive)` | Get `column.substring(beginInclusive)`. Output is a STRING. |
| `substring(column, beginInclusive, endExclusive)` | Get `column.substring(beginInclusive, endExclusive)`. Output is a STRING. |
| `truncate(column, width)` | Truncate the column by the given width. The output type is the same as the column's type. If the column is a STRING, `truncate(column, width)` truncates the string to `width` characters, namely `value.substring(0, width)`. If the column is an INT or LONG, `truncate(column, width)` truncates the number with the algorithm `v - (((v % W) + W) % W)`; the seemingly redundant part of the computation keeps the subtracted remainder non-negative. If the column is a DECIMAL, `truncate(column, width)` truncates the decimal with the algorithm: let `scaled_W = decimal(W, scale(v))`, then return `v - (v % scaled_W)`. |
| `cast(value, dataType)` | Get a constant value. The output is an atomic type, such as STRING, INT, BOOLEAN, etc. |
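As a quick sanity check of the integer algorithm above, the following shell arithmetic (bash's `%` follows C/Java semantics for negative operands, as Paimon's implementation does) reproduces the expected truncation:

```bash
# Verify the integer truncate algorithm: v - (((v % W) + W) % W)
v=-7; W=10
echo $(( v - (((v % W) + W) % W) ))   # prints -10
v=97
echo $(( v - (((v % W) + W) % W) ))   # prints 90
```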
Special Data Type Mapping #
- MySQL TINYINT(1) type will be mapped to Boolean by default. If you want to store numbers (-128 to 127) in it like MySQL does, you can specify the type mapping option `tinyint1-not-bool` (use `--type_mapping`, as shown in the excerpt after this list); the column will then be mapped to TINYINT in the Paimon table.
- You can use the type mapping option `to-nullable` (use `--type_mapping`) to ignore all NOT NULL constraints (except primary keys).
- You can use the type mapping option `to-string` (use `--type_mapping`) to map all MySQL data types to STRING.
- You can use the type mapping option `char-to-string` (use `--type_mapping`) to map MySQL CHAR(length)/VARCHAR(length) types to STRING.
- You can use the type mapping option `longtext-to-bytes` (use `--type_mapping`) to map MySQL LONGTEXT types to BYTES.
- MySQL `BIGINT UNSIGNED`, `BIGINT UNSIGNED ZEROFILL` and `SERIAL` will be mapped to `DECIMAL(20, 0)` by default. You can use the type mapping option `bigint-unsigned-to-bigint` (use `--type_mapping`) to map these types to Paimon `BIGINT`, but there is potential data overflow because `BIGINT UNSIGNED` can store integer values of up to 20 digits while Paimon `BIGINT` can only store integer values of up to 19 digits. So you should ensure the overflow won't occur when using this option.
- MySQL BIT(1) type will be mapped to Boolean.
- When using Hive catalog, MySQL TIME type will be mapped to STRING.
- MySQL BINARY will be mapped to Paimon VARBINARY. This is because the binary value is passed as bytes in the binlog, so it should be mapped to a byte type (BYTES or VARBINARY). We choose VARBINARY because it can retain the length information.
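An illustrative excerpt, added to the sync command shown earlier (combining options with a comma is how multiple mappings are typically passed, but check the syntax for your version):

```bash
# Excerpt: keep TINYINT(1) as a number and drop NOT NULL constraints.
    --type_mapping tinyint1-not-bool,to-nullable \
```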
Custom Job Settings #
Checkpointing #
Use `-Dexecution.checkpointing.interval=<interval>` to enable checkpointing and set the interval. In version 0.7 and later, if you haven't enabled checkpointing, Paimon will enable checkpointing by default and set the checkpoint interval to 180 seconds.
Job Name #
Use `-Dpipeline.name=<job-name>` to set a custom synchronization job name.
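Both settings are Flink configuration options and are passed on the `flink run` command line before the action jar. A minimal sketch, with placeholder paths and names:

```bash
# Sketch: set a 10s checkpoint interval and a custom job name for the sync job.
<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dpipeline.name=mysql_to_paimon_sync \
    /path/to/paimon-flink-action.jar \
    mysql_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table table_a
    # plus the source connection options (--mysql_conf ...) shown earlier
```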
Table Configuration #
You can use `--table_conf` to set table properties and some Flink job properties (like `sink.parallelism`). If the table is created by the CDC job, the table's properties will be equal to the given properties. Otherwise, the job will use the given properties to alter the table's properties. But note that immutable options (like `merge-engine`) and the bucket number won't be altered.
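An illustrative excerpt, added to the sync command shown earlier (the property values here are placeholders):

```bash
# Excerpt: table properties and a Flink sink property passed with --table_conf.
    --table_conf bucket=4 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=4 \
```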