Python API #
PyPaimon is a pure Python implementation for connecting to Paimon catalogs and reading & writing tables. Because it is implemented entirely in Python, it does not require a JDK installation.
Environment Settings #
SDK Installation #
The SDK is published on PyPI as pypaimon. You can install it with:
pip install pypaimon
Create Catalog #
Before working with a Table, you need to create a Catalog.
from pypaimon import CatalogFactory
# Note that all option keys and values are strings
catalog_options = {
'warehouse': 'file:///path/to/warehouse'
}
catalog = CatalogFactory.create(catalog_options)
Currently, PyPaimon only supports the filesystem catalog and the REST catalog. See Catalog.
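For example, a REST catalog is created the same way, just with different string options. The keys below ('metastore', 'uri', 'token.provider', 'token') mirror the REST catalog options of the Java API and are an illustrative assumption; check the Catalog documentation for the exact keys supported by your setup.
from pypaimon import CatalogFactory
# Hedged sketch: option keys are assumed to match the Java REST catalog options
rest_catalog_options = {
    'metastore': 'rest',                     # select the REST catalog implementation
    'uri': 'http://localhost:10000',         # hypothetical REST server endpoint
    'warehouse': 'my_instance.my_warehouse', # hypothetical warehouse identifier
    'token.provider': 'bear',                # token-based authentication (assumption)
    'token': '<token>',
}
rest_catalog = CatalogFactory.create(rest_catalog_options)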
Create Database & Table #
You can use the catalog to create tables for writing data.
Create Database (optional) #
A table is located in a database. If you want to create a table in a new database, you should create the database first.
catalog.create_database(
name='database_name',
ignore_if_exists=True, # set to False to raise an error if the database exists
properties={'key': 'value'} # optional database properties
)
Create Schema #
A table schema contains field definitions, partition keys, primary keys, table options and a comment. The field definitions are described by pyarrow.Schema. All arguments except the field definitions are optional.
Generally, there are two ways to build a pyarrow.Schema.
First, you can use the pyarrow.schema method directly, for example:
import pyarrow as pa
from pypaimon import Schema
pa_schema = pa.schema([
('dt', pa.string()),
('hh', pa.string()),
('pk', pa.int64()),
('value', pa.string())
])
schema = Schema.from_pyarrow_schema(
pa_schema=pa_schema,
partition_keys=['dt', 'hh'],
primary_keys=['dt', 'hh', 'pk'],
options={'bucket': '2'},
comment='my test table')
See Data Types for the full mapping of supported pyarrow-to-Paimon data types.
Second, if you already have some Pandas data, the pa_schema can be extracted from a DataFrame:
import pandas as pd
import pyarrow as pa
from pypaimon import Schema
# Example DataFrame data
data = {
'dt': ['2024-01-01', '2024-01-01', '2024-01-02'],
'hh': ['12', '15', '20'],
'pk': [1, 2, 3],
'value': ['a', 'b', 'c'],
}
dataframe = pd.DataFrame(data)
# Get Paimon Schema
record_batch = pa.RecordBatch.from_pandas(dataframe)
schema = Schema.from_pyarrow_schema(
pa_schema=record_batch.schema,
partition_keys=['dt', 'hh'],
primary_keys=['dt', 'hh', 'pk'],
options={'bucket': '2'},
comment='my test table'
)
Create Table #
After building the table schema, you can create the corresponding table:
schema = ...
catalog.create_table(
identifier='database_name.table_name',
schema=schema,
ignore_if_exists=True # set to False to raise an error if the table exists
)
Get Table #
The Table interface provides tools to read and write a table.
table = catalog.get_table('database_name.table_name')
Batch Write #
Paimon table writes use a two-phase commit: you can write many times, but once committed, no more data can be written with that writer.
Currently, writing multiple times and committing once is only supported for append-only tables.
table = catalog.get_table('database_name.table_name')
# 1. Create table write and commit
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
# 2. Write data. Support 3 methods:
# 2.1 Write pandas.DataFrame
dataframe = ...
table_write.write_pandas(dataframe)
# 2.2 Write pyarrow.Table
pa_table = ...
table_write.write_arrow(pa_table)
# 2.3 Write pyarrow.RecordBatch
record_batch = ...
table_write.write_arrow_batch(record_batch)
# 3. Commit data
commit_messages = table_write.prepare_commit()
table_commit.commit(commit_messages)
# 4. Close resources
table_write.close()
table_commit.close()
By default, the data will be appended to the table. If you want to overwrite the table, use the overwrite API of the write builder:
# overwrite whole table
write_builder = table.new_batch_write_builder().overwrite()
# overwrite partition 'dt=2024-01-01'
write_builder = table.new_batch_write_builder().overwrite({'dt': '2024-01-01'})
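An overwrite write builder is then used exactly like an append one; the following is a minimal sketch that reuses the write-and-commit flow from above:
# Minimal sketch: overwrite partition 'dt=2024-01-01', then write and commit as usual
write_builder = table.new_batch_write_builder().overwrite({'dt': '2024-01-01'})
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
table_write.write_pandas(dataframe)  # replaces the existing data of the partition
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()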
Batch Read #
Get ReadBuilder and Perform Pushdown #
A ReadBuilder is used to build reading utilities and to perform filter and projection pushdown.
table = catalog.get_table('database_name.table_name')
read_builder = table.new_read_builder()
You can use a PredicateBuilder to build filters and push them down via the ReadBuilder:
# Example filter: ('f0' < 3 OR 'f1' > 6) AND 'f3' = 'A'
predicate_builder = read_builder.new_predicate_builder()
predicate1 = predicate_builder.less_than('f0', 3)
predicate2 = predicate_builder.greater_than('f1', 6)
predicate3 = predicate_builder.or_predicates([predicate1, predicate2])
predicate4 = predicate_builder.equal('f3', 'A')
predicate5 = predicate_builder.and_predicates([predicate3, predicate4])
read_builder = read_builder.with_filter(predicate5)
See Predicate for all supported filters and building methods.
You can also push down a projection via the ReadBuilder:
# select f3 and f2 columns
read_builder = read_builder.with_projection(['f3', 'f2'])
Scan Plan #
Then you can step into the Scan Plan stage to get the splits:
table_scan = read_builder.new_scan()
splits = table_scan.plan().splits()
Read Splits #
Finally, you can read data from the splits into various data formats.
Apache Arrow #
This requires pyarrow to be installed.
You can read all the data into a pyarrow.Table:
table_read = read_builder.new_read()
pa_table = table_read.to_arrow(splits)
print(pa_table)
# pyarrow.Table
# f0: int32
# f1: string
# ----
# f0: [[1,2,3],[4,5,6],...]
# f1: [["a","b","c"],["d","e","f"],...]
You can also read data into a pyarrow.RecordBatchReader and iterate over record batches:
table_read = read_builder.new_read()
for batch in table_read.to_arrow_batch_reader(splits):
print(batch)
# pyarrow.RecordBatch
# f0: int32
# f1: string
# ----
# f0: [1,2,3]
# f1: ["a","b","c"]
Python Iterator #
You can read the data row by row into a native Python iterator. This is convenient for custom row-based processing logic.
table_read = read_builder.new_read()
for row in table_read.to_iterator(splits):
print(row)
# [1,2,3]
# ["a","b","c"]
Pandas #
This requires pandas to be installed.
You can read all the data into a pandas.DataFrame:
table_read = read_builder.new_read()
df = table_read.to_pandas(splits)
print(df)
# f0 f1
# 0 1 a
# 1 2 b
# 2 3 c
# 3 4 d
# ...
DuckDB #
This requires duckdb to be installed.
You can convert the splits into an in-memory DuckDB table and query it:
table_read = read_builder.new_read()
duckdb_con = table_read.to_duckdb(splits, 'duckdb_table')
print(duckdb_con.query("SELECT * FROM duckdb_table").fetchdf())
# f0 f1
# 0 1 a
# 1 2 b
# 2 3 c
# 3 4 d
# ...
print(duckdb_con.query("SELECT * FROM duckdb_table WHERE f0 = 1").fetchdf())
# f0 f1
# 0 1 a
Ray #
This requires ray to be installed.
You can convert the splits into a Ray dataset and handle it by Ray API:
table_read = read_builder.new_read()
ray_dataset = table_read.to_ray(splits)
print(ray_dataset)
# MaterializedDataset(num_blocks=1, num_rows=9, schema={f0: int32, f1: string})
print(ray_dataset.take(3))
# [{'f0': 1, 'f1': 'a'}, {'f0': 2, 'f1': 'b'}, {'f0': 3, 'f1': 'c'}]
print(ray_dataset.to_pandas())
# f0 f1
# 0 1 a
# 1 2 b
# 2 3 c
# 3 4 d
# ...
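Once the data is a Ray dataset, the regular Ray Data API applies. A minimal sketch, assuming rows are exposed as dicts as in the take() output above:
# Minimal sketch: filter rows with the Ray Data API (rows are dicts, as shown by take())
filtered = ray_dataset.filter(lambda row: row['f0'] > 1)
print(filtered.to_pandas())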
Data Types #
| Python Native Type | PyArrow Type | Paimon Type |
|---|---|---|
| int | pyarrow.int8() | TINYINT |
| int | pyarrow.int16() | SMALLINT |
| int | pyarrow.int32() | INT |
| int | pyarrow.int64() | BIGINT |
| float | pyarrow.float32() | FLOAT |
| float | pyarrow.float64() | DOUBLE |
| bool | pyarrow.bool_() | BOOLEAN |
| str | pyarrow.string() | STRING, CHAR(n), VARCHAR(n) |
| bytes | pyarrow.binary() | BYTES, VARBINARY(n) |
| bytes | pyarrow.binary(length) | BINARY(length) |
| decimal.Decimal | pyarrow.decimal128(precision, scale) | DECIMAL(precision, scale) |
| datetime.datetime | pyarrow.timestamp(unit, tz=None) | TIMESTAMP(p) |
| datetime.date | pyarrow.date32() | DATE |
| datetime.time | pyarrow.time32(unit) or pyarrow.time64(unit) | TIME(p) |
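As an illustration of this mapping, a pyarrow schema using several of these types can be converted to a Paimon schema in the same way as above (a minimal sketch; the field names are made up for the example):
import pyarrow as pa
from pypaimon import Schema
# Hypothetical fields, chosen only to illustrate the type mapping
pa_schema = pa.schema([
    ('id', pa.int64()),               # BIGINT
    ('price', pa.decimal128(10, 2)),  # DECIMAL(10, 2)
    ('ts', pa.timestamp('ms')),       # TIMESTAMP(3)
    ('dt', pa.date32()),              # DATE
    ('flag', pa.bool_()),             # BOOLEAN
])
schema = Schema.from_pyarrow_schema(pa_schema, primary_keys=['id'])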
Predicate #
| Predicate kind | Predicate method |
|---|---|
| p1 and p2 | PredicateBuilder.and_predicates([p1, p2]) |
| p1 or p2 | PredicateBuilder.or_predicates([p1, p2]) |
| f = literal | PredicateBuilder.equal(f, literal) |
| f != literal | PredicateBuilder.not_equal(f, literal) |
| f < literal | PredicateBuilder.less_than(f, literal) |
| f <= literal | PredicateBuilder.less_or_equal(f, literal) |
| f > literal | PredicateBuilder.greater_than(f, literal) |
| f >= literal | PredicateBuilder.greater_or_equal(f, literal) |
| f is null | PredicateBuilder.is_null(f) |
| f is not null | PredicateBuilder.is_not_null(f) |
| f.startswith(literal) | PredicateBuilder.startswith(f, literal) |
| f.endswith(literal) | PredicateBuilder.endswith(f, literal) |
| f.contains(literal) | PredicateBuilder.contains(f, literal) |
| f is in [l1, l2] | PredicateBuilder.is_in(f, [l1, l2]) |
| f is not in [l1, l2] | PredicateBuilder.is_not_in(f, [l1, l2]) |
| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) |
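For example, the building methods above can be combined just like in the earlier filter example (a minimal sketch using column names from the table created above):
predicate_builder = read_builder.new_predicate_builder()
# dt between '2024-01-01' and '2024-01-02', and pk in (1, 2)
p_between = predicate_builder.between('dt', '2024-01-01', '2024-01-02')
p_in = predicate_builder.is_in('pk', [1, 2])
read_builder = read_builder.with_filter(
    predicate_builder.and_predicates([p_between, p_in]))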