This documentation is for an unreleased version of Apache Paimon. We recommend you use the latest stable version.

Flink

This documentation is a guide for using Paimon in Flink.

Preparing Paimon Jar File

Paimon currently supports Flink 1.18, 1.17, 1.16, 1.15 and 1.14. We recommend using the latest Flink version for a better experience.

Download the jar file that matches your Flink version.

Paimon currently provides two types of jar: the bundled jar, which is used to read and write data, and the action jar, which is used for operations such as manual compaction.

Version        Type         Jar
Flink 1.18     Bundled Jar  paimon-flink-1.18-0.8-SNAPSHOT.jar
Flink 1.17     Bundled Jar  paimon-flink-1.17-0.8-SNAPSHOT.jar
Flink 1.16     Bundled Jar  paimon-flink-1.16-0.8-SNAPSHOT.jar
Flink 1.15     Bundled Jar  paimon-flink-1.15-0.8-SNAPSHOT.jar
Flink 1.14     Bundled Jar  paimon-flink-1.14-0.8-SNAPSHOT.jar
Flink Action   Action Jar   paimon-flink-action-0.8-SNAPSHOT.jar

You can also build the bundled jar manually from source. To do so, clone the git repository and build the jars with the following command:

mvn clean install -DskipTests

You can find the bundled jar in ./paimon-flink/paimon-flink-<flink-version>/target/paimon-flink-<flink-version>-0.8-SNAPSHOT.jar, and the action jar in ./paimon-flink/paimon-flink-action/target/paimon-flink-action-0.8-SNAPSHOT.jar.

Quick Start

Step 1: Download Flink

If you haven't already, download Flink and then extract the archive with the following command:

tar -xzf flink-*.tgz

Step 2: Copy Paimon Bundled Jar

Copy the Paimon bundled jar to the lib directory of your Flink home:

cp paimon-flink-*.jar <FLINK_HOME>/lib/

Step 3: Copy Hadoop Bundled Jar

If the machine is in a Hadoop environment, make sure the environment variable HADOOP_CLASSPATH includes the path to the common Hadoop libraries. In that case you do not need the pre-bundled Hadoop jar.

Otherwise, download the pre-bundled Hadoop jar and copy it to the lib directory of your Flink home:

cp flink-shaded-hadoop-2-uber-*.jar <FLINK_HOME>/lib/

Step 4: Start a Flink Local Cluster

To run multiple Flink jobs at the same time, increase the number of task slots in the cluster configuration at <FLINK_HOME>/conf/flink-conf.yaml:

taskmanager.numberOfTaskSlots: 2

To start a local cluster, run the bash script that comes with Flink:

<FLINK_HOME>/bin/start-cluster.sh

You should be able to navigate to the web UI at localhost:8081 to view the Flink dashboard and see that the cluster is up and running.

You can now start the Flink SQL client to execute SQL scripts:

<FLINK_HOME>/bin/sql-client.sh

Step 5: Create a Catalog and a Table

-- if you're trying out Paimon in a distributed environment,
-- the warehouse path should be set to a shared file system, such as HDFS or OSS
CREATE CATALOG my_catalog WITH (
    'type'='paimon',
    'warehouse'='file:/tmp/paimon'
);

USE CATALOG my_catalog;

-- create a word count table
CREATE TABLE word_count (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt BIGINT
);

Alternatively, you can use FlinkGenericCatalog, which requires a Hive metastore. With it, you can use all tables from Paimon, Hive, and Flink generic tables (Kafka and other tables).

In this mode, you must set the 'connector' option when creating tables.

Paimon uses hive.metastore.warehouse.dir from your hive-site.xml. Use a path with a scheme, for example hdfs://...; otherwise, Paimon will use a local path.

CREATE CATALOG my_catalog WITH (
    'type'='paimon-generic',
    'hive-conf-dir'='...',
    'hadoop-conf-dir'='...'
);

USE CATALOG my_catalog;

-- create a word count table
CREATE TABLE word_count (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt BIGINT
) WITH (
    'connector'='paimon'
);

Step 6: Write Data

-- create a word data generator table
CREATE TEMPORARY TABLE word_table (
    word STRING
) WITH (
    'connector' = 'datagen',
    'fields.word.length' = '1'
);

-- Paimon requires a checkpoint interval in streaming mode
SET 'execution.checkpointing.interval' = '10 s';

-- write streaming data to dynamic table
INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;
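To build a rough mental model of what this job produces, the following Python sketch makes two simplifying assumptions. First, the streaming GROUP BY emits an updated (word, cnt) row every time a word arrives, and retraction messages are ignored. Second, the word_count table keeps only the latest row for each primary key. The function names are hypothetical, and this is not Paimon's actual write path.

```python
from typing import Dict, Iterable, Iterator, Tuple


def streaming_word_count(words: Iterable[str]) -> Iterator[Tuple[str, int]]:
    """Model of SELECT word, COUNT(*) ... GROUP BY word in streaming mode."""
    counts: Dict[str, int] = {}
    for word in words:
        counts[word] = counts.get(word, 0) + 1
        yield word, counts[word]  # emit the updated count for this word


def apply_changelog(updates: Iterable[Tuple[str, int]]) -> Dict[str, int]:
    """Model of a primary-key table: the latest row for each key wins."""
    table: Dict[str, int] = {}
    for word, cnt in updates:
        table[word] = cnt  # upsert on the primary key `word`
    return table


result = apply_changelog(streaming_word_count(["a", "b", "a", "c", "a"]))
print(result)  # {'a': 3, 'b': 1, 'c': 1}
```

Even though the stream carries five updates, the table ends up with exactly one row per word.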

Step 7: OLAP Query

-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';

-- switch to batch mode
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';

-- olap query the table
SELECT * FROM word_count;

You can execute the query multiple times and observe the changes in the results.

Step 8: Streaming Query

-- switch to streaming mode
SET 'execution.runtime-mode' = 'streaming';

-- track changes to the table and count how many words fall into each count interval
SELECT `interval`, COUNT(*) AS interval_cnt FROM
    (SELECT cnt / 10000 AS `interval` FROM word_count) GROUP BY `interval`;
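The streaming query groups words by the block of 10,000 that their count falls into. Below is a minimal Python model of that calculation. It assumes that `cnt / 10000` performs integer division because both operands are integer types. `interval_stats` is a hypothetical helper and is not part of Paimon or Flink.

```python
from collections import Counter
from typing import Dict


def interval_stats(word_count: Dict[str, int], width: int = 10000) -> Dict[int, int]:
    """Model of: SELECT cnt / 10000 AS `interval`, COUNT(*) ... GROUP BY `interval`.

    Assumes integer division, so counts 0..9999 fall into interval 0,
    10000..19999 into interval 1, and so on.
    """
    # Counts are non-negative, so floor division matches truncating division.
    return dict(Counter(cnt // width for cnt in word_count.values()))


print(interval_stats({"a": 5, "b": 12000, "c": 9999, "d": 25000}))
# {0: 2, 1: 1, 2: 1}
```

In the streaming job, these per-interval counts change as word_count is updated, which is why the query's result keeps changing.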

Step 9: Exit

Cancel the streaming job in the web UI at localhost:8081, then execute the following SQL script to exit the Flink SQL client.

-- uncomment the following line if you want to drop the dynamic table and clear the files
-- DROP TABLE word_count;

-- exit sql-client
EXIT;

Stop the Flink local cluster:

<FLINK_HOME>/bin/stop-cluster.sh

Savepoint and Recovery

Because Paimon has its own snapshot management, it may conflict with Flink's checkpoint management and cause exceptions when restoring from a savepoint. This does not damage the stored data.

We recommend taking savepoints in one of the following ways:

  1. Use Stop with savepoint.
  2. Use Tag with savepoint, and roll back to the tag (rollback-to-tag) before restoring from the savepoint.

Using Action Jar

After the Flink local cluster has started, you can run the action jar with the following command:

<FLINK_HOME>/bin/flink run \
 /path/to/paimon-flink-action-0.8-SNAPSHOT.jar \
 <action> \
 <args>

The following command compacts a table:

<FLINK_HOME>/bin/flink run \
 /path/to/paimon-flink-action-0.8-SNAPSHOT.jar \
 compact \
 --path <TABLE_PATH>

Supported Flink Data Types

See Flink Data Types.

All Flink data types are supported, with the following exceptions:

  • MULTISET is not supported.
  • MAP is not supported as primary keys.

Using Flink Managed Memory

Paimon tasks can create memory pools based on executor memory that is managed by the Flink executor, such as managed memory in the Flink task manager. Managing the writer buffers of multiple tasks through the executor improves the stability and performance of sinks.

The following properties can be set if using Flink managed memory:

Option                               Default  Description
sink.use-managed-memory-allocator    false    If true, the Flink sink uses managed memory for the merge tree. Otherwise, it creates an independent memory allocator, which means each task allocates and manages its own memory pool (heap memory). If there are too many tasks in one executor, this may cause performance issues or even OOM.
sink.managed.writer-buffer-memory    256M     Weight of the writer buffer in managed memory. Flink computes the memory size for the writer according to this weight, and the actual memory used depends on the running environment. Currently, the memory size defined by this property equals the exact memory allocated to the writer buffer at runtime.

Use in SQL: you can set the memory weight for Flink managed memory in SQL. The Flink sink operator then gets the memory pool size and creates an allocator for the Paimon writer.

INSERT INTO paimon_table /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='256M') */
SELECT * FROM ....;

Setting dynamic options

When interacting with a Paimon table, you can tune table options without changing them in the catalog. Paimon extracts job-level dynamic options, which take effect in the current session. The key format of a dynamic option is paimon.${catalogName}.${dbName}.${tableName}.${config_key}. Each of catalogName, dbName, and tableName can be *, which matches any value for that part.

For example:

-- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T
SET 'paimon.mycatalog.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;

-- set scan.timestamp-millis=1697018249000 for the table default.T in any catalog
SET 'paimon.*.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;
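To make the key format concrete, here is a minimal Python sketch that matches a session's SET keys against one table. It illustrates the rule described above and is not Paimon's implementation. `resolve_dynamic_options` is a hypothetical name. When several keys set the same config_key, this sketch simply keeps the last match it sees, because the precedence between exact and wildcard keys is not described here.

```python
from typing import Dict


def resolve_dynamic_options(
    session: Dict[str, str], catalog: str, database: str, table: str
) -> Dict[str, str]:
    """Return the table options that session keys set for catalog.database.table.

    Key format: paimon.<catalogName>.<dbName>.<tableName>.<config_key>,
    where each of the three name parts may be '*' to match any value.
    """
    resolved: Dict[str, str] = {}
    for key, value in session.items():
        # Split at most 4 times: the config_key itself may contain dots.
        parts = key.split(".", 4)
        if len(parts) != 5 or parts[0] != "paimon":
            continue  # not a Paimon dynamic option
        _, cat, db, tbl, config_key = parts
        pairs = ((cat, catalog), (db, database), (tbl, table))
        if all(pattern in ("*", actual) for pattern, actual in pairs):
            resolved[config_key] = value  # a later match overwrites an earlier one
    return resolved


session = {
    "paimon.*.default.T.scan.timestamp-millis": "1697018249000",
    "paimon.mycatalog.default.U.scan.timestamp-millis": "0",
    "execution.runtime-mode": "batch",
}
print(resolve_dynamic_options(session, "mycatalog", "default", "T"))
# {'scan.timestamp-millis': '1697018249000'}
```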

Procedures

Flink 1.18 and later versions support Call Statements, which make it easier to manipulate the data and metadata of a Paimon table by writing SQL instead of submitting Flink jobs. All available procedures are listed below. When you call a procedure, you must pass all parameters in order. For any parameter you don't want to set, pass '' as a placeholder. For example, to compact table default.t with parallelism 4 without specifying partitions or a sort strategy, use the following call statement:
CALL sys.compact('default.t', '', '', '', 'sink.parallelism=4')
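Because arguments are positional, an omitted argument still needs its own '' slot. The small Python sketch below renders such a statement from named values. `build_call` and `COMPACT_PARAMS` are hypothetical helpers. The parameter order is taken from the compact usage listed below. Only string arguments are handled, so numeric arguments such as snapshotId would not be rendered correctly.

```python
from typing import Dict, Sequence


def build_call(procedure: str, params: Sequence[str], values: Dict[str, str]) -> str:
    """Render a CALL statement that passes every parameter in order.

    Parameters missing from `values` are passed as the '' placeholder.
    Only string arguments are supported in this sketch.
    """
    args = []
    for name in params:
        value = values.get(name, "")
        escaped = value.replace("'", "''")  # standard SQL quote escaping
        args.append(f"'{escaped}'")
    return f"CALL {procedure}({', '.join(args)})"


# Parameter order of sys.compact, as listed in its usage.
COMPACT_PARAMS = ["identifier", "partitions", "order_strategy", "order_columns", "table_options"]

print(build_call("sys.compact", COMPACT_PARAMS,
                 {"identifier": "default.t", "table_options": "sink.parallelism=4"}))
# CALL sys.compact('default.t', '', '', '', 'sink.parallelism=4')
```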

Specify partitions: a partition filter is given as a string, in which "," means "AND" and ";" means "OR". For example, to specify the two partitions date=01 and date=02, write 'date=01;date=02'. To specify the single partition with date=01 and day=01, write 'date=01,day=01'.
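Here is a minimal Python sketch of how the partition filter syntax above can be read as OR-ed groups of AND-ed conditions. It is not Paimon's parser. It assumes there is no whitespace trimming or escaping. Treating an empty filter as "select every partition" is an assumption of this sketch, and the function names are hypothetical.

```python
from typing import Dict, List


def parse_partition_filter(spec: str) -> List[Dict[str, str]]:
    """Parse a partition filter into OR-ed groups of AND-ed key=value conditions.

    ';' separates groups (OR) and ',' separates conditions in a group (AND).
    Empty groups are skipped, so '' parses to an empty list.
    """
    groups: List[Dict[str, str]] = []
    for group in spec.split(";"):
        if not group:
            continue
        conditions: Dict[str, str] = {}
        for condition in group.split(","):
            key, _, value = condition.partition("=")
            conditions[key] = value
        groups.append(conditions)
    return groups


def partition_matches(partition: Dict[str, str], groups: List[Dict[str, str]]) -> bool:
    """A partition matches if it satisfies every condition of at least one group."""
    if not groups:
        return True  # sketch assumption: no filter selects every partition
    return any(all(partition.get(k) == v for k, v in g.items()) for g in groups)


print(parse_partition_filter("date=01;date=02"))  # [{'date': '01'}, {'date': '02'}]
print(parse_partition_filter("date=01,day=01"))   # [{'date': '01', 'day': '01'}]
```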

Table options syntax: table options are given as a string in the format 'key1=value1,key2=value2...'.
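The table options string can be parsed with a minimal Python sketch like the one below. `parse_table_options` is a hypothetical helper, not a Paimon API. This format has no escaping in the sketch, so values cannot contain commas. Rejecting a pair without "=" is this sketch's own choice.

```python
from typing import Dict


def parse_table_options(spec: str) -> Dict[str, str]:
    """Parse 'key1=value1,key2=value2' into a dict of table options.

    An empty string (the '' placeholder) yields no options.
    """
    options: Dict[str, str] = {}
    for pair in spec.split(","):
        if not pair:
            continue
        key, sep, value = pair.partition("=")
        if not sep:
            raise ValueError(f"expected key=value, got {pair!r}")
        options[key] = value
    return options


print(parse_table_options("sink.parallelism=4,sink.use-managed-memory-allocator=true"))
# {'sink.parallelism': '4', 'sink.use-managed-memory-allocator': 'true'}
```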

Each procedure below is listed with its usage, an explanation of its arguments, and an example.

compact
  Usage:
    CALL [catalog.]sys.compact('identifier')
    CALL [catalog.]sys.compact('identifier', 'partitions')
    CALL [catalog.]sys.compact('identifier', 'partitions', 'order_strategy', 'order_columns', 'table_options')
  Explanation: To compact a table. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • partitions: partition filter.
  • order_strategy: 'order', 'zorder', or 'none'. Leave empty for 'none'.
  • order_columns: the columns to sort by. Leave empty if 'order_strategy' is 'none'.
  • table_options: additional dynamic options of the table.
  Example:
    CALL sys.compact('default.T', 'p=0', 'zorder', 'a,b', 'sink.parallelism=4')

compact_database
  Usage:
    CALL [catalog.]sys.compact_database()
    CALL [catalog.]sys.compact_database('includingDatabases')
    CALL [catalog.]sys.compact_database('includingDatabases', 'mode')
    CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables')
    CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables')
    CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')
  Explanation: To compact databases. Arguments:
  • includingDatabases: the databases to compact. Regular expressions are supported.
  • mode: compact mode. "divided": starts a sink for each table, and detecting new tables requires restarting the job. "combined" (default): starts a single combined sink for all tables, and new tables are detected automatically.
  • includingTables: the tables to compact. Regular expressions are supported.
  • excludingTables: the tables not to compact. Regular expressions are supported.
  • tableOptions: additional dynamic options of the table.
  Example:
    CALL sys.compact_database('db1|db2', 'combined', 'table_.*', 'ignore', 'sink.parallelism=4')

create_tag
  Usage:
    CALL [catalog.]sys.create_tag('identifier', 'tagName', snapshotId)
  Explanation: To create a tag based on the given snapshot. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • tagName: name of the new tag.
  • snapshotId (Long): id of the snapshot the new tag is based on.
  Example:
    CALL sys.create_tag('default.T', 'my_tag', 10)

delete_tag
  Usage:
    CALL [catalog.]sys.delete_tag('identifier', 'tagName')
  Explanation: To delete a tag. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • tagName: name of the tag to be deleted.
  Example:
    CALL sys.delete_tag('default.T', 'my_tag')

merge_into
  Usage:
    -- when matched then upsert
    CALL [catalog.]sys.merge_into('identifier','targetAlias',
        'sourceSqls','sourceTable','mergeCondition',
        'matchedUpsertCondition','matchedUpsertSetting')

    -- when matched then upsert; when not matched then insert
    CALL [catalog.]sys.merge_into('identifier','targetAlias',
        'sourceSqls','sourceTable','mergeCondition',
        'matchedUpsertCondition','matchedUpsertSetting',
        'notMatchedInsertCondition','notMatchedInsertValues')

    -- when matched then delete
    CALL [catalog.]sys.merge_into('identifier','targetAlias',
        'sourceSqls','sourceTable','mergeCondition',
        'matchedDeleteCondition')

    -- when matched then upsert + delete;
    -- when not matched then insert
    CALL [catalog.]sys.merge_into('identifier','targetAlias',
        'sourceSqls','sourceTable','mergeCondition',
        'matchedUpsertCondition','matchedUpsertSetting',
        'notMatchedInsertCondition','notMatchedInsertValues',
        'matchedDeleteCondition')
  Explanation: To perform a "MERGE INTO" operation. See the merge_into action for details of the arguments.
  Example:
    -- for matched order rows, increase the price,
    -- and if there is no match, insert the order from the source table
    CALL sys.merge_into('default.T', '', '', 'default.S', 'T.id=S.order_id', '', 'price=T.price+20', '', '*')

remove_orphan_files
  Usage:
    CALL [catalog.]sys.remove_orphan_files('identifier')
    CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan')
  Explanation: To remove orphan data files and metadata files. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • olderThan: to avoid deleting newly written files, this procedure by default only deletes orphan files older than 1 day. Use this argument to change the cutoff.
  Example:
    CALL sys.remove_orphan_files('default.T', '2023-10-31 12:00:00')

reset_consumer
  Usage:
    -- reset the new next snapshot id in the consumer
    CALL [catalog.]sys.reset_consumer('identifier', 'consumerId', nextSnapshotId)

    -- delete consumer
    CALL [catalog.]sys.reset_consumer('identifier', 'consumerId')
  Explanation: To reset or delete a consumer. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • consumerId: the consumer to be reset or deleted.
  • nextSnapshotId (Long): the new next snapshot id of the consumer.
  Example:
    CALL sys.reset_consumer('default.T', 'myid', 10)

rollback_to
  Usage:
    -- roll back to a snapshot
    CALL sys.rollback_to('identifier', snapshotId)

    -- roll back to a tag
    CALL sys.rollback_to('identifier', 'tagName')
  Explanation: To roll back to a specific version of the target table. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • snapshotId (Long): id of the snapshot to roll back to.
  • tagName: name of the tag to roll back to.
  Example:
    CALL sys.rollback_to('default.T', 10)

expire_snapshots
  Usage:
    -- expire snapshots
    CALL sys.expire_snapshots('identifier', retainMax)
  Explanation: To expire snapshots. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • retainMax: the maximum number of completed snapshots to retain.
  Example:
    CALL sys.expire_snapshots('default.T', 2)
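To illustrate what retainMax means, here is a simplified Python model. It keeps only the newest retainMax completed snapshots and ignores any other retention settings the table may have. `plan_expiration` is a hypothetical helper, and rejecting values below 1 is an assumption of this sketch, not documented Paimon behavior.

```python
from typing import List, Tuple


def plan_expiration(snapshot_ids: List[int], retain_max: int) -> Tuple[List[int], List[int]]:
    """Split completed snapshot ids into (expired, retained).

    Simplified model: only the newest `retain_max` snapshots are kept;
    other retention rules are ignored.
    """
    if retain_max < 1:
        raise ValueError("retain_max must be at least 1")  # sketch assumption
    ordered = sorted(snapshot_ids)  # snapshot ids increase over time
    cut = max(len(ordered) - retain_max, 0)
    return ordered[:cut], ordered[cut:]


expired, retained = plan_expiration([1, 2, 3, 4, 5], retain_max=2)
print(expired, retained)  # [1, 2, 3] [4, 5]
```

With the example call above, which uses retainMax = 2, a table with snapshots 1 to 5 would keep snapshots 4 and 5 under this model.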