Procedures
This documentation is for an unreleased version of Apache Paimon. We recommend you use the latest stable version.

This section introduces all available Spark procedures for Paimon. Each entry below gives the procedure name, an explanation with its arguments, and examples.

compact
To compact files. Arguments:
  • table: the target table identifier. Cannot be empty.
  • partitions: partition filter, where "," means "AND" and ";" means "OR". For example, to compact the single partition with date=01 and day=01, write 'date=01,day=01'. Leave empty for all partitions. (Cannot be used together with "where".)
  • where: partition predicate. Leave empty for all partitions. (Cannot be used together with "partitions".)
  • order_strategy: 'order', 'zorder', 'hilbert' or 'none'. Leave empty for 'none'.
  • order_by: the columns to sort by. Leave empty if 'order_strategy' is 'none'.
  • partition_idle_time: performs a full compaction on partitions that have not received any new data for 'partition_idle_time'. Only these partitions will be compacted. This argument cannot be used with order compaction.
Example:
    SET spark.sql.shuffle.partitions=10; -- set the compact parallelism

    CALL sys.compact(table => 'T', partitions => 'p=0;p=1', order_strategy => 'zorder', order_by => 'a,b')

    CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy => 'zorder', order_by => 'a,b')

    CALL sys.compact(table => 'T', partition_idle_time => '60s')
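Both separators of the partitions filter can be combined: ',' joins conditions within one partition spec with AND, and ';' joins whole specs with OR. A minimal sketch, assuming a hypothetical table T partitioned by dt and hh:

```sql
-- Hypothetical table T, partitioned by dt and hh.
-- Compacts two partitions: (dt=2024-01-01 AND hh=01) OR (dt=2024-01-01 AND hh=02).
CALL sys.compact(table => 'T', partitions => 'dt=2024-01-01,hh=01;dt=2024-01-01,hh=02')
```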

expire_snapshots
To expire snapshots. Arguments:
  • table: the target table identifier. Cannot be empty.
  • retain_max: the maximum number of completed snapshots to retain.
  • retain_min: the minimum number of completed snapshots to retain.
  • older_than: timestamp before which snapshots will be removed.
  • max_deletes: the maximum number of snapshots that can be deleted at once.
Example:
    CALL sys.expire_snapshots(table => 'default.T', retain_max => 10)
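The arguments listed for expire_snapshots can also be combined in a single call. A sketch under the assumption that older_than accepts the same 'yyyy-MM-dd HH:mm:ss' string format used by expire_tags and remove_orphan_files on this page; the values are illustrative only:

```sql
-- Remove snapshots committed before 2024-01-01 12:00:00,
-- but keep at least 5 completed snapshots and delete at most 100 in this call.
CALL sys.expire_snapshots(table => 'default.T', older_than => '2024-01-01 12:00:00', retain_min => 5, max_deletes => 100)
```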

expire_partitions
To expire partitions. Arguments:
  • table: the target table identifier. Cannot be empty.
  • expiration_time: the expiration interval of a partition. A partition expires when its lifetime exceeds this value. Partition time is extracted from the partition value.
  • timestamp_formatter: the formatter used to parse a timestamp from a string.
  • timestamp_pattern: the pattern used to get a timestamp from partitions.
  • expire_strategy: the strategy for partition expiration; possible values are 'values-time' and 'update-time'. Defaults to 'values-time'.
Example:
    CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', timestamp_pattern => '$dt', expire_strategy => 'values-time')
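The expire_partitions example uses 'values-time'. A sketch of the 'update-time' strategy follows; it assumes (the description does not say so) that this strategy judges a partition by its last update time rather than by a time parsed from its value, which is why timestamp_formatter and timestamp_pattern are omitted:

```sql
-- Assumption: with 'update-time', a partition expires once it has not been
-- updated for longer than expiration_time, so no formatter or pattern is passed.
CALL sys.expire_partitions(table => 'default.T', expiration_time => '7 d', expire_strategy => 'update-time')
```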

create_tag
To create a tag based on the given snapshot. Arguments:
  • table: the target table identifier. Cannot be empty.
  • tag: name of the new tag. Cannot be empty.
  • snapshot (Long): id of the snapshot that the new tag is based on. If not set, the latest snapshot is used.
  • time_retained: the maximum time to retain the newly created tag.
Example:
    -- based on snapshot 10, retained for 1 day
    CALL sys.create_tag(table => 'default.T', tag => 'my_tag', snapshot => 10, time_retained => '1 d')

    -- based on the latest snapshot
    CALL sys.create_tag(table => 'default.T', tag => 'my_tag')

create_tag_from_timestamp
To create a tag based on the given timestamp. Arguments:
  • table: the target table identifier. Cannot be empty.
  • tag: name of the new tag.
  • timestamp (Long): the tag is based on the first snapshot whose commit time is greater than this timestamp, given in milliseconds.
  • time_retained: the maximum time to retain the newly created tag.
Example:
    CALL sys.create_tag_from_timestamp(`table` => 'default.T', `tag` => 'my_tag', `timestamp` => 1724404318750, time_retained => '1 d')

rename_tag
To rename a tag. Arguments:
  • table: the target table identifier. Cannot be empty.
  • tag_name: name of the tag. Cannot be empty.
  • target_tag_name: the new name of the tag. Cannot be empty.
Example:
    CALL sys.rename_tag(table => 'default.T', tag_name => 'tag1', target_tag_name => 'tag2')

replace_tag
To replace an existing tag with new tag info. Arguments:
  • table: the target table identifier. Cannot be empty.
  • tag: name of the existing tag. Cannot be empty.
  • snapshot (Long): id of the snapshot that the tag is based on. Optional.
  • time_retained: the maximum time to retain the existing tag. Optional.
Example:
    CALL sys.replace_tag(table => 'default.T', tag => 'tag1', snapshot => 10, time_retained => '1 d')

delete_tag
To delete a tag. Arguments:
  • table: the target table identifier. Cannot be empty.
  • tag: name of the tag to be deleted. To delete multiple tags, separate their names with ','.
Example:
    CALL sys.delete_tag(table => 'default.T', tag => 'my_tag')
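Several tags can be deleted in one call by joining their names with ','. A sketch, assuming hypothetical tags tag1 and tag2 exist on default.T:

```sql
-- Deletes both tag1 and tag2 from default.T.
CALL sys.delete_tag(table => 'default.T', tag => 'tag1,tag2')
```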

expire_tags
To expire tags by time. Arguments:
  • table: the target table identifier. Cannot be empty.
  • older_than: tags created before this time will be removed.
Example:
    CALL sys.expire_tags(table => 'default.T', older_than => '2024-09-06 11:00:00')

rollback
To roll back the target table to a specific version. Arguments:
  • table: the target table identifier. Cannot be empty.
  • version: id of the snapshot or name of the tag to roll back to.
Example:
    CALL sys.rollback(table => 'default.T', version => 'my_tag')

    CALL sys.rollback(table => 'default.T', version => 10)

rollback_to_timestamp
To roll back the target table to the latest snapshot whose commit time is earlier than or equal to the given timestamp. Arguments:
  • table: the target table identifier. Cannot be empty.
  • timestamp: the target timestamp, in milliseconds.
Example:
    CALL sys.rollback_to_timestamp(table => 'default.T', timestamp => 1730292023000)

migrate_database
To migrate the Hive tables of a database to Paimon tables. Arguments:
  • source_type: the type of the source tables to be migrated, such as hive. Cannot be empty.
  • database: name of the source database to be migrated. Cannot be empty.
  • options: the table options of the migrated Paimon tables.
  • options_map: additional table options, given as a map of key-value pairs.
  • parallelism: the parallelism of the migration process. Defaults to the number of CPU cores of the machine.
Example:
    CALL sys.migrate_database(source_type => 'hive', database => 'db01', options => 'file.format=parquet', options_map => map('k1','v1'), parallelism => 6)

migrate_table
To migrate a Hive table to a Paimon table. Arguments:
  • source_type: the type of the source table to be migrated, such as hive. Cannot be empty.
  • table: name of the source table to be migrated. Cannot be empty.
  • options: the table options of the migrated Paimon table.
  • target_table: name of the target Paimon table. If not set, the target keeps the same name as the source table.
  • delete_origin: if target_table is set, decides whether to delete the source table metadata from HMS after migration. Default is true.
  • options_map: additional table options, given as a map of key-value pairs.
  • parallelism: the parallelism of the migration process. Defaults to the number of CPU cores of the machine.
Example:
    CALL sys.migrate_table(source_type => 'hive', table => 'default.T', options => 'file.format=parquet', options_map => map('k1','v1'), parallelism => 6)
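The migrate_table example does not use target_table or delete_origin. A sketch that migrates into a differently named Paimon table while keeping the source metadata in HMS; default.T_paimon is a hypothetical name:

```sql
-- Migrate Hive table default.T into Paimon table default.T_paimon (hypothetical name)
-- and keep the metadata of default.T in HMS.
CALL sys.migrate_table(source_type => 'hive', table => 'default.T', target_table => 'default.T_paimon', delete_origin => false)
```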

migrate_file
To migrate a Hive table to a Paimon table. Arguments:
  • source_type: the type of the source table to be migrated, such as hive. Cannot be empty.
  • source_table: name of the source table to be migrated. Cannot be empty.
  • target_table: name of the target Paimon table. Cannot be empty.
  • delete_origin: whether to delete the source table metadata from HMS after migration. Default is true.
  • parallelism: the parallelism of the migration process. Defaults to the number of CPU cores of the machine.
Example:
    CALL sys.migrate_file(source_type => 'hive', source_table => 'default.T', target_table => 'default.T2', delete_origin => true, parallelism => 6)

remove_orphan_files
To remove orphan data files and metadata files. Arguments:
  • table: the target table identifier. Cannot be empty; use database_name.* to clean the whole database.
  • older_than: to avoid deleting newly written files, this procedure by default only deletes orphan files older than 1 day. Set this argument to a timestamp to delete only orphan files older than that time instead.
  • dry_run: when true, only lists the orphan files without actually removing them. Default is false.
  • parallelism: the maximum number of files deleted concurrently. Defaults to the number of processors available to the Java virtual machine.
  • mode: the mode of the procedure, 'local' or 'distributed'. Default is 'distributed'.
Example:
    CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00')

    CALL sys.remove_orphan_files(table => 'default.*', older_than => '2023-10-31 12:00:00')

    CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true)

    CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5')

    CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5', mode => 'local')

repair
To synchronize information from the file system to the Metastore. Arguments:
  • database_or_table: empty, a target database name, or a target table identifier. To specify several databases or tables, separate them with ','.
Example:
    CALL sys.repair('test_db.T')

    CALL sys.repair('test_db.T,test_db01,test_db.T2')
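database_or_table also accepts a bare database name. A sketch, assuming that repairing a database synchronizes every table it contains:

```sql
-- Assumption: synchronizes all tables of test_db from the file system to the Metastore.
CALL sys.repair('test_db')
```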

create_branch
To create a branch based on the given tag, or an empty branch if no tag is given. Arguments:
  • table: the target table identifier. Cannot be empty.
  • branch: name of the new branch.
  • tag: name of the tag that the new branch is based on. Optional.
Example:
    CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch')

    CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch', tag => 'my_tag')

delete_branch
To delete a branch. Arguments:
  • table: the target table identifier. Cannot be empty.
  • branch: name of the branch to be deleted. To delete multiple branches, separate their names with ','.
Example:
    CALL sys.delete_branch(table => 'test_db.T', branch => 'test_branch')
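Several branches can be deleted at once by joining their names with ','. A sketch, assuming hypothetical branches branch1 and branch2 exist on test_db.T:

```sql
-- Deletes both branch1 and branch2 of test_db.T.
CALL sys.delete_branch(table => 'test_db.T', branch => 'branch1,branch2')
```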

fast_forward
To fast-forward the main branch to the given branch. Arguments:
  • table: the target table identifier. Cannot be empty.
  • branch: name of the branch to fast-forward the main branch to.
Example:
    CALL sys.fast_forward(table => 'test_db.T', branch => 'test_branch')

reset_consumer
To reset or delete a consumer. Arguments:
  • table: the target table identifier. Cannot be empty.
  • consumerId: the consumer to be reset or deleted.
  • nextSnapshotId (Long): the new next snapshot id of the consumer. If not set, the consumer is deleted.
Example:
    -- reset the next snapshot id of the consumer
    CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid', nextSnapshotId => 10)

    -- delete consumer
    CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid')

mark_partition_done
To mark partitions as done. Arguments:
  • table: the target table identifier. Cannot be empty.
  • partitions: the partitions to be marked as done. To specify multiple partitions, separate them with ';'.
Example:
    -- mark a single partition as done
    CALL sys.mark_partition_done(table => 'default.T', partitions => 'day=2024-07-01')

    -- mark multiple partitions as done
    CALL sys.mark_partition_done(table => 'default.T', partitions => 'day=2024-07-01;day=2024-07-02')
    Copyright © 2024 The Apache Software Foundation. Apache Paimon, Paimon, and its feather logo are trademarks of The Apache Software Foundation.