Serialization
Arrow serialization layer for automatic schema generation and IPC validation.
ArrowSerializableDataclass
Use ArrowSerializableDataclass as a mixin for dataclasses that need Arrow serialization. The Arrow schema is generated automatically from field annotations:
from dataclasses import dataclass
from typing import Annotated

import pyarrow as pa

from vgi_rpc import ArrowSerializableDataclass, ArrowType


@dataclass(frozen=True)
class Measurement(ArrowSerializableDataclass):
    timestamp: str
    value: float
    count: Annotated[int, ArrowType(pa.int32())]  # explicit Arrow type override


# Auto-generated schema
print(Measurement.ARROW_SCHEMA)
# timestamp: string
# value: double
# count: int32

# Serialize / deserialize
m = Measurement(timestamp="2024-01-01T00:00:00Z", value=42.0, count=7)
data = m.serialize_to_bytes()
m2 = Measurement.deserialize_from_bytes(data)
assert m == m2
These dataclasses work directly as RPC parameters and return types. They're also the base class for StreamState — any field you add to a stream state is automatically serialized between calls.
Type mappings
| Python type | Arrow type |
|---|---|
| `str` | `utf8` |
| `bytes` | `binary` |
| `int` | `int64` |
| `float` | `float64` |
| `bool` | `bool_` |
| `list[T]` | `list_<T>` |
| `dict[K, V]` | `map_<K, V>` |
| `frozenset[T]` | `list_<T>` |
| `Enum` | `dictionary(int32, utf8)` |
| `Optional[T]` | nullable `T` |
| nested `ArrowSerializableDataclass` | `struct` |
| `Annotated[T, ArrowType(...)]` | explicit override |
API Reference
ArrowSerializableDataclass
Mixin for dataclasses with automatic Arrow IPC serialization.
Provides automatic schema generation and serialization/deserialization for frozen dataclasses. The ARROW_SCHEMA is auto-generated from field type annotations.
Auto-detected types:
- Basic types: str, bytes, int, float, bool
- Generic types: list[T], dict[K, V], frozenset[T]
- NewType: unwraps to underlying type (e.g., NewType("Id", bytes) -> binary)
- Enum: serializes as dictionary-encoded string via .name
- ArrowSerializableDataclass: serializes as struct
Not supported:
- tuple: Arrow has no native heterogeneous-tuple type. Use a nested
dataclass (ArrowSerializableDataclass) for fixed, named fields, or
list[T] for homogeneous sequences.
Optional fields (annotated with | None) are marked as nullable.
To override specific field types, use Annotated with ArrowType.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| ARROW_SCHEMA | Auto-generated Arrow schema from field annotations. |
serialize
Serialize this instance to an Arrow IPC stream.
| PARAMETER | DESCRIPTION |
|---|---|
| dest | The destination to write to (must support binary writes, e.g., stdout pipe, BufferedWriter). |
Source code in vgi_rpc/utils.py
serialize_to_bytes
Serialize this instance to Arrow IPC bytes.
| RETURNS | DESCRIPTION |
|---|---|
| bytes | Arrow IPC stream bytes containing a single-row RecordBatch. |
deserialize_from_batch (classmethod)
deserialize_from_batch(
    batch: RecordBatch,
    custom_metadata: KeyValueMetadata | None = None,
    *,
    ipc_validation: IpcValidation = FULL
) -> Self
Deserialize an instance from an Arrow RecordBatch.
| PARAMETER | DESCRIPTION |
|---|---|
| batch | Single-row RecordBatch containing the serialized data. TYPE: RecordBatch |
| custom_metadata | Optional metadata from the batch (unused, reserved for subclass overrides). TYPE: KeyValueMetadata \| None |
| ipc_validation | Validation level for nested IPC batches. TYPE: IpcValidation |

| RETURNS | DESCRIPTION |
|---|---|
| Self | Deserialized instance of this class. |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If the batch is invalid (wrong row count or missing fields). |
| TypeError | If a field value has an unexpected type during conversion. |
| KeyError | If an Enum name cannot be resolved. |
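The documented failure modes can be illustrated without Arrow. In this sketch a batch is modelled as a plain list of row dicts, which is an assumption made for brevity; `row_from_batch`, `color_from_name`, and `Color` are hypothetical names, not part of vgi_rpc.

```python
import enum


class Color(enum.Enum):  # hypothetical enum used only for this example
    RED = 1
    GREEN = 2


def row_from_batch(rows: list[dict[str, object]], required: set[str]) -> dict[str, object]:
    """Apply the row-count and missing-field checks to a list-of-dicts batch."""
    if len(rows) != 1:
        raise ValueError(f"expected exactly 1 row, got {len(rows)}")
    row = rows[0]
    missing = required - set(row)
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    return row


def color_from_name(name: str) -> Color:
    """Resolve an Enum member by its .name; an unknown name raises KeyError."""
    return Color[name]


row = row_from_batch([{"color": "GREEN"}], {"color"})
color = color_from_name(str(row["color"]))  # Color.GREEN
```

Looking members up by name with `Color[name]` is what makes an unresolvable name surface as `KeyError` instead of `ValueError`, which matches the split in the table above.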
deserialize_from_bytes (classmethod)
deserialize_from_bytes(
    data: bytes, ipc_validation: IpcValidation = FULL
) -> Self
Deserialize an instance from Arrow IPC bytes.
| PARAMETER | DESCRIPTION |
|---|---|
| data | Arrow IPC stream bytes containing a single-row RecordBatch. TYPE: bytes |
| ipc_validation | Validation level for the deserialized batch. TYPE: IpcValidation |

| RETURNS | DESCRIPTION |
|---|---|
| Self | Deserialized instance of this class. |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If the batch is invalid (wrong row count or missing fields). |
| IPCError | If the IPC stream is malformed or truncated. |
| TypeError | If a field value has an unexpected type during conversion. |
| KeyError | If an Enum name cannot be resolved. |
ArrowType
ArrowType (dataclass)
Annotation marker to specify explicit Arrow type for a field.
Use with Annotated to override the default inferred Arrow type:
@dataclass(frozen=True)
class MyData(ArrowSerializableDataclass):
    # Override int64 → int32
    count: Annotated[int, ArrowType(pa.int32())]
    # Override for nested list of int32
    matrix: Annotated[
        list[list[int]], ArrowType(pa.list_(pa.list_(pa.int32())))
    ]
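For readers unfamiliar with `Annotated` markers, the sketch below shows one way such a marker can be discovered at runtime with `typing.get_type_hints(..., include_extras=True)`. It is illustrative only: `TypeOverride` is a hypothetical stand-in for `ArrowType` that holds a type name as a string, and `find_overrides` is not a vgi_rpc function.

```python
import dataclasses
import typing


@dataclasses.dataclass(frozen=True)
class TypeOverride:
    """Hypothetical stand-in for ArrowType; stores a type name, not a pyarrow type."""
    arrow_type: str


@dataclasses.dataclass(frozen=True)
class MyData:
    count: typing.Annotated[int, TypeOverride("int32")]
    total: int  # no marker, so the default mapping would apply


def find_overrides(cls: type) -> dict[str, str]:
    """Map field name to override for fields carrying a TypeOverride marker."""
    # include_extras=True keeps the Annotated metadata instead of stripping it.
    hints = typing.get_type_hints(cls, include_extras=True)
    overrides: dict[str, str] = {}
    for name, annotation in hints.items():
        if typing.get_origin(annotation) is typing.Annotated:
            # get_args returns (base_type, *metadata) for Annotated types.
            for meta in typing.get_args(annotation)[1:]:
                if isinstance(meta, TypeOverride):
                    overrides[name] = meta.arrow_type
    return overrides


overrides = find_overrides(MyData)  # {"count": "int32"}
```

Fields without a marker are simply absent from the result, so a caller can fall back to the inferred type for them.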
IpcValidation
Bases: Enum
Level of validation applied to incoming IPC record batches.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| NONE | No validation; batches are used as-is. |
| STANDARD | Call … |
| FULL | Call … |
ValidatedReader
ValidatedReader(
    reader: RecordBatchStreamReader,
    ipc_validation: IpcValidation,
)
Wrapper around ipc.RecordBatchStreamReader that validates every batch on read.
Proxies the subset of the reader API used by the RPC framework
(read_next_batch, read_next_batch_with_custom_metadata,
schema, and the context manager protocol). Downstream code
needs zero changes — just wrap ipc.open_stream(...) in
ValidatedReader(..., ipc_validation).
When ipc_validation is IpcValidation.NONE, each read still
delegates to the inner reader with minimal extra overhead.
Wrap reader so every batch is validated at ipc_validation level.
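The wrapper pattern described above (proxy the read methods, check each batch, forward the context manager protocol) can be sketched with stand-in classes. Everything here is hypothetical: `FakeReader` replaces the real stream reader, `CheckingReader` is not the `ValidatedReader` source, and the check is an arbitrary callable rather than an `IpcValidation` level.

```python
from typing import Callable, Iterator


class FakeReader:
    """Hypothetical stand-in for an IPC stream reader; yields pre-built batches."""

    def __init__(self, batches: list[object]) -> None:
        self._batches: Iterator[object] = iter(batches)
        self.closed = False

    def read_next_batch(self) -> object:
        return next(self._batches)  # StopIteration once exhausted

    def __enter__(self) -> "FakeReader":
        return self

    def __exit__(self, *exc_info: object) -> None:
        self.closed = True


class CheckingReader:
    """Proxy that runs a check on every batch before returning it."""

    def __init__(self, reader: FakeReader, check: Callable[[object], None]) -> None:
        self._reader = reader
        self._check = check

    def read_next_batch(self) -> object:
        batch = self._reader.read_next_batch()
        self._check(batch)  # raises before the caller ever sees a bad batch
        return batch

    def __enter__(self) -> "CheckingReader":
        self._reader.__enter__()
        return self

    def __exit__(self, *exc_info: object) -> None:
        self._reader.__exit__(*exc_info)


def reject_empty(batch: object) -> None:
    if not batch:
        raise ValueError("empty batch")


inner = FakeReader([[1, 2], []])
with CheckingReader(inner, reject_empty) as reader:
    first = reader.read_next_batch()  # passes the check
```

Because the proxy exposes the same method names as the object it wraps, calling code written against the inner reader keeps working after the wrap.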
ipc_validation (property)
ipc_validation: IpcValidation
The validation level applied to every batch read.
read_next_batch
Read the next batch, validating it before returning.
read_next_batch_with_custom_metadata
Read the next batch with custom metadata, validating before returning.
__enter__
__exit__
__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None
Exit the context manager.
Validation
validate_batch
validate_batch(
    batch: RecordBatch, ipc_validation: IpcValidation
) -> None
Validate a RecordBatch at the specified level.
| PARAMETER | DESCRIPTION |
|---|---|
| batch | The batch to validate. TYPE: RecordBatch |
| ipc_validation | Validation level (NONE, STANDARD, or FULL). TYPE: IpcValidation |

| RAISES | DESCRIPTION |
|---|---|
| IPCError | If validation fails. |
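A level-based dispatch of this kind might look like the sketch below. It rests on an assumption the page does not confirm: that STANDARD and FULL correspond to the cheap and the full variants of a `validate(full=...)` method, as found on `pyarrow.RecordBatch`. `Level`, `CheckError`, `StubBatch`, and `check_batch` are hypothetical stand-ins so the example runs without pyarrow.

```python
import enum


class Level(enum.Enum):
    """Hypothetical stand-in for IpcValidation."""
    NONE = "none"
    STANDARD = "standard"
    FULL = "full"


class CheckError(Exception):
    """Hypothetical stand-in for IPCError."""


class StubBatch:
    """Mimics a batch with a validate(full=...) method, as pyarrow.RecordBatch has."""

    def __init__(self, cheap_ok: bool, deep_ok: bool) -> None:
        self.cheap_ok = cheap_ok
        self.deep_ok = deep_ok

    def validate(self, *, full: bool = False) -> None:
        # The deep check only runs when full=True is requested.
        if not self.cheap_ok or (full and not self.deep_ok):
            raise ValueError("invalid batch")


def check_batch(batch: StubBatch, level: Level) -> None:
    """Validate at the requested level (assumed mapping, see lead-in)."""
    if level is Level.NONE:
        return  # batch is used as-is
    try:
        batch.validate(full=level is Level.FULL)
    except ValueError as exc:
        # Re-raise under one error type so callers catch a single exception.
        raise CheckError(str(exc)) from exc


subtly_bad = StubBatch(cheap_ok=True, deep_ok=False)
check_batch(subtly_bad, Level.NONE)      # skipped
check_batch(subtly_bad, Level.STANDARD)  # cheap check passes
```

Under this assumed mapping, only the FULL level would catch a batch whose defect is visible solely to the deep check.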
Errors
IPCError
Bases: Exception
Error during IPC message reading or writing.