Iceberg Compatibility
This documentation is for an unreleased version of Apache Paimon. We recommend you use the latest stable version.

Iceberg Compatibility #

Paimon supports generating Iceberg compatible metadata, so that Paimon tables can be consumed directly by Iceberg readers.

Enable Iceberg Compatibility #

Set the following table options, so that Paimon tables can generate Iceberg compatible metadata.

Option Default Type Description
metadata.iceberg.storage
disabled Enum When set, produce Iceberg metadata after a snapshot is committed, so that Iceberg readers can read Paimon's raw data files.
  • disabled: Disable Iceberg compatibility support.
  • table-location: Store Iceberg metadata in each table's directory.
  • hadoop-catalog: Store Iceberg metadata in a separate directory. This directory can be specified as the warehouse directory of an Iceberg Hadoop catalog.
  • hive-catalog: Not only store Iceberg metadata like hadoop-catalog, but also create Iceberg external table in Hive.

For most SQL users, we recommend setting 'metadata.iceberg.storage' = 'hadoop-catalog' or 'metadata.iceberg.storage' = 'hive-catalog', so that all tables can be visited as an Iceberg warehouse. For Iceberg Java API users, you might consider setting 'metadata.iceberg.storage' = 'table-location', so you can visit each table with its table path.

Example: Query Paimon Append Only Tables on Flink/Spark with Iceberg Connector #

Let’s walk through a simple example, where we query Paimon tables with Iceberg connectors in Flink and Spark. Before trying out this example, make sure that your compute engine already supports Iceberg. Please refer to Iceberg’s document if you haven’t set up Iceberg.

Let’s now create a Paimon append only table with Iceberg compatibility enabled and insert some data.

CREATE CATALOG paimon_catalog WITH (
    'type' = 'paimon',
    'warehouse' = '<path-to-warehouse>'
);

CREATE TABLE paimon_catalog.`default`.cities (
    country STRING,
    name STRING
) WITH (
    'metadata.iceberg.storage' = 'hadoop-catalog'
);

INSERT INTO paimon_catalog.`default`.cities VALUES ('usa', 'new york'), ('germany', 'berlin'), ('usa', 'chicago'), ('germany', 'hamburg');

Start spark-sql with the following command line.

spark-sql --jars <path-to-paimon-jar> \
    --conf spark.sql.catalog.paimon_catalog=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon_catalog.warehouse=<path-to-warehouse> \
    --packages org.apache.iceberg:iceberg-spark-runtime-<iceberg-version> \
    --conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.iceberg_catalog.type=hadoop \
    --conf spark.sql.catalog.iceberg_catalog.warehouse=<path-to-warehouse>/iceberg \
    --conf spark.sql.catalog.iceberg_catalog.cache-enabled=false \ # disable iceberg catalog caching to quickly see the result
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

Run the following Spark SQL to create Paimon table and insert data.

CREATE TABLE paimon_catalog.`default`.cities (
    country STRING,
    name STRING
) TBLPROPERTIES (
    'metadata.iceberg.storage' = 'hadoop-catalog'
);

INSERT INTO paimon_catalog.`default`.cities VALUES ('usa', 'new york'), ('germany', 'berlin'), ('usa', 'chicago'), ('germany', 'hamburg');

Now let’s query this Paimon table with Iceberg connector.

CREATE CATALOG iceberg_catalog WITH (
    'type' = 'iceberg',
    'catalog-type' = 'hadoop',
    'warehouse' = '<path-to-warehouse>/iceberg',
    'cache-enabled' = 'false' -- disable iceberg catalog caching to quickly see the result
);

SELECT * FROM iceberg_catalog.`default`.cities WHERE country = 'germany';
/*
+----+--------------------------------+--------------------------------+
| op |                        country |                           name |
+----+--------------------------------+--------------------------------+
| +I |                        germany |                         berlin |
| +I |                        germany |                        hamburg |
+----+--------------------------------+--------------------------------+
*/
SELECT * FROM iceberg_catalog.`default`.cities WHERE country = 'germany';
/*
germany berlin
germany hamburg
*/

Let’s insert more data and query again.

INSERT INTO paimon_catalog.`default`.cities VALUES ('usa', 'houston'), ('germany', 'munich');

SELECT * FROM iceberg_catalog.`default`.cities WHERE country = 'germany';
/*
+----+--------------------------------+--------------------------------+
| op |                        country |                           name |
+----+--------------------------------+--------------------------------+
| +I |                        germany |                         munich |
| +I |                        germany |                         berlin |
| +I |                        germany |                        hamburg |
+----+--------------------------------+--------------------------------+
*/
INSERT INTO paimon_catalog.`default`.cities VALUES ('usa', 'houston'), ('germany', 'munich');

