Skip to content

Examples

Runnable scripts demonstrating key vgi-rpc features. Each example is self-contained — copy, run, and modify. All examples are tested via test_examples.py to stay in sync with the API.

Getting Started

Hello World

The minimal starting point: define a Protocol, implement it, and call through a typed proxy using in-process pipe transport.

"""Minimal vgi-rpc example: define a service and call it in-process.

This is the quickest way to get started. The service runs in a background
thread and communicates over an in-process pipe — no subprocess or network
needed.

Run::

    python examples/hello_world.py
"""

from __future__ import annotations

from typing import Protocol

from vgi_rpc import serve_pipe


# 1. Define the service interface as a Protocol class.
#    Return types determine the method type (unary here).
class Greeter(Protocol):
    """Service interface for the hello-world example.

    Both methods return plain values rather than streams, which is what
    makes them unary methods.
    """

    def greet(self, name: str) -> str:
        """Return a greeting for *name*."""
        ...

    def add(self, a: float, b: float) -> float:
        """Return the sum of *a* and *b*."""
        ...


# 2. Implement the interface.
class GreeterImpl:
    """Server-side object that fulfils the ``Greeter`` interface."""

    def greet(self, name: str) -> str:
        """Build the greeting text for *name*."""
        greeting = f"Hello, {name}!"
        return greeting

    def add(self, a: float, b: float) -> float:
        """Return the sum of *a* and *b*."""
        total = a + b
        return total


# 3. Start the server in-process and call methods through a typed proxy.
def main() -> None:
    """Serve ``GreeterImpl`` over an in-process pipe and call both methods."""
    impl = GreeterImpl()
    with serve_pipe(Greeter, impl) as proxy:
        greeting = proxy.greet(name="World")
        print(greeting)  # Hello, World!
        total = proxy.add(a=2.5, b=3.5)
        print(total)  # 6.0


if __name__ == "__main__":
    main()

Streaming

Producer streams (server pushes data, client iterates) and exchange streams (lockstep bidirectional communication). Shows ProducerState, ExchangeState, OutputCollector, and out.finish().

"""Streaming examples: producer streams and exchange (bidirectional) streams.

Producer streams generate a sequence of Arrow RecordBatches from the server.
Exchange streams allow the client to send data and receive transformed results
in a lockstep request/response pattern.

Run::

    python examples/streaming.py
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Protocol, cast

import pyarrow as pa
import pyarrow.compute as pc

from vgi_rpc import (
    AnnotatedBatch,
    CallContext,
    ExchangeState,
    OutputCollector,
    ProducerState,
    Stream,
    StreamState,
    serve_pipe,
)

# ---------------------------------------------------------------------------
# Producer stream: server generates batches, client iterates
# ---------------------------------------------------------------------------


@dataclass
class CounterState(ProducerState):
    """Per-stream state for the counter producer.

    Subclassing ``ProducerState`` means only ``produce(out, ctx)`` has to
    be written; there is no ``input`` argument to ignore.  The stream ends
    when ``out.finish()`` is called.
    """

    limit: int
    current: int = 0

    def produce(self, out: OutputCollector, ctx: CallContext) -> None:
        """Emit a single-row batch for the current value, or finish at the limit."""
        exhausted = self.current >= self.limit
        if exhausted:
            out.finish()
        else:
            out.emit_pydict({"n": [self.current], "n_squared": [self.current**2]})
            self.current += 1


# ---------------------------------------------------------------------------
# Exchange stream: client sends data, server transforms and returns it
# ---------------------------------------------------------------------------


@dataclass
class ScaleState(ExchangeState):
    """Per-stream state for the scaling exchange.

    Subclassing ``ExchangeState`` means only ``exchange(input, out, ctx)``
    has to be written.  An exchange stream must emit exactly one output
    batch per call and must not call ``out.finish()``.
    """

    factor: float

    def exchange(self, input: AnnotatedBatch, out: OutputCollector, ctx: CallContext) -> None:
        """Emit the ``value`` column of *input* multiplied by ``self.factor``."""
        values = input.batch.column("value")
        product = pc.multiply(values, self.factor)
        # The cast only narrows the static type; it does nothing at runtime.
        scaled = cast("pa.Array[Any]", product)
        out.emit_arrays([scaled])


# ---------------------------------------------------------------------------
# Service definition
# ---------------------------------------------------------------------------

# Output schema of the ``count`` producer stream: int64 columns ``n`` and ``n_squared``.
_COUNTER_SCHEMA = pa.schema([pa.field("n", pa.int64()), pa.field("n_squared", pa.int64())])
# Schema used for both the input and the output of the ``scale`` exchange stream.
_SCALE_SCHEMA = pa.schema([pa.field("value", pa.float64())])


class MathService(Protocol):
    """Service demonstrating producer and exchange streams.

    Both methods are declared as returning ``Stream[StreamState]``; the
    implementation turns a stream into an exchange stream by supplying an
    ``input_schema``.
    """

    def count(self, limit: int) -> Stream[StreamState]:
        """Produce *limit* batches of (n, n_squared)."""
        ...

    def scale(self, factor: float) -> Stream[StreamState]:
        """Multiply incoming values by *factor*."""
        ...


class MathServiceImpl:
    """Server-side implementation of ``MathService``."""

    def count(self, limit: int) -> Stream[CounterState]:
        """Open a producer stream yielding *limit* batches of (n, n_squared)."""
        initial = CounterState(limit=limit)
        return Stream(output_schema=_COUNTER_SCHEMA, state=initial)

    def scale(self, factor: float) -> Stream[ScaleState]:
        """Open an exchange stream that multiplies incoming values by *factor*."""
        scaler = ScaleState(factor=factor)
        # Supplying input_schema is what makes this an exchange stream.
        return Stream(output_schema=_SCALE_SCHEMA, state=scaler, input_schema=_SCALE_SCHEMA)


# ---------------------------------------------------------------------------
# Client usage
# ---------------------------------------------------------------------------


def main() -> None:
    """Exercise the producer and exchange streams over an in-process pipe."""
    with serve_pipe(MathService, MathServiceImpl()) as svc:
        # Producer stream: the server generates batches, the client only iterates.
        print("=== Producer stream (count to 5) ===")
        for produced in svc.count(limit=5):
            for row in produced.batch.to_pylist():
                print(f"  n={row['n']}  n^2={row['n_squared']}")

        # Exchange stream: each batch sent yields one transformed batch back.
        print("\n=== Exchange stream (scale by 10) ===")
        payloads = [[1.0, 2.0, 3.0], [100.0, 200.0]]
        with svc.scale(factor=10.0) as session:
            for values in payloads:
                request = AnnotatedBatch(pa.RecordBatch.from_pydict({"value": values}))
                result = session.exchange(request)
                print(f"  input={values}  output={result.batch.column('value').to_pylist()}")


if __name__ == "__main__":
    main()

Structured Types

Using ArrowSerializableDataclass for complex parameters: dataclasses with enums, nested types, and optional fields. Demonstrates automatic Arrow schema inference.

"""Using dataclasses as RPC parameters and return types.

