Creating Tables

Creating Tables #

Creating Catalog Managed Tables #

Tables created in Paimon catalogs are managed by the catalog. When the table is dropped from catalog, its table files will also be deleted.

The following SQL assumes that you have registered and are using a Paimon catalog. It creates a managed table named MyTable with five columns in the catalog’s default database, where dt, hh and user_id are the primary keys.

CREATE TABLE MyTable (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);
CREATE TABLE MyTable (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);
Inserting jobs on the table should be stopped prior to dropping tables, or table files couldn’t be deleted completely.

Partitioned Tables #

The following SQL creates a table named MyTable with five columns partitioned by dt and hh, where dt, hh and user_id are the primary keys.

CREATE TABLE MyTable (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);
CREATE TABLE MyTable (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);
Partition keys must be a subset of primary keys if primary keys are defined.
By configuring partition.expiration-time, expired partitions can be automatically deleted.

Create Table As #

Table can be created and populated by the results of a query, for example, we have a sql like this: CREATE TABLE table_b AS SELECT id, name FORM table_a, The resulting table table_b will be equivalent to create the table and insert the data with the following statement: CREATE TABLE table_b (id INT, name STRING); INSERT INTO table_b SELECT id, name FROM table_a;

We can specify the primary key or partition when use CREATE TABLE AS SELECT, for syntax, please refer to the following sql.


/* For streaming mode, you need to enable the checkpoint. */

CREATE TABLE MyTable (
    user_id BIGINT,
    item_id BIGINT
);
CREATE TABLE MyTableAs AS SELECT * FROM MyTable;

/* partitioned table */
CREATE TABLE MyTablePartition (
     user_id BIGINT,
     item_id BIGINT,
     behavior STRING,
     dt STRING,
     hh STRING
) PARTITIONED BY (dt, hh);
CREATE TABLE MyTablePartitionAs WITH ('partition' = 'dt') AS SELECT * FROM MyTablePartition;
    
/* change options */
CREATE TABLE MyTableOptions (
       user_id BIGINT,
       item_id BIGINT
) WITH ('file.format' = 'orc');
CREATE TABLE MyTableOptionsAs WITH ('file.format' = 'parquet') AS SELECT * FROM MyTableOptions;

/* primary key */
CREATE TABLE MyTablePk (
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING,
      dt STRING,
      hh STRING,
      PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) ;
CREATE TABLE MyTablePkAs WITH ('primary-key' = 'dt,hh') AS SELECT * FROM MyTablePk;


/* primary key + partition */
CREATE TABLE MyTableAll (
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING,
      dt STRING,
      hh STRING,
      PRIMARY KEY (dt, hh, user_id) NOT ENFORCED 
) PARTITIONED BY (dt, hh);
CREATE TABLE MyTableAllAs WITH ('primary-key' = 'dt,hh', 'partition' = 'dt') AS SELECT * FROM MyTableAll;
CREATE TABLE MyTable (
     user_id BIGINT,
     item_id BIGINT
);
CREATE TABLE MyTableAs AS SELECT * FROM MyTable;

/* partitioned table*/
CREATE TABLE MyTablePartition (
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING,
      dt STRING,
      hh STRING
) PARTITIONED BY (dt, hh);
CREATE TABLE MyTablePartitionAs PARTITIONED BY (dt) AS SELECT * FROM MyTablePartition;

/* change TBLPROPERTIES */
CREATE TABLE MyTableOptions (
       user_id BIGINT,
       item_id BIGINT
) TBLPROPERTIES ('file.format' = 'orc');
CREATE TABLE MyTableOptionsAs TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM MyTableOptions;


/* primary key */
CREATE TABLE MyTablePk (
     user_id BIGINT,
     item_id BIGINT,
     behavior STRING,
     dt STRING,
     hh STRING
) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);
CREATE TABLE MyTablePkAs TBLPROPERTIES ('primary-key' = 'dt') AS SELECT * FROM MyTablePk;

/* primary key + partition */
CREATE TABLE MyTableAll (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);
CREATE TABLE MyTableAllAs PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM MyTableAll;

Create Table Like #

To create a table with the same schema, partition, and table properties as another table, use CREATE TABLE LIKE.

CREATE TABLE MyTable (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) ;

CREATE TABLE MyTableLike LIKE MyTable;

Table Properties #

Users can specify table properties to enable features or improve performance of Paimon. For a complete list of such properties, see configurations.

The following SQL creates a table named MyTable with five columns partitioned by dt and hh, where dt, hh and user_id are the primary keys. This table has two properties: 'bucket' = '2' and 'bucket-key' = 'user_id'.

CREATE TABLE MyTable (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh) WITH (
    'bucket' = '2',
    'bucket-key' = 'user_id'
);
CREATE TABLE MyTable (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id',
    'bucket' = '2',
    'bucket-key' = 'user_id'
);

Creating External Tables #

External tables are recorded but not managed by catalogs. If an external table is dropped, its table files will not be deleted.

Paimon external tables can be used in any catalog. If you do not want to create a Paimon catalog and just want to read / write a table, you can consider external tables.

Flink SQL supports reading and writing an external table. External Paimon tables are created by specifying the connector and path table properties. The following SQL creates an external table named MyTable with five columns, where the base path of table files is hdfs://path/to/table.

CREATE TABLE MyTable (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) WITH (
    'connector' = 'paimon',
    'path' = 'hdfs://path/to/table',
    'auto-create' = 'true' -- this table property creates table files for an empty table if table path does not exist
                           -- currently only supported by Flink
);

Spark3 only supports creating external tables through Scala API. The following Scala code loads the table located at hdfs://path/to/table into a DataSet.

val dataset = spark.read.format("paimon").load("hdfs://path/to/table")

Spark2 only supports creating external tables through Scala API. The following Scala code loads the table located at hdfs://path/to/table into a DataSet.

val dataset = spark.read.format("paimon").load("hdfs://path/to/table")

Hive SQL only supports reading from an external table. The following SQL creates an external table named my_table, where the base path of table files is hdfs://path/to/table. As schemas are stored in table files, users do not need to write column definitions.

CREATE EXTERNAL TABLE my_table
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
LOCATION 'hdfs://path/to/table';

Creating Temporary Tables #

Temporary tables are only supported by Flink. Like external tables, temporary tables are just recorded but not managed by the current Flink SQL session. If the temporary table is dropped, its resources will not be deleted. Temporary tables are also dropped when Flink SQL session is closed.

If you want to use Paimon catalog along with other tables but do not want to store them in other catalogs, you can create a temporary table. The following Flink SQL creates a Paimon catalog and a temporary table and also illustrates how to use both tables together.

CREATE CATALOG my_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs://path/to/warehouse'
);

USE CATALOG my_catalog;

-- Assume that there is already a table named my_table in my_catalog

CREATE TEMPORARY TABLE temp_table (
    k INT,
    v STRING
) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://path/to/temp_table.csv',
    'format' = 'csv'
);

SELECT my_table.k, my_table.v, temp_table.v FROM my_table JOIN temp_table ON my_table.k = temp_table.k;