Skip to content

Core RPC

The core module provides the server, connection, transport interface, error types, and convenience functions for defining and running RPC services.

Typical Usage

Most users only need serve_pipe (testing) or connect (subprocess):

from vgi_rpc import serve_pipe, connect

# In-process (tests)
with serve_pipe(MyService, MyServiceImpl()) as proxy:
    proxy.my_method(arg=42)

# Subprocess
with connect(MyService, ["python", "worker.py"]) as proxy:
    proxy.my_method(arg=42)

For more control, use RpcServer and RpcConnection directly.

API Reference

RpcServer

RpcServer

RpcServer(
    protocol: type,
    implementation: object,
    *,
    external_location: ExternalLocationConfig | None = None,
    server_id: str | None = None,
    server_version: str = "",
    enable_describe: bool = False,
    ipc_validation: IpcValidation = FULL
)

Dispatches RPC requests to an implementation over IO-stream transports.

Initialize with a protocol type and its implementation.

PARAMETER DESCRIPTION
protocol

The Protocol class defining the RPC interface. If the class declares a protocol_version: ClassVar[str] attribute (canonical semver MAJOR.MINOR.PATCH), the server enforces an exact major+minor match on every dispatched request. See _check_protocol_version for the comparison rule.

TYPE: type

implementation

Object implementing all methods from protocol.

TYPE: object

external_location

Optional ExternalLocation configuration.

TYPE: ExternalLocationConfig | None DEFAULT: None

server_id

Optional server identifier; auto-generated if None.

TYPE: str | None DEFAULT: None

server_version

Build version string included in access log entries.

TYPE: str DEFAULT: ''

enable_describe

When True, the server handles __describe__ requests returning machine-readable method metadata.

TYPE: bool DEFAULT: False

ipc_validation

Validation level for incoming IPC batches. Defaults to FULL for maximum safety.

TYPE: IpcValidation DEFAULT: FULL

Source code in vgi_rpc/rpc/_server.py
def __init__(
    self,
    protocol: type,
    implementation: object,
    *,
    external_location: ExternalLocationConfig | None = None,
    server_id: str | None = None,
    server_version: str = "",
    enable_describe: bool = False,
    ipc_validation: IpcValidation = IpcValidation.FULL,
) -> None:
    """Initialize with a protocol type and its implementation.

    Besides storing its arguments, the constructor resolves the optional
    application ``protocol_version``, collects the protocol's method
    metadata, computes the protocol hash, optionally registers the
    synthetic ``__describe__`` method, and records which implementation
    methods accept a ``ctx`` parameter.

    Args:
        protocol: The Protocol class defining the RPC interface. If the
            class declares a ``protocol_version: ClassVar[str]`` attribute
            (canonical semver MAJOR.MINOR.PATCH), the server enforces an
            exact major+minor match on every dispatched request. See
            ``_check_protocol_version`` for the comparison rule.
        implementation: Object implementing all methods from *protocol*.
        external_location: Optional ExternalLocation configuration.
        server_id: Optional server identifier; auto-generated if ``None``.
        server_version: Build version string included in access log entries.
        enable_describe: When ``True``, the server handles ``__describe__``
            requests returning machine-readable method metadata.
        ipc_validation: Validation level for incoming IPC batches.
            Defaults to ``FULL`` for maximum safety.

    Raises:
        TypeError: If *protocol* declares a ``protocol_version`` that is
            not a ``str``.

    """
    self._protocol = protocol
    self._impl = implementation
    self._server_version = server_version
    # Read protocol_version directly from the Protocol class's own dict
    # (vars, not getattr) so subclasses that don't redeclare it get None.
    # When None, the dispatch-boundary check no-ops — opt-in by declaration.
    raw_version = vars(protocol).get("protocol_version")
    if raw_version is None:
        self._protocol_version: str | None = None
        self._protocol_version_parts: tuple[int, int, int] | None = None
    else:
        if not isinstance(raw_version, str):
            raise TypeError(f"{protocol.__name__}.protocol_version must be a str, got {type(raw_version).__name__}")
        self._protocol_version = raw_version
        # NOTE(review): parse_version presumably rejects non-semver strings
        # by raising — confirm against its definition.
        self._protocol_version_parts = parse_version(raw_version)
    self._ipc_validation = ipc_validation
    self._methods = rpc_methods(protocol)
    self._external_config = external_location
    # Auto-generated ids are the first 12 hex characters of a random UUID4.
    self._server_id = server_id if server_id is not None else uuid.uuid4().hex[:12]
    self._dispatch_hook: _DispatchHook | None = None
    # Transport details stay unset/empty here; serve() reports them through
    # _notify_transport once a transport is known.
    self._transport_kind: TransportKind | None = None
    self._transport_capabilities: frozenset[str] = frozenset()
    # NOTE(review): presumably guards the two transport fields above —
    # confirm in _notify_transport.
    self._transport_lock = threading.Lock()
    # NOTE(review): presumably raises when *implementation* does not cover
    # every protocol method — confirm against _validate_implementation.
    _validate_implementation(protocol, implementation, self._methods)

    # Compute protocol_hash regardless of describe flag — it is needed for
    # access-log records on every dispatch.
    from vgi_rpc.introspect import build_describe_batch

    _hash_batch, _hash_md = build_describe_batch(
        protocol.__name__, self._methods, self._server_id, self._protocol_version
    )
    from vgi_rpc.metadata import PROTOCOL_HASH_KEY

    # Falls back to an empty string when the metadata carries no hash entry.
    self._protocol_hash: str = _hash_md.get(PROTOCOL_HASH_KEY, b"").decode()

    if enable_describe:
        from vgi_rpc.introspect import DESCRIBE_METHOD_NAME

        # Reuse the batch and metadata already built for the hash above.
        self._describe_batch: pa.RecordBatch | None = _hash_batch
        self._describe_metadata: pa.KeyValueMetadata | None = _hash_md
        # Register __describe__ as a synthetic unary method so normal dispatch handles it.
        # A new dict is built; the mapping returned by rpc_methods() is not mutated.
        self._methods = {
            **self._methods,
            DESCRIBE_METHOD_NAME: RpcMethodInfo(
                name=DESCRIBE_METHOD_NAME,
                params_schema=_EMPTY_SCHEMA,
                result_schema=self._describe_batch.schema,
                result_type=type(None),
                method_type=MethodType.UNARY,
                has_return=True,
                doc="Return machine-readable metadata about all server methods.",
            ),
        }
    else:
        self._describe_batch = None
        self._describe_metadata = None

    # Detect which impl methods accept a `ctx` parameter.
    # Runs after the describe registration, so the synthetic entry is in the
    # candidate list; names with no matching attribute on the implementation
    # are skipped.
    self._ctx_methods: frozenset[str] = frozenset(
        name
        for name in self._methods
        if (method := getattr(implementation, name, None)) is not None
        and "ctx" in inspect.signature(method).parameters
    )

    _logger.info(
        "RpcServer created for %s (server_id=%s, methods=%d)",
        protocol.__name__,
        self._server_id,
        len(self._methods),
        extra={"server_id": self._server_id, "protocol": protocol.__name__, "method_count": len(self._methods)},
    )

    # Auto-attach Sentry instrumentation when the SDK is initialised in
    # this process.  We only consult sentry_sdk when it is already
    # imported, so this never forces the optional dependency on users
    # who have not opted into Sentry.
    if "sentry_sdk" in sys.modules:
        try:
            from vgi_rpc.sentry import _maybe_auto_instrument

            _maybe_auto_instrument(self)
        except ImportError:
            # Only ImportError is tolerated (logged at DEBUG); any other
            # exception from the instrumentation call propagates.
            _logger.debug("sentry_sdk imported but vgi_rpc.sentry unavailable", exc_info=True)