``ArrowSerializableDataclass`` lets you pass structured data across the
RPC boundary. Fields are automatically mapped to Arrow types.

Run::

    python examples/structured_types.py
"""

from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from typing import Protocol

from vgi_rpc import serve_pipe
from vgi_rpc.utils import ArrowSerializableDataclass

# ---------------------------------------------------------------------------
# Domain types
# ---------------------------------------------------------------------------


class Priority(Enum):
    """Task priority levels.

    Each member's value is its own name in lowercase.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"


@dataclass(frozen=True)
class Task(ArrowSerializableDataclass):
    """A task with structured fields, passed to the service as an RPC parameter.

    Supported field types include: str, int, float, bool, Enum,
    list[T], dict[K, V], frozenset[T], Optional types, and nested
    ArrowSerializableDataclass instances.

    The dataclass is frozen, so fields cannot be reassigned after
    construction.
    """

    title: str  # e.g. "Write documentation"
    priority: Priority  # one of the Priority enum members
    tags: list[str]  # e.g. ["docs", "urgent"]
    metadata: dict[str, str]  # e.g. {"assignee": "alice"}
    done: bool = False  # the only field with a default


@dataclass(frozen=True)
class TaskSummary(ArrowSerializableDataclass):
    """Summary returned by the task service.

    Frozen dataclass used as the return type of ``TaskService.summarize``.
    """

    total: int  # number of tasks counted
    high_priority: int  # how many of them have Priority.HIGH
    titles: list[str]  # task titles


# ---------------------------------------------------------------------------
# Service
# ---------------------------------------------------------------------------


class TaskService(Protocol):
    """Service that accepts and returns dataclass parameters.

    ``create_task`` takes a ``Task`` dataclass as its argument and
    ``summarize`` returns a ``TaskSummary`` dataclass.
    """

    def create_task(self, task: Task) -> str:
        """Store a task and return its ID."""
        ...

    def summarize(self) -> TaskSummary:
        """Return a summary of all stored tasks."""
        ...


class TaskServiceImpl:
    """Task store that keeps everything in a plain Python list."""

    def __init__(self) -> None:
        """Start with no tasks and the ID counter at zero."""
        self._next_id: int = 0
        self._tasks: list[Task] = []

    def create_task(self, task: Task) -> str:
        """Record *task* and return a newly allocated ``TASK-<n>`` identifier."""
        task_id = f"TASK-{self._next_id}"
        self._next_id += 1
        self._tasks.append(task)
        return task_id

    def summarize(self) -> TaskSummary:
        """Aggregate the stored tasks into a ``TaskSummary``."""
        urgent = [t for t in self._tasks if t.priority == Priority.HIGH]
        names = [t.title for t in self._tasks]
        return TaskSummary(total=len(self._tasks), high_priority=len(urgent), titles=names)


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------


