This documentation is for an unreleased version of Apache Paimon. We recommend you use the latest stable version.
Iceberg Compatibility #
Paimon supports generating Iceberg compatible metadata, so that Paimon tables can be consumed directly by Iceberg readers.
Enable Iceberg Compatibility #
Set the following table options, so that Paimon tables can generate Iceberg compatible metadata.
Option | Default | Type | Description |
---|---|---|---|
metadata.iceberg.storage | disabled | Enum | When set, produce Iceberg metadata after a snapshot is committed, so that Iceberg readers can read Paimon's raw data files. |
For most SQL users, we recommend setting 'metadata.iceberg.storage' = 'hadoop-catalog' or 'metadata.iceberg.storage' = 'hive-catalog', so that all tables can be accessed as an Iceberg warehouse.
For Iceberg Java API users, you might consider setting 'metadata.iceberg.storage' = 'table-location', so that you can access each table by its table path.
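As a minimal sketch of the 'table-location' choice, the Flink SQL statement below creates a table with that option value. It assumes a Paimon catalog named `paimon_catalog` already exists (one is created in the example further down this page); the table name `cities_by_path` is hypothetical and used only for illustration.

```sql
-- Flink SQL sketch. Assumes a Paimon catalog named paimon_catalog already exists.
-- cities_by_path is a hypothetical table name.
CREATE TABLE paimon_catalog.`default`.cities_by_path (
    country STRING,
    name STRING
) WITH (
    -- Iceberg metadata is then meant to be reached through the table path
    'metadata.iceberg.storage' = 'table-location'
);
```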
Example: Query Paimon Append Only Tables on Flink/Spark with Iceberg Connector #
Let’s walk through a simple example, where we query Paimon tables with Iceberg connectors in Flink and Spark. Before trying out this example, make sure that your compute engine already supports Iceberg. Please refer to Iceberg’s documentation if you haven’t set up Iceberg.
Let’s now create a Paimon append only table with Iceberg compatibility enabled and insert some data. In Flink SQL, run the following.
CREATE CATALOG paimon_catalog WITH (
'type' = 'paimon',
'warehouse' = '<path-to-warehouse>'
);
CREATE TABLE paimon_catalog.`default`.cities (
country STRING,
name STRING
) WITH (
'metadata.iceberg.storage' = 'hadoop-catalog'
);
INSERT INTO paimon_catalog.`default`.cities VALUES ('usa', 'new york'), ('germany', 'berlin'), ('usa', 'chicago'), ('germany', 'hamburg');
Alternatively, for Spark, start spark-sql with the following command line. Iceberg catalog caching is disabled so that new results show up quickly.
spark-sql --jars <path-to-paimon-jar> \
--conf spark.sql.catalog.paimon_catalog=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon_catalog.warehouse=<path-to-warehouse> \
--packages org.apache.iceberg:iceberg-spark-runtime-<iceberg-version> \
--conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.iceberg_catalog.type=hadoop \
--conf spark.sql.catalog.iceberg_catalog.warehouse=<path-to-warehouse>/iceberg \
--conf spark.sql.catalog.iceberg_catalog.cache-enabled=false \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
Run the following Spark SQL to create a Paimon table and insert data.
CREATE TABLE paimon_catalog.`default`.cities (
country STRING,
name STRING
) TBLPROPERTIES (
'metadata.iceberg.storage' = 'hadoop-catalog'
);
INSERT INTO paimon_catalog.`default`.cities VALUES ('usa', 'new york'), ('germany', 'berlin'), ('usa', 'chicago'), ('germany', 'hamburg');
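Now let’s query this Paimon table with the Iceberg connector. The first query below, which also creates the Iceberg catalog, is Flink SQL; the second is Spark SQL.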
CREATE CATALOG iceberg_catalog WITH (
'type' = 'iceberg',
'catalog-type' = 'hadoop',
'warehouse' = '<path-to-warehouse>/iceberg',
'cache-enabled' = 'false' -- disable iceberg catalog caching to quickly see the result
);
SELECT * FROM iceberg_catalog.`default`.cities WHERE country = 'germany';
/*
+----+--------------------------------+--------------------------------+
| op | country | name |
+----+--------------------------------+--------------------------------+
| +I | germany | berlin |
| +I | germany | hamburg |
+----+--------------------------------+--------------------------------+
*/
SELECT * FROM iceberg_catalog.`default`.cities WHERE country = 'germany';
/*
germany berlin
germany hamburg
*/
Let’s insert more data and query again, first in Flink SQL and then in Spark SQL.
INSERT INTO paimon_catalog.`default`.cities VALUES ('usa', 'houston'), ('germany', 'munich');
SELECT * FROM iceberg_catalog.`default`.cities WHERE country = 'germany';
/*
+----+--------------------------------+--------------------------------+
| op | country | name |
+----+--------------------------------+--------------------------------+
| +I | germany | munich |
| +I | germany | berlin |
| +I | germany | hamburg |
+----+--------------------------------+--------------------------------+
*/
INSERT INTO paimon_catalog.`default`.cities VALUES ('usa', 'houston'), ('germany', 'munich');
SELECT * FROM iceberg_catalog.`default`.cities WHERE country = 'germany';
/*
germany munich
germany berlin
germany hamburg
*/
Example: Query Paimon Primary Key Tables on Flink/Spark with Iceberg Connector #
Run the following Flink SQL to create a Paimon primary key table, insert and update data, and query it through the Iceberg catalog.
CREATE CATALOG paimon_catalog WITH (
'type' = 'paimon',
'warehouse' = '<path-to-warehouse>'
);
CREATE TABLE paimon_catalog.`default`.orders (
order_id BIGINT,
status STRING,
payment DOUBLE,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'metadata.iceberg.storage' = 'hadoop-catalog',
'compaction.optimization-interval' = '1ms' -- ATTENTION: this option is only for testing, see "timeliness" section below for more information
);
INSERT INTO paimon_catalog.`default`.orders VALUES (1, 'SUBMITTED', CAST(NULL AS DOUBLE)), (2, 'COMPLETED', 200.0), (3, 'SUBMITTED', CAST(NULL AS DOUBLE));
CREATE CATALOG iceberg_catalog WITH (
'type' = 'iceberg',
'catalog-type' = 'hadoop',
'warehouse' = '<path-to-warehouse>/iceberg',
'cache-enabled' = 'false' -- disable iceberg catalog caching to quickly see the result
);
SELECT * FROM iceberg_catalog.`default`.orders WHERE status = 'COMPLETED';
/*
+----+----------------------+--------------------------------+--------------------------------+
| op | order_id | status | payment |
+----+----------------------+--------------------------------+--------------------------------+
| +I | 2 | COMPLETED | 200.0 |
+----+----------------------+--------------------------------+--------------------------------+
*/
INSERT INTO paimon_catalog.`default`.orders VALUES (1, 'COMPLETED', 100.0);
SELECT * FROM iceberg_catalog.`default`.orders WHERE status = 'COMPLETED';
/*
+----+----------------------+--------------------------------+--------------------------------+
| op | order_id | status | payment |
+----+----------------------+--------------------------------+--------------------------------+
| +I | 1 | COMPLETED | 100.0 |
| +I | 2 | COMPLETED | 200.0 |
+----+----------------------+--------------------------------+--------------------------------+
*/
Alternatively, for Spark, start spark-sql with the following command line. Iceberg catalog caching is disabled so that new results show up quickly.
spark-sql --jars <path-to-paimon-jar> \
--conf spark.sql.catalog.paimon_catalog=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon_catalog.warehouse=<path-to-warehouse> \
--packages org.apache.iceberg:iceberg-spark-runtime-<iceberg-version> \
--conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.iceberg_catalog.type=hadoop \
--conf spark.sql.catalog.iceberg_catalog.warehouse=<path-to-warehouse>/iceberg \
--conf spark.sql.catalog.iceberg_catalog.cache-enabled=false \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
Run the following Spark SQL to create a Paimon table, insert and update data, and query it through the Iceberg catalog.
CREATE TABLE paimon_catalog.`default`.orders (
order_id BIGINT,
status STRING,
payment DOUBLE
) TBLPROPERTIES (
'primary-key' = 'order_id',
'metadata.iceberg.storage' = 'hadoop-catalog',
'compaction.optimization-interval' = '1ms' -- ATTENTION: this option is only for testing, see "timeliness" section below for more information
);
INSERT INTO paimon_catalog.`default`.orders VALUES (1, 'SUBMITTED', CAST(NULL AS DOUBLE)), (2, 'COMPLETED', 200.0), (3, 'SUBMITTED', CAST(NULL AS DOUBLE));
SELECT * FROM iceberg_catalog.`default`.orders WHERE status = 'COMPLETED';
/*
2 COMPLETED 200.0
*/
INSERT INTO paimon_catalog.`default`.orders VALUES (1, 'COMPLETED', 100.0);
SELECT * FROM iceberg_catalog.`default`.orders WHERE status = 'COMPLETED';
/*
2 COMPLETED 200.0
1 COMPLETED 100.0
*/
Timeliness #
Paimon primary key tables organize data files as LSM trees, so data files must be merged in memory before querying. Iceberg readers, however, cannot merge data files, so they can only query data files on the highest level of the LSM trees. Data files on the highest level are produced by the full compaction process. As a result, for primary key tables, Iceberg readers can only query data that has gone through full compaction.
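One way to observe this behavior, sketched here under the assumption that the `orders` table and both catalogs from the example above exist in the same session, is to run the same query through each catalog and compare the results. No expected output is shown, because it depends on when the last full compaction ran.

```sql
-- Read through Paimon: data files from all LSM levels are merged at query time,
-- so every committed write is reflected.
SELECT * FROM paimon_catalog.`default`.orders;

-- Read through Iceberg: only data files on the highest LSM level are visible,
-- so rows written since the last full compaction may be missing or stale.
SELECT * FROM iceberg_catalog.`default`.orders;
```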
By default, there is no guarantee on how frequently Paimon will perform full compaction. You can configure one of the following table options, so that Paimon is forced to perform full compaction at a fixed time interval or after a given number of commits.
Option | Default | Type | Description |
---|---|---|---|
compaction.optimization-interval | (none) | Duration | Full compaction will be constantly triggered per time interval. First compaction after the job starts will always be full compaction. |
full-compaction.delta-commits | (none) | Integer | Full compaction will be constantly triggered after delta commits. Only implemented in Flink. |
Note that full compaction is a resource-consuming process, so the values of these table options should not be too small. We recommend that full compaction be performed once or twice per hour.
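As an illustrative sketch, the statements below set one of these options on the `orders` table from the example above. The values are assumptions chosen to roughly match the once-or-twice-per-hour guidance, not recommended defaults. In particular, a delta-commit count only translates into a time interval if you know how often your job commits.

```sql
-- Flink SQL: request a full compaction roughly every 30 minutes (illustrative value).
ALTER TABLE paimon_catalog.`default`.orders SET (
    'compaction.optimization-interval' = '30 min'
);

-- Flink SQL alternative: request a full compaction every 30 delta commits.
-- With about one commit per minute (an assumption), that is roughly every 30 minutes.
ALTER TABLE paimon_catalog.`default`.orders SET (
    'full-compaction.delta-commits' = '30'
);
```

Either setting replaces the '1ms' testing value used when the example table was created.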
Access Paimon Table from Iceberg Hive Catalog #
When creating a Paimon table, set 'metadata.iceberg.storage' = 'hive-catalog'.
This option value not only stores Iceberg metadata like hadoop-catalog, but also creates an Iceberg external table in Hive.
The Paimon table can then be accessed from an Iceberg Hive catalog.
To provide information about the Hive metastore, you also need to set some (or all) of the following table options when creating the Paimon table.
Option | Default | Type | Description |
---|---|---|---|
metadata.iceberg.uri | | String | Hive metastore uri for Iceberg Hive catalog. |
metadata.iceberg.hive-conf-dir | | String | hive-conf-dir for Iceberg Hive catalog. |
metadata.iceberg.hadoop-conf-dir | | String | hadoop-conf-dir for Iceberg Hive catalog. |
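For instance, if the metastore settings are already recorded in local configuration directories, a table might reference those directories rather than a URI. This is a sketch only: the table name `animals_from_conf` is hypothetical, and the directory placeholders must be replaced with paths from your own environment.

```sql
-- Flink SQL sketch; animals_from_conf is a hypothetical table name.
CREATE TABLE paimon_catalog.`default`.animals_from_conf (
    kind STRING,
    name STRING
) WITH (
    'metadata.iceberg.storage' = 'hive-catalog',
    -- placeholders: point these at your own Hive and Hadoop configuration directories
    'metadata.iceberg.hive-conf-dir' = '<path-to-hive-conf-dir>',
    'metadata.iceberg.hadoop-conf-dir' = '<path-to-hadoop-conf-dir>'
);
```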
Example: Query Paimon Append Only Tables on Trino with Iceberg Connector #
In this example, we use the Trino Iceberg connector to access a Paimon table through the Iceberg Hive catalog. Before trying out this example, make sure that you have configured the Trino Iceberg connector. See Trino’s documentation for more information.
Let’s first create a Paimon table with Iceberg compatibility enabled. In Flink SQL, run the following.
CREATE CATALOG paimon_catalog WITH (
'type' = 'paimon',
'warehouse' = '<path-to-warehouse>'
);
CREATE TABLE paimon_catalog.`default`.animals (
kind STRING,
name STRING
) WITH (
'metadata.iceberg.storage' = 'hive-catalog',
'metadata.iceberg.uri' = 'thrift://<host>:<port>'
);
INSERT INTO paimon_catalog.`default`.animals VALUES ('mammal', 'cat'), ('mammal', 'dog'), ('reptile', 'snake'), ('reptile', 'lizard');
Alternatively, for Spark, start spark-sql with the following command line. Iceberg catalog caching is disabled so that new results show up quickly.
spark-sql --jars <path-to-paimon-jar> \
--conf spark.sql.catalog.paimon_catalog=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon_catalog.warehouse=<path-to-warehouse> \
--packages org.apache.iceberg:iceberg-spark-runtime-<iceberg-version> \
--conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.iceberg_catalog.type=hadoop \
--conf spark.sql.catalog.iceberg_catalog.warehouse=<path-to-warehouse>/iceberg \
--conf spark.sql.catalog.iceberg_catalog.cache-enabled=false \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
Run the following Spark SQL to create the Paimon table and insert data.
CREATE TABLE paimon_catalog.`default`.animals (
kind STRING,
name STRING
) TBLPROPERTIES (
'metadata.iceberg.storage' = 'hive-catalog',
'metadata.iceberg.uri' = 'thrift://<host>:<port>'
);
INSERT INTO paimon_catalog.`default`.animals VALUES ('mammal', 'cat'), ('mammal', 'dog'), ('reptile', 'snake'), ('reptile', 'lizard');
Start Trino using the Iceberg catalog and query the Paimon table.
SELECT * FROM animals WHERE kind = 'mammal';
/*
kind | name
--------+------
mammal | cat
mammal | dog
*/
Supported Types #
Paimon Iceberg compatibility currently supports the following data types.
Paimon Data Type | Iceberg Data Type |
---|---|
BOOLEAN | boolean |
INT | int |
BIGINT | long |
FLOAT | float |
DOUBLE | double |
DECIMAL | decimal |
CHAR | string |
VARCHAR | string |
BINARY | binary |
VARBINARY | binary |
DATE | date |
TIMESTAMP * | timestamp |
TIMESTAMP_LTZ * | timestamptz |
*: TIMESTAMP and TIMESTAMP_LTZ types only support precision from 4 to 6.
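To make the precision constraint concrete, here is a small sketch of a table whose timestamp columns stay inside the supported range. The table name `events` and its column names are hypothetical. By the note above, a column declared as TIMESTAMP(3) would fall outside the supported range.

```sql
-- Flink SQL sketch; events and its columns are hypothetical names.
CREATE TABLE paimon_catalog.`default`.events (
    event_id BIGINT,
    created_at TIMESTAMP(6),       -- precision 6 is within the supported 4 to 6 range
    received_at TIMESTAMP_LTZ(6)   -- listed above as mapping to Iceberg timestamptz
) WITH (
    'metadata.iceberg.storage' = 'hadoop-catalog'
);
```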
Other Related Table Options #
Option | Default | Type | Description |
---|---|---|---|
metadata.iceberg.compaction.min.file-num | 10 | Integer | Minimum number of Iceberg metadata files to trigger metadata compaction. |
metadata.iceberg.compaction.max.file-num | 50 | Integer | If number of small Iceberg metadata files exceeds this limit, always trigger metadata compaction regardless of their total size. |
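As a hedged sketch, these thresholds can be set like any other table option. The values below are purely illustrative and are not tuning advice; the statement assumes the `cities` table from the first example exists.

```sql
-- Flink SQL sketch with illustrative values (the defaults listed above are 10 and 50).
ALTER TABLE paimon_catalog.`default`.cities SET (
    'metadata.iceberg.compaction.min.file-num' = '5',
    'metadata.iceberg.compaction.max.file-num' = '20'
);
```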