Python API
Write and read Mosaic files from Python using PyArrow (pa.RecordBatch).
Setup
The Python API lives in the python/ directory and depends on the
mosaic_ffi shared library and pyarrow.
Build the native library first, then install the Python package:
# 1. Build the native library
cargo build --release -p mosaic-ffi
# 2. Install the Python package (bundles the native lib automatically)
cd python
pip install .
If you prefer a development install without bundling, use pip install -e .
and point the runtime to the native library via one of:
# Option A: MOSAIC_LIB_PATH (recommended)
export MOSAIC_LIB_PATH=/path/to/target/release
# Option B: System library path
export DYLD_LIBRARY_PATH=../target/release # macOS
export LD_LIBRARY_PATH=../target/release # Linux
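Before importing the package in a development install, it can help to confirm that the exported directory actually contains the shared library. A minimal sketch, assuming Cargo's default naming for the mosaic-ffi crate (libmosaic_ffi.so on Linux, libmosaic_ffi.dylib on macOS, mosaic_ffi.dll on Windows); find_native_lib is a hypothetical helper and not part of the mosaic package:

```python
import os
from pathlib import Path

# Candidate file names, assuming Cargo's default cdylib naming (an assumption).
_LIB_NAMES = ("libmosaic_ffi.so", "libmosaic_ffi.dylib", "mosaic_ffi.dll")


def find_native_lib(directory=None):
    """Return the path of the native library in `directory`, or None.

    Falls back to the MOSAIC_LIB_PATH environment variable when no
    directory is given.
    """
    directory = directory or os.environ.get("MOSAIC_LIB_PATH")
    if not directory:
        return None
    for name in _LIB_NAMES:
        candidate = Path(directory) / name
        if candidate.is_file():
            return candidate
    return None
```

A None result suggests the build step was skipped or the path points at the wrong profile directory (for example target/debug instead of target/release).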
Writing a File
1. Define an Arrow Schema
import pyarrow as pa
pa_schema = pa.schema([
    pa.field("id", pa.int32(), nullable=False),
    pa.field("name", pa.utf8()),
    pa.field("score", pa.float64()),
    pa.field("amount", pa.decimal128(10, 2)),
    pa.field("ts", pa.timestamp("ms")),
])
2. Create Writer and Write Batches
Build PyArrow RecordBatch objects and pass them to
write(). The Arrow C Data Interface is used internally
for zero-copy transfer to the native library:
import io
import pyarrow as pa
from mosaic import MosaicWriter, WriterOptions
pa_schema = pa.schema([
    pa.field("id", pa.int32()),
    pa.field("name", pa.utf8()),
    pa.field("score", pa.float64()),
])
opts = WriterOptions(
    num_buckets=2,
    compression=WriterOptions.COMPRESSION_ZSTD,
    zstd_level=1,
)
batch = pa.record_batch([
    pa.array(range(1000), type=pa.int32()),
    pa.array([f"user_{i}" for i in range(1000)]),
    pa.array([i * 1.5 for i in range(1000)]),
], names=["id", "name", "score"])
buf = io.BytesIO()
with MosaicWriter(buf, pa_schema, opts) as writer:
    writer.write(batch)
data = buf.getvalue()
WriterOptions
| Parameter | Default | Description |
|---|---|---|
| num_buckets | 0 | Number of buckets (0 = auto) |
| compression | ZSTD (1) | 0 = none, 1 = Zstd |
| zstd_level | 1 | Zstd compression level |
| row_group_max_size | 256 MB | Max uncompressed bytes per row group |
| max_dict_total_bytes | 32 KB | Max dictionary size per column |
| max_dict_entries | 255 | Max distinct values for DICT encoding |
| stats_columns | [] | Column names to build min/max stats for filter pushdown |
| page_size_threshold | 32 KB | Min avg column page size to enable paged mode (per-column compression) |
Writer Methods
| Method / Property | Return | Description |
|---|---|---|
| write(batch) | None | Write a pyarrow.RecordBatch or pyarrow.Table |
| estimated_file_size() | int | Estimated output file size in bytes (for file rolling) |
| close() | None | Flush remaining data and write footer |
| num_row_groups | int | Number of row groups written (available after close) |
| get_row_group_statistics(rg) | dict[str, ColumnStatistics] | Column statistics for a row group, keyed by column name (available after close) |
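estimated_file_size() is intended for file rolling: closing the current file and starting a new one once it grows past a target size. A minimal sketch using only the writer methods listed above; write_rolling, open_writer, and max_file_bytes are hypothetical names, and open_writer stands for any zero-argument callable that returns a new, open writer (for example one that wraps MosaicWriter(stream, schema, opts)):

```python
def write_rolling(batches, open_writer, max_file_bytes):
    """Write batches, starting a new file once the size estimate hits the limit.

    Returns the number of files that were opened.
    """
    writer = None
    files_opened = 0
    try:
        for batch in batches:
            if writer is None:
                writer = open_writer()
                files_opened += 1
            writer.write(batch)
            # The estimate is checked after each batch, so a file can
            # overshoot the limit by up to one batch.
            if writer.estimated_file_size() >= max_file_bytes:
                writer.close()
                writer = None
    finally:
        # Close the last, partially filled file (also on error).
        if writer is not None:
            writer.close()
    return files_opened
```

Checking after each write keeps the loop simple; a stricter limit would need smaller batches rather than a different check.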
Reading a File
1. Open the Reader
Use from_input_file with callbacks to read from any data source
(memory buffers, remote storage, etc.):
from mosaic import MosaicReader
def read_at(offset, length):
    return data[offset:offset + length]
reader = MosaicReader.from_input_file(read_at, len(data))
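The same callback shape works for a file on disk. A minimal sketch, assuming read_at(offset, length) should return the requested range as bytes; open_read_at is a hypothetical helper and not part of the mosaic package:

```python
import os


def open_read_at(path):
    """Return (read_at, file_length, close) for a local file."""
    f = open(path, "rb")
    file_length = os.fstat(f.fileno()).st_size

    def read_at(offset, length):
        # Seek and read on every call; not safe for concurrent callers
        # because the file position is shared.
        f.seek(offset)
        return f.read(length)

    return read_at, file_length, f.close
```

The first two values can be passed to MosaicReader.from_input_file(read_at, file_length); call the returned close function once the reader is no longer needed.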
2. Inspect the Schema
The reader exposes the file schema as a standard PyArrow Schema object.
Columns are in the original input order by default:
print(reader.schema)
for field in reader.schema:
    print(f"name={field.name} type={field.type} nullable={field.nullable}")
Reader Methods
| Method / Property | Return | Description |
|---|---|---|
| schema | pa.Schema | Arrow Schema for the file (columns in original input order) |
| num_row_groups | int | Row group count |
| row_group_num_rows(rg) | int | Number of rows in a specific row group |
| project(columns) | None | Set projection: subsequent reads return only the named columns in the specified order |
| read_row_group(rg) | pa.RecordBatch | Read a row group (all columns or projected columns if project() was called) |
| read_all() | pa.Table | Read entire file as a Table |
| get_row_group_statistics(rg) | dict[str, ColumnStatistics] | Column statistics for a row group, keyed by column name |
3. Read Row Groups as Arrow RecordBatch
for rg in range(reader.num_row_groups):
    batch = reader.read_row_group(rg)
    print(f"rows: {batch.num_rows}")
    ids = batch.column("id")
    names = batch.column("name")
    for i in range(batch.num_rows):
        print(f"id={ids[i].as_py()} name={names[i].as_py()}")
Projection Pushdown
Use project() to select and reorder columns by name.
Only the buckets containing the projected columns are decompressed, significantly
reducing I/O and memory for wide tables. The output preserves the order you specify:
# Only read "name" and "score" columns, in that order
reader.project(["name", "score"])
batch = reader.read_row_group(rg)
# batch contains only "name" and "score", in that order
# Empty projection: count-only read (0 columns, row count preserved)
reader.project([])
num_rows = reader.row_group_num_rows(rg)
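For a total row count, the per-row-group counts can be summed without calling read_row_group() at all. A minimal sketch using only num_row_groups and row_group_num_rows() from the Reader Methods table; count_rows is a hypothetical helper:

```python
def count_rows(reader):
    """Sum the row counts of all row groups of an open reader."""
    return sum(
        reader.row_group_num_rows(rg)
        for rg in range(reader.num_row_groups)
    )
```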
Column Statistics (Filter Pushdown)
When stats columns are configured during writing, statistics are available both from the writer (after close) and from the reader:
opts = WriterOptions(stats_columns=["id", "score"]) # build stats for "id" and "score"
# Get stats directly from the writer after close
with MosaicWriter(buf, pa_schema, opts) as writer:
    writer.write(batch)
for rg in range(writer.num_row_groups):
    stats = writer.get_row_group_statistics(rg) # dict[str, ColumnStatistics]
    id_stats = stats["id"]
    null_count = id_stats.null_count
# Or read stats from the reader
for rg in range(reader.num_row_groups):
    stats = reader.get_row_group_statistics(rg)
    for col_name, stat in stats.items():
        null_count = stat.null_count
        if stat.has_min_max:
            min_val = stat.min # bytes, big-endian wire format
            max_val = stat.max
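Min/max statistics let a caller skip row groups that cannot match a range predicate. A minimal sketch of that check, using only the has_min_max, min, and max attributes shown above; may_contain_range and its decode parameter are hypothetical, and decode must turn the raw bytes into a comparable Python value for the column's type:

```python
def may_contain_range(stat, lo, hi, decode):
    """Return False only when the row group cannot hold a value in [lo, hi].

    `stat` is any object with has_min_max, min, and max attributes.
    """
    if not stat.has_min_max or stat.min is None or stat.max is None:
        # No usable statistics: be conservative and read the row group.
        return True
    return decode(stat.min) <= hi and decode(stat.max) >= lo
```

For an INTEGER column, decode could be `lambda b: int.from_bytes(b, "big", signed=True)`, assuming two's-complement encoding. Row groups for which the function returns False can be skipped before calling read_row_group().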
ColumnStatistics
| Attribute | Type | Description |
|---|---|---|
| null_count | int | Number of null values |
| has_min_max | bool | Whether min/max are available |
| min | bytes | Min value as big-endian bytes (None if all-null) |
| max | bytes | Max value as big-endian bytes (None if all-null) |
Min/max values are returned as bytes in big-endian format matching the type's
wire format (e.g., 4 bytes for INTEGER, 8 bytes for BIGINT/DOUBLE, raw UTF-8 bytes for STRING).
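Decoding these bytes takes one struct call per type. A minimal sketch, assuming two's-complement integers and IEEE 754 doubles (the text above only specifies byte order and width); decode_stat and its type-name strings are hypothetical:

```python
import struct

# Big-endian struct formats keyed by logical type name (names are illustrative).
_FORMATS = {"INTEGER": ">i", "BIGINT": ">q", "DOUBLE": ">d"}


def decode_stat(raw, type_name):
    """Decode a min/max value from its big-endian wire bytes."""
    if raw is None:
        return None  # all-null column: no min/max
    if type_name == "STRING":
        return raw.decode("utf-8")
    return struct.unpack(_FORMATS[type_name], raw)[0]
```

Other types, such as decimals and timestamps, are left out because their wire widths are not listed here.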
Convenience Functions
Use write_table() and read_table() for quick Table-level I/O.
They use the same OutputFile / InputFile interfaces as
MosaicWriter and MosaicReader:
import io
import pyarrow as pa
import mosaic
table = pa.table({
    "id": pa.array(range(100), type=pa.int32()),
    "name": pa.array([f"user_{i}" for i in range(100)]),
})
# Write a Table (OutputFile: file-like object with write/flush)
buf = io.BytesIO()
mosaic.write_table(table, buf)
# Read a Table (InputFile: read_at callback + file_length)
data = buf.getvalue()
table = mosaic.read_table(
    lambda offset, length: data[offset:offset + length],
    len(data),
)
write_table / read_table
| Function | Description |
|---|---|
| write_table(table, stream, options=None) | Write a pa.Table via an OutputFile (file-like object) |
| read_table(read_at_fn, file_length, columns=None) | Read a pa.Table via an InputFile; columns is a list of column names to project |
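Because an OutputFile is any object with write and flush, a thin wrapper can add behavior such as byte counting. A minimal sketch; CountingOutput is a hypothetical class, and it assumes the writer calls only write(data) and flush() on the stream:

```python
import io


class CountingOutput:
    """File-like wrapper that tracks how many bytes were written."""

    def __init__(self, stream):
        self._stream = stream
        self.bytes_written = 0

    def write(self, data):
        self._stream.write(data)
        self.bytes_written += len(data)
        return len(data)

    def flush(self):
        self._stream.flush()


# Stand-alone demonstration with an in-memory stream.
out = CountingOutput(io.BytesIO())
out.write(b"header")
out.flush()
```

Under that assumption, an instance could be passed wherever a file-like object is accepted, for example as the stream argument of write_table().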
Complete Example
import io
import pyarrow as pa
from mosaic import MosaicWriter, MosaicReader, WriterOptions
# 1. Write
pa_schema = pa.schema([
    pa.field("id", pa.int32()),
    pa.field("name", pa.utf8()),
    pa.field("score", pa.float64()),
])
batch = pa.record_batch([
    pa.array(range(100), type=pa.int32()),
    pa.array([f"user_{i}" for i in range(100)]),
    pa.array([i * 1.5 for i in range(100)]),
], names=["id", "name", "score"])
opts = WriterOptions(num_buckets=2)
buf = io.BytesIO()
with MosaicWriter(buf, pa_schema, opts) as writer:
    writer.write(batch)
data = buf.getvalue()
# 2. Read
reader = MosaicReader.from_input_file(
    lambda offset, length: data[offset:offset + length],
    len(data),
)
with reader:
    for rg in range(reader.num_row_groups):
        batch = reader.read_row_group(rg)
        for i in range(batch.num_rows):
            print(f"id={batch.column('id')[i].as_py()} "
                  f"name={batch.column('name')[i].as_py()} "
                  f"score={batch.column('score')[i].as_py():.1f}")
Use project() to select and reorder output columns.
MosaicWriter and MosaicReader are context managers. Use with statements to ensure native memory is freed promptly.