Ray Data #
This requires ray to be installed.
pypaimon.ray exposes a top-level read_paimon / write_paimon facade that
takes a table identifier and catalog options directly, mirroring the shape of
Ray’s built-in Iceberg integration. The lower-level TableRead.to_ray() and
TableWrite.write_ray() entry points remain available for callers that have
already resolved a (read_builder, splits) pair or constructed a
table_write via the regular pypaimon API.
Read #
read_paimon (recommended) #
from pypaimon.ray import read_paimon

ray_dataset = read_paimon(
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
)
print(ray_dataset)
# MaterializedDataset(num_blocks=1, num_rows=9, schema={f0: int32, f1: string})
print(ray_dataset.take(3))
# [{'f0': 1, 'f1': 'a'}, {'f0': 2, 'f1': 'b'}, {'f0': 3, 'f1': 'c'}]
print(ray_dataset.to_pandas())
#    f0 f1
# 0   1  a
# 1   2  b
# 2   3  c
# 3   4  d
# ...
read_paimon opens its own catalog and resolves the table, so it is the
single-call equivalent of the four-step CatalogFactory.create → get_table → new_read_builder → to_ray boilerplate.
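For reference, the lower-level sequence it replaces looks roughly like this (a sketch; the CatalogFactory import path is assumed from the regular pypaimon API):
# Sketch of the equivalent lower-level boilerplate (assumes the regular
# pypaimon CatalogFactory / ReadBuilder API, shown later on this page):
from pypaimon import CatalogFactory

catalog = CatalogFactory.create({"warehouse": "/path/to/warehouse"})
table = catalog.get_table("database_name.table_name")
read_builder = table.new_read_builder()
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
ray_dataset = table_read.to_ray(splits)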
Projection and limit:
ray_dataset = read_paimon(
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
    projection=["id", "score"],
    limit=1000,
)
Distribution / scheduling:
ray_dataset = read_paimon(
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
    override_num_blocks=4,
    ray_remote_args={"num_cpus": 2, "max_retries": 3},
    concurrency=8,
)
Time travel:
# Read a specific snapshot.
ray_dataset = read_paimon(
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
    snapshot_id=42,
)

# Read a tagged snapshot.
ray_dataset = read_paimon(
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
    tag_name="release-2026-04",
)
snapshot_id and tag_name are mutually exclusive.
Parameters:
table_identifier: full table name, e.g. "db_name.table_name".
catalog_options: kwargs forwarded to CatalogFactory.create(), e.g. {"warehouse": "/path/to/warehouse"}.
filter: optional Predicate to push down into the scan (see the sketch after this list).
projection: optional list of column names to read.
limit: optional row limit applied at scan planning time.
snapshot_id: optional snapshot id to time-travel to. Mutually exclusive with tag_name.
tag_name: optional tag name to time-travel to. Mutually exclusive with snapshot_id.
override_num_blocks: optional override for the number of output blocks. Must be >= 1.
ray_remote_args: optional kwargs passed to ray.remote() in read tasks (e.g. {"num_cpus": 2, "max_retries": 3}).
concurrency: optional max number of Ray tasks to run concurrently.
**read_args: additional kwargs forwarded to ray.data.read_datasource (e.g. per_task_row_limit in Ray 2.52.0+).
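Filter pushdown, as a hedged sketch: it assumes a Predicate is built via new_predicate_builder() on a ReadBuilder from the regular pypaimon API, and the column name "score" is illustrative.
# Sketch: building a Predicate and passing it to read_paimon
# (new_predicate_builder() and greater_than() are assumed from the
# regular pypaimon ReadBuilder API; verify against your version).
from pypaimon import CatalogFactory
from pypaimon.ray import read_paimon

catalog = CatalogFactory.create({"warehouse": "/path/to/warehouse"})
table = catalog.get_table("database_name.table_name")
predicate_builder = table.new_read_builder().new_predicate_builder()
predicate = predicate_builder.greater_than("score", 90)

ray_dataset = read_paimon(
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
    filter=predicate,
)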
TableRead.to_ray() (lower-level) #
If you already have a read_builder and splits, you can convert them to a
Ray Dataset directly:
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()

ray_dataset = table_read.to_ray(
    splits,
    override_num_blocks=4,
    ray_remote_args={"num_cpus": 2, "max_retries": 3},
)
to_ray() accepts the same override_num_blocks, ray_remote_args,
concurrency, and **read_args parameters as read_paimon.
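On this path, projection and limit pushdown attach to the read_builder before planning rather than to to_ray(). A sketch, assuming the regular pypaimon ReadBuilder exposes chainable with_projection / with_limit methods:
# Sketch: pushdown configured on the builder, then converted to a
# Ray Dataset (with_projection / with_limit are assumptions from the
# regular pypaimon ReadBuilder API).
read_builder = (
    table.new_read_builder()
    .with_projection(["id", "score"])
    .with_limit(1000)
)
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
ray_dataset = table_read.to_ray(splits, concurrency=8)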
Ray Block Size Configuration #
If you need to configure Ray’s block size (e.g., when Paimon splits exceed
Ray’s default 128MB block size), set it on the DataContext before calling
either read_paimon or to_ray:
from ray.data import DataContext
ctx = DataContext.get_current()
ctx.target_max_block_size = 256 * 1024 * 1024 # 256MB (default is 128MB)
See the Ray Data API documentation for more details.
Write #
write_paimon (recommended) #
import ray
from pypaimon.ray import write_paimon

ray_dataset = ray.data.read_json("/path/to/data.jsonl")
write_paimon(
    ray_dataset,
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
)
write_paimon opens its own catalog, resolves the table, and commits the
write through Ray’s Datasink API — there is no separate prepare_commit or
close step to run.
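A minimal round trip, assuming the target table already exists with a matching schema (ray.data.from_items is standard Ray Data; the table and warehouse names are placeholders):
# Write a small in-memory dataset, then read it back.
import ray
from pypaimon.ray import read_paimon, write_paimon

ds = ray.data.from_items([{"f0": 1, "f1": "a"}, {"f0": 2, "f1": "b"}])
write_paimon(
    ds,
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
)
print(read_paimon(
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
).take(2))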
Overwrite mode:
write_paimon(
    ray_dataset,
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
    overwrite=True,
)
Distribution / scheduling:
write_paimon(
    ray_dataset,
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
    concurrency=4,
    ray_remote_args={"num_cpus": 2},
)
Parameters:
dataset: the Ray Dataset to write.
table_identifier: full table name, e.g. "db_name.table_name".
catalog_options: kwargs forwarded to CatalogFactory.create().
overwrite: if True, overwrite existing data in the table.
concurrency: optional max number of Ray write tasks to run concurrently.
ray_remote_args: optional kwargs passed to ray.remote() in write tasks (e.g. {"num_cpus": 2}).
TableWrite.write_ray() (lower-level) #
If you have already constructed a table_write from a write builder, you can
hand a Ray Dataset directly to it. write_ray() commits through the Ray
Datasink API, so there is no prepare_commit / commit step to run for the
Ray write itself — just close the writer when you are done with it:
import ray

table = catalog.get_table('database_name.table_name')

# 1. Create the table write. A commit object is only needed if you also
#    perform non-Ray writes on the same table_write instance — see step 3.
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

# 2. Write a Ray Dataset. write_ray() commits through the Ray Datasink
#    API, so no prepare_commit/commit call is needed for this step.
#
# Parameters:
# - dataset: Ray Dataset to write
# - overwrite: whether to overwrite existing data (default: False)
# - concurrency: optional max number of concurrent Ray tasks
# - ray_remote_args: optional kwargs passed to ray.remote() (e.g. {"num_cpus": 2})
ray_dataset = ray.data.read_json("/path/to/data.jsonl")
table_write.write_ray(ray_dataset, overwrite=False, concurrency=2)

# 3. Commit data — required only for write_pandas/write_arrow/write_arrow_batch
#    calls made on this table_write; skip it for a Ray-only write.
commit_messages = table_write.prepare_commit()
table_commit.commit(commit_messages)

# 4. Close resources
table_write.close()
table_commit.close()
Overwrite at builder level #
The recommended way to overwrite via write_paimon is the overwrite=True
flag above. When using the lower-level builder API, you can also configure
overwrite mode on the write builder itself:
# overwrite whole table
write_builder = table.new_batch_write_builder().overwrite()
# overwrite partition 'dt=2024-01-01'
write_builder = table.new_batch_write_builder().overwrite({'dt': '2024-01-01'})
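Combining the two pieces, a hedged sketch of a builder-level overwrite feeding write_ray (it assumes the builder's overwrite mode is honored by the Ray Datasink commit; verify against your pypaimon version):
# Sketch: overwrite one partition via the builder, then write with Ray.
write_builder = table.new_batch_write_builder().overwrite({'dt': '2024-01-01'})
table_write = write_builder.new_write()
table_write.write_ray(ray_dataset)  # Datasink commit applies the overwrite
table_write.close()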