methods property

methods: Mapping[str, RpcMethodInfo]

Return method metadata for this server's protocol.

implementation property

implementation: object

The implementation object.

external_config property

external_config: ExternalLocationConfig | None

The ExternalLocation configuration, if any.

server_id property

server_id: str

Short random identifier for this server instance.

protocol_name property

protocol_name: str

Name of the Protocol class this server implements.

server_version property

server_version: str

Version string passed at construction (empty if not set).

protocol_version property

protocol_version: str | None

Application protocol surface version declared by the Protocol class.

Read from vars(protocol).get("protocol_version") at construction (a ClassVar[str] in canonical semver MAJOR.MINOR.PATCH form, or None when the Protocol opts out). When set, the server enforces an exact major+minor match on every dispatched request via _check_protocol_version.

protocol_hash property

protocol_hash: str

SHA-256 hex digest of the canonical describe payload.

ctx_methods property

ctx_methods: frozenset[str]

Method names whose implementations accept a ctx parameter.

describe_enabled property

describe_enabled: bool

Whether __describe__ introspection is enabled.

ipc_validation property

ipc_validation: IpcValidation

Validation level for incoming IPC batches.

transport_kind property

transport_kind: TransportKind | None

Coarse identifier of the bound transport, or None before serving begins.

Set by the framework right before the first request is dispatched (lazy on HTTP for fork-safety). Workers may read this directly, or rely on the on_serve_start lifecycle hook for one-shot startup work.

transport_capabilities property

transport_capabilities: frozenset[str]

Capabilities advertised by the bound transport.

Currently includes "shm" when a ShmPipeTransport is bound. Empty before a transport is bound and for kinds without special capabilities.

serve

serve(transport: RpcTransport) -> None

Serve RPC requests in a loop until the transport is closed.

Source code in vgi_rpc/rpc/_server.py
def serve(self, transport: RpcTransport) -> None:
    """Serve RPC requests in a loop until the transport is closed.

    The transport kind and its capabilities are reported once through
    ``_notify_transport`` before the first request, then ``serve_one`` is
    called repeatedly.  The loop ends silently on ``EOFError`` or
    ``StopIteration``, with a DEBUG record on a broken, reset or aborted
    connection, and with a WARNING record on ``pa.ArrowInvalid``.  Any
    other exception propagates to the caller.
    """
    if isinstance(transport, ShmPipeTransport):
        kind = TransportKind.PIPE
        capabilities: frozenset[str] = frozenset({"shm"})
    elif isinstance(transport, UnixTransport):
        kind = TransportKind.UNIX
        capabilities = frozenset()
    else:
        # A plain PipeTransport and any unrecognised transport both report PIPE.
        kind = TransportKind.PIPE
        capabilities = frozenset()
    self._notify_transport(kind, capabilities)

    while True:
        try:
            self.serve_one(transport)
        except (EOFError, StopIteration):
            # Normal end of input: stop without logging.
            return
        except (BrokenPipeError, ConnectionResetError, ConnectionAbortedError):
            _logger.debug(
                "serve loop ending due to broken pipe",
                exc_info=True,
                extra={"server_id": self._server_id},
            )
            return
        except pa.ArrowInvalid:
            _logger.warning(
                "serve loop ending due to ArrowInvalid",
                exc_info=True,
                extra={"server_id": self._server_id},
            )
            return

serve_one

serve_one(transport: RpcTransport) -> None

Handle a single RPC call (any method type) over the given transport.

Protocol-level errors (VersionError, RpcError from missing metadata) are caught, written back as error responses, and the method returns normally so the serve loop can continue.

RAISES DESCRIPTION
ArrowInvalid

If the incoming data is not valid Arrow IPC. An error response is written to transport before raising so the client can read a structured RpcError.

