Procedures
This section introduces all available Spark procedures for Paimon.
Procedure Name | Explanation | Example |
---|---|---|
compact |
To compact files. Arguments:
";" means "OR". To compact one partition with date=01 and day=01, write 'date=01,day=01'. Leave empty to compact all partitions. (Cannot be used together with "where".) |
SET spark.sql.shuffle.partitions=10; -- set the compact parallelism
CALL sys.compact(table => 'T', partitions => 'p=0;p=1', order_strategy => 'zorder', order_by => 'a,b')
CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy => 'zorder', order_by => 'a,b')
CALL sys.compact(table => 'T', partition_idle_time => '60s')
CALL sys.compact(table => 'T', compact_strategy => 'minor') |
expire_snapshots |
To expire snapshots. Argument:
|
CALL sys.expire_snapshots(table => 'default.T', retain_max => 10) |
expire_partitions |
To expire partitions. Argument:
|
CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', timestamp_pattern => '$dt', expire_strategy => 'values-time') |
create_tag |
To create a tag based on a given snapshot. Arguments:
|
-- based on snapshot 10, retained for 1 day
CALL sys.create_tag(table => 'default.T', tag => 'my_tag', snapshot => 10, time_retained => '1 d')
-- based on the latest snapshot
CALL sys.create_tag(table => 'default.T', tag => 'my_tag') |
create_tag_from_timestamp |
To create a tag based on a given timestamp. Arguments:
|
CALL sys.create_tag_from_timestamp(`table` => 'default.T', `tag` => 'my_tag', `timestamp` => 1724404318750, time_retained => '1 d') |
rename_tag |
Rename a tag with a new tag name. Arguments:
|
CALL sys.rename_tag(table => 'default.T', tag_name => 'tag1', target_tag_name => 'tag2') |
replace_tag |
Replace an existing tag with new tag info. Arguments:
|
CALL sys.replace_tag(table => 'default.T', tag_name => 'tag1', snapshot => 10, time_retained => '1 d') |
delete_tag |
To delete a tag. Arguments:
|
CALL sys.delete_tag(table => 'default.T', tag => 'my_tag') |
expire_tags |
To expire tags by time. Arguments:
|
CALL sys.expire_tags(table => 'default.T', older_than => '2024-09-06 11:00:00') |
rollback |
To roll back the target table to a specific version. Arguments:
|
CALL sys.rollback(table => 'default.T', version => 'my_tag')
CALL sys.rollback(table => 'default.T', version => 10) |
rollback_to_timestamp |
To roll back to the snapshot that is earlier than or equal to the given timestamp. Arguments:
|
CALL sys.rollback_to_timestamp(table => 'default.T', timestamp => 1730292023000) |
rollback_to_watermark |
To roll back to the snapshot that is earlier than or equal to the given watermark. Arguments:
|
CALL sys.rollback_to_watermark(table => 'default.T', watermark => 1730292023000) |
purge_files |
To clear a table by purging its files directly. Arguments:
|
CALL sys.purge_files(table => 'default.T') |
migrate_database |
Migrate all Hive tables in a database to Paimon tables. Arguments:
|
CALL sys.migrate_database(source_type => 'hive', database => 'db01', options => 'file.format=parquet', options_map => map('k1','v1'), parallelism => 6) |
migrate_table |
Migrate a Hive table to a Paimon table. Arguments:
|
CALL sys.migrate_table(source_type => 'hive', table => 'default.T', options => 'file.format=parquet', options_map => map('k1','v1'), parallelism => 6) |
migrate_file |
Migrate files from a Hive table to a Paimon table. Arguments:
|
CALL sys.migrate_file(source_type => 'hive', table => 'default.T', delete_origin => true, parallelism => 6) |
remove_orphan_files |
To remove the orphan data files and metadata files. Arguments:
|
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00')
CALL sys.remove_orphan_files(table => 'default.*', older_than => '2023-10-31 12:00:00')
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true)
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5')
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5', mode => 'local') |
remove_unexisting_files |
Procedure to remove data files that no longer exist from manifest entries. See the Java docs for detailed use cases. Arguments:
Note: use this procedure at your own risk. It may cause data loss when used outside the use cases listed in the Java docs. |
-- remove unexisting data files in the table `mydb.myt`
CALL sys.remove_unexisting_files(table => 'mydb.myt')
-- only check which files would be removed, without actually removing them (dry run)
CALL sys.remove_unexisting_files(table => 'mydb.myt', dry_run => true) |
repair |
Synchronize information from the file system to the Metastore. Arguments:
|
CALL sys.repair('test_db.T')
CALL sys.repair('test_db.T,test_db01,test_db.T2') |
create_branch |
To create a branch, optionally based on a given tag. Arguments:
|
CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch')
CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch', tag => 'my_tag') |
delete_branch |
To delete a branch. Arguments:
|
CALL sys.delete_branch(table => 'test_db.T', branch => 'test_branch') |
fast_forward |
To fast-forward a branch to the main branch. Arguments:
|
CALL sys.fast_forward(table => 'test_db.T', branch => 'test_branch') |
reset_consumer |
To reset or delete a consumer. Arguments:
|
-- reset the next snapshot id in the consumer
CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid', nextSnapshotId => 10)
-- delete consumer
CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid') |
mark_partition_done |
To mark partitions as done. Arguments:
|
-- mark a single partition done
CALL sys.mark_partition_done(table => 'default.T', partitions => 'day=2024-07-01')
-- mark multiple partitions done
CALL sys.mark_partition_done(table => 'default.T', partitions => 'day=2024-07-01;day=2024-07-02') |
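
When these procedures are invoked from a driver program rather than typed by hand, the `CALL` text and the partition string (`;` between partitions, `,` between the keys of one partition) are easy to get wrong. The following is a minimal Python sketch of one way to assemble them. The helper names `partition_spec`, `sql_literal`, and `call_statement` are hypothetical and not part of Paimon or Spark; the sketch only builds strings and assumes every argument is passed by name as a string, integer, or boolean.

```python
from typing import Mapping, Sequence, Union

Value = Union[str, int, bool]


def partition_spec(partitions: Sequence[Mapping[str, str]]) -> str:
    """Join partitions with ';' (OR) and the keys of one partition with ','."""
    return ";".join(
        ",".join(f"{key}={value}" for key, value in partition.items())
        for partition in partitions
    )


def sql_literal(value: Value) -> str:
    """Render a Python value as a SQL literal for a named procedure argument."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, int):
        return str(value)
    if "'" in value:
        # Quoting rules are out of scope for this sketch, so refuse instead of guessing.
        raise ValueError(f"single quotes are not supported: {value!r}")
    return f"'{value}'"


def call_statement(procedure: str, **arguments: Value) -> str:
    """Build a `CALL sys.<procedure>(name => value, ...)` statement."""
    rendered = ", ".join(
        f"{name} => {sql_literal(value)}" for name, value in arguments.items()
    )
    return f"CALL sys.{procedure}({rendered})"


# One partition with two keys, OR another partition with two keys.
spec = partition_spec([{"date": "01", "day": "01"}, {"date": "02", "day": "01"}])
print(call_statement("compact", table="T", partitions=spec))
# CALL sys.compact(table => 'T', partitions => 'date=01,day=01;date=02,day=01')
```

The resulting string can then be passed to `spark.sql(...)` on a `SparkSession` that already has a Paimon catalog configured. Arguments that take a map, such as `options_map => map('k1','v1')`, are not covered by this sketch.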