Data Evolution
This documentation is for an unreleased version of Apache Paimon. We recommend you use the latest stable version.

Data Evolution #

This page describes how to use PyPaimon with tables in Data Evolution mode.

Prerequisites #

To use partial updates / data evolution, enable both options when creating the table:

  • row-tracking.enabled: true
  • data-evolution.enabled: true

Update Columns By Row ID #

You can use TableUpdate.update_by_arrow_with_row_id to update columns of data evolution tables.

The input data must include the _ROW_ID column. The update operation automatically sorts the input and matches each _ROW_ID to its corresponding first_row_id, then groups rows with the same first_row_id and writes each group to a separate file.

Requirements for _ROW_ID updates

  • Update columns only: include _ROW_ID plus the columns you want to update (partial schema is OK).
# Example: partial-column update addressed by the hidden _ROW_ID column.
import pyarrow as pa
from pypaimon import CatalogFactory, Schema

# Filesystem catalog rooted at /tmp/warehouse; False = do not error if the
# database already exists.
catalog = CatalogFactory.create({'warehouse': '/tmp/warehouse'})
catalog.create_database('default', False)

simple_pa_schema = pa.schema([
  ('f0', pa.int8()),
  ('f1', pa.int16()),
])
# Both options are required prerequisites for _ROW_ID-based updates
# (see "Prerequisites" above).
schema = Schema.from_pyarrow_schema(simple_pa_schema,
                                    options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'})
catalog.create_table('default.test_row_tracking', schema, False)
table = catalog.get_table('default.test_row_tracking')

# write all columns
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
expect_data = pa.Table.from_pydict({
  'f0': [-1, 2],
  'f1': [-1001, 1002]
}, schema=simple_pa_schema)
table_write.write_arrow(expect_data)
# prepare_commit() yields the commit messages that the committer makes durable.
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

# update partial columns
write_builder = table.new_batch_write_builder()
# with_update_type(['f0']): only 'f0' is rewritten; 'f1' keeps its old values.
table_update = write_builder.new_update().with_update_type(['f0'])
table_commit = write_builder.new_commit()
# The input carries _ROW_ID (int64) plus only the columns being updated.
data2 = pa.Table.from_pydict({
  '_ROW_ID': [0, 1],
  'f0': [5, 6],
}, schema=pa.schema([
  ('_ROW_ID', pa.int64()),
  ('f0', pa.int8()),
]))
cmts = table_update.update_by_arrow_with_row_id(data2)
table_commit.commit(cmts)
table_commit.close()

# content should be:
#   'f0': [5, 6],
#   'f1': [-1001, 1002]

Filter by _ROW_ID #

Requires the same Prerequisites (row-tracking and data-evolution enabled). On such tables you can filter by _ROW_ID to prune files at scan time. Supported: equal('_ROW_ID', id), is_in('_ROW_ID', [id1, ...]), between('_ROW_ID', low, high).

# Filter on the hidden _ROW_ID column so the scan can prune files before reading.
predicate_builder = table.new_read_builder().new_predicate_builder()
row_id_filter = predicate_builder.equal('_ROW_ID', 0)
read_builder = table.new_read_builder().with_filter(row_id_filter)
splits = read_builder.new_scan().plan().splits()
result = read_builder.new_read().to_arrow(splits)

Upsert By Key #

If you want to upsert (update-or-insert) rows by one or more business key columns — without manually providing _ROW_ID — use upsert_by_arrow_with_key. For each input row:

  • Key matches an existing row → update that row in place.
  • No match → append as a new row.

Requirements

  • The table must have data-evolution.enabled = true and row-tracking.enabled = true.
  • All upsert_keys must exist in both the table schema and the input data.
  • For partitioned tables, the input data must contain all partition key columns. Partition keys are automatically stripped from upsert_keys during matching (since each partition is processed independently), so you do not need to include them in upsert_keys.

Example: basic upsert

# Example: upsert (update-or-insert) keyed on business column 'id' —
# no _ROW_ID needs to be supplied by the caller.
import pyarrow as pa
from pypaimon import CatalogFactory, Schema

catalog = CatalogFactory.create({'warehouse': '/tmp/warehouse'})
catalog.create_database('default', False)

pa_schema = pa.schema([
    ('id', pa.int32()),
    ('name', pa.string()),
    ('age', pa.int32()),
])
# Both options are required for upsert_by_arrow_with_key (see Requirements).
schema = Schema.from_pyarrow_schema(
    pa_schema,
    options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'},
)
catalog.create_table('default.users', schema, False)
table = catalog.get_table('default.users')

# write initial data
write_builder = table.new_batch_write_builder()
write = write_builder.new_write()
commit = write_builder.new_commit()
write.write_arrow(pa.Table.from_pydict(
    {'id': [1, 2], 'name': ['Alice', 'Bob'], 'age': [30, 25]},
    schema=pa_schema,
))
commit.commit(write.prepare_commit())
write.close()
commit.close()

# upsert: update id=1, insert id=3
write_builder = table.new_batch_write_builder()
table_update = write_builder.new_update()
table_commit = write_builder.new_commit()

# id=1 matches an existing row (updated in place); id=3 has no match (appended).
upsert_data = pa.Table.from_pydict(
    {'id': [1, 3], 'name': ['Alice_v2', 'Charlie'], 'age': [31, 28]},
    schema=pa_schema,
)
cmts = table_update.upsert_by_arrow_with_key(upsert_data, upsert_keys=['id'])
table_commit.commit(cmts)
table_commit.close()

# content should be:
#   id=1: name='Alice_v2', age=31   (updated)
#   id=2: name='Bob',      age=25   (unchanged)
#   id=3: name='Charlie',  age=28   (new)

Example: partial-column upsert with update_cols

Combine with_update_type with upsert_by_arrow_with_key to update only specific columns for matched rows while still appending full rows for new keys:

# Continues the previous example: 'table' and 'pa_schema' are defined above.
write_builder = table.new_batch_write_builder()
# with_update_type(['age']): for matched keys only 'age' is rewritten;
# unmatched keys are still appended as full rows.
table_update = write_builder.new_update().with_update_type(['age'])
table_commit = write_builder.new_commit()

# The input still carries the full schema; 'name' is ignored for matched rows.
upsert_data = pa.Table.from_pydict(
    {'id': [1, 4], 'name': ['ignored', 'David'], 'age': [99, 22]},
    schema=pa_schema,
)
cmts = table_update.upsert_by_arrow_with_key(upsert_data, upsert_keys=['id'])
table_commit.commit(cmts)
table_commit.close()

# id=1: only 'age' is updated to 99; 'name' remains 'Alice_v2'
# id=4: appended as a full new row

Example: partitioned table with composite key

partitioned_schema = pa.schema([
    ('id', pa.int32()),
    ('name', pa.string()),
    ('region', pa.string()),
])
# Table partitioned by 'region'; the same two options are still required.
schema = Schema.from_pyarrow_schema(
    partitioned_schema,
    partition_keys=['region'],
    options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'},
)
catalog.create_table('default.users_partitioned', schema, False)
table = catalog.get_table('default.users_partitioned')

# ... write initial data ...

write_builder = table.new_batch_write_builder()
table_update = write_builder.new_update()
table_commit = write_builder.new_commit()

# Input must include every partition key column ('region' here).
upsert_data = pa.Table.from_pydict(
    {'id': [1, 3], 'name': ['Alice_v2', 'Charlie'], 'region': ['US', 'EU']},
    schema=partitioned_schema,
)
# upsert_keys=['id'] only; partition key 'region' is auto-stripped
cmts = table_update.upsert_by_arrow_with_key(upsert_data, upsert_keys=['id'])
table_commit.commit(cmts)
table_commit.close()

Notes

  • Execution is driven partition-by-partition: only one partition’s key set is loaded into memory at a time.
  • Duplicate keys in the input data will raise an error.
  • The upsert is atomic per commit — all matched updates and new appends are included in the same commit.

Update Columns By Shards #

If you want to compute a derived column (or update an existing column based on other columns) without providing _ROW_ID, you can use the shard scan + rewrite workflow:

  • Read only the columns you need (projection)
  • Compute the new values in the same row order
  • Write only the updated columns back
  • Commit per shard

This is useful for backfilling a newly added column, or recomputing a column from other columns.

Example: compute d = c + b - a

# Example: backfill column 'd' from (a, b, c) using the shard scan + rewrite
# workflow — no _ROW_ID is supplied; row order within the shard carries identity.
import pyarrow as pa
from pypaimon import CatalogFactory, Schema

catalog = CatalogFactory.create({'warehouse': '/tmp/warehouse'})
catalog.create_database('default', False)

table_schema = pa.schema([
    ('a', pa.int32()),
    ('b', pa.int32()),
    ('c', pa.int32()),
    ('d', pa.int32()),
])

schema = Schema.from_pyarrow_schema(
    table_schema,
    options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'},
)
catalog.create_table('default.t', schema, False)
table = catalog.get_table('default.t')

# write initial data (a, b, c only)
write_builder = table.new_batch_write_builder()
# with_write_type: write only a subset of the table's columns ('d' is left out).
write = write_builder.new_write().with_write_type(['a', 'b', 'c'])
commit = write_builder.new_commit()
# NOTE(review): no schema is passed, so from_pydict infers int64 for these
# columns while the table declares int32 — presumably cast on write; confirm.
write.write_arrow(pa.Table.from_pydict({'a': [1, 2], 'b': [10, 20], 'c': [100, 200]}))
commit.commit(write.prepare_commit())
write.close()
commit.close()

# shard update: read (a, b, c), write only (d)
update = write_builder.new_update()
update.with_read_projection(['a', 'b', 'c'])
update.with_update_type(['d'])

# Single shard covering the whole table; see Notes on parallelism below.
shard_idx = 0
num_shards = 1
upd = update.new_shard_updator(shard_idx, num_shards)
reader = upd.arrow_reader()

# read_next_batch returns None at end of shard; iterate until that sentinel.
for batch in iter(reader.read_next_batch, None):
    a = batch.column('a').to_pylist()
    b = batch.column('b').to_pylist()
    c = batch.column('c').to_pylist()
    d = [ci + bi - ai for ai, bi, ci in zip(a, b, c)]

    # Each written batch must have exactly as many rows as the batch just read,
    # in the same order (see Notes).
    upd.update_by_arrow_batch(
        pa.RecordBatch.from_pydict({'d': d}, schema=pa.schema([('d', pa.int32())]))
    )

commit_messages = upd.prepare_commit()
commit = write_builder.new_commit()
commit.commit(commit_messages)
commit.close()

Example: update an existing column c = b - a

# Recompute existing column 'c' as b - a, processing the single shard 0 of 1.
update = write_builder.new_update()
update.with_read_projection(['a', 'b'])  # columns made available to read
update.with_update_type(['c'])           # the only column written back

shard_updator = update.new_shard_updator(0, 1)
batch_reader = shard_updator.arrow_reader()
# read_next_batch returns None once the shard is exhausted.
while (record_batch := batch_reader.read_next_batch()) is not None:
    col_a = record_batch.column('a').to_pylist()
    col_b = record_batch.column('b').to_pylist()
    new_c = [y - x for x, y in zip(col_a, col_b)]
    # Written batch must match the read batch row-for-row, in order.
    shard_updator.update_by_arrow_batch(
        pa.RecordBatch.from_pydict({'c': new_c}, schema=pa.schema([('c', pa.int32())]))
    )

commit_messages = shard_updator.prepare_commit()
commit = write_builder.new_commit()
commit.commit(commit_messages)
commit.close()

Notes

  • Row order matters: the batches you write must have the same number of rows as the batches you read, in the same order for that shard.
  • Parallelism: run multiple shards by calling new_shard_updator(shard_idx, num_shards) for each shard.
Edit This Page
Copyright © 2025 The Apache Software Foundation. Apache Paimon, Paimon, and its feather logo are trademarks of The Apache Software Foundation.