Source code in vgi_rpc/rpc/_server.py
def serve_one(self, transport: RpcTransport) -> None:
    """Handle a single RPC call (any method type) over the given transport.

    Protocol-level errors (``VersionError``, ``RpcError`` from missing
    metadata) are caught, written back as error responses, and the
    method returns normally so the serve loop can continue.

    Unknown method names, application protocol-version mismatches and
    ``TypeError`` from parameter validation are likewise answered with an
    error stream, after which the method returns without raising.

    Raises:
        pa.ArrowInvalid: If the incoming data is not valid Arrow IPC.
            An error response is written to *transport* before raising so
            the client can read a structured ``RpcError``.

    """
    # Install the per-call context variables.  Every token taken here is
    # reset in the ``finally`` block below, in reverse order.
    token = _current_request_id.set(_generate_request_id())
    stats = CallStatistics()
    stats_token = _current_call_stats.set(stats)
    md_token = _current_request_metadata.set(None)
    rb_token = _current_request_batch.set(None)
    sid_token = _current_stream_id.set("")
    # Set only when a segment is attached from request metadata; closed in ``finally``.
    dynamic_shm: ShmSegment | None = None
    try:
        try:
            method_name, kwargs = _read_request(transport.reader, self._ipc_validation, self._external_config)
        except pa.ArrowInvalid as exc:
            # Best-effort reply: a failure to write is suppressed, then the
            # original error is re-raised to the caller.
            with contextlib.suppress(BrokenPipeError, OSError):
                _write_error_stream(transport.writer, _EMPTY_SCHEMA, exc, server_id=self._server_id)
            raise
        except (VersionError, RpcError) as exc:
            # Same best-effort reply, but the call ends normally.
            with contextlib.suppress(BrokenPipeError, OSError):
                _write_error_stream(transport.writer, _EMPTY_SCHEMA, exc, server_id=self._server_id)
            return

        info = self._methods.get(method_name)
        if info is None:
            available = sorted(self._methods.keys())
            # Unlike the read-failure replies above, this write is not wrapped
            # in contextlib.suppress, so a write failure propagates.
            _write_error_stream(
                transport.writer,
                _EMPTY_SCHEMA,
                MethodNotImplementedError(f"Unknown method: '{method_name}'. Available methods: {available}"),
                server_id=self._server_id,
            )
            return

        # Application-protocol-version gate. Fires only when the Protocol
        # declared a ``protocol_version`` ClassVar. ``__describe__`` is
        # exempt: it is the diagnostic path a mismatched client uses to
        # introspect the server's version. Failure here writes a typed
        # error stream and returns so the serve loop continues.
        if self._protocol_version_parts is not None and method_name != "__describe__":
            try:
                md = _current_request_metadata.get()
                # No request metadata means no client version: None is passed to the check.
                self._check_protocol_version(md.get(PROTOCOL_VERSION_KEY) if md is not None else None)
            except ProtocolVersionError as exc:
                # Unary methods report errors on their result schema; all
                # other method types use the empty schema.
                err_schema = info.result_schema if info.method_type == MethodType.UNARY else _EMPTY_SCHEMA
                _write_error_stream(transport.writer, err_schema, exc, server_id=self._server_id)
                return

        # The return value is unused — presumably kwargs is converted in
        # place; confirm against _deserialize_params.  Exceptions raised here
        # are not caught in this method.
        _deserialize_params(kwargs, info.param_types, self._ipc_validation)

        try:
            _validate_params(info.name, kwargs, info.param_types)
        except TypeError as exc:
            # Same schema choice as for protocol-version errors above.
            err_schema = info.result_schema if info.method_type == MethodType.UNARY else _EMPTY_SCHEMA
            _write_error_stream(transport.writer, err_schema, exc, server_id=self._server_id)
            return

        # Determine SHM segment: prefer transport-level, fall back to request metadata.
        # ``_maybe_attach_shm`` rejects the dynamic path for HTTP — defence-in-depth
        # since HTTP doesn't currently invoke ``serve_one``.
        shm = transport.shm if isinstance(transport, ShmPipeTransport) else None
        if shm is None:
            dynamic_shm = _maybe_attach_shm(_current_request_metadata.get(), self._transport_kind)
            shm = dynamic_shm

        # There is no ``else`` branch: a method type other than UNARY or
        # STREAM is not served and no response is written.
        if info.method_type == MethodType.UNARY:
            self._serve_unary(transport, info, kwargs, stats=stats, shm=shm)
        elif info.method_type == MethodType.STREAM:
            self._serve_stream(transport, info, kwargs, stats=stats, shm=shm)
    finally:
        if dynamic_shm is not None:
            # NOTE(review): BufferError is presumably raised when views into
            # the segment are still alive — confirm; it is ignored here.
            with contextlib.suppress(BufferError):
                dynamic_shm.close()
        # Restore the context variables in the reverse of the order they were set.
        _current_stream_id.reset(sid_token)
        _current_request_batch.reset(rb_token)
        _current_request_metadata.reset(md_token)
        _current_call_stats.reset(stats_token)
        _current_request_id.reset(token)

RpcConnection

RpcConnection

RpcConnection(
    protocol: type[P],
    transport: RpcTransport,
    on_log: Callable[[Message], None] | None = None,
    *,
    external_location: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = FULL
)

Context manager that provides a typed RPC proxy over a transport.

The type parameter P is the Protocol class, enabling IDE autocompletion for all methods defined on the protocol:

with RpcConnection(MyProtocol, transport) as svc:
    result = svc.add(a=1, b=2)   # IDE sees MyProtocol methods

Initialize with a protocol type and transport.

Source code in vgi_rpc/rpc/_client.py
def __init__(
    self,
    protocol: type[P],
    transport: RpcTransport,
    on_log: Callable[[Message], None] | None = None,
    *,
    external_location: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = IpcValidation.FULL,
) -> None:
    """Initialize with a protocol type and transport.

    The constructor only records its arguments; nothing is read from or
    written to *transport* here.

    Args:
        protocol: The Protocol class the returned proxy is typed as.
        transport: Transport handed to the proxy and closed on exit.
        on_log: Optional callback accepting a ``Message``; forwarded to
            the proxy unchanged.
        external_location: Optional ExternalLocation configuration,
            forwarded to the proxy.
        ipc_validation: IPC validation level, forwarded to the proxy.

    """
    # What to talk to, and over which transport.
    self._protocol, self._transport = protocol, transport
    # Optional behaviour handed to the proxy created in __enter__.
    self._on_log = on_log
    self._external_config, self._ipc_validation = external_location, ipc_validation

__enter__

__enter__() -> P

Enter the context and return a typed proxy.

Source code in vgi_rpc/rpc/_client.py
def __enter__(self) -> P:
    """Enter the context and return a typed proxy.

    Emits a DEBUG record on the wire-transport logger when that level is
    enabled, then builds an ``_RpcProxy`` from the stored settings.

    Returns:
        The proxy, cast to the protocol type ``P`` for static typing.

    """
    log = wire_transport_logger
    if log.isEnabledFor(logging.DEBUG):
        log.debug("RpcConnection open: protocol=%s", self._protocol.__name__)
    proxy = _RpcProxy(
        self._protocol,
        self._transport,
        self._on_log,
        external_config=self._external_config,
        ipc_validation=self._ipc_validation,
    )
    # The cast only affects static typing; the proxy object is returned as-is.
    return cast(P, proxy)

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Close the transport.