def main() -> None:
    """Create two tasks through the proxy, then print the aggregated summary."""
    docs_task = Task(
        title="Write documentation",
        priority=Priority.HIGH,
        tags=["docs", "urgent"],
        metadata={"assignee": "alice"},
    )
    bench_task = Task(
        title="Run benchmarks",
        priority=Priority.LOW,
        tags=["perf"],
        metadata={"env": "staging"},
    )

    with serve_pipe(TaskService, TaskServiceImpl()) as svc:
        # Dataclass instances are passed as ordinary keyword arguments.
        id1 = svc.create_task(task=docs_task)
        print(f"Created: {id1}")

        id2 = svc.create_task(task=bench_task)
        print(f"Created: {id2}")

        # The return value comes back as a TaskSummary dataclass.
        summary = svc.summarize()
        print(f"\nTotal tasks:    {summary.total}")
        print(f"High priority:  {summary.high_priority}")
        print(f"Titles:         {summary.titles}")


if __name__ == "__main__":
    main()

Transports

HTTP Server

Serve an RPC service over HTTP using Falcon + waitress. Shows make_wsgi_app with a WSGI server.

"""HTTP server example using Falcon (WSGI) and waitress.

Requires the HTTP extra: ``pip install vgi-rpc[http]``

Start the server::

    python examples/http_server.py

Then run the client in another terminal::

    python examples/http_client.py
"""

from __future__ import annotations

import socket
import sys
from dataclasses import dataclass
from typing import Protocol

import pyarrow as pa
import waitress

from vgi_rpc import (
    CallContext,
    Level,
    OutputCollector,
    ProducerState,
    RpcServer,
    Stream,
    StreamState,
)
from vgi_rpc.http import make_wsgi_app

# Default listening port; main() uses the first command-line argument instead when one is given.
PORT = 8234


# ---------------------------------------------------------------------------
# Service definition
# ---------------------------------------------------------------------------


@dataclass
class FibState(ProducerState):
    """Producer state that walks the Fibonacci sequence until it exceeds *limit*."""

    limit: int
    a: int = 0
    b: int = 1

    def produce(self, out: OutputCollector, ctx: CallContext) -> None:
        """Emit the current Fibonacci number, or finish once it exceeds the limit."""
        past_limit = self.a > self.limit
        if past_limit:
            out.finish()
        else:
            out.emit_pydict({"fib": [self.a]})
            # Advance the pair (a, b) -> (b, a + b).
            self.a, self.b = self.b, self.a + self.b


class DemoService(Protocol):
    """Demonstration HTTP service.

    Declares one method returning a plain value (``echo``) and one
    returning a stream (``fibonacci``).
    """

    def echo(self, message: str) -> str:
        """Echo a message back."""
        ...

    def fibonacci(self, limit: int) -> Stream[StreamState]:
        """Stream Fibonacci numbers up to *limit*."""
        ...


class DemoServiceImpl:
    """Server-side implementation of ``DemoService``."""

    def echo(self, message: str, ctx: CallContext | None = None) -> str:
        """Return *message* unchanged.

        When a call context is supplied, an INFO-level message is also sent
        through ``ctx.client_log``.
        """
        if not ctx:
            return message
        ctx.client_log(Level.INFO, f"Echoing: {message}")
        return message

    def fibonacci(self, limit: int) -> Stream[FibState]:
        """Open a producer stream of Fibonacci numbers bounded by *limit*."""
        fib_schema = pa.schema([pa.field("fib", pa.int64())])
        start = FibState(limit=limit)
        return Stream(output_schema=fib_schema, state=start)


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------


def _find_free_port() -> int:
    """Find a free TCP port on localhost."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return int(s.getsockname()[1])


def main() -> None:
    """Serve ``DemoService`` over HTTP on localhost.

    The port is taken from the first command-line argument when present,
    otherwise from ``PORT``; a value of 0 selects a free port automatically.
    """
    cli_args = sys.argv[1:]
    port = int(cli_args[0]) if cli_args else PORT
    if port == 0:
        port = _find_free_port()

    rpc_server = RpcServer(DemoService, DemoServiceImpl())
    wsgi_app = make_wsgi_app(rpc_server)

    print(f"Serving DemoService on http://127.0.0.1:{port}", flush=True)
    waitress.serve(wsgi_app, host="127.0.0.1", port=port, _quiet=True)


if __name__ == "__main__":
    main()

HTTP Client

Connect to an HTTP RPC service using http_connect. The proxy is typed as the Protocol class.

"""HTTP client that connects to the demo HTTP server.

