SQL Query #
Just like all other tables, Paimon tables can be queried with SELECT
statement.
Batch Query #
Paimon’s batch read returns all the data in a snapshot of the table. By default, batch reads return the latest snapshot.
Batch Time Travel #
Paimon batch reads with time travel can specify a snapshot or a tag and read the corresponding data.
Requires Spark 3.3+.
you can use VERSION AS OF
and TIMESTAMP AS OF
in query to do time travel:
-- read the snapshot with id 1L (use snapshot id as version)
SELECT * FROM t VERSION AS OF 1;
-- read the snapshot from specified timestamp
SELECT * FROM t TIMESTAMP AS OF '2023-06-01 00:00:00.123';
-- read the snapshot from specified timestamp in unix seconds
SELECT * FROM t TIMESTAMP AS OF 1678883047;
-- read tag 'my-tag'
SELECT * FROM t VERSION AS OF 'my-tag';
-- read the snapshot from specified watermark. will match the first snapshot after the watermark
SELECT * FROM t VERSION AS OF 'watermark-1678883047356';
If tag’s name is a number and equals to a snapshot id, the VERSION AS OF syntax will consider tag first. For example, if
you have a tag named ‘1’ based on snapshot 2, the statement SELECT * FROM t VERSION AS OF '1'
actually queries snapshot 2
instead of snapshot 1.
Batch Incremental #
Read incremental changes between start snapshot (exclusive) and end snapshot.
For example:
- ‘5,10’ means changes between snapshot 5 and snapshot 10.
- ‘TAG1,TAG3’ means changes between TAG1 and TAG3.
By default, will scan changelog files for the table which produces changelog files. Otherwise, scan newly changed files.
You can also force specifying 'incremental-between-scan-mode'
.
Requires Spark 3.2+.
Paimon supports that use Spark SQL to do the incremental query that implemented by Spark Table Valued Function. To enable this needs these configs below:
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
you can use paimon_incremental_query
in query to extract the incremental data:
-- read the incremental data between snapshot id 12 and snapshot id 20.
SELECT * FROM paimon_incremental_query('tableName', 12, 20);
In Batch SQL, the DELETE
records are not allowed to be returned, so records of -D
will be dropped.
If you want see DELETE
records, you can query audit_log table.
Streaming Query #
Paimon currently supports Spark 3.3+ for streaming read.
Paimon supports rich scan mode for streaming read. There is a list:
Scan Mode | Description |
---|---|
latest |
For streaming sources, continuously reads latest changes without producing a snapshot at the beginning. |
latest-full |
For streaming sources, produces the latest snapshot on the table upon first startup, and continue to read the latest changes. |
from-timestamp |
For streaming sources, continuously reads changes starting from timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning. |
from-snapshot |
For streaming sources, continuously reads changes starting from snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning. |
from-snapshot-full |
For streaming sources, produces from snapshot specified by "scan.snapshot-id" on the table upon first startup, and continuously reads changes. |
default |
It is equivalent to from-snapshot if "scan.snapshot-id" is specified. It is equivalent to from-timestamp if "timestamp-millis" is specified. Or, It is equivalent to latest-full. |
A simple example with default scan mode:
// no any scan-related configs are provided, that will use latest-full scan mode.
val query = spark.readStream
.format("paimon")
.load("/path/to/paimon/source/table")
.writeStream
.format("console")
.start()
Paimon Structured Streaming also supports a variety of streaming read modes, it can support many triggers and many read limits.
These read limits are supported:
Key | Default | Type | Description |
---|---|---|---|
read.stream.maxFilesPerTrigger |
(none) | Integer | The maximum number of files returned in a single batch. |
read.stream.maxBytesPerTrigger |
(none) | Long | The maximum number of bytes returned in a single batch. |
read.stream.maxRowsPerTrigger |
(none) | Long | The maximum number of rows returned in a single batch. |
read.stream.minRowsPerTrigger |
(none) | Long | The minimum number of rows returned in a single batch, which used to create MinRowsReadLimit with read.stream.maxTriggerDelayMs together. |
read.stream.maxTriggerDelayMs |
(none) | Long | The maximum delay between two adjacent batches, which used to create MinRowsReadLimit with read.stream.minRowsPerTrigger together. |
Example: One
Use org.apache.spark.sql.streaming.Trigger.AvailableNow()
and maxBytesPerTrigger
defined by paimon.
// Trigger.AvailableNow()) processes all available data at the start
// of the query in one or multiple batches, then terminates the query.
// That set read.stream.maxBytesPerTrigger to 128M means that each
// batch processes a maximum of 128 MB of data.
val query = spark.readStream
.format("paimon")
.option("read.stream.maxBytesPerTrigger", "134217728")
.load("/path/to/paimon/source/table")
.writeStream
.format("console")
.trigger(Trigger.AvailableNow())
.start()
Example: Two
Use org.apache.spark.sql.connector.read.streaming.ReadMinRows
.
// It will not trigger a batch until there are more than 5,000 pieces of data,
// unless the interval between the two batches is more than 300 seconds.
val query = spark.readStream
.format("paimon")
.option("read.stream.minRowsPerTrigger", "5000")
.option("read.stream.maxTriggerDelayMs", "300000")
.load("/path/to/paimon/source/table")
.writeStream
.format("console")
.start()
Paimon Structured Streaming supports read row in the form of changelog (add rowkind column in row to represent its change type) in two ways:
- Direct streaming read with the system audit_log table
- Set
read.changelog
to true (default is false), then streaming read with table location
Example:
// Option 1
val query1 = spark.readStream
.format("paimon")
.table("`table_name$audit_log`")
.writeStream
.format("console")
.start()
// Option 2
val query2 = spark.readStream
.format("paimon")
.option("read.changelog", "true")
.load("/path/to/paimon/source/table")
.writeStream
.format("console")
.start()
/*
+I 1 Hi
+I 2 Hello
*/
Query Optimization #
It is highly recommended to specify partition and primary key filters along with the query, which will speed up the data skipping of the query.
The filter functions that can accelerate data skipping are:
=
<
<=
>
>=
IN (...)
LIKE 'abc%'
IS NULL
Paimon will sort the data by primary key, which speeds up the point queries and range queries. When using a composite primary key, it is best for the query filters to form a leftmost prefix of the primary key for good acceleration.
Suppose that a table has the following specification:
CREATE TABLE orders (
catalog_id BIGINT,
order_id BIGINT,
.....,
) TBLPROPERTIES (
'primary-key' = 'catalog_id,order_id'
);
The query obtains a good acceleration by specifying a range filter for the leftmost prefix of the primary key.
SELECT * FROM orders WHERE catalog_id=1025;
SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;
SELECT * FROM orders
WHERE catalog_id=1025
AND order_id>2035 AND order_id<6000;
However, the following filter cannot accelerate the query well.
SELECT * FROM orders WHERE order_id=29495;
SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;