vgi-rpc

Transport-agnostic RPC framework built on Apache Arrow IPC serialization.

Built by 🚜 Query.Farm

Define RPC interfaces as Python Protocol classes. The framework derives Arrow schemas from type annotations and provides typed client proxies with automatic serialization/deserialization.
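
To make the idea of deriving schemas from annotations concrete, here is a minimal sketch that uses only the standard library. The `ARROW_TYPE_NAMES` table and the `sketch_schemas` helper are hypothetical and exist only for illustration; vgi-rpc's actual mapping is not shown here and may differ.

```python
from typing import Protocol, get_type_hints

# Hypothetical annotation-to-Arrow mapping, for illustration only.
# The values are the names of the corresponding pyarrow type factories.
ARROW_TYPE_NAMES = {
    int: "int64",
    float: "float64",
    str: "string",
    bytes: "binary",
}


class Calculator(Protocol):
    def add(self, a: float, b: float) -> float: ...


def sketch_schemas(method) -> tuple[dict[str, str], str]:
    """Return (parameter schema, result type) as Arrow type names."""
    hints = get_type_hints(method)  # unannotated `self` is not included
    result = ARROW_TYPE_NAMES[hints.pop("return")]
    params = {name: ARROW_TYPE_NAMES[tp] for name, tp in hints.items()}
    return params, result


params, result = sketch_schemas(Calculator.add)
print(params, result)
```

The point of the sketch is that the Protocol's annotations already carry everything needed to describe the request and response columns, so no separate schema file is required.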

Key Features

  • Protocol-based interfaces: define services as typed Python Protocol classes; proxies preserve the Protocol type for full IDE autocompletion
  • Apache Arrow IPC wire format: zero-copy serialization for structured data using PyArrow
  • Two method types: unary and streaming (producer and exchange patterns)
  • Transport-agnostic: in-process pipes, subprocess, Unix domain sockets, shared memory, or HTTP (see Transports)
  • Automatic schema inference: Python type annotations map to Arrow types
  • Pluggable authentication: AuthContext + middleware for HTTP auth (JWT, API key, mTLS, etc.)
  • Mutual TLS: client certificate authentication via proxy headers (PEM-in-header, Envoy XFCC) with fingerprint, subject, and custom validation
  • OAuth discovery: RFC 9728 protected resource metadata + JWT authentication via Authlib
  • Runtime introspection: opt-in __describe__ RPC method for dynamic service discovery
  • CLI tool: vgi-rpc describe and vgi-rpc call for ad-hoc service interaction
  • Shared memory transport: zero-copy batch transfer between co-located processes (see Transports)
  • IPC validation: configurable batch validation levels for untrusted data
  • Large batch support: transparent externalization to S3/GCS for oversized data
  • Per-call I/O statistics: CallStatistics tracks batches, rows, and bytes for usage accounting (access log + OTel spans)
  • Wire protocol debug logging: enable vgi_rpc.wire at DEBUG for full wire-level visibility (see Logging)
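
As a small illustration of the wire-logging feature, the sketch below enables DEBUG output for the `vgi_rpc.wire` logger named in the list above, using only Python's standard `logging` module. The root-level configuration is an assumption about a typical application setup; see the Logging page for the options the project documents.

```python
import logging

# Send log records to stderr; INFO keeps unrelated loggers quiet.
logging.basicConfig(level=logging.INFO)

# "vgi_rpc.wire" is the logger name given in the feature list.
wire_logger = logging.getLogger("vgi_rpc.wire")
wire_logger.setLevel(logging.DEBUG)
```

Setting the level on the named logger rather than on the root logger keeps the extra verbosity limited to wire-level messages.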

Two Method Types

vgi-rpc supports two RPC patterns. The method's return type in the Protocol determines which one is used:

Unary

A single request produces a single response, like a function call across a process boundary. The client sends parameters, the server returns a result.

    Client -> Server: add(a=2, b=3)
    Server: compute
    Server -> Client: 5.0

Use unary for: lookups, computations, CRUD operations, or anything else that returns one value.

Streaming

A single request opens an ongoing session that produces multiple batches of data. The return type Stream[S] signals a streaming method, where S is a StreamState subclass that holds state between iterations.
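
The following sketch shows one way a return annotation could select the method type. The `Stream` and `StreamState` classes here are local stand-ins, not imports from vgi-rpc, and `method_kind`, `Demo`, and `CountdownState` are hypothetical names introduced for illustration.

```python
from typing import Generic, Protocol, TypeVar, get_origin, get_type_hints


class StreamState:
    """Local stand-in for vgi-rpc's StreamState (assumed minimal shape)."""


S = TypeVar("S", bound=StreamState)


class Stream(Generic[S]):
    """Local stand-in for vgi-rpc's Stream[S] return-type marker."""


class CountdownState(StreamState):
    """Hypothetical state class for a countdown stream."""


class Demo(Protocol):
    def add(self, a: float, b: float) -> float: ...
    def countdown(self, n: int) -> Stream[CountdownState]: ...


def method_kind(method) -> str:
    """Classify a Protocol method as 'unary' or 'streaming'."""
    return_type = get_type_hints(method)["return"]
    # Stream[X] has Stream as its origin; plain types such as float have none.
    return "streaming" if get_origin(return_type) is Stream else "unary"


print(method_kind(Demo.add), method_kind(Demo.countdown))
```

Because the decision rests on the annotation alone, the same Protocol definition documents the interface for readers and tells the framework how to dispatch each call.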

There are two streaming patterns:

Producer: the server pushes data to the client (like a generator). The client iterates, and the server emits batches until it calls out.finish():

    Client -> Server: countdown(n=3)
    Server -> Client: {value: [3]}
    Server -> Client: {value: [2]}
    Server -> Client: {value: [1]}
    Server -> Client: [finish]
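
The producer flow above behaves much like a plain Python generator. The sketch below models it without vgi-rpc: dictionaries stand in for Arrow batches, and the generator returning stands in for the server calling out.finish(). It is an analogy for the control flow, not the library's API.

```python
from collections.abc import Iterator


def countdown(n: int) -> Iterator[dict[str, list[int]]]:
    """Emit one single-row 'batch' per step, then stop."""
    for value in range(n, 0, -1):
        yield {"value": [value]}
    # Falling off the end plays the role of out.finish() in this analogy.


batches = list(countdown(3))
print(batches)  # [{'value': [3]}, {'value': [2]}, {'value': [1]}]
```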

Exchange: lockstep bidirectional streaming. The client sends data, the server responds, one round at a time:

    Client -> Server: transform(factor=2)
    Client -> Server: {value: [10]}
    Server -> Client: {result: [20]}
    Client -> Server: {value: [5]}
    Server -> Client: {result: [10]}
    Client -> Server: [close]
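
The lockstep nature of the exchange pattern can be modeled in plain Python as well. In this sketch, `TransformSession` is a hypothetical stand-in for the server-side state of one exchange stream, and dictionaries stand in for Arrow batches; the class and method names are illustrative assumptions, not vgi-rpc's API.

```python
class TransformSession:
    """Hypothetical stand-in for the server side of one exchange stream."""

    def __init__(self, factor: int) -> None:
        self.factor = factor
        self.closed = False

    def exchange(self, batch: dict[str, list[int]]) -> dict[str, list[int]]:
        """Handle one round: one input batch in, one output batch out."""
        if self.closed:
            raise RuntimeError("session is closed")
        return {"result": [v * self.factor for v in batch["value"]]}

    def close(self) -> None:
        """End the session; no further rounds are accepted."""
        self.closed = True


session = TransformSession(factor=2)
replies = [session.exchange({"value": [10]}), session.exchange({"value": [5]})]
session.close()
print(replies)  # [{'result': [20]}, {'result': [10]}]
```

Each call to `exchange` blocks until its reply is available, which is what distinguishes this pattern from concurrent full-duplex streaming.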

Use streaming for: pagination, progress reporting, incremental computation, or any workflow where data is produced or exchanged over time.

Installation

pip install vgi-rpc

Optional extras:

pip install vgi-rpc[http]       # HTTP transport (Falcon + httpx)
pip install vgi-rpc[s3]         # S3 storage backend
pip install vgi-rpc[gcs]        # Google Cloud Storage backend
pip install vgi-rpc[cli]        # CLI tool (typer + httpx)
pip install vgi-rpc[external]   # External storage fetch (aiohttp + zstandard)
pip install vgi-rpc[otel]       # OpenTelemetry instrumentation
pip install vgi-rpc[oauth]      # JWT authentication (Authlib)
pip install vgi-rpc[mtls]       # mTLS client certificate auth (cryptography)

Requires Python 3.13+.

Quick Start

Define a service as a Protocol, implement it, and call methods through a typed proxy:

from typing import Protocol

from vgi_rpc import serve_pipe


class Calculator(Protocol):
    """A simple calculator service."""

    def add(self, a: float, b: float) -> float:
        """Add two numbers."""
        ...


class CalculatorImpl:
    """Calculator implementation."""

    def add(self, a: float, b: float) -> float:
        """Add two numbers."""
        return a + b


with serve_pipe(Calculator, CalculatorImpl()) as proxy:
    print(proxy.add(a=2.0, b=3.0))  # 5.0

See the Examples page for streaming, HTTP transport, authentication, OAuth discovery, and more.

Limitations

vgi-rpc is designed for RPC with structured, tabular data. Some things it deliberately does not do:

  • No full-duplex streaming: the exchange pattern is lockstep (one request, one response, repeat), not concurrent bidirectional like gRPC.
  • No client streaming: the client cannot push a stream of batches to the server independently. Use exchange for bidirectional workflows.
  • Columnar data model: all data crosses the wire as Arrow RecordBatch objects. Scalar values are wrapped in single-row batches. If your payloads are small heterogeneous messages, a row-oriented format (protobuf, JSON) may be more natural.
  • No service mesh integration: no built-in load balancing, circuit breaking, or service discovery. The HTTP transport is a standard WSGI app, so you can put it behind any reverse proxy.
  • No async server: the server is synchronous. Streaming methods run in a blocking loop. This keeps the implementation simple but limits concurrency to one request at a time per connection (the HTTP transport handles concurrency at the WSGI layer).

vgi-rpc.query.farm · 🚜 Query.Farm