Requires the HTTP extra: ``pip install vgi-rpc[http]``

Start the server first::

    python examples/http_server.py

Then run this client::

    python examples/http_client.py
"""

from __future__ import annotations

from typing import Protocol

from vgi_rpc import Stream, StreamState
from vgi_rpc.http import http_connect

# Port of the demo HTTP server this client connects to on 127.0.0.1.
PORT = 8234


class DemoService(Protocol):
    """Demonstration HTTP service (client-side copy of the interface).

    This Protocol is duplicated from http_server.py so each file is
    self-contained.  In a real project you'd define the Protocol once in a
    shared module and import it from both sides.
    """

    def echo(self, message: str) -> str:
        """Echo a message back."""
        ...

    def fibonacci(self, limit: int) -> Stream[StreamState]:
        """Stream Fibonacci numbers up to *limit*."""
        ...


def main() -> None:
    """Call the demo server's unary and streaming methods over HTTP."""
    base_url = f"http://127.0.0.1:{PORT}"

    with http_connect(DemoService, base_url) as svc:
        # Unary call: one request, one value back.
        result = svc.echo(message="Hello from HTTP!")
        print(f"echo: {result}")

        # Streaming call: iterate the batches the server produces.
        print("\nFibonacci numbers up to 100:")
        fib_stream = svc.fibonacci(limit=100)
        for chunk in fib_stream:
            rows = chunk.batch.to_pylist()
            for row in rows:
                print(f"  {row['fib']}")


if __name__ == "__main__":
    main()

Subprocess Worker

Entry point for a subprocess RPC worker. Uses run_server to serve over stdin/stdout.

"""Subprocess server entry point.

This script serves an RPC service over stdin/stdout, designed to be spawned
as a child process by a client using ``vgi_rpc.connect()``.

The client example is in ``subprocess_client.py``.

Run the client (which spawns this automatically)::

    python examples/subprocess_client.py
"""

from __future__ import annotations

from typing import Protocol

from vgi_rpc import run_server


class Calculator(Protocol):
    """Simple calculator service.

    All three methods take two floats and return a float.
    """

    def add(self, a: float, b: float) -> float:
        """Add two numbers."""
        ...

    def multiply(self, a: float, b: float) -> float:
        """Multiply two numbers."""
        ...

    def divide(self, a: float, b: float) -> float:
        """Divide a by b."""
        ...


class CalculatorImpl:
    """Server-side implementation of ``Calculator``."""

    def add(self, a: float, b: float) -> float:
        """Return ``a + b``."""
        total = a + b
        return total

    def multiply(self, a: float, b: float) -> float:
        """Return ``a * b``."""
        product = a * b
        return product

    def divide(self, a: float, b: float) -> float:
        """Return ``a / b``.

        Raises:
            ValueError: If *b* equals zero.

        """
        divisor_is_zero = b == 0.0
        if divisor_is_zero:
            raise ValueError("Division by zero")
        quotient = a / b
        return quotient


def main() -> None:
    """Hand a ``CalculatorImpl`` to ``run_server`` to serve over stdin/stdout."""
    implementation = CalculatorImpl()
    run_server(Calculator, implementation)


if __name__ == "__main__":
    main()

Subprocess Client

Spawn a subprocess worker and connect to it using connect(). Shows how server-side exceptions surface on the client as RpcError.

"""Client that spawns a subprocess server and calls methods on it.

Uses ``vgi_rpc.connect()`` which launches the worker as a child process
and communicates over stdin/stdout pipes.

Run::

    python examples/subprocess_client.py
"""

from __future__ import annotations

import sys
from pathlib import Path
from typing import Protocol

from vgi_rpc import RpcError, connect

# Directory containing this script; main() looks for subprocess_worker.py next to it.
_HERE = Path(__file__).resolve().parent


class Calculator(Protocol):
    """Simple calculator service (client-side copy of the interface).

    This Protocol is duplicated from subprocess_worker.py so each file is
    self-contained.  In a real project you'd define the Protocol once in a
    shared module and import it from both sides.
    """

    def add(self, a: float, b: float) -> float:
        """Add two numbers."""
        ...

    def multiply(self, a: float, b: float) -> float:
        """Multiply two numbers."""
        ...

    def divide(self, a: float, b: float) -> float:
        """Divide a by b."""
        ...


