Specification #

This is the specification for the Paimon table format. It standardizes the underlying file structure and design of Paimon.

Terms #

  • Schema: fields, primary key definition, partition key definition, and options.
  • Snapshot: the entry point to all data committed at some specific point in time.
  • Manifest list: includes several manifest files.
  • Manifest: includes several data files or changelog files.
  • Data File: contains incremental records.
  • Changelog File: contains records produced by the changelog-producer.
  • Global Index: index for a bucket or partition.
  • Data File Index: index for a data file.

Run Flink SQL with Paimon:

CREATE CATALOG my_catalog WITH (
    'type' = 'paimon',
    'warehouse' = '/your/path'
);
USE CATALOG my_catalog;

CREATE TABLE my_table (
    k INT PRIMARY KEY NOT ENFORCED,
    f0 INT,
    f1 STRING
);

INSERT INTO my_table VALUES (1, 11, '111');

Take a look at the disk:

warehouse
└── default.db
    └── my_table
        ├── bucket-0
        │   └── data-59f60cb9-44af-48cc-b5ad-59e85c663c8f-0.orc
        ├── index
        │   └── index-5625e6d9-dd44-403b-a738-2b6ea92e20f1-0
        ├── manifest
        │   ├── index-manifest-5d670043-da25-4265-9a26-e31affc98039-0
        │   ├── manifest-6758823b-2010-4d06-aef0-3b1b597723d6-0
        │   ├── manifest-list-9f856d52-5b33-4c10-8933-a0eddfaa25bf-0
        │   └── manifest-list-9f856d52-5b33-4c10-8933-a0eddfaa25bf-1
        ├── schema
        │   └── schema-0
        └── snapshot
            ├── EARLIEST
            ├── LATEST
            └── snapshot-1

Schema #

Schema file versions start from 0, and all schema versions are currently retained. Old data files may still rely on old schema versions, so schema files should be deleted with caution.

The Schema File is JSON. It includes:

  1. fields: the data field list; each data field contains an id, name, and type. The field id is used to support schema evolution.
  2. partitionKeys: the partition definition of the table; it cannot be modified.
  3. primaryKeys: the primary key definition of the table; it cannot be modified.
  4. options: the options of the table, covering many capabilities and optimizations.
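
The Schema File is not normally read by hand. As a hedged aside (assuming the Flink setup from the example above and Paimon's schemas system table), the retained schema versions can be inspected with:

-- List all retained schema versions of my_table:
-- one row per schema file, with its fields, partition keys, primary keys and options
SELECT * FROM my_table$schemas;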

Snapshot #

Each commit generates a snapshot file. Snapshot file versions start from 1 and must be continuous. EARLIEST and LATEST are hint files pointing to the beginning and end of the snapshot list, and they may be inaccurate. When the hint files are inaccurate, readers scan all snapshot files to determine the actual beginning and end.

A writing commit preempts the next snapshot id; once the snapshot file is written successfully, the commit becomes visible.

The Snapshot File is JSON. It includes:

  1. version: snapshot file version; the current version is 3.
  2. id: snapshot id, the same as the file name.
  3. schemaId: the corresponding schema version for this commit.
  4. baseManifestList: a manifest list recording all changes from previous snapshots.
  5. deltaManifestList: a manifest list recording all new changes that occurred in this snapshot.
  6. changelogManifestList: a manifest list recording all changelog produced in this snapshot, null if no changelog is produced.
  7. indexManifest: a manifest recording all index files of this table, null if there is no index file.
  8. commitUser: usually generated by a UUID; it is used for recovery of streaming writes, one user per stream write job.
  9. commitIdentifier: the transaction id corresponding to a streaming write; each transaction may result in multiple commits with different commitKinds.
  10. commitKind: type of changes in this snapshot, including append, compact, overwrite and analyze.
  11. timeMillis: commit time in milliseconds.
  12. totalRecordCount: record count of all changes that occurred in this snapshot.
  13. deltaRecordCount: record count of all new changes that occurred in this snapshot.
  14. changelogRecordCount: record count of all changelog produced in this snapshot.
  15. watermark: watermark for input records, from Flink watermark mechanism, null if there is no watermark.
  16. statistics: stats file name for statistics of this table.
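
As a hedged sketch reusing my_table from the example above (and assuming Paimon's snapshots system table), the committed snapshots and the commit metadata listed above can be queried directly:

-- List committed snapshots: id, schema id, commit user, commit kind,
-- commit time and record counts
SELECT * FROM my_table$snapshots;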

Manifest List #

The Manifest List includes the metadata of several manifest files. Its name contains a UUID, and it is an Avro file. The schema is:

  1. fileName: manifest file name.
  2. fileSize: manifest file size.
  3. numAddedFiles: the number of added files in this manifest.
  4. numDeletedFiles: the number of deleted files in this manifest.
  5. partitionStats: partition statistics (a SimpleStats); the minimum and maximum values of the partition fields in this manifest help skip certain manifest files during queries.
  6. schemaId: schema id when writing this manifest file.
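
As a hedged sketch (assuming Paimon's manifests system table), the manifest files referenced by the current snapshot, together with their added and deleted file counts, can be listed as follows:

-- List manifest files of the latest snapshot of my_table,
-- including the number of added and deleted files per manifest
SELECT * FROM my_table$manifests;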

Manifest #

A Manifest includes the metadata of several data files or changelog files. Its name contains a UUID, and it is an Avro file.

File changes are saved in the manifest: a file can be added or deleted. Manifest entries are ordered, and the same file may be added or deleted multiple times; the last occurrence should be taken. This design keeps commits lightweight and supports the file deletions generated by compaction.

The schema is:

  1. kind: ADD or DELETE.
  2. partition: partition spec, a BinaryRow.
  3. bucket: bucket of this file.
  4. totalBuckets: the total number of buckets when this file was written; it is used for verification after the bucket count changes.
  5. file: data file meta.

The data file meta is:

  1. fileName: file name.
  2. fileSize: file size.
  3. rowCount: total number of rows (including add & delete) in this file.
  4. minKey: the minimum key of this file.
  5. maxKey: the maximum key of this file.
  6. keyStats: the statistics of the key.
  7. valueStats: the statistics of the value.
  8. minSequenceNumber: the minimum sequence number.
  9. maxSequenceNumber: the maximum sequence number.
  10. schemaId: schema id when this file was written.
  11. level: level of this file in the LSM tree.
  12. extraFiles: extra files for this file, for example, a data file index file.
  13. creationTime: creation time of this file.
  14. deleteRowCount: the number of deleted rows; rowCount = addRowCount + deleteRowCount.
  15. embeddedIndex: if the data file index is small enough, the index is embedded directly in the manifest.
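
As a hedged sketch (assuming Paimon's files system table), the per-file metadata described above, such as partition, bucket, level, record count and sequence numbers, can be inspected for the live data files:

-- List live data files of my_table together with their file-level metadata
SELECT * FROM my_table$files;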

Partition #

Consider a partitioned table created via Flink SQL:

CREATE TABLE part_t (
    f0 INT,
    f1 STRING,
    dt STRING
) PARTITIONED BY (dt);

INSERT INTO part_t VALUES (1, '11', '20240514');

The file system will be:

part_t
├── dt=20240514
│   └── bucket-0
│       └── data-ca1c3c38-dc8d-4533-949b-82e195b41bd4-0.orc
├── manifest
│   ├── manifest-08995fe5-c2ac-4f54-9a5f-d3af1fcde41d-0
│   ├── manifest-list-51c16f7b-421c-4bc0-80a0-17677f343358-0
│   └── manifest-list-51c16f7b-421c-4bc0-80a0-17677f343358-1
├── schema
│   └── schema-0
└── snapshot
    ├── EARLIEST
    ├── LATEST
    └── snapshot-1

Paimon adopts the same partitioning concept as Apache Hive to separate data. The files of a partition are placed in a separate partition directory.

Bucket #

The storage of all Paimon tables relies on buckets, and data files are stored in bucket directories. The relationship between the various table types and buckets in Paimon is:

  1. Primary Key Table:
    1. bucket = -1: Default mode. The dynamic bucket mode records which bucket each key belongs to through index files; the index records the correspondence between the hash value of the primary key and the bucket.
    2. bucket = 10: The data is distributed to the corresponding buckets according to the hash value of the bucket key (default is the primary key).
  2. Append Table:
    1. bucket = -1: Default mode, ignoring the bucket concept. Although all data is written to bucket-0, the parallelism of reads and writes is unrestricted.
    2. bucket = 10: You need to define a bucket-key too; the data is distributed to the corresponding buckets according to the hash value of the bucket key (see the SQL sketch after this list).
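
A minimal Flink SQL sketch of the two fixed-bucket cases above (the table names pk_t_10 and append_t_10 are illustrative):

-- Primary key table with 10 fixed buckets: data is hashed by the primary key
CREATE TABLE pk_t_10 (
    k INT PRIMARY KEY NOT ENFORCED,
    v STRING
) WITH (
    'bucket' = '10'
);

-- Append table with 10 fixed buckets: 'bucket-key' must be defined explicitly
CREATE TABLE append_t_10 (
    f0 INT,
    f1 STRING
) WITH (
    'bucket' = '10',
    'bucket-key' = 'f0'
);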

Data File #

The name of a data file is data-${uuid}-${id}.${format}. For an append table, the file stores the data of the table without adding any new columns. For a primary key table, each row of data stores additional system columns:

  1. _VALUE_KIND: whether the row is deleted or added. Similar to RocksDB, each row of data can be deleted or added, which is used for updating the primary key table (see the sketch after this list).
  2. _SEQUENCE_NUMBER: this number is used for comparison during updates, determining which data came first and which came later.
  3. _KEY_ prefix for key columns, to avoid conflicts with the columns of the table.
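
These system columns are hidden from normal queries. As a hedged sketch (assuming Paimon's audit_log system table, which exposes the row kind as a rowkind column), the change kind of each row can be observed with:

-- Read changes of my_table together with their row kind (+I, -U, +U, -D)
SELECT * FROM my_table$audit_log;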

Changelog File #

A Changelog File has exactly the same format as a Data File; it only takes effect on primary key tables. It is similar to the binlog in a database, recording the changes made to the data in the table.
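
A hedged Flink SQL sketch of a table that produces changelog files (the table name cl_t is illustrative; the changelog-producer option selects how the changelog is generated, for example via lookup):

-- Primary key table whose writes produce changelog files via the lookup producer
CREATE TABLE cl_t (
    k INT PRIMARY KEY NOT ENFORCED,
    v STRING
) WITH (
    'changelog-producer' = 'lookup'
);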

Global Index #

The Global Index is stored in the index directory. Currently, only two features use the global index:

  1. bucket = -1 + primary key table: in dynamic bucket mode, the index records the correspondence between the hash value of the primary key and the bucket; each bucket has an index file.
  2. Deletion Vectors: the index stores deletion vectors; each bucket has a deletion file (see the sketch after this list).
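
A hedged Flink SQL sketch of the second case (the table name dv_t is illustrative; deletion vectors are switched on through a table option, and the resulting deletion files are kept in the global index):

-- Primary key table with deletion vectors enabled
CREATE TABLE dv_t (
    k INT PRIMARY KEY NOT ENFORCED,
    v STRING
) WITH (
    'deletion-vectors.enabled' = 'true'
);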

Data File Index #

If file-index.bloom-filter.columns is defined, Paimon will create a corresponding index file for each data file. If the index file is too small, it is stored directly in the manifest; otherwise it is placed in the directory of the data file. Each data file corresponds to one index file, which has a separate file definition and can contain different types of indexes for multiple columns.
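
A minimal Flink SQL sketch of defining such an index (the table name idx_t is illustrative; the option name is the one mentioned above):

-- Create a bloom filter index on column f1 for every data file
CREATE TABLE idx_t (
    k INT PRIMARY KEY NOT ENFORCED,
    f0 INT,
    f1 STRING
) WITH (
    'file-index.bloom-filter.columns' = 'f1'
);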
