DataFrame

Paimon supports creating tables, inserting data, and querying data through the Spark DataFrame API.

Create Table

When creating a table, pass table properties (such as primary-key) with option and, if needed, set the partition columns with partitionBy.

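import org.apache.spark.sql.DataFrame
import spark.implicits._ // enables toDF; assumes an active SparkSession named spark

val data: DataFrame = Seq((1, "x1", "p1"), (2, "x2", "p2")).toDF("a", "b", "pt")

data.write.format("paimon")
  .option("primary-key", "a,pt") // primary key columns
  .option("k1", "v1")            // any other table property
  .partitionBy("pt")
  .saveAsTable("test_tbl") // or .save("/path/to/default.db/test_tbl")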
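
The snippet above presumes a session that already knows about Paimon. As a rough end-to-end sketch, the following configures a local session, creates the table, and reads it back. The catalog name `paimon`, the warehouse path `file:/tmp/paimon-warehouse`, and the app name are assumptions for illustration, and the Paimon Spark bundle is assumed to be on the classpath.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Assumption: local mode, a catalog named "paimon", and a throwaway warehouse path.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("paimon-dataframe-sketch")
  .config("spark.sql.extensions", "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
  .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
  .config("spark.sql.catalog.paimon.warehouse", "file:/tmp/paimon-warehouse")
  .getOrCreate()
import spark.implicits._

// Make the Paimon catalog and its default database current.
spark.sql("USE paimon")
spark.sql("USE default")

// saveAsTable defaults to failing when the table exists, so start clean.
spark.sql("DROP TABLE IF EXISTS test_tbl")

val data: DataFrame = Seq((1, "x1", "p1"), (2, "x2", "p2")).toDF("a", "b", "pt")

data.write.format("paimon")
  .option("primary-key", "a,pt")
  .partitionBy("pt")
  .saveAsTable("test_tbl")

// Read the rows back to confirm the write.
spark.table("test_tbl").orderBy("a").show()
```

Dropping the table first keeps the sketch re-runnable; in a real job you would more likely choose an explicit write mode instead.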

Insert

Insert Into

You can achieve INSERT INTO semantics by setting the mode to append.

val data: DataFrame = ...

data.write.format("paimon")
  .mode("append")
  .insertInto("test_tbl") // or .saveAsTable("test_tbl") or .save("/path/to/default.db/test_tbl")

Note: insertInto ignores column names and writes by position. If you need to write by column name, use saveAsTable or save instead.
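
If you still want to use insertInto with a DataFrame whose columns are in a different order, one option is to reorder them to match the table first. This is a minimal sketch, assuming the session already has a Paimon catalog configured and that test_tbl exists with columns (a, b, pt); the `shuffled` DataFrame is made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Assumption: the Paimon catalog is already configured and current,
// and test_tbl(a, b, pt) exists.
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Columns deliberately listed in a different order than the table.
val shuffled: DataFrame = Seq(("p1", 3, "x3")).toDF("pt", "a", "b")

// Look up the table's column order and select the columns in that order,
// so the position-based insertInto lines up with the table schema.
val tableColumns: Array[String] = spark.table("test_tbl").columns
val aligned: DataFrame = shuffled.select(tableColumns.map(col): _*)

aligned.write.format("paimon")
  .mode("append")
  .insertInto("test_tbl")
```

The select only changes column order; it does not rename or cast anything, so the DataFrame must already contain every table column by name.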

Insert Overwrite

You can achieve INSERT OVERWRITE semantics by setting the mode to overwrite.

Dynamic partition overwrite is supported for partitioned tables. To enable it, set the Spark session configuration spark.sql.sources.partitionOverwriteMode to dynamic.

val data: DataFrame = ...

data.write.format("paimon")
  .mode("overwrite")
  .insertInto("test_tbl") // or .saveAsTable("test_tbl")
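
The dynamic overwrite setting mentioned above can be applied at runtime on the session. This is a sketch under the assumption that the Paimon catalog is already configured and that test_tbl is partitioned by pt; the `update` DataFrame is illustrative.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Assumption: the Paimon catalog is already configured and current,
// and test_tbl(a, b, pt) is partitioned by pt.
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Switch the session to dynamic partition overwrite.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// The incoming data only touches partition pt=p1.
val update: DataFrame = Seq((1, "x1-new", "p1")).toDF("a", "b", "pt")

update.write.format("paimon")
  .mode("overwrite")
  .insertInto("test_tbl")
// In dynamic mode, partitions absent from `update` (for example pt=p2)
// are expected to be left as they were.
```

The setting is session-wide, so it also affects later overwrites in the same session until it is changed back.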

{{< hint info >}} Since Spark 3.4, saveAsTable with overwrite mode only overwrites data and preserves the existing table definition (partitions, primary keys, and properties). If you need to replace the table definition, use SQL CREATE OR REPLACE TABLE ... AS SELECT.

Before Spark 3.4, saveAsTable with overwrite mode drops and recreates the table. The table definition is therefore reset to the DataFrame's schema, and only the partitions and options explicitly re-specified via partitionBy() and write options are kept. {{< /hint >}}
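
To replace the table definition from DataFrame code, one approach is to register the DataFrame as a temporary view and run the SQL statement through spark.sql. The sketch below assumes the Paimon catalog is already configured and current; the view name `new_data` and the sample row are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: the Paimon catalog is already configured and current.
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val data = Seq((1, "x1", "p1")).toDF("a", "b", "pt")

// Expose the DataFrame to SQL under a hypothetical view name.
data.createOrReplaceTempView("new_data")

// Replace both the definition and the data of test_tbl in one statement.
spark.sql(
  """CREATE OR REPLACE TABLE test_tbl
    |PARTITIONED BY (pt)
    |TBLPROPERTIES ('primary-key' = 'a,pt')
    |AS SELECT * FROM new_data""".stripMargin)
```

Partitioning and primary keys are restated here because the replacement defines the table from scratch.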

Query

spark.read.format("paimon")
  .table("t") // or .load("/path/to/default.db/test_tbl")
  .show()

To specify the catalog or database, use one of the following:

// recommended
spark.read.format("paimon")
  .table("<catalogName>.<databaseName>.<tableName>")

// or
spark.read.format("paimon")
  .option("catalog", "<catalogName>")
  .option("database", "<databaseName>")
  .option("table", "<tableName>")
  .load("/path/to/default.db/test_tbl")

You can specify other read configs through option:

// time travel
spark.read.format("paimon")
  .option("scan.snapshot-id", 1)
  .table("t")
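
Other scan options are passed the same way. As an illustration, the sketch below reads the table as of a point in time using Paimon's scan.timestamp-millis option instead of a snapshot id. It assumes the Paimon catalog is already configured and that table t exists; using the current time as the timestamp is only a placeholder.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: the Paimon catalog is already configured and current,
// and table t exists with at least one snapshot.
val spark = SparkSession.builder().getOrCreate()

// Placeholder timestamp in epoch milliseconds; substitute the moment
// you actually want to read the table as of.
val asOfMillis: Long = System.currentTimeMillis()

val asOf = spark.read.format("paimon")
  .option("scan.timestamp-millis", asOfMillis)
  .table("t")

asOf.show()
```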