def main() -> None:
    """Launch the worker script as a child process and call it over RPC."""
    worker_script = _HERE / "subprocess_worker.py"
    # Run the worker with the same interpreter that runs this client.
    cmd = [sys.executable, str(worker_script)]

    with connect(Calculator, cmd) as calc:
        print(f"add(2, 3)      = {calc.add(a=2, b=3)}")
        print(f"multiply(4, 5) = {calc.multiply(a=4, b=5)}")
        print(f"divide(10, 3)  = {calc.divide(a=10, b=3):.4f}")

        # An exception raised in the worker reaches the client as RpcError.
        try:
            calc.divide(a=1, b=0)
        except RpcError as e:
            print(f"\nCaught remote error: {e.error_type}: {e.error_message}")


if __name__ == "__main__":
    main()

Testing

Testing with Pipe Transport

Unit-testing pattern using serve_pipe() — no network or subprocess needed. Tests both unary and streaming methods.

"""Testing a service with ``serve_pipe()`` — no subprocess or network needed.

``serve_pipe`` runs the server on a background thread and gives you a typed
proxy.  This is the fastest way to write unit tests for your RPC service.

Run::

    python examples/testing_pipe.py
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Protocol

import pyarrow as pa

from vgi_rpc import CallContext, OutputCollector, ProducerState, Stream, StreamState, serve_pipe

# ---------------------------------------------------------------------------
# 1. Define a Protocol and implementation (same as production code)
# ---------------------------------------------------------------------------


class MathService(Protocol):
    """A small service with a unary method and a producer stream.

    ``add`` returns a plain float; ``countdown`` returns a stream that the
    client iterates.
    """

    def add(self, a: float, b: float) -> float:
        """Add two numbers."""
        ...

    def countdown(self, n: int) -> Stream[StreamState]:
        """Count down from *n* to 1."""
        ...


@dataclass
class CountdownState(ProducerState):
    """Producer state holding the next value of the countdown."""

    n: int

    def produce(self, out: OutputCollector, ctx: CallContext) -> None:
        """Emit the current value and decrement it, or finish once it reaches zero."""
        finished = self.n <= 0
        if finished:
            out.finish()
        else:
            out.emit_pydict({"value": [self.n]})
            self.n -= 1


# Output schema of the ``countdown`` stream: a single int64 column named ``value``.
_COUNTDOWN_SCHEMA = pa.schema([pa.field("value", pa.int64())])


class MathServiceImpl:
    """Server-side implementation of ``MathService``."""

    def add(self, a: float, b: float) -> float:
        """Return ``a + b``."""
        total = a + b
        return total

    def countdown(self, n: int) -> Stream[CountdownState]:
        """Open a producer stream that counts down starting at *n*."""
        start = CountdownState(n=n)
        return Stream(output_schema=_COUNTDOWN_SCHEMA, state=start)


# ---------------------------------------------------------------------------
# 2. Test the service using serve_pipe()
# ---------------------------------------------------------------------------


def main() -> None:
    """Call the service through ``serve_pipe`` and assert on the results."""
    with serve_pipe(MathService, MathServiceImpl()) as svc:
        # Unary method: a direct call returning a value.
        result = svc.add(a=2.0, b=3.0)
        assert result == 5.0
        print(f"add(2, 3) = {result}")

        # Producer stream: gather the "value" column from every batch.
        values: list[int] = []
        for batch in svc.countdown(n=3):
            values.extend(row["value"] for row in batch.batch.to_pylist())
        assert values == [3, 2, 1]
        print(f"countdown(3) = {values}")

    print("All assertions passed!")


if __name__ == "__main__":
    main()

Testing with HTTP Transport

Unit-testing the full HTTP stack (including auth middleware) using make_sync_client() — no real HTTP server needed.

"""Testing the HTTP transport without a running server.

``make_sync_client`` wraps a Falcon ``TestClient`` so you can exercise the
full HTTP transport stack (authentication, serialization, streaming) in-process
with zero network I/O.

Requires ``pip install vgi-rpc[http]``

Run::

    python examples/testing_http.py
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Protocol

import falcon
import pyarrow as pa

from vgi_rpc import (
    AuthContext,
    CallContext,
    OutputCollector,
    ProducerState,
    RpcServer,
    Stream,
    StreamState,
    serve_pipe,
)
from vgi_rpc.http import http_connect, make_sync_client

# ---------------------------------------------------------------------------
# 1. Define a Protocol and implementation
# ---------------------------------------------------------------------------


class DemoService(Protocol):
    """Service used for HTTP testing examples.

    ``greet`` and ``whoami`` return plain strings; ``countdown`` returns a
    stream that the client iterates.
    """

    def greet(self, name: str) -> str:
        """Greet by name."""
        ...

    def whoami(self) -> str:
        """Return the caller's identity."""
        ...

    def countdown(self, n: int) -> Stream[StreamState]:
        """Count down from *n* to 1."""
        ...


