Manage Tags

Manage Tags #

Paimon’s snapshots can provide an easy way to query historical data. But in most scenarios, a job will generate too many snapshots and table will expire old snapshots according to table configuration. Snapshot expiration will also delete old data files, and the historical data of expired snapshots cannot be queried anymore.

To solve this problem, you can create a tag based on a snapshot. The tag will maintain the manifests and data files of the snapshot. A typical usage is creating tags daily, then you can maintain the historical data of each day for batch reading.

Automatic Creation #

Paimon supports automatic creation of tags in writing job.

Step 1: Choose Creation Mode

You can set creation mode by table option 'tag.automatic-creation'. Supported values are:

  • process-time: Create TAG based on the time of the machine.
  • watermark: Create TAG based on the watermark of the Sink input.
  • batch: In a batch processing scenario, a tag is generated after the current task is completed.
If you choose Watermark, you may need to specify the time zone of watermark, if watermark is not in the UTC time zone, please configure 'sink.watermark-time-zone'.

Step 2: Choose Creation Period

What frequency is used to generate tags. You can choose 'daily', 'hourly' and 'two-hours' for 'tag.creation-period'.

If you need to wait for late data, you can configure a delay time: 'tag.creation-delay'.

Step 3: Automatic deletion of tags

You can configure 'tag.num-retained-max' or tag.default-time-retained to delete tags automatically.

Example, configure table to create a tag at 0:10 every day, with a maximum retention time of 3 months:

-- Flink SQL
CREATE TABLE t (
    k INT PRIMARY KEY NOT ENFORCED,
    f0 INT,
    ...
) WITH (
    'tag.automatic-creation' = 'process-time',
    'tag.creation-period' = 'daily',
    'tag.creation-delay' = '10 m',
    'tag.num-retained-max' = '90'
);

INSERT INTO t SELECT ...;

-- Spark SQL

-- Read latest snapshot
SELECT * FROM t;

-- Read Tag snapshot
SELECT * FROM t VERSION AS OF '2023-07-26';

-- Read Incremental between Tags
SELECT * FROM paimon_incremental_query('t', '2023-07-25', '2023-07-26');

See Query Tables to see more query for Spark.

Create Tags #

You can create a tag with given name and snapshot ID.

Run the following command:

CALL sys.create_tag(`table` => 'database_name.table_name', tag => 'tag_name', [snapshot_id => <snapshot-id>]);

If snapshot_id unset, snapshot_id defaults to the latest.

Run the following command:

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-1.0.0.jar \
    create_tag \
    --warehouse <warehouse-path> \
    --database <database-name> \ 
    --table <table-name> \
    --tag_name <tag-name> \
    [--snapshot <snapshot_id>] \
    [--time_retained <time-retained>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]

If snapshot unset, snapshot_id defaults to the latest.

import org.apache.paimon.table.Table;

public class CreateTag {

    public static void main(String[] args) {
        Table table = ...;
        table.createTag("my-tag", 1);
        table.createTag("my-tag-retained-12-hours", 1, Duration.ofHours(12));
    }
}

Run the following sql:

CALL sys.create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2);

To create a tag with retained 1 day, run the following sql:

CALL sys.create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2, time_retained => '1 d');

To create a tag based on the latest snapshot id, run the following sql:

CALL sys.create_tag(table => 'test.t', tag => 'test_tag');

Delete Tags #

You can delete a tag by its name.

Run the following command:

CALL sys.delete_tag(`table` => 'database_name.table_name', tag => 'tag_name');

Run the following command:

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-1.0.0.jar \
    delete_tag \
    --warehouse <warehouse-path> \
    --database <database-name> \ 
    --table <table-name> \
    --tag_name <tag-name> \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
import org.apache.paimon.table.Table;

public class DeleteTag {

    public static void main(String[] args) {
        Table table = ...;
        table.deleteTag("my-tag");
    }
}

Run the following sql:

CALL sys.delete_tag(table => 'test.t', tag => 'test_tag');

Rollback to Tag #

Rollback table to a specific tag. All snapshots and tags whose snapshot id is larger than the tag will be deleted (and the data will be deleted too).

Run the following command:

CALL sys.rollback_to(`table` => 'database_name.table_name', tag => 'tag_name');

Run the following command:

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-1.0.0.jar \
    rollback_to \
    --warehouse <warehouse-path> \
    --database <database-name> \ 
    --table <table-name> \
    --version <tag-name> \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
import org.apache.paimon.table.Table;

public class RollbackTo {

    public static void main(String[] args) {
        // before rollback:
        // snapshot-3 [expired] -> tag3
        // snapshot-4 [expired]
        // snapshot-5 -> tag5
        // snapshot-6
        // snapshot-7
      
        table.rollbackTo("tag3");
        
        // after rollback:
        // snapshot-3 -> tag3
    }
}

Run the following sql:

CALL sys.rollback(table => 'test.t', version => '2');
Edit This Page
Copyright © 2024 The Apache Software Foundation. Apache Paimon, Paimon, and its feather logo are trademarks of The Apache Software Foundation.