Spark3 #

This documentation is a guide for using Paimon in Spark3.

Preparing Paimon Jar File #

Paimon currently supports Spark 3.3, 3.2 and 3.1. We recommend the latest Spark version for a better experience.

Download the jar file for the corresponding version.

| Version   | Jar                                   |
|-----------|---------------------------------------|
| Spark 3.3 | paimon-spark-3.3-0.4.0-incubating.jar |
| Spark 3.2 | paimon-spark-3.2-0.4.0-incubating.jar |
| Spark 3.1 | paimon-spark-3.1-0.4.0-incubating.jar |

You can also manually build the bundled jar from the source code.

To build from the source code, clone the git repository.

Build the bundled jar with the following command.

mvn clean install -DskipTests

For Spark 3.3, you can find the bundled jar in ./paimon-spark/paimon-spark-3.3/target/paimon-spark-3.3-0.4.0-incubating.jar.

Quick Start #

If you are using HDFS, make sure that the environment variable HADOOP_HOME or HADOOP_CONF_DIR is set.
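
For example (the path below is illustrative; point it at your own Hadoop configuration directory):

export HADOOP_CONF_DIR=/path/to/hadoop/conf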

Step 1: Specify Paimon Jar File

Append the path to the Paimon jar file to the --jars argument when starting spark-sql.

spark-sql ... --jars /path/to/paimon-spark-3.3-0.4.0-incubating.jar

Alternatively, you can copy paimon-spark-3.3-0.4.0-incubating.jar into the jars directory of your Spark installation.
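
For example, assuming the SPARK_HOME environment variable points to your Spark installation directory:

cp /path/to/paimon-spark-3.3-0.4.0-incubating.jar $SPARK_HOME/jars/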

Step 2: Specify Paimon Catalog

When starting spark-sql, use the following command to register Paimon’s Spark catalog with the name paimon. Table files of the warehouse are stored under /tmp/paimon.

spark-sql ... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.warehouse=file:/tmp/paimon

After the spark-sql command line has started, run the following SQL to create and switch to the database paimon.default.

CREATE DATABASE paimon.default;
USE paimon.default;

Step 3: Create a table and Write Some Records

CREATE TABLE my_table (
    k INT,
    v STRING
) TBLPROPERTIES (
    'primary-key' = 'k'
);

INSERT INTO my_table VALUES (1, 'Hi'), (2, 'Hello');

Step 4: Query Table with SQL

SELECT * FROM my_table;
/*
1	Hi
2	Hello
*/

Step 5: Update the Records
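
Because k is declared as the primary key, inserting a row whose key already exists replaces the previously written value for that key.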

INSERT INTO my_table VALUES (1, 'Hi Again'), (3, 'Test');

SELECT * FROM my_table;
/*
1	Hi Again
2	Hello
3	Test
*/

Step 6: Query Table with Scala API

If you don’t want to use the Paimon catalog, you can also run spark-shell and query the table with the Scala API.

spark-shell ... --jars /path/to/paimon-spark-3.3-0.4.0-incubating.jar
val dataset = spark.read.format("paimon").load("file:/tmp/paimon/default.db/my_table")
dataset.createOrReplaceTempView("my_table")
spark.sql("SELECT * FROM my_table").show()

Spark Type Conversion #

This section lists all supported type conversions between Spark and Flink. All Spark data types are available in the package org.apache.spark.sql.types.

| Spark Data Type               | Flink Data Type                    | Atomic Type |
|-------------------------------|------------------------------------|-------------|
| StructType                    | RowType                            | false       |
| MapType                       | MapType                            | false       |
| ArrayType                     | ArrayType                          | false       |
| BooleanType                   | BooleanType                        | true        |
| ByteType                      | TinyIntType                        | true        |
| ShortType                     | SmallIntType                       | true        |
| IntegerType                   | IntType                            | true        |
| LongType                      | BigIntType                         | true        |
| FloatType                     | FloatType                          | true        |
| DoubleType                    | DoubleType                         | true        |
| StringType                    | VarCharType, CharType              | true        |
| DateType                      | DateType                           | true        |
| TimestampType                 | TimestampType, LocalZonedTimestamp | true        |
| DecimalType(precision, scale) | DecimalType(precision, scale)      | true        |
| BinaryType                    | VarBinaryType, BinaryType          | true        |
  • Currently, Spark’s field comment cannot be described under Flink CLI.
  • Conversion between Spark’s UserDefinedType and Flink’s UserDefinedType is not supported.
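
To see how these mappings surface on the Spark side, you can print the schema of a Paimon table from spark-shell; a minimal sketch using the quick-start table (assuming the same warehouse path as above):

// k was declared as int and v as string, so Spark should report
// them as IntegerType and StringType respectively.
spark.read.format("paimon")
  .load("file:/tmp/paimon/default.db/my_table")
  .printSchema()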