Source code in vgi_rpc/rpc/_client.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Close the transport.

    The exception arguments are ignored, and the method returns ``None``,
    so an exception raised inside the ``with`` body is not suppressed.
    """
    log = wire_transport_logger
    if log.isEnabledFor(logging.DEBUG):
        log.debug("RpcConnection close: protocol=%s", self._protocol.__name__)
    self._transport.close()

RpcTransport

RpcTransport

Bases: Protocol

Bidirectional byte stream transport.

reader property

reader: IOBase

Readable binary stream.

writer property

writer: IOBase

Writable binary stream.

close

close() -> None

Close the transport.

Source code in vgi_rpc/rpc/_transport.py
def close(self) -> None:
    """Close the transport.

    Protocol stub with no behaviour of its own; concrete transports
    provide the implementation.
    """
    ...

RpcMethodInfo

RpcMethodInfo dataclass

RpcMethodInfo(
    name: str,
    params_schema: Schema,
    result_schema: Schema,
    result_type: object,
    method_type: MethodType,
    has_return: bool,
    doc: str | None,
    param_defaults: dict[str, object] = dict(),
    param_types: dict[str, object] = dict(),
    param_docs: dict[str, str] = dict(),
    header_type: (
        type[ArrowSerializableDataclass] | None
    ) = None,
    is_exchange: bool | None = None,
)

Metadata for a single RPC method, derived from Protocol type hints.

Produced by rpc_methods when introspecting a Protocol class. Each instance describes one method's wire-protocol details: its Arrow schemas, parameter types and defaults, and the original docstring.

ATTRIBUTE DESCRIPTION
name

Method name as it appears on the Protocol.

TYPE: str

params_schema

Arrow schema for the serialized request parameters.

TYPE: Schema

result_schema

Arrow schema for the serialized response (unary only; empty schema for stream methods).

TYPE: Schema

result_type

The raw Python return-type annotation (e.g. float, Stream[MyState]).

TYPE: object

method_type

Whether this is a UNARY or STREAM call.

TYPE: MethodType

has_return

True when the unary method returns a value (False for -> None or stream methods).

TYPE: bool

doc

The method's docstring from the Protocol class, or None if no docstring was provided.

TYPE: str | None

param_defaults

Mapping of parameter name to default value for parameters that have defaults in the Protocol signature.

TYPE: dict[str, object]

param_types

Mapping of parameter name to its Python type annotation (excludes self and return).

TYPE: dict[str, object]

header_type

For stream methods with a header, the concrete ArrowSerializableDataclass subclass for the header. None when the method has no header.

TYPE: type[ArrowSerializableDataclass] | None

is_exchange

For stream methods, True if the state class extends ExchangeState (client must send input), False if it extends ProducerState (server-initiated), None if the state class uses raw StreamState (unknown). Always None for unary methods.

TYPE: bool | None

MethodType

MethodType

Bases: Enum

Classification of RPC method patterns.

Errors

RpcError

RpcError(
    error_type: str,
    error_message: str,
    remote_traceback: str,
    *,
    request_id: str = ""
)

Bases: Exception

Raised on the client side when the server reports an error.

Initialize with error details from the remote side.

Source code in vgi_rpc/rpc/_common.py
def __init__(self, error_type: str, error_message: str, remote_traceback: str, *, request_id: str = "") -> None:
    """Initialize with error details from the remote side.

    Args:
        error_type: Name of the error type reported by the server.
        error_message: Message text reported by the server.
        remote_traceback: Traceback text from the remote side.
        request_id: Identifier of the failed request; empty by default.

    """
    self.error_type, self.error_message = error_type, error_message
    self.remote_traceback, self.request_id = remote_traceback, request_id
    # The exception's own message is "<type>: <message>".
    summary = f"{error_type}: {error_message}"
    super().__init__(summary)

VersionError

Bases: Exception

Raised when a request has a missing or incompatible protocol version.

Typed marker errors

These exception classes carry a stable error_kind class attribute that the wire serializer surfaces as the vgi_rpc.error_kind metadata key on the EXCEPTION-level batch. Clients can pattern-match the kind instead of substring-searching the error message.

MethodNotImplementedError

Bases: AttributeError

Raised server-side when no handler is registered for the requested RPC method.

Subclass of AttributeError so existing except AttributeError callers keep working. Carries a stable error_kind class attribute that the wire serializer surfaces as a top-level vgi_rpc.error_kind metadata key on the EXCEPTION-level error batch, so clients can pattern-match on the kind rather than substring-searching the message text.

Used by callers that want to detect "old server doesn't know this method" cleanly (e.g. capability detection + fallback to a legacy RPC method).

SessionLostError

Bases: Exception

Raised server-side when a sticky session token cannot be honoured.

Surfaced over the wire with error_kind="session_lost" so clients can pattern-match the kind without substring-searching the message. Causes include: token presented to a different worker than the one that minted it (server_id mismatch), the registry entry aged out via TTL eviction, AAD mismatch (cross-principal replay), or any other validation failure.

Sticky session machinery is HTTP-only; this error never originates from pipe/unix transports.

ServerDrainingError

Bases: Exception

Raised server-side when a sticky-enabled worker is draining and refuses new sessions.

Surfaced over the wire with error_kind="server_draining". Existing sessions continue to serve through TTL or explicit close; only new ctx.open_session calls are rejected. Operators trigger drain via RpcServer.drain() (typically from a SIGTERM handler) ahead of deploy-time worker rotation.

See also IPCError in the Serialization module.

CallStatistics

CallStatistics dataclass

CallStatistics(
    input_batches: int = 0,
    output_batches: int = 0,
    input_rows: int = 0,
    output_rows: int = 0,
    input_bytes: int = 0,
    output_bytes: int = 0,
)

Mutable accumulator of per-call I/O counters for usage accounting.

Created at dispatch start and populated as batches flow through the server. Surfaced through the access log and OTel dispatch hook.

Byte measurement: uses pa.RecordBatch.get_total_buffer_size() which reports logical Arrow buffer sizes (O(columns), negligible cost). This is an approximation — it does not include IPC framing overhead (padding, schema messages, EOS markers).

ATTRIBUTE DESCRIPTION
input_batches

Number of input batches read by the server.

TYPE: int

output_batches

Number of output batches written by the server.

TYPE: int

input_rows

Total rows across all input batches.

TYPE: int

output_rows

Total rows across all output batches.

TYPE: int

input_bytes

Approximate logical bytes across all input batches.

TYPE: int

output_bytes

Approximate logical bytes across all output batches.

TYPE: int

record_input

record_input(batch: RecordBatch) -> None

Record an input batch's row count and buffer size.

Source code in vgi_rpc/rpc/_common.py
def record_input(self, batch: pa.RecordBatch) -> None:
    """Add one input batch to the running counters.

    Increments the batch count by one, adds ``batch.num_rows`` to the row
    total, and adds ``batch.get_total_buffer_size()`` to the byte total.
    """
    self.input_batches = self.input_batches + 1
    row_count = batch.num_rows
    self.input_rows = self.input_rows + row_count
    buffer_bytes = batch.get_total_buffer_size()
    self.input_bytes = self.input_bytes + buffer_bytes

record_output

record_output(batch: RecordBatch) -> None

Record an output batch's row count and buffer size.

Source code in vgi_rpc/rpc/_common.py
def record_output(self, batch: pa.RecordBatch) -> None:
    """Add one output batch to the running counters.

    Increments the batch count by one, adds ``batch.num_rows`` to the row
    total, and adds ``batch.get_total_buffer_size()`` to the byte total.
    """
    self.output_batches = self.output_batches + 1
    row_count = batch.num_rows
    self.output_rows = self.output_rows + row_count
    buffer_bytes = batch.get_total_buffer_size()
    self.output_bytes = self.output_bytes + buffer_bytes

Convenience Functions

run_server

run_server(
    protocol_or_server: type | RpcServer,
    implementation: object | None = None,
) -> None

Serve RPC requests, defaulting to stdin/stdout pipe transport.

This is the recommended entry point for subprocess workers. Accepts either a (protocol, implementation) pair or a pre-built RpcServer.

The function parses sys.argv and supports the following CLI flags:

  • --http — Serve over HTTP instead of stdin/stdout (requires vgi-rpc[http]).
  • --host HOST — HTTP bind address (default 127.0.0.1).
  • --port PORT — HTTP port (default 0, auto-select).
  • --describe — Enable the __describe__ introspection method.
  • --access-log PATH — Append JSONL access log records to PATH. The cross-language conformance contract requires every worker to accept this flag; see docs/access-log-spec.md.
  • --max-response-bytes N — HTTP-only. Cap the outgoing HTTP body of every method response at N bytes (including IPC framing). For producer streams, controls when the framework mints a continuation token to split the response across multiple HTTP turns. Default: no body cap. Env: VGI_RPC_MAX_RESPONSE_BYTES.
  • --max-externalized-response-bytes N — HTTP-only. Cap the total bytes uploaded to external storage during one HTTP response. Default: unbounded. Env: VGI_RPC_MAX_EXTERNALIZED_RESPONSE_BYTES.
  • --max-stream-response-bytes N — Deprecated; alias for --max-response-bytes.

Without --http the server runs over stdin/stdout pipes (the default, suitable for SubprocessTransport).

PARAMETER DESCRIPTION
protocol_or_server

A Protocol class (requires implementation) or an already-constructed RpcServer.

TYPE: type | RpcServer

implementation

The implementation object. Required when protocol_or_server is a Protocol class; must be None when passing an RpcServer.

TYPE: object | None DEFAULT: None

RAISES DESCRIPTION
TypeError

On invalid argument combinations.

Source code in vgi_rpc/rpc/__init__.py
def run_server(protocol_or_server: type | RpcServer, implementation: object | None = None) -> None:
    """Serve RPC requests, defaulting to stdin/stdout pipe transport.

    This is the recommended entry point for subprocess workers.  Accepts
    either a ``(protocol, implementation)`` pair or a pre-built ``RpcServer``.

    The function parses ``sys.argv`` and supports the following CLI flags.

    Transport selection (``--http`` and ``--unix`` are mutually exclusive;
    with neither, the server runs over stdin/stdout pipes, which is the
    default and is suitable for ``SubprocessTransport``):

    - ``--http``  — Serve over HTTP instead of stdin/stdout (requires
      ``vgi-rpc[http]``).
    - ``--host HOST`` — HTTP bind address (default ``127.0.0.1``).
    - ``--port PORT`` — HTTP port (default ``0``, auto-select).
    - ``--unix PATH`` — Serve on the Unix domain socket at ``PATH`` (made
      absolute) instead of stdin/stdout.  On Windows ``PATH`` is used as a
      named-pipe name and served with ``serve_named_pipe``.  A discovery
      line (``UNIX:<path>`` or ``PIPE:<name>``) is printed to stdout by
      the ``on_bound`` callback handed to the transport.
    - ``--idle-timeout SECONDS`` — Only meaningful with ``--unix``.
      Self-terminate after this many seconds with zero active connections.
      Values ``<= 0`` disable the timeout.  Default ``300``.  Env:
      ``VGI_RPC_IDLE_TIMEOUT``.
    - ``--threaded`` / ``--no-threaded`` — Only meaningful with ``--unix``.
      Serve each connection in a separate thread; defaults to threaded
      when the flag is not given.

    Introspection:

    - ``--describe`` — Enable the ``__describe__`` introspection method.
      Only applied when this function constructs the ``RpcServer`` itself;
      a pre-built ``RpcServer`` keeps its own setting.

    Access log:

    - ``--access-log PATH`` — Append JSONL access log records to ``PATH``.
      The cross-language conformance contract requires every worker to
      accept this flag; see ``docs/access-log-spec.md``.  Env:
      ``VGI_RPC_ACCESS_LOG``.  Rejected in combination with ``--unix``.
    - ``--access-log-max-bytes N`` — Rotate the access log at this size
      (``0`` = no rotation).  Env: ``VGI_RPC_ACCESS_LOG_MAX_BYTES``.
    - ``--access-log-backup-count N`` — Number of rotated files to retain
      (default ``5``).  Env: ``VGI_RPC_ACCESS_LOG_BACKUP_COUNT``.
    - ``--access-log-when WHEN`` — Time-based rotation interval; mutually
      exclusive with ``--access-log-max-bytes``.  Env:
      ``VGI_RPC_ACCESS_LOG_WHEN``.
    - ``--access-log-max-record-bytes N`` — Maximum size of one record
      (default ``1048576``).  Env: ``VGI_RPC_ACCESS_LOG_MAX_RECORD_BYTES``.

    Response size caps (rejected in combination with ``--unix``):

    - ``--max-response-bytes N`` — HTTP-only.  Cap the outgoing HTTP body
      of every method response at ``N`` bytes (including IPC framing).
      For producer streams, controls when the framework mints a
      continuation token to split the response across multiple HTTP
      turns.  Default: no body cap.  Env:
      ``VGI_RPC_MAX_RESPONSE_BYTES``.
    - ``--max-externalized-response-bytes N`` — HTTP-only.  Cap the
      total bytes uploaded to external storage during one HTTP response.
      Default: unbounded.  Env:
      ``VGI_RPC_MAX_EXTERNALIZED_RESPONSE_BYTES``.
    - ``--max-stream-response-bytes N`` — **Deprecated**; alias for
      ``--max-response-bytes``.  A warning is printed to stderr when used.

    Args:
        protocol_or_server: A Protocol class (requires *implementation*) or
            an already-constructed ``RpcServer``.
        implementation: The implementation object.  Required when
            *protocol_or_server* is a Protocol class; must be ``None`` when
            passing an ``RpcServer``.

    Raises:
        TypeError: On invalid argument combinations.
        SystemExit: On conflicting CLI flags (both response-cap spellings,
            both access-log rotation modes, or an HTTP-only flag together
            with ``--unix``), on argparse errors, and when ``--http`` is
            requested but ``vgi_rpc.http`` cannot be imported.
        ValueError: If one of the numeric environment variables used as an
            argparse default does not parse as a number.

    """
    parser = argparse.ArgumentParser(description="vgi-rpc server")
    # --http and --unix share an exclusive group so argparse itself rejects
    # passing both; neither is required, which leaves stdio as the fallback.
    transport_mode = parser.add_mutually_exclusive_group()
    transport_mode.add_argument(
        "--http", action="store_true", default=False, help="Serve over HTTP instead of stdin/stdout"
    )
    transport_mode.add_argument(
        "--unix",
        metavar="PATH",
        default=None,
        help=(
            "Bind to this Unix domain socket path instead of stdin/stdout. "
            "Mutually exclusive with --http.  When set, --threaded defaults to True "
            "and --idle-timeout governs self-shutdown."
        ),
    )
    parser.add_argument(
        "--idle-timeout",
        type=float,
        default=float(os.environ.get("VGI_RPC_IDLE_TIMEOUT", "300")),
        help=(
            "Self-terminate after this many seconds with zero active connections. "
            "Only meaningful with --unix.  A startup-grace period of max(idle_timeout, 60) "
            "protects against shutdown before the first client.  0 disables.  "
            "Env: VGI_RPC_IDLE_TIMEOUT (default 300)."
        ),
    )
    # default=None (rather than True/False) lets the --unix branch below tell
    # "flag not given" apart from an explicit --no-threaded.
    parser.add_argument(
        "--threaded",
        action=argparse.BooleanOptionalAction,
        default=None,
        help=(
            "Serve each Unix-socket connection in a separate daemon thread "
            "(only meaningful with --unix; default True when --unix is set)."
        ),
    )
    parser.add_argument("--host", default="127.0.0.1", help="HTTP bind address (default: 127.0.0.1)")
    parser.add_argument("--port", type=int, default=0, help="HTTP port (default: auto-select)")
    parser.add_argument(
        "--describe", action="store_true", default=False, help="Enable __describe__ introspection method"
    )
    parser.add_argument(
        "--access-log",
        metavar="PATH",
        default=os.environ.get("VGI_RPC_ACCESS_LOG"),
        help=(
            "Append JSONL access log records to PATH (vgi_rpc.access logger at INFO). "
            "PATH may contain {pid} and {server_id} placeholders. "
            "Env: VGI_RPC_ACCESS_LOG."
        ),
    )
    parser.add_argument(
        "--access-log-max-bytes",
        type=int,
        default=int(os.environ.get("VGI_RPC_ACCESS_LOG_MAX_BYTES", "0")),
        help="Rotate the access log at this size in bytes (0 = no rotation). Env: VGI_RPC_ACCESS_LOG_MAX_BYTES.",
    )
    parser.add_argument(
        "--access-log-backup-count",
        type=int,
        default=int(os.environ.get("VGI_RPC_ACCESS_LOG_BACKUP_COUNT", "5")),
        help="Number of rotated access-log files to retain. Env: VGI_RPC_ACCESS_LOG_BACKUP_COUNT.",
    )
    parser.add_argument(
        "--access-log-when",
        default=os.environ.get("VGI_RPC_ACCESS_LOG_WHEN"),
        help=(
            "Time-based rotation interval (e.g. 'H', 'D', 'midnight'); mutually exclusive with "
            "--access-log-max-bytes. Env: VGI_RPC_ACCESS_LOG_WHEN."
        ),
    )
    parser.add_argument(
        "--access-log-max-record-bytes",
        type=int,
        default=int(os.environ.get("VGI_RPC_ACCESS_LOG_MAX_RECORD_BYTES", "1048576")),
        help=(
            "Maximum size in bytes of one access-log record (default 1048576 = 1 MiB). "
            "Env: VGI_RPC_ACCESS_LOG_MAX_RECORD_BYTES."
        ),
    )
    # For the three size caps below, ``int(env) or None`` turns an unset or
    # zero environment value into None, i.e. "no cap".  A value given on the
    # command line bypasses this default and is stored as parsed.
    parser.add_argument(
        "--max-response-bytes",
        type=int,
        default=int(os.environ.get("VGI_RPC_MAX_RESPONSE_BYTES", "0")) or None,
        help=(
            "HTTP-only.  Cap the outgoing HTTP body of every method response "
            "at this many bytes (including IPC framing).  For producer streams, "
            "controls when the framework mints a continuation token to split "
            "the response across HTTP turns.  Default: no body cap.  "
            "Env: VGI_RPC_MAX_RESPONSE_BYTES."
        ),
    )
    parser.add_argument(
        "--max-externalized-response-bytes",
        type=int,
        default=int(os.environ.get("VGI_RPC_MAX_EXTERNALIZED_RESPONSE_BYTES", "0")) or None,
        help=(
            "HTTP-only.  Cap the total bytes uploaded to external storage "
            "during one HTTP response (one producer turn or one unary/exchange "
            "call).  Default: unbounded.  Env: "
            "VGI_RPC_MAX_EXTERNALIZED_RESPONSE_BYTES."
        ),
    )
    parser.add_argument(
        "--max-stream-response-bytes",
        type=int,
        default=int(os.environ.get("VGI_RPC_MAX_STREAM_RESPONSE_BYTES", "0")) or None,
        help=("Deprecated alias for --max-response-bytes.  Env: VGI_RPC_MAX_STREAM_RESPONSE_BYTES (deprecated)."),
    )
    args = parser.parse_args()

    # Deprecation alias: fold the old spelling into max_response_bytes, but
    # refuse to guess when both spellings carry a value.
    if args.max_stream_response_bytes is not None:
        if args.max_response_bytes is not None:
            raise SystemExit("Pass either --max-response-bytes or --max-stream-response-bytes, not both")
        print(
            "warning: --max-stream-response-bytes is deprecated; use --max-response-bytes instead.",
            file=sys.stderr,
        )
        args.max_response_bytes = args.max_stream_response_bytes

    # Size-based and time-based rotation cannot be combined.
    if args.access_log_max_bytes and args.access_log_when:
        raise SystemExit("--access-log-max-bytes and --access-log-when are mutually exclusive")

    # Collect every offending flag first so the user gets one complete error
    # message instead of fixing them one at a time.
    # NOTE(review): --access-log is rejected here as "HTTP-only", yet the
    # access log is configured for the stdio path as well further down —
    # confirm that refusing it for --unix is intended.
    if args.unix is not None:
        http_only_violations: list[str] = []
        if args.access_log:
            http_only_violations.append("--access-log")
        if args.max_response_bytes is not None:
            http_only_violations.append("--max-response-bytes")
        if args.max_externalized_response_bytes is not None:
            http_only_violations.append("--max-externalized-response-bytes")
        if http_only_violations:
            raise SystemExit(
                f"{', '.join(http_only_violations)} {'are' if len(http_only_violations) > 1 else 'is'} "
                "HTTP-only and cannot be combined with --unix"
            )

    # Resolve the server: either take the pre-built one as-is or build one
    # from the (protocol, implementation) pair.
    # NOTE(review): --describe is only consulted in the second branch, so it
    # has no effect on a pre-built RpcServer — confirm that is intended.
    if isinstance(protocol_or_server, RpcServer):
        if implementation is not None:
            raise TypeError("implementation must be None when passing an RpcServer")
        server = protocol_or_server
    elif isinstance(protocol_or_server, type):
        if implementation is None:
            raise TypeError("implementation is required when passing a Protocol class")
        server = RpcServer(protocol_or_server, implementation, enable_describe=args.describe)
    else:
        raise TypeError(f"Expected a Protocol class or RpcServer, got {type(protocol_or_server).__name__}")

    # The access log is set up after the server exists because it needs the
    # server's id.
    if args.access_log:
        _configure_access_log(
            path=args.access_log,
            max_bytes=args.access_log_max_bytes,
            backup_count=args.access_log_backup_count,
            when=args.access_log_when,
            max_record_bytes=args.access_log_max_record_bytes,
            server_id=server.server_id,
        )

    if args.http:
        # Imported lazily so the HTTP extra is only required when --http is used.
        try:
            from vgi_rpc.http import serve_http
        except ImportError:
            print("HTTP transport requires vgi-rpc[http]: pip install vgi-rpc[http]", file=sys.stderr)
            sys.exit(1)
        serve_http(
            server,
            host=args.host,
            port=args.port,
            max_response_bytes=args.max_response_bytes,
            max_externalized_response_bytes=args.max_externalized_response_bytes,
        )
    elif args.unix is not None:
        # Flag not given -> threaded; an explicit --threaded/--no-threaded wins.
        threaded = True if args.threaded is None else args.threaded
        # Non-positive timeouts are passed on as None (no idle timeout).
        idle_timeout: float | None = args.idle_timeout if args.idle_timeout > 0 else None

        if sys.platform == "win32":
            # CPython has no AF_UNIX on Windows; --unix carries a named-pipe name
            # (\\.\pipe\...) which the launcher constructs. Serve over a named
            # pipe and advertise it with the PIPE: discovery prefix (the C++
            # launcher matches on that prefix per docs/launcher-protocol.md).
            pipe_name = args.unix

            def _emit_discovery_line(bound_path: str) -> None:
                print(f"PIPE:{bound_path}", flush=True)

            serve_named_pipe(
                server,
                pipe_name,
                threaded=threaded,
                idle_timeout=idle_timeout,
                on_bound=_emit_discovery_line,
            )
        else:
            absolute_path = os.path.abspath(args.unix)

            def _emit_discovery_line(bound_path: str) -> None:
                # Mirrors the PORT:<n> convention used by the HTTP transport.  After
                # this line the worker MUST NOT write further data to stdout — see
                # the cross-language launcher contract.
                print(f"UNIX:{bound_path}", flush=True)

            serve_unix(
                server,
                absolute_path,
                threaded=threaded,
                idle_timeout=idle_timeout,
                on_bound=_emit_discovery_line,
            )
    else:
        # Neither --http nor --unix: fall back to the stdin/stdout transport.
        serve_stdio(server)

connect

connect(
    protocol: type[P],
    cmd: list[str],
    *,
    on_log: Callable[[Message], None] | None = None,
    external_location: ExternalLocationConfig | None = None,
    stderr: StderrMode = INHERIT,
    stderr_logger: Logger | None = None,
    ipc_validation: IpcValidation = FULL
) -> Iterator[P]

Connect to a subprocess RPC server.

Context manager that spawns a subprocess, yields a typed proxy, and cleans up on exit.

PARAMETER DESCRIPTION
protocol

The Protocol class defining the RPC interface.

TYPE: type[P]

cmd

Command to spawn the subprocess worker.

TYPE: list[str]

on_log

Optional callback for log messages from the server.

TYPE: Callable[[Message], None] | None DEFAULT: None

external_location

Optional ExternalLocation configuration for resolving and producing externalized batches.

TYPE: ExternalLocationConfig | None DEFAULT: None

stderr

How to handle the child's stderr stream (see StderrMode).

TYPE: StderrMode DEFAULT: INHERIT

stderr_logger

Logger for StderrMode.PIPE output; ignored for other modes. Defaults to logging.getLogger("vgi_rpc.subprocess.stderr").

TYPE: Logger | None DEFAULT: None

ipc_validation

Validation level for incoming IPC batches.

TYPE: IpcValidation DEFAULT: FULL

YIELDS DESCRIPTION
P

A typed RPC proxy supporting all methods defined on protocol.

Source code in vgi_rpc/rpc/__init__.py
@contextlib.contextmanager
def connect[P](
    protocol: type[P],
    cmd: list[str],
    *,
    on_log: Callable[[Message], None] | None = None,
    external_location: ExternalLocationConfig | None = None,
    stderr: StderrMode = StderrMode.INHERIT,
    stderr_logger: logging.Logger | None = None,
    ipc_validation: IpcValidation = IpcValidation.FULL,
) -> Iterator[P]:
    """Connect to a subprocess RPC server.

    Context manager that spawns a subprocess, yields a typed proxy, and
    cleans up on exit.

    Args:
        protocol: The Protocol class defining the RPC interface.
        cmd: Command to spawn the subprocess worker.
        on_log: Optional callback for log messages from the server.
        external_location: Optional ExternalLocation configuration for
            resolving and producing externalized batches.
        stderr: How to handle the child's stderr stream (see :class:`StderrMode`).
        stderr_logger: Logger for ``StderrMode.PIPE`` output; ignored for
            other modes.  Defaults to
            ``logging.getLogger("vgi_rpc.subprocess.stderr")``.
        ipc_validation: Validation level for incoming IPC batches.

    Yields:
        A typed RPC proxy supporting all methods defined on *protocol*.

    """
    transport = SubprocessTransport(cmd, stderr=stderr, stderr_logger=stderr_logger)
    try:
        with RpcConnection(
            protocol, transport, on_log=on_log, external_location=external_location, ipc_validation=ipc_validation
        ) as proxy:
            yield proxy
    finally:
        transport.close()

serve_pipe

serve_pipe(
    protocol: type[P],
    implementation: object,
    *,
    on_log: Callable[[Message], None] | None = None,
    external_location: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation | None = None
) -> Iterator[P]

Start an in-process pipe server and yield a typed client proxy.

Useful for tests and demos — no subprocess needed. A background thread runs RpcServer.serve() on the server side of a pipe pair.

PARAMETER DESCRIPTION
protocol

The Protocol class defining the RPC interface.

TYPE: type[P]

implementation

The implementation object.

TYPE: object

on_log

Optional callback for log messages from the server.

TYPE: Callable[[Message], None] | None DEFAULT: None

external_location

Optional ExternalLocation configuration for resolving and producing externalized batches.

TYPE: ExternalLocationConfig | None DEFAULT: None

ipc_validation

Validation level for incoming IPC batches. When None (the default), both components use IpcValidation.FULL.

TYPE: IpcValidation | None DEFAULT: None

YIELDS DESCRIPTION
P

A typed RPC proxy supporting all methods defined on protocol.

Source code in vgi_rpc/rpc/__init__.py
@contextlib.contextmanager
def serve_pipe[P](
    protocol: type[P],
    implementation: object,
    *,
    on_log: Callable[[Message], None] | None = None,
    external_location: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation | None = None,
) -> Iterator[P]:
    """Start an in-process pipe server and yield a typed client proxy.

    Useful for tests and demos — no subprocess needed.  A background thread
    runs ``RpcServer.serve()`` on the server side of a pipe pair.

    Args:
        protocol: The Protocol class defining the RPC interface.
        implementation: The implementation object.
        on_log: Optional callback for log messages from the server.
        external_location: Optional ExternalLocation configuration for
            resolving and producing externalized batches.
        ipc_validation: Validation level for incoming IPC batches.
            When ``None`` (the default), both components use
            ``IpcValidation.FULL``.

    Yields:
        A typed RPC proxy supporting all methods defined on *protocol*.

    """
    client_transport, server_transport = make_pipe_pair()
    server = RpcServer(
        protocol,
        implementation,
        external_location=external_location,
        ipc_validation=ipc_validation if ipc_validation is not None else IpcValidation.FULL,
    )
    thread = threading.Thread(target=server.serve, args=(server_transport,), daemon=True)
    thread.start()
    try:
        with RpcConnection(
            protocol,
            client_transport,
            on_log=on_log,
            external_location=external_location,
            ipc_validation=ipc_validation if ipc_validation is not None else IpcValidation.FULL,
        ) as proxy:
            yield proxy
    finally:
        client_transport.close()
        thread.join(timeout=5)
        server_transport.close()

describe_rpc

describe_rpc(
    protocol: type,
    *,
    methods: Mapping[str, RpcMethodInfo] | None = None
) -> str

Return a human-readable description of an RPC protocol's methods.

Source code in vgi_rpc/rpc/__init__.py
def describe_rpc(protocol: type, *, methods: Mapping[str, RpcMethodInfo] | None = None) -> str:
    """Render a human-readable, multi-line summary of an RPC protocol.

    The output starts with an ``RPC Protocol: <name>`` header followed by
    one indented entry per method, in sorted name order.  Each entry lists
    the method type and parameter schema, the result schema for unary
    methods, and the stripped docstring when one is present.

    Args:
        protocol: The Protocol class being described; its ``__name__`` is
            used in the header.
        methods: Optional pre-computed method table.  When ``None`` it is
            obtained from ``rpc_methods(protocol)``.

    Returns:
        The description as a single newline-joined string.

    """
    method_table = rpc_methods(protocol) if methods is None else methods
    output: list[str] = [f"RPC Protocol: {protocol.__name__}", ""]

    for method_name, method_info in sorted(method_table.items()):
        entry = [
            f"  {method_name}({method_info.method_type.value})",
            f"    params: {method_info.params_schema}",
        ]
        # Only unary methods have their result schema listed.
        if method_info.method_type == MethodType.UNARY:
            entry.append(f"    result: {method_info.result_schema}")
        if method_info.doc:
            entry.append(f"    doc: {method_info.doc.strip()}")
        # A blank line separates consecutive method entries.
        entry.append("")
        output.extend(entry)

    return "\n".join(output)

Constants

REQUEST_VERSION module-attribute

REQUEST_VERSION = b'1'