Procedures
This documentation is for an unreleased version of Apache Paimon. We recommend you use the latest stable version.

This section introduces all available Spark procedures for Paimon.

Procedure Name Explanation Example
compact To compact files. Arguments:
  • table: the target table identifier. Cannot be empty.
  • partitions: partition filter. "," means "AND" and ";" means "OR". For example, to compact the single partition with date=01 and day=01, write 'date=01,day=01'. Leave empty to compact all partitions. (Cannot be used together with "where".)
  • where: partition predicate. Leave empty to compact all partitions. (Cannot be used together with "partitions".)
  • order_strategy: 'order', 'zorder', 'hilbert', or 'none'. Defaults to 'none' when left empty.
  • order_by: the columns to sort by. Leave empty if 'order_strategy' is 'none'.
  • partition_idle_time: performs a full compaction only on partitions that have not received any new data for 'partition_idle_time'. Cannot be used together with sort compaction (that is, with 'order_strategy').
  • compact_strategy: determines how files are picked for merging; the default depends on the runtime execution mode. 'full' selects all files for merging and is only supported in batch mode. 'minor' picks the set of files to merge based on specified conditions.
  Example:
    SET spark.sql.shuffle.partitions=10; -- set the compact parallelism

    CALL sys.compact(table => 'T', partitions => 'p=0;p=1', order_strategy => 'zorder', order_by => 'a,b')

    CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy => 'zorder', order_by => 'a,b')

    CALL sys.compact(table => 'T', partition_idle_time => '60s')

    CALL sys.compact(table => 'T', compact_strategy => 'minor')
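The partition filter syntax described above ("," for AND within one partition, ";" for OR between partitions) can also be assembled programmatically. The following is a minimal Python sketch; the helper names `build_partition_filter` and `compact_call` are hypothetical and not part of Paimon, and the code only builds the statement text to submit to Spark.

```python
from typing import Dict, List


def build_partition_filter(partitions: List[Dict[str, str]]) -> str:
    """Join partition specs into the string passed as `partitions =>`.

    Hypothetical helper (not a Paimon API): the key=value pairs of one
    partition are joined with "," (AND); partitions are joined with ";" (OR).
    """
    return ";".join(
        ",".join(f"{key}={value}" for key, value in spec.items())
        for spec in partitions
    )


def compact_call(table: str, partitions: List[Dict[str, str]]) -> str:
    """Build the text of a CALL sys.compact statement for the given partitions."""
    partition_filter = build_partition_filter(partitions)
    return f"CALL sys.compact(table => '{table}', partitions => '{partition_filter}')"


# One partition with date=01 AND day=01, as in the argument description.
print(compact_call("T", [{"date": "01", "day": "01"}]))
# Two partitions, p=0 OR p=1, as in the first example.
print(compact_call("T", [{"p": "0"}, {"p": "1"}]))
```

The resulting string can be passed to Spark as-is, for example through `spark.sql(...)` in a session where the Paimon catalog is configured.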

expire_snapshots To expire snapshots. Arguments:
  • table: the target table identifier. Cannot be empty.
  • retain_max: the maximum number of completed snapshots to retain.
  • retain_min: the minimum number of completed snapshots to retain.
  • older_than: timestamp before which snapshots will be removed.
  • max_deletes: the maximum number of snapshots that can be deleted at once.
  Example:
    CALL sys.expire_snapshots(table => 'default.T', retain_max => 10)
expire_partitions To expire partitions. Arguments:
  • table: the target table identifier. Cannot be empty.
  • expiration_time: the expiration interval of a partition. A partition is expired once its lifetime exceeds this value. The partition time is extracted from the partition value.
  • timestamp_formatter: the formatter used to parse a timestamp from a string.
  • timestamp_pattern: the pattern used to get a timestamp from partitions.
  • expire_strategy: the strategy for partition expiration. Possible values: 'values-time' or 'update-time'. Defaults to 'values-time'.
  Example:
    CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', timestamp_pattern => '$dt', expire_strategy => 'values-time')
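To illustrate the 'values-time' rule described above, here is a minimal Python sketch of how a partition value such as dt=2024-07-01 could be checked against an expiration interval. It illustrates the rule only and is not Paimon's implementation; the function name `is_expired` is hypothetical, and the Python format `%Y-%m-%d` stands in for the `yyyy-MM-dd` formatter.

```python
from datetime import datetime, timedelta


def is_expired(partition_value: str, expiration: timedelta, now: datetime,
               fmt: str = "%Y-%m-%d") -> bool:
    """Return True if the partition's lifetime exceeds the expiration interval.

    Illustrative only: the partition time is parsed from the partition value
    (here the `dt` value), mirroring the 'values-time' strategy.
    """
    partition_time = datetime.strptime(partition_value, fmt)
    return now - partition_time > expiration


# A fixed "now" keeps the sketch deterministic.
now = datetime(2024, 7, 3, 12, 0, 0)
print(is_expired("2024-07-01", timedelta(days=1), now))  # lifetime is 2.5 days
print(is_expired("2024-07-03", timedelta(days=1), now))  # lifetime is 12 hours
```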
create_tag To create a tag based on a given snapshot. Arguments:
  • table: the target table identifier. Cannot be empty.
  • tag: name of the new tag. Cannot be empty.
  • snapshot (Long): id of the snapshot on which the new tag is based.
  • time_retained: the maximum time to retain the newly created tag.
  Example:
    -- based on snapshot 10, retained for 1 day
    CALL sys.create_tag(table => 'default.T', tag => 'my_tag', snapshot => 10, time_retained => '1 d')

    -- based on the latest snapshot
    CALL sys.create_tag(table => 'default.T', tag => 'my_tag')
create_tag_from_timestamp To create a tag based on a given timestamp. Arguments:
  • table: the target table identifier. Cannot be empty.
  • tag: name of the new tag.
  • timestamp (Long): the tag is based on the first snapshot whose commit time is greater than this timestamp.
  • time_retained: the maximum time to retain the newly created tag.
  Example:
    CALL sys.create_tag_from_timestamp(`table` => 'default.T', `tag` => 'my_tag', `timestamp` => 1724404318750, time_retained => '1 d')
rename_tag To rename a tag. Arguments:
  • table: the target table identifier. Cannot be empty.
  • tag_name: the current name of the tag. Cannot be empty.
  • target_tag_name: the new name for the tag. Cannot be empty.
  Example:
    CALL sys.rename_tag(table => 'default.T', tag_name => 'tag1', target_tag_name => 'tag2')
replace_tag To replace an existing tag with new tag info. Arguments:
  • table: the target table identifier. Cannot be empty.
  • tag: name of the existing tag. Cannot be empty.
  • snapshot (Long): id of the snapshot on which the tag is based. Optional.
  • time_retained: the maximum time to retain the existing tag. Optional.
  Example:
    CALL sys.replace_tag(table => 'default.T', tag => 'tag1', snapshot => 10, time_retained => '1 d')
delete_tag To delete a tag. Arguments:
  • table: the target table identifier. Cannot be empty.
  • tag: name of the tag to be deleted. If you specify multiple tags, the delimiter is ','.
  Example:
    CALL sys.delete_tag(table => 'default.T', tag => 'my_tag')
expire_tags To expire tags by time. Arguments:
  • table: the target table identifier. Cannot be empty.
  • older_than: tags whose creation time (tagCreateTime) is before this time will be removed.
  Example:
    CALL sys.expire_tags(table => 'default.T', older_than => '2024-09-06 11:00:00')
rollback To roll back to a specific version of the target table. Arguments:
  • table: the target table identifier. Cannot be empty.
  • version: id of the snapshot or name of the tag to roll back to.
  Example:
    CALL sys.rollback(table => 'default.T', version => 'my_tag')

    CALL sys.rollback(table => 'default.T', version => 10)
rollback_to_timestamp To roll back to the snapshot that is earlier than or equal to a timestamp. Arguments:
  • table: the target table identifier. Cannot be empty.
  • timestamp: roll back to the snapshot that is earlier than or equal to this timestamp.
  Example:
    CALL sys.rollback_to_timestamp(table => 'default.T', timestamp => 1730292023000)
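The example timestamp 1730292023000 appears to be in epoch milliseconds. That is an assumption based on its 13-digit magnitude, since the unit is not stated here. Under that assumption, a value for a given UTC time can be computed with the Python standard library, as in this minimal sketch; the helper name `to_epoch_millis` is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment: datetime) -> int:
    """Convert a timezone-aware datetime to whole epoch milliseconds."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Integer division of timedeltas avoids floating-point rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


target = datetime(2024, 10, 30, 12, 40, 23, tzinfo=timezone.utc)
millis = to_epoch_millis(target)
# Assumes the procedure expects epoch milliseconds, as the example value suggests.
print(f"CALL sys.rollback_to_timestamp(table => 'default.T', timestamp => {millis})")
```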

rollback_to_watermark To roll back to the snapshot that is earlier than or equal to a watermark. Arguments:
  • table: the target table identifier. Cannot be empty.
  • watermark: roll back to the snapshot that is earlier than or equal to this watermark.
  Example:
    CALL sys.rollback_to_watermark(table => 'default.T', watermark => 1730292023000)

purge_files To clear a table by purging its files directly. Arguments:
  • table: the target table identifier. Cannot be empty.
  Example:
    CALL sys.purge_files(table => 'default.T')

migrate_database To migrate the Hive tables of a database to Paimon tables. Arguments:
  • source_type: the type of the origin tables to be migrated, such as hive. Cannot be empty.
  • database: name of the origin database to be migrated. Cannot be empty.
  • options: the table options of the Paimon tables to migrate to.
  • options_map: additional key-value options, given as a map.
  • parallelism: the parallelism of the migration process. Defaults to the number of cores on the machine.
  Example:
    CALL sys.migrate_database(source_type => 'hive', database => 'db01', options => 'file.format=parquet', options_map => map('k1','v1'), parallelism => 6)
migrate_table To migrate a Hive table to a Paimon table. Arguments:
  • source_type: the type of the origin table to be migrated, such as hive. Cannot be empty.
  • table: name of the origin table to be migrated. Cannot be empty.
  • options: the table options of the Paimon table to migrate to.
  • target_table: name of the target Paimon table. If not set, the target keeps the same name as the origin table.
  • delete_origin: if target_table is set, decides whether to delete the origin table metadata from the Hive metastore (HMS) after migration. Defaults to true.
  • options_map: additional key-value options, given as a map.
  • parallelism: the parallelism of the migration process. Defaults to the number of cores on the machine.
  Example:
    CALL sys.migrate_table(source_type => 'hive', table => 'default.T', options => 'file.format=parquet', options_map => map('k1','v1'), parallelism => 6)
migrate_file To migrate from a Hive table to a Paimon table. Arguments:
  • source_type: the type of the origin table to be migrated, such as hive. Cannot be empty.
  • source_table: name of the origin table to migrate. Cannot be empty.
  • target_table: name of the target table to migrate to. Cannot be empty.
  • delete_origin: if target_table is set, decides whether to delete the origin table metadata from the Hive metastore (HMS) after migration. Defaults to true.
  • parallelism: the parallelism of the migration process. Defaults to the number of cores on the machine.
  Example:
    CALL sys.migrate_file(source_type => 'hive', source_table => 'default.T', target_table => 'default.T2', delete_origin => true, parallelism => 6)
remove_orphan_files To remove orphan data files and metadata files. Arguments:
  • table: the target table identifier. Cannot be empty. Use database_name.* to clean a whole database.
  • older_than: to avoid deleting newly written files, this procedure only deletes orphan files older than 1 day by default. Use this argument to change that threshold.
  • dry_run: when true, only lists the orphan files without removing them. Defaults to false.
  • parallelism: the maximum number of files deleted concurrently. Defaults to the number of processors available to the Java virtual machine.
  • mode: the mode in which the procedure runs, 'local' or 'distributed'. Defaults to 'distributed'.
  Example:
    CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00')

    CALL sys.remove_orphan_files(table => 'default.*', older_than => '2023-10-31 12:00:00')

    CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true)

    CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5')

    CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5', mode => 'local')
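The `older_than` values in the examples above use the form 'yyyy-MM-dd HH:mm:ss'. As a minimal sketch, such a cutoff can be derived from a reference time. The helper name `older_than_cutoff` is hypothetical, its one-day default mirrors the default described for `older_than`, and the time zone in which the string is interpreted is not stated here, so verify it before relying on the result.

```python
from datetime import datetime, timedelta


def older_than_cutoff(now: datetime, age: timedelta = timedelta(days=1)) -> str:
    """Format `now - age` in the 'yyyy-MM-dd HH:mm:ss' form used by the examples."""
    return (now - age).strftime("%Y-%m-%d %H:%M:%S")


# A fixed reference time keeps the sketch deterministic; use datetime.now() in practice.
cutoff = older_than_cutoff(datetime(2023, 11, 1, 12, 0, 0))
# dry_run => true only lists the orphan files, which is a safe first step.
print(
    "CALL sys.remove_orphan_files("
    f"table => 'default.T', older_than => '{cutoff}', dry_run => true)"
)
```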
remove_unexisting_files To remove nonexistent data files from manifest entries. See the Java docs for detailed use cases. Arguments:
  • table: the target table identifier. Cannot be empty. Use database_name.* to clean a whole database.
  • dry_run (optional): only check which files would be removed, without actually removing them. Defaults to false.
  • parallelism (optional): the parallelism used to check files in the manifests.
  Note: use this procedure at your own risk. It may cause data loss when used outside the use cases listed in the Java docs.
  Example:
    -- remove unexisting data files in the table `mydb.myt`
    CALL sys.remove_unexisting_files(table => 'mydb.myt')

    -- only check which files would be removed, without actually removing them (dry run)
    CALL sys.remove_unexisting_files(table => 'mydb.myt', dry_run => true)
repair To synchronize information from the file system to the Metastore. Arguments:
  • database_or_table: empty, a target database name, or a target table identifier. If you specify multiple databases or tables, the delimiter is ','.
  Example:
    CALL sys.repair('test_db.T')

    CALL sys.repair('test_db.T,test_db01,test_db.T2')
create_branch To create a branch, optionally based on a given tag. Arguments:
  • table: the target table identifier. Cannot be empty.
  • branch: name of the branch to be created.
  • tag: name of the tag on which the new branch is based. May be omitted.
  Example:
    CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch')

    CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch', tag => 'my_tag')

delete_branch To delete a branch. Arguments:
  • table: the target table identifier. Cannot be empty.
  • branch: name of the branch to be deleted. If you specify multiple branches, the delimiter is ','.
  Example:
    CALL sys.delete_branch(table => 'test_db.T', branch => 'test_branch')
fast_forward To fast-forward a branch to the main branch. Arguments:
  • table: the target table identifier. Cannot be empty.
  • branch: name of the branch to be merged.
  Example:
    CALL sys.fast_forward(table => 'test_db.T', branch => 'test_branch')
reset_consumer To reset or delete a consumer. Arguments:
  • table: the target table identifier. Cannot be empty.
  • consumerId: the consumer to be reset or deleted.
  • nextSnapshotId (Long): the new next snapshot id of the consumer. If omitted, the consumer is deleted.
  Example:
    -- reset the new next snapshot id in the consumer
    CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid', nextSnapshotId => 10)

    -- delete consumer
    CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid')
mark_partition_done To mark partitions as done. Arguments:
  • table: the target table identifier. Cannot be empty.
  • partitions: the partitions to mark as done. If you specify multiple partitions, the delimiter is ';'.
  Example:
    -- mark single partition done
    CALL sys.mark_partition_done(table => 'default.T', partitions => 'day=2024-07-01')

    -- mark multiple partitions done
    CALL sys.mark_partition_done(table => 'default.T', partitions => 'day=2024-07-01;day=2024-07-02')
    Copyright © 2024 The Apache Software Foundation. Apache Paimon, Paimon, and its feather logo are trademarks of The Apache Software Foundation.