Python API

Write and read Mosaic files from Python, exchanging data as PyArrow objects (pa.RecordBatch and pa.Table).

Setup

The Python API lives in the python/ directory and depends on the mosaic_ffi shared library and pyarrow. Build the native library first, then install the Python package:

# 1. Build the native library
cargo build --release -p mosaic-ffi

# 2. Install the Python package (bundles the native lib automatically)
cd python
pip install .

If you prefer a development install without bundling, use pip install -e . and point the runtime at the native library in one of two ways:

# Option A: MOSAIC_LIB_PATH (recommended)
export MOSAIC_LIB_PATH=/path/to/target/release

# Option B: System library path
export DYLD_LIBRARY_PATH=../target/release  # macOS
export LD_LIBRARY_PATH=../target/release    # Linux
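
If the package fails to load after a development install, it can help to check that the library file is actually on one of the configured paths. The following is a diagnostic sketch only, not part of the mosaic package. The helper find_native_lib is hypothetical, and the file names are an assumption based on Cargo's usual output naming for a crate called mosaic-ffi.

```python
import os
import sys
from pathlib import Path

# Assumed file names: Cargo's usual shared-library output for a crate
# named mosaic-ffi. Adjust if your build produces something different.
_LIB_NAMES = {
    "linux": "libmosaic_ffi.so",
    "darwin": "libmosaic_ffi.dylib",
    "win32": "mosaic_ffi.dll",
}


def find_native_lib(env=None, platform=None):
    """Return the first existing candidate path for the native library, or None."""
    env = os.environ if env is None else env
    platform = sys.platform if platform is None else platform
    name = _LIB_NAMES.get(platform, "libmosaic_ffi.so")

    # Search MOSAIC_LIB_PATH first, then the system library path variables.
    dirs = []
    for var in ("MOSAIC_LIB_PATH", "DYLD_LIBRARY_PATH", "LD_LIBRARY_PATH"):
        value = env.get(var, "")
        dirs.extend(d for d in value.split(os.pathsep) if d)

    for d in dirs:
        candidate = Path(d) / name
        if candidate.is_file():
            return candidate
    return None


if __name__ == "__main__":
    print(find_native_lib() or "native library not found on the configured paths")
```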

Writing a File

1. Define an Arrow Schema

import pyarrow as pa

pa_schema = pa.schema([
    pa.field("id", pa.int32(), nullable=False),
    pa.field("name", pa.utf8()),
    pa.field("score", pa.float64()),
    pa.field("amount", pa.decimal128(10, 2)),
    pa.field("ts", pa.timestamp("ms")),
])

2. Create Writer and Write Batches

Build PyArrow RecordBatch objects and pass them to write(). The Arrow C Data Interface is used internally for zero-copy transfer to the native library:

import io
import pyarrow as pa
from mosaic import MosaicWriter, WriterOptions

pa_schema = pa.schema([
    pa.field("id", pa.int32()),
    pa.field("name", pa.utf8()),
    pa.field("score", pa.float64()),
])

opts = WriterOptions(
    num_buckets=2,
    compression=WriterOptions.COMPRESSION_ZSTD,
    zstd_level=1,
)

batch = pa.record_batch([
    pa.array(range(1000), type=pa.int32()),
    pa.array([f"user_{i}" for i in range(1000)]),
    pa.array([i * 1.5 for i in range(1000)]),
], names=["id", "name", "score"])

buf = io.BytesIO()
with MosaicWriter(buf, pa_schema, opts) as writer:
    writer.write(batch)

data = buf.getvalue()

WriterOptions

| Parameter | Default | Description |
|---|---|---|
| num_buckets | 0 | Number of buckets (0 = auto) |
| compression | ZSTD (1) | 0 = none, 1 = Zstd |
| zstd_level | 1 | Zstd compression level |
| row_group_max_size | 256 MB | Max uncompressed bytes per row group |
| max_dict_total_bytes | 32 KB | Max dictionary size per column |
| max_dict_entries | 255 | Max distinct values for DICT encoding |
| stats_columns | [] | Column names to build min/max stats for filter pushdown |
| page_size_threshold | 32 KB | Min avg column page size to enable paged mode (per-column compression) |

Writer Methods

| Method / Property | Return | Description |
|---|---|---|
| write(batch) | None | Write a pyarrow.RecordBatch or pyarrow.Table |
| estimated_file_size() | int | Estimated output file size in bytes (for file rolling) |
| close() | None | Flush remaining data and write footer |
| num_row_groups | int | Number of row groups written (available after close) |
| get_row_group_statistics(rg) | dict[str, ColumnStatistics] | Column statistics for a row group, keyed by column name (available after close) |
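
File rolling with estimated_file_size() could look roughly like the sketch below. It relies only on the three writer methods listed above. The helper write_with_rolling and its open_writer factory are hypothetical names introduced for illustration, and how each new output stream is created is left to the caller.

```python
def write_with_rolling(batches, open_writer, max_file_bytes):
    """Write batches, starting a new file once the size estimate reaches the limit.

    open_writer is a hypothetical zero-argument factory that returns an object
    with write(), estimated_file_size() and close(), for example a MosaicWriter
    opened on a fresh output stream. Returns the number of files produced.
    """
    writer = None
    files = 0
    for batch in batches:
        if writer is None:
            writer = open_writer()
            files += 1
        writer.write(batch)
        # The estimate is checked after each batch, so a file can overshoot
        # the limit by up to one batch.
        if writer.estimated_file_size() >= max_file_bytes:
            writer.close()
            writer = None
    if writer is not None:
        writer.close()
    return files
```

Checking the estimate after every batch keeps the logic simple. Smaller batches give tighter control over the final file size.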

Reading a File

1. Open the Reader

Use from_input_file with a read_at(offset, length) callback and the total file length to read from any data source (memory buffers, remote storage, etc.):

from mosaic import MosaicReader

def read_at(offset, length):
    return data[offset:offset + length]

reader = MosaicReader.from_input_file(read_at, len(data))
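
The same callback shape works for a file on local disk. The sketch below is one possible way to build it. make_file_read_at is a hypothetical helper, not part of mosaic, and it assumes the callback only needs to return the requested bytes, as the in-memory example does.

```python
import os


def make_file_read_at(path):
    """Return (read_at, file_length, close) for a local file.

    read_at(offset, length) returns up to `length` bytes starting at `offset`.
    The seek/read pair is not thread-safe; use one instance per thread.
    """
    f = open(path, "rb")
    file_length = os.fstat(f.fileno()).st_size

    def read_at(offset, length):
        f.seek(offset)
        return f.read(length)

    return read_at, file_length, f.close
```

The returned read_at and file_length can be passed to MosaicReader.from_input_file(read_at, file_length). Call the returned close function once the reader is no longer needed.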

2. Inspect the Schema

The reader exposes the file schema as a standard PyArrow Schema object. Columns are in the original input order by default:

print(reader.schema)
for field in reader.schema:
    print(f"name={field.name} type={field.type} nullable={field.nullable}")

Reader Methods

| Method / Property | Return | Description |
|---|---|---|
| schema | pa.Schema | Arrow Schema for the file (columns in original input order) |
| num_row_groups | int | Row group count |
| row_group_num_rows(rg) | int | Number of rows in a specific row group |
| project(columns) | None | Set projection: subsequent reads return only the named columns in the specified order |
| read_row_group(rg) | pa.RecordBatch | Read a row group (all columns or projected columns if project() was called) |
| read_all() | pa.Table | Read entire file as a Table |
| get_row_group_statistics(rg) | dict[str, ColumnStatistics] | Column statistics for a row group, keyed by column name |

3. Read Row Groups as Arrow RecordBatch

for rg in range(reader.num_row_groups):
    batch = reader.read_row_group(rg)
    print(f"rows: {batch.num_rows}")

    ids = batch.column("id")
    names = batch.column("name")
    for i in range(batch.num_rows):
        print(f"id={ids[i].as_py()} name={names[i].as_py()}")

Projection Pushdown

Use project() to select and reorder columns by name. Only the buckets containing the projected columns are decompressed, significantly reducing I/O and memory for wide tables. The output preserves the order you specify:

# Only read "name" and "score" columns, in that order
rg = 0  # index of the row group to read
reader.project(["name", "score"])
batch = reader.read_row_group(rg)
# batch contains only "name" and "score", in that order

# Empty projection: count-only read (0 columns, row count preserved)
reader.project([])
num_rows = reader.row_group_num_rows(rg)

Column Statistics (Filter Pushdown)

When stats_columns is set in WriterOptions, statistics for those columns are available both from the writer (after close) and from the reader:

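import io
from mosaic import MosaicReader, MosaicWriter, WriterOptions

opts = WriterOptions(stats_columns=["id", "score"])  # build stats for "id" and "score"

# Get stats directly from the writer after close.
# pa_schema and batch are defined as in "Writing a File".
buf = io.BytesIO()  # fresh buffer so earlier output is not appended to
with MosaicWriter(buf, pa_schema, opts) as writer:
    writer.write(batch)

for rg in range(writer.num_row_groups):
    stats = writer.get_row_group_statistics(rg)  # dict[str, ColumnStatistics]
    id_stats = stats["id"]
    null_count = id_stats.null_count

# Or read stats from a reader opened on the file written above
data = buf.getvalue()
reader = MosaicReader.from_input_file(
    lambda offset, length: data[offset:offset + length],
    len(data),
)
for rg in range(reader.num_row_groups):
    stats = reader.get_row_group_statistics(rg)
    for col_name, stat in stats.items():
        null_count = stat.null_count
        if stat.has_min_max:
            min_val = stat.min   # bytes, big-endian wire format
            max_val = stat.max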

ColumnStatistics

| Attribute | Type | Description |
|---|---|---|
| null_count | int | Number of null values |
| has_min_max | bool | Whether min/max are available |
| min | bytes | Min value as big-endian bytes (None if all-null) |
| max | bytes | Max value as big-endian bytes (None if all-null) |

Min/max values are returned as bytes in big-endian format matching the type's wire format (e.g., 4 bytes for INTEGER, 8 bytes for BIGINT/DOUBLE, raw UTF-8 bytes for STRING).
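
To use these bytes for filter pushdown they first have to be decoded. The sketch below shows one way to do that and to decide whether a row group can be skipped for a range predicate. It assumes integers are signed two's-complement and DOUBLE is IEEE 754, which the description above does not state explicitly. decode_stat and may_contain are hypothetical helpers, not part of mosaic, and the type names are just the labels used in the paragraph above.

```python
import struct

# Assumed encodings: signed two's-complement integers and IEEE 754 doubles,
# all big-endian. Keys are the type labels used in this document.
_DECODERS = {
    "INTEGER": lambda b: struct.unpack(">i", b)[0],   # 4 bytes
    "BIGINT": lambda b: struct.unpack(">q", b)[0],    # 8 bytes
    "DOUBLE": lambda b: struct.unpack(">d", b)[0],    # 8 bytes
    "STRING": lambda b: b.decode("utf-8"),            # raw UTF-8
}


def decode_stat(raw, type_name):
    """Decode a min/max value from its wire bytes; None passes through."""
    if raw is None:
        return None
    return _DECODERS[type_name](raw)


def may_contain(stat, type_name, lo, hi):
    """Return False only when the row group provably has no value in [lo, hi]."""
    if not stat.has_min_max or stat.min is None or stat.max is None:
        # No usable bounds: conservatively keep the row group.
        return True
    col_min = decode_stat(stat.min, type_name)
    col_max = decode_stat(stat.max, type_name)
    return col_max >= lo and col_min <= hi
```

A reader loop could call may_contain(stats["id"], "INTEGER", 100, 200) for each row group and only call read_row_group(rg) when it returns True.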

Convenience Functions

Use write_table() and read_table() for quick Table-level I/O. They use the same OutputFile / InputFile interfaces as MosaicWriter and MosaicReader:

import io
import pyarrow as pa
import mosaic

table = pa.table({
    "id": pa.array(range(100), type=pa.int32()),
    "name": pa.array([f"user_{i}" for i in range(100)]),
})

# Write a Table (OutputFile: file-like object with write/flush)
buf = io.BytesIO()
mosaic.write_table(table, buf)

# Read a Table (InputFile: read_at callback + file_length)
data = buf.getvalue()
table = mosaic.read_table(
    lambda offset, length: data[offset:offset + length],
    len(data),
)

write_table / read_table

| Function | Description |
|---|---|
| write_table(table, stream, options=None) | Write a pa.Table via an OutputFile (file-like object) |
| read_table(read_at_fn, file_length, columns=None) | Read a pa.Table via an InputFile; columns is a list of column names to project |
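
Because an OutputFile is described as a file-like object with write/flush, a thin wrapper can sit between the writer and the real stream. The sketch below counts the bytes passing through. CountingOutput is a hypothetical class, and it assumes the writer calls only write(bytes) and flush() on the stream, which is not confirmed beyond the description above.

```python
import io


class CountingOutput:
    """Wrap a binary file-like object and count the bytes written through it."""

    def __init__(self, raw):
        self._raw = raw
        self.bytes_written = 0

    def write(self, data):
        n = self._raw.write(data)
        # Some streams return None from write(); fall back to the payload size.
        self.bytes_written += len(data) if n is None else n
        return n

    def flush(self):
        self._raw.flush()


# Standalone demonstration with an in-memory stream.
raw = io.BytesIO()
out = CountingOutput(raw)
out.write(b"abc")
out.write(b"de")
out.flush()
print(out.bytes_written)
```

Under that assumption, an instance could be passed wherever the examples above pass buf, for instance as the stream argument of write_table.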

Complete Example

import io
import pyarrow as pa
from mosaic import MosaicWriter, MosaicReader, WriterOptions

# 1. Write
pa_schema = pa.schema([
    pa.field("id", pa.int32()),
    pa.field("name", pa.utf8()),
    pa.field("score", pa.float64()),
])

batch = pa.record_batch([
    pa.array(range(100), type=pa.int32()),
    pa.array([f"user_{i}" for i in range(100)]),
    pa.array([i * 1.5 for i in range(100)]),
], names=["id", "name", "score"])

opts = WriterOptions(num_buckets=2)

buf = io.BytesIO()
with MosaicWriter(buf, pa_schema, opts) as writer:
    writer.write(batch)

data = buf.getvalue()

# 2. Read
reader = MosaicReader.from_input_file(
    lambda offset, length: data[offset:offset + length],
    len(data),
)
with reader:
    for rg in range(reader.num_row_groups):
        batch = reader.read_row_group(rg)
        for i in range(batch.num_rows):
            print(f"id={batch.column('id')[i].as_py()} "
                  f"name={batch.column('name')[i].as_py()} "
                  f"score={batch.column('score')[i].as_py():.1f}")

Column ordering: Columns are stored on disk in name-sorted order for compression. The reader returns columns in the original input order by default. Use project() to select and reorder output columns.

Resource management: MosaicWriter and MosaicReader are context managers. Use with statements to ensure native memory is freed promptly.