Configuration #
CoreOptions #
Core options for paimon.
Key | Default | Type | Description |
---|---|---|---|
auto-create | false | Boolean | Whether to create underlying storage when reading and writing the table. |
bucket | -1 | Integer | Bucket number for file store. It should either be equal to -1 (dynamic bucket mode) or greater than 0 (fixed bucket mode). |
bucket-key | (none) | String | Specify the paimon distribution policy. Data is assigned to each bucket according to the hash value of bucket-key. If you specify multiple fields, the delimiter is ','. If not specified, the primary key will be used; if there is no primary key, the full row will be used. |
cache-page-size | 64 kb | MemorySize | Memory page size for caching. |
changelog-producer | none | Enum | Whether to double write to a changelog file. This changelog file keeps the details of data changes and can be read directly during stream reads. This can be applied to tables with primary keys. |
changelog-producer.row-deduplicate | false | Boolean | Whether to generate -U, +U changelog for the same record. This configuration is only valid when changelog-producer is lookup or full-compaction. |
changelog.num-retained.max | (none) | Integer | The maximum number of completed changelogs to retain. Should be greater than or equal to the minimum number. |
changelog.num-retained.min | (none) | Integer | The minimum number of completed changelogs to retain. Should be greater than or equal to 1. |
changelog.time-retained | (none) | Duration | The maximum time of completed changelogs to retain. |
commit.callback.#.param | (none) | String | Parameter string for the constructor of class #. The callback class should parse the parameter by itself. |
commit.callbacks | (none) | String | A list of commit callback classes to be called after a successful commit. Class names are connected with comma (example: com.test.CallbackA,com.sample.CallbackB). |
commit.force-compact | false | Boolean | Whether to force a compaction before commit. |
commit.force-create-snapshot | false | Boolean | Whether to force creating a snapshot on commit. |
compaction.max-size-amplification-percent | 200 | Integer | The size amplification is defined as the amount (in percentage) of additional storage needed to store a single byte of data in the merge tree for a changelog mode table. |
compaction.max.file-num | 50 | Integer | For file set [f_0,...,f_N], the maximum file number to trigger a compaction for append-only table, even if sum(size(f_i)) < targetFileSize. This value avoids accumulating too many small files, which slows down performance. |
compaction.min.file-num | 5 | Integer | For file set [f_0,...,f_N], the minimum file number which satisfies sum(size(f_i)) >= targetFileSize to trigger a compaction for append-only table. This value avoids compacting almost-full files, which is not cost-effective. |
compaction.optimization-interval | (none) | Duration | How often to perform an optimization compaction. This configuration is used to ensure the query timeliness of the read-optimized system table. |
compaction.size-ratio | 1 | Integer | Percentage flexibility while comparing sorted run sizes for changelog mode table. If the candidate sorted run(s) size is 1% smaller than the next sorted run's size, then the next sorted run is included into this candidate set. |
consumer-id | (none) | String | Consumer id for recording the offset of consumption in the storage. |
consumer.expiration-time | (none) | Duration | The expiration interval of consumer files. A consumer file will be expired if its lifetime after last modification is over this value. |
consumer.ignore-progress | false | Boolean | Whether to ignore consumer progress for the newly started job. |
consumer.mode | exactly-once | Enum | Specify the consumer consistency mode for the table. |
continuous.discovery-interval | 10 s | Duration | The discovery interval of continuous reading. |
cross-partition-upsert.bootstrap-parallelism | 10 | Integer | The parallelism for bootstrap in a single task for cross partition upsert. |
cross-partition-upsert.index-ttl | (none) | Duration | The TTL in the rocksdb index for cross partition upsert (primary keys do not contain all partition fields). This avoids maintaining too many indexes, which can degrade performance over time, but note that it may also cause data duplication. |
delete.force-produce-changelog | false | Boolean | Force producing changelog in delete sql, or you can use 'streaming-read-overwrite' to read changelog from overwrite commit. |
deletion-vectors.enabled | false | Boolean | Whether to enable deletion vectors mode. In this mode, index files containing deletion vectors are generated when data is written, which marks the data for deletion. During read operations, by applying these index files, merging can be avoided. |
dynamic-bucket.assigner-parallelism | (none) | Integer | Parallelism of the assigner operator for dynamic bucket mode. It is related to the number of initialized buckets; if it is too small, the processing speed of the assigner will be insufficient. |
dynamic-bucket.initial-buckets | (none) | Integer | Initial buckets for a partition in the assigner operator for dynamic bucket mode. |
dynamic-bucket.target-row-num | 2000000 | Long | If 'bucket' is -1 (dynamic bucket mode for primary key tables), this option controls the target row number for one bucket. |
dynamic-partition-overwrite | true | Boolean | Whether to only overwrite dynamic partitions when overwriting a partitioned table with dynamic partition columns. Works only when the table has partition keys. |
file-index.in-manifest-threshold | 500 bytes | MemorySize | The threshold to store file index bytes in manifest. |
file-index.read.enabled | true | Boolean | Whether to enable reading the file index. |
file-reader-async-threshold | 10 mb | MemorySize | The threshold for reading files asynchronously. |
file.compression | (none) | String | Default file compression format; for orc it is lz4 and for parquet it is snappy. It can be overridden by file.compression.per.level. |
file.compression.per.level | | Map | Define different compression policies for different levels. You can add the conf like this: 'file.compression.per.level' = '0:lz4,1:zstd'. |
file.format | orc | Enum | Specify the message format of data files; currently orc, parquet and avro are supported. |
file.format.per.level | | Map | Define different file formats for different levels. You can add the conf like this: 'file.format.per.level' = '0:avro,3:parquet'. If the file format for a level is not provided, the default format set by `file.format` will be used. |
full-compaction.delta-commits | (none) | Integer | Full compaction will be constantly triggered after delta commits. |
ignore-delete | false | Boolean | Whether to ignore delete records. |
incremental-between | (none) | String | Read incremental changes between start snapshot (exclusive) and end snapshot, for example, '5,10' means changes between snapshot 5 and snapshot 10. |
incremental-between-scan-mode | auto | Enum | Scan kind when reading incremental changes between start snapshot (exclusive) and end snapshot. |
incremental-between-timestamp | (none) | String | Read incremental changes between start timestamp (exclusive) and end timestamp, for example, 't1,t2' means changes between timestamp t1 and timestamp t2. |
local-merge-buffer-size | (none) | MemorySize | Local merge will buffer and merge input records before they're shuffled by bucket and written into the sink. The buffer will be flushed when it is full. Mainly to resolve data skew on primary keys. We recommend starting with 64 mb when trying out this feature. |
local-sort.max-num-file-handles | 128 | Integer | The maximal fan-in for external merge sort. It limits the number of file handles. If it is too small, it may cause intermediate merging. But if it is too large, too many files will be opened at the same time, consuming memory and leading to random reads. |
lookup.cache-file-retention | 1 h | Duration | The cached files retention time for lookup. After the file expires, if there is a need for access, it will be re-read from the DFS to build an index on the local disk. |
lookup.cache-max-disk-size | infinite | MemorySize | Max disk size for lookup cache; you can use this option to limit the use of local disks. |
lookup.cache-max-memory-size | 256 mb | MemorySize | Max memory size for lookup cache. |
lookup.cache-spill-compression | "lz4" | String | Spill compression for lookup cache; currently none, lz4, lzo and zstd are supported. |
lookup.cache.bloom.filter.enabled | true | Boolean | Whether to enable the bloom filter for lookup cache. |
lookup.cache.bloom.filter.fpp | 0.05 | Double | Define the default false positive probability for lookup cache bloom filters. |
lookup.hash-load-factor | 0.75 | Float | The index load factor for lookup. |
manifest.format | avro | Enum | Specify the message format of manifest files. |
manifest.full-compaction-threshold-size | 16 mb | MemorySize | The size threshold for triggering full compaction of manifest. |
manifest.merge-min-count | 30 | Integer | To avoid frequent manifest merges, this parameter specifies the minimum number of ManifestFileMeta to merge. |
manifest.target-file-size | 8 mb | MemorySize | Suggested file size of a manifest file. |
merge-engine | deduplicate | Enum | Specify the merge engine for table with primary key. |
metadata.stats-mode | "truncate(16)" | String | The mode of metadata stats collection; none, counts, truncate(16) and full are available. |
metastore.partitioned-table | false | Boolean | Whether to create this table as a partitioned table in metastore. For example, if you want to list all partitions of a Paimon table in Hive, you need to create this table as a partitioned table in Hive metastore. This config option does not affect the default filesystem metastore. |
metastore.tag-to-partition | (none) | String | Whether to create this table as a partitioned table for mapping non-partitioned table tags in metastore. This allows the Hive engine to view this table in a partitioned table view and use the partitioning field to read specific partitions (specific tags). |
metastore.tag-to-partition.preview | none | Enum | Whether to preview tags of generated snapshots in metastore. This allows the Hive engine to query a specific tag before creation. |
num-levels | (none) | Integer | Total level number; for example, 3 levels means levels 0, 1 and 2. |
num-sorted-run.compaction-trigger | 5 | Integer | The sorted run number to trigger compaction. Includes level0 files (one file one sorted run) and high-level runs (one level one sorted run). |
num-sorted-run.stop-trigger | (none) | Integer | The number of sorted runs that triggers the stopping of writes; the default value is 'num-sorted-run.compaction-trigger' + 3. |
page-size | 64 kb | MemorySize | Memory page size. |
parquet.enable.dictionary | (none) | Integer | Turn off the dictionary encoding for all fields in parquet. |
partition | (none) | String | Define the partition by table options; the partition cannot be defined both in DDL and in table options at the same time. |
partition.default-name | "__DEFAULT_PARTITION__" | String | The default partition name in case the dynamic partition column value is null/empty string. |
partition.expiration-check-interval | 1 h | Duration | The check interval of partition expiration. |
partition.expiration-time | (none) | Duration | The expiration interval of a partition. A partition will be expired if its lifetime is over this value. Partition time is extracted from the partition value. |
partition.timestamp-formatter | (none) | String | The formatter to format a timestamp from string. It can be used with 'partition.timestamp-pattern' to create a formatter using the specified value. |
partition.timestamp-pattern | (none) | String | You can specify a pattern to get a timestamp from partitions. The formatter pattern is defined by 'partition.timestamp-formatter'. |
primary-key | (none) | String | Define the primary key by table options; the primary key cannot be defined both in DDL and in table options at the same time. |
read.batch-size | 1024 | Integer | Read batch size for orc and parquet. |
record-level.expire-time | (none) | Duration | Record level expire time for primary key table. Expiration happens during compaction, and there is no strong guarantee to expire records in time. You must also specify 'record-level.time-field'. |
record-level.time-field | (none) | String | Time field for record level expire; it should be an INT of seconds. |
rowkind.field | (none) | String | The field that generates the row kind for primary key table; the row kind determines which data is '+I', '-U', '+U' or '-D'. |
scan.bounded.watermark | (none) | Long | End condition "watermark" for bounded streaming mode. Stream reading will end when a larger watermark snapshot is encountered. |
scan.file-creation-time-millis | (none) | Long | After configuring this time, only the data files created after this time will be read. It is independent of snapshots, but the filtering is imprecise (depending on whether or not compaction occurs). |
scan.manifest.parallelism | (none) | Integer | The parallelism of scanning manifest files; the default value is the number of CPU processors. Note: scaling up this parameter will increase memory usage while scanning manifest files. Consider scaling it down if an out-of-memory exception occurs while scanning. |
scan.max-splits-per-task | 10 | Integer | Max number of splits that should be cached for one task while scanning. If the number of splits cached in the enumerator is greater than the number of tasks multiplied by this value, the scanner will pause scanning. |
scan.mode | default | Enum | Specify the scanning behavior of the source. |
scan.plan-sort-partition | false | Boolean | Whether to sort plan files by partition fields. This allows you to read according to the partition order, even if your partition writes are out of order. It is recommended for streaming read of the 'append-only' table. By default, streaming read will read the full snapshot first; to avoid out-of-order reading of partitions, you can enable this option. |
scan.snapshot-id | (none) | Long | Optional snapshot id used in case of "from-snapshot" or "from-snapshot-full" scan mode. |
scan.tag-name | (none) | String | Optional tag name used in case of "from-snapshot" scan mode. |
scan.timestamp-millis | (none) | Long | Optional timestamp used in case of "from-timestamp" scan mode. If there is no snapshot earlier than this time, the earliest snapshot will be chosen. |
scan.watermark | (none) | Long | Optional watermark used in case of "from-snapshot" scan mode. If there is no snapshot later than this watermark, an exception will be thrown. |
sequence.field | (none) | String | The field that generates the sequence number for primary key table; the sequence number determines which data is the most recent. |
sink.watermark-time-zone | "UTC" | String | The time zone to parse the long watermark value to a TIMESTAMP value. The default value is 'UTC', which means the watermark is defined on a TIMESTAMP column or not defined. If the watermark is defined on a TIMESTAMP_LTZ column, the time zone of the watermark is the user-configured time zone, and the value should be the user-configured local time zone. The option value is either a full name such as 'America/Los_Angeles', or a custom timezone id such as 'GMT-08:00'. |
snapshot.expire.clean-empty-directories | true | Boolean | Whether to try to clean empty directories when expiring snapshots. Note that trying to clean directories might throw exceptions in the filesystem, but in most cases it won't cause problems. |
snapshot.expire.execution-mode | sync | Enum | Specifies the execution mode of expire. |
snapshot.expire.limit | 10 | Integer | The maximum number of snapshots allowed to expire at a time. |
snapshot.num-retained.max | infinite | Integer | The maximum number of completed snapshots to retain. Should be greater than or equal to the minimum number. |
snapshot.num-retained.min | 10 | Integer | The minimum number of completed snapshots to retain. Should be greater than or equal to 1. |
snapshot.time-retained | 1 h | Duration | The maximum time of completed snapshots to retain. |
snapshot.watermark-idle-timeout | (none) | Duration | In watermarking, if a source remains idle beyond the specified timeout duration, it triggers snapshot advancement and facilitates tag creation. |
sort-compaction.local-sample.magnification | 1000 | Integer | The magnification of the local sample for sort-compaction. The size of the local sample is sink parallelism * magnification. |
sort-compaction.range-strategy | QUANTITY | Enum | The range strategy of sort compaction; the default value is quantity. If the data size allocated for the sorting task is uneven, which may lead to performance bottlenecks, the config can be set to size. |
sort-engine | loser-tree | Enum | Specify the sort engine for table with primary key. |
sort-spill-buffer-size | 64 mb | MemorySize | Amount of data to spill records to disk in spilled sort. |
sort-spill-threshold | (none) | Integer | If the maximum number of sort readers exceeds this value, a spill will be attempted. This prevents too many readers from consuming too much memory and causing OOM. |
source.split.open-file-cost | 4 mb | MemorySize | Open file cost of a source file. It is used to avoid reading too many files with a source split, which can be very slow. |
source.split.target-size | 128 mb | MemorySize | Target size of a source split when scanning a bucket. |
spill-compression | "LZ4" | String | Compression for spill; currently lz4, lzo and zstd are supported. |
streaming-read-mode | (none) | Enum | The mode of streaming read that specifies whether to read the data from the table file or the log. |
streaming-read-overwrite | false | Boolean | Whether to read the changes from overwrite in streaming mode. Cannot be set to true when the changelog producer is full-compaction or lookup because it will read duplicated changes. |
tag.automatic-creation | none | Enum | Whether to create tags automatically, and how to generate them. |
tag.callback.#.param | (none) | String | Parameter string for the constructor of class #. The callback class should parse the parameter by itself. |
tag.callbacks | (none) | String | A list of commit callback classes to be called after a successful tag. Class names are connected with comma (example: com.test.CallbackA,com.sample.CallbackB). |
tag.creation-delay | 0 ms | Duration | How long the delay is after the period ends before creating a tag. This can allow some late data to enter the tag. |
tag.creation-period | daily | Enum | What frequency is used to generate tags. |
tag.default-time-retained | (none) | Duration | The default maximum time retained for newly created tags. It affects both auto-created tags and manually created (by procedure) tags. |
tag.num-retained-max | (none) | Integer | The maximum number of tags to retain. It only affects auto-created tags. |
tag.period-formatter | with_dashes | Enum | The date format for tag periods. |
target-file-size | 128 mb | MemorySize | Target size of a file. |
write-buffer-for-append | false | Boolean | This option only works for append-only table. Whether the write uses a write buffer to avoid out-of-memory errors. |
write-buffer-size | 256 mb | MemorySize | Amount of data to build up in memory before converting to a sorted on-disk file. |
write-buffer-spill.max-disk-size | infinite | MemorySize | The max disk size to use for write buffer spill. This only works when write buffer spill is enabled. |
write-buffer-spillable | (none) | Boolean | Whether the write buffer can be spillable. Enabled by default when using object storage. |
write-manifest-cache | 0 bytes | MemorySize | Cache size for reading manifest files for write initialization. |
write-max-writers-to-spill | 5 | Integer | When batch append inserting, if the writer number is greater than this option, we open the buffer cache and spill function to avoid out-of-memory. |
write-only | false | Boolean | If set to true, compactions and snapshot expiration will be skipped. This option is used together with dedicated compact jobs. |
zorder.var-length-contribution | 8 | Integer | The bytes of types (CHAR, VARCHAR, BINARY, VARBINARY) devoted to the zorder sort. |
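The core options above are plain string key/value table properties. As a rough, hedged sketch of how they are attached to a table, the example below uses Paimon's Java catalog API to create a primary-key table with a handful of them; the warehouse path, database, table and column names are placeholders, and the exact builder methods may vary between Paimon versions.

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

public class CoreOptionsExample {
    public static void main(String[] args) throws Exception {
        // A local filesystem warehouse; the path is a placeholder.
        Options catalogOptions = new Options();
        catalogOptions.set("warehouse", "file:///tmp/paimon");
        Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));

        catalog.createDatabase("my_db", true);

        // Core options are passed as table properties on the schema.
        Schema schema = Schema.newBuilder()
                .column("user_id", DataTypes.BIGINT())
                .column("behavior", DataTypes.STRING())
                .primaryKey("user_id")
                .option("bucket", "4")                    // fixed bucket mode
                .option("bucket-key", "user_id")
                .option("changelog-producer", "lookup")
                .option("file.format", "parquet")
                .option("snapshot.time-retained", "2 h")
                .build();

        catalog.createTable(Identifier.create("my_db", "my_table"), schema, true);
    }
}
```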
CatalogOptions #
Options for paimon catalog.
Key | Default | Type | Description |
---|---|---|---|
client-pool-size | 2 | Integer | Configure the size of the connection pool. |
fs.allow-hadoop-fallback | true | Boolean | Allow falling back to Hadoop File IO when no file IO is found for the scheme. |
lineage-meta | (none) | String | The lineage meta to store table and data lineage information. |
lock-acquire-timeout | 8 min | Duration | The maximum time to wait for acquiring the lock. |
lock-check-max-sleep | 8 s | Duration | The maximum sleep time when retrying to check the lock. |
lock.enabled | false | Boolean | Enable Catalog Lock. |
lock.type | (none) | String | The Lock Type for Catalog, such as 'hive', 'zookeeper'. |
metastore | "filesystem" | String | Metastore of paimon catalog, supports filesystem, hive and jdbc. |
table.type | managed | Enum | Type of table. |
uri | (none) | String | Uri of metastore server. |
warehouse | (none) | String | The warehouse root path of catalog. |
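For orientation only, the sketch below assembles a few of these catalog options with the Java `Options` class and hands them to `CatalogFactory`; the warehouse path is a placeholder, and the same keys can equally be supplied in an engine-specific `CREATE CATALOG` statement.

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.Options;

public class CatalogOptionsExample {
    public static void main(String[] args) {
        Options options = new Options();
        options.set("warehouse", "file:///tmp/paimon"); // placeholder warehouse path
        options.set("metastore", "filesystem");         // filesystem, hive or jdbc
        options.set("fs.allow-hadoop-fallback", "true");
        options.set("lock.enabled", "false");

        // The catalog is resolved from the options above.
        Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
        System.out.println("Created catalog: " + catalog);
    }
}
```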
FilesystemCatalogOptions #
Options for Filesystem catalog.
Key | Default | Type | Description |
---|---|---|---|
case-sensitive | true | Boolean | Whether the catalog is case sensitive. If case insensitive, you need to set this option to false, and table names and fields will be converted to lowercase. |
HiveCatalogOptions #
Options for Hive catalog.
Key | Default | Type | Description |
---|---|---|---|
hadoop-conf-dir | (none) | String | File directory of core-site.xml, hdfs-site.xml, yarn-site.xml and mapred-site.xml. Currently, only local file system paths are supported. If not configured, tries to load from the 'HADOOP_CONF_DIR' or 'HADOOP_HOME' system environment. Configuration priority: 1. from 'hadoop-conf-dir', 2. from HADOOP_CONF_DIR, 3. from HADOOP_HOME/conf, 4. from HADOOP_HOME/etc/hadoop. |
hive-conf-dir | (none) | String | File directory of hive-site.xml, used to create HiveMetastoreClient and for security authentication such as Kerberos, LDAP, Ranger and so on. If not configured, tries to load from the 'HIVE_CONF_DIR' env. |
location-in-properties | false | Boolean | Whether to set the location in the properties of hive table/database. If you don't want to access the location through the filesystem of hive when using an object storage such as s3 or oss, you can set this option to true. |
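A hedged sketch of wiring these Hive-related options together with the generic catalog options in a Flink `CREATE CATALOG` statement; the catalog name, metastore URI, warehouse path and conf directories are placeholders.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HiveCatalogExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Catalog name, metastore URI, warehouse path and conf directories are placeholders.
        tEnv.executeSql(
                "CREATE CATALOG paimon_hive WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'metastore' = 'hive',\n"
                        + "  'uri' = 'thrift://hive-metastore:9083',\n"
                        + "  'warehouse' = 'hdfs://namenode:8020/paimon/warehouse',\n"
                        + "  'hive-conf-dir' = '/etc/hive/conf',\n"
                        + "  'hadoop-conf-dir' = '/etc/hadoop/conf'\n"
                        + ")");
        tEnv.executeSql("USE CATALOG paimon_hive");
    }
}
```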
JdbcCatalogOptions #
Options for Jdbc catalog.
Key | Default | Type | Description |
---|---|---|---|
catalog-key | "jdbc" | String | Custom jdbc catalog store key. |
lock-key-max-length | 255 | Integer | Set the maximum length of the lock key. The 'lock-key' is composed by concatenating three fields: 'catalog-key', 'database', and 'table'. |
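A rough sketch, under the assumption that a JDBC-backed metastore is reachable at the placeholder URL below; the JDBC-specific options sit next to the generic catalog options, and database credentials (not shown) are supplied as additional catalog properties.

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.Options;

public class JdbcCatalogExample {
    public static void main(String[] args) {
        Options options = new Options();
        options.set("metastore", "jdbc");
        options.set("uri", "jdbc:mysql://mysql-host:3306/paimon_meta");    // placeholder JDBC URL
        options.set("warehouse", "hdfs://namenode:8020/paimon/warehouse"); // placeholder path
        options.set("catalog-key", "jdbc");          // custom jdbc catalog store key
        options.set("lock-key-max-length", "255");
        // Database credentials are passed as additional catalog properties (not shown here).

        Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
        System.out.println("Created JDBC catalog: " + catalog);
    }
}
```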
FlinkCatalogOptions #
Flink catalog options for paimon.
Key | Default | Type | Description |
---|---|---|---|
default-database | "default" | String | |
disable-create-table-in-default-db | false | Boolean | If true, creating a table in the default database is not allowed. Default is false. |
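A minimal sketch of passing these Flink catalog options in a `CREATE CATALOG` statement; the catalog name, warehouse path and database name are placeholders.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCatalogOptionsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Catalog name, warehouse path and database name are placeholders.
        tEnv.executeSql(
                "CREATE CATALOG my_paimon WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = 'file:///tmp/paimon',\n"
                        + "  'default-database' = 'analytics',\n"
                        + "  'disable-create-table-in-default-db' = 'true'\n"
                        + ")");
    }
}
```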
FlinkConnectorOptions #
Flink connector options for paimon.
Key | Default | Type | Description |
---|---|---|---|
changelog-producer.lookup-wait | true | Boolean | When changelog-producer is set to LOOKUP, commit will wait for changelog generation by lookup. |
lookup.async | false | Boolean | Whether to enable async lookup join. |
lookup.async-thread-number | 16 | Integer | The thread number for async lookup. |
lookup.bootstrap-parallelism | 4 | Integer | The parallelism for bootstrap in a single task for lookup join. |
lookup.cache | AUTO | Enum | The cache mode of lookup join. |
lookup.dynamic-partition | (none) | String | Specific dynamic partition for lookup; only 'max_pt()' is supported currently. |
lookup.dynamic-partition.refresh-interval | 1 h | Duration | Specific dynamic partition refresh interval for lookup; scan all partitions and obtain the corresponding partition. |
scan.infer-parallelism | true | Boolean | If it is false, the parallelism of the source is set by the global parallelism. Otherwise, the source parallelism is inferred from the split number (batch mode) or the bucket number (streaming mode). |
scan.infer-parallelism.max | 1024 | Integer | If scan.infer-parallelism is true, limit the parallelism of the source through this option. |
scan.parallelism | (none) | Integer | Define a custom parallelism for the scan source. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. If the user enables scan.infer-parallelism, the planner will derive the parallelism from the inferred parallelism. |
scan.push-down | true | Boolean | If true, flink will push down projection, filters and limit to the source. The cost is that it is difficult to reuse the source in a job. With flink 1.18 or higher version, it is possible to reuse the source even with projection push down. |
scan.remove-normalize | false | Boolean | Whether to force the removal of the normalize node when streaming read. Note: This is dangerous and is likely to cause data errors if the downstream is used to calculate aggregation and the input is not a complete changelog. |
scan.split-enumerator.batch-size | 10 | Integer | How many splits should be assigned to a subtask per batch in StaticFileStoreSplitEnumerator to avoid exceeding the `akka.framesize` limit. |
scan.split-enumerator.mode | fair | Enum | The mode used by StaticFileStoreSplitEnumerator to assign splits. |
scan.watermark.alignment.group | (none) | String | A group of sources to align watermarks. |
scan.watermark.alignment.max-drift | (none) | Duration | Maximal drift to align watermarks before we pause consuming from the source/task/partition. |
scan.watermark.alignment.update-interval | 1 s | Duration | How often tasks should notify the coordinator about the current watermark and how often the coordinator should announce the maximal aligned watermark. |
scan.watermark.emit.strategy | on-event | Enum | Emit strategy for watermark generation. |
scan.watermark.idle-timeout | (none) | Duration | If no records flow in a partition of a stream for that amount of time, then that partition is considered "idle" and will not hold back the progress of watermarks in downstream operators. |
sink.committer-cpu | 1.0 | Double | Sink committer cpu to control the cpu cores of the global committer. |
sink.committer-memory | (none) | MemorySize | Sink committer memory to control the heap memory of the global committer. |
sink.cross-partition.managed-memory | 256 mb | MemorySize | Weight of managed memory for RocksDB in cross-partition update; Flink will compute the memory size according to the weight, and the actual memory used depends on the running environment. |
sink.managed.writer-buffer-memory | 256 mb | MemorySize | Weight of the writer buffer in managed memory; Flink will compute the memory size for the writer according to the weight, and the actual memory used depends on the running environment. |
sink.parallelism | (none) | Integer | Defines a custom parallelism for the sink. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. |
sink.savepoint.auto-tag | false | Boolean | If true, a tag will be automatically created for the snapshot created by flink savepoint. |
sink.use-managed-memory-allocator | false | Boolean | If true, flink sink will use managed memory for merge tree; otherwise, it will create an independent memory allocator. |
source.checkpoint-align.enabled | false | Boolean | Whether to align the flink checkpoint with the snapshot of the paimon table. If true, a checkpoint will only be made if a snapshot is consumed. |
source.checkpoint-align.timeout | 30 s | Duration | If a new snapshot has not been generated when the checkpoint starts to trigger, the enumerator will block the checkpoint and wait for the new snapshot. Set the maximum waiting time to avoid infinite waiting; if the timeout is reached, the checkpoint will fail. Note that it should be set smaller than the checkpoint timeout. |
unaware-bucket.compaction.parallelism | (none) | Integer | Defines a custom parallelism for the unaware-bucket table compaction job. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. |
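Connector options are typically applied per statement. The sketch below, with placeholder catalog/table names and tables assumed to already exist, sets scan options through a dynamic table hint and a sink option on the target table of an INSERT.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkConnectorOptionsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Dynamic table hints may need to be enabled explicitly in older Flink versions.
        tEnv.getConfig().getConfiguration().setString("table.dynamic-table-options.enabled", "true");

        // Scan-side options via a dynamic table hint; catalog, database and table names are placeholders.
        tEnv.executeSql(
                "CREATE TEMPORARY VIEW recent_orders AS "
                        + "SELECT * FROM my_paimon.my_db.orders "
                        + "/*+ OPTIONS('scan.parallelism' = '4', 'scan.split-enumerator.batch-size' = '20') */");

        // Sink-side option via a hint on the target table of an INSERT.
        tEnv.executeSql(
                "INSERT INTO my_paimon.my_db.orders_copy "
                        + "/*+ OPTIONS('sink.parallelism' = '8') */ "
                        + "SELECT * FROM recent_orders");
    }
}
```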
SparkCatalogOptions #
Spark catalog options for paimon.
Key | Default | Type | Description |
---|---|---|---|
catalog.create-underlying-session-catalog | false | Boolean | If true, create and use an underlying session catalog instead of the default session catalog when using SparkGenericCatalog. |
defaultDatabase | "default" | String | The default database name. |
SparkConnectorOptions #
Spark connector options for paimon.
Key | Default | Type | Description |
---|---|---|---|
read.changelog | false | Boolean | Whether to read rows in the form of changelog (adding a rowkind column to the row to represent its change type). |
read.stream.maxBytesPerTrigger | (none) | Long | The maximum number of bytes returned in a single batch. |
read.stream.maxFilesPerTrigger | (none) | Integer | The maximum number of files returned in a single batch. |
read.stream.maxRowsPerTrigger | (none) | Long | The maximum number of rows returned in a single batch. |
read.stream.maxTriggerDelayMs | (none) | Long | The maximum delay between two adjacent batches, which is used together with read.stream.minRowsPerTrigger to create MinRowsReadLimit. |
read.stream.minRowsPerTrigger | (none) | Long | The minimum number of rows returned in a single batch, which is used together with read.stream.maxTriggerDelayMs to create MinRowsReadLimit. |
write.merge-schema | false | Boolean | If true, merge the data schema and the table schema automatically before writing data. |
write.merge-schema.explicit-cast | false | Boolean | If true, allow merging data types if the two types meet the rules for explicit casting. |
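As a rough sketch of how the Spark catalog options and connector options above fit together: catalog options are passed through Spark's `spark.sql.catalog.<name>.<option>` convention when registering `org.apache.paimon.spark.SparkCatalog`, while connector options are supplied per read or write. The catalog name, warehouse path and table path below are placeholders.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PaimonSparkExample {
    public static void main(String[] args) {
        // Register a Paimon catalog named `paimon`; catalog options use the
        // `spark.sql.catalog.<name>.<option>` convention. Paths and names are placeholders.
        SparkSession spark = SparkSession.builder()
                .appName("paimon-spark-example")
                .master("local[*]")
                .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
                .config("spark.sql.catalog.paimon.warehouse", "file:///tmp/paimon")
                .config("spark.sql.catalog.paimon.defaultDatabase", "analytics")
                .getOrCreate();

        // Connector options are supplied per operation, e.g. reading a table as a changelog.
        Dataset<Row> changes = spark.read()
                .format("paimon")
                .option("read.changelog", "true") // adds a rowkind column describing each change
                .load("file:///tmp/paimon/analytics.db/my_table");
        changes.show();

        spark.stop();
    }
}
```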
ORC Options #
Key | Default | Type | Description |
---|---|---|---|
orc.column.encoding.direct | (none) | Integer | Comma-separated list of fields for which dictionary encoding is to be skipped in orc. |
orc.compress | "lz4" | String | Define the compression codec for ORC files. If a higher compression ratio is required, it is recommended to configure it as 'zstd', and you can additionally configure orc.compression.zstd.level. |
orc.compression.zstd.level | 3 | Integer | Define the compression level to use with the ZStandard codec while writing data. The valid range is 1~22. |
orc.dictionary.key.threshold | 0.8 | Double | If the number of distinct keys in a dictionary is greater than this fraction of the total number of non-null rows, turn off dictionary encoding in orc. Use 0 to always disable dictionary encoding. Use 1 to always use dictionary encoding. |
orc.write.batch-size | 1024 | Integer | Write batch size for orc. |
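ORC options are ordinary table properties, so they can be attached wherever other table options are set. A small sketch using the Java schema builder, with placeholder column names:

```java
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

public class OrcOptionsExample {
    public static void main(String[] args) {
        // ORC options are plain table properties; table and column names are placeholders.
        Schema schema = Schema.newBuilder()
                .column("id", DataTypes.BIGINT())
                .column("payload", DataTypes.STRING())
                .option("file.format", "orc")
                .option("orc.compress", "zstd")
                .option("orc.compression.zstd.level", "9")
                .option("orc.write.batch-size", "2048")
                .build();

        System.out.println("ORC table options: " + schema.options());
    }
}
```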
RocksDB Options #
The following options allow users to finely adjust RocksDB for better performance. You can either specify them in table properties or in dynamic table hints.
Key | Default | Type | Description |
---|---|---|---|
lookup.cache-rows | 10000 | Long | The maximum number of rows to store in the cache. |
lookup.continuous.discovery-interval | (none) | Duration | The discovery interval of lookup continuous reading. This is used as an SQL hint. If it's not configured, the lookup function will fall back to 'continuous.discovery-interval'. |
rocksdb.block.blocksize | 4 kb | MemorySize | The approximate size (in bytes) of user data packed per block. The default blocksize is '4KB'. |
rocksdb.block.cache-size | 128 mb | MemorySize | The amount of the cache for data blocks in RocksDB. |
rocksdb.block.metadata-blocksize | 4 kb | MemorySize | Approximate size of partitioned metadata packed per block. Currently applied to the indexes block when the partitioned index/filters option is enabled. The default blocksize is '4KB'. |
rocksdb.bloom-filter.bits-per-key | 10.0 | Double | Bits per key that the bloom filter will use; this only takes effect when the bloom filter is used. The default value is 10.0. |
rocksdb.bloom-filter.block-based-mode | false | Boolean | If true, RocksDB will use a block-based filter instead of a full filter; this only takes effect when the bloom filter is used. The default value is 'false'. |
rocksdb.compaction.level.max-size-level-base | 256 mb | MemorySize | The upper bound of the total size of level base files in bytes. The default value is '256MB'. |
rocksdb.compaction.level.target-file-size-base | 64 mb | MemorySize | The target file size for compaction, which determines a level-1 file size. The default value is '64MB'. |
rocksdb.compaction.level.use-dynamic-size | false | Boolean | If true, RocksDB will pick the target size of each level dynamically. From an empty DB, RocksDB makes the last level the base level, which means merging L0 data into the last level, until it exceeds max_bytes_for_level_base, and then repeats this process for the second last level and so on. The default value is 'false'. For more information, please refer to RocksDB's doc. |
rocksdb.compaction.style | LEVEL | Enum | The specified compaction style for the DB. Candidate compaction styles are LEVEL, FIFO, UNIVERSAL or NONE, and Flink chooses 'LEVEL' as the default style. |
rocksdb.compression.type | LZ4_COMPRESSION | Enum | The compression type. |
rocksdb.files.open | -1 | Integer | The maximum number of open files (per stateful operator) that can be used by the DB; '-1' means no limit. The default value is '-1'. |
rocksdb.thread.num | 2 | Integer | The maximum number of concurrent background flush and compaction jobs (per stateful operator). The default value is '2'. |
rocksdb.use-bloom-filter | false | Boolean | If true, every newly created SST file will contain a Bloom filter. It is disabled by default. |
rocksdb.writebuffer.count | 2 | Integer | The maximum number of write buffers that are built up in memory. The default value is '2'. |
rocksdb.writebuffer.number-to-merge | 1 | Integer | The minimum number of write buffers that will be merged together before writing to storage. The default value is '1'. |
rocksdb.writebuffer.size | 64 mb | MemorySize | The amount of data built up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk file. The default writebuffer size is '64MB'. |
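As the note above says, these options can be set as table properties or as dynamic table hints. A hedged sketch of the hint form on the dimension table of a Flink lookup join, assuming an `orders` stream with a processing-time attribute and a Paimon table `customers` already exist (all names are placeholders):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RocksDBOptionsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Tune the lookup cache and RocksDB of the dimension table with a dynamic table hint;
        // catalog, table and column names are placeholders.
        tEnv.executeSql(
                "SELECT o.order_id, c.customer_name "
                        + "FROM orders AS o "
                        + "JOIN my_paimon.my_db.customers "
                        + "/*+ OPTIONS('lookup.cache-rows' = '20000', "
                        + "            'rocksdb.block.cache-size' = '256 mb', "
                        + "            'rocksdb.thread.num' = '4') */ "
                        + "FOR SYSTEM_TIME AS OF o.proc_time AS c "
                        + "ON o.customer_id = c.customer_id");
    }
}
```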