
Structured Streaming

Paimon supports streaming data processing with Spark Structured Streaming, enabling both streaming write and streaming query.

Streaming Write

Note: Paimon Structured Streaming supports only two output modes: append and complete.

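import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._

// Create a Paimon table if it does not exist.
spark.sql("""
  |CREATE TABLE IF NOT EXISTS T (k INT, v STRING)
  |TBLPROPERTIES ('primary-key'='k', 'bucket'='3')
  |""".stripMargin)

// Here we use MemoryStream to fake a streaming source.
// MemoryStream needs an implicit SQLContext and an Encoder (from spark.implicits._).
implicit val sqlContext: SQLContext = spark.sqlContext
val inputData = MemoryStream[(Int, String)]
val df = inputData.toDS().toDF("k", "v")

// Streaming write to the Paimon table.
val stream = df
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "/path/to/checkpoint")
  .format("paimon")
  .start("/path/to/paimon/sink/table")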

Streaming writes also support write merge schema.

Streaming Query

Note: Paimon currently supports streaming reads only on Spark 3.3 and later.

Paimon supports several scan modes for streaming reads, selected with the scan.mode option:

  • latest: For streaming sources, continuously reads the latest changes without producing a snapshot at the beginning.
  • latest-full: For streaming sources, produces the latest snapshot of the table on first startup, then continuously reads the latest changes.
  • from-timestamp: For streaming sources, continuously reads changes starting from the timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning.
  • from-snapshot: For streaming sources, continuously reads changes starting from the snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning.
  • from-snapshot-full: For streaming sources, produces the snapshot specified by "scan.snapshot-id" on first startup, then continuously reads changes.
  • default: Equivalent to from-snapshot if "scan.snapshot-id" is specified, or to from-timestamp if "scan.timestamp-millis" is specified. Otherwise, equivalent to latest-full.
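The fallback rules of the default mode can be expressed as a small function. The following is a minimal sketch in plain Scala that assumes only the option keys named above; ScanModeResolution and resolveDefault are hypothetical names used for illustration, not part of Paimon's API, and the checks follow the order in which the rules are listed (snapshot ID first).

```scala
// Hypothetical model of how the "default" scan mode resolves.
// It mirrors the documented rules; it is not Paimon's own code.
object ScanModeResolution {
  def resolveDefault(options: Map[String, String]): String =
    if (options.contains("scan.snapshot-id")) "from-snapshot"
    else if (options.contains("scan.timestamp-millis")) "from-timestamp"
    else "latest-full" // no scan-related option was given

  def main(args: Array[String]): Unit = {
    println(resolveDefault(Map("scan.snapshot-id" -> "5")))
    println(resolveDefault(Map("scan.timestamp-millis" -> "1700000000000")))
    println(resolveDefault(Map.empty))
  }
}
```

This is why the example below, which sets no scan-related options, starts in latest-full mode.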

A simple example with default scan mode:

// No scan-related options are provided, so the default scan mode resolves to latest-full.
val query = spark.readStream
  .format("paimon")
  // by table name
  .table("table_name")
  // or by location
  // .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()

Paimon Structured Streaming also supports a variety of streaming read modes, including various triggers and read limits.

These read limits are supported:

  • read.stream.maxFilesPerTrigger (default: none, type: Integer): The maximum number of files returned in a single batch.
  • read.stream.maxBytesPerTrigger (default: none, type: Long): The maximum number of bytes returned in a single batch.
  • read.stream.maxRowsPerTrigger (default: none, type: Long): The maximum number of rows returned in a single batch.
  • read.stream.minRowsPerTrigger (default: none, type: Long): The minimum number of rows returned in a single batch. Used together with read.stream.maxTriggerDelayMs to create a MinRowsReadLimit.
  • read.stream.maxTriggerDelayMs (default: none, type: Long): The maximum delay between two adjacent batches. Used together with read.stream.minRowsPerTrigger to create a MinRowsReadLimit.

Example One

Use org.apache.spark.sql.streaming.Trigger.AvailableNow() together with Paimon's read.stream.maxBytesPerTrigger option.

import org.apache.spark.sql.streaming.Trigger

// Trigger.AvailableNow() processes all data available at the start
// of the query in one or more batches, then terminates the query.
// Setting read.stream.maxBytesPerTrigger to 134217728 (128 MB) means
// that each batch processes a maximum of 128 MB of data.
val query = spark.readStream
  .format("paimon")
  .option("read.stream.maxBytesPerTrigger", "134217728")
  .table("table_name")
  .writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()
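To make the effect of read.stream.maxBytesPerTrigger more concrete, the sketch below models how a fixed set of files available at query start could be split into batches under a 128 MB cap (128 * 1024 * 1024 = 134217728 bytes, the value used above). It is a simplified, hypothetical model rather than Paimon's planning logic: AvailableNowBatches and plan are illustrative names, and the rule that a batch always admits at least one file, even a file larger than the cap, is our assumption.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model: greedily pack files (by size in bytes) into batches
// whose total size stays within maxBytes. Not Paimon's implementation.
object AvailableNowBatches {
  val MaxBytesPerTrigger: Long = 128L * 1024 * 1024 // 134217728 bytes

  def plan(fileSizes: Seq[Long], maxBytes: Long): Seq[Seq[Long]] = {
    val batches = ArrayBuffer.empty[Seq[Long]]
    val current = ArrayBuffer.empty[Long]
    var currentBytes = 0L
    for (size <- fileSizes) {
      // Close the current batch if adding this file would exceed the cap.
      // Assumption: an empty batch always accepts the next file.
      if (current.nonEmpty && currentBytes + size > maxBytes) {
        batches += current.toList
        current.clear()
        currentBytes = 0L
      }
      current += size
      currentBytes += size
    }
    if (current.nonEmpty) batches += current.toList
    batches.toList
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024
    val files = Seq(100 * mb, 50 * mb, 60 * mb, 120 * mb)
    plan(files, MaxBytesPerTrigger).zipWithIndex.foreach { case (batch, i) =>
      println(s"batch $i: ${batch.map(_ / mb).mkString(", ")} MB")
    }
  }
}
```

With Trigger.AvailableNow(), the query would then run one micro-batch per planned batch and stop once all of them are processed.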

Example Two

Use org.apache.spark.sql.connector.read.streaming.ReadMinRows, configured through the read.stream.minRowsPerTrigger and read.stream.maxTriggerDelayMs options.

// A batch is not triggered until at least 5,000 rows are available,
// unless more than 300 seconds (300000 ms) have passed since the previous batch.
val query = spark.readStream
  .format("paimon")
  .option("read.stream.minRowsPerTrigger", "5000")
  .option("read.stream.maxTriggerDelayMs", "300000")
  .table("table_name")
  .writeStream
  .format("console")
  .start()
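The trigger condition in this example can be modelled as a single predicate. This is a hedged sketch, not Paimon's implementation: MinRowsTrigger and shouldTrigger are hypothetical names, and two details are our assumptions: reaching either threshold exactly counts as sufficient (>=), and no batch is planned when there are no new rows at all.

```scala
// Hypothetical model of a minimum-rows read limit with a maximum trigger delay.
object MinRowsTrigger {
  def shouldTrigger(
      availableRows: Long,     // new rows not yet read
      msSinceLastBatch: Long,  // time elapsed since the previous batch
      minRows: Long,           // read.stream.minRowsPerTrigger
      maxTriggerDelayMs: Long  // read.stream.maxTriggerDelayMs
  ): Boolean =
    availableRows > 0 && // assumption: nothing new means no batch
      (availableRows >= minRows || msSinceLastBatch >= maxTriggerDelayMs)

  def main(args: Array[String]): Unit = {
    val minRows = 5000L
    val maxDelayMs = 300000L
    println(shouldTrigger(1200L, 60000L, minRows, maxDelayMs))  // too few rows, too soon
    println(shouldTrigger(5000L, 60000L, minRows, maxDelayMs))  // enough rows
    println(shouldTrigger(1200L, 300000L, minRows, maxDelayMs)) // delay reached
  }
}
```

The delay bound keeps a low-traffic table from waiting indefinitely for the row threshold.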

Paimon Structured Streaming can also read rows in changelog form, where a rowkind column is added to each row to represent its change type. There are two ways to do this:

  • Stream-read the system audit_log table directly
  • Set read.changelog to true (default: false), then stream-read the table itself

Example:

// Option 1
val query1 = spark.readStream
  .format("paimon")
  .table("`table_name$audit_log`")
  .writeStream
  .format("console")
  .start()

// Option 2
val query2 = spark.readStream
  .format("paimon")
  .option("read.changelog", "true")
  .table("table_name")
  .writeStream
  .format("console")
  .start()

/*
Example output:
+I 1 Hi
+I 2 Hello
*/
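On the consumer side, each changelog row can be applied to keyed state according to its row kind. The sketch below assumes a table keyed by k with a value column v, like table T created earlier, and uses Paimon's row kind short strings (+I insert, -U update before, +U update after, -D delete). ChangelogReplay, ChangeRow and replay are hypothetical names, and the rows are hard-coded rather than read from Spark.

```scala
// Hypothetical sketch: replay changelog rows into a key -> value map.
object ChangelogReplay {
  final case class ChangeRow(rowKind: String, k: Int, v: String)

  def replay(rows: Seq[ChangeRow]): Map[Int, String] =
    rows.foldLeft(Map.empty[Int, String]) { (state, row) =>
      row.rowKind match {
        case "+I" | "+U" => state + (row.k -> row.v) // insert or new value
        case "-U" | "-D" => state - row.k            // retract old value or delete
        case other => throw new IllegalArgumentException(s"unknown rowkind: $other")
      }
    }

  def main(args: Array[String]): Unit = {
    val changes = Seq(
      ChangeRow("+I", 1, "Hi"),
      ChangeRow("+I", 2, "Hello"),
      ChangeRow("-U", 1, "Hi"),
      ChangeRow("+U", 1, "Hi again"),
      ChangeRow("-D", 2, "Hello")
    )
    println(replay(changes)) // Map(1 -> Hi again)
  }
}
```

Replaying the full changelog in order yields the table's current contents, which is what makes the rowkind column useful for downstream consumers.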