Examples¶
Runnable scripts demonstrating key vgi-rpc features. Each example is self-contained — copy, run, and modify. Every example is exercised by test_examples.py so it stays in sync with the API.
Getting Started¶
Hello World¶
The minimal starting point: define a Protocol, implement it, and call it through a typed proxy over the in-process pipe transport.
"""Minimal vgi-rpc example: define a service and call it in-process.
This is the quickest way to get started. The service runs in a background
thread and communicates over an in-process pipe — no subprocess or network
needed.
Run::
python examples/hello_world.py
"""
from __future__ import annotations
from typing import Protocol
from vgi_rpc import serve_pipe
# 1. Define the service interface as a Protocol class.
# Return types determine the method type (unary here).
class Greeter(Protocol):
"""A simple greeting service."""
def greet(self, name: str) -> str:
"""Return a greeting for *name*."""
...
def add(self, a: float, b: float) -> float:
"""Add two numbers."""
...
# 2. Implement the interface.
class GreeterImpl:
"""Concrete implementation of the Greeter service."""
def greet(self, name: str) -> str:
"""Return a greeting for *name*."""
return f"Hello, {name}!"
def add(self, a: float, b: float) -> float:
"""Add two numbers."""
return a + b
# 3. Start the server in-process and call methods through a typed proxy.
def main() -> None:
"""Run the example."""
with serve_pipe(Greeter, GreeterImpl()) as svc:
print(svc.greet(name="World")) # Hello, World!
print(svc.add(a=2.5, b=3.5)) # 6.0
if __name__ == "__main__":
main()
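The comment at step 1 is worth spelling out: the return annotation on each Protocol method is what determines the method kind. Plain Python return types map to unary calls, while a Stream[StreamState] return (used in the streaming example below) marks a streaming method. A minimal sketch of a mixed interface, based only on the patterns shown elsewhere on this page:

from typing import Protocol

from vgi_rpc import Stream, StreamState


class Mixed(Protocol):
    """Sketch: unary and streaming methods side by side."""

    def ping(self) -> str:
        """Plain return type, so this is a unary call."""
        ...

    def ticks(self, n: int) -> Stream[StreamState]:
        """Stream[...] return type, so this is a streaming call."""
        ...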
Streaming¶
Producer streams (server pushes data, client iterates) and exchange streams (lockstep bidirectional communication). Shows ProducerState, ExchangeState, OutputCollector, and out.finish().
"""Streaming examples: producer streams and exchange (bidirectional) streams.
Producer streams generate a sequence of Arrow RecordBatches from the server.
Exchange streams allow the client to send data and receive transformed results
in a lockstep request/response pattern.
Run::
python examples/streaming.py
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Protocol, cast
import pyarrow as pa
import pyarrow.compute as pc
from vgi_rpc import (
AnnotatedBatch,
CallContext,
ExchangeState,
OutputCollector,
ProducerState,
Stream,
StreamState,
serve_pipe,
)
# ---------------------------------------------------------------------------
# Producer stream: server generates batches, client iterates
# ---------------------------------------------------------------------------
@dataclass
class CounterState(ProducerState):
"""State for the counter producer stream.
Extends ``ProducerState`` so only ``produce(out, ctx)`` needs to be
implemented — no phantom ``input`` parameter to ignore.
Call ``out.finish()`` to signal the end of the stream.
"""
limit: int
current: int = 0
def produce(self, out: OutputCollector, ctx: CallContext) -> None:
"""Emit one batch per call, finish when done."""
if self.current >= self.limit:
out.finish()
return
out.emit_pydict({"n": [self.current], "n_squared": [self.current**2]})
self.current += 1
# ---------------------------------------------------------------------------
# Exchange stream: client sends data, server transforms and returns it
# ---------------------------------------------------------------------------
@dataclass
class ScaleState(ExchangeState):
"""State for the scale exchange stream.
Extends ``ExchangeState`` so only ``exchange(input, out, ctx)`` needs
to be implemented. Exchange streams must emit exactly one output batch
per call and must not call ``out.finish()``.
"""
factor: float
def exchange(self, input: AnnotatedBatch, out: OutputCollector, ctx: CallContext) -> None:
"""Multiply each value by the configured factor."""
scaled = cast("pa.Array[Any]", pc.multiply(input.batch.column("value"), self.factor))
out.emit_arrays([scaled])
# ---------------------------------------------------------------------------
# Service definition
# ---------------------------------------------------------------------------
_COUNTER_SCHEMA = pa.schema([pa.field("n", pa.int64()), pa.field("n_squared", pa.int64())])
_SCALE_SCHEMA = pa.schema([pa.field("value", pa.float64())])
class MathService(Protocol):
"""Service demonstrating producer and exchange streams."""
def count(self, limit: int) -> Stream[StreamState]:
"""Produce *limit* batches of (n, n_squared)."""
...
def scale(self, factor: float) -> Stream[StreamState]:
"""Multiply incoming values by *factor*."""
...
class MathServiceImpl:
"""Concrete implementation of MathService."""
def count(self, limit: int) -> Stream[CounterState]:
"""Produce *limit* batches of (n, n_squared)."""
return Stream(output_schema=_COUNTER_SCHEMA, state=CounterState(limit=limit))
def scale(self, factor: float) -> Stream[ScaleState]:
"""Multiply incoming values by *factor*."""
return Stream(
output_schema=_SCALE_SCHEMA,
state=ScaleState(factor=factor),
input_schema=_SCALE_SCHEMA, # Setting input_schema makes this an exchange stream
)
# ---------------------------------------------------------------------------
# Client usage
# ---------------------------------------------------------------------------
def main() -> None:
"""Run the streaming examples."""
with serve_pipe(MathService, MathServiceImpl()) as svc:
# --- Producer stream: iterate over server-generated batches ----------
print("=== Producer stream (count to 5) ===")
for batch in svc.count(limit=5):
rows = batch.batch.to_pylist()
for row in rows:
print(f" n={row['n']} n^2={row['n_squared']}")
# --- Exchange stream: send data, receive transformed results ---------
print("\n=== Exchange stream (scale by 10) ===")
with svc.scale(factor=10.0) as stream:
for values in [[1.0, 2.0, 3.0], [100.0, 200.0]]:
input_batch = AnnotatedBatch(pa.RecordBatch.from_pydict({"value": values}))
result = stream.exchange(input_batch)
print(f" input={values} output={result.batch.column('value').to_pylist()}")
if __name__ == "__main__":
main()
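CounterState emits one row per batch, but emit_pydict takes list-valued columns, so a producer can emit several rows per call. A minimal variation, assuming emit_pydict accepts multi-row columns the same way it accepts the single-row lists above (the chunk size and class name are illustrative):

from dataclasses import dataclass

from vgi_rpc import CallContext, OutputCollector, ProducerState


@dataclass
class ChunkedCounterState(ProducerState):
    """Like CounterState above, but emits up to *chunk* rows per batch."""

    limit: int
    chunk: int = 3
    current: int = 0

    def produce(self, out: OutputCollector, ctx: CallContext) -> None:
        """Emit one multi-row batch per call, finish when done."""
        if self.current >= self.limit:
            out.finish()
            return
        ns = list(range(self.current, min(self.current + self.chunk, self.limit)))
        out.emit_pydict({"n": ns, "n_squared": [n**2 for n in ns]})
        self.current = ns[-1] + 1

count() would return this state exactly as before: Stream(output_schema=_COUNTER_SCHEMA, state=ChunkedCounterState(limit=limit)).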
Structured Types¶
Using ArrowSerializableDataclass for complex parameters: dataclasses with enums, nested types, and optional fields. Demonstrates automatic Arrow schema inference.
"""Using dataclasses as RPC parameters and return types.
``ArrowSerializableDataclass`` lets you pass structured data across the
RPC boundary. Fields are automatically mapped to Arrow types.
Run::
python examples/structured_types.py
"""
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import Protocol
from vgi_rpc import serve_pipe
from vgi_rpc.utils import ArrowSerializableDataclass
# ---------------------------------------------------------------------------
# Domain types
# ---------------------------------------------------------------------------
class Priority(Enum):
"""Task priority levels."""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
@dataclass(frozen=True)
class Task(ArrowSerializableDataclass):
"""A task with structured fields.
Supported field types include: str, int, float, bool, Enum,
list[T], dict[K, V], frozenset[T], Optional types, and nested
ArrowSerializableDataclass instances.
"""
title: str
priority: Priority
tags: list[str]
metadata: dict[str, str]
done: bool = False
@dataclass(frozen=True)
class TaskSummary(ArrowSerializableDataclass):
"""Summary returned by the task service."""
total: int
high_priority: int
titles: list[str]
# ---------------------------------------------------------------------------
# Service
# ---------------------------------------------------------------------------
class TaskService(Protocol):
"""Service that accepts and returns dataclass parameters."""
def create_task(self, task: Task) -> str:
"""Store a task and return its ID."""
...
def summarize(self) -> TaskSummary:
"""Return a summary of all stored tasks."""
...
class TaskServiceImpl:
"""In-memory task store."""
def __init__(self) -> None:
"""Initialize the store."""
self._tasks: list[Task] = []
self._next_id: int = 0
def create_task(self, task: Task) -> str:
"""Store a task and return its ID."""
self._tasks.append(task)
task_id = f"TASK-{self._next_id}"
self._next_id += 1
return task_id
def summarize(self) -> TaskSummary:
"""Return a summary of all stored tasks."""
return TaskSummary(
total=len(self._tasks),
high_priority=sum(1 for t in self._tasks if t.priority == Priority.HIGH),
titles=[t.title for t in self._tasks],
)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
"""Run the dataclass example."""
with serve_pipe(TaskService, TaskServiceImpl()) as svc:
# Create some tasks using structured dataclass parameters
id1 = svc.create_task(
task=Task(
title="Write documentation",
priority=Priority.HIGH,
tags=["docs", "urgent"],
metadata={"assignee": "alice"},
)
)
print(f"Created: {id1}")
id2 = svc.create_task(
task=Task(
title="Run benchmarks",
priority=Priority.LOW,
tags=["perf"],
metadata={"env": "staging"},
)
)
print(f"Created: {id2}")
# Get a structured summary back
summary = svc.summarize()
print(f"\nTotal tasks: {summary.total}")
print(f"High priority: {summary.high_priority}")
print(f"Titles: {summary.titles}")
if __name__ == "__main__":
main()
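The Task docstring above also lists Optional fields and nested ArrowSerializableDataclass values among the supported field types. A hedged sketch of what such a type could look like, assuming those field kinds round-trip the same way as the flat fields in this example (the names are illustrative):

from __future__ import annotations

from dataclasses import dataclass

from vgi_rpc.utils import ArrowSerializableDataclass


@dataclass(frozen=True)
class Assignee(ArrowSerializableDataclass):
    """Nested dataclass used as a field of another dataclass."""

    name: str
    email: str


@dataclass(frozen=True)
class DetailedTask(ArrowSerializableDataclass):
    """Task variant with an optional nested field and a frozenset."""

    title: str
    assignee: Assignee | None = None
    labels: frozenset[str] = frozenset()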
Transports¶
HTTP Server¶
Serve an RPC service over HTTP using Falcon + waitress. Shows make_wsgi_app with a WSGI server.
"""HTTP server example using Falcon (WSGI) and waitress.
Requires the HTTP extra: ``pip install vgi-rpc[http]``
Start the server::
python examples/http_server.py
Then run the client in another terminal::
python examples/http_client.py
"""
from __future__ import annotations
import socket
import sys
from dataclasses import dataclass
from typing import Protocol
import pyarrow as pa
import waitress
from vgi_rpc import (
CallContext,
Level,
OutputCollector,
ProducerState,
RpcServer,
Stream,
StreamState,
)
from vgi_rpc.http import make_wsgi_app
PORT = 8234
# ---------------------------------------------------------------------------
# Service definition
# ---------------------------------------------------------------------------
@dataclass
class FibState(ProducerState):
"""Produce Fibonacci numbers up to *limit*."""
limit: int
a: int = 0
b: int = 1
def produce(self, out: OutputCollector, ctx: CallContext) -> None:
"""Emit the next Fibonacci number."""
if self.a > self.limit:
out.finish()
return
out.emit_pydict({"fib": [self.a]})
self.a, self.b = self.b, self.a + self.b
class DemoService(Protocol):
"""Demonstration HTTP service."""
def echo(self, message: str) -> str:
"""Echo a message back."""
...
def fibonacci(self, limit: int) -> Stream[StreamState]:
"""Stream Fibonacci numbers up to *limit*."""
...
class DemoServiceImpl:
"""Concrete implementation of DemoService."""
def echo(self, message: str, ctx: CallContext | None = None) -> str:
"""Echo a message back, with optional server-side logging."""
if ctx:
ctx.client_log(Level.INFO, f"Echoing: {message}")
return message
def fibonacci(self, limit: int) -> Stream[FibState]:
"""Stream Fibonacci numbers up to *limit*."""
schema = pa.schema([pa.field("fib", pa.int64())])
return Stream(output_schema=schema, state=FibState(limit=limit))
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _find_free_port() -> int:
"""Find a free TCP port on localhost."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
return int(s.getsockname()[1])
def main() -> None:
"""Start the HTTP server."""
port = int(sys.argv[1]) if len(sys.argv) > 1 else PORT
if port == 0:
port = _find_free_port()
server = RpcServer(DemoService, DemoServiceImpl())
app = make_wsgi_app(server)
print(f"Serving DemoService on http://127.0.0.1:{port}", flush=True)
waitress.serve(app, host="127.0.0.1", port=port, _quiet=True)
if __name__ == "__main__":
main()
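make_wsgi_app returns a standard WSGI application, so waitress is only one way to serve it. A minimal sketch using the standard library's wsgiref instead, assuming nothing waitress-specific is needed (wsgiref is single-threaded and only suitable for local experiments):

from wsgiref.simple_server import make_server

from vgi_rpc import RpcServer
from vgi_rpc.http import make_wsgi_app

# DemoService and DemoServiceImpl as defined in http_server.py above.
server = RpcServer(DemoService, DemoServiceImpl())
app = make_wsgi_app(server)

with make_server("127.0.0.1", 8234, app) as httpd:
    print("Serving on http://127.0.0.1:8234")
    httpd.serve_forever()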
HTTP Client¶
Connect to an HTTP RPC service using http_connect. The proxy is typed as the Protocol class.
"""HTTP client that connects to the demo HTTP server.
Requires the HTTP extra: ``pip install vgi-rpc[http]``
Start the server first::
python examples/http_server.py
Then run this client::
python examples/http_client.py
"""
from __future__ import annotations
from typing import Protocol
from vgi_rpc import Stream, StreamState
from vgi_rpc.http import http_connect
PORT = 8234
class DemoService(Protocol):
"""Demonstration HTTP service.
This Protocol is duplicated from http_server.py so each file is
self-contained. In a real project you'd define the Protocol once in a
shared module and import it from both sides.
"""
def echo(self, message: str) -> str:
"""Echo a message back."""
...
def fibonacci(self, limit: int) -> Stream[StreamState]:
"""Stream Fibonacci numbers up to *limit*."""
...
def main() -> None:
"""Connect to the HTTP server and make calls."""
url = f"http://127.0.0.1:{PORT}"
with http_connect(DemoService, url) as svc:
# Unary call
result = svc.echo(message="Hello from HTTP!")
print(f"echo: {result}")
# Streaming call
print("\nFibonacci numbers up to 100:")
for batch in svc.fibonacci(limit=100):
for row in batch.batch.to_pylist():
print(f" {row['fib']}")
if __name__ == "__main__":
main()
Subprocess Worker¶
Entry point for a subprocess RPC worker. Uses run_server to serve over stdin/stdout.
"""Subprocess server entry point.
This script serves an RPC service over stdin/stdout, designed to be spawned
as a child process by a client using ``vgi_rpc.connect()``.
The client example is in ``subprocess_client.py``.
Run the client (which spawns this automatically)::
python examples/subprocess_client.py
"""
from __future__ import annotations
from typing import Protocol
from vgi_rpc import run_server
class Calculator(Protocol):
"""Simple calculator service."""
def add(self, a: float, b: float) -> float:
"""Add two numbers."""
...
def multiply(self, a: float, b: float) -> float:
"""Multiply two numbers."""
...
def divide(self, a: float, b: float) -> float:
"""Divide a by b."""
...
class CalculatorImpl:
"""Concrete implementation of Calculator."""
def add(self, a: float, b: float) -> float:
"""Add two numbers."""
return a + b
def multiply(self, a: float, b: float) -> float:
"""Multiply two numbers."""
return a * b
def divide(self, a: float, b: float) -> float:
"""Divide a by b."""
if b == 0.0:
raise ValueError("Division by zero")
return a / b
def main() -> None:
"""Serve over stdin/stdout."""
run_server(Calculator, CalculatorImpl())
if __name__ == "__main__":
main()
Subprocess Client¶
Connect to a subprocess worker with connect. Shows error handling with RpcError.
"""Client that spawns a subprocess server and calls methods on it.
Uses ``vgi_rpc.connect()`` which launches the worker as a child process
and communicates over stdin/stdout pipes.
Run::
python examples/subprocess_client.py
"""
from __future__ import annotations
import sys
from pathlib import Path
from typing import Protocol
from vgi_rpc import RpcError, connect
_HERE = Path(__file__).resolve().parent
class Calculator(Protocol):
"""Simple calculator service.
This Protocol is duplicated from subprocess_worker.py so each file is
self-contained. In a real project you'd define the Protocol once in a
shared module and import it from both sides.
"""
def add(self, a: float, b: float) -> float:
"""Add two numbers."""
...
def multiply(self, a: float, b: float) -> float:
"""Multiply two numbers."""
...
def divide(self, a: float, b: float) -> float:
"""Divide a by b."""
...
def main() -> None:
"""Spawn the worker and make RPC calls."""
cmd = [sys.executable, str(_HERE / "subprocess_worker.py")]
with connect(Calculator, cmd) as calc:
print(f"add(2, 3) = {calc.add(a=2, b=3)}")
print(f"multiply(4, 5) = {calc.multiply(a=4, b=5)}")
print(f"divide(10, 3) = {calc.divide(a=10, b=3):.4f}")
# Server-side exceptions are propagated as RpcError
try:
calc.divide(a=1, b=0)
except RpcError as e:
print(f"\nCaught remote error: {e.error_type}: {e.error_message}")
if __name__ == "__main__":
main()
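Because connect() is a context manager that yields a typed proxy, it drops straight into a pytest fixture. A sketch under that assumption (pytest is not used elsewhere on this page, so treat the fixture shape as illustrative):

import sys
from pathlib import Path
from typing import Iterator

import pytest

from vgi_rpc import connect

# Calculator as defined in subprocess_client.py above.
_HERE = Path(__file__).resolve().parent


@pytest.fixture
def calc() -> Iterator[Calculator]:
    """Spawn the worker for each test and tear it down afterwards."""
    cmd = [sys.executable, str(_HERE / "subprocess_worker.py")]
    with connect(Calculator, cmd) as proxy:
        yield proxy


def test_add(calc: Calculator) -> None:
    assert calc.add(a=2, b=3) == 5.0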
Testing¶
Testing with Pipe Transport¶
Unit-testing pattern using serve_pipe() — no network or subprocess needed. Tests both unary and streaming methods.
"""Testing a service with ``serve_pipe()`` — no subprocess or network needed.
``serve_pipe`` runs the server on a background thread and gives you a typed
proxy. This is the fastest way to write unit tests for your RPC service.
Run::
python examples/testing_pipe.py
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Protocol
import pyarrow as pa
from vgi_rpc import CallContext, OutputCollector, ProducerState, Stream, StreamState, serve_pipe
# ---------------------------------------------------------------------------
# 1. Define a Protocol and implementation (same as production code)
# ---------------------------------------------------------------------------
class MathService(Protocol):
"""A small service with a unary method and a producer stream."""
def add(self, a: float, b: float) -> float:
"""Add two numbers."""
...
def countdown(self, n: int) -> Stream[StreamState]:
"""Count down from *n* to 1."""
...
@dataclass
class CountdownState(ProducerState):
"""State for the countdown producer stream."""
n: int
def produce(self, out: OutputCollector, ctx: CallContext) -> None:
"""Emit one value per tick, finish when done."""
if self.n <= 0:
out.finish()
return
out.emit_pydict({"value": [self.n]})
self.n -= 1
_COUNTDOWN_SCHEMA = pa.schema([pa.field("value", pa.int64())])
class MathServiceImpl:
"""Concrete implementation of MathService."""
def add(self, a: float, b: float) -> float:
"""Add two numbers."""
return a + b
def countdown(self, n: int) -> Stream[CountdownState]:
"""Count down from *n* to 1."""
return Stream(output_schema=_COUNTDOWN_SCHEMA, state=CountdownState(n=n))
# ---------------------------------------------------------------------------
# 2. Test the service using serve_pipe()
# ---------------------------------------------------------------------------
def main() -> None:
"""Run the testing example."""
with serve_pipe(MathService, MathServiceImpl()) as svc:
# --- Unary method ---------------------------------------------------
result = svc.add(a=2.0, b=3.0)
assert result == 5.0
print(f"add(2, 3) = {result}")
# --- Producer stream ------------------------------------------------
values: list[int] = [row["value"] for batch in svc.countdown(n=3) for row in batch.batch.to_pylist()]
assert values == [3, 2, 1]
print(f"countdown(3) = {values}")
print("All assertions passed!")
if __name__ == "__main__":
main()
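The same pattern translates directly to pytest: serve_pipe() becomes a fixture and the assertions move into test functions. A sketch, assuming only the serve_pipe behaviour shown above (the pytest wiring is illustrative):

from typing import Iterator

import pytest

from vgi_rpc import serve_pipe

# MathService and MathServiceImpl as defined in testing_pipe.py above.


@pytest.fixture
def svc() -> Iterator[MathService]:
    """Serve the implementation on a background thread for each test."""
    with serve_pipe(MathService, MathServiceImpl()) as proxy:
        yield proxy


def test_add(svc: MathService) -> None:
    assert svc.add(a=2.0, b=3.0) == 5.0


def test_countdown(svc: MathService) -> None:
    values = [row["value"] for batch in svc.countdown(n=3) for row in batch.batch.to_pylist()]
    assert values == [3, 2, 1]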
Testing with HTTP Transport¶
Unit-testing the full HTTP stack (including auth middleware) using make_sync_client() — no real HTTP server needed.
"""Testing the HTTP transport without a running server.
``make_sync_client`` wraps a Falcon ``TestClient`` so you can exercise the
full HTTP transport stack (authentication, serialization, streaming) in-process
with zero network I/O.
Requires ``pip install vgi-rpc[http]``
Run::
python examples/testing_http.py
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Protocol
import falcon
import pyarrow as pa
from vgi_rpc import (
AuthContext,
CallContext,
OutputCollector,
ProducerState,
RpcServer,
Stream,
StreamState,
serve_pipe,
)
from vgi_rpc.http import http_connect, make_sync_client
# ---------------------------------------------------------------------------
# 1. Define a Protocol and implementation
# ---------------------------------------------------------------------------
class DemoService(Protocol):
"""Service used for HTTP testing examples."""
def greet(self, name: str) -> str:
"""Greet by name."""
...
def whoami(self) -> str:
"""Return the caller's identity."""
...
def countdown(self, n: int) -> Stream[StreamState]:
"""Count down from *n* to 1."""
...
@dataclass
class CountdownState(ProducerState):
"""State for the countdown producer stream."""
n: int
def produce(self, out: OutputCollector, ctx: CallContext) -> None:
"""Emit one value per tick, finish when done."""
if self.n <= 0:
out.finish()
return
out.emit_pydict({"value": [self.n]})
self.n -= 1
_COUNTDOWN_SCHEMA = pa.schema([pa.field("value", pa.int64())])
class DemoServiceImpl:
"""Concrete implementation of DemoService."""
def greet(self, name: str) -> str:
"""Greet by name."""
return f"Hello, {name}!"
def whoami(self, ctx: CallContext) -> str:
"""Return the caller's identity."""
return ctx.auth.principal or "anonymous"
def countdown(self, n: int) -> Stream[CountdownState]:
"""Count down from *n* to 1."""
return Stream(output_schema=_COUNTDOWN_SCHEMA, state=CountdownState(n=n))
# ---------------------------------------------------------------------------
# 2. Helper: create an authenticate callback
# ---------------------------------------------------------------------------
def _authenticate(req: falcon.Request) -> AuthContext:
"""Authenticate requests using a simple bearer token."""
token = req.get_header("Authorization") or ""
if token == "Bearer test-token":
return AuthContext(domain="bearer", authenticated=True, principal="alice")
return AuthContext.anonymous()
# ---------------------------------------------------------------------------
# 3. Test with make_sync_client (HTTP transport, no real server)
# ---------------------------------------------------------------------------
def main() -> None:
"""Run the HTTP testing examples."""
server = RpcServer(DemoService, DemoServiceImpl())
# --- Basic HTTP testing -------------------------------------------------
client = make_sync_client(server)
with http_connect(DemoService, client=client) as svc:
result = svc.greet(name="World")
assert result == "Hello, World!"
print(f"greet('World') = {result}")
# Anonymous caller
who = svc.whoami()
assert who == "anonymous"
print(f"whoami() = {who}")
# Producer stream over HTTP
values: list[int] = [row["value"] for batch in svc.countdown(n=3) for row in batch.batch.to_pylist()]
assert values == [3, 2, 1]
print(f"countdown(3) = {values}")
# --- Authenticated HTTP testing -----------------------------------------
client = make_sync_client(
server,
authenticate=_authenticate,
default_headers={"Authorization": "Bearer test-token"},
)
with http_connect(DemoService, client=client) as svc:
who = svc.whoami()
assert who == "alice"
print(f"whoami() [authenticated] = {who}")
# --- Pipe transport for comparison --------------------------------------
with serve_pipe(DemoService, DemoServiceImpl()) as svc:
result = svc.greet(name="Pipe")
assert result == "Hello, Pipe!"
print(f"greet('Pipe') [pipe] = {result}")
print("All assertions passed!")
if __name__ == "__main__":
main()
Advanced Features¶
Authentication¶
HTTP authentication with Bearer tokens. Shows authenticate callback, AuthContext, CallContext.auth.require_authenticated(), and guarded methods.
"""HTTP authentication with Bearer tokens and guarded methods.
Demonstrates how to wire up an ``authenticate`` callback, inject
``CallContext`` into method implementations, and gate access with
``require_authenticated()``. Uses ``make_sync_client`` so no real
HTTP server is needed.
Requires ``pip install vgi-rpc[http]``
Run::
python examples/auth.py
"""
from __future__ import annotations
from typing import Protocol
import falcon
from vgi_rpc import AuthContext, CallContext, RpcServer
from vgi_rpc.http import http_connect, make_sync_client
# ---------------------------------------------------------------------------
# 1. Define an authenticate callback
# ---------------------------------------------------------------------------
def authenticate(req: falcon.Request) -> AuthContext:
"""Extract a Bearer token from the Authorization header.
Returns an authenticated ``AuthContext`` with the token value as
principal and a hard-coded ``role`` claim for demonstration.
Raises:
ValueError: If the header is missing or malformed.
"""
auth_header = req.get_header("Authorization") or ""
if not auth_header.startswith("Bearer "):
raise ValueError("Missing or invalid Authorization header")
principal = auth_header.removeprefix("Bearer ")
return AuthContext(domain="bearer", authenticated=True, principal=principal, claims={"role": "admin"})
# ---------------------------------------------------------------------------
# 2. Define the service Protocol
# ---------------------------------------------------------------------------
class SecureService(Protocol):
"""A service with public and guarded methods."""
def status(self) -> str:
"""Public health-check endpoint."""
...
def whoami(self) -> str:
"""Return the caller's identity."""
...
def secret_data(self) -> str:
"""Return secret data (requires authentication)."""
...
# ---------------------------------------------------------------------------
# 3. Implement the service
# ---------------------------------------------------------------------------
class SecureServiceImpl:
"""Concrete implementation of SecureService."""
def status(self) -> str:
"""Public health-check endpoint."""
return "ok"
def whoami(self, ctx: CallContext) -> str:
"""Return the caller's identity."""
return ctx.auth.principal or "anonymous"
def secret_data(self, ctx: CallContext) -> str:
"""Return secret data (requires authentication)."""
ctx.auth.require_authenticated()
role = ctx.auth.claims.get("role", "unknown")
return f"secret for {ctx.auth.principal} (role={role})"
# ---------------------------------------------------------------------------
# 4. Run the example
# ---------------------------------------------------------------------------
def main() -> None:
"""Create an authenticated HTTP client and call all three methods."""
server = RpcServer(SecureService, SecureServiceImpl())
client = make_sync_client(
server,
signing_key=b"example-signing-key",
authenticate=authenticate,
default_headers={"Authorization": "Bearer alice"},
)
with http_connect(SecureService, client=client) as svc:
print(f"status (public): {svc.status()}")
print(f"whoami: {svc.whoami()}")
print(f"secret_data: {svc.secret_data()}")
if __name__ == "__main__":
main()
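The example above always sends the Authorization header, so require_authenticated() never rejects anyone. To see the guard in action, build a client without the header and use an authenticate callback that falls back to anonymous (as testing_http.py does) rather than raising. A hedged sketch, assuming the rejection surfaces as RpcError the same way other server-side exceptions do on this page:

import falcon

from vgi_rpc import AuthContext, RpcError, RpcServer
from vgi_rpc.http import http_connect, make_sync_client

# SecureService and SecureServiceImpl as defined in auth.py above.


def lenient_authenticate(req: falcon.Request) -> AuthContext:
    """Like authenticate() above, but fall back to anonymous instead of raising."""
    header = req.get_header("Authorization") or ""
    if header.startswith("Bearer "):
        return AuthContext(domain="bearer", authenticated=True, principal=header.removeprefix("Bearer "))
    return AuthContext.anonymous()


server = RpcServer(SecureService, SecureServiceImpl())
client = make_sync_client(server, authenticate=lenient_authenticate)  # no default Authorization header

with http_connect(SecureService, client=client) as svc:
    print(svc.status())  # public method still returns "ok"
    print(svc.whoami())  # "anonymous"
    try:
        svc.secret_data()  # require_authenticated() should reject this caller
    except RpcError as e:
        print(f"rejected: {e.error_type}: {e.error_message}")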
Introspection¶
Runtime service discovery with enable_describe=True. This example uses the introspect() helper over a pipe transport; the http_introspect() helper plays the same role for HTTP.
"""Runtime service introspection with ``enable_describe``.
Demonstrates how to discover methods, their types, and parameter
signatures at runtime using the built-in ``__describe__`` RPC method
and the ``introspect()`` helper.
Run::
python examples/introspection.py
"""
from __future__ import annotations
import threading
from dataclasses import dataclass
from typing import Protocol
import pyarrow as pa
from vgi_rpc import (
CallContext,
MethodType,
OutputCollector,
ProducerState,
RpcServer,
Stream,
StreamState,
introspect,
make_pipe_pair,
)
# ---------------------------------------------------------------------------
# 1. Define a Protocol with a mix of unary and stream methods
# ---------------------------------------------------------------------------
class DemoService(Protocol):
"""A demo service for introspection."""
def greet(self, name: str) -> str:
"""Greet someone by name."""
...
def add(self, a: float, b: float) -> float:
"""Add two numbers."""
...
def count(self, limit: int) -> Stream[StreamState]:
"""Count from 1 up to *limit*."""
...
# ---------------------------------------------------------------------------
# 2. Implement the service (methods are only called for schema extraction)
# ---------------------------------------------------------------------------
@dataclass
class CountState(ProducerState):
"""State for the count producer stream."""
current: int
limit: int
def produce(self, out: OutputCollector, ctx: CallContext) -> None:
"""Emit one value per tick."""
if self.current > self.limit:
out.finish()
return
out.emit_pydict({"value": [self.current]})
self.current += 1
_COUNT_SCHEMA = pa.schema([pa.field("value", pa.int64())])
class DemoServiceImpl:
"""Concrete implementation of DemoService."""
def greet(self, name: str) -> str:
"""Greet someone by name."""
return f"Hello, {name}!"
def add(self, a: float, b: float) -> float:
"""Add two numbers."""
return a + b
def count(self, limit: int) -> Stream[CountState]:
"""Count from 1 up to *limit*."""
return Stream(output_schema=_COUNT_SCHEMA, state=CountState(current=1, limit=limit))
# ---------------------------------------------------------------------------
# 3. Introspect the service at runtime
# ---------------------------------------------------------------------------
def main() -> None:
"""Start a server with introspection enabled and discover its methods."""
server = RpcServer(DemoService, DemoServiceImpl(), enable_describe=True)
client_pipe, server_pipe = make_pipe_pair()
thread = threading.Thread(target=server.serve, args=(server_pipe,), daemon=True)
thread.start()
try:
desc = introspect(client_pipe)
print(f"Service: {desc.protocol_name}")
print(f"Methods ({len(desc.methods)}):")
for name in sorted(desc.methods):
method = desc.methods[name]
kind = "unary" if method.method_type == MethodType.UNARY else "stream"
params = ", ".join(f"{p}: {method.param_types[p]}" for p in method.param_types)
print(f" {name} ({kind})")
print(f" {params}")
finally:
client_pipe.close()
thread.join(timeout=5)
if __name__ == "__main__":
main()
Shared Memory¶
Zero-copy shared memory transport with ShmPipeTransport and ShmSegment. Shows how to allocate a segment and wrap both ends of a pipe pair, the side-channel setup used to move large batches between co-located processes without copying.
"""Zero-copy shared memory transport with ``ShmPipeTransport``.
Demonstrates how to set up a shared memory segment and wrap pipe
transports in ``ShmPipeTransport`` for zero-copy Arrow batch transfer
between co-located processes (or threads).
Run::
python examples/shared_memory.py
"""
from __future__ import annotations
import contextlib
import threading
from typing import Protocol, cast
from vgi_rpc import RpcServer, ShmPipeTransport, make_pipe_pair
from vgi_rpc.rpc import _RpcProxy
from vgi_rpc.shm import ShmSegment
# ---------------------------------------------------------------------------
# 1. Define a Protocol and implementation
# ---------------------------------------------------------------------------
class MathService(Protocol):
"""A simple math service."""
def add(self, a: float, b: float) -> float:
"""Add two numbers."""
...
def multiply(self, a: float, b: float) -> float:
"""Multiply two numbers."""
...
class MathServiceImpl:
"""Concrete implementation of MathService."""
def add(self, a: float, b: float) -> float:
"""Add two numbers."""
return a + b
def multiply(self, a: float, b: float) -> float:
"""Multiply two numbers."""
return a * b
# ---------------------------------------------------------------------------
# 2. Run the example with shared memory transport
# ---------------------------------------------------------------------------
def main() -> None:
"""Create a shared memory segment, wrap pipes, and call methods."""
# Allocate a 4 MB shared memory segment
shm = ShmSegment.create(4 * 1024 * 1024)
try:
# Create pipe pair and wrap in ShmPipeTransport
client_pipe, server_pipe = make_pipe_pair()
client_transport = ShmPipeTransport(client_pipe, shm)
server_transport = ShmPipeTransport(server_pipe, shm)
# Start server on a daemon thread
server = RpcServer(MathService, MathServiceImpl())
thread = threading.Thread(target=server.serve, args=(server_transport,), daemon=True)
thread.start()
try:
# Create a typed proxy and call methods
svc = cast(MathService, _RpcProxy(MathService, client_transport))
result_add = svc.add(a=2.5, b=3.5)
print(f"add(2.5, 3.5) = {result_add}")
result_mul = svc.multiply(a=4.0, b=5.0)
print(f"multiply(4.0, 5.0) = {result_mul}")
finally:
client_transport.close()
thread.join(timeout=5)
finally:
shm.unlink()
with contextlib.suppress(BufferError):
shm.close()
if __name__ == "__main__":
main()