Data Evolution #
This page describes how to use PyPaimon in Data Evolution mode. For the concept itself, see Data Evolution.
Prerequisites #
To use partial updates / data evolution, enable both options when creating the table:
- row-tracking.enabled: true
- data-evolution.enabled: true
Update Columns By Row ID #
You can call TableUpdate.update_by_arrow_with_row_id to update columns of data evolution tables.
The input data must include the _ROW_ID column. The update operation automatically sorts the rows, matches each _ROW_ID to
its corresponding first_row_id, groups rows with the same first_row_id, and writes each group to a separate file.
Requirements for _ROW_ID updates
- All rows required: the input table must contain exactly the full table row count (one row per existing row).
- Row id coverage: after sorting by _ROW_ID, the ids must be 0..N-1 (no duplicates, no gaps).
- Update columns only: include _ROW_ID plus the columns you want to update (a partial schema is OK).
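Before calling update_by_arrow_with_row_id, you can sanity-check the input against these requirements. The helper below is a minimal sketch in plain PyArrow; validate_row_id_input is a hypothetical name, not part of the PyPaimon API.
import pyarrow as pa
def validate_row_id_input(data: pa.Table) -> None:
    # Hypothetical helper: verify the _ROW_ID requirements before calling
    # update_by_arrow_with_row_id.
    if '_ROW_ID' not in data.column_names:
        raise ValueError('input must contain a _ROW_ID column')
    row_ids = sorted(data.column('_ROW_ID').to_pylist())
    # After sorting, the ids must be exactly 0..N-1 (no duplicates, no gaps).
    if row_ids != list(range(len(row_ids))):
        raise ValueError('_ROW_ID must cover 0..N-1 with no duplicates or gaps')
The full example below writes a table with all columns and then updates column f0 by _ROW_ID: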
import pyarrow as pa
from pypaimon import CatalogFactory, Schema
catalog = CatalogFactory.create({'warehouse': '/tmp/warehouse'})
catalog.create_database('default', False)
simple_pa_schema = pa.schema([
    ('f0', pa.int8()),
    ('f1', pa.int16()),
])
schema = Schema.from_pyarrow_schema(simple_pa_schema,
    options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'})
catalog.create_table('default.test_row_tracking', schema, False)
table = catalog.get_table('default.test_row_tracking')
# write all columns
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
expect_data = pa.Table.from_pydict({
    'f0': [-1, 2],
    'f1': [-1001, 1002]
}, schema=simple_pa_schema)
table_write.write_arrow(expect_data)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()
# update partial columns
write_builder = table.new_batch_write_builder()
table_update = write_builder.new_update().with_update_type(['f0'])
table_commit = write_builder.new_commit()
data2 = pa.Table.from_pydict({
    '_ROW_ID': [0, 1],
    'f0': [5, 6],
}, schema=pa.schema([
    ('_ROW_ID', pa.int64()),
    ('f0', pa.int8()),
]))
cmts = table_update.update_by_arrow_with_row_id(data2)
table_commit.commit(cmts)
table_commit.close()
# content should be:
# 'f0': [5, 6],
# 'f1': [-1001, 1002]
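To verify the result, you can read the table back. This is a minimal sketch assuming the standard PyPaimon batch read API (new_read_builder / new_scan / new_read):
# Read the table back to check the partially updated content.
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
result = read_builder.new_read().to_arrow(splits)
print(result.to_pydict())
# expected values: f0 = [5, 6], f1 = [-1001, 1002]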
Update Columns By Shards #
If you want to compute a derived column (or update an existing column based on other columns) without providing
_ROW_ID, you can use the shard scan + rewrite workflow:
- Read only the columns you need (projection)
- Compute the new values in the same row order
- Write only the updated columns back
- Commit per shard
This is useful for backfilling a newly added column, or recomputing a column from other columns.
Example: compute d = c + b - a
import pyarrow as pa
from pypaimon import CatalogFactory, Schema
catalog = CatalogFactory.create({'warehouse': '/tmp/warehouse'})
catalog.create_database('default', False)
table_schema = pa.schema([
    ('a', pa.int32()),
    ('b', pa.int32()),
    ('c', pa.int32()),
    ('d', pa.int32()),
])
schema = Schema.from_pyarrow_schema(
    table_schema,
    options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'},
)
catalog.create_table('default.t', schema, False)
table = catalog.get_table('default.t')
# write initial data (a, b, c only)
write_builder = table.new_batch_write_builder()
write = write_builder.new_write().with_write_type(['a', 'b', 'c'])
commit = write_builder.new_commit()
write.write_arrow(pa.Table.from_pydict({'a': [1, 2], 'b': [10, 20], 'c': [100, 200]}))
commit.commit(write.prepare_commit())
write.close()
commit.close()
# shard update: read (a, b, c), write only (d)
update = write_builder.new_update()
update.with_read_projection(['a', 'b', 'c'])
update.with_update_type(['d'])
shard_idx = 0
num_shards = 1
upd = update.new_shard_updator(shard_idx, num_shards)
reader = upd.arrow_reader()
for batch in iter(reader.read_next_batch, None):
    a = batch.column('a').to_pylist()
    b = batch.column('b').to_pylist()
    c = batch.column('c').to_pylist()
    d = [ci + bi - ai for ai, bi, ci in zip(a, b, c)]
    upd.update_by_arrow_batch(
        pa.RecordBatch.from_pydict({'d': d}, schema=pa.schema([('d', pa.int32())]))
    )
commit_messages = upd.prepare_commit()
commit = write_builder.new_commit()
commit.commit(commit_messages)
commit.close()
Example: update an existing column c = b - a
update = write_builder.new_update()
update.with_read_projection(['a', 'b'])
update.with_update_type(['c'])
upd = update.new_shard_updator(0, 1)
reader = upd.arrow_reader()
for batch in iter(reader.read_next_batch, None):
    a = batch.column('a').to_pylist()
    b = batch.column('b').to_pylist()
    c = [bi - ai for ai, bi in zip(a, b)]
    upd.update_by_arrow_batch(
        pa.RecordBatch.from_pydict({'c': c}, schema=pa.schema([('c', pa.int32())]))
    )
commit_messages = upd.prepare_commit()
commit = write_builder.new_commit()
commit.commit(commit_messages)
commit.close()
Notes
- Row order matters: the batches you write must have the same number of rows as the batches you read, in the same order for that shard.
- Parallelism: run multiple shards by calling new_shard_updator(shard_idx, num_shards) for each shard, as in the sketch below.
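For example, the shards of the c = b - a example can be processed in parallel and committed one by one. This is a minimal sketch: process_shard is a hypothetical helper, it reuses the table handle from the examples above, and it assumes each thread can safely use its own write builder and shard updator.
from concurrent.futures import ThreadPoolExecutor
import pyarrow as pa
num_shards = 4
def process_shard(shard_idx):
    # Hypothetical helper: rewrite column c for one shard and return the
    # builder together with its commit messages.
    write_builder = table.new_batch_write_builder()
    update = write_builder.new_update()
    update.with_read_projection(['a', 'b'])
    update.with_update_type(['c'])
    upd = update.new_shard_updator(shard_idx, num_shards)
    reader = upd.arrow_reader()
    for batch in iter(reader.read_next_batch, None):
        a = batch.column('a').to_pylist()
        b = batch.column('b').to_pylist()
        c = [bi - ai for ai, bi in zip(a, b)]
        upd.update_by_arrow_batch(
            pa.RecordBatch.from_pydict({'c': c}, schema=pa.schema([('c', pa.int32())]))
        )
    return write_builder, upd.prepare_commit()
with ThreadPoolExecutor(max_workers=num_shards) as executor:
    results = list(executor.map(process_shard, range(num_shards)))
# Commit per shard, matching the workflow described above.
for write_builder, messages in results:
    commit = write_builder.new_commit()
    commit.commit(messages)
    commit.close()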