Structured Streaming

Structured Streaming #

Paimon supports streaming data processing with Spark Structured Streaming, enabling both streaming write and streaming query.

Streaming Write #

Paimon Structured Streaming supports only two output modes: append and complete.
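import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._

// MemoryStream needs an implicit SQLContext in scope.
implicit val sqlContext: SQLContext = spark.sqlContext

// Create a Paimon table if it does not exist.
spark.sql("""
           |CREATE TABLE IF NOT EXISTS T (k INT, v STRING)
           |TBLPROPERTIES ('primary-key'='k', 'bucket'='3')
           |""".stripMargin)

// Use MemoryStream to simulate a streaming source.
val inputData = MemoryStream[(Int, String)]
val df = inputData.toDS().toDF("k", "v")

// Streaming write to the Paimon table.
val stream = df
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "/path/to/checkpoint")
  .format("paimon")
  .start("/path/to/paimon/sink/table")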

Streaming write also supports merging the schema on write; see Write merge schema.
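
As a rough sketch of what a schema-merging streaming write might look like, the snippet below reuses `df` from the streaming write example and adds one option. The option name `write.merge-schema` is not given in this section; it is assumed from Paimon's Write merge schema documentation and should be verified there. The checkpoint and sink paths are the same placeholders used earlier.

```scala
// Sketch only: assumes `df` is the streaming DataFrame defined earlier and
// that the Paimon Spark connector is on the classpath.
val mergeStream = df
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "/path/to/checkpoint")
  // Assumed option name; confirm it against the Write merge schema page.
  .option("write.merge-schema", "true")
  .format("paimon")
  .start("/path/to/paimon/sink/table")
```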

Streaming Query #

Paimon currently supports Spark 3.3+ for streaming read.

Paimon supports several scan modes for streaming read:

| Scan Mode | Description |
|---|---|
| latest | For streaming sources, continuously reads the latest changes without producing a snapshot at the beginning. |
| latest-full | For streaming sources, produces the latest snapshot of the table upon first startup, then continues to read the latest changes. |
| from-timestamp | For streaming sources, continuously reads changes starting from the timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning. |
| from-snapshot | For streaming sources, continuously reads changes starting from the snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning. |
| from-snapshot-full | For streaming sources, produces the snapshot specified by "scan.snapshot-id" upon first startup, then continuously reads changes. |
| default | Equivalent to from-snapshot if "scan.snapshot-id" is specified, and to from-timestamp if "scan.timestamp-millis" is specified. Otherwise, equivalent to latest-full. |
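
The rule described for the default mode can be sketched as a small pure function. `DefaultScanMode` and `resolve` are hypothetical names used only for illustration; they are not part of Paimon. The precedence when both keys are present is an assumption (the description lists "scan.snapshot-id" first).

```scala
// Hypothetical helper, not part of Paimon: mirrors the description of the
// "default" scan mode.
object DefaultScanMode {
  // Returns the scan mode that "default" is described as equivalent to.
  // Assumption: "scan.snapshot-id" takes precedence if both keys are set.
  def resolve(options: Map[String, String]): String =
    if (options.contains("scan.snapshot-id")) "from-snapshot"
    else if (options.contains("scan.timestamp-millis")) "from-timestamp"
    else "latest-full"
}
```

For instance, `DefaultScanMode.resolve(Map.empty)` evaluates to `"latest-full"`, which matches the behavior when no scan-related options are provided.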

A simple example with default scan mode:

// No scan-related options are provided, so the latest-full scan mode is used.
val query = spark.readStream
  .format("paimon")
  // by table name
  .table("table_name") 
  // or by location
  // .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()
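
To choose a scan mode explicitly instead of relying on the default, the options could be passed on the reader. This is a sketch under assumptions: `scan.mode` is assumed to be the option key that selects the mode, and the snapshot id `1` is a hypothetical value that must exist on the table.

```scala
// Sketch only: assumes an active SparkSession named `spark` with the
// Paimon connector available.
val fromSnapshotQuery = spark.readStream
  .format("paimon")
  // Assumed key for selecting the scan mode.
  .option("scan.mode", "from-snapshot")
  // Hypothetical snapshot id; replace with one that exists on the table.
  .option("scan.snapshot-id", "1")
  .table("table_name")
  .writeStream
  .format("console")
  .start()
```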

Paimon Structured Streaming also supports a variety of streaming read modes, including multiple triggers and read limits.

These read limits are supported:

| Key | Default | Type | Description |
|---|---|---|---|
| read.stream.maxFilesPerTrigger | (none) | Integer | The maximum number of files returned in a single batch. |
| read.stream.maxBytesPerTrigger | (none) | Long | The maximum number of bytes returned in a single batch. |
| read.stream.maxRowsPerTrigger | (none) | Long | The maximum number of rows returned in a single batch. |
| read.stream.minRowsPerTrigger | (none) | Long | The minimum number of rows returned in a single batch. Used together with read.stream.maxTriggerDelayMs to create a MinRowsReadLimit. |
| read.stream.maxTriggerDelayMs | (none) | Long | The maximum delay between two adjacent batches. Used together with read.stream.minRowsPerTrigger to create a MinRowsReadLimit. |

Example: One

Use org.apache.spark.sql.streaming.Trigger.AvailableNow() together with the read.stream.maxBytesPerTrigger option defined by Paimon.

import org.apache.spark.sql.streaming.Trigger

// Trigger.AvailableNow() processes all data available at the start
// of the query in one or more batches, then terminates the query.
// Setting read.stream.maxBytesPerTrigger to 134217728 (128 MB) means
// that each batch processes at most 128 MB of data.
val query = spark.readStream
  .format("paimon")
  .option("read.stream.maxBytesPerTrigger", "134217728")
  .table("table_name")
  .writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()
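
The value 134217728 used above is 128 × 1024 × 1024 bytes. A small helper can make such values less error-prone to write; `ByteSizes` and `mib` are hypothetical names introduced here for illustration only.

```scala
// Hypothetical helper for building byte-valued option strings.
object ByteSizes {
  // Converts mebibytes to bytes; Long arithmetic avoids Int overflow
  // for sizes of 2 GiB and above.
  def mib(n: Long): Long = n * 1024L * 1024L
}
```

With this helper, the option could be set as `.option("read.stream.maxBytesPerTrigger", ByteSizes.mib(128).toString)`.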

Example: Two

Use org.apache.spark.sql.connector.read.streaming.ReadMinRows.

// No batch is triggered until at least 5,000 rows are available,
// unless more than 300 seconds have passed since the previous batch.
val query = spark.readStream
  .format("paimon")
  .option("read.stream.minRowsPerTrigger", "5000")
  .option("read.stream.maxTriggerDelayMs", "300000")
  .table("table_name")
  .writeStream
  .format("console")
  .start()
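
The interaction between the two options can be modeled with a small decision function. This is only an illustrative model, not the code used by Paimon or Spark: `MinRowsLimit` and `shouldTrigger` are hypothetical names, and the exact handling of the boundary values is an assumption.

```scala
// Hypothetical model of a min-rows read limit; not Paimon's or Spark's code.
final case class MinRowsLimit(minRows: Long, maxTriggerDelayMs: Long) {
  // A batch fires when enough rows are pending, or when the time since the
  // previous batch exceeds the maximum delay (boundary handling is assumed).
  def shouldTrigger(pendingRows: Long, msSinceLastBatch: Long): Boolean =
    pendingRows >= minRows || msSinceLastBatch > maxTriggerDelayMs
}
```

With `MinRowsLimit(5000L, 300000L)`, 4,999 pending rows after one second would not start a batch, while the same 4,999 rows after 300,001 ms would.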

Paimon Structured Streaming supports reading rows as a changelog (a rowkind column is added to each row to represent its change type) in two ways:

  • Stream directly from the audit_log system table
  • Set read.changelog to true (default is false), then stream from the table location

Example:

// Option 1
val query1 = spark.readStream
  .format("paimon")
  .table("`table_name$audit_log`")
  .writeStream
  .format("console")
  .start()

// Option 2
val query2 = spark.readStream
  .format("paimon")
  .option("read.changelog", "true")
  .table("table_name")
  .writeStream
  .format("console")
  .start()

/*
+I   1  Hi
+I   2  Hello
*/
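
The `+I` marker in the output above is the row kind of an insert. As a sketch for interpreting the rowkind column downstream, the lookup below assumes the usual short strings `+I`, `-U`, `+U` and `-D`; only `+I` appears in this section, so the other three are an assumption. `RowKinds` and `describe` are hypothetical names.

```scala
// Hypothetical helper for interpreting values of the rowkind column.
object RowKinds {
  // Assumed short strings; only "+I" is shown in the example output.
  private val descriptions: Map[String, String] = Map(
    "+I" -> "insert",
    "-U" -> "update before",
    "+U" -> "update after",
    "-D" -> "delete"
  )

  // Returns a description for a known row kind, or None for anything else.
  def describe(rowKind: String): Option[String] = descriptions.get(rowKind)
}
```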