Spark
This documentation is for an unreleased version of Apache Paimon. We recommend you use the latest stable version.

Spark3 #

This documentation is a guide for using Paimon in Spark3.

Preparation #

Paimon currently supports Spark 3.5, 3.4, 3.3, 3.2 and 3.1. We recommend the latest Spark version for a better experience.

Download the jar file with corresponding version.

Version Jar
Spark 3.5 paimon-spark-3.5-0.8-SNAPSHOT.jar
Spark 3.4 paimon-spark-3.4-0.8-SNAPSHOT.jar
Spark 3.3 paimon-spark-3.3-0.8-SNAPSHOT.jar
Spark 3.2 paimon-spark-3.2-0.8-SNAPSHOT.jar
Spark 3.1 paimon-spark-3.1-0.8-SNAPSHOT.jar

You can also manually build bundled jar from the source code.

To build from source code, clone the git repository.

Build bundled jar with the following command.

mvn clean install -DskipTests

For Spark 3.3, you can find the bundled jar in ./paimon-spark/paimon-spark-3.3/target/paimon-spark-3.3-0.8-SNAPSHOT.jar.

Setup #

If you are using HDFS, make sure that the environment variable HADOOP_HOME or HADOOP_CONF_DIR is set.

Step 1: Specify Paimon Jar File

Append path to paimon jar file to the --jars argument when starting spark-sql.

spark-sql ... --jars /path/to/paimon-spark-3.3-0.8-SNAPSHOT.jar

Alternatively, you can copy paimon-spark-3.3-0.8-SNAPSHOT.jar under spark/jars in your Spark installation directory.

Step 2: Specify Paimon Catalog

When starting spark-sql, use the following command to register Paimon’s Spark catalog with the name paimon. Table files of the warehouse is stored under /tmp/paimon.

spark-sql ... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.warehouse=file:/tmp/paimon \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

Catalogs are configured using properties under spark.sql.catalog.(catalog_name). In above case, ‘paimon’ is the catalog name, you can change it to your own favorite catalog name.

After spark-sql command line has started, run the following SQL to create and switch to database default.

USE paimon;
USE default;

After switching to the catalog ('USE paimon'), Spark’s existing tables will not be directly accessible, you can use the spark_catalog.${database_name}.${table_name} to access Spark tables.

When starting spark-sql, use the following command to register Paimon’s Spark Generic catalog to replace Spark default catalog spark_catalog. (default warehouse is Spark spark.sql.warehouse.dir)

Currently, it is only recommended to use SparkGenericCatalog in the case of Hive metastore, Paimon will infer Hive conf from Spark session, you just need to configure Spark’s Hive conf.

spark-sql ... \
    --conf spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

Using SparkGenericCatalog, you can use Paimon tables in this Catalog or non-Paimon tables such as Spark’s csv, parquet, Hive tables, etc.

Create Table #

create table my_table (
    k int,
    v string
) tblproperties (
    'primary-key' = 'k'
);
create table my_table (
    k int,
    v string
) USING paimon
tblproperties (
    'primary-key' = 'k'
) ;

Insert Table #

Paimon currently supports Spark 3.2+ for SQL write.
INSERT INTO my_table VALUES (1, 'Hi'), (2, 'Hello');

Query Table #

SELECT * FROM my_table;

/*
1	Hi
2	Hello
*/
val dataset = spark.read.format("paimon").load("file:/tmp/paimon/default.db/my_table")
dataset.show()

/*
+---+------+
| k |     v|
+---+------+
|  1|    Hi|
|  2| Hello|
+---+------+
*/

Update Table #

Important table properties setting:

  1. Only primary key table supports this feature.
  2. MergeEngine needs to be deduplicate or partial-update to support this feature.
Warning: we do not support updating primary keys.
UPDATE my_table SET v = 'new_value' WHERE id = 1;

Merge Into Table #

Paimon currently supports Merge Into syntax in Spark 3+, which allow a set of updates, insertions and deletions based on a source table in a single commit.

  1. This only work with primary-key table.
  2. In update clause, to update primary key columns is not supported.
  3. WHEN NOT MATCHED BY SOURCE syntax is not supported.

Example: One

This is a simple demo that, if a row exists in the target table update it, else insert it.


-- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.

MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *

Example: Two

This is a demo with multiple, conditional clauses.


-- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.

MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED AND target.a = 5 THEN
   UPDATE SET b = source.b + target.b      -- when matched and meet the condition 1, then update b;
WHEN MATCHED AND source.c > 'c2' THEN
   UPDATE SET *    -- when matched and meet the condition 2, then update all the columns;
