Serialization
Arrow serialization layer for automatic schema generation and IPC validation.
ArrowSerializableDataclass
Use ArrowSerializableDataclass as a mixin for dataclasses that need Arrow serialization. The Arrow schema is generated automatically from field annotations:
from dataclasses import dataclass
from typing import Annotated

import pyarrow as pa
from vgi_rpc import ArrowSerializableDataclass, ArrowType


@dataclass(frozen=True)
class Measurement(ArrowSerializableDataclass):
    timestamp: str
    value: float
    count: Annotated[int, ArrowType(pa.int32())]  # explicit Arrow type override


# Auto-generated schema
print(Measurement.ARROW_SCHEMA)
# timestamp: string
# value: double
# count: int32

# Serialize / deserialize
m = Measurement(timestamp="2024-01-01T00:00:00Z", value=42.0, count=7)
data = m.serialize_to_bytes()
m2 = Measurement.deserialize_from_bytes(data)
assert m == m2
These dataclasses work directly as RPC parameters and return types. ArrowSerializableDataclass is also the base class for StreamState, so any field you add to a stream state is serialized automatically between calls.
Type mappings
| Python type | Arrow type |
|---|---|
| str | utf8 |
| bytes | binary |
| int | int64 |
| float | float64 |
| bool | bool_ |
| list[T] | list_<T> |
| dict[K, V] | map_<K, V> |
| frozenset[T] | list_<T> |
| Enum | dictionary(int32, utf8) |
| Optional[T] | nullable T |
| nested ArrowSerializableDataclass | struct |
| Annotated[T, ArrowType(...)] | explicit override |
API Reference
ArrowSerializableDataclass
Mixin for dataclasses with automatic Arrow IPC serialization.
Provides automatic schema generation and serialization/deserialization for frozen dataclasses. The ARROW_SCHEMA is auto-generated from field type annotations.
Auto-detected types:
- Basic types: str, bytes, int, float, bool
- Generic types: list[T], dict[K, V], frozenset[T]
- NewType: unwraps to underlying type (e.g., NewType("Id", bytes) -> binary)
- Enum: serializes as dictionary-encoded string via .name
- ArrowSerializableDataclass: serializes as struct
Not supported:
- tuple: Arrow has no native heterogeneous-tuple type. Use a nested dataclass (ArrowSerializableDataclass) for fixed, named fields, or list[T] for homogeneous sequences.
Optional fields (annotated with | None) are marked as nullable.
To override specific field types, use Annotated with ArrowType.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| ARROW_SCHEMA | Auto-generated Arrow schema from field annotations. |
serialize
Serialize this instance to an Arrow IPC stream.
| PARAMETER | DESCRIPTION |
|---|---|
| dest | The destination to write to (must support binary writes, e.g., stdout pipe, BufferedWriter). |
Source code in vgi_rpc/utils.py
serialize_to_bytes
Serialize this instance to Arrow IPC bytes.
| RETURNS | DESCRIPTION |
|---|---|
| bytes | Arrow IPC stream bytes containing a single-row RecordBatch. |
deserialize_from_batch classmethod
deserialize_from_batch(
    batch: RecordBatch,
    custom_metadata: KeyValueMetadata | None = None,
    *,
    ipc_validation: IpcValidation = FULL
) -> Self
Deserialize an instance from an Arrow RecordBatch.
| PARAMETER | DESCRIPTION |
|---|---|
| batch | Single-row RecordBatch containing the serialized data. |
| custom_metadata | Optional metadata from the batch (unused, reserved for subclass overrides). |
| ipc_validation | Validation level for nested IPC batches. |

| RETURNS | DESCRIPTION |
|---|---|
| Self | Deserialized instance of this class. |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If the batch is invalid (wrong row count or missing fields). |
| TypeError | If a field value has an unexpected type during conversion. |
| KeyError | If an Enum name cannot be resolved. |
Source code in vgi_rpc/utils.py
deserialize_from_bytes classmethod
deserialize_from_bytes(
    data: bytes, ipc_validation: IpcValidation = FULL
) -> Self
Deserialize an instance from Arrow IPC bytes.
| PARAMETER | DESCRIPTION |
|---|---|
| data | Arrow IPC stream bytes containing a single-row RecordBatch. |
| ipc_validation | Validation level for the deserialized batch. |

| RETURNS | DESCRIPTION |
|---|---|
| Self | Deserialized instance of this class. |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If the batch is invalid (wrong row count or missing fields). |
| IPCError | If the IPC stream is malformed or truncated. |
| TypeError | If a field value has an unexpected type during conversion. |
| KeyError | If an Enum name cannot be resolved. |
Source code in vgi_rpc/utils.py
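The KeyError listed above follows from how Python resolves enum members by name. As a rough illustration of a name-based round trip (the `Color` enum and the `encode`/`decode` helpers are hypothetical and not part of vgi_rpc), a stored name that no longer matches any member fails at lookup time:

```python
import enum


class Color(enum.Enum):
    """Hypothetical enum; the values are irrelevant to name-based encoding."""

    RED = "#ff0000"
    GREEN = "#00ff00"


def encode(member):
    """Encode an enum member by its name, as the Enum rule above describes."""
    return member.name


def decode(enum_cls, name):
    """Resolve a stored name back to a member; unknown names raise KeyError."""
    return enum_cls[name]


assert decode(Color, encode(Color.RED)) is Color.RED

try:
    decode(Color, "BLUE")  # not a member of Color
except KeyError as exc:
    print("unresolved enum name:", exc)
```

One consequence of encoding by name rather than by value is that renaming a member changes the serialized form, while changing its value does not.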
ArrowType
ArrowType dataclass
Annotation marker to specify explicit Arrow type for a field.
Use with Annotated to override the default inferred Arrow type:
@dataclass(frozen=True)
class MyData(ArrowSerializableDataclass):
    # Override int64 → int32
    count: Annotated[int, ArrowType(pa.int32())]
    # Override for nested list of int32
    matrix: Annotated[
        list[list[int]], ArrowType(pa.list_(pa.list_(pa.int32())))
    ]
IpcValidation
Bases: Enum
Level of validation applied to incoming IPC record batches.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| NONE | No validation; batches are used as-is. |
| STANDARD | Call |
| FULL | Call |
ValidatedReader
ValidatedReader(
    reader: RecordBatchStreamReader,
    ipc_validation: IpcValidation,
)
Wrapper around ipc.RecordBatchStreamReader that validates every batch on read.
Proxies the subset of the reader API used by the RPC framework (read_next_batch, read_next_batch_with_custom_metadata, schema, and the context manager protocol). Downstream code needs no changes: wrap ipc.open_stream(...) in ValidatedReader(..., ipc_validation).
When ipc_validation is IpcValidation.NONE, each read still delegates to the inner reader with minimal extra overhead.
Wrap reader so every batch is validated at ipc_validation level.
Source code in vgi_rpc/utils.py
ipc_validation property
ipc_validation: IpcValidation
The validation level applied to every batch read.
read_next_batch
Read the next batch, validating it before returning.
read_next_batch_with_custom_metadata
Read the next batch with custom metadata, validating before returning.
Source code in vgi_rpc/utils.py
__enter__
__exit__
__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None
Exit the context manager.
Source code in vgi_rpc/utils.py
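The wrapping pattern described for ValidatedReader (delegate each read to the inner reader, validate the result, then return it) can be sketched without pyarrow. Everything below is a hypothetical stand-in rather than the library's code: `FakeReader` plays the role of the stream reader, plain lists play the role of batches, and `require_non_empty` stands in for a validation function.

```python
class FakeReader:
    """Hypothetical stand-in for a stream reader that yields pre-built batches."""

    def __init__(self, batches):
        self._batches = iter(batches)
        self.closed = False

    def read_next_batch(self):
        # Raises StopIteration once the stream is exhausted.
        return next(self._batches)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.closed = True


class ValidatingProxy:
    """Delegate reads to an inner reader and validate each batch before returning it."""

    def __init__(self, reader, validate):
        self._reader = reader
        self._validate = validate

    def read_next_batch(self):
        batch = self._reader.read_next_batch()
        self._validate(batch)  # raises if the batch is unacceptable
        return batch

    def __enter__(self):
        self._reader.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Forward exit to the inner reader so its resources are released.
        self._reader.__exit__(exc_type, exc_val, exc_tb)


def require_non_empty(batch):
    """Hypothetical validation rule: reject empty batches."""
    if not batch:
        raise ValueError("empty batch")


inner = FakeReader([[1, 2, 3], []])
with ValidatingProxy(inner, require_non_empty) as reader:
    print(reader.read_next_batch())  # first batch passes validation
    try:
        reader.read_next_batch()     # second batch is empty
    except ValueError as exc:
        print("rejected:", exc)
```

Because the proxy exposes the same method names as the object it wraps, calling code that only uses those methods does not need to know whether validation is enabled.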
Validation
validate_batch
validate_batch(
    batch: RecordBatch, ipc_validation: IpcValidation
) -> None
Validate a RecordBatch at the specified level.
| PARAMETER | DESCRIPTION |
|---|---|
| batch | The batch to validate. |
| ipc_validation | Validation level (NONE, STANDARD, or FULL). |

| RAISES | DESCRIPTION |
|---|---|
| IPCError | If validation fails. |
Source code in vgi_rpc/utils.py
Errors
IPCError
Bases: Exception
Error during IPC message reading or writing.