Structured Streaming #
Paimon supports streaming data processing with Spark Structured Streaming, enabling both streaming write and streaming query.
Streaming Write #
Paimon Structured Streaming only supports the two output modes append and complete.
// MemoryStream needs an Encoder for the element type (from spark.implicits)
// and an implicit SQLContext.
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext

// Create a paimon table if it does not exist.
spark.sql(s"""
  |CREATE TABLE IF NOT EXISTS T (k INT, v STRING)
  |TBLPROPERTIES ('primary-key'='k', 'bucket'='3')
  |""".stripMargin)

// Here we use MemoryStream to fake a streaming source.
val inputData = MemoryStream[(Int, String)]
val df = inputData.toDS().toDF("k", "v")

// Streaming write to the paimon table.
val stream = df
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "/path/to/checkpoint")
  .format("paimon")
  .start("/path/to/paimon/sink/table")
Streaming write also supports Write merge schema.
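For example, a streaming write with schema merging enabled might look like the sketch below. It assumes the write.merge-schema (and optional write.merge-schema.explicit-cast) options documented for batch writes behave the same way for streaming writes, and dfWithNewColumn is a hypothetical streaming DataFrame carrying one more column than the sink table:

// With write.merge-schema enabled, columns that exist in the incoming
// data but not in the table are added to the table schema on write.
val mergeSchemaStream = dfWithNewColumn
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "/path/to/merge/checkpoint")
  .option("write.merge-schema", "true")
  // Optionally allow type changes that need an explicit cast.
  .option("write.merge-schema.explicit-cast", "true")
  .format("paimon")
  .start("/path/to/paimon/sink/table")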
Streaming Query #
Paimon currently supports Spark 3.3+ for streaming read.
Paimon supports a rich set of scan modes for streaming read:
Scan Mode | Description |
---|---|
latest | For streaming sources, continuously reads the latest changes without producing a snapshot at the beginning. |
latest-full | For streaming sources, produces the latest snapshot on the table upon first startup, and continues to read the latest changes. |
from-timestamp | For streaming sources, continuously reads changes starting from the timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning. |
from-snapshot | For streaming sources, continuously reads changes starting from the snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning. |
from-snapshot-full | For streaming sources, produces the snapshot specified by "scan.snapshot-id" on the table upon first startup, and continuously reads changes. |
default | Equivalent to from-snapshot if "scan.snapshot-id" is specified, equivalent to from-timestamp if "scan.timestamp-millis" is specified, otherwise equivalent to latest-full. |
A simple example with the default scan mode:
// If no scan-related configs are provided, the latest-full scan mode is used.
val query = spark.readStream
.format("paimon")
// by table name
.table("table_name")
// or by location
// .load("/path/to/paimon/source/table")
.writeStream
.format("console")
.start()
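A sketch of a non-default scan mode, passing the scan.mode and scan.timestamp-millis options from the table above as read options (the timestamp value is only illustrative):

// Continuously read changes starting from the given epoch millis,
// without producing an initial snapshot.
val fromTimestampQuery = spark.readStream
  .format("paimon")
  .option("scan.mode", "from-timestamp")
  .option("scan.timestamp-millis", "1688451600000")
  .table("table_name")
  .writeStream
  .format("console")
  .start()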
Paimon Structured Streaming also supports a variety of streaming read modes, with support for multiple triggers and read limits.
These read limits are supported:
Key | Default | Type | Description |
---|---|---|---|
read.stream.maxFilesPerTrigger | (none) | Integer | The maximum number of files returned in a single batch. |
read.stream.maxBytesPerTrigger | (none) | Long | The maximum number of bytes returned in a single batch. |
read.stream.maxRowsPerTrigger | (none) | Long | The maximum number of rows returned in a single batch. |
read.stream.minRowsPerTrigger | (none) | Long | The minimum number of rows returned in a single batch, used together with read.stream.maxTriggerDelayMs to create a MinRowsReadLimit. |
read.stream.maxTriggerDelayMs | (none) | Long | The maximum delay between two adjacent batches, used together with read.stream.minRowsPerTrigger to create a MinRowsReadLimit. |
Example 1: Use org.apache.spark.sql.streaming.Trigger.AvailableNow() and the maxBytesPerTrigger limit defined by Paimon.
import org.apache.spark.sql.streaming.Trigger

// Trigger.AvailableNow() processes all the data available at the start
// of the query in one or more batches, then terminates the query.
// Setting read.stream.maxBytesPerTrigger to 134217728 (128 MB) means
// that each batch processes at most 128 MB of data.
val query = spark.readStream
.format("paimon")
.option("read.stream.maxBytesPerTrigger", "134217728")
.table("table_name")
.writeStream
.format("console")
.trigger(Trigger.AvailableNow())
.start()
Example 2: Use org.apache.spark.sql.connector.read.streaming.ReadMinRows.
// A new batch is not triggered until more than 5,000 rows are available,
// unless the delay since the last batch exceeds 300 seconds.
val query = spark.readStream
.format("paimon")
.option("read.stream.minRowsPerTrigger", "5000")
.option("read.stream.maxTriggerDelayMs", "300000")
.table("table_name")
.writeStream
.format("console")
.start()
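Another sketch, combining the read.stream.maxRowsPerTrigger limit from the table above with a processing-time trigger (the values are only illustrative):

import org.apache.spark.sql.streaming.Trigger

// Each micro-batch reads at most 10,000 rows, and a new batch is
// scheduled every 10 seconds.
val maxRowsQuery = spark.readStream
  .format("paimon")
  .option("read.stream.maxRowsPerTrigger", "10000")
  .table("table_name")
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()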
Paimon Structured Streaming supports reading rows in the form of a changelog (adding a rowkind column to each row to represent its change type) in two ways:
- Streaming read the system audit_log table directly
- Set read.changelog to true (default is false), then streaming read with the table location
Example:
// Option 1
val query1 = spark.readStream
.format("paimon")
.table("`table_name$audit_log`")
.writeStream
.format("console")
.start()
// Option 2
val query2 = spark.readStream
.format("paimon")
.option("read.changelog", "true")
.table("table_name")
.writeStream
.format("console")
.start()
/*
+I 1 Hi
+I 2 Hello
*/
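Downstream logic can use the rowkind column to react to specific change types. A sketch, assuming the audit_log stream from Option 1 above and standard rowkind values such as +I, +U, -U and -D:

import org.apache.spark.sql.functions.col

// Keep only insertions and the after-images of updates, dropping
// deletions and the before-images of updates.
val upserts = spark.readStream
  .format("paimon")
  .table("`table_name$audit_log`")
  .filter(col("rowkind").isin("+I", "+U"))

val query3 = upserts.writeStream
  .format("console")
  .start()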