Procedures

Procedures #

Flink 1.18 and later versions support Call Statements, which make it easier to manipulate data and metadata of Paimon table by writing SQLs instead of submitting Flink jobs.

In 1.18, the procedure only supports passing arguments by position. You must pass all arguments in order, and if you don’t want to pass some arguments, you must use '' as placeholder. For example, if you want to compact table default.t with parallelism 4, but you don’t want to specify partitions and sort strategy, the call statement should be
CALL sys.compact('default.t', '', '', '', 'sink.parallelism=4').

In higher versions, the procedure supports passing arguments by name. You can pass arguments in any order and any optional argument can be omitted. For the above example, the call statement is
CALL sys.compact(`table` => 'default.t', options => 'sink.parallelism=4').

Specify partitions: we use string to represent partition filter. “,” means “AND” and “;” means “OR”. For example, if you want to specify two partitions date=01 and date=02, you need to write ‘date=01;date=02’; If you want to specify one partition with date=01 and day=01, you need to write ‘date=01,day=01’.

Table options syntax: we use string to represent table options. The format is ‘key1=value1,key2=value2…’.

All available procedures are listed below.

Procedure Name Usage Explanation Example
compact -- Use named argument
CALL [catalog.]sys.compact( `table` => 'table', partitions => 'partitions', order_strategy => 'order_strategy', order_by => 'order_by', options => 'options', `where` => 'where', partition_idle_time => 'partition_idle_time', compact_strategy => 'compact_strategy')

-- Use indexed argument
CALL [catalog.]sys.compact('table')

CALL [catalog.]sys.compact('table', 'partitions')

CALL [catalog.]sys.compact('table', 'order_strategy', 'order_by')

CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by')

CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options')

CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where')

CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where', 'partition_idle_time')

CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where', 'partition_idle_time', 'compact_strategy')