WHEN MATCHED THEN
   DELETE      -- when matched, delete this row in target table;
WHEN NOT MATCHED AND c > 'c9' THEN
   INSERT (a, b, c) VALUES (a, b * 1.1, c)      -- when not matched but meet the condition 3, then transform and insert this row;
WHEN NOT MATCHED THEN
INSERT *      -- when not matched, insert this row without any transformation;

Streaming Write #

Paimon currently supports Spark 3+ for streaming write.

Paimon Structured Streaming only supports the two append and complete modes.

// Create a paimon table if not exists.
spark.sql(s"""
           |CREATE TABLE T (k INT, v STRING)
           |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
           |""".stripMargin)

// Here we use MemoryStream to fake a streaming source.
val inputData = MemoryStream[(Int, String)]
val df = inputData.toDS().toDF("k", "v")

// Streaming Write to paimon table.
val stream = df
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "/path/to/checkpoint")
  .format("paimon")
  .start("/path/to/paimon/sink/table")

Streaming Read #

Paimon currently supports Spark 3.3+ for streaming read.

Paimon supports rich scan mode for streaming read. There is a list:

Scan Mode Description
latest
For streaming sources, continuously reads latest changes without producing a snapshot at the beginning.
latest-full
For streaming sources, produces the latest snapshot on the table upon first startup, and continue to read the latest changes.
from-timestamp
For streaming sources, continuously reads changes starting from timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning.
from-snapshot
For streaming sources, continuously reads changes starting from snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning.
from-snapshot-full
For streaming sources, produces from snapshot specified by "scan.snapshot-id" on the table upon first startup, and continuously reads changes.
default
It is equivalent to from-snapshot if "scan.snapshot-id" is specified. It is equivalent to from-timestamp if "timestamp-millis" is specified. Or, It is equivalent to latest-full.

A simple example with default scan mode:

// no any scan-related configs are provided, that will use latest-full scan mode.
val query = spark.readStream
  .format("paimon")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()

Paimon Structured Streaming also supports a variety of streaming read modes, it can support many triggers and many read limits.

These read limits are supported:

Key Default Type Description
read.stream.maxFilesPerTrigger
(none) Integer The maximum number of files returned in a single batch.
read.stream.maxBytesPerTrigger
(none) Long The maximum number of bytes returned in a single batch.
read.stream.maxRowsPerTrigger
(none) Long The maximum number of rows returned in a single batch.
read.stream.minRowsPerTrigger
(none) Long The minimum number of rows returned in a single batch, which used to create MinRowsReadLimit with read.stream.maxTriggerDelayMs together.
read.stream.maxTriggerDelayMs
(none) Long The maximum delay between two adjacent batches, which used to create MinRowsReadLimit with read.stream.minRowsPerTrigger together.

Example: One

Use org.apache.spark.sql.streaming.Trigger.AvailableNow() and maxBytesPerTrigger defined by paimon.

// Trigger.AvailableNow()) processes all available data at the start
// of the query in one or multiple batches, then terminates the query.
// That set read.stream.maxBytesPerTrigger to 128M means that each
// batch processes a maximum of 128 MB of data.
val query = spark.readStream
  .format("paimon")
  .option("read.stream.maxBytesPerTrigger", "134217728")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()

Example: Two

Use org.apache.spark.sql.connector.read.streaming.ReadMinRows.

// It will not trigger a batch until there are more than 5,000 pieces of data,
// unless the interval between the two batches is more than 300 seconds.
val query = spark.readStream
  .format("paimon")
  .option("read.stream.minRowsPerTrigger", "5000")
  .option("read.stream.maxTriggerDelayMs", "300000")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()

Paimon Structured Streaming supports read row in the form of changelog (add rowkind column in row to represent its change type) in two ways:

  • Direct streaming read with the system audit_log table
  • Set read.changelog to true (default is false), then streaming read with table location

Example:

// Option 1
val query1 = spark.readStream
  .format("paimon")
  .table("`table_name$audit_log`")
  .writeStream
  .format("console")
  .start()

// Option 2
val query2 = spark.readStream
  .format("paimon")
  .option("read.changelog", "true")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()

/*
+I   1  Hi
+I   2  Hello
*/

Schema Evolution #

Schema evolution is a feature that allows users to easily modify the current schema of a table to adapt to existing data, or new data that changes over time, while maintaining data integrity and consistency.

Paimon supports automatic schema merging of source data and current table data while data is being written, and uses the merged schema as the latest schema of the table, and it only requires configuring write.merge-schema.