SELECT * FROM iceberg_catalog.`default`.cities WHERE country = 'germany';
/*
germany munich
germany berlin
germany hamburg
*/

Example: Query Paimon Primary Key Tables on Flink/Spark with Iceberg Connector #

CREATE CATALOG paimon_catalog WITH (
    'type' = 'paimon',
    'warehouse' = '<path-to-warehouse>'
);

CREATE TABLE paimon_catalog.`default`.orders (
    order_id BIGINT,
    status STRING,
    payment DOUBLE,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'metadata.iceberg.storage' = 'hadoop-catalog',
    'compaction.optimization-interval' = '1ms' -- ATTENTION: this option is only for testing, see "timeliness" section below for more information
);

INSERT INTO paimon_catalog.`default`.orders VALUES (1, 'SUBMITTED', CAST(NULL AS DOUBLE)), (2, 'COMPLETED', 200.0), (3, 'SUBMITTED', CAST(NULL AS DOUBLE));

CREATE CATALOG iceberg_catalog WITH (
    'type' = 'iceberg',
    'catalog-type' = 'hadoop',
    'warehouse' = '<path-to-warehouse>/iceberg',
    'cache-enabled' = 'false' -- disable iceberg catalog caching to quickly see the result
);

SELECT * FROM iceberg_catalog.`default`.orders WHERE status = 'COMPLETED';
/*
+----+----------------------+--------------------------------+--------------------------------+
| op |             order_id |                         status |                        payment |
+----+----------------------+--------------------------------+--------------------------------+
| +I |                    2 |                      COMPLETED |                          200.0 |
+----+----------------------+--------------------------------+--------------------------------+
*/

INSERT INTO paimon_catalog.`default`.orders VALUES (1, 'COMPLETED', 100.0);

SELECT * FROM iceberg_catalog.`default`.orders WHERE status = 'COMPLETED';
/*
+----+----------------------+--------------------------------+--------------------------------+
| op |             order_id |                         status |                        payment |
+----+----------------------+--------------------------------+--------------------------------+
| +I |                    1 |                      COMPLETED |                          100.0 |
| +I |                    2 |                      COMPLETED |                          200.0 |
+----+----------------------+--------------------------------+--------------------------------+
*/

Start spark-sql with the following command line.

spark-sql --jars <path-to-paimon-jar> \
    --conf spark.sql.catalog.paimon_catalog=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon_catalog.warehouse=<path-to-warehouse> \
    --packages org.apache.iceberg:iceberg-spark-runtime-<iceberg-version> \
    --conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.iceberg_catalog.type=hadoop \
    --conf spark.sql.catalog.iceberg_catalog.warehouse=<path-to-warehouse>/iceberg \
    --conf spark.sql.catalog.iceberg_catalog.cache-enabled=false \ # disable iceberg catalog caching to quickly see the result
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

Run the following Spark SQL to create Paimon table, insert/update data, and query with Iceberg catalog.

CREATE TABLE paimon_catalog.`default`.orders (
    order_id BIGINT,
    status STRING,
    payment DOUBLE
) TBLPROPERTIES (
    'primary-key' = 'order_id',
    'metadata.iceberg.storage' = 'hadoop-catalog',
    'compaction.optimization-interval' = '1ms' -- ATTENTION: this option is only for testing, see "timeliness" section below for more information
);

INSERT INTO paimon_catalog.`default`.orders VALUES (1, 'SUBMITTED', CAST(NULL AS DOUBLE)), (2, 'COMPLETED', 200.0), (3, 'SUBMITTED', CAST(NULL AS DOUBLE));

SELECT * FROM iceberg_catalog.`default`.orders WHERE status = 'COMPLETED';
/*
2       COMPLETED       200.0
*/

INSERT INTO paimon_catalog.`default`.orders VALUES (1, 'COMPLETED', 100.0);

SELECT * FROM iceberg_catalog.`default`.orders WHERE status = 'COMPLETED';
/*
2       COMPLETED       200.0
1       COMPLETED       100.0
*/

Timeliness #

Paimon primary key tables organize data files as LSM trees, so data files must be merged in memory before querying. However, Iceberg readers are not able to merge data files, so they can only query data files on the highest level of LSM trees. Data files on the highest level are produced by the full compaction process. So to conclude, for primary key tables, Iceberg readers can only query data after full compaction.

