Multimodal API

pypaimon.multimodal provides a high-level API for local Python applications on top of Paimon data-evolution tables. It does not replace the lower-level Catalog, Database, or Table APIs. Instead, a multimodal connection wraps a Paimon catalog plus a default database, and exposes a compact table API for scalar, text, vector, and blob data.

Creating a multimodal table enables row-tracking.enabled, data-evolution.enabled, deletion-vectors.enabled, and blob-as-descriptor by default. It also sets global-index.search-mode to FULL, so global index scans use full fallback coverage. Data files use the Vortex format by default (file.format = vortex), and vector columns are stored in dedicated Vortex files (vector.file.format = vortex). Install the Vortex extra before writing or reading default multimodal tables from Python:

pip install "pypaimon[vortex]"

Connect

connect creates a multimodal connection from Paimon catalog options. The options argument is forwarded to CatalogFactory.create.

import pypaimon.multimodal as pm

conn = pm.connect(
    database="default",
    options={
        "warehouse": "file:///tmp/warehouse",
    },
)

Table names without a database are resolved against the connection's default database. Fully-qualified names such as "analytics.docs" are also accepted. Use get_table to open an existing multimodal table. The table must be a data-evolution table without primary keys.

docs = conn.get_table("docs")

Create

import pyarrow as pa

docs = conn.create_table(
    "docs",
    schema=pa.schema([
        pa.field("id", pa.int64()),
        pa.field("content", pa.string()),
        pa.field("embedding", pa.list_(pa.float32(), 3)),
        pa.field("image", pa.large_binary()),
        pa.field("category", pa.string()),
    ]),
    ignore_if_exists=True,
)

Partition keys can be specified with partitioned, and table options can be specified with options. The multimodal API accepts pyarrow.Schema objects; it does not accept field dictionaries or pypaimon.Schema. Use Arrow fixed-size list types for vector columns, and Arrow binary or large-binary types for blob columns. When creating from data, pass an explicit schema if you need exact vector or blob types.

Add

add accepts pyarrow.Table, pyarrow.RecordBatch, a list of dictionaries, a dictionary of arrays, or a pandas DataFrame. Input columns are aligned and cast to the Paimon table schema before writing.

docs.add([
    {
        "id": 2,
        "content": "Paimon stores mutable lakehouse tables.",
        "embedding": [0.4, 0.5, 0.6],
        "category": "lake",
    }
])

Overwrite

overwrite accepts the same input formats as add and replaces existing data using Paimon's batch overwrite semantics. On an unpartitioned table it replaces the whole table. On a partitioned table it follows the table's dynamic-partition-overwrite option; with the default dynamic mode, only partitions present in the input data are replaced.

docs.overwrite([
    {
        "id": 3,
        "content": "Fresh replacement text",
        "embedding": [0.7, 0.8, 0.9],
        "category": "docs",
    }
])

For static partition overwrite, disable dynamic partition overwrite on the table and pass the target partition:

docs.overwrite(
    [
        {
            "id": 4,
            "content": "Only this day is replaced.",
            "category": "docs",
            "dt": "2024-01-01",
        }
    ],
    partition={"dt": "2024-01-01"},
)
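To make the two overwrite modes concrete, the following is a small pure-Python model, not pypaimon code. The helper names dynamic_overwrite and static_overwrite, and the representation of a table as a dictionary of partitions, are assumptions made only for illustration.

```python
from collections import defaultdict


def dynamic_overwrite(partitions, rows, key):
    """Replace only the partitions that appear in the incoming rows."""
    incoming = defaultdict(list)
    for row in rows:
        incoming[row[key]].append(row)
    result = dict(partitions)  # partitions absent from the input survive
    result.update(incoming)    # partitions present in the input are replaced
    return result


def static_overwrite(partitions, rows, target):
    """Replace exactly the partition named by the caller."""
    result = dict(partitions)
    result[target] = list(rows)
    return result


table = {
    "2024-01-01": [{"id": 1, "dt": "2024-01-01"}],
    "2024-01-02": [{"id": 2, "dt": "2024-01-02"}],
}
new_rows = [{"id": 4, "dt": "2024-01-01"}]

after_dynamic = dynamic_overwrite(table, new_rows, key="dt")
after_static = static_overwrite(table, new_rows, target="2024-01-01")
```

In both cases the 2024-01-02 partition is untouched. The difference is who chooses the replaced partition: the data in dynamic mode, the caller in static mode.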

Update

update modifies rows matched by a SQL-like predicate.

docs.update(
    where="id = 2",
    values={"category": "docs"},
)

Delete

delete removes rows matched by a SQL-like predicate. It uses the same predicate syntax as scan().where(...) and update(...). Multimodal tables enable deletion vectors by default; on partitioned tables, a predicate that references only partition columns uses the partition drop path.

docs.delete(where="id = 2")

Merge

Use merge for idempotent ingestion and matched-row deletes. The builder delegates to Paimon's local MERGE INTO implementation.

execute takes the source rows for the merge. It accepts the same input formats as add, including pyarrow.Table, pyarrow.RecordBatch, a list of dictionaries, a dictionary of arrays, or a pandas DataFrame. A list of dictionaries is convenient for small batches. when_matched_update() updates the columns present in the source data and leaves omitted target columns unchanged. when_matched_delete() deletes matched rows. when_not_matched_insert() inserts the columns present in the source data and fills omitted target columns with null.

from pypaimon.multimodal import source_col

docs.merge("id") \
    .when_matched_update(
        where="source.category != target.category",
    ) \
    .when_not_matched_insert() \
    .execute([
        {
            "id": 2,
            "content": "Updated text",
            "embedding": [0.7, 0.8, 0.9],
            "category": "docs",
        }
    ])

Use when_matched_delete() for matched rows that should be removed:

docs.merge("id") \
    .when_matched_delete(where="source.deleted = TRUE") \
    .when_matched_update() \
    .execute([
        {
            "id": 2,
            "content": "Updated text",
            "embedding": [0.7, 0.8, 0.9],
            "category": "docs",
            "deleted": True,
        }
    ])

Clause predicates use where for update and delete clauses. Refer to source rows with source.<column> and target rows with target.<column>. Install pypaimon[sql] when using merge clause predicates.

When source and target key names differ, pass a mapping from target column to source column:

docs.merge({"id": "doc_id"}) \
    .when_matched_update({"category": source_col("new_category")}) \
    .when_not_matched_insert({
        "id": source_col("doc_id"),
        "content": source_col("content"),
        "category": source_col("new_category"),
    }) \
    .execute([
        {"doc_id": 3, "content": "New doc", "new_category": "search"},
    ])
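The column rules for when_matched_update() and when_not_matched_insert() can be pictured with a pure-Python model. This is a sketch of the documented semantics only, not the MERGE INTO implementation; merge_rows and its arguments are hypothetical names.

```python
def merge_rows(target, source, key, columns):
    """Model update-or-insert merge semantics on lists of dicts."""
    by_key = {row[key]: dict(row) for row in target}
    for src in source:
        if src[key] in by_key:
            # Matched: only the columns present in the source row change.
            by_key[src[key]].update(src)
        else:
            # Not matched: omitted target columns are filled with None.
            by_key[src[key]] = {col: src.get(col) for col in columns}
    return list(by_key.values())


columns = ["id", "content", "category"]
target = [{"id": 1, "content": "old text", "category": "lake"}]
source = [
    {"id": 1, "category": "docs"},    # matched, "content" omitted
    {"id": 2, "content": "new doc"},  # not matched, "category" omitted
]

merged = merge_rows(target, source, "id", columns)
merged_again = merge_rows(merged, source, "id", columns)
```

Applying the same source batch a second time leaves the table unchanged, which is what makes this shape of merge suitable for idempotent ingestion.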

Scan

Use scan() for ordinary table reads. where() accepts SQL-like predicate strings; it does not accept lower-level Predicate objects.

result = (
    docs.scan()
    .where("category = 'lake'")
    .select(["id", "content"])
    .limit(10)
    .to_pandas()
)

Reading BLOB columns

scan().read_blobs(column) bulk-fetches a BLOB column's bytes for the filtered rows using concurrent, same-file coalesced ranged reads. This is much faster than a per-row loop and avoids the slow row-by-row blob resolution on data-evolution tables. It returns (scalar_table, {column: [bytes | None]}), row-aligned; the scalar table drops all BLOB columns.

scalar, blobs = (
    docs.scan()
    .where("category = 'lake'")
    .read_blobs("image", parallelism=32)
)
images = blobs["image"]  # list[bytes | None], aligned with scalar rows

where() takes SQL-like strings only, so filter by a list of keys with IN (...):

ids = ["a", "b", "c"]
in_clause = ", ".join(f"'{i}'" for i in ids)
scalar, blobs = docs.scan().where(f"id IN ({in_clause})").read_blobs("image")

Because where() takes a SQL string, build the IN (...) clause only from trusted, already-escaped ids. Do not interpolate untrusted external input.
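One way to keep the IN (...) construction in a single place is a small rendering helper. The sketch below is hypothetical (sql_in_list is not part of pypaimon) and assumes the predicate parser follows the standard SQL convention of escaping a single quote by doubling it. That assumption should be verified before relying on it.

```python
def sql_in_list(values):
    """Render ints and strings as a comma-separated list of SQL literals."""
    rendered = []
    for value in values:
        if isinstance(value, bool):
            raise TypeError("booleans are not supported in this sketch")
        if isinstance(value, int):
            rendered.append(str(value))
        elif isinstance(value, str):
            # Assumption: a quote inside a literal is escaped by doubling it.
            rendered.append("'" + value.replace("'", "''") + "'")
        else:
            raise TypeError(f"unsupported id type: {type(value).__name__}")
    if not rendered:
        raise ValueError("IN (...) needs at least one value")
    return ", ".join(rendered)


ids = ["a", "b", "o'neil"]
predicate = f"id IN ({sql_in_list(ids)})"
```

Rejecting unexpected types and empty lists up front keeps malformed predicates from reaching where().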

Read several BLOB columns at once (fetched together, not column-by-column):

scalar, blobs = docs.scan().where("category = 'lake'").read_blobs(["image", "audio"])

For a memory-bounded read (e.g. streaming into a trainer), stream_blobs yields one batch at a time, so peak memory is a single batch rather than the whole result:

for scalar, blobs in docs.scan().where("category = 'lake'").stream_blobs("image"):
    for img in blobs["image"]:
        ...  # feed the trainer, then drop the batch

Notes:

  • A parallelism of roughly 16 to 32 works best over the public network; go higher on an internal/VPC endpoint.
  • Reads are fastest when the matching rows are contiguous (bulk/range reads coalesce into a few large reads); scattered point reads coalesce less.
  • Blob reads are available only on scan(), not on search() queries.
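The effect of contiguity on coalescing can be illustrated with a generic range-merging routine. This is a sketch of the general technique, not pypaimon's internal policy; coalesce_ranges and max_gap are hypothetical names.

```python
def coalesce_ranges(ranges, max_gap=0):
    """Merge (offset, length) ranges whose gap is at most max_gap bytes."""
    merged = []
    for offset, length in sorted(ranges):
        end = offset + length
        if merged and offset - merged[-1][1] <= max_gap:
            # Close enough to the previous range: extend it.
            merged[-1][1] = max(merged[-1][1], end)
        else:
            merged.append([offset, end])
    return [(start, end - start) for start, end in merged]


contiguous = [(0, 100), (100, 100), (200, 100)]
scattered = [(0, 100), (5000, 100), (90000, 100)]

one_read = coalesce_ranges(contiguous)
three_reads = coalesce_ranges(scattered)
```

Three adjacent blobs collapse into one ranged read, while three scattered blobs still cost three reads.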

Row IDs

Multimodal tables enable row-tracking.enabled by default, so each row has a Paimon system column named _ROW_ID. Use _ROW_ID as an internal coordination key for retrieval, reranking, inference, and training jobs. Keep user-visible document IDs, object keys, or primary keys in normal columns.

Use with_row_id() to include _ROW_ID in scan or search results. The method appends _ROW_ID to the current projection; if no projection is set, it returns all table columns plus _ROW_ID.

candidates = (
    docs.search([0.1, 0.2, 0.3], column="embedding")
    .where("category = 'lake'")
    .select(["id", "content"])
    .limit(100)
    .with_row_id()
    .to_pandas()
)

row_ids = candidates["_ROW_ID"].tolist()

When a row-id manifest is persisted beyond a short-lived latest-snapshot workflow, record the source snapshot or tag and pass it back to the read API. scan, search, search_vectors, search_hybrid, and take_row_ids accept snapshot_id or tag_name. These two options are mutually exclusive.

Use take_row_ids to fetch rows selected by an earlier scan, vector search, full-text search, hybrid search, sampler, or split manifest. Results are not guaranteed to follow the input row-id order. Include _ROW_ID and reorder on the client if order matters.

payload = (
    docs.take_row_ids(row_ids, snapshot_id=source_snapshot_id)
    .select(["id", "content", "image"])
    .with_row_id()
    .to_arrow()
    .to_pylist()
)

payload_by_row_id = {row["_ROW_ID"]: row for row in payload}
ordered_payload = [payload_by_row_id[row_id] for row_id in row_ids]
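The list comprehension above raises KeyError if a requested row ID is absent from the result, for example when a manifest is read against a different snapshot than the one that produced it. A tolerant variant might look like the following sketch; reorder_by_row_id is a hypothetical helper, not part of pypaimon.

```python
def reorder_by_row_id(rows, row_ids, row_id_key="_ROW_ID"):
    """Return rows in row_ids order, plus the ids that were not found."""
    by_id = {row[row_id_key]: row for row in rows}
    ordered = [by_id[row_id] for row_id in row_ids if row_id in by_id]
    missing = [row_id for row_id in row_ids if row_id not in by_id]
    return ordered, missing


fetched = [
    {"_ROW_ID": 7, "id": "c"},
    {"_ROW_ID": 3, "id": "a"},
]
ordered, missing = reorder_by_row_id(fetched, [3, 5, 7])
```

Returning the missing IDs separately lets the caller decide whether to fail, retry, or skip them.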

This pattern keeps broad candidate generation cheap: first search or filter to produce a compact row-id manifest, then fetch only the columns needed by the next stage.

# Stage 1: broad retrieval with a narrow projection.
manifest = (
    docs.search(
        query_vector,
        column="embedding",
        snapshot_id=source_snapshot_id,
    )
    .select(["id"])
    .limit(500)
    .with_row_id()
    .to_pandas()
)

# Stage 2: expensive reranking payload, fetched only for candidates.
rerank_payload = (
    docs.take_row_ids(
        manifest["_ROW_ID"].tolist(),
        snapshot_id=source_snapshot_id,
    )
    .select(["id", "title", "body"])
    .with_row_id()
    .to_list()
)

payload_by_row_id = {row["_ROW_ID"]: row for row in rerank_payload}
rerank_inputs = [
    {
        "row_id": row_id,
        "candidate_rank": rank,
        "doc": payload_by_row_id[row_id],
    }
    for rank, row_id in enumerate(manifest["_ROW_ID"])
]

For offline inference, feature backfills, or training splits, store row IDs in a work queue or manifest table together with the source table snapshot or tag used to produce them. Workers can then read row-id batches and fetch only the columns they need:

def run_inference_worker(docs, row_id_batch, source_snapshot_id):
    # Fetch only the columns the model needs, pinned to the source snapshot.
    rows = (
        docs.take_row_ids(row_id_batch, snapshot_id=source_snapshot_id)
        .select(["id", "content"])
        .with_row_id()
        .to_list()
    )

    outputs = []
    for row in rows:
        outputs.append(
            {
                "id": row["id"],
                "source_row_id": row["_ROW_ID"],
                # model_predict is a placeholder for your own model call.
                "prediction": model_predict(row["content"]),
            }
        )
    return outputs

Best practices:

  • Treat _ROW_ID as an internal row handle, not a business identifier.
  • Store the source snapshot or tag with persisted row-id manifests.
  • Pass the stored snapshot_id or tag_name when consuming persisted row-id manifests.
  • Use business keys or application columns when writing inference or training outputs back to a table.
  • Reorder take_row_ids results client-side when input order matters.
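A persisted manifest can carry its snapshot or tag alongside the row IDs so that consumers cannot forget to pass it back. The dataclass below is a hypothetical sketch, not a pypaimon type. It enforces that exactly one of snapshot_id or tag_name is set and produces the keyword arguments for the read call.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class RowIdManifest:
    row_ids: List[int]
    snapshot_id: Optional[int] = None
    tag_name: Optional[str] = None

    def __post_init__(self):
        # The two options are mutually exclusive, and one is required here.
        if (self.snapshot_id is None) == (self.tag_name is None):
            raise ValueError("set exactly one of snapshot_id or tag_name")

    def read_kwargs(self):
        """Keyword arguments to pass to the read API."""
        if self.snapshot_id is not None:
            return {"snapshot_id": self.snapshot_id}
        return {"tag_name": self.tag_name}

    def batches(self, size):
        """Yield row-id batches of at most `size` ids for workers."""
        for start in range(0, len(self.row_ids), size):
            yield self.row_ids[start:start + size]


manifest = RowIdManifest(row_ids=[0, 1, 2, 3, 4], snapshot_id=7)
batches = list(manifest.batches(2))
```

A worker would then call something like docs.take_row_ids(batch, **manifest.read_kwargs()) for each batch.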

Create Index

Use create_index to create the global indexes used by search APIs. The index_type argument is required; the multimodal API does not choose a default index implementation. The full-text alias is normalized to tantivy-fulltext.

docs.create_index("embedding", index_type="ivf-pq")
docs.create_index("content", index_type="full-text")

Search

Use search for one vector query or one full-text query.

If the table has exactly one vector column, column can be omitted for vector search. A string query is shorthand for a full-text match query when the table has exactly one text column. To target a specific text column, pass a FullTextQuery object.

Use pre_filter to prune search candidates before ranking. Use where() to filter the rows read from the search result. Both pre_filter and where() accept SQL-like predicate strings. For full-text search, pre_filter must only reference partition columns.

neighbors = (
    docs.search(
        [0.1, 0.2, 0.3],
        column="embedding",
        pre_filter="category = 'lake'",
    )
    .limit(10)
    .to_arrow()
)

matches = (
    docs.search("paimon vector")
    .limit(10)
    .to_pandas()
)

from pypaimon.globalindex.full_text_query import FullTextQuery

content_query = FullTextQuery.from_dict({
    "match": {
        "column": "content",
        "terms": "paimon vector",
        "operator": "And",
    },
})

matches = docs.search(content_query).limit(10).to_list()

Search Hybrid

Use search_hybrid to combine vector and full-text routes, then rerank the merged candidates. Create the indexes required by the routes you use.

Pass route specs to search_hybrid. Each route can set its own candidate limit, weight, and vector index options. String text routes infer the text column when the table has exactly one text column. To target a specific text column, pass a FullTextQuery object to pm.text_route.

pre_filter is applied before ranking. It accepts a SQL-like predicate string. When a hybrid query has a full-text route, pre_filter must only reference partition columns.

# This example assumes the table is partitioned by dt.
hybrid = (
    docs.search_hybrid(
        [
            pm.vector_route("embedding", query_vector, limit=50),
            pm.text_route("paimon vector", weight=0.2),
        ],
        pre_filter="dt = '2026-07-01'",
    )
    .rerank("rrf")
    .limit(10)
    .to_list()
)
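The "rrf" ranker name refers to reciprocal rank fusion. As background, here is a minimal sketch of the textbook algorithm: each route contributes 1 / (k + rank) for every candidate it returns. This page does not state which constant pypaimon uses or how route weights enter the score, so k=60 (a common choice) and the unweighted sum are assumptions.

```python
def reciprocal_rank_fusion(routes, k=60):
    """Fuse ranked id lists; a higher fused score means a better rank."""
    scores = {}
    for ranked_ids in routes:
        for rank, row_id in enumerate(ranked_ids, start=1):
            scores[row_id] = scores.get(row_id, 0.0) + 1.0 / (k + rank)
    # Sort by descending score; break ties by id for a stable result.
    return sorted(scores, key=lambda row_id: (-scores[row_id], row_id))


vector_hits = [10, 20, 30]  # row ids ranked by the vector route
text_hits = [20, 40, 10]    # row ids ranked by the full-text route
fused = reciprocal_rank_fusion([vector_hits, text_hits])
```

Row 20 ranks first because it is near the top of both routes, and candidates found by only one route fall to the bottom.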

For multiple vector fields, add multiple vector routes:

hybrid = (
    docs.search_hybrid(
        [
            pm.vector_route(
                "image_embedding",
                image_vector,
                weight=0.7,
                limit=50,
                options={"nprobe": "8"},
            ),
            pm.vector_route(
                "text_embedding",
                text_vector,
                weight=0.3,
                limit=50,
            ),
            pm.text_route("paimon vector", weight=0.2),
        ],
        ranker="weighted_score",
    )
    .limit(10)
    .to_list()
)

Dictionary route specs are also accepted:

hybrid = docs.search_hybrid(
    [
        {
            "column": "image_embedding",
            "vector": image_vector,
            "weight": 0.7,
            "limit": 50,
            "options": {"nprobe": "8"},
        },
        {
            "column": "text_embedding",
            "vector": text_vector,
            "weight": 0.3,
            "limit": 50,
        },
    ],
)

Dictionary vector routes also accept anns_field, data, and param aliases.

Search Vectors

Use search_vectors for multiple query vectors against one vector column. It returns one result set for each input vector, preserving input order.

batch_neighbors = (
    docs.search_vectors(
        [
            [0.1, 0.2, 0.3],
            [0.4, 0.5, 0.6],
        ],
        column="embedding",
        pre_filter="category = 'lake'",
    )
    .limit(10)
    .to_list()
)

Blobs

Use blobs() to work with a BLOB column through an S3-like object API. A blob store maps object keys to a table column such as key, object_key, or an explicit key_column.

Put Objects

put_object writes one object. put_objects writes a batch of objects in one Paimon commit. Both methods upsert by key: existing matching rows are updated, and missing keys append new rows. If the same key appears more than once in a single put_objects batch, the last object wins. BlobStore does not enforce object-key uniqueness; that should be modeled with a table-level unique key when available. The object body can be raw bytes, a binary file-like object, or a PyPaimon Blob. Use columns to set the non-key, non-BLOB table columns for the object row. For managed BLOB storage, use head_object or get_object to inspect the stored descriptor after the write.

store = docs.blobs(column="image", key_column="key")

store.put_object(
    "images/cat.jpg",
    body=open("cat.jpg", "rb"),
    columns={"content_type": "image/jpeg"},
)

store.put_objects([
    {
        "key": "images/dog.jpg",
        "body": open("dog.jpg", "rb"),
        "columns": {"content_type": "image/jpeg"},
    },
    {
        "key": "images/logo.png",
        "body": b"...",
        "columns": {"content_type": "image/png"},
    },
])

For Paimon-native reference storage, configure the BLOB column as a blob-descriptor-field and pass uri/offset/length or descriptor to put_object and put_objects. The table stores the external BlobDescriptor instead of copying the object into Paimon .blob files. Without blob-descriptor-field, descriptor-backed inputs are streamed into managed Paimon .blob files. External S3 URIs are read with the table's FileIO, so S3 credentials such as fs.s3.accessKeyId, fs.s3.accessKeySecret, fs.s3.securityToken, fs.s3.endpoint, and fs.s3.region can be supplied in the options passed to connect. The same options are used for every external URI; there is no per-object credential override.

store.put_objects([
    {
        "key": "videos/intro.mp4",
        "uri": "s3://bucket/videos/intro.mp4",
        "offset": 0,
        "length": intro_content_length,
        "columns": {"content_type": "video/mp4"},
    },
    {
        "key": "videos/demo.mp4",
        "uri": "s3://bucket/videos/demo.mp4",
        "offset": 0,
        "length": demo_content_length,
        "columns": {"content_type": "video/mp4"},
    },
])

store.put_object(
    "videos/trailer.mp4",
    uri="s3://bucket/videos/trailer.mp4",
    offset=0,
    length=trailer_content_length,
    columns={"content_type": "video/mp4"},
)

Read Objects

get_object reads an object by key. Its range parameter accepts a single S3-style byte range such as bytes=0-1023, bytes=1024-, or bytes=-512. head_object and list_objects return object information without opening the object stream. get_object, head_object, and list_objects expose non-key, non-BLOB table columns through columns. These columns are all returned by default. Pass columns to return only selected columns, or [] to skip them. list_objects requires a non-negative limit; limit=0 returns an empty list.

obj = store.get_object("images/cat.jpg", range="bytes=0-1023")
with obj.open() as stream:
    chunk = stream.read()

info = store.head_object(
    "images/cat.jpg",
    columns=["content_type"],
)
content_type = info.columns["content_type"]

objects = store.list_objects(prefix="images/", columns=[])
for obj_info in objects:
    print(obj_info.key, obj_info.size, obj_info.columns)
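For readers unfamiliar with the three range forms, the sketch below resolves a single byte range against an object size using the usual HTTP range semantics (inclusive end positions, suffix ranges counted from the end). resolve_range is a hypothetical helper written for illustration; how BlobStore treats edge cases such as out-of-bounds ranges is not specified here.

```python
def resolve_range(spec, size):
    """Resolve a single 'bytes=...' range to (start, end_exclusive)."""
    prefix = "bytes="
    if not spec.startswith(prefix) or "," in spec:
        raise ValueError("expected a single range such as bytes=0-1023")
    first, sep, last = spec[len(prefix):].partition("-")
    if not sep or (first == "" and last == ""):
        raise ValueError(f"malformed range: {spec!r}")
    if first == "":
        # Suffix form: the last N bytes of the object.
        start, end = max(size - int(last), 0), size
    else:
        start = int(first)
        # The end position is inclusive and is clamped to the object size.
        end = size if last == "" else min(int(last) + 1, size)
    if start >= end:
        raise ValueError(f"range not satisfiable for size {size}: {spec!r}")
    return start, end


first_kib = resolve_range("bytes=0-1023", 4096)
tail = resolve_range("bytes=1024-", 4096)
last_512 = resolve_range("bytes=-512", 4096)
```

The three calls cover the first 1024 bytes, everything from byte 1024 onward, and the final 512 bytes of a 4096-byte object.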

Delete Objects

delete_object deletes one object key. delete_objects deletes a batch of keys in one Paimon commit. Missing keys are ignored, and repeated keys in the same request are folded before deletion. BlobStore does not enforce object-key uniqueness; if multiple table rows already match a key, deleting that key removes all matching rows.

store.delete_object("images/dog.jpg")

store.delete_objects([
    "images/old-1.jpg",
    "images/old-2.jpg",
])

Update Columns

Amazon S3 updates object metadata by copying the object to the same key with replacement metadata. BlobStore exposes the corresponding Paimon-native operation directly: update_object_columns and update_objects_columns update non-key, non-BLOB table columns without rewriting the blob data. Missing keys raise NoSuchKey.

store.update_object_columns(
    "images/cat.jpg",
    {"content_type": "image/webp", "owner": "alice"},
)

store.update_objects_columns([
    {
        "key": "images/dog.jpg",
        "columns": {"content_type": "image/webp"},
    },
    {
        "key": "images/logo.png",
        "columns": {"content_type": "image/svg+xml"},
    },
])
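The key semantics described across the Blobs sections (upsert by key, last object wins within a batch, deletes ignore missing keys, column updates raise NoSuchKey) can be summarized in a small in-memory model. BlobStoreModel and this NoSuchKey class are hypothetical stand-ins, not the real BlobStore. In particular, leaving unmentioned columns unchanged on an upsert is an assumption.

```python
class NoSuchKey(KeyError):
    """Stand-in for the NoSuchKey error mentioned above."""


class BlobStoreModel:
    """In-memory model of the key semantics; not the real BlobStore."""

    def __init__(self):
        self.rows = []  # each row: {"key": ..., "body": ..., "columns": {...}}

    def _matching(self, key):
        return [row for row in self.rows if row["key"] == key]

    def put_objects(self, objects):
        folded = {obj["key"]: obj for obj in objects}  # last object wins
        for key, obj in folded.items():
            matches = self._matching(key)
            if not matches:  # missing key: append a new row
                matches = [{"key": key, "body": None, "columns": {}}]
                self.rows.extend(matches)
            for row in matches:  # existing key: update every matching row
                row["body"] = obj["body"]
                # Assumption: columns not mentioned keep their old values.
                row["columns"].update(obj.get("columns", {}))

    def delete_objects(self, keys):
        doomed = set(keys)  # repeated keys fold; missing keys are ignored
        self.rows = [row for row in self.rows if row["key"] not in doomed]

    def update_object_columns(self, key, columns):
        matches = self._matching(key)
        if not matches:
            raise NoSuchKey(key)
        for row in matches:
            row["columns"].update(columns)  # the body is left untouched


store_model = BlobStoreModel()
store_model.put_objects([
    {"key": "images/cat.jpg", "body": b"v1"},
    {"key": "images/cat.jpg", "body": b"v2",
     "columns": {"content_type": "image/jpeg"}},
])
store_model.update_object_columns("images/cat.jpg", {"owner": "alice"})
store_model.delete_objects(["images/missing.jpg", "images/missing.jpg"])
```

After these calls the model holds a single row for images/cat.jpg with body b"v2" and both columns set.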