@dataclass
class CountdownState(ProducerState):
    """Producer state holding the next value of the countdown."""

    n: int

    def produce(self, out: OutputCollector, ctx: CallContext) -> None:
        """Emit the current value and decrement it, or finish once it reaches zero."""
        finished = self.n <= 0
        if finished:
            out.finish()
        else:
            out.emit_pydict({"value": [self.n]})
            self.n -= 1


# Output schema of the ``countdown`` stream: a single int64 column named ``value``.
_COUNTDOWN_SCHEMA = pa.schema([pa.field("value", pa.int64())])


class DemoServiceImpl:
    """Server-side implementation of ``DemoService``."""

    def greet(self, name: str) -> str:
        """Build the greeting text for *name*."""
        greeting = f"Hello, {name}!"
        return greeting

    def whoami(self, ctx: CallContext) -> str:
        """Return the caller's principal, or ``"anonymous"`` when it is empty or unset."""
        principal = ctx.auth.principal
        return principal if principal else "anonymous"

    def countdown(self, n: int) -> Stream[CountdownState]:
        """Open a producer stream that counts down starting at *n*."""
        start = CountdownState(n=n)
        return Stream(output_schema=_COUNTDOWN_SCHEMA, state=start)


# ---------------------------------------------------------------------------
# 2. Helper: create an authenticate callback
# ---------------------------------------------------------------------------


def _authenticate(req: falcon.Request) -> AuthContext:
    """Map the fixed test bearer token to principal ``alice``; everyone else is anonymous."""
    header = req.get_header("Authorization") or ""
    if header != "Bearer test-token":
        return AuthContext.anonymous()
    return AuthContext(domain="bearer", authenticated=True, principal="alice")


# ---------------------------------------------------------------------------
# 3. Test with make_sync_client (HTTP transport, no real server)
# ---------------------------------------------------------------------------


def main() -> None:
    """Exercise the service over in-process HTTP (anonymous, then authenticated) and over a pipe."""
    server = RpcServer(DemoService, DemoServiceImpl())

    # Plain HTTP client: no authenticate callback is configured.
    anon_client = make_sync_client(server)
    with http_connect(DemoService, client=anon_client) as svc:
        result = svc.greet(name="World")
        assert result == "Hello, World!"
        print(f"greet('World') = {result}")

        # Without credentials the caller shows up as anonymous.
        who = svc.whoami()
        assert who == "anonymous"
        print(f"whoami() = {who}")

        # Producer stream over HTTP: gather the "value" column from every batch.
        values: list[int] = []
        for batch in svc.countdown(n=3):
            values.extend(row["value"] for row in batch.batch.to_pylist())
        assert values == [3, 2, 1]
        print(f"countdown(3) = {values}")

    # Authenticated HTTP client: every request carries the test bearer token.
    token_headers = {"Authorization": "Bearer test-token"}
    auth_client = make_sync_client(server, authenticate=_authenticate, default_headers=token_headers)
    with http_connect(DemoService, client=auth_client) as svc:
        who = svc.whoami()
        assert who == "alice"
        print(f"whoami() [authenticated] = {who}")

    # The same Protocol served over the pipe transport, for comparison.
    with serve_pipe(DemoService, DemoServiceImpl()) as svc:
        result = svc.greet(name="Pipe")
        assert result == "Hello, Pipe!"
        print(f"greet('Pipe') [pipe] = {result}")

    print("All assertions passed!")


if __name__ == "__main__":
    main()

Advanced Features

Authentication

HTTP authentication with Bearer tokens. Shows authenticate callback, AuthContext, CallContext.auth.require_authenticated(), and guarded methods.

"""HTTP authentication with Bearer tokens and guarded methods.

Demonstrates how to wire up an ``authenticate`` callback, inject
``CallContext`` into method implementations, and gate access with
``require_authenticated()``.  Uses ``make_sync_client`` so no real
HTTP server is needed.

Requires ``pip install vgi-rpc[http]``

Run::

    python examples/auth.py
"""

from __future__ import annotations

from typing import Protocol

import falcon

from vgi_rpc import AuthContext, CallContext, RpcServer
from vgi_rpc.http import http_connect, make_sync_client

# ---------------------------------------------------------------------------
# 1. Define an authenticate callback
# ---------------------------------------------------------------------------


def authenticate(req: falcon.Request) -> AuthContext:
    """Extract a Bearer token from the Authorization header.

    Returns an authenticated ``AuthContext`` with the token value as
    principal and a hard-coded ``role`` claim for demonstration.

    Args:
        req: Incoming request; only its ``Authorization`` header is read.

    Returns:
        An ``AuthContext`` in the ``"bearer"`` domain with
        ``authenticated=True``, the token as ``principal`` and
        ``claims={"role": "admin"}``.

    Raises:
        ValueError: If the header is missing, does not start with
            ``"Bearer "``, or carries an empty or blank token.

    """
    auth_header = req.get_header("Authorization") or ""
    if not auth_header.startswith("Bearer "):
        raise ValueError("Missing or invalid Authorization header")
    principal = auth_header.removeprefix("Bearer ")
    # A bare "Bearer " prefix would otherwise produce an *authenticated*
    # context whose principal is empty, so reject it like any other
    # malformed header.
    if not principal.strip():
        raise ValueError("Missing or invalid Authorization header")
    return AuthContext(domain="bearer", authenticated=True, principal=principal, claims={"role": "admin"})


