Querying Tables
This documentation is for an unreleased version of Apache Paimon. We recommend you use the latest stable version.

Querying Tables #

Just like all other tables, Paimon tables can be queried with SELECT statement.

Batch Query #

Paimon’s batch read returns all the data in a snapshot of the table. By default, batch reads return the latest snapshot.

-- Flink SQL
SET 'execution.runtime-mode' = 'batch';

Batch Time Travel #

Paimon batch reads with time travel can specify a snapshot or a tag and read the corresponding data.

-- read the snapshot with id 1L
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;

-- read the snapshot from specified timestamp in unix milliseconds
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;

-- read tag 'my-tag'
SELECT * FROM t /*+ OPTIONS('scan.tag-name' = 'my-tag') */;

Flink SQL supports time travel syntax after 1.18.

-- read the snapshot from specified timestamp
SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00';

-- you can also use some simple expressions (see flink document to get supported functions)
SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00' + INTERVAL '1' DAY

Requires Spark 3.3+.

you can use VERSION AS OF and TIMESTAMP AS OF in query to do time travel:

-- read the snapshot with id 1L (use snapshot id as version)
SELECT * FROM t VERSION AS OF 1;

-- read the snapshot from specified timestamp 
SELECT * FROM t TIMESTAMP AS OF '2023-06-01 00:00:00.123';

-- read the snapshot from specified timestamp in unix seconds
SELECT * FROM t TIMESTAMP AS OF 1678883047;

-- read tag 'my-tag'
SELECT * FROM t VERSION AS OF 'my-tag';
If tag’s name is a number and equals to a snapshot id, the VERSION AS OF syntax will consider tag first. For example, if you have a tag named ‘1’ based on snapshot 2, the statement SELECT * FROM t VERSION AS OF '1' actually queries snapshot 2 instead of snapshot 1.
// read the snapshot from specified timestamp in unix seconds
spark.read
    .option("scan.timestamp-millis", "1678883047000")
    .format("paimon")
    .load("path/to/table")
// read the snapshot with id 1L (use snapshot id as version)
spark.read
    .option("scan.snapshot-id", 1)
    .format("paimon")
    .load("path/to/table")
// read tag 'my-tag'
spark.read
    .option("scan.tag-name", "my-tag")
    .format("paimon")
    .load("path/to/table")
-- read the snapshot from specified timestamp with a long value in unix milliseconds
SET SESSION paimon.scan_timestamp_millis=1679486589444;
SELECT * FROM t;
-- read the snapshot from specified timestamp
SELECT * FROM t FOR TIMESTAMP AS OF TIMESTAMP '2023-01-01 00:00:00 Asia/Shanghai';

-- read the snapshot with id 1L (use snapshot id as version)
SELECT * FROM t FOR VERSION AS OF 1;

Hive requires adding the following configuration parameters to the hive-site.xml file:

<!--This parameter is used to configure the whitelist of permissible configuration items allowed for use in SQL standard authorization mode.-->
<property>
  <name>hive.security.authorization.sqlstd.confwhitelist</name>
  <value>mapred.*|hive.*|mapreduce.*|spark.*</value>
</property>

<!--This parameter is an additional configuration for hive.security.authorization.sqlstd.confwhitelist. It allows you to add other permissible configuration items to the existing whitelist.-->
<property>
 <name>hive.security.authorization.sqlstd.confwhitelist.append</name>
  <value>mapred.*|hive.*|mapreduce.*|spark.*</value>
</property>
-- read the snapshot with id 1L (use snapshot id as version)
SET paimon.scan.snapshot-id=1
SELECT * FROM t;
SET paimon.scan.snapshot-id=null;

-- read the snapshot from specified timestamp in unix seconds
SET paimon.scan.timestamp-millis=1679486589444;
SELECT * FROM t;
SET paimon.scan.timestamp-millis=null;
    
-- read tag 'my-tag'
set paimon.scan.tag-name=my-tag;
SELECT * FROM t;
set paimon.scan.tag-name=null;

Batch Incremental #

Read incremental changes between start snapshot (exclusive) and end snapshot.

For example:

  • ‘5,10’ means changes between snapshot 5 and snapshot 10.
  • ‘TAG1,TAG3’ means changes between TAG1 and TAG3.
-- incremental between snapshot ids
SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;

-- incremental between snapshot time mills
SELECT * FROM t /*+ OPTIONS('incremental-between-timestamp' = '1692169000000,1692169900000') */;

Requires Spark 3.2+.

Paimon supports that use Spark SQL to do the incremental query that implemented by Spark Table Valued Function. To enable this needs these configs below:

--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

you can use paimon_incremental_query in query to extract the incremental data:

-- read the incremental data between snapshot id 12 and snapshot id 20.
SELECT * FROM paimon_incremental_query('tableName', 12, 20);
// incremental between snapshot ids
spark.read()
  .format("paimon")
  .option("incremental-between", "12,20")
  .load("path/to/table")

// incremental between snapshot time mills
spark.read()
  .format("paimon")
  .option("incremental-between-timestamp", "1692169000000,1692169900000")
  .load("path/to/table")
-- incremental between snapshot ids
SET paimon.incremental-between='12,20';
SELECT * FROM t;
SET paimon.incremental-between=null;

-- incremental between snapshot time mills
SET paimon.incremental-between-timestamp='1692169000000,1692169900000';
SELECT * FROM t;
SET paimon.incremental-between-timestamp=null;

