Ray Data
This requires ray to be installed.
pypaimon.ray exposes a top-level read_paimon / write_paimon facade that
takes a table identifier and catalog options directly, mirroring the shape of
Ray's built-in Iceberg integration. The lower-level TableRead.to_ray() and
TableWrite.write_ray() entry points remain available for callers that have
already resolved a (read_builder, splits) pair or constructed a
table_write via the regular pypaimon API.
If your application uses Daft DataFrames and only needs Ray as Daft's execution backend, see Running Daft on Ray.
Read
read_paimon (recommended)
from pypaimon.ray import read_paimon
ray_dataset = read_paimon(
"database_name.table_name",
catalog_options={"warehouse": "/path/to/warehouse"},
)
print(ray_dataset)
# MaterializedDataset(num_blocks=1, num_rows=9, schema={f0: int32, f1: string})
print(ray_dataset.take(3))
# [{'f0': 1, 'f1': 'a'}, {'f0': 2, 'f1': 'b'}, {'f0': 3, 'f1': 'c'}]
print(ray_dataset.to_pandas())
#    f0 f1
# 0   1  a
# 1   2  b
# 2   3  c
# 3   4  d
# ...
read_paimon opens its own catalog and resolves the table, so it is the
single-call equivalent of the four-step CatalogFactory.create → get_table → new_read_builder → to_ray boilerplate.
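For comparison, here is a sketch of that four-step path written out by hand. It uses only calls that appear elsewhere on this page (CatalogFactory.create, get_table, new_read_builder, new_read, new_scan().plan().splits(), to_ray). The helper name read_manually is hypothetical, and the pypaimon import is deferred into the function body so the definition itself does not require pypaimon.

```python
def read_manually(table_identifier, catalog_options):
    """Hypothetical helper: a hand-written equivalent of read_paimon()."""
    # Deferred import: pypaimon is only needed when the function is called.
    from pypaimon import CatalogFactory

    # 1. Open the catalog from the same options read_paimon() accepts.
    catalog = CatalogFactory.create(catalog_options)
    # 2. Resolve the table by its full identifier.
    table = catalog.get_table(table_identifier)
    # 3. Build a reader and plan the splits to read.
    read_builder = table.new_read_builder()
    table_read = read_builder.new_read()
    splits = read_builder.new_scan().plan().splits()
    # 4. Hand the planned splits to Ray as a Dataset.
    return table_read.to_ray(splits)
```

Usage: read_manually("database_name.table_name", {"warehouse": "/path/to/warehouse"}) returns the same kind of Ray Dataset as the read_paimon call above, without projection, limit, or time-travel options.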
Projection and limit:
ray_dataset = read_paimon(
"database_name.table_name",
catalog_options={"warehouse": "/path/to/warehouse"},
projection=["id", "score"],
limit=1000,
)
Distribution / scheduling:
ray_dataset = read_paimon(
"database_name.table_name",
catalog_options={"warehouse": "/path/to/warehouse"},
override_num_blocks=4,
ray_remote_args={"num_cpus": 2, "max_retries": 3},
concurrency=8,
)
Time travel:
# Read a specific snapshot.
ray_dataset = read_paimon(
"database_name.table_name",
catalog_options={"warehouse": "/path/to/warehouse"},
snapshot_id=42,
)
# Read a tagged snapshot.
ray_dataset = read_paimon(
"database_name.table_name",
catalog_options={"warehouse": "/path/to/warehouse"},
tag_name="release-2026-04",
)
snapshot_id and tag_name are mutually exclusive.
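The time-travel rule, together with the override_num_blocks >= 1 constraint listed under Parameters, can be expressed as a small pre-flight check. check_read_args is a hypothetical helper that only mirrors the documented constraints; read_paimon performs its own validation, and the exception type it raises may differ.

```python
def check_read_args(snapshot_id=None, tag_name=None, override_num_blocks=None):
    """Hypothetical pre-flight check mirroring the documented read constraints."""
    if snapshot_id is not None and tag_name is not None:
        raise ValueError("snapshot_id and tag_name are mutually exclusive")
    if override_num_blocks is not None and override_num_blocks < 1:
        raise ValueError("override_num_blocks must be >= 1")
    # Report which time-travel target, if any, the read would use.
    if snapshot_id is not None:
        return ("snapshot", snapshot_id)
    if tag_name is not None:
        return ("tag", tag_name)
    return ("latest", None)


print(check_read_args(snapshot_id=42))  # ('snapshot', 42)
```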
Parameters:
- table_identifier: full table name, e.g. "db_name.table_name".
- catalog_options: kwargs forwarded to CatalogFactory.create(), e.g. {"warehouse": "/path/to/warehouse"}.
- filter: optional Predicate to push down into the scan.
- projection: optional list of column names to read.
- limit: optional row limit applied at scan planning time.
- snapshot_id: optional snapshot id to time-travel to. Mutually exclusive with tag_name.
- tag_name: optional tag name to time-travel to. Mutually exclusive with snapshot_id.
- override_num_blocks: optional override for the number of output blocks. Must be >= 1.
- ray_remote_args: optional kwargs passed to ray.remote() in read tasks (e.g. {"num_cpus": 2, "max_retries": 3}).
- concurrency: optional maximum number of Ray tasks to run concurrently.
- **read_args: additional kwargs forwarded to ray.data.read_datasource (e.g. per_task_row_limit in Ray 2.52.0+).
TableRead.to_ray() (lower-level)
If you already have a read_builder and splits, you can convert them to a
Ray Dataset directly:
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
ray_dataset = table_read.to_ray(
splits,
override_num_blocks=4,
ray_remote_args={"num_cpus": 2, "max_retries": 3},
)
to_ray() accepts the same override_num_blocks, ray_remote_args,
concurrency, and **read_args parameters as read_paimon.
Ray Block Size Configuration
If you need to configure Ray's block size (e.g., when Paimon splits exceed
Ray's default 128MB block size), set it on the DataContext before calling
either read_paimon or to_ray:
from ray.data import DataContext
ctx = DataContext.get_current()
ctx.target_max_block_size = 256 * 1024 * 1024 # 256MB (default is 128MB)
See the Ray Data API documentation for more details.
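As a back-of-envelope illustration of what the setting changes: if a read task's output is cut into blocks no larger than target_max_block_size, the number of blocks per split is roughly a ceiling division. This is an approximation, not Ray's exact block-splitting logic, and the 300 MiB split size is a made-up example.

```python
import math

MiB = 1024 * 1024


def estimated_blocks(output_bytes, target_max_block_size=128 * MiB):
    """Rough number of blocks for one read task's output (approximation only)."""
    return max(1, math.ceil(output_bytes / target_max_block_size))


split_bytes = 300 * MiB  # hypothetical decoded size of one Paimon split
print(estimated_blocks(split_bytes))             # 3 with the 128 MiB default
print(estimated_blocks(split_bytes, 256 * MiB))  # 2 after raising the target
```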
Write
write_paimon (recommended)
import ray
from pypaimon.ray import write_paimon
ray_dataset = ray.data.read_json("/path/to/data.jsonl")
write_paimon(
ray_dataset,
"database_name.table_name",
catalog_options={"warehouse": "/path/to/warehouse"},
)
write_paimon opens its own catalog, resolves the table, and commits the
write through Ray's Datasink API — there is no separate prepare_commit or
close step to run.
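Conceptually, a Ray Datasink divides the work the same way the manual prepare_commit / commit flow does: each write task produces commit metadata, and a single commit runs after all tasks finish. The toy class below is a pure-Python model of that shape, assuming write_paimon's sink follows it; it is not pypaimon's sink, and its method names only echo Ray's Datasink hooks (write, on_write_complete) rather than matching their exact signatures.

```python
class ToyPaimonSink:
    """Conceptual model of a two-phase sink: tasks write, the driver commits once."""

    def __init__(self):
        self.committed = []

    def write(self, block):
        # Would run inside a Ray task: write data files for one block and
        # return metadata describing them, without making them visible yet.
        return [f"data-file-for-{len(block)}-rows"]

    def on_write_complete(self, per_task_messages):
        # Would run once on the driver after every task succeeded: a single
        # commit makes all files visible together.
        for messages in per_task_messages:
            self.committed.extend(messages)


sink = ToyPaimonSink()
results = [sink.write(block) for block in ([1, 2], [3, 4, 5])]  # stand-in for parallel tasks
sink.on_write_complete(results)
print(sink.committed)  # ['data-file-for-2-rows', 'data-file-for-3-rows']
```

Because the commit lives inside the sink's completion hook, the caller never sees a separate commit step.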
Overwrite mode:
write_paimon(
ray_dataset,
"database_name.table_name",
catalog_options={"warehouse": "/path/to/warehouse"},
overwrite=True,
)
Distribution / scheduling:
write_paimon(
ray_dataset,
"database_name.table_name",
catalog_options={"warehouse": "/path/to/warehouse"},
concurrency=4,
ray_remote_args={"num_cpus": 2},
)
HASH_FIXED pre-clustering:
HASH_FIXED rows are always assigned to the correct Paimon bucket by the writer. Pre-clustering is only a file-count optimization.
By default, write_paimon writes append-only HASH_FIXED tables
without pre-clustering. This avoids Ray groupby().map_groups()
materializing an entire (partition_keys..., bucket) group on one Ray
node.
HASH_FIXED primary-key tables reject the default/off mode. Direct Ray
writes can send the same bucket to multiple writer tasks, and those
writers can allocate overlapping sequence numbers. Use the explicit
map_groups mode until a bounded pre-clustering strategy preserves
per-bucket sequence ordering.
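A simplified model shows why splitting one bucket across tasks is unsafe for primary-key tables. It assumes each writer numbers its rows starting after the bucket's last committed sequence number, with no coordination between tasks; the real writer's sequence logic is more involved, and allocate_sequence_numbers is a hypothetical helper.

```python
def allocate_sequence_numbers(start_after, num_rows):
    """Simplified writer: numbers its rows from the bucket's last known sequence."""
    return list(range(start_after + 1, start_after + 1 + num_rows))


last_committed_seq = 10  # hypothetical max sequence number already in the bucket

# Two independent Ray tasks both receive rows for the same bucket.
task_a = allocate_sequence_numbers(last_committed_seq, 2)
task_b = allocate_sequence_numbers(last_committed_seq, 2)
print(task_a, task_b)  # [11, 12] [11, 12]: the ranges overlap

# With map_groups, one task owns the whole bucket and numbers all rows in order.
single_task = allocate_sequence_numbers(last_committed_seq, 4)
print(single_task)  # [11, 12, 13, 14]
```

If two rows with the same primary key end up with the same sequence number, the merge cannot tell which one is newer, which is the ordering problem described above.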
If every (partition_keys..., bucket) group fits in memory on a
single Ray node, you can opt in to the legacy small-file optimization:
write_paimon(
ray_dataset,
"database_name.table_name",
catalog_options={"warehouse": "/path/to/warehouse"},
hash_fixed_precluster="map_groups",
)
hash_fixed_precluster="map_groups" groups rows by
(partition_keys..., bucket) before writing so each group lands in a
single Ray task. This can reduce file count and keeps HASH_FIXED
primary-key sequence generation per bucket in one writer task, but it
inherits Ray's map_groups() memory bound. Large append-only buckets
or hot append-only partitions should use the default mode or
hash_fixed_precluster="off".
For non-HASH_FIXED append-only tables, the dataset is written as-is.
Postpone-bucket primary-key tables (bucket = -2) are also written
as-is to the bucket-postpone directory. HASH_DYNAMIC and
CROSS_PARTITION primary-key Ray writes are not supported and fail fast,
including the default dynamic-bucket primary-key table (bucket = -1).
Ray write tasks create independent Paimon writers, which can assign
overlapping buckets or sequence numbers for those modes.
Parameters:
- dataset: the Ray Dataset to write.
- table_identifier: full table name, e.g. "db_name.table_name".
- catalog_options: kwargs forwarded to CatalogFactory.create().
- overwrite: if True, overwrite existing data in the table.
- concurrency: optional maximum number of Ray write tasks to run concurrently.
- ray_remote_args: optional kwargs passed to ray.remote() in write tasks (e.g. {"num_cpus": 2}).
- hash_fixed_precluster: HASH_FIXED pre-clustering mode. "auto" and "off" write append-only HASH_FIXED tables directly and reject HASH_FIXED primary-key tables. "map_groups" enables the legacy small-file optimization for HASH_FIXED tables, is required for HASH_FIXED primary-key tables, and requires each (partition_keys..., bucket) group to fit in memory on one Ray node. This option does not enable Ray writes for HASH_DYNAMIC or CROSS_PARTITION primary-key tables.
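The write rules in this section can be condensed into one decision function. ray_write_strategy is a hypothetical summary for orientation, not pypaimon code: it keys only on the bucket setting and whether the table has a primary key, and it treats every bucket = -1 primary-key table as unsupported (covering both HASH_DYNAMIC and CROSS_PARTITION).

```python
def ray_write_strategy(bucket, has_primary_key, hash_fixed_precluster="auto"):
    """Hypothetical summary of which Ray write path applies to a table."""
    if hash_fixed_precluster not in ("auto", "off", "map_groups"):
        raise ValueError(f"unknown mode: {hash_fixed_precluster!r}")
    if bucket > 0:  # HASH_FIXED
        if hash_fixed_precluster == "map_groups":
            return "group by (partition_keys..., bucket), then write"
        if has_primary_key:
            raise ValueError("HASH_FIXED primary-key tables require 'map_groups'")
        return "write directly"
    if has_primary_key:
        if bucket == -2:  # postpone bucket
            return "write as-is to the bucket-postpone directory"
        # bucket == -1: HASH_DYNAMIC or CROSS_PARTITION, rejected for Ray writes.
        raise ValueError("HASH_DYNAMIC / CROSS_PARTITION primary-key Ray writes are not supported")
    return "write as-is"  # non-HASH_FIXED append-only table


print(ray_write_strategy(4, has_primary_key=False))  # write directly
```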
TableWrite.write_ray() (lower-level)
If you have already constructed a table_write from a write builder, you can
hand a Ray Dataset directly to it. write_ray() uses the same HASH_FIXED
pre-clustering modes and safety checks as the top-level write_paimon() API.
It commits through the Ray Datasink API, so there is no prepare_commit /
commit step to run for the Ray write itself — just close the writer when you
are done with it:
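import ray
from pypaimon import CatalogFactory
catalog = CatalogFactory.create({"warehouse": "/path/to/warehouse"})
table = catalog.get_table('database_name.table_name')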
# 1. Create table write.
table_write = table.new_batch_write_builder().new_write()
# 2. Write Ray Dataset
ray_dataset = ray.data.read_json("/path/to/data.jsonl")
table_write.write_ray(
ray_dataset,
overwrite=False,
concurrency=2,
hash_fixed_precluster="auto",
static_partition=None,
)
# Parameters:
# - dataset: Ray Dataset to write
# - overwrite: Whether to overwrite existing data (default: False)
# - concurrency: Optional max number of concurrent Ray tasks
# - ray_remote_args: Optional kwargs passed to ray.remote() (e.g., {"num_cpus": 2})
# - hash_fixed_precluster: Same HASH_FIXED modes and primary-key safety
# checks as write_paimon()
# - static_partition: Optional partition spec to overwrite. When set,
# write_ray() runs in overwrite mode for this partition.
# 3. Close resources
table_write.close()
Overwrite
The top-level write_paimon() API supports whole-table overwrite with the
overwrite=True flag above. With the lower-level write_ray() API, you can
use overwrite=True for whole-table overwrite and static_partition={...} for
partition overwrite:
table_write.write_ray(ray_dataset, overwrite=True)
table_write.write_ray(ray_dataset, static_partition={'dt': '2024-01-01'})
When using the lower-level builder API, you can also configure overwrite mode
on the write builder itself. The resulting table_write carries the overwrite
partition into write_ray(). A static_partition argument passed directly to
write_ray() overrides the builder-level partition:
# overwrite whole table
table_write = table.new_batch_write_builder().overwrite().new_write()
table_write.write_ray(ray_dataset)
# overwrite partition 'dt=2024-01-01'
table_write = (
table.new_batch_write_builder()
.overwrite({'dt': '2024-01-01'})
.new_write()
)
table_write.write_ray(ray_dataset)
Bucket Join
bucket_join joins two co-bucketed tables (same bucket count and the same
bucket-key) on the bucket-key, with no global shuffle: the same key lands in
the same bucket on both sides, so each bucket is read and joined in its own Ray
task. It returns a ray.data.Dataset whose results stay distributed (never
pulled into the driver).
A common use is looking up a global _ROW_ID for a batch of keys without a
shuffle join against a large table: keep a small co-bucketed (key, _ROW_ID)
side table, bucket_join the incoming keys against it, then feed the resulting
row ids into a row-id update.
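The mechanism can be modelled in plain Python: hash each side's rows into buckets with the same function and bucket count, then join bucket i of the left only against bucket i of the right. This is a sketch, not pypaimon's implementation; bucket_of uses CRC32 as a stand-in for Paimon's bucket hash, toy_bucket_join is a hypothetical name, and in-memory lists stand in for per-bucket Ray tasks.

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 4  # both sides must use the same bucket count


def bucket_of(key):
    # Stand-in for Paimon's bucket hash; any deterministic hash shows the idea.
    return zlib.crc32(key.encode("utf-8")) % NUM_BUCKETS


def split_by_bucket(rows, key):
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_of(row[key])].append(row)
    return buckets


def toy_bucket_join(left_rows, right_rows, on):
    left, right = split_by_bucket(left_rows, on), split_by_bucket(right_rows, on)
    joined = []
    # Only buckets present on both sides can produce inner-join output.
    for b in sorted(left.keys() & right.keys()):
        # One "task" per bucket: hash-join the two bucket-local lists.
        index = defaultdict(list)
        for rrow in right[b]:
            index[rrow[on]].append(rrow)
        for lrow in left[b]:
            joined.extend({**lrow, **rrow} for rrow in index[lrow[on]])
    return joined


incoming = [{"url": u} for u in ("a.com", "b.com", "z.com")]
key_rowid = [{"url": u, "row_id": i} for i, u in enumerate(["a.com", "b.com", "c.com"])]
print(sorted(r["row_id"] for r in toy_bucket_join(incoming, key_rowid, "url")))  # [0, 1]
```

Because equal keys always hash to the same bucket on both sides, the per-bucket results add up to the full inner join; that only holds when both sides agree on the bucket count and the bucket-key.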
from pypaimon.ray import bucket_join
ds = bucket_join(
left="database_name.incoming_keys", # co-bucketed table identifier
right="database_name.key_rowid", # co-bucketed table identifier
catalog_options={"warehouse": "/path/to/warehouse"},
on="url", # must equal the bucket-key
left_projection=["url"], # optional; must keep the join key
right_projection=["url", "row_id"], # optional; must keep the join key
)
# ds: ray.data.Dataset of the joined rows, e.g. {"url": ..., "row_id": ...}
Parameters:
- left / right: identifiers of the two co-bucketed tables to join.
- catalog_options: kwargs forwarded to CatalogFactory.create(), as for read_paimon.
- on: the join key(s). Must be exactly the bucket-key; equal keys only co-locate by bucket when joining on the bucket-key.
- left_projection / right_projection: optional column projections applied on read. If given, each must include the join key.
- join_type: only "inner" is supported (an outer join would need the union of buckets, which per-bucket intersection cannot produce).
- ray_remote_args: Ray remote options applied to each per-bucket join task.
Returns: a ray.data.Dataset of the joined rows.
Notes:
- Both tables must be fixed-bucket (bucket > 0) with the same bucket count and the same bucket-key (same column names, order, and types); otherwise bucket_join raises. For primary-key tables that do not set bucket-key explicitly, the bucket-key resolves to the (partition-trimmed) primary key.
- The two sides must not share columns other than the join key, or the underlying pyarrow join would collide; project them away with left_projection / right_projection first.
- Each side is planned at its own latest snapshot, and each bucket is joined by a single Ray task that reads the whole bucket into memory. Choose a bucket count that spreads keys evenly to avoid skewed, memory-heavy tasks.
- Partitioned tables are not supported yet (bucket ids are per-partition).
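To see why the bucket count must match, compare where the same keys land under 4 and under 8 buckets. CRC32 is a stand-in for Paimon's bucket hash and the keys are synthetic; the effect holds for any hash taken modulo the bucket count.

```python
import zlib


def bucket_of(key, num_buckets):
    # Stand-in for Paimon's bucket hash (CRC32 here, purely illustrative).
    return zlib.crc32(key.encode("utf-8")) % num_buckets


keys = [f"key-{i}" for i in range(1000)]
moved = [k for k in keys if bucket_of(k, 4) != bucket_of(k, 8)]
# Roughly half of the keys get a different bucket id under 8 buckets, so a
# per-bucket join between a 4-bucket and an 8-bucket table would miss matches.
print(len(moved) > 0)  # True
```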
Merge Into
merge_into updates or deletes matched rows and optionally inserts unmatched
rows of a data-evolution table from a source, like SQL MERGE INTO.
Matched rows are updated in place by _ROW_ID; only the touched columns are
rewritten. Matched delete clauses are written through deletion vectors.
Requires ray >= 2.50 and a target table with 'data-evolution.enabled' and
'row-tracking.enabled' set. If you use matched delete clauses, the target
must also enable 'deletion-vectors.enabled'.
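The row-level semantics, ignoring _ROW_ID, column-level rewrites, and distribution, can be modelled on in-memory lists. toy_merge is a hypothetical helper that mimics when_matched=[WhenMatched.update("*")] plus when_not_matched=[WhenNotMatched(insert="*")] and returns counters shaped like merge_into's metrics; it is not how merge_into executes.

```python
def toy_merge(target, source, on):
    """Pure-Python model of MERGE INTO with update("*") and insert="*".

    target and source are lists of dicts; rows match when their `on` values agree.
    """
    index = {tuple(row[k] for k in on): i for i, row in enumerate(target)}
    merged = [dict(row) for row in target]  # copy so the input is not mutated
    metrics = {"num_matched": 0, "num_inserted": 0, "num_unchanged": 0}
    for src in source:
        pos = index.get(tuple(src[k] for k in on))
        if pos is not None:
            merged[pos].update(src)  # like WhenMatched.update("*")
            metrics["num_matched"] += 1
        else:
            merged.append(dict(src))  # like WhenNotMatched(insert="*")
            metrics["num_inserted"] += 1
    return merged, metrics


target = [{"id": 1, "name": "a", "age": 20}, {"id": 2, "name": "b", "age": 30}]
source = [{"id": 2, "name": "b", "age": 31}, {"id": 3, "name": "c", "age": 18}]
rows, metrics = toy_merge(target, source, on=["id"])
print(metrics)  # {'num_matched': 1, 'num_inserted': 1, 'num_unchanged': 0}
```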
from pypaimon.ray import merge_into, WhenMatched, WhenNotMatched
metrics = merge_into(
target="database_name.table_name",
source=ray_dataset, # ray.data.Dataset / pa.Table / pandas / table-name str
catalog_options={"warehouse": "/path/to/warehouse"},
on=["id"], # or {"target_col": "source_col"} for renamed keys
when_matched=[WhenMatched.update("*")],
when_not_matched=[WhenNotMatched(insert="*")], # optional
)
print(metrics)  # e.g. {'num_matched': 3, 'num_inserted': 2, 'num_unchanged': 0}
Conditional clauses filter which matched/unmatched rows are acted on:
merge_into(
target="db.table",
source=source_ds,
catalog_options=catalog_options,
on=["id"],
when_matched=[WhenMatched.update("*", condition="s.age > t.age")],
when_not_matched=[WhenNotMatched(insert="*", condition="s.age > 18")],
)
Use WhenMatched.delete() to delete matched rows:
merge_into(
target="db.table",
source=source_ds,
catalog_options=catalog_options,
on=["id"],
when_matched=[
WhenMatched.delete(condition="s.deleted = TRUE"),
WhenMatched.update("*"),
],
)
Conditions use SQL-style expressions with s. (source) and t. (target)
column prefixes. WhenNotMatched conditions may only reference source
columns (s.*). Condition evaluation uses DataFusion through the PyPaimon SQL
extra. Install the extra before using conditions: pip install pypaimon[sql].
- update / delete / insert: WhenMatched.update(...) updates matched rows, WhenMatched.delete() deletes matched rows, and WhenNotMatched(insert=...) inserts unmatched rows. "*" updates or inserts all columns from the source, including blob columns. A mapping selects specific columns:
  from pypaimon.ray import source_col, target_col, lit
  WhenMatched.update({"age": source_col("age"), "name": target_col("name")})
  WhenMatched.delete()
  WhenNotMatched(insert={"id": source_col("id"), "status": lit("new")})
  The "s.<col>" / "t.<col>" string shorthands also work (t.<col> only in updates). Use lit() for string literals that start with "s." or "t.".
- condition: an optional SQL-style boolean expression. Use s.<col> and t.<col> to reference source and target columns.
- Multiple clauses are evaluated in order; the first clause whose condition holds wins:
  when_matched=[
      WhenMatched.update("*", condition="s.ts > t.ts"),
      WhenMatched.update("*"),  # fallback for matched rows that fail the condition above
  ]
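The ordering rule, and how it relates to num_matched, can be modelled with Python callables standing in for the SQL conditions. apply_matched_clauses is a hypothetical helper, not pypaimon code; real conditions are SQL strings evaluated through DataFusion.

```python
def apply_matched_clauses(pairs, clauses):
    """Apply ordered (action, condition) clauses to matched (source, target) pairs.

    The first clause whose condition holds decides the row's fate; rows that
    satisfy no clause stay unchanged and are not counted.
    """
    kept, acted = [], 0
    for s, t in pairs:
        for action, condition in clauses:
            if condition is None or condition(s, t):
                acted += 1
                if action == "update":
                    kept.append({**t, **s})
                # action == "delete": the row is dropped
                break
        else:
            kept.append(t)  # no clause applied: row unchanged
    return kept, acted


pairs = [
    ({"id": 1, "ts": 5, "deleted": False}, {"id": 1, "ts": 3, "deleted": False}),
    ({"id": 2, "ts": 9, "deleted": True}, {"id": 2, "ts": 1, "deleted": False}),
    ({"id": 3, "ts": 2, "deleted": False}, {"id": 3, "ts": 7, "deleted": False}),
]
clauses = [
    ("delete", lambda s, t: s["deleted"]),       # like condition="s.deleted = TRUE"
    ("update", lambda s, t: s["ts"] > t["ts"]),  # like condition="s.ts > t.ts"
]
rows, num_matched = apply_matched_clauses(pairs, clauses)
print(sorted(r["id"] for r in rows), num_matched)  # [1, 3] 2
```

Row 2 is deleted by the first clause, row 1 is updated by the second, and row 3 satisfies neither, so it is kept as-is and not counted.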
Parameters:
- source: a ray.data.Dataset, pyarrow.Table, pandas.DataFrame, or a Paimon table identifier string. When a string is passed, the table is read from the same catalog_options at its latest snapshot.
- on: key columns, or {target_col: source_col} for renamed keys.
- num_partitions: shuffle parallelism for the join and the write; defaults to max(1, cluster_cpus * 2). Raise it for large merges on big clusters.
- ray_remote_args: Ray remote options applied to the merge's map/group tasks (update/delete transform, group write, insert transform).
- concurrency: scheduling for the insert sink.
Returns: {"num_matched", "num_inserted", "num_unchanged"}. num_matched
counts the rows actually updated or deleted (after condition filtering).
num_unchanged is 0 in the current implementation.
For an end-to-end feature update workflow on Blob tables, see Distributed Feature Backfill with Ray.
Notes:
- Partition key columns cannot be updated by matched update clauses, because cross-partition row movement is not implemented. Matched delete clauses and matched updates of non-partition columns work on partitioned tables. Not-matched inserts into partitioned tables work normally.
- Matched delete clauses require deletion-vectors.enabled = true.
- Blob columns can be updated and inserted by merge_into. With WhenMatched.update("*") or WhenNotMatched(insert="*"), the source must include the corresponding blob columns. If an insert mapping omits a blob column, that column is written as NULL.