# ---------------------------------------------------------------------------
# 2. Define the service Protocol
# ---------------------------------------------------------------------------


class SecureService(Protocol):
    """A service with public and guarded methods.

    None of the methods declares parameters here; the implementation in
    this file additionally accepts a ``ctx: CallContext`` argument on the
    methods that need the caller's identity.
    """

    def status(self) -> str:
        """Public health-check endpoint."""
        ...

    def whoami(self) -> str:
        """Return the caller's identity."""
        ...

    def secret_data(self) -> str:
        """Return secret data (requires authentication)."""
        ...


# ---------------------------------------------------------------------------
# 3. Implement the service
# ---------------------------------------------------------------------------


class SecureServiceImpl:
    """Server-side implementation of ``SecureService``."""

    def status(self) -> str:
        """Return the fixed health-check string; needs no call context."""
        return "ok"

    def whoami(self, ctx: CallContext) -> str:
        """Return the caller's principal, or ``"anonymous"`` when it is empty or unset."""
        principal = ctx.auth.principal
        return principal if principal else "anonymous"

    def secret_data(self, ctx: CallContext) -> str:
        """Return a string naming the caller's principal and ``role`` claim.

        ``require_authenticated()`` is called first, so nothing below it
        runs if that call raises.
        """
        ctx.auth.require_authenticated()
        claims = ctx.auth.claims
        role = claims.get("role", "unknown")
        return f"secret for {ctx.auth.principal} (role={role})"


# ---------------------------------------------------------------------------
# 4. Run the example
# ---------------------------------------------------------------------------


def main() -> None:
    """Build an in-process HTTP client that sends a bearer token and call every method."""
    server = RpcServer(SecureService, SecureServiceImpl())

    # Every request from this client carries "Bearer alice".
    bearer_headers = {"Authorization": "Bearer alice"}
    client = make_sync_client(
        server,
        signing_key=b"example-signing-key",
        authenticate=authenticate,
        default_headers=bearer_headers,
    )

    with http_connect(SecureService, client=client) as svc:
        print(f"status (public):  {svc.status()}")
        print(f"whoami:           {svc.whoami()}")
        print(f"secret_data:      {svc.secret_data()}")


if __name__ == "__main__":
    main()

Introspection

Runtime service discovery with enable_describe=True. The script below demonstrates introspect() over pipe transport; http_introspect() is the corresponding helper for HTTP.

"""Runtime service introspection with ``enable_describe``.

Demonstrates how to discover methods, their types, and parameter
signatures at runtime using the built-in ``__describe__`` RPC method
and the ``introspect()`` helper.

Run::

    python examples/introspection.py
"""

from __future__ import annotations

import threading
from dataclasses import dataclass
from typing import Protocol

import pyarrow as pa

from vgi_rpc import (
    CallContext,
    MethodType,
    OutputCollector,
    ProducerState,
    RpcServer,
    Stream,
    StreamState,
    introspect,
    make_pipe_pair,
)

# ---------------------------------------------------------------------------
# 1. Define a Protocol with a mix of unary and stream methods
# ---------------------------------------------------------------------------


class DemoService(Protocol):
    """A demo service for introspection.

    Mixes two methods returning plain values (``greet``, ``add``) with one
    returning a stream (``count``).
    """

    def greet(self, name: str) -> str:
        """Greet someone by name."""
        ...

    def add(self, a: float, b: float) -> float:
        """Add two numbers."""
        ...

    def count(self, limit: int) -> Stream[StreamState]:
        """Count from 1 up to *limit*."""
        ...


# ---------------------------------------------------------------------------
# 2. Implement the service (methods are only called for schema extraction)
# ---------------------------------------------------------------------------


@dataclass
class CountState(ProducerState):
    """Producer state that counts upward from ``current`` through ``limit``."""

    current: int
    limit: int

    def produce(self, out: OutputCollector, ctx: CallContext) -> None:
        """Emit the current value and advance, or finish once it exceeds the limit."""
        past_limit = self.current > self.limit
        if past_limit:
            out.finish()
        else:
            out.emit_pydict({"value": [self.current]})
            self.current += 1


# Output schema of the ``count`` stream: a single int64 column named ``value``.
_COUNT_SCHEMA = pa.schema([pa.field("value", pa.int64())])


