SQL Write #
Syntax #
INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query };
For more information, please check the syntax document:
INSERT INTO #
Use INSERT INTO to apply records and changes to tables.
INSERT INTO my_table SELECT ...
Overwriting the Whole Table #
Use INSERT OVERWRITE to overwrite the whole unpartitioned table.
INSERT OVERWRITE my_table SELECT ...
Overwriting a Partition #
Use INSERT OVERWRITE to overwrite a partition.
INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
Dynamic Overwrite #
Spark’s default overwrite mode is static partition overwrite. To enable dynamic overwritten needs these configs below:
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
Note : If spark.sql.sources.partitionOverwriteMode is set to dynamic by default in Spark,
in order to ensure that the insert overwrite function of the Paimon table can be used normally,
spark.sql.extensions should be set to org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions.
-- MyTable is a Partitioned Table
-- Static overwrite (Overwrite whole table)
INSERT OVERWRITE my_table SELECT ...
-- Dynamic overwrite
                             SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE my_table SELECT ...
Truncate tables #
TRUNCATE TABLE my_table;
Updating tables #
To enable update needs these configs below:
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
spark supports update PrimitiveType and StructType, for example:
-- Syntax
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;
CREATE TABLE t (
  id INT, 
  s STRUCT<c1: INT, c2: STRING>, 
  name STRING)
TBLPROPERTIES (
  'primary-key' = 'id', 
  'merge-engine' = 'deduplicate'
);
-- you can use
UPDATE t SET name = 'a_new' WHERE id = 1;
UPDATE t SET s.c2 = 'a_new' WHERE s.c1 = 1;
Deleting from table #
To enable delete needs these configs below:
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
DELETE FROM my_table WHERE currency = 'UNKNOWN';
Merging into table #
Paimon currently supports Merge Into syntax in Spark 3+, which allow a set of updates, insertions and deletions based on a source table in a single commit.
- This only work with primary-key table.
- In update clause, to update primary key columns is not supported.
WHEN NOT MATCHED BY SOURCEsyntax is not supported.
Example: One
This is a simple demo that, if a row exists in the target table update it, else insert it.
-- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.
MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
Example: Two
This is a demo with multiple, conditional clauses.
-- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.
MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED AND target.a = 5 THEN
   UPDATE SET b = source.b + target.b      -- when matched and meet the condition 1, then update b;
WHEN MATCHED AND source.c > 'c2' THEN
   UPDATE SET *    -- when matched and meet the condition 2, then update all the columns;
WHEN MATCHED THEN
   DELETE      -- when matched, delete this row in target table;
WHEN NOT MATCHED AND c > 'c9' THEN
   INSERT (a, b, c) VALUES (a, b * 1.1, c)      -- when not matched but meet the condition 3, then transform and insert this row;
WHEN NOT MATCHED THEN
INSERT *      -- when not matched, insert this row without any transformation;
Streaming Write #
Paimon currently supports Spark 3+ for streaming write.
Paimon Structured Streaming only supports the two
appendandcompletemodes.
// Create a paimon table if not exists.
spark.sql(s"""
           |CREATE TABLE T (k INT, v STRING)
           |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
           |""".stripMargin)
// Here we use MemoryStream to fake a streaming source.
val inputData = MemoryStream[(Int, String)]
val df = inputData.toDS().toDF("k", "v")
// Streaming Write to paimon table.
val stream = df
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "/path/to/checkpoint")
  .format("paimon")
  .start("/path/to/paimon/sink/table")
Schema Evolution #
Schema evolution is a feature that allows users to easily modify the current schema of a table to adapt to existing data, or new data that changes over time, while maintaining data integrity and consistency.
Paimon supports automatic schema merging of source data and current table data while data is being written, and uses the merged schema as the latest schema of the table, and it only requires configuring write.merge-schema.
data.write
  .format("paimon")
  .mode("append")
  .option("write.merge-schema", "true")
  .save(location)
When enable write.merge-schema, Paimon can allow users to perform the following actions on table schema by default:
- Adding columns
- Up-casting the type of column(e.g. Int -> Long)
Paimon also supports explicit type conversions between certain types (e.g. String -> Date, Long -> Int), it requires an explicit configuration write.merge-schema.explicit-cast.
Schema evolution can be used in streaming mode at the same time.
val inputData = MemoryStream[(Int, String)]
inputData
  .toDS()
  .toDF("col1", "col2")
  .writeStream
  .format("paimon")
  .option("checkpointLocation", "/path/to/checkpoint")
  .option("write.merge-schema", "true")
  .option("write.merge-schema.explicit-cast", "true")
  .start(location)
Here list the configurations.
| Scan Mode | Description | 
|---|---|
| write.merge-schema | If true, merge the data schema and the table schema automatically before write data. | 
| write.merge-schema.explicit-cast | If true, allow to merge data types if the two types meet the rules for explicit casting. |