Ray Data #

Read #

Reading into Ray requires ray to be installed.

You can convert the splits into a Ray Dataset and process it with the Ray Data API for distributed processing:

table_read = read_builder.new_read()
ray_dataset = table_read.to_ray(splits)

print(ray_dataset)
# MaterializedDataset(num_blocks=1, num_rows=9, schema={f0: int32, f1: string})

print(ray_dataset.take(3))
# [{'f0': 1, 'f1': 'a'}, {'f0': 2, 'f1': 'b'}, {'f0': 3, 'f1': 'c'}]

print(ray_dataset.to_pandas())
#    f0 f1
# 0   1  a
# 1   2  b
# 2   3  c
# 3   4  d
# ...
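The snippet above starts from an existing read_builder and splits. A minimal sketch of how they might be obtained is shown below. It assumes the usual pypaimon read flow (new_read_builder(), new_scan(), plan(), splits()), which this section does not show; the helper name read_table_as_ray is hypothetical.

```python
def read_table_as_ray(table, **to_ray_kwargs):
    """Plan a full scan of a Paimon table and return it as a Ray Dataset.

    Assumption: `table` was obtained from catalog.get_table(...) and exposes
    the pypaimon read API used below. Requires ray to be installed.
    """
    read_builder = table.new_read_builder()
    # Plan the scan; the resulting splits are what to_ray() distributes.
    splits = read_builder.new_scan().plan().splits()
    table_read = read_builder.new_read()
    # Extra keyword arguments are forwarded to to_ray() unchanged.
    return table_read.to_ray(splits, **to_ray_kwargs)
```

A call might look like read_table_as_ray(catalog.get_table('database_name.table_name')), which returns the same kind of dataset as ray_dataset above.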

The to_ray() method supports Ray Data API parameters for distributed processing:

# Basic usage
ray_dataset = table_read.to_ray(splits)

# Specify number of output blocks
ray_dataset = table_read.to_ray(splits, override_num_blocks=4)

# Configure Ray remote arguments
ray_dataset = table_read.to_ray(
    splits,
    override_num_blocks=4,
    ray_remote_args={"num_cpus": 2, "max_retries": 3}
)

# Use Ray Data operations (column names follow the f0/f1 schema shown above)
mapped_dataset = ray_dataset.map(lambda row: {'f0': row['f0'] * 2, 'f1': row['f1']})
filtered_dataset = ray_dataset.filter(lambda row: row['f0'] > 2)
df = ray_dataset.to_pandas()

Parameters:

  • override_num_blocks: Optional override for the number of output blocks. By default, Ray automatically determines the optimal number.
  • ray_remote_args: Optional kwargs passed to ray.remote() in read tasks (e.g., {"num_cpus": 2, "max_retries": 3}).
  • concurrency: Optional max number of Ray tasks to run concurrently. By default, dynamically decided based on available resources.
  • **read_args: Additional kwargs passed to the datasource (e.g., per_task_row_limit in Ray 2.52.0+).
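Because every parameter above is optional, one way to keep Ray's defaults in effect is to pass only the options that were explicitly set. The sketch below is an illustration, not part of the Paimon API: build_to_ray_kwargs is a hypothetical helper that drops unset options before they reach to_ray().

```python
def build_to_ray_kwargs(override_num_blocks=None, ray_remote_args=None,
                        concurrency=None, **read_args):
    """Collect to_ray() options, omitting any that were left as None."""
    candidates = {
        "override_num_blocks": override_num_blocks,
        "ray_remote_args": ray_remote_args,
        "concurrency": concurrency,
        **read_args,  # e.g. per_task_row_limit on Ray 2.52.0+
    }
    # Options left as None are dropped so Ray decides them automatically.
    return {name: value for name, value in candidates.items() if value is not None}


kwargs = build_to_ray_kwargs(concurrency=8, ray_remote_args={"num_cpus": 2})
# Then: ray_dataset = table_read.to_ray(splits, **kwargs)
```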

Ray Block Size Configuration:

If you need to configure Ray’s block size (e.g., when Paimon splits exceed Ray’s default 128MB block size), set it before calling to_ray():

from ray.data import DataContext

ctx = DataContext.get_current()
ctx.target_max_block_size = 256 * 1024 * 1024  # 256MB (default is 128MB)
ray_dataset = table_read.to_ray(splits)
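If the largest split size is roughly known, the target block size could be derived from it rather than hard-coded. The following is a sketch under stated assumptions: split_sizes_bytes is a hypothetical list of in-memory split sizes that you supply yourself (compressed files can expand when read), and the 64MB rounding step is an arbitrary choice.

```python
MIB = 1024 * 1024
DEFAULT_TARGET_MAX_BLOCK_SIZE = 128 * MIB  # Ray default stated above


def pick_target_max_block_size(split_sizes_bytes, step=64 * MIB):
    """Return a block size that fits the largest split, rounded up to `step`."""
    largest = max(split_sizes_bytes, default=0)
    if largest <= DEFAULT_TARGET_MAX_BLOCK_SIZE:
        # The default is already large enough; leave it alone.
        return DEFAULT_TARGET_MAX_BLOCK_SIZE
    # Ceiling division rounds up to the next multiple of `step`.
    return -(-largest // step) * step


# Then, before calling to_ray():
#   ctx.target_max_block_size = pick_target_max_block_size(sizes)
```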

See the Ray Data API documentation for more details.

Write #

table = catalog.get_table('database_name.table_name')

# 1. Create table write and commit
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

# 2. Write Ray Dataset (requires ray to be installed)
import ray
ray_dataset = ray.data.read_json("/path/to/data.jsonl")
table_write.write_ray(ray_dataset, overwrite=False, concurrency=2)
# Parameters:
#   - dataset: Ray Dataset to write
#   - overwrite: Whether to overwrite existing data (default: False)
#   - concurrency: Optional max number of concurrent Ray tasks
#   - ray_remote_args: Optional kwargs passed to ray.remote() (e.g., {"num_cpus": 2})
# Note: write_ray() handles the commit internally through the Ray Datasink API.
#       Skip steps 3-4 if using write_ray(); only close the writer (table_write.close()).

# 3. Commit data (required for write_pandas/write_arrow/write_arrow_batch only)
commit_messages = table_write.prepare_commit()
table_commit.commit(commit_messages)

# 4. Close resources
table_write.close()
table_commit.close()
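Since write_ray() handles the commit itself, the remaining responsibility on that path is closing the writer. A minimal sketch of that flow follows. The helper name write_ray_and_close is hypothetical, and it assumes table_write was created as in step 1.

```python
def write_ray_and_close(table_write, ray_dataset, **write_ray_kwargs):
    """Write a Ray Dataset and close the writer, even if the write fails."""
    try:
        # Keyword arguments such as overwrite, concurrency, or
        # ray_remote_args are forwarded to write_ray() unchanged.
        table_write.write_ray(ray_dataset, **write_ray_kwargs)
    finally:
        table_write.close()
```

For example, write_ray_and_close(table_write, ray_dataset, overwrite=False, concurrency=2) mirrors step 2 above without running the manual commit.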

By default, data is appended to the table. To overwrite the table instead, call overwrite() on the write builder:

# overwrite whole table
write_builder = table.new_batch_write_builder().overwrite()

# overwrite partition 'dt=2024-01-01'
write_builder = table.new_batch_write_builder().overwrite({'dt': '2024-01-01'})