Serialization

Arrow serialization layer for automatic schema generation and IPC validation.

ArrowSerializableDataclass

Use ArrowSerializableDataclass as a mixin for dataclasses that need Arrow serialization. The Arrow schema is generated automatically from field annotations:

from dataclasses import dataclass
from typing import Annotated

import pyarrow as pa

from vgi_rpc import ArrowSerializableDataclass, ArrowType


@dataclass(frozen=True)
class Measurement(ArrowSerializableDataclass):
    timestamp: str
    value: float
    count: Annotated[int, ArrowType(pa.int32())]  # explicit Arrow type override

# Auto-generated schema
print(Measurement.ARROW_SCHEMA)
# timestamp: string
# value: double
# count: int32

# Serialize / deserialize
m = Measurement(timestamp="2024-01-01T00:00:00Z", value=42.0, count=7)
data = m.serialize_to_bytes()
m2 = Measurement.deserialize_from_bytes(data)
assert m == m2

These dataclasses work directly as RPC parameters and return types. ArrowSerializableDataclass is also the base class for StreamState, so any field you add to a stream state is automatically serialized between calls.

Type mappings

| Python type | Arrow type |
|---|---|
| str | utf8 |
| bytes | binary |
| int | int64 |
| float | float64 |
| bool | bool_ |
| list[T] | list_<T> |
| dict[K, V] | map_<K, V> |
| frozenset[T] | list_<T> |
| Enum | dictionary(int32, utf8) |
| Optional[T] | nullable T |
| nested ArrowSerializableDataclass | struct |
| Annotated[T, ArrowType(...)] | explicit override |

API Reference

ArrowSerializableDataclass

Mixin for dataclasses with automatic Arrow IPC serialization.

Provides automatic schema generation and serialization/deserialization for frozen dataclasses. The ARROW_SCHEMA is auto-generated from field type annotations.

Auto-detected types:

- Basic types: str, bytes, int, float, bool
- Generic types: list[T], dict[K, V], frozenset[T]
- NewType: unwraps to underlying type (e.g., NewType("Id", bytes) -> binary)
- Enum: serializes as dictionary-encoded string via .name
- ArrowSerializableDataclass: serializes as struct

Not supported:

- tuple: Arrow has no native heterogeneous-tuple type. Use a nested dataclass (ArrowSerializableDataclass) for fixed, named fields, or list[T] for homogeneous sequences.

Optional fields (annotated with T | None or Optional[T]) are marked as nullable. To override the inferred Arrow type of a specific field, use Annotated with ArrowType.
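
The NewType and Enum rules listed above can be illustrated with the standard library alone. The sketch below is an assumption about how such handling might look, not the library's code: `unwrap_newtype`, `enum_to_wire`, `enum_from_wire`, `UserId`, and `Status` are hypothetical names. It also shows why an unknown Enum name surfaces as a KeyError on deserialization.

```python
from enum import Enum
from typing import NewType

UserId = NewType("UserId", bytes)  # hypothetical alias over bytes


class Status(Enum):  # hypothetical enum; values are irrelevant on the wire
    ACTIVE = "a"
    DISABLED = "d"


def unwrap_newtype(tp: object) -> object:
    """Follow __supertype__ links until a non-NewType type is reached."""
    while hasattr(tp, "__supertype__"):
        tp = tp.__supertype__
    return tp


def enum_to_wire(member: Enum) -> str:
    """Serialize by member name, not by value."""
    return member.name


def enum_from_wire(enum_cls: type[Enum], name: str) -> Enum:
    """Look the member up by name; raises KeyError for unknown names."""
    return enum_cls[name]


print(unwrap_newtype(UserId))                              # <class 'bytes'>
print(enum_from_wire(Status, enum_to_wire(Status.ACTIVE)))  # Status.ACTIVE
```

Serializing by name keeps the wire format stable if member values change, at the cost of breaking when a member is renamed.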

ATTRIBUTE DESCRIPTION
ARROW_SCHEMA

Auto-generated Arrow schema from field annotations.

TYPE: Schema

serialize

serialize(dest: IOBase) -> None

Serialize this instance to an Arrow IPC stream.

PARAMETER DESCRIPTION
dest

The destination to write to (must support binary writes, e.g., stdout pipe, BufferedWriter).

TYPE: IOBase

Source code in vgi_rpc/utils.py
def serialize(self, dest: IOBase) -> None:
    """Serialize this instance to an Arrow IPC stream.

    Args:
        dest: The destination to write to (must support binary writes,
            e.g., stdout pipe, BufferedWriter).

    """
    serialize_record_batch(dest, self._serialize())

serialize_to_bytes

serialize_to_bytes() -> bytes

Serialize this instance to Arrow IPC bytes.

RETURNS DESCRIPTION
bytes

Arrow IPC stream bytes containing a single-row RecordBatch.

Source code in vgi_rpc/utils.py
def serialize_to_bytes(self) -> bytes:
    """Serialize this instance to Arrow IPC bytes.

    Returns:
        Arrow IPC stream bytes containing a single-row RecordBatch.

    """
    return serialize_record_batch_bytes(self._serialize())

deserialize_from_batch classmethod

deserialize_from_batch(
    batch: RecordBatch,
    custom_metadata: KeyValueMetadata | None = None,
    *,
    ipc_validation: IpcValidation = FULL
) -> Self

Deserialize an instance from an Arrow RecordBatch.

PARAMETER DESCRIPTION
batch

Single-row RecordBatch containing the serialized data.

TYPE: RecordBatch

custom_metadata

Optional metadata from the batch (unused, reserved for subclass overrides).

TYPE: KeyValueMetadata | None DEFAULT: None

ipc_validation

Validation level for nested IPC batches.

TYPE: IpcValidation DEFAULT: FULL

RETURNS DESCRIPTION
Self

Deserialized instance of this class.

RAISES DESCRIPTION
ValueError

If the batch is invalid (wrong row count or missing fields).

TypeError

If a field value has an unexpected type during conversion.

KeyError

If an Enum name cannot be resolved.

Source code in vgi_rpc/utils.py
@classmethod
def deserialize_from_batch(
    cls,
    batch: pa.RecordBatch,
    custom_metadata: pa.KeyValueMetadata | None = None,
    *,
    ipc_validation: IpcValidation = IpcValidation.FULL,
) -> Self:
    """Deserialize an instance from an Arrow RecordBatch.

    Args:
        batch: Single-row RecordBatch containing the serialized data.
        custom_metadata: Optional metadata from the batch (unused,
            reserved for subclass overrides).
        ipc_validation: Validation level for nested IPC batches.

    Returns:
        Deserialized instance of this class.

    Raises:
        ValueError: If the batch is invalid (wrong row count or missing fields).
        TypeError: If a field value has an unexpected type during conversion.
        KeyError: If an Enum name cannot be resolved.

    """
    # Get required fields (those without defaults) from dataclass definition.
    # Fields with defaults or default_factory are optional for compatibility.
    required_fields = []
    for f in dataclass_fields(cls):
        has_default = f.default is not MISSING or f.default_factory is not MISSING
        if not has_default:
            required_fields.append(f.name)

    # Validate and extract row
    row = _validate_single_row_batch(
        batch,
        cls.__name__,
        required_fields=required_fields,
    )

    # Use get_type_hints to resolve string annotations
    try:
        type_hints = get_type_hints(cls)
    except Exception:
        type_hints = {f.name: f.type for f in dataclass_fields(cls)}

    # Convert values back to expected Python types
    kwargs: dict[str, Any] = {}
    for field in dataclass_fields(cls):
        # Check if field is present in the row
        if field.name not in row:
            # Use default if available (for backward compatibility)
            if field.default is not MISSING:
                kwargs[field.name] = field.default
            elif field.default_factory is not MISSING:
                kwargs[field.name] = field.default_factory()
            # If no default, it would have been caught by validate_single_row_batch
            continue

        value = row.get(field.name)
        field_type = type_hints.get(field.name, field.type)

        # Unwrap Annotated to get actual type
        if get_origin(field_type) is Annotated:
            args = get_args(field_type)
            field_type = args[0] if args else field_type

        # Convert value based on field type
        value = cls._convert_value_for_deserialization(value, field_type, ipc_validation)
        kwargs[field.name] = value

    return cls(**kwargs)
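
The required-versus-defaulted field handling in the listing above can be tried in isolation. The following is a simplified sketch that uses a plain dict in place of the single-row batch. `Settings`, `required_field_names`, and `build_from_row` are hypothetical names for illustration, and type conversion is omitted.

```python
from dataclasses import MISSING, dataclass, field, fields
from typing import Any


@dataclass(frozen=True)
class Settings:  # hypothetical dataclass for illustration
    name: str                                      # required: no default
    retries: int = 3                               # optional: plain default
    tags: list[str] = field(default_factory=list)  # optional: default factory


def required_field_names(cls: type) -> list[str]:
    """Fields with neither a default nor a default_factory are required."""
    return [f.name for f in fields(cls) if f.default is MISSING and f.default_factory is MISSING]


def build_from_row(cls: type, row: dict[str, Any]) -> Any:
    """Build an instance from a row, filling absent fields from their defaults."""
    missing = [n for n in required_field_names(cls) if n not in row]
    if missing:
        raise ValueError(f"{cls.__name__}: missing required fields {missing}")
    kwargs: dict[str, Any] = {}
    for f in fields(cls):
        if f.name in row:
            kwargs[f.name] = row[f.name]
        elif f.default is not MISSING:
            kwargs[f.name] = f.default
        else:
            kwargs[f.name] = f.default_factory()
    return cls(**kwargs)


# A payload written before "retries" and "tags" existed still loads.
print(build_from_row(Settings, {"name": "primary"}))
```

This is the property that makes adding a new field with a default a backward-compatible change: older payloads that lack the column still deserialize.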

deserialize_from_bytes classmethod

deserialize_from_bytes(
    data: bytes, ipc_validation: IpcValidation = FULL
) -> Self

Deserialize an instance from Arrow IPC bytes.

PARAMETER DESCRIPTION
data

Arrow IPC stream bytes containing a single-row RecordBatch.

TYPE: bytes

ipc_validation

Validation level for the deserialized batch.

TYPE: IpcValidation DEFAULT: FULL

RETURNS DESCRIPTION
Self

Deserialized instance of this class.

RAISES DESCRIPTION
ValueError

If the batch is invalid (wrong row count or missing fields).

IPCError

If the IPC stream is malformed or truncated.

TypeError

If a field value has an unexpected type during conversion.

KeyError

If an Enum name cannot be resolved.

Source code in vgi_rpc/utils.py
@classmethod
def deserialize_from_bytes(cls, data: bytes, ipc_validation: IpcValidation = IpcValidation.FULL) -> Self:
    """Deserialize an instance from Arrow IPC bytes.

    Args:
        data: Arrow IPC stream bytes containing a single-row RecordBatch.
        ipc_validation: Validation level for the deserialized batch.

    Returns:
        Deserialized instance of this class.

    Raises:
        ValueError: If the batch is invalid (wrong row count or missing fields).
        IPCError: If the IPC stream is malformed or truncated.
        TypeError: If a field value has an unexpected type during conversion.
        KeyError: If an Enum name cannot be resolved.

    """
    batch, cm = deserialize_record_batch(data, ipc_validation)
    return cls.deserialize_from_batch(batch, cm, ipc_validation=ipc_validation)

ArrowType

ArrowType dataclass

ArrowType(arrow_type: DataType)

Annotation marker to specify explicit Arrow type for a field.

Use with Annotated to override the default inferred Arrow type:

@dataclass(frozen=True)
class MyData(ArrowSerializableDataclass):
    # Override int64 → int32
    count: Annotated[int, ArrowType(pa.int32())]

    # Override for nested list of int32
    matrix: Annotated[
        list[list[int]], ArrowType(pa.list_(pa.list_(pa.int32())))
    ]
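
For readers curious how an annotation marker like this can be discovered at runtime, here is a stdlib-only sketch. It is an assumption about the general technique, not the library's code: `TypeOverride` is a hypothetical stand-in for ArrowType that carries a string instead of a pyarrow DataType, and `find_overrides` is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Annotated, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class TypeOverride:  # hypothetical stand-in for ArrowType
    arrow_type: str


@dataclass(frozen=True)
class MyData:
    count: Annotated[int, TypeOverride("int32")]  # carries an override marker
    label: str                                    # no override: type is inferred


def find_overrides(cls: type) -> dict[str, str]:
    """Map field names to the override found in their Annotated metadata."""
    # include_extras=True keeps the Annotated wrapper; without it the
    # metadata is stripped and only the bare type is returned.
    hints = get_type_hints(cls, include_extras=True)
    overrides: dict[str, str] = {}
    for name, hint in hints.items():
        if get_origin(hint) is Annotated:
            _base, *metadata = get_args(hint)
            for meta in metadata:
                if isinstance(meta, TypeOverride):
                    overrides[name] = meta.arrow_type
    return overrides


print(find_overrides(MyData))  # {'count': 'int32'}
```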

IpcValidation

Bases: Enum

Level of validation applied to incoming IPC record batches.

ATTRIBUTE DESCRIPTION
NONE

No validation — batches are used as-is.

STANDARD

Call batch.validate() to check schema/column consistency.

FULL

Call batch.validate(full=True) to also verify data buffers.

ValidatedReader

ValidatedReader(
    reader: RecordBatchStreamReader,
    ipc_validation: IpcValidation,
)

Wrapper around ipc.RecordBatchStreamReader that validates every batch on read.

Proxies the subset of the reader API used by the RPC framework (read_next_batch, read_next_batch_with_custom_metadata, schema, and the context manager protocol). Downstream code needs no changes: wrap the result of ipc.open_stream(...) in ValidatedReader(..., ipc_validation).

When ipc_validation is IpcValidation.NONE, validation is skipped, so each read delegates to the inner reader with minimal extra overhead.

Wrap reader so every batch is validated at ipc_validation level.

Source code in vgi_rpc/utils.py
def __init__(self, reader: ipc.RecordBatchStreamReader, ipc_validation: IpcValidation) -> None:
    """Wrap *reader* so every batch is validated at *ipc_validation* level."""
    self._reader = reader
    self._ipc_validation = ipc_validation

ipc_validation property

ipc_validation: IpcValidation

The validation level applied to every batch read.

schema property

schema: Schema

The schema of the underlying IPC stream.

read_next_batch

read_next_batch() -> RecordBatch

Read the next batch, validating it before returning.

Source code in vgi_rpc/utils.py
def read_next_batch(self) -> pa.RecordBatch:
    """Read the next batch, validating it before returning."""
    batch: pa.RecordBatch = self._reader.read_next_batch()
    validate_batch(batch, self._ipc_validation)
    return batch

read_next_batch_with_custom_metadata

read_next_batch_with_custom_metadata() -> (
    tuple[RecordBatch, KeyValueMetadata | None]
)

Read the next batch with custom metadata, validating before returning.

Source code in vgi_rpc/utils.py
def read_next_batch_with_custom_metadata(self) -> tuple[pa.RecordBatch, pa.KeyValueMetadata | None]:
    """Read the next batch with custom metadata, validating before returning."""
    batch, cm = self._reader.read_next_batch_with_custom_metadata()
    validate_batch(batch, self._ipc_validation)
    return batch, cm

__enter__

__enter__() -> Self

Enter the context manager.

Source code in vgi_rpc/utils.py
def __enter__(self) -> Self:
    """Enter the context manager."""
    self._reader.__enter__()
    return self

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit the context manager.

Source code in vgi_rpc/utils.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit the context manager."""
    self._reader.__exit__(exc_type, exc_val, exc_tb)  # type: ignore[no-untyped-call]
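
The wrapping pattern used by ValidatedReader can be sketched without pyarrow. In the example below, `FakeReader`, `ValidatingProxy`, and `non_negative` are hypothetical stand-ins: integers play the role of record batches and a simple callable plays the role of validate_batch. It shows the proxy idea only (validate on every read, forward the context manager to the inner reader), not the library's implementation.

```python
from collections.abc import Callable, Iterable


class FakeReader:
    """Hypothetical stand-in for an IPC stream reader."""

    def __init__(self, batches: Iterable[int]) -> None:
        self._batches = iter(batches)
        self.closed = False

    def read_next_batch(self) -> int:
        return next(self._batches)  # raises StopIteration at end of stream

    def __enter__(self) -> "FakeReader":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.closed = True


class ValidatingProxy:
    """Wrap a reader so every batch passes through check before it is returned."""

    def __init__(self, reader: FakeReader, check: Callable[[int], None]) -> None:
        self._reader = reader
        self._check = check

    def read_next_batch(self) -> int:
        batch = self._reader.read_next_batch()
        self._check(batch)  # raises before the caller ever sees a bad batch
        return batch

    def __enter__(self) -> "ValidatingProxy":
        self._reader.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self._reader.__exit__(exc_type, exc_val, exc_tb)


def non_negative(batch: int) -> None:
    """Hypothetical validation rule: reject negative 'batches'."""
    if batch < 0:
        raise ValueError(f"invalid batch: {batch}")


inner = FakeReader([1, 2, -1])
with ValidatingProxy(inner, non_negative) as reader:
    print(reader.read_next_batch())  # 1
    print(reader.read_next_batch())  # 2
print(inner.closed)  # True
```

Because the proxy exposes the same method names as the inner reader, calling code does not need to know whether validation is active.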

Validation

validate_batch

validate_batch(
    batch: RecordBatch, ipc_validation: IpcValidation
) -> None

Validate a RecordBatch at the specified level.

PARAMETER DESCRIPTION
batch

The batch to validate.

TYPE: RecordBatch

ipc_validation

Validation level (NONE, STANDARD, or FULL).

TYPE: IpcValidation

RAISES DESCRIPTION
IPCError

If validation fails.

Source code in vgi_rpc/utils.py
def validate_batch(batch: pa.RecordBatch, ipc_validation: IpcValidation) -> None:
    """Validate a RecordBatch at the specified level.

    Args:
        batch: The batch to validate.
        ipc_validation: Validation level (NONE, STANDARD, or FULL).

    Raises:
        IPCError: If validation fails.

    """
    if ipc_validation is IpcValidation.NONE:
        return
    try:
        batch.validate(full=ipc_validation is IpcValidation.FULL)
    except pa.ArrowInvalid as exc:
        raise IPCError(f"IPC batch validation failed: {exc}") from exc

Errors

IPCError

Bases: Exception

Error during IPC message reading or writing.