To compact a table. Arguments:
  • table(required): the target table identifier.
  • partitions(optional): partition filter.
  • order_strategy(optional): 'order' or 'zorder' or 'hilbert' or 'none'.
  • order_by(optional): the columns need to be sort. Left empty if 'order_strategy' is 'none'.
  • options(optional): additional dynamic options of the table.
  • where(optional): partition predicate(Can't be used together with "partitions"). Note: as where is a keyword,a pair of backticks need to add around like `where`.
  • partition_idle_time(optional): this is used to do a full compaction for partition which had not received any new data for 'partition_idle_time'. And only these partitions will be compacted. This argument can not be used with order compact.
  • compact_strategy(optional): this determines how to pick files to be merged, the default is determined by the runtime execution mode. 'full' strategy only supports batch mode. All files will be selected for merging. 'minor' strategy: Pick the set of files that need to be merged based on specified conditions.
  • -- use partition filter
    CALL sys.compact(`table` => 'default.T', partitions => 'p=0', order_strategy => 'zorder', order_by => 'a,b', options => 'sink.parallelism=4')
    -- use partition predicate
    CALL sys.compact(`table` => 'default.T', `where` => 'dt>10 and h<20', order_strategy => 'zorder', order_by => 'a,b', options => 'sink.parallelism=4')
    compact_database -- Use named argument
    CALL [catalog.]sys.compact_database( including_databases => 'includingDatabases', mode => 'mode', including_tables => 'includingTables', excluding_tables => 'excludingTables', table_options => 'tableOptions', partition_idle_time => 'partitionIdleTime', compact_strategy => 'compact_strategy')

    -- Use indexed argument
    CALL [catalog.]sys.compact_database()

    CALL [catalog.]sys.compact_database('includingDatabases')

    CALL [catalog.]sys.compact_database('includingDatabases', 'mode')

    CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables')

    CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables')

    CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')

    CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime')

    CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime', 'compact_strategy')

    To compact databases. Arguments:
  • includingDatabases: to specify databases. You can use regular expression.
  • mode: compact mode. "divided": start a sink for each table, detecting the new table requires restarting the job; "combined" (default): start a single combined sink for all tables, the new table will be automatically detected.
  • includingTables: to specify tables. You can use regular expression.
  • excludingTables: to specify tables that are not compacted. You can use regular expression.
  • tableOptions: additional dynamic options of the table.
  • partition_idle_time: this is used to do a full compaction for partition which had not received any new data for 'partition_idle_time'. And only these partitions will be compacted.
  • compact_strategy(optional): this determines how to pick files to be merged, the default is determined by the runtime execution mode. 'full' strategy only supports batch mode. All files will be selected for merging. 'minor' strategy: Pick the set of files that need to be merged based on specified conditions.
  • CALL sys.compact_database( including_databases => 'db1|db2', mode => 'combined', including_tables => 'table_.*', excluding_tables => 'ignore', table_options => 'sink.parallelism=4', compat_strategy => 'full')
    create_tag -- Use named argument
    -- based on the specified snapshot
    CALL [catalog.]sys.create_tag(`table` => 'identifier', tag => 'tagName', snapshot_id => snapshotId)
    -- based on the latest snapshot
    CALL [catalog.]sys.create_tag(`table` => 'identifier', snapshot_id => 'tagName')

    -- Use indexed argument
    -- based on the specified snapshot
    CALL [catalog.]sys.create_tag('identifier', 'tagName', snapshotId)
    -- based on the latest snapshot
    CALL [catalog.]sys.create_tag('identifier', 'tagName')
    To create a tag based on given snapshot. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • tagName: name of the new tag.
  • snapshotId (Long): id of the snapshot which the new tag is based on.
  • time_retained: The maximum time retained for newly created tags.
  • CALL sys.create_tag(`table` => 'default.T', tag => 'my_tag', snapshot_id => cast(10 as bigint), time_retained => '1 d')
    create_tag_from_timestamp -- Create a tag from the first snapshot whose commit-time greater than the specified timestamp.
    -- Use named argument
    CALL [catalog.]sys.create_tag_from_timestamp(`table` => 'identifier', tag => 'tagName', timestamp => timestamp, time_retained => time_retained)

    -- Use indexed argument
    CALL [catalog.]sys.create_tag_from_timestamp('identifier', 'tagName', timestamp, time_retained)
    To create a tag based on given timestamp. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • tag: name of the new tag.
  • timestamp (Long): Find the first snapshot whose commit-time greater than this timestamp.
  • time_retained : The maximum time retained for newly created tags.
  • -- for Flink 1.18
    CALL sys.create_tag_from_timestamp('default.T', 'my_tag', 1724404318750, '1 d') -- for Flink 1.19 and later
    CALL sys.create_tag_from_timestamp(`table` => 'default.T', `tag` => 'my_tag', `timestamp` => 1724404318750, time_retained => '1 d')
    create_tag_from_watermark -- Create a tag from the first snapshot whose watermark greater than the specified timestamp.
    -- Use named argument
    CALL [catalog.]sys.create_tag_from_watermark(`table` => 'identifier', tag => 'tagName', watermark => watermark, time_retained => time_retained)

    -- Use indexed argument
    CALL [catalog.]sys.create_tag_from_watermark('identifier', 'tagName', watermark, time_retained)
    To create a tag based on given watermark timestamp. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • tag: name of the new tag.
  • watermark (Long): Find the first snapshot whose watermark greater than the specified watermark.
  • time_retained : The maximum time retained for newly created tags.
  • -- for Flink 1.18
    CALL sys.create_tag_from_watermark('default.T', 'my_tag', 1724404318750, '1 d') -- for Flink 1.19 and later
    CALL sys.create_tag_from_watermark(`table` => 'default.T', `tag` => 'my_tag', `watermark` => 1724404318750, time_retained => '1 d')
    delete_tag -- Use named argument
    CALL [catalog.]sys.delete_tag(`table` => 'identifier', tag => 'tagName')

    -- Use indexed argument
    CALL [catalog.]sys.delete_tag('identifier', 'tagName')
    To delete a tag. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • tagName: name of the tag to be deleted. If you specify multiple tags, delimiter is ','.
  • CALL sys.delete_tag(`table` => 'default.T', tag => 'my_tag')
    replace_tag -- Use named argument
    -- replace tag with new time retained
    CALL [catalog.]sys.replace_tag(`table` => 'identifier', tag => 'tagName', time_retained => 'timeRetained')
    -- replace tag with new snapshot id and time retained
    CALL [catalog.]sys.replace_tag(`table` => 'identifier', snapshot_id => 'snapshotId')

    -- Use indexed argument
    -- replace tag with new snapshot id and time retained
    CALL [catalog.]sys.replace_tag('identifier', 'tagName', 'snapshotId', 'timeRetained')
    To replace an existing tag with new tag info. Arguments:
  • table: the target table identifier. Cannot be empty.
  • tag: name of the existed tag. Cannot be empty.
  • snapshot(Long): id of the snapshot which the tag is based on, it is optional.
  • time_retained: The maximum time retained for the existing tag, it is optional.
  • -- for Flink 1.18
    CALL sys.replace_tag('default.T', 'my_tag', 5, '1 d')

    -- for Flink 1.19 and later
    CALL sys.replace_tag(`table` => 'default.T', tag => 'my_tag', snapshot_id => 5, time_retained => '1 d')

    expire_tags CALL [catalog.]sys.expire_tags('identifier', 'older_than') To expire tags by time. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • older_than: tagCreateTime before which tags will be removed.
  • CALL sys.expire_tags(table => 'default.T', older_than => '2024-09-06 11:00:00')
    merge_into -- for Flink 1.18
    CALL [catalog].sys.merge_into('identifier','targetAlias',
    'sourceSqls','sourceTable','mergeCondition',
    'matchedUpsertCondition','matchedUpsertSetting',
    'notMatchedInsertCondition','notMatchedInsertValues',
    'matchedDeleteCondition')

    -- for Flink 1.19 and later
    CALL [catalog].sys.merge_into(
    target_table => 'identifier',
    target_alias => 'targetAlias',
    source_sqls => 'sourceSqls',
    source_table => 'sourceTable',
    merge_condition => 'mergeCondition',
    matched_upsert_condition => 'matchedUpsertCondition',
    matched_upsert_setting => 'matchedUpsertSetting',
    not_matched_insert_condition => 'notMatchedInsertCondition',
    not_matched_insert_values => 'notMatchedInsertValues',
    matched_delete_condition => 'matchedDeleteCondition',
    not_matched_by_source_upsert_condition => 'notMatchedBySourceUpsertCondition',
    not_matched_by_source_upsert_setting => 'notMatchedBySourceUpsertSetting',
    not_matched_by_source_delete_condition => 'notMatchedBySourceDeleteCondition')

    To perform "MERGE INTO" syntax. See merge_into action for details of arguments. -- for matched order rows,
    -- increase the price,
    -- and if there is no match,
    -- insert the order from
    -- the source table
    -- for Flink 1.18
    CALL [catalog].sys.merge_into('default.T','','','default.S','T.id=S.order_id','','price=T.price+20','','*','')

    -- for Flink 1.19 and later
    CALL sys.merge_into(
    target_table => 'default.T',
    source_table => 'default.S',
    merge_condition => 'T.id=S.order_id',
    matched_upsert_setting => 'price=T.price+20',
    not_matched_insert_values => '*')

    remove_orphan_files -- Use named argument
    CALL [catalog.]sys.remove_orphan_files(`table` => 'identifier', older_than => 'olderThan', dry_run => 'dryRun', mode => 'mode')

    -- Use indexed argument
    CALL [catalog.]sys.remove_orphan_files('identifier')

    CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan')

    CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan', 'dryRun')

    CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan', 'dryRun','parallelism')

    CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan', 'dryRun','parallelism','mode')
    To remove the orphan data files and metadata files. Arguments:
  • identifier: the target table identifier. Cannot be empty, you can use database_name.* to clean whole database.
  • olderThan: to avoid deleting newly written files, this procedure only deletes orphan files older than 1 day by default. This argument can modify the interval.
  • dryRun: when true, view only orphan files, don't actually remove files. Default is false.
  • parallelism: The maximum number of concurrent deleting files. By default is the number of processors available to the Java virtual machine.
  • mode: The mode of remove orphan clean procedure (local or distributed) . By default is distributed.
  • CALL remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00')

    CALL remove_orphan_files(`table` => 'default.*', older_than => '2023-10-31 12:00:00')

    CALL remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true)

    CALL remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => false, parallelism => '5')

    CALL remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => false, parallelism => '5', mode => 'local')
    reset_consumer -- Use named argument
    CALL [catalog.]sys.reset_consumer(`table` => 'identifier', consumer_id => 'consumerId', next_snapshot_id => 'nextSnapshotId')

    -- Use indexed argument
    -- reset the new next snapshot id in the consumer
    CALL [catalog.]sys.reset_consumer('identifier', 'consumerId', nextSnapshotId)

    -- delete consumer
    CALL [catalog.]sys.reset_consumer('identifier', 'consumerId')
    To reset or delete consumer. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • consumerId: consumer to be reset or deleted.
  • nextSnapshotId (Long): the new next snapshot id of the consumer.
  • CALL sys.reset_consumer(`table` => 'default.T', consumer_id => 'myid', next_snapshot_id => cast(10 as bigint))
    rollback_to -- for Flink 1.18
    -- rollback to a snapshot
    CALL sys.rollback_to('identifier', snapshotId)

    -- rollback to a tag
    CALL sys.rollback_to('identifier', 'tagName')

    -- for Flink 1.19 and later
    -- rollback to a snapshot
    CALL sys.rollback_to(`table` => 'identifier', snapshot_id => snapshotId)

    -- rollback to a tag
    CALL sys.rollback_to(`table` => 'identifier', tag => 'tagName')
    To rollback to a specific version of target table. Argument:
  • identifier: the target table identifier. Cannot be empty.
  • snapshotId (Long): id of the snapshot that will roll back to.
  • tagName: name of the tag that will roll back to.
  • -- for Flink 1.18
    CALL sys.rollback_to('default.T', 10) -- for Flink 1.19 and later
    CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 10)
    rollback_to_timestamp -- for Flink 1.18
    -- rollback to the snapshot which earlier or equal than timestamp.
    CALL sys.rollback_to_timestamp('identifier', timestamp)

    -- for Flink 1.19 and later
    -- rollback to the snapshot which earlier or equal than timestamp.
    CALL sys.rollback_to_timestamp(`table` => 'default.T', `timestamp` => timestamp)

    To rollback to the snapshot which earlier or equal than timestamp. Argument:
  • identifier: the target table identifier. Cannot be empty.
  • timestamp (Long): Roll back to the snapshot which earlier or equal than timestamp.
  • -- for Flink 1.18
    CALL sys.rollback_to_timestamp('default.T', 10) -- for Flink 1.19 and later
    CALL sys.rollback_to_timestamp(`table` => 'default.T', timestamp => 1730292023000)
    rollback_to_watermark -- for Flink 1.18
    -- rollback to the snapshot which earlier or equal than watermark.
    CALL sys.rollback_to_watermark('identifier', watermark)

    -- for Flink 1.19 and later
    -- rollback to the snapshot which earlier or equal than watermark.
    CALL sys.rollback_to_watermark(`table` => 'default.T', `watermark` => watermark)

    To rollback to the snapshot which earlier or equal than watermark. Argument:
  • identifier: the target table identifier. Cannot be empty.
  • watermark (Long): Roll back to the snapshot which earlier or equal than watermark.
  • -- for Flink 1.18
    CALL sys.rollback_to_watermark('default.T', 1730292023000) -- for Flink 1.19 and later
    CALL sys.rollback_to_watermark(`table` => 'default.T', watermark => 1730292023000)
    purge_files -- for Flink 1.18
    -- clear table with purge files directly.
    CALL sys.purge_files('identifier')

    -- for Flink 1.19 and later
    -- clear table with purge files directly.
    CALL sys.purge_files(`table` => 'default.T')

    To clear table with purge files directly. Argument:
  • identifier: the target table identifier. Cannot be empty.
  • -- for Flink 1.18
    CALL sys.purge_files('default.T') -- for Flink 1.19 and later
    CALL sys.purge_files(`table` => 'default.T')
    expire_snapshots -- Use named argument
    CALL [catalog.]sys.expire_snapshots(
    `table` => 'identifier',
    retain_max => 'retain_max',
    retain_min => 'retain_min',
    older_than => 'older_than',
    max_deletes => 'max_deletes')

    -- Use indexed argument
    -- for Flink 1.18
    CALL sys.expire_snapshots(table, retain_max)

    -- for Flink 1.19 and later
    CALL sys.expire_snapshots(table, retain_max, retain_min, older_than, max_deletes)

    To expire snapshots. Argument:
  • table: the target table identifier. Cannot be empty.
  • retain_max: the maximum number of completed snapshots to retain.
  • retain_min: the minimum number of completed snapshots to retain.
  • order_than: timestamp before which snapshots will be removed.
  • max_deletes: the maximum number of snapshots that can be deleted at once.
  • -- for Flink 1.18

    CALL sys.expire_snapshots('default.T', 2)

    -- for Flink 1.19 and later

    CALL sys.expire_snapshots(`table` => 'default.T', retain_max => 2)

    CALL sys.expire_snapshots(`table` => 'default.T', older_than => '2024-01-01 12:00:00')

    CALL sys.expire_snapshots(`table` => 'default.T', older_than => '2024-01-01 12:00:00', retain_min => 10)

    CALL sys.expire_snapshots(`table` => 'default.T', older_than => '2024-01-01 12:00:00', max_deletes => 10)

    expire_partitions CALL sys.expire_partitions(table, expiration_time, timestamp_formatter, expire_strategy)

    To expire partitions. Argument:
  • table: the target table identifier. Cannot be empty.
  • expiration_time: the expiration interval of a partition. A partition will be expired if it‘s lifetime is over this value. Partition time is extracted from the partition value.
  • timestamp_formatter: the formatter to format timestamp from string.
  • timestamp_pattern: the pattern to get a timestamp from partitions.
  • expire_strategy: specifies the expiration strategy for partition expiration, possible values: 'values-time' or 'update-time' , 'values-time' as default.
  • -- for Flink 1.18

    CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', '$dt', 'values-time')

    -- for Flink 1.19 and later

    CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy => 'values-time')
    CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd HH:mm', timestamp_pattern => '$dt $hm', expire_strategy => 'values-time')

    repair -- repair all databases and tables in catalog
    CALL sys.repair()

    -- repair all tables in a specific database
    CALL sys.repair('databaseName')

    -- repair a table
    CALL sys.repair('databaseName.tableName')

    -- repair database and table in a string if you specify multiple tags, delimiter is ','
    CALL sys.repair('databaseName01,database02.tableName01,database03')
    Synchronize information from the file system to Metastore. Argument:
  • empty: all databases and tables in catalog.
  • databaseName : the target database name.
  • tableName: the target table identifier.
  • CALL sys.repair(`table` => 'test_db.T')
    rewrite_file_index -- Use named argument
    CALL sys.rewrite_file_index(<`table` => identifier> [, <partitions => partitions>])

    -- Use indexed argument
    CALL sys.rewrite_file_index(<identifier> [, <partitions>])

    Rewrite the file index for the table. Argument:
  • identifier: <databaseName>.<tableName>.
  • partitions : specific partitions.
  • -- rewrite the file index for the whole table
    CALL sys.rewrite_file_index(`table` => 'test_db.T')

    -- repair all tables in a specific partition
    CALL sys.rewrite_file_index(`table` => 'test_db.T', partitions => 'pt=a')

    create_branch -- Use named argument
    CALL [catalog.]sys.create_branch(`table` => 'identifier', branch => 'branchName', tag => 'tagName')

    -- Use indexed argument
    -- based on the specified tag
    CALL [catalog.]sys.create_branch('identifier', 'branchName', 'tagName') -- create empty branch
    CALL [catalog.]sys.create_branch('identifier', 'branchName')
    To create a branch based on given tag, or just create empty branch. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • branchName: name of the new branch.
  • tagName: name of the tag which the new branch is based on.
  • CALL sys.create_branch(`table` => 'default.T', branch => 'branch1', tag => 'tag1')

    CALL sys.create_branch(`table` => 'default.T', branch => 'branch1')

    delete_branch -- Use named argument
    CALL [catalog.]sys.delete_branch(`table` => 'identifier', branch => 'branchName')

    -- Use indexed argument
    CALL [catalog.]sys.delete_branch('identifier', 'branchName')
    To delete a branch. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • branchName: name of the branch to be deleted. If you specify multiple branches, delimiter is ','.
  • CALL sys.delete_branch(`table` => 'default.T', branch => 'branch1')
    fast_forward -- Use named argument
    CALL [catalog.]sys.fast_forward(`table` => 'identifier', branch => 'branchName')

    -- Use indexed argument
    CALL [catalog.]sys.fast_forward('identifier', 'branchName')
    To fast_forward a branch to main branch. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • branchName: name of the branch to be merged.
  • CALL sys.fast_forward(`table` => 'default.T', branch => 'branch1')
    Edit This Page
    Copyright © 2024 The Apache Software Foundation. Apache Paimon, Paimon, and its feather logo are trademarks of The Apache Software Foundation.