class DemoServiceImpl:
    """Server-side implementation of ``DemoService``."""

    def greet(self, name: str) -> str:
        """Build the greeting text for *name*."""
        greeting = f"Hello, {name}!"
        return greeting

    def add(self, a: float, b: float) -> float:
        """Return ``a + b``."""
        total = a + b
        return total

    def count(self, limit: int) -> Stream[CountState]:
        """Open a producer stream counting from 1 through *limit*."""
        start = CountState(current=1, limit=limit)
        return Stream(output_schema=_COUNT_SCHEMA, state=start)


# ---------------------------------------------------------------------------
# 3. Introspect the service at runtime
# ---------------------------------------------------------------------------


def main() -> None:
    """Serve ``DemoService`` with describe enabled and print what ``introspect`` reports."""
    server = RpcServer(DemoService, DemoServiceImpl(), enable_describe=True)
    client_pipe, server_pipe = make_pipe_pair()

    # The server loop runs on a daemon thread while this thread acts as the client.
    worker = threading.Thread(target=server.serve, args=(server_pipe,), daemon=True)
    worker.start()

    try:
        desc = introspect(client_pipe)

        print(f"Service: {desc.protocol_name}")
        print(f"Methods ({len(desc.methods)}):")
        for name in sorted(desc.methods):
            method = desc.methods[name]
            if method.method_type == MethodType.UNARY:
                kind = "unary"
            else:
                kind = "stream"
            signature_parts = [f"{p}: {method.param_types[p]}" for p in method.param_types]
            params = ", ".join(signature_parts)
            print(f"  {name} ({kind})")
            print(f"    {params}")
    finally:
        # Close the client end first, then give the server thread up to 5 s to exit.
        client_pipe.close()
        worker.join(timeout=5)


if __name__ == "__main__":
    main()

Shared Memory

Zero-copy shared memory transport with ShmPipeTransport and ShmSegment. Demonstrates the side-channel optimization for large batches between co-located processes.

"""Zero-copy shared memory transport with ``ShmPipeTransport``.

Demonstrates how to set up a shared memory segment and wrap pipe
transports in ``ShmPipeTransport`` for zero-copy Arrow batch transfer
between co-located processes (or threads).

Run::

    python examples/shared_memory.py
"""

from __future__ import annotations

import contextlib
import threading
from typing import Protocol, cast

from vgi_rpc import RpcServer, ShmPipeTransport, make_pipe_pair
from vgi_rpc.rpc import _RpcProxy
from vgi_rpc.shm import ShmSegment

# ---------------------------------------------------------------------------
# 1. Define a Protocol and implementation
# ---------------------------------------------------------------------------


class MathService(Protocol):
    """A simple math service.

    Both methods take two floats and return a float.
    """

    def add(self, a: float, b: float) -> float:
        """Add two numbers."""
        ...

    def multiply(self, a: float, b: float) -> float:
        """Multiply two numbers."""
        ...


class MathServiceImpl:
    """Server-side implementation of ``MathService``."""

    def add(self, a: float, b: float) -> float:
        """Return ``a + b``."""
        total = a + b
        return total

    def multiply(self, a: float, b: float) -> float:
        """Return ``a * b``."""
        product = a * b
        return product


# ---------------------------------------------------------------------------
# 2. Run the example with shared memory transport
# ---------------------------------------------------------------------------


def main() -> None:
    """Serve ``MathService`` over pipes wrapped in ``ShmPipeTransport`` and call it."""
    # One 4 MB segment, handed to both ends of the connection.
    segment_size = 4 * 1024 * 1024
    shm = ShmSegment.create(segment_size)
    try:
        # Wrap each end of a fresh pipe pair around the same segment.
        client_pipe, server_pipe = make_pipe_pair()
        client_transport = ShmPipeTransport(client_pipe, shm)
        server_transport = ShmPipeTransport(server_pipe, shm)

        # The server loop runs on a daemon thread.
        server = RpcServer(MathService, MathServiceImpl())
        worker = threading.Thread(target=server.serve, args=(server_transport,), daemon=True)
        worker.start()

        try:
            # The cast only gives the proxy the Protocol's static type.
            proxy = _RpcProxy(MathService, client_transport)
            svc = cast(MathService, proxy)

            result_add = svc.add(a=2.5, b=3.5)
            print(f"add(2.5, 3.5)      = {result_add}")

            result_mul = svc.multiply(a=4.0, b=5.0)
            print(f"multiply(4.0, 5.0) = {result_mul}")
        finally:
            # Close the client side, then wait up to 5 s for the server thread.
            client_transport.close()
            worker.join(timeout=5)
    finally:
        # Unlink first; a BufferError from close() is deliberately ignored.
        shm.unlink()
        with contextlib.suppress(BufferError):
            shm.close()


if __name__ == "__main__":
    main()