data.write
  .format("paimon")
  .mode("append")
  .option("write.merge-schema", "true")
  .save(location)

When enable write.merge-schema, Paimon can allow users to perform the following actions on table schema by default:

  • Adding columns
  • Up-casting the type of column(e.g. Int -> Long)

Paimon also supports explicit type conversions between certain types (e.g. String -> Date, Long -> Int), it requires an explicit configuration write.merge-schema.explicit-cast.

Schema evolution can be used in streaming mode at the same time.

val inputData = MemoryStream[(Int, String)]
inputData
  .toDS()
  .toDF("col1", "col2")
  .writeStream
  .format("paimon")
  .option("checkpointLocation", "/path/to/checkpoint")
  .option("write.merge-schema", "true")
  .option("write.merge-schema.explicit-cast", "true")
  .start(location)

Here list the configurations.

Scan Mode Description
write.merge-schema
If true, merge the data schema and the table schema automatically before write data.
write.merge-schema.explicit-cast
If true, allow to merge data types if the two types meet the rules for explicit casting.

Spark Procedure #

This section introduce all available spark procedures about paimon. s

Procedure Name Explaination Example
compact identifier: the target table identifier. Cannot be empty.

partitions: partition filter. Left empty for all partitions.
"," means "AND"
";" means "OR"


order_strategy: 'order' or 'zorder' or 'hilbert' or 'none'. Left empty for 'none'.

order_columns: the columns need to be sort. Left empty if 'order_strategy' is 'none'.

If you want sort compact two partitions date=01 and date=02, you need to write 'date=01;date=02'

If you want sort one partition with date=01 and day=01, you need to write 'date=01,day=01'
SET spark.sql.shuffle.partitions=10; --set the compact parallelism
CALL sys.compact(table => 'T', partitions => 'p=0', order_strategy => 'zorder', order_by => 'a,b')
expire_snapshots To expire snapshots. Argument:
  • table: the target table identifier. Cannot be empty.
  • retain_max: the maximum number of completed snapshots to retain.
  • retain_min: the minimum number of completed snapshots to retain.
  • older_than: timestamp before which snapshots will be removed.
  • max_deletes: the maximum number of snapshots that can be deleted at once.
  • CALL sys.expire_snapshots(table => 'default.T', retain_max => 10)

    Spark Type Conversion #

    This section lists all supported type conversion between Spark and Paimon. All Spark’s data types are available in package org.apache.spark.sql.types.

    Spark Data Type Paimon Data Type Atomic Type
    StructType RowType false
    MapType MapType false
    ArrayType ArrayType false
    BooleanType BooleanType true
    ByteType TinyIntType true
    ShortType SmallIntType true
    IntegerType IntType true
    LongType BigIntType true
    FloatType FloatType true
    DoubleType DoubleType true
    StringType VarCharType, CharType true
    DateType DateType true
    TimestampType TimestampType, LocalZonedTimestamp true
    DecimalType(precision, scale) DecimalType(precision, scale) true
    BinaryType VarBinaryType, BinaryType true
    • Currently, Spark’s field comment cannot be described under Flink CLI.
    • Conversion between Spark’s UserDefinedType and Paimon’s UserDefinedType is not supported.

    Spark 2 #

    Paimon supports Spark 2.4+. We highly recommend using versions above Spark3, as Spark2 only provides reading capabilities.

    Download paimon-spark-2-0.8-SNAPSHOT.jar.
    If you are using HDFS, make sure that the environment variable HADOOP_HOME or HADOOP_CONF_DIR is set.

    Step 1: Prepare Test Data

    Paimon currently only supports reading tables through Spark2. To create a Paimon table with records, please follow our Flink quick start guide.

    After the guide, all table files should be stored under the path /tmp/paimon, or the warehouse path you’ve specified.

    Step 2: Specify Paimon Jar File

    You can append path to paimon jar file to the --jars argument when starting spark-shell.

    spark-shell ... --jars /path/to/paimon-spark-2-0.8-SNAPSHOT.jar
    

    Alternatively, you can copy paimon-spark-2-0.8-SNAPSHOT.jar under spark/jars in your Spark installation directory.

    Step 3: Query Table

    Paimon with Spark 2.4 does not support DDL. You can use the Dataset reader and register the Dataset as a temporary table. In spark shell:

    val dataset = spark.read.format("paimon").load("file:/tmp/paimon/default.db/word_count")
    dataset.createOrReplaceTempView("word_count")
    spark.sql("SELECT * FROM word_count").show()