Structured Streaming
Paimon supports streaming data processing with Spark Structured Streaming, enabling both streaming write and streaming query.
Streaming Write
Paimon Structured Streaming supports only two output modes: append and complete.
// Create a paimon table if not exists.
spark.sql(s"""
|CREATE TABLE T (k INT, v STRING)
|TBLPROPERTIES ('primary-key'='k', 'bucket'='3')
|""".stripMargin)
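// Here we use MemoryStream to fake a streaming source.
// On Spark 3.x, MemoryStream needs an implicit SQLContext and the encoders from spark.implicits._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext
val inputData = MemoryStream[(Int, String)]
val df = inputData.toDS().toDF("k", "v")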
// Streaming Write to paimon table.
val stream = df
.writeStream
.outputMode("append")
.option("checkpointLocation", "/path/to/checkpoint")
.format("paimon")
.start("/path/to/paimon/sink/table")
Streaming write also supports Write merge schema.
Streaming Query
Paimon currently supports Spark 3.3+ for streaming read.
Paimon supports a rich set of scan modes for streaming read:
| Scan Mode | Description |
|---|---|
| latest | For streaming sources, continuously reads the latest changes without producing a snapshot at the beginning. |
| latest-full | For streaming sources, produces the latest snapshot of the table upon first startup, then continues to read the latest changes. |
| from-timestamp | For streaming sources, continuously reads changes starting from the timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning. |
| from-snapshot | For streaming sources, continuously reads changes starting from the snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning. |
| from-snapshot-full | For streaming sources, produces the snapshot specified by "scan.snapshot-id" upon first startup, then continuously reads changes. |
| default | Equivalent to from-snapshot if "scan.snapshot-id" is specified, and to from-timestamp if "scan.timestamp-millis" is specified. Otherwise, equivalent to latest-full. |
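The rule in the default row can be read as a small decision function. The sketch below is only a plain-Scala model of that rule, not Paimon's implementation; the function name resolveDefaultScanMode is hypothetical, and checking "scan.snapshot-id" before "scan.timestamp-millis" when both are present is an assumption made for this sketch.

```scala
// Simplified model of the "default" scan mode rule; not Paimon's actual code.
// Assumption: if both keys are present, "scan.snapshot-id" is checked first.
def resolveDefaultScanMode(options: Map[String, String]): String =
  if (options.contains("scan.snapshot-id")) "from-snapshot"
  else if (options.contains("scan.timestamp-millis")) "from-timestamp"
  else "latest-full"

// No scan-related options: falls back to latest-full.
val noOptions = resolveDefaultScanMode(Map.empty)
// A snapshot id is given: behaves like from-snapshot.
val withSnapshot = resolveDefaultScanMode(Map("scan.snapshot-id" -> "5"))
// A timestamp is given: behaves like from-timestamp.
val withTimestamp = resolveDefaultScanMode(Map("scan.timestamp-millis" -> "1700000000000"))
```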
A simple example with default scan mode:
// No scan-related options are provided, so the latest-full scan mode is used.
val query = spark.readStream
.format("paimon")
// by table name
.table("table_name")
// or by location
// .load("/path/to/paimon/source/table")
.writeStream
.format("console")
.start()
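To select a non-default scan mode, the scan options can be collected in a map and passed to the reader. This is a minimal sketch that assumes "scan.mode" is the option key selecting a scan mode; the timestamp and snapshot id values are hypothetical placeholders.

```scala
// Assumption: "scan.mode" is the key that selects one of the scan modes listed above.
val fromTimestampOptions: Map[String, String] = Map(
  "scan.mode" -> "from-timestamp",
  // Hypothetical start time, in epoch milliseconds.
  "scan.timestamp-millis" -> "1700000000000"
)

val fromSnapshotFullOptions: Map[String, String] = Map(
  "scan.mode" -> "from-snapshot-full",
  // Hypothetical snapshot id.
  "scan.snapshot-id" -> "1"
)
```

With a SparkSession named spark, such a map could be applied through the reader's options method, for example spark.readStream.format("paimon").options(fromTimestampOptions).table("table_name").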
Paimon Structured Streaming streaming reads also support a variety of triggers and read limits.
These read limits are supported:
| Key | Default | Type | Description |
|---|---|---|---|
| read.stream.maxFilesPerTrigger | (none) | Integer | The maximum number of files returned in a single batch. |
| read.stream.maxBytesPerTrigger | (none) | Long | The maximum number of bytes returned in a single batch. |
| read.stream.maxRowsPerTrigger | (none) | Long | The maximum number of rows returned in a single batch. |
| read.stream.minRowsPerTrigger | (none) | Long | The minimum number of rows returned in a single batch. Used together with read.stream.maxTriggerDelayMs to create a MinRowsReadLimit. |
| read.stream.maxTriggerDelayMs | (none) | Long | The maximum delay, in milliseconds, between two adjacent batches. Used together with read.stream.minRowsPerTrigger to create a MinRowsReadLimit. |
Example: One
Use org.apache.spark.sql.streaming.Trigger.AvailableNow() and the read.stream.maxBytesPerTrigger option defined by Paimon.
import org.apache.spark.sql.streaming.Trigger

// Trigger.AvailableNow() processes all available data at the start
// of the query in one or multiple batches, then terminates the query.
// Setting read.stream.maxBytesPerTrigger to 134217728 (128 MB) means that
// each batch processes a maximum of 128 MB of data.
val query = spark.readStream
.format("paimon")
.option("read.stream.maxBytesPerTrigger", "134217728")
.table("table_name")
.writeStream
.format("console")
.trigger(Trigger.AvailableNow())
.start()
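The value 134217728 used above is 128 × 1024 × 1024 bytes. A small helper makes that arithmetic explicit; mebibytesToBytes is a hypothetical name introduced for this sketch and is not part of Paimon or Spark.

```scala
// Hypothetical helper (not a Paimon or Spark API): converts mebibytes to bytes.
def mebibytesToBytes(mib: Long): Long = mib * 1024L * 1024L

// Reader options capping each batch at 128 MB, with the value passed as a string of bytes.
val readLimitOptions: Map[String, String] = Map(
  "read.stream.maxBytesPerTrigger" -> mebibytesToBytes(128).toString
)
```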
Example: Two
Use org.apache.spark.sql.connector.read.streaming.ReadMinRows.
// A batch is not triggered until at least 5,000 rows are available,
// unless more than 300 seconds have passed since the previous batch.
val query = spark.readStream
.format("paimon")
.option("read.stream.minRowsPerTrigger", "5000")
.option("read.stream.maxTriggerDelayMs", "300000")
.table("table_name")
.writeStream
.format("console")
.start()
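The interaction between the two options can be modelled as a simple predicate: a batch starts once enough rows are available or once the maximum delay has elapsed. The sketch below is a simplified plain-Scala model, not Spark's or Paimon's implementation; the names MinRowsLimit and shouldTriggerBatch are hypothetical, and treating both thresholds as inclusive is an assumption.

```scala
// Simplified model of a minimum-rows read limit; names are hypothetical.
final case class MinRowsLimit(minRows: Long, maxTriggerDelayMs: Long)

// Assumption: both thresholds are inclusive (>=).
def shouldTriggerBatch(limit: MinRowsLimit, availableRows: Long, msSinceLastBatch: Long): Boolean =
  availableRows >= limit.minRows || msSinceLastBatch >= limit.maxTriggerDelayMs

// Same values as the example above: 5,000 rows or 300 seconds.
val limit = MinRowsLimit(minRows = 5000L, maxTriggerDelayMs = 300000L)

val tooEarly = shouldTriggerBatch(limit, availableRows = 100L, msSinceLastBatch = 1000L)
val enoughRows = shouldTriggerBatch(limit, availableRows = 5000L, msSinceLastBatch = 1000L)
val delayReached = shouldTriggerBatch(limit, availableRows = 100L, msSinceLastBatch = 300000L)
```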
Paimon Structured Streaming supports reading rows in changelog form (a rowkind column is added to each row to represent its change type) in two ways:
- Stream directly from the audit_log system table
- Set read.changelog to true (default is false), then stream from the table location
Example:
// Option 1
val query1 = spark.readStream
.format("paimon")
.table("`table_name$audit_log`")
.writeStream
.format("console")
.start()
// Option 2
val query2 = spark.readStream
.format("paimon")
.option("read.changelog", "true")
.table("table_name")
.writeStream
.format("console")
.start()
/*
+I 1 Hi
+I 2 Hello
*/
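The first column of the output above is the row kind. The following sketch decodes it for the four standard change types (+I, -U, +U, -D); the helper name describeChange and the whitespace-separated line format are assumptions made for illustration.

```scala
// Short row kind strings and their meaning.
val rowKindMeaning: Map[String, String] = Map(
  "+I" -> "insert",
  "-U" -> "update before",
  "+U" -> "update after",
  "-D" -> "delete"
)

// Hypothetical helper: splits a printed line into its row kind and the remaining columns.
def describeChange(line: String): String =
  line.trim.split("\\s+", 2) match {
    case Array(kind, rest) => s"${rowKindMeaning.getOrElse(kind, "unknown")}: $rest"
    case _                 => s"unrecognized: $line"
  }

val first = describeChange("+I 1 Hi")
val second = describeChange("+I 2 Hello")
```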