Manage Snapshots
This section will describe the management and behavior related to snapshots.
Expire Snapshots
Paimon writers generate one or two snapshot per commit. Each snapshot may add some new data files or mark some old data files as deleted. However, the marked data files are not truly deleted because Paimon also supports time traveling to an earlier snapshot. They are only deleted when the snapshot expires.
Currently, expiration is automatically performed by Paimon writers when committing new changes. By expiring old snapshots, old data files and metadata files that are no longer used can be deleted to release disk space.
Snapshot expiration is controlled by the following table properties.
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
snapshot.time-retained | No | 1 h | Duration | The maximum time of completed snapshots to retain. |
snapshot.num-retained.min | No | 10 | Integer | The minimum number of completed snapshots to retain. Should be greater than or equal to 1. |
snapshot.num-retained.max | No | Integer.MAX_VALUE | Integer | The maximum number of completed snapshots to retain. Should be greater than or equal to the minimum number. |
snapshot.expire.execution-mode | No | sync | Enum | Specifies the execution mode of expire. |
snapshot.expire.limit | No | 10 | Integer | The maximum number of snapshots allowed to expire at a time. |
snapshot.clean-empty-directories | No | false | Boolean | Whether to try to delete empty directories (e.g. partition and bucket directories) left behind after the data files are deleted during snapshot expiration. Defaults to false: empty directories are kept. Enabling it has caveats: HDFS may print exceptions in NameNode, and object stores (OSS/S3) may suffer performance issues due to the extra prefix operations required to list and delete directory markers. |
When the number of snapshots is less than snapshot.num-retained.min, no snapshots will be expired(even the condition snapshot.time-retained meet), after which snapshot.num-retained.max and snapshot.time-retained will be used to control the snapshot expiration until the remaining snapshot meets the condition.
Note that snapshot expiration is also what physically deletes data files dropped by partition expiration. However, the empty partition and bucket directories left behind after the data files are deleted are not removed by default. To clean them up, enable snapshot.clean-empty-directories (see the option above). This is off by default because on object stores (OSS/S3) the prefix operations needed to delete directory markers can be expensive.
The following example show more details(snapshot.num-retained.min is 2, snapshot.time-retained is 1h, snapshot.num-retained.max is 5):
snapshot item is described using tuple (snapshotId, corresponding time)
| New Snapshots | All snapshots after expiration check | explanation |
|---|---|---|
(snapshots-1, 2023-07-06 10:00) | (snapshots-1, 2023-07-06 10:00) | No snapshot expired |
(snapshots-2, 2023-07-06 10:20) | (snapshots-1, 2023-07-06 10:00) | No snapshot expired |
(snapshots-3, 2023-07-06 10:40) | (snapshots-1, 2023-07-06 10:00) | No snapshot expired |
(snapshots-4, 2023-07-06 11:00) | (snapshots-1, 2023-07-06 10:00) | No snapshot expired |
(snapshots-5, 2023-07-06 11:20) | (snapshots-2, 2023-07-06 10:20) | snapshot-1 was expired because the condition |
(snapshots-6, 2023-07-06 11:30) | (snapshots-3, 2023-07-06 10:40) | snapshot-2 was expired because the condition |
(snapshots-7, 2023-07-06 11:35) | (snapshots-3, 2023-07-06 10:40) | No snapshot expired |
(snapshots-8, 2023-07-06 11:36) | (snapshots-4, 2023-07-06 11:00) | snapshot-3 was expired because the condition |
Please note that too short retain time or too small retain number may result in:
- Batch queries cannot find the file. For example, the table is relatively large and the batch query takes 10 minutes to read, but the snapshot from 10 minutes ago expires, at which point the batch query will read a deleted snapshot.
- Streaming reading jobs on table files fail to restart. When the job restarts, the snapshot it recorded may have expired. (You can use Consumer Id to protect streaming reading in a small retain time of snapshot expiration).
By default, paimon will delete expired snapshots synchronously. When there are too
many files that need to be deleted, they may not be deleted quickly and back-pressured
to the upstream operator. To avoid this situation, users can use asynchronous expiration
mode by setting snapshot.expire.execution-mode to async. However, if your job runs in
batch mode, it is not recommended to use asynchronous expiration mode, as the expire task
may fail to complete successfully.
Manually expire snapshot
Manually expire a table's snapshots
- Flink SQL
- Flink Action
- Spark
Run the following command:
-- for Flink 1.18
CALL sys.expire_snapshots('database_name.table_name', 2)
-- for Flink 1.19 and later
CALL sys.expire_snapshots(`table` => 'database_name.table_name', retain_max => 2)
CALL sys.expire_snapshots(`table` => 'database_name.table_name', older_than => '2024-01-01 12:00:00')
CALL sys.expire_snapshots(`table` => 'database_name.table_name', older_than => '2024-01-01 12:00:00', retain_min => 10)
CALL sys.expire_snapshots(`table` => 'database_name.table_name', older_than => '2024-01-01 12:00:00', max_deletes => 10, options => 'snapshot.expire.limit=1')
Run the following command:
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-1.5-SNAPSHOT.jar \
expire_snapshots \
--warehouse <warehouse-path> \
--identifier <identifier> \
--older_than <timestamp> \
--version <snapshot-id> \
--max_deletes <max-deletes> \
--retain_max <retain-max> \
--retain_min <retain-min> \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
Run the following sql:
CALL sys.expire_snapshots(table => 'database_name.table_name', retain_max => 10, options => 'snapshot.expire.limit=1');
Rollback to Snapshot
Rollback a table to a specific snapshot ID.
- Flink SQL
- Flink Action
- Java API
- Spark
Run the following command:
CALL sys.rollback_to(`table` => 'database_name.table_name', snapshot_id => <snasphot-id>);
Run the following command:
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-1.5-SNAPSHOT.jar \
rollback_to \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
--version <snapshot-id> \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
import org.apache.paimon.table.Table;
public class RollbackTo {
public static void main(String[] args) {
// before rollback:
// snapshot-3
// snapshot-4
// snapshot-5
// snapshot-6
// snapshot-7
table.rollbackTo(5);
// after rollback:
// snapshot-3
// snapshot-4
// snapshot-5
}
}
Run the following sql:
CALL sys.rollback(table => 'database_name.table_name', snapshot => snasphot_id);
Remove Orphan Files
Paimon files are deleted physically only when expiring snapshots. However, it is possible that some unexpected errors occurred
when deleting files, so that there may exist files that are not used by Paimon snapshots (so-called "orphan files"). You can
submit a remove_orphan_files job to clean them:
- Spark SQL/Flink SQL
- Flink Action
CALL sys.remove_orphan_files(`table` => 'my_db.my_table', [older_than => '2023-10-31 12:00:00'])
CALL sys.remove_orphan_files(`table` => 'my_db.*', [older_than => '2023-10-31 12:00:00'])
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-1.5-SNAPSHOT.jar \
remove_orphan_files \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
[--older_than <timestamp>] \
[--dry_run <false/true>] \
[--parallelism <parallelism>]
To avoid deleting files that are newly added by other writing jobs, this action only deletes orphan files older than
1 day by default. The interval can be modified by --older_than. For example:
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-1.5-SNAPSHOT.jar \
remove_orphan_files \
--warehouse <warehouse-path> \
--database <database-name> \
--table T \
--older_than '2023-10-31 12:00:00'
The table can be * to clean all tables in the database.