In Batch SQL, the DELETE records are not allowed to be returned, so records of -D will be dropped. If you want see DELETE records, you can use audit_log table:

SELECT * FROM t$audit_log /*+ OPTIONS('incremental-between' = '12,20') */;

Streaming Query #

By default, Streaming read produces the latest snapshot on the table upon first startup, and continue to read the latest changes.

Paimon by default ensures that your startup is properly processed with the full amount included.

-- Flink SQL
SET 'execution.runtime-mode' = 'streaming';

You can also do streaming read without the snapshot data, you can use latest scan mode:

-- Continuously reads latest changes without producing a snapshot at the beginning.
SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;

Streaming Time Travel #

If you only want to process data for today and beyond, you can do so with partitioned filters:

SELECT * FROM t WHERE dt > '2023-06-26';

If it’s not a partitioned table, or you can’t filter by partition, you can use Time travel’s stream read.

-- read changes from snapshot id 1L 
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;

-- read changes from snapshot specified timestamp
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;

-- read snapshot id 1L upon first startup, and continue to read the changes
SELECT * FROM t /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;

Flink SQL supports time travel syntax after 1.18.

-- read the snapshot from specified timestamp
SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00';

-- you can also use some simple expressions (see flink document to get supported functions)
SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00' + INTERVAL '1' DAY

Time travel’s stream read rely on snapshots, but by default, snapshot only retains data within 1 hour, which can prevent you from reading older incremental data. So, Paimon also provides another mode for streaming reads, scan.file-creation-time-millis, which provides a rough filtering to retain files generated after timeMillis.

SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;

Consumer ID #

You can specify the consumer-id when streaming read table:

SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid') */;

When stream read Paimon tables, the next snapshot id to be recorded into the file system. This has several advantages:

  1. When previous job is stopped, the newly started job can continue to consume from the previous progress without resuming from the state. The newly reading will start reading from next snapshot id found in consumer files. If you don’t want this behavior, you can set 'consumer.ignore-progress' to true.
  2. When deciding whether a snapshot has expired, Paimon looks at all the consumers of the table in the file system, and if there are consumers that still depend on this snapshot, then this snapshot will not be deleted by expiration.
NOTE: The consumer will prevent expiration of the snapshot. You can specify ‘consumer.expiration-time’ to manage the lifetime of consumers.

By default, the consumer uses exactly-once mode to record consumption progress, which strictly ensures that what is recorded in the consumer is the snapshot-id + 1 that all readers have exactly consumed. You can set consumer.mode to at-least-once to allow readers consume snapshots at different rates and record the slowest snapshot-id among all readers into the consumer. This mode can provide more capabilities, such as watermark alignment.

Since the implementation of exactly-once mode and at-least-once mode are completely different, the state of flink is incompatible and cannot be restored from the state when switching modes.

You can reset a consumer with a given consumer ID and next snapshot ID and delete a consumer with a given consumer ID.

First, you need to stop the streaming task using this consumer ID, and then execute the reset consumer action job.

Run the following command:

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.8-SNAPSHOT.jar \
    reset-consumer \
    --warehouse <warehouse-path> \
    --database <database-name> \ 
    --table <table-name> \
    --consumer_id <consumer-id> \
    [--next_snapshot <next-snapshot-id>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]

please don’t specify –next_snapshot parameter if you want to delete the consumer.

Read Overwrite #

Streaming reading will ignore the commits generated by INSERT OVERWRITE by default. If you want to read the commits of OVERWRITE, you can configure streaming-read-overwrite.

Read Parallelism #

By default, the parallelism of batch reads is the same as the number of splits, while the parallelism of stream reads is the same as the number of buckets, but not greater than scan.infer-parallelism.max.

Disable scan.infer-parallelism, global parallelism will be used for reads.

You can also manually specify the parallelism from scan.parallelism.

Key Default Type Description
scan.infer-parallelism
true Boolean If it is false, parallelism of source are set by global parallelism. Otherwise, source parallelism is inferred from splits number (batch mode) or bucket number(streaming mode).
scan.infer-parallelism.max
1024 Integer If scan.infer-parallelism is true, limit the parallelism of source through this option.
scan.parallelism
(none) Integer Define a custom parallelism for the scan source. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. If user enable the scan.infer-parallelism, the planner will derive the parallelism by inferred parallelism.

Query Optimization #

Batch Streaming

It is highly recommended to specify partition and primary key filters along with the query, which will speed up the data skipping of the query.

The filter functions that can accelerate data skipping are:

  • =
  • <
  • <=
  • >
  • >=
  • IN (...)
  • LIKE 'abc%'
  • IS NULL

Paimon will sort the data by primary key, which speeds up the point queries and range queries. When using a composite primary key, it is best for the query filters to form a leftmost prefix of the primary key for good acceleration.

Suppose that a table has the following specification:

CREATE TABLE orders (
    catalog_id BIGINT,
    order_id BIGINT,
    .....,
    PRIMARY KEY (catalog_id, order_id) NOT ENFORCED -- composite primary key
);

The query obtains a good acceleration by specifying a range filter for the leftmost prefix of the primary key.

SELECT * FROM orders WHERE catalog_id=1025;

SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;

SELECT * FROM orders
  WHERE catalog_id=1025
  AND order_id>2035 AND order_id<6000;

However, the following filter cannot accelerate the query well.

SELECT * FROM orders WHERE order_id=29495;

SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;