This documentation is for an unreleased version of Apache Paimon. We recommend you use the latest stable version.
Append Table
Append Tables #
Let’s walk through a simple example, where we query Paimon tables with Iceberg connectors in Flink and Spark. Before trying out this example, make sure that your compute engine already supports Iceberg. Please refer to Iceberg’s documentation if you haven’t set up Iceberg yet.
Let’s now create a Paimon append-only table with Iceberg compatibility enabled and insert some data.
-- Flink SQL: register a Paimon catalog backed by the given warehouse path.
CREATE CATALOG paimon_catalog WITH (
'type' = 'paimon',
'warehouse' = '<path-to-warehouse>'
);
-- Create an append-only table (no primary key) with Iceberg compatibility
-- enabled: with 'metadata.iceberg.storage' = 'hadoop-catalog', Paimon also
-- maintains Iceberg metadata (under <warehouse>/iceberg, as queried below)
-- so Iceberg readers can consume the table.
CREATE TABLE paimon_catalog.`default`.cities (
country STRING,
name STRING
) WITH (
'metadata.iceberg.storage' = 'hadoop-catalog'
);
-- Insert some sample rows through Paimon.
INSERT INTO paimon_catalog.`default`.cities VALUES ('usa', 'new york'), ('germany', 'berlin'), ('usa', 'chicago'), ('germany', 'hamburg');
Start `spark-sql` with the following command line.
# Disable Iceberg catalog caching (cache-enabled=false) so newly committed
# snapshots are visible immediately when querying.
# NOTE: a '#' comment must not follow a trailing '\' line continuation --
# the shell would end the command there and treat the next line as a
# separate command -- so the comment lives on its own line instead.
spark-sql --jars <path-to-paimon-jar> \
--conf spark.sql.catalog.paimon_catalog=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon_catalog.warehouse=<path-to-warehouse> \
--packages org.apache.iceberg:iceberg-spark-runtime-<iceberg-version> \
--conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.iceberg_catalog.type=hadoop \
--conf spark.sql.catalog.iceberg_catalog.warehouse=<path-to-warehouse>/iceberg \
--conf spark.sql.catalog.iceberg_catalog.cache-enabled=false \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
Run the following Spark SQL to create Paimon table and insert data.
-- Spark SQL: create the same Paimon table via the Paimon catalog.
-- TBLPROPERTIES is Spark's syntax for the table options that Flink SQL
-- passes in the WITH clause.
CREATE TABLE paimon_catalog.`default`.cities (
country STRING,
name STRING
) TBLPROPERTIES (
'metadata.iceberg.storage' = 'hadoop-catalog'
);
-- Insert some sample rows through Paimon.
INSERT INTO paimon_catalog.`default`.cities VALUES ('usa', 'new york'), ('germany', 'berlin'), ('usa', 'chicago'), ('germany', 'hamburg');
Now let’s query this Paimon table with Iceberg connector.
-- Flink SQL: create an Iceberg catalog pointing at the Iceberg metadata
-- directory that Paimon maintains under the warehouse.
CREATE CATALOG iceberg_catalog WITH (
'type' = 'iceberg',
'catalog-type' = 'hadoop',
'warehouse' = '<path-to-warehouse>/iceberg',
'cache-enabled' = 'false' -- disable iceberg catalog caching to quickly see the result
);
-- Query the Paimon table through the Iceberg connector; expected output below
-- ('op' is the changelog kind column shown by the Flink SQL client).
SELECT * FROM iceberg_catalog.`default`.cities WHERE country = 'germany';
/*
+----+--------------------------------+--------------------------------+
| op | country | name |
+----+--------------------------------+--------------------------------+
| +I | germany | berlin |
| +I | germany | hamburg |
+----+--------------------------------+--------------------------------+
*/
-- Spark SQL: the same query, using the iceberg_catalog configured on the
-- spark-sql command line; expected output below.
SELECT * FROM iceberg_catalog.`default`.cities WHERE country = 'germany';
/*
germany berlin
germany hamburg
*/
Let’s insert more data and query again.
-- Flink SQL: write more rows through the Paimon catalog...
INSERT INTO paimon_catalog.`default`.cities VALUES ('usa', 'houston'), ('germany', 'munich');
-- ...and re-run the Iceberg query: the newly inserted 'munich' row is visible.
SELECT * FROM iceberg_catalog.`default`.cities WHERE country = 'germany';
/*
+----+--------------------------------+--------------------------------+
| op | country | name |
+----+--------------------------------+--------------------------------+
| +I | germany | munich |
| +I | germany | berlin |
| +I | germany | hamburg |
+----+--------------------------------+--------------------------------+
*/
-- Spark SQL: write more rows through the Paimon catalog...
INSERT INTO paimon_catalog.`default`.cities VALUES ('usa', 'houston'), ('germany', 'munich');
-- ...and re-run the Iceberg query: the newly inserted 'munich' row is visible.
SELECT * FROM iceberg_catalog.`default`.cities WHERE country = 'germany';
/*
germany munich
germany berlin
germany hamburg
*/