Java-based Implementation For Python API #
The Python SDK defines a Python API for Paimon. Currently, there is only a Java-based implementation.
The Java-based implementation launches a JVM and uses py4j to execute Java code for reading and writing Paimon tables.
Environment Settings #
SDK Installing #
The SDK is published as pypaimon. You can install it with:
pip install pypaimon
Java Runtime Environment #
This SDK requires JRE 1.8. After installing the JRE, make sure that at least one of the following conditions is met:
- The java command is available. You can verify it with java -version.
- The JAVA_HOME and PATH variables are set correctly. For example, you can set:
export JAVA_HOME=/path/to/java-directory
export PATH=$JAVA_HOME/bin:$PATH
Set Environment Variables #
Because we need to launch a JVM to access the Java code, the JVM environment needs to be set. Besides, the Java code needs Hadoop dependencies, so the Hadoop environment should be set as well.
Java classpath #
The package already bundles the necessary Paimon core dependencies (Local/Hadoop FileIO, Avro/Orc/Parquet format support and FileSystem/Jdbc/Hive catalog), so if you just test code locally or in a Hadoop environment, you don't need to set the classpath.
If you need other dependencies such as OSS/S3 filesystem jars, or a special format or catalog, please prepare the jars and set the classpath via one of the following ways:
- Set system environment variable:
export _PYPAIMON_JAVA_CLASSPATH=/path/to/jars/*
- Set environment variable in Python code:
import os
from pypaimon.py4j import constants
os.environ[constants.PYPAIMON_JAVA_CLASSPATH] = '/path/to/jars/*'
JVM args (optional) #
You can set JVM args via one of the following ways:
- Set system environment variable:
export _PYPAIMON_JVM_ARGS='arg1 arg2 ...'
- Set environment variable in Python code:
import os
from pypaimon.py4j import constants
os.environ[constants.PYPAIMON_JVM_ARGS] = 'arg1 arg2 ...'
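For example, to increase the maximum heap size of the launched JVM (the value below is only illustrative):
export _PYPAIMON_JVM_ARGS='-Xmx4g'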
Hadoop classpath #
If the machine is in a Hadoop environment, please ensure that the value of the environment variable HADOOP_CLASSPATH includes the path to the common Hadoop libraries; then you don't need to set the Hadoop classpath.
Otherwise, you should set the Hadoop classpath via one of the following ways:
- Set system environment variable:
export _PYPAIMON_HADOOP_CLASSPATH=/path/to/jars/*
- Set environment variable in Python code:
import os
from pypaimon.py4j import constants
os.environ[constants.PYPAIMON_HADOOP_CLASSPATH] = '/path/to/jars/*'
If you just want to test code locally, we recommend using the Flink pre-bundled Hadoop jar.
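For example, after downloading the pre-bundled jar, you can point the Hadoop classpath variable at it (the path and file name below are only illustrative):
export _PYPAIMON_HADOOP_CLASSPATH=/path/to/flink-shaded-hadoop-2-uber-<version>.jar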
Create Catalog #
Before interacting with a Table, you need to create a Catalog.
from pypaimon.py4j import Catalog
# Note that keys and values are all strings
catalog_options = {
'metastore': 'filesystem',
'warehouse': 'file:///path/to/warehouse'
}
catalog = Catalog.create(catalog_options)
Create Database & Table #
You can use the catalog to create a table for writing data.
Create Database (optional) #
A table is located in a database. If you want to create a table in a new database, you should create the database first.
catalog.create_database(
name='database_name',
ignore_if_exists=True, # set to False to raise an error if the database already exists
properties={'key': 'value'} # optional database properties
)
Create Schema #
A table schema contains field definitions, partition keys, primary keys, table options and a comment.
The field definitions are described by pyarrow.Schema. All arguments except the field definitions are optional.
Generally, there are two ways to build pyarrow.Schema.
First, you can use the pyarrow.schema method directly, for example:
import pyarrow as pa
from pypaimon import Schema
pa_schema = pa.schema([
('dt', pa.string()),
('hh', pa.string()),
('pk', pa.int64()),
('value', pa.string())
])
schema = Schema(
pa_schema=pa_schema,
partition_keys=['dt', 'hh'],
primary_keys=['dt', 'hh', 'pk'],
options={'bucket': '2'},
comment='my test table'
)
See Data Types for all supported pyarrow-to-paimon data types mapping.
Second, if you have some Pandas data, the pa_schema can be extracted from the DataFrame:
import pandas as pd
import pyarrow as pa
from pypaimon import Schema
# Example DataFrame data
data = {
'dt': ['2024-01-01', '2024-01-01', '2024-01-02'],
'hh': ['12', '15', '20'],
'pk': [1, 2, 3],
'value': ['a', 'b', 'c'],
}
dataframe = pd.DataFrame(data)
# Get Paimon Schema
record_batch = pa.RecordBatch.from_pandas(dataframe)
schema = Schema(
pa_schema=record_batch.schema,
partition_keys=['dt', 'hh'],
primary_keys=['dt', 'hh', 'pk'],
options={'bucket': '2'},
comment='my test table'
)
Create Table #
After building the table schema, you can create the corresponding table:
schema = ...
catalog.create_table(
identifier='database_name.table_name',
schema=schema,
ignore_if_exists=True # set to False to raise an error if the table already exists
)
Get Table #
The Table interface provides tools to read and write a table.
table = catalog.get_table('database_name.table_name')
Batch Read #
Set Read Parallelism #
The TableRead interface supports reading multiple splits in parallel. You can set 'max-workers': 'N' in catalog_options
to set the number of threads used for reading splits. max-workers is 1 by default, which means TableRead reads splits sequentially
if you don't set max-workers.
Get ReadBuilder and Perform pushdown #
A ReadBuilder is used to build reading utilities and perform filter and projection pushdown.
table = catalog.get_table('database_name.table_name')
read_builder = table.new_read_builder()
You can use PredicateBuilder to build filters and push them down via the ReadBuilder:
# Example filter: ('f0' < 3 OR 'f1' > 6) AND 'f3' = 'A'
predicate_builder = read_builder.new_predicate_builder()
predicate1 = predicate_builder.less_than('f0', 3)
predicate2 = predicate_builder.greater_than('f1', 6)
predicate3 = predicate_builder.or_predicates([predicate1, predicate2])
predicate4 = predicate_builder.equal('f3', 'A')
predicate5 = predicate_builder.and_predicates([predicate3, predicate4])
read_builder = read_builder.with_filter(predicate5)
See Predicate for all supported filters and building methods.
You can also pushdown projection by ReadBuilder:
# select f3 and f2 columns
read_builder = read_builder.with_projection(['f3', 'f2'])
Scan Plan #
Then you can step into the Scan Plan stage to get splits:
table_scan = read_builder.new_scan()
splits = table_scan.plan().splits()
Read Splits #
Finally, you can read data from the splits into various data formats.
Apache Arrow #
This requires pyarrow to be installed.
You can read all the data into a pyarrow.Table:
table_read = read_builder.new_read()
pa_table = table_read.to_arrow(splits)
print(pa_table)
# pyarrow.Table
# f0: int32
# f1: string
# ----
# f0: [[1,2,3],[4,5,6],...]
# f1: [["a","b","c"],["d","e","f"],...]
You can also read data into a pyarrow.RecordBatchReader and iterate record batches:
table_read = read_builder.new_read()
for batch in table_read.to_arrow_batch_reader(splits):
print(batch)
# pyarrow.RecordBatch
# f0: int32
# f1: string
# ----
# f0: [1,2,3]
# f1: ["a","b","c"]
Pandas #
This requires pandas to be installed.
You can read all the data into a pandas.DataFrame:
table_read = read_builder.new_read()
df = table_read.to_pandas(splits)
print(df)
# f0 f1
# 0 1 a
# 1 2 b
# 2 3 c
# 3 4 d
# ...
DuckDB #
This requires duckdb to be installed.
You can convert the splits into an in-memory DuckDB table and query it:
table_read = read_builder.new_read()
duckdb_con = table_read.to_duckdb(splits, 'duckdb_table')
print(duckdb_con.query("SELECT * FROM duckdb_table").fetchdf())
# f0 f1
# 0 1 a
# 1 2 b
# 2 3 c
# 3 4 d
# ...
print(duckdb_con.query("SELECT * FROM duckdb_table WHERE f0 = 1").fetchdf())
# f0 f1
# 0 1 a
Ray #
This requires ray to be installed.
You can convert the splits into a Ray dataset and handle it by Ray API:
table_read = read_builder.new_read()
ray_dataset = table_read.to_ray(splits)
print(ray_dataset)
# MaterializedDataset(num_blocks=1, num_rows=9, schema={f0: int32, f1: string})
print(ray_dataset.take(3))
# [{'f0': 1, 'f1': 'a'}, {'f0': 2, 'f1': 'b'}, {'f0': 3, 'f1': 'c'}]
print(ray_dataset.to_pandas())
# f0 f1
# 0 1 a
# 1 2 b
# 2 3 c
# 3 4 d
# ...
Batch Write #
Paimon table writing uses a two-phase commit: you can write multiple times, but once committed, no more data can be written.
Currently, the Python SDK doesn't support writing to a primary key table with bucket=-1.
table = catalog.get_table('database_name.table_name')
# 1. Create table write and commit
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
# 2. Write data. Support 3 methods:
# 2.1 Write pandas.DataFrame
dataframe = ...
table_write.write_pandas(dataframe)
# 2.2 Write pyarrow.Table
pa_table = ...
table_write.write_arrow(pa_table)
# 2.3 Write pyarrow.RecordBatch
record_batch = ...
table_write.write_arrow_batch(record_batch)
# 3. Commit data
commit_messages = table_write.prepare_commit()
table_commit.commit(commit_messages)
# 4. Close resources
table_write.close()
table_commit.close()
By default, the data will be appended to the table. If you want to overwrite the table, you should use the write builder's overwrite API:
# overwrite whole table
write_builder.overwrite()
# overwrite partition 'dt=2024-01-01'
write_builder.overwrite({'dt': '2024-01-01'})
Data Types #
| pyarrow | Paimon |
|---|---|
| pyarrow.int8() | TINYINT |
| pyarrow.int16() | SMALLINT |
| pyarrow.int32() | INT |
| pyarrow.int64() | BIGINT |
| pyarrow.float16() / pyarrow.float32() | FLOAT |
| pyarrow.float64() | DOUBLE |
| pyarrow.string() | STRING |
| pyarrow.boolean() | BOOLEAN |
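For example, a pyarrow schema that exercises several of the mapped types (field names are only illustrative):
import pyarrow as pa
pa_schema = pa.schema([
    ('tiny_col', pa.int8()),       # TINYINT
    ('int_col', pa.int32()),       # INT
    ('big_col', pa.int64()),       # BIGINT
    ('float_col', pa.float32()),   # FLOAT
    ('double_col', pa.float64()),  # DOUBLE
    ('flag', pa.boolean()),        # BOOLEAN
    ('name', pa.string())          # STRING
])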
Predicate #
| Predicate kind | Predicate method |
|---|---|
| p1 and p2 | PredicateBuilder.and_predicates([p1, p2]) |
| p1 or p2 | PredicateBuilder.or_predicates([p1, p2]) |
| f = literal | PredicateBuilder.equal(f, literal) |
| f != literal | PredicateBuilder.not_equal(f, literal) |
| f < literal | PredicateBuilder.less_than(f, literal) |
| f <= literal | PredicateBuilder.less_or_equal(f, literal) |
| f > literal | PredicateBuilder.greater_than(f, literal) |
| f >= literal | PredicateBuilder.greater_or_equal(f, literal) |
| f is null | PredicateBuilder.is_null(f) |
| f is not null | PredicateBuilder.is_not_null(f) |
| f.startswith(literal) | PredicateBuilder.startswith(f, literal) |
| f.endswith(literal) | PredicateBuilder.endswith(f, literal) |
| f.contains(literal) | PredicateBuilder.contains(f, literal) |
| f is in [l1, l2] | PredicateBuilder.is_in(f, [l1, l2]) |
| f is not in [l1, l2] | PredicateBuilder.is_not_in(f, [l1, l2]) |
| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) |
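For example, several of the methods above can be combined into one filter before reading (the column names follow the earlier examples and are only illustrative):
predicate_builder = read_builder.new_predicate_builder()
p1 = predicate_builder.is_in('f3', ['A', 'B'])
p2 = predicate_builder.between('f0', 1, 10)
p3 = predicate_builder.is_not_null('f1')
read_builder = read_builder.with_filter(predicate_builder.and_predicates([p1, p2, p3]))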