This documentation is for an unreleased version of Apache Paimon. We recommend you use the latest stable version.
Procedures
Flink 1.18 and later versions support Call Statements, which make it easier to manipulate the data and metadata of Paimon tables by writing SQL instead of submitting Flink jobs.
In Flink 1.18, procedures only support passing arguments by position. You must pass all arguments in order, and if you
don't want to pass some arguments, you must use '' as a placeholder. For example, if you want to compact table default.t
with parallelism 4 but don't want to specify partitions or a sort strategy, the call statement should be
CALL sys.compact('default.t', '', '', '', 'sink.parallelism=4').
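To make the placeholder rule concrete, here is a minimal Python sketch that assembles a Flink 1.18-style positional call. The helper `positional_call` and the list `COMPACT_PARAMS` are hypothetical illustrations, not part of Paimon. The parameter order comes from the five-argument indexed form of `compact` in the table on this page, and the sketch assumes every argument is passed as a string literal.

```python
def positional_call(procedure, params, values):
    """Build a positional CALL statement (hypothetical helper, not a Paimon API).

    params: parameter names in the order the procedure declares them.
    values: the arguments you actually want to set, keyed by parameter name.
    """
    literals = []
    for name in params:
        # Every position is emitted; an unset argument becomes the '' placeholder.
        value = values.get(name, "")
        # Escape single quotes the SQL way, by doubling them.
        literals.append("'" + value.replace("'", "''") + "'")
    return "CALL sys." + procedure + "(" + ", ".join(literals) + ")"


# Parameter order of the five-argument indexed form of compact.
COMPACT_PARAMS = ["table", "partitions", "order_strategy", "order_by", "options"]

print(positional_call("compact", COMPACT_PARAMS,
                      {"table": "default.t", "options": "sink.parallelism=4"}))
# CALL sys.compact('default.t', '', '', '', 'sink.parallelism=4')
```

The sketch always emits every position instead of dropping trailing empty arguments. The shorter indexed forms of a procedure are separate overloads. For example, the three-argument `compact` form takes `order_strategy` and `order_by`, not `partitions`.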
In Flink 1.19 and later, procedures support passing arguments by name. You can pass arguments in any order and omit any optional
argument. For the above example, the call statement is
CALL sys.compact(`table` => 'default.t', options => 'sink.parallelism=4').
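The named-argument form can be illustrated with a small Python sketch. `named_call` is a hypothetical helper, not part of Paimon, and it assumes string-valued arguments. Arguments set to `None` are left out, and every argument name is wrapped in backticks. The examples on this page backtick names such as `table` and `where` because they are reserved words in Flink SQL. Quoting every name is a simplification chosen here so the helper needs no keyword list; backticked identifiers are valid in Flink SQL either way.

```python
def named_call(procedure, args):
    """Build a named-argument CALL statement (hypothetical helper, not a Paimon API)."""
    parts = []
    for name, value in args.items():
        if value is None:
            continue  # optional arguments can be omitted entirely
        # Backtick every name so reserved words such as table or where are safe.
        literal = "'" + value.replace("'", "''") + "'"
        parts.append("`" + name + "` => " + literal)
    return "CALL sys." + procedure + "(" + ", ".join(parts) + ")"


print(named_call("compact", {"table": "default.t", "partitions": None,
                             "options": "sink.parallelism=4"}))
# CALL sys.compact(`table` => 'default.t', `options` => 'sink.parallelism=4')
```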
Specifying partitions: a partition filter is represented as a string in which ',' means AND and ';' means OR. For example, to specify the two partitions date=01 and date=02, write 'date=01;date=02'; to specify the single partition with date=01 and day=01, write 'date=01,day=01'.
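The partition filter grammar maps directly onto a list of partition specs. The following Python sketch uses hypothetical helpers, not part of Paimon, to build and parse such strings. It assumes partition keys and values never contain ',', ';' or '=' themselves, since the format described above has no escaping rule.

```python
def build_partition_filter(specs):
    """specs: list of dicts, one dict per partition (keys are ANDed, dicts are ORed)."""
    # ',' joins the key=value pairs of one partition (AND);
    # ';' joins different partitions (OR).
    return ";".join(",".join(f"{k}={v}" for k, v in spec.items()) for spec in specs)


def parse_partition_filter(text):
    """Inverse of build_partition_filter: returns a list of dicts."""
    return [dict(pair.split("=", 1) for pair in spec.split(",")) for spec in text.split(";")]


print(build_partition_filter([{"date": "01"}, {"date": "02"}]))  # date=01;date=02
print(build_partition_filter([{"date": "01", "day": "01"}]))     # date=01,day=01
```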
Table options syntax: table options are represented as a string in the format 'key1=value1,key2=value2,...'.
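The options string can be produced from a dictionary with a few lines of Python. `build_options` is a hypothetical helper, not a Paimon API. Because ',' and '=' act as delimiters and the page describes no escaping, the sketch rejects keys or values that contain them rather than guessing an escape syntax.

```python
def build_options(options):
    """Encode a dict of table options as 'key1=value1,key2=value2'."""
    parts = []
    for key, value in options.items():
        key, value = str(key), str(value)
        # Refuse input that would be ambiguous once joined.
        if any(ch in key + value for ch in ",="):
            raise ValueError(f"option {key!r}: ',' and '=' are reserved delimiters")
        parts.append(f"{key}={value}")
    return ",".join(parts)


print(build_options({"sink.parallelism": 4}))  # sink.parallelism=4
```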
All available procedures are listed below.
Procedure Name | Usage | Explanation | Example |
---|---|---|---|
compact |
-- Use named argument CALL [catalog.]sys.compact( `table` => 'table', partitions => 'partitions', order_strategy => 'order_strategy', order_by => 'order_by', options => 'options', `where` => 'where', partition_idle_time => 'partition_idle_time', compact_strategy => 'compact_strategy') -- Use indexed argument CALL [catalog.]sys.compact('table') CALL [catalog.]sys.compact('table', 'partitions') CALL [catalog.]sys.compact('table', 'order_strategy', 'order_by') CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by') CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options') CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where') CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where', 'partition_idle_time') CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where', 'partition_idle_time', 'compact_strategy') |
To compact a table. Arguments:
|
-- use partition filter CALL sys.compact(`table` => 'default.T', partitions => 'p=0', order_strategy => 'zorder', order_by => 'a,b', options => 'sink.parallelism=4') -- use partition predicate CALL sys.compact(`table` => 'default.T', `where` => 'dt>10 and h<20', order_strategy => 'zorder', order_by => 'a,b', options => 'sink.parallelism=4') |
compact_database |
-- Use named argument CALL [catalog.]sys.compact_database( including_databases => 'includingDatabases', mode => 'mode', including_tables => 'includingTables', excluding_tables => 'excludingTables', table_options => 'tableOptions', partition_idle_time => 'partitionIdleTime', compact_strategy => 'compact_strategy') -- Use indexed argument CALL [catalog.]sys.compact_database() CALL [catalog.]sys.compact_database('includingDatabases') CALL [catalog.]sys.compact_database('includingDatabases', 'mode') CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables') CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables') CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions') CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime') CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime', 'compact_strategy') |
To compact databases. Arguments:
|
CALL sys.compact_database( including_databases => 'db1|db2', mode => 'combined', including_tables => 'table_.*', excluding_tables => 'ignore', table_options => 'sink.parallelism=4', compact_strategy => 'full') |
create_tag |
-- Use named argument -- based on the specified snapshot CALL [catalog.]sys.create_tag(`table` => 'identifier', tag => 'tagName', snapshot_id => snapshotId) -- based on the latest snapshot CALL [catalog.]sys.create_tag(`table` => 'identifier', tag => 'tagName') -- Use indexed argument -- based on the specified snapshot CALL [catalog.]sys.create_tag('identifier', 'tagName', snapshotId) -- based on the latest snapshot CALL [catalog.]sys.create_tag('identifier', 'tagName') |
To create a tag based on a given snapshot. Arguments:
|
CALL sys.create_tag(`table` => 'default.T', tag => 'my_tag', snapshot_id => cast(10 as bigint), time_retained => '1 d') |
create_tag_from_timestamp |
-- Create a tag from the first snapshot whose commit time is greater than the specified timestamp. -- Use named argument CALL [catalog.]sys.create_tag_from_timestamp(`table` => 'identifier', tag => 'tagName', timestamp => timestamp, time_retained => time_retained) -- Use indexed argument CALL [catalog.]sys.create_tag_from_timestamp('identifier', 'tagName', timestamp, time_retained) |
To create a tag based on a given timestamp. Arguments:
|
-- for Flink 1.18 CALL sys.create_tag_from_timestamp('default.T', 'my_tag', 1724404318750, '1 d') -- for Flink 1.19 and later CALL sys.create_tag_from_timestamp(`table` => 'default.T', `tag` => 'my_tag', `timestamp` => 1724404318750, time_retained => '1 d') |
create_tag_from_watermark |
-- Create a tag from the first snapshot whose watermark is greater than the specified timestamp. -- Use named argument CALL [catalog.]sys.create_tag_from_watermark(`table` => 'identifier', tag => 'tagName', watermark => watermark, time_retained => time_retained) -- Use indexed argument CALL [catalog.]sys.create_tag_from_watermark('identifier', 'tagName', watermark, time_retained) |
To create a tag based on a given watermark timestamp. Arguments:
|
-- for Flink 1.18 CALL sys.create_tag_from_watermark('default.T', 'my_tag', 1724404318750, '1 d') -- for Flink 1.19 and later CALL sys.create_tag_from_watermark(`table` => 'default.T', `tag` => 'my_tag', `watermark` => 1724404318750, time_retained => '1 d') |
delete_tag |
-- Use named argument CALL [catalog.]sys.delete_tag(`table` => 'identifier', tag => 'tagName') -- Use indexed argument CALL [catalog.]sys.delete_tag('identifier', 'tagName') |
To delete a tag. Arguments:
|
CALL sys.delete_tag(`table` => 'default.T', tag => 'my_tag') |
replace_tag |
-- Use named argument -- replace tag with new time retained CALL [catalog.]sys.replace_tag(`table` => 'identifier', tag => 'tagName', time_retained => 'timeRetained') -- replace tag with new snapshot id and time retained CALL [catalog.]sys.replace_tag(`table` => 'identifier', tag => 'tagName', snapshot_id => 'snapshotId', time_retained => 'timeRetained') -- Use indexed argument -- replace tag with new snapshot id and time retained CALL [catalog.]sys.replace_tag('identifier', 'tagName', 'snapshotId', 'timeRetained') |
To replace an existing tag with new tag info. Arguments:
|
-- for Flink 1.18 CALL sys.replace_tag('default.T', 'my_tag', 5, '1 d') -- for Flink 1.19 and later CALL sys.replace_tag(`table` => 'default.T', tag => 'my_tag', snapshot_id => 5, time_retained => '1 d') |
expire_tags | CALL [catalog.]sys.expire_tags('identifier', 'older_than') |
To expire tags by time. Arguments:
|
CALL sys.expire_tags(`table` => 'default.T', older_than => '2024-09-06 11:00:00') |
merge_into |
-- for Flink 1.18 CALL [catalog.]sys.merge_into('identifier','targetAlias', 'sourceSqls','sourceTable','mergeCondition', 'matchedUpsertCondition','matchedUpsertSetting', 'notMatchedInsertCondition','notMatchedInsertValues', 'matchedDeleteCondition') -- for Flink 1.19 and later CALL [catalog.]sys.merge_into( target_table => 'identifier', target_alias => 'targetAlias', source_sqls => 'sourceSqls', source_table => 'sourceTable', merge_condition => 'mergeCondition', matched_upsert_condition => 'matchedUpsertCondition', matched_upsert_setting => 'matchedUpsertSetting', not_matched_insert_condition => 'notMatchedInsertCondition', not_matched_insert_values => 'notMatchedInsertValues', matched_delete_condition => 'matchedDeleteCondition', not_matched_by_source_upsert_condition => 'notMatchedBySourceUpsertCondition', not_matched_by_source_upsert_setting => 'notMatchedBySourceUpsertSetting', not_matched_by_source_delete_condition => 'notMatchedBySourceDeleteCondition') |
To perform a MERGE INTO operation. See the merge_into action for details of the arguments. |
-- for matched order rows, -- increase the price, -- and if there is no match, -- insert the order from -- the source table -- for Flink 1.18 CALL sys.merge_into('default.T','','','default.S','T.id=S.order_id','','price=T.price+20','','*','') -- for Flink 1.19 and later CALL sys.merge_into( target_table => 'default.T', source_table => 'default.S', merge_condition => 'T.id=S.order_id', matched_upsert_setting => 'price=T.price+20', not_matched_insert_values => '*') |
remove_orphan_files |
-- Use named argument CALL [catalog.]sys.remove_orphan_files(`table` => 'identifier', older_than => 'olderThan', dry_run => 'dryRun', parallelism => 'parallelism', mode => 'mode') -- Use indexed argument CALL [catalog.]sys.remove_orphan_files('identifier') CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan') CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan', 'dryRun') CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan', 'dryRun', 'parallelism') CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan', 'dryRun', 'parallelism', 'mode') |
To remove the orphan data files and metadata files. Arguments:
|
CALL sys.remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00') CALL sys.remove_orphan_files(`table` => 'default.*', older_than => '2023-10-31 12:00:00') CALL sys.remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true) CALL sys.remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => false, parallelism => '5') CALL sys.remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => false, parallelism => '5', mode => 'local') |
remove_unexisting_files |
-- Use named argument CALL [catalog.]sys.remove_unexisting_files(`table` => 'identifier', dry_run => 'dryRun', parallelism => 'parallelism') -- Use indexed argument CALL [catalog.]sys.remove_unexisting_files('identifier') CALL [catalog.]sys.remove_unexisting_files('identifier', 'dryRun', 'parallelism') |
To remove manifest entries that point to data files which no longer exist. See the Java docs for detailed use cases. Arguments:
Note that you use this procedure at your own risk: it may cause data loss when used outside the use cases listed in the Java docs. |
-- remove entries for non-existent data files in the table `mydb.myt`
CALL sys.remove_unexisting_files(`table` => 'mydb.myt')
-- only check which files would be removed, without actually removing them (dry run)
CALL sys.remove_unexisting_files(`table` => 'mydb.myt', `dry_run` => true) |
reset_consumer |
-- Use named argument CALL [catalog.]sys.reset_consumer(`table` => 'identifier', consumer_id => 'consumerId', next_snapshot_id => 'nextSnapshotId') -- Use indexed argument -- reset the new next snapshot id in the consumer CALL [catalog.]sys.reset_consumer('identifier', 'consumerId', nextSnapshotId) -- delete consumer CALL [catalog.]sys.reset_consumer('identifier', 'consumerId') |
To reset or delete a consumer. Arguments:
|
CALL sys.reset_consumer(`table` => 'default.T', consumer_id => 'myid', next_snapshot_id => cast(10 as bigint)) |
rollback_to |
-- for Flink 1.18 -- rollback to a snapshot CALL sys.rollback_to('identifier', snapshotId) -- rollback to a tag CALL sys.rollback_to('identifier', 'tagName') -- for Flink 1.19 and later -- rollback to a snapshot CALL sys.rollback_to(`table` => 'identifier', snapshot_id => snapshotId) -- rollback to a tag CALL sys.rollback_to(`table` => 'identifier', tag => 'tagName') |
To roll back to a specific version of the target table. Argument:
|
-- for Flink 1.18 CALL sys.rollback_to('default.T', 10) -- for Flink 1.19 and later CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 10) |
rollback_to_timestamp |
-- for Flink 1.18 -- roll back to the latest snapshot whose commit time is earlier than or equal to the timestamp. CALL sys.rollback_to_timestamp('identifier', timestamp) -- for Flink 1.19 and later -- roll back to the latest snapshot whose commit time is earlier than or equal to the timestamp. CALL sys.rollback_to_timestamp(`table` => 'identifier', `timestamp` => timestamp) |
To roll back to the latest snapshot whose commit time is earlier than or equal to the given timestamp. Argument:
|
-- for Flink 1.18 CALL sys.rollback_to_timestamp('default.T', 10) -- for Flink 1.19 and later CALL sys.rollback_to_timestamp(`table` => 'default.T', timestamp => 1730292023000) |
rollback_to_watermark |
-- for Flink 1.18 -- roll back to the latest snapshot whose watermark is earlier than or equal to the given watermark. CALL sys.rollback_to_watermark('identifier', watermark) -- for Flink 1.19 and later -- roll back to the latest snapshot whose watermark is earlier than or equal to the given watermark. CALL sys.rollback_to_watermark(`table` => 'identifier', `watermark` => watermark) |
To roll back to the latest snapshot whose watermark is earlier than or equal to the given watermark. Argument:
|
-- for Flink 1.18 CALL sys.rollback_to_watermark('default.T', 1730292023000) -- for Flink 1.19 and later CALL sys.rollback_to_watermark(`table` => 'default.T', watermark => 1730292023000) |
purge_files |
-- for Flink 1.18 -- clear the table by purging its files directly. CALL sys.purge_files('identifier') -- for Flink 1.19 and later -- clear the table by purging its files directly. CALL sys.purge_files(`table` => 'identifier') |
To clear a table by purging its files directly. Argument:
|
-- for Flink 1.18 CALL sys.purge_files('default.T') -- for Flink 1.19 and later CALL sys.purge_files(`table` => 'default.T') |
expire_snapshots |
-- Use named argument CALL [catalog.]sys.expire_snapshots( `table` => 'identifier', retain_max => 'retain_max', retain_min => 'retain_min', older_than => 'older_than', max_deletes => 'max_deletes') -- Use indexed argument -- for Flink 1.18 CALL sys.expire_snapshots(table, retain_max) -- for Flink 1.19 and later CALL sys.expire_snapshots(table, retain_max, retain_min, older_than, max_deletes) |
To expire snapshots. Argument:
|
-- for Flink 1.18 CALL sys.expire_snapshots('default.T', 2) -- for Flink 1.19 and later CALL sys.expire_snapshots(`table` => 'default.T', retain_max => 2) CALL sys.expire_snapshots(`table` => 'default.T', older_than => '2024-01-01 12:00:00') CALL sys.expire_snapshots(`table` => 'default.T', older_than => '2024-01-01 12:00:00', retain_min => 10) CALL sys.expire_snapshots(`table` => 'default.T', older_than => '2024-01-01 12:00:00', max_deletes => 10) |
expire_partitions |
CALL sys.expire_partitions(table, expiration_time, timestamp_formatter, timestamp_pattern, expire_strategy) |
To expire partitions. Argument:
|
-- for Flink 1.18 CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', '$dt', 'values-time') -- for Flink 1.19 and later CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy => 'values-time') CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd HH:mm', timestamp_pattern => '$dt $hm', expire_strategy => 'values-time') |
repair |
-- repair all databases and tables in the catalog CALL sys.repair() -- repair all tables in a specific database CALL sys.repair('databaseName') -- repair a table CALL sys.repair('databaseName.tableName') -- repair several databases and tables given in one string, separated by ',' CALL sys.repair('databaseName01,database02.tableName01,database03') |
Synchronize information from the file system to the Metastore. Argument:
|
CALL sys.repair(`table` => 'test_db.T') |
rewrite_file_index |
-- Use named argument CALL sys.rewrite_file_index(<`table` => identifier> [, <partitions => partitions>]) -- Use indexed argument CALL sys.rewrite_file_index(<identifier> [, <partitions>]) |
Rewrite the file index for the table. Argument:
|
-- rewrite the file index for the whole table CALL sys.rewrite_file_index(`table` => 'test_db.T') -- rewrite the file index for a specific partition CALL sys.rewrite_file_index(`table` => 'test_db.T', partitions => 'pt=a') |
create_branch |
-- Use named argument CALL [catalog.]sys.create_branch(`table` => 'identifier', branch => 'branchName', tag => 'tagName') -- Use indexed argument -- based on the specified tag CALL [catalog.]sys.create_branch('identifier', 'branchName', 'tagName') -- create empty branch CALL [catalog.]sys.create_branch('identifier', 'branchName') |
To create a branch based on a given tag, or to create an empty branch. Arguments:
|
CALL sys.create_branch(`table` => 'default.T', branch => 'branch1', tag => 'tag1') CALL sys.create_branch(`table` => 'default.T', branch => 'branch1') |
delete_branch |
-- Use named argument CALL [catalog.]sys.delete_branch(`table` => 'identifier', branch => 'branchName') -- Use indexed argument CALL [catalog.]sys.delete_branch('identifier', 'branchName') |
To delete a branch. Arguments:
|
CALL sys.delete_branch(`table` => 'default.T', branch => 'branch1') |
fast_forward |
-- Use named argument CALL [catalog.]sys.fast_forward(`table` => 'identifier', branch => 'branchName') -- Use indexed argument CALL [catalog.]sys.fast_forward('identifier', 'branchName') |
To fast-forward a branch to the main branch. Arguments:
|
CALL sys.fast_forward(`table` => 'default.T', branch => 'branch1') |
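Several procedures above, including create_tag_from_timestamp, rollback_to_timestamp and their watermark counterparts, take a numeric value such as 1724404318750. The following Python sketch assumes, as those example values suggest, that the value is measured in milliseconds since the Unix epoch. It converts a timezone-aware datetime into that form and formats a call. `to_epoch_millis` is a hypothetical helper, not a Paimon API.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(dt):
    """Milliseconds since the Unix epoch for a timezone-aware datetime."""
    if dt.tzinfo is None:
        # Reject naive datetimes, whose meaning depends on an unstated time zone.
        raise ValueError("pass a timezone-aware datetime")
    # timedelta // timedelta is exact integer division, so there is no float rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


ts = to_epoch_millis(datetime(2024, 8, 23, 9, 11, 58, 750000, tzinfo=timezone.utc))
print(ts)  # 1724404318750
print(f"CALL sys.rollback_to_timestamp(`table` => 'default.T', `timestamp` => {ts})")
```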