SQL Write

Insert Table

The INSERT statement inserts new rows into a table or overwrites the existing data in the table. The inserted rows can be specified by value expressions or result from a query.

Syntax

INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query };

Parameters

  • table_identifier: Specifies a table name, which may be optionally qualified with a database name.

  • part_spec: An optional parameter that specifies a comma-separated list of key and value pairs for partitions.

  • column_list: An optional parameter that specifies a comma-separated list of columns belonging to the table_identifier table. Spark will reorder the columns of the input query to match the table schema according to the specified column list.

    Note: Since Spark 3.4, INSERT INTO commands with an explicit column list that has fewer columns than the target table automatically fill the remaining columns with their default values (or NULL for any column without an explicitly assigned default). In Spark 3.3 or earlier, the number of columns in column_list must equal the number of columns in the target table; otherwise the command fails.

  • value_expr: VALUES ( { value | NULL } [ , … ] ) [ , ( … ) ]. Specifies the values to be inserted. Each value is either an explicit value or NULL, and values within a row are separated by commas. Multiple parenthesized sets of values can be given to insert multiple rows.

For more information, please check the syntax document: Spark INSERT Statement
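
As a small sketch of the column_list and multi-row VALUES behavior described above, assume a hypothetical table users (id INT, name STRING, city STRING) and Spark 3.4 or later. The table and column names are illustrative only and do not come from the Paimon documentation.

```sql
-- Hypothetical table used only for illustration.
CREATE TABLE users (id INT, name STRING, city STRING);

-- Multi-row VALUES: each parenthesized group is one row.
INSERT INTO users VALUES (1, 'alice', 'Paris'), (2, 'bob', NULL);

-- Explicit column list in a different order than the table schema;
-- Spark reorders the values to match (id, name, city).
INSERT INTO users (name, id, city) VALUES ('carol', 3, 'Rome');

-- Spark 3.4+: a column list with fewer columns than the table.
-- city has no explicit default here, so it is expected to be NULL.
INSERT INTO users (id, name) VALUES (4, 'dave');
```

On Spark 3.3 or earlier the last statement would be expected to fail, because the column list is shorter than the table schema.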

Insert Into

Use INSERT INTO to apply records and changes to tables.

INSERT INTO my_table SELECT ...

Insert Overwrite

Use INSERT OVERWRITE to overwrite the whole table.

INSERT OVERWRITE my_table SELECT ...

Insert Overwrite Partition

Use INSERT OVERWRITE to overwrite a partition.

INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
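
A concrete version of the template above might look like the following sketch. The tables sales and staging_sales and their columns are assumptions made for illustration.

```sql
-- Hypothetical partitioned table.
CREATE TABLE sales (id INT, amount DOUBLE, dt STRING) PARTITIONED BY (dt);

-- Replace only the dt = '2024-01-01' partition. The partition value comes
-- from the PARTITION clause, so the query supplies only the remaining columns.
INSERT OVERWRITE sales PARTITION (dt = '2024-01-01')
SELECT id, amount FROM staging_sales WHERE dt = '2024-01-01';
```

Because the partition is fully specified, other partitions of sales are left untouched regardless of the overwrite mode.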

Dynamic Overwrite Partition

Spark's default overwrite mode is static partition overwrite. To enable dynamic overwrite, set the Spark session configuration spark.sql.sources.partitionOverwriteMode to dynamic.

For example:

CREATE TABLE my_table (id INT, pt STRING) PARTITIONED BY (pt);
INSERT INTO my_table VALUES (1, 'p1'), (2, 'p2');

-- Static overwrite (Overwrite the whole table)
INSERT OVERWRITE my_table VALUES (3, 'p1');
-- or
INSERT OVERWRITE my_table PARTITION (pt) VALUES (3, 'p1');

SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
| 3| p1|
+---+---+
*/

-- Static overwrite with specified partitions (Only overwrite pt='p1')
INSERT OVERWRITE my_table PARTITION (pt='p1') VALUES (3);

SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
| 2| p2|
| 3| p1|
+---+---+
*/

-- Dynamic overwrite (Only overwrite pt='p1')
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE my_table VALUES (3, 'p1');

SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
| 2| p2|
| 3| p1|
+---+---+
*/

Truncate Table

The TRUNCATE TABLE statement removes all the rows from a table or partition(s).

TRUNCATE TABLE my_table;
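
The statement also mentions partitions, so a partition-level form is sketched below. The PARTITION clause follows Spark's TRUNCATE TABLE syntax; whether it is accepted may depend on the Spark and Paimon versions in use, so treat it as an assumption to verify. The partition column pt is borrowed from the Dynamic Overwrite Partition example.

```sql
-- Remove every row in the table.
TRUNCATE TABLE my_table;

-- Remove only the rows in one partition (assumes my_table is partitioned by pt).
TRUNCATE TABLE my_table PARTITION (pt = 'p1');
```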

Update Table

Updates the column values for the rows that match a predicate. When no predicate is provided, updates the column values for all rows.

Note: Updating primary key columns is not supported when the target table is a primary key table.

Spark supports updating columns of PrimitiveType and StructType, for example:

-- Syntax
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;

CREATE TABLE t (
    id INT,
    s STRUCT<c1: INT, c2: STRING>,
    name STRING
) TBLPROPERTIES (
    'primary-key' = 'id',
    'merge-engine' = 'deduplicate'
);

-- you can use
UPDATE t SET name = 'a_new' WHERE id = 1;
UPDATE t SET s.c2 = 'a_new' WHERE s.c1 = 1;
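
Two further variations on table t, given as a sketch rather than a confirmed feature list: combining a top-level column and a nested field in one statement is assumed to work the same way as the single-column forms above, and the no-predicate form follows from the description at the start of this section.

```sql
-- Update a top-level column and a nested struct field in one statement.
-- The primary key column id is only used in the predicate, never assigned.
UPDATE t SET name = 'b_new', s.c2 = 'b_new' WHERE id = 2;

-- No WHERE clause: every row in t is updated.
UPDATE t SET name = 'unknown';
```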

Delete From Table

Deletes the rows that match a predicate. When no predicate is provided, deletes all rows.

DELETE FROM my_table WHERE id = 1;
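
For comparison, a compound predicate and the no-predicate form are sketched below. The column pt is an assumption taken from the earlier partitioned my_table example.

```sql
-- Compound predicate: delete only the matching rows where pt = 'p1'.
DELETE FROM my_table WHERE pt = 'p1' AND id > 10;

-- No predicate: deletes all rows in the table.
DELETE FROM my_table;
```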

Merge Into Table

Merges a set of updates, insertions and deletions based on a source table into a target table.

Note: Updating primary key columns is not supported when the target table is a primary key table.

Syntax

MERGE INTO target
USING source
ON <merge condition>
WHEN MATCHED [AND <condition>] THEN { UPDATE SET ... | DELETE }
WHEN NOT MATCHED [AND <condition>] THEN INSERT ...

Each WHEN clause can be repeated; clauses are evaluated in order, and the first matching one wins for a given row.

Examples

The examples below assume both source and target have schema (a INT, b INT, c STRING), with a as the primary key.

Simple upsert — update existing rows, insert new ones:

MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

Multiple conditional clauses:

MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED AND target.a = 5 THEN UPDATE SET b = source.b + target.b
WHEN MATCHED AND source.c > 'c2' THEN UPDATE SET *
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED AND c > 'c9' THEN INSERT (a, b, c) VALUES (a, b * 1.1, c)
WHEN NOT MATCHED THEN INSERT *
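
To make the "first matching clause wins" rule concrete, the sketch below loads some made-up rows and traces, in comments, which clause of the multi-clause statement above would apply to each source row. The data is hypothetical; the trace follows only from the clause order and conditions shown.

```sql
-- Illustrative rows (hypothetical data), schema (a INT, b INT, c STRING).
INSERT INTO target VALUES (5, 10, 'c5'), (6, 20, 'c6'), (7, 30, 'c7');
INSERT INTO source VALUES (5, 1, 'c9'), (6, 2, 'c3'), (7, 3, 'c1'), (8, 4, 'c95'), (9, 5, 'c1');

-- Clause selection per source row when the multi-clause MERGE INTO runs:
-- a = 5: matched, target.a = 5            -> clause 1: b becomes 1 + 10 = 11, c stays 'c5'
--        (source.c > 'c2' is also true, but clause 1 comes first)
-- a = 6: matched, source.c 'c3' > 'c2'    -> clause 2: row becomes (6, 2, 'c3')
-- a = 7: matched, neither condition holds -> clause 3: row is deleted
-- a = 8: not matched, c 'c95' > 'c9'      -> clause 4: inserted with b computed as b * 1.1
-- a = 9: not matched, 'c1' is not > 'c9'  -> clause 5: inserted as (9, 5, 'c1')
```

Note that c is compared as a string, so 'c95' sorts after 'c9' because it extends the same prefix.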

Column Alignment

Assignments are aligned to the target table by column name.

  • Explicit clauses (UPDATE SET col = expr / INSERT (col list) VALUES ...): only the mentioned columns are written. Unmentioned target columns keep their current value for UPDATE, or are filled with CURRENT_DEFAULT (NULL when no default is defined) for INSERT.
  • Star clauses (UPDATE SET * / INSERT *): * expands against the target columns. When source and target columns don't match exactly, the behavior depends on spark.paimon.write.merge-schema; see Column Alignment by Write Path under Write Merge Schema for the full table covering both MERGE INTO * and byName INSERT paths.

Write Merge Schema

When write.merge-schema is enabled, Paimon automatically evolves the table schema during write to accommodate new columns in the incoming data, while preserving data integrity.

Note: Since the table schema may be updated during writing, catalog caching must be disabled to use this feature. Set spark.sql.catalog.<catalogName>.cache-enabled to false.

How It Evolves the Schema

Three options control how aggressively the schema evolves; each only takes effect when the previous one is enabled:

| Option | Description |
| --- | --- |
| write.merge-schema | If true, evolve the table schema to accept new columns from the incoming data. Existing column types are preserved and incoming values are cast to them; to also widen existing types, enable write.merge-schema.type-widening. |
| write.merge-schema.type-widening | Only effective when write.merge-schema is true. If true, widen an existing column type when the incoming data has a wider compatible type (e.g. INT -> BIGINT, DECIMAL precision increase). Lossy changes are still rejected unless write.merge-schema.explicit-cast is also true. |
| write.merge-schema.explicit-cast | Only effective when write.merge-schema.type-widening is true. If true, also allow lossy type changes between compatible types (e.g. BIGINT -> INT, STRING -> DATE). |

Examples

DataFrame batch write:

data.write
  .format("paimon")
  .mode("append")
  .option("write.merge-schema", "true")
  .saveAsTable("t")

Spark SQL (requires Spark 3.5+ for BY NAME):

SET `spark.paimon.write.merge-schema` = true;

CREATE TABLE t (a INT, b STRING);
INSERT INTO t VALUES (1, '1'), (2, '2');

INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c;
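
The dependent options from the table above can presumably be layered on in the same way. The sketch below is an assumption-heavy illustration: it assumes write.merge-schema.type-widening accepts the same spark.paimon. session prefix shown for write.merge-schema, and the table widen_demo is hypothetical.

```sql
-- Assumption: the nested option takes the same spark.paimon. prefix
-- that is shown above for write.merge-schema.
SET `spark.paimon.write.merge-schema` = true;
SET `spark.paimon.write.merge-schema.type-widening` = true;

-- Hypothetical table for illustration.
CREATE TABLE widen_demo (a INT, b STRING);

-- Incoming a is BIGINT, wider than INT. With type widening enabled, the
-- column type is expected to evolve to BIGINT instead of the value being cast.
INSERT INTO widen_demo BY NAME SELECT CAST(1 AS BIGINT) AS a, 'x' AS b;

-- Inspect the resulting schema to confirm what actually happened.
DESCRIBE widen_demo;
```

Without type-widening, the option table indicates that the existing INT type would be kept and the incoming value cast to it.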

Streaming write:

val inputData = MemoryStream[(Int, String)]
inputData
  .toDS()
  .toDF("col1", "col2")
  .writeStream
  .format("paimon")
  .option("checkpointLocation", "/path/to/checkpoint")
  .option("write.merge-schema", "true")
  .toTable("t")

Column Alignment by Write Path

When the source schema doesn't match the target schema exactly, the behavior depends on both write.merge-schema and the write path. For nested struct fields, all byName paths behave the same; at the top level, MERGE INTO * differs from regular byName INSERT because * expansion only references target columns.

| Write path | Scenario | merge-schema=false (default) | merge-schema=true |
| --- | --- | --- | --- |
| byName INSERT (INSERT INTO ... BY NAME / saveAsTable / writeTo) | Top-level source-extra columns | Throws | Evolved into the target schema |
| byName INSERT | Top-level target columns missing from source | NULL-filled | NULL-filled |
| byName INSERT | Nested struct source-extra fields | Throws | Evolved into the target schema |
| byName INSERT | Nested struct target-missing fields | Throws | NULL-filled |
| MERGE INTO * (UPDATE * / INSERT *) | Top-level source-extra columns | Silently dropped (* only covers target columns) | Evolved into the target schema |
| MERGE INTO * | Top-level target columns missing from source | Throws | UPDATE * preserves current value; INSERT * fills CURRENT_DEFAULT (or NULL when no default) |
| MERGE INTO * | Nested struct source-extra fields | Throws | Evolved into the target schema |
| MERGE INTO * | Nested struct target-missing fields | Throws | UPDATE * preserves current value; INSERT * fills CURRENT_DEFAULT (or NULL when no default) |

Notes:

  • Position-based writes (e.g. INSERT INTO t VALUES (...) without BY NAME) require an exact column count match and don't engage schema evolution; only byName writes are covered above.
  • Top-level target-missing under merge-schema=false for byName INSERT mirrors Spark's INSERT FILL semantics — only nested missing fields throw.
  • Under strict mode (merge-schema=false), nested source-extra fields throw to avoid silent data loss; for MERGE INTO * at the top level, source-extras are silently dropped because * never references them.
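
The byName INSERT rows of the table can be exercised with a sketch like the one below. It requires Spark 3.5+ for BY NAME, and the table align_demo is hypothetical; the comments restate what the table says should happen rather than observed output.

```sql
SET `spark.paimon.write.merge-schema` = false;

-- Hypothetical table for illustration.
CREATE TABLE align_demo (a INT, b STRING);

-- Top-level target column b is missing from the source: NULL-filled.
INSERT INTO align_demo BY NAME SELECT 1 AS a;

-- Top-level source-extra column c: expected to throw while merge-schema is false.
-- INSERT INTO align_demo BY NAME SELECT 2 AS a, 'x' AS b, 3 AS c;

-- With merge-schema enabled, the same statement is expected to evolve
-- the schema and add column c.
SET `spark.paimon.write.merge-schema` = true;
INSERT INTO align_demo BY NAME SELECT 2 AS a, 'x' AS b, 3 AS c;
```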

COPY INTO

COPY INTO provides a SQL command for bulk loading data files into Paimon tables and exporting table data to files. Supported formats: CSV, JSON, and Parquet.

Note (SQL dialect): Paimon's COPY INTO is a Snowflake-style extension (FILE_FORMAT = (TYPE = ...), PATTERN, FORCE, ON_ERROR), not the Databricks COPY INTO form (FILEFORMAT + FORMAT_OPTIONS (...) / COPY_OPTIONS (...)). It implements only a subset of the Snowflake syntax. In particular, ON_ERROR supports ABORT_STATEMENT (default), CONTINUE, and SKIP_FILE; the Snowflake variants SKIP_FILE_<num> and SKIP_FILE_<num>% are not supported.

CSV Import

COPY INTO table_name [(col1, col2, ...)]
FROM 'source_path'
FILE_FORMAT = (TYPE = CSV [, option = value, ...])
[PATTERN = 'regex']
[FORCE = TRUE|FALSE]
[ON_ERROR = { ABORT_STATEMENT | CONTINUE | SKIP_FILE }]

Basic import:

COPY INTO my_db.my_table
FROM '/data/csv_files/'
FILE_FORMAT = (TYPE = CSV);

Import with explicit column mapping:

-- Only load into specified columns; omitted columns use their DEFAULT value or NULL
COPY INTO my_db.users (id, name)
FROM '/data/new_users/'
FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 1);

Import with NULL_IF and PATTERN:

COPY INTO my_db.events
FROM '/data/logs/'
FILE_FORMAT = (TYPE = CSV, FIELD_DELIMITER = '|', NULL_IF = ('NULL', '\\N', ''))
PATTERN = '.*\.csv'
FORCE = FALSE;
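
The ON_ERROR clause from the syntax above is not used in the preceding examples, so here is a sketch of its two non-default settings. It reuses the my_db.events table and /data/logs/ path from the previous example; the behavior in the comments paraphrases the Import Options table.

```sql
-- Skip bad rows and continue loading the rest of each file.
COPY INTO my_db.events
FROM '/data/logs/'
FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 1)
PATTERN = '.*\.csv'
ON_ERROR = CONTINUE;

-- Skip files that contain errors instead of loading them partially.
COPY INTO my_db.events
FROM '/data/logs/'
FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 1)
ON_ERROR = SKIP_FILE;
```

In both cases the per-file result rows (status, errors_seen, first_error) are the place to check what was skipped.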

JSON Import

COPY INTO table_name [(col1, col2, ...)]
FROM 'source_path'
FILE_FORMAT = (TYPE = JSON [, option = value, ...])
[PATTERN = 'regex']
[FORCE = TRUE|FALSE]
[ON_ERROR = { ABORT_STATEMENT | CONTINUE | SKIP_FILE }]

Basic import:

COPY INTO my_db.my_table
FROM '/data/json_files/'
FILE_FORMAT = (TYPE = JSON);

Import multi-line JSON array:

COPY INTO my_db.events
FROM '/data/events/'
FILE_FORMAT = (TYPE = JSON, MULTI_LINE = TRUE);

JSON columns are matched by column name (not by position), so source field order does not matter.

Parquet Import

COPY INTO table_name [(col1, col2, ...)]
FROM 'source_path'
FILE_FORMAT = (TYPE = PARQUET [, option = value, ...])
[PATTERN = 'regex']
[FORCE = TRUE|FALSE]
[ON_ERROR = { ABORT_STATEMENT | CONTINUE | SKIP_FILE }]

Basic import:

COPY INTO my_db.my_table
FROM '/data/parquet_files/'
FILE_FORMAT = (TYPE = PARQUET);

Import with PATTERN:

COPY INTO my_db.events
FROM '/data/lake/'
FILE_FORMAT = (TYPE = PARQUET)
PATTERN = '.*\.parquet'
FORCE = FALSE;

Parquet columns are matched by column name (not by position). Extra columns in the source files are ignored; missing columns become NULL.

Write CSV Files

COPY INTO 'target_path'
FROM { table_name | (SELECT ...) }
FILE_FORMAT = (TYPE = CSV [, option = value, ...])
[OVERWRITE = TRUE|FALSE]

Write with header and overwrite:

COPY INTO '/export/users_backup/'
FROM my_db.users
FILE_FORMAT = (TYPE = CSV, HEADER = TRUE, FIELD_DELIMITER = ',')
OVERWRITE = TRUE;

Write from query:

COPY INTO '/export/active_users/'
FROM (SELECT id, name FROM my_db.users WHERE active = TRUE)
FILE_FORMAT = (TYPE = CSV, HEADER = TRUE);

Write JSON Files

COPY INTO 'target_path'
FROM { table_name | (SELECT ...) }
FILE_FORMAT = (TYPE = JSON [, option = value, ...])
[OVERWRITE = TRUE|FALSE]

Basic JSON export:

COPY INTO '/export/events_backup/'
FROM my_db.events
FILE_FORMAT = (TYPE = JSON)
OVERWRITE = TRUE;

JSON export from query:

COPY INTO '/export/recent_events/'
FROM (SELECT * FROM my_db.events WHERE event_date > '2024-01-01')
FILE_FORMAT = (TYPE = JSON);

Write Parquet Files

COPY INTO 'target_path'
FROM { table_name | (SELECT ...) }
FILE_FORMAT = (TYPE = PARQUET [, option = value, ...])
[OVERWRITE = TRUE|FALSE]

Basic Parquet export:

COPY INTO '/export/data_backup/'
FROM my_db.events
FILE_FORMAT = (TYPE = PARQUET)
OVERWRITE = TRUE;

Export with compression:

COPY INTO '/export/data_compressed/'
FROM my_db.events
FILE_FORMAT = (TYPE = PARQUET, COMPRESSION = GZIP)
OVERWRITE = TRUE;

Parquet export from aggregation query:

COPY INTO '/export/summary/'
FROM (SELECT dept, COUNT(*) AS cnt FROM my_db.employees GROUP BY dept)
FILE_FORMAT = (TYPE = PARQUET);

FILE_FORMAT Options

FILE_FORMAT is required and must include TYPE = CSV, TYPE = JSON, or TYPE = PARQUET.

CSV import options:

| Option | Description | Default |
| --- | --- | --- |
| TYPE | File format type. CSV, JSON, or PARQUET. | (required) |
| FIELD_DELIMITER | Column delimiter character. | , |
| SKIP_HEADER | Skip the first line as header. Only 0 or 1. | 0 |
| QUOTE | Quote character for enclosing fields. | " |
| ESCAPE | Escape character within quoted fields. | \ |
| NULL_IF | List of string values to interpret as NULL, e.g. ('NULL', '\\N'). | (none) |
| EMPTY_FIELD_AS_NULL | Treat empty fields as NULL. TRUE or FALSE. | FALSE |
| COMPRESSION | Compression codec (e.g. GZIP). | NONE |

JSON import options:

| Option | Description | Default |
| --- | --- | --- |
| TYPE | File format type. CSV, JSON, or PARQUET. | (required) |
| MULTI_LINE | Parse multi-line JSON (e.g. JSON arrays or pretty-printed objects). | FALSE |
| NULL_IF | List of string values to interpret as NULL. | (none) |
| EMPTY_FIELD_AS_NULL | Treat empty string values as NULL. | FALSE |
| COMPRESSION | Compression codec (e.g. GZIP). | NONE |

Parquet import options:

| Option | Description | Default |
| --- | --- | --- |
| TYPE | File format type. CSV, JSON, or PARQUET. | (required) |
| COMPRESSION | Compression codec. Usually auto-detected; rarely needed for import. | (auto) |

CSV write options:

| Option | Description | Default |
| --- | --- | --- |
| TYPE | File format type. CSV, JSON, or PARQUET. | (required) |
| FIELD_DELIMITER | Column delimiter character. | , |
| HEADER | Write column names as the first line. TRUE or FALSE. | FALSE |
| QUOTE | Quote character for enclosing fields. | " |
| ESCAPE | Escape character within quoted fields. | \ |
| COMPRESSION | Compression codec (e.g. GZIP). | NONE |

JSON write options:

| Option | Description | Default |
| --- | --- | --- |
| TYPE | File format type. CSV, JSON, or PARQUET. | (required) |
| DATE_FORMAT | Custom date format pattern. | Spark default |
| TIMESTAMP_FORMAT | Custom timestamp format pattern. | Spark default |
| COMPRESSION | Compression codec (e.g. GZIP). | NONE |

Parquet write options:

| Option | Description | Default |
| --- | --- | --- |
| TYPE | File format type. CSV, JSON, or PARQUET. | (required) |
| COMPRESSION | Compression codec (SNAPPY, GZIP, NONE, etc.). | SNAPPY |

Import Options

| Option | Description | Default |
| --- | --- | --- |
| PATTERN | Regex to filter source files by base file name. Only matching files are loaded. | (all files) |
| FORCE | FALSE: skip files already loaded (idempotent). TRUE: reload all files. | FALSE |
| ON_ERROR | Error handling strategy. ABORT_STATEMENT: abort on any error. CONTINUE: skip bad rows and continue loading. SKIP_FILE: skip files that contain errors. | ABORT_STATEMENT |

File Write Options

| Option | Description | Default |
| --- | --- | --- |
| OVERWRITE | FALSE: fail if target path exists. TRUE: overwrite existing files. | FALSE |

Column Mapping

When an explicit column list is provided (e.g., COPY INTO t (col1, col2) FROM ...):

  • CSV: Columns are mapped positionally to the specified column list.
  • JSON: Columns are matched by name to the specified column list.
  • Parquet: Columns are matched by name to the specified column list.
  • For CSV, the number of source columns must match the length of the column list. For JSON and Parquet, fields missing from the source become NULL.
  • Columns not in the list are filled with their DEFAULT value (if defined in the table schema) or NULL.
  • Non-nullable columns without a default value that are not in the list will cause an error.

When no column list is provided:

  • CSV: Columns are mapped positionally to all writable columns in the target table. The number of CSV columns must match the number of writable columns.
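  • JSON: Columns are matched by name to the writable columns. Missing fields in JSON become NULL.
  • Parquet: Columns are matched by name to the writable columns. Extra columns in the source files are ignored; missing columns become NULL.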

Repeated Imports

By default (FORCE = FALSE), COPY INTO tracks which files have been successfully loaded. A file is identified by its path, size, and last-modified timestamp.

  • Re-running the same COPY INTO command will skip already-loaded files and return status SKIPPED.
  • If a source file is modified (size or timestamp changes), it becomes eligible for re-loading.
  • FORCE = TRUE bypasses load history and always re-imports all matching files.
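
The load-history behavior described in the bullets above can be sketched as three consecutive runs. The table and path are reused from the basic CSV import example; the comments describe the documented outcome, not captured output.

```sql
-- First run: matching files are loaded.
COPY INTO my_db.my_table FROM '/data/csv_files/' FILE_FORMAT = (TYPE = CSV);

-- Second run with unchanged files: already-loaded files are reported as SKIPPED.
COPY INTO my_db.my_table FROM '/data/csv_files/' FILE_FORMAT = (TYPE = CSV);

-- Reload everything regardless of load history. Rows from previously loaded
-- files are written again, which may duplicate data depending on the table type.
COPY INTO my_db.my_table FROM '/data/csv_files/' FILE_FORMAT = (TYPE = CSV) FORCE = TRUE;
```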

Result Output

Import returns one row per source file:

| Column | Type | Description |
| --- | --- | --- |
| file_name | STRING | Source file name |
| status | STRING | LOADED, PARTIALLY_LOADED, LOAD_FAILED, or SKIPPED |
| rows_loaded | BIGINT | Number of rows written |
| rows_parsed | BIGINT | Number of rows parsed from the file |
| errors_seen | BIGINT | Number of error rows (parse or cast failures) |
| first_error | STRING | First error message encountered (NULL if no errors) |

File write returns a single row:

| Column | Type | Description |
| --- | --- | --- |
| output_path | STRING | Target output path |
| file_count | INT | Number of files written |
| rows_written | BIGINT | Total rows written |

Limitations

  • CSV column-count mismatch: Rows with fewer or more columns than the target schema are treated as malformed records. With ON_ERROR = CONTINUE, these rows are skipped and counted as errors.
  • Only CSV, JSON, and Parquet formats are supported.
  • SINGLE = TRUE (single-file output) is not supported.
  • File format options must be specified inline in FILE_FORMAT = (...).
  • File listing is non-recursive: only direct files under the source path are processed. Subdirectories are ignored.
  • PATTERN matches the base file name only (not the full path).
  • Concurrent COPY INTO commands targeting the same table may produce duplicate data.
  • SKIP_HEADER only supports values 0 or 1.
  • FROM (...) accepts any read-only query (e.g. SELECT, WITH ... SELECT, VALUES); statements with side effects (e.g. INSERT, INSERT OVERWRITE DIRECTORY, DDL) are rejected.
  • For a FROM (...) export, rows_written is an execution-time statistic counted by a separate pass before the files are written. Because the DataFrame is lazy and not cached, writing re-executes the query a second time; if the query is non-deterministic (e.g. uses rand(), current_timestamp(), or reads a volatile source), the two runs can produce different rows, so rows_written may not match the actual file contents. The result is intentionally not staged, so the export does not consume extra executor disk.
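
One possible way to sidestep the rows_written mismatch for a non-deterministic query, offered as a sketch rather than a documented recommendation, is to materialize the query into a table first and export that table. The staging table name, the sample_key column, and the export path below are hypothetical.

```sql
-- Hypothetical staging table: runs the non-deterministic query exactly once.
CREATE TABLE my_db.export_staging AS
SELECT id, rand() AS sample_key FROM my_db.users;

-- Exporting from the table reads the stored rows rather than re-running rand().
COPY INTO '/export/users_sample/'
FROM my_db.export_staging
FILE_FORMAT = (TYPE = PARQUET);

-- Clean up the staging table afterwards.
DROP TABLE my_db.export_staging;
```

The trade-off is the extra storage and write cost of the staging table, which is what the unstaged FROM (...) export is designed to avoid.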