By default, there is no guarantee on how frequently Paimon will perform full compaction. You can configure the following table option, so that Paimon is forced to perform full compaction after several commits.

Option Default Type Description
compaction.optimization-interval
(none) Duration Full compaction will be constantly triggered per time interval. First compaction after the job starts will always be full compaction.
full-compaction.delta-commits
(none) Integer Full compaction will be constantly triggered after delta commits. Only implemented in Flink.

Note that full compaction is a resource-consuming process, so the value of this table option should not be too small. We recommend full compaction to be performed once or twice per hour.

Access Paimon Table from Iceberg Hive Catalog #

When creating Paimon table, set 'metadata.iceberg.storage' = 'hive-catalog'. This option value not only store Iceberg metadata like hadoop-catalog, but also create Iceberg external table in Hive. This Paimon table can be accessed from Iceberg Hive catalog later.

To provide information about Hive metastore, you also need to set some (or all) of the following table options when creating Paimon table.

Option Default Type Description
metadata.iceberg.uri
String Hive metastore uri for Iceberg Hive catalog.
metadata.iceberg.hive-conf-dir
String hive-conf-dir for Iceberg Hive catalog.
metadata.iceberg.hadoop-conf-dir
String hadoop-conf-dir for Iceberg Hive catalog.

Example: Query Paimon Append Only Tables on Trino with Iceberg Connector #

In this example, we use Trino Iceberg connector to access Paimon table through Iceberg Hive catalog. Before trying out this example, make sure that you have configured Trino Iceberg connector. See Trino’s document for more information.

Let’s first create a Paimon table with Iceberg compatibility enabled.

CREATE CATALOG paimon_catalog WITH (
    'type' = 'paimon',
    'warehouse' = '<path-to-warehouse>'
);

CREATE TABLE paimon_catalog.`default`.animals (
    kind STRING,
    name STRING
) WITH (
    'metadata.iceberg.storage' = 'hive-catalog',
    'metadata.iceberg.uri' = 'thrift://<host>:<port>'
);

INSERT INTO paimon_catalog.`default`.animals VALUES ('mammal', 'cat'), ('mammal', 'dog'), ('reptile', 'snake'), ('reptile', 'lizard');

Start spark-sql with the following command line.

spark-sql --jars <path-to-paimon-jar> \
    --conf spark.sql.catalog.paimon_catalog=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon_catalog.warehouse=<path-to-warehouse> \
    --packages org.apache.iceberg:iceberg-spark-runtime-<iceberg-version> \
    --conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.iceberg_catalog.type=hadoop \
    --conf spark.sql.catalog.iceberg_catalog.warehouse=<path-to-warehouse>/iceberg \
    --conf spark.sql.catalog.iceberg_catalog.cache-enabled=false \ # disable iceberg catalog caching to quickly see the result
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

Run the following Spark SQL to create Paimon table, insert/update data, and query with Iceberg catalog.

CREATE TABLE paimon_catalog.`default`.animals (
    kind STRING,
    name STRING
) TBLPROPERTIES (
    'metadata.iceberg.storage' = 'hive-catalog',
    'metadata.iceberg.uri' = 'thrift://<host>:<port>'
);

INSERT INTO paimon_catalog.`default`.animals VALUES ('mammal', 'cat'), ('mammal', 'dog'), ('reptile', 'snake'), ('reptile', 'lizard');

Start Trino using Iceberg catalog and query from Paimon table.

SELECT * FROM animals WHERE class = 'mammal';
/*
   kind | name 
--------+------
 mammal | cat  
 mammal | dog  
*/

Supported Types #

Paimon Iceberg compatibility currently supports the following data types.

Paimon Data Type Iceberg Data Type
BOOLEAN boolean
INT int
BIGINT long
FLOAT float
DOUBLE double
DECIMAL decimal
CHAR string
VARCHAR string
BINARY binary
VARBINARY binary
DATE date
TIMESTAMP* timestamp
TIMESTAMP_LTZ* timestamptz

*: TIMESTAMP and TIMESTAMP_LTZ type only support precision from 4 to 6

Option Default Type Description
metadata.iceberg.compaction.min.file-num
10 Integer Minimum number of Iceberg metadata files to trigger metadata compaction.
metadata.iceberg.compaction.max.file-num
50 Integer If number of small Iceberg metadata files exceeds this limit, always trigger metadata compaction regardless of their total size.
Edit This Page
Copyright © 2024 The Apache Software Foundation. Apache Paimon, Paimon, and its feather logo are trademarks of The Apache Software Foundation.