This documentation is for an unreleased version of Apache Paimon. We recommend you use the latest stable version.
Procedures
This section introduces all available Spark procedures for Paimon.
Procedure Name | Explanation | Example | |
---|---|---|---|
compact |
To compact files. Arguments:
|
SET spark.sql.shuffle.partitions=10; -- set the compact parallelism
CALL sys.compact(table => 'T', partitions => 'p=0;p=1', order_strategy => 'zorder', order_by => 'a,b')
CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy => 'zorder', order_by => 'a,b')
CALL sys.compact(table => 'T', where => 'dt>10 and h<20', order_strategy => 'zorder', order_by => 'a,b', options => 'sink.parallelism=4')
CALL sys.compact(table => 'T', partition_idle_time => '60s')
CALL sys.compact(table => 'T', compact_strategy => 'minor') |
|
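When `compact` is driven from a PySpark job, the `partitions` string is usually assembled from data. The sketch below uses hypothetical helpers (`partitions_arg`, `compact_call`, not part of Paimon) that join partitions with `;` as in `'p=0;p=1'`. Joining several key=value pairs of one partition with `,` is an assumption for multi-column partitions, not something the example above shows.

```python
from typing import Mapping, Sequence


def partitions_arg(specs: Sequence[Mapping[str, str]]) -> str:
    """Join partition specs into a string such as 'p=0;p=1'.

    Partitions are separated by ';'. Within one partition, key=value
    pairs are joined by ',' (an assumption for multi-column partitions).
    """
    return ";".join(
        ",".join(f"{key}={value}" for key, value in spec.items())
        for spec in specs
    )


def compact_call(table: str, specs: Sequence[Mapping[str, str]]) -> str:
    """Build a CALL sys.compact statement for the given partitions."""
    return (
        f"CALL sys.compact(table => '{table}', "
        f"partitions => '{partitions_arg(specs)}')"
    )


if __name__ == "__main__":
    stmt = compact_call("T", [{"p": "0"}, {"p": "1"}])
    print(stmt)
    # In a PySpark session the statement would then be run with spark.sql(stmt).
```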
expire_snapshots |
To expire snapshots. Arguments:
|
CALL sys.expire_snapshots(table => 'default.T', retain_max => 10, options => 'snapshot.expire.limit=1') | |
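The `options` argument carries dynamic table options as `key=value` text. A minimal sketch, assuming the comma-separated `'k1=v1,k2=v2'` form used by `create_function` on this page also applies here; `options_arg` and `expire_snapshots_call` are hypothetical helpers that refuse characters they cannot safely encode instead of guessing an escape rule.

```python
from typing import Mapping


def options_arg(options: Mapping[str, object]) -> str:
    """Render options as 'k1=v1,k2=v2' for an `options =>` argument."""
    pairs = []
    for key, value in options.items():
        pair = f"{key}={value}"
        # ',' separates pairs and "'" would end the SQL string literal,
        # so this sketch rejects both rather than inventing an escape rule.
        if "," in pair or "'" in pair:
            raise ValueError(f"unsupported character in option: {pair!r}")
        pairs.append(pair)
    return ",".join(pairs)


def expire_snapshots_call(table: str, retain_max: int,
                          options: Mapping[str, object]) -> str:
    """Build a CALL sys.expire_snapshots statement."""
    return (
        f"CALL sys.expire_snapshots(table => '{table}', "
        f"retain_max => {retain_max}, options => '{options_arg(options)}')"
    )


print(expire_snapshots_call("default.T", 10, {"snapshot.expire.limit": 1}))
```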
expire_partitions |
To expire partitions. Arguments:
|
CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', timestamp_pattern => '$dt', expire_strategy => 'values-time', options => 'partition.expiration-max-num=2') | |
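With `expire_strategy => 'values-time'`, the partition value (here `$dt`) is read as a time using `timestamp_formatter` and compared against `expiration_time`. The sketch below illustrates that idea only; it is not Paimon's implementation. It assumes `yyyy-MM-dd` maps to Python's `%Y-%m-%d` and that a partition counts as expired when its time is strictly earlier than `now - expiration_time`; the exact boundary rule is an assumption.

```python
from datetime import datetime, timedelta


def is_partition_expired(dt_value: str, now: datetime, expiration: timedelta,
                         fmt: str = "%Y-%m-%d") -> bool:
    """Parse the partition value as a time and compare it with now - expiration."""
    partition_time = datetime.strptime(dt_value, fmt)
    return partition_time < now - expiration


now = datetime(2024, 7, 3, 12, 0, 0)
for dt in ["2024-07-01", "2024-07-02", "2024-07-03"]:
    # expiration_time => '1 d' corresponds to timedelta(days=1)
    print(dt, is_partition_expired(dt, now, timedelta(days=1)))
```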
create_tag |
To create a tag based on a given snapshot. Arguments:
|
-- based on snapshot 10, retained for 1 day
CALL sys.create_tag(table => 'default.T', tag => 'my_tag', snapshot => 10, time_retained => '1 d')
-- based on the latest snapshot
CALL sys.create_tag(table => 'default.T', tag => 'my_tag') |
|
create_tag_from_timestamp |
To create a tag based on a given timestamp. Arguments:
|
CALL sys.create_tag_from_timestamp(`table` => 'default.T', `tag` => 'my_tag', `timestamp` => 1724404318750, time_retained => '1 d') | |
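The `timestamp` value in the example above, `1724404318750`, is a count of milliseconds since the Unix epoch (2024-08-23 09:11:58.750 UTC). The helpers below are a small sketch for converting between such values and timezone-aware datetimes; the names `to_epoch_millis` and `from_epoch_millis` are hypothetical. Integer `timedelta` division keeps the result exact.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment: datetime) -> int:
    """Milliseconds since the Unix epoch for a timezone-aware datetime.

    Subtracting the aware EPOCH from a naive datetime raises TypeError,
    which prevents silently using the machine's local time zone.
    """
    return (moment - EPOCH) // timedelta(milliseconds=1)


def from_epoch_millis(millis: int) -> datetime:
    """Inverse of to_epoch_millis, returned in UTC."""
    return EPOCH + timedelta(milliseconds=millis)


moment = datetime(2024, 8, 23, 9, 11, 58, 750000, tzinfo=timezone.utc)
print(to_epoch_millis(moment))  # 1724404318750
```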
rename_tag |
To rename a tag to a new tag name. Arguments:
|
CALL sys.rename_tag(table => 'default.T', tag => 'tag1', target_tag => 'tag2') | |
replace_tag |
Replace an existing tag with new tag info. Arguments:
|
CALL sys.replace_tag(table => 'default.T', tag_name => 'tag1', snapshot => 10, time_retained => '1 d') | |
delete_tag |
To delete a tag. Arguments:
|
CALL sys.delete_tag(table => 'default.T', tag => 'my_tag') | |
expire_tags |
To expire tags by time. Arguments:
|
CALL sys.expire_tags(table => 'default.T', older_than => '2024-09-06 11:00:00') | |
rollback |
To roll back the target table to a specific version. One of version, snapshot, or tag must be set. Arguments:
|
CALL sys.rollback(table => 'default.T', version => 'my_tag')
CALL sys.rollback(table => 'default.T', version => 10)
CALL sys.rollback(table => 'default.T', tag => 'tag1')
CALL sys.rollback(table => 'default.T', snapshot => 2) |
|
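The examples above pass a single rollback target per call. The hypothetical helper below builds such a statement and, as a deliberate choice of this sketch, refuses calls that set zero or several of `version`, `snapshot`, and `tag`. Integers are rendered bare (snapshot ids) and strings quoted (tag names), matching the examples.

```python
from typing import Optional, Union


def rollback_call(table: str, *, version: Optional[Union[int, str]] = None,
                  snapshot: Optional[int] = None, tag: Optional[str] = None) -> str:
    """Build a CALL sys.rollback statement with exactly one rollback target."""
    targets = {"version": version, "snapshot": snapshot, "tag": tag}
    chosen = {name: value for name, value in targets.items() if value is not None}
    if len(chosen) != 1:
        raise ValueError("set exactly one of version, snapshot or tag")
    name, value = next(iter(chosen.items()))
    # Snapshot ids are rendered as bare integers, tag names as quoted strings.
    rendered = str(value) if isinstance(value, int) else f"'{value}'"
    return f"CALL sys.rollback(table => '{table}', {name} => {rendered})"


print(rollback_call("default.T", version="my_tag"))
print(rollback_call("default.T", snapshot=2))
```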
rollback_to_timestamp |
To roll back to the latest snapshot that is earlier than or equal to the given timestamp. Arguments:
|
CALL sys.rollback_to_timestamp(table => 'default.T', timestamp => 1730292023000) |
|
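The selection rule described for `rollback_to_timestamp` can be illustrated with a small sketch: among snapshots whose commit time (epoch milliseconds) is at or before the timestamp, take the latest. `snapshot_for_timestamp` and the in-memory list of `(snapshot_id, commit_millis)` pairs are hypothetical; the sketch also assumes snapshot ids grow with commit time, and it does not read Paimon metadata.

```python
from typing import Optional, Sequence, Tuple


def snapshot_for_timestamp(snapshots: Sequence[Tuple[int, int]],
                           timestamp_millis: int) -> Optional[int]:
    """Return the id of the latest snapshot committed at or before timestamp_millis."""
    candidates = [sid for sid, commit_millis in snapshots
                  if commit_millis <= timestamp_millis]
    # None means no snapshot is old enough to roll back to.
    return max(candidates) if candidates else None


history = [(1, 1730291000000), (2, 1730292000000), (3, 1730293000000)]
print(snapshot_for_timestamp(history, 1730292023000))  # 2
```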
rollback_to_watermark |
To roll back to the latest snapshot whose watermark is earlier than or equal to the given watermark. Arguments:
|
CALL sys.rollback_to_watermark(table => 'default.T', watermark => 1730292023000) |
|
purge_files |
To clear a table by purging all of its files. Arguments:
|
CALL sys.purge_files(table => 'default.T') |
|
migrate_database |
To migrate all Hive tables in a database to Paimon tables. Arguments:
|
CALL sys.migrate_database(source_type => 'hive', database => 'db01', options => 'file.format=parquet', options_map => map('k1','v1'), parallelism => 6) | |
migrate_table |
To migrate a Hive table to a Paimon table. Arguments:
|
CALL sys.migrate_table(source_type => 'hive', table => 'default.T', options => 'file.format=parquet', options_map => map('k1','v1'), parallelism => 6) | |
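`options_map` takes a Spark SQL map value, written in the example with Spark's built-in `map(key0, value0, key1, value1, ...)` function. A minimal sketch of rendering a Python dict into that form; `spark_map_literal` and `migrate_table_call` are hypothetical helpers, and they reject quotes and backslashes rather than relying on Spark's string-escaping rules.

```python
from typing import Mapping


def spark_map_literal(options: Mapping[str, str]) -> str:
    """Render a dict as a Spark SQL map(...) call, e.g. map('k1','v1')."""
    parts = []
    for key, value in options.items():
        for text in (key, value):
            if "'" in text or "\\" in text:
                raise ValueError(f"quote or backslash not supported: {text!r}")
        parts.append(f"'{key}','{value}'")
    return f"map({','.join(parts)})"


def migrate_table_call(table: str, options_map: Mapping[str, str],
                       parallelism: int) -> str:
    """Build a CALL sys.migrate_table statement for a Hive source table."""
    return (
        f"CALL sys.migrate_table(source_type => 'hive', table => '{table}', "
        f"options_map => {spark_map_literal(options_map)}, "
        f"parallelism => {parallelism})"
    )


print(migrate_table_call("default.T", {"k1": "v1"}, 6))
```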
remove_orphan_files |
To remove the orphan data files and metadata files. Arguments:
|
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00')
CALL sys.remove_orphan_files(table => 'default.*', older_than => '2023-10-31 12:00:00')
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true)
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5')
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5', mode => 'local') |
|
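A common pattern is to compute `older_than` relative to the current time and run a dry run first. The sketch below formats `now - keep` in the `yyyy-MM-dd HH:mm:ss` shape used above; `older_than_arg` and `remove_orphan_files_call` are hypothetical helpers, and which time zone Paimon applies to this string is not stated on this page, so the caller's clock must match it.

```python
from datetime import datetime, timedelta


def older_than_arg(now: datetime, keep: timedelta) -> str:
    """Format now - keep as 'yyyy-MM-dd HH:mm:ss'."""
    return (now - keep).strftime("%Y-%m-%d %H:%M:%S")


def remove_orphan_files_call(table: str, older_than: str,
                             dry_run: bool = True) -> str:
    """Build a CALL sys.remove_orphan_files statement (dry run by default)."""
    return (
        f"CALL sys.remove_orphan_files(table => '{table}', "
        f"older_than => '{older_than}', dry_run => {str(dry_run).lower()})"
    )


cutoff = older_than_arg(datetime(2023, 11, 1, 12, 0, 0), timedelta(days=1))
print(remove_orphan_files_call("default.T", cutoff))
```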
remove_unexisting_files |
To remove non-existent data files from manifest entries. See the Java docs for detailed use cases. Arguments:
Note that you use this procedure at your own risk: it may cause data loss when used outside the use cases listed in the Java docs. |
-- remove non-existent data files in the table `mydb.myt`
CALL sys.remove_unexisting_files(table => 'mydb.myt')
-- only check which files would be removed, without actually removing them (dry run)
CALL sys.remove_unexisting_files(table => 'mydb.myt', dry_run => true) |
|
repair |
To synchronize information from the file system to the Metastore. Arguments:
|
CALL sys.repair('test_db.T')
CALL sys.repair('test_db.T,test_db01,test_db.T2') |
|
create_branch |
To create a branch, either based on a given tag or as an empty branch. Arguments:
|
CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch')
CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch', tag => 'my_tag')
CALL sys.create_branch(table => 'test_db.T$branch_existBranchName', branch => 'test_branch', tag => 'my_tag') |
|
delete_branch |
To delete a branch. Arguments:
|
CALL sys.delete_branch(table => 'test_db.T', branch => 'test_branch') | |
fast_forward |
To fast-forward the main branch to the given branch. Arguments:
|
CALL sys.fast_forward(table => 'test_db.T', branch => 'test_branch') | |
reset_consumer |
To reset or delete a consumer. Arguments:
|
-- reset the next snapshot id of the consumer
CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid', nextSnapshotId => 10)
-- delete the consumer
CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid') |
|
clear_consumers |
To clear consumers. Arguments:
|
-- clear all consumers in the table
CALL sys.clear_consumers(table => 'default.T')
-- clear some consumers in the table (accepts a regular expression)
CALL sys.clear_consumers(table => 'default.T', includingConsumers => 'myid.*')
-- clear all consumers except excludingConsumers in the table (accepts a regular expression)
CALL sys.clear_consumers(table => 'default.T', includingConsumers => '', excludingConsumers => 'myid1.*')
-- clear consumers using both includingConsumers and excludingConsumers (accepts regular expressions)
CALL sys.clear_consumers(table => 'default.T', includingConsumers => 'myid.*', excludingConsumers => 'myid1.*') |
|
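The four `clear_consumers` examples combine an including pattern and an excluding pattern. The sketch below models one plausible reading of them, with loudly stated assumptions: patterns must match the whole consumer id, an empty or missing `includingConsumers` means "all consumers", and a missing `excludingConsumers` excludes nothing. `consumers_to_clear` is a hypothetical helper for reasoning about which ids a call would touch, not Paimon code.

```python
import re
from typing import Iterable, List, Optional


def consumers_to_clear(consumers: Iterable[str],
                       including: Optional[str] = None,
                       excluding: Optional[str] = None) -> List[str]:
    """Ids that fully match `including` and do not fully match `excluding`."""
    include = re.compile(including) if including else re.compile(".*")
    exclude = re.compile(excluding) if excluding else None
    return [
        consumer for consumer in consumers
        if include.fullmatch(consumer)
        and not (exclude and exclude.fullmatch(consumer))
    ]


ids = ["myid", "myid1", "myid12", "other"]
print(consumers_to_clear(ids, "myid.*", "myid1.*"))  # ['myid']
```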
mark_partition_done |
To mark partitions as done. Arguments:
|
-- mark a single partition as done
CALL sys.mark_partition_done(table => 'default.T', partitions => 'day=2024-07-01')
-- mark multiple partitions as done
CALL sys.mark_partition_done(table => 'default.T', partitions => 'day=2024-07-01;day=2024-07-02') |
|
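For daily partitions, the `;`-separated list in the second example can be generated for a date range. `day_partitions` and `mark_partition_done_call` are hypothetical helpers; the partition key `day` comes from the example, and ISO dates (`yyyy-MM-dd`) are assumed to match how the table stores partition values.

```python
from datetime import date, timedelta


def day_partitions(start: date, end: date, key: str = "day") -> str:
    """Build 'day=2024-07-01;day=2024-07-02' for every day in [start, end]."""
    days = (end - start).days
    if days < 0:
        raise ValueError("end must not be before start")
    return ";".join(
        f"{key}={(start + timedelta(days=offset)).isoformat()}"
        for offset in range(days + 1)
    )


def mark_partition_done_call(table: str, partitions: str) -> str:
    """Build a CALL sys.mark_partition_done statement."""
    return (f"CALL sys.mark_partition_done(table => '{table}', "
            f"partitions => '{partitions}')")


print(mark_partition_done_call("default.T",
                               day_partitions(date(2024, 7, 1), date(2024, 7, 2))))
```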
refresh_object_table |
To refresh an object table. Arguments:
|
CALL sys.refresh_object_table('default.T') | |
compact_manifest |
To compact the manifests. Arguments:
|
CALL sys.compact_manifest(`table` => 'default.T') | |
alter_view_dialect |
To alter the dialect of a view. Arguments:
|
-- add a dialect to the view
CALL sys.alter_view_dialect('view_identifier', 'add', 'spark', 'query')
CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 'add', `query` => 'query')
-- update a dialect of the view
CALL sys.alter_view_dialect('view_identifier', 'update', 'spark', 'query')
CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 'update', `query` => 'query')
-- drop a dialect from the view
CALL sys.alter_view_dialect('view_identifier', 'drop', 'spark')
CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 'drop') |
|
create_function |
To create a function. Arguments:
|
CALL sys.create_function('function_identifier', '[{"id": 0, "name":"length", "type":"INT"}, {"id": 1, "name":"width", "type":"INT"}]', '[{"id": 0, "name":"area", "type":"BIGINT"}]', true, 'comment', 'k1=v1,k2=v2')
CALL sys.create_function(`function` => 'function_identifier', `inputParams` => '[{"id": 0, "name":"length", "type":"INT"}, {"id": 1, "name":"width", "type":"INT"}]', `returnParams` => '[{"id": 0, "name":"area", "type":"BIGINT"}]', `deterministic` => true, `comment` => 'comment', `options` => 'k1=v1,k2=v2') |
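Hand-writing the `inputParams` and `returnParams` JSON is error-prone, so it can be generated with Python's standard `json` module. In this sketch the hypothetical `params_json` numbers fields with consecutive ids starting at 0, as in the example; that numbering rule is an assumption. `create_function_call` is also hypothetical and does not escape single quotes in names or comments.

```python
import json
from typing import List, Tuple


def params_json(params: List[Tuple[str, str]]) -> str:
    """Render [(name, type), ...] as the JSON array used by inputParams/returnParams."""
    return json.dumps([
        {"id": index, "name": name, "type": type_name}
        for index, (name, type_name) in enumerate(params)
    ])


def create_function_call(identifier: str, inputs: List[Tuple[str, str]],
                         returns: List[Tuple[str, str]], deterministic: bool = True,
                         comment: str = "", options: str = "") -> str:
    """Build a named-argument CALL sys.create_function statement."""
    return (
        "CALL sys.create_function("
        f"`function` => '{identifier}', "
        f"`inputParams` => '{params_json(inputs)}', "
        f"`returnParams` => '{params_json(returns)}', "
        f"`deterministic` => {str(deterministic).lower()}, "
        f"`comment` => '{comment}', "
        f"`options` => '{options}')"
    )


print(create_function_call("function_identifier",
                           [("length", "INT"), ("width", "INT")],
                           [("area", "BIGINT")], True, "comment", "k1=v1,k2=v2"))
```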
alter_function |
To alter a function. Arguments:
|
CALL sys.alter_function('function_identifier', '{"action" : "addDefinition", "name" : "spark", "definition" : {"type" : "lambda", "definition" : "(Integer length, Integer width) -> { return (long) length * width; }", "language": "JAVA" } }')
CALL sys.alter_function(`function` => 'function_identifier', `change` => '{"action" : "addDefinition", "name" : "spark", "definition" : {"type" : "lambda", "definition" : "(Integer length, Integer width) -> { return (long) length * width; }", "language": "JAVA" } }') |
drop_function |
To drop a function. Arguments:
|
CALL [catalog.]sys.drop_function('function_identifier')
CALL sys.drop_function(`